]> git.kernelconcepts.de Git - karo-tx-linux.git/blobdiff - net/sunrpc/xprtrdma/verbs.c
Merge remote-tracking branch 'rdma/for-next'
[karo-tx-linux.git] / net / sunrpc / xprtrdma / verbs.c
index 6c06ba088feac757812cd84309b84b45be75e7bf..eadd1655145a3bc5b81bdefb7015792fb3be566a 100644 (file)
  * internal functions
  */
 
-/*
- * handle replies in tasklet context, using a single, global list
- * rdma tasklet function -- just turn around and call the func
- * for all replies on the list
- */
-
-static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
-static LIST_HEAD(rpcrdma_tasklets_g);
+static struct workqueue_struct *rpcrdma_receive_wq;
 
-static void
-rpcrdma_run_tasklet(unsigned long data)
+int
+rpcrdma_alloc_wq(void)
 {
-       struct rpcrdma_rep *rep;
-       unsigned long flags;
-
-       data = data;
-       spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
-       while (!list_empty(&rpcrdma_tasklets_g)) {
-               rep = list_entry(rpcrdma_tasklets_g.next,
-                                struct rpcrdma_rep, rr_list);
-               list_del(&rep->rr_list);
-               spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+       struct workqueue_struct *recv_wq;
 
-               rpcrdma_reply_handler(rep);
+       recv_wq = alloc_workqueue("xprtrdma_receive",
+                                 WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
+                                 0);
+       if (!recv_wq)
+               return -ENOMEM;
 
-               spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
-       }
-       spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+       rpcrdma_receive_wq = recv_wq;
+       return 0;
 }
 
-static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
-
-static void
-rpcrdma_schedule_tasklet(struct list_head *sched_list)
+void
+rpcrdma_destroy_wq(void)
 {
-       unsigned long flags;
+       struct workqueue_struct *wq;
 
-       spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
-       list_splice_tail(sched_list, &rpcrdma_tasklets_g);
-       spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
-       tasklet_schedule(&rpcrdma_tasklet_g);
+       if (rpcrdma_receive_wq) {
+               wq = rpcrdma_receive_wq;
+               rpcrdma_receive_wq = NULL;
+               destroy_workqueue(wq);
+       }
 }
 
 static void
@@ -158,63 +144,54 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
        }
 }
 
-static int
-rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+/* The common case is a single send completion is waiting. By
+ * passing two WC entries to ib_poll_cq, a return code of 1
+ * means there is exactly one WC waiting and no more. We don't
+ * have to invoke ib_poll_cq again to know that the CQ has been
+ * properly drained.
+ */
+static void
+rpcrdma_sendcq_poll(struct ib_cq *cq)
 {
-       struct ib_wc *wcs;
-       int budget, count, rc;
+       struct ib_wc *pos, wcs[2];
+       int count, rc;
 
-       budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
        do {
-               wcs = ep->rep_send_wcs;
+               pos = wcs;
 
-               rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
-               if (rc <= 0)
-                       return rc;
+               rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+               if (rc < 0)
+                       break;
 
                count = rc;
                while (count-- > 0)
-                       rpcrdma_sendcq_process_wc(wcs++);
-       } while (rc == RPCRDMA_POLLSIZE && --budget);
-       return 0;
+                       rpcrdma_sendcq_process_wc(pos++);
+       } while (rc == ARRAY_SIZE(wcs));
+       return;
 }
 
-/*
- * Handle send, fast_reg_mr, and local_inv completions.
- *
- * Send events are typically suppressed and thus do not result
- * in an upcall. Occasionally one is signaled, however. This
- * prevents the provider's completion queue from wrapping and
- * losing a completion.
+/* Handle provider send completion upcalls.
  */
 static void
 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
 {
-       struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
-       int rc;
-
-       rc = rpcrdma_sendcq_poll(cq, ep);
-       if (rc) {
-               dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
-                       __func__, rc);
-               return;
-       }
+       do {
+               rpcrdma_sendcq_poll(cq);
+       } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
+                                 IB_CQ_REPORT_MISSED_EVENTS) > 0);
+}
 
-       rc = ib_req_notify_cq(cq,
-                       IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
-       if (rc == 0)
-               return;
-       if (rc < 0) {
-               dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-                       __func__, rc);
-               return;
-       }
+static void
+rpcrdma_receive_worker(struct work_struct *work)
+{
+       struct rpcrdma_rep *rep =
+                       container_of(work, struct rpcrdma_rep, rr_work);
 
-       rpcrdma_sendcq_poll(cq, ep);
+       rpcrdma_reply_handler(rep);
 }
 
 static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
