git.kernelconcepts.de Git - karo-tx-linux.git/blobdiff - fs/ocfs2/stack_user.c
Merge branch 'akpm-current/current'
[karo-tx-linux.git] / fs / ocfs2 / stack_user.c
index 286edf1e231f3598b0e33c77297bc8cf0d91ebe2..13a8537d8e8b0b5732fa730dcff2870255a26e07 100644 (file)
@@ -23,6 +23,7 @@
 #include <linux/mutex.h>
 #include <linux/slab.h>
 #include <linux/reboot.h>
+#include <linux/sched.h>
 #include <asm/uaccess.h>
 
 #include "stackglue.h"
 #define OCFS2_TEXT_UUID_LEN                    32
 #define OCFS2_CONTROL_MESSAGE_VERNUM_LEN       2
 #define OCFS2_CONTROL_MESSAGE_NODENUM_LEN      8
+#define VERSION_LOCK                           "version_lock"  /* name of the DLM lock whose LVB carries the protocol version */
+/*
+ * How a live connection obtains its node number and protocol version.
+ * user_cluster_connect() starts with NO_CONTROLD and switches to
+ * WITH_CONTROLD when dlm_new_lockspace() reports -EOPNOTSUPP in ops_rv.
+ */
+enum ocfs2_connection_type {
+       WITH_CONTROLD,  /* node number comes from ocfs2_control_get_this_node() */
+       NO_CONTROLD     /* version via get_protocol_version(), node number via oc_this_node */
+};
 
 /*
  * ocfs2_live_connection is refcounted because the filesystem and
 struct ocfs2_live_connection {
         struct list_head                oc_list;
         struct ocfs2_cluster_connection *oc_conn;
+       enum ocfs2_connection_type      oc_type;        /* WITH_CONTROLD or NO_CONTROLD */
+       atomic_t                        oc_this_node;   /* our nodeid; 0 until user_recover_done() sets it */
+       int                             oc_our_slot;    /* our slot, recorded by user_recover_done() */
+       struct dlm_lksb                 oc_version_lksb;        /* status block of the VERSION_LOCK */
+       char                            oc_lvb[DLM_LVB_LEN];    /* LVB buffer attached to oc_version_lksb */
+       struct completion               oc_sync_wait;   /* completed by sync_wait_cb() */
+       wait_queue_head_t               oc_wait;        /* woken by user_recover_done() */
 };
 
 struct ocfs2_control_private {
@@ -198,20 +212,15 @@ static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
  * mount path.  Since the VFS prevents multiple calls to
  * fill_super(), we can't get dupes here.
  */
-static int ocfs2_live_connection_new(struct ocfs2_cluster_connection *conn,
-                                    struct ocfs2_live_connection **c_ret)
+static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn,
+                                    struct ocfs2_live_connection *c)
 {
        int rc = 0;
-       struct ocfs2_live_connection *c;
-
-       c = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
-       if (!c)
-               return -ENOMEM;
 
        mutex_lock(&ocfs2_control_lock);
        c->oc_conn = conn;
 
-       if (atomic_read(&ocfs2_control_opened))
+       if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))
                list_add(&c->oc_list, &ocfs2_live_connection_list);
        else {
                printk(KERN_ERR
@@ -220,12 +229,6 @@ static int ocfs2_live_connection_new(struct ocfs2_cluster_connection *conn,
        }
 
        mutex_unlock(&ocfs2_control_lock);
-
-       if (!rc)
-               *c_ret = c;
-       else
-               kfree(c);
-
        return rc;
 }
 
@@ -799,18 +802,251 @@ static int fs_protocol_compare(struct ocfs2_protocol_version *existing,
        return 0;
 }
 
+static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver)
+{
+       struct ocfs2_protocol_version *pv =
+               (struct ocfs2_protocol_version *)lvb;
+       /*
+        * Read the major/minor protocol version stored at the start of
+        * the lock value block @lvb and copy it into @ver.
+        *
+        * ocfs2_protocol_version has two u8 variables, so we don't
+        * need any endian conversion.
+        */
+       ver->pv_major = pv->pv_major;
+       ver->pv_minor = pv->pv_minor;
+}
+/*
+ * version_to_lvb() - store @ver's major/minor at the start of @lvb.
+ *
+ * Counterpart of lvb_to_version(); only pv_major and pv_minor are written.
+ */
+static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb)
+{
+       struct ocfs2_protocol_version *pv =
+               (struct ocfs2_protocol_version *)lvb;
+       /*
+        * ocfs2_protocol_version has two u8 variables, so we don't
+        * need any endian conversion.
+        */
+       pv->pv_major = ver->pv_major;
+       pv->pv_minor = ver->pv_minor;
+}
+/*
+ * sync_wait_cb() - callback handed to dlm_lock() by sync_lock().
+ * @arg: the ocfs2_cluster_connection that issued the request
+ *
+ * Completes oc_sync_wait on the connection's live connection, which is
+ * what sync_lock() and sync_unlock() block on.
+ */
+static void sync_wait_cb(void *arg)
+{
+       struct ocfs2_cluster_connection *conn = arg;
+       struct ocfs2_live_connection *lc = conn->cc_private;
+       complete(&lc->oc_sync_wait);
+}
+/*
+ * sync_unlock() - release the DLM lock described by @lksb and wait for it.
+ * @conn: connection whose lockspace holds the lock
+ * @lksb: lock status block of the lock to drop
+ * @name: lock name, used only in the error messages
+ *
+ * Blocks on oc_sync_wait once dlm_unlock() has accepted the request.
+ * Returns the dlm_unlock() error if the request could not be issued,
+ * 0 when the final sb_status is -DLM_EUNLOCK, and -1 for any other
+ * final status.
+ */
+static int sync_unlock(struct ocfs2_cluster_connection *conn,
+               struct dlm_lksb *lksb, char *name)
+{
+       int error;
+       struct ocfs2_live_connection *lc = conn->cc_private;
+
+       error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn);
+       if (error) {
+               printk(KERN_ERR "%s lkid %x error %d\n",
+                               name, lksb->sb_lkid, error);
+               return error;
+       }
+
+       wait_for_completion(&lc->oc_sync_wait);
+
+       if (lksb->sb_status != -DLM_EUNLOCK) {
+               printk(KERN_ERR "%s lkid %x status %d\n",
+                               name, lksb->sb_lkid, lksb->sb_status);
+               return -1;
+       }
+       return 0;
+}
+/*
+ * sync_lock() - issue a DLM lock request and wait for it to finish.
+ * @conn: connection whose lockspace is used
+ * @mode: requested DLM lock mode
+ * @flags: flags passed through to dlm_lock()
+ * @lksb: lock status block for the request
+ * @name: NUL-terminated lock name (its strlen() is passed to dlm_lock())
+ *
+ * sync_wait_cb() is registered as the callback and completes
+ * oc_sync_wait. Returns the dlm_lock() error if the request could not
+ * be issued, otherwise lksb->sb_status as found after the wait.
+ * Non-zero statuses are logged, except -EAGAIN, which is returned
+ * silently.
+ */
+static int sync_lock(struct ocfs2_cluster_connection *conn,
+               int mode, uint32_t flags,
+               struct dlm_lksb *lksb, char *name)
+{
+       int error, status;
+       struct ocfs2_live_connection *lc = conn->cc_private;
+
+       error = dlm_lock(conn->cc_lockspace, mode, lksb, flags,
+                       name, strlen(name),
+                       0, sync_wait_cb, conn, NULL);
+       if (error) {
+               printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n",
+                               name, lksb->sb_lkid, flags, mode, error);
+               return error;
+       }
+
+       wait_for_completion(&lc->oc_sync_wait);
+
+       status = lksb->sb_status;
+
+       if (status && status != -EAGAIN) {
+               printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n",
+                               name, lksb->sb_lkid, flags, mode, status);
+       }
+
+       return status;
+}
+
+/*
+ * version_lock() - request the VERSION_LOCK in @mode with @flags.
+ *
+ * Thin wrapper around sync_lock() using this connection's
+ * oc_version_lksb; returns whatever sync_lock() returns.
+ */
+static int version_lock(struct ocfs2_cluster_connection *conn, int mode,
+               int flags)
+{
+       struct ocfs2_live_connection *lc = conn->cc_private;
+       return sync_lock(conn, mode, flags,
+                       &lc->oc_version_lksb, VERSION_LOCK);
+}
+/*
+ * version_unlock() - drop the VERSION_LOCK held through oc_version_lksb.
+ *
+ * Returns the result of sync_unlock().
+ */
+static int version_unlock(struct ocfs2_cluster_connection *conn)
+{
+       struct ocfs2_live_connection *lc = conn->cc_private;
+       return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK);
+}
+
+/* get_protocol_version()
+ *
+ * To exchange ocfs2 versioning, we use the LVB of the version dlm lock.
+ * The algorithm is:
+ * 1. Attempt to take the lock in EX mode (non-blocking).
+ * 2. If successful (which means it is the first mount), write the
+ *    version number and downconvert to PR lock.
+ * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after
+ *    taking the PR lock.
+ *
+ * running_proto is first set to ocfs2_user_plugin.sp_max_proto. On
+ * success conn->cc_version holds the negotiated version and 0 is
+ * returned. Returns -EINVAL when the version read from the LVB has a
+ * different major or a larger minor than running_proto; any other
+ * failure is the status returned by version_lock().
+ *
+ * NOTE(review): the result of the EX->PR downconvert in step 2 is not
+ * checked - confirm that this is intentional.
+ */
+
+static int get_protocol_version(struct ocfs2_cluster_connection *conn)
+{
+       int ret;
+       struct ocfs2_live_connection *lc = conn->cc_private;
+       struct ocfs2_protocol_version pv;
+
+       running_proto.pv_major =
+               ocfs2_user_plugin.sp_max_proto.pv_major;
+       running_proto.pv_minor =
+               ocfs2_user_plugin.sp_max_proto.pv_minor;
+
+       lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb;
+       ret = version_lock(conn, DLM_LOCK_EX,
+                       DLM_LKF_VALBLK|DLM_LKF_NOQUEUE);
+       if (!ret) {
+               conn->cc_version.pv_major = running_proto.pv_major;
+               conn->cc_version.pv_minor = running_proto.pv_minor;
+               version_to_lvb(&running_proto, lc->oc_lvb);
+               version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
+       } else if (ret == -EAGAIN) {
+               ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK);
+               if (ret)
+                       goto out;
+               lvb_to_version(lc->oc_lvb, &pv);
+
+               if ((pv.pv_major != running_proto.pv_major) ||
+                               (pv.pv_minor > running_proto.pv_minor)) {
+                       ret = -EINVAL;
+                       goto out;
+               }
+
+               conn->cc_version.pv_major = pv.pv_major;
+               conn->cc_version.pv_minor = pv.pv_minor;
+       }
+out:
+       return ret;
+}
+/*
+ * user_recover_prep() - .recover_prep hook of ocfs2_ls_ops.
+ *
+ * Intentionally empty: nothing is done at this stage.
+ */
+static void user_recover_prep(void *arg)
+{
+}
+/*
+ * user_recover_slot() - .recover_slot hook of ocfs2_ls_ops.
+ * @arg: the ocfs2_cluster_connection registered with the lockspace
+ * @slot: slot/nodeid pair of the node being recovered
+ *
+ * Logs the event and forwards the nodeid to the connection's
+ * cc_recovery_handler together with cc_recovery_data.
+ */
+static void user_recover_slot(void *arg, struct dlm_slot *slot)
+{
+       struct ocfs2_cluster_connection *conn = arg;
+       printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n",
+                       slot->nodeid, slot->slot);
+       conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data);
+
+}
+/*
+ * user_recover_done() - .recover_done hook of ocfs2_ls_ops.
+ * @arg: the ocfs2_cluster_connection registered with the lockspace
+ * @slots: array of @num_slots slot/nodeid entries
+ * @num_slots: number of entries in @slots
+ * @our_slot: slot number assigned to this node
+ * @generation: unused here
+ *
+ * Looks up the entry whose slot equals @our_slot and publishes its
+ * nodeid in oc_this_node, records @our_slot, then wakes oc_wait so that
+ * user_cluster_connect() can stop waiting for a node number. If no
+ * entry matches, oc_this_node is left unchanged.
+ */
+static void user_recover_done(void *arg, struct dlm_slot *slots,
+               int num_slots, int our_slot,
+               uint32_t generation)
+{
+       struct ocfs2_cluster_connection *conn = arg;
+       struct ocfs2_live_connection *lc = conn->cc_private;
+       int i;
+
+       for (i = 0; i < num_slots; i++)
+               if (slots[i].slot == our_slot) {
+                       atomic_set(&lc->oc_this_node, slots[i].nodeid);
+                       break;
+               }
+
+       lc->oc_our_slot = our_slot;
+       wake_up(&lc->oc_wait);
+}
+/*
+ * Lockspace callbacks passed to dlm_new_lockspace() by
+ * user_cluster_connect(), with the connection as their argument.
+ */
+static const struct dlm_lockspace_ops ocfs2_ls_ops = {
+       .recover_prep = user_recover_prep,
+       .recover_slot = user_recover_slot,
+       .recover_done = user_recover_done,
+};
+/*
+ * user_cluster_disconnect() - tear down what user_cluster_connect() built.
+ *
+ * Drops the version lock, releases the lockspace (force argument 2),
+ * drops the live connection stored in conn->cc_private, and clears both
+ * cc_lockspace and cc_private. The version_unlock() result is ignored
+ * and the function always returns 0.
+ *
+ * NOTE(review): version_unlock() runs for every connection type, while
+ * user_cluster_connect() only calls get_protocol_version() for
+ * NO_CONTROLD - confirm the WITH_CONTROLD case is harmless.
+ */
+static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
+{
+       version_unlock(conn);
+       dlm_release_lockspace(conn->cc_lockspace, 2);
+       conn->cc_lockspace = NULL;
+       ocfs2_live_connection_drop(conn->cc_private);
+       conn->cc_private = NULL;
+       return 0;
+}
+/*
+ * user_cluster_connect() - set up the userspace-stack connection for @conn.
+ *
+ * Allocates the live connection, creates the DLM lockspace with
+ * ocfs2_ls_ops, and attaches the live connection. The connection starts
+ * as NO_CONTROLD and becomes WITH_CONTROLD when dlm_new_lockspace()
+ * reports -EOPNOTSUPP in ops_rv. For NO_CONTROLD the protocol version is
+ * negotiated with get_protocol_version() and the function then waits
+ * until user_recover_done() has published a node number (> 0).
+ *
+ * Returns 0 on success, -ENOMEM if the allocation fails, -EPROTO if the
+ * negotiated version is incompatible with conn->cc_version, or the error
+ * of the step that failed.
+ *
+ * Ownership of lc: once it has been handed to
+ * ocfs2_live_connection_drop(), directly or through
+ * user_cluster_disconnect(), lc must be set to NULL so that the kfree()
+ * at out: does not release it a second time.
+ *
+ * NOTE(review): on the dlm_new_lockspace()/ops_rv failure paths
+ * conn->cc_private still points at the lc freed at out:, and for the
+ * ops_rv case the new lockspace is not released - confirm with callers.
+ */
 static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
 {
         dlm_lockspace_t *fsdlm;
-       struct ocfs2_live_connection *uninitialized_var(control);
-       int rc = 0;
+       struct ocfs2_live_connection *lc;
+       int rc, ops_rv;
 
         BUG_ON(conn == NULL);
 
-       rc = ocfs2_live_connection_new(conn, &control);
+       lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
+       if (!lc) {
+               rc = -ENOMEM;
+               goto out;
+       }
+
+       init_waitqueue_head(&lc->oc_wait);
+       init_completion(&lc->oc_sync_wait);
+       atomic_set(&lc->oc_this_node, 0);
+       conn->cc_private = lc;
+       lc->oc_type = NO_CONTROLD;
+
+       rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name,
+                              DLM_LSFL_FS, DLM_LVB_LEN,
+                              &ocfs2_ls_ops, conn, &ops_rv, &fsdlm);
+       if (rc)
+               goto out;
+
+       if (ops_rv == -EOPNOTSUPP) {
+               lc->oc_type = WITH_CONTROLD;
+               printk(KERN_NOTICE "ocfs2: You seem to be using an older "
+                               "version of dlm_controld and/or ocfs2-tools."
+                               " Please consider upgrading.\n");
+       } else if (ops_rv) {
+               rc = ops_rv;
+               goto out;
+       }
+       conn->cc_lockspace = fsdlm;
+
+       rc = ocfs2_live_connection_attach(conn, lc);
         if (rc)
                 goto out;
 
