]> git.kernelconcepts.de Git - karo-tx-linux.git/blobdiff - fs/ceph/mds_client.c
ceph: switch from BKL to lock_flocks()
[karo-tx-linux.git] / fs / ceph / mds_client.c
index a75ddbf9fe3743973c6fd02724f005b8504c3738..3142b15940c25656a43ec3a5d72af3e1ee1cece9 100644 (file)
@@ -1,17 +1,21 @@
-#include "ceph_debug.h"
+#include <linux/ceph/ceph_debug.h>
 
+#include <linux/fs.h>
 #include <linux/wait.h>
 #include <linux/slab.h>
 #include <linux/sched.h>
+#include <linux/debugfs.h>
+#include <linux/seq_file.h>
 #include <linux/smp_lock.h>
 
-#include "mds_client.h"
-#include "mon_client.h"
 #include "super.h"
-#include "messenger.h"
-#include "decode.h"
-#include "auth.h"
-#include "pagelist.h"
+#include "mds_client.h"
+
+#include <linux/ceph/messenger.h>
+#include <linux/ceph/decode.h>
+#include <linux/ceph/pagelist.h>
+#include <linux/ceph/auth.h>
+#include <linux/ceph/debugfs.h>
 
 /*
  * A cluster of MDS (metadata server) daemons is responsible for
@@ -286,8 +290,9 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
             atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
        if (atomic_dec_and_test(&s->s_ref)) {
                if (s->s_authorizer)
-                       s->s_mdsc->client->monc.auth->ops->destroy_authorizer(
-                               s->s_mdsc->client->monc.auth, s->s_authorizer);
+                    s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
+                            s->s_mdsc->fsc->client->monc.auth,
+                            s->s_authorizer);
                kfree(s);
        }
 }
@@ -344,7 +349,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        s->s_seq = 0;
        mutex_init(&s->s_mutex);
 
-       ceph_con_init(mdsc->client->msgr, &s->s_con);
+       ceph_con_init(mdsc->fsc->client->msgr, &s->s_con);
        s->s_con.private = s;
        s->s_con.ops = &mds_con_ops;
        s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
@@ -560,6 +565,13 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
  *
  * Called under mdsc->mutex.
  */
