Diffstat (limited to 'net/sunrpc')
-rw-r--r--  net/sunrpc/clnt.c      | 37
-rw-r--r--  net/sunrpc/pmap_clnt.c |  8
-rw-r--r--  net/sunrpc/sched.c     |  8
-rw-r--r--  net/sunrpc/svc.c       |  4
-rw-r--r--  net/sunrpc/svcsock.c   | 49
-rw-r--r--  net/sunrpc/xprt.c      | 42
6 files changed, 94 insertions, 54 deletions
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index c41dfc1eb..ce93ab71c 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -22,7 +22,6 @@
  */
 
 #include <asm/system.h>
-#include <asm/segment.h>
 
 #include <linux/types.h>
 #include <linux/mm.h>
@@ -35,7 +34,7 @@
 #include <linux/nfs.h>
 
-#define RPC_SLACK_SPACE 1024 /* total overkill */
+#define RPC_SLACK_SPACE 512 /* total overkill */
 
 #ifdef RPC_DEBUG
 # define RPCDBG_FACILITY RPCDBG_CALL
@@ -91,6 +90,7 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname,
 	if (!clnt)
 		goto out_no_clnt;
 	memset(clnt, 0, sizeof(*clnt));
+	atomic_set(&clnt->cl_users, 0);
 
 	clnt->cl_xprt = xprt;
 	clnt->cl_procinfo = version->procs;
@@ -140,16 +140,16 @@ rpc_shutdown_client(struct rpc_clnt *clnt)
 {
 	dprintk("RPC: shutting down %s client for %s\n",
 		clnt->cl_protname, clnt->cl_server);
-	while (clnt->cl_users) {
+	while (atomic_read(&clnt->cl_users)) {
 #ifdef RPC_DEBUG
 		dprintk("RPC: rpc_shutdown_client: client %s, tasks=%d\n",
-			clnt->cl_protname, clnt->cl_users);
+			clnt->cl_protname, atomic_read(&clnt->cl_users));
 #endif
 		/* Don't let rpc_release_client destroy us */
 		clnt->cl_oneshot = 0;
 		clnt->cl_dead = 0;
 		rpc_killall_tasks(clnt);
-		sleep_on(&destroy_wait);
+		sleep_on_timeout(&destroy_wait, 1*HZ);
 	}
 	return rpc_destroy_client(clnt);
 }
@@ -182,14 +182,10 @@ void
 rpc_release_client(struct rpc_clnt *clnt)
 {
 	dprintk("RPC: rpc_release_client(%p, %d)\n",
-		clnt, clnt->cl_users);
-	if (clnt->cl_users) {
-		if (--(clnt->cl_users) > 0)
-			return;
-	} else
-		printk("rpc_release_client: %s client already free??\n",
-			clnt->cl_protname);
+		clnt, atomic_read(&clnt->cl_users));
+	if (!atomic_dec_and_test(&clnt->cl_users))
+		return;
 	wake_up(&destroy_wait);
 	if (clnt->cl_oneshot || clnt->cl_dead)
 		rpc_destroy_client(clnt);
@@ -446,7 +442,7 @@ call_allocate(struct rpc_task *task)
 	 * auth->au_wslack */
 	bufsiz = rpcproc_bufsiz(clnt, task->tk_msg.rpc_proc) + RPC_SLACK_SPACE;
 
-	if ((task->tk_buffer = rpc_malloc(task, bufsiz)) != NULL)
+	if ((task->tk_buffer = rpc_malloc(task, bufsiz << 1)) != NULL)
 		return;
 	printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task);
@@ -480,11 +476,11 @@ call_encode(struct rpc_task *task)
 
 	/* Default buffer setup */
 	bufsiz = rpcproc_bufsiz(clnt, task->tk_msg.rpc_proc)+RPC_SLACK_SPACE;
