diff options
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/auth.c | 121 | ||||
-rw-r--r-- | net/sunrpc/auth_null.c | 10 | ||||
-rw-r--r-- | net/sunrpc/auth_unix.c | 21 | ||||
-rw-r--r-- | net/sunrpc/clnt.c | 359 | ||||
-rw-r--r-- | net/sunrpc/pmap_clnt.c | 3 | ||||
-rw-r--r-- | net/sunrpc/sched.c | 396 | ||||
-rw-r--r-- | net/sunrpc/sunrpc_syms.c | 6 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 943 |
8 files changed, 1001 insertions, 858 deletions
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c index af9923ec8..b8c0a7be8 100644 --- a/net/sunrpc/auth.c +++ b/net/sunrpc/auth.c @@ -4,12 +4,9 @@ * Generic RPC authentication API. * * Copyright (C) 1996, Olaf Kirch <okir@monad.swb.de> - * - * Modified May 1999, Horst von Brand <vonbrand@sleipnir.valparaiso.cl> */ #include <linux/types.h> -#include <linux/string.h> #include <linux/sched.h> #include <linux/malloc.h> #include <linux/errno.h> @@ -84,6 +81,15 @@ rpcauth_init_credcache(struct rpc_auth *auth) auth->au_nextgc = jiffies + (auth->au_expire >> 1); } +static inline void +rpcauth_crdestroy(struct rpc_auth *auth, struct rpc_cred *cred) +{ + if (auth->au_ops->crdestroy) + auth->au_ops->crdestroy(cred); + else + rpc_free(cred); +} + /* * Clear the RPC credential cache */ @@ -115,18 +121,15 @@ static void rpcauth_gc_credcache(struct rpc_auth *auth) { struct rpc_cred **q, *cred, *free = NULL; - int i, safe = 0; + int i; dprintk("RPC: gc'ing RPC credentials for auth %p\n", auth); spin_lock(&rpc_credcache_lock); for (i = 0; i < RPC_CREDCACHE_NR; i++) { q = &auth->au_credcache[i]; while ((cred = *q) != NULL) { - if (++safe > 500) { - printk("RPC: rpcauth_gc_credcache looping!\n"); - break; - } - if (!cred->cr_count && time_before(cred->cr_expire, jiffies)) { + if (!cred->cr_count && + time_before(cred->cr_expire, jiffies)) { *q = cred->cr_next; cred->cr_next = free; free = cred; @@ -138,7 +141,7 @@ rpcauth_gc_credcache(struct rpc_auth *auth) spin_unlock(&rpc_credcache_lock); while ((cred = free) != NULL) { free = cred->cr_next; - rpc_free(cred); + rpcauth_crdestroy(auth, cred); } auth->au_nextgc = jiffies + auth->au_expire; } @@ -146,7 +149,7 @@ rpcauth_gc_credcache(struct rpc_auth *auth) /* * Insert credential into cache */ -inline void +void rpcauth_insert_credcache(struct rpc_auth *auth, struct rpc_cred *cred) { int nr; @@ -155,8 +158,8 @@ rpcauth_insert_credcache(struct rpc_auth *auth, struct rpc_cred *cred) spin_lock(&rpc_credcache_lock); cred->cr_next = auth->au_credcache[nr]; auth->au_credcache[nr] = cred; - cred->cr_expire = jiffies + auth->au_expire; cred->cr_count++; + cred->cr_expire = jiffies + auth->au_expire; spin_unlock(&rpc_credcache_lock); } @@ -164,13 +167,13 @@ rpcauth_insert_credcache(struct rpc_auth *auth, struct rpc_cred *cred) * Look up a process' credentials in the authentication cache */ static struct rpc_cred * -rpcauth_lookup_credcache(struct rpc_task *task) +rpcauth_lookup_credcache(struct rpc_auth *auth, int taskflags) { - struct rpc_auth *auth = task->tk_auth; struct rpc_cred **q, *cred = NULL; - int nr; + int nr = 0; - nr = RPC_DO_ROOTOVERRIDE(task)? 0 : (current->uid % RPC_CREDCACHE_NR); + if (!(taskflags & RPC_TASK_ROOTCREDS)) + nr = current->uid % RPC_CREDCACHE_NR; if (time_before(auth->au_nextgc, jiffies)) rpcauth_gc_credcache(auth); @@ -178,7 +181,8 @@ rpcauth_lookup_credcache(struct rpc_task *task) spin_lock(&rpc_credcache_lock); q = &auth->au_credcache[nr]; while ((cred = *q) != NULL) { - if (auth->au_ops->crmatch(task, cred)) { + if (!(cred->cr_flags & RPCAUTH_CRED_DEAD) && + auth->au_ops->crmatch(cred, taskflags)) { *q = cred->cr_next; break; } @@ -187,7 +191,7 @@ rpcauth_lookup_credcache(struct rpc_task *task) spin_unlock(&rpc_credcache_lock); if (!cred) - cred = auth->au_ops->crcreate(task); + cred = auth->au_ops->crcreate(taskflags); if (cred) rpcauth_insert_credcache(auth, cred); @@ -198,7 +202,7 @@ rpcauth_lookup_credcache(struct rpc_task *task) /* * Remove cred handle from cache */ -static inline void +static void rpcauth_remove_credcache(struct rpc_auth *auth, struct rpc_cred *cred) { struct rpc_cred **q, *cr; @@ -210,7 +214,8 @@ rpcauth_remove_credcache(struct rpc_auth *auth, struct rpc_cred *cred) while ((cr = *q) != NULL) { if (cred == cr) { *q = cred->cr_next; - return; + cred->cr_next = NULL; + break; } q = &cred->cr_next; } @@ -218,58 +223,78 @@ rpcauth_remove_credcache(struct rpc_auth *auth, struct rpc_cred *cred) } struct rpc_cred * -rpcauth_lookupcred(struct rpc_task *task) +rpcauth_lookupcred(struct rpc_auth *auth, int taskflags) +{ + dprintk("RPC: looking up %s cred\n", + auth->au_ops->au_name); + return rpcauth_lookup_credcache(auth, taskflags); +} + +struct rpc_cred * +rpcauth_bindcred(struct rpc_task *task) { + struct rpc_auth *auth = task->tk_auth; + dprintk("RPC: %4d looking up %s cred\n", task->tk_pid, task->tk_auth->au_ops->au_name); - return task->tk_cred = rpcauth_lookup_credcache(task); + task->tk_msg.rpc_cred = rpcauth_lookup_credcache(auth, task->tk_flags); + if (task->tk_msg.rpc_cred == 0) + task->tk_status = -ENOMEM; + return task->tk_msg.rpc_cred; } int -rpcauth_matchcred(struct rpc_task *task, struct rpc_cred *cred) +rpcauth_matchcred(struct rpc_auth *auth, struct rpc_cred *cred, int taskflags) { - struct rpc_auth *auth = task->tk_auth; - - dprintk("RPC: %4d matching %s cred %p\n", - task->tk_pid, auth->au_ops->au_name, task->tk_cred); - return auth->au_ops->crmatch(task, cred); + dprintk("RPC: matching %s cred %d\n", + auth->au_ops->au_name, taskflags); + return auth->au_ops->crmatch(cred, taskflags); } void rpcauth_holdcred(struct rpc_task *task) { dprintk("RPC: %4d holding %s cred %p\n", - task->tk_pid, task->tk_auth->au_ops->au_name, task->tk_cred); - if (task->tk_cred) - task->tk_cred->cr_count++; + task->tk_pid, task->tk_auth->au_ops->au_name, task->tk_msg.rpc_cred); + if (task->tk_msg.rpc_cred) { + task->tk_msg.rpc_cred->cr_count++; + task->tk_msg.rpc_cred->cr_expire = jiffies + task->tk_auth->au_expire; + } } void -rpcauth_releasecred(struct rpc_task *task) +rpcauth_releasecred(struct rpc_auth *auth, struct rpc_cred *cred) { - struct rpc_auth *auth = task->tk_auth; - struct rpc_cred *cred; - - dprintk("RPC: %4d releasing %s cred %p\n", - task->tk_pid, auth->au_ops->au_name, task->tk_cred); - if ((cred = task->tk_cred) != NULL) { + if (cred != NULL && cred->cr_count > 0) { cred->cr_count--; if (cred->cr_flags & RPCAUTH_CRED_DEAD) { rpcauth_remove_credcache(auth, cred); if (!cred->cr_count) - auth->au_ops->crdestroy(cred); + rpcauth_crdestroy(auth, cred); } - task->tk_cred = NULL; } } +void +rpcauth_unbindcred(struct rpc_task *task) +{ + struct rpc_auth *auth = task->tk_auth; + struct rpc_cred *cred = task->tk_msg.rpc_cred; + + dprintk("RPC: %4d releasing %s cred %p\n", + task->tk_pid, auth->au_ops->au_name, cred); + + rpcauth_releasecred(auth, cred); + task->tk_msg.rpc_cred = NULL; +} + u32 * rpcauth_marshcred(struct rpc_task *task, u32 *p) { struct rpc_auth *auth = task->tk_auth; dprintk("RPC: %4d marshaling %s cred %p\n", - task->tk_pid, auth->au_ops->au_name, task->tk_cred); + task->tk_pid, auth->au_ops->au_name, task->tk_msg.rpc_cred); return auth->au_ops->crmarshal(task, p, task->tk_flags & RPC_CALL_REALUID); } @@ -280,7 +305,7 @@ rpcauth_checkverf(struct rpc_task *task, u32 *p) struct rpc_auth *auth = task->tk_auth; dprintk("RPC: %4d validating %s cred %p\n", - task->tk_pid, auth->au_ops->au_name, task->tk_cred); + task->tk_pid, auth->au_ops->au_name, task->tk_msg.rpc_cred); return auth->au_ops->crvalidate(task, p); } @@ -290,7 +315,7 @@ rpcauth_refreshcred(struct rpc_task *task) struct rpc_auth *auth = task->tk_auth; dprintk("RPC: %4d refreshing %s cred %p\n", - task->tk_pid, auth->au_ops->au_name, task->tk_cred); + task->tk_pid, auth->au_ops->au_name, task->tk_msg.rpc_cred); task->tk_status = auth->au_ops->crrefresh(task); return task->tk_status; } @@ -299,14 +324,14 @@ void rpcauth_invalcred(struct rpc_task *task) { dprintk("RPC: %4d invalidating %s cred %p\n", - task->tk_pid, task->tk_auth->au_ops->au_name, task->tk_cred); - if (task->tk_cred) - task->tk_cred->cr_flags &= ~RPCAUTH_CRED_UPTODATE; + task->tk_pid, task->tk_auth->au_ops->au_name, task->tk_msg.rpc_cred); + if (task->tk_msg.rpc_cred) + task->tk_msg.rpc_cred->cr_flags &= ~RPCAUTH_CRED_UPTODATE; } int rpcauth_uptodatecred(struct rpc_task *task) { - return !(task->tk_cred) || - (task->tk_cred->cr_flags & RPCAUTH_CRED_UPTODATE); + return !(task->tk_msg.rpc_cred) || + (task->tk_msg.rpc_cred->cr_flags & RPCAUTH_CRED_UPTODATE); } diff --git a/net/sunrpc/auth_null.c b/net/sunrpc/auth_null.c index be6d19637..d2e645acd 100644 --- a/net/sunrpc/auth_null.c +++ b/net/sunrpc/auth_null.c @@ -38,6 +38,7 @@ static void nul_destroy(struct rpc_auth *auth) { dprintk("RPC: destroying NULL authenticator %p\n", auth); + rpcauth_free_credcache(auth); rpc_free(auth); } @@ -45,15 +46,12 @@ nul_destroy(struct rpc_auth *auth) * Create NULL creds for current process */ static struct rpc_cred * -nul_create_cred(struct rpc_task *task) +nul_create_cred(int flags) { struct rpc_cred *cred; - if (!(cred = (struct rpc_cred *) rpc_malloc(task, sizeof(*cred)))) { - task->tk_status = -ENOMEM; + if (!(cred = (struct rpc_cred *) rpc_allocate(flags, sizeof(*cred)))) return NULL; - } - cred->cr_count = 0; cred->cr_flags = RPCAUTH_CRED_UPTODATE; @@ -73,7 +71,7 @@ nul_destroy_cred(struct rpc_cred *cred) * Match cred handle against current process */ static int -nul_match(struct rpc_task *task, struct rpc_cred *cred) +nul_match(struct rpc_cred *cred, int taskflags) { return 1; } diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c index 6596085b3..8033ed6c1 100644 --- a/net/sunrpc/auth_unix.c +++ b/net/sunrpc/auth_unix.c @@ -4,12 +4,9 @@ * UNIX-style authentication; no AUTH_SHORT support * * Copyright (C) 1996, Olaf Kirch <okir@monad.swb.de> - * - * Modified May 1999 Horst von Brand <vonbrand@sleipnir.valparaiso.cl> */ #include <linux/types.h> -#include <linux/string.h> #include <linux/malloc.h> #include <linux/socket.h> #include <linux/in.h> @@ -63,7 +60,7 @@ unx_destroy(struct rpc_auth *auth) } static struct rpc_cred * -unx_create_cred(struct rpc_task *task) +unx_create_cred(int flags) { struct unx_cred *cred; int i; @@ -71,14 +68,12 @@ unx_create_cred(struct rpc_task *task) dprintk("RPC: allocating UNIX cred for uid %d gid %d\n", current->uid, current->gid); - if (!(cred = (struct unx_cred *) rpc_malloc(task, sizeof(*cred)))) { - task->tk_status = -ENOMEM; + if (!(cred = (struct unx_cred *) rpc_allocate(flags, sizeof(*cred)))) return NULL; - } cred->uc_count = 0; cred->uc_flags = RPCAUTH_CRED_UPTODATE; - if (RPC_DO_ROOTOVERRIDE(task)) { + if (flags & RPC_TASK_ROOTCREDS) { cred->uc_uid = cred->uc_fsuid = 0; cred->uc_gid = cred->uc_fsgid = 0; cred->uc_gids[0] = NOGROUP; @@ -119,7 +114,7 @@ authunix_fake_cred(struct rpc_task *task, uid_t uid, gid_t gid) cred->uc_fsgid = gid; cred->uc_gids[0] = (gid_t) NOGROUP; - return task->tk_cred = (struct rpc_cred *) cred; + return task->tk_msg.rpc_cred = (struct rpc_cred *) cred; } static void @@ -134,12 +129,12 @@ unx_destroy_cred(struct rpc_cred *cred) * request root creds (e.g. for NFS swapping). */ static int -unx_match(struct rpc_task * task, struct rpc_cred *rcred) +unx_match(struct rpc_cred *rcred, int taskflags) { struct unx_cred *cred = (struct unx_cred *) rcred; int i; - if (!RPC_DO_ROOTOVERRIDE(task)) { + if (!(taskflags & RPC_TASK_ROOTCREDS)) { int groups; if (cred->uc_uid != current->uid @@ -169,7 +164,7 @@ static u32 * unx_marshal(struct rpc_task *task, u32 *p, int ruid) { struct rpc_clnt *clnt = task->tk_client; - struct unx_cred *cred = (struct unx_cred *) task->tk_cred; + struct unx_cred *cred = (struct unx_cred *) task->tk_msg.rpc_cred; u32 *base, *hold; int i, n; @@ -210,7 +205,7 @@ unx_marshal(struct rpc_task *task, u32 *p, int ruid) static int unx_refresh(struct rpc_task *task) { - task->tk_cred->cr_flags |= RPCAUTH_CRED_UPTODATE; + task->tk_msg.rpc_cred->cr_flags |= RPCAUTH_CRED_UPTODATE; return task->tk_status = -EACCES; } diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index b4d6bd81a..806e14bce 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -42,14 +42,13 @@ static DECLARE_WAIT_QUEUE_HEAD(destroy_wait); -static void call_bind(struct rpc_task *task); static void call_reserve(struct rpc_task *task); static void call_reserveresult(struct rpc_task *task); static void call_allocate(struct rpc_task *task); static void call_encode(struct rpc_task *task); static void call_decode(struct rpc_task *task); +static void call_bind(struct rpc_task *task); static void call_transmit(struct rpc_task *task); -static void call_receive(struct rpc_task *task); static void call_status(struct rpc_task *task); static void call_refresh(struct rpc_task *task); static void call_refreshresult(struct rpc_task *task); @@ -98,7 +97,7 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname, clnt->cl_port = xprt->addr.sin_port; clnt->cl_prog = program->number; clnt->cl_vers = version->number; - clnt->cl_prot = IPPROTO_UDP; + clnt->cl_prot = xprt->prot; clnt->cl_stats = program->stats; clnt->cl_bindwait = RPC_INIT_WAITQ("bindwait"); @@ -117,10 +116,10 @@ out: return clnt; out_no_clnt: - printk("RPC: out of memory in rpc_create_client\n"); + printk(KERN_INFO "RPC: out of memory in rpc_create_client\n"); goto out; out_no_auth: - printk("RPC: Couldn't create auth handle (flavor %d)\n", + printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %d)\n", flavor); rpc_free(clnt); clnt = NULL; @@ -140,7 +139,7 @@ rpc_shutdown_client(struct rpc_clnt *clnt) clnt->cl_protname, clnt->cl_server); while (clnt->cl_users) { #ifdef RPC_DEBUG - printk("rpc_shutdown_client: client %s, tasks=%d\n", + dprintk("RPC: rpc_shutdown_client: client %s, tasks=%d\n", clnt->cl_protname, clnt->cl_users); #endif /* Don't let rpc_release_client destroy us */ @@ -240,42 +239,72 @@ void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset) /* * New rpc_call implementation */ -int -rpc_do_call(struct rpc_clnt *clnt, u32 proc, void *argp, void *resp, - int flags, rpc_action func, void *data) +int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags) { struct rpc_task my_task, *task = &my_task; sigset_t oldset; - int async, status; + int status; /* If this client is slain all further I/O fails */ if (clnt->cl_dead) return -EIO; + if (flags & RPC_TASK_ASYNC) { + printk("rpc_call_sync: Illegal flag combination for synchronous task\n"); + flags &= ~RPC_TASK_ASYNC; + } + rpc_clnt_sigmask(clnt, &oldset); /* Create/initialize a new RPC task */ - if ((async = (flags & RPC_TASK_ASYNC)) != 0) { - if (!func) - func = rpc_default_callback; - status = -ENOMEM; - if (!(task = rpc_new_task(clnt, func, flags))) - goto out; - task->tk_calldata = data; - } else { - rpc_init_task(task, clnt, NULL, flags); - } + rpc_init_task(task, clnt, NULL, flags); + rpc_call_setup(task, msg, 0); - /* Bind the user cred, set up the call info struct and - * execute the task */ - if (rpcauth_lookupcred(task) != NULL) { - rpc_call_setup(task, proc, argp, resp, 0); - rpc_execute(task); - } else - async = 0; + /* Set up the call info struct and execute the task */ + if (task->tk_status == 0) + status = rpc_execute(task); + else + status = task->tk_status; + rpc_release_task(task); + + rpc_clnt_sigunmask(clnt, &oldset); + + return status; +} - status = 0; - if (!async) { +/* + * New rpc_call implementation + */ +int +rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags, + rpc_action callback, void *data) +{ + struct rpc_task *task; + sigset_t oldset; + int status; + + /* If this client is slain all further I/O fails */ + if (clnt->cl_dead) + return -EIO; + + flags |= RPC_TASK_ASYNC; + + rpc_clnt_sigmask(clnt, &oldset); + + /* Create/initialize a new RPC task */ + if (!callback) + callback = rpc_default_callback; + status = -ENOMEM; + if (!(task = rpc_new_task(clnt, callback, flags))) + goto out; + task->tk_calldata = data; + + rpc_call_setup(task, msg, 0); + + /* Set up the call info struct and execute the task */ + if (task->tk_status == 0) + status = rpc_execute(task); + else { status = task->tk_status; rpc_release_task(task); } @@ -288,18 +317,24 @@ out: void -rpc_call_setup(struct rpc_task *task, u32 proc, - void *argp, void *resp, int flags) +rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags) { - task->tk_action = call_bind; - task->tk_proc = proc; - task->tk_argp = argp; - task->tk_resp = resp; + task->tk_msg = *msg; task->tk_flags |= flags; + /* Bind the user cred */ + if (task->tk_msg.rpc_cred != NULL) { + rpcauth_holdcred(task); + } else + rpcauth_bindcred(task); + + if (task->tk_status == 0) + task->tk_action = call_reserve; + else + task->tk_action = NULL; /* Increment call count */ - if (task->tk_proc < task->tk_client->cl_maxproc) - rpcproc_count(task->tk_client, proc)++; + if (task->tk_msg.rpc_proc < task->tk_client->cl_maxproc) + rpcproc_count(task->tk_client, task->tk_msg.rpc_proc)++; } /* @@ -313,27 +348,8 @@ rpc_restart_call(struct rpc_task *task) rpc_release_task(task); return; } - task->tk_action = call_bind; - rpcproc_count(task->tk_client, task->tk_proc)++; -} - -/* - * 0. Get the server port number if not yet set - */ -static void -call_bind(struct rpc_task *task) -{ - struct rpc_clnt *clnt = task->tk_client; - struct rpc_xprt *xprt = clnt->cl_xprt; - - if (xprt->stream && !xprt->connected) - task->tk_action = call_reconnect; - else - task->tk_action = call_reserve; - task->tk_status = 0; - - if (!clnt->cl_port) - rpc_getport(task, clnt); + task->tk_action = call_reserve; + rpcproc_count(task->tk_client, task->tk_msg.rpc_proc)++; } /* @@ -344,26 +360,22 @@ call_reserve(struct rpc_task *task) { struct rpc_clnt *clnt = task->tk_client; - dprintk("RPC: %4d call_reserve\n", task->tk_pid); - if (!clnt->cl_port) { - printk(KERN_NOTICE "%s: couldn't bind to server %s - %s.\n", - clnt->cl_protname, clnt->cl_server, - clnt->cl_softrtry? "giving up" : "retrying"); - if (!clnt->cl_softrtry) { - task->tk_action = call_bind; - rpc_delay(task, 5*HZ); - return; - } + if (task->tk_msg.rpc_proc > clnt->cl_maxproc) { + printk(KERN_WARNING "%s (vers %d): bad procedure number %d\n", + clnt->cl_protname, clnt->cl_vers, task->tk_msg.rpc_proc); rpc_exit(task, -EIO); return; } + + dprintk("RPC: %4d call_reserve\n", task->tk_pid); if (!rpcauth_uptodatecred(task)) { task->tk_action = call_refresh; return; } + + task->tk_status = 0; task->tk_action = call_reserveresult; task->tk_timeout = clnt->cl_timeout.to_resrvval; - task->tk_status = 0; clnt->cl_stats->rpccnt++; xprt_reserve(task); } @@ -374,6 +386,8 @@ call_reserve(struct rpc_task *task) static void call_reserveresult(struct rpc_task *task) { + int status = task->tk_status; + dprintk("RPC: %4d call_reserveresult (status %d)\n", task->tk_pid, task->tk_status); /* @@ -382,7 +396,7 @@ call_reserveresult(struct rpc_task *task) */ if ((task->tk_status >= 0 && !task->tk_rqstp) || (task->tk_status < 0 && task->tk_rqstp)) - printk("call_reserveresult: status=%d, request=%p??\n", + printk(KERN_ERR "call_reserveresult: status=%d, request=%p??\n", task->tk_status, task->tk_rqstp); if (task->tk_status >= 0) { @@ -390,11 +404,11 @@ call_reserveresult(struct rpc_task *task) return; } - switch (task->tk_status) { + task->tk_status = 0; + switch (status) { case -EAGAIN: case -ENOBUFS: task->tk_timeout = task->tk_client->cl_timeout.to_resrvval; - task->tk_status = 0; task->tk_action = call_reserve; break; case -ETIMEDOUT: @@ -402,11 +416,11 @@ call_reserveresult(struct rpc_task *task) task->tk_action = call_timeout; break; default: - task->tk_action = NULL; if (!task->tk_rqstp) { - printk("RPC: task has no request, exit EIO\n"); + printk(KERN_INFO "RPC: task has no request, exit EIO\n"); rpc_exit(task, -EIO); - } + } else + rpc_exit(task, status); } } @@ -428,13 +442,13 @@ call_allocate(struct rpc_task *task) /* FIXME: compute buffer requirements more exactly using * auth->au_wslack */ - bufsiz = rpcproc_bufsiz(clnt, task->tk_proc) + RPC_SLACK_SPACE; + bufsiz = rpcproc_bufsiz(clnt, task->tk_msg.rpc_proc) + RPC_SLACK_SPACE; if ((task->tk_buffer = rpc_malloc(task, bufsiz)) != NULL) return; - printk("RPC: buffer allocation failed for task %p\n", task); + printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task); - if (!signalled()) { + if (RPC_IS_ASYNC(task) || !(task->tk_client->cl_intr && signalled())) { xprt_release(task); task->tk_action = call_reserve; rpc_delay(task, HZ>>4); @@ -460,10 +474,10 @@ call_encode(struct rpc_task *task) dprintk("RPC: %4d call_encode (status %d)\n", task->tk_pid, task->tk_status); - task->tk_action = call_transmit; + task->tk_action = call_bind; /* Default buffer setup */ - bufsiz = rpcproc_bufsiz(clnt, task->tk_proc)+RPC_SLACK_SPACE; + bufsiz = rpcproc_bufsiz(clnt, task->tk_msg.rpc_proc)+RPC_SLACK_SPACE; req->rq_svec[0].iov_base = task->tk_buffer; req->rq_svec[0].iov_len = bufsiz; req->rq_slen = 0; @@ -474,23 +488,16 @@ call_encode(struct rpc_task *task) req->rq_rnr = 1; req->rq_damaged = 0; - if (task->tk_proc > clnt->cl_maxproc) { - printk(KERN_WARNING "%s (vers %d): bad procedure number %d\n", - clnt->cl_protname, clnt->cl_vers, task->tk_proc); - rpc_exit(task, -EIO); - return; - } - /* Zero buffer so we have automatic zero-padding of opaque & string */ memset(task->tk_buffer, 0, bufsiz); /* Encode header and provided arguments */ - encode = rpcproc_encode(clnt, task->tk_proc); + encode = rpcproc_encode(clnt, task->tk_msg.rpc_proc); if (!(p = call_header(task))) { - printk("RPC: call_header failed, exit EIO\n"); + printk(KERN_INFO "RPC: call_header failed, exit EIO\n"); rpc_exit(task, -EIO); } else - if (encode && (status = encode(req, p, task->tk_argp)) < 0) { + if (encode && (status = encode(req, p, task->tk_msg.rpc_argp)) < 0) { printk(KERN_WARNING "%s: can't encode arguments: %d\n", clnt->cl_protname, -status); rpc_exit(task, status); @@ -498,38 +505,59 @@ call_encode(struct rpc_task *task) } /* - * 4. Transmit the RPC request + * 4. Get the server port number if not yet set */ static void -call_transmit(struct rpc_task *task) +call_bind(struct rpc_task *task) { - dprintk("RPC: %4d call_transmit (status %d)\n", - task->tk_pid, task->tk_status); + struct rpc_clnt *clnt = task->tk_client; + struct rpc_xprt *xprt = clnt->cl_xprt; - task->tk_action = call_receive; - task->tk_status = 0; - xprt_transmit(task); + task->tk_action = (xprt->connected) ? call_transmit : call_reconnect; + + if (!clnt->cl_port) { + task->tk_action = call_reconnect; + task->tk_timeout = clnt->cl_timeout.to_maxval; + rpc_getport(task, clnt); + } } /* - * 5. Wait for the RPC reply + * 4a. Reconnect to the RPC server (TCP case) */ static void -call_receive(struct rpc_task *task) +call_reconnect(struct rpc_task *task) { - dprintk("RPC: %4d call_receive (status %d)\n", - task->tk_pid, task->tk_status); + struct rpc_clnt *clnt = task->tk_client; - task->tk_action = call_status; + dprintk("RPC: %4d call_reconnect status %d\n", + task->tk_pid, task->tk_status); - /* Need to ensure cleanups are performed by xprt_receive_status() */ - xprt_receive(task); + task->tk_action = call_transmit; + if (task->tk_status < 0 || !clnt->cl_xprt->stream) + return; + clnt->cl_stats->netreconn++; + xprt_reconnect(task); +} - /* If we have no decode function, this means we're performing - * a void call (a la lockd message passing). */ - if (!rpcproc_decode(task->tk_client, task->tk_proc)) { - task->tk_action = NULL; +/* + * 5. Transmit the RPC request, and wait for reply + */ +static void +call_transmit(struct rpc_task *task) +{ + struct rpc_clnt *clnt = task->tk_client; + + dprintk("RPC: %4d call_transmit (status %d)\n", + task->tk_pid, task->tk_status); + + task->tk_action = call_status; + if (task->tk_status < 0) return; + xprt_transmit(task); + if (!rpcproc_decode(clnt, task->tk_msg.rpc_proc)) { + task->tk_action = NULL; + rpc_wake_up_task(task); } } @@ -558,34 +586,30 @@ call_status(struct rpc_task *task) case -ETIMEDOUT: task->tk_action = call_timeout; break; - case -EAGAIN: - if (!req) - task->tk_action = call_reserve; - else if (!task->tk_buffer) - task->tk_action = call_allocate; - else if (req->rq_damaged) { - task->tk_action = call_encode; - clnt->cl_stats->rpcretrans++; - } else { - task->tk_action = call_transmit; - clnt->cl_stats->rpcretrans++; - } - break; case -ECONNREFUSED: case -ENOTCONN: + req->rq_bytes_sent = 0; if (clnt->cl_autobind || !clnt->cl_port) { clnt->cl_port = 0; task->tk_action = call_bind; - } else if (xprt->stream) + break; + } + if (xprt->stream) { task->tk_action = call_reconnect; - else { - rpc_delay(task, 5*HZ); /* Hope it all wears off */ - if (req->rq_damaged) - task->tk_action = call_encode; - else - task->tk_action = call_transmit; - clnt->cl_stats->rpcretrans++; + break; } + /* + * Sleep and dream of an open connection + */ + task->tk_timeout = 5 * HZ; + rpc_sleep_on(&xprt->sending, task, NULL, NULL); + case -ENOMEM: + case -EAGAIN: + if (req->rq_damaged) + task->tk_action = call_encode; + else + task->tk_action = call_transmit; + clnt->cl_stats->rpcretrans++; break; default: if (clnt->cl_chatty) @@ -614,15 +638,13 @@ call_timeout(struct rpc_task *task) task->tk_pid); goto minor_timeout; } - to->to_initval <<= 1; - if (to->to_initval > to->to_maxval) - to->to_initval = to->to_maxval; + to->to_retries = clnt->cl_timeout.to_retries; } dprintk("RPC: %4d call_timeout (major timeo)\n", task->tk_pid); if (clnt->cl_softrtry) { if (clnt->cl_chatty && !task->tk_exit) - printk("%s: server %s not responding, timed out\n", + printk(KERN_NOTICE "%s: server %s not responding, timed out\n", clnt->cl_protname, clnt->cl_server); rpc_exit(task, -EIO); return; @@ -630,24 +652,26 @@ call_timeout(struct rpc_task *task) if (clnt->cl_chatty && !(task->tk_flags & RPC_CALL_MAJORSEEN)) { task->tk_flags |= RPC_CALL_MAJORSEEN; if (req) - printk("%s: server %s not responding, still trying\n", + printk(KERN_NOTICE "%s: server %s not responding, still trying\n", clnt->cl_protname, clnt->cl_server); else - printk("%s: task %d can't get a request slot\n", + printk(KERN_NOTICE "%s: task %d can't get a request slot\n", clnt->cl_protname, task->tk_pid); } if (clnt->cl_autobind) clnt->cl_port = 0; minor_timeout: - if (!clnt->cl_port) { + if (!req) + task->tk_action = call_reserve; + else if (req->rq_damaged) { + task->tk_action = call_encode; + clnt->cl_stats->rpcretrans++; + } else if (!clnt->cl_port) { task->tk_action = call_bind; + clnt->cl_stats->rpcretrans++; } else if (clnt->cl_xprt->stream && !clnt->cl_xprt->connected) { task->tk_action = call_reconnect; - } else if (!req) { - task->tk_action = call_reserve; - } else if (req->rq_damaged) { - task->tk_action = call_encode; clnt->cl_stats->rpcretrans++; } else { task->tk_action = call_transmit; @@ -657,21 +681,6 @@ minor_timeout: } /* - * 6b. Reconnect to the RPC server (TCP case) - */ -static void -call_reconnect(struct rpc_task *task) -{ - struct rpc_clnt *clnt = task->tk_client; - dprintk("RPC: %4d call_reconnect status %d\n", - task->tk_pid, task->tk_status); - task->tk_action = call_reserve; - task->tk_status = 0; - clnt->cl_stats->netreconn++; - xprt_reconnect(task); -} - -/* * 7. Decode the RPC reply */ static void @@ -679,20 +688,20 @@ call_decode(struct rpc_task *task) { struct rpc_clnt *clnt = task->tk_client; struct rpc_rqst *req = task->tk_rqstp; - kxdrproc_t decode = rpcproc_decode(clnt, task->tk_proc); + kxdrproc_t decode = rpcproc_decode(clnt, task->tk_msg.rpc_proc); u32 *p; dprintk("RPC: %4d call_decode (status %d)\n", task->tk_pid, task->tk_status); if (clnt->cl_chatty && (task->tk_flags & RPC_CALL_MAJORSEEN)) { - printk("%s: server %s OK\n", + printk(KERN_NOTICE "%s: server %s OK\n", clnt->cl_protname, clnt->cl_server); task->tk_flags &= ~RPC_CALL_MAJORSEEN; } if (task->tk_status < 12) { - printk("%s: too small RPC reply size (%d bytes)\n", + printk(KERN_WARNING "%s: too small RPC reply size (%d bytes)\n", clnt->cl_protname, task->tk_status); rpc_exit(task, -EIO); return; @@ -708,10 +717,11 @@ call_decode(struct rpc_task *task) */ if (task->tk_client->cl_prog == 100003 && (ntohl(*p) == NFSERR_ACCES || ntohl(*p) == NFSERR_PERM)) { - if (RPC_IS_SETUID(task) && (task->tk_suid_retry)--) { + if (RPC_IS_SETUID(task) && task->tk_suid_retry) { dprintk("RPC: %4d retry squashed uid\n", task->tk_pid); task->tk_flags ^= RPC_CALL_REALUID; task->tk_action = call_encode; + task->tk_suid_retry--; return; } } @@ -719,7 +729,7 @@ call_decode(struct rpc_task *task) task->tk_action = NULL; if (decode) - task->tk_status = decode(req, p, task->tk_resp); + task->tk_status = decode(req, p, task->tk_msg.rpc_resp); dprintk("RPC: %4d call_decode result %d\n", task->tk_pid, task->tk_status); } @@ -751,7 +761,7 @@ call_refreshresult(struct rpc_task *task) if (task->tk_status < 0) rpc_exit(task, -EACCES); else - task->tk_action = call_bind; + task->tk_action = call_reserve; } /* @@ -772,7 +782,7 @@ call_header(struct rpc_task *task) *p++ = htonl(RPC_VERSION); /* RPC version */ *p++ = htonl(clnt->cl_prog); /* program number */ *p++ = htonl(clnt->cl_vers); /* program version */ - *p++ = htonl(task->tk_proc); /* procedure */ + *p++ = htonl(task->tk_msg.rpc_proc); /* procedure */ return rpcauth_marshcred(task, p); } @@ -787,20 +797,21 @@ call_verify(struct rpc_task *task) p += 1; /* skip XID */ if ((n = ntohl(*p++)) != RPC_REPLY) { - printk("call_verify: not an RPC reply: %x\n", n); + printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n); goto garbage; } if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) { int error = -EACCES; if ((n = ntohl(*p++)) != RPC_AUTH_ERROR) { - printk("call_verify: RPC call rejected: %x\n", n); + printk(KERN_WARNING "call_verify: RPC call rejected: %x\n", n); } else switch ((n = ntohl(*p++))) { case RPC_AUTH_REJECTEDCRED: case RPC_AUTH_REJECTEDVERF: - if (!task->tk_cred_retry--) + if (!task->tk_cred_retry) break; + task->tk_cred_retry--; dprintk("RPC: %4d call_verify: retry stale creds\n", task->tk_pid); rpcauth_invalcred(task); @@ -809,17 +820,18 @@ call_verify(struct rpc_task *task) case RPC_AUTH_BADCRED: case RPC_AUTH_BADVERF: /* possibly garbled cred/verf? */ - if (!task->tk_garb_retry--) + if (!task->tk_garb_retry) break; + task->tk_garb_retry--; dprintk("RPC: %4d call_verify: retry garbled creds\n", task->tk_pid); task->tk_action = call_encode; return NULL; case RPC_AUTH_TOOWEAK: - printk("call_verify: server requires stronger " + printk(KERN_NOTICE "call_verify: server requires stronger " "authentication.\n"); default: - printk("call_verify: unknown auth error: %x\n", n); + printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n); error = -EIO; } dprintk("RPC: %4d call_verify: call rejected %d\n", @@ -828,7 +840,7 @@ call_verify(struct rpc_task *task) return NULL; } if (!(p = rpcauth_checkverf(task, p))) { - printk("call_verify: auth check failed\n"); + printk(KERN_WARNING "call_verify: auth check failed\n"); goto garbage; /* bad verifier, retry */ } switch ((n = ntohl(*p++))) { @@ -837,19 +849,20 @@ call_verify(struct rpc_task *task) case RPC_GARBAGE_ARGS: break; /* retry */ default: - printk("call_verify: server accept status: %x\n", n); + printk(KERN_WARNING "call_verify: server accept status: %x\n", n); /* Also retry */ } garbage: dprintk("RPC: %4d call_verify: server saw garbage\n", task->tk_pid); task->tk_client->cl_stats->rpcgarbage++; - if (task->tk_garb_retry--) { - printk("RPC: garbage, retrying %4d\n", task->tk_pid); + if (task->tk_garb_retry) { + task->tk_garb_retry--; + printk(KERN_WARNING "RPC: garbage, retrying %4d\n", task->tk_pid); task->tk_action = call_encode; return NULL; } - printk("RPC: garbage, exit EIO\n"); + printk(KERN_WARNING "RPC: garbage, exit EIO\n"); rpc_exit(task, -EIO); return NULL; } diff --git a/net/sunrpc/pmap_clnt.c b/net/sunrpc/pmap_clnt.c index 2bbe9d50a..6afb28e88 100644 --- a/net/sunrpc/pmap_clnt.c +++ b/net/sunrpc/pmap_clnt.c @@ -41,6 +41,7 @@ rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt) { struct rpc_portmap *map = &clnt->cl_pmap; struct sockaddr_in *sap = &clnt->cl_xprt->addr; + struct rpc_message msg = { PMAP_GETPORT, map, &clnt->cl_port, NULL }; struct rpc_clnt *pmap_clnt; struct rpc_task *child; @@ -66,7 +67,7 @@ rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt) goto bailout; /* Setup the call info struct */ - rpc_call_setup(child, PMAP_GETPORT, map, &clnt->cl_port, 0); + rpc_call_setup(child, &msg, 0); /* ... and run the child task */ rpc_run_child(task, child, pmap_getport_done); diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index ffd4c18ad..bfbfc1580 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -69,6 +69,16 @@ static pid_t rpciod_pid = 0; static int rpc_inhibit = 0; /* + * Spinlock for wait queues. Access to the latter also has to be + * interrupt-safe in order to allow timers to wake up sleeping tasks. + */ +spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED; +/* + * Spinlock for other critical sections of code. + */ +spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED; + +/* * This is the last-ditch buffer for NFS swap requests */ static u32 swap_buffer[PAGE_SIZE >> 2]; @@ -87,14 +97,52 @@ static __inline__ void rpc_unlock_swapbuf(void) } /* - * Spinlock for wait queues. Access to the latter also has to be - * interrupt-safe in order to allow timers to wake up sleeping tasks. + * Set up a timer for the current task. */ -spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED; +static inline void +__rpc_add_timer(struct rpc_task *task, rpc_action timer) +{ + if (!task->tk_timeout) + return; + + dprintk("RPC: %4d setting alarm for %lu ms\n", + task->tk_pid, task->tk_timeout * 1000 / HZ); + + if (timer_pending(&task->tk_timer)) { + printk(KERN_ERR "RPC: Bug! Overwriting active timer\n"); + del_timer(&task->tk_timer); + } + if (!timer) + timer = __rpc_default_timer; + init_timer(&task->tk_timer); + task->tk_timer.expires = jiffies + task->tk_timeout; + task->tk_timer.data = (unsigned long) task; + task->tk_timer.function = (void (*)(unsigned long)) timer; + add_timer(&task->tk_timer); +} + /* - * Spinlock for other critical sections of code. + * Set up a timer for an already sleeping task. */ -spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED; +void rpc_add_timer(struct rpc_task *task, rpc_action timer) +{ + spin_lock_bh(&rpc_queue_lock); + if (!(RPC_IS_RUNNING(task) || task->tk_wakeup)) + __rpc_add_timer(task, timer); + spin_unlock_bh(&rpc_queue_lock); +} + +/* + * Delete any timer for the current task. + */ +static inline void +__rpc_del_timer(struct rpc_task *task) +{ + dprintk("RPC: %4d deleting timer\n", task->tk_pid); + if (timer_pending(&task->tk_timer)) + del_timer(&task->tk_timer); + task->tk_timeout = 0; +} /* * Add new request to wait queue. @@ -104,16 +152,15 @@ spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED; * improve overall performance. * Everyone else gets appended to the queue to ensure proper FIFO behavior. */ -static int +static inline int __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task) { - if (task->tk_rpcwait) { - if (task->tk_rpcwait != queue) - { - printk(KERN_WARNING "RPC: doubly enqueued task!\n"); - return -EWOULDBLOCK; - } + if (task->tk_rpcwait == queue) return 0; + + if (task->tk_rpcwait) { + printk(KERN_WARNING "RPC: doubly enqueued task!\n"); + return -EWOULDBLOCK; } if (RPC_IS_SWAPPER(task)) rpc_insert_list(&queue->task, task); @@ -130,7 +177,7 @@ __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task) int rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task) { - int result; + int result; spin_lock_bh(&rpc_queue_lock); result = __rpc_add_wait_queue(q, task); @@ -142,13 +189,14 @@ rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task) * Remove request from queue. * Note: must be called with spin lock held. */ -static void +static inline void __rpc_remove_wait_queue(struct rpc_task *task) { - struct rpc_wait_queue *queue; + struct rpc_wait_queue *queue = task->tk_rpcwait; - if (!(queue = task->tk_rpcwait)) + if (!queue) return; + rpc_remove_list(&queue->task, task); task->tk_rpcwait = NULL; @@ -159,51 +207,14 @@ __rpc_remove_wait_queue(struct rpc_task *task) void rpc_remove_wait_queue(struct rpc_task *task) { + if (!task->tk_rpcwait) + return; spin_lock_bh(&rpc_queue_lock); __rpc_remove_wait_queue(task); spin_unlock_bh(&rpc_queue_lock); } /* - * Set up a timer for the current task. - */ -inline void -rpc_add_timer(struct rpc_task *task, rpc_action timer) -{ - unsigned long expires = jiffies + task->tk_timeout; - - dprintk("RPC: %4d setting alarm for %lu ms\n", - task->tk_pid, task->tk_timeout * 1000 / HZ); - if (!timer) - timer = __rpc_default_timer; - if (time_before(expires, jiffies)) { - printk(KERN_ERR "RPC: bad timeout value %ld - setting to 10 sec!\n", - task->tk_timeout); - expires = jiffies + 10 * HZ; - } - task->tk_timer.expires = expires; - task->tk_timer.data = (unsigned long) task; - task->tk_timer.function = (void (*)(unsigned long)) timer; - task->tk_timer.prev = NULL; - task->tk_timer.next = NULL; - add_timer(&task->tk_timer); -} - -/* - * Delete any timer for the current task. - * Must be called with interrupts off. - */ -inline void -rpc_del_timer(struct rpc_task *task) -{ - if (task->tk_timeout) { - dprintk("RPC: %4d deleting timer\n", task->tk_pid); - del_timer(&task->tk_timer); - task->tk_timeout = 0; - } -} - -/* * Make an RPC task runnable. * * Note: If the task is ASYNC, this must be called with @@ -218,31 +229,44 @@ rpc_make_runnable(struct rpc_task *task) } task->tk_flags |= RPC_TASK_RUNNING; if (RPC_IS_ASYNC(task)) { - int status; - status = __rpc_add_wait_queue(&schedq, task); - if (status) - { - printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); - task->tk_status = status; + if (RPC_IS_SLEEPING(task)) { + int status; + status = __rpc_add_wait_queue(&schedq, task); + if (status < 0) { + printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); + task->tk_status = status; + } else + task->tk_sleeping = 0; } wake_up(&rpciod_idle); } else { + task->tk_sleeping = 0; wake_up(&task->tk_wait); } } +/* + * Place a newly initialized task on the schedq. + */ +static inline void +rpc_schedule_run(struct rpc_task *task) +{ + /* Don't run a child twice! */ + if (RPC_IS_ACTIVATED(task)) + return; + task->tk_active = 1; + task->tk_sleeping = 1; + rpc_make_runnable(task); +} /* * For other people who may need to wake the I/O daemon * but should (for now) know nothing about its innards */ - void rpciod_wake_up(void) { if(rpciod_pid==0) - { printk(KERN_ERR "rpciod: wot no daemon?\n"); - } wake_up(&rpciod_idle); } @@ -261,19 +285,25 @@ __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid, rpc_qname(q), jiffies); + if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) { + printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n"); + return; + } + + /* Mark the task as being activated if so needed */ + if (!RPC_IS_ACTIVATED(task)) { + task->tk_active = 1; + task->tk_sleeping = 1; + } + status = __rpc_add_wait_queue(q, task); - if (status) - { + if (status) { printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); task->tk_status = status; - task->tk_flags |= RPC_TASK_RUNNING; - } - else - { - task->tk_callback = action; - if (task->tk_timeout) - rpc_add_timer(task, timer); + } else { task->tk_flags &= ~RPC_TASK_RUNNING; + task->tk_callback = action; + __rpc_add_timer(task, timer); } return; @@ -291,6 +321,19 @@ rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, spin_unlock_bh(&rpc_queue_lock); } +void +rpc_sleep_locked(struct rpc_wait_queue *q, struct rpc_task *task, + rpc_action action, rpc_action timer) +{ + /* + * Protect the queue operations. + */ + spin_lock_bh(&rpc_queue_lock); + __rpc_sleep_on(q, task, action, timer); + rpc_lock_task(task); + spin_unlock_bh(&rpc_queue_lock); +} + /* * Wake up a single task -- must be invoked with spin lock held. * @@ -307,16 +350,33 @@ __rpc_wake_up(struct rpc_task *task) if (task->tk_magic != 0xf00baa) { printk(KERN_ERR "RPC: attempt to wake up non-existing task!\n"); rpc_debug = ~0; + rpc_show_tasks(); return; } #endif - rpc_del_timer(task); + /* Has the task been executed yet? If not, we cannot wake it up! */ + if (!RPC_IS_ACTIVATED(task)) { + printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task); + return; + } + if (RPC_IS_RUNNING(task)) + return; + + __rpc_del_timer(task); + + /* If the task has been locked, then set tk_wakeup so that + * rpc_unlock_task() wakes us up... */ + if (task->tk_lock) { + task->tk_wakeup = 1; + return; + } else + task->tk_wakeup = 0; + if (task->tk_rpcwait != &schedq) __rpc_remove_wait_queue(task); - if (!RPC_IS_RUNNING(task)) { - task->tk_flags |= RPC_TASK_CALLBACK; - rpc_make_runnable(task); - } + task->tk_flags |= RPC_TASK_CALLBACK; + rpc_make_runnable(task); + dprintk("RPC: __rpc_wake_up done\n"); } @@ -338,6 +398,8 @@ __rpc_default_timer(struct rpc_task *task) void rpc_wake_up_task(struct rpc_task *task) { + if (RPC_IS_RUNNING(task)) + return; spin_lock_bh(&rpc_queue_lock); __rpc_wake_up(task); spin_unlock_bh(&rpc_queue_lock); @@ -389,6 +451,30 @@ rpc_wake_up_status(struct rpc_wait_queue *queue, int status) } /* + * Lock down a sleeping task to prevent it from waking up + * and disappearing from beneath us. + * + * This function should always be called with the + * rpc_queue_lock held. + */ +int +rpc_lock_task(struct rpc_task *task) +{ + if (!RPC_IS_RUNNING(task)) + return ++task->tk_lock; + return 0; +} + +void +rpc_unlock_task(struct rpc_task *task) +{ + spin_lock_bh(&rpc_queue_lock); + if (task->tk_lock && !--task->tk_lock && task->tk_wakeup) + __rpc_wake_up(task); + spin_unlock_bh(&rpc_queue_lock); +} + +/* * Run a task at a later time */ static void __rpc_atrun(struct rpc_task *); @@ -426,7 +512,7 @@ __rpc_execute(struct rpc_task *task) /* * Execute any pending callback. */ - if (task->tk_flags & RPC_TASK_CALLBACK) { + if (RPC_DO_CALLBACK(task)) { /* Define a callback save pointer */ void (*save_callback)(struct rpc_task *); @@ -446,101 +532,89 @@ __rpc_execute(struct rpc_task *task) } /* - * No handler for next step means exit. - */ - if (!task->tk_action) - break; - - /* * Perform the next FSM step. * tk_action may be NULL when the task has been killed * by someone else. */ - if (RPC_IS_RUNNING(task) && task->tk_action) + if (RPC_IS_RUNNING(task)) { + if (!task->tk_action) + break; task->tk_action(task); + } /* * Check whether task is sleeping. - * Note that if the task may go to sleep in tk_action, + * Note that if the task goes to sleep in tk_action, * and the RPC reply arrives before we get here, it will * have state RUNNING, but will still be on schedq. + * 27/9/99: The above has been attempted fixed by + * introduction of task->tk_sleeping. */ spin_lock_bh(&rpc_queue_lock); - if (RPC_IS_RUNNING(task)) { - if (task->tk_rpcwait == &schedq) - __rpc_remove_wait_queue(task); - } else while (!RPC_IS_RUNNING(task)) { + if (!RPC_IS_RUNNING(task)) { + task->tk_sleeping = 1; if (RPC_IS_ASYNC(task)) { spin_unlock_bh(&rpc_queue_lock); return 0; } + } else + task->tk_sleeping = 0; + spin_unlock_bh(&rpc_queue_lock); + while (RPC_IS_SLEEPING(task)) { /* sync task: sleep here */ dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid); if (current->pid == rpciod_pid) printk(KERN_ERR "RPC: rpciod waiting on sync task!\n"); - spin_unlock_bh(&rpc_queue_lock); - __wait_event(task->tk_wait, RPC_IS_RUNNING(task)); - spin_lock_bh(&rpc_queue_lock); + __wait_event(task->tk_wait, !RPC_IS_SLEEPING(task)); + dprintk("RPC: %4d sync task resuming\n", task->tk_pid); /* - * When the task received a signal, remove from - * any queues etc, and make runnable again. + * When a sync task receives a signal, it exits with + * -ERESTARTSYS. In order to catch any callbacks that + * clean up after sleeping on some queue, we don't + * break the loop here, but go around once more. */ - if (signalled()) - __rpc_wake_up(task); - - dprintk("RPC: %4d sync task resuming\n", - task->tk_pid); - } - spin_unlock_bh(&rpc_queue_lock); - - /* - * When a sync task receives a signal, it exits with - * -ERESTARTSYS. In order to catch any callbacks that - * clean up after sleeping on some queue, we don't - * break the loop here, but go around once more. - */ - if (!RPC_IS_ASYNC(task) && signalled()) { - dprintk("RPC: %4d got signal\n", task->tk_pid); - rpc_exit(task, -ERESTARTSYS); + if (task->tk_client->cl_intr && signalled()) { + dprintk("RPC: %4d got signal\n", task->tk_pid); + task->tk_flags |= RPC_TASK_KILLED; + rpc_exit(task, -ERESTARTSYS); + rpc_wake_up_task(task); + } } } dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status); - if (task->tk_exit) { - status = task->tk_status; + status = task->tk_status; + if (task->tk_exit) task->tk_exit(task); - } return status; } /* * User-visible entry point to the scheduler. - * The recursion protection is for debugging. It should go away once - * the code has stabilized. + * + * This may be called recursively if e.g. an async NFS task updates + * the attributes and finds that dirty pages must be flushed. */ -void +int rpc_execute(struct rpc_task *task) { - static int executing = 0; - int incr = RPC_IS_ASYNC(task)? 1 : 0; - - if (incr) { - if (rpc_inhibit) { - printk(KERN_INFO "RPC: execution inhibited!\n"); - return; - } - if (executing) - printk(KERN_WARNING "RPC: %d tasks executed\n", executing); + if (rpc_inhibit) { + printk(KERN_INFO "RPC: execution inhibited!\n"); + return -EIO; } + task->tk_flags |= RPC_TASK_RUNNING; + if (task->tk_active) { + printk(KERN_ERR "RPC: active task was run twice!\n"); + return -EWOULDBLOCK; + } + task->tk_active = 1; - executing += incr; - __rpc_execute(task); - executing -= incr; + return __rpc_execute(task); } /* @@ -551,28 +625,33 @@ __rpc_schedule(void) { struct rpc_task *task; int count = 0; - int need_resched = current->need_resched; dprintk("RPC: rpc_schedule enter\n"); while (1) { + /* Ensure equal rights for tcp tasks... */ + rpciod_tcp_dispatcher(); + spin_lock_bh(&rpc_queue_lock); if (!(task = schedq.task)) { spin_unlock_bh(&rpc_queue_lock); break; } - rpc_del_timer(task); + if (task->tk_lock) { + spin_unlock_bh(&rpc_queue_lock); + printk(KERN_ERR "RPC: Locked task was scheduled !!!!\n"); + rpc_debug = ~0; + rpc_show_tasks(); + break; + } __rpc_remove_wait_queue(task); - task->tk_flags |= RPC_TASK_RUNNING; spin_unlock_bh(&rpc_queue_lock); __rpc_execute(task); - if (++count >= 200) { + if (++count >= 200 || current->need_resched) { count = 0; - need_resched = 1; - } - if (need_resched) schedule(); + } } dprintk("RPC: rpc_schedule leave\n"); } @@ -646,8 +725,9 @@ rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action callback, int flags) { memset(task, 0, sizeof(*task)); + init_timer(&task->tk_timer); task->tk_client = clnt; - task->tk_flags = RPC_TASK_RUNNING | flags; + task->tk_flags = flags; task->tk_exit = callback; init_waitqueue_head(&task->tk_wait); if (current->uid != current->fsuid || current->gid != current->fsgid) @@ -717,6 +797,15 @@ rpc_release_task(struct rpc_task *task) dprintk("RPC: %4d release task\n", task->tk_pid); +#ifdef RPC_DEBUG + if (task->tk_magic != 0xf00baa) { + printk(KERN_ERR "RPC: attempt to release a non-existing task!\n"); + rpc_debug = ~0; + rpc_show_tasks(); + return; + } +#endif + /* Remove from global task list */ spin_lock(&rpc_sched_lock); prev = task->tk_prev_task; @@ -734,18 +823,20 @@ rpc_release_task(struct rpc_task *task) spin_lock_bh(&rpc_queue_lock); /* Delete any running timer */ - rpc_del_timer(task); + __rpc_del_timer(task); /* Remove from any wait queue we're still on */ __rpc_remove_wait_queue(task); + task->tk_active = 0; + spin_unlock_bh(&rpc_queue_lock); /* Release resources */ if (task->tk_rqstp) xprt_release(task); - if (task->tk_cred) - rpcauth_releasecred(task); + if (task->tk_msg.rpc_cred) + rpcauth_unbindcred(task); if (task->tk_buffer) { rpc_free(task->tk_buffer); task->tk_buffer = NULL; @@ -824,7 +915,7 @@ rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func) spin_lock_bh(&rpc_queue_lock); /* N.B. Is it possible for the child to have already finished? */ __rpc_sleep_on(&childq, task, func, NULL); - rpc_make_runnable(child); + rpc_schedule_run(child); spin_unlock_bh(&rpc_queue_lock); } @@ -903,8 +994,6 @@ rpciod(void *ptr) schedule(); rounds = 0; } - dprintk("RPC: rpciod running checking dispatch\n"); - rpciod_tcp_dispatcher(); if (!rpciod_task_pending()) { dprintk("RPC: rpciod back to sleep\n"); @@ -1032,11 +1121,9 @@ out: } #ifdef RPC_DEBUG -#include <linux/nfs_fs.h> void rpc_show_tasks(void) { struct rpc_task *t = all_tasks, *next; - struct nfs_wreq *wreq; spin_lock(&rpc_sched_lock); t = all_tasks; @@ -1049,22 +1136,11 @@ void rpc_show_tasks(void) for (; t; t = next) { next = t->tk_next_task; printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n", - t->tk_pid, t->tk_proc, t->tk_flags, t->tk_status, + t->tk_pid, t->tk_msg.rpc_proc, t->tk_flags, t->tk_status, t->tk_client, t->tk_client->cl_prog, t->tk_rqstp, t->tk_timeout, t->tk_rpcwait ? rpc_qname(t->tk_rpcwait) : " <NULL> ", t->tk_action, t->tk_exit); - - if (!(t->tk_flags & RPC_TASK_NFSWRITE)) - continue; - /* NFS write requests */ - wreq = (struct nfs_wreq *) t->tk_calldata; - printk(" NFS: flgs=%08x, pid=%d, pg=%p, off=(%d, %d)\n", - wreq->wb_flags, wreq->wb_pid, wreq->wb_page, - wreq->wb_offset, wreq->wb_bytes); - printk(" name=%s/%s\n", - wreq->wb_file->f_dentry->d_parent->d_name.name, - wreq->wb_file->f_dentry->d_name.name); } spin_unlock(&rpc_sched_lock); } diff --git a/net/sunrpc/sunrpc_syms.c b/net/sunrpc/sunrpc_syms.c index 9a1861041..92559fa65 100644 --- a/net/sunrpc/sunrpc_syms.c +++ b/net/sunrpc/sunrpc_syms.c @@ -35,13 +35,16 @@ EXPORT_SYMBOL(rpc_new_child); EXPORT_SYMBOL(rpc_run_child); EXPORT_SYMBOL(rpciod_down); EXPORT_SYMBOL(rpciod_up); +EXPORT_SYMBOL(rpc_new_task); +EXPORT_SYMBOL(rpc_wake_up_status); /* RPC client functions */ EXPORT_SYMBOL(rpc_create_client); EXPORT_SYMBOL(rpc_destroy_client); EXPORT_SYMBOL(rpc_shutdown_client); EXPORT_SYMBOL(rpc_killall_tasks); -EXPORT_SYMBOL(rpc_do_call); +EXPORT_SYMBOL(rpc_call_sync); +EXPORT_SYMBOL(rpc_call_async); EXPORT_SYMBOL(rpc_call_setup); EXPORT_SYMBOL(rpc_clnt_sigmask); EXPORT_SYMBOL(rpc_clnt_sigunmask); @@ -60,6 +63,7 @@ EXPORT_SYMBOL(rpcauth_init_credcache); EXPORT_SYMBOL(rpcauth_free_credcache); EXPORT_SYMBOL(rpcauth_insert_credcache); EXPORT_SYMBOL(rpcauth_lookupcred); +EXPORT_SYMBOL(rpcauth_bindcred); EXPORT_SYMBOL(rpcauth_matchcred); EXPORT_SYMBOL(rpcauth_releasecred); diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 48dd5623d..06d682223 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -68,7 +68,14 @@ /* Following value should be > 32k + RPC overhead */ #define XPRT_MIN_WRITE_SPACE 35000 +extern spinlock_t rpc_queue_lock; + +/* + * Local variables + */ + /* Spinlock for critical sections in the code. */ +spinlock_t xprt_sock_lock = SPIN_LOCK_UNLOCKED; spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED; #ifdef RPC_DEBUG @@ -86,14 +93,12 @@ spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED; */ static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); static void do_xprt_transmit(struct rpc_task *); -static void xprt_transmit_status(struct rpc_task *task); -static void xprt_transmit_timeout(struct rpc_task *task); -static void xprt_receive_status(struct rpc_task *task); static void xprt_reserve_status(struct rpc_task *task); static void xprt_disconnect(struct rpc_xprt *); -static void xprt_reconn_timeout(struct rpc_task *task); -static struct socket *xprt_create_socket(int, struct sockaddr_in *, - struct rpc_timeout *); +static void xprt_reconn_status(struct rpc_task *task); +static struct socket *xprt_create_socket(int, struct rpc_timeout *); +static int xprt_bind_socket(struct rpc_xprt *, struct socket *); +static void xprt_remove_pending(struct rpc_xprt *); #ifdef RPC_DEBUG_DATA /* @@ -140,7 +145,7 @@ xprt_from_sock(struct sock *sk) */ extern inline void -xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount) +xprt_move_iov(struct msghdr *msg, struct iovec *niv, unsigned amount) { struct iovec *iv=msg->msg_iov; int i; @@ -148,7 +153,7 @@ xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount) /* * Eat any sent iovecs */ - while(iv->iov_len <= amount) { + while (iv->iov_len <= amount) { amount -= iv->iov_len; iv++; msg->msg_iovlen--; @@ -184,14 +189,17 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) int slen = req->rq_slen - req->rq_bytes_sent; struct iovec niv[MAX_IOVEC]; - if (slen == 0) + if (slen <= 0) return 0; + if (!sock) + return -ENOTCONN; + xprt_pktdump("packet data:", req->rq_svec->iov_base, req->rq_svec->iov_len); - msg.msg_flags = MSG_DONTWAIT; + msg.msg_flags = MSG_DONTWAIT|MSG_NOSIGNAL; msg.msg_iov = req->rq_svec; msg.msg_iovlen = req->rq_snr; msg.msg_name = (struct sockaddr *) &xprt->addr; @@ -238,23 +246,30 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) /* * Read data from socket */ -static inline int -xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, int len) +static int +xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, unsigned len, unsigned shift) { struct socket *sock = xprt->sock; - struct sockaddr_in sin; struct msghdr msg; mm_segment_t oldfs; + struct iovec niv[MAX_IOVEC]; int result; - msg.msg_flags = MSG_DONTWAIT; + if (!sock) + return -ENOTCONN; + + msg.msg_flags = MSG_DONTWAIT|MSG_NOSIGNAL; msg.msg_iov = iov; msg.msg_iovlen = nr; - msg.msg_name = &sin; - msg.msg_namelen = sizeof(sin); + msg.msg_name = NULL; + msg.msg_namelen = 0; msg.msg_control = NULL; msg.msg_controllen = 0; + /* Adjust the iovec if we've already filled it */ + if (shift) + xprt_move_iov(&msg, niv, shift); + oldfs = get_fs(); set_fs(get_ds()); result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT); set_fs(oldfs); @@ -309,21 +324,30 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result) int xprt_adjust_timeout(struct rpc_timeout *to) { - if (to->to_exponential) - to->to_current <<= 1; - else - to->to_current += to->to_increment; - if (to->to_maxval && to->to_current >= to->to_maxval) { - to->to_current = to->to_maxval; - to->to_retries = 0; + if (to->to_retries > 0) { + if (to->to_exponential) + to->to_current <<= 1; + else + to->to_current += to->to_increment; + if (to->to_maxval && to->to_current >= to->to_maxval) + to->to_current = to->to_maxval; + } else { + if (to->to_exponential) + to->to_initval <<= 1; + else + to->to_initval += to->to_increment; + if (to->to_maxval && to->to_initval >= to->to_maxval) + to->to_initval = to->to_maxval; + to->to_current = to->to_initval; } + if (!to->to_current) { printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n"); to->to_current = 5 * HZ; } pprintk("RPC: %lu %s\n", jiffies, to->to_retries? "retrans" : "timeout"); - return (to->to_retries)--; + return to->to_retries-- > 0; } /* @@ -332,22 +356,29 @@ xprt_adjust_timeout(struct rpc_timeout *to) static void xprt_close(struct rpc_xprt *xprt) { + struct socket *sock = xprt->sock; struct sock *sk = xprt->inet; - xprt_disconnect(xprt); + if (!sk) + return; + + xprt->inet = NULL; + xprt->sock = NULL; sk->user_data = NULL; sk->data_ready = xprt->old_data_ready; sk->state_change = xprt->old_state_change; sk->write_space = xprt->old_write_space; + + xprt_disconnect(xprt); sk->no_check = 0; - sock_release(xprt->sock); + sock_release(sock); /* * TCP doesnt require the rpciod now - other things may * but rpciod handles that not us. */ - if(xprt->stream && !xprt->connecting) + if(xprt->stream) rpciod_down(); } @@ -360,14 +391,10 @@ xprt_disconnect(struct rpc_xprt *xprt) dprintk("RPC: disconnected transport %p\n", xprt); xprt->connected = 0; xprt->tcp_offset = 0; - xprt->tcp_more = 0; - xprt->tcp_total = 0; - xprt->tcp_reclen = 0; xprt->tcp_copied = 0; - xprt->tcp_rqstp = NULL; - xprt->rx_pending_flag = 0; + xprt->tcp_more = 0; + xprt_remove_pending(xprt); rpc_wake_up_status(&xprt->pending, -ENOTCONN); - rpc_wake_up_status(&xprt->sending, -ENOTCONN); } /* @@ -377,85 +404,87 @@ void xprt_reconnect(struct rpc_task *task) { struct rpc_xprt *xprt = task->tk_xprt; - struct socket *sock; - struct sock *inet; + struct socket *sock = xprt->sock; + struct sock *inet = xprt->inet; int status; dprintk("RPC: %4d xprt_reconnect %p connected %d\n", task->tk_pid, xprt, xprt->connected); - task->tk_status = 0; - if (xprt->shutdown) return; if (!xprt->stream) return; - spin_lock_bh(&xprt_lock); - if (xprt->connected) { - spin_unlock_bh(&xprt_lock); + if (!xprt->addr.sin_port) { + task->tk_status = -EIO; return; } + + spin_lock(&xprt_lock); if (xprt->connecting) { - task->tk_timeout = xprt->timeout.to_maxval; + task->tk_timeout = 0; rpc_sleep_on(&xprt->reconn, task, NULL, NULL); - spin_unlock_bh(&xprt_lock); + spin_unlock(&xprt_lock); return; } xprt->connecting = 1; - spin_unlock_bh(&xprt_lock); + spin_unlock(&xprt_lock); - /* Create an unconnected socket */ - if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout))) { - xprt->connecting = 0; - goto defer; + status = -ENOTCONN; + if (!inet) { + /* Create an unconnected socket */ + if (!(sock = xprt_create_socket(xprt->prot, &xprt->timeout))) + goto defer; + xprt_bind_socket(xprt, sock); + inet = sock->sk; } - inet = sock->sk; - inet->data_ready = xprt->inet->data_ready; - inet->state_change = xprt->inet->state_change; - inet->write_space = xprt->inet->write_space; - inet->user_data = xprt; - - dprintk("RPC: %4d closing old socket\n", task->tk_pid); - xprt_close(xprt); - - /* Reset to new socket */ - xprt->sock = sock; - xprt->inet = inet; + xprt_disconnect(xprt); /* Now connect it asynchronously. */ dprintk("RPC: %4d connecting new socket\n", task->tk_pid); status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr, sizeof(xprt->addr), O_NONBLOCK); - xprt->connecting = 0; if (status < 0) { - if (status != -EINPROGRESS && status != -EALREADY) { + switch (status) { + case -EALREADY: + case -EINPROGRESS: + status = 0; + break; + case -EISCONN: + case -EPIPE: + status = 0; + xprt_close(xprt); + goto defer; + default: printk("RPC: TCP connect error %d!\n", -status); + xprt_close(xprt); goto defer; } dprintk("RPC: %4d connect status %d connected %d\n", task->tk_pid, status, xprt->connected); - task->tk_timeout = 60 * HZ; - spin_lock_bh(&xprt_lock); + spin_lock_bh(&xprt_sock_lock); if (!xprt->connected) { - rpc_sleep_on(&xprt->reconn, task, - NULL, xprt_reconn_timeout); - spin_unlock_bh(&xprt_lock); + task->tk_timeout = xprt->timeout.to_maxval; + rpc_sleep_on(&xprt->reconn, task, xprt_reconn_status, NULL); + spin_unlock_bh(&xprt_sock_lock); return; } - spin_unlock_bh(&xprt_lock); + spin_unlock_bh(&xprt_sock_lock); } - - defer: - spin_lock_bh(&xprt_lock); - if (!xprt->connected) - rpc_wake_up_next(&xprt->reconn); - spin_unlock_bh(&xprt_lock); + spin_lock(&xprt_lock); + xprt->connecting = 0; + if (status < 0) { + rpc_delay(task, 5*HZ); + task->tk_status = -ENOTCONN; + } + rpc_wake_up(&xprt->reconn); + spin_unlock(&xprt_lock); } /* @@ -463,29 +492,21 @@ defer: * process of reconnecting, and leave the rest to the upper layers. */ static void -xprt_reconn_timeout(struct rpc_task *task) +xprt_reconn_status(struct rpc_task *task) { - spin_lock_bh(&xprt_lock); + struct rpc_xprt *xprt = task->tk_xprt; + dprintk("RPC: %4d xprt_reconn_timeout %d\n", task->tk_pid, task->tk_status); - task->tk_status = -ENOTCONN; - if (task->tk_xprt->connecting) - task->tk_xprt->connecting = 0; - if (!task->tk_xprt->connected) - task->tk_status = -ENOTCONN; - else - task->tk_status = -ETIMEDOUT; - task->tk_timeout = 0; - rpc_wake_up_task(task); - spin_unlock_bh(&xprt_lock); + + spin_lock(&xprt_lock); + xprt->connecting = 0; + rpc_wake_up(&xprt->reconn); + spin_unlock(&xprt_lock); } -extern spinlock_t rpc_queue_lock; /* - * Look up the RPC request corresponding to a reply. - * - * RED-PEN: Niiice... Guys, when will we learn finally that locking - * in this manner is NOOP? --ANK + * Look up the RPC request corresponding to a reply, and then lock it. */ static inline struct rpc_rqst * xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) @@ -511,6 +532,8 @@ xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) out_bad: req = NULL; out: + if (req && !rpc_lock_task(req->rq_task)) + req = NULL; spin_unlock_bh(&rpc_queue_lock); return req; } @@ -524,9 +547,6 @@ xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) { struct rpc_task *task = req->rq_task; - req->rq_rlen = copied; - req->rq_gotit = 1; - /* Adjust congestion window */ xprt_adjust_cwnd(xprt, copied); @@ -549,12 +569,11 @@ xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) } #endif - /* ... and wake up the process. */ dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied); task->tk_status = copied; - if (!RPC_IS_RUNNING(task)) - rpc_wake_up_task(task); + /* ... and wake up the process. */ + rpc_wake_up_task(task); return; } @@ -612,6 +631,7 @@ static int csum_partial_copy_to_page_cache(struct iovec *iov, static inline void udp_data_ready(struct sock *sk, int len) { + struct rpc_task *task; struct rpc_xprt *xprt; struct rpc_rqst *rovr; struct sk_buff *skb; @@ -626,7 +646,10 @@ udp_data_ready(struct sock *sk, int len) dprintk("RPC: udp_data_ready client %p\n", xprt); if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) - goto out_err; + return; + + if (xprt->shutdown) + goto dropit; repsize = skb->len - sizeof(struct udphdr); if (repsize < 4) { @@ -634,14 +657,15 @@ udp_data_ready(struct sock *sk, int len) goto dropit; } - /* Look up the request corresponding to the given XID */ - if (!(rovr = xprt_lookup_rqst(xprt, - *(u32 *) (skb->h.raw + sizeof(struct udphdr))))) + /* Look up and lock the request corresponding to the given XID */ + rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr))); + if (!rovr) goto dropit; + task = rovr->rq_task; - dprintk("RPC: %4d received reply\n", rovr->rq_task->tk_pid); + dprintk("RPC: %4d received reply\n", task->tk_pid); xprt_pktdump("packet data:", - (u32 *) (skb->h.raw + sizeof(struct udphdr)), repsize); + (u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize); if ((copied = rovr->rq_rlen) > repsize) copied = repsize; @@ -649,213 +673,287 @@ udp_data_ready(struct sock *sk, int len) rovr->rq_damaged = 1; /* Suck it into the iovec, verify checksum if not done by hw. */ if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied)) - goto dropit; + goto out_unlock; /* Something worked... */ dst_confirm(skb->dst); xprt_complete_rqst(xprt, rovr, copied); -dropit: + out_unlock: + rpc_unlock_task(task); + + dropit: skb_free_datagram(sk, skb); - return; -out_err: - return; } /* - * TCP record receive routine - * This is not the most efficient code since we call recvfrom twice-- - * first receiving the record marker and XID, then the data. - * - * The optimal solution would be a RPC support in the TCP layer, which - * would gather all data up to the next record marker and then pass us - * the list of all TCP segments ready to be copied. + * TCP read fragment marker */ static inline int -tcp_input_record(struct rpc_xprt *xprt) +tcp_read_fraghdr(struct rpc_xprt *xprt) { - struct rpc_rqst *req; - struct iovec *iov; struct iovec riov; - u32 offset; - int result, maxcpy, reclen, avail, want; + int want, result; - dprintk("RPC: tcp_input_record\n"); + if (xprt->tcp_offset >= xprt->tcp_reclen + sizeof(xprt->tcp_recm)) { + xprt->tcp_offset = 0; + xprt->tcp_reclen = 0; + } + if (xprt->tcp_offset >= sizeof(xprt->tcp_recm)) + goto done; - offset = xprt->tcp_offset; - result = -EAGAIN; - if (offset < 4 || (!xprt->tcp_more && offset < 8)) { - want = (xprt->tcp_more? 4 : 8) - offset; - dprintk("RPC: reading header (%d bytes)\n", want); - riov.iov_base = xprt->tcp_recm.data + offset; + want = sizeof(xprt->tcp_recm) - xprt->tcp_offset; + dprintk("RPC: reading header (%d bytes)\n", want); + do { + riov.iov_base = ((u8*) &xprt->tcp_recm) + xprt->tcp_offset; riov.iov_len = want; - result = xprt_recvmsg(xprt, &riov, 1, want); + result = xprt_recvmsg(xprt, &riov, 1, want, 0); if (result < 0) - goto done; - offset += result; - if (result < want) { - result = -EAGAIN; - goto done; - } + return result; + xprt->tcp_offset += result; + want -= result; + } while (want); - /* Get the record length and mask out the more_fragments bit */ - reclen = ntohl(xprt->tcp_reclen); - dprintk("RPC: reclen %08x\n", reclen); - xprt->tcp_more = (reclen & 0x80000000)? 0 : 1; - reclen &= 0x7fffffff; - xprt->tcp_total += reclen; - xprt->tcp_reclen = reclen; - - dprintk("RPC: got xid %08x reclen %d morefrags %d\n", - xprt->tcp_xid, xprt->tcp_reclen, xprt->tcp_more); - if (!xprt->tcp_copied - && (req = xprt_lookup_rqst(xprt, xprt->tcp_xid))) { - iov = xprt->tcp_iovec; - memcpy(iov, req->rq_rvec, req->rq_rnr * sizeof(iov[0])); -#if 0 -*(u32 *)iov->iov_base = req->rq_xid; -#endif - iov->iov_base += 4; - iov->iov_len -= 4; - xprt->tcp_copied = 4; - xprt->tcp_rqstp = req; - } - } else { - reclen = xprt->tcp_reclen; - } + /* Is this another fragment in the last message */ + if (!xprt->tcp_more) + xprt->tcp_copied = 0; /* No, so we're reading a new message */ + + /* Get the record length and mask out the last fragment bit */ + xprt->tcp_reclen = ntohl(xprt->tcp_recm); + xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1; + xprt->tcp_reclen &= 0x7fffffff; + + dprintk("RPC: New record reclen %d morefrags %d\n", + xprt->tcp_reclen, xprt->tcp_more); + done: + return xprt->tcp_reclen + sizeof(xprt->tcp_recm) - xprt->tcp_offset; +} + +/* + * TCP read xid + */ +static inline int +tcp_read_xid(struct rpc_xprt *xprt, int avail) +{ + struct iovec riov; + int want, result; + + if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail) + goto done; + want = MIN(sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail); + do { + dprintk("RPC: reading xid (%d bytes)\n", want); + riov.iov_base = ((u8*) &xprt->tcp_xid) + xprt->tcp_copied; + riov.iov_len = want; + result = xprt_recvmsg(xprt, &riov, 1, want, 0); + if (result < 0) + return result; + xprt->tcp_copied += result; + xprt->tcp_offset += result; + want -= result; + avail -= result; + } while (want); + done: + return avail; +} - avail = reclen - (offset - 4); - if ((req = xprt->tcp_rqstp) && req->rq_xid == xprt->tcp_xid - && req->rq_task->tk_rpcwait == &xprt->pending) { - want = MIN(req->rq_rlen - xprt->tcp_copied, avail); +/* + * TCP read and complete request + */ +static inline int +tcp_read_request(struct rpc_xprt *xprt, struct rpc_rqst *req, int avail) +{ + int want, result; + if (req->rq_rlen <= xprt->tcp_copied || !avail) + goto done; + want = MIN(req->rq_rlen - xprt->tcp_copied, avail); + do { dprintk("RPC: %4d TCP receiving %d bytes\n", - req->rq_task->tk_pid, want); - /* Request must be re-encoded before retransmit */ - req->rq_damaged = 1; - result = xprt_recvmsg(xprt, xprt->tcp_iovec, req->rq_rnr, want); + req->rq_task->tk_pid, want); + + result = xprt_recvmsg(xprt, req->rq_rvec, req->rq_rnr, want, xprt->tcp_copied); if (result < 0) - goto done; + return result; xprt->tcp_copied += result; - offset += result; + xprt->tcp_offset += result; avail -= result; - if (result < want) { - result = -EAGAIN; - goto done; - } + want -= result; + } while (want); - maxcpy = MIN(req->rq_rlen, xprt->tcp_total); - if (xprt->tcp_copied == maxcpy && !xprt->tcp_more) { - dprintk("RPC: %4d received reply complete\n", - req->rq_task->tk_pid); - xprt_complete_rqst(xprt, req, xprt->tcp_total); - xprt->tcp_copied = 0; - xprt->tcp_rqstp = NULL; - } - } + done: + if (req->rq_rlen > xprt->tcp_copied && xprt->tcp_more) + return avail; + dprintk("RPC: %4d received reply complete\n", req->rq_task->tk_pid); + xprt_complete_rqst(xprt, req, xprt->tcp_copied); - /* Skip over any trailing bytes on short reads */ - while (avail > 0) { - static u8 dummy[64]; + return avail; +} +/* + * TCP discard extra bytes from a short read + */ +static inline int +tcp_read_discard(struct rpc_xprt *xprt, int avail) +{ + struct iovec riov; + static u8 dummy[64]; + int want, result = 0; + + while (avail) { want = MIN(avail, sizeof(dummy)); riov.iov_base = dummy; riov.iov_len = want; dprintk("RPC: TCP skipping %d bytes\n", want); - result = xprt_recvmsg(xprt, &riov, 1, want); + result = xprt_recvmsg(xprt, &riov, 1, want, 0); if (result < 0) - goto done; - offset += result; + return result; + xprt->tcp_offset += result; avail -= result; - if (result < want) { - result = -EAGAIN; - goto done; + } + return avail; +} + +/* + * TCP record receive routine + * This is not the most efficient code since we call recvfrom thrice-- + * first receiving the record marker, then the XID, then the data. + * + * The optimal solution would be a RPC support in the TCP layer, which + * would gather all data up to the next record marker and then pass us + * the list of all TCP segments ready to be copied. + */ +static int +tcp_input_record(struct rpc_xprt *xprt) +{ + struct rpc_rqst *req = NULL; + struct rpc_task *task = NULL; + int avail, result; + + dprintk("RPC: tcp_input_record\n"); + + if (xprt->shutdown) + return -EIO; + if (!xprt->connected) + return -ENOTCONN; + + /* Read in a new fragment marker if necessary */ + /* Can we ever really expect to get completely empty fragments? */ + if ((result = tcp_read_fraghdr(xprt)) <= 0) + return result; + avail = result; + + /* Read in the xid if necessary */ + if ((result = tcp_read_xid(xprt, avail)) <= 0) + return result; + avail = result; + + /* Find and lock the request corresponding to this xid */ + req = xprt_lookup_rqst(xprt, xprt->tcp_xid); + if (req) { + task = req->rq_task; + if (xprt->tcp_copied == sizeof(xprt->tcp_xid) || req->rq_damaged) { + req->rq_damaged = 1; + /* Read in the request data */ + result = tcp_read_request(xprt, req, avail); } + rpc_unlock_task(task); + if (result < 0) + return result; + avail = result; } - if (!xprt->tcp_more) - xprt->tcp_total = 0; - offset = 0; -done: - dprintk("RPC: tcp_input_record done (off %d total %d copied %d)\n", - offset, xprt->tcp_total, xprt->tcp_copied); - xprt->tcp_offset = offset; + /* Skip over any trailing bytes on short reads */ + if ((result = tcp_read_discard(xprt, avail)) < 0) + return result; + + dprintk("RPC: tcp_input_record done (off %d reclen %d copied %d)\n", + xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied); + result = xprt->tcp_reclen; return result; } /* * TCP task queue stuff */ - -static struct rpc_xprt *rpc_xprt_pending = NULL; /* Chain by rx_pending of rpc_xprt's */ +LIST_HEAD(rpc_xprt_pending); /* List of xprts having pending tcp requests */ + +static inline +void tcp_rpciod_queue(void) +{ + rpciod_wake_up(); +} + +static inline +void xprt_append_pending(struct rpc_xprt *xprt) +{ + if (!list_empty(&xprt->rx_pending)) + return; + spin_lock_bh(&rpc_queue_lock); + if (list_empty(&xprt->rx_pending)) { + list_add(&xprt->rx_pending, rpc_xprt_pending.prev); + dprintk("RPC: xprt queue %p\n", xprt); + tcp_rpciod_queue(); + } + spin_unlock_bh(&rpc_queue_lock); +} + +static +void xprt_remove_pending(struct rpc_xprt *xprt) +{ + spin_lock_bh(&rpc_queue_lock); + if (!list_empty(&xprt->rx_pending)) { + list_del(&xprt->rx_pending); + INIT_LIST_HEAD(&xprt->rx_pending); + } + spin_unlock_bh(&rpc_queue_lock); +} + +static inline +struct rpc_xprt *xprt_remove_pending_next(void) +{ + struct rpc_xprt *xprt = NULL; + + spin_lock_bh(&rpc_queue_lock); + if (!list_empty(&rpc_xprt_pending)) { + xprt = list_entry(rpc_xprt_pending.next, struct rpc_xprt, rx_pending); + list_del(&xprt->rx_pending); + INIT_LIST_HEAD(&xprt->rx_pending); + } + spin_unlock_bh(&rpc_queue_lock); + return xprt; +} /* * This is protected from tcp_data_ready and the stack as its run * inside of the RPC I/O daemon */ -static void -do_rpciod_tcp_dispatcher(void) +void +__rpciod_tcp_dispatcher(void) { struct rpc_xprt *xprt; - int result = 0; + int safe_retry = 0, result; dprintk("rpciod_tcp_dispatcher: Queue Running\n"); /* * Empty each pending socket */ - - while(1) { - int safe_retry=0; - - if ((xprt = rpc_xprt_pending) == NULL) { - break; - } - xprt->rx_pending_flag = 0; - rpc_xprt_pending=xprt->rx_pending; - xprt->rx_pending = NULL; - + while ((xprt = xprt_remove_pending_next()) != NULL) { dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt); - do - { - if (safe_retry++ > 50) - break; + do { result = tcp_input_record(xprt); - } - while (result >= 0); - - switch (result) { - case -EAGAIN: - case -ENOTCONN: - case -EPIPE: - continue; - default: - printk(KERN_WARNING "RPC: unexpected error %d from tcp_input_record\n", - result); + } while (result >= 0); + + if (safe_retry++ > 200) { + schedule(); + safe_retry = 0; } } } -void rpciod_tcp_dispatcher(void) -{ - /* mama... start_bh_atomic was here... - Calls to sock->ops _are_ _impossible_ with disabled bh. Period. --ANK - */ - do_rpciod_tcp_dispatcher(); -} - -int xprt_tcp_pending(void) -{ - return rpc_xprt_pending != NULL; -} - -extern inline void tcp_rpciod_queue(void) -{ - rpciod_wake_up(); -} - /* * data_ready callback for TCP. We can't just jump into the * tcp recvmsg functions inside of the network receive bh or @@ -874,24 +972,15 @@ static void tcp_data_ready(struct sock *sk, int len) return; } + if (xprt->shutdown) + return; + + xprt_append_pending(xprt); + dprintk("RPC: tcp_data_ready client %p\n", xprt); dprintk("RPC: state %x conn %d dead %d zapped %d\n", sk->state, xprt->connected, sk->dead, sk->zapped); - /* - * If we are not waiting for the RPC bh run then - * we are now - */ - if (!xprt->rx_pending_flag) { - dprintk("RPC: xprt queue %p\n", rpc_xprt_pending); - - xprt->rx_pending=rpc_xprt_pending; - rpc_xprt_pending=xprt; - xprt->rx_pending_flag=1; - } else - dprintk("RPC: xprt queued already %p\n", xprt); - tcp_rpciod_queue(); - } @@ -907,26 +996,20 @@ tcp_state_change(struct sock *sk) sk->state, xprt->connected, sk->dead, sk->zapped); - switch(sk->state) { + spin_lock_bh(&xprt_sock_lock); + switch (sk->state) { case TCP_ESTABLISHED: - if (xprt->connected) - break; xprt->connected = 1; - xprt->connecting = 0; + if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending) + rpc_wake_up_task(xprt->snd_task); rpc_wake_up(&xprt->reconn); - rpc_wake_up_next(&xprt->sending); - tcp_rpciod_queue(); - break; - case TCP_CLOSE: - if (xprt->connecting) - break; - xprt_disconnect(xprt); - rpc_wake_up_status(&xprt->reconn, -ENOTCONN); break; default: + xprt->connected = 0; + rpc_wake_up_status(&xprt->pending, -ENOTCONN); break; } - + spin_unlock_bh(&xprt_sock_lock); } /* @@ -940,20 +1023,23 @@ tcp_write_space(struct sock *sk) if (!(xprt = xprt_from_sock(sk))) return; + if (xprt->shutdown) + return; /* Wait until we have enough socket memory */ if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE)) return; + spin_lock_bh(&xprt_sock_lock); if (xprt->write_space) - return; + goto out_unlock; xprt->write_space = 1; - if (!xprt->snd_task) - rpc_wake_up_next(&xprt->sending); - else if (!RPC_IS_RUNNING(xprt->snd_task)) + if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending) rpc_wake_up_task(xprt->snd_task); + out_unlock: + spin_unlock_bh(&xprt_sock_lock); } static void @@ -963,20 +1049,24 @@ udp_write_space(struct sock *sk) if (!(xprt = xprt_from_sock(sk))) return; + if (xprt->shutdown) + return; /* Wait until we have enough socket memory */ if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE)) return; + spin_lock_bh(&xprt_sock_lock); if (xprt->write_space) - return; + goto out_unlock; xprt->write_space = 1; - if (!xprt->snd_task) - rpc_wake_up_next(&xprt->sending); - else if (!RPC_IS_RUNNING(xprt->snd_task)) + + if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending) rpc_wake_up_task(xprt->snd_task); + out_unlock: + spin_unlock_bh(&xprt_sock_lock); } /* @@ -987,9 +1077,8 @@ xprt_timer(struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; - if (req) { + if (req) xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT); - } dprintk("RPC: %4d xprt_timer (%s request)\n", task->tk_pid, req ? "pending" : "backlogged"); @@ -1010,12 +1099,13 @@ xprt_down_transmit(struct rpc_task *task) struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; struct rpc_rqst *req = task->tk_rqstp; - spin_lock_bh(&xprt_lock); + spin_lock(&xprt_lock); if (xprt->snd_task && xprt->snd_task != task) { dprintk("RPC: %4d TCP write queue full (task %d)\n", task->tk_pid, xprt->snd_task->tk_pid); - task->tk_timeout = req->rq_timeout.to_current; - rpc_sleep_on(&xprt->sending, task, xprt_transmit, NULL); + task->tk_timeout = 0; + task->tk_status = -EAGAIN; + rpc_sleep_on(&xprt->sending, task, NULL, NULL); } else if (!xprt->snd_task) { xprt->snd_task = task; #ifdef RPC_PROFILE @@ -1023,23 +1113,23 @@ xprt_down_transmit(struct rpc_task *task) #endif req->rq_bytes_sent = 0; } - spin_unlock_bh(&xprt_lock); + spin_unlock(&xprt_lock); return xprt->snd_task == task; } /* * Releases the socket for use by other requests. */ -static void +static inline void xprt_up_transmit(struct rpc_task *task) { struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; if (xprt->snd_task && xprt->snd_task == task) { - spin_lock_bh(&xprt_lock); + spin_lock(&xprt_lock); xprt->snd_task = NULL; rpc_wake_up_next(&xprt->sending); - spin_unlock_bh(&xprt_lock); + spin_unlock(&xprt_lock); } } @@ -1050,7 +1140,6 @@ xprt_up_transmit(struct rpc_task *task) void xprt_transmit(struct rpc_task *task) { - struct rpc_timeout *timeo; struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; @@ -1060,26 +1149,21 @@ xprt_transmit(struct rpc_task *task) if (xprt->shutdown) task->tk_status = -EIO; + if (!xprt->connected) + task->tk_status = -ENOTCONN; + if (task->tk_status < 0) return; - /* Reset timeout parameters */ - timeo = &req->rq_timeout; - if (timeo->to_retries < 0) { - dprintk("RPC: %4d xprt_transmit reset timeo\n", - task->tk_pid); - timeo->to_retries = xprt->timeout.to_retries; - timeo->to_current = timeo->to_initval; - } + if (task->tk_rpcwait) + rpc_remove_wait_queue(task); /* set up everything as needed. */ /* Write the record marker */ if (xprt->stream) { - u32 marker; - - marker = htonl(0x80000000|(req->rq_slen-4)); - *((u32 *) req->rq_svec[0].iov_base) = marker; + u32 *marker = req->rq_svec[0].iov_base; + *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); } if (!xprt_down_transmit(task)) @@ -1095,24 +1179,14 @@ do_xprt_transmit(struct rpc_task *task) struct rpc_xprt *xprt = req->rq_xprt; int status, retry = 0; - if (xprt->shutdown) { - task->tk_status = -EIO; - goto out_release; - } /* For fast networks/servers we have to put the request on * the pending list now: + * Note that we don't want the task timing out during the + * call to xprt_sendmsg(), so we initially disable the timeout, + * and then reset it later... */ - req->rq_gotit = 0; - status = rpc_add_wait_queue(&xprt->pending, task); - if (!status) - task->tk_callback = NULL; - - if (status) { - printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); - task->tk_status = status; - goto out_release; - } + xprt_receive(task); /* Continue transmitting the packet/record. We must be careful * to cope with writespace callbacks arriving _after_ we have @@ -1129,80 +1203,67 @@ do_xprt_transmit(struct rpc_task *task) req->rq_bytes_sent += status; if (req->rq_bytes_sent >= req->rq_slen) - goto out_release; - } - - if (status < req->rq_slen) - status = -EAGAIN; - - if (status >= 0 || !xprt->stream) { - dprintk("RPC: %4d xmit complete\n", task->tk_pid); - goto out_release; + goto out_receive; + } else { + if (status >= req->rq_slen) + goto out_receive; + status = -ENOMEM; + break; } dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", task->tk_pid, req->rq_slen - req->rq_bytes_sent, req->rq_slen); + status = -EAGAIN; if (retry++ > 50) break; } + rpc_unlock_task(task); - task->tk_status = (status == -ENOMEM) ? -EAGAIN : status; + task->tk_status = status; - /* We don't care if we got a reply, so don't protect - * against bh. */ - if (task->tk_rpcwait == &xprt->pending) - rpc_remove_wait_queue(task); + /* Note: at this point, task->tk_sleeping has not yet been set, + * hence there is no danger of the waking up task being put on + * schedq, and being picked up by a parallel run of rpciod(). + */ + rpc_wake_up_task(task); + if (!RPC_IS_RUNNING(task)) + goto out_release; - /* Protect against (udp|tcp)_write_space */ - spin_lock_bh(&xprt_lock); - if (status == -ENOMEM || status == -EAGAIN) { + switch (status) { + case -ENOMEM: + /* Protect against (udp|tcp)_write_space */ task->tk_timeout = req->rq_timeout.to_current; + spin_lock_bh(&xprt_sock_lock); if (!xprt->write_space) - rpc_sleep_on(&xprt->sending, task, xprt_transmit_status, - xprt_transmit_timeout); - spin_unlock_bh(&xprt_lock); + rpc_sleep_on(&xprt->sending, task, NULL, NULL); + spin_unlock_bh(&xprt_sock_lock); return; - } - spin_unlock_bh(&xprt_lock); - -out_release: - xprt_up_transmit(task); -} - -/* - * This callback is invoked when the sending task is forced to sleep - * because the TCP write buffers are full - */ -static void -xprt_transmit_status(struct rpc_task *task) -{ - struct rpc_xprt *xprt = task->tk_client->cl_xprt; - - dprintk("RPC: %4d transmit_status %d\n", task->tk_pid, task->tk_status); - if (xprt->snd_task == task) { - task->tk_status = 0; - do_xprt_transmit(task); + case -EAGAIN: + /* Keep holding the socket if it is blocked */ + rpc_delay(task, HZ>>4); return; + case -ECONNREFUSED: + case -ENOTCONN: + if (!xprt->stream) + return; + default: + goto out_release; } -} -/* - * RPC transmit timeout handler. - */ -static void -xprt_transmit_timeout(struct rpc_task *task) -{ - dprintk("RPC: %4d transmit_timeout %d\n", task->tk_pid, task->tk_status); - task->tk_status = -ETIMEDOUT; - task->tk_timeout = 0; - rpc_wake_up_task(task); + out_receive: + dprintk("RPC: %4d xmit complete\n", task->tk_pid); + /* Set the task's receive timeout value */ + task->tk_timeout = req->rq_timeout.to_current; + rpc_add_timer(task, xprt_timer); + rpc_unlock_task(task); + out_release: xprt_up_transmit(task); } /* - * Wait for the reply to our call. + * Queue the task for a reply to our call. * When the callback is invoked, the congestion window should have * been updated already. */ @@ -1214,42 +1275,8 @@ xprt_receive(struct rpc_task *task) dprintk("RPC: %4d xprt_receive\n", task->tk_pid); - /* - * Wait until rq_gotit goes non-null, or timeout elapsed. - */ - task->tk_timeout = req->rq_timeout.to_current; - - spin_lock_bh(&xprt_lock); - if (task->tk_rpcwait) - rpc_remove_wait_queue(task); - - if (task->tk_status < 0 || xprt->shutdown) { - spin_unlock_bh(&xprt_lock); - goto out; - } - - if (!req->rq_gotit) { - rpc_sleep_on(&xprt->pending, task, - xprt_receive_status, xprt_timer); - spin_unlock_bh(&xprt_lock); - return; - } - spin_unlock_bh(&xprt_lock); - - dprintk("RPC: %4d xprt_receive returns %d\n", - task->tk_pid, task->tk_status); - out: - xprt_receive_status(task); -} - -static void -xprt_receive_status(struct rpc_task *task) -{ - struct rpc_xprt *xprt = task->tk_xprt; - - if (xprt->tcp_rqstp == task->tk_rqstp) - xprt->tcp_rqstp = NULL; - + task->tk_timeout = 0; + rpc_sleep_locked(&xprt->pending, task, NULL, NULL); } /* @@ -1335,7 +1362,6 @@ xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid); task->tk_status = 0; - req->rq_gotit = 0; req->rq_timeout = xprt->timeout; req->rq_task = task; req->rq_xprt = xprt; @@ -1353,6 +1379,7 @@ xprt_release(struct rpc_task *task) struct rpc_xprt *xprt = task->tk_xprt; struct rpc_rqst *req; + xprt_up_transmit(task); if (!(req = task->tk_rqstp)) return; task->tk_rqstp = NULL; @@ -1363,16 +1390,16 @@ xprt_release(struct rpc_task *task) spin_lock(&xprt_lock); req->rq_next = xprt->free; xprt->free = req; - spin_unlock(&xprt_lock); /* remove slot from queue of pending */ - spin_lock_bh(&xprt_lock); if (task->tk_rpcwait) { printk("RPC: task of released request still queued!\n"); - rpc_del_timer(task); +#ifdef RPC_DEBUG + printk("RPC: (task is on %s)\n", rpc_qname(task->tk_rpcwait)); +#endif rpc_remove_wait_queue(task); } - spin_unlock_bh(&xprt_lock); + spin_unlock(&xprt_lock); /* Decrease congestion value. */ xprt->cong -= RPC_CWNDSCALE; @@ -1389,7 +1416,7 @@ xprt_default_timeout(struct rpc_timeout *to, int proto) if (proto == IPPROTO_UDP) xprt_set_timeout(to, 5, 5 * HZ); else - xprt_set_timeout(to, 5, 15 * HZ); + xprt_set_timeout(to, 5, 60 * HZ); } /* @@ -1416,52 +1443,33 @@ xprt_setup(struct socket *sock, int proto, { struct rpc_xprt *xprt; struct rpc_rqst *req; - struct sock *inet; int i; dprintk("RPC: setting up %s transport...\n", proto == IPPROTO_UDP? "UDP" : "TCP"); - inet = sock->sk; - if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) return NULL; memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ - xprt->file = NULL; - xprt->sock = sock; - xprt->inet = inet; xprt->addr = *ap; xprt->prot = proto; xprt->stream = (proto == IPPROTO_TCP)? 1 : 0; - xprt->congtime = jiffies; - init_waitqueue_head(&xprt->cong_wait); - inet->user_data = xprt; - xprt->old_data_ready = inet->data_ready; - xprt->old_state_change = inet->state_change; - xprt->old_write_space = inet->write_space; - if (proto == IPPROTO_UDP) { - inet->data_ready = udp_data_ready; - inet->write_space = udp_write_space; - inet->no_check = UDP_CSUM_NORCV; - xprt->cwnd = RPC_INITCWND; - } else { - inet->data_ready = tcp_data_ready; - inet->state_change = tcp_state_change; - inet->write_space = tcp_write_space; + if (xprt->stream) { xprt->cwnd = RPC_MAXCWND; xprt->nocong = 1; - } - xprt->connected = 1; + } else + xprt->cwnd = RPC_INITCWND; + xprt->congtime = jiffies; + init_waitqueue_head(&xprt->cong_wait); /* Set timeout parameters */ if (to) { xprt->timeout = *to; xprt->timeout.to_current = to->to_initval; xprt->timeout.to_resrvval = to->to_maxval << 1; - } else { + } else xprt_default_timeout(&xprt->timeout, xprt->prot); - } xprt->pending = RPC_INIT_WAITQ("xprt_pending"); xprt->sending = RPC_INIT_WAITQ("xprt_sending"); @@ -1474,13 +1482,11 @@ xprt_setup(struct socket *sock, int proto, req->rq_next = NULL; xprt->free = xprt->slot; + INIT_LIST_HEAD(&xprt->rx_pending); + dprintk("RPC: created transport %p\n", xprt); - /* - * TCP requires the rpc I/O daemon is present - */ - if(proto==IPPROTO_TCP) - rpciod_up(); + xprt_bind_socket(xprt, sock); return xprt; } @@ -1508,17 +1514,52 @@ xprt_bindresvport(struct socket *sock) return err; } +static int +xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock) +{ + struct sock *sk = sock->sk; + + if (xprt->inet) + return -EBUSY; + + sk->user_data = xprt; + xprt->old_data_ready = sk->data_ready; + xprt->old_state_change = sk->state_change; + xprt->old_write_space = sk->write_space; + if (xprt->prot == IPPROTO_UDP) { + sk->data_ready = udp_data_ready; + sk->write_space = udp_write_space; + sk->no_check = UDP_CSUM_NORCV; + xprt->connected = 1; + } else { + sk->data_ready = tcp_data_ready; + sk->state_change = tcp_state_change; + sk->write_space = tcp_write_space; + xprt->connected = 0; + } + + /* Reset to new socket */ + xprt->sock = sock; + xprt->inet = sk; + /* + * TCP requires the rpc I/O daemon is present + */ + if(xprt->stream) + rpciod_up(); + + return 0; +} + /* * Create a client socket given the protocol and peer address. */ static struct socket * -xprt_create_socket(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) +xprt_create_socket(int proto, struct rpc_timeout *to) { struct socket *sock; int type, err; - dprintk("RPC: xprt_create_socket(%08x, %s %d)\n", - sap? ntohl(sap->sin_addr.s_addr) : 0, + dprintk("RPC: xprt_create_socket(%s %d)\n", (proto == IPPROTO_UDP)? "udp" : "tcp", proto); type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM; @@ -1532,15 +1573,6 @@ xprt_create_socket(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) if (!current->fsuid && xprt_bindresvport(sock) < 0) goto failed; - if (type == SOCK_STREAM && sap) { - err = sock->ops->connect(sock, (struct sockaddr *) sap, - sizeof(*sap), 0); - if (err < 0) { - printk("RPC: TCP connect failed (%d).\n", -err); - goto failed; - } - } - return sock; failed: @@ -1559,7 +1591,7 @@ xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) dprintk("RPC: xprt_create_proto called\n"); - if (!(sock = xprt_create_socket(proto, sap, to))) + if (!(sock = xprt_create_socket(proto, to))) return NULL; if (!(xprt = xprt_setup(sock, proto, sap, to))) @@ -1587,8 +1619,6 @@ xprt_shutdown(struct rpc_xprt *xprt) */ int xprt_clear_backlog(struct rpc_xprt *xprt) { - if (!xprt) - return 0; if (RPCXPRT_CONGESTED(xprt)) return 0; rpc_wake_up_next(&xprt->backlog); @@ -1603,6 +1633,7 @@ int xprt_destroy(struct rpc_xprt *xprt) { dprintk("RPC: destroying transport %p\n", xprt); + xprt_shutdown(xprt); xprt_close(xprt); kfree(xprt); |