+struct dentry *get_nonsnap_parent(struct dentry *dentry)
+{
+       while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
+               dentry = dentry->d_parent;
+       return dentry;
+}
+
 static int __choose_mds(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req)
 {
@@ -590,14 +602,29 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
        if (req->r_inode) {
                inode = req->r_inode;
        } else if (req->r_dentry) {
-               if (req->r_dentry->d_inode) {
+               struct inode *dir = req->r_dentry->d_parent->d_inode;
+
+               if (dir->i_sb != mdsc->fsc->sb) {
+                       /* not this fs! */
+                       inode = req->r_dentry->d_inode;
+               } else if (ceph_snap(dir) != CEPH_NOSNAP) {
+                       /* direct snapped/virtual snapdir requests
+                        * based on parent dir inode */
+                       struct dentry *dn =
+                               get_nonsnap_parent(req->r_dentry->d_parent);
+                       inode = dn->d_inode;
+                       dout("__choose_mds using nonsnap parent %p\n", inode);
+               } else if (req->r_dentry->d_inode) {
+                       /* dentry target */
                        inode = req->r_dentry->d_inode;
                } else {
-                       inode = req->r_dentry->d_parent->d_inode;
+                       /* dir + name */
+                       inode = dir;
                        hash = req->r_dentry->d_name.hash;
                        is_hash = true;
                }
        }
+
        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
             (int)hash, mode);
        if (!inode)
@@ -862,7 +889,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
        __ceph_remove_cap(cap);
        if (!__ceph_is_any_real_caps(ci)) {
                struct ceph_mds_client *mdsc =
-                       &ceph_sb_to_client(inode->i_sb)->mdsc;
+                       ceph_sb_to_client(inode->i_sb)->mdsc;
 
                spin_lock(&mdsc->cap_dirty_lock);
                if (!list_empty(&ci->i_dirty_item)) {
@@ -1124,7 +1151,7 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
        struct ceph_msg *msg, *partial = NULL;
        struct ceph_mds_cap_release *head;
        int err = -ENOMEM;
-       int extra = mdsc->client->mount_args->cap_release_safety;
+       int extra = mdsc->fsc->mount_options->cap_release_safety;
        int num;
 
        dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
@@ -2063,7 +2090,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 
        /* insert trace into our cache */
        mutex_lock(&req->r_fill_mutex);
-       err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
+       err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
        if (err == 0) {
                if (result == 0 && rinfo->dir_nr)
                        ceph_readdir_prepopulate(req, req->r_session);
@@ -2208,7 +2235,7 @@ static void handle_session(struct ceph_mds_session *session,
                        pr_info("mds%d reconnect denied\n", session->s_mds);
                remove_session_caps(session);
                wake = 1; /* for good measure */
-               complete_all(&mdsc->session_close_waiters);
+               wake_up_all(&mdsc->session_close_wq);
                kick_requests(mdsc, mds);
                break;
 
@@ -2302,7 +2329,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
                path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
                if (IS_ERR(path)) {
                        err = PTR_ERR(path);
-                       BUG_ON(err);
+                       goto out_dput;
                }
        } else {
                path = NULL;
@@ -2310,7 +2337,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
        }
        err = ceph_pagelist_encode_string(pagelist, path, pathlen);
        if (err)
-               goto out;
+               goto out_free;
 
        spin_lock(&inode->i_lock);
        cap->seq = 0;        /* reset cap seq */
@@ -2339,23 +2366,42 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
 
        if (recon_state->flock) {
                int num_fcntl_locks, num_flock_locks;
-
-               lock_kernel();
-               ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
-               rec.v2.flock_len = (2*sizeof(u32) +
-                                   (num_fcntl_locks+num_flock_locks) *
-                                   sizeof(struct ceph_filelock));
-
+               struct ceph_pagelist_cursor trunc_point;
+
+               ceph_pagelist_set_cursor(pagelist, &trunc_point);
+               do {
+                       lock_flocks();
+                       ceph_count_locks(inode, &num_fcntl_locks,
+                                        &num_flock_locks);
+                       rec.v2.flock_len = (2*sizeof(u32) +
+                                           (num_fcntl_locks+num_flock_locks) *
+                                           sizeof(struct ceph_filelock));
+                       unlock_flocks();
+
+                       /* pre-alloc pagelist */
+                       ceph_pagelist_truncate(pagelist, &trunc_point);
+                       err = ceph_pagelist_append(pagelist, &rec, reclen);
+                       if (!err)
+                               err = ceph_pagelist_reserve(pagelist,
+                                                           rec.v2.flock_len);
+
+                       /* encode locks */
+                       if (!err) {
+                               lock_flocks();
+                               err = ceph_encode_locks(inode,
+                                                       pagelist,
+                                                       num_fcntl_locks,
+                                                       num_flock_locks);
+                               unlock_flocks();
+                       }
+               } while (err == -ENOSPC);
+       } else {
                err = ceph_pagelist_append(pagelist, &rec, reclen);
-               if (!err)
-                       err = ceph_encode_locks(inode, pagelist,
-                                               num_fcntl_locks,
-                                               num_flock_locks);
-               unlock_kernel();
        }
 
-out:
+out_free:
        kfree(path);
+out_dput:
        dput(dentry);
        return err;
 }
@@ -2588,7 +2634,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
                         struct ceph_mds_session *session,
                         struct ceph_msg *msg)
 {
-       struct super_block *sb = mdsc->client->sb;
+       struct super_block *sb = mdsc->fsc->sb;
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct dentry *parent, *dentry;
@@ -2866,17 +2912,23 @@ static void delayed_work(struct work_struct *work)
        schedule_delayed(mdsc);
 }
 
+int ceph_mdsc_init(struct ceph_fs_client *fsc)
 
-int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
 {
-       mdsc->client = client;
+       struct ceph_mds_client *mdsc;
+
+       mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
+       if (!mdsc)
+               return -ENOMEM;
+       mdsc->fsc = fsc;
+       fsc->mdsc = mdsc;
        mutex_init(&mdsc->mutex);
        mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
        if (mdsc->mdsmap == NULL)
                return -ENOMEM;
 
        init_completion(&mdsc->safe_umount_waiters);
-       init_completion(&mdsc->session_close_waiters);
+       init_waitqueue_head(&mdsc->session_close_wq);
        INIT_LIST_HEAD(&mdsc->waiting_for_map);
        mdsc->sessions = NULL;
        mdsc->max_sessions = 0;
@@ -2902,7 +2954,7 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
        INIT_LIST_HEAD(&mdsc->dentry_lru);
 
        ceph_caps_init(mdsc);
-       ceph_adjust_min_caps(mdsc, client->min_caps);
+       ceph_adjust_min_caps(mdsc, fsc->min_caps);
 
        return 0;
 }
@@ -2914,7 +2966,7 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
 static void wait_requests(struct ceph_mds_client *mdsc)
 {
        struct ceph_mds_request *req;
-       struct ceph_client *client = mdsc->client;
+       struct ceph_fs_client *fsc = mdsc->fsc;
 
        mutex_lock(&mdsc->mutex);
        if (__get_oldest_req(mdsc)) {
@@ -2922,7 +2974,7 @@ static void wait_requests(struct ceph_mds_client *mdsc)
 
                dout("wait_requests waiting for requests\n");
                wait_for_completion_timeout(&mdsc->safe_umount_waiters,
-                                   client->mount_args->mount_timeout * HZ);
+                                   fsc->client->options->mount_timeout * HZ);
 
                /* tear down remaining requests */
                mutex_lock(&mdsc->mutex);
@@ -3005,7 +3057,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
 {
        u64 want_tid, want_flush;
 
-       if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN)
+       if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
                return;
 
        dout("sync\n");
@@ -3021,6 +3073,23 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
        wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
 }
 
+/*
+ * true if all sessions are closed, or we force unmount
+ */
+bool done_closing_sessions(struct ceph_mds_client *mdsc)
+{
+       int i, n = 0;
+
+       if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
+               return true;
+
+       mutex_lock(&mdsc->mutex);
+       for (i = 0; i < mdsc->max_sessions; i++)
+               if (mdsc->sessions[i])
+                       n++;
+       mutex_unlock(&mdsc->mutex);
+       return n == 0;
+}
 
 /*
  * called after sb is ro.
@@ -3029,45 +3098,32 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
 {
        struct ceph_mds_session *session;
        int i;
-       int n;
-       struct ceph_client *client = mdsc->client;
-       unsigned long started, timeout = client->mount_args->mount_timeout * HZ;
+       struct ceph_fs_client *fsc = mdsc->fsc;
+       unsigned long timeout = fsc->client->options->mount_timeout * HZ;
 
        dout("close_sessions\n");
 
-       mutex_lock(&mdsc->mutex);
-
        /* close sessions */
-       started = jiffies;
-       while (time_before(jiffies, started + timeout)) {
-               dout("closing sessions\n");
-               n = 0;
-               for (i = 0; i < mdsc->max_sessions; i++) {
-                       session = __ceph_lookup_mds_session(mdsc, i);
-                       if (!session)
-                               continue;
-                       mutex_unlock(&mdsc->mutex);
-                       mutex_lock(&session->s_mutex);
-                       __close_session(mdsc, session);
-                       mutex_unlock(&session->s_mutex);
-                       ceph_put_mds_session(session);
-                       mutex_lock(&mdsc->mutex);
-                       n++;
-               }
-               if (n == 0)
-                       break;
-
-               if (client->mount_state == CEPH_MOUNT_SHUTDOWN)
-                       break;
-
-               dout("waiting for sessions to close\n");
+       mutex_lock(&mdsc->mutex);
+       for (i = 0; i < mdsc->max_sessions; i++) {
+               session = __ceph_lookup_mds_session(mdsc, i);
+               if (!session)
+                       continue;
                mutex_unlock(&mdsc->mutex);
-               wait_for_completion_timeout(&mdsc->session_close_waiters,
-                                           timeout);
+               mutex_lock(&session->s_mutex);
+               __close_session(mdsc, session);
+               mutex_unlock(&session->s_mutex);
+               ceph_put_mds_session(session);
                mutex_lock(&mdsc->mutex);
        }
+       mutex_unlock(&mdsc->mutex);
+
+       dout("waiting for sessions to close\n");
+       wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
+                          timeout);
 
        /* tear down remaining sessions */
+       mutex_lock(&mdsc->mutex);
        for (i = 0; i < mdsc->max_sessions; i++) {
                if (mdsc->sessions[i]) {
                        session = get_session(mdsc->sessions[i]);
@@ -3080,9 +3136,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
                        mutex_lock(&mdsc->mutex);
                }
        }
-
        WARN_ON(!list_empty(&mdsc->cap_delay_list));
-
        mutex_unlock(&mdsc->mutex);
 
        ceph_cleanup_empty_realms(mdsc);
@@ -3092,7 +3146,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
        dout("stopped\n");
 }
 
-void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
+static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
 {
        dout("stop\n");
        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
@@ -3102,6 +3156,15 @@ void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
        ceph_caps_finalize(mdsc);
 }
 
+void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
+{
+       struct ceph_mds_client *mdsc = fsc->mdsc;
+
+       ceph_mdsc_stop(mdsc);
+       fsc->mdsc = NULL;
+       kfree(mdsc);
+}
+
 
 /*
  * handle mds map update.
@@ -3118,14 +3181,14 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 
        ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
        ceph_decode_copy(&p, &fsid, sizeof(fsid));
-       if (ceph_check_fsid(mdsc->client, &fsid) < 0)
+       if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
                return;
        epoch = ceph_decode_32(&p);
        maplen = ceph_decode_32(&p);
        dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
 
        /* do we need it? */
-       ceph_monc_got_mdsmap(&mdsc->client->monc, epoch);
+       ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
        mutex_lock(&mdsc->mutex);
        if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
                dout("handle_map epoch %u <= our %u\n",
@@ -3149,7 +3212,7 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
        } else {
                mdsc->mdsmap = newmap;  /* first mds map */
        }
-       mdsc->client->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
+       mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
 
        __wake_requests(mdsc, &mdsc->waiting_for_map);
 
@@ -3250,7 +3313,7 @@ static int get_authorizer(struct ceph_connection *con,
 {
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
-       struct ceph_auth_client *ac = mdsc->client->monc.auth;
+       struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
        int ret = 0;
 
        if (force_new && s->s_authorizer) {
@@ -3284,7 +3347,7 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len)
 {
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
-       struct ceph_auth_client *ac = mdsc->client->monc.auth;
+       struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
 
        return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len);
 }
@@ -3293,12 +3356,12 @@ static int invalidate_authorizer(struct ceph_connection *con)
 {
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
-       struct ceph_auth_client *ac = mdsc->client->monc.auth;
+       struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
 
        if (ac->ops->invalidate_authorizer)
                ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
 
-       return ceph_monc_validate_auth(&mdsc->client->monc);
+       return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
 }
 
 static const struct ceph_connection_operations mds_con_ops = {
@@ -3311,7 +3374,4 @@ static const struct ceph_connection_operations mds_con_ops = {
        .peer_reset = peer_reset,
 };
 
-
-
-
 /* eof */