]> git.kernelconcepts.de Git - karo-tx-linux.git/blobdiff - net/ceph/messenger.c
Merge tag 'v3.5-rc1'
[karo-tx-linux.git] / net / ceph / messenger.c
index 98ca23726ea675990de0ea60fc74f5241add99e3..5e9f61d6d2340ef932068dc88d65609b593d4f63 100644 (file)
@@ -321,6 +321,7 @@ static int ceph_tcp_connect(struct ceph_connection *con)
 
        dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));
 
+       con_sock_state_connecting(con);
        ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
                                 O_NONBLOCK);
        if (ret == -EINPROGRESS) {
@@ -336,8 +337,6 @@ static int ceph_tcp_connect(struct ceph_connection *con)
                return ret;
        }
        con->sock = sock;
-       con_sock_state_connecting(con);
-
        return 0;
 }
 
@@ -414,6 +413,10 @@ static int con_close_socket(struct ceph_connection *con)
 static void ceph_msg_remove(struct ceph_msg *msg)
 {
        list_del_init(&msg->list_head);
+       BUG_ON(msg->con == NULL);
+       ceph_con_put(msg->con);
+       msg->con = NULL;
+
        ceph_msg_put(msg);
 }
 static void ceph_msg_remove_list(struct list_head *head)
@@ -433,8 +436,11 @@ static void reset_connection(struct ceph_connection *con)
        ceph_msg_remove_list(&con->out_sent);
 
        if (con->in_msg) {
+               BUG_ON(con->in_msg->con != con);
+               con->in_msg->con = NULL;
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
+               ceph_con_put(con->in_msg->con);
        }
 
        con->connect_seq = 0;
@@ -625,8 +631,10 @@ static void prepare_write_message(struct ceph_connection *con)
                        &con->out_temp_ack);
        }
 
+       BUG_ON(list_empty(&con->out_queue));
        m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
        con->out_msg = m;
+       BUG_ON(m->con != con);
 
        /* put message on sent list */
        ceph_msg_get(m);
@@ -640,6 +648,10 @@ static void prepare_write_message(struct ceph_connection *con)
                m->hdr.seq = cpu_to_le64(++con->out_seq);
                m->needs_out_seq = false;
        }
+#ifdef CONFIG_BLOCK
+       else
+               m->bio_iter = NULL;
+#endif
 
        dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
             m, con->out_seq, le16_to_cpu(m->hdr.type),
