git.kernelconcepts.de Git - karo-tx-linux.git/commitdiff
Merge remote-tracking branch 'nfsd/nfsd-next'
author Stephen Rothwell <sfr@canb.auug.org.au>
Mon, 7 Mar 2016 23:09:41 +0000 (10:09 +1100)
committer Stephen Rothwell <sfr@canb.auug.org.au>
Mon, 7 Mar 2016 23:09:41 +0000 (10:09 +1100)
30 files changed:
Documentation/filesystems/nfs/pnfs-scsi-server.txt [new file with mode: 0644]
fs/nfs/blocklayout/blocklayout.c
fs/nfs/blocklayout/blocklayout.h
fs/nfs/blocklayout/dev.c
fs/nfs/blocklayout/extent_tree.c
fs/nfs/blocklayout/rpc_pipefs.c
fs/nfsd/Kconfig
fs/nfsd/Makefile
fs/nfsd/blocklayout.c
fs/nfsd/blocklayoutxdr.c
fs/nfsd/blocklayoutxdr.h
fs/nfsd/nfs4layouts.c
fs/nfsd/nfs4proc.c
fs/nfsd/nfs4state.c
fs/nfsd/nfs4xdr.c
fs/nfsd/pnfs.h
fs/xfs/Makefile
fs/xfs/xfs_export.c
fs/xfs/xfs_pnfs.h
include/linux/nfs4.h
include/linux/sunrpc/auth.h
include/linux/sunrpc/rpc_rdma.h
include/linux/sunrpc/svc_rdma.h
net/sunrpc/auth_null.c
net/sunrpc/auth_unix.c
net/sunrpc/xprtrdma/svc_rdma_backchannel.c
net/sunrpc/xprtrdma/svc_rdma_marshal.c
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
net/sunrpc/xprtrdma/svc_rdma_sendto.c
net/sunrpc/xprtrdma/svc_rdma_transport.c

diff --git a/Documentation/filesystems/nfs/pnfs-scsi-server.txt b/Documentation/filesystems/nfs/pnfs-scsi-server.txt
new file mode 100644 (file)
index 0000000..4150979
--- /dev/null
@@ -0,0 +1,22 @@
+
+pNFS SCSI layout server user guide
+==================================
+
+This document describes support for pNFS SCSI layouts in the Linux NFS server.
+With pNFS SCSI layouts, the NFS server acts as the Metadata Server (MDS) for
+pNFS: in addition to handling all metadata access to the NFS export, it also
+hands out layouts to the clients so that they can directly access the
+underlying SCSI LUNs that are shared between the server and the clients.
+
+To use pNFS SCSI layouts with the Linux NFS server, the exported file
+system needs to support the pNFS SCSI layouts (currently just XFS), and the
+file system must sit on a SCSI LUN that is accessible to the clients in
+addition to the MDS.  As of now the file system needs to sit directly on the
+exported LUN; striping or concatenation of LUNs on the MDS and clients
+is not supported yet.
+
+On the server, pNFS SCSI volume support is automatically enabled if the
+file system is exported using the "pnfs" option and the underlying SCSI
+device supports persistent reservations.  On the client, make sure the kernel
+has the CONFIG_PNFS_BLOCK option enabled, and the file system is mounted
+using the NFSv4.1 protocol version (mount -o vers=4.1).
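
For illustration only (the export path, hostname, and mount point below are invented, not taken from the patch), a setup following the new document might look like this:

    # server: export the XFS file system that sits on the shared SCSI LUN,
    # with the "pnfs" export option enabled (/etc/exports)
    /exports/scsi  *(rw,sync,no_subtree_check,pnfs)

    # client: kernel built with CONFIG_PNFS_BLOCK, mount using NFSv4.1
    mount -t nfs -o vers=4.1 server:/exports/scsi /mnt
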
index ddd0138f410c8af4963575b4765d2a3f069c21de..b27c409b2f8eea631adac75910dd8014c975f01f 100644 (file)
@@ -446,8 +446,8 @@ static void bl_free_layout_hdr(struct pnfs_layout_hdr *lo)
        kfree(bl);
 }
 
-static struct pnfs_layout_hdr *bl_alloc_layout_hdr(struct inode *inode,
-                                                  gfp_t gfp_flags)
+static struct pnfs_layout_hdr *__bl_alloc_layout_hdr(struct inode *inode,
+               gfp_t gfp_flags, bool is_scsi_layout)
 {
        struct pnfs_block_layout *bl;
 
@@ -460,9 +460,22 @@ static struct pnfs_layout_hdr *bl_alloc_layout_hdr(struct inode *inode,
        bl->bl_ext_ro = RB_ROOT;
        spin_lock_init(&bl->bl_ext_lock);
 
+       bl->bl_scsi_layout = is_scsi_layout;
        return &bl->bl_layout;
 }
 
+static struct pnfs_layout_hdr *bl_alloc_layout_hdr(struct inode *inode,
+                                                  gfp_t gfp_flags)
+{
+       return __bl_alloc_layout_hdr(inode, gfp_flags, false);
+}
+
+static struct pnfs_layout_hdr *sl_alloc_layout_hdr(struct inode *inode,
+                                                  gfp_t gfp_flags)
+{
+       return __bl_alloc_layout_hdr(inode, gfp_flags, true);
+}
+
 static void bl_free_lseg(struct pnfs_layout_segment *lseg)
 {
        dprintk("%s enter\n", __func__);
@@ -888,22 +901,53 @@ static struct pnfs_layoutdriver_type blocklayout_type = {
        .sync                           = pnfs_generic_sync,
 };
 
+static struct pnfs_layoutdriver_type scsilayout_type = {
+       .id                             = LAYOUT_SCSI,
+       .name                           = "LAYOUT_SCSI",
+       .owner                          = THIS_MODULE,
+       .flags                          = PNFS_LAYOUTRET_ON_SETATTR |
+                                         PNFS_READ_WHOLE_PAGE,
+       .read_pagelist                  = bl_read_pagelist,
+       .write_pagelist                 = bl_write_pagelist,
+       .alloc_layout_hdr               = sl_alloc_layout_hdr,
+       .free_layout_hdr                = bl_free_layout_hdr,
+       .alloc_lseg                     = bl_alloc_lseg,
+       .free_lseg                      = bl_free_lseg,
+       .return_range                   = bl_return_range,
+       .prepare_layoutcommit           = bl_prepare_layoutcommit,
+       .cleanup_layoutcommit           = bl_cleanup_layoutcommit,
+       .set_layoutdriver               = bl_set_layoutdriver,
+       .alloc_deviceid_node            = bl_alloc_deviceid_node,
+       .free_deviceid_node             = bl_free_deviceid_node,
+       .pg_read_ops                    = &bl_pg_read_ops,
+       .pg_write_ops                   = &bl_pg_write_ops,
+       .sync                           = pnfs_generic_sync,
+};
+
+
 static int __init nfs4blocklayout_init(void)
 {
        int ret;
 
        dprintk("%s: NFSv4 Block Layout Driver Registering...\n", __func__);
 
-       ret = pnfs_register_layoutdriver(&blocklayout_type);
+       ret = bl_init_pipefs();
        if (ret)
                goto out;
-       ret = bl_init_pipefs();
+
+       ret = pnfs_register_layoutdriver(&blocklayout_type);
        if (ret)
-               goto out_unregister;
+               goto out_cleanup_pipe;
+
+       ret = pnfs_register_layoutdriver(&scsilayout_type);
+       if (ret)
+               goto out_unregister_block;
        return 0;
 
-out_unregister:
+out_unregister_block:
        pnfs_unregister_layoutdriver(&blocklayout_type);
+out_cleanup_pipe:
+       bl_cleanup_pipefs();
 out:
        return ret;
 }
@@ -913,8 +957,9 @@ static void __exit nfs4blocklayout_exit(void)
        dprintk("%s: NFSv4 Block Layout Driver Unregistering...\n",
               __func__);
 
-       bl_cleanup_pipefs();
+       pnfs_unregister_layoutdriver(&scsilayout_type);
        pnfs_unregister_layoutdriver(&blocklayout_type);
+       bl_cleanup_pipefs();
 }
 
 MODULE_ALIAS("nfs-layouttype4-3");
index c556640dcf3bad183659eaac9ac935ca17d30114..bc21205309e086019d7cf11112c780ce5b57963a 100644 (file)
@@ -55,7 +55,6 @@ struct pnfs_block_dev;
  */
 #define PNFS_BLOCK_UUID_LEN    128
 
-
 struct pnfs_block_volume {
        enum pnfs_block_volume_type     type;
        union {
@@ -82,6 +81,13 @@ struct pnfs_block_volume {
                        u32             volumes_count;
                        u32             volumes[PNFS_BLOCK_MAX_DEVICES];
                } stripe;
+               struct {
+                       enum scsi_code_set              code_set;
+                       enum scsi_designator_type       designator_type;
+                       int                             designator_len;
+                       u8                              designator[256];
+                       u64                             pr_key;
+               } scsi;
        };
 };
 
@@ -106,6 +112,9 @@ struct pnfs_block_dev {
        struct block_device             *bdev;
        u64                             disk_offset;
 
+       u64                             pr_key;
+       bool                            pr_registered;
+
        bool (*map)(struct pnfs_block_dev *dev, u64 offset,
                        struct pnfs_block_dev_map *map);
 };
@@ -131,6 +140,7 @@ struct pnfs_block_layout {
        struct rb_root          bl_ext_rw;
        struct rb_root          bl_ext_ro;
        spinlock_t              bl_ext_lock;   /* Protects list manipulation */
+       bool                    bl_scsi_layout;
 };
 
 static inline struct pnfs_block_layout *
@@ -182,6 +192,6 @@ void ext_tree_mark_committed(struct nfs4_layoutcommit_args *arg, int status);
 dev_t bl_resolve_deviceid(struct nfs_server *server,
                struct pnfs_block_volume *b, gfp_t gfp_mask);
 int __init bl_init_pipefs(void);
-void __exit bl_cleanup_pipefs(void);
+void bl_cleanup_pipefs(void);
 
 #endif /* FS_NFS_NFS4BLOCKLAYOUT_H */
index a861bbdfe5778e579ab88f2a5fa1393a441f3fbb..e5b89675263efffc0629dba595782248ade72715 100644 (file)
@@ -1,11 +1,12 @@
 /*
- * Copyright (c) 2014 Christoph Hellwig.
+ * Copyright (c) 2014-2016 Christoph Hellwig.
  */
 #include <linux/sunrpc/svc.h>
 #include <linux/blkdev.h>
 #include <linux/nfs4.h>
 #include <linux/nfs_fs.h>
 #include <linux/nfs_xdr.h>
+#include <linux/pr.h>
 
 #include "blocklayout.h"
 
@@ -21,6 +22,17 @@ bl_free_device(struct pnfs_block_dev *dev)
                        bl_free_device(&dev->children[i]);
                kfree(dev->children);
        } else {
+               if (dev->pr_registered) {
+                       const struct pr_ops *ops =
+                               dev->bdev->bd_disk->fops->pr_ops;
+                       int error;
+
+                       error = ops->pr_register(dev->bdev, dev->pr_key, 0,
+                               false);
+                       if (error)
+                               pr_err("failed to unregister PR key.\n");
+               }
+
                if (dev->bdev)
                        blkdev_put(dev->bdev, FMODE_READ | FMODE_WRITE);
        }
@@ -113,6 +125,24 @@ nfs4_block_decode_volume(struct xdr_stream *xdr, struct pnfs_block_volume *b)
                for (i = 0; i < b->stripe.volumes_count; i++)
                        b->stripe.volumes[i] = be32_to_cpup(p++);
                break;
+       case PNFS_BLOCK_VOLUME_SCSI:
+               p = xdr_inline_decode(xdr, 4 + 4 + 4);
+               if (!p)
+                       return -EIO;
+               b->scsi.code_set = be32_to_cpup(p++);
+               b->scsi.designator_type = be32_to_cpup(p++);
+               b->scsi.designator_len = be32_to_cpup(p++);
+               p = xdr_inline_decode(xdr, b->scsi.designator_len);
+               if (!p)
+                       return -EIO;
+               if (b->scsi.designator_len > 256)
+                       return -EIO;
+               memcpy(&b->scsi.designator, p, b->scsi.designator_len);
+               p = xdr_inline_decode(xdr, 8);
+               if (!p)
+                       return -EIO;
+               p = xdr_decode_hyper(p, &b->scsi.pr_key);
+               break;
        default:
                dprintk("unknown volume type!\n");
                return -EIO;
@@ -216,6 +246,116 @@ bl_parse_simple(struct nfs_server *server, struct pnfs_block_dev *d,
        return 0;
 }
 
+static bool
+bl_validate_designator(struct pnfs_block_volume *v)
+{
+       switch (v->scsi.designator_type) {
+       case PS_DESIGNATOR_EUI64:
+               if (v->scsi.code_set != PS_CODE_SET_BINARY)
+                       return false;
+
+               if (v->scsi.designator_len != 8 &&
+                   v->scsi.designator_len != 10 &&
+                   v->scsi.designator_len != 16)
+                       return false;
+
+               return true;
+       case PS_DESIGNATOR_NAA:
+               if (v->scsi.code_set != PS_CODE_SET_BINARY)
+                       return false;
+
+               if (v->scsi.designator_len != 8 &&
+                   v->scsi.designator_len != 16)
+                       return false;
+
+               return true;
+       case PS_DESIGNATOR_T10:
+       case PS_DESIGNATOR_NAME:
+               pr_err("pNFS: unsupported designator "
+                       "(code set %d, type %d, len %d).\n",
+                       v->scsi.code_set,
+                       v->scsi.designator_type,
+                       v->scsi.designator_len);
+               return false;
+       default:
+               pr_err("pNFS: invalid designator "
+                       "(code set %d, type %d, len %d).\n",
+                       v->scsi.code_set,
+                       v->scsi.designator_type,
+                       v->scsi.designator_len);
+               return false;
+       }
+}
+
+static int
+bl_parse_scsi(struct nfs_server *server, struct pnfs_block_dev *d,
+               struct pnfs_block_volume *volumes, int idx, gfp_t gfp_mask)
+{
+       struct pnfs_block_volume *v = &volumes[idx];
+       const struct pr_ops *ops;
+       const char *devname;
+       int error;
+
+       if (!bl_validate_designator(v))
+               return -EINVAL;
+
+       switch (v->scsi.designator_len) {
+       case 8:
+               devname = kasprintf(GFP_KERNEL, "/dev/disk/by-id/wwn-0x%8phN",
+                               v->scsi.designator);
+               break;
+       case 12:
+               devname = kasprintf(GFP_KERNEL, "/dev/disk/by-id/wwn-0x%12phN",
+                               v->scsi.designator);
+               break;
+       case 16:
+               devname = kasprintf(GFP_KERNEL, "/dev/disk/by-id/wwn-0x%16phN",
+                               v->scsi.designator);
+               break;
+       default:
+               return -EINVAL;
+       }
+
+       d->bdev = blkdev_get_by_path(devname, FMODE_READ, NULL);
+       if (IS_ERR(d->bdev)) {
+               pr_warn("pNFS: failed to open device %s (%ld)\n",
+                       devname, PTR_ERR(d->bdev));
+               kfree(devname);
+               return PTR_ERR(d->bdev);
+       }
+
+       kfree(devname);
+
+       d->len = i_size_read(d->bdev->bd_inode);
+       d->map = bl_map_simple;
+       d->pr_key = v->scsi.pr_key;
+
+       pr_info("pNFS: using block device %s (reservation key 0x%llx)\n",
+               d->bdev->bd_disk->disk_name, d->pr_key);
+
+       ops = d->bdev->bd_disk->fops->pr_ops;
+       if (!ops) {
+               pr_err("pNFS: block device %s does not support reservations.",
+                               d->bdev->bd_disk->disk_name);
+               error = -EINVAL;
+               goto out_blkdev_put;
+       }
+
+       error = ops->pr_register(d->bdev, 0, d->pr_key, true);
+       if (error) {
+               pr_err("pNFS: failed to register key for block device %s.",
+                               d->bdev->bd_disk->disk_name);
+               goto out_blkdev_put;
+       }
+
+       d->pr_registered = true;
+       return 0;
+
+out_blkdev_put:
+       blkdev_put(d->bdev, FMODE_READ);
+       return error;
+}
+
 static int
 bl_parse_slice(struct nfs_server *server, struct pnfs_block_dev *d,
                struct pnfs_block_volume *volumes, int idx, gfp_t gfp_mask)
@@ -303,6 +443,8 @@ bl_parse_deviceid(struct nfs_server *server, struct pnfs_block_dev *d,
                return bl_parse_concat(server, d, volumes, idx, gfp_mask);
        case PNFS_BLOCK_VOLUME_STRIPE:
                return bl_parse_stripe(server, d, volumes, idx, gfp_mask);
+       case PNFS_BLOCK_VOLUME_SCSI:
+               return bl_parse_scsi(server, d, volumes, idx, gfp_mask);
        default:
                dprintk("unsupported volume type: %d\n", volumes[idx].type);
                return -EIO;
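
For illustration only (the designator bytes are invented), the device path that bl_parse_scsi() opens is simply the persistent udev name formed by hex-dumping the NAA/EUI-64 designator after a "wwn-0x" prefix. A small userspace sketch of the same formatting, not part of the patch:

#include <stdio.h>
#include <string.h>
#include <stdint.h>

/* Hypothetical 8-byte NAA designator, as it would arrive in GETDEVICEINFO. */
static const uint8_t designator[8] = {
	0x60, 0x01, 0x40, 0x5e, 0x3f, 0x1c, 0x2a, 0x00
};

int main(void)
{
	char path[64] = "/dev/disk/by-id/wwn-0x";
	size_t off = strlen(path);
	size_t i;

	/* Userspace equivalent of the kernel's "%8phN": raw bytes as hex. */
	for (i = 0; i < sizeof(designator); i++)
		off += snprintf(path + off, sizeof(path) - off, "%02x",
				designator[i]);

	printf("%s\n", path);	/* /dev/disk/by-id/wwn-0x6001405e3f1c2a00 */
	return 0;
}
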
index 35ab51c04814d67baecf83d3b3627d66fdd79359..720b3ff55fa9b31502c42120f721e3365e735cf3 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014 Christoph Hellwig.
+ * Copyright (c) 2014-2016 Christoph Hellwig.
  */
 
 #include <linux/vmalloc.h>
@@ -462,10 +462,12 @@ out:
        return err;
 }
 
-static size_t ext_tree_layoutupdate_size(size_t count)
+static size_t ext_tree_layoutupdate_size(struct pnfs_block_layout *bl, size_t count)
 {
-       return sizeof(__be32) /* number of entries */ +
-               PNFS_BLOCK_EXTENT_SIZE * count;
+       if (bl->bl_scsi_layout)
+               return sizeof(__be32) + PNFS_SCSI_RANGE_SIZE * count;
+       else
+               return sizeof(__be32) + PNFS_BLOCK_EXTENT_SIZE * count;
 }
 
 static void ext_tree_free_commitdata(struct nfs4_layoutcommit_args *arg,
@@ -483,6 +485,23 @@ static void ext_tree_free_commitdata(struct nfs4_layoutcommit_args *arg,
        }
 }
 
+static __be32 *encode_block_extent(struct pnfs_block_extent *be, __be32 *p)
+{
+       p = xdr_encode_opaque_fixed(p, be->be_device->deviceid.data,
+                       NFS4_DEVICEID4_SIZE);
+       p = xdr_encode_hyper(p, be->be_f_offset << SECTOR_SHIFT);
+       p = xdr_encode_hyper(p, be->be_length << SECTOR_SHIFT);
+       p = xdr_encode_hyper(p, 0LL);
+       *p++ = cpu_to_be32(PNFS_BLOCK_READWRITE_DATA);
+       return p;
+}
+
+static __be32 *encode_scsi_range(struct pnfs_block_extent *be, __be32 *p)
+{
+       p = xdr_encode_hyper(p, be->be_f_offset << SECTOR_SHIFT);
+       return xdr_encode_hyper(p, be->be_length << SECTOR_SHIFT);
+}
+
 static int ext_tree_encode_commit(struct pnfs_block_layout *bl, __be32 *p,
                size_t buffer_size, size_t *count)
 {
@@ -496,19 +515,16 @@ static int ext_tree_encode_commit(struct pnfs_block_layout *bl, __be32 *p,
                        continue;
 
                (*count)++;
-               if (ext_tree_layoutupdate_size(*count) > buffer_size) {
+               if (ext_tree_layoutupdate_size(bl, *count) > buffer_size) {
                        /* keep counting.. */
                        ret = -ENOSPC;
                        continue;
                }
 
-               p = xdr_encode_opaque_fixed(p, be->be_device->deviceid.data,
-                               NFS4_DEVICEID4_SIZE);
-               p = xdr_encode_hyper(p, be->be_f_offset << SECTOR_SHIFT);
-               p = xdr_encode_hyper(p, be->be_length << SECTOR_SHIFT);
-               p = xdr_encode_hyper(p, 0LL);
-               *p++ = cpu_to_be32(PNFS_BLOCK_READWRITE_DATA);
-
+               if (bl->bl_scsi_layout)
+                       p = encode_scsi_range(be, p);
+               else
+                       p = encode_block_extent(be, p);
                be->be_tag = EXTENT_COMMITTING;
        }
        spin_unlock(&bl->bl_ext_lock);
@@ -537,7 +553,7 @@ retry:
        if (unlikely(ret)) {
                ext_tree_free_commitdata(arg, buffer_size);
 
-               buffer_size = ext_tree_layoutupdate_size(count);
+               buffer_size = ext_tree_layoutupdate_size(bl, count);
                count = 0;
 
                arg->layoutupdate_pages =
@@ -556,7 +572,7 @@ retry:
        }
 
        *start_p = cpu_to_be32(count);
-       arg->layoutupdate_len = ext_tree_layoutupdate_size(count);
+       arg->layoutupdate_len = ext_tree_layoutupdate_size(bl, count);
 
        if (unlikely(arg->layoutupdate_pages != &arg->layoutupdate_page)) {
                void *p = start_p, *end = p + arg->layoutupdate_len;
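
To make the on-the-wire savings concrete, here is a small standalone sketch (plain userspace C, restating the PNFS_BLOCK_EXTENT_SIZE and PNFS_SCSI_RANGE_SIZE constants from include/linux/nfs4.h; the 3-extent count is just an example) of what ext_tree_layoutupdate_size() computes for the two layout types:

#include <stdio.h>

/*
 * Illustrative restatement of the on-the-wire record sizes:
 *   PNFS_BLOCK_EXTENT_SIZE = 7 * sizeof(__be32) + NFS4_DEVICEID4_SIZE (16)
 *   PNFS_SCSI_RANGE_SIZE   = 4 * sizeof(__be32)
 */
#define BLOCK_EXTENT_SIZE	(7 * 4 + 16)	/* 44 bytes per extent */
#define SCSI_RANGE_SIZE		(4 * 4)		/* 16 bytes per extent */

/* Same shape as ext_tree_layoutupdate_size(): 4-byte count plus records. */
static size_t layoutupdate_size(int scsi_layout, size_t count)
{
	return 4 + count * (scsi_layout ? SCSI_RANGE_SIZE : BLOCK_EXTENT_SIZE);
}

int main(void)
{
	printf("block layout, 3 extents: %zu bytes\n", layoutupdate_size(0, 3));
	printf("scsi layout,  3 extents: %zu bytes\n", layoutupdate_size(1, 3));
	/* prints 136 and 52 respectively */
	return 0;
}
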
index dbe5839cdeba3e1cb701757365f113c127ab364e..9fb067a6f7e025983a170ee6dee7c3ffd7bb28cf 100644 (file)
@@ -281,7 +281,7 @@ out:
        return ret;
 }
 
-void __exit bl_cleanup_pipefs(void)
+void bl_cleanup_pipefs(void)
 {
        rpc_pipefs_notifier_unregister(&nfs4blocklayout_block);
        unregister_pernet_subsys(&nfs4blocklayout_net_ops);
index a0b77fc1bd3944e9f966705518d4138cd2e4f0e3..a30a31316e68411f5d500f241c1f8f9b86518d5f 100644 (file)
@@ -84,12 +84,30 @@ config NFSD_V4
          If unsure, say N.
 
 config NFSD_PNFS
-       bool "NFSv4.1 server support for Parallel NFS (pNFS)"
+       bool
+
+config NFSD_BLOCKLAYOUT
+       bool "NFSv4.1 server support for pNFS block layouts"
+       depends on NFSD_V4
+       select NFSD_PNFS
+       help
+         This option enables support for exporting pNFS block layouts
+         in the kernel's NFS server. The pNFS block layout enables NFS
+         clients to directly perform I/O to block devices accessible to both
+         the server and the clients.  See RFC 5663 for more details.
+
+         If unsure, say N.
+
+config NFSD_SCSILAYOUT
+       bool "NFSv4.1 server support for pNFS SCSI layouts"
        depends on NFSD_V4
+       select NFSD_PNFS
        help
-         This option enables support for the parallel NFS features of the
-         minor version 1 of the NFSv4 protocol (RFC5661) in the kernel's NFS
-         server.
+         This option enables support for exporting pNFS SCSI layouts
+         in the kernel's NFS server. The pNFS SCSI layout enables NFS
+         clients to directly perform I/O to SCSI devices accessible to both
+         the server and the clients.  See draft-ietf-nfsv4-scsi-layout for
+         more details.
 
          If unsure, say N.
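
For illustration, the resulting configuration might look like the fragment below (CONFIG_NFSD_PNFS is now selected automatically; the client option comes from the new pnfs-scsi-server.txt document, and building it as a module is only an example):

    # server .config
    CONFIG_NFSD_V4=y
    CONFIG_NFSD_BLOCKLAYOUT=y
    CONFIG_NFSD_SCSILAYOUT=y

    # client .config
    CONFIG_PNFS_BLOCK=m
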
 
index 9a6028e120c68bce8adf15456b39382eab3b6a7c..3ae5f3c77e28b15b532e25668a86403446dc4425 100644 (file)
@@ -17,4 +17,6 @@ nfsd-$(CONFIG_NFSD_V3)        += nfs3proc.o nfs3xdr.o
 nfsd-$(CONFIG_NFSD_V3_ACL) += nfs3acl.o
 nfsd-$(CONFIG_NFSD_V4) += nfs4proc.o nfs4xdr.o nfs4state.o nfs4idmap.o \
                           nfs4acl.o nfs4callback.o nfs4recover.o
-nfsd-$(CONFIG_NFSD_PNFS) += nfs4layouts.o blocklayout.o blocklayoutxdr.o
+nfsd-$(CONFIG_NFSD_PNFS) += nfs4layouts.o
+nfsd-$(CONFIG_NFSD_BLOCKLAYOUT) += blocklayout.o blocklayoutxdr.o
+nfsd-$(CONFIG_NFSD_SCSILAYOUT) += blocklayout.o blocklayoutxdr.o
index c29d9421bd5e1f8c890178c6ec961899b319ceef..0e87e3e59da90be9acb8b96a5a19009f55609cdb 100644 (file)
@@ -1,11 +1,14 @@
 /*
- * Copyright (c) 2014 Christoph Hellwig.
+ * Copyright (c) 2014-2016 Christoph Hellwig.
  */
 #include <linux/exportfs.h>
 #include <linux/genhd.h>
 #include <linux/slab.h>
+#include <linux/pr.h>
 
 #include <linux/nfsd/debug.h>
+#include <scsi/scsi_proto.h>
+#include <scsi/scsi_common.h>
 
 #include "blocklayoutxdr.h"
 #include "pnfs.h"
 #define NFSDDBG_FACILITY       NFSDDBG_PNFS
 
 
-static int
-nfsd4_block_get_device_info_simple(struct super_block *sb,
-               struct nfsd4_getdeviceinfo *gdp)
-{
-       struct pnfs_block_deviceaddr *dev;
-       struct pnfs_block_volume *b;
-
-       dev = kzalloc(sizeof(struct pnfs_block_deviceaddr) +
-                     sizeof(struct pnfs_block_volume), GFP_KERNEL);
-       if (!dev)
-               return -ENOMEM;
-       gdp->gd_device = dev;
-
-       dev->nr_volumes = 1;
-       b = &dev->volumes[0];
-
-       b->type = PNFS_BLOCK_VOLUME_SIMPLE;
-       b->simple.sig_len = PNFS_BLOCK_UUID_LEN;
-       return sb->s_export_op->get_uuid(sb, b->simple.sig, &b->simple.sig_len,
-                       &b->simple.offset);
-}
-
-static __be32
-nfsd4_block_proc_getdeviceinfo(struct super_block *sb,
-               struct nfsd4_getdeviceinfo *gdp)
-{
-       if (sb->s_bdev != sb->s_bdev->bd_contains)
-               return nfserr_inval;
-       return nfserrno(nfsd4_block_get_device_info_simple(sb, gdp));
-}
-
 static __be32
 nfsd4_block_proc_layoutget(struct inode *inode, const struct svc_fh *fhp,
                struct nfsd4_layoutget *args)
@@ -141,20 +113,13 @@ out_layoutunavailable:
 }
 
 static __be32
-nfsd4_block_proc_layoutcommit(struct inode *inode,
-               struct nfsd4_layoutcommit *lcp)
+nfsd4_block_commit_blocks(struct inode *inode, struct nfsd4_layoutcommit *lcp,
+               struct iomap *iomaps, int nr_iomaps)
 {
        loff_t new_size = lcp->lc_last_wr + 1;
        struct iattr iattr = { .ia_valid = 0 };
-       struct iomap *iomaps;
-       int nr_iomaps;
        int error;
 
-       nr_iomaps = nfsd4_block_decode_layoutupdate(lcp->lc_up_layout,
-                       lcp->lc_up_len, &iomaps, 1 << inode->i_blkbits);
-       if (nr_iomaps < 0)
-               return nfserrno(nr_iomaps);
-
        if (lcp->lc_mtime.tv_nsec == UTIME_NOW ||
            timespec_compare(&lcp->lc_mtime, &inode->i_mtime) < 0)
                lcp->lc_mtime = current_fs_time(inode->i_sb);
@@ -172,6 +137,54 @@ nfsd4_block_proc_layoutcommit(struct inode *inode,
        return nfserrno(error);
 }
 
+#ifdef CONFIG_NFSD_BLOCKLAYOUT
+static int
+nfsd4_block_get_device_info_simple(struct super_block *sb,
+               struct nfsd4_getdeviceinfo *gdp)
+{
+       struct pnfs_block_deviceaddr *dev;
+       struct pnfs_block_volume *b;
+
+       dev = kzalloc(sizeof(struct pnfs_block_deviceaddr) +
+                     sizeof(struct pnfs_block_volume), GFP_KERNEL);
+       if (!dev)
+               return -ENOMEM;
+       gdp->gd_device = dev;
+
+       dev->nr_volumes = 1;
+       b = &dev->volumes[0];
+
+       b->type = PNFS_BLOCK_VOLUME_SIMPLE;
+       b->simple.sig_len = PNFS_BLOCK_UUID_LEN;
+       return sb->s_export_op->get_uuid(sb, b->simple.sig, &b->simple.sig_len,
+                       &b->simple.offset);
+}
+
+static __be32
+nfsd4_block_proc_getdeviceinfo(struct super_block *sb,
+               struct nfs4_client *clp,
+               struct nfsd4_getdeviceinfo *gdp)
+{
+       if (sb->s_bdev != sb->s_bdev->bd_contains)
+               return nfserr_inval;
+       return nfserrno(nfsd4_block_get_device_info_simple(sb, gdp));
+}
+
+static __be32
+nfsd4_block_proc_layoutcommit(struct inode *inode,
+               struct nfsd4_layoutcommit *lcp)
+{
+       struct iomap *iomaps;
+       int nr_iomaps;
+
+       nr_iomaps = nfsd4_block_decode_layoutupdate(lcp->lc_up_layout,
+                       lcp->lc_up_len, &iomaps, 1 << inode->i_blkbits);
+       if (nr_iomaps < 0)
+               return nfserrno(nr_iomaps);
+
+       return nfsd4_block_commit_blocks(inode, lcp, iomaps, nr_iomaps);
+}
+
 const struct nfsd4_layout_ops bl_layout_ops = {
        /*
         * Pretend that we send notification to the client.  This is a blatant
@@ -190,3 +203,206 @@ const struct nfsd4_layout_ops bl_layout_ops = {
        .encode_layoutget       = nfsd4_block_encode_layoutget,
        .proc_layoutcommit      = nfsd4_block_proc_layoutcommit,
 };
+#endif /* CONFIG_NFSD_BLOCKLAYOUT */
+
+#ifdef CONFIG_NFSD_SCSILAYOUT
+static int nfsd4_scsi_identify_device(struct block_device *bdev,
+               struct pnfs_block_volume *b)
+{
+       struct request_queue *q = bdev->bd_disk->queue;
+       struct request *rq;
+       size_t bufflen = 252, len, id_len;
+       u8 *buf, *d, type, assoc;
+       int error;
+
+       buf = kzalloc(bufflen, GFP_KERNEL);
+       if (!buf)
+               return -ENOMEM;
+
+       rq = blk_get_request(q, READ, GFP_KERNEL);
+       if (IS_ERR(rq)) {
+               error = -ENOMEM;
+               goto out_free_buf;
+       }
+       blk_rq_set_block_pc(rq);
+
+       error = blk_rq_map_kern(q, rq, buf, bufflen, GFP_KERNEL);
+       if (error)
+               goto out_put_request;
+
+       rq->cmd[0] = INQUIRY;
+       rq->cmd[1] = 1;
+       rq->cmd[2] = 0x83;
+       rq->cmd[3] = bufflen >> 8;
+       rq->cmd[4] = bufflen & 0xff;
+       rq->cmd_len = COMMAND_SIZE(INQUIRY);
+
+       error = blk_execute_rq(rq->q, NULL, rq, 1);
+       if (error) {
+               pr_err("pNFS: INQUIRY 0x83 failed with: %x\n",
+                       rq->errors);
+               goto out_put_request;
+       }
+
+       len = (buf[2] << 8) + buf[3] + 4;
+       if (len > bufflen) {
+               pr_err("pNFS: INQUIRY 0x83 response invalid (len = %zd)\n",
+                       len);
+               goto out_put_request;
+       }
+
+       d = buf + 4;
+       for (d = buf + 4; d < buf + len; d += id_len + 4) {
+               id_len = d[3];
+               type = d[1] & 0xf;
+               assoc = (d[1] >> 4) & 0x3;
+
+               /*
+                * We only care about EUI-64 and NAA designator types
+                * with LU association.
+                */
+               if (assoc != 0x00)
+                       continue;
+               if (type != 0x02 && type != 0x03)
+                       continue;
+               if (id_len != 8 && id_len != 12 && id_len != 16)
+                       continue;
+
+               b->scsi.code_set = PS_CODE_SET_BINARY;
+               b->scsi.designator_type = type == 0x02 ?
+                       PS_DESIGNATOR_EUI64 : PS_DESIGNATOR_NAA;
+               b->scsi.designator_len = id_len;
+               memcpy(b->scsi.designator, d + 4, id_len);
+
+               /*
+                * If we found an 8 or 12 byte descriptor, continue on to
+                * see if a 16 byte one is available.  If we find a
+                * 16 byte descriptor we're done.
+                */
+               if (id_len == 16)
+                       break;
+       }
+
+out_put_request:
+       blk_put_request(rq);
+out_free_buf:
+       kfree(buf);
+       return error;
+}
+
+#define NFSD_MDS_PR_KEY                0x0100000000000000
+
+/*
+ * We use the client ID as a unique key for the reservations.
+ * This allows us to easily fence a client when recalls fail.
+ */
+static u64 nfsd4_scsi_pr_key(struct nfs4_client *clp)
+{
+       return ((u64)clp->cl_clientid.cl_boot << 32) | clp->cl_clientid.cl_id;
+}
+
+static int
+nfsd4_block_get_device_info_scsi(struct super_block *sb,
+               struct nfs4_client *clp,
+               struct nfsd4_getdeviceinfo *gdp)
+{
+       struct pnfs_block_deviceaddr *dev;
+       struct pnfs_block_volume *b;
+       const struct pr_ops *ops;
+       int error;
+
+       dev = kzalloc(sizeof(struct pnfs_block_deviceaddr) +
+                     sizeof(struct pnfs_block_volume), GFP_KERNEL);
+       if (!dev)
+               return -ENOMEM;
+       gdp->gd_device = dev;
+
+       dev->nr_volumes = 1;
+       b = &dev->volumes[0];
+
+       b->type = PNFS_BLOCK_VOLUME_SCSI;
+       b->scsi.pr_key = nfsd4_scsi_pr_key(clp);
+
+       error = nfsd4_scsi_identify_device(sb->s_bdev, b);
+       if (error)
+               return error;
+
+       ops = sb->s_bdev->bd_disk->fops->pr_ops;
+       if (!ops) {
+               pr_err("pNFS: device %s does not support PRs.\n",
+                       sb->s_id);
+               return -EINVAL;
+       }
+
+       error = ops->pr_register(sb->s_bdev, 0, NFSD_MDS_PR_KEY, true);
+       if (error) {
+               pr_err("pNFS: failed to register key for device %s.\n",
+                       sb->s_id);
+               return -EINVAL;
+       }
+
+       error = ops->pr_reserve(sb->s_bdev, NFSD_MDS_PR_KEY,
+                       PR_EXCLUSIVE_ACCESS_REG_ONLY, 0);
+       if (error) {
+               pr_err("pNFS: failed to reserve device %s.\n",
+                       sb->s_id);
+               return -EINVAL;
+       }
+
+       return 0;
+}
+
+static __be32
+nfsd4_scsi_proc_getdeviceinfo(struct super_block *sb,
+               struct nfs4_client *clp,
+               struct nfsd4_getdeviceinfo *gdp)
+{
+       if (sb->s_bdev != sb->s_bdev->bd_contains)
+               return nfserr_inval;
+       return nfserrno(nfsd4_block_get_device_info_scsi(sb, clp, gdp));
+}
+static __be32
+nfsd4_scsi_proc_layoutcommit(struct inode *inode,
+               struct nfsd4_layoutcommit *lcp)
+{
+       struct iomap *iomaps;
+       int nr_iomaps;
+
+       nr_iomaps = nfsd4_scsi_decode_layoutupdate(lcp->lc_up_layout,
+                       lcp->lc_up_len, &iomaps, 1 << inode->i_blkbits);
+       if (nr_iomaps < 0)
+               return nfserrno(nr_iomaps);
+
+       return nfsd4_block_commit_blocks(inode, lcp, iomaps, nr_iomaps);
+}
+
+static void
+nfsd4_scsi_fence_client(struct nfs4_layout_stateid *ls)
+{
+       struct nfs4_client *clp = ls->ls_stid.sc_client;
+       struct block_device *bdev = ls->ls_file->f_path.mnt->mnt_sb->s_bdev;
+
+       bdev->bd_disk->fops->pr_ops->pr_preempt(bdev, NFSD_MDS_PR_KEY,
+                       nfsd4_scsi_pr_key(clp), 0, true);
+}
+
+const struct nfsd4_layout_ops scsi_layout_ops = {
+       /*
+        * Pretend that we send notification to the client.  This is a blatant
+        * lie to force recent Linux clients to cache our device IDs.
+        * We rarely ever change the device ID, so the harm of leaking deviceids
+        * for a while isn't too bad.  Unfortunately RFC5661 is a complete mess
+        * in this regard, but I filed errata 4119 for this a while ago, and
+        * hopefully the Linux client will eventually start caching deviceids
+        * without this again.
+        */
+       .notify_types           =
+                       NOTIFY_DEVICEID4_DELETE | NOTIFY_DEVICEID4_CHANGE,
+       .proc_getdeviceinfo     = nfsd4_scsi_proc_getdeviceinfo,
+       .encode_getdeviceinfo   = nfsd4_block_encode_getdeviceinfo,
+       .proc_layoutget         = nfsd4_block_proc_layoutget,
+       .encode_layoutget       = nfsd4_block_encode_layoutget,
+       .proc_layoutcommit      = nfsd4_scsi_proc_layoutcommit,
+       .fence_client           = nfsd4_scsi_fence_client,
+};
+#endif /* CONFIG_NFSD_SCSILAYOUT */
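
As a worked illustration of the fencing scheme above (the values are invented; this is plain userspace C, not part of the patch): the MDS holds the fixed NFSD_MDS_PR_KEY registration and reservation, every client registers a key derived from its clientid, and a failed layout recall becomes a PREEMPT of exactly that client's key. The derivation used by nfsd4_scsi_pr_key() is simply:

#include <stdio.h>
#include <stdint.h>

#define NFSD_MDS_PR_KEY	0x0100000000000000ULL	/* fixed key held by the MDS */

/* Mirrors nfsd4_scsi_pr_key(): boot verifier high, per-client id low. */
static uint64_t client_pr_key(uint32_t cl_boot, uint32_t cl_id)
{
	return ((uint64_t)cl_boot << 32) | cl_id;
}

int main(void)
{
	uint32_t cl_boot = 0x56daf000;	/* invented boot verifier */
	uint32_t cl_id = 0x1c;		/* invented client id */

	printf("MDS key:    0x%016llx\n", (unsigned long long)NFSD_MDS_PR_KEY);
	printf("client key: 0x%016llx\n",
	       (unsigned long long)client_pr_key(cl_boot, cl_id));
	/* Fencing preempts the client key; the MDS registration is untouched. */
	return 0;
}
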
index 6d834dc9bbc826bf8b711fb4adfaf58c82ae5c48..ca1883668810145b279ad54cce08b94ba639ae4e 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014 Christoph Hellwig.
+ * Copyright (c) 2014-2016 Christoph Hellwig.
  */
 #include <linux/sunrpc/svc.h>
 #include <linux/exportfs.h>
@@ -53,6 +53,18 @@ nfsd4_block_encode_volume(struct xdr_stream *xdr, struct pnfs_block_volume *b)
                p = xdr_encode_hyper(p, b->simple.offset);
                p = xdr_encode_opaque(p, b->simple.sig, b->simple.sig_len);
                break;
+       case PNFS_BLOCK_VOLUME_SCSI:
+               len = 4 + 4 + 4 + 4 + b->scsi.designator_len + 8;
+               p = xdr_reserve_space(xdr, len);
+               if (!p)
+                       return -ETOOSMALL;
+
+               *p++ = cpu_to_be32(b->type);
+               *p++ = cpu_to_be32(b->scsi.code_set);
+               *p++ = cpu_to_be32(b->scsi.designator_type);
+               p = xdr_encode_opaque(p, b->scsi.designator, b->scsi.designator_len);
+               p = xdr_encode_hyper(p, b->scsi.pr_key);
+               break;
        default:
                return -ENOTSUPP;
        }
@@ -155,3 +167,54 @@ fail:
        kfree(iomaps);
        return -EINVAL;
 }
+
+int
+nfsd4_scsi_decode_layoutupdate(__be32 *p, u32 len, struct iomap **iomapp,
+               u32 block_size)
+{
+       struct iomap *iomaps;
+       u32 nr_iomaps, expected, i;
+
+       if (len < sizeof(u32)) {
+               dprintk("%s: extent array too small: %u\n", __func__, len);
+               return -EINVAL;
+       }
+
+       nr_iomaps = be32_to_cpup(p++);
+       expected = sizeof(__be32) + nr_iomaps * PNFS_SCSI_RANGE_SIZE;
+       if (len != expected) {
+               dprintk("%s: extent array size mismatch: %u/%u\n",
+                       __func__, len, expected);
+               return -EINVAL;
+       }
+
+       iomaps = kcalloc(nr_iomaps, sizeof(*iomaps), GFP_KERNEL);
+       if (!iomaps) {
+               dprintk("%s: failed to allocate extent array\n", __func__);
+               return -ENOMEM;
+       }
+
+       for (i = 0; i < nr_iomaps; i++) {
+               u64 val;
+
+               p = xdr_decode_hyper(p, &val);
+               if (val & (block_size - 1)) {
+                       dprintk("%s: unaligned offset 0x%llx\n", __func__, val);
+                       goto fail;
+               }
+               iomaps[i].offset = val;
+
+               p = xdr_decode_hyper(p, &val);
+               if (val & (block_size - 1)) {
+                       dprintk("%s: unaligned length 0x%llx\n", __func__, val);
+                       goto fail;
+               }
+               iomaps[i].length = val;
+       }
+
+       *iomapp = iomaps;
+       return nr_iomaps;
+fail:
+       kfree(iomaps);
+       return -EINVAL;
+}
index 6de925fe84991d09081dce75db8c8cd4ba12dded..397bc7563a4927c9673e5715bbb79e0cd1c6006b 100644 (file)
@@ -15,6 +15,11 @@ struct pnfs_block_extent {
        enum pnfs_block_extent_state    es;
 };
 
+struct pnfs_block_range {
+       u64                             foff;
+       u64                             len;
+};
+
 /*
  * Random upper cap for the uuid length to avoid unbounded allocation.
  * Not actually limited by the protocol.
@@ -29,6 +34,13 @@ struct pnfs_block_volume {
                        u32             sig_len;
                        u8              sig[PNFS_BLOCK_UUID_LEN];
                } simple;
+               struct {
+                       enum scsi_code_set              code_set;
+                       enum scsi_designator_type       designator_type;
+                       int                             designator_len;
+                       u8                              designator[256];
+                       u64                             pr_key;
+               } scsi;
        };
 };
 
@@ -43,5 +55,7 @@ __be32 nfsd4_block_encode_layoutget(struct xdr_stream *xdr,
                struct nfsd4_layoutget *lgp);
 int nfsd4_block_decode_layoutupdate(__be32 *p, u32 len, struct iomap **iomapp,
                u32 block_size);
+int nfsd4_scsi_decode_layoutupdate(__be32 *p, u32 len, struct iomap **iomapp,
+               u32 block_size);
 
 #endif /* _NFSD_BLOCKLAYOUTXDR_H */
index ce2d010d3b170627c82b281c11e206bd97109365..cbd804e90b3272c6192d1b52149fb2a0b67881ea 100644 (file)
@@ -1,6 +1,7 @@
 /*
  * Copyright (c) 2014 Christoph Hellwig.
  */
+#include <linux/blkdev.h>
 #include <linux/kmod.h>
 #include <linux/file.h>
 #include <linux/jhash.h>
@@ -26,7 +27,12 @@ static const struct nfsd4_callback_ops nfsd4_cb_layout_ops;
 static const struct lock_manager_operations nfsd4_layouts_lm_ops;
 
 const struct nfsd4_layout_ops *nfsd4_layout_ops[LAYOUT_TYPE_MAX] =  {
+#ifdef CONFIG_NFSD_BLOCKLAYOUT
        [LAYOUT_BLOCK_VOLUME]   = &bl_layout_ops,
+#endif
+#ifdef CONFIG_NFSD_SCSILAYOUT
+       [LAYOUT_SCSI]           = &scsi_layout_ops,
+#endif
 };
 
 /* pNFS device ID to export fsid mapping */
@@ -121,10 +127,24 @@ void nfsd4_setup_layout_type(struct svc_export *exp)
        if (!(exp->ex_flags & NFSEXP_PNFS))
                return;
 
+       /*
+        * Check if the file system supports exporting a block-like layout.
+        * If the block device supports reservations, prefer the SCSI layout,
+        * else advertise the block layout.
+        */
+#ifdef CONFIG_NFSD_BLOCKLAYOUT
        if (sb->s_export_op->get_uuid &&
            sb->s_export_op->map_blocks &&
            sb->s_export_op->commit_blocks)
                exp->ex_layout_type = LAYOUT_BLOCK_VOLUME;
+#endif
+#ifdef CONFIG_NFSD_SCSILAYOUT
+       /* overwrite block layout selection if needed */
+       if (sb->s_export_op->map_blocks &&
+           sb->s_export_op->commit_blocks &&
+           sb->s_bdev && sb->s_bdev->bd_disk->fops->pr_ops)
+               exp->ex_layout_type = LAYOUT_SCSI;
+#endif
 }
 
 static void
@@ -590,8 +610,6 @@ nfsd4_cb_layout_fail(struct nfs4_layout_stateid *ls)
 
        rpc_ntop((struct sockaddr *)&clp->cl_addr, addr_str, sizeof(addr_str));
 
-       trace_layout_recall_fail(&ls->ls_stid.sc_stateid);
-
        printk(KERN_WARNING
                "nfsd: client %s failed to respond to layout recall. "
                "  Fencing..\n", addr_str);
@@ -626,6 +644,7 @@ nfsd4_cb_layout_done(struct nfsd4_callback *cb, struct rpc_task *task)
                container_of(cb, struct nfs4_layout_stateid, ls_recall);
        struct nfsd_net *nn;
        ktime_t now, cutoff;
+       const struct nfsd4_layout_ops *ops;
        LIST_HEAD(reaplist);
 
 
@@ -661,7 +680,13 @@ nfsd4_cb_layout_done(struct nfsd4_callback *cb, struct rpc_task *task)
                /*
                 * Unknown error or non-responding client, we'll need to fence.
                 */
-               nfsd4_cb_layout_fail(ls);
+               trace_layout_recall_fail(&ls->ls_stid.sc_stateid);
+
+               ops = nfsd4_layout_ops[ls->ls_layout_type];
+               if (ops->fence_client)
+                       ops->fence_client(ls);
+               else
+                       nfsd4_cb_layout_fail(ls);
                return -1;
        }
 }
index 4cba7865f4966b825e3d47c7d423fdb8ca11a262..de1ff1d98bb188a5661893f25e67926b70f7182f 100644 (file)
@@ -864,12 +864,10 @@ static __be32
 nfsd4_secinfo(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
              struct nfsd4_secinfo *secinfo)
 {
-       struct svc_fh resfh;
        struct svc_export *exp;
        struct dentry *dentry;
        __be32 err;
 
-       fh_init(&resfh, NFS4_FHSIZE);
        err = fh_verify(rqstp, &cstate->current_fh, S_IFDIR, NFSD_MAY_EXEC);
        if (err)
                return err;
@@ -878,6 +876,7 @@ nfsd4_secinfo(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
                                    &exp, &dentry);
        if (err)
                return err;
+       fh_unlock(&cstate->current_fh);
        if (d_really_is_negative(dentry)) {
                exp_put(exp);
                err = nfserr_noent;
@@ -1269,8 +1268,10 @@ nfsd4_getdeviceinfo(struct svc_rqst *rqstp,
                goto out;
 
        nfserr = nfs_ok;
-       if (gdp->gd_maxcount != 0)
-               nfserr = ops->proc_getdeviceinfo(exp->ex_path.mnt->mnt_sb, gdp);
+       if (gdp->gd_maxcount != 0) {
+               nfserr = ops->proc_getdeviceinfo(exp->ex_path.mnt->mnt_sb,
+                                       cstate->session->se_client, gdp);
+       }
 
        gdp->gd_notify_types &= ops->notify_types;
 out:
index c484a2b6cd109e985d9ea46cd59772146e3e3e3c..0462eeddfff9997f9de2fa0fb53a100deeff941d 100644 (file)
@@ -2408,7 +2408,8 @@ nfsd4_exchange_id(struct svc_rqst *rqstp,
        default:                                /* checked by xdr code */
                WARN_ON_ONCE(1);
        case SP4_SSV:
-               return nfserr_encr_alg_unsupp;
+               status = nfserr_encr_alg_unsupp;
+               goto out_nolock;
        }
 
        /* Cases below refer to rfc 5661 section 18.35.4: */
@@ -2586,21 +2587,26 @@ static __be32 check_forechannel_attrs(struct nfsd4_channel_attrs *ca, struct nfs
        return nfs_ok;
 }
 
+/*
+ * Server's NFSv4.1 backchannel support is AUTH_SYS-only for now.
+ * These are based on similar macros in linux/sunrpc/msg_prot.h .
+ */
+#define RPC_MAX_HEADER_WITH_AUTH_SYS \
+       (RPC_CALLHDRSIZE + 2 * (2 + UNX_CALLSLACK))
+
+#define RPC_MAX_REPHEADER_WITH_AUTH_SYS \
+       (RPC_REPHDRSIZE + (2 + NUL_REPLYSLACK))
+
 #define NFSD_CB_MAX_REQ_SZ     ((NFS4_enc_cb_recall_sz + \
-                                RPC_MAX_HEADER_WITH_AUTH) * sizeof(__be32))
+                                RPC_MAX_HEADER_WITH_AUTH_SYS) * sizeof(__be32))
 #define NFSD_CB_MAX_RESP_SZ    ((NFS4_dec_cb_recall_sz + \
-                                RPC_MAX_REPHEADER_WITH_AUTH) * sizeof(__be32))
+                                RPC_MAX_REPHEADER_WITH_AUTH_SYS) * \
+                                sizeof(__be32))
 
 static __be32 check_backchannel_attrs(struct nfsd4_channel_attrs *ca)
 {
        ca->headerpadsz = 0;
 
-       /*
-        * These RPC_MAX_HEADER macros are overkill, especially since we
-        * don't even do gss on the backchannel yet.  But this is still
-        * less than 1k.  Tighten up this estimate in the unlikely event
-        * it turns out to be a problem for some client:
-        */
        if (ca->maxreq_sz < NFSD_CB_MAX_REQ_SZ)
                return nfserr_toosmall;
        if (ca->maxresp_sz < NFSD_CB_MAX_RESP_SZ)
@@ -2710,10 +2716,9 @@ nfsd4_create_session(struct svc_rqst *rqstp,
                goto out_free_conn;
        }
        status = nfs_ok;
-       /*
-        * We do not support RDMA or persistent sessions
-        */
+       /* Persistent sessions are not supported */
        cr_ses->flags &= ~SESSION4_PERSIST;
+       /* Upshifting from TCP to RDMA is not supported */
        cr_ses->flags &= ~SESSION4_RDMA;
 
        init_session(rqstp, new, conf, cr_ses);
index d6ef0955a979ca0eb3a581d09a33d8fcc7a48fd1..aa87954b4af28afb4dfe5d696a42ab30338a3009 100644 (file)
@@ -1072,8 +1072,9 @@ nfsd4_decode_rename(struct nfsd4_compoundargs *argp, struct nfsd4_rename *rename
 
        READ_BUF(4);
        rename->rn_snamelen = be32_to_cpup(p++);
-       READ_BUF(rename->rn_snamelen + 4);
+       READ_BUF(rename->rn_snamelen);
        SAVEMEM(rename->rn_sname, rename->rn_snamelen);
+       READ_BUF(4);
        rename->rn_tnamelen = be32_to_cpup(p++);
        READ_BUF(rename->rn_tnamelen);
        SAVEMEM(rename->rn_tname, rename->rn_tnamelen);
@@ -1155,13 +1156,14 @@ nfsd4_decode_setclientid(struct nfsd4_compoundargs *argp, struct nfsd4_setclient
        READ_BUF(8);
        setclientid->se_callback_prog = be32_to_cpup(p++);
        setclientid->se_callback_netid_len = be32_to_cpup(p++);
-
-       READ_BUF(setclientid->se_callback_netid_len + 4);
+       READ_BUF(setclientid->se_callback_netid_len);
        SAVEMEM(setclientid->se_callback_netid_val, setclientid->se_callback_netid_len);
+       READ_BUF(4);
        setclientid->se_callback_addr_len = be32_to_cpup(p++);
 
-       READ_BUF(setclientid->se_callback_addr_len + 4);
+       READ_BUF(setclientid->se_callback_addr_len);
        SAVEMEM(setclientid->se_callback_addr_val, setclientid->se_callback_addr_len);
+       READ_BUF(4);
        setclientid->se_callback_ident = be32_to_cpup(p++);
 
        DECODE_TAIL;
@@ -1835,8 +1837,9 @@ nfsd4_decode_compound(struct nfsd4_compoundargs *argp)
 
        READ_BUF(4);
        argp->taglen = be32_to_cpup(p++);
-       READ_BUF(argp->taglen + 8);
+       READ_BUF(argp->taglen);
        SAVEMEM(argp->tag, argp->taglen);
+       READ_BUF(8);
        argp->minorversion = be32_to_cpup(p++);
        argp->opcnt = be32_to_cpup(p++);
        max_reply += 4 + (XDR_QUADLEN(argp->taglen) << 2);
@@ -3060,7 +3063,7 @@ static __be32 nfsd4_encode_bind_conn_to_session(struct nfsd4_compoundres *resp,
                p = xdr_encode_opaque_fixed(p, bcts->sessionid.data,
                                                NFS4_MAX_SESSIONID_LEN);
                *p++ = cpu_to_be32(bcts->dir);
-               /* Sorry, we do not yet support RDMA over 4.1: */
+               /* Upshifting from TCP to RDMA is not supported */
                *p++ = cpu_to_be32(0);
        }
        return nfserr;
index d4c4453674c60d5b98120f1209030ba103dbe671..7d073b9b1553041d32910ef41de7883a90af3b37 100644 (file)
@@ -21,6 +21,7 @@ struct nfsd4_layout_ops {
        u32             notify_types;
 
        __be32 (*proc_getdeviceinfo)(struct super_block *sb,
+                       struct nfs4_client *clp,
                        struct nfsd4_getdeviceinfo *gdevp);
        __be32 (*encode_getdeviceinfo)(struct xdr_stream *xdr,
                        struct nfsd4_getdeviceinfo *gdevp);
@@ -32,10 +33,17 @@ struct nfsd4_layout_ops {
 
        __be32 (*proc_layoutcommit)(struct inode *inode,
                        struct nfsd4_layoutcommit *lcp);
+
+       void (*fence_client)(struct nfs4_layout_stateid *ls);
 };
 
 extern const struct nfsd4_layout_ops *nfsd4_layout_ops[];
+#ifdef CONFIG_NFSD_BLOCKLAYOUT
 extern const struct nfsd4_layout_ops bl_layout_ops;
+#endif
+#ifdef CONFIG_NFSD_SCSILAYOUT
+extern const struct nfsd4_layout_ops scsi_layout_ops;
+#endif
 
 __be32 nfsd4_preprocess_layout_stateid(struct svc_rqst *rqstp,
                struct nfsd4_compound_state *cstate, stateid_t *stateid,
index f64639176670b2228ac532607c0454d4b6e5a784..3542d94fddce5ca4a45e33e08a25fc894b2264eb 100644 (file)
@@ -121,4 +121,5 @@ xfs-$(CONFIG_XFS_RT)                += xfs_rtalloc.o
 xfs-$(CONFIG_XFS_POSIX_ACL)    += xfs_acl.o
 xfs-$(CONFIG_SYSCTL)           += xfs_sysctl.o
 xfs-$(CONFIG_COMPAT)           += xfs_ioctl32.o
-xfs-$(CONFIG_NFSD_PNFS)                += xfs_pnfs.o
+xfs-$(CONFIG_NFSD_BLOCKLAYOUT) += xfs_pnfs.o
+xfs-$(CONFIG_NFSD_SCSILAYOUT)  += xfs_pnfs.o
index 652cd3c5b58c1cac1562239c9c644a19dbe588b7..0492b82281f3f10bf6ed7b166ea7096b1ccccbc3 100644 (file)
@@ -246,7 +246,7 @@ const struct export_operations xfs_export_operations = {
        .fh_to_parent           = xfs_fs_fh_to_parent,
        .get_parent             = xfs_fs_get_parent,
        .commit_metadata        = xfs_fs_nfs_commit_metadata,
-#ifdef CONFIG_NFSD_PNFS
+#ifdef CONFIG_NFSD_BLOCKLAYOUT
        .get_uuid               = xfs_fs_get_uuid,
        .map_blocks             = xfs_fs_map_blocks,
        .commit_blocks          = xfs_fs_commit_blocks,
index 8147ac1088203b285bb8ebd6462c00d0c916abc1..93f74853961b1cce598f5b83e96e622457ae173e 100644 (file)
@@ -1,7 +1,7 @@
 #ifndef _XFS_PNFS_H
 #define _XFS_PNFS_H 1
 
-#ifdef CONFIG_NFSD_PNFS
+#if defined(CONFIG_NFSD_BLOCKLAYOUT) || defined(CONFIG_NFSD_SCSILAYOUT)
 int xfs_fs_get_uuid(struct super_block *sb, u8 *buf, u32 *len, u64 *offset);
 int xfs_fs_map_blocks(struct inode *inode, loff_t offset, u64 length,
                struct iomap *iomap, bool write, u32 *device_generation);
index d6f9b4e6006d0f3013a1386321685b01bd5e45c6..011433478a14811dbac1faaed622d3b5599aeb96 100644 (file)
@@ -529,6 +529,7 @@ enum pnfs_layouttype {
        LAYOUT_OSD2_OBJECTS = 2,
        LAYOUT_BLOCK_VOLUME = 3,
        LAYOUT_FLEX_FILES = 4,
+       LAYOUT_SCSI = 5,
        LAYOUT_TYPE_MAX
 };
 
@@ -555,6 +556,7 @@ enum pnfs_block_volume_type {
        PNFS_BLOCK_VOLUME_SLICE         = 1,
        PNFS_BLOCK_VOLUME_CONCAT        = 2,
        PNFS_BLOCK_VOLUME_STRIPE        = 3,
+       PNFS_BLOCK_VOLUME_SCSI          = 4,
 };
 
 enum pnfs_block_extent_state {
@@ -568,6 +570,23 @@ enum pnfs_block_extent_state {
 #define PNFS_BLOCK_EXTENT_SIZE \
        (7 * sizeof(__be32) + NFS4_DEVICEID4_SIZE)
 
+/* on the wire size of a scsi commit range */
+#define PNFS_SCSI_RANGE_SIZE \
+       (4 * sizeof(__be32))
+
+enum scsi_code_set {
+       PS_CODE_SET_BINARY      = 1,
+       PS_CODE_SET_ASCII       = 2,
+       PS_CODE_SET_UTF8        = 3
+};
+
+enum scsi_designator_type {
+       PS_DESIGNATOR_T10       = 1,
+       PS_DESIGNATOR_EUI64     = 2,
+       PS_DESIGNATOR_NAA       = 3,
+       PS_DESIGNATOR_NAME      = 8
+};
+
 #define NFL4_UFLG_MASK                 0x0000003F
 #define NFL4_UFLG_DENSE                        0x00000001
 #define NFL4_UFLG_COMMIT_THRU_MDS      0x00000002
index 1ecf13e148b8be110f0175950e21e1f77833bbe9..6a241a277249c989990771a76bbb554b86514ddc 100644 (file)
 #include <linux/uidgid.h>
 #include <linux/utsname.h>
 
+/*
+ * Maximum size of AUTH_NONE authentication information, in XDR words.
+ */
+#define NUL_CALLSLACK  (4)
+#define NUL_REPLYSLACK (2)
+
 /*
  * Size of the nodename buffer. RFC1831 specifies a hard limit of 255 bytes,
  * but Linux hostnames are actually limited to __NEW_UTS_LEN bytes.
  */
 #define UNX_MAXNODENAME        __NEW_UTS_LEN
+#define UNX_CALLSLACK  (21 + XDR_QUADLEN(UNX_MAXNODENAME))
 
 struct rpcsec_gss_info;
 
index f33c5a4d6fe47fddb2ae57e4eac448df6ff810c8..8c6d23cb0cae1a8d90874c6962f1d769300bfdb1 100644 (file)
@@ -102,6 +102,7 @@ struct rpcrdma_msg {
  * Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks
  */
 #define RPCRDMA_HDRLEN_MIN     (sizeof(__be32) * 7)
+#define RPCRDMA_HDRLEN_ERR     (sizeof(__be32) * 5)
 
 enum rpcrdma_errcode {
        ERR_VERS = 1,
index 5322fea6fe4c7995ae3ad0cfad15a1818356576f..3081339968c3b7e3224248e9bd91745bf2ec72b5 100644 (file)
@@ -75,8 +75,10 @@ struct svc_rdma_op_ctxt {
        struct svc_rdma_fastreg_mr *frmr;
        int hdr_count;
        struct xdr_buf arg;
+       struct ib_cqe cqe;
+       struct ib_cqe reg_cqe;
+       struct ib_cqe inv_cqe;
        struct list_head dto_q;
-       enum ib_wr_opcode wr_op;
        enum ib_wc_status wc_status;
        u32 byte_len;
        u32 position;
@@ -174,8 +176,6 @@ struct svcxprt_rdma {
        struct work_struct   sc_work;
 };
 /* sc_flags */
-#define RDMAXPRT_RQ_PENDING    1
-#define RDMAXPRT_SQ_PENDING    2
 #define RDMAXPRT_CONN_PENDING  3
 
 #define RPCRDMA_LISTEN_BACKLOG  10
@@ -199,7 +199,7 @@ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
                                    struct xdr_buf *rcvbuf);
 
 /* svc_rdma_marshal.c */
-extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg **, struct svc_rqst *);
+extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg *, struct svc_rqst *);
 extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *,
                                     struct rpcrdma_msg *,
                                     enum rpcrdma_errcode, __be32 *);
@@ -224,16 +224,22 @@ extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *,
 
 /* svc_rdma_sendto.c */
 extern int svc_rdma_map_xdr(struct svcxprt_rdma *, struct xdr_buf *,
-                           struct svc_rdma_req_map *);
+                           struct svc_rdma_req_map *, bool);
 extern int svc_rdma_sendto(struct svc_rqst *);
 extern struct rpcrdma_read_chunk *
        svc_rdma_get_read_chunk(struct rpcrdma_msg *);
+extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
+                               int);
 
 /* svc_rdma_transport.c */
+extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_write(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *);
 extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
-extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
-                               enum rpcrdma_errcode);
 extern int svc_rdma_post_recv(struct svcxprt_rdma *, gfp_t);
+extern int svc_rdma_repost_recv(struct svcxprt_rdma *, gfp_t);
 extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
 extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
 extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
index c2a2b584a056ab3b716b985ebf52058f3326fd37..8d9eb4d5ddd8fd861a70a30f9922a3203ad8bb6f 100644 (file)
@@ -113,8 +113,8 @@ const struct rpc_authops authnull_ops = {
 
 static
 struct rpc_auth null_auth = {
-       .au_cslack      = 4,
-       .au_rslack      = 2,
+       .au_cslack      = NUL_CALLSLACK,
+       .au_rslack      = NUL_REPLYSLACK,
        .au_ops         = &authnull_ops,
        .au_flavor      = RPC_AUTH_NULL,
        .au_count       = ATOMIC_INIT(0),
index 548240dd15fcf018f81134ba0717327c13eb580e..0d3dd364c22f5e1671e1048d8abf4d37b51e268b 100644 (file)
@@ -23,8 +23,6 @@ struct unx_cred {
 };
 #define uc_uid                 uc_base.cr_uid
 
-#define UNX_WRITESLACK         (21 + XDR_QUADLEN(UNX_MAXNODENAME))
-
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 # define RPCDBG_FACILITY       RPCDBG_AUTH
 #endif
@@ -228,8 +226,8 @@ const struct rpc_authops authunix_ops = {
 
 static
 struct rpc_auth                unix_auth = {
-       .au_cslack      = UNX_WRITESLACK,
-       .au_rslack      = 2,                    /* assume AUTH_NULL verf */
+       .au_cslack      = UNX_CALLSLACK,
+       .au_rslack      = NUL_REPLYSLACK,
        .au_ops         = &authunix_ops,
        .au_flavor      = RPC_AUTH_UNIX,
        .au_count       = ATOMIC_INIT(0),
index 65a7c232a34569accb9a0b7ee33ab4fc32633be5..a2a7519b0f23575d3b7e891973c5026634fe0537 100644 (file)
@@ -107,26 +107,18 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
        int ret;
 
        vec = svc_rdma_get_req_map(rdma);
-       ret = svc_rdma_map_xdr(rdma, sndbuf, vec);
+       ret = svc_rdma_map_xdr(rdma, sndbuf, vec, false);
        if (ret)
                goto out_err;
 
-       /* Post a recv buffer to handle the reply for this request. */
-       ret = svc_rdma_post_recv(rdma, GFP_NOIO);
-       if (ret) {
-               pr_err("svcrdma: Failed to post bc receive buffer, err=%d.\n",
-                      ret);
-               pr_err("svcrdma: closing transport %p.\n", rdma);
-               set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
-               ret = -ENOTCONN;
+       ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
+       if (ret)
                goto out_err;
-       }
 
        ctxt = svc_rdma_get_context(rdma);
        ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
        ctxt->count = 1;
 
-       ctxt->wr_op = IB_WR_SEND;
        ctxt->direction = DMA_TO_DEVICE;
        ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
        ctxt->sge[0].length = sndbuf->len;
@@ -140,7 +132,8 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
        atomic_inc(&rdma->sc_dma_used);
 
        memset(&send_wr, 0, sizeof(send_wr));
-       send_wr.wr_id = (unsigned long)ctxt;
+       ctxt->cqe.done = svc_rdma_wc_send;
+       send_wr.wr_cqe = &ctxt->cqe;
        send_wr.sg_list = ctxt->sge;
        send_wr.num_sge = 1;
        send_wr.opcode = IB_WR_SEND;
index e2fca7617242cc394d8aa41cf9b3a0b9e916dcee..765bca47c74d7e9f28ad1be961ce86dacc30dd81 100644 (file)
@@ -145,29 +145,44 @@ static __be32 *decode_reply_array(__be32 *va, __be32 *vaend)
        return (__be32 *)&ary->wc_array[nchunks];
 }
 
-int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
-                           struct svc_rqst *rqstp)
+int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp)
 {
-       struct rpcrdma_msg *rmsgp = NULL;
        __be32 *va, *vaend;
+       unsigned int len;
        u32 hdr_len;
 
-       rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
-
        /* Verify that there's enough bytes for header + something */
-       if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
+       if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_ERR) {
                dprintk("svcrdma: header too short = %d\n",
                        rqstp->rq_arg.len);
                return -EINVAL;
        }
 
-       if (rmsgp->rm_vers != rpcrdma_version)
-               return -ENOSYS;
-
-       /* Pull in the extra for the padded case and bump our pointer */
-       if (rmsgp->rm_type == rdma_msgp) {
-               int hdrlen;
+       if (rmsgp->rm_vers != rpcrdma_version) {
+               dprintk("%s: bad version %u\n", __func__,
+                       be32_to_cpu(rmsgp->rm_vers));
+               return -EPROTONOSUPPORT;
+       }
 
+       switch (be32_to_cpu(rmsgp->rm_type)) {
+       case RDMA_MSG:
+       case RDMA_NOMSG:
+               break;
+
+       case RDMA_DONE:
+               /* Just drop it */
+               dprintk("svcrdma: dropping RDMA_DONE message\n");
+               return 0;
+
+       case RDMA_ERROR:
+               /* Possible if this is a backchannel reply.
+                * XXX: We should cancel this XID, though.
+                */
+               dprintk("svcrdma: dropping RDMA_ERROR message\n");
+               return 0;
+
+       case RDMA_MSGP:
+               /* Pull in the extra for the padded case, bump our pointer */
                rmsgp->rm_body.rm_padded.rm_align =
                        be32_to_cpu(rmsgp->rm_body.rm_padded.rm_align);
                rmsgp->rm_body.rm_padded.rm_thresh =
@@ -175,11 +190,15 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
 
                va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
                rqstp->rq_arg.head[0].iov_base = va;
-               hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
-               rqstp->rq_arg.head[0].iov_len -= hdrlen;
-               if (hdrlen > rqstp->rq_arg.len)
+               len = (u32)((unsigned long)va - (unsigned long)rmsgp);
+               rqstp->rq_arg.head[0].iov_len -= len;
+               if (len > rqstp->rq_arg.len)
                        return -EINVAL;
-               return hdrlen;
+               return len;
+       default:
+               dprintk("svcrdma: bad rdma procedure (%u)\n",
+                       be32_to_cpu(rmsgp->rm_type));
+               return -EINVAL;
        }
 
        /* The chunk list may contain either a read chunk list or a write
@@ -188,20 +207,25 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
        va = &rmsgp->rm_body.rm_chunks[0];
        vaend = (__be32 *)((unsigned long)rmsgp + rqstp->rq_arg.len);
        va = decode_read_list(va, vaend);
-       if (!va)
+       if (!va) {
+               dprintk("svcrdma: failed to decode read list\n");
                return -EINVAL;
+       }
        va = decode_write_list(va, vaend);
-       if (!va)
+       if (!va) {
+               dprintk("svcrdma: failed to decode write list\n");
                return -EINVAL;
+       }
        va = decode_reply_array(va, vaend);
-       if (!va)
+       if (!va) {
+               dprintk("svcrdma: failed to decode reply chunk\n");
                return -EINVAL;
+       }
 
        rqstp->rq_arg.head[0].iov_base = va;
        hdr_len = (unsigned long)va - (unsigned long)rmsgp;
        rqstp->rq_arg.head[0].iov_len -= hdr_len;
 
-       *rdma_req = rmsgp;
        return hdr_len;
 }
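
With this rework svc_rdma_xdr_decode_req() no longer returns the header pointer through an out parameter; the caller passes rmsgp in and interprets the return value three ways: a negative errno for a malformed or unsupported header, zero for messages that are silently dropped (RDMA_DONE, RDMA_ERROR), and the decoded header length otherwise. A standalone sketch of a caller honoring that contract, with placeholder names and values (the padded RDMA_MSGP case handled by the real code is left out):

#include <errno.h>
#include <stdio.h>

enum rdma_proc { RDMA_MSG, RDMA_NOMSG, RDMA_MSGP, RDMA_DONE, RDMA_ERROR };

/* Placeholder for the decode step's return contract:
 *   < 0  -> protocol error, the caller sends an error reply
 *   == 0 -> benign message, the caller drops it silently
 *   > 0  -> decoded header length, the caller keeps processing
 */
static int decode_req(int type)
{
	switch (type) {
	case RDMA_MSG:
	case RDMA_NOMSG:
		return 28;		/* made-up header length in bytes */
	case RDMA_DONE:
	case RDMA_ERROR:
		return 0;		/* just drop it */
	default:
		return -EINVAL;		/* bad rdma procedure */
	}
}

int main(void)
{
	int samples[] = { RDMA_MSG, RDMA_DONE, 99 };
	int i;

	for (i = 0; i < 3; i++) {
		int ret = decode_req(samples[i]);

		if (ret < 0)
			printf("type %d: error %d, send an error reply\n",
			       samples[i], ret);
		else if (ret == 0)
			printf("type %d: drop the message\n", samples[i]);
		else
			printf("type %d: header length %d, process the call\n",
			       samples[i], ret);
	}
	return 0;
}
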
 
index c8b8a8b4181eafa75de0d673040b9f927fa74d9d..3b24a646eb46725219011ffe859d998a0af06bb3 100644 (file)
@@ -180,9 +180,9 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
                clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
 
        memset(&read_wr, 0, sizeof(read_wr));
-       read_wr.wr.wr_id = (unsigned long)ctxt;
+       ctxt->cqe.done = svc_rdma_wc_read;
+       read_wr.wr.wr_cqe = &ctxt->cqe;
        read_wr.wr.opcode = IB_WR_RDMA_READ;
-       ctxt->wr_op = read_wr.wr.opcode;
        read_wr.wr.send_flags = IB_SEND_SIGNALED;
        read_wr.rkey = rs_handle;
        read_wr.remote_addr = rs_offset;
@@ -299,8 +299,9 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
        ctxt->read_hdr = head;
 
        /* Prepare REG WR */
+       ctxt->reg_cqe.done = svc_rdma_wc_reg;
+       reg_wr.wr.wr_cqe = &ctxt->reg_cqe;
        reg_wr.wr.opcode = IB_WR_REG_MR;
-       reg_wr.wr.wr_id = 0;
        reg_wr.wr.send_flags = IB_SEND_SIGNALED;
        reg_wr.wr.num_sge = 0;
        reg_wr.mr = frmr->mr;
@@ -310,6 +311,8 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
 
        /* Prepare RDMA_READ */
        memset(&read_wr, 0, sizeof(read_wr));
+       ctxt->cqe.done = svc_rdma_wc_read;
+       read_wr.wr.wr_cqe = &ctxt->cqe;
        read_wr.wr.send_flags = IB_SEND_SIGNALED;
        read_wr.rkey = rs_handle;
        read_wr.remote_addr = rs_offset;
@@ -317,19 +320,18 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
        read_wr.wr.num_sge = 1;
        if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
                read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
-               read_wr.wr.wr_id = (unsigned long)ctxt;
                read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
        } else {
                read_wr.wr.opcode = IB_WR_RDMA_READ;
                read_wr.wr.next = &inv_wr;
                /* Prepare invalidate */
                memset(&inv_wr, 0, sizeof(inv_wr));
-               inv_wr.wr_id = (unsigned long)ctxt;
+               ctxt->inv_cqe.done = svc_rdma_wc_inv;
+               inv_wr.wr_cqe = &ctxt->inv_cqe;
                inv_wr.opcode = IB_WR_LOCAL_INV;
                inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
                inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
        }
-       ctxt->wr_op = read_wr.wr.opcode;
 
        /* Post the chain */
        ret = svc_rdma_send(xprt, &reg_wr.wr);
@@ -612,7 +614,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
        struct svc_rdma_op_ctxt *ctxt = NULL;
        struct rpcrdma_msg *rmsgp;
        int ret = 0;
-       int len;
 
        dprintk("svcrdma: rqstp=%p\n", rqstp);
 
@@ -642,8 +643,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
                 * transport list
                 */
                if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
-                       goto close_out;
-
+                       goto defer;
                goto out;
        }
        dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
@@ -654,15 +654,13 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
        rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
 
        /* Decode the RDMA header. */
-       len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
-       rqstp->rq_xprt_hlen = len;
-
-       /* If the request is invalid, reply with an error */
-       if (len < 0) {
-               if (len == -ENOSYS)
-                       svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
-               goto close_out;
-       }
+       rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+       ret = svc_rdma_xdr_decode_req(rmsgp, rqstp);
+       if (ret < 0)
+               goto out_err;
+       if (ret == 0)
+               goto out_drop;
+       rqstp->rq_xprt_hlen = ret;
 
        if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) {
                ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp,
@@ -698,26 +696,16 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
        svc_xprt_copy_addrs(rqstp, xprt);
        return ret;
 
- close_out:
-       if (ctxt)
-               svc_rdma_put_context(ctxt, 1);
-       dprintk("svcrdma: transport %p is closing\n", xprt);
-       /*
-        * Set the close bit and enqueue it. svc_recv will see the
-        * close bit and call svc_xprt_delete
-        */
-       set_bit(XPT_CLOSE, &xprt->xpt_flags);
+out_err:
+       svc_rdma_send_error(rdma_xprt, rmsgp, ret);
+       svc_rdma_put_context(ctxt, 0);
+       return 0;
+
 defer:
        return 0;
 
+out_drop:
+       svc_rdma_put_context(ctxt, 1);
 repost:
-       ret = svc_rdma_post_recv(rdma_xprt, GFP_KERNEL);
-       if (ret) {
-               pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
-                      ret);
-               pr_err("svcrdma: closing transport %p.\n", rdma_xprt);
-               set_bit(XPT_CLOSE, &rdma_xprt->sc_xprt.xpt_flags);
-               ret = -ENOTCONN;
-       }
-       return ret;
+       return svc_rdma_repost_recv(rdma_xprt, GFP_KERNEL);
 }
index df57f3ce6cd2cc3cae5fa2a709e62ceae2828a26..4f1b1c4f45f9d11d1ca3df8767ea76e6b07d99ae 100644 (file)
 
 #define RPCDBG_FACILITY        RPCDBG_SVCXPRT
 
+static u32 xdr_padsize(u32 len)
+{
+       return (len & 3) ? (4 - (len & 3)) : 0;
+}
+
 int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
                     struct xdr_buf *xdr,
-                    struct svc_rdma_req_map *vec)
+                    struct svc_rdma_req_map *vec,
+                    bool write_chunk_present)
 {
        int sge_no;
        u32 sge_bytes;
@@ -92,9 +98,20 @@ int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
 
        /* Tail SGE */
        if (xdr->tail[0].iov_len) {
-               vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
-               vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
-               sge_no++;
+               unsigned char *base = xdr->tail[0].iov_base;
+               size_t len = xdr->tail[0].iov_len;
+               u32 xdr_pad = xdr_padsize(xdr->page_len);
+
+               if (write_chunk_present && xdr_pad) {
+                       base += xdr_pad;
+                       len -= xdr_pad;
+               }
+
+               if (len) {
+                       vec->sge[sge_no].iov_base = base;
+                       vec->sge[sge_no].iov_len = len;
+                       sge_no++;
+               }
        }
 
        dprintk("svcrdma: %s: sge_no %d page_no %d "
@@ -166,10 +183,10 @@ svc_rdma_get_write_array(struct rpcrdma_msg *rmsgp)
  * reply array is present
  */
 static struct rpcrdma_write_array *
-svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp)
+svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp,
+                        struct rpcrdma_write_array *wr_ary)
 {
        struct rpcrdma_read_chunk *rch;
-       struct rpcrdma_write_array *wr_ary;
        struct rpcrdma_write_array *rp_ary;
 
        /* XXX: Need to fix when reply chunk may occur with read list
@@ -191,7 +208,6 @@ svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp)
                goto found_it;
        }
 
-       wr_ary = svc_rdma_get_write_array(rmsgp);
        if (wr_ary) {
                int chunk = be32_to_cpu(wr_ary->wc_nchunks);
 
@@ -281,8 +297,8 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
 
        /* Prepare WRITE WR */
        memset(&write_wr, 0, sizeof write_wr);
-       ctxt->wr_op = IB_WR_RDMA_WRITE;
-       write_wr.wr.wr_id = (unsigned long)ctxt;
+       ctxt->cqe.done = svc_rdma_wc_write;
+       write_wr.wr.wr_cqe = &ctxt->cqe;
        write_wr.wr.sg_list = &sge[0];
        write_wr.wr.num_sge = sge_no;
        write_wr.wr.opcode = IB_WR_RDMA_WRITE;
@@ -298,41 +314,37 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
  err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 0);
-       /* Fatal error, close transport */
        return -EIO;
 }
 
+noinline
 static int send_write_chunks(struct svcxprt_rdma *xprt,
-                            struct rpcrdma_msg *rdma_argp,
+                            struct rpcrdma_write_array *wr_ary,
                             struct rpcrdma_msg *rdma_resp,
                             struct svc_rqst *rqstp,
                             struct svc_rdma_req_map *vec)
 {
-       u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+       u32 xfer_len = rqstp->rq_res.page_len;
        int write_len;
        u32 xdr_off;
        int chunk_off;
        int chunk_no;
        int nchunks;
-       struct rpcrdma_write_array *arg_ary;
        struct rpcrdma_write_array *res_ary;
        int ret;
 
-       arg_ary = svc_rdma_get_write_array(rdma_argp);
-       if (!arg_ary)
-               return 0;
        res_ary = (struct rpcrdma_write_array *)
                &rdma_resp->rm_body.rm_chunks[1];
 
        /* Write chunks start at the pagelist */
-       nchunks = be32_to_cpu(arg_ary->wc_nchunks);
+       nchunks = be32_to_cpu(wr_ary->wc_nchunks);
        for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
             xfer_len && chunk_no < nchunks;
             chunk_no++) {
                struct rpcrdma_segment *arg_ch;
                u64 rs_offset;
 
-               arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
+               arg_ch = &wr_ary->wc_array[chunk_no].wc_target;
                write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));
 
                /* Prepare the response chunk given the length actually
@@ -350,11 +362,8 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
                                         xdr_off,
                                         write_len,
                                         vec);
-                       if (ret <= 0) {
-                               dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
-                                       ret);
-                               return -EIO;
-                       }
+                       if (ret <= 0)
+                               goto out_err;
                        chunk_off += ret;
                        xdr_off += ret;
                        xfer_len -= ret;
@@ -364,11 +373,16 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
        /* Update the req with the number of chunks actually used */
        svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
 
-       return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+       return rqstp->rq_res.page_len;
+
+out_err:
+       pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret);
+       return -EIO;
 }
 
+noinline
 static int send_reply_chunks(struct svcxprt_rdma *xprt,
-                            struct rpcrdma_msg *rdma_argp,
+                            struct rpcrdma_write_array *rp_ary,
                             struct rpcrdma_msg *rdma_resp,
                             struct svc_rqst *rqstp,
                             struct svc_rdma_req_map *vec)
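
send_write_chunks() now receives the client's write array directly and walks it, capping each RDMA Write at the segment's rs_length until the page data is consumed, then records the number of chunks actually used in the reply's write list. A simplified, runnable model of that loop (the real code further splits each chunk into SGE-limited writes via send_write(); the lengths here are made up):

#include <stdio.h>

/* Simplified model of the send_write_chunks() loop: consume the reply's
 * page data by writing into the client-provided segments, each capped at
 * its rs_length, and report how many chunks were actually used. */
struct segment { unsigned int rs_length; };

int main(void)
{
	struct segment chunks[] = { { 4096 }, { 4096 }, { 4096 } };
	unsigned int nchunks = 3;
	unsigned int xfer_len = 6000;	/* stands in for rq_res.page_len */
	unsigned int xdr_off = 0;
	unsigned int chunk_no;

	for (chunk_no = 0; xfer_len && chunk_no < nchunks; chunk_no++) {
		unsigned int write_len = xfer_len < chunks[chunk_no].rs_length ?
					 xfer_len : chunks[chunk_no].rs_length;

		printf("chunk %u: RDMA Write %u bytes at XDR offset %u\n",
		       chunk_no, write_len, xdr_off);
		xdr_off += write_len;
		xfer_len -= write_len;
	}
	printf("chunks used: %u, unwritten bytes: %u\n", chunk_no, xfer_len);
	return 0;
}
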
@@ -380,25 +394,21 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
        int chunk_off;
        int nchunks;
        struct rpcrdma_segment *ch;
-       struct rpcrdma_write_array *arg_ary;
        struct rpcrdma_write_array *res_ary;
        int ret;
 
-       arg_ary = svc_rdma_get_reply_array(rdma_argp);
-       if (!arg_ary)
-               return 0;
        /* XXX: need to fix when reply lists occur with read-list and or
         * write-list */
        res_ary = (struct rpcrdma_write_array *)
                &rdma_resp->rm_body.rm_chunks[2];
 
        /* xdr offset starts at RPC message */
-       nchunks = be32_to_cpu(arg_ary->wc_nchunks);
+       nchunks = be32_to_cpu(rp_ary->wc_nchunks);
        for (xdr_off = 0, chunk_no = 0;
             xfer_len && chunk_no < nchunks;
             chunk_no++) {
                u64 rs_offset;
-               ch = &arg_ary->wc_array[chunk_no].wc_target;
+               ch = &rp_ary->wc_array[chunk_no].wc_target;
                write_len = min(xfer_len, be32_to_cpu(ch->rs_length));
 
                /* Prepare the reply chunk given the length actually
@@ -415,11 +425,8 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
                                         xdr_off,
                                         write_len,
                                         vec);
-                       if (ret <= 0) {
-                               dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
-                                       ret);
-                               return -EIO;
-                       }
+                       if (ret <= 0)
+                               goto out_err;
                        chunk_off += ret;
                        xdr_off += ret;
                        xfer_len -= ret;
@@ -430,6 +437,10 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
        svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
 
        return rqstp->rq_res.len;
+
+out_err:
+       pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret);
+       return -EIO;
 }
 
 /* This function prepares the portion of the RPCRDMA message to be
@@ -464,13 +475,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
        int pages;
        int ret;
 
-       /* Post a recv buffer to handle another request. */
-       ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
+       ret = svc_rdma_repost_recv(rdma, GFP_KERNEL);
        if (ret) {
-               printk(KERN_INFO
-                      "svcrdma: could not post a receive buffer, err=%d."
-                      "Closing transport %p.\n", ret, rdma);
-               set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
                svc_rdma_put_context(ctxt, 0);
                return -ENOTCONN;
        }
@@ -543,8 +549,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
                goto err;
        }
        memset(&send_wr, 0, sizeof send_wr);
-       ctxt->wr_op = IB_WR_SEND;
-       send_wr.wr_id = (unsigned long)ctxt;
+       ctxt->cqe.done = svc_rdma_wc_send;
+       send_wr.wr_cqe = &ctxt->cqe;
        send_wr.sg_list = ctxt->sge;
        send_wr.num_sge = sge_no;
        send_wr.opcode = IB_WR_SEND;
@@ -559,6 +565,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
  err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 1);
+       pr_err("svcrdma: failed to send reply, rc=%d\n", ret);
        return -EIO;
 }
 
@@ -573,7 +580,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        struct rpcrdma_msg *rdma_argp;
        struct rpcrdma_msg *rdma_resp;
-       struct rpcrdma_write_array *reply_ary;
+       struct rpcrdma_write_array *wr_ary, *rp_ary;
        enum rpcrdma_proc reply_type;
        int ret;
        int inline_bytes;
@@ -587,12 +594,14 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
         * places this at the start of page 0.
         */
        rdma_argp = page_address(rqstp->rq_pages[0]);
+       wr_ary = svc_rdma_get_write_array(rdma_argp);
+       rp_ary = svc_rdma_get_reply_array(rdma_argp, wr_ary);
 
        /* Build an req vec for the XDR */
        ctxt = svc_rdma_get_context(rdma);
        ctxt->direction = DMA_TO_DEVICE;
        vec = svc_rdma_get_req_map(rdma);
-       ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec);
+       ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
        if (ret)
                goto err0;
        inline_bytes = rqstp->rq_res.len;
@@ -603,8 +612,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
        if (!res_page)
                goto err0;
        rdma_resp = page_address(res_page);
-       reply_ary = svc_rdma_get_reply_array(rdma_argp);
-       if (reply_ary)
+       if (rp_ary)
                reply_type = RDMA_NOMSG;
        else
                reply_type = RDMA_MSG;
@@ -612,27 +620,26 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
                                         rdma_resp, reply_type);
 
        /* Send any write-chunk data and build resp write-list */
-       ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
-                               rqstp, vec);
-       if (ret < 0) {
-               printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
-                      ret);
-               goto err1;
+       if (wr_ary) {
+               ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec);
+               if (ret < 0)
+                       goto err1;
+               inline_bytes -= ret + xdr_padsize(ret);
        }
-       inline_bytes -= ret;
 
        /* Send any reply-list data and update resp reply-list */
-       ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
-                               rqstp, vec);
-       if (ret < 0) {
-               printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
-                      ret);
-               goto err1;
+       if (rp_ary) {
+               ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec);
+               if (ret < 0)
+                       goto err1;
+               inline_bytes -= ret;
        }
-       inline_bytes -= ret;
 
        ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, vec,
                         inline_bytes);
+       if (ret < 0)
+               goto err1;
+
        svc_rdma_put_req_map(rdma, vec);
        dprintk("svcrdma: send_reply returns %d\n", ret);
        return ret;
@@ -642,5 +649,68 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
  err0:
        svc_rdma_put_req_map(rdma, vec);
        svc_rdma_put_context(ctxt, 0);
-       return ret;
+       set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+       return -ENOTCONN;
+}
+
+void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+                        int status)
+{
+       struct ib_send_wr err_wr;
+       struct page *p;
+       struct svc_rdma_op_ctxt *ctxt;
+       enum rpcrdma_errcode err;
+       __be32 *va;
+       int length;
+       int ret;
+
+       ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
+       if (ret)
+               return;
+
+       p = alloc_page(GFP_KERNEL);
+       if (!p)
+               return;
+       va = page_address(p);
+
+       /* XDR encode an error reply */
+       err = ERR_CHUNK;
+       if (status == -EPROTONOSUPPORT)
+               err = ERR_VERS;
+       length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
+
+       ctxt = svc_rdma_get_context(xprt);
+       ctxt->direction = DMA_TO_DEVICE;
+       ctxt->count = 1;
+       ctxt->pages[0] = p;
+
+       /* Prepare SGE for local address */
+       ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
+       ctxt->sge[0].length = length;
+       ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
+                                           p, 0, length, DMA_TO_DEVICE);
+       if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
+               dprintk("svcrdma: Error mapping buffer for protocol error\n");
+               svc_rdma_put_context(ctxt, 1);
+               return;
+       }
+       atomic_inc(&xprt->sc_dma_used);
+
+       /* Prepare SEND WR */
+       memset(&err_wr, 0, sizeof(err_wr));
+       ctxt->cqe.done = svc_rdma_wc_send;
+       err_wr.wr_cqe = &ctxt->cqe;
+       err_wr.sg_list = ctxt->sge;
+       err_wr.num_sge = 1;
+       err_wr.opcode = IB_WR_SEND;
+       err_wr.send_flags = IB_SEND_SIGNALED;
+
+       /* Post It */
+       ret = svc_rdma_send(xprt, &err_wr);
+       if (ret) {
+               dprintk("svcrdma: Error %d posting send for protocol error\n",
+                       ret);
+               svc_rdma_unmap_dma(ctxt);
+               svc_rdma_put_context(ctxt, 1);
+       }
 }
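
svc_rdma_send_error() now takes the negative errno produced by header decoding and chooses the RPC-over-RDMA error code itself: -EPROTONOSUPPORT (unsupported rpcrdma version) maps to ERR_VERS and any other failure to ERR_CHUNK. A minimal sketch of that mapping; the enum values below are placeholders, not wire values:

#include <errno.h>
#include <stdio.h>

/* Placeholder error codes; the real enum rpcrdma_errcode carries the
 * protocol's wire values. */
enum rpcrdma_errcode { ERR_VERS, ERR_CHUNK };

static enum rpcrdma_errcode errcode_for_status(int status)
{
	if (status == -EPROTONOSUPPORT)
		return ERR_VERS;	/* unsupported rpcrdma version */
	return ERR_CHUNK;		/* any other decode failure */
}

int main(void)
{
	int statuses[] = { -EPROTONOSUPPORT, -EINVAL };
	int i;

	for (i = 0; i < 2; i++)
		printf("status %d -> %s\n", statuses[i],
		       errcode_for_status(statuses[i]) == ERR_VERS ?
		       "ERR_VERS" : "ERR_CHUNK");
	return 0;
}
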
index 5763825d09bf776bfa89f0007be1805203d96adf..90668969d5596b199c9fad0c188a3d72dccc99e6 100644 (file)
@@ -63,17 +63,10 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
                                        int flags);
 static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
 static void svc_rdma_release_rqst(struct svc_rqst *);
-static void dto_tasklet_func(unsigned long data);
 static void svc_rdma_detach(struct svc_xprt *xprt);
 static void svc_rdma_free(struct svc_xprt *xprt);
 static int svc_rdma_has_wspace(struct svc_xprt *xprt);
 static int svc_rdma_secure_port(struct svc_rqst *);
-static void rq_cq_reap(struct svcxprt_rdma *xprt);
-static void sq_cq_reap(struct svcxprt_rdma *xprt);
-
-static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
-static DEFINE_SPINLOCK(dto_lock);
-static LIST_HEAD(dto_xprt_q);
 
 static struct svc_xprt_ops svc_rdma_ops = {
        .xpo_create = svc_rdma_create,
@@ -352,15 +345,6 @@ static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
        }
 }
 
-/* ib_cq event handler */
-static void cq_event_handler(struct ib_event *event, void *context)
-{
-       struct svc_xprt *xprt = context;
-       dprintk("svcrdma: received CQ event %s (%d), context=%p\n",
-               ib_event_msg(event->event), event->event, context);
-       set_bit(XPT_CLOSE, &xprt->xpt_flags);
-}
-
 /* QP event handler */
 static void qp_event_handler(struct ib_event *event, void *context)
 {
@@ -392,251 +376,171 @@ static void qp_event_handler(struct ib_event *event, void *context)
        }
 }
 
-/*
- * Data Transfer Operation Tasklet
+/**
+ * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
+ * @cq:        completion queue
+ * @wc:        completed WR
  *
- * Walks a list of transports with I/O pending, removing entries as
- * they are added to the server's I/O pending list. Two bits indicate
- * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
- * spinlock that serializes access to the transport list with the RQ
- * and SQ interrupt handlers.
  */
-static void dto_tasklet_func(unsigned long data)
+static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 {
-       struct svcxprt_rdma *xprt;
-       unsigned long flags;
+       struct svcxprt_rdma *xprt = cq->cq_context;
+       struct ib_cqe *cqe = wc->wr_cqe;
+       struct svc_rdma_op_ctxt *ctxt;
 
-       spin_lock_irqsave(&dto_lock, flags);
-       while (!list_empty(&dto_xprt_q)) {
-               xprt = list_entry(dto_xprt_q.next,
-                                 struct svcxprt_rdma, sc_dto_q);
-               list_del_init(&xprt->sc_dto_q);
-               spin_unlock_irqrestore(&dto_lock, flags);
+       /* WARNING: Only wc->wr_cqe and wc->status are reliable */
+       ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+       ctxt->wc_status = wc->status;
+       svc_rdma_unmap_dma(ctxt);
 
-               rq_cq_reap(xprt);
-               sq_cq_reap(xprt);
+       if (wc->status != IB_WC_SUCCESS)
+               goto flushed;
 
-               svc_xprt_put(&xprt->sc_xprt);
-               spin_lock_irqsave(&dto_lock, flags);
-       }
-       spin_unlock_irqrestore(&dto_lock, flags);
+       /* All wc fields are now known to be valid */
+       ctxt->byte_len = wc->byte_len;
+       spin_lock(&xprt->sc_rq_dto_lock);
+       list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+       spin_unlock(&xprt->sc_rq_dto_lock);
+
+       set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+       if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+               goto out;
+       svc_xprt_enqueue(&xprt->sc_xprt);
+       goto out;
+
+flushed:
+       if (wc->status != IB_WC_WR_FLUSH_ERR)
+               pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
+                       ib_wc_status_msg(wc->status),
+                       wc->status, wc->vendor_err);
+       set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+       svc_rdma_put_context(ctxt, 1);
+
+out:
+       svc_xprt_put(&xprt->sc_xprt);
 }
 
-/*
- * Receive Queue Completion Handler
- *
- * Since an RQ completion handler is called on interrupt context, we
- * need to defer the handling of the I/O to a tasklet
- */
-static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
+static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
+                                   struct ib_wc *wc,
+                                   const char *opname)
 {
-       struct svcxprt_rdma *xprt = cq_context;
-       unsigned long flags;
-
-       /* Guard against unconditional flush call for destroyed QP */
-       if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
-               return;
+       if (wc->status != IB_WC_SUCCESS)
+               goto err;
 
-       /*
-        * Set the bit regardless of whether or not it's on the list
-        * because it may be on the list already due to an SQ
-        * completion.
-        */
-       set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
+out:
+       atomic_dec(&xprt->sc_sq_count);
+       wake_up(&xprt->sc_send_wait);
+       return;
+
+err:
+       set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+       if (wc->status != IB_WC_WR_FLUSH_ERR)
+               pr_err("svcrdma: %s: %s (%u/0x%x)\n",
+                      opname, ib_wc_status_msg(wc->status),
+                      wc->status, wc->vendor_err);
+       goto out;
+}
 
-       /*
-        * If this transport is not already on the DTO transport queue,
-        * add it
-        */
-       spin_lock_irqsave(&dto_lock, flags);
-       if (list_empty(&xprt->sc_dto_q)) {
-               svc_xprt_get(&xprt->sc_xprt);
-               list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
-       }
-       spin_unlock_irqrestore(&dto_lock, flags);
+static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
+                                       const char *opname)
+{
+       struct svcxprt_rdma *xprt = cq->cq_context;
 
-       /* Tasklet does all the work to avoid irqsave locks. */
-       tasklet_schedule(&dto_tasklet);
+       svc_rdma_send_wc_common(xprt, wc, opname);
+       svc_xprt_put(&xprt->sc_xprt);
 }
 
-/*
- * rq_cq_reap - Process the RQ CQ.
- *
- * Take all completing WC off the CQE and enqueue the associated DTO
- * context on the dto_q for the transport.
+/**
+ * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq:        completion queue
+ * @wc:        completed WR
  *
- * Note that caller must hold a transport reference.
  */
-static void rq_cq_reap(struct svcxprt_rdma *xprt)
+void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 {
-       int ret;
-       struct ib_wc wc;
-       struct svc_rdma_op_ctxt *ctxt = NULL;
+       struct ib_cqe *cqe = wc->wr_cqe;
+       struct svc_rdma_op_ctxt *ctxt;
 
-       if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
-               return;
+       svc_rdma_send_wc_common_put(cq, wc, "send");
 
-       ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-       atomic_inc(&rdma_stat_rq_poll);
+       ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+       svc_rdma_unmap_dma(ctxt);
+       svc_rdma_put_context(ctxt, 1);
+}
 
-       while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
-               ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
-               ctxt->wc_status = wc.status;
-               ctxt->byte_len = wc.byte_len;
-               svc_rdma_unmap_dma(ctxt);
-               if (wc.status != IB_WC_SUCCESS) {
-                       /* Close the transport */
-                       dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
-                       set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-                       svc_rdma_put_context(ctxt, 1);
-                       svc_xprt_put(&xprt->sc_xprt);
-                       continue;
-               }
-               spin_lock_bh(&xprt->sc_rq_dto_lock);
-               list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
-               spin_unlock_bh(&xprt->sc_rq_dto_lock);
-               svc_xprt_put(&xprt->sc_xprt);
-       }
+/**
+ * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC
+ * @cq:        completion queue
+ * @wc:        completed WR
+ *
+ */
+void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc)
+{
+       struct ib_cqe *cqe = wc->wr_cqe;
+       struct svc_rdma_op_ctxt *ctxt;
 
-       if (ctxt)
-               atomic_inc(&rdma_stat_rq_prod);
+       svc_rdma_send_wc_common_put(cq, wc, "write");
 
-       set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-       /*
-        * If data arrived before established event,
-        * don't enqueue. This defers RPC I/O until the
-        * RDMA connection is complete.
-        */
-       if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
-               svc_xprt_enqueue(&xprt->sc_xprt);
+       ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+       svc_rdma_unmap_dma(ctxt);
+       svc_rdma_put_context(ctxt, 0);
 }
 
-/*
- * Process a completion context
+/**
+ * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
+ * @cq:        completion queue
+ * @wc:        completed WR
+ *
  */
-static void process_context(struct svcxprt_rdma *xprt,
-                           struct svc_rdma_op_ctxt *ctxt)
+void svc_rdma_wc_reg(struct ib_cq *cq, struct ib_wc *wc)
 {
-       struct svc_rdma_op_ctxt *read_hdr;
-       int free_pages = 0;
-
-       svc_rdma_unmap_dma(ctxt);
+       svc_rdma_send_wc_common_put(cq, wc, "fastreg");
+}
 
-       switch (ctxt->wr_op) {
-       case IB_WR_SEND:
-               free_pages = 1;
-               break;
+/**
+ * svc_rdma_wc_read - Invoked by RDMA provider for each polled Read WC
+ * @cq:        completion queue
+ * @wc:        completed WR
+ *
+ */
+void svc_rdma_wc_read(struct ib_cq *cq, struct ib_wc *wc)
+{
+       struct svcxprt_rdma *xprt = cq->cq_context;
+       struct ib_cqe *cqe = wc->wr_cqe;
+       struct svc_rdma_op_ctxt *ctxt;
 
-       case IB_WR_RDMA_WRITE:
-               break;
+       svc_rdma_send_wc_common(xprt, wc, "read");
 
-       case IB_WR_RDMA_READ:
-       case IB_WR_RDMA_READ_WITH_INV:
-               svc_rdma_put_frmr(xprt, ctxt->frmr);
+       ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+       svc_rdma_unmap_dma(ctxt);
+       svc_rdma_put_frmr(xprt, ctxt->frmr);
 
-               if (!test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags))
-                       break;
+       if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+               struct svc_rdma_op_ctxt *read_hdr;
 
                read_hdr = ctxt->read_hdr;
-               svc_rdma_put_context(ctxt, 0);
-
-               spin_lock_bh(&xprt->sc_rq_dto_lock);
-               set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+               spin_lock(&xprt->sc_rq_dto_lock);
                list_add_tail(&read_hdr->dto_q,
                              &xprt->sc_read_complete_q);
-               spin_unlock_bh(&xprt->sc_rq_dto_lock);
-               svc_xprt_enqueue(&xprt->sc_xprt);
-               return;
+               spin_unlock(&xprt->sc_rq_dto_lock);
 
-       default:
-               dprintk("svcrdma: unexpected completion opcode=%d\n",
-                       ctxt->wr_op);
-               break;
+               set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+               svc_xprt_enqueue(&xprt->sc_xprt);
        }
 
-       svc_rdma_put_context(ctxt, free_pages);
+       svc_rdma_put_context(ctxt, 0);
+       svc_xprt_put(&xprt->sc_xprt);
 }
 
-/*
- * Send Queue Completion Handler - potentially called on interrupt context.
+/**
+ * svc_rdma_wc_inv - Invoked by RDMA provider for each polled LOCAL_INV WC
+ * @cq:        completion queue
+ * @wc:        completed WR
  *
- * Note that caller must hold a transport reference.
  */
-static void sq_cq_reap(struct svcxprt_rdma *xprt)
-{
-       struct svc_rdma_op_ctxt *ctxt = NULL;
-       struct ib_wc wc_a[6];
-       struct ib_wc *wc;
-       struct ib_cq *cq = xprt->sc_sq_cq;
-       int ret;
-
-       memset(wc_a, 0, sizeof(wc_a));
-
-       if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
-               return;
-
-       ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-       atomic_inc(&rdma_stat_sq_poll);
-       while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
-               int i;
-
-               for (i = 0; i < ret; i++) {
-                       wc = &wc_a[i];
-                       if (wc->status != IB_WC_SUCCESS) {
-                               dprintk("svcrdma: sq wc err status %s (%d)\n",
-                                       ib_wc_status_msg(wc->status),
-                                       wc->status);
-
-                               /* Close the transport */
-                               set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-                       }
-
-                       /* Decrement used SQ WR count */
-                       atomic_dec(&xprt->sc_sq_count);
-                       wake_up(&xprt->sc_send_wait);
-
-                       ctxt = (struct svc_rdma_op_ctxt *)
-                               (unsigned long)wc->wr_id;
-                       if (ctxt)
-                               process_context(xprt, ctxt);
-
-                       svc_xprt_put(&xprt->sc_xprt);
-               }
-       }
-
-       if (ctxt)
-               atomic_inc(&rdma_stat_sq_prod);
-}
-
-static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
+void svc_rdma_wc_inv(struct ib_cq *cq, struct ib_wc *wc)
 {
-       struct svcxprt_rdma *xprt = cq_context;
-       unsigned long flags;
-
-       /* Guard against unconditional flush call for destroyed QP */
-       if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
-               return;
-
-       /*
-        * Set the bit regardless of whether or not it's on the list
-        * because it may be on the list already due to an RQ
-        * completion.
-        */
-       set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
-
-       /*
-        * If this transport is not already on the DTO transport queue,
-        * add it
-        */
-       spin_lock_irqsave(&dto_lock, flags);
-       if (list_empty(&xprt->sc_dto_q)) {
-               svc_xprt_get(&xprt->sc_xprt);
-               list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
-       }
-       spin_unlock_irqrestore(&dto_lock, flags);
-
-       /* Tasklet does all the work to avoid irqsave locks. */
-       tasklet_schedule(&dto_tasklet);
+       svc_rdma_send_wc_common_put(cq, wc, "localInv");
 }
 
 static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
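
The dominant change in this hunk is the move from wr_id casts plus a DTO tasklet to struct ib_cqe completions: each op context embeds an ib_cqe, the .done callback is set when the WR is posted, and the completion handler recovers its context with container_of(). A standalone sketch of that pattern with mock types (the real handlers are the svc_rdma_wc_* functions above, and the real callback signature takes an ib_cq and ib_wc):

#include <stddef.h>
#include <stdio.h>

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

/* Mock stand-ins for struct ib_cqe and struct ib_wc. */
struct mock_cqe { void (*done)(struct mock_cqe *cqe, int status); };
struct mock_wc  { struct mock_cqe *wr_cqe; int status; };

/* Mock op context embedding its completion entry, like svc_rdma_op_ctxt. */
struct op_ctxt {
	int byte_len;
	struct mock_cqe cqe;
};

/* Completion handler: recover the posting context from the embedded cqe. */
static void wc_receive(struct mock_cqe *cqe, int status)
{
	struct op_ctxt *ctxt = container_of(cqe, struct op_ctxt, cqe);

	printf("receive completed: status=%d byte_len=%d\n",
	       status, ctxt->byte_len);
}

int main(void)
{
	struct op_ctxt ctxt = { .byte_len = 512 };
	struct mock_wc wc;

	/* "Posting" the WR records the handler in the embedded cqe. */
	ctxt.cqe.done = wc_receive;

	/* The poller gets back only wr_cqe and status ... */
	wc.wr_cqe = &ctxt.cqe;
	wc.status = 0;

	/* ... and dispatches straight to the per-operation handler. */
	wc.wr_cqe->done(wc.wr_cqe, wc.status);
	return 0;
}
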
@@ -681,6 +585,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
        ctxt = svc_rdma_get_context(xprt);
        buflen = 0;
        ctxt->direction = DMA_FROM_DEVICE;
+       ctxt->cqe.done = svc_rdma_wc_receive;
        for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
                if (sge_no >= xprt->sc_max_sge) {
                        pr_err("svcrdma: Too many sges (%d)\n", sge_no);
@@ -705,7 +610,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
        recv_wr.next = NULL;
        recv_wr.sg_list = &ctxt->sge[0];
        recv_wr.num_sge = ctxt->count;
-       recv_wr.wr_id = (u64)(unsigned long)ctxt;
+       recv_wr.wr_cqe = &ctxt->cqe;
 
        svc_xprt_get(&xprt->sc_xprt);
        ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
@@ -722,6 +627,21 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
        return -ENOMEM;
 }
 
+int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
+{
+       int ret = 0;
+
+       ret = svc_rdma_post_recv(xprt, flags);
+       if (ret) {
+               pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
+                      ret);
+               pr_err("svcrdma: closing transport %p.\n", xprt);
+               set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+               ret = -ENOTCONN;
+       }
+       return ret;
+}
+
 /*
  * This function handles the CONNECT_REQUEST event on a listening
  * endpoint. It is passed the cma_id for the _new_ connection. The context in
@@ -1011,7 +931,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
        struct svcxprt_rdma *listen_rdma;
        struct svcxprt_rdma *newxprt = NULL;
        struct rdma_conn_param conn_param;
-       struct ib_cq_init_attr cq_attr = {};
        struct ib_qp_init_attr qp_attr;
        struct ib_device *dev;
        unsigned int i;
@@ -1069,22 +988,14 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
                dprintk("svcrdma: error creating PD for connect request\n");
                goto errout;
        }
-       cq_attr.cqe = newxprt->sc_sq_depth;
-       newxprt->sc_sq_cq = ib_create_cq(dev,
-                                        sq_comp_handler,
-                                        cq_event_handler,
-                                        newxprt,
-                                        &cq_attr);
+       newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
+                                       0, IB_POLL_SOFTIRQ);
        if (IS_ERR(newxprt->sc_sq_cq)) {
                dprintk("svcrdma: error creating SQ CQ for connect request\n");
                goto errout;
        }
-       cq_attr.cqe = newxprt->sc_rq_depth;
-       newxprt->sc_rq_cq = ib_create_cq(dev,
-                                        rq_comp_handler,
-                                        cq_event_handler,
-                                        newxprt,
-                                        &cq_attr);
+       newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
+                                       0, IB_POLL_SOFTIRQ);
        if (IS_ERR(newxprt->sc_rq_cq)) {
                dprintk("svcrdma: error creating RQ CQ for connect request\n");
                goto errout;
@@ -1173,13 +1084,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
        /* Swap out the handler */
        newxprt->sc_cm_id->event_handler = rdma_cma_handler;
 
-       /*
-        * Arm the CQs for the SQ and RQ before accepting so we can't
-        * miss the first message
-        */
-       ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-       ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-
        /* Accept Connection */
        set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
        memset(&conn_param, 0, sizeof conn_param);
@@ -1319,10 +1223,10 @@ static void __svc_rdma_free(struct work_struct *work)
                ib_destroy_qp(rdma->sc_qp);
 
        if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
-               ib_destroy_cq(rdma->sc_sq_cq);
+               ib_free_cq(rdma->sc_sq_cq);
 
        if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
-               ib_destroy_cq(rdma->sc_rq_cq);
+               ib_free_cq(rdma->sc_rq_cq);
 
        if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
                ib_dealloc_pd(rdma->sc_pd);
@@ -1383,9 +1287,6 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
                        spin_unlock_bh(&xprt->sc_lock);
                        atomic_inc(&rdma_stat_sq_starve);
 
-                       /* See if we can opportunistically reap SQ WR to make room */
-                       sq_cq_reap(xprt);
-
                        /* Wait until SQ WR available if SQ still full */
                        wait_event(xprt->sc_send_wait,
                                   atomic_read(&xprt->sc_sq_count) <
@@ -1418,57 +1319,3 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
        }
        return ret;
 }
-
-void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
-                        enum rpcrdma_errcode err)
-{
-       struct ib_send_wr err_wr;
-       struct page *p;
-       struct svc_rdma_op_ctxt *ctxt;
-       __be32 *va;
-       int length;
-       int ret;
-
-       p = alloc_page(GFP_KERNEL);
-       if (!p)
-               return;
-       va = page_address(p);
-
-       /* XDR encode error */
-       length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
-
-       ctxt = svc_rdma_get_context(xprt);
-       ctxt->direction = DMA_FROM_DEVICE;
-       ctxt->count = 1;
-       ctxt->pages[0] = p;
-
-       /* Prepare SGE for local address */
-       ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
-                                           p, 0, length, DMA_FROM_DEVICE);
-       if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
-               put_page(p);
-               svc_rdma_put_context(ctxt, 1);
-               return;
-       }
-       atomic_inc(&xprt->sc_dma_used);
-       ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
-       ctxt->sge[0].length = length;
-
-       /* Prepare SEND WR */
-       memset(&err_wr, 0, sizeof err_wr);
-       ctxt->wr_op = IB_WR_SEND;
-       err_wr.wr_id = (unsigned long)ctxt;
-       err_wr.sg_list = ctxt->sge;
-       err_wr.num_sge = 1;
-       err_wr.opcode = IB_WR_SEND;
-       err_wr.send_flags = IB_SEND_SIGNALED;
-
-       /* Post It */
-       ret = svc_rdma_send(xprt, &err_wr);
-       if (ret) {
-               dprintk("svcrdma: Error %d posting send for protocol error\n",
-                       ret);
-               svc_rdma_unmap_dma(ctxt);
-               svc_rdma_put_context(ctxt, 1);
-       }
-}