]> git.kernelconcepts.de Git - karo-tx-linux.git/blobdiff - fs/ceph/dir.c
net/mlx5e: Add support to get ethtool flow rules
[karo-tx-linux.git] / fs / ceph / dir.c
index 4fb2bbc2a2722af6e9ccf84cf9fc80ce7f3edbeb..6e0fedf6713b5130af5c79cdb14649e90ba8427c 100644 (file)
@@ -5,6 +5,7 @@
 #include <linux/namei.h>
 #include <linux/slab.h>
 #include <linux/sched.h>
+#include <linux/xattr.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -69,16 +70,42 @@ out_unlock:
 }
 
 /*
- * for readdir, we encode the directory frag and offset within that
- * frag into f_pos.
+ * for f_pos for readdir:
+ * - hash order:
+ *     (0xff << 52) | ((24 bits hash) << 28) |
+ *     (the nth entry has hash collision);
+ * - frag+name order;
+ *     ((frag value) << 28) | (the nth entry in frag);
  */
+#define OFFSET_BITS    28
+#define OFFSET_MASK    ((1 << OFFSET_BITS) - 1)
+#define HASH_ORDER     (0xffull << (OFFSET_BITS + 24))
+loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order)
+{
+       loff_t fpos = ((loff_t)high << OFFSET_BITS) | (loff_t)off;
+       if (hash_order)
+               fpos |= HASH_ORDER; /* tag position as hash-ordered */
+       return fpos;
+}
+
+static bool is_hash_order(loff_t p)
+{
+       return (p & HASH_ORDER) == HASH_ORDER;
+}
+
 static unsigned fpos_frag(loff_t p)
 {
-       return p >> 32;
+       return p >> OFFSET_BITS;
 }
+
+static unsigned fpos_hash(loff_t p)
+{
+       return ceph_frag_value(fpos_frag(p));
+}
+
 static unsigned fpos_off(loff_t p)
 {
-       return p & 0xffffffff;
+       return p & OFFSET_MASK;
 }
 
 static int fpos_cmp(loff_t l, loff_t r)
@@ -110,6 +137,50 @@ static int note_last_dentry(struct ceph_file_info *fi, const char *name,
        return 0;
 }
 
+
+static struct dentry *
+__dcache_find_get_entry(struct dentry *parent, u64 idx,
+                       struct ceph_readdir_cache_control *cache_ctl)
+{
+       struct inode *dir = d_inode(parent);
+       struct dentry *dentry;
+       unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1;
+       loff_t ptr_pos = idx * sizeof(struct dentry *);
+       pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT;
+
+       if (ptr_pos >= i_size_read(dir))
+               return NULL;
+
+       if (!cache_ctl->page || ptr_pgoff != page_index(cache_ctl->page)) {
+               ceph_readdir_cache_release(cache_ctl);
+               cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff);
+               if (!cache_ctl->page) {
+                       dout(" page %lu not found\n", ptr_pgoff);
+                       return ERR_PTR(-EAGAIN);
+               }
+               /* reading/filling the cache are serialized by
+                  i_mutex, no need to use page lock */
+               unlock_page(cache_ctl->page);
+               cache_ctl->dentries = kmap(cache_ctl->page);
+       }
+
+       cache_ctl->index = idx & idx_mask;
+
+       rcu_read_lock();
+       spin_lock(&parent->d_lock);
+       /* check i_size again here, because empty directory can be
+        * marked as complete while not holding the i_mutex. */
+       if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
+               dentry = cache_ctl->dentries[cache_ctl->index];
+       else
+               dentry = NULL;
+       spin_unlock(&parent->d_lock);
+       if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
+               dentry = NULL;
+       rcu_read_unlock();
+       return dentry ? : ERR_PTR(-EAGAIN);
+}
+
 /*
  * When possible, we try to satisfy a readdir by peeking at the
  * dcache.  We make this work by carefully ordering dentries on
@@ -129,75 +200,68 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
        struct inode *dir = d_inode(parent);
        struct dentry *dentry, *last = NULL;
        struct ceph_dentry_info *di;
-       unsigned nsize = PAGE_SIZE / sizeof(struct dentry *);
-       int err = 0;
-       loff_t ptr_pos = 0;
        struct ceph_readdir_cache_control cache_ctl = {};
+       u64 idx = 0;
+       int err = 0;
 
-       dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos);
+       dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos);
+
+       /* search start position */
+       if (ctx->pos > 2) {
+               u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *));
+               while (count > 0) {
+                       u64 step = count >> 1;
+                       dentry = __dcache_find_get_entry(parent, idx + step,
+                                                        &cache_ctl);
+                       if (!dentry) {
+                               /* use linear search */
+                               idx = 0;
+                               break;
+                       }
+                       if (IS_ERR(dentry)) {
+                               err = PTR_ERR(dentry);
+                               goto out;
+                       }
+                       di = ceph_dentry(dentry);
+                       spin_lock(&dentry->d_lock);
+                       if (fpos_cmp(di->offset, ctx->pos) < 0) {
+                               idx += step + 1;
+                               count -= step + 1;
+                       } else {
+                               count = step;
+                       }
+                       spin_unlock(&dentry->d_lock);
+                       dput(dentry);
+               }
 