+rpcrdma_recvcq_process_wc(struct ib_wc *wc)
 {
        struct rpcrdma_rep *rep =
                        (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
@@ -237,91 +214,60 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
        prefetch(rdmab_to_msg(rep->rr_rdmabuf));
 
 out_schedule:
-       list_add_tail(&rep->rr_list, sched_list);
+       queue_work(rpcrdma_receive_wq, &rep->rr_work);
        return;
+
 out_fail:
        if (wc->status != IB_WC_WR_FLUSH_ERR)
                pr_err("RPC:       %s: rep %p: %s\n",
                       __func__, rep, ib_wc_status_msg(wc->status));
-       rep->rr_len = ~0U;
+       rep->rr_len = RPCRDMA_BAD_LEN;
        goto out_schedule;
 }
 
-static int
-rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+/* The wc array is on stack: automatic memory is always CPU-local.
+ *
+ * struct ib_wc is 64 bytes, making the poll array potentially
+ * large. But this is at the bottom of the call chain. Further
+ * substantial work is done in another thread.
+ */
+static void
+rpcrdma_recvcq_poll(struct ib_cq *cq)
 {
-       struct list_head sched_list;
-       struct ib_wc *wcs;
-       int budget, count, rc;
+       struct ib_wc *pos, wcs[4];
+       int count, rc;
 
-       INIT_LIST_HEAD(&sched_list);
-       budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
        do {
-               wcs = ep->rep_recv_wcs;
+               pos = wcs;
 
-               rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
-               if (rc <= 0)
-                       goto out_schedule;
+               rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+               if (rc < 0)
+                       break;
 
                count = rc;
                while (count-- > 0)
-                       rpcrdma_recvcq_process_wc(wcs++, &sched_list);
-       } while (rc == RPCRDMA_POLLSIZE && --budget);
-       rc = 0;
-
-out_schedule:
-       rpcrdma_schedule_tasklet(&sched_list);
-       return rc;
+                       rpcrdma_recvcq_process_wc(pos++);
+       } while (rc == ARRAY_SIZE(wcs));
 }
 
-/*
- * Handle receive completions.
- *
- * It is reentrant but processes single events in order to maintain
- * ordering of receives to keep server credits.
- *
- * It is the responsibility of the scheduled tasklet to return
- * recv buffers to the pool. NOTE: this affects synchronization of
- * connection shutdown. That is, the structures required for
- * the completion of the reply handler must remain intact until
- * all memory has been reclaimed.
+/* Handle provider receive completion upcalls.
  */
 static void
 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
 {
-       struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
-       int rc;
-
-       rc = rpcrdma_recvcq_poll(cq, ep);
-       if (rc) {
-               dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
-                       __func__, rc);
-               return;
-       }
-
-       rc = ib_req_notify_cq(cq,
-                       IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
-       if (rc == 0)
-               return;
-       if (rc < 0) {
-               dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-                       __func__, rc);
-               return;
-       }
-
-       rpcrdma_recvcq_poll(cq, ep);
+       do {
+               rpcrdma_recvcq_poll(cq);
+       } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
+                                 IB_CQ_REPORT_MISSED_EVENTS) > 0);
 }
 
 static void
 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 {
        struct ib_wc wc;
-       LIST_HEAD(sched_list);
 
        while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-               rpcrdma_recvcq_process_wc(&wc, &sched_list);
-       if (!list_empty(&sched_list))
-               rpcrdma_schedule_tasklet(&sched_list);
+               rpcrdma_recvcq_process_wc(&wc);
        while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
                rpcrdma_sendcq_process_wc(&wc);
 }
@@ -623,6 +569,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
        struct ib_device_attr *devattr = &ia->ri_devattr;
        struct ib_cq *sendcq, *recvcq;
        struct ib_cq_init_attr cq_attr = {};
+       unsigned int max_qp_wr;
        int rc, err;
 
        if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
@@ -631,18 +578,27 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                return -ENOMEM;
        }
 
+       if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
+               dprintk("RPC:       %s: insufficient wqe's available\n",
+                       __func__);
+               return -ENOMEM;
+       }
+       max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS;
+
        /* check provider's send/recv wr limits */
-       if (cdata->max_requests > devattr->max_qp_wr)
-               cdata->max_requests = devattr->max_qp_wr;
+       if (cdata->max_requests > max_qp_wr)
+               cdata->max_requests = max_qp_wr;
 
        ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
        ep->rep_attr.qp_context = ep;
        ep->rep_attr.srq = NULL;
        ep->rep_attr.cap.max_send_wr = cdata->max_requests;