-	req->rq_svec[0].iov_base = task->tk_buffer;
+	req->rq_svec[0].iov_base = (void *)task->tk_buffer;
 	req->rq_svec[0].iov_len = bufsiz;
 	req->rq_slen = 0;
 	req->rq_snr = 1;
-	req->rq_rvec[0].iov_base = task->tk_buffer;
+	req->rq_rvec[0].iov_base = (void *)((char *)task->tk_buffer + bufsiz);
 	req->rq_rvec[0].iov_len = bufsiz;
 	req->rq_rlen = bufsiz;
 	req->rq_rnr = 1;
@@ -656,9 +652,11 @@ call_timeout(struct rpc_task *task)
 		if (req)
 			printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
 				clnt->cl_protname, clnt->cl_server);
+#ifdef RPC_DEBUG
 		else
 			printk(KERN_NOTICE "%s: task %d can't get a request slot\n",
 				clnt->cl_protname, task->tk_pid);
+#endif
 	}
 	if (clnt->cl_autobind)
 		clnt->cl_port = 0;
@@ -774,12 +772,13 @@ call_header(struct rpc_task *task)
 {
 	struct rpc_clnt *clnt = task->tk_client;
 	struct rpc_xprt *xprt = clnt->cl_xprt;
-	u32 *p = task->tk_buffer;
+	struct rpc_rqst *req = task->tk_rqstp;
+	u32 *p = req->rq_svec[0].iov_base;
 
 	/* FIXME: check buffer size? */
 	if (xprt->stream)
 		*p++ = 0;			/* fill in later */
-	*p++ = task->tk_rqstp->rq_xid;		/* XID */
+	*p++ = req->rq_xid;			/* XID */
 	*p++ = htonl(RPC_CALL);			/* CALL */
 	*p++ = htonl(RPC_VERSION);		/* RPC version */
 	*p++ = htonl(clnt->cl_prog);		/* program number */
@@ -794,7 +793,7 @@ call_header(struct rpc_task *task)
 static u32 *
 call_verify(struct rpc_task *task)
 {
-	u32 *p = task->tk_buffer, n;
+	u32 *p = task->tk_rqstp->rq_rvec[0].iov_base, n;
 
 	p += 1;	/* skip XID */
 
@@ -860,7 +859,7 @@ garbage:
 	task->tk_client->cl_stats->rpcgarbage++;
 	if (task->tk_garb_retry) {
 		task->tk_garb_retry--;
-		printk(KERN_WARNING "RPC: garbage, retrying %4d\n", task->tk_pid);
+		dprintk(KERN_WARNING "RPC: garbage, retrying %4d\n", task->tk_pid);
 		task->tk_action = call_encode;
 		return NULL;
 	}
diff --git a/net/sunrpc/pmap_clnt.c b/net/sunrpc/pmap_clnt.c
index 026edcd70..45b775103 100644
--- a/net/sunrpc/pmap_clnt.c
+++ b/net/sunrpc/pmap_clnt.c
@@ -31,6 +31,7 @@ static struct rpc_clnt * pmap_create(char *, struct sockaddr_in *, int);
 static void pmap_getport_done(struct rpc_task *);
 extern struct rpc_program pmap_program;
+spinlock_t pmap_lock = SPIN_LOCK_UNLOCKED;
 
 /*
  * Obtain the port for a given RPC service on a given host. This one can
@@ -49,11 +50,14 @@ rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt)
 			task->tk_pid, clnt->cl_server,
 			map->pm_prog, map->pm_vers, map->pm_prot);
 
+	spin_lock(&pmap_lock);
 	if (clnt->cl_binding) {
 		rpc_sleep_on(&clnt->cl_bindwait, task, NULL, 0);
+		spin_unlock(&pmap_lock);
 		return;
 	}
 	clnt->cl_binding = 1;
+	spin_unlock(&pmap_lock);
 
 	task->tk_status = -EACCES; /* why set this? returns -EIO below */
 	if (!(pmap_clnt = pmap_create(clnt->cl_server, sap, map->pm_prot)))
@@ -74,8 +78,10 @@ rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt)
 	return;
 
 bailout:
+	spin_lock(&pmap_lock);
 	clnt->cl_binding = 0;
 	rpc_wake_up(&clnt->cl_bindwait);
+	spin_unlock(&pmap_lock);
 	task->tk_status = -EIO;
 	task->tk_action = NULL;
 }