@@ -777,7 +789,7 @@ static void prepare_write_banner(struct ceph_connection *con)
 
 static int prepare_write_connect(struct ceph_connection *con)
 {
-       unsigned global_seq = get_global_seq(con->msgr, 0);
+       unsigned int global_seq = get_global_seq(con->msgr, 0);
        int proto;
        int auth_proto;
        struct ceph_auth_handshake *auth;
@@ -905,7 +917,7 @@ static void iter_bio_next(struct bio **bio_iter, int *seg)
 static int write_partial_msg_pages(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->out_msg;
-       unsigned data_len = le32_to_cpu(msg->hdr.data_len);
+       unsigned int data_len = le32_to_cpu(msg->hdr.data_len);
        size_t len;
        bool do_datacrc = !con->msgr->nocrc;
        int ret;
@@ -1661,7 +1673,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
 
 static int read_partial_message_pages(struct ceph_connection *con,
                                      struct page **pages,
-                                     unsigned data_len, bool do_datacrc)
+                                     unsigned int data_len, bool do_datacrc)
 {
        void *p;
        int ret;
@@ -1694,7 +1706,7 @@ static int read_partial_message_pages(struct ceph_connection *con,
 #ifdef CONFIG_BLOCK
 static int read_partial_message_bio(struct ceph_connection *con,
                                    struct bio **bio_iter, int *bio_seg,
-                                   unsigned data_len, bool do_datacrc)
+                                   unsigned int data_len, bool do_datacrc)
 {
        struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
        void *p;
@@ -1737,7 +1749,7 @@ static int read_partial_message(struct ceph_connection *con)
        int size;
        int end;
        int ret;
-       unsigned front_len, middle_len, data_len;
+       unsigned int front_len, middle_len, data_len;
        bool do_datacrc = !con->msgr->nocrc;
        u64 seq;
        u32 crc;
@@ -1806,6 +1818,8 @@ static int read_partial_message(struct ceph_connection *con)
                                "error allocating memory for incoming message";
                        return -ENOMEM;
                }
+
+               BUG_ON(con->in_msg->con != con);
                m = con->in_msg;
                m->front.iov_len = 0;    /* haven't read it yet */
                if (m->middle)
@@ -1901,8 +1915,11 @@ static void process_message(struct ceph_connection *con)
 {
        struct ceph_msg *msg;
 
+       BUG_ON(con->in_msg->con != con);
+       con->in_msg->con = NULL;
        msg = con->in_msg;
        con->in_msg = NULL;
+       ceph_con_put(con);
 
        /* if first message, set peer_name */
        if (con->peer_name.type == 0)
@@ -2260,8 +2277,11 @@ static void ceph_fault(struct ceph_connection *con)
        con_close_socket(con);
 
        if (con->in_msg) {
+               BUG_ON(con->in_msg->con != con);
+               con->in_msg->con = NULL;
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
+               ceph_con_put(con);
        }
 
        /* Requeue anything that hasn't been acked */
@@ -2378,6 +2398,11 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 
        /* queue */
        mutex_lock(&con->mutex);
+
+       BUG_ON(msg->con != NULL);
+       msg->con = ceph_con_get(con);
+       BUG_ON(msg->con == NULL);
+
        BUG_ON(!list_empty(&msg->list_head));
        list_add_tail(&msg->list_head, &con->out_queue);
        dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
@@ -2399,24 +2424,34 @@ EXPORT_SYMBOL(ceph_con_send);
 /*
  * Revoke a message that was previously queued for send
  */
-void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
+void ceph_msg_revoke(struct ceph_msg *msg)
 {
+       struct ceph_connection *con = msg->con;
+
+       if (!con)
+               return;         /* Message not in our possession */
+
        mutex_lock(&con->mutex);
        if (!list_empty(&msg->list_head)) {
-               dout("con_revoke %p msg %p - was on queue\n", con, msg);
+               dout("%s %p msg %p - was on queue\n", __func__, con, msg);
                list_del_init(&msg->list_head);
-               ceph_msg_put(msg);
+               BUG_ON(msg->con == NULL);
+               ceph_con_put(msg->con);
+               msg->con = NULL;
                msg->hdr.seq = 0;
+
+               ceph_msg_put(msg);
        }
        if (con->out_msg == msg) {
-               dout("con_revoke %p msg %p - was sending\n", con, msg);
+               dout("%s %p msg %p - was sending\n", __func__, con, msg);
                con->out_msg = NULL;
                if (con->out_kvec_is_msg) {
                        con->out_skip = con->out_kvec_bytes;
                        con->out_kvec_is_msg = false;
                }
-               ceph_msg_put(msg);
                msg->hdr.seq = 0;
+
+               ceph_msg_put(msg);
        }
        mutex_unlock(&con->mutex);
 }
@@ -2424,17 +2459,27 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
 /*
  * Revoke a message that we may be reading data into
  */
-void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
+void ceph_msg_revoke_incoming(struct ceph_msg *msg)
 {
+       struct ceph_connection *con;
+
+       BUG_ON(msg == NULL);
+       if (!msg->con) {
+               dout("%s msg %p null con\n", __func__, msg);
+
+               return;         /* Message not in our possession */
+       }
+
+       con = msg->con;
        mutex_lock(&con->mutex);
-       if (con->in_msg && con->in_msg == msg) {
-               unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
-               unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
-               unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
+       if (con->in_msg == msg) {
+               unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
+               unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
+               unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
 
                /* skip rest of message */
-               dout("con_revoke_pages %p msg %p revoked\n", con, msg);
-                       con->in_base_pos = con->in_base_pos -
+               dout("%s %p msg %p revoked\n", __func__, con, msg);
+               con->in_base_pos = con->in_base_pos -
                                sizeof(struct ceph_msg_header) -
                                front_len -
                                middle_len -
@@ -2445,8 +2490,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
                con->in_tag = CEPH_MSGR_TAG_READY;
                con->in_seq++;
        } else {
-               dout("con_revoke_pages %p msg %p pages %p no-op\n",
-                    con, con->in_msg, msg);
+               dout("%s %p in_msg %p msg %p no-op\n",
+                    __func__, con, con->in_msg, msg);
        }
        mutex_unlock(&con->mutex);
 }
@@ -2478,6 +2523,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
        if (m == NULL)
                goto out;
        kref_init(&m->kref);
+
+       m->con = NULL;
        INIT_LIST_HEAD(&m->list_head);
 
        m->hdr.tid = 0;
@@ -2598,6 +2645,10 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
                mutex_unlock(&con->mutex);
                con->in_msg = con->ops->alloc_msg(con, hdr, &skip);
                mutex_lock(&con->mutex);
+               if (con->in_msg) {
+                       con->in_msg->con = ceph_con_get(con);
+                       BUG_ON(con->in_msg->con == NULL);
+               }
                if (skip)
                        con->in_msg = NULL;
 
@@ -2611,6 +2662,8 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
                               type, front_len);
                        return false;
                }
+               con->in_msg->con = ceph_con_get(con);
+               BUG_ON(con->in_msg->con == NULL);
                con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
        }
        memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));