diff options
Diffstat (limited to 'net/sunrpc/xprt.c')
-rw-r--r-- | net/sunrpc/xprt.c | 721 |
1 files changed, 398 insertions, 323 deletions
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 610cfc07b..fb09aac28 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -31,12 +31,16 @@ * primitives that `transparently' work for processes as well as async * tasks that rely on callbacks. * - * Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de> + * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> * * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com> * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com> * TCP NFS related read + write fixes * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie> + * + * Rewrite of larges part of the code in order to stabilize TCP stuff. + * Fix behaviour when socket buffer is full. + * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> */ #define __KERNEL_SYSCALLS__ @@ -62,6 +66,8 @@ #include <asm/uaccess.h> #define SOCK_HAS_USER_DATA +/* Following value should be > 32k + RPC overhead */ +#define XPRT_MIN_WRITE_SPACE 35000 /* * Local variables @@ -70,6 +76,9 @@ static struct rpc_xprt * sock_list = NULL; #endif +/* Spinlock for critical sections in the code. */ +spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED; + #ifdef RPC_DEBUG # undef RPC_DEBUG_DATA # define RPCDBG_FACILITY RPCDBG_XPRT @@ -84,11 +93,13 @@ static struct rpc_xprt * sock_list = NULL; * Local functions */ static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); +static void do_xprt_transmit(struct rpc_task *); static void xprt_transmit_status(struct rpc_task *task); +static void xprt_transmit_timeout(struct rpc_task *task); static void xprt_receive_status(struct rpc_task *task); static void xprt_reserve_status(struct rpc_task *task); +static void xprt_disconnect(struct rpc_xprt *); static void xprt_reconn_timeout(struct rpc_task *task); -static void xprt_reconn_status(struct rpc_task *task); static struct socket *xprt_create_socket(int, struct sockaddr_in *, struct rpc_timeout *); @@ -144,39 +155,35 @@ xprt_from_sock(struct sock *sk) * Adjust the iovec to move on 'n' bytes */ -extern inline void xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount) +extern inline void +xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount) { struct iovec *iv=msg->msg_iov; + int i; /* * Eat any sent iovecs */ - - while(iv->iov_len < amount) - { - amount-=iv->iov_len; + while(iv->iov_len <= amount) { + amount -= iv->iov_len; iv++; msg->msg_iovlen--; } - - msg->msg_iov=niv; - + /* * And chew down the partial one */ - niv[0].iov_len = iv->iov_len-amount; niv[0].iov_base =((unsigned char *)iv->iov_base)+amount; iv++; - + /* * And copy any others */ - - for(amount=1;amount<msg->msg_iovlen; amount++) - { - niv[amount]=*iv++; - } + for(i = 1; i < msg->msg_iovlen; i++) + niv[i]=*iv++; + + msg->msg_iov=niv; } /* @@ -184,43 +191,42 @@ extern inline void xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amou */ static inline int -xprt_sendmsg(struct rpc_xprt *xprt) +xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) { struct socket *sock = xprt->sock; struct msghdr msg; mm_segment_t oldfs; int result; + int slen = req->rq_slen - req->rq_bytes_sent; struct iovec niv[MAX_IOVEC]; + if (slen == 0) + return 0; + xprt_pktdump("packet data:", - xprt->snd_buf.io_vec->iov_base, - xprt->snd_buf.io_vec->iov_len); + req->rq_svec->iov_base, + req->rq_svec->iov_len); msg.msg_flags = MSG_DONTWAIT; - msg.msg_iov = xprt->snd_buf.io_vec; - msg.msg_iovlen = xprt->snd_buf.io_nr; + msg.msg_iov = req->rq_svec; + msg.msg_iovlen = req->rq_snr; msg.msg_name = (struct sockaddr *) &xprt->addr; msg.msg_namelen = sizeof(xprt->addr); msg.msg_control = NULL; msg.msg_controllen = 0; /* Dont repeat bytes */ - - if(xprt->snd_sent) - xprt_move_iov(&msg, niv, xprt->snd_sent); - + if (req->rq_bytes_sent) + xprt_move_iov(&msg, niv, req->rq_bytes_sent); + oldfs = get_fs(); set_fs(get_ds()); - result = sock_sendmsg(sock, &msg, xprt->snd_buf.io_len); + result = sock_sendmsg(sock, &msg, slen); set_fs(oldfs); - dprintk("RPC: xprt_sendmsg(%d) = %d\n", - xprt->snd_buf.io_len, result); + dprintk("RPC: xprt_sendmsg(%d) = %d\n", slen, result); - if (result >= 0) { - xprt->snd_buf.io_len -= result; - xprt->snd_sent += result; + if (result >= 0) return result; - } switch (result) { case -ECONNREFUSED: @@ -229,9 +235,14 @@ xprt_sendmsg(struct rpc_xprt *xprt) */ break; case -EAGAIN: - return 0; - case -ENOTCONN: case -EPIPE: + if (sock->flags & SO_NOSPACE) + result = -ENOMEM; + break; + case -ENOTCONN: + case -EPIPE: /* connection broken */ + if (xprt->stream) + result = -ENOTCONN; break; default: printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result); @@ -252,7 +263,6 @@ xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, int len) mm_segment_t oldfs; int result; -#if LINUX_VERSION_CODE >= 0x020100 msg.msg_flags = MSG_DONTWAIT; msg.msg_iov = iov; msg.msg_iovlen = nr; @@ -264,20 +274,6 @@ xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, int len) oldfs = get_fs(); set_fs(get_ds()); result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT); set_fs(oldfs); -#else - int alen = sizeof(sin); - msg.msg_flags = 0; - msg.msg_iov = iov; - msg.msg_iovlen = nr; - msg.msg_name = &sin; - msg.msg_namelen = sizeof(sin); - msg.msg_control = NULL; - msg.msg_controllen = 0; - - oldfs = get_fs(); set_fs(get_ds()); - result = sock->ops->recvmsg(sock, &msg, len, 1, 0, &alen); - set_fs(oldfs); -#endif dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n", iov, len, result); @@ -354,13 +350,15 @@ xprt_close(struct rpc_xprt *xprt) { struct sock *sk = xprt->inet; + xprt_disconnect(xprt); + #ifdef SOCK_HAS_USER_DATA sk->user_data = NULL; #endif sk->data_ready = xprt->old_data_ready; - sk->no_check = 0; sk->state_change = xprt->old_state_change; sk->write_space = xprt->old_write_space; + sk->no_check = 0; sock_release(xprt->sock); /* @@ -378,9 +376,16 @@ static void xprt_disconnect(struct rpc_xprt *xprt) { dprintk("RPC: disconnected transport %p\n", xprt); + xprt->connected = 0; + xprt->tcp_offset = 0; + xprt->tcp_more = 0; + xprt->tcp_total = 0; + xprt->tcp_reclen = 0; + xprt->tcp_copied = 0; + xprt->tcp_rqstp = NULL; + xprt->rx_pending_flag = 0; rpc_wake_up_status(&xprt->pending, -ENOTCONN); rpc_wake_up_status(&xprt->sending, -ENOTCONN); - xprt->connected = 0; } /* @@ -398,22 +403,33 @@ xprt_reconnect(struct rpc_task *task) task->tk_pid, xprt, xprt->connected); task->tk_status = 0; + if (xprt->shutdown) + return; + + if (!xprt->stream) + return; + + start_bh_atomic(); + if (xprt->connected) { + end_bh_atomic(); + return; + } if (xprt->connecting) { task->tk_timeout = xprt->timeout.to_maxval; rpc_sleep_on(&xprt->reconn, task, NULL, NULL); + end_bh_atomic(); return; } xprt->connecting = 1; + end_bh_atomic(); /* Create an unconnected socket */ - if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout))) + if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout))) { + xprt->connecting = 0; goto defer; + } -#if LINUX_VERSION_CODE >= 0x020100 inet = sock->sk; -#else - inet = (struct sock *) sock->data; -#endif inet->data_ready = xprt->inet->data_ready; inet->state_change = xprt->inet->state_change; inet->write_space = xprt->inet->write_space; @@ -422,18 +438,18 @@ xprt_reconnect(struct rpc_task *task) #endif dprintk("RPC: %4d closing old socket\n", task->tk_pid); - xprt_disconnect(xprt); xprt_close(xprt); - /* Reset to new socket and default congestion */ + /* Reset to new socket */ xprt->sock = sock; xprt->inet = inet; - xprt->cwnd = RPC_INITCWND; /* Now connect it asynchronously. */ dprintk("RPC: %4d connecting new socket\n", task->tk_pid); status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr, sizeof(xprt->addr), O_NONBLOCK); + + xprt->connecting = 0; if (status < 0) { if (status != -EINPROGRESS && status != -EALREADY) { printk("RPC: TCP connect error %d!\n", -status); @@ -447,37 +463,19 @@ xprt_reconnect(struct rpc_task *task) start_bh_atomic(); if (!xprt->connected) { rpc_sleep_on(&xprt->reconn, task, - xprt_reconn_status, xprt_reconn_timeout); + NULL, xprt_reconn_timeout); end_bh_atomic(); return; } end_bh_atomic(); } - xprt->connecting = 0; - rpc_wake_up(&xprt->reconn); - return; defer: - task->tk_timeout = 30 * HZ; - rpc_sleep_on(&xprt->reconn, task, NULL, NULL); - xprt->connecting = 0; -} - -/* - * Reconnect status - */ -static void -xprt_reconn_status(struct rpc_task *task) -{ - struct rpc_xprt *xprt = task->tk_xprt; - - dprintk("RPC: %4d xprt_reconn_status %d\n", - task->tk_pid, task->tk_status); - if (!xprt->connected && task->tk_status != -ETIMEDOUT) { - task->tk_timeout = 30 * HZ; - rpc_sleep_on(&xprt->reconn, task, NULL, xprt_reconn_timeout); - } + start_bh_atomic(); + if (!xprt->connected) + rpc_wake_up_next(&xprt->reconn); + end_bh_atomic(); } /* @@ -490,11 +488,19 @@ xprt_reconn_timeout(struct rpc_task *task) dprintk("RPC: %4d xprt_reconn_timeout %d\n", task->tk_pid, task->tk_status); task->tk_status = -ENOTCONN; - task->tk_xprt->connecting = 0; + start_bh_atomic(); + if (task->tk_xprt->connecting) + task->tk_xprt->connecting = 0; + if (!task->tk_xprt->connected) + task->tk_status = -ENOTCONN; + else + task->tk_status = -ETIMEDOUT; + end_bh_atomic(); task->tk_timeout = 0; rpc_wake_up_task(task); } +extern spinlock_t rpc_queue_lock; /* * Look up the RPC request corresponding to a reply. */ @@ -503,22 +509,28 @@ xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) { struct rpc_task *head, *task; struct rpc_rqst *req; + unsigned long oldflags; int safe = 0; + spin_lock_irqsave(&rpc_queue_lock, oldflags); if ((head = xprt->pending.task) != NULL) { task = head; do { if ((req = task->tk_rqstp) && req->rq_xid == xid) - return req; + goto out; task = task->tk_next; if (++safe > 100) { printk("xprt_lookup_rqst: loop in Q!\n"); - return NULL; + goto out_bad; } } while (task != head); } dprintk("RPC: unknown XID %08x in reply.\n", xid); - return NULL; + out_bad: + req = NULL; + out: + spin_unlock_irqrestore(&rpc_queue_lock, oldflags); + return req; } /* @@ -559,11 +571,13 @@ xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied); task->tk_status = copied; - rpc_wake_up_task(task); + if (!RPC_IS_RUNNING(task)) + rpc_wake_up_task(task); return; } -/* We have set things up such that we perform the checksum of the UDP +/* + * We have set things up such that we perform the checksum of the UDP * packet in parallel with the copies into the RPC client iovec. -DaveM */ static int csum_partial_copy_to_page_cache(struct iovec *iov, @@ -609,7 +623,8 @@ static int csum_partial_copy_to_page_cache(struct iovec *iov, return 0; } -/* Input handler for RPC replies. Called from a bottom half and hence +/* + * Input handler for RPC replies. Called from a bottom half and hence * atomic. */ static inline void @@ -621,12 +636,15 @@ udp_data_ready(struct sock *sk, int len) int err, repsize, copied; dprintk("RPC: udp_data_ready...\n"); - if (!(xprt = xprt_from_sock(sk))) + if (!(xprt = xprt_from_sock(sk))) { + printk("RPC: udp_data_ready request not found!\n"); return; + } + dprintk("RPC: udp_data_ready client %p\n", xprt); if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) - return; + goto out_err; repsize = skb->len - sizeof(struct udphdr); if (repsize < 4) { @@ -646,6 +664,7 @@ udp_data_ready(struct sock *sk, int len) if ((copied = rovr->rq_rlen) > repsize) copied = repsize; + rovr->rq_damaged = 1; /* Suck it into the iovec, verify checksum if not done by hw. */ if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied)) goto dropit; @@ -658,6 +677,8 @@ udp_data_ready(struct sock *sk, int len) dropit: skb_free_datagram(sk, skb); return; +out_err: + return; } /* @@ -679,6 +700,7 @@ tcp_input_record(struct rpc_xprt *xprt) int result, maxcpy, reclen, avail, want; dprintk("RPC: tcp_input_record\n"); + offset = xprt->tcp_offset; result = -EAGAIN; if (offset < 4 || (!xprt->tcp_more && offset < 8)) { @@ -687,11 +709,6 @@ tcp_input_record(struct rpc_xprt *xprt) riov.iov_base = xprt->tcp_recm.data + offset; riov.iov_len = want; result = xprt_recvmsg(xprt, &riov, 1, want); - if (!result) - { - dprintk("RPC: empty TCP record.\n"); - return -ENOTCONN; - } if (result < 0) goto done; offset += result; @@ -733,9 +750,9 @@ tcp_input_record(struct rpc_xprt *xprt) dprintk("RPC: %4d TCP receiving %d bytes\n", req->rq_task->tk_pid, want); + /* Request must be re-encoded before retransmit */ + req->rq_damaged = 1; result = xprt_recvmsg(xprt, xprt->tcp_iovec, req->rq_rnr, want); - if (!result && want) - result = -EAGAIN; if (result < 0) goto done; xprt->tcp_copied += result; @@ -754,12 +771,10 @@ tcp_input_record(struct rpc_xprt *xprt) xprt->tcp_copied = 0; xprt->tcp_rqstp = NULL; } - /* Request must be re-encoded before retransmit */ - req->rq_damaged = 1; } /* Skip over any trailing bytes on short reads */ - while (avail) { + while (avail > 0) { static u8 dummy[64]; want = MIN(avail, sizeof(dummy)); @@ -767,8 +782,6 @@ tcp_input_record(struct rpc_xprt *xprt) riov.iov_len = want; dprintk("RPC: TCP skipping %d bytes\n", want); result = xprt_recvmsg(xprt, &riov, 1, want); - if (!result && want) - result=-EAGAIN; if (result < 0) goto done; offset += result; @@ -789,55 +802,40 @@ done: return result; } -static __inline__ void tcp_output_record(struct rpc_xprt *xprt) -{ - if(xprt->snd_sent && xprt->snd_task) - dprintk("RPC: write space\n"); - if(xprt->write_space == 0) - { - xprt->write_space = 1; - if (xprt->snd_task && !RPC_IS_RUNNING(xprt->snd_task)) - { - if(xprt->snd_sent) - dprintk("RPC: Write wakeup snd_sent =%d\n", - xprt->snd_sent); - rpc_wake_up_task(xprt->snd_task); - } - } -} - /* * TCP task queue stuff */ -static struct rpc_xprt *rpc_rx_xprt_pending = NULL; /* Chain by rx_pending of rpc_xprt's */ -static struct rpc_xprt *rpc_tx_xprt_pending = NULL; /* Chain by tx_pending of rpc_xprt's */ +static struct rpc_xprt *rpc_xprt_pending = NULL; /* Chain by rx_pending of rpc_xprt's */ /* * This is protected from tcp_data_ready and the stack as its run * inside of the RPC I/O daemon */ - -void rpciod_tcp_dispatcher(void) +static void +do_rpciod_tcp_dispatcher(void) { struct rpc_xprt *xprt; int result; dprintk("rpciod_tcp_dispatcher: Queue Running\n"); - + /* * Empty each pending socket */ - - while((xprt=rpc_rx_xprt_pending)!=NULL) - { + + while(1) { int safe_retry=0; - - rpc_rx_xprt_pending=xprt->rx_pending; - xprt->rx_pending_flag=0; - + + if ((xprt = rpc_xprt_pending) == NULL) { + break; + } + xprt->rx_pending_flag = 0; + rpc_xprt_pending=xprt->rx_pending; + xprt->rx_pending = NULL; + dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt); - + do { if (safe_retry++ > 50) @@ -845,28 +843,30 @@ void rpciod_tcp_dispatcher(void) result = tcp_input_record(xprt); } while (result >= 0); - + switch (result) { case -EAGAIN: - continue; case -ENOTCONN: case -EPIPE: - xprt_disconnect(xprt); continue; default: printk(KERN_WARNING "RPC: unexpected error %d from tcp_input_record\n", result); } } +} - while((xprt=rpc_tx_xprt_pending)!=NULL) - { - rpc_tx_xprt_pending = xprt->tx_pending; - xprt->tx_pending_flag = 0; - tcp_output_record(xprt); - } +void rpciod_tcp_dispatcher(void) +{ + start_bh_atomic(); + do_rpciod_tcp_dispatcher(); + end_bh_atomic(); } +int xprt_tcp_pending(void) +{ + return rpc_xprt_pending != NULL; +} extern inline void tcp_rpciod_queue(void) { @@ -890,6 +890,7 @@ static void tcp_data_ready(struct sock *sk, int len) printk("Not a socket with xprt %p\n", sk); return; } + dprintk("RPC: tcp_data_ready client %p\n", xprt); dprintk("RPC: state %x conn %d dead %d zapped %d\n", sk->state, xprt->connected, @@ -898,24 +899,16 @@ static void tcp_data_ready(struct sock *sk, int len) * If we are not waiting for the RPC bh run then * we are now */ - if (!xprt->rx_pending_flag) - { - int start_queue=0; + if (!xprt->rx_pending_flag) { + dprintk("RPC: xprt queue %p\n", rpc_xprt_pending); - dprintk("RPC: xprt queue %p\n", rpc_rx_xprt_pending); - if(rpc_rx_xprt_pending==NULL) - start_queue=1; + xprt->rx_pending=rpc_xprt_pending; + rpc_xprt_pending=xprt; xprt->rx_pending_flag=1; - xprt->rx_pending=rpc_rx_xprt_pending; - rpc_rx_xprt_pending=xprt; - if (start_queue) - { - tcp_rpciod_queue(); - start_queue=0; - } - } - else + } else dprintk("RPC: xprt queued already %p\n", xprt); + tcp_rpciod_queue(); + } @@ -931,17 +924,32 @@ tcp_state_change(struct sock *sk) sk->state, xprt->connected, sk->dead, sk->zapped); - if (sk->state == TCP_ESTABLISHED && !xprt->connected) { + switch(sk->state) { + case TCP_ESTABLISHED: + if (xprt->connected) + break; xprt->connected = 1; xprt->connecting = 0; rpc_wake_up(&xprt->reconn); - } else if (sk->zapped) { - rpc_wake_up_status(&xprt->pending, -ENOTCONN); - rpc_wake_up_status(&xprt->sending, -ENOTCONN); + rpc_wake_up_next(&xprt->sending); + tcp_rpciod_queue(); + break; + case TCP_CLOSE: + if (xprt->connecting) + break; + xprt_disconnect(xprt); rpc_wake_up_status(&xprt->reconn, -ENOTCONN); + break; + default: + break; } + } +/* + * The following 2 routines allow a task to sleep while socket memory is + * low. + */ static void tcp_write_space(struct sock *sk) { @@ -949,17 +957,43 @@ tcp_write_space(struct sock *sk) if (!(xprt = xprt_from_sock(sk))) return; - if (!xprt->tx_pending_flag) { - int start_queue = 0; - - if (rpc_tx_xprt_pending == NULL) - start_queue = 1; - xprt->tx_pending_flag = 1; - xprt->tx_pending = rpc_tx_xprt_pending; - rpc_tx_xprt_pending = xprt; - if (start_queue) - tcp_rpciod_queue(); - } + + /* Wait until we have enough socket memory */ + if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE)) + return; + + if (xprt->write_space) + return; + + xprt->write_space = 1; + + if (!xprt->snd_task) + rpc_wake_up_next(&xprt->sending); + else if (!RPC_IS_RUNNING(xprt->snd_task)) + rpc_wake_up_task(xprt->snd_task); +} + +static void +udp_write_space(struct sock *sk) +{ + struct rpc_xprt *xprt; + + if (!(xprt = xprt_from_sock(sk))) + return; + + + /* Wait until we have enough socket memory */ + if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE)) + return; + + if (xprt->write_space) + return; + + xprt->write_space = 1; + if (!xprt->snd_task) + rpc_wake_up_next(&xprt->sending); + else if (!RPC_IS_RUNNING(xprt->snd_task)) + rpc_wake_up_task(xprt->snd_task); } /* @@ -982,32 +1016,50 @@ xprt_timer(struct rpc_task *task) rpc_wake_up_task(task); } + /* - * (Partly) transmit the RPC packet - * Note that task->tk_status is either 0 or negative on return. - * Only when the reply is received will the status be set to a - * positive value. + * Serialize access to sockets, in order to prevent different + * requests from interfering with each other. */ -static inline int -xprt_transmit_some(struct rpc_xprt *xprt, struct rpc_task *task) +static int +xprt_down_transmit(struct rpc_task *task) { + struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; struct rpc_rqst *req = task->tk_rqstp; - int result; - task->tk_status = 0; - if ((result = xprt_sendmsg(xprt)) >= 0) { - if (!xprt->snd_buf.io_len || !xprt->stream) { - rpc_wake_up_next(&xprt->sending); - return req->rq_slen; - } - result = -EAGAIN; - } else if (xprt->stream) { - if (result == -ENOTCONN || result == -EPIPE) { - xprt_disconnect(xprt); - result = -ENOTCONN; - } + start_bh_atomic(); + spin_lock(&xprt_lock); + if (xprt->snd_task && xprt->snd_task != task) { + dprintk("RPC: %4d TCP write queue full (task %d)\n", + task->tk_pid, xprt->snd_task->tk_pid); + task->tk_timeout = req->rq_timeout.to_current; + rpc_sleep_on(&xprt->sending, task, xprt_transmit, NULL); + } else if (!xprt->snd_task) { + xprt->snd_task = task; +#ifdef RPC_PROFILE + req->rq_xtime = jiffies; +#endif + req->rq_bytes_sent = 0; + } + spin_unlock(&xprt_lock); + end_bh_atomic(); + return xprt->snd_task == task; +} + +/* + * Releases the socket for use by other requests. + */ +static void +xprt_up_transmit(struct rpc_task *task) +{ + struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; + + if (xprt->snd_task && xprt->snd_task == task) { + start_bh_atomic(); + xprt->snd_task = NULL; + rpc_wake_up_next(&xprt->sending); + end_bh_atomic(); } - return task->tk_status = result; } /* @@ -1020,71 +1072,65 @@ xprt_transmit(struct rpc_task *task) struct rpc_timeout *timeo; struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; - int status; dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid, *(u32 *)(req->rq_svec[0].iov_base)); - if (xprt->shutdown) { + if (xprt->shutdown) task->tk_status = -EIO; + + if (task->tk_status < 0) return; + + /* Reset timeout parameters */ + timeo = &req->rq_timeout; + if (timeo->to_retries < 0) { + dprintk("RPC: %4d xprt_transmit reset timeo\n", + task->tk_pid); + timeo->to_retries = xprt->timeout.to_retries; + timeo->to_current = timeo->to_initval; } - /* If we're not already in the process of transmitting our call, - * set up everything as needed. */ - if (xprt->snd_task != task) { - /* Write the record marker */ - if (xprt->stream) { - u32 marker; + /* set up everything as needed. */ + /* Write the record marker */ + if (xprt->stream) { + u32 marker; - if (!xprt->connected) { - task->tk_status = -ENOTCONN; - return; - } - marker = htonl(0x80000000|(req->rq_slen-4)); - *((u32 *) req->rq_svec[0].iov_base) = marker; - } + marker = htonl(0x80000000|(req->rq_slen-4)); + *((u32 *) req->rq_svec[0].iov_base) = marker; - /* Reset timeout parameters */ - timeo = &req->rq_timeout; - if (timeo->to_retries < 0) { - dprintk("RPC: %4d xprt_transmit reset timeo\n", - task->tk_pid); - timeo->to_retries = xprt->timeout.to_retries; - timeo->to_current = timeo->to_initval; - } + } -#ifdef RPC_PROFILE - req->rq_xtime = jiffies; -#endif - req->rq_gotit = 0; + if (!xprt_down_transmit(task)) + return; - if (xprt->snd_task) { - dprintk("RPC: %4d TCP write queue full (task %d)\n", - task->tk_pid, xprt->snd_task->tk_pid); - rpc_sleep_on(&xprt->sending, task, - xprt_transmit_status, NULL); - return; - } - xprt->snd_buf = req->rq_snd_buf; - xprt->snd_task = task; - xprt->snd_sent = 0; + do_xprt_transmit(task); +} + +static void +do_xprt_transmit(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + int status, retry = 0; + + if (xprt->shutdown) { + task->tk_status = -EIO; + goto out_release; } /* For fast networks/servers we have to put the request on * the pending list now: */ - start_bh_atomic(); + req->rq_gotit = 0; status = rpc_add_wait_queue(&xprt->pending, task); if (!status) task->tk_callback = NULL; - end_bh_atomic(); - if (status) - { + if (status) { printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); task->tk_status = status; - return; + goto out_release; } /* Continue transmitting the packet/record. We must be careful @@ -1093,27 +1139,55 @@ xprt_transmit(struct rpc_task *task) */ while (1) { xprt->write_space = 0; - if (xprt_transmit_some(xprt, task) != -EAGAIN) { + status = xprt_sendmsg(xprt, req); + + if (status < 0) + break; + + if (xprt->stream) { + req->rq_bytes_sent += status; + + if (req->rq_bytes_sent >= req->rq_slen) + goto out_release; + } + + if (status < req->rq_slen) + status = -EAGAIN; + + if (status >= 0 || !xprt->stream) { dprintk("RPC: %4d xmit complete\n", task->tk_pid); - xprt->snd_task = NULL; - return; + goto out_release; } - /*d*/dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", - task->tk_pid, xprt->snd_buf.io_len, + dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", + task->tk_pid, req->rq_slen - req->rq_bytes_sent, req->rq_slen); - task->tk_status = 0; - start_bh_atomic(); - if (!xprt->write_space) { - /* Remove from pending */ - rpc_remove_wait_queue(task); - rpc_sleep_on(&xprt->sending, task, - xprt_transmit_status, NULL); - end_bh_atomic(); - return; - } + + if (retry++ > 50) + break; + } + + task->tk_status = (status == -ENOMEM) ? -EAGAIN : status; + + /* We don't care if we got a reply, so don't protect + * against bh. */ + if (task->tk_rpcwait == &xprt->pending) + rpc_remove_wait_queue(task); + + /* Protect against (udp|tcp)_write_space */ + start_bh_atomic(); + if (status == -ENOMEM || status == -EAGAIN) { + task->tk_timeout = req->rq_timeout.to_current; + if (!xprt->write_space) + rpc_sleep_on(&xprt->sending, task, xprt_transmit_status, + xprt_transmit_timeout); end_bh_atomic(); + return; } + end_bh_atomic(); + +out_release: + xprt_up_transmit(task); } /* @@ -1126,19 +1200,27 @@ xprt_transmit_status(struct rpc_task *task) struct rpc_xprt *xprt = task->tk_client->cl_xprt; dprintk("RPC: %4d transmit_status %d\n", task->tk_pid, task->tk_status); - if (xprt->snd_task == task) - { - if (task->tk_status < 0) - { - xprt->snd_task = NULL; - xprt_disconnect(xprt); - } - else - xprt_transmit(task); + if (xprt->snd_task == task) { + task->tk_status = 0; + do_xprt_transmit(task); + return; } } /* + * RPC transmit timeout handler. + */ +static void +xprt_transmit_timeout(struct rpc_task *task) +{ + dprintk("RPC: %4d transmit_timeout %d\n", task->tk_pid, task->tk_status); + task->tk_status = -ETIMEDOUT; + task->tk_timeout = 0; + rpc_wake_up_task(task); + xprt_up_transmit(task); +} + +/* * Wait for the reply to our call. * When the callback is invoked, the congestion window should have * been updated already. @@ -1150,25 +1232,33 @@ xprt_receive(struct rpc_task *task) struct rpc_xprt *xprt = req->rq_xprt; dprintk("RPC: %4d xprt_receive\n", task->tk_pid); - if (xprt->connected == 0) { - task->tk_status = -ENOTCONN; - return; - } /* - * Wait until rq_gotit goes non-null, or timeout elapsed. + * Wait until rq_gotit goes non-null, or timeout elapsed. */ task->tk_timeout = req->rq_timeout.to_current; start_bh_atomic(); + if (task->tk_rpcwait) + rpc_remove_wait_queue(task); + + if (task->tk_status < 0 || xprt->shutdown) { + end_bh_atomic(); + goto out; + } + if (!req->rq_gotit) { rpc_sleep_on(&xprt->pending, task, xprt_receive_status, xprt_timer); + end_bh_atomic(); + return; } end_bh_atomic(); dprintk("RPC: %4d xprt_receive returns %d\n", task->tk_pid, task->tk_status); + out: + xprt_receive_status(task); } static void @@ -1176,8 +1266,9 @@ xprt_receive_status(struct rpc_task *task) { struct rpc_xprt *xprt = task->tk_xprt; - if (xprt->stream && xprt->tcp_rqstp == task->tk_rqstp) + if (xprt->tcp_rqstp == task->tk_rqstp) xprt->tcp_rqstp = NULL; + } /* @@ -1194,7 +1285,7 @@ xprt_reserve(struct rpc_task *task) dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n", task->tk_pid, xprt->cong, xprt->cwnd); - if ((!RPCXPRT_CONGESTED(xprt) && xprt->free)) { + if (!RPCXPRT_CONGESTED(xprt) && xprt->free) { xprt_reserve_status(task); task->tk_timeout = 0; } else if (!task->tk_timeout) { @@ -1223,40 +1314,30 @@ xprt_reserve_status(struct rpc_task *task) /* NOP */ } else if (task->tk_rqstp) { /* We've already been given a request slot: NOP */ - } else if (!RPCXPRT_CONGESTED(xprt)) { + } else if (!RPCXPRT_CONGESTED(xprt) && xprt->free) { /* OK: There's room for us. Grab a free slot and bump * congestion value */ - req = xprt->free; - if (!req) - goto bad_list; - if (req->rq_xid) - goto bad_used; + spin_lock(&xprt_lock); + if (!(req = xprt->free)) { + spin_unlock(&xprt_lock); + goto out_nofree; + } xprt->free = req->rq_next; + req->rq_next = NULL; + spin_unlock(&xprt_lock); xprt->cong += RPC_CWNDSCALE; task->tk_rqstp = req; - req->rq_next = NULL; xprt_request_init(task, xprt); - } else { - task->tk_status = -EAGAIN; - } - if (xprt->free && !RPCXPRT_CONGESTED(xprt)) - rpc_wake_up_next(&xprt->backlog); + if (xprt->free) + xprt_clear_backlog(xprt); + } else + goto out_nofree; return; -bad_list: - printk(KERN_ERR - "RPC: %4d inconsistent free list (cong %ld cwnd %ld)\n", - task->tk_pid, xprt->cong, xprt->cwnd); - rpc_debug = ~0; - goto bummer; -bad_used: - printk(KERN_ERR "RPC: used rqst slot %p on free list!\n", req); -bummer: - task->tk_status = -EIO; - xprt->free = NULL; - return; +out_nofree: + task->tk_status = -EAGAIN; } /* @@ -1298,13 +1379,15 @@ xprt_release(struct rpc_task *task) dprintk("RPC: %4d release request %p\n", task->tk_pid, req); + spin_lock(&xprt_lock); + req->rq_next = xprt->free; + xprt->free = req; + spin_unlock(&xprt_lock); + /* remove slot from queue of pending */ start_bh_atomic(); if (task->tk_rpcwait) { printk("RPC: task of released request still queued!\n"); -#ifdef RPC_DEBUG - printk("RPC: (task is on %s)\n", rpc_qname(task->tk_rpcwait)); -#endif rpc_del_timer(task); rpc_remove_wait_queue(task); } @@ -1313,31 +1396,7 @@ xprt_release(struct rpc_task *task) /* Decrease congestion value. */ xprt->cong -= RPC_CWNDSCALE; -#if 0 - /* If congestion threshold is not yet reached, pass on the request slot. - * This looks kind of kludgy, but it guarantees backlogged requests - * are served in order. - * N.B. This doesn't look completely safe, as the task is still - * on the backlog list after wake-up. - */ - if (!RPCXPRT_CONGESTED(xprt)) { - struct rpc_task *next = rpc_wake_up_next(&xprt->backlog); - - if (next && next->tk_rqstp == 0) { - xprt->cong += RPC_CWNDSCALE; - next->tk_rqstp = req; - xprt_request_init(next, xprt); - return; - } - } -#endif - - req->rq_next = xprt->free; - xprt->free = req; - - /* If not congested, wake up the next backlogged process */ - if (!RPCXPRT_CONGESTED(xprt)) - rpc_wake_up_next(&xprt->backlog); + xprt_clear_backlog(xprt); } /* @@ -1382,11 +1441,7 @@ xprt_setup(struct socket *sock, int proto, dprintk("RPC: setting up %s transport...\n", proto == IPPROTO_UDP? "UDP" : "TCP"); -#if LINUX_VERSION_CODE >= 0x020100 inet = sock->sk; -#else - inet = (struct sock *) sock->data; -#endif if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) return NULL; @@ -1398,7 +1453,8 @@ xprt_setup(struct socket *sock, int proto, xprt->addr = *ap; xprt->prot = proto; xprt->stream = (proto == IPPROTO_TCP)? 1 : 0; - xprt->cwnd = RPC_INITCWND; + xprt->congtime = jiffies; + init_waitqueue_head(&xprt->cong_wait); #ifdef SOCK_HAS_USER_DATA inet->user_data = xprt; #else @@ -1410,11 +1466,14 @@ xprt_setup(struct socket *sock, int proto, xprt->old_write_space = inet->write_space; if (proto == IPPROTO_UDP) { inet->data_ready = udp_data_ready; + inet->write_space = udp_write_space; inet->no_check = UDP_CSUM_NORCV; + xprt->cwnd = RPC_INITCWND; } else { inet->data_ready = tcp_data_ready; inet->state_change = tcp_state_change; inet->write_space = tcp_write_space; + xprt->cwnd = RPC_MAXCWND; xprt->nocong = 1; } xprt->connected = 1; @@ -1487,6 +1546,7 @@ xprt_create_socket(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) (proto == IPPROTO_UDP)? "udp" : "tcp", proto); type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM; + if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) { printk("RPC: can't create socket (%d).\n", -err); goto failed; @@ -1543,6 +1603,21 @@ xprt_shutdown(struct rpc_xprt *xprt) rpc_wake_up(&xprt->pending); rpc_wake_up(&xprt->backlog); rpc_wake_up(&xprt->reconn); + wake_up(&xprt->cong_wait); +} + +/* + * Clear the xprt backlog queue + */ +int +xprt_clear_backlog(struct rpc_xprt *xprt) { + if (!xprt) + return 0; + if (RPCXPRT_CONGESTED(xprt)) + return 0; + rpc_wake_up_next(&xprt->backlog); + wake_up(&xprt->cong_wait); + return 1; } /* |