xprtrdma: Fix client lock-up after application signal fires
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 694e9b13ecf07722848d86345589ad4793474fb5..b6584ae8e25175c1c36bd254b9c0d442d0cdfa97 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -734,6 +734,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
                rpclen = 0;
        }
 
+       req->rl_xid = rqst->rq_xid;
+       rpcrdma_insert_req(&r_xprt->rx_buf, req);
+
        /* This implementation supports the following combinations
         * of chunk lists in one RPC-over-RDMA Call message:
         *
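The XID map this hunk populates is maintained by helpers that are not shown in the diff. A minimal sketch of what rpcrdma_insert_req() and rpcrdma_lookup_req_locked() could look like, assuming a simple rb_lock-protected pending list (the rb_pending and rl_list names are assumptions inferred from the rl_xid and rb_lock usage elsewhere in this patch):

        /* Sketch only: map an outgoing req by XID so the reply handler
         * can find it without touching xprt->transport_lock.
         */
        static inline void
        rpcrdma_insert_req(struct rpcrdma_buffer *buffers,
                           struct rpcrdma_req *req)
        {
                spin_lock(&buffers->rb_lock);
                list_add_tail(&req->rl_list, &buffers->rb_pending);
                spin_unlock(&buffers->rb_lock);
        }

        /* Caller holds buffers->rb_lock */
        static inline struct rpcrdma_req *
        rpcrdma_lookup_req_locked(struct rpcrdma_buffer *buffers,
                                  __be32 xid)
        {
                struct rpcrdma_req *pos;

                list_for_each_entry(pos, &buffers->rb_pending, rl_list)
                        if (pos->rl_xid == xid)
                                return pos;
                return NULL;
        }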
@@ -928,6 +931,24 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
        return fixup_copy_count;
 }
 
+/* Caller must guarantee @rep remains stable during this call.
+ */
+static void
+rpcrdma_mark_remote_invalidation(struct list_head *mws,
+                                struct rpcrdma_rep *rep)
+{
+       struct rpcrdma_mw *mw;
+
+       if (!(rep->rr_wc_flags & IB_WC_WITH_INVALIDATE))
+               return;
+
+       list_for_each_entry(mw, mws, mw_list)
+               if (mw->mw_handle == rep->rr_inv_rkey) {
+                       mw->mw_flags = RPCRDMA_MW_F_RI;
+                       break; /* only one invalidated MR per RPC */
+               }
+}
+
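rr_wc_flags and rr_inv_rkey are filled in at Receive completion time. A hedged sketch of that capture (the handler name and the container_of() recovery are assumptions, but IB_WC_WITH_INVALIDATE and wc->ex.invalidate_rkey are the standard verbs interface for Send With Invalidate):

        /* Sketch: stash the work completion's invalidation metadata in
         * the rep so rpcrdma_mark_remote_invalidation() can consult it
         * later from the reply handler.
         */
        static void
        rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
        {
                struct rpcrdma_rep *rep =
                        container_of(wc->wr_cqe, struct rpcrdma_rep, rr_cqe);

                if (wc->status != IB_WC_SUCCESS)
                        return;

                rep->rr_wc_flags = wc->wc_flags;
                rep->rr_inv_rkey = wc->ex.invalidate_rkey;
                /* length checks and scheduling of rr_work elided */
        }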
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 /* By convention, backchannel calls arrive via rdma_msg type
  * messages, and never populate the chunk lists. This makes
@@ -969,14 +990,16 @@ rpcrdma_reply_handler(struct work_struct *work)
 {
        struct rpcrdma_rep *rep =
                        container_of(work, struct rpcrdma_rep, rr_work);
+       struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        struct rpcrdma_msg *headerp;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
-       struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-       struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        __be32 *iptr;
        int rdmalen, status, rmerr;
        unsigned long cwnd;
+       struct list_head mws;
 
        dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
 
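The on-stack mws list declared above takes ownership of the req's registered MWs via list_replace_init(), whose semantics make the handoff atomic under rb_lock: every entry formerly anchored at the old head becomes anchored at the new head, and the old head is reinitialized empty. For example:

        LIST_HEAD(mws);

        spin_lock(&buf->rb_lock);
        /* mws now owns every entry of rl_registered;
         * rl_registered is left empty for reuse.
         */
        list_replace_init(&req->rl_registered, &mws);
        spin_unlock(&buf->rb_lock);

After this point the MWs can be invalidated from the local list without holding any lock that a signal-driven release of the rqst might contend on.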
@@ -994,27 +1017,45 @@ rpcrdma_reply_handler(struct work_struct *work)
        /* Match incoming rpcrdma_rep to an rpcrdma_req to
         * get context for handling any incoming chunks.
         */
-       spin_lock_bh(&xprt->transport_lock);
-       rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
-       if (!rqst)
+       spin_lock(&buf->rb_lock);
+       req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf,
+                                       headerp->rm_xid);
+       if (!req)
                goto out_nomatch;
-
-       req = rpcr_to_rdmar(rqst);
        if (req->rl_reply)
                goto out_duplicate;
 
-       /* Sanity checking has passed. We are now committed
-        * to complete this transaction.
+       list_replace_init(&req->rl_registered, &mws);
+       rpcrdma_mark_remote_invalidation(&mws, rep);
+
+       /* Avoid races with signals and duplicate replies
+        * by marking this req as matched.
         */
-       list_del_init(&rqst->rq_list);
-       spin_unlock_bh(&xprt->transport_lock);
+       req->rl_reply = rep;
+       spin_unlock(&buf->rb_lock);
+
        dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
                __func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
-       /* from here on, the reply is no longer an orphan */
-       req->rl_reply = rep;
-       xprt->reestablish_timeout = 0;
+       /* Invalidate and unmap the data payloads before waking the
+        * waiting application. This guarantees the memory regions
+        * are properly fenced from the server before the application
+        * accesses the data. It also ensures proper send flow control:
+        * waking the next RPC waits until this RPC has relinquished
+        * all its Send Queue entries.
+        */
+       if (!list_empty(&mws))
+               r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
 
+       /* Perform XID lookup, reconstruction of the RPC reply, and
+        * RPC completion while holding the transport lock to ensure
+        * the rep, rqst, and rq_task pointers remain stable.
+        */
+       spin_lock_bh(&xprt->transport_lock);
+       rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+       if (!rqst)
+               goto out_norqst;
+       xprt->reestablish_timeout = 0;
        if (headerp->rm_vers != rpcrdma_version)
                goto out_badversion;
 
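Deferring xprt_lookup_rqst() until after invalidation, and doing the initial req lookup under rb_lock instead, is the heart of the fix. The lock-up it closes, roughly (a reconstruction from the code movement in this patch, not verbatim from the changelog):

        /*
         *   reply handler                      application
         *   -------------                      -----------
         *   xprt_lookup_rqst() -> rqst
         *   drop transport_lock
         *                                      signal fires, RPC is
         *                                      terminated, and
         *                                      xprt_release() recycles
         *                                      the rqst and its MRs
         *   ro_unmap_sync() on rqst's MRs      <- touches recycled
         *                                         resources: lock-up
         *
         * With this patch, the MWs are first moved to a private list
         * and invalidated; if the rqst has meanwhile vanished, the
         * transport_lock'd lookup fails cleanly (out_norqst below).
         */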
@@ -1024,12 +1065,9 @@ rpcrdma_reply_handler(struct work_struct *work)
        case rdma_msg:
                /* never expect read chunks */
                /* never expect reply chunks (two ways to check) */
-               /* never expect write chunks without having offered RDMA */
                if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
                    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
-                    headerp->rm_body.rm_chunks[2] != xdr_zero) ||
-                   (headerp->rm_body.rm_chunks[1] != xdr_zero &&
-                    list_empty(&req->rl_registered)))
+                    headerp->rm_body.rm_chunks[2] != xdr_zero))
                        goto badheader;
                if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
                        /* count any expected write chunks in read reply */
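For reference, the rm_chunks[] words being tested are the three chunk-list discriminators that follow the fixed portion of the transport header. A simplified sketch of the layout (the real struct wraps these words in a union with error and padding variants):

        struct rpcrdma_msg {
                __be32 rm_xid;          /* mirrors the RPC XID */
                __be32 rm_vers;         /* rpcrdma_version */
                __be32 rm_credit;       /* credits granted */
                __be32 rm_type;         /* rdma_msg, rdma_nomsg, ... */
                __be32 rm_chunks[3];    /* [0] read list,
                                         * [1] write list,
                                         * [2] reply chunk;
                                         * xdr_zero marks absent */
        };

An rdma_msg reply must carry no read chunks ([0] == xdr_zero) and no reply chunk, while the rdma_nomsg case below insists on [2] == xdr_one. The removed "write chunks without having offered RDMA" test is no longer meaningful here: rl_registered was emptied into the local mws list earlier, so list_empty(&req->rl_registered) would now always be true.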
@@ -1066,8 +1104,7 @@ rpcrdma_reply_handler(struct work_struct *work)
                /* never expect read or write chunks, always reply chunks */
                if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
                    headerp->rm_body.rm_chunks[1] != xdr_zero ||
-                   headerp->rm_body.rm_chunks[2] != xdr_one ||
-                   list_empty(&req->rl_registered))
+                   headerp->rm_body.rm_chunks[2] != xdr_one)
                        goto badheader;
                iptr = (__be32 *)((unsigned char *)headerp +
                                                        RPCRDMA_HDRLEN_MIN);
@@ -1093,17 +1130,6 @@ badheader:
        }
 
 out:
-       /* Invalidate and flush the data payloads before waking the
-        * waiting application. This guarantees the memory region is
-        * properly fenced from the server before the application
-        * accesses the data. It also ensures proper send flow
-        * control: waking the next RPC waits until this RPC has
-        * relinquished all its Send Queue entries.
-        */
-       if (!list_empty(&req->rl_registered))
-               r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
-
-       spin_lock_bh(&xprt->transport_lock);
        cwnd = xprt->cwnd;
        xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
        if (xprt->cwnd > cwnd)
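Credit-based flow control: each server-granted credit licenses one outstanding RPC, and cwnd is maintained in RPC_CWNDSCALE (1 << RPC_CWNDSHIFT) units, so the conversion above is a plain shift. Schematically:

        /* cwnd = credits << RPC_CWNDSHIFT, i.e. credits * RPC_CWNDSCALE.
         * If the window grew, a task waiting on congestion can be
         * released; shrinking simply takes effect as slots retire.
         */

Note that rb_credits is an atomic_t (hence the atomic_read() above), so sampling it here, outside the earlier rb_lock critical section, is safe.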
@@ -1112,7 +1138,7 @@ out:
        xprt_complete_rqst(rqst->rq_task, status);
        spin_unlock_bh(&xprt->transport_lock);
        dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
-                       __func__, xprt, rqst, status);
+               __func__, xprt, rqst, status);
        return;
 
 out_badstatus:
@@ -1161,26 +1187,37 @@ out_rdmaerr:
        r_xprt->rx_stats.bad_reply_count++;
        goto out;
 
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
+/* The req was still available, but by the time the transport_lock
+ * was acquired, the rqst and task had been released. Thus the RPC
+ * has already been terminated.
  */
+out_norqst:
+       spin_unlock_bh(&xprt->transport_lock);
+       rpcrdma_buffer_put(req);
+       dprintk("RPC:       %s: race, no rqst left for req %p\n",
+               __func__, req);
+       return;
+
 out_shortreply:
        dprintk("RPC:       %s: short/invalid reply\n", __func__);
        goto repost;
 
 out_nomatch:
-       spin_unlock_bh(&xprt->transport_lock);
+       spin_unlock(&buf->rb_lock);
        dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
                __func__, be32_to_cpu(headerp->rm_xid),
                rep->rr_len);
        goto repost;
 
 out_duplicate:
-       spin_unlock_bh(&xprt->transport_lock);
+       spin_unlock(&buf->rb_lock);
        dprintk("RPC:       %s: "
                "duplicate reply %p to RPC request %p: xid 0x%08x\n",
                __func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
+/* If no pending RPC transaction was matched, post a replacement
+ * receive buffer before returning.
+ */
 repost:
        r_xprt->rx_stats.bad_reply_count++;
        if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))