@@ -129,8 +135,10 @@ pmap_getport_done(struct rpc_task *task)
 		clnt->cl_port = htons(clnt->cl_port);
 		clnt->cl_xprt->addr.sin_port = clnt->cl_port;
 	}
+	spin_lock(&pmap_lock);
 	clnt->cl_binding = 0;
 	rpc_wake_up(&clnt->cl_bindwait);
+	spin_unlock(&pmap_lock);
 }
 
 /*
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index b1e75b87f..9dc2d1247 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -669,8 +669,10 @@ __rpc_schedule(void)
 		if (task->tk_lock) {
 			spin_unlock_bh(&rpc_queue_lock);
 			printk(KERN_ERR "RPC: Locked task was scheduled !!!!\n");
+#ifdef RPC_DEBUG
 			rpc_debug = ~0;
 			rpc_show_tasks();
+#endif
 			break;
 		}
 		__rpc_remove_wait_queue(task);
@@ -778,7 +780,7 @@ rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
 	spin_unlock(&rpc_sched_lock);
 
 	if (clnt)
-		clnt->cl_users++;
+		atomic_inc(&clnt->cl_users);
 
 #ifdef RPC_DEBUG
 	task->tk_magic = 0xf00baa;
@@ -823,8 +825,8 @@ cleanup:
 	/* Check whether to release the client */
 	if (clnt) {
 		printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
-			clnt->cl_users, clnt->cl_oneshot);
-		clnt->cl_users++; /* pretend we were used ... */
+			atomic_read(&clnt->cl_users), clnt->cl_oneshot);
+		atomic_inc(&clnt->cl_users); /* pretend we were used ... */
 		rpc_release_client(clnt);
 	}
 	goto out;
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 385c0f30b..051a643ac 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -273,8 +273,8 @@ svc_process(struct svc_serv *serv, struct svc_rqst *rqstp)
 	if (prog != progp->pg_prog)
 		goto err_bad_prog;
 
-	versp = progp->pg_vers[vers];
-	if (!versp || vers >= progp->pg_nvers)
+	if (vers >= progp->pg_nvers ||
+	    !(versp = progp->pg_vers[vers]))
 		goto err_bad_vers;
 
 	procp = versp->vs_proc + proc;
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index f64653120..e0a13d725 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -301,7 +301,7 @@ svc_recvfrom(struct svc_rqst *rqstp, struct iovec *iov, int nr, int buflen)
 	mm_segment_t oldfs;
 	struct msghdr msg;
 	struct socket *sock;
-	int len;
+	int len, alen;
 
 	rqstp->rq_addrlen = sizeof(rqstp->rq_addr);
 	sock = rqstp->rq_sock->sk_sock;
@@ -319,6 +319,13 @@ svc_recvfrom(struct svc_rqst *rqstp, struct iovec *iov, int nr, int buflen)
 	len = sock_recvmsg(sock, &msg, buflen, MSG_DONTWAIT);
 	set_fs(oldfs);
 
+	/* sock_recvmsg doesn't fill in the name/namelen, so we must..
+	 * possibly we should cache this in the svc_sock structure
+	 * at accept time. FIXME
+	 */
+	alen = sizeof(rqstp->rq_addr);
+	sock->ops->getname(sock, (struct sockaddr *)&rqstp->rq_addr, &alen, 1);
+
 	dprintk("svc: socket %p recvfrom(%p, %Zu) = %d\n",
 		rqstp->rq_sock, iov[0].iov_base, iov[0].iov_len, len);
