]> git.kernelconcepts.de Git - karo-tx-linux.git/commitdiff
libceph: update osd request/reply encoding
authorSage Weil <sage@inktank.com>
Tue, 26 Feb 2013 00:11:12 +0000 (16:11 -0800)
committerSage Weil <sage@inktank.com>
Tue, 26 Feb 2013 23:02:50 +0000 (15:02 -0800)
Use the new version of the encoding for osd requests and replies.  In the
process, update the way we are tracking request ops and reply lengths and
results in the struct ceph_osd_request.  Update the rbd and fs/ceph users
appropriately.

The main changes are:
 - we keep pointers into the request memory for fields we need to update
   each time the request is sent out over the wire
 - we keep information about the result in an array in the request struct
   where the users can easily get at it.

Signed-off-by: Sage Weil <sage@inktank.com>
Reviewed-by: Alex Elder <elder@inktank.com>
drivers/block/rbd.c
fs/ceph/addr.c
include/linux/ceph/osd_client.h
include/linux/ceph/rados.h
net/ceph/debugfs.c
net/ceph/osd_client.c

index 22085e86a4097064d4e7c78cc4ccf7473a240d53..6c81a4c040b987e75d4a8b0bb5e4ff65e2108dba 100644 (file)
@@ -196,7 +196,7 @@ struct rbd_obj_request {
 
        u64                     xferred;        /* bytes transferred */
        u64                     version;
-       s32                     result;
+       int                     result;
        atomic_t                done;
 
        rbd_obj_callback_t      callback;
@@ -1282,12 +1282,19 @@ static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
 
 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
 {
-
        dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
                obj_request->result, obj_request->xferred, obj_request->length);
-       if (obj_request->result == (s32) -ENOENT) {
+       /*
+        * ENOENT means a hole in the object.  We zero-fill the
+        * entire length of the request.  A short read also implies
+        * zero-fill to the end of the request.  Either way we
+        * update the xferred count to indicate the whole request
+        * was satisfied.
+        */
+       if (obj_request->result == -ENOENT) {
                zero_bio_chain(obj_request->bio_list, 0);
                obj_request->result = 0;
+               obj_request->xferred = obj_request->length;
        } else if (obj_request->xferred < obj_request->length &&
                        !obj_request->result) {
                zero_bio_chain(obj_request->bio_list, obj_request->xferred);
@@ -1298,20 +1305,14 @@ static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
 
 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
 {
-       dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
-               obj_request->result, obj_request->xferred, obj_request->length);
-
-       /* A short write really shouldn't occur.  Warn if we see one */
-
-       if (obj_request->xferred != obj_request->length) {
-               struct rbd_img_request *img_request = obj_request->img_request;
-               struct rbd_device *rbd_dev;
-
-               rbd_dev = img_request ? img_request->rbd_dev : NULL;
-               rbd_warn(rbd_dev, "wrote %llu want %llu\n",
-                       obj_request->xferred, obj_request->length);
-       }
-
+       dout("%s: obj %p result %d %llu\n", __func__, obj_request,
+               obj_request->result, obj_request->length);
+       /*
+        * There is no such thing as a successful short write.
+        * Our xferred value is the number of bytes transferred
+        * back.  Set it to our originally-requested length.
+        */
+       obj_request->xferred = obj_request->length;
        obj_request_done_set(obj_request);
 }
 
@@ -1329,9 +1330,6 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
 {
        struct rbd_obj_request *obj_request = osd_req->r_priv;
-       struct ceph_osd_reply_head *reply_head;
-       struct ceph_osd_op *op;
-       u32 num_ops;
        u16 opcode;
 
        dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
@@ -1339,22 +1337,19 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
        rbd_assert(!!obj_request->img_request ^
                                (obj_request->which == BAD_WHICH));
 
-       reply_head = msg->front.iov_base;
-       obj_request->result = (s32) le32_to_cpu(reply_head->result);
+       if (osd_req->r_result < 0)
+               obj_request->result = osd_req->r_result;
        obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
 
-       num_ops = le32_to_cpu(reply_head->num_ops);
-       WARN_ON(num_ops != 1);  /* For now */
+       WARN_ON(osd_req->r_num_ops != 1);       /* For now */
 
        /*
         * We support a 64-bit length, but ultimately it has to be
         * passed to blk_end_request(), which takes an unsigned int.
         */
-       op = &reply_head->ops[0];
-       obj_request->xferred = le64_to_cpu(op->extent.length);
+       obj_request->xferred = osd_req->r_reply_op_len[0];
        rbd_assert(obj_request->xferred < (u64) UINT_MAX);
-
-       opcode = le16_to_cpu(op->op);
+       opcode = osd_req->r_request_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
                rbd_osd_read_callback(obj_request);
@@ -1719,6 +1714,7 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
                more = blk_end_request(img_request->rq, result, xferred);
                which++;
        }
+
        rbd_assert(more ^ (which == img_request->obj_request_count));
        img_request->next_completion = which;
 out:
index fc613715af46cd5992e6922c465d3e3cfb8e9279..cfef3e01a9b3a4b1816de96a930984bf7d32c142 100644 (file)
@@ -236,16 +236,10 @@ static int ceph_readpage(struct file *filp, struct page *page)
 static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 {
        struct inode *inode = req->r_inode;
-       struct ceph_osd_reply_head *replyhead;
-       int rc, bytes;
+       int rc = req->r_result;
+       int bytes = le32_to_cpu(msg->hdr.data_len);
        int i;
 
-       /* parse reply */
-       replyhead = msg->front.iov_base;
-       WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
-       rc = le32_to_cpu(replyhead->result);
-       bytes = le32_to_cpu(msg->hdr.data_len);
-
        dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
 
        /* unlock all pages, zeroing any data we didn't read */
@@ -553,27 +547,18 @@ static void writepages_finish(struct ceph_osd_request *req,
                              struct ceph_msg *msg)
 {
        struct inode *inode = req->r_inode;
-       struct ceph_osd_reply_head *replyhead;
-       struct ceph_osd_op *op;
        struct ceph_inode_info *ci = ceph_inode(inode);
        unsigned wrote;
        struct page *page;
        int i;
        struct ceph_snap_context *snapc = req->r_snapc;
        struct address_space *mapping = inode->i_mapping;
-       __s32 rc = -EIO;
-       u64 bytes = 0;
+       int rc = req->r_result;
+       u64 bytes = le64_to_cpu(req->r_request_ops[0].extent.length);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        long writeback_stat;
        unsigned issued = ceph_caps_issued(ci);
 
-       /* parse reply */
-       replyhead = msg->front.iov_base;
-       WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
-       op = (void *)(replyhead + 1);
-       rc = le32_to_cpu(replyhead->result);
-       bytes = le64_to_cpu(op->extent.length);
-
        if (rc >= 0) {
                /*
                 * Assume we wrote the pages we originally sent.  The
@@ -740,8 +725,6 @@ retry:
                struct page *page;
                int want;
                u64 offset, len;
-               struct ceph_osd_request_head *reqhead;
-               struct ceph_osd_op *op;
                long writeback_stat;
 
                next = 0;
@@ -905,10 +888,8 @@ get_more_pages:
 
                /* revise final length, page count */
                req->r_num_pages = locked_pages;
-               reqhead = req->r_request->front.iov_base;
-               op = (void *)(reqhead + 1);
-               op->extent.length = cpu_to_le64(len);
-               op->payload_len = cpu_to_le32(len);
+               req->r_request_ops[0].extent.length = cpu_to_le64(len);
+               req->r_request_ops[0].payload_len = cpu_to_le32(len);
                req->r_request->hdr.data_len = cpu_to_le32(len);
 
                rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
index ad8899fc3157c58edf3737d3fca1b2e392657ef3..1dd5d466b6f9f01980c8e3efb7d905395f6b3fde 100644 (file)
@@ -47,6 +47,9 @@ struct ceph_osd {
        struct list_head o_keepalive_item;
 };
 
+
+#define CEPH_OSD_MAX_OP 10
+
 /* an in-flight request */
 struct ceph_osd_request {
        u64             r_tid;              /* unique for this client */
@@ -63,9 +66,23 @@ struct ceph_osd_request {
        struct ceph_connection *r_con_filling_msg;
 
        struct ceph_msg  *r_request, *r_reply;
-       int               r_result;
        int               r_flags;     /* any additional flags for the osd */
        u32               r_sent;      /* >0 if r_request is sending/sent */
+       int               r_num_ops;
+
+       /* encoded message content */
+       struct ceph_osd_op *r_request_ops;
+       /* these are updated on each send */
+       __le32           *r_request_osdmap_epoch;
+       __le32           *r_request_flags;
+       __le64           *r_request_pool;
+       void             *r_request_pgid;
+       __le32           *r_request_attempts;
+       struct ceph_eversion *r_request_reassert_version;
+
+       int               r_result;
+       int               r_reply_op_len[CEPH_OSD_MAX_OP];
+       s32               r_reply_op_result[CEPH_OSD_MAX_OP];
        int               r_got_reply;
        int               r_linger;
 
index d784c8dfb09ac7f0eca04eda422507d69428a7e7..68c96a508ac20f8ff3aff08b7e29ff9bdf86bccf 100644 (file)
@@ -416,43 +416,5 @@ struct ceph_osd_op {
        __le32 payload_len;
 } __attribute__ ((packed));
 
-/*
- * osd request message header.  each request may include multiple
- * ceph_osd_op object operations.
- */
-struct ceph_osd_request_head {
-       __le32 client_inc;                 /* client incarnation */
-       struct ceph_object_layout layout;  /* pgid */
-       __le32 osdmap_epoch;               /* client's osdmap epoch */
-
-       __le32 flags;
-
-       struct ceph_timespec mtime;        /* for mutations only */
-       struct ceph_eversion reassert_version; /* if we are replaying op */
-
-       __le32 object_len;     /* length of object name */
-
-       __le64 snapid;         /* snapid to read */
-       __le64 snap_seq;       /* writer's snap context */
-       __le32 num_snaps;
-
-       __le16 num_ops;
-       struct ceph_osd_op ops[];  /* followed by ops[], obj, ticket, snaps */
-} __attribute__ ((packed));
-
-struct ceph_osd_reply_head {
-       __le32 client_inc;                /* client incarnation */
-       __le32 flags;
-       struct ceph_object_layout layout;
-       __le32 osdmap_epoch;
-       struct ceph_eversion reassert_version; /* for replaying uncommitted */
-
-       __le32 result;                    /* result code */
-
-       __le32 object_len;                /* length of object name */
-       __le32 num_ops;
-       struct ceph_osd_op ops[0];  /* ops[], object */
-} __attribute__ ((packed));
-
 
 #endif
index f4d4b27d6026dbd01324c50abc6ae6a9fc28f77b..00d051f4894e2e0c690fcb8420263ab1aa505ca8 100644 (file)
@@ -123,10 +123,7 @@ static int osdc_show(struct seq_file *s, void *pp)
        mutex_lock(&osdc->request_mutex);
        for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
                struct ceph_osd_request *req;
-               struct ceph_osd_request_head *head;
-               struct ceph_osd_op *op;
-               int num_ops;
-               int opcode, olen;
+               int opcode;
                int i;
 
                req = rb_entry(p, struct ceph_osd_request, r_node);
@@ -135,13 +132,7 @@ static int osdc_show(struct seq_file *s, void *pp)
                           req->r_osd ? req->r_osd->o_osd : -1,
                           req->r_pgid.pool, req->r_pgid.seed);
 
-               head = req->r_request->front.iov_base;
-               op = (void *)(head + 1);
-
-               num_ops = le16_to_cpu(head->num_ops);
-               olen = le32_to_cpu(head->object_len);
-               seq_printf(s, "%.*s", olen,
-                          (const char *)(head->ops + num_ops));
+               seq_printf(s, "%.*s", req->r_oid_len, req->r_oid);
 
                if (req->r_reassert_version.epoch)
                        seq_printf(s, "\t%u'%llu",
@@ -150,10 +141,9 @@ static int osdc_show(struct seq_file *s, void *pp)
                else
                        seq_printf(s, "\t");
 
-               for (i = 0; i < num_ops; i++) {
-                       opcode = le16_to_cpu(op->op);
+               for (i = 0; i < req->r_num_ops; i++) {
+                       opcode = le16_to_cpu(req->r_request_ops[i].op);
                        seq_printf(s, "\t%s", ceph_osd_op_name(opcode));
-                       op++;
                }
 
                seq_printf(s, "\n");
index 5584f0a08e280fde9061f513464dfa580aea6ee4..d730dd4d8eb28bc421babb138f187605ee7c0ec6 100644 (file)
@@ -146,15 +146,23 @@ EXPORT_SYMBOL(ceph_osdc_release_request);
 
 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
                                               struct ceph_snap_context *snapc,
-                                              unsigned int num_op,
+                                              unsigned int num_ops,
                                               bool use_mempool,
                                               gfp_t gfp_flags)
 {
        struct ceph_osd_request *req;
        struct ceph_msg *msg;
-       size_t msg_size = sizeof(struct ceph_osd_request_head);
-
-       msg_size += num_op*sizeof(struct ceph_osd_op);
+       size_t msg_size;
+
+       msg_size = 4 + 4 + 8 + 8 + 4+8;
+       msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
+       msg_size += 1 + 8 + 4 + 4;     /* pg_t */
+       msg_size += 4 + MAX_OBJ_NAME_SIZE;
+       msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
+       msg_size += 8;  /* snapid */
+       msg_size += 8;  /* snap_seq */
+       msg_size += 8 * (snapc ? snapc->num_snaps : 0);  /* snaps */
+       msg_size += 4;
 
        if (use_mempool) {
                req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -193,9 +201,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        ceph_pagelist_init(&req->r_trail);
 
        /* create request message; allow space for oid */
-       msg_size += MAX_OBJ_NAME_SIZE;
-       if (snapc)
-               msg_size += sizeof(u64) * snapc->num_snaps;
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
        else
@@ -324,55 +329,80 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
  *
  */
 void ceph_osdc_build_request(struct ceph_osd_request *req,
-                            u64 off, u64 len, unsigned int num_op,
+                            u64 off, u64 len, unsigned int num_ops,
                             struct ceph_osd_req_op *src_ops,
                             struct ceph_snap_context *snapc, u64 snap_id,
                             struct timespec *mtime)
 {
        struct ceph_msg *msg = req->r_request;
-       struct ceph_osd_request_head *head;
        struct ceph_osd_req_op *src_op;
-       struct ceph_osd_op *op;
        void *p;
-       size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
+       size_t msg_size;
        int flags = req->r_flags;
        u64 data_len;
        int i;
 
-       WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
-
-       head = msg->front.iov_base;
-       head->snapid = cpu_to_le64(snap_id);
-       op = (void *)(head + 1);
-       p = (void *)(op + num_op);
-
+       req->r_num_ops = num_ops;
+       req->r_snapid = snap_id;
        req->r_snapc = ceph_get_snap_context(snapc);
 
-       head->client_inc = cpu_to_le32(1); /* always, for now. */
-       head->flags = cpu_to_le32(flags);
-       if (flags & CEPH_OSD_FLAG_WRITE)
-               ceph_encode_timespec(&head->mtime, mtime);
-       BUG_ON(num_op > (unsigned int) ((u16) -1));
-       head->num_ops = cpu_to_le16(num_op);
+       /* encode request */
+       msg->hdr.version = cpu_to_le16(4);
 
-       /* fill in oid */
-       head->object_len = cpu_to_le32(req->r_oid_len);
+       p = msg->front.iov_base;
+       ceph_encode_32(&p, 1);   /* client_inc  is always 1 */
+       req->r_request_osdmap_epoch = p;
+       p += 4;
+       req->r_request_flags = p;
+       p += 4;
+       if (req->r_flags & CEPH_OSD_FLAG_WRITE)
+               ceph_encode_timespec(p, mtime);
+       p += sizeof(struct ceph_timespec);
+       req->r_request_reassert_version = p;
+       p += sizeof(struct ceph_eversion); /* will get filled in */
+
+       /* oloc */
+       ceph_encode_8(&p, 4);
+       ceph_encode_8(&p, 4);
+       ceph_encode_32(&p, 8 + 4 + 4);
+       req->r_request_pool = p;
+       p += 8;
+       ceph_encode_32(&p, -1);  /* preferred */
+       ceph_encode_32(&p, 0);   /* key len */
+
+       ceph_encode_8(&p, 1);
+       req->r_request_pgid = p;
+       p += 8 + 4;
+       ceph_encode_32(&p, -1);  /* preferred */
+
+       /* oid */
+       ceph_encode_32(&p, req->r_oid_len);
        memcpy(p, req->r_oid, req->r_oid_len);
+       dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
        p += req->r_oid_len;
 
+       /* ops */
+       ceph_encode_16(&p, num_ops);
        src_op = src_ops;
-       while (num_op--)
-               osd_req_encode_op(req, op++, src_op++);
+       req->r_request_ops = p;
+       for (i = 0; i < num_ops; i++, src_op++) {
+               osd_req_encode_op(req, p, src_op);
+               p += sizeof(struct ceph_osd_op);
+       }
 
-       if (snapc) {
-               head->snap_seq = cpu_to_le64(snapc->seq);
-               head->num_snaps = cpu_to_le32(snapc->num_snaps);
+       /* snaps */
+       ceph_encode_64(&p, req->r_snapid);
+       ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
+       ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
+       if (req->r_snapc) {
                for (i = 0; i < snapc->num_snaps; i++) {
-                       put_unaligned_le64(snapc->snaps[i], p);
-                       p += sizeof(u64);
+                       ceph_encode_64(&p, req->r_snapc->snaps[i]);
                }
        }
 
+       req->r_request_attempts = p;
+       p += 4;
+
        data_len = req->r_trail.length;
        if (flags & CEPH_OSD_FLAG_WRITE) {
                req->r_request->hdr.data_off = cpu_to_le16(off);
@@ -385,6 +415,9 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
        msg_size = p - msg->front.iov_base;
        msg->front.iov_len = msg_size;
        msg->hdr.front_len = cpu_to_le32(msg_size);
+
+       dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
+            num_ops);
        return;
 }
 EXPORT_SYMBOL(ceph_osdc_build_request);
@@ -991,21 +1024,22 @@ out:
 static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req)
 {
-       struct ceph_osd_request_head *reqhead;
-
-       dout("send_request %p tid %llu to osd%d flags %d\n",
-            req, req->r_tid, req->r_osd->o_osd, req->r_flags);
-
-       reqhead = req->r_request->front.iov_base;
-       reqhead->snapid = cpu_to_le64(req->r_snapid);
-       reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
-       reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
-       reqhead->reassert_version = req->r_reassert_version;
+       void *p;
 
-       reqhead->layout.ol_pgid.ps = cpu_to_le16(req->r_pgid.seed);
-       reqhead->layout.ol_pgid.pool = cpu_to_le32(req->r_pgid.pool);
-       reqhead->layout.ol_pgid.preferred = cpu_to_le16(-1);
-       reqhead->layout.ol_stripe_unit = 0;
+       dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
+            req, req->r_tid, req->r_osd->o_osd, req->r_flags,
+            (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
+
+       /* fill in message content that changes each time we send it */
+       put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
+       put_unaligned_le32(req->r_flags, req->r_request_flags);
+       put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
+       p = req->r_request_pgid;
+       ceph_encode_64(&p, req->r_pgid.pool);
+       ceph_encode_32(&p, req->r_pgid.seed);
+       put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
+       memcpy(req->r_request_reassert_version, &req->r_reassert_version,
+              sizeof(req->r_reassert_version));
 
        req->r_stamp = jiffies;
        list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
@@ -1105,6 +1139,26 @@ static void complete_request(struct ceph_osd_request *req)
        complete_all(&req->r_safe_completion);  /* fsync waiter */
 }
 
+static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
+{
+       __u8 v;
+
+       ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
+       v = ceph_decode_8(p);
+       if (v > 1) {
+               pr_warning("do not understand pg encoding %d > 1", v);
+               return -EINVAL;
+       }
+       pgid->pool = ceph_decode_64(p);
+       pgid->seed = ceph_decode_32(p);
+       *p += 4;
+       return 0;
+
+bad:
+       pr_warning("incomplete pg encoding");
+       return -EINVAL;
+}
+
 /*
  * handle osd op reply.  either call the callback if it is specified,
  * or do the completion to wake up the waiting thread.
@@ -1112,22 +1166,42 @@ static void complete_request(struct ceph_osd_request *req)
 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                         struct ceph_connection *con)
 {
-       struct ceph_osd_reply_head *rhead = msg->front.iov_base;
+       void *p, *end;
        struct ceph_osd_request *req;
        u64 tid;
-       int numops, object_len, flags;
+       int object_len;
+       int numops, payload_len, flags;
        s32 result;
+       s32 retry_attempt;
+       struct ceph_pg pg;
+       int err;
+       u32 reassert_epoch;
+       u64 reassert_version;
+       u32 osdmap_epoch;
+       int i;
 
        tid = le64_to_cpu(msg->hdr.tid);
-       if (msg->front.iov_len < sizeof(*rhead))
-               goto bad;
-       numops = le32_to_cpu(rhead->num_ops);
-       object_len = le32_to_cpu(rhead->object_len);
-       result = le32_to_cpu(rhead->result);
-       if (msg->front.iov_len != sizeof(*rhead) + object_len +
-           numops * sizeof(struct ceph_osd_op))
+       dout("handle_reply %p tid %llu\n", msg, tid);
+
+       p = msg->front.iov_base;
+       end = p + msg->front.iov_len;
+
+       ceph_decode_need(&p, end, 4, bad);
+       object_len = ceph_decode_32(&p);
+       ceph_decode_need(&p, end, object_len, bad);
+       p += object_len;
+
+       err = __decode_pgid(&p, end, &pg);
+       if (err)
                goto bad;
-       dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
+
+       ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
+       flags = ceph_decode_64(&p);
+       result = ceph_decode_32(&p);
+       reassert_epoch = ceph_decode_32(&p);
+       reassert_version = ceph_decode_64(&p);
+       osdmap_epoch = ceph_decode_32(&p);
+
        /* lookup */
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
@@ -1137,7 +1211,38 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                return;
        }
        ceph_osdc_get_request(req);
-       flags = le32_to_cpu(rhead->flags);
+
+       dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
+            req, result);
+
+       ceph_decode_need(&p, end, 4, bad);
+       numops = ceph_decode_32(&p);
+       if (numops > CEPH_OSD_MAX_OP)
+               goto bad_put;
+       if (numops != req->r_num_ops)
+               goto bad_put;
+       payload_len = 0;
+       ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
+       for (i = 0; i < numops; i++) {
+               struct ceph_osd_op *op = p;
+               int len;
+
+               len = le32_to_cpu(op->payload_len);
+               req->r_reply_op_len[i] = len;
+               dout(" op %d has %d bytes\n", i, len);
+               payload_len += len;
+               p += sizeof(*op);
+       }
+       if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
+               pr_warning("sum of op payload lens %d != data_len %d",
+                          payload_len, le32_to_cpu(msg->hdr.data_len));
+               goto bad_put;
+       }
+
+       ceph_decode_need(&p, end, 4 + numops * 4, bad);
+       retry_attempt = ceph_decode_32(&p);
+       for (i = 0; i < numops; i++)
+               req->r_reply_op_result[i] = ceph_decode_32(&p);
 
        /*
         * if this connection filled our message, drop our reference now, to
@@ -1152,7 +1257,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        if (!req->r_got_reply) {
                unsigned int bytes;
 
-               req->r_result = le32_to_cpu(rhead->result);
+               req->r_result = result;
                bytes = le32_to_cpu(msg->hdr.data_len);
                dout("handle_reply result %d bytes %d\n", req->r_result,
                     bytes);
@@ -1160,7 +1265,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                        req->r_result = bytes;
 
                /* in case this is a write and we need to replay, */
-               req->r_reassert_version = rhead->reassert_version;
+               req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
+               req->r_reassert_version.version = cpu_to_le64(reassert_version);
 
                req->r_got_reply = 1;
        } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
@@ -1195,10 +1301,11 @@ done:
        ceph_osdc_put_request(req);
        return;
 
+bad_put:
+       ceph_osdc_put_request(req);
 bad:
-       pr_err("corrupt osd_op_reply got %d %d expected %d\n",
-              (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
-              (int)sizeof(*rhead));
+       pr_err("corrupt osd_op_reply got %d %d\n",
+              (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
        ceph_msg_dump(msg);
 }