]> git.kernelconcepts.de Git - karo-tx-linux.git/blob - fs/dlm/lockspace.c
Merge branches 'intel_pstate' and 'pm-domains'
[karo-tx-linux.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include <linux/module.h>
15
16 #include "dlm_internal.h"
17 #include "lockspace.h"
18 #include "member.h"
19 #include "recoverd.h"
20 #include "dir.h"
21 #include "lowcomms.h"
22 #include "config.h"
23 #include "memory.h"
24 #include "lock.h"
25 #include "recover.h"
26 #include "requestqueue.h"
27 #include "user.h"
28 #include "ast.h"
29
30 static int                      ls_count;
31 static struct mutex             ls_lock;
32 static struct list_head         lslist;
33 static spinlock_t               lslist_lock;
34 static struct task_struct *     scand_task;
35
36
37 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
38 {
39         ssize_t ret = len;
40         int n;
41         int rc = kstrtoint(buf, 0, &n);
42
43         if (rc)
44                 return rc;
45         ls = dlm_find_lockspace_local(ls->ls_local_handle);
46         if (!ls)
47                 return -EINVAL;
48
49         switch (n) {
50         case 0:
51                 dlm_ls_stop(ls);
52                 break;
53         case 1:
54                 dlm_ls_start(ls);
55                 break;
56         default:
57                 ret = -EINVAL;
58         }
59         dlm_put_lockspace(ls);
60         return ret;
61 }
62
63 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
64 {
65         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
66
67         if (rc)
68                 return rc;
69         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
70         wake_up(&ls->ls_uevent_wait);
71         return len;
72 }
73
74 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
75 {
76         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
77 }
78
79 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
80 {
81         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
82
83         if (rc)
84                 return rc;
85         return len;
86 }
87
88 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
89 {
90         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
91 }
92
93 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
94 {
95         int val;
96         int rc = kstrtoint(buf, 0, &val);
97
98         if (rc)
99                 return rc;
100         if (val == 1)
101                 set_bit(LSFL_NODIR, &ls->ls_flags);
102         return len;
103 }
104
105 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
106 {
107         uint32_t status = dlm_recover_status(ls);
108         return snprintf(buf, PAGE_SIZE, "%x\n", status);
109 }
110
111 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
112 {
113         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
114 }
115
116 struct dlm_attr {
117         struct attribute attr;
118         ssize_t (*show)(struct dlm_ls *, char *);
119         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
120 };
121
122 static struct dlm_attr dlm_attr_control = {
123         .attr  = {.name = "control", .mode = S_IWUSR},
124         .store = dlm_control_store
125 };
126
127 static struct dlm_attr dlm_attr_event = {
128         .attr  = {.name = "event_done", .mode = S_IWUSR},
129         .store = dlm_event_store
130 };
131
132 static struct dlm_attr dlm_attr_id = {
133         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
134         .show  = dlm_id_show,
135         .store = dlm_id_store
136 };
137
138 static struct dlm_attr dlm_attr_nodir = {
139         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
140         .show  = dlm_nodir_show,
141         .store = dlm_nodir_store
142 };
143
144 static struct dlm_attr dlm_attr_recover_status = {
145         .attr  = {.name = "recover_status", .mode = S_IRUGO},
146         .show  = dlm_recover_status_show
147 };
148
149 static struct dlm_attr dlm_attr_recover_nodeid = {
150         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
151         .show  = dlm_recover_nodeid_show
152 };
153
154 static struct attribute *dlm_attrs[] = {
155         &dlm_attr_control.attr,
156         &dlm_attr_event.attr,
157         &dlm_attr_id.attr,
158         &dlm_attr_nodir.attr,
159         &dlm_attr_recover_status.attr,
160         &dlm_attr_recover_nodeid.attr,
161         NULL,
162 };
163
164 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
165                              char *buf)
166 {
167         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
168         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
169         return a->show ? a->show(ls, buf) : 0;
170 }
171
172 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
173                               const char *buf, size_t len)
174 {
175         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
176         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
177         return a->store ? a->store(ls, buf, len) : len;
178 }
179
180 static void lockspace_kobj_release(struct kobject *k)
181 {
182         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
183         kfree(ls);
184 }
185
186 static const struct sysfs_ops dlm_attr_ops = {
187         .show  = dlm_attr_show,
188         .store = dlm_attr_store,
189 };
190
191 static struct kobj_type dlm_ktype = {
192         .default_attrs = dlm_attrs,
193         .sysfs_ops     = &dlm_attr_ops,
194         .release       = lockspace_kobj_release,
195 };
196
197 static struct kset *dlm_kset;
198
199 static int do_uevent(struct dlm_ls *ls, int in)
200 {
201         int error;
202
203         if (in)
204                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
205         else
206                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
207
208         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
209
210         /* dlm_controld will see the uevent, do the necessary group management
211            and then write to sysfs to wake us */
212
213         error = wait_event_interruptible(ls->ls_uevent_wait,
214                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
215
216         log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
217
218         if (error)
219                 goto out;
220
221         error = ls->ls_uevent_result;
222  out:
223         if (error)
224                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
225                           error, ls->ls_uevent_result);
226         return error;
227 }
228
229 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
230                       struct kobj_uevent_env *env)
231 {
232         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
233
234         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
235         return 0;
236 }
237
238 static struct kset_uevent_ops dlm_uevent_ops = {
239         .uevent = dlm_uevent,
240 };
241
242 int __init dlm_lockspace_init(void)
243 {
244         ls_count = 0;
245         mutex_init(&ls_lock);
246         INIT_LIST_HEAD(&lslist);
247         spin_lock_init(&lslist_lock);
248
249         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
250         if (!dlm_kset) {
251                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
252                 return -ENOMEM;
253         }
254         return 0;
255 }
256
257 void dlm_lockspace_exit(void)
258 {
259         kset_unregister(dlm_kset);
260 }
261
262 static struct dlm_ls *find_ls_to_scan(void)
263 {
264         struct dlm_ls *ls;
265
266         spin_lock(&lslist_lock);
267         list_for_each_entry(ls, &lslist, ls_list) {
268                 if (time_after_eq(jiffies, ls->ls_scan_time +
269                                             dlm_config.ci_scan_secs * HZ)) {
270                         spin_unlock(&lslist_lock);
271                         return ls;
272                 }
273         }
274         spin_unlock(&lslist_lock);
275         return NULL;
276 }
277
278 static int dlm_scand(void *data)
279 {
280         struct dlm_ls *ls;
281
282         while (!kthread_should_stop()) {
283                 ls = find_ls_to_scan();
284                 if (ls) {
285                         if (dlm_lock_recovery_try(ls)) {
286                                 ls->ls_scan_time = jiffies;
287                                 dlm_scan_rsbs(ls);
288                                 dlm_scan_timeout(ls);
289                                 dlm_scan_waiters(ls);
290                                 dlm_unlock_recovery(ls);
291                         } else {
292                                 ls->ls_scan_time += HZ;
293                         }
294                         continue;
295                 }
296                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
297         }
298         return 0;
299 }
300
301 static int dlm_scand_start(void)
302 {
303         struct task_struct *p;
304         int error = 0;
305
306         p = kthread_run(dlm_scand, NULL, "dlm_scand");
307         if (IS_ERR(p))
308                 error = PTR_ERR(p);
309         else
310                 scand_task = p;
311         return error;
312 }
313
314 static void dlm_scand_stop(void)
315 {
316         kthread_stop(scand_task);
317 }
318
319 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
320 {
321         struct dlm_ls *ls;
322
323         spin_lock(&lslist_lock);
324
325         list_for_each_entry(ls, &lslist, ls_list) {
326                 if (ls->ls_global_id == id) {
327                         ls->ls_count++;
328                         goto out;
329                 }
330         }
331         ls = NULL;
332  out:
333         spin_unlock(&lslist_lock);
334         return ls;
335 }
336
337 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
338 {
339         struct dlm_ls *ls;
340
341         spin_lock(&lslist_lock);
342         list_for_each_entry(ls, &lslist, ls_list) {
343                 if (ls->ls_local_handle == lockspace) {
344                         ls->ls_count++;
345                         goto out;
346                 }
347         }
348         ls = NULL;
349  out:
350         spin_unlock(&lslist_lock);
351         return ls;
352 }
353
354 struct dlm_ls *dlm_find_lockspace_device(int minor)
355 {
356         struct dlm_ls *ls;
357
358         spin_lock(&lslist_lock);
359         list_for_each_entry(ls, &lslist, ls_list) {
360                 if (ls->ls_device.minor == minor) {
361                         ls->ls_count++;
362                         goto out;
363                 }
364         }
365         ls = NULL;
366  out:
367         spin_unlock(&lslist_lock);
368         return ls;
369 }
370
371 void dlm_put_lockspace(struct dlm_ls *ls)
372 {
373         spin_lock(&lslist_lock);
374         ls->ls_count--;
375         spin_unlock(&lslist_lock);
376 }
377
378 static void remove_lockspace(struct dlm_ls *ls)
379 {
380         for (;;) {
381                 spin_lock(&lslist_lock);
382                 if (ls->ls_count == 0) {
383                         WARN_ON(ls->ls_create_count != 0);
384                         list_del(&ls->ls_list);
385                         spin_unlock(&lslist_lock);
386                         return;
387                 }
388                 spin_unlock(&lslist_lock);
389                 ssleep(1);
390         }
391 }
392
393 static int threads_start(void)
394 {
395         int error;
396
397         error = dlm_scand_start();
398         if (error) {
399                 log_print("cannot start dlm_scand thread %d", error);
400                 goto fail;
401         }
402
403         /* Thread for sending/receiving messages for all lockspace's */
404         error = dlm_lowcomms_start();
405         if (error) {
406                 log_print("cannot start dlm lowcomms %d", error);
407                 goto scand_fail;
408         }
409
410         return 0;
411
412  scand_fail:
413         dlm_scand_stop();
414  fail:
415         return error;
416 }
417
418 static void threads_stop(void)
419 {
420         dlm_scand_stop();
421         dlm_lowcomms_stop();
422 }
423
424 static int new_lockspace(const char *name, const char *cluster,
425                          uint32_t flags, int lvblen,
426                          const struct dlm_lockspace_ops *ops, void *ops_arg,
427                          int *ops_result, dlm_lockspace_t **lockspace)
428 {
429         struct dlm_ls *ls;
430         int i, size, error;
431         int do_unreg = 0;
432         int namelen = strlen(name);
433
434         if (namelen > DLM_LOCKSPACE_LEN)
435                 return -EINVAL;
436
437         if (!lvblen || (lvblen % 8))
438                 return -EINVAL;
439
440         if (!try_module_get(THIS_MODULE))
441                 return -EINVAL;
442
443         if (!dlm_user_daemon_available()) {
444                 log_print("dlm user daemon not available");
445                 error = -EUNATCH;
446                 goto out;
447         }
448
449         if (ops && ops_result) {
450                 if (!dlm_config.ci_recover_callbacks)
451                         *ops_result = -EOPNOTSUPP;
452                 else
453                         *ops_result = 0;
454         }
455
456         if (dlm_config.ci_recover_callbacks && cluster &&
457             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
458                 log_print("dlm cluster name %s mismatch %s",
459                           dlm_config.ci_cluster_name, cluster);
460                 error = -EBADR;
461                 goto out;
462         }
463
464         error = 0;
465
466         spin_lock(&lslist_lock);
467         list_for_each_entry(ls, &lslist, ls_list) {
468                 WARN_ON(ls->ls_create_count <= 0);
469                 if (ls->ls_namelen != namelen)
470                         continue;
471                 if (memcmp(ls->ls_name, name, namelen))
472                         continue;
473                 if (flags & DLM_LSFL_NEWEXCL) {
474                         error = -EEXIST;
475                         break;
476                 }
477                 ls->ls_create_count++;
478                 *lockspace = ls;
479                 error = 1;
480                 break;
481         }
482         spin_unlock(&lslist_lock);
483
484         if (error)
485                 goto out;
486
487         error = -ENOMEM;
488
489         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
490         if (!ls)
491                 goto out;
492         memcpy(ls->ls_name, name, namelen);
493         ls->ls_namelen = namelen;
494         ls->ls_lvblen = lvblen;
495         ls->ls_count = 0;
496         ls->ls_flags = 0;
497         ls->ls_scan_time = jiffies;
498
499         if (ops && dlm_config.ci_recover_callbacks) {
500                 ls->ls_ops = ops;
501                 ls->ls_ops_arg = ops_arg;
502         }
503
504         if (flags & DLM_LSFL_TIMEWARN)
505                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
506
507         /* ls_exflags are forced to match among nodes, and we don't
508            need to require all nodes to have some flags set */
509         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
510                                     DLM_LSFL_NEWEXCL));
511
512         size = dlm_config.ci_rsbtbl_size;
513         ls->ls_rsbtbl_size = size;
514
515         ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
516         if (!ls->ls_rsbtbl)
517                 goto out_lsfree;
518         for (i = 0; i < size; i++) {
519                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
520                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
521                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
522         }
523
524         spin_lock_init(&ls->ls_remove_spin);
525
526         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
527                 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
528                                                  GFP_KERNEL);
529                 if (!ls->ls_remove_names[i])
530                         goto out_rsbtbl;
531         }
532
533         idr_init(&ls->ls_lkbidr);
534         spin_lock_init(&ls->ls_lkbidr_spin);
535
536         INIT_LIST_HEAD(&ls->ls_waiters);
537         mutex_init(&ls->ls_waiters_mutex);
538         INIT_LIST_HEAD(&ls->ls_orphans);
539         mutex_init(&ls->ls_orphans_mutex);
540         INIT_LIST_HEAD(&ls->ls_timeout);
541         mutex_init(&ls->ls_timeout_mutex);
542
543         INIT_LIST_HEAD(&ls->ls_new_rsb);
544         spin_lock_init(&ls->ls_new_rsb_spin);
545
546         INIT_LIST_HEAD(&ls->ls_nodes);
547         INIT_LIST_HEAD(&ls->ls_nodes_gone);
548         ls->ls_num_nodes = 0;
549         ls->ls_low_nodeid = 0;
550         ls->ls_total_weight = 0;
551         ls->ls_node_array = NULL;
552
553         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
554         ls->ls_stub_rsb.res_ls = ls;
555
556         ls->ls_debug_rsb_dentry = NULL;
557         ls->ls_debug_waiters_dentry = NULL;
558
559         init_waitqueue_head(&ls->ls_uevent_wait);
560         ls->ls_uevent_result = 0;
561         init_completion(&ls->ls_members_done);
562         ls->ls_members_result = -1;
563
564         mutex_init(&ls->ls_cb_mutex);
565         INIT_LIST_HEAD(&ls->ls_cb_delay);
566
567         ls->ls_recoverd_task = NULL;
568         mutex_init(&ls->ls_recoverd_active);
569         spin_lock_init(&ls->ls_recover_lock);
570         spin_lock_init(&ls->ls_rcom_spin);
571         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
572         ls->ls_recover_status = 0;
573         ls->ls_recover_seq = 0;
574         ls->ls_recover_args = NULL;
575         init_rwsem(&ls->ls_in_recovery);
576         init_rwsem(&ls->ls_recv_active);
577         INIT_LIST_HEAD(&ls->ls_requestqueue);
578         mutex_init(&ls->ls_requestqueue_mutex);
579         mutex_init(&ls->ls_clear_proc_locks);
580
581         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
582         if (!ls->ls_recover_buf)
583                 goto out_lkbidr;
584
585         ls->ls_slot = 0;
586         ls->ls_num_slots = 0;
587         ls->ls_slots_size = 0;
588         ls->ls_slots = NULL;
589
590         INIT_LIST_HEAD(&ls->ls_recover_list);
591         spin_lock_init(&ls->ls_recover_list_lock);
592         idr_init(&ls->ls_recover_idr);
593         spin_lock_init(&ls->ls_recover_idr_lock);
594         ls->ls_recover_list_count = 0;
595         ls->ls_local_handle = ls;
596         init_waitqueue_head(&ls->ls_wait_general);
597         INIT_LIST_HEAD(&ls->ls_root_list);
598         init_rwsem(&ls->ls_root_sem);
599
600         spin_lock(&lslist_lock);
601         ls->ls_create_count = 1;
602         list_add(&ls->ls_list, &lslist);
603         spin_unlock(&lslist_lock);
604
605         if (flags & DLM_LSFL_FS) {
606                 error = dlm_callback_start(ls);
607                 if (error) {
608                         log_error(ls, "can't start dlm_callback %d", error);
609                         goto out_delist;
610                 }
611         }
612
613         init_waitqueue_head(&ls->ls_recover_lock_wait);
614
615         /*
616          * Once started, dlm_recoverd first looks for ls in lslist, then
617          * initializes ls_in_recovery as locked in "down" mode.  We need
618          * to wait for the wakeup from dlm_recoverd because in_recovery
619          * has to start out in down mode.
620          */
621
622         error = dlm_recoverd_start(ls);
623         if (error) {
624                 log_error(ls, "can't start dlm_recoverd %d", error);
625                 goto out_callback;
626         }
627
628         wait_event(ls->ls_recover_lock_wait,
629                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
630
631         ls->ls_kobj.kset = dlm_kset;
632         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
633                                      "%s", ls->ls_name);
634         if (error)
635                 goto out_recoverd;
636         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
637
638         /* let kobject handle freeing of ls if there's an error */
639         do_unreg = 1;
640
641         /* This uevent triggers dlm_controld in userspace to add us to the
642            group of nodes that are members of this lockspace (managed by the
643            cluster infrastructure.)  Once it's done that, it tells us who the
644            current lockspace members are (via configfs) and then tells the
645            lockspace to start running (via sysfs) in dlm_ls_start(). */
646
647         error = do_uevent(ls, 1);
648         if (error)
649                 goto out_recoverd;
650
651         wait_for_completion(&ls->ls_members_done);
652         error = ls->ls_members_result;
653         if (error)
654                 goto out_members;
655
656         dlm_create_debug_file(ls);
657
658         log_rinfo(ls, "join complete");
659         *lockspace = ls;
660         return 0;
661
662  out_members:
663         do_uevent(ls, 0);
664         dlm_clear_members(ls);
665         kfree(ls->ls_node_array);
666  out_recoverd:
667         dlm_recoverd_stop(ls);
668  out_callback:
669         dlm_callback_stop(ls);
670  out_delist:
671         spin_lock(&lslist_lock);
672         list_del(&ls->ls_list);
673         spin_unlock(&lslist_lock);
674         idr_destroy(&ls->ls_recover_idr);
675         kfree(ls->ls_recover_buf);
676  out_lkbidr:
677         idr_destroy(&ls->ls_lkbidr);
678         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
679                 if (ls->ls_remove_names[i])
680                         kfree(ls->ls_remove_names[i]);
681         }
682  out_rsbtbl:
683         vfree(ls->ls_rsbtbl);
684  out_lsfree:
685         if (do_unreg)
686                 kobject_put(&ls->ls_kobj);
687         else
688                 kfree(ls);
689  out:
690         module_put(THIS_MODULE);
691         return error;
692 }
693
694 int dlm_new_lockspace(const char *name, const char *cluster,
695                       uint32_t flags, int lvblen,
696                       const struct dlm_lockspace_ops *ops, void *ops_arg,
697                       int *ops_result, dlm_lockspace_t **lockspace)
698 {
699         int error = 0;
700
701         mutex_lock(&ls_lock);
702         if (!ls_count)
703                 error = threads_start();
704         if (error)
705                 goto out;
706
707         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
708                               ops_result, lockspace);
709         if (!error)
710                 ls_count++;
711         if (error > 0)
712                 error = 0;
713         if (!ls_count)
714                 threads_stop();
715  out:
716         mutex_unlock(&ls_lock);
717         return error;
718 }
719
720 static int lkb_idr_is_local(int id, void *p, void *data)
721 {
722         struct dlm_lkb *lkb = p;
723
724         return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
725 }
726
727 static int lkb_idr_is_any(int id, void *p, void *data)
728 {
729         return 1;
730 }
731
732 static int lkb_idr_free(int id, void *p, void *data)
733 {
734         struct dlm_lkb *lkb = p;
735
736         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
737                 dlm_free_lvb(lkb->lkb_lvbptr);
738
739         dlm_free_lkb(lkb);
740         return 0;
741 }
742
743 /* NOTE: We check the lkbidr here rather than the resource table.
744    This is because there may be LKBs queued as ASTs that have been unlinked
745    from their RSBs and are pending deletion once the AST has been delivered */
746
747 static int lockspace_busy(struct dlm_ls *ls, int force)
748 {
749         int rv;
750
751         spin_lock(&ls->ls_lkbidr_spin);
752         if (force == 0) {
753                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
754         } else if (force == 1) {
755                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
756         } else {
757                 rv = 0;
758         }
759         spin_unlock(&ls->ls_lkbidr_spin);
760         return rv;
761 }
762
763 static int release_lockspace(struct dlm_ls *ls, int force)
764 {
765         struct dlm_rsb *rsb;
766         struct rb_node *n;
767         int i, busy, rv;
768
769         busy = lockspace_busy(ls, force);
770
771         spin_lock(&lslist_lock);
772         if (ls->ls_create_count == 1) {
773                 if (busy) {
774                         rv = -EBUSY;
775                 } else {
776                         /* remove_lockspace takes ls off lslist */
777                         ls->ls_create_count = 0;
778                         rv = 0;
779                 }
780         } else if (ls->ls_create_count > 1) {
781                 rv = --ls->ls_create_count;
782         } else {
783                 rv = -EINVAL;
784         }
785         spin_unlock(&lslist_lock);
786
787         if (rv) {
788                 log_debug(ls, "release_lockspace no remove %d", rv);
789                 return rv;
790         }
791
792         dlm_device_deregister(ls);
793
794         if (force < 3 && dlm_user_daemon_available())
795                 do_uevent(ls, 0);
796
797         dlm_recoverd_stop(ls);
798
799         dlm_callback_stop(ls);
800
801         remove_lockspace(ls);
802
803         dlm_delete_debug_file(ls);
804
805         kfree(ls->ls_recover_buf);
806
807         /*
808          * Free all lkb's in idr
809          */
810
811         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
812         idr_destroy(&ls->ls_lkbidr);
813
814         /*
815          * Free all rsb's on rsbtbl[] lists
816          */
817
818         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
819                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
820                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
821                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
822                         dlm_free_rsb(rsb);
823                 }
824
825                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
826                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
827                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
828                         dlm_free_rsb(rsb);
829                 }
830         }
831
832         vfree(ls->ls_rsbtbl);
833
834         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
835                 kfree(ls->ls_remove_names[i]);
836
837         while (!list_empty(&ls->ls_new_rsb)) {
838                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
839                                        res_hashchain);
840                 list_del(&rsb->res_hashchain);
841                 dlm_free_rsb(rsb);
842         }
843
844         /*
845          * Free structures on any other lists
846          */
847
848         dlm_purge_requestqueue(ls);
849         kfree(ls->ls_recover_args);
850         dlm_clear_members(ls);
851         dlm_clear_members_gone(ls);
852         kfree(ls->ls_node_array);
853         log_rinfo(ls, "release_lockspace final free");
854         kobject_put(&ls->ls_kobj);
855         /* The ls structure will be freed when the kobject is done with */
856
857         module_put(THIS_MODULE);
858         return 0;
859 }
860
861 /*
862  * Called when a system has released all its locks and is not going to use the
863  * lockspace any longer.  We free everything we're managing for this lockspace.
864  * Remaining nodes will go through the recovery process as if we'd died.  The
865  * lockspace must continue to function as usual, participating in recoveries,
866  * until this returns.
867  *
868  * Force has 4 possible values:
869  * 0 - don't destroy locksapce if it has any LKBs
870  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
871  * 2 - destroy lockspace regardless of LKBs
872  * 3 - destroy lockspace as part of a forced shutdown
873  */
874
875 int dlm_release_lockspace(void *lockspace, int force)
876 {
877         struct dlm_ls *ls;
878         int error;
879
880         ls = dlm_find_lockspace_local(lockspace);
881         if (!ls)
882                 return -EINVAL;
883         dlm_put_lockspace(ls);
884
885         mutex_lock(&ls_lock);
886         error = release_lockspace(ls, force);
887         if (!error)
888                 ls_count--;
889         if (!ls_count)
890                 threads_stop();
891         mutex_unlock(&ls_lock);
892
893         return error;
894 }
895
896 void dlm_stop_lockspaces(void)
897 {
898         struct dlm_ls *ls;
899         int count;
900
901  restart:
902         count = 0;
903         spin_lock(&lslist_lock);
904         list_for_each_entry(ls, &lslist, ls_list) {
905                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
906                         count++;
907                         continue;
908                 }
909                 spin_unlock(&lslist_lock);
910                 log_error(ls, "no userland control daemon, stopping lockspace");
911                 dlm_ls_stop(ls);
912                 goto restart;
913         }
914         spin_unlock(&lslist_lock);
915
916         if (count)
917                 log_print("dlm user daemon left %d lockspaces", count);
918 }
919