@@ -539,15 +546,15 @@ svc_tcp_accept(struct svc_sock *svsk)
 	}
 
 	/* Ideally, we would want to reject connections from unauthorized
-	 * hosts here, but we have no generic client tables. For now,
-	 * we just punt connects from unprivileged ports. */
+	 * hosts here, but when we get encription, the IP of the host won't
+	 * tell us anything. For now just warn about unpriv connections.
+	 */
 	if (ntohs(sin.sin_port) >= 1024) {
 		if (net_ratelimit())
 			printk(KERN_WARNING
-				"%s: connect from unprivileged port: %u.%u.%u.%u:%d",
+				"%s: connect from unprivileged port: %u.%u.%u.%u:%d\n",
 				serv->sv_name,
 				NIPQUAD(sin.sin_addr.s_addr),
 				ntohs(sin.sin_port));
-		goto failed;
 	}
 	dprintk("%s: connect from %u.%u.%u.%u:%04x\n", serv->sv_name,
@@ -584,7 +591,7 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
 	struct svc_sock *svsk = rqstp->rq_sock;
 	struct svc_serv *serv = svsk->sk_server;
 	struct svc_buf *bufp = &rqstp->rq_argbuf;
-	int len, ready;
+	int len, ready, used;
 
 	dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
 		svsk, svsk->sk_data, svsk->sk_conn, svsk->sk_close);
