summaryrefslogtreecommitdiffstats
path: root/net/sunrpc
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc')
-rw-r--r--net/sunrpc/clnt.c37
-rw-r--r--net/sunrpc/pmap_clnt.c8
-rw-r--r--net/sunrpc/sched.c8
-rw-r--r--net/sunrpc/svc.c4
-rw-r--r--net/sunrpc/svcsock.c49
-rw-r--r--net/sunrpc/xprt.c42
6 files changed, 94 insertions, 54 deletions
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index c41dfc1eb..ce93ab71c 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -22,7 +22,6 @@
*/
#include <asm/system.h>
-#include <asm/segment.h>
#include <linux/types.h>
#include <linux/mm.h>
@@ -35,7 +34,7 @@
#include <linux/nfs.h>
-#define RPC_SLACK_SPACE 1024 /* total overkill */
+#define RPC_SLACK_SPACE 512 /* total overkill */
#ifdef RPC_DEBUG
# define RPCDBG_FACILITY RPCDBG_CALL
@@ -91,6 +90,7 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname,
if (!clnt)
goto out_no_clnt;
memset(clnt, 0, sizeof(*clnt));
+ atomic_set(&clnt->cl_users, 0);
clnt->cl_xprt = xprt;
clnt->cl_procinfo = version->procs;
@@ -140,16 +140,16 @@ rpc_shutdown_client(struct rpc_clnt *clnt)
{
dprintk("RPC: shutting down %s client for %s\n",
clnt->cl_protname, clnt->cl_server);
- while (clnt->cl_users) {
+ while (atomic_read(&clnt->cl_users)) {
#ifdef RPC_DEBUG
dprintk("RPC: rpc_shutdown_client: client %s, tasks=%d\n",
- clnt->cl_protname, clnt->cl_users);
+ clnt->cl_protname, atomic_read(&clnt->cl_users));
#endif
/* Don't let rpc_release_client destroy us */
clnt->cl_oneshot = 0;
clnt->cl_dead = 0;
rpc_killall_tasks(clnt);
- sleep_on(&destroy_wait);
+ sleep_on_timeout(&destroy_wait, 1*HZ);
}
return rpc_destroy_client(clnt);
}
@@ -182,14 +182,10 @@ void
rpc_release_client(struct rpc_clnt *clnt)
{
dprintk("RPC: rpc_release_client(%p, %d)\n",
- clnt, clnt->cl_users);
- if (clnt->cl_users) {
- if (--(clnt->cl_users) > 0)
- return;
- } else
- printk("rpc_release_client: %s client already free??\n",
- clnt->cl_protname);
+ clnt, atomic_read(&clnt->cl_users));
+ if (!atomic_dec_and_test(&clnt->cl_users))
+ return;
wake_up(&destroy_wait);
if (clnt->cl_oneshot || clnt->cl_dead)
rpc_destroy_client(clnt);
@@ -446,7 +442,7 @@ call_allocate(struct rpc_task *task)
* auth->au_wslack */
bufsiz = rpcproc_bufsiz(clnt, task->tk_msg.rpc_proc) + RPC_SLACK_SPACE;
- if ((task->tk_buffer = rpc_malloc(task, bufsiz)) != NULL)
+ if ((task->tk_buffer = rpc_malloc(task, bufsiz << 1)) != NULL)
return;
printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task);
@@ -480,11 +476,11 @@ call_encode(struct rpc_task *task)
/* Default buffer setup */
bufsiz = rpcproc_bufsiz(clnt, task->tk_msg.rpc_proc)+RPC_SLACK_SPACE;
- req->rq_svec[0].iov_base = task->tk_buffer;
+ req->rq_svec[0].iov_base = (void *)task->tk_buffer;
req->rq_svec[0].iov_len = bufsiz;
req->rq_slen = 0;
req->rq_snr = 1;
- req->rq_rvec[0].iov_base = task->tk_buffer;
+ req->rq_rvec[0].iov_base = (void *)((char *)task->tk_buffer + bufsiz);
req->rq_rvec[0].iov_len = bufsiz;
req->rq_rlen = bufsiz;
req->rq_rnr = 1;
@@ -656,9 +652,11 @@ call_timeout(struct rpc_task *task)
if (req)
printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
clnt->cl_protname, clnt->cl_server);
+#ifdef RPC_DEBUG
else
printk(KERN_NOTICE "%s: task %d can't get a request slot\n",
clnt->cl_protname, task->tk_pid);
+#endif
}
if (clnt->cl_autobind)
clnt->cl_port = 0;
@@ -774,12 +772,13 @@ call_header(struct rpc_task *task)
{
struct rpc_clnt *clnt = task->tk_client;
struct rpc_xprt *xprt = clnt->cl_xprt;
- u32 *p = task->tk_buffer;
+ struct rpc_rqst *req = task->tk_rqstp;
+ u32 *p = req->rq_svec[0].iov_base;
/* FIXME: check buffer size? */
if (xprt->stream)
*p++ = 0; /* fill in later */
- *p++ = task->tk_rqstp->rq_xid; /* XID */
+ *p++ = req->rq_xid; /* XID */
*p++ = htonl(RPC_CALL); /* CALL */
*p++ = htonl(RPC_VERSION); /* RPC version */
*p++ = htonl(clnt->cl_prog); /* program number */
@@ -794,7 +793,7 @@ call_header(struct rpc_task *task)
static u32 *
call_verify(struct rpc_task *task)
{
- u32 *p = task->tk_buffer, n;
+ u32 *p = task->tk_rqstp->rq_rvec[0].iov_base, n;
p += 1; /* skip XID */
@@ -860,7 +859,7 @@ garbage:
task->tk_client->cl_stats->rpcgarbage++;
if (task->tk_garb_retry) {
task->tk_garb_retry--;
- printk(KERN_WARNING "RPC: garbage, retrying %4d\n", task->tk_pid);
+ dprintk(KERN_WARNING "RPC: garbage, retrying %4d\n", task->tk_pid);
task->tk_action = call_encode;
return NULL;
}
diff --git a/net/sunrpc/pmap_clnt.c b/net/sunrpc/pmap_clnt.c
index 026edcd70..45b775103 100644
--- a/net/sunrpc/pmap_clnt.c
+++ b/net/sunrpc/pmap_clnt.c
@@ -31,6 +31,7 @@
static struct rpc_clnt * pmap_create(char *, struct sockaddr_in *, int);
static void pmap_getport_done(struct rpc_task *);
extern struct rpc_program pmap_program;
+spinlock_t pmap_lock = SPIN_LOCK_UNLOCKED;
/*
* Obtain the port for a given RPC service on a given host. This one can
@@ -49,11 +50,14 @@ rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt)
task->tk_pid, clnt->cl_server,
map->pm_prog, map->pm_vers, map->pm_prot);
+ spin_lock(&pmap_lock);
if (clnt->cl_binding) {
rpc_sleep_on(&clnt->cl_bindwait, task, NULL, 0);
+ spin_unlock(&pmap_lock);
return;
}
clnt->cl_binding = 1;
+ spin_unlock(&pmap_lock);
task->tk_status = -EACCES; /* why set this? returns -EIO below */
if (!(pmap_clnt = pmap_create(clnt->cl_server, sap, map->pm_prot)))
@@ -74,8 +78,10 @@ rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt)
return;
bailout:
+ spin_lock(&pmap_lock);
clnt->cl_binding = 0;
rpc_wake_up(&clnt->cl_bindwait);
+ spin_unlock(&pmap_lock);
task->tk_status = -EIO;
task->tk_action = NULL;
}
@@ -129,8 +135,10 @@ pmap_getport_done(struct rpc_task *task)
clnt->cl_port = htons(clnt->cl_port);
clnt->cl_xprt->addr.sin_port = clnt->cl_port;
}
+ spin_lock(&pmap_lock);
clnt->cl_binding = 0;
rpc_wake_up(&clnt->cl_bindwait);
+ spin_unlock(&pmap_lock);
}
/*
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index b1e75b87f..9dc2d1247 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -669,8 +669,10 @@ __rpc_schedule(void)
if (task->tk_lock) {
spin_unlock_bh(&rpc_queue_lock);
printk(KERN_ERR "RPC: Locked task was scheduled !!!!\n");
+#ifdef RPC_DEBUG
rpc_debug = ~0;
rpc_show_tasks();
+#endif
break;
}
__rpc_remove_wait_queue(task);
@@ -778,7 +780,7 @@ rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
spin_unlock(&rpc_sched_lock);
if (clnt)
- clnt->cl_users++;
+ atomic_inc(&clnt->cl_users);
#ifdef RPC_DEBUG
task->tk_magic = 0xf00baa;
@@ -823,8 +825,8 @@ cleanup:
/* Check whether to release the client */
if (clnt) {
printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
- clnt->cl_users, clnt->cl_oneshot);
- clnt->cl_users++; /* pretend we were used ... */
+ atomic_read(&clnt->cl_users), clnt->cl_oneshot);
+ atomic_inc(&clnt->cl_users); /* pretend we were used ... */
rpc_release_client(clnt);
}
goto out;
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 385c0f30b..051a643ac 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -273,8 +273,8 @@ svc_process(struct svc_serv *serv, struct svc_rqst *rqstp)
if (prog != progp->pg_prog)
goto err_bad_prog;
- versp = progp->pg_vers[vers];
- if (!versp || vers >= progp->pg_nvers)
+ if (vers >= progp->pg_nvers ||
+ !(versp = progp->pg_vers[vers]))
goto err_bad_vers;
procp = versp->vs_proc + proc;
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index f64653120..e0a13d725 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -301,7 +301,7 @@ svc_recvfrom(struct svc_rqst *rqstp, struct iovec *iov, int nr, int buflen)
mm_segment_t oldfs;
struct msghdr msg;
struct socket *sock;
- int len;
+ int len, alen;
rqstp->rq_addrlen = sizeof(rqstp->rq_addr);
sock = rqstp->rq_sock->sk_sock;
@@ -319,6 +319,13 @@ svc_recvfrom(struct svc_rqst *rqstp, struct iovec *iov, int nr, int buflen)
len = sock_recvmsg(sock, &msg, buflen, MSG_DONTWAIT);
set_fs(oldfs);
+ /* sock_recvmsg doesn't fill in the name/namelen, so we must..
+ * possibly we should cache this in the svc_sock structure
+ * at accept time. FIXME
+ */
+ alen = sizeof(rqstp->rq_addr);
+ sock->ops->getname(sock, (struct sockaddr *)&rqstp->rq_addr, &alen, 1);
+
dprintk("svc: socket %p recvfrom(%p, %Zu) = %d\n",
rqstp->rq_sock, iov[0].iov_base, iov[0].iov_len, len);
@@ -539,15 +546,15 @@ svc_tcp_accept(struct svc_sock *svsk)
}
/* Ideally, we would want to reject connections from unauthorized
- * hosts here, but we have no generic client tables. For now,
- * we just punt connects from unprivileged ports. */
+ * hosts here, but when we get encription, the IP of the host won't
+ * tell us anything. For now just warn about unpriv connections.
+ */
if (ntohs(sin.sin_port) >= 1024) {
if (net_ratelimit())
printk(KERN_WARNING
- "%s: connect from unprivileged port: %u.%u.%u.%u:%d",
+ "%s: connect from unprivileged port: %u.%u.%u.%u:%d\n",
serv->sv_name,
NIPQUAD(sin.sin_addr.s_addr), ntohs(sin.sin_port));
- goto failed;
}
dprintk("%s: connect from %u.%u.%u.%u:%04x\n", serv->sv_name,
@@ -584,7 +591,7 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
struct svc_sock *svsk = rqstp->rq_sock;
struct svc_serv *serv = svsk->sk_server;
struct svc_buf *bufp = &rqstp->rq_argbuf;
- int len, ready;
+ int len, ready, used;
dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
svsk, svsk->sk_data, svsk->sk_conn, svsk->sk_close);
@@ -618,6 +625,11 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
svsk->sk_reclen = ntohl(svsk->sk_reclen);
if (!(svsk->sk_reclen & 0x80000000)) {
+ /* FIXME: technically, a record can be fragmented,
+ * and non-terminal fragments will not have the top
+ * bit set in the fragment length header.
+ * But apparently no known nfs clients send fragmented
+ * records. */
/* FIXME: shutdown socket */
printk(KERN_NOTICE "RPC: bad TCP reclen %08lx",
(unsigned long) svsk->sk_reclen);
@@ -633,11 +645,21 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
goto error;
if (len < svsk->sk_reclen) {
+ /* FIXME: if sk_reclen > window-size, then we will
+ * never be able to receive the record, so should
+ * shutdown the connection
+ */
dprintk("svc: incomplete TCP record (%d of %d)\n",
len, svsk->sk_reclen);
svc_sock_received(svsk, ready);
return -EAGAIN; /* record not complete */
}
+ /* if we think there is only one more record to read, but
+ * it is bigger than we expect, then two records must have arrived
+ * together, so pretend we aren't using the record.. */
+ if (len > svsk->sk_reclen && ready == 1)
+ used = 0;
+ else used = 1;
/* Frob argbuf */
bufp->iov[0].iov_base += 4;
@@ -664,7 +686,7 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
svsk->sk_reclen = 0;
svsk->sk_tcplen = 0;
- svc_sock_received(svsk, 1);
+ svc_sock_received(svsk, used);
if (serv->sv_stats)
serv->sv_stats->nettcpcnt++;
@@ -692,6 +714,7 @@ static int
svc_tcp_sendto(struct svc_rqst *rqstp)
{
struct svc_buf *bufp = &rqstp->rq_resbuf;
+ int sent;
/* Set up the first element of the reply iovec.
* Any other iovecs that may be in use have been taken
@@ -701,7 +724,17 @@ svc_tcp_sendto(struct svc_rqst *rqstp)
bufp->iov[0].iov_len = bufp->len << 2;
bufp->base[0] = htonl(0x80000000|((bufp->len << 2) - 4));
- return svc_sendto(rqstp, bufp->iov, bufp->nriov);
+ sent = svc_sendto(rqstp, bufp->iov, bufp->nriov);
+ if (sent != bufp->len<<2) {
+ printk(KERN_NOTICE "rpc-srv/tcp: %s: sent only %d bytes of %d - should shutdown socket\n",
+ rqstp->rq_sock->sk_server->sv_name,
+ sent, bufp->len << 2);
+ /* FIXME: should shutdown the socket, or allocate more memort
+ * or wait and try again or something. Otherwise
+ * client will get confused
+ */
+ }
+ return sent;
}
static int
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index b353aa37a..7534288db 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -290,11 +290,12 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
unsigned long cwnd = xprt->cwnd;
+ spin_lock_bh(&xprt_sock_lock);
if (xprt->nocong)
- return;
+ goto out;
if (result >= 0) {
if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
- return;
+ goto out;
/* The (cwnd >> 1) term makes sure
* the result gets rounded properly. */
cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
@@ -317,6 +318,8 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
}
xprt->cwnd = cwnd;
+ out:
+ spin_unlock_bh(&xprt_sock_lock);
}
/*
@@ -1294,15 +1297,18 @@ xprt_reserve(struct rpc_task *task)
dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
task->tk_pid, xprt->cong, xprt->cwnd);
- if (!RPCXPRT_CONGESTED(xprt) && xprt->free) {
- xprt_reserve_status(task);
+ spin_lock_bh(&xprt_sock_lock);
+ xprt_reserve_status(task);
+ if (task->tk_rqstp) {
task->tk_timeout = 0;
} else if (!task->tk_timeout) {
task->tk_status = -ENOBUFS;
} else {
dprintk("RPC: xprt_reserve waiting on backlog\n");
- rpc_sleep_on(&xprt->backlog, task, xprt_reserve_status, NULL);
+ task->tk_status = -EAGAIN;
+ rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}
+ spin_unlock_bh(&xprt_sock_lock);
dprintk("RPC: %4d xprt_reserve returns %d\n",
task->tk_pid, task->tk_status);
return task->tk_status;
@@ -1323,25 +1329,20 @@ xprt_reserve_status(struct rpc_task *task)
/* NOP */
} else if (task->tk_rqstp) {
/* We've already been given a request slot: NOP */
- } else if (!RPCXPRT_CONGESTED(xprt) && xprt->free) {
+ } else {
+ if (RPCXPRT_CONGESTED(xprt) || !(req = xprt->free))
+ goto out_nofree;
/* OK: There's room for us. Grab a free slot and bump
* congestion value */
- spin_lock(&xprt_lock);
- if (!(req = xprt->free)) {
- spin_unlock(&xprt_lock);
- goto out_nofree;
- }
xprt->free = req->rq_next;
req->rq_next = NULL;
- spin_unlock(&xprt_lock);
xprt->cong += RPC_CWNDSCALE;
task->tk_rqstp = req;
xprt_request_init(task, xprt);
if (xprt->free)
xprt_clear_backlog(xprt);
- } else
- goto out_nofree;
+ }
return;
@@ -1388,24 +1389,21 @@ xprt_release(struct rpc_task *task)
dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
- spin_lock(&xprt_lock);
- req->rq_next = xprt->free;
- xprt->free = req;
-
/* remove slot from queue of pending */
if (task->tk_rpcwait) {
printk("RPC: task of released request still queued!\n");
-#ifdef RPC_DEBUG
- printk("RPC: (task is on %s)\n", rpc_qname(task->tk_rpcwait));
-#endif
rpc_remove_wait_queue(task);
}
- spin_unlock(&xprt_lock);
+
+ spin_lock_bh(&xprt_sock_lock);
+ req->rq_next = xprt->free;
+ xprt->free = req;
/* Decrease congestion value. */
xprt->cong -= RPC_CWNDSCALE;
xprt_clear_backlog(xprt);
+ spin_unlock_bh(&xprt_sock_lock);
}
/*