-       /* we can calculate cache index for the first dirfrag */
-       if (ceph_frag_is_leftmost(fpos_frag(ctx->pos))) {
-               cache_ctl.index = fpos_off(ctx->pos) - 2;
-               BUG_ON(cache_ctl.index < 0);
-               ptr_pos = cache_ctl.index * sizeof(struct dentry *);
+               dout("__dcache_readdir %p cache idx %llu\n", dir, idx);
        }
 
-       while (true) {
-               pgoff_t pgoff;
-               bool emit_dentry;
 
-               if (ptr_pos >= i_size_read(dir)) {
+       for (;;) {
+               bool emit_dentry = false;
+               dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl);
+               if (!dentry) {
                        fi->flags |= CEPH_F_ATEND;
                        err = 0;
                        break;
                }
-
-               err = -EAGAIN;
-               pgoff = ptr_pos >> PAGE_SHIFT;
-               if (!cache_ctl.page || pgoff != page_index(cache_ctl.page)) {
-                       ceph_readdir_cache_release(&cache_ctl);
-                       cache_ctl.page = find_lock_page(&dir->i_data, pgoff);
-                       if (!cache_ctl.page) {
-                               dout(" page %lu not found\n", pgoff);
-                               break;
-                       }
-                       /* reading/filling the cache are serialized by
-                        * i_mutex, no need to use page lock */
-                       unlock_page(cache_ctl.page);
-                       cache_ctl.dentries = kmap(cache_ctl.page);
+               if (IS_ERR(dentry)) {
+                       err = PTR_ERR(dentry);
+                       goto out;
                }
 
-               rcu_read_lock();
-               spin_lock(&parent->d_lock);
-               /* check i_size again here, because empty directory can be
-                * marked as complete while not holding the i_mutex. */
-               if (ceph_dir_is_complete_ordered(dir) &&
-                   ptr_pos < i_size_read(dir))
-                       dentry = cache_ctl.dentries[cache_ctl.index % nsize];
-               else
-                       dentry = NULL;
-               spin_unlock(&parent->d_lock);
-               if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
-                       dentry = NULL;
-               rcu_read_unlock();
-               if (!dentry)
-                       break;
-
-               emit_dentry = false;
                di = ceph_dentry(dentry);
                spin_lock(&dentry->d_lock);
                if (di->lease_shared_gen == shared_gen &&
                    d_really_is_positive(dentry) &&
-                   ceph_snap(d_inode(dentry)) != CEPH_SNAPDIR &&
-                   ceph_ino(d_inode(dentry)) != CEPH_INO_CEPH &&
                    fpos_cmp(ctx->pos, di->offset) <= 0) {
                        emit_dentry = true;
                }
                spin_unlock(&dentry->d_lock);
 
                if (emit_dentry) {
-                       dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos,
+                       dout(" %llx dentry %p %pd %p\n", di->offset,
                             dentry, dentry, d_inode(dentry));
                        ctx->pos = di->offset;
                        if (!dir_emit(ctx, dentry->d_name.name,
@@ -217,10 +281,8 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
                } else {
                        dput(dentry);
                }
-
-               cache_ctl.index++;
-               ptr_pos += sizeof(struct dentry *);
        }
+out:
        ceph_readdir_cache_release(&cache_ctl);
        if (last) {
                int ret;
@@ -234,6 +296,16 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
        return err;
 }
 
+static bool need_send_readdir(struct ceph_file_info *fi, loff_t pos)
+{
+       if (!fi->last_readdir)
+               return true;
+       if (is_hash_order(pos))
+               return !ceph_frag_contains_value(fi->frag, fpos_hash(pos));
+       else
+               return fi->frag != fpos_frag(pos);
+}
+
 static int ceph_readdir(struct file *file, struct dir_context *ctx)
 {
        struct ceph_file_info *fi = file->private_data;
@@ -241,13 +313,12 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_mds_client *mdsc = fsc->mdsc;
-       unsigned frag = fpos_frag(ctx->pos);
-       int off = fpos_off(ctx->pos);
+       int i;
        int err;
        u32 ftype;
        struct ceph_mds_reply_info_parsed *rinfo;
 
-       dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off);
+       dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos);
        if (fi->flags & CEPH_F_ATEND)
                return 0;
 
@@ -259,7 +330,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
                            inode->i_mode >> 12))
                        return 0;
                ctx->pos = 1;
-               off = 1;
        }
        if (ctx->pos == 1) {
                ino_t ino = parent_ino(file->f_path.dentry);
@@ -269,7 +339,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
                            inode->i_mode >> 12))
                        return 0;
                ctx->pos = 2;