+       ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
        rc = ia->ri_ops->ro_open(ia, ep, cdata);
        if (rc)
                return rc;
        ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+       ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
        ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
        ep->rep_attr.cap.max_recv_sge = 1;
        ep->rep_attr.cap.max_inline_data = 0;
@@ -670,7 +626,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 
        cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
        sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
-                             rpcrdma_cq_async_error_upcall, ep, &cq_attr);
+                             rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
        if (IS_ERR(sendcq)) {
                rc = PTR_ERR(sendcq);
                dprintk("RPC:       %s: failed to create send CQ: %i\n",
@@ -687,7 +643,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 
        cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
        recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
-                             rpcrdma_cq_async_error_upcall, ep, &cq_attr);
+                             rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
        if (IS_ERR(recvcq)) {
                rc = PTR_ERR(recvcq);
                dprintk("RPC:       %s: failed to create recv CQ: %i\n",
@@ -754,19 +710,22 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 
        cancel_delayed_work_sync(&ep->rep_connect_worker);
 
-       if (ia->ri_id->qp) {
+       if (ia->ri_id->qp)
                rpcrdma_ep_disconnect(ep, ia);
+
+       rpcrdma_clean_cq(ep->rep_attr.recv_cq);
+       rpcrdma_clean_cq(ep->rep_attr.send_cq);
+
+       if (ia->ri_id->qp) {
                rdma_destroy_qp(ia->ri_id);
                ia->ri_id->qp = NULL;
        }
 
-       rpcrdma_clean_cq(ep->rep_attr.recv_cq);
        rc = ib_destroy_cq(ep->rep_attr.recv_cq);
        if (rc)
                dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
                        __func__, rc);
 
-       rpcrdma_clean_cq(ep->rep_attr.send_cq);
        rc = ib_destroy_cq(ep->rep_attr.send_cq);
        if (rc)
                dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
@@ -883,7 +842,21 @@ retry:
                }
                rc = ep->rep_connected;
        } else {
+               struct rpcrdma_xprt *r_xprt;
+               unsigned int extras;
+
                dprintk("RPC:       %s: connected\n", __func__);
+
+               r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
+               extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
+
+               if (extras) {
+                       rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
+                       if (rc)
+                               pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
+                                       __func__, rc);
+                               rc = 0;
+               }
        }
 
 out:
@@ -920,20 +893,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
        }
 }
 
-static struct rpcrdma_req *
+struct rpcrdma_req *
 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 {
+       struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
        struct rpcrdma_req *req;
 
        req = kzalloc(sizeof(*req), GFP_KERNEL);
        if (req == NULL)
                return ERR_PTR(-ENOMEM);
 
+       INIT_LIST_HEAD(&req->rl_free);
+       spin_lock(&buffer->rb_reqslock);
+       list_add(&req->rl_all, &buffer->rb_allreqs);
+       spin_unlock(&buffer->rb_reqslock);
        req->rl_buffer = &r_xprt->rx_buf;
        return req;
 }
 