+       if (lc->oc_type == NO_CONTROLD) {
+               rc = get_protocol_version(conn);
+               if (rc) {
+                       printk(KERN_ERR "ocfs2: Could not determine"
+                                       " locking version\n");
+                       user_cluster_disconnect(conn);
+                       /* disconnect already dropped lc; avoid a second free at out: */
+                       lc = NULL;
+                       goto out;
+               }
+               wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0));
+       }
+
         /*
          * running_proto must have been set before we allowed any mounts
          * to proceed.
@@ -818,42 +1054,34 @@ static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
         if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
                 printk(KERN_ERR
                        "Unable to mount with fs locking protocol version "
-                      "%u.%u because the userspace control daemon has "
-                      "negotiated %u.%u\n",
+                      "%u.%u because negotiated protocol is %u.%u\n",
                        conn->cc_version.pv_major, conn->cc_version.pv_minor,
                        running_proto.pv_major, running_proto.pv_minor);
                 rc = -EPROTO;
-               ocfs2_live_connection_drop(control);
-               goto out;
-       }
-
-       rc = dlm_new_lockspace(conn->cc_name, NULL, DLM_LSFL_FS, DLM_LVB_LEN,
-                              NULL, NULL, NULL, &fsdlm);
-       if (rc) {
-               ocfs2_live_connection_drop(control);
-               goto out;
+               ocfs2_live_connection_drop(lc);
+               lc = NULL;
         }
 
-       conn->cc_private = control;
-       conn->cc_lockspace = fsdlm;
 out:
+       if (rc && lc)
+               kfree(lc);
         return rc;
 }
 
-static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
-{
-       dlm_release_lockspace(conn->cc_lockspace, 2);
-       conn->cc_lockspace = NULL;
-       ocfs2_live_connection_drop(conn->cc_private);
-       conn->cc_private = NULL;
-       return 0;
-}
 
-static int user_cluster_this_node(unsigned int *this_node)
+static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
+                                 unsigned int *this_node)
 {
        int rc;
+       struct ocfs2_live_connection *lc = conn->cc_private;
+
+       if (lc->oc_type == WITH_CONTROLD)
+               rc = ocfs2_control_get_this_node();
+       else if (lc->oc_type == NO_CONTROLD)
+               rc = atomic_read(&lc->oc_this_node);
+       else
+               rc = -EINVAL;
 
-       rc = ocfs2_control_get_this_node();
        if (rc < 0)
                return rc;