-               off = 2;
        }
 
        /* can we use the dcache? */
@@ -284,8 +353,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
                err = __dcache_readdir(file, ctx, shared_gen);
                if (err != -EAGAIN)
                        return err;
-               frag = fpos_frag(ctx->pos);
-               off = fpos_off(ctx->pos);
        } else {
                spin_unlock(&ci->i_ceph_lock);
        }
@@ -293,8 +360,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
        /* proceed with a normal readdir */
 more:
        /* do we have the correct frag content buffered? */
-       if (fi->frag != frag || fi->last_readdir == NULL) {
+       if (need_send_readdir(fi, ctx->pos)) {
                struct ceph_mds_request *req;
+               unsigned frag;
                int op = ceph_snap(inode) == CEPH_SNAPDIR ?
                        CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
 
@@ -304,6 +372,13 @@ more:
                        fi->last_readdir = NULL;
                }
 
+               if (is_hash_order(ctx->pos)) {
+                       frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
+                                               NULL, NULL);
+               } else {
+                       frag = fpos_frag(ctx->pos);
+               }
+
                dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
                     ceph_vinop(inode), frag, fi->last_name);
                req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
@@ -330,6 +405,8 @@ more:
                req->r_readdir_cache_idx = fi->readdir_cache_idx;
                req->r_readdir_offset = fi->next_offset;
                req->r_args.readdir.frag = cpu_to_le32(frag);
+               req->r_args.readdir.flags =
+                               cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS);
 
                req->r_inode = inode;
                ihold(inode);
@@ -339,22 +416,26 @@ more:
                        ceph_mdsc_put_request(req);
                        return err;
                }
-               dout("readdir got and parsed readdir result=%d"
-                    " on frag %x, end=%d, complete=%d\n", err, frag,
+               dout("readdir got and parsed readdir result=%d on "
+                    "frag %x, end=%d, complete=%d, hash_order=%d\n",
+                    err, frag,
                     (int)req->r_reply_info.dir_end,
-                    (int)req->r_reply_info.dir_complete);
-
+                    (int)req->r_reply_info.dir_complete,
+                    (int)req->r_reply_info.hash_order);
 
-               /* note next offset and last dentry name */
                rinfo = &req->r_reply_info;
                if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
                        frag = le32_to_cpu(rinfo->dir_dir->frag);
-                       off = req->r_readdir_offset;
-                       fi->next_offset = off;
+                       if (!rinfo->hash_order) {
+                               fi->next_offset = req->r_readdir_offset;
+                               /* adjust ctx->pos to beginning of frag */
+                               ctx->pos = ceph_make_fpos(frag,
+                                                         fi->next_offset,
+                                                         false);
+                       }
                }
 
                fi->frag = frag;
