git.kernelconcepts.de Git - karo-tx-linux.git/commitdiff
rds: don't let RDS shutdown a connection while senders are present
authorChris Mason <chris.mason@oracle.com>
Tue, 11 May 2010 22:11:11 +0000 (15:11 -0700)
committerAndy Grover <andy.grover@oracle.com>
Thu, 9 Sep 2010 01:15:09 +0000 (18:15 -0700)
This is the first in a long line of patches that tries to fix races
between RDS connection shutdown and RDS traffic.

Here we are maintaining a count of active senders to make sure
the connection doesn't go away while they are using it.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
net/rds/connection.c
net/rds/ib_recv.c
net/rds/message.c
net/rds/rds.h
net/rds/send.c

index 7e4e9dfdbc0b66863b191b54d375b477e822b4fa..9c249f394f29f03a08a00fa376f8943aed98e229 100644 (file)
@@ -148,6 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
 
        spin_lock_init(&conn->c_send_lock);
        atomic_set(&conn->c_send_generation, 1);
+       atomic_set(&conn->c_senders, 0);
        INIT_LIST_HEAD(&conn->c_send_queue);
        INIT_LIST_HEAD(&conn->c_retrans);
 
@@ -276,6 +277,12 @@ void rds_conn_shutdown(struct rds_connection *conn)
                spin_lock_irq(&conn->c_send_lock);
                spin_unlock_irq(&conn->c_send_lock);
 
+               while(atomic_read(&conn->c_senders)) {
+                       schedule_timeout(1);
+                       spin_lock_irq(&conn->c_send_lock);
+                       spin_unlock_irq(&conn->c_send_lock);
+               }
+
                conn->c_trans->conn_shutdown(conn);
                rds_conn_reset(conn);
 
index 8f041f7954a28d11528037d3a79feba64b4a703f..24d14615f41a9f06e6a9e1f41103e7becc6b0efd 100644 (file)
@@ -863,18 +863,6 @@ int rds_ib_recv(struct rds_connection *conn)
        int ret = 0;
 
        rdsdebug("conn %p\n", conn);
-
-       /*
-        * If we get a temporary posting failure in this context then
-        * we're really low and we want the caller to back off for a bit.
-        */
-       mutex_lock(&ic->i_recv_mutex);
-       if (rds_ib_recv_refill(conn, 0))
-               ret = -ENOMEM;
-       else
-               rds_ib_stats_inc(s_ib_rx_refill_from_thread);
-       mutex_unlock(&ic->i_recv_mutex);
-
        if (rds_conn_up(conn))
                rds_ib_attempt_ack(ic);
 
index 96e2bf7dc77e85ec381a7982612cb2b2d441d001..84f937f11d475870710abbbfc746b06c4160b5f3 100644 (file)
@@ -81,7 +81,10 @@ static void rds_message_purge(struct rds_message *rm)
 void rds_message_put(struct rds_message *rm)
 {
        rdsdebug("put rm %p ref %d\n", rm, atomic_read(&rm->m_refcount));
-
+       if (atomic_read(&rm->m_refcount) == 0) {
+printk(KERN_CRIT "danger refcount zero on %p\n", rm);
+WARN_ON(1);
+       }
        if (atomic_dec_and_test(&rm->m_refcount)) {
                BUG_ON(!list_empty(&rm->m_sock_item));
                BUG_ON(!list_empty(&rm->m_conn_item));
index 241a0859d16e5505e02d7d1fe276e43500e345d9..4ab3d1aa0237f2ba53f3af7fb5c69780332da51a 100644 (file)
@@ -93,6 +93,7 @@ struct rds_connection {
 
        spinlock_t              c_send_lock;    /* protect send ring */
        atomic_t                c_send_generation;
+       atomic_t                c_senders;
        struct rds_message      *c_xmit_rm;
        unsigned long           c_xmit_sg;
        unsigned int            c_xmit_hdr_off;
index 8e3fd9981c2ea190d865d697a008ef54ccc61f38..d35c43ff792e70ed56dfa1bd372f791638bf921b 100644 (file)
@@ -60,15 +60,23 @@ void rds_send_reset(struct rds_connection *conn)
        struct rds_message *rm, *tmp;
        unsigned long flags;
 
+       spin_lock_irqsave(&conn->c_send_lock, flags);
        if (conn->c_xmit_rm) {
+               rm = conn->c_xmit_rm;
+               conn->c_xmit_rm = NULL;
                /* Tell the user the RDMA op is no longer mapped by the
                 * transport. This isn't entirely true (it's flushed out
                 * independently) but as the connection is down, there's
                 * no ongoing RDMA to/from that memory */
-               rds_message_unmapped(conn->c_xmit_rm);
-               rds_message_put(conn->c_xmit_rm);
-               conn->c_xmit_rm = NULL;
+printk(KERN_CRIT "send reset unmapping %p\n", rm);
+               rds_message_unmapped(rm);
+               spin_unlock_irqrestore(&conn->c_send_lock, flags);
+
+               rds_message_put(rm);
+       } else {
+               spin_unlock_irqrestore(&conn->c_send_lock, flags);
        }
+
        conn->c_xmit_sg = 0;
        conn->c_xmit_hdr_off = 0;
        conn->c_xmit_data_off = 0;
@@ -131,6 +139,7 @@ restart:
                ret = -ENOMEM;
                goto out;
        }
+       atomic_inc(&conn->c_senders);
 
        if (conn->c_trans->xmit_prepare)
                conn->c_trans->xmit_prepare(conn);
@@ -350,6 +359,8 @@ restart:
                rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
        }
 
+       atomic_dec(&conn->c_senders);
+
        /*
         * Other senders will see we have c_send_lock and exit. We
         * need to recheck the send queue and race again for c_send_lock