From 9e30c3705aed9fbec4c3304570e4d6e707856bcb Mon Sep 17 00:00:00 2001 From: Ralf Baechle Date: Fri, 21 Jan 2000 22:34:01 +0000 Subject: Merge with Linux 2.3.22. --- net/sunrpc/auth.c | 13 + net/sunrpc/sched.c | 171 +++++++++---- net/sunrpc/xprt.c | 721 +++++++++++++++++++++++++++++------------------------ 3 files changed, 528 insertions(+), 377 deletions(-) (limited to 'net/sunrpc') diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c index 7c966779b..af9923ec8 100644 --- a/net/sunrpc/auth.c +++ b/net/sunrpc/auth.c @@ -15,6 +15,7 @@ #include #include #include +#include #ifdef RPC_DEBUG # define RPCDBG_FACILITY RPCDBG_AUTH @@ -71,6 +72,8 @@ rpcauth_destroy(struct rpc_auth *auth) auth->au_ops->destroy(auth); } +spinlock_t rpc_credcache_lock = SPIN_LOCK_UNLOCKED; + /* * Initialize RPC credential cache */ @@ -94,6 +97,7 @@ rpcauth_free_credcache(struct rpc_auth *auth) if (!(destroy = auth->au_ops->crdestroy)) destroy = (void (*)(struct rpc_cred *)) rpc_free; + spin_lock(&rpc_credcache_lock); for (i = 0; i < RPC_CREDCACHE_NR; i++) { q = &auth->au_credcache[i]; while ((cred = *q) != NULL) { @@ -101,6 +105,7 @@ rpcauth_free_credcache(struct rpc_auth *auth) destroy(cred); } } + spin_unlock(&rpc_credcache_lock); } /* @@ -113,6 +118,7 @@ rpcauth_gc_credcache(struct rpc_auth *auth) int i, safe = 0; dprintk("RPC: gc'ing RPC credentials for auth %p\n", auth); + spin_lock(&rpc_credcache_lock); for (i = 0; i < RPC_CREDCACHE_NR; i++) { q = &auth->au_credcache[i]; while ((cred = *q) != NULL) { @@ -129,6 +135,7 @@ rpcauth_gc_credcache(struct rpc_auth *auth) q = &cred->cr_next; } } + spin_unlock(&rpc_credcache_lock); while ((cred = free) != NULL) { free = cred->cr_next; rpc_free(cred); @@ -145,10 +152,12 @@ rpcauth_insert_credcache(struct rpc_auth *auth, struct rpc_cred *cred) int nr; nr = (cred->cr_uid % RPC_CREDCACHE_NR); + spin_lock(&rpc_credcache_lock); cred->cr_next = auth->au_credcache[nr]; auth->au_credcache[nr] = cred; cred->cr_expire = jiffies + auth->au_expire; cred->cr_count++; + spin_unlock(&rpc_credcache_lock); } /* @@ -166,6 +175,7 @@ rpcauth_lookup_credcache(struct rpc_task *task) if (time_before(auth->au_nextgc, jiffies)) rpcauth_gc_credcache(auth); + spin_lock(&rpc_credcache_lock); q = &auth->au_credcache[nr]; while ((cred = *q) != NULL) { if (auth->au_ops->crmatch(task, cred)) { @@ -174,6 +184,7 @@ rpcauth_lookup_credcache(struct rpc_task *task) } q = &cred->cr_next; } + spin_unlock(&rpc_credcache_lock); if (!cred) cred = auth->au_ops->crcreate(task); @@ -194,6 +205,7 @@ rpcauth_remove_credcache(struct rpc_auth *auth, struct rpc_cred *cred) int nr; nr = (cred->cr_uid % RPC_CREDCACHE_NR); + spin_lock(&rpc_credcache_lock); q = &auth->au_credcache[nr]; while ((cr = *q) != NULL) { if (cred == cr) { @@ -202,6 +214,7 @@ rpcauth_remove_credcache(struct rpc_auth *auth, struct rpc_cred *cred) } q = &cred->cr_next; } + spin_unlock(&rpc_credcache_lock); } struct rpc_cred * diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index b4e0b4b76..181a7e46c 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -18,6 +18,7 @@ #include #include #include +#include #include @@ -73,6 +74,16 @@ static int rpc_inhibit = 0; static u32 swap_buffer[PAGE_SIZE >> 2]; static int swap_buffer_used = 0; +/* + * Spinlock for wait queues. Access to the latter also has to be + * interrupt-safe in order to allow timers to wake up sleeping tasks. + */ +spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED; +/* + * Spinlock for other critical sections of code. 
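
[The auth.c hunks above put every credential-cache bucket walk under a single
global rpc_credcache_lock instead of relying on the old single-threaded
assumptions. A minimal user-space sketch of that shape -- one lock over an
array of singly linked hash buckets -- follows; the pthread mutex, CACHE_NR
and the stripped-down cred type are illustrative stand-ins, not the kernel
API. The GC and remove paths in the patch take the same lock around their
bucket scans.]

	#include <pthread.h>

	#define CACHE_NR 16

	struct cred {
		unsigned int	uid;
		struct cred	*next;
	};

	static struct cred *cache[CACHE_NR];
	static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;

	/* Push at the head of the uid's bucket, as rpcauth_insert_credcache
	 * does (expiry/refcount bookkeeping omitted). */
	static void cache_insert(struct cred *cred)
	{
		int nr = cred->uid % CACHE_NR;

		pthread_mutex_lock(&cache_lock);
		cred->next = cache[nr];
		cache[nr] = cred;
		pthread_mutex_unlock(&cache_lock);
	}

	/* Walk one bucket under the lock, as rpcauth_lookup_credcache does. */
	static struct cred *cache_lookup(unsigned int uid)
	{
		struct cred *cred;

		pthread_mutex_lock(&cache_lock);
		for (cred = cache[uid % CACHE_NR]; cred; cred = cred->next)
			if (cred->uid == uid)
				break;
		pthread_mutex_unlock(&cache_lock);
		return cred;
	}
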
+ */ +spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED; + /* * Add new request to wait queue. * @@ -81,8 +92,8 @@ static int swap_buffer_used = 0; * improve overall performance. * Everyone else gets appended to the queue to ensure proper FIFO behavior. */ -int -rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task) +static int +__rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task) { if (task->tk_rpcwait) { if (task->tk_rpcwait != queue) @@ -104,12 +115,24 @@ rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task) return 0; } +int +rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task) +{ + unsigned long oldflags; + int result; + + spin_lock_irqsave(&rpc_queue_lock, oldflags); + result = __rpc_add_wait_queue(q, task); + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); + return result; +} + /* * Remove request from queue. - * Note: must be called with interrupts disabled. + * Note: must be called with spin lock held. */ -void -rpc_remove_wait_queue(struct rpc_task *task) +static void +__rpc_remove_wait_queue(struct rpc_task *task) { struct rpc_wait_queue *queue; @@ -122,6 +145,16 @@ rpc_remove_wait_queue(struct rpc_task *task) task->tk_pid, queue, rpc_qname(queue)); } +void +rpc_remove_wait_queue(struct rpc_task *task) +{ + unsigned long oldflags; + + spin_lock_irqsave(&rpc_queue_lock, oldflags); + __rpc_remove_wait_queue(task); + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); +} + /* * Set up a timer for the current task. */ @@ -165,7 +198,7 @@ rpc_del_timer(struct rpc_task *task) * Make an RPC task runnable. * * Note: If the task is ASYNC, this must be called with - * interrupts disabled to protect the wait queue operation. + * the spinlock held to protect the wait queue operation. */ static inline void rpc_make_runnable(struct rpc_task *task) @@ -177,7 +210,7 @@ rpc_make_runnable(struct rpc_task *task) task->tk_flags |= RPC_TASK_RUNNING; if (RPC_IS_ASYNC(task)) { int status; - status = rpc_add_wait_queue(&schedq, task); + status = __rpc_add_wait_queue(&schedq, task); if (status) { printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); @@ -214,18 +247,12 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, rpc_action action, rpc_action timer) { - unsigned long oldflags; int status; dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid, rpc_qname(q), jiffies); - /* - * Protect the execution below. - */ - save_flags(oldflags); cli(); - - status = rpc_add_wait_queue(q, task); + status = __rpc_add_wait_queue(q, task); if (status) { printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); @@ -240,7 +267,6 @@ __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, task->tk_flags &= ~RPC_TASK_RUNNING; } - restore_flags(oldflags); return; } @@ -248,11 +274,17 @@ void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, rpc_action action, rpc_action timer) { + unsigned long oldflags; + /* + * Protect the queue operations. + */ + spin_lock_irqsave(&rpc_queue_lock, oldflags); __rpc_sleep_on(q, task, action, timer); + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); } /* - * Wake up a single task -- must be invoked with bottom halves off. + * Wake up a single task -- must be invoked with spin lock held. * * It would probably suffice to cli/sti the del_timer and remove_wait_queue * operations individually. 
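
[The convention introduced here recurs throughout the patch: the lock-free
worker keeps the old body under a double-underscore name, and a thin public
wrapper takes rpc_queue_lock with spin_lock_irqsave() -- interrupts must be
excluded because RPC timers wake sleeping tasks from timer context. A hedged
user-space rendering of the pair, with a pthread mutex standing in for the
irq-safe spinlock and a trivial invented task type:]

	#include <pthread.h>

	struct task  { struct task *next; };
	struct waitq { struct task *head; };

	static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;

	/* Double-underscore variant: caller already holds queue_lock.
	 * (Pushes at the head for brevity; the kernel appends to keep
	 * FIFO order, except for the swapper task.) */
	static int __waitq_add(struct waitq *q, struct task *t)
	{
		t->next = q->head;
		q->head = t;
		return 0;
	}

	/* Public variant: takes and drops the lock around the worker,
	 * the shape of rpc_add_wait_queue() above. */
	static int waitq_add(struct waitq *q, struct task *t)
	{
		int ret;

		pthread_mutex_lock(&queue_lock);
		ret = __waitq_add(q, t);
		pthread_mutex_unlock(&queue_lock);
		return ret;
	}

[Callers that already hold the lock -- rpc_make_runnable(), __rpc_sleep_on()
-- use the __ variant directly, which is why those hunks drop their old
cli()/sti() pairs.]
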
@@ -272,7 +304,7 @@ __rpc_wake_up(struct rpc_task *task) #endif rpc_del_timer(task); if (task->tk_rpcwait != &schedq) - rpc_remove_wait_queue(task); + __rpc_remove_wait_queue(task); if (!RPC_IS_RUNNING(task)) { task->tk_flags |= RPC_TASK_CALLBACK; rpc_make_runnable(task); @@ -289,7 +321,7 @@ __rpc_default_timer(struct rpc_task *task) dprintk("RPC: %d timeout (default timer)\n", task->tk_pid); task->tk_status = -ETIMEDOUT; task->tk_timeout = 0; - __rpc_wake_up(task); + rpc_wake_up_task(task); } /* @@ -300,9 +332,9 @@ rpc_wake_up_task(struct rpc_task *task) { unsigned long oldflags; - save_flags(oldflags); cli(); + spin_lock_irqsave(&rpc_queue_lock, oldflags); __rpc_wake_up(task); - restore_flags(oldflags); + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); } /* @@ -315,10 +347,10 @@ rpc_wake_up_next(struct rpc_wait_queue *queue) struct rpc_task *task; dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue)); - save_flags(oldflags); cli(); + spin_lock_irqsave(&rpc_queue_lock, oldflags); if ((task = queue->task) != 0) __rpc_wake_up(task); - restore_flags(oldflags); + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); return task; } @@ -331,10 +363,10 @@ rpc_wake_up(struct rpc_wait_queue *queue) { unsigned long oldflags; - save_flags(oldflags); cli(); + spin_lock_irqsave(&rpc_queue_lock, oldflags); while (queue->task) __rpc_wake_up(queue->task); - restore_flags(oldflags); + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); } /* @@ -346,12 +378,12 @@ rpc_wake_up_status(struct rpc_wait_queue *queue, int status) struct rpc_task *task; unsigned long oldflags; - save_flags(oldflags); cli(); + spin_lock_irqsave(&rpc_queue_lock, oldflags); while ((task = queue->task) != NULL) { task->tk_status = status; __rpc_wake_up(task); } - restore_flags(oldflags); + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); } /* @@ -369,7 +401,7 @@ static void __rpc_atrun(struct rpc_task *task) { task->tk_status = 0; - __rpc_wake_up(task); + rpc_wake_up_task(task); } /* @@ -432,13 +464,13 @@ __rpc_execute(struct rpc_task *task) * and the RPC reply arrives before we get here, it will * have state RUNNING, but will still be on schedq. 
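
[rpc_wake_up() and rpc_wake_up_status() above now drain a whole wait queue
inside one lock acquisition instead of one cli()/sti() window per task. A
sketch of that drain loop; the task and waitq types are invented for
illustration and the actual wakeup is left as a comment:]

	#include <pthread.h>

	struct task  { int status; struct task *next; };
	struct waitq { struct task *head; };

	static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;

	/* Lock held; queue known to be non-empty. */
	static void __wake_up(struct waitq *q)
	{
		struct task *t = q->head;

		q->head = t->next;
		t->next = NULL;
		/* mark t runnable / signal its sleeper here */
	}

	/* rpc_wake_up_status() shape: flush every sleeper with the same
	 * verdict, e.g. -ENOTCONN when the transport drops. */
	static void wake_up_status(struct waitq *q, int status)
	{
		pthread_mutex_lock(&queue_lock);
		while (q->head) {
			q->head->status = status;
			__wake_up(q);
		}
		pthread_mutex_unlock(&queue_lock);
	}
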
*/ - save_flags(oldflags); cli(); + spin_lock_irqsave(&rpc_queue_lock, oldflags); if (RPC_IS_RUNNING(task)) { if (task->tk_rpcwait == &schedq) - rpc_remove_wait_queue(task); + __rpc_remove_wait_queue(task); } else while (!RPC_IS_RUNNING(task)) { if (RPC_IS_ASYNC(task)) { - restore_flags(oldflags); + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); return 0; } @@ -448,9 +480,9 @@ __rpc_execute(struct rpc_task *task) if (current->pid == rpciod_pid) printk(KERN_ERR "RPC: rpciod waiting on sync task!\n"); - sti(); + spin_unlock_irq(&rpc_queue_lock); __wait_event(task->tk_wait, RPC_IS_RUNNING(task)); - cli(); + spin_lock_irq(&rpc_queue_lock); /* * When the task received a signal, remove from @@ -462,7 +494,7 @@ __rpc_execute(struct rpc_task *task) dprintk("RPC: %4d sync task resuming\n", task->tk_pid); } - restore_flags(oldflags); + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); /* * When a sync task receives a signal, it exits with @@ -522,15 +554,16 @@ __rpc_schedule(void) int need_resched = current->need_resched; dprintk("RPC: rpc_schedule enter\n"); - save_flags(oldflags); while (1) { - cli(); - if (!(task = schedq.task)) + spin_lock_irqsave(&rpc_queue_lock, oldflags); + if (!(task = schedq.task)) { + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); break; + } rpc_del_timer(task); - rpc_remove_wait_queue(task); + __rpc_remove_wait_queue(task); task->tk_flags |= RPC_TASK_RUNNING; - restore_flags(oldflags); + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); __rpc_execute(task); @@ -541,7 +574,6 @@ __rpc_schedule(void) if (need_resched) schedule(); } - restore_flags(oldflags); dprintk("RPC: rpc_schedule leave\n"); } @@ -626,11 +658,13 @@ rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, task->tk_suid_retry = 1; /* Add to global list of all tasks */ + spin_lock(&rpc_sched_lock); task->tk_next_task = all_tasks; task->tk_prev_task = NULL; if (all_tasks) all_tasks->tk_prev_task = task; all_tasks = task; + spin_unlock(&rpc_sched_lock); if (clnt) clnt->cl_users++; @@ -679,10 +713,12 @@ void rpc_release_task(struct rpc_task *task) { struct rpc_task *next, *prev; + unsigned long oldflags; dprintk("RPC: %4d release task\n", task->tk_pid); /* Remove from global task list */ + spin_lock(&rpc_sched_lock); prev = task->tk_prev_task; next = task->tk_next_task; if (next) @@ -691,6 +727,19 @@ rpc_release_task(struct rpc_task *task) prev->tk_next_task = next; else all_tasks = next; + task->tk_next_task = task->tk_prev_task = NULL; + spin_unlock(&rpc_sched_lock); + + /* Protect the execution below. */ + spin_lock_irqsave(&rpc_queue_lock, oldflags); + + /* Delete any running timer */ + rpc_del_timer(task); + + /* Remove from any wait queue we're still on */ + __rpc_remove_wait_queue(task); + + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); /* Release resources */ if (task->tk_rqstp) @@ -738,12 +787,15 @@ rpc_find_parent(struct rpc_task *child) static void rpc_child_exit(struct rpc_task *child) { + unsigned long oldflags; struct rpc_task *parent; + spin_lock_irqsave(&rpc_queue_lock, oldflags); if ((parent = rpc_find_parent(child)) != NULL) { parent->tk_status = child->tk_status; - rpc_wake_up_task(parent); + __rpc_wake_up(parent); } + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); rpc_release_task(child); } @@ -772,11 +824,11 @@ rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func) { unsigned long oldflags; - save_flags(oldflags); cli(); - rpc_make_runnable(child); - restore_flags(oldflags); + spin_lock_irqsave(&rpc_queue_lock, oldflags); /* N.B. 
Is it possible for the child to have already finished? */ - rpc_sleep_on(&childq, task, func, NULL); + __rpc_sleep_on(&childq, task, func, NULL); + rpc_make_runnable(child); + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); } /* @@ -789,8 +841,11 @@ rpc_killall_tasks(struct rpc_clnt *clnt) struct rpc_task **q, *rovr; dprintk("RPC: killing all tasks for client %p\n", clnt); - /* N.B. Why bother to inhibit? Nothing blocks here ... */ - rpc_inhibit++; + + /* + * Spin lock all_tasks to prevent changes... + */ + spin_lock(&rpc_sched_lock); for (q = &all_tasks; (rovr = *q); q = &rovr->tk_next_task) { if (!clnt || rovr->tk_client == clnt) { rovr->tk_flags |= RPC_TASK_KILLED; @@ -798,11 +853,18 @@ rpc_killall_tasks(struct rpc_clnt *clnt) rpc_wake_up_task(rovr); } } - rpc_inhibit--; + spin_unlock(&rpc_sched_lock); } static DECLARE_MUTEX_LOCKED(rpciod_running); +static inline int +rpciod_task_pending(void) +{ + return schedq.task != NULL || xprt_tcp_pending(); +} + + /* * This is the rpciod kernel thread */ @@ -810,7 +872,6 @@ static int rpciod(void *ptr) { wait_queue_head_t *assassin = (wait_queue_head_t*) ptr; - unsigned long oldflags; int rounds = 0; MOD_INC_USE_COUNT; @@ -845,18 +906,15 @@ rpciod(void *ptr) schedule(); rounds = 0; } - save_flags(oldflags); cli(); dprintk("RPC: rpciod running checking dispatch\n"); rpciod_tcp_dispatcher(); - if (!schedq.task) { + if (!rpciod_task_pending()) { dprintk("RPC: rpciod back to sleep\n"); - interruptible_sleep_on(&rpciod_idle); + wait_event_interruptible(rpciod_idle, rpciod_task_pending()); dprintk("RPC: switch to rpciod\n"); - rpciod_tcp_dispatcher(); rounds = 0; } - restore_flags(oldflags); } dprintk("RPC: rpciod shutdown commences\n"); @@ -983,8 +1041,12 @@ void rpc_show_tasks(void) struct rpc_task *t = all_tasks, *next; struct nfs_wreq *wreq; - if (!t) + spin_lock(&rpc_sched_lock); + t = all_tasks; + if (!t) { + spin_unlock(&rpc_sched_lock); return; + } printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout " "-rpcwait -action- --exit--\n"); for (; t; t = next) { @@ -1007,5 +1069,6 @@ void rpc_show_tasks(void) wreq->wb_file->f_dentry->d_parent->d_name.name, wreq->wb_file->f_dentry->d_name.name); } + spin_unlock(&rpc_sched_lock); } #endif diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 610cfc07b..fb09aac28 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -31,12 +31,16 @@ * primitives that `transparently' work for processes as well as async * tasks that rely on callbacks. * - * Copyright (C) 1995, 1996, Olaf Kirch + * Copyright (C) 1995-1997, Olaf Kirch * * TCP callback races fixes (C) 1998 Red Hat Software * TCP send fixes (C) 1998 Red Hat Software * TCP NFS related read + write fixes * (C) 1999 Dave Airlie, University of Limerick, Ireland + * + * Rewrite of larges part of the code in order to stabilize TCP stuff. + * Fix behaviour when socket buffer is full. + * (C) 1999 Trond Myklebust */ #define __KERNEL_SYSCALLS__ @@ -62,6 +66,8 @@ #include #define SOCK_HAS_USER_DATA +/* Following value should be > 32k + RPC overhead */ +#define XPRT_MIN_WRITE_SPACE 35000 /* * Local variables @@ -70,6 +76,9 @@ static struct rpc_xprt * sock_list = NULL; #endif +/* Spinlock for critical sections in the code. 
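
[The rpciod main loop above replaces interruptible_sleep_on() -- which can
miss a wakeup arriving between the schedq test and the sleep -- with
wait_event_interruptible() on an explicit rpciod_task_pending() predicate.
The classic condition-variable rendering of that fix, assuming nothing
beyond POSIX threads; "pending" plays the role of the predicate:]

	#include <pthread.h>

	static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
	static pthread_cond_t  work_ready = PTHREAD_COND_INITIALIZER;
	static int pending;		/* rpciod_task_pending() stand-in */

	/* wait_event-style sleep: re-test the condition under the lock
	 * after every wakeup, so a signal sent before we block is never
	 * lost the way a bare sleep_on() loses it. */
	static void daemon_wait(void)
	{
		pthread_mutex_lock(&lock);
		while (!pending)
			pthread_cond_wait(&work_ready, &lock);
		pthread_mutex_unlock(&lock);
	}

	static void submit_work(void)
	{
		pthread_mutex_lock(&lock);
		pending = 1;
		pthread_cond_signal(&work_ready);
		pthread_mutex_unlock(&lock);
	}
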
*/ +spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED; + #ifdef RPC_DEBUG # undef RPC_DEBUG_DATA # define RPCDBG_FACILITY RPCDBG_XPRT @@ -84,11 +93,13 @@ static struct rpc_xprt * sock_list = NULL; * Local functions */ static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); +static void do_xprt_transmit(struct rpc_task *); static void xprt_transmit_status(struct rpc_task *task); +static void xprt_transmit_timeout(struct rpc_task *task); static void xprt_receive_status(struct rpc_task *task); static void xprt_reserve_status(struct rpc_task *task); +static void xprt_disconnect(struct rpc_xprt *); static void xprt_reconn_timeout(struct rpc_task *task); -static void xprt_reconn_status(struct rpc_task *task); static struct socket *xprt_create_socket(int, struct sockaddr_in *, struct rpc_timeout *); @@ -144,39 +155,35 @@ xprt_from_sock(struct sock *sk) * Adjust the iovec to move on 'n' bytes */ -extern inline void xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount) +extern inline void +xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount) { struct iovec *iv=msg->msg_iov; + int i; /* * Eat any sent iovecs */ - - while(iv->iov_len < amount) - { - amount-=iv->iov_len; + while(iv->iov_len <= amount) { + amount -= iv->iov_len; iv++; msg->msg_iovlen--; } - - msg->msg_iov=niv; - + /* * And chew down the partial one */ - niv[0].iov_len = iv->iov_len-amount; niv[0].iov_base =((unsigned char *)iv->iov_base)+amount; iv++; - + /* * And copy any others */ - - for(amount=1;amountmsg_iovlen; amount++) - { - niv[amount]=*iv++; - } + for(i = 1; i < msg->msg_iovlen; i++) + niv[i]=*iv++; + + msg->msg_iov=niv; } /* @@ -184,43 +191,42 @@ extern inline void xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amou */ static inline int -xprt_sendmsg(struct rpc_xprt *xprt) +xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) { struct socket *sock = xprt->sock; struct msghdr msg; mm_segment_t oldfs; int result; + int slen = req->rq_slen - req->rq_bytes_sent; struct iovec niv[MAX_IOVEC]; + if (slen == 0) + return 0; + xprt_pktdump("packet data:", - xprt->snd_buf.io_vec->iov_base, - xprt->snd_buf.io_vec->iov_len); + req->rq_svec->iov_base, + req->rq_svec->iov_len); msg.msg_flags = MSG_DONTWAIT; - msg.msg_iov = xprt->snd_buf.io_vec; - msg.msg_iovlen = xprt->snd_buf.io_nr; + msg.msg_iov = req->rq_svec; + msg.msg_iovlen = req->rq_snr; msg.msg_name = (struct sockaddr *) &xprt->addr; msg.msg_namelen = sizeof(xprt->addr); msg.msg_control = NULL; msg.msg_controllen = 0; /* Dont repeat bytes */ - - if(xprt->snd_sent) - xprt_move_iov(&msg, niv, xprt->snd_sent); - + if (req->rq_bytes_sent) + xprt_move_iov(&msg, niv, req->rq_bytes_sent); + oldfs = get_fs(); set_fs(get_ds()); - result = sock_sendmsg(sock, &msg, xprt->snd_buf.io_len); + result = sock_sendmsg(sock, &msg, slen); set_fs(oldfs); - dprintk("RPC: xprt_sendmsg(%d) = %d\n", - xprt->snd_buf.io_len, result); + dprintk("RPC: xprt_sendmsg(%d) = %d\n", slen, result); - if (result >= 0) { - xprt->snd_buf.io_len -= result; - xprt->snd_sent += result; + if (result >= 0) return result; - } switch (result) { case -ECONNREFUSED: @@ -229,9 +235,14 @@ xprt_sendmsg(struct rpc_xprt *xprt) */ break; case -EAGAIN: - return 0; - case -ENOTCONN: case -EPIPE: + if (sock->flags & SO_NOSPACE) + result = -ENOMEM; + break; + case -ENOTCONN: + case -EPIPE: /* connection broken */ + if (xprt->stream) + result = -ENOTCONN; break; default: printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result); @@ -252,7 +263,6 @@ xprt_recvmsg(struct rpc_xprt *xprt, struct 
iovec *iov, int nr, int len) mm_segment_t oldfs; int result; -#if LINUX_VERSION_CODE >= 0x020100 msg.msg_flags = MSG_DONTWAIT; msg.msg_iov = iov; msg.msg_iovlen = nr; @@ -264,20 +274,6 @@ xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, int len) oldfs = get_fs(); set_fs(get_ds()); result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT); set_fs(oldfs); -#else - int alen = sizeof(sin); - msg.msg_flags = 0; - msg.msg_iov = iov; - msg.msg_iovlen = nr; - msg.msg_name = &sin; - msg.msg_namelen = sizeof(sin); - msg.msg_control = NULL; - msg.msg_controllen = 0; - - oldfs = get_fs(); set_fs(get_ds()); - result = sock->ops->recvmsg(sock, &msg, len, 1, 0, &alen); - set_fs(oldfs); -#endif dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n", iov, len, result); @@ -354,13 +350,15 @@ xprt_close(struct rpc_xprt *xprt) { struct sock *sk = xprt->inet; + xprt_disconnect(xprt); + #ifdef SOCK_HAS_USER_DATA sk->user_data = NULL; #endif sk->data_ready = xprt->old_data_ready; - sk->no_check = 0; sk->state_change = xprt->old_state_change; sk->write_space = xprt->old_write_space; + sk->no_check = 0; sock_release(xprt->sock); /* @@ -378,9 +376,16 @@ static void xprt_disconnect(struct rpc_xprt *xprt) { dprintk("RPC: disconnected transport %p\n", xprt); + xprt->connected = 0; + xprt->tcp_offset = 0; + xprt->tcp_more = 0; + xprt->tcp_total = 0; + xprt->tcp_reclen = 0; + xprt->tcp_copied = 0; + xprt->tcp_rqstp = NULL; + xprt->rx_pending_flag = 0; rpc_wake_up_status(&xprt->pending, -ENOTCONN); rpc_wake_up_status(&xprt->sending, -ENOTCONN); - xprt->connected = 0; } /* @@ -398,22 +403,33 @@ xprt_reconnect(struct rpc_task *task) task->tk_pid, xprt, xprt->connected); task->tk_status = 0; + if (xprt->shutdown) + return; + + if (!xprt->stream) + return; + + start_bh_atomic(); + if (xprt->connected) { + end_bh_atomic(); + return; + } if (xprt->connecting) { task->tk_timeout = xprt->timeout.to_maxval; rpc_sleep_on(&xprt->reconn, task, NULL, NULL); + end_bh_atomic(); return; } xprt->connecting = 1; + end_bh_atomic(); /* Create an unconnected socket */ - if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout))) + if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout))) { + xprt->connecting = 0; goto defer; + } -#if LINUX_VERSION_CODE >= 0x020100 inet = sock->sk; -#else - inet = (struct sock *) sock->data; -#endif inet->data_ready = xprt->inet->data_ready; inet->state_change = xprt->inet->state_change; inet->write_space = xprt->inet->write_space; @@ -422,18 +438,18 @@ xprt_reconnect(struct rpc_task *task) #endif dprintk("RPC: %4d closing old socket\n", task->tk_pid); - xprt_disconnect(xprt); xprt_close(xprt); - /* Reset to new socket and default congestion */ + /* Reset to new socket */ xprt->sock = sock; xprt->inet = inet; - xprt->cwnd = RPC_INITCWND; /* Now connect it asynchronously. 
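
[xprt_move_iov() above is rewritten so that a vector consumed exactly to its
end is skipped (iov_len <= amount) instead of being left behind as a
zero-length entry, and xprt_sendmsg() now resumes from req->rq_bytes_sent
rather than from a per-transport snd_sent counter. The same advance-past-
sent-bytes logic in plain user-space C -- done in place, where the kernel
copies into a scratch niv[] array; iov_advance is an illustrative name:]

	#include <stddef.h>
	#include <sys/uio.h>

	/* Advance an iovec array past "sent" bytes: drop fully sent
	 * vectors, trim the first partially sent one.  Returns the new
	 * vector count. */
	static int iov_advance(struct iovec *iov, int nr, size_t sent)
	{
		int i = 0, out = 0;

		while (i < nr && sent >= iov[i].iov_len)
			sent -= iov[i++].iov_len;	/* fully consumed */

		for (; i < nr; i++, out++) {
			iov[out].iov_base = (char *)iov[i].iov_base + sent;
			iov[out].iov_len  = iov[i].iov_len - sent;
			sent = 0;		/* only trim the first one */
		}
		return out;
	}

[This is the standard partial-writev resume pattern: after a short send,
advance by the returned byte count and retry with the compacted vector.]
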
*/ dprintk("RPC: %4d connecting new socket\n", task->tk_pid); status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr, sizeof(xprt->addr), O_NONBLOCK); + + xprt->connecting = 0; if (status < 0) { if (status != -EINPROGRESS && status != -EALREADY) { printk("RPC: TCP connect error %d!\n", -status); @@ -447,37 +463,19 @@ xprt_reconnect(struct rpc_task *task) start_bh_atomic(); if (!xprt->connected) { rpc_sleep_on(&xprt->reconn, task, - xprt_reconn_status, xprt_reconn_timeout); + NULL, xprt_reconn_timeout); end_bh_atomic(); return; } end_bh_atomic(); } - xprt->connecting = 0; - rpc_wake_up(&xprt->reconn); - return; defer: - task->tk_timeout = 30 * HZ; - rpc_sleep_on(&xprt->reconn, task, NULL, NULL); - xprt->connecting = 0; -} - -/* - * Reconnect status - */ -static void -xprt_reconn_status(struct rpc_task *task) -{ - struct rpc_xprt *xprt = task->tk_xprt; - - dprintk("RPC: %4d xprt_reconn_status %d\n", - task->tk_pid, task->tk_status); - if (!xprt->connected && task->tk_status != -ETIMEDOUT) { - task->tk_timeout = 30 * HZ; - rpc_sleep_on(&xprt->reconn, task, NULL, xprt_reconn_timeout); - } + start_bh_atomic(); + if (!xprt->connected) + rpc_wake_up_next(&xprt->reconn); + end_bh_atomic(); } /* @@ -490,11 +488,19 @@ xprt_reconn_timeout(struct rpc_task *task) dprintk("RPC: %4d xprt_reconn_timeout %d\n", task->tk_pid, task->tk_status); task->tk_status = -ENOTCONN; - task->tk_xprt->connecting = 0; + start_bh_atomic(); + if (task->tk_xprt->connecting) + task->tk_xprt->connecting = 0; + if (!task->tk_xprt->connected) + task->tk_status = -ENOTCONN; + else + task->tk_status = -ETIMEDOUT; + end_bh_atomic(); task->tk_timeout = 0; rpc_wake_up_task(task); } +extern spinlock_t rpc_queue_lock; /* * Look up the RPC request corresponding to a reply. */ @@ -503,22 +509,28 @@ xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) { struct rpc_task *head, *task; struct rpc_rqst *req; + unsigned long oldflags; int safe = 0; + spin_lock_irqsave(&rpc_queue_lock, oldflags); if ((head = xprt->pending.task) != NULL) { task = head; do { if ((req = task->tk_rqstp) && req->rq_xid == xid) - return req; + goto out; task = task->tk_next; if (++safe > 100) { printk("xprt_lookup_rqst: loop in Q!\n"); - return NULL; + goto out_bad; } } while (task != head); } dprintk("RPC: unknown XID %08x in reply.\n", xid); - return NULL; + out_bad: + req = NULL; + out: + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); + return req; } /* @@ -559,11 +571,13 @@ xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied); task->tk_status = copied; - rpc_wake_up_task(task); + if (!RPC_IS_RUNNING(task)) + rpc_wake_up_task(task); return; } -/* We have set things up such that we perform the checksum of the UDP +/* + * We have set things up such that we perform the checksum of the UDP * packet in parallel with the copies into the RPC client iovec. -DaveM */ static int csum_partial_copy_to_page_cache(struct iovec *iov, @@ -609,7 +623,8 @@ static int csum_partial_copy_to_page_cache(struct iovec *iov, return 0; } -/* Input handler for RPC replies. Called from a bottom half and hence +/* + * Input handler for RPC replies. Called from a bottom half and hence * atomic. 
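
[xprt_lookup_rqst() above now walks xprt->pending under rpc_queue_lock and
funnels every exit through the out/out_bad labels so the lock is always
dropped, keeping the existing 100-iteration guard against a corrupted queue.
Roughly, in user-space terms -- a linear rather than circular list, invented
names, pthread mutex in place of the spinlock:]

	#include <pthread.h>
	#include <stdint.h>
	#include <stdio.h>

	struct rqst {
		uint32_t	xid;
		struct rqst	*next;
	};

	static struct rqst *pending;
	static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;

	/* Find the request a reply belongs to.  The walk is capped so a
	 * corrupted list degrades into a log message instead of a hang. */
	static struct rqst *lookup_rqst(uint32_t xid)
	{
		struct rqst *req;
		int safe = 0;

		pthread_mutex_lock(&queue_lock);
		for (req = pending; req; req = req->next) {
			if (req->xid == xid)
				break;
			if (++safe > 100) {
				fprintf(stderr, "lookup_rqst: loop in Q!\n");
				req = NULL;
				break;
			}
		}
		pthread_mutex_unlock(&queue_lock);
		return req;
	}
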
*/ static inline void @@ -621,12 +636,15 @@ udp_data_ready(struct sock *sk, int len) int err, repsize, copied; dprintk("RPC: udp_data_ready...\n"); - if (!(xprt = xprt_from_sock(sk))) + if (!(xprt = xprt_from_sock(sk))) { + printk("RPC: udp_data_ready request not found!\n"); return; + } + dprintk("RPC: udp_data_ready client %p\n", xprt); if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) - return; + goto out_err; repsize = skb->len - sizeof(struct udphdr); if (repsize < 4) { @@ -646,6 +664,7 @@ udp_data_ready(struct sock *sk, int len) if ((copied = rovr->rq_rlen) > repsize) copied = repsize; + rovr->rq_damaged = 1; /* Suck it into the iovec, verify checksum if not done by hw. */ if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied)) goto dropit; @@ -658,6 +677,8 @@ udp_data_ready(struct sock *sk, int len) dropit: skb_free_datagram(sk, skb); return; +out_err: + return; } /* @@ -679,6 +700,7 @@ tcp_input_record(struct rpc_xprt *xprt) int result, maxcpy, reclen, avail, want; dprintk("RPC: tcp_input_record\n"); + offset = xprt->tcp_offset; result = -EAGAIN; if (offset < 4 || (!xprt->tcp_more && offset < 8)) { @@ -687,11 +709,6 @@ tcp_input_record(struct rpc_xprt *xprt) riov.iov_base = xprt->tcp_recm.data + offset; riov.iov_len = want; result = xprt_recvmsg(xprt, &riov, 1, want); - if (!result) - { - dprintk("RPC: empty TCP record.\n"); - return -ENOTCONN; - } if (result < 0) goto done; offset += result; @@ -733,9 +750,9 @@ tcp_input_record(struct rpc_xprt *xprt) dprintk("RPC: %4d TCP receiving %d bytes\n", req->rq_task->tk_pid, want); + /* Request must be re-encoded before retransmit */ + req->rq_damaged = 1; result = xprt_recvmsg(xprt, xprt->tcp_iovec, req->rq_rnr, want); - if (!result && want) - result = -EAGAIN; if (result < 0) goto done; xprt->tcp_copied += result; @@ -754,12 +771,10 @@ tcp_input_record(struct rpc_xprt *xprt) xprt->tcp_copied = 0; xprt->tcp_rqstp = NULL; } - /* Request must be re-encoded before retransmit */ - req->rq_damaged = 1; } /* Skip over any trailing bytes on short reads */ - while (avail) { + while (avail > 0) { static u8 dummy[64]; want = MIN(avail, sizeof(dummy)); @@ -767,8 +782,6 @@ tcp_input_record(struct rpc_xprt *xprt) riov.iov_len = want; dprintk("RPC: TCP skipping %d bytes\n", want); result = xprt_recvmsg(xprt, &riov, 1, want); - if (!result && want) - result=-EAGAIN; if (result < 0) goto done; offset += result; @@ -789,55 +802,40 @@ done: return result; } -static __inline__ void tcp_output_record(struct rpc_xprt *xprt) -{ - if(xprt->snd_sent && xprt->snd_task) - dprintk("RPC: write space\n"); - if(xprt->write_space == 0) - { - xprt->write_space = 1; - if (xprt->snd_task && !RPC_IS_RUNNING(xprt->snd_task)) - { - if(xprt->snd_sent) - dprintk("RPC: Write wakeup snd_sent =%d\n", - xprt->snd_sent); - rpc_wake_up_task(xprt->snd_task); - } - } -} - /* * TCP task queue stuff */ -static struct rpc_xprt *rpc_rx_xprt_pending = NULL; /* Chain by rx_pending of rpc_xprt's */ -static struct rpc_xprt *rpc_tx_xprt_pending = NULL; /* Chain by tx_pending of rpc_xprt's */ +static struct rpc_xprt *rpc_xprt_pending = NULL; /* Chain by rx_pending of rpc_xprt's */ /* * This is protected from tcp_data_ready and the stack as its run * inside of the RPC I/O daemon */ - -void rpciod_tcp_dispatcher(void) +static void +do_rpciod_tcp_dispatcher(void) { struct rpc_xprt *xprt; int result; dprintk("rpciod_tcp_dispatcher: Queue Running\n"); - + /* * Empty each pending socket */ - - while((xprt=rpc_rx_xprt_pending)!=NULL) - { + + while(1) { int safe_retry=0; - - 
rpc_rx_xprt_pending=xprt->rx_pending; - xprt->rx_pending_flag=0; - + + if ((xprt = rpc_xprt_pending) == NULL) { + break; + } + xprt->rx_pending_flag = 0; + rpc_xprt_pending=xprt->rx_pending; + xprt->rx_pending = NULL; + dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt); - + do { if (safe_retry++ > 50) @@ -845,28 +843,30 @@ void rpciod_tcp_dispatcher(void) result = tcp_input_record(xprt); } while (result >= 0); - + switch (result) { case -EAGAIN: - continue; case -ENOTCONN: case -EPIPE: - xprt_disconnect(xprt); continue; default: printk(KERN_WARNING "RPC: unexpected error %d from tcp_input_record\n", result); } } +} - while((xprt=rpc_tx_xprt_pending)!=NULL) - { - rpc_tx_xprt_pending = xprt->tx_pending; - xprt->tx_pending_flag = 0; - tcp_output_record(xprt); - } +void rpciod_tcp_dispatcher(void) +{ + start_bh_atomic(); + do_rpciod_tcp_dispatcher(); + end_bh_atomic(); } +int xprt_tcp_pending(void) +{ + return rpc_xprt_pending != NULL; +} extern inline void tcp_rpciod_queue(void) { @@ -890,6 +890,7 @@ static void tcp_data_ready(struct sock *sk, int len) printk("Not a socket with xprt %p\n", sk); return; } + dprintk("RPC: tcp_data_ready client %p\n", xprt); dprintk("RPC: state %x conn %d dead %d zapped %d\n", sk->state, xprt->connected, @@ -898,24 +899,16 @@ static void tcp_data_ready(struct sock *sk, int len) * If we are not waiting for the RPC bh run then * we are now */ - if (!xprt->rx_pending_flag) - { - int start_queue=0; + if (!xprt->rx_pending_flag) { + dprintk("RPC: xprt queue %p\n", rpc_xprt_pending); - dprintk("RPC: xprt queue %p\n", rpc_rx_xprt_pending); - if(rpc_rx_xprt_pending==NULL) - start_queue=1; + xprt->rx_pending=rpc_xprt_pending; + rpc_xprt_pending=xprt; xprt->rx_pending_flag=1; - xprt->rx_pending=rpc_rx_xprt_pending; - rpc_rx_xprt_pending=xprt; - if (start_queue) - { - tcp_rpciod_queue(); - start_queue=0; - } - } - else + } else dprintk("RPC: xprt queued already %p\n", xprt); + tcp_rpciod_queue(); + } @@ -931,17 +924,32 @@ tcp_state_change(struct sock *sk) sk->state, xprt->connected, sk->dead, sk->zapped); - if (sk->state == TCP_ESTABLISHED && !xprt->connected) { + switch(sk->state) { + case TCP_ESTABLISHED: + if (xprt->connected) + break; xprt->connected = 1; xprt->connecting = 0; rpc_wake_up(&xprt->reconn); - } else if (sk->zapped) { - rpc_wake_up_status(&xprt->pending, -ENOTCONN); - rpc_wake_up_status(&xprt->sending, -ENOTCONN); + rpc_wake_up_next(&xprt->sending); + tcp_rpciod_queue(); + break; + case TCP_CLOSE: + if (xprt->connecting) + break; + xprt_disconnect(xprt); rpc_wake_up_status(&xprt->reconn, -ENOTCONN); + break; + default: + break; } + } +/* + * The following 2 routines allow a task to sleep while socket memory is + * low. 
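
[The dispatcher rework above collapses the old rx/tx chains into a single
rpc_xprt_pending list: tcp_data_ready() chains a transport at most once
(rx_pending_flag) and kicks rpciod, which detaches entries one at a time,
clearing the flag before processing so fresh data can re-queue the
transport. In the kernel both sides run with bottom halves excluded; the
sketch below assumes the caller supplies that mutual exclusion, and all
names are illustrative:]

	#include <stddef.h>

	struct xprt {
		int		queued;		/* rx_pending_flag analogue */
		struct xprt	*next;		/* rx_pending analogue */
	};

	static struct xprt *pending_list;

	/* Producer (tcp_data_ready, at bottom-half time): chain the
	 * transport at most once, then kick the daemon. */
	static void mark_pending(struct xprt *x)
	{
		if (!x->queued) {
			x->next = pending_list;
			pending_list = x;
			x->queued = 1;
		}
		/* wake the dispatcher thread here */
	}

	/* Consumer (do_rpciod_tcp_dispatcher): detach one entry at a
	 * time, clearing its flag before processing. */
	static void dispatch_all(void)
	{
		struct xprt *x;

		while ((x = pending_list) != NULL) {
			x->queued = 0;
			pending_list = x->next;
			x->next = NULL;
			/* drain records from x's socket here */
		}
	}
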
+ */ static void tcp_write_space(struct sock *sk) { @@ -949,17 +957,43 @@ tcp_write_space(struct sock *sk) if (!(xprt = xprt_from_sock(sk))) return; - if (!xprt->tx_pending_flag) { - int start_queue = 0; - - if (rpc_tx_xprt_pending == NULL) - start_queue = 1; - xprt->tx_pending_flag = 1; - xprt->tx_pending = rpc_tx_xprt_pending; - rpc_tx_xprt_pending = xprt; - if (start_queue) - tcp_rpciod_queue(); - } + + /* Wait until we have enough socket memory */ + if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE)) + return; + + if (xprt->write_space) + return; + + xprt->write_space = 1; + + if (!xprt->snd_task) + rpc_wake_up_next(&xprt->sending); + else if (!RPC_IS_RUNNING(xprt->snd_task)) + rpc_wake_up_task(xprt->snd_task); +} + +static void +udp_write_space(struct sock *sk) +{ + struct rpc_xprt *xprt; + + if (!(xprt = xprt_from_sock(sk))) + return; + + + /* Wait until we have enough socket memory */ + if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE)) + return; + + if (xprt->write_space) + return; + + xprt->write_space = 1; + if (!xprt->snd_task) + rpc_wake_up_next(&xprt->sending); + else if (!RPC_IS_RUNNING(xprt->snd_task)) + rpc_wake_up_task(xprt->snd_task); } /* @@ -982,32 +1016,50 @@ xprt_timer(struct rpc_task *task) rpc_wake_up_task(task); } + /* - * (Partly) transmit the RPC packet - * Note that task->tk_status is either 0 or negative on return. - * Only when the reply is received will the status be set to a - * positive value. + * Serialize access to sockets, in order to prevent different + * requests from interfering with each other. */ -static inline int -xprt_transmit_some(struct rpc_xprt *xprt, struct rpc_task *task) +static int +xprt_down_transmit(struct rpc_task *task) { + struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; struct rpc_rqst *req = task->tk_rqstp; - int result; - task->tk_status = 0; - if ((result = xprt_sendmsg(xprt)) >= 0) { - if (!xprt->snd_buf.io_len || !xprt->stream) { - rpc_wake_up_next(&xprt->sending); - return req->rq_slen; - } - result = -EAGAIN; - } else if (xprt->stream) { - if (result == -ENOTCONN || result == -EPIPE) { - xprt_disconnect(xprt); - result = -ENOTCONN; - } + start_bh_atomic(); + spin_lock(&xprt_lock); + if (xprt->snd_task && xprt->snd_task != task) { + dprintk("RPC: %4d TCP write queue full (task %d)\n", + task->tk_pid, xprt->snd_task->tk_pid); + task->tk_timeout = req->rq_timeout.to_current; + rpc_sleep_on(&xprt->sending, task, xprt_transmit, NULL); + } else if (!xprt->snd_task) { + xprt->snd_task = task; +#ifdef RPC_PROFILE + req->rq_xtime = jiffies; +#endif + req->rq_bytes_sent = 0; + } + spin_unlock(&xprt_lock); + end_bh_atomic(); + return xprt->snd_task == task; +} + +/* + * Releases the socket for use by other requests. 
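
[xprt_down_transmit()/xprt_up_transmit() above serialize the socket by
recording an owning task in xprt->snd_task; a would-be second sender goes to
sleep on xprt->sending instead of interleaving its record into the stream.
The acquire/release pair reduced to a user-space sketch -- the wake-up of
the next sender is left as a comment, and the mutex is assumed initialized
elsewhere:]

	#include <pthread.h>
	#include <stddef.h>

	struct task;			/* opaque here */

	struct xprt {
		struct task	*snd_task;	/* current socket owner */
		pthread_mutex_t	lock;		/* init'd at setup time */
	};

	/* Claim the socket for this task (xprt_down_transmit shape).
	 * Returns nonzero on success; on failure the kernel version
	 * puts the task to sleep on the "sending" wait queue. */
	static int down_transmit(struct xprt *x, struct task *t)
	{
		int got = 0;

		pthread_mutex_lock(&x->lock);
		if (x->snd_task == NULL || x->snd_task == t) {
			x->snd_task = t;
			got = 1;
		}
		pthread_mutex_unlock(&x->lock);
		return got;
	}

	/* Release it and let the next sender run (xprt_up_transmit). */
	static void up_transmit(struct xprt *x, struct task *t)
	{
		pthread_mutex_lock(&x->lock);
		if (x->snd_task == t) {
			x->snd_task = NULL;
			/* wake the next waiter on the sending queue here */
		}
		pthread_mutex_unlock(&x->lock);
	}
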
+ */ +static void +xprt_up_transmit(struct rpc_task *task) +{ + struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; + + if (xprt->snd_task && xprt->snd_task == task) { + start_bh_atomic(); + xprt->snd_task = NULL; + rpc_wake_up_next(&xprt->sending); + end_bh_atomic(); } - return task->tk_status = result; } /* @@ -1020,71 +1072,65 @@ xprt_transmit(struct rpc_task *task) struct rpc_timeout *timeo; struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; - int status; dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid, *(u32 *)(req->rq_svec[0].iov_base)); - if (xprt->shutdown) { + if (xprt->shutdown) task->tk_status = -EIO; + + if (task->tk_status < 0) return; + + /* Reset timeout parameters */ + timeo = &req->rq_timeout; + if (timeo->to_retries < 0) { + dprintk("RPC: %4d xprt_transmit reset timeo\n", + task->tk_pid); + timeo->to_retries = xprt->timeout.to_retries; + timeo->to_current = timeo->to_initval; } - /* If we're not already in the process of transmitting our call, - * set up everything as needed. */ - if (xprt->snd_task != task) { - /* Write the record marker */ - if (xprt->stream) { - u32 marker; + /* set up everything as needed. */ + /* Write the record marker */ + if (xprt->stream) { + u32 marker; - if (!xprt->connected) { - task->tk_status = -ENOTCONN; - return; - } - marker = htonl(0x80000000|(req->rq_slen-4)); - *((u32 *) req->rq_svec[0].iov_base) = marker; - } + marker = htonl(0x80000000|(req->rq_slen-4)); + *((u32 *) req->rq_svec[0].iov_base) = marker; - /* Reset timeout parameters */ - timeo = &req->rq_timeout; - if (timeo->to_retries < 0) { - dprintk("RPC: %4d xprt_transmit reset timeo\n", - task->tk_pid); - timeo->to_retries = xprt->timeout.to_retries; - timeo->to_current = timeo->to_initval; - } + } -#ifdef RPC_PROFILE - req->rq_xtime = jiffies; -#endif - req->rq_gotit = 0; + if (!xprt_down_transmit(task)) + return; - if (xprt->snd_task) { - dprintk("RPC: %4d TCP write queue full (task %d)\n", - task->tk_pid, xprt->snd_task->tk_pid); - rpc_sleep_on(&xprt->sending, task, - xprt_transmit_status, NULL); - return; - } - xprt->snd_buf = req->rq_snd_buf; - xprt->snd_task = task; - xprt->snd_sent = 0; + do_xprt_transmit(task); +} + +static void +do_xprt_transmit(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + int status, retry = 0; + + if (xprt->shutdown) { + task->tk_status = -EIO; + goto out_release; } /* For fast networks/servers we have to put the request on * the pending list now: */ - start_bh_atomic(); + req->rq_gotit = 0; status = rpc_add_wait_queue(&xprt->pending, task); if (!status) task->tk_callback = NULL; - end_bh_atomic(); - if (status) - { + if (status) { printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); task->tk_status = status; - return; + goto out_release; } /* Continue transmitting the packet/record. 
We must be careful @@ -1093,27 +1139,55 @@ xprt_transmit(struct rpc_task *task) */ while (1) { xprt->write_space = 0; - if (xprt_transmit_some(xprt, task) != -EAGAIN) { + status = xprt_sendmsg(xprt, req); + + if (status < 0) + break; + + if (xprt->stream) { + req->rq_bytes_sent += status; + + if (req->rq_bytes_sent >= req->rq_slen) + goto out_release; + } + + if (status < req->rq_slen) + status = -EAGAIN; + + if (status >= 0 || !xprt->stream) { dprintk("RPC: %4d xmit complete\n", task->tk_pid); - xprt->snd_task = NULL; - return; + goto out_release; } - /*d*/dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", - task->tk_pid, xprt->snd_buf.io_len, + dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", + task->tk_pid, req->rq_slen - req->rq_bytes_sent, req->rq_slen); - task->tk_status = 0; - start_bh_atomic(); - if (!xprt->write_space) { - /* Remove from pending */ - rpc_remove_wait_queue(task); - rpc_sleep_on(&xprt->sending, task, - xprt_transmit_status, NULL); - end_bh_atomic(); - return; - } + + if (retry++ > 50) + break; + } + + task->tk_status = (status == -ENOMEM) ? -EAGAIN : status; + + /* We don't care if we got a reply, so don't protect + * against bh. */ + if (task->tk_rpcwait == &xprt->pending) + rpc_remove_wait_queue(task); + + /* Protect against (udp|tcp)_write_space */ + start_bh_atomic(); + if (status == -ENOMEM || status == -EAGAIN) { + task->tk_timeout = req->rq_timeout.to_current; + if (!xprt->write_space) + rpc_sleep_on(&xprt->sending, task, xprt_transmit_status, + xprt_transmit_timeout); end_bh_atomic(); + return; } + end_bh_atomic(); + +out_release: + xprt_up_transmit(task); } /* @@ -1126,18 +1200,26 @@ xprt_transmit_status(struct rpc_task *task) struct rpc_xprt *xprt = task->tk_client->cl_xprt; dprintk("RPC: %4d transmit_status %d\n", task->tk_pid, task->tk_status); - if (xprt->snd_task == task) - { - if (task->tk_status < 0) - { - xprt->snd_task = NULL; - xprt_disconnect(xprt); - } - else - xprt_transmit(task); + if (xprt->snd_task == task) { + task->tk_status = 0; + do_xprt_transmit(task); + return; } } +/* + * RPC transmit timeout handler. + */ +static void +xprt_transmit_timeout(struct rpc_task *task) +{ + dprintk("RPC: %4d transmit_timeout %d\n", task->tk_pid, task->tk_status); + task->tk_status = -ETIMEDOUT; + task->tk_timeout = 0; + rpc_wake_up_task(task); + xprt_up_transmit(task); +} + /* * Wait for the reply to our call. * When the callback is invoked, the congestion window should have @@ -1150,25 +1232,33 @@ xprt_receive(struct rpc_task *task) struct rpc_xprt *xprt = req->rq_xprt; dprintk("RPC: %4d xprt_receive\n", task->tk_pid); - if (xprt->connected == 0) { - task->tk_status = -ENOTCONN; - return; - } /* - * Wait until rq_gotit goes non-null, or timeout elapsed. + * Wait until rq_gotit goes non-null, or timeout elapsed. 
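
[The transmit loop above keeps req->rq_bytes_sent across partial sends so
that, on a stream socket, a retry resumes mid-record rather than re-sending
from the start, and it maps -ENOMEM/-EAGAIN into "sleep until write space".
The same bookkeeping against a plain nonblocking socket, assuming buf
already carries the record marker; send_record is an invented helper:]

	#include <errno.h>
	#include <sys/types.h>
	#include <sys/socket.h>

	/* Push a fully marshalled request down a stream socket, keeping
	 * a running count across partial writes (the role played by
	 * req->rq_bytes_sent above).  Returns 0 when the record is out,
	 * 1 when the caller should wait for write space, -1 on error. */
	static int send_record(int fd, const char *buf, size_t len,
			       size_t *bytes_sent)
	{
		while (*bytes_sent < len) {
			ssize_t n = send(fd, buf + *bytes_sent,
					 len - *bytes_sent, MSG_DONTWAIT);
			if (n <= 0) {
				if (n < 0 && errno == EAGAIN)
					return 1;	/* wait for space */
				return -1;		/* hard error */
			}
			*bytes_sent += n;
		}
		return 0;
	}

[The retry cap (50 in the patch) bounds how long a task can spin on a socket
that keeps accepting only a few bytes before it yields and sleeps.]
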
*/ task->tk_timeout = req->rq_timeout.to_current; start_bh_atomic(); + if (task->tk_rpcwait) + rpc_remove_wait_queue(task); + + if (task->tk_status < 0 || xprt->shutdown) { + end_bh_atomic(); + goto out; + } + if (!req->rq_gotit) { rpc_sleep_on(&xprt->pending, task, xprt_receive_status, xprt_timer); + end_bh_atomic(); + return; } end_bh_atomic(); dprintk("RPC: %4d xprt_receive returns %d\n", task->tk_pid, task->tk_status); + out: + xprt_receive_status(task); } static void @@ -1176,8 +1266,9 @@ xprt_receive_status(struct rpc_task *task) { struct rpc_xprt *xprt = task->tk_xprt; - if (xprt->stream && xprt->tcp_rqstp == task->tk_rqstp) + if (xprt->tcp_rqstp == task->tk_rqstp) xprt->tcp_rqstp = NULL; + } /* @@ -1194,7 +1285,7 @@ xprt_reserve(struct rpc_task *task) dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n", task->tk_pid, xprt->cong, xprt->cwnd); - if ((!RPCXPRT_CONGESTED(xprt) && xprt->free)) { + if (!RPCXPRT_CONGESTED(xprt) && xprt->free) { xprt_reserve_status(task); task->tk_timeout = 0; } else if (!task->tk_timeout) { @@ -1223,40 +1314,30 @@ xprt_reserve_status(struct rpc_task *task) /* NOP */ } else if (task->tk_rqstp) { /* We've already been given a request slot: NOP */ - } else if (!RPCXPRT_CONGESTED(xprt)) { + } else if (!RPCXPRT_CONGESTED(xprt) && xprt->free) { /* OK: There's room for us. Grab a free slot and bump * congestion value */ - req = xprt->free; - if (!req) - goto bad_list; - if (req->rq_xid) - goto bad_used; + spin_lock(&xprt_lock); + if (!(req = xprt->free)) { + spin_unlock(&xprt_lock); + goto out_nofree; + } xprt->free = req->rq_next; + req->rq_next = NULL; + spin_unlock(&xprt_lock); xprt->cong += RPC_CWNDSCALE; task->tk_rqstp = req; - req->rq_next = NULL; xprt_request_init(task, xprt); - } else { - task->tk_status = -EAGAIN; - } - if (xprt->free && !RPCXPRT_CONGESTED(xprt)) - rpc_wake_up_next(&xprt->backlog); + if (xprt->free) + xprt_clear_backlog(xprt); + } else + goto out_nofree; return; -bad_list: - printk(KERN_ERR - "RPC: %4d inconsistent free list (cong %ld cwnd %ld)\n", - task->tk_pid, xprt->cong, xprt->cwnd); - rpc_debug = ~0; - goto bummer; -bad_used: - printk(KERN_ERR "RPC: used rqst slot %p on free list!\n", req); -bummer: - task->tk_status = -EIO; - xprt->free = NULL; - return; +out_nofree: + task->tk_status = -EAGAIN; } /* @@ -1298,13 +1379,15 @@ xprt_release(struct rpc_task *task) dprintk("RPC: %4d release request %p\n", task->tk_pid, req); + spin_lock(&xprt_lock); + req->rq_next = xprt->free; + xprt->free = req; + spin_unlock(&xprt_lock); + /* remove slot from queue of pending */ start_bh_atomic(); if (task->tk_rpcwait) { printk("RPC: task of released request still queued!\n"); -#ifdef RPC_DEBUG - printk("RPC: (task is on %s)\n", rpc_qname(task->tk_rpcwait)); -#endif rpc_del_timer(task); rpc_remove_wait_queue(task); } @@ -1313,31 +1396,7 @@ xprt_release(struct rpc_task *task) /* Decrease congestion value. */ xprt->cong -= RPC_CWNDSCALE; -#if 0 - /* If congestion threshold is not yet reached, pass on the request slot. - * This looks kind of kludgy, but it guarantees backlogged requests - * are served in order. - * N.B. This doesn't look completely safe, as the task is still - * on the backlog list after wake-up. 
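
[xprt_reserve_status() above now takes a slot from xprt->free under
xprt_lock only while the transport is below its congestion window, charging
RPC_CWNDSCALE per outstanding request; a task that cannot get a slot sees
-EAGAIN and sleeps on the backlog. A sketch of that gate, with invented
constants and with x->cong + CWNDSCALE <= x->cwnd standing in for
!RPCXPRT_CONGESTED():]

	#include <pthread.h>
	#include <stddef.h>

	#define CWNDSCALE 256			/* illustrative scale unit */

	struct rqst { struct rqst *next; };

	struct xprt {
		struct rqst	*free;		/* free request slots */
		long		cong, cwnd;	/* congestion accounting */
		pthread_mutex_t	lock;		/* init'd at setup time */
	};

	/* Grab a slot only while under the congestion window, bumping
	 * cong by one scale unit per outstanding request.  NULL means
	 * the caller must back off and wait on the backlog. */
	static struct rqst *reserve_slot(struct xprt *x)
	{
		struct rqst *req = NULL;

		pthread_mutex_lock(&x->lock);
		if (x->cong + CWNDSCALE <= x->cwnd && x->free) {
			req = x->free;
			x->free = req->next;
			req->next = NULL;
			x->cong += CWNDSCALE;
		}
		pthread_mutex_unlock(&x->lock);
		return req;
	}

[The release side (xprt_release, next hunk) is the mirror image: slot back
on the free list, cong decreased, then xprt_clear_backlog() wakes the next
backlogged task if the window now has room.]
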
- */ - if (!RPCXPRT_CONGESTED(xprt)) { - struct rpc_task *next = rpc_wake_up_next(&xprt->backlog); - - if (next && next->tk_rqstp == 0) { - xprt->cong += RPC_CWNDSCALE; - next->tk_rqstp = req; - xprt_request_init(next, xprt); - return; - } - } -#endif - - req->rq_next = xprt->free; - xprt->free = req; - - /* If not congested, wake up the next backlogged process */ - if (!RPCXPRT_CONGESTED(xprt)) - rpc_wake_up_next(&xprt->backlog); + xprt_clear_backlog(xprt); } /* @@ -1382,11 +1441,7 @@ xprt_setup(struct socket *sock, int proto, dprintk("RPC: setting up %s transport...\n", proto == IPPROTO_UDP? "UDP" : "TCP"); -#if LINUX_VERSION_CODE >= 0x020100 inet = sock->sk; -#else - inet = (struct sock *) sock->data; -#endif if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) return NULL; @@ -1398,7 +1453,8 @@ xprt_setup(struct socket *sock, int proto, xprt->addr = *ap; xprt->prot = proto; xprt->stream = (proto == IPPROTO_TCP)? 1 : 0; - xprt->cwnd = RPC_INITCWND; + xprt->congtime = jiffies; + init_waitqueue_head(&xprt->cong_wait); #ifdef SOCK_HAS_USER_DATA inet->user_data = xprt; #else @@ -1410,11 +1466,14 @@ xprt_setup(struct socket *sock, int proto, xprt->old_write_space = inet->write_space; if (proto == IPPROTO_UDP) { inet->data_ready = udp_data_ready; + inet->write_space = udp_write_space; inet->no_check = UDP_CSUM_NORCV; + xprt->cwnd = RPC_INITCWND; } else { inet->data_ready = tcp_data_ready; inet->state_change = tcp_state_change; inet->write_space = tcp_write_space; + xprt->cwnd = RPC_MAXCWND; xprt->nocong = 1; } xprt->connected = 1; @@ -1487,6 +1546,7 @@ xprt_create_socket(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) (proto == IPPROTO_UDP)? "udp" : "tcp", proto); type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM; + if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) { printk("RPC: can't create socket (%d).\n", -err); goto failed; @@ -1543,6 +1603,21 @@ xprt_shutdown(struct rpc_xprt *xprt) rpc_wake_up(&xprt->pending); rpc_wake_up(&xprt->backlog); rpc_wake_up(&xprt->reconn); + wake_up(&xprt->cong_wait); +} + +/* + * Clear the xprt backlog queue + */ +int +xprt_clear_backlog(struct rpc_xprt *xprt) { + if (!xprt) + return 0; + if (RPCXPRT_CONGESTED(xprt)) + return 0; + rpc_wake_up_next(&xprt->backlog); + wake_up(&xprt->cong_wait); + return 1; } /* -- cgit v1.2.3