/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

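/* Module-wide state: ls_count is the number of lockspaces (protected by
   ls_lock) and is used to start/stop the global threads with the first
   and last lockspace; lslist is the list of all lockspaces, protected by
   lslist_lock; scand_task is the dlm_scand kernel thread. */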
static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;

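/* Handle a write to the sysfs "control" file: "0" stops the lockspace for
   recovery, "1" starts it again.  The lookup takes a reference on the
   lockspace (ls_count) so it cannot go away while we operate on it. */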
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n = simple_strtol(buf, NULL, 0);

        ls = dlm_find_lockspace_local(ls->ls_local_handle);
        if (!ls)
                return -EINVAL;

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        dlm_put_lockspace(ls);
        return ret;
}

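/* Handle a write to the sysfs "event_done" file: dlm_controld reports the
   result of a join/leave operation here, which wakes do_uevent() below. */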
static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_global_id = simple_strtoul(buf, NULL, 0);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
        kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_attrs = dlm_attrs,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

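/* Tell dlm_controld we are joining (in=1) or leaving (in=0) the lockspace
   group, then sleep until it writes the result to the "event_done" sysfs
   file (see dlm_event_store above). */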
static int do_uevent(struct dlm_ls *ls, int in)
{
        int error;

        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");

        /* dlm_controld will see the uevent, do the necessary group management
           and then write to sysfs to wake us */

        error = wait_event_interruptible(ls->ls_uevent_wait,
                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

        log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);

        if (error)
                goto out;

        error = ls->ls_uevent_result;
 out:
        if (error)
                log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
                          error, ls->ls_uevent_result);
        return error;
}

static int dlm_uevent(struct kset *kset, struct kobject *kobj,
                      struct kobj_uevent_env *env)
{
        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
        return 0;
}

static struct kset_uevent_ops dlm_uevent_ops = {
        .uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
        if (!dlm_kset) {
                printk(KERN_WARNING "%s: can not create kset\n", __func__);
                return -ENOMEM;
        }
        return 0;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}

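/* The dlm_scand kernel thread periodically runs dlm_scan_rsbs(),
   dlm_scan_timeout() and dlm_scan_waiters() on each lockspace whose scan
   interval (the scan_secs config value) has elapsed.  A lockspace that is
   in recovery is skipped and retried a second later. */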
static struct dlm_ls *find_ls_to_scan(void)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (time_after_eq(jiffies, ls->ls_scan_time +
                                            dlm_config.ci_scan_secs * HZ)) {
                        spin_unlock(&lslist_lock);
                        return ls;
                }
        }
        spin_unlock(&lslist_lock);
        return NULL;
}

static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                ls = find_ls_to_scan();
                if (ls) {
                        if (dlm_lock_recovery_try(ls)) {
                                ls->ls_scan_time = jiffies;
                                dlm_scan_rsbs(ls);
                                dlm_scan_timeout(ls);
                                dlm_scan_waiters(ls);
                                dlm_unlock_recovery(ls);
                        } else {
                                ls->ls_scan_time += HZ;
                        }
                        continue;
                }
                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}

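/* The dlm_find_lockspace_*() lookups below take a reference on the
   lockspace by bumping ls_count under lslist_lock; callers drop it with
   dlm_put_lockspace(). */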
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}

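/* Wait for all references (ls_count) to be dropped, then unlink the
   lockspace from lslist so no new references can be taken. */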
static void remove_lockspace(struct dlm_ls *ls)
{
        for (;;) {
                spin_lock(&lslist_lock);
                if (ls->ls_count == 0) {
                        WARN_ON(ls->ls_create_count != 0);
                        list_del(&ls->ls_list);
                        spin_unlock(&lslist_lock);
                        return;
                }
                spin_unlock(&lslist_lock);
                ssleep(1);
        }
}

static int threads_start(void)
{
        int error;

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto fail;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 fail:
        return error;
}

static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
}

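/* Create and initialize a lockspace: validate the arguments, reuse an
   existing lockspace of the same name unless DLM_LSFL_NEWEXCL is set,
   otherwise allocate and initialize the dlm_ls, start its callback and
   recovery threads, register the sysfs kobject, and ask dlm_controld
   (via uevent) to join the lockspace group. */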
static int new_lockspace(const char *name, const char *cluster,
                         uint32_t flags, int lvblen,
                         const struct dlm_lockspace_ops *ops, void *ops_arg,
                         int *ops_result, dlm_lockspace_t **lockspace)
{
        struct dlm_ls *ls;
        int i, size, error;
        int do_unreg = 0;
        int namelen = strlen(name);

        if (namelen > DLM_LOCKSPACE_LEN)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        if (!dlm_user_daemon_available()) {
                log_print("dlm user daemon not available");
                error = -EUNATCH;
                goto out;
        }

        if (ops && ops_result) {
                if (!dlm_config.ci_recover_callbacks)
                        *ops_result = -EOPNOTSUPP;
                else
                        *ops_result = 0;
        }

        if (dlm_config.ci_recover_callbacks && cluster &&
            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
                log_print("dlm cluster name %s mismatch %s",
                          dlm_config.ci_cluster_name, cluster);
                error = -EBADR;
                goto out;
        }

        error = 0;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                WARN_ON(ls->ls_create_count <= 0);
                if (ls->ls_namelen != namelen)
                        continue;
                if (memcmp(ls->ls_name, name, namelen))
                        continue;
                if (flags & DLM_LSFL_NEWEXCL) {
                        error = -EEXIST;
                        break;
                }
                ls->ls_create_count++;
                *lockspace = ls;
                error = 1;
                break;
        }
        spin_unlock(&lslist_lock);

        if (error)
                goto out;

        error = -ENOMEM;

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;
        ls->ls_scan_time = jiffies;

        if (ops && dlm_config.ci_recover_callbacks) {
                ls->ls_ops = ops;
                ls->ls_ops_arg = ops_arg;
        }

        if (flags & DLM_LSFL_TIMEWARN)
                set_bit(LSFL_TIMEWARN, &ls->ls_flags);

        /* ls_exflags are forced to match among nodes, and we don't
           need to require all nodes to have some flags set */
        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
                                    DLM_LSFL_NEWEXCL));

        size = dlm_config.ci_rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                ls->ls_rsbtbl[i].keep.rb_node = NULL;
                ls->ls_rsbtbl[i].toss.rb_node = NULL;
                spin_lock_init(&ls->ls_rsbtbl[i].lock);
        }

        idr_init(&ls->ls_lkbidr);
        spin_lock_init(&ls->ls_lkbidr_spin);

        size = dlm_config.ci_dirtbl_size;
        ls->ls_dirtbl_size = size;

        ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size);
        if (!ls->ls_dirtbl)
                goto out_lkbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
                spin_lock_init(&ls->ls_dirtbl[i].lock);
        }

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);

        INIT_LIST_HEAD(&ls->ls_new_rsb);
        spin_lock_init(&ls->ls_new_rsb_spin);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_members_done);
        ls->ls_members_result = -1;

        mutex_init(&ls->ls_cb_mutex);
        INIT_LIST_HEAD(&ls->ls_cb_delay);

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        init_rwsem(&ls->ls_recv_active);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
        if (!ls->ls_recover_buf)
                goto out_dirfree;

        ls->ls_slot = 0;
        ls->ls_num_slots = 0;
        ls->ls_slots_size = 0;
        ls->ls_slots = NULL;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        down_write(&ls->ls_in_recovery);

        spin_lock(&lslist_lock);
        ls->ls_create_count = 1;
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        if (flags & DLM_LSFL_FS) {
                error = dlm_callback_start(ls);
                if (error) {
                        log_error(ls, "can't start dlm_callback %d", error);
                        goto out_delist;
                }
        }

        /* needs to find ls in lslist */
        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_callback;
        }

        ls->ls_kobj.kset = dlm_kset;
        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
                                     "%s", ls->ls_name);
        if (error)
                goto out_recoverd;
        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure.)  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_recoverd;

        wait_for_completion(&ls->ls_members_done);
        error = ls->ls_members_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_debug(ls, "join complete");
        *lockspace = ls;
        return 0;

 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_recoverd:
        dlm_recoverd_stop(ls);
 out_callback:
        dlm_callback_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        kfree(ls->ls_recover_buf);
 out_dirfree:
        vfree(ls->ls_dirtbl);
 out_lkbfree:
        idr_destroy(&ls->ls_lkbidr);
        vfree(ls->ls_rsbtbl);
 out_lsfree:
        if (do_unreg)
                kobject_put(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

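/* Public entry point.  ls_lock serializes lockspace creation; the global
   dlm_scand and lowcomms threads are started with the first lockspace and
   stopped when the last one is released.  A positive return from
   new_lockspace() means an existing lockspace was reused, which is
   reported to the caller as success. */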
int dlm_new_lockspace(const char *name, const char *cluster,
                      uint32_t flags, int lvblen,
                      const struct dlm_lockspace_ops *ops, void *ops_arg,
                      int *ops_result, dlm_lockspace_t **lockspace)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
                              ops_result, lockspace);
        if (!error)
                ls_count++;
        if (error > 0)
                error = 0;
        if (!ls_count)
                threads_stop();
 out:
        mutex_unlock(&ls_lock);
        return error;
}

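/* idr callbacks: lkb_idr_is_any and lkb_idr_is_local are used by
   lockspace_busy() below to test for remaining lkb's; lkb_idr_free is
   used by release_lockspace() to free them. */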
static int lkb_idr_is_local(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        if (!lkb->lkb_nodeid)
                return 1;
        return 0;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
        return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                dlm_free_lvb(lkb->lkb_lvbptr);

        dlm_free_lkb(lkb);
        return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
        int rv;

        spin_lock(&ls->ls_lkbidr_spin);
        if (force == 0) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
        } else if (force == 1) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
        } else {
                rv = 0;
        }
        spin_unlock(&ls->ls_lkbidr_spin);
        return rv;
}

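/* Tear down a lockspace once its last creation reference is dropped:
   leave the lockspace group (unless this is a forced shutdown), stop its
   threads, wait for remaining in-kernel references, and free everything
   attached to the dlm_ls.  Returns -EBUSY if locks remain that the given
   force level does not allow us to destroy. */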
static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_rsb *rsb;
        struct rb_node *n;
        int i, busy, rv;

        busy = lockspace_busy(ls, force);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy) {
                        rv = -EBUSY;
                } else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        dlm_callback_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        kfree(ls->ls_recover_buf);

        /*
         * Free direntry structs.
         */

        dlm_dir_clear(ls);
        vfree(ls->ls_dirtbl);

        /*
         * Free all lkb's in idr
         */

        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
        idr_remove_all(&ls->ls_lkbidr);
        idr_destroy(&ls->ls_lkbidr);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
                        dlm_free_rsb(rsb);
                }

                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
                        dlm_free_rsb(rsb);
                }
        }

        vfree(ls->ls_rsbtbl);

        while (!list_empty(&ls->ls_new_rsb)) {
                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
                                       res_hashchain);
                list_del(&rsb->res_hashchain);
                dlm_free_rsb(rsb);
        }

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_free_entries(ls);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_debug(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is released */

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;
        int error;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);

        mutex_lock(&ls_lock);
        error = release_lockspace(ls, force);
        if (!error)
                ls_count--;
        if (!ls_count)
                threads_stop();
        mutex_unlock(&ls_lock);

        return error;
}

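/* Called when the userland control daemon has gone away: stop every
   running lockspace, since recovery cannot be managed without it. */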
void dlm_stop_lockspaces(void)
{
        struct dlm_ls *ls;

 restart:
        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
                        continue;
                spin_unlock(&lslist_lock);
                log_error(ls, "no userland control daemon, stopping lockspace");
                dlm_ls_stop(ls);
                goto restart;
        }
        spin_unlock(&lslist_lock);
}