-static struct rpcrdma_rep *
+struct rpcrdma_rep *
 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 {
        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
@@ -955,6 +933,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 
        rep->rr_device = ia->ri_device;
        rep->rr_rxprt = r_xprt;
+       INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
        return rep;
 
 out_free:
@@ -968,44 +947,21 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 {
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-       struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
-       char *p;
-       size_t len;
        int i, rc;
 
-       buf->rb_max_requests = cdata->max_requests;
+       buf->rb_max_requests = r_xprt->rx_data.max_requests;
+       buf->rb_bc_srv_max_requests = 0;
        spin_lock_init(&buf->rb_lock);
 
-       /* Need to allocate:
-        *   1.  arrays for send and recv pointers
-        *   2.  arrays of struct rpcrdma_req to fill in pointers
-        *   3.  array of struct rpcrdma_rep for replies
-        * Send/recv buffers in req/rep need to be registered
-        */
-       len = buf->rb_max_requests *
-               (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
-
-       p = kzalloc(len, GFP_KERNEL);
-       if (p == NULL) {
-               dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
-                       __func__, len);
-               rc = -ENOMEM;
-               goto out;
-       }
-       buf->rb_pool = p;       /* for freeing it later */
-
-       buf->rb_send_bufs = (struct rpcrdma_req **) p;
-       p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
-       buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
-       p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
-
        rc = ia->ri_ops->ro_init(r_xprt);
        if (rc)
                goto out;
 
+       INIT_LIST_HEAD(&buf->rb_send_bufs);
+       INIT_LIST_HEAD(&buf->rb_allreqs);
+       spin_lock_init(&buf->rb_reqslock);
        for (i = 0; i < buf->rb_max_requests; i++) {
                struct rpcrdma_req *req;
-               struct rpcrdma_rep *rep;
 
                req = rpcrdma_create_req(r_xprt);
                if (IS_ERR(req)) {
@@ -1014,7 +970,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
                        rc = PTR_ERR(req);
                        goto out;
                }
-               buf->rb_send_bufs[i] = req;
+               req->rl_backchannel = false;
+               list_add(&req->rl_free, &buf->rb_send_bufs);
+       }
+
+       INIT_LIST_HEAD(&buf->rb_recv_bufs);
+       for (i = 0; i < buf->rb_max_requests + 2; i++) {
+               struct rpcrdma_rep *rep;
 
                rep = rpcrdma_create_rep(r_xprt);
                if (IS_ERR(rep)) {
@@ -1023,7 +985,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
                        rc = PTR_ERR(rep);
                        goto out;
                }
-               buf->rb_recv_bufs[i] = rep;
+               list_add(&rep->rr_list, &buf->rb_recv_bufs);
        }
 
        return 0;
@@ -1032,22 +994,38 @@ out:
        return rc;
 }
 
+static struct rpcrdma_req *
+rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
+{
+       struct rpcrdma_req *req;
+
+       req = list_first_entry(&buf->rb_send_bufs,
+                              struct rpcrdma_req, rl_free);
+       list_del(&req->rl_free);
+       return req;
+}
+
+static struct rpcrdma_rep *
+rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
+{
+       struct rpcrdma_rep *rep;
+
+       rep = list_first_entry(&buf->rb_recv_bufs,
+                              struct rpcrdma_rep, rr_list);
+       list_del(&rep->rr_list);
+       return rep;
+}
+
 static void
 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
 {
-       if (!rep)
-               return;
-
        rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
        kfree(rep);
 }
 
-static void
+void
 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 {
-       if (!req)
-               return;
-
        rpcrdma_free_regbuf(ia, req->rl_sendbuf);
        rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
        kfree(req);
@@ -1057,25 +1035,29 @@ void
 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
        struct rpcrdma_ia *ia = rdmab_to_ia(buf);
-       int i;
 
-       /* clean up in reverse order from create
-        *   1.  recv mr memory (mr free, then kfree)
-        *   2.  send mr memory (mr free, then kfree)
-        *   3.  MWs
-        */
-       dprintk("RPC:       %s: entering\n", __func__);
+       while (!list_empty(&buf->rb_recv_bufs)) {
+               struct rpcrdma_rep *rep;
 
-       for (i = 0; i < buf->rb_max_requests; i++) {
-               if (buf->rb_recv_bufs)
-                       rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
-               if (buf->rb_send_bufs)
-                       rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
+               rep = rpcrdma_buffer_get_rep_locked(buf);
+               rpcrdma_destroy_rep(ia, rep);
        }
 
-       ia->ri_ops->ro_destroy(buf);
+       spin_lock(&buf->rb_reqslock);
+       while (!list_empty(&buf->rb_allreqs)) {
+               struct rpcrdma_req *req;
 
-       kfree(buf->rb_pool);
+               req = list_first_entry(&buf->rb_allreqs,
+                                      struct rpcrdma_req, rl_all);
+               list_del(&req->rl_all);
+
+               spin_unlock(&buf->rb_reqslock);
+               rpcrdma_destroy_req(ia, req);
+               spin_lock(&buf->rb_reqslock);
+       }
+       spin_unlock(&buf->rb_reqslock);
+
+       ia->ri_ops->ro_destroy(buf);
 }
 
 struct rpcrdma_mw *
@@ -1107,53 +1089,34 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
        spin_unlock(&buf->rb_mwlock);
 }
 
-static void
-rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
-{
-       buf->rb_send_bufs[--buf->rb_send_index] = req;
-       req->rl_niovs = 0;
-       if (req->rl_reply) {
-               buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
-               req->rl_reply = NULL;
-       }
-}
-
 /*
  * Get a set of request/reply buffers.
  *
- * Reply buffer (if needed) is attached to send buffer upon return.
- * Rule:
- *    rb_send_index and rb_recv_index MUST always be pointing to the
- *    *next* available buffer (non-NULL). They are incremented after
- *    removing buffers, and decremented *before* returning them.
+ * Reply buffer (if available) is attached to send buffer upon return.
  */
 struct rpcrdma_req *
 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
 {
        struct rpcrdma_req *req;
-       unsigned long flags;
-
-       spin_lock_irqsave(&buffers->rb_lock, flags);
-
-       if (buffers->rb_send_index == buffers->rb_max_requests) {
-               spin_unlock_irqrestore(&buffers->rb_lock, flags);
-               dprintk("RPC:       %s: out of request buffers\n", __func__);
-               return ((struct rpcrdma_req *)NULL);
-       }
 
-       req = buffers->rb_send_bufs[buffers->rb_send_index];
-       if (buffers->rb_send_index < buffers->rb_recv_index) {
-               dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
-                       __func__,
-                       buffers->rb_recv_index - buffers->rb_send_index);
-               req->rl_reply = NULL;
-       } else {
-               req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
-               buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
-       }
-       buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
+       spin_lock(&buffers->rb_lock);
+       if (list_empty(&buffers->rb_send_bufs))
+               goto out_reqbuf;
+       req = rpcrdma_buffer_get_req_locked(buffers);
+       if (list_empty(&buffers->rb_recv_bufs))
+               goto out_repbuf;
+       req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+       spin_unlock(&buffers->rb_lock);
+       return req;
 
-       spin_unlock_irqrestore(&buffers->rb_lock, flags);
+out_reqbuf:
+       spin_unlock(&buffers->rb_lock);
+       pr_warn("RPC:       %s: out of request buffers\n", __func__);
+       return NULL;
+out_repbuf:
+       spin_unlock(&buffers->rb_lock);
+       pr_warn("RPC:       %s: out of reply buffers\n", __func__);
+       req->rl_reply = NULL;
        return req;
 }
 
