author	Ralf Baechle <ralf@linux-mips.org>	2000-01-21 22:34:01 +0000
committer	Ralf Baechle <ralf@linux-mips.org>	2000-01-21 22:34:01 +0000
commit	9e30c3705aed9fbec4c3304570e4d6e707856bcb (patch)
tree	b19e6acb5a67af31a4e7742e05c2166dc3f1444c /net/sunrpc
parent	72919904796333a20c6a5d5c380091b42e407aa9 (diff)
Merge with Linux 2.3.22.
Diffstat (limited to 'net/sunrpc')
-rw-r--r--	net/sunrpc/auth.c	13
-rw-r--r--	net/sunrpc/sched.c	171
-rw-r--r--	net/sunrpc/xprt.c	721
3 files changed, 528 insertions(+), 377 deletions(-)
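
Editor's note: the bulk of this merge converts the sunrpc code from global interrupt
disabling (save_flags()/cli()/restore_flags()) to dedicated spinlocks taken with the
<linux/spinlock.h> API the patch itself adds. A minimal sketch of that conversion
pattern, for orientation only; the demo_ names are hypothetical and the snippet is
not part of the commit:

#include <linux/spinlock.h>

static spinlock_t demo_lock = SPIN_LOCK_UNLOCKED;

/*
 * Old style: disable interrupts globally around the critical section.
 *
 *	save_flags(oldflags); cli();
 *	...touch shared data...
 *	restore_flags(oldflags);
 *
 * New style: take an IRQ-safe subsystem lock instead, so the same lock
 * can also be taken from timer/bottom-half context without deadlocking.
 */
static void demo_touch_shared(void)
{
	unsigned long oldflags;

	spin_lock_irqsave(&demo_lock, oldflags);
	/* ...touch shared data... */
	spin_unlock_irqrestore(&demo_lock, oldflags);
}
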
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c
index 7c966779b..af9923ec8 100644
--- a/net/sunrpc/auth.c
+++ b/net/sunrpc/auth.c
@@ -15,6 +15,7 @@
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/sunrpc/clnt.h>
+#include <linux/spinlock.h>
#ifdef RPC_DEBUG
# define RPCDBG_FACILITY RPCDBG_AUTH
@@ -71,6 +72,8 @@ rpcauth_destroy(struct rpc_auth *auth)
auth->au_ops->destroy(auth);
}
+spinlock_t rpc_credcache_lock = SPIN_LOCK_UNLOCKED;
+
/*
* Initialize RPC credential cache
*/
@@ -94,6 +97,7 @@ rpcauth_free_credcache(struct rpc_auth *auth)
if (!(destroy = auth->au_ops->crdestroy))
destroy = (void (*)(struct rpc_cred *)) rpc_free;
+ spin_lock(&rpc_credcache_lock);
for (i = 0; i < RPC_CREDCACHE_NR; i++) {
q = &auth->au_credcache[i];
while ((cred = *q) != NULL) {
@@ -101,6 +105,7 @@ rpcauth_free_credcache(struct rpc_auth *auth)
destroy(cred);
}
}
+ spin_unlock(&rpc_credcache_lock);
}
/*
@@ -113,6 +118,7 @@ rpcauth_gc_credcache(struct rpc_auth *auth)
int i, safe = 0;
dprintk("RPC: gc'ing RPC credentials for auth %p\n", auth);
+ spin_lock(&rpc_credcache_lock);
for (i = 0; i < RPC_CREDCACHE_NR; i++) {
q = &auth->au_credcache[i];
while ((cred = *q) != NULL) {
@@ -129,6 +135,7 @@ rpcauth_gc_credcache(struct rpc_auth *auth)
q = &cred->cr_next;
}
}
+ spin_unlock(&rpc_credcache_lock);
while ((cred = free) != NULL) {
free = cred->cr_next;
rpc_free(cred);
@@ -145,10 +152,12 @@ rpcauth_insert_credcache(struct rpc_auth *auth, struct rpc_cred *cred)
int nr;
nr = (cred->cr_uid % RPC_CREDCACHE_NR);
+ spin_lock(&rpc_credcache_lock);
cred->cr_next = auth->au_credcache[nr];
auth->au_credcache[nr] = cred;
cred->cr_expire = jiffies + auth->au_expire;
cred->cr_count++;
+ spin_unlock(&rpc_credcache_lock);
}
/*
@@ -166,6 +175,7 @@ rpcauth_lookup_credcache(struct rpc_task *task)
if (time_before(auth->au_nextgc, jiffies))
rpcauth_gc_credcache(auth);
+ spin_lock(&rpc_credcache_lock);
q = &auth->au_credcache[nr];
while ((cred = *q) != NULL) {
if (auth->au_ops->crmatch(task, cred)) {
@@ -174,6 +184,7 @@ rpcauth_lookup_credcache(struct rpc_task *task)
}
q = &cred->cr_next;
}
+ spin_unlock(&rpc_credcache_lock);
if (!cred)
cred = auth->au_ops->crcreate(task);
@@ -194,6 +205,7 @@ rpcauth_remove_credcache(struct rpc_auth *auth, struct rpc_cred *cred)
int nr;
nr = (cred->cr_uid % RPC_CREDCACHE_NR);
+ spin_lock(&rpc_credcache_lock);
q = &auth->au_credcache[nr];
while ((cr = *q) != NULL) {
if (cred == cr) {
@@ -202,6 +214,7 @@ rpcauth_remove_credcache(struct rpc_auth *auth, struct rpc_cred *cred)
}
q = &cred->cr_next;
}
+ spin_unlock(&rpc_credcache_lock);
}
struct rpc_cred *
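
Editor's note: the auth.c hunks above bracket every walk of the per-auth credential
cache (auth->au_credcache[]) with the new rpc_credcache_lock. A hedged sketch of that
bucket-walk pattern follows; demo_unlink_cred is a hypothetical helper written for
illustration, not a function from the patch:

#include <linux/spinlock.h>
#include <linux/sunrpc/auth.h>

extern spinlock_t rpc_credcache_lock;

static void demo_unlink_cred(struct rpc_auth *auth, struct rpc_cred *cred)
{
	struct rpc_cred **q, *cr;
	int nr = cred->cr_uid % RPC_CREDCACHE_NR;

	spin_lock(&rpc_credcache_lock);
	for (q = &auth->au_credcache[nr]; (cr = *q) != NULL; q = &cr->cr_next) {
		if (cr == cred) {
			*q = cr->cr_next;	/* unlink under the lock */
			break;
		}
	}
	spin_unlock(&rpc_credcache_lock);
}
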
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index b4e0b4b76..181a7e46c 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -18,6 +18,7 @@
#include <linux/unistd.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>
+#include <linux/spinlock.h>
#include <linux/sunrpc/clnt.h>
@@ -74,6 +75,16 @@ static u32 swap_buffer[PAGE_SIZE >> 2];
static int swap_buffer_used = 0;
/*
+ * Spinlock for wait queues. Access to the latter also has to be
+ * interrupt-safe in order to allow timers to wake up sleeping tasks.
+ */
+spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
+/*
+ * Spinlock for other critical sections of code.
+ */
+spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED;
+
+/*
* Add new request to wait queue.
*
* Swapper tasks always get inserted at the head of the queue.
@@ -81,8 +92,8 @@ static int swap_buffer_used = 0;
* improve overall performance.
* Everyone else gets appended to the queue to ensure proper FIFO behavior.
*/
-int
-rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
+static int
+__rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
if (task->tk_rpcwait) {
if (task->tk_rpcwait != queue)
@@ -104,12 +115,24 @@ rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
return 0;
}
+int
+rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
+{
+ unsigned long oldflags;
+ int result;
+
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+ result = __rpc_add_wait_queue(q, task);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+ return result;
+}
+
/*
* Remove request from queue.
- * Note: must be called with interrupts disabled.
+ * Note: must be called with spin lock held.
*/
-void
-rpc_remove_wait_queue(struct rpc_task *task)
+static void
+__rpc_remove_wait_queue(struct rpc_task *task)
{
struct rpc_wait_queue *queue;
@@ -122,6 +145,16 @@ rpc_remove_wait_queue(struct rpc_task *task)
task->tk_pid, queue, rpc_qname(queue));
}
+void
+rpc_remove_wait_queue(struct rpc_task *task)
+{
+ unsigned long oldflags;
+
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+ __rpc_remove_wait_queue(task);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+}
+
/*
* Set up a timer for the current task.
*/
@@ -165,7 +198,7 @@ rpc_del_timer(struct rpc_task *task)
* Make an RPC task runnable.
*
* Note: If the task is ASYNC, this must be called with
- * interrupts disabled to protect the wait queue operation.
+ * the spinlock held to protect the wait queue operation.
*/
static inline void
rpc_make_runnable(struct rpc_task *task)
@@ -177,7 +210,7 @@ rpc_make_runnable(struct rpc_task *task)
task->tk_flags |= RPC_TASK_RUNNING;
if (RPC_IS_ASYNC(task)) {
int status;
- status = rpc_add_wait_queue(&schedq, task);
+ status = __rpc_add_wait_queue(&schedq, task);
if (status)
{
printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
@@ -214,18 +247,12 @@ static void
__rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
rpc_action action, rpc_action timer)
{
- unsigned long oldflags;
int status;
dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
rpc_qname(q), jiffies);
- /*
- * Protect the execution below.
- */
- save_flags(oldflags); cli();
-
- status = rpc_add_wait_queue(q, task);
+ status = __rpc_add_wait_queue(q, task);
if (status)
{
printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
@@ -240,7 +267,6 @@ __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
task->tk_flags &= ~RPC_TASK_RUNNING;
}
- restore_flags(oldflags);
return;
}
@@ -248,11 +274,17 @@ void
rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
rpc_action action, rpc_action timer)
{
+ unsigned long oldflags;
+ /*
+ * Protect the queue operations.
+ */
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
__rpc_sleep_on(q, task, action, timer);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
}
/*
- * Wake up a single task -- must be invoked with bottom halves off.
+ * Wake up a single task -- must be invoked with spin lock held.
*
* It would probably suffice to cli/sti the del_timer and remove_wait_queue
* operations individually.
@@ -272,7 +304,7 @@ __rpc_wake_up(struct rpc_task *task)
#endif
rpc_del_timer(task);
if (task->tk_rpcwait != &schedq)
- rpc_remove_wait_queue(task);
+ __rpc_remove_wait_queue(task);
if (!RPC_IS_RUNNING(task)) {
task->tk_flags |= RPC_TASK_CALLBACK;
rpc_make_runnable(task);
@@ -289,7 +321,7 @@ __rpc_default_timer(struct rpc_task *task)
dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
task->tk_status = -ETIMEDOUT;
task->tk_timeout = 0;
- __rpc_wake_up(task);
+ rpc_wake_up_task(task);
}
/*
@@ -300,9 +332,9 @@ rpc_wake_up_task(struct rpc_task *task)
{
unsigned long oldflags;
- save_flags(oldflags); cli();
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
__rpc_wake_up(task);
- restore_flags(oldflags);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
}
/*
@@ -315,10 +347,10 @@ rpc_wake_up_next(struct rpc_wait_queue *queue)
struct rpc_task *task;
dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
- save_flags(oldflags); cli();
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
if ((task = queue->task) != 0)
__rpc_wake_up(task);
- restore_flags(oldflags);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
return task;
}
@@ -331,10 +363,10 @@ rpc_wake_up(struct rpc_wait_queue *queue)
{
unsigned long oldflags;
- save_flags(oldflags); cli();
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
while (queue->task)
__rpc_wake_up(queue->task);
- restore_flags(oldflags);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
}
/*
@@ -346,12 +378,12 @@ rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
struct rpc_task *task;
unsigned long oldflags;
- save_flags(oldflags); cli();
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
while ((task = queue->task) != NULL) {
task->tk_status = status;
__rpc_wake_up(task);
}
- restore_flags(oldflags);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
}
/*
@@ -369,7 +401,7 @@ static void
__rpc_atrun(struct rpc_task *task)
{
task->tk_status = 0;
- __rpc_wake_up(task);
+ rpc_wake_up_task(task);
}
/*
@@ -432,13 +464,13 @@ __rpc_execute(struct rpc_task *task)
* and the RPC reply arrives before we get here, it will
* have state RUNNING, but will still be on schedq.
*/
- save_flags(oldflags); cli();
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
if (RPC_IS_RUNNING(task)) {
if (task->tk_rpcwait == &schedq)
- rpc_remove_wait_queue(task);
+ __rpc_remove_wait_queue(task);
} else while (!RPC_IS_RUNNING(task)) {
if (RPC_IS_ASYNC(task)) {
- restore_flags(oldflags);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
return 0;
}
@@ -448,9 +480,9 @@ __rpc_execute(struct rpc_task *task)
if (current->pid == rpciod_pid)
printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");
- sti();
+ spin_unlock_irq(&rpc_queue_lock);
__wait_event(task->tk_wait, RPC_IS_RUNNING(task));
- cli();
+ spin_lock_irq(&rpc_queue_lock);
/*
* When the task received a signal, remove from
@@ -462,7 +494,7 @@ __rpc_execute(struct rpc_task *task)
dprintk("RPC: %4d sync task resuming\n",
task->tk_pid);
}
- restore_flags(oldflags);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
/*
* When a sync task receives a signal, it exits with
@@ -522,15 +554,16 @@ __rpc_schedule(void)
int need_resched = current->need_resched;
dprintk("RPC: rpc_schedule enter\n");
- save_flags(oldflags);
while (1) {
- cli();
- if (!(task = schedq.task))
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+ if (!(task = schedq.task)) {
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
break;
+ }
rpc_del_timer(task);
- rpc_remove_wait_queue(task);
+ __rpc_remove_wait_queue(task);
task->tk_flags |= RPC_TASK_RUNNING;
- restore_flags(oldflags);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
__rpc_execute(task);
@@ -541,7 +574,6 @@ __rpc_schedule(void)
if (need_resched)
schedule();
}
- restore_flags(oldflags);
dprintk("RPC: rpc_schedule leave\n");
}
@@ -626,11 +658,13 @@ rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
task->tk_suid_retry = 1;
/* Add to global list of all tasks */
+ spin_lock(&rpc_sched_lock);
task->tk_next_task = all_tasks;
task->tk_prev_task = NULL;
if (all_tasks)
all_tasks->tk_prev_task = task;
all_tasks = task;
+ spin_unlock(&rpc_sched_lock);
if (clnt)
clnt->cl_users++;
@@ -679,10 +713,12 @@ void
rpc_release_task(struct rpc_task *task)
{
struct rpc_task *next, *prev;
+ unsigned long oldflags;
dprintk("RPC: %4d release task\n", task->tk_pid);
/* Remove from global task list */
+ spin_lock(&rpc_sched_lock);
prev = task->tk_prev_task;
next = task->tk_next_task;
if (next)
@@ -691,6 +727,19 @@ rpc_release_task(struct rpc_task *task)
prev->tk_next_task = next;
else
all_tasks = next;
+ task->tk_next_task = task->tk_prev_task = NULL;
+ spin_unlock(&rpc_sched_lock);
+
+ /* Protect the execution below. */
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
+
+ /* Delete any running timer */
+ rpc_del_timer(task);
+
+ /* Remove from any wait queue we're still on */
+ __rpc_remove_wait_queue(task);
+
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
/* Release resources */
if (task->tk_rqstp)
@@ -738,12 +787,15 @@ rpc_find_parent(struct rpc_task *child)
static void
rpc_child_exit(struct rpc_task *child)
{
+ unsigned long oldflags;
struct rpc_task *parent;
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
if ((parent = rpc_find_parent(child)) != NULL) {
parent->tk_status = child->tk_status;
- rpc_wake_up_task(parent);
+ __rpc_wake_up(parent);
}
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
rpc_release_task(child);
}
@@ -772,11 +824,11 @@ rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
{
unsigned long oldflags;
- save_flags(oldflags); cli();
- rpc_make_runnable(child);
- restore_flags(oldflags);
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
/* N.B. Is it possible for the child to have already finished? */
- rpc_sleep_on(&childq, task, func, NULL);
+ __rpc_sleep_on(&childq, task, func, NULL);
+ rpc_make_runnable(child);
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
}
/*
@@ -789,8 +841,11 @@ rpc_killall_tasks(struct rpc_clnt *clnt)
struct rpc_task **q, *rovr;
dprintk("RPC: killing all tasks for client %p\n", clnt);
- /* N.B. Why bother to inhibit? Nothing blocks here ... */
- rpc_inhibit++;
+
+ /*
+ * Spin lock all_tasks to prevent changes...
+ */
+ spin_lock(&rpc_sched_lock);
for (q = &all_tasks; (rovr = *q); q = &rovr->tk_next_task) {
if (!clnt || rovr->tk_client == clnt) {
rovr->tk_flags |= RPC_TASK_KILLED;
@@ -798,11 +853,18 @@ rpc_killall_tasks(struct rpc_clnt *clnt)
rpc_wake_up_task(rovr);
}
}
- rpc_inhibit--;
+ spin_unlock(&rpc_sched_lock);
}
static DECLARE_MUTEX_LOCKED(rpciod_running);
+static inline int
+rpciod_task_pending(void)
+{
+ return schedq.task != NULL || xprt_tcp_pending();
+}
+
+
/*
* This is the rpciod kernel thread
*/
@@ -810,7 +872,6 @@ static int
rpciod(void *ptr)
{
wait_queue_head_t *assassin = (wait_queue_head_t*) ptr;
- unsigned long oldflags;
int rounds = 0;
MOD_INC_USE_COUNT;
@@ -845,18 +906,15 @@ rpciod(void *ptr)
schedule();
rounds = 0;
}
- save_flags(oldflags); cli();
dprintk("RPC: rpciod running checking dispatch\n");
rpciod_tcp_dispatcher();
- if (!schedq.task) {
+ if (!rpciod_task_pending()) {
dprintk("RPC: rpciod back to sleep\n");
- interruptible_sleep_on(&rpciod_idle);
+ wait_event_interruptible(rpciod_idle, rpciod_task_pending());
dprintk("RPC: switch to rpciod\n");
- rpciod_tcp_dispatcher();
rounds = 0;
}
- restore_flags(oldflags);
}
dprintk("RPC: rpciod shutdown commences\n");
@@ -983,8 +1041,12 @@ void rpc_show_tasks(void)
struct rpc_task *t = all_tasks, *next;
struct nfs_wreq *wreq;
- if (!t)
+ spin_lock(&rpc_sched_lock);
+ t = all_tasks;
+ if (!t) {
+ spin_unlock(&rpc_sched_lock);
return;
+ }
printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
"-rpcwait -action- --exit--\n");
for (; t; t = next) {
@@ -1007,5 +1069,6 @@ void rpc_show_tasks(void)
wreq->wb_file->f_dentry->d_parent->d_name.name,
wreq->wb_file->f_dentry->d_name.name);
}
+ spin_unlock(&rpc_sched_lock);
}
#endif
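
Editor's note: sched.c also reworks the rpciod idle loop. interruptible_sleep_on()
can miss a wakeup that arrives between the pending-work test and the sleep; the
patch sleeps with wait_event_interruptible() on an explicit predicate instead. A
condensed sketch of that loop shape, with hypothetical demo_ names:

#include <linux/sched.h>
#include <linux/wait.h>

static DECLARE_WAIT_QUEUE_HEAD(demo_idle);

static int demo_pending;	/* set by producers before waking demo_idle */

static int demo_work_pending(void)
{
	return demo_pending != 0;
}

static void demo_idle_loop(void)
{
	while (!signal_pending(current)) {
		if (!demo_work_pending()) {
			/* The predicate is re-tested after every wakeup, so a
			 * wakeup racing with the test above is not lost. */
			wait_event_interruptible(demo_idle, demo_work_pending());
			continue;
		}
		/* ...dispatch pending work... */
	}
}
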
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 610cfc07b..fb09aac28 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -31,12 +31,16 @@
* primitives that `transparently' work for processes as well as async
* tasks that rely on callbacks.
*
- * Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de>
+ * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
*
* TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
* TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
* TCP NFS related read + write fixes
* (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
+ *
+ * Rewrite of large parts of the code in order to stabilize TCP stuff.
+ * Fix behaviour when socket buffer is full.
+ * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
*/
#define __KERNEL_SYSCALLS__
@@ -62,6 +66,8 @@
#include <asm/uaccess.h>
#define SOCK_HAS_USER_DATA
+/* Following value should be > 32k + RPC overhead */
+#define XPRT_MIN_WRITE_SPACE 35000
/*
* Local variables
@@ -70,6 +76,9 @@
static struct rpc_xprt * sock_list = NULL;
#endif
+/* Spinlock for critical sections in the code. */
+spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED;
+
#ifdef RPC_DEBUG
# undef RPC_DEBUG_DATA
# define RPCDBG_FACILITY RPCDBG_XPRT
@@ -84,11 +93,13 @@ static struct rpc_xprt * sock_list = NULL;
* Local functions
*/
static void xprt_request_init(struct rpc_task *, struct rpc_xprt *);
+static void do_xprt_transmit(struct rpc_task *);
static void xprt_transmit_status(struct rpc_task *task);
+static void xprt_transmit_timeout(struct rpc_task *task);
static void xprt_receive_status(struct rpc_task *task);
static void xprt_reserve_status(struct rpc_task *task);
+static void xprt_disconnect(struct rpc_xprt *);
static void xprt_reconn_timeout(struct rpc_task *task);
-static void xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct sockaddr_in *,
struct rpc_timeout *);
@@ -144,39 +155,35 @@ xprt_from_sock(struct sock *sk)
* Adjust the iovec to move on 'n' bytes
*/
-extern inline void xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount)
+extern inline void
+xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount)
{
struct iovec *iv=msg->msg_iov;
+ int i;
/*
* Eat any sent iovecs
*/
-
- while(iv->iov_len < amount)
- {
- amount-=iv->iov_len;
+ while(iv->iov_len <= amount) {
+ amount -= iv->iov_len;
iv++;
msg->msg_iovlen--;
}
-
- msg->msg_iov=niv;
-
+
/*
* And chew down the partial one
*/
-
niv[0].iov_len = iv->iov_len-amount;
niv[0].iov_base =((unsigned char *)iv->iov_base)+amount;
iv++;
-
+
/*
* And copy any others
*/
-
- for(amount=1;amount<msg->msg_iovlen; amount++)
- {
- niv[amount]=*iv++;
- }
+ for(i = 1; i < msg->msg_iovlen; i++)
+ niv[i]=*iv++;
+
+ msg->msg_iov=niv;
}
/*
@@ -184,43 +191,42 @@ extern inline void xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amou
*/
static inline int
-xprt_sendmsg(struct rpc_xprt *xprt)
+xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
struct socket *sock = xprt->sock;
struct msghdr msg;
mm_segment_t oldfs;
int result;
+ int slen = req->rq_slen - req->rq_bytes_sent;
struct iovec niv[MAX_IOVEC];
+ if (slen == 0)
+ return 0;
+
xprt_pktdump("packet data:",
- xprt->snd_buf.io_vec->iov_base,
- xprt->snd_buf.io_vec->iov_len);
+ req->rq_svec->iov_base,
+ req->rq_svec->iov_len);
msg.msg_flags = MSG_DONTWAIT;
- msg.msg_iov = xprt->snd_buf.io_vec;
- msg.msg_iovlen = xprt->snd_buf.io_nr;
+ msg.msg_iov = req->rq_svec;
+ msg.msg_iovlen = req->rq_snr;
msg.msg_name = (struct sockaddr *) &xprt->addr;
msg.msg_namelen = sizeof(xprt->addr);
msg.msg_control = NULL;
msg.msg_controllen = 0;
/* Dont repeat bytes */
-
- if(xprt->snd_sent)
- xprt_move_iov(&msg, niv, xprt->snd_sent);
-
+ if (req->rq_bytes_sent)
+ xprt_move_iov(&msg, niv, req->rq_bytes_sent);
+
oldfs = get_fs(); set_fs(get_ds());
- result = sock_sendmsg(sock, &msg, xprt->snd_buf.io_len);
+ result = sock_sendmsg(sock, &msg, slen);
set_fs(oldfs);
- dprintk("RPC: xprt_sendmsg(%d) = %d\n",
- xprt->snd_buf.io_len, result);
+ dprintk("RPC: xprt_sendmsg(%d) = %d\n", slen, result);
- if (result >= 0) {
- xprt->snd_buf.io_len -= result;
- xprt->snd_sent += result;
+ if (result >= 0)
return result;
- }
switch (result) {
case -ECONNREFUSED:
@@ -229,9 +235,14 @@ xprt_sendmsg(struct rpc_xprt *xprt)
*/
break;
case -EAGAIN:
- return 0;
- case -ENOTCONN: case -EPIPE:
+ if (sock->flags & SO_NOSPACE)
+ result = -ENOMEM;
+ break;
+ case -ENOTCONN:
+ case -EPIPE:
/* connection broken */
+ if (xprt->stream)
+ result = -ENOTCONN;
break;
default:
printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
@@ -252,7 +263,6 @@ xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, int len)
mm_segment_t oldfs;
int result;
-#if LINUX_VERSION_CODE >= 0x020100
msg.msg_flags = MSG_DONTWAIT;
msg.msg_iov = iov;
msg.msg_iovlen = nr;
@@ -264,20 +274,6 @@ xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, int len)
oldfs = get_fs(); set_fs(get_ds());
result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
set_fs(oldfs);
-#else
- int alen = sizeof(sin);
- msg.msg_flags = 0;
- msg.msg_iov = iov;
- msg.msg_iovlen = nr;
- msg.msg_name = &sin;
- msg.msg_namelen = sizeof(sin);
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
-
- oldfs = get_fs(); set_fs(get_ds());
- result = sock->ops->recvmsg(sock, &msg, len, 1, 0, &alen);
- set_fs(oldfs);
-#endif
dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
iov, len, result);
@@ -354,13 +350,15 @@ xprt_close(struct rpc_xprt *xprt)
{
struct sock *sk = xprt->inet;
+ xprt_disconnect(xprt);
+
#ifdef SOCK_HAS_USER_DATA
sk->user_data = NULL;
#endif
sk->data_ready = xprt->old_data_ready;
- sk->no_check = 0;
sk->state_change = xprt->old_state_change;
sk->write_space = xprt->old_write_space;
+ sk->no_check = 0;
sock_release(xprt->sock);
/*
@@ -378,9 +376,16 @@ static void
xprt_disconnect(struct rpc_xprt *xprt)
{
dprintk("RPC: disconnected transport %p\n", xprt);
+ xprt->connected = 0;
+ xprt->tcp_offset = 0;
+ xprt->tcp_more = 0;
+ xprt->tcp_total = 0;
+ xprt->tcp_reclen = 0;
+ xprt->tcp_copied = 0;
+ xprt->tcp_rqstp = NULL;
+ xprt->rx_pending_flag = 0;
rpc_wake_up_status(&xprt->pending, -ENOTCONN);
rpc_wake_up_status(&xprt->sending, -ENOTCONN);
- xprt->connected = 0;
}
/*
@@ -398,22 +403,33 @@ xprt_reconnect(struct rpc_task *task)
task->tk_pid, xprt, xprt->connected);
task->tk_status = 0;
+ if (xprt->shutdown)
+ return;
+
+ if (!xprt->stream)
+ return;
+
+ start_bh_atomic();
+ if (xprt->connected) {
+ end_bh_atomic();
+ return;
+ }
if (xprt->connecting) {
task->tk_timeout = xprt->timeout.to_maxval;
rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
+ end_bh_atomic();
return;
}
xprt->connecting = 1;
+ end_bh_atomic();
/* Create an unconnected socket */
- if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout)))
+ if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout))) {
+ xprt->connecting = 0;
goto defer;
+ }
-#if LINUX_VERSION_CODE >= 0x020100
inet = sock->sk;
-#else
- inet = (struct sock *) sock->data;
-#endif
inet->data_ready = xprt->inet->data_ready;
inet->state_change = xprt->inet->state_change;
inet->write_space = xprt->inet->write_space;
@@ -422,18 +438,18 @@ xprt_reconnect(struct rpc_task *task)
#endif
dprintk("RPC: %4d closing old socket\n", task->tk_pid);
- xprt_disconnect(xprt);
xprt_close(xprt);
- /* Reset to new socket and default congestion */
+ /* Reset to new socket */
xprt->sock = sock;
xprt->inet = inet;
- xprt->cwnd = RPC_INITCWND;
/* Now connect it asynchronously. */
dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
sizeof(xprt->addr), O_NONBLOCK);
+
+ xprt->connecting = 0;
if (status < 0) {
if (status != -EINPROGRESS && status != -EALREADY) {
printk("RPC: TCP connect error %d!\n", -status);
@@ -447,37 +463,19 @@ xprt_reconnect(struct rpc_task *task)
start_bh_atomic();
if (!xprt->connected) {
rpc_sleep_on(&xprt->reconn, task,
- xprt_reconn_status, xprt_reconn_timeout);
+ NULL, xprt_reconn_timeout);
end_bh_atomic();
return;
}
end_bh_atomic();
}
- xprt->connecting = 0;
- rpc_wake_up(&xprt->reconn);
- return;
defer:
- task->tk_timeout = 30 * HZ;
- rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
- xprt->connecting = 0;
-}
-
-/*
- * Reconnect status
- */
-static void
-xprt_reconn_status(struct rpc_task *task)
-{
- struct rpc_xprt *xprt = task->tk_xprt;
-
- dprintk("RPC: %4d xprt_reconn_status %d\n",
- task->tk_pid, task->tk_status);
- if (!xprt->connected && task->tk_status != -ETIMEDOUT) {
- task->tk_timeout = 30 * HZ;
- rpc_sleep_on(&xprt->reconn, task, NULL, xprt_reconn_timeout);
- }
+ start_bh_atomic();
+ if (!xprt->connected)
+ rpc_wake_up_next(&xprt->reconn);
+ end_bh_atomic();
}
/*
@@ -490,11 +488,19 @@ xprt_reconn_timeout(struct rpc_task *task)
dprintk("RPC: %4d xprt_reconn_timeout %d\n",
task->tk_pid, task->tk_status);
task->tk_status = -ENOTCONN;
- task->tk_xprt->connecting = 0;
+ start_bh_atomic();
+ if (task->tk_xprt->connecting)
+ task->tk_xprt->connecting = 0;
+ if (!task->tk_xprt->connected)
+ task->tk_status = -ENOTCONN;
+ else
+ task->tk_status = -ETIMEDOUT;
+ end_bh_atomic();
task->tk_timeout = 0;
rpc_wake_up_task(task);
}
+extern spinlock_t rpc_queue_lock;
/*
* Look up the RPC request corresponding to a reply.
*/
@@ -503,22 +509,28 @@ xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
struct rpc_task *head, *task;
struct rpc_rqst *req;
+ unsigned long oldflags;
int safe = 0;
+ spin_lock_irqsave(&rpc_queue_lock, oldflags);
if ((head = xprt->pending.task) != NULL) {
task = head;
do {
if ((req = task->tk_rqstp) && req->rq_xid == xid)
- return req;
+ goto out;
task = task->tk_next;
if (++safe > 100) {
printk("xprt_lookup_rqst: loop in Q!\n");
- return NULL;
+ goto out_bad;
}
} while (task != head);
}
dprintk("RPC: unknown XID %08x in reply.\n", xid);
- return NULL;
+ out_bad:
+ req = NULL;
+ out:
+ spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+ return req;
}
/*
@@ -559,11 +571,13 @@ xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
task->tk_status = copied;
- rpc_wake_up_task(task);
+ if (!RPC_IS_RUNNING(task))
+ rpc_wake_up_task(task);
return;
}
-/* We have set things up such that we perform the checksum of the UDP
+/*
+ * We have set things up such that we perform the checksum of the UDP
* packet in parallel with the copies into the RPC client iovec. -DaveM
*/
static int csum_partial_copy_to_page_cache(struct iovec *iov,
@@ -609,7 +623,8 @@ static int csum_partial_copy_to_page_cache(struct iovec *iov,
return 0;
}
-/* Input handler for RPC replies. Called from a bottom half and hence
+/*
+ * Input handler for RPC replies. Called from a bottom half and hence
* atomic.
*/
static inline void
@@ -621,12 +636,15 @@ udp_data_ready(struct sock *sk, int len)
int err, repsize, copied;
dprintk("RPC: udp_data_ready...\n");
- if (!(xprt = xprt_from_sock(sk)))
+ if (!(xprt = xprt_from_sock(sk))) {
+ printk("RPC: udp_data_ready request not found!\n");
return;
+ }
+
dprintk("RPC: udp_data_ready client %p\n", xprt);
if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
- return;
+ goto out_err;
repsize = skb->len - sizeof(struct udphdr);
if (repsize < 4) {
@@ -646,6 +664,7 @@ udp_data_ready(struct sock *sk, int len)
if ((copied = rovr->rq_rlen) > repsize)
copied = repsize;
+ rovr->rq_damaged = 1;
/* Suck it into the iovec, verify checksum if not done by hw. */
if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied))
goto dropit;
@@ -658,6 +677,8 @@ udp_data_ready(struct sock *sk, int len)
dropit:
skb_free_datagram(sk, skb);
return;
+out_err:
+ return;
}
/*
@@ -679,6 +700,7 @@ tcp_input_record(struct rpc_xprt *xprt)
int result, maxcpy, reclen, avail, want;
dprintk("RPC: tcp_input_record\n");
+
offset = xprt->tcp_offset;
result = -EAGAIN;
if (offset < 4 || (!xprt->tcp_more && offset < 8)) {
@@ -687,11 +709,6 @@ tcp_input_record(struct rpc_xprt *xprt)
riov.iov_base = xprt->tcp_recm.data + offset;
riov.iov_len = want;
result = xprt_recvmsg(xprt, &riov, 1, want);
- if (!result)
- {
- dprintk("RPC: empty TCP record.\n");
- return -ENOTCONN;
- }
if (result < 0)
goto done;
offset += result;
@@ -733,9 +750,9 @@ tcp_input_record(struct rpc_xprt *xprt)
dprintk("RPC: %4d TCP receiving %d bytes\n",
req->rq_task->tk_pid, want);
+ /* Request must be re-encoded before retransmit */
+ req->rq_damaged = 1;
result = xprt_recvmsg(xprt, xprt->tcp_iovec, req->rq_rnr, want);
- if (!result && want)
- result = -EAGAIN;
if (result < 0)
goto done;
xprt->tcp_copied += result;
@@ -754,12 +771,10 @@ tcp_input_record(struct rpc_xprt *xprt)
xprt->tcp_copied = 0;
xprt->tcp_rqstp = NULL;
}
- /* Request must be re-encoded before retransmit */
- req->rq_damaged = 1;
}
/* Skip over any trailing bytes on short reads */
- while (avail) {
+ while (avail > 0) {
static u8 dummy[64];
want = MIN(avail, sizeof(dummy));
@@ -767,8 +782,6 @@ tcp_input_record(struct rpc_xprt *xprt)
riov.iov_len = want;
dprintk("RPC: TCP skipping %d bytes\n", want);
result = xprt_recvmsg(xprt, &riov, 1, want);
- if (!result && want)
- result=-EAGAIN;
if (result < 0)
goto done;
offset += result;
@@ -789,55 +802,40 @@ done:
return result;
}
-static __inline__ void tcp_output_record(struct rpc_xprt *xprt)
-{
- if(xprt->snd_sent && xprt->snd_task)
- dprintk("RPC: write space\n");
- if(xprt->write_space == 0)
- {
- xprt->write_space = 1;
- if (xprt->snd_task && !RPC_IS_RUNNING(xprt->snd_task))
- {
- if(xprt->snd_sent)
- dprintk("RPC: Write wakeup snd_sent =%d\n",
- xprt->snd_sent);
- rpc_wake_up_task(xprt->snd_task);
- }
- }
-}
-
/*
* TCP task queue stuff
*/
-static struct rpc_xprt *rpc_rx_xprt_pending = NULL; /* Chain by rx_pending of rpc_xprt's */
-static struct rpc_xprt *rpc_tx_xprt_pending = NULL; /* Chain by tx_pending of rpc_xprt's */
+static struct rpc_xprt *rpc_xprt_pending = NULL; /* Chain by rx_pending of rpc_xprt's */
/*
* This is protected from tcp_data_ready and the stack as its run
* inside of the RPC I/O daemon
*/
-
-void rpciod_tcp_dispatcher(void)
+static void
+do_rpciod_tcp_dispatcher(void)
{
struct rpc_xprt *xprt;
int result;
dprintk("rpciod_tcp_dispatcher: Queue Running\n");
-
+
/*
* Empty each pending socket
*/
-
- while((xprt=rpc_rx_xprt_pending)!=NULL)
- {
+
+ while(1) {
int safe_retry=0;
-
- rpc_rx_xprt_pending=xprt->rx_pending;
- xprt->rx_pending_flag=0;
-
+
+ if ((xprt = rpc_xprt_pending) == NULL) {
+ break;
+ }
+ xprt->rx_pending_flag = 0;
+ rpc_xprt_pending=xprt->rx_pending;
+ xprt->rx_pending = NULL;
+
dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);
-
+
do
{
if (safe_retry++ > 50)
@@ -845,28 +843,30 @@ void rpciod_tcp_dispatcher(void)
result = tcp_input_record(xprt);
}
while (result >= 0);
-
+
switch (result) {
case -EAGAIN:
- continue;
case -ENOTCONN:
case -EPIPE:
- xprt_disconnect(xprt);
continue;
default:
printk(KERN_WARNING "RPC: unexpected error %d from tcp_input_record\n",
result);
}
}
+}
- while((xprt=rpc_tx_xprt_pending)!=NULL)
- {
- rpc_tx_xprt_pending = xprt->tx_pending;
- xprt->tx_pending_flag = 0;
- tcp_output_record(xprt);
- }
+void rpciod_tcp_dispatcher(void)
+{
+ start_bh_atomic();
+ do_rpciod_tcp_dispatcher();
+ end_bh_atomic();
}
+int xprt_tcp_pending(void)
+{
+ return rpc_xprt_pending != NULL;
+}
extern inline void tcp_rpciod_queue(void)
{
@@ -890,6 +890,7 @@ static void tcp_data_ready(struct sock *sk, int len)
printk("Not a socket with xprt %p\n", sk);
return;
}
+
dprintk("RPC: tcp_data_ready client %p\n", xprt);
dprintk("RPC: state %x conn %d dead %d zapped %d\n",
sk->state, xprt->connected,
@@ -898,24 +899,16 @@ static void tcp_data_ready(struct sock *sk, int len)
* If we are not waiting for the RPC bh run then
* we are now
*/
- if (!xprt->rx_pending_flag)
- {
- int start_queue=0;
+ if (!xprt->rx_pending_flag) {
+ dprintk("RPC: xprt queue %p\n", rpc_xprt_pending);
- dprintk("RPC: xprt queue %p\n", rpc_rx_xprt_pending);
- if(rpc_rx_xprt_pending==NULL)
- start_queue=1;
+ xprt->rx_pending=rpc_xprt_pending;
+ rpc_xprt_pending=xprt;
xprt->rx_pending_flag=1;
- xprt->rx_pending=rpc_rx_xprt_pending;
- rpc_rx_xprt_pending=xprt;
- if (start_queue)
- {
- tcp_rpciod_queue();
- start_queue=0;
- }
- }
- else
+ } else
dprintk("RPC: xprt queued already %p\n", xprt);
+ tcp_rpciod_queue();
+
}
@@ -931,17 +924,32 @@ tcp_state_change(struct sock *sk)
sk->state, xprt->connected,
sk->dead, sk->zapped);
- if (sk->state == TCP_ESTABLISHED && !xprt->connected) {
+ switch(sk->state) {
+ case TCP_ESTABLISHED:
+ if (xprt->connected)
+ break;
xprt->connected = 1;
xprt->connecting = 0;
rpc_wake_up(&xprt->reconn);
- } else if (sk->zapped) {
- rpc_wake_up_status(&xprt->pending, -ENOTCONN);
- rpc_wake_up_status(&xprt->sending, -ENOTCONN);
+ rpc_wake_up_next(&xprt->sending);
+ tcp_rpciod_queue();
+ break;
+ case TCP_CLOSE:
+ if (xprt->connecting)
+ break;
+ xprt_disconnect(xprt);
rpc_wake_up_status(&xprt->reconn, -ENOTCONN);
+ break;
+ default:
+ break;
}
+
}
+/*
+ * The following 2 routines allow a task to sleep while socket memory is
+ * low.
+ */
static void
tcp_write_space(struct sock *sk)
{
@@ -949,17 +957,43 @@ tcp_write_space(struct sock *sk)
if (!(xprt = xprt_from_sock(sk)))
return;
- if (!xprt->tx_pending_flag) {
- int start_queue = 0;
-
- if (rpc_tx_xprt_pending == NULL)
- start_queue = 1;
- xprt->tx_pending_flag = 1;
- xprt->tx_pending = rpc_tx_xprt_pending;
- rpc_tx_xprt_pending = xprt;
- if (start_queue)
- tcp_rpciod_queue();
- }
+
+ /* Wait until we have enough socket memory */
+ if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE))
+ return;
+
+ if (xprt->write_space)
+ return;
+
+ xprt->write_space = 1;
+
+ if (!xprt->snd_task)
+ rpc_wake_up_next(&xprt->sending);
+ else if (!RPC_IS_RUNNING(xprt->snd_task))
+ rpc_wake_up_task(xprt->snd_task);
+}
+
+static void
+udp_write_space(struct sock *sk)
+{
+ struct rpc_xprt *xprt;
+
+ if (!(xprt = xprt_from_sock(sk)))
+ return;
+
+
+ /* Wait until we have enough socket memory */
+ if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE))
+ return;
+
+ if (xprt->write_space)
+ return;
+
+ xprt->write_space = 1;
+ if (!xprt->snd_task)
+ rpc_wake_up_next(&xprt->sending);
+ else if (!RPC_IS_RUNNING(xprt->snd_task))
+ rpc_wake_up_task(xprt->snd_task);
}
/*
@@ -982,32 +1016,50 @@ xprt_timer(struct rpc_task *task)
rpc_wake_up_task(task);
}
+
/*
- * (Partly) transmit the RPC packet
- * Note that task->tk_status is either 0 or negative on return.
- * Only when the reply is received will the status be set to a
- * positive value.
+ * Serialize access to sockets, in order to prevent different
+ * requests from interfering with each other.
*/
-static inline int
-xprt_transmit_some(struct rpc_xprt *xprt, struct rpc_task *task)
+static int
+xprt_down_transmit(struct rpc_task *task)
{
+ struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
struct rpc_rqst *req = task->tk_rqstp;
- int result;
- task->tk_status = 0;
- if ((result = xprt_sendmsg(xprt)) >= 0) {
- if (!xprt->snd_buf.io_len || !xprt->stream) {
- rpc_wake_up_next(&xprt->sending);
- return req->rq_slen;
- }
- result = -EAGAIN;
- } else if (xprt->stream) {
- if (result == -ENOTCONN || result == -EPIPE) {
- xprt_disconnect(xprt);
- result = -ENOTCONN;
- }
+ start_bh_atomic();
+ spin_lock(&xprt_lock);
+ if (xprt->snd_task && xprt->snd_task != task) {
+ dprintk("RPC: %4d TCP write queue full (task %d)\n",
+ task->tk_pid, xprt->snd_task->tk_pid);
+ task->tk_timeout = req->rq_timeout.to_current;
+ rpc_sleep_on(&xprt->sending, task, xprt_transmit, NULL);
+ } else if (!xprt->snd_task) {
+ xprt->snd_task = task;
+#ifdef RPC_PROFILE
+ req->rq_xtime = jiffies;
+#endif
+ req->rq_bytes_sent = 0;
+ }
+ spin_unlock(&xprt_lock);
+ end_bh_atomic();
+ return xprt->snd_task == task;
+}
+
+/*
+ * Releases the socket for use by other requests.
+ */
+static void
+xprt_up_transmit(struct rpc_task *task)
+{
+ struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
+
+ if (xprt->snd_task && xprt->snd_task == task) {
+ start_bh_atomic();
+ xprt->snd_task = NULL;
+ rpc_wake_up_next(&xprt->sending);
+ end_bh_atomic();
}
- return task->tk_status = result;
}
/*
@@ -1020,71 +1072,65 @@ xprt_transmit(struct rpc_task *task)
struct rpc_timeout *timeo;
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
- int status;
dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
*(u32 *)(req->rq_svec[0].iov_base));
- if (xprt->shutdown) {
+ if (xprt->shutdown)
task->tk_status = -EIO;
+
+ if (task->tk_status < 0)
return;
+
+ /* Reset timeout parameters */
+ timeo = &req->rq_timeout;
+ if (timeo->to_retries < 0) {
+ dprintk("RPC: %4d xprt_transmit reset timeo\n",
+ task->tk_pid);
+ timeo->to_retries = xprt->timeout.to_retries;
+ timeo->to_current = timeo->to_initval;
}
- /* If we're not already in the process of transmitting our call,
- * set up everything as needed. */
- if (xprt->snd_task != task) {
- /* Write the record marker */
- if (xprt->stream) {
- u32 marker;
+ /* set up everything as needed. */
+ /* Write the record marker */
+ if (xprt->stream) {
+ u32 marker;
- if (!xprt->connected) {
- task->tk_status = -ENOTCONN;
- return;
- }
- marker = htonl(0x80000000|(req->rq_slen-4));
- *((u32 *) req->rq_svec[0].iov_base) = marker;
- }
+ marker = htonl(0x80000000|(req->rq_slen-4));
+ *((u32 *) req->rq_svec[0].iov_base) = marker;
- /* Reset timeout parameters */
- timeo = &req->rq_timeout;
- if (timeo->to_retries < 0) {
- dprintk("RPC: %4d xprt_transmit reset timeo\n",
- task->tk_pid);
- timeo->to_retries = xprt->timeout.to_retries;
- timeo->to_current = timeo->to_initval;
- }
+ }
-#ifdef RPC_PROFILE
- req->rq_xtime = jiffies;
-#endif
- req->rq_gotit = 0;
+ if (!xprt_down_transmit(task))
+ return;
- if (xprt->snd_task) {
- dprintk("RPC: %4d TCP write queue full (task %d)\n",
- task->tk_pid, xprt->snd_task->tk_pid);
- rpc_sleep_on(&xprt->sending, task,
- xprt_transmit_status, NULL);
- return;
- }
- xprt->snd_buf = req->rq_snd_buf;
- xprt->snd_task = task;
- xprt->snd_sent = 0;
+ do_xprt_transmit(task);
+}
+
+static void
+do_xprt_transmit(struct rpc_task *task)
+{
+ struct rpc_rqst *req = task->tk_rqstp;
+ struct rpc_xprt *xprt = req->rq_xprt;
+ int status, retry = 0;
+
+ if (xprt->shutdown) {
+ task->tk_status = -EIO;
+ goto out_release;
}
/* For fast networks/servers we have to put the request on
* the pending list now:
*/
- start_bh_atomic();
+ req->rq_gotit = 0;
status = rpc_add_wait_queue(&xprt->pending, task);
if (!status)
task->tk_callback = NULL;
- end_bh_atomic();
- if (status)
- {
+ if (status) {
printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
task->tk_status = status;
- return;
+ goto out_release;
}
/* Continue transmitting the packet/record. We must be careful
@@ -1093,27 +1139,55 @@ xprt_transmit(struct rpc_task *task)
*/
while (1) {
xprt->write_space = 0;
- if (xprt_transmit_some(xprt, task) != -EAGAIN) {
+ status = xprt_sendmsg(xprt, req);
+
+ if (status < 0)
+ break;
+
+ if (xprt->stream) {
+ req->rq_bytes_sent += status;
+
+ if (req->rq_bytes_sent >= req->rq_slen)
+ goto out_release;
+ }
+
+ if (status < req->rq_slen)
+ status = -EAGAIN;
+
+ if (status >= 0 || !xprt->stream) {
dprintk("RPC: %4d xmit complete\n", task->tk_pid);
- xprt->snd_task = NULL;
- return;
+ goto out_release;
}
- /*d*/dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
- task->tk_pid, xprt->snd_buf.io_len,
+ dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
+ task->tk_pid, req->rq_slen - req->rq_bytes_sent,
req->rq_slen);
- task->tk_status = 0;
- start_bh_atomic();
- if (!xprt->write_space) {
- /* Remove from pending */
- rpc_remove_wait_queue(task);
- rpc_sleep_on(&xprt->sending, task,
- xprt_transmit_status, NULL);
- end_bh_atomic();
- return;
- }
+
+ if (retry++ > 50)
+ break;
+ }
+
+ task->tk_status = (status == -ENOMEM) ? -EAGAIN : status;
+
+ /* We don't care if we got a reply, so don't protect
+ * against bh. */
+ if (task->tk_rpcwait == &xprt->pending)
+ rpc_remove_wait_queue(task);
+
+ /* Protect against (udp|tcp)_write_space */
+ start_bh_atomic();
+ if (status == -ENOMEM || status == -EAGAIN) {
+ task->tk_timeout = req->rq_timeout.to_current;
+ if (!xprt->write_space)
+ rpc_sleep_on(&xprt->sending, task, xprt_transmit_status,
+ xprt_transmit_timeout);
end_bh_atomic();
+ return;
}
+ end_bh_atomic();
+
+out_release:
+ xprt_up_transmit(task);
}
/*
@@ -1126,19 +1200,27 @@ xprt_transmit_status(struct rpc_task *task)
struct rpc_xprt *xprt = task->tk_client->cl_xprt;
dprintk("RPC: %4d transmit_status %d\n", task->tk_pid, task->tk_status);
- if (xprt->snd_task == task)
- {
- if (task->tk_status < 0)
- {
- xprt->snd_task = NULL;
- xprt_disconnect(xprt);
- }
- else
- xprt_transmit(task);
+ if (xprt->snd_task == task) {
+ task->tk_status = 0;
+ do_xprt_transmit(task);
+ return;
}
}
/*
+ * RPC transmit timeout handler.
+ */
+static void
+xprt_transmit_timeout(struct rpc_task *task)
+{
+ dprintk("RPC: %4d transmit_timeout %d\n", task->tk_pid, task->tk_status);
+ task->tk_status = -ETIMEDOUT;
+ task->tk_timeout = 0;
+ rpc_wake_up_task(task);
+ xprt_up_transmit(task);
+}
+
+/*
* Wait for the reply to our call.
* When the callback is invoked, the congestion window should have
* been updated already.
@@ -1150,25 +1232,33 @@ xprt_receive(struct rpc_task *task)
struct rpc_xprt *xprt = req->rq_xprt;
dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
- if (xprt->connected == 0) {
- task->tk_status = -ENOTCONN;
- return;
- }
/*
- * Wait until rq_gotit goes non-null, or timeout elapsed.
+ * Wait until rq_gotit goes non-null, or timeout elapsed.
*/
task->tk_timeout = req->rq_timeout.to_current;
start_bh_atomic();
+ if (task->tk_rpcwait)
+ rpc_remove_wait_queue(task);
+
+ if (task->tk_status < 0 || xprt->shutdown) {
+ end_bh_atomic();
+ goto out;
+ }
+
if (!req->rq_gotit) {
rpc_sleep_on(&xprt->pending, task,
xprt_receive_status, xprt_timer);
+ end_bh_atomic();
+ return;
}
end_bh_atomic();
dprintk("RPC: %4d xprt_receive returns %d\n",
task->tk_pid, task->tk_status);
+ out:
+ xprt_receive_status(task);
}
static void
@@ -1176,8 +1266,9 @@ xprt_receive_status(struct rpc_task *task)
{
struct rpc_xprt *xprt = task->tk_xprt;
- if (xprt->stream && xprt->tcp_rqstp == task->tk_rqstp)
+ if (xprt->tcp_rqstp == task->tk_rqstp)
xprt->tcp_rqstp = NULL;
+
}
/*
@@ -1194,7 +1285,7 @@ xprt_reserve(struct rpc_task *task)
dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
task->tk_pid, xprt->cong, xprt->cwnd);
- if ((!RPCXPRT_CONGESTED(xprt) && xprt->free)) {
+ if (!RPCXPRT_CONGESTED(xprt) && xprt->free) {
xprt_reserve_status(task);
task->tk_timeout = 0;
} else if (!task->tk_timeout) {
@@ -1223,40 +1314,30 @@ xprt_reserve_status(struct rpc_task *task)
/* NOP */
} else if (task->tk_rqstp) {
/* We've already been given a request slot: NOP */
- } else if (!RPCXPRT_CONGESTED(xprt)) {
+ } else if (!RPCXPRT_CONGESTED(xprt) && xprt->free) {
/* OK: There's room for us. Grab a free slot and bump
* congestion value */
- req = xprt->free;
- if (!req)
- goto bad_list;
- if (req->rq_xid)
- goto bad_used;
+ spin_lock(&xprt_lock);
+ if (!(req = xprt->free)) {
+ spin_unlock(&xprt_lock);
+ goto out_nofree;
+ }
xprt->free = req->rq_next;
+ req->rq_next = NULL;
+ spin_unlock(&xprt_lock);
xprt->cong += RPC_CWNDSCALE;
task->tk_rqstp = req;
- req->rq_next = NULL;
xprt_request_init(task, xprt);
- } else {
- task->tk_status = -EAGAIN;
- }
- if (xprt->free && !RPCXPRT_CONGESTED(xprt))
- rpc_wake_up_next(&xprt->backlog);
+ if (xprt->free)
+ xprt_clear_backlog(xprt);
+ } else
+ goto out_nofree;
return;
-bad_list:
- printk(KERN_ERR
- "RPC: %4d inconsistent free list (cong %ld cwnd %ld)\n",
- task->tk_pid, xprt->cong, xprt->cwnd);
- rpc_debug = ~0;
- goto bummer;
-bad_used:
- printk(KERN_ERR "RPC: used rqst slot %p on free list!\n", req);
-bummer:
- task->tk_status = -EIO;
- xprt->free = NULL;
- return;
+out_nofree:
+ task->tk_status = -EAGAIN;
}
/*
@@ -1298,13 +1379,15 @@ xprt_release(struct rpc_task *task)
dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
+ spin_lock(&xprt_lock);
+ req->rq_next = xprt->free;
+ xprt->free = req;
+ spin_unlock(&xprt_lock);
+
/* remove slot from queue of pending */
start_bh_atomic();
if (task->tk_rpcwait) {
printk("RPC: task of released request still queued!\n");
-#ifdef RPC_DEBUG
- printk("RPC: (task is on %s)\n", rpc_qname(task->tk_rpcwait));
-#endif
rpc_del_timer(task);
rpc_remove_wait_queue(task);
}
@@ -1313,31 +1396,7 @@ xprt_release(struct rpc_task *task)
/* Decrease congestion value. */
xprt->cong -= RPC_CWNDSCALE;
-#if 0
- /* If congestion threshold is not yet reached, pass on the request slot.
- * This looks kind of kludgy, but it guarantees backlogged requests
- * are served in order.
- * N.B. This doesn't look completely safe, as the task is still
- * on the backlog list after wake-up.
- */
- if (!RPCXPRT_CONGESTED(xprt)) {
- struct rpc_task *next = rpc_wake_up_next(&xprt->backlog);
-
- if (next && next->tk_rqstp == 0) {
- xprt->cong += RPC_CWNDSCALE;
- next->tk_rqstp = req;
- xprt_request_init(next, xprt);
- return;
- }
- }
-#endif
-
- req->rq_next = xprt->free;
- xprt->free = req;
-
- /* If not congested, wake up the next backlogged process */
- if (!RPCXPRT_CONGESTED(xprt))
- rpc_wake_up_next(&xprt->backlog);
+ xprt_clear_backlog(xprt);
}
/*
@@ -1382,11 +1441,7 @@ xprt_setup(struct socket *sock, int proto,
dprintk("RPC: setting up %s transport...\n",
proto == IPPROTO_UDP? "UDP" : "TCP");
-#if LINUX_VERSION_CODE >= 0x020100
inet = sock->sk;
-#else
- inet = (struct sock *) sock->data;
-#endif
if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
return NULL;
@@ -1398,7 +1453,8 @@ xprt_setup(struct socket *sock, int proto,
xprt->addr = *ap;
xprt->prot = proto;
xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
- xprt->cwnd = RPC_INITCWND;
+ xprt->congtime = jiffies;
+ init_waitqueue_head(&xprt->cong_wait);
#ifdef SOCK_HAS_USER_DATA
inet->user_data = xprt;
#else
@@ -1410,11 +1466,14 @@ xprt_setup(struct socket *sock, int proto,
xprt->old_write_space = inet->write_space;
if (proto == IPPROTO_UDP) {
inet->data_ready = udp_data_ready;
+ inet->write_space = udp_write_space;
inet->no_check = UDP_CSUM_NORCV;
+ xprt->cwnd = RPC_INITCWND;
} else {
inet->data_ready = tcp_data_ready;
inet->state_change = tcp_state_change;
inet->write_space = tcp_write_space;
+ xprt->cwnd = RPC_MAXCWND;
xprt->nocong = 1;
}
xprt->connected = 1;
@@ -1487,6 +1546,7 @@ xprt_create_socket(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
(proto == IPPROTO_UDP)? "udp" : "tcp", proto);
type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
+
if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
printk("RPC: can't create socket (%d).\n", -err);
goto failed;
@@ -1543,6 +1603,21 @@ xprt_shutdown(struct rpc_xprt *xprt)
rpc_wake_up(&xprt->pending);
rpc_wake_up(&xprt->backlog);
rpc_wake_up(&xprt->reconn);
+ wake_up(&xprt->cong_wait);
+}
+
+/*
+ * Clear the xprt backlog queue
+ */
+int
+xprt_clear_backlog(struct rpc_xprt *xprt) {
+ if (!xprt)
+ return 0;
+ if (RPCXPRT_CONGESTED(xprt))
+ return 0;
+ rpc_wake_up_next(&xprt->backlog);
+ wake_up(&xprt->cong_wait);
+ return 1;
}
/*
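
Editor's note: xprt_transmit() above still prefixes each TCP request with the RPC
record marker: bit 31 flags the last fragment, the low 31 bits carry the fragment
length, and the 4 marker bytes themselves are not counted (hence rq_slen - 4). A
small userspace-style sketch of that encoding, with a hypothetical helper name:

#include <arpa/inet.h>	/* htonl() */
#include <stdint.h>

/*
 * buflen counts the whole send buffer, including the 4 bytes reserved at
 * the front for the marker, matching how rq_slen is used in xprt_transmit().
 */
static uint32_t demo_record_marker(uint32_t buflen)
{
	return htonl(0x80000000u | (buflen - 4));
}
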