@@ -618,6 +625,11 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
 		svsk->sk_reclen = ntohl(svsk->sk_reclen);
 		if (!(svsk->sk_reclen & 0x80000000)) {
+			/* FIXME: technically, a record can be fragmented,
+			 * and non-terminal fragments will not have the top
+			 * bit set in the fragment length header.
+			 * But apparently no known nfs clients send fragmented
+			 * records. */
 			/* FIXME: shutdown socket */
 			printk(KERN_NOTICE "RPC: bad TCP reclen %08lx",
 				(unsigned long) svsk->sk_reclen);
@@ -633,11 +645,21 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
 		goto error;
 
 	if (len < svsk->sk_reclen) {
+		/* FIXME: if sk_reclen > window-size, then we will
+		 * never be able to receive the record, so should
+		 * shutdown the connection
+		 */
 		dprintk("svc: incomplete TCP record (%d of %d)\n",
 			len, svsk->sk_reclen);
 		svc_sock_received(svsk, ready);
 		return -EAGAIN; /* record not complete */
 	}
+	/* if we think there is only one more record to read, but
+	 * it is bigger than we expect, then two records must have arrived
+	 * together, so pretend we aren't using the record.. */
+	if (len > svsk->sk_reclen && ready == 1)
+		used = 0;
+	else	used = 1;
 
 	/* Frob argbuf */
 	bufp->iov[0].iov_base += 4;
@@ -664,7 +686,7 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
 	svsk->sk_reclen = 0;
 	svsk->sk_tcplen = 0;
 
-	svc_sock_received(svsk, 1);
+	svc_sock_received(svsk, used);
 
 	if (serv->sv_stats)
 		serv->sv_stats->nettcpcnt++;
@@ -692,6 +714,7 @@ static int
 svc_tcp_sendto(struct svc_rqst *rqstp)
 {
 	struct svc_buf *bufp = &rqstp->rq_resbuf;
+	int sent;
 
 	/* Set up the first element of the reply iovec.
 	 * Any other iovecs that may be in use have been taken
@@ -701,7 +724,17 @@ svc_tcp_sendto(struct svc_rqst *rqstp)
 	bufp->iov[0].iov_len = bufp->len << 2;
 	bufp->base[0] = htonl(0x80000000|((bufp->len << 2) - 4));
 
-	return svc_sendto(rqstp, bufp->iov, bufp->nriov);
+	sent = svc_sendto(rqstp, bufp->iov, bufp->nriov);
+	if (sent != bufp->len<<2) {
+		printk(KERN_NOTICE "rpc-srv/tcp: %s: sent only %d bytes of %d - should shutdown socket\n",
+			rqstp->rq_sock->sk_server->sv_name,
+			sent, bufp->len << 2);
+		/* FIXME: should shutdown the socket, or allocate more memort
+		 * or wait and try again or something. Otherwise
+		 * client will get confused
+		 */
+	}
+	return sent;
 }
 
 static int
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index b353aa37a..7534288db 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -290,11 +290,12 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
 {
 	unsigned long cwnd = xprt->cwnd;
 
+	spin_lock_bh(&xprt_sock_lock);
 	if (xprt->nocong)
-		return;
+		goto out;
 	if (result >= 0) {
 		if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
-			return;
+			goto out;
 		/* The (cwnd >> 1) term makes sure
 		 * the result gets rounded properly. */
 		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
@@ -317,6 +318,8 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
 	}
 
 	xprt->cwnd = cwnd;
+ out:
+	spin_unlock_bh(&xprt_sock_lock);
 }
 
 /*
@@ -1294,15 +1297,18 @@ xprt_reserve(struct rpc_task *task)
 	dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
 			task->tk_pid, xprt->cong, xprt->cwnd);
 
-	if (!RPCXPRT_CONGESTED(xprt) && xprt->free) {
-		xprt_reserve_status(task);
+	spin_lock_bh(&xprt_sock_lock);
+	xprt_reserve_status(task);
+	if (task->tk_rqstp) {
 		task->tk_timeout = 0;
 	} else if (!task->tk_timeout) {
 		task->tk_status = -ENOBUFS;
 	} else {
 		dprintk("RPC: xprt_reserve waiting on backlog\n");
-		rpc_sleep_on(&xprt->backlog, task, xprt_reserve_status, NULL);
+		task->tk_status = -EAGAIN;
+		rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
 	}
+	spin_unlock_bh(&xprt_sock_lock);
 	dprintk("RPC: %4d xprt_reserve returns %d\n",
 			task->tk_pid, task->tk_status);
 	return task->tk_status;
@@ -1323,25 +1329,20 @@ xprt_reserve_status(struct rpc_task *task)
 		/* NOP */
 	} else if (task->tk_rqstp) {
 		/* We've already been given a request slot: NOP */
-	} else if (!RPCXPRT_CONGESTED(xprt) && xprt->free) {
+	} else {
+		if (RPCXPRT_CONGESTED(xprt) || !(req = xprt->free))
+			goto out_nofree;
 		/* OK: There's room for us. Grab a free slot and bump
 		 * congestion value */
-		spin_lock(&xprt_lock);
-		if (!(req = xprt->free)) {
-			spin_unlock(&xprt_lock);
-			goto out_nofree;
-		}
 		xprt->free = req->rq_next;
 		req->rq_next = NULL;
-		spin_unlock(&xprt_lock);
 		xprt->cong += RPC_CWNDSCALE;
 		task->tk_rqstp = req;
 		xprt_request_init(task, xprt);
 
 		if (xprt->free)
 			xprt_clear_backlog(xprt);
-	} else
-		goto out_nofree;
+	}
 
 	return;
@@ -1388,24 +1389,21 @@ xprt_release(struct rpc_task *task)
 	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
 
-	spin_lock(&xprt_lock);
-	req->rq_next = xprt->free;
-	xprt->free = req;
-
 	/* remove slot from queue of pending */
 	if (task->tk_rpcwait) {
 		printk("RPC: task of released request still queued!\n");
-#ifdef RPC_DEBUG
-		printk("RPC: (task is on %s)\n", rpc_qname(task->tk_rpcwait));
-#endif
 		rpc_remove_wait_queue(task);
 	}
-	spin_unlock(&xprt_lock);
+
+	spin_lock_bh(&xprt_sock_lock);
+	req->rq_next = xprt->free;
+	xprt->free = req;
 
 	/* Decrease congestion value. */
 	xprt->cong -= RPC_CWNDSCALE;
 
 	xprt_clear_backlog(xprt);
+	spin_unlock_bh(&xprt_sock_lock);
 }
 
 /*