git.kernelconcepts.de Git - karo-tx-linux.git/commitdiff
xprtrdma: Fix client lock-up after application signal fires
author    Chuck Lever <chuck.lever@oracle.com>
Thu, 8 Jun 2017 15:52:20 +0000 (11:52 -0400)
committer Anna Schumaker <Anna.Schumaker@Netapp.com>
Thu, 13 Jul 2017 20:00:11 +0000 (16:00 -0400)
After a signal, the RPC client aborts synchronous RPCs running on
behalf of the signaled application.

The server is still executing those RPCs, and will write the results
back into the client's memory when it's done. By the time the server
writes the results, that memory is likely being used for other
purposes. Therefore xprtrdma has to immediately invalidate all
memory regions used by those aborted RPCs to prevent the server's
writes from clobbering that re-used memory.

With FMR memory registration, invalidation takes a relatively long
time. In fact, the invalidation is often still running when the
server tries to write the results into the memory regions that are
being invalidated.

This sets up a race between two processes:

1.  After the signal, xprt_rdma_free calls ro_unmap_safe.
2.  While ro_unmap_safe is still running, the server replies and
    rpcrdma_reply_handler runs, calling ro_unmap_sync.

Both processes invoke ib_unmap_fmr on the same FMR.

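In call-path terms, the colliding invalidations described above look
roughly like this (FMR case; intermediate helpers elided):

    Process 1 (signal): xprt_rdma_free -> ro_unmap_safe -> ... -> ib_unmap_fmr
    Process 2 (reply):  rpcrdma_reply_handler -> ro_unmap_sync -> ... -> ib_unmap_fmr
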
The mlx4 driver allows two ib_unmap_fmr calls on the same FMR at
the same time, but HCAs generally don't tolerate this. Sometimes
this can result in a system crash.

If the HCA happens to survive, rpcrdma_reply_handler continues. It
removes the rpc_rqst from rq_list and releases the transport_lock.
This enables xprt_rdma_free to run in another process, and the
rpc_rqst is released while rpcrdma_reply_handler is still waiting
for the ib_unmap_fmr call to finish.

But further down in rpcrdma_reply_handler, the transport_lock is
taken again, and "rqst" is dereferenced. If "rqst" has already been
released, this triggers a general protection fault. Since bottom-
halves are disabled, the system locks up.

Address both issues by reversing the order of the xprt_lookup_rqst
call and the ro_unmap_sync call. Introduce a separate lookup
mechanism for rpcrdma_reqs to enable calling ro_unmap_sync before
xprt_lookup_rqst. Now the handler takes the transport_lock once
and holds it for the XID lookup and RPC completion.

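In outline, the reply handler's new ordering is roughly the following
(condensed from the rpc_rdma.c hunk below; error paths and most details
omitted):

	spin_lock(&buf->rb_lock);
	req = rpcrdma_lookup_req_locked(buf, headerp->rm_xid);
	req->rl_reply = rep;
	spin_unlock(&buf->rb_lock);

	/* fence the memory regions before completing the RPC */
	if (!list_empty(&mws))
		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);

	spin_lock_bh(&xprt->transport_lock);
	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
	...
	xprt_complete_rqst(rqst->rq_task, status);
	spin_unlock_bh(&xprt->transport_lock);
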
BugLink: https://bugzilla.linux-nfs.org/show_bug.cgi?id=305
Fixes: 68791649a725 ('xprtrdma: Invalidate in the RPC reply ... ')
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

net/sunrpc/xprtrdma/rpc_rdma.c
index c88132d02fb81b2b5a834c292632847850c666b8..b6584ae8e25175c1c36bd254b9c0d442d0cdfa97 100644
@@ -734,6 +734,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
                rpclen = 0;
        }
 
+       req->rl_xid = rqst->rq_xid;
+       rpcrdma_insert_req(&r_xprt->rx_buf, req);
+
        /* This implementation supports the following combinations
         * of chunk lists in one RPC-over-RDMA Call message:
         *
@@ -987,11 +990,12 @@ rpcrdma_reply_handler(struct work_struct *work)
 {
        struct rpcrdma_rep *rep =
                        container_of(work, struct rpcrdma_rep, rr_work);
+       struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        struct rpcrdma_msg *headerp;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
-       struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-       struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        __be32 *iptr;
        int rdmalen, status, rmerr;
        unsigned long cwnd;
@@ -1013,28 +1017,45 @@ rpcrdma_reply_handler(struct work_struct *work)
        /* Match incoming rpcrdma_rep to an rpcrdma_req to
         * get context for handling any incoming chunks.
         */
-       spin_lock_bh(&xprt->transport_lock);
-       rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
-       if (!rqst)
+       spin_lock(&buf->rb_lock);
+       req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf,
+                                       headerp->rm_xid);
+       if (!req)
                goto out_nomatch;
-
-       req = rpcr_to_rdmar(rqst);
        if (req->rl_reply)
                goto out_duplicate;
 
-       /* Sanity checking has passed. We are now committed
-        * to complete this transaction.
-        */
        list_replace_init(&req->rl_registered, &mws);
        rpcrdma_mark_remote_invalidation(&mws, rep);
-       list_del_init(&rqst->rq_list);
+
+       /* Avoid races with signals and duplicate replies
+        * by marking this req as matched.
+        */
        req->rl_reply = rep;
-       spin_unlock_bh(&xprt->transport_lock);
+       spin_unlock(&buf->rb_lock);
+
        dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
                __func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
-       xprt->reestablish_timeout = 0;
+       /* Invalidate and unmap the data payloads before waking the
+        * waiting application. This guarantees the memory regions
+        * are properly fenced from the server before the application
+        * accesses the data. It also ensures proper send flow control:
+        * waking the next RPC waits until this RPC has relinquished
+        * all its Send Queue entries.
+        */
+       if (!list_empty(&mws))
+               r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
 
+       /* Perform XID lookup, reconstruction of the RPC reply, and
+        * RPC completion while holding the transport lock to ensure
+        * the rep, rqst, and rq_task pointers remain stable.
+        */
+       spin_lock_bh(&xprt->transport_lock);
+       rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+       if (!rqst)
+               goto out_norqst;
+       xprt->reestablish_timeout = 0;
        if (headerp->rm_vers != rpcrdma_version)
                goto out_badversion;
 
@@ -1109,17 +1130,6 @@ badheader:
        }
 
 out:
-       /* Invalidate and flush the data payloads before waking the
-        * waiting application. This guarantees the memory region is
-        * properly fenced from the server before the application
-        * accesses the data. It also ensures proper send flow
-        * control: waking the next RPC waits until this RPC has
-        * relinquished all its Send Queue entries.
-        */
-       if (!list_empty(&mws))
-               r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
-
-       spin_lock_bh(&xprt->transport_lock);
        cwnd = xprt->cwnd;
        xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
        if (xprt->cwnd > cwnd)
@@ -1128,7 +1138,7 @@ out:
        xprt_complete_rqst(rqst->rq_task, status);
        spin_unlock_bh(&xprt->transport_lock);
        dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
-                       __func__, xprt, rqst, status);
+               __func__, xprt, rqst, status);
        return;
 
 out_badstatus:
@@ -1177,26 +1187,37 @@ out_rdmaerr:
        r_xprt->rx_stats.bad_reply_count++;
        goto out;
 
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
+/* The req was still available, but by the time the transport_lock
+ * was acquired, the rqst and task had been released. Thus the RPC
+ * has already been terminated.
  */
+out_norqst:
+       spin_unlock_bh(&xprt->transport_lock);
+       rpcrdma_buffer_put(req);
+       dprintk("RPC:       %s: race, no rqst left for req %p\n",
+               __func__, req);
+       return;
+
 out_shortreply:
        dprintk("RPC:       %s: short/invalid reply\n", __func__);
        goto repost;
 
 out_nomatch:
-       spin_unlock_bh(&xprt->transport_lock);
+       spin_unlock(&buf->rb_lock);
        dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
                __func__, be32_to_cpu(headerp->rm_xid),
                rep->rr_len);
        goto repost;
 
 out_duplicate:
-       spin_unlock_bh(&xprt->transport_lock);
+       spin_unlock(&buf->rb_lock);
        dprintk("RPC:       %s: "
                "duplicate reply %p to RPC request %p: xid 0x%08x\n",
                __func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
+/* If no pending RPC transaction was matched, post a replacement
+ * receive buffer before returning.
+ */
 repost:
        r_xprt->rx_stats.bad_reply_count++;
        if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
net/sunrpc/xprtrdma/transport.c
index 62ecbccd9748e54874059209070ebe4a6b9591e7..d1c458e5ec4de25b81f30fa5919cb0b9dfc5ffdf 100644
@@ -684,7 +684,8 @@ xprt_rdma_free(struct rpc_task *task)
 
        dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 
-       if (unlikely(!list_empty(&req->rl_registered)))
+       rpcrdma_remove_req(&r_xprt->rx_buf, req);
+       if (!list_empty(&req->rl_registered))
                ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
        rpcrdma_unmap_sges(ia, req);
        rpcrdma_buffer_put(req);
net/sunrpc/xprtrdma/verbs.c
index df72224604d2d15b89f2e3f1ceaba3c80c32689d..a215a8759dc202c19fe63831ace31f3589da7167 100644
@@ -1032,6 +1032,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
        spin_lock_init(&buf->rb_recovery_lock);
        INIT_LIST_HEAD(&buf->rb_mws);
        INIT_LIST_HEAD(&buf->rb_all);
+       INIT_LIST_HEAD(&buf->rb_pending);
        INIT_LIST_HEAD(&buf->rb_stale_mrs);
        INIT_DELAYED_WORK(&buf->rb_refresh_worker,
                          rpcrdma_mr_refresh_worker);
@@ -1084,7 +1085,7 @@ rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
 
        req = list_first_entry(&buf->rb_send_bufs,
                               struct rpcrdma_req, rl_list);
-       list_del(&req->rl_list);
+       list_del_init(&req->rl_list);
        return req;
 }
 
net/sunrpc/xprtrdma/xprt_rdma.h
index ad918c840fc719f7f1f23d04a93d0d0842b0ac30..b282d3f8cdd8004c6f0a6391434b6b94775c29f9 100644
@@ -341,6 +341,7 @@ enum {
 struct rpcrdma_buffer;
 struct rpcrdma_req {
        struct list_head        rl_list;
+       __be32                  rl_xid;
        unsigned int            rl_mapped_sges;
        unsigned int            rl_connect_cookie;
        struct rpcrdma_buffer   *rl_buffer;
@@ -402,6 +403,7 @@ struct rpcrdma_buffer {
        int                     rb_send_count, rb_recv_count;
        struct list_head        rb_send_bufs;
        struct list_head        rb_recv_bufs;
+       struct list_head        rb_pending;
        u32                     rb_max_requests;
        atomic_t                rb_credits;     /* most recent credit grant */
 
@@ -550,6 +552,34 @@ void rpcrdma_destroy_req(struct rpcrdma_req *);
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
+static inline void
+rpcrdma_insert_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+{
+       spin_lock(&buffers->rb_lock);
+       if (list_empty(&req->rl_list))
+               list_add_tail(&req->rl_list, &buffers->rb_pending);
+       spin_unlock(&buffers->rb_lock);
+}
+
+static inline struct rpcrdma_req *
+rpcrdma_lookup_req_locked(struct rpcrdma_buffer *buffers, __be32 xid)
+{
+       struct rpcrdma_req *pos;
+
+       list_for_each_entry(pos, &buffers->rb_pending, rl_list)
+               if (pos->rl_xid == xid)
+                       return pos;
+       return NULL;
+}
+
+static inline void
+rpcrdma_remove_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+{
+       spin_lock(&buffers->rb_lock);
+       list_del(&req->rl_list);
+       spin_unlock(&buffers->rb_lock);
+}
+
 struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
 void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);