-               fi->offset = fi->next_offset;
                fi->last_readdir = req;
 
                if (req->r_did_prepopulate) {
@@ -362,7 +443,8 @@ more:
                        if (fi->readdir_cache_idx < 0) {
                                /* preclude from marking dir ordered */
                                fi->dir_ordered_count = 0;
-                       } else if (ceph_frag_is_leftmost(frag) && off == 2) {
+                       } else if (ceph_frag_is_leftmost(frag) &&
+                                  fi->next_offset == 2) {
                                /* note dir version at start of readdir so
                                 * we can tell if any dentries get dropped */
                                fi->dir_release_count = req->r_dir_release_cnt;
@@ -376,65 +458,87 @@ more:
                        fi->dir_release_count = 0;
                }
 
-               if (req->r_reply_info.dir_end) {
-                       kfree(fi->last_name);
-                       fi->last_name = NULL;
-                       if (ceph_frag_is_rightmost(frag))
-                               fi->next_offset = 2;
-                       else
-                               fi->next_offset = 0;
-               } else {
-                       err = note_last_dentry(fi,
-                                      rinfo->dir_dname[rinfo->dir_nr-1],
-                                      rinfo->dir_dname_len[rinfo->dir_nr-1],
-                                      fi->next_offset + rinfo->dir_nr);
+               /* note next offset and last dentry name */
+               if (rinfo->dir_nr > 0) {
+                       struct ceph_mds_reply_dir_entry *rde =
+                                       rinfo->dir_entries + (rinfo->dir_nr-1);
+                       unsigned next_offset = req->r_reply_info.dir_end ?
+                                       2 : (fpos_off(rde->offset) + 1);
+                       err = note_last_dentry(fi, rde->name, rde->name_len,
+                                              next_offset);
                        if (err)
                                return err;
+               } else if (req->r_reply_info.dir_end) {
+                       fi->next_offset = 2;
+                       /* keep last name */
                }
        }
 
        rinfo = &fi->last_readdir->r_reply_info;
-       dout("readdir frag %x num %d off %d chunkoff %d\n", frag,
-            rinfo->dir_nr, off, fi->offset);
-
-       ctx->pos = ceph_make_fpos(frag, off);
-       while (off >= fi->offset && off - fi->offset < rinfo->dir_nr) {
-               struct ceph_mds_reply_inode *in =
-                       rinfo->dir_in[off - fi->offset].in;
+       dout("readdir frag %x num %d pos %llx chunk first %llx\n",
+            fi->frag, rinfo->dir_nr, ctx->pos,
+            rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);
+
+       i = 0;
+       /* search start position */
+       if (rinfo->dir_nr > 0) {
+               int step, nr = rinfo->dir_nr;
+               while (nr > 0) {
+                       step = nr >> 1;
+                       if (rinfo->dir_entries[i + step].offset < ctx->pos) {
+                               i +=  step + 1;
+                               nr -= step + 1;
+                       } else {
+                               nr = step;
+                       }
+               }
+       }
+       for (; i < rinfo->dir_nr; i++) {
+               struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
                struct ceph_vino vino;
                ino_t ino;
 
-               dout("readdir off %d (%d/%d) -> %lld '%.*s' %p\n",
-                    off, off - fi->offset, rinfo->dir_nr, ctx->pos,
-                    rinfo->dir_dname_len[off - fi->offset],
-                    rinfo->dir_dname[off - fi->offset], in);
-               BUG_ON(!in);
-               ftype = le32_to_cpu(in->mode) >> 12;
-               vino.ino = le64_to_cpu(in->ino);
-               vino.snap = le64_to_cpu(in->snapid);
+               BUG_ON(rde->offset < ctx->pos);
+
+               ctx->pos = rde->offset;
+               dout("readdir (%d/%d) -> %llx '%.*s' %p\n",
+                    i, rinfo->dir_nr, ctx->pos,
+                    rde->name_len, rde->name, &rde->inode.in);
+
+               BUG_ON(!rde->inode.in);
+               ftype = le32_to_cpu(rde->inode.in->mode) >> 12;
+               vino.ino = le64_to_cpu(rde->inode.in->ino);
+               vino.snap = le64_to_cpu(rde->inode.in->snapid);
                ino = ceph_vino_to_ino(vino);
-               if (!dir_emit(ctx,
-                           rinfo->dir_dname[off - fi->offset],
-                           rinfo->dir_dname_len[off - fi->offset],
-                           ceph_translate_ino(inode->i_sb, ino), ftype)) {
+
+               if (!dir_emit(ctx, rde->name, rde->name_len,
+                             ceph_translate_ino(inode->i_sb, ino), ftype)) {
                        dout("filldir stopping us...\n");
                        return 0;
                }
-               off++;
                ctx->pos++;
        }
 
-       if (fi->last_name) {
+       if (fi->next_offset > 2) {
                ceph_mdsc_put_request(fi->last_readdir);
                fi->last_readdir = NULL;
                goto more;
        }
 
        /* more frags? */
-       if (!ceph_frag_is_rightmost(frag)) {
-               frag = ceph_frag_next(frag);
-               off = 0;
-               ctx->pos = ceph_make_fpos(frag, off);
+       if (!ceph_frag_is_rightmost(fi->frag)) {
+               unsigned frag = ceph_frag_next(fi->frag);
+               if (is_hash_order(ctx->pos)) {
+                       loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
+                                                       fi->next_offset, true);
+                       if (new_pos > ctx->pos)
+                               ctx->pos = new_pos;
+                       /* keep last_name */
+               } else {
+                       ctx->pos = ceph_make_fpos(frag, fi->next_offset, false);
+                       kfree(fi->last_name);
+                       fi->last_name = NULL;
+               }
                dout("readdir next frag is %x\n", frag);
                goto more;
        }
@@ -466,7 +570,7 @@ more:
        return 0;
 }
 
-static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
+static void reset_readdir(struct ceph_file_info *fi)
 {
        if (fi->last_readdir) {
                ceph_mdsc_put_request(fi->last_readdir);
@@ -476,18 +580,38 @@ static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
        fi->last_name = NULL;
        fi->dir_release_count = 0;
        fi->readdir_cache_idx = -1;
-       if (ceph_frag_is_leftmost(frag))
-               fi->next_offset = 2;  /* compensate for . and .. */
-       else
-               fi->next_offset = 0;
+       fi->next_offset = 2;  /* compensate for . and .. */
        fi->flags &= ~CEPH_F_ATEND;
 }
 
+/*
+ * discard buffered readdir content on seekdir(0), seek to a new frag
+ * (frag+name order only), no buffered entries, or seek prior to current chunk
+ */
+static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos)
+{
+       struct ceph_mds_reply_info_parsed *rinfo;
+       loff_t chunk_offset;
+       if (new_pos == 0)
+               return true;
+       if (is_hash_order(new_pos)) {
+               /* no need to reset last_name for a forward seek when
+                * dentries are sorted in hash order */
+       } else if (fi->frag != fpos_frag(new_pos)) {
+               return true;
+       }
+       rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL;
+       if (!rinfo || !rinfo->dir_nr)
+               return true;
+       chunk_offset = rinfo->dir_entries[0].offset;
+       return new_pos < chunk_offset ||
+              is_hash_order(new_pos) != is_hash_order(chunk_offset);
+}
+
 static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
 {
        struct ceph_file_info *fi = file->private_data;
        struct inode *inode = file->f_mapping->host;
-       loff_t old_offset = ceph_make_fpos(fi->frag, fi->next_offset);
        loff_t retval;
 
        inode_lock(inode);
@@ -504,25 +628,22 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
        }
 
        if (offset >= 0) {
+               if (need_reset_readdir(fi, offset)) {
+                       dout("dir_llseek dropping %p content\n", file);
+                       reset_readdir(fi);
+               } else if (is_hash_order(offset) && offset > file->f_pos) {
+                       /* for hash offset, we don't know if a forward seek
+                        * is within same frag */
+                       fi->dir_release_count = 0;
+                       fi->readdir_cache_idx = -1;
+               }
+
                if (offset != file->f_pos) {
                        file->f_pos = offset;
                        file->f_version = 0;
                        fi->flags &= ~CEPH_F_ATEND;
                }
                retval = offset;
-
-               if (offset == 0 ||
-                   fpos_frag(offset) != fi->frag ||
-                   fpos_off(offset) < fi->offset) {
-                       /* discard buffered readdir content on seekdir(0), or
-                        * seek to new frag, or seek prior to current chunk */
-                       dout("dir_llseek dropping %p content\n", file);
-                       reset_readdir(fi, fpos_frag(offset));
-               } else if (fpos_cmp(offset, old_offset) > 0) {
-                       /* reset dir_release_count if we did a forward seek */
-                       fi->dir_release_count = 0;
-                       fi->readdir_cache_idx = -1;
-               }
        }
 out:
        inode_unlock(inode);
@@ -590,7 +711,7 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
        return dentry;
 }
 
-static int is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
+static bool is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
 {
        return ceph_ino(inode) == CEPH_INO_ROOT &&
                strncmp(dentry->d_name.name, ".ceph", 5) == 0;
@@ -1342,10 +1463,10 @@ const struct inode_operations ceph_dir_iops = {
        .permission = ceph_permission,
        .getattr = ceph_getattr,
        .setattr = ceph_setattr,
-       .setxattr = ceph_setxattr,
-       .getxattr = ceph_getxattr,
+       .setxattr = generic_setxattr,
+       .getxattr = generic_getxattr,
        .listxattr = ceph_listxattr,
-       .removexattr = ceph_removexattr,
+       .removexattr = generic_removexattr,
        .get_acl = ceph_get_acl,
        .set_acl = ceph_set_acl,
        .mknod = ceph_mknod,