@@ -1165,30 +1128,31 @@ void
 rpcrdma_buffer_put(struct rpcrdma_req *req)
 {
        struct rpcrdma_buffer *buffers = req->rl_buffer;
-       unsigned long flags;
+       struct rpcrdma_rep *rep = req->rl_reply;
 
-       spin_lock_irqsave(&buffers->rb_lock, flags);
-       rpcrdma_buffer_put_sendbuf(req, buffers);
-       spin_unlock_irqrestore(&buffers->rb_lock, flags);
+       req->rl_niovs = 0;
+       req->rl_reply = NULL;
+
+       spin_lock(&buffers->rb_lock);
+       list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
+       if (rep)
+               list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
+       spin_unlock(&buffers->rb_lock);
 }
 
 /*
  * Recover reply buffers from pool.
- * This happens when recovering from error conditions.
- * Post-increment counter/array index.
+ * This happens when recovering from disconnect.
  */
 void
 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
 {
        struct rpcrdma_buffer *buffers = req->rl_buffer;
-       unsigned long flags;
 
-       spin_lock_irqsave(&buffers->rb_lock, flags);
-       if (buffers->rb_recv_index < buffers->rb_max_requests) {
-               req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
-               buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
-       }
-       spin_unlock_irqrestore(&buffers->rb_lock, flags);
+       spin_lock(&buffers->rb_lock);
+       if (!list_empty(&buffers->rb_recv_bufs))
+               req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+       spin_unlock(&buffers->rb_lock);
 }
 
 /*
@@ -1199,11 +1163,10 @@ void
 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
 {
        struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
-       unsigned long flags;
 
-       spin_lock_irqsave(&buffers->rb_lock, flags);
-       buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
-       spin_unlock_irqrestore(&buffers->rb_lock, flags);
+       spin_lock(&buffers->rb_lock);
+       list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
+       spin_unlock(&buffers->rb_lock);
 }
 
 /*
@@ -1360,6 +1323,47 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
        return rc;
 }
 
+/**
+ * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
+ * @r_xprt: transport associated with these backchannel resources
+ * @min_reqs: minimum number of incoming requests expected
+ *
+ * Returns zero if all requested buffers were posted, or a negative errno.
+ */
+int
+rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
+{
+       struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+       struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+       struct rpcrdma_rep *rep;
+       unsigned long flags;
+       int rc;
+
+       while (count--) {
+               spin_lock_irqsave(&buffers->rb_lock, flags);
+               if (list_empty(&buffers->rb_recv_bufs))
+                       goto out_reqbuf;
+               rep = rpcrdma_buffer_get_rep_locked(buffers);
+               spin_unlock_irqrestore(&buffers->rb_lock, flags);
+
+               rc = rpcrdma_ep_post_recv(ia, ep, rep);
+               if (rc)
+                       goto out_rc;
+       }
+
+       return 0;
+
+out_reqbuf:
+       spin_unlock_irqrestore(&buffers->rb_lock, flags);
+       pr_warn("%s: no extra receive buffers\n", __func__);
+       return -ENOMEM;
+
+out_rc:
+       rpcrdma_recv_buffer_put(rep);
+       return rc;
+}
+
 /* How many chunk list items fit within our inline buffers?
  */
 unsigned int