karo-tx-linux.git: fs/ocfs2/cluster/heartbeat.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public
17  * License along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 #include <linux/kernel.h>
23 #include <linux/sched.h>
24 #include <linux/jiffies.h>
25 #include <linux/module.h>
26 #include <linux/fs.h>
27 #include <linux/bio.h>
28 #include <linux/blkdev.h>
29 #include <linux/delay.h>
30 #include <linux/file.h>
31 #include <linux/kthread.h>
32 #include <linux/configfs.h>
33 #include <linux/random.h>
34 #include <linux/crc32.h>
35 #include <linux/time.h>
36 #include <linux/debugfs.h>
37 #include <linux/slab.h>
38 #include <linux/bitmap.h>
39
40 #include "heartbeat.h"
41 #include "tcp.h"
42 #include "nodemanager.h"
43 #include "quorum.h"
44
45 #include "masklog.h"
46
47
48 /*
49  * The first heartbeat pass had one global thread that would serialize all hb
50  * callback calls.  This global serializing sem should only be removed once
51  * we've made sure that all callees can deal with being called concurrently
52  * from multiple hb region threads.
53  */
54 static DECLARE_RWSEM(o2hb_callback_sem);
55
56 /*
57  * multiple hb threads are watching multiple regions.  A node is live
58  * whenever any of the threads sees activity from the node in its region.
59  */
60 static DEFINE_SPINLOCK(o2hb_live_lock);
61 static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
62 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
63 static LIST_HEAD(o2hb_node_events);
64 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
65
66 /*
67  * In global heartbeat, we maintain a series of region bitmaps.
68  *      - o2hb_region_bitmap allows us to limit the region number to max region.
69  *      - o2hb_live_region_bitmap tracks live regions (seen steady iterations).
70  *      - o2hb_quorum_region_bitmap tracks live regions that have seen all nodes
71  *              heartbeat on them.
72  *      - o2hb_failed_region_bitmap tracks the regions that have seen io timeouts.
73  */
74 static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
75 static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
76 static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
77 static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
78
79 #define O2HB_DB_TYPE_LIVENODES          0
80 #define O2HB_DB_TYPE_LIVEREGIONS        1
81 #define O2HB_DB_TYPE_QUORUMREGIONS      2
82 #define O2HB_DB_TYPE_FAILEDREGIONS      3
83 #define O2HB_DB_TYPE_REGION_LIVENODES   4
84 #define O2HB_DB_TYPE_REGION_NUMBER      5
85 #define O2HB_DB_TYPE_REGION_ELAPSED_TIME        6
86 #define O2HB_DB_TYPE_REGION_PINNED      7
87 struct o2hb_debug_buf {
88         int db_type;
89         int db_size;
90         int db_len;
91         void *db_data;
92 };
93
94 static struct o2hb_debug_buf *o2hb_db_livenodes;
95 static struct o2hb_debug_buf *o2hb_db_liveregions;
96 static struct o2hb_debug_buf *o2hb_db_quorumregions;
97 static struct o2hb_debug_buf *o2hb_db_failedregions;
98
99 #define O2HB_DEBUG_DIR                  "o2hb"
100 #define O2HB_DEBUG_LIVENODES            "livenodes"
101 #define O2HB_DEBUG_LIVEREGIONS          "live_regions"
102 #define O2HB_DEBUG_QUORUMREGIONS        "quorum_regions"
103 #define O2HB_DEBUG_FAILEDREGIONS        "failed_regions"
104 #define O2HB_DEBUG_REGION_NUMBER        "num"
105 #define O2HB_DEBUG_REGION_ELAPSED_TIME  "elapsed_time_in_ms"
106 #define O2HB_DEBUG_REGION_PINNED        "pinned"
107
108 static struct dentry *o2hb_debug_dir;
109 static struct dentry *o2hb_debug_livenodes;
110 static struct dentry *o2hb_debug_liveregions;
111 static struct dentry *o2hb_debug_quorumregions;
112 static struct dentry *o2hb_debug_failedregions;
113
114 static LIST_HEAD(o2hb_all_regions);
115
116 static struct o2hb_callback {
117         struct list_head list;
118 } o2hb_callbacks[O2HB_NUM_CB];
119
120 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
121
122 #define O2HB_DEFAULT_BLOCK_BITS       9
123
124 enum o2hb_heartbeat_modes {
125         O2HB_HEARTBEAT_LOCAL            = 0,
126         O2HB_HEARTBEAT_GLOBAL,
127         O2HB_HEARTBEAT_NUM_MODES,
128 };
129
130 char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
131                 "local",        /* O2HB_HEARTBEAT_LOCAL */
132                 "global",       /* O2HB_HEARTBEAT_GLOBAL */
133 };
134
135 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
136 unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;
137
138 /*
139  * o2hb_dependent_users tracks the number of registered callbacks that depend
140  * on heartbeat. o2net and o2dlm are two entities that register this callback.
141  * However, only o2dlm depends on the heartbeat. It does not want the heartbeat
142  * to stop while a dlm domain is still active.
143  */
144 unsigned int o2hb_dependent_users;
145
146 /*
147  * In global heartbeat mode, all regions are pinned if there are one or more
148  * dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All
149  * regions are unpinned if the region count exceeds the cut off or the number
150  * of dependent users falls to zero.
151  */
152 #define O2HB_PIN_CUT_OFF                3
153
154 /*
155  * In local heartbeat mode, we assume the dlm domain name to be the same as
156  * region uuid. This is true for domains created for the file system but not
157  * necessarily true for userdlm domains. This is a known limitation.
158  *
159  * In global heartbeat mode, we pin/unpin all o2hb regions. This solution
160  * works for both file system and userdlm domains.
161  */
162 static int o2hb_region_pin(const char *region_uuid);
163 static void o2hb_region_unpin(const char *region_uuid);
164
165 /* Only sets a new threshold if there are no active regions.
166  *
167  * No locking or otherwise interesting code is required for reading
168  * o2hb_dead_threshold as it can't change once regions are active and
169  * it's not interesting to anyone until then anyway. */
170 static void o2hb_dead_threshold_set(unsigned int threshold)
171 {
172         if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
173                 spin_lock(&o2hb_live_lock);
174                 if (list_empty(&o2hb_all_regions))
175                         o2hb_dead_threshold = threshold;
176                 spin_unlock(&o2hb_live_lock);
177         }
178 }
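/*
 * Illustration (not in the original source): the threshold set above maps to
 * a wall-clock timeout as
 *
 *	dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
 *
 * (see o2hb_prepare_block() and o2hb_check_slot() below).  Assuming the
 * defaults defined in heartbeat.h, e.g. a threshold of 31 iterations at
 * 2000 ms per iteration, a silent node would be declared dead after roughly
 * 62 seconds.
 */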
179
180 static int o2hb_global_hearbeat_mode_set(unsigned int hb_mode)
181 {
182         int ret = -1;
183
184         if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) {
185                 spin_lock(&o2hb_live_lock);
186                 if (list_empty(&o2hb_all_regions)) {
187                         o2hb_heartbeat_mode = hb_mode;
188                         ret = 0;
189                 }
190                 spin_unlock(&o2hb_live_lock);
191         }
192
193         return ret;
194 }
195
196 struct o2hb_node_event {
197         struct list_head        hn_item;
198         enum o2hb_callback_type hn_event_type;
199         struct o2nm_node        *hn_node;
200         int                     hn_node_num;
201 };
202
203 struct o2hb_disk_slot {
204         struct o2hb_disk_heartbeat_block *ds_raw_block;
205         u8                      ds_node_num;
206         u64                     ds_last_time;
207         u64                     ds_last_generation;
208         u16                     ds_equal_samples;
209         u16                     ds_changed_samples;
210         struct list_head        ds_live_item;
211 };
212
213 /* each thread owns a region.. when we're asked to tear down the region
214  * we ask the thread to stop, and it then cleans up the region */
215 struct o2hb_region {
216         struct config_item      hr_item;
217
218         struct list_head        hr_all_item;
219         unsigned                hr_unclean_stop:1,
220                                 hr_aborted_start:1,
221                                 hr_item_pinned:1,
222                                 hr_item_dropped:1;
223
224         /* protected by the hr_callback_sem */
225         struct task_struct      *hr_task;
226
227         unsigned int            hr_blocks;
228         unsigned long long      hr_start_block;
229
230         unsigned int            hr_block_bits;
231         unsigned int            hr_block_bytes;
232
233         unsigned int            hr_slots_per_page;
234         unsigned int            hr_num_pages;
235
236         struct page             **hr_slot_data;
237         struct block_device     *hr_bdev;
238         struct o2hb_disk_slot   *hr_slots;
239
240         /* live node map of this region */
241         unsigned long           hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
242         unsigned int            hr_region_num;
243
244         struct dentry           *hr_debug_dir;
245         struct dentry           *hr_debug_livenodes;
246         struct dentry           *hr_debug_regnum;
247         struct dentry           *hr_debug_elapsed_time;
248         struct dentry           *hr_debug_pinned;
249         struct o2hb_debug_buf   *hr_db_livenodes;
250         struct o2hb_debug_buf   *hr_db_regnum;
251         struct o2hb_debug_buf   *hr_db_elapsed_time;
252         struct o2hb_debug_buf   *hr_db_pinned;
253
254         /* let whoever is setting up hb wait for the thread to reach a
255          * 'steady' state before returning.  This will be fixed when we have
256          * a more complete api that doesn't lead to this sort of fragility. */
257         atomic_t                hr_steady_iterations;
258
259         /* terminate o2hb thread if it does not reach steady state
260          * (hr_steady_iterations == 0) within hr_unsteady_iterations */
261         atomic_t                hr_unsteady_iterations;
262
263         char                    hr_dev_name[BDEVNAME_SIZE];
264
265         unsigned int            hr_timeout_ms;
266
267         /* randomized as the region goes up and down so that another node
268          * recognizes this node going down and coming back up in one iteration */
269         u64                     hr_generation;
270
271         struct delayed_work     hr_write_timeout_work;
272         unsigned long           hr_last_timeout_start;
273
274         /* Used during o2hb_check_slot to hold a copy of the block
275          * being checked because we temporarily have to zero out the
276          * crc field. */
277         struct o2hb_disk_heartbeat_block *hr_tmp_block;
278 };
279
280 struct o2hb_bio_wait_ctxt {
281         atomic_t          wc_num_reqs;
282         struct completion wc_io_complete;
283         int               wc_error;
284 };
285
286 static void o2hb_write_timeout(struct work_struct *work)
287 {
288         int failed, quorum;
289         unsigned long flags;
290         struct o2hb_region *reg =
291                 container_of(work, struct o2hb_region,
292                              hr_write_timeout_work.work);
293
294         mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
295              "milliseconds\n", reg->hr_dev_name,
296              jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
297
298         if (o2hb_global_heartbeat_active()) {
299                 spin_lock_irqsave(&o2hb_live_lock, flags);
300                 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
301                         set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
302                 failed = bitmap_weight(o2hb_failed_region_bitmap,
303                                         O2NM_MAX_REGIONS);
304                 quorum = bitmap_weight(o2hb_quorum_region_bitmap,
305                                         O2NM_MAX_REGIONS);
306                 spin_unlock_irqrestore(&o2hb_live_lock, flags);
307
308                 mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n",
309                      quorum, failed);
310
311                 /*
312                  * Fence if the number of failed regions >= half the number
313                  * of  quorum regions
314                  */
315                 if ((failed << 1) < quorum)
316                         return;
317         }
318
319         o2quo_disk_timeout();
320 }
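/*
 * Worked example of the fencing check above, with hypothetical counts:
 * quorum = 3 and failed = 1 gives (1 << 1) = 2 < 3, so this write timeout
 * is tolerated and we return early; failed = 2 gives (2 << 1) = 4 >= 3, so
 * control falls through to o2quo_disk_timeout().  In local heartbeat mode
 * o2quo_disk_timeout() is reached unconditionally.
 */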
321
322 static void o2hb_arm_write_timeout(struct o2hb_region *reg)
323 {
324         /* Arm writeout only after thread reaches steady state */
325         if (atomic_read(&reg->hr_steady_iterations) != 0)
326                 return;
327
328         mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
329              O2HB_MAX_WRITE_TIMEOUT_MS);
330
331         if (o2hb_global_heartbeat_active()) {
332                 spin_lock(&o2hb_live_lock);
333                 clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
334                 spin_unlock(&o2hb_live_lock);
335         }
336         cancel_delayed_work(&reg->hr_write_timeout_work);
337         reg->hr_last_timeout_start = jiffies;
338         schedule_delayed_work(&reg->hr_write_timeout_work,
339                               msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
340 }
341
342 static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
343 {
344         cancel_delayed_work_sync(&reg->hr_write_timeout_work);
345 }
346
347 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
348 {
349         atomic_set(&wc->wc_num_reqs, 1);
350         init_completion(&wc->wc_io_complete);
351         wc->wc_error = 0;
352 }
353
354 /* Used in error paths too */
355 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
356                                      unsigned int num)
357 {
358         /* sadly atomic_sub_and_test() isn't available on all platforms.  The
359          * good news is that the fast path only completes one at a time */
360         while(num--) {
361                 if (atomic_dec_and_test(&wc->wc_num_reqs)) {
362                         BUG_ON(num > 0);
363                         complete(&wc->wc_io_complete);
364                 }
365         }
366 }
367
368 static void o2hb_wait_on_io(struct o2hb_region *reg,
369                             struct o2hb_bio_wait_ctxt *wc)
370 {
371         o2hb_bio_wait_dec(wc, 1);
372         wait_for_completion(&wc->wc_io_complete);
373 }
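/*
 * Note on the request accounting above: o2hb_bio_wait_init() biases
 * wc_num_reqs to 1, every submitted bio adds 1 (see o2hb_read_slots() and
 * o2hb_issue_node_write()), every completion drops 1 in o2hb_bio_end_io(),
 * and o2hb_wait_on_io() drops the initial bias before sleeping.  The
 * completion therefore fires exactly once, after the last outstanding bio
 * has ended.
 */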
374
375 static void o2hb_bio_end_io(struct bio *bio,
376                            int error)
377 {
378         struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
379
380         if (error) {
381                 mlog(ML_ERROR, "IO Error %d\n", error);
382                 wc->wc_error = error;
383         }
384
385         o2hb_bio_wait_dec(wc, 1);
386         bio_put(bio);
387 }
388
389 /* Set up a bio to cover I/O against the slots starting at *current_slot
390  * and running up to max_slots. */
391 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
392                                       struct o2hb_bio_wait_ctxt *wc,
393                                       unsigned int *current_slot,
394                                       unsigned int max_slots)
395 {
396         int len, current_page;
397         unsigned int vec_len, vec_start;
398         unsigned int bits = reg->hr_block_bits;
399         unsigned int spp = reg->hr_slots_per_page;
400         unsigned int cs = *current_slot;
401         struct bio *bio;
402         struct page *page;
403
404         /* Testing has shown this allocation to take long enough under
405          * GFP_KERNEL that the local node can get fenced. It would be
406          * nicest if we could pre-allocate these bios and avoid this
407  * altogether. */
408         bio = bio_alloc(GFP_ATOMIC, 16);
409         if (!bio) {
410                 mlog(ML_ERROR, "Could not alloc slots BIO!\n");
411                 bio = ERR_PTR(-ENOMEM);
412                 goto bail;
413         }
414
415         /* Must put everything in 512 byte sectors for the bio... */
416         bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
417         bio->bi_bdev = reg->hr_bdev;
418         bio->bi_private = wc;
419         bio->bi_end_io = o2hb_bio_end_io;
420
421         vec_start = (cs << bits) % PAGE_CACHE_SIZE;
422         while(cs < max_slots) {
423                 current_page = cs / spp;
424                 page = reg->hr_slot_data[current_page];
425
426                 vec_len = min(PAGE_CACHE_SIZE - vec_start,
427                               (max_slots-cs) * (PAGE_CACHE_SIZE/spp) );
428
429                 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
430                      current_page, vec_len, vec_start);
431
432                 len = bio_add_page(bio, page, vec_len, vec_start);
433                 if (len != vec_len) break;
434
435                 cs += vec_len / (PAGE_CACHE_SIZE/spp);
436                 vec_start = 0;
437         }
438
439 bail:
440         *current_slot = cs;
441         return bio;
442 }
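/*
 * Worked example of the slot-to-sector math above (illustrative values):
 * with hr_block_bits = 12 (4096-byte slots) and hr_start_block = 0, slot 5
 * starts at bi_sector = (0 + 5) << (12 - 9) = 40, i.e. byte offset 20480 on
 * the device, and vec_start = (5 << 12) % PAGE_CACHE_SIZE is the offset of
 * that slot within its hr_slot_data page.
 */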
443
444 static int o2hb_read_slots(struct o2hb_region *reg,
445                            unsigned int max_slots)
446 {
447         unsigned int current_slot=0;
448         int status;
449         struct o2hb_bio_wait_ctxt wc;
450         struct bio *bio;
451
452         o2hb_bio_wait_init(&wc);
453
454         while(current_slot < max_slots) {
455                 bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
456                 if (IS_ERR(bio)) {
457                         status = PTR_ERR(bio);
458                         mlog_errno(status);
459                         goto bail_and_wait;
460                 }
461
462                 atomic_inc(&wc.wc_num_reqs);
463                 submit_bio(READ, bio);
464         }
465
466         status = 0;
467
468 bail_and_wait:
469         o2hb_wait_on_io(reg, &wc);
470         if (wc.wc_error && !status)
471                 status = wc.wc_error;
472
473         return status;
474 }
475
476 static int o2hb_issue_node_write(struct o2hb_region *reg,
477                                  struct o2hb_bio_wait_ctxt *write_wc)
478 {
479         int status;
480         unsigned int slot;
481         struct bio *bio;
482
483         o2hb_bio_wait_init(write_wc);
484
485         slot = o2nm_this_node();
486
487         bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
488         if (IS_ERR(bio)) {
489                 status = PTR_ERR(bio);
490                 mlog_errno(status);
491                 goto bail;
492         }
493
494         atomic_inc(&write_wc->wc_num_reqs);
495         submit_bio(WRITE, bio);
496
497         status = 0;
498 bail:
499         return status;
500 }
501
502 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
503                                      struct o2hb_disk_heartbeat_block *hb_block)
504 {
505         __le32 old_cksum;
506         u32 ret;
507
508         /* We want to compute the block crc with a 0 value in the
509          * hb_cksum field. Save it off here and replace after the
510          * crc. */
511         old_cksum = hb_block->hb_cksum;
512         hb_block->hb_cksum = 0;
513
514         ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
515
516         hb_block->hb_cksum = old_cksum;
517
518         return ret;
519 }
520
521 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
522 {
523         mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
524              "cksum = 0x%x, generation 0x%llx\n",
525              (long long)le64_to_cpu(hb_block->hb_seq),
526              hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
527              (long long)le64_to_cpu(hb_block->hb_generation));
528 }
529
530 static int o2hb_verify_crc(struct o2hb_region *reg,
531                            struct o2hb_disk_heartbeat_block *hb_block)
532 {
533         u32 read, computed;
534
535         read = le32_to_cpu(hb_block->hb_cksum);
536         computed = o2hb_compute_block_crc_le(reg, hb_block);
537
538         return read == computed;
539 }
540
541 /*
542  * Compare the slot data with what we wrote in the last iteration.
543  * If the match fails, print an appropriate error message. This is to
544  * detect errors like... another node heartbeating on the same slot,
545  * flaky device that is losing writes, etc.
546  * Returns 1 if check succeeds, 0 otherwise.
547  */
548 static int o2hb_check_own_slot(struct o2hb_region *reg)
549 {
550         struct o2hb_disk_slot *slot;
551         struct o2hb_disk_heartbeat_block *hb_block;
552         char *errstr;
553
554         slot = &reg->hr_slots[o2nm_this_node()];
555         /* Don't check on our 1st timestamp */
556         if (!slot->ds_last_time)
557                 return 0;
558
559         hb_block = slot->ds_raw_block;
560         if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time &&
561             le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation &&
562             hb_block->hb_node == slot->ds_node_num)
563                 return 1;
564
565 #define ERRSTR1         "Another node is heartbeating on device"
566 #define ERRSTR2         "Heartbeat generation mismatch on device"
567 #define ERRSTR3         "Heartbeat sequence mismatch on device"
568
569         if (hb_block->hb_node != slot->ds_node_num)
570                 errstr = ERRSTR1;
571         else if (le64_to_cpu(hb_block->hb_generation) !=
572                  slot->ds_last_generation)
573                 errstr = ERRSTR2;
574         else
575                 errstr = ERRSTR3;
576
577         mlog(ML_ERROR, "%s (%s): expected(%u:0x%llx, 0x%llx), "
578              "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_dev_name,
579              slot->ds_node_num, (unsigned long long)slot->ds_last_generation,
580              (unsigned long long)slot->ds_last_time, hb_block->hb_node,
581              (unsigned long long)le64_to_cpu(hb_block->hb_generation),
582              (unsigned long long)le64_to_cpu(hb_block->hb_seq));
583
584         return 0;
585 }
586
587 static inline void o2hb_prepare_block(struct o2hb_region *reg,
588                                       u64 generation)
589 {
590         int node_num;
591         u64 cputime;
592         struct o2hb_disk_slot *slot;
593         struct o2hb_disk_heartbeat_block *hb_block;
594
595         node_num = o2nm_this_node();
596         slot = &reg->hr_slots[node_num];
597
598         hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
599         memset(hb_block, 0, reg->hr_block_bytes);
600         /* TODO: time stuff */
601         cputime = CURRENT_TIME.tv_sec;
602         if (!cputime)
603                 cputime = 1;
604
605         hb_block->hb_seq = cpu_to_le64(cputime);
606         hb_block->hb_node = node_num;
607         hb_block->hb_generation = cpu_to_le64(generation);
608         hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
609
610         /* This step must always happen last! */
611         hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
612                                                                    hb_block));
613
614         mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
615              (long long)generation,
616              le32_to_cpu(hb_block->hb_cksum));
617 }
618
619 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
620                                 struct o2nm_node *node,
621                                 int idx)
622 {
623         struct list_head *iter;
624         struct o2hb_callback_func *f;
625
626         list_for_each(iter, &hbcall->list) {
627                 f = list_entry(iter, struct o2hb_callback_func, hc_item);
628                 mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
629                 (f->hc_func)(node, idx, f->hc_data);
630         }
631 }
632
633 /* Will run the list in order until we process the passed event */
634 static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
635 {
636         int empty;
637         struct o2hb_callback *hbcall;
638         struct o2hb_node_event *event;
639
640         spin_lock(&o2hb_live_lock);
641         empty = list_empty(&queued_event->hn_item);
642         spin_unlock(&o2hb_live_lock);
643         if (empty)
644                 return;
645
646         /* Holding callback sem assures we don't alter the callback
647          * lists when doing this, and serializes ourselves with other
648          * processes wanting callbacks. */
649         down_write(&o2hb_callback_sem);
650
651         spin_lock(&o2hb_live_lock);
652         while (!list_empty(&o2hb_node_events)
653                && !list_empty(&queued_event->hn_item)) {
654                 event = list_entry(o2hb_node_events.next,
655                                    struct o2hb_node_event,
656                                    hn_item);
657                 list_del_init(&event->hn_item);
658                 spin_unlock(&o2hb_live_lock);
659
660                 mlog(ML_HEARTBEAT, "Node %s event for %d\n",
661                      event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
662                      event->hn_node_num);
663
664                 hbcall = hbcall_from_type(event->hn_event_type);
665
666                 /* We should *never* have gotten on to the list with a
667                  * bad type... This isn't something that we should try
668                  * to recover from. */
669                 BUG_ON(IS_ERR(hbcall));
670
671                 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
672
673                 spin_lock(&o2hb_live_lock);
674         }
675         spin_unlock(&o2hb_live_lock);
676
677         up_write(&o2hb_callback_sem);
678 }
679
680 static void o2hb_queue_node_event(struct o2hb_node_event *event,
681                                   enum o2hb_callback_type type,
682                                   struct o2nm_node *node,
683                                   int node_num)
684 {
685         assert_spin_locked(&o2hb_live_lock);
686
687         BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
688
689         event->hn_event_type = type;
690         event->hn_node = node;
691         event->hn_node_num = node_num;
692
693         mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
694              type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
695
696         list_add_tail(&event->hn_item, &o2hb_node_events);
697 }
698
699 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
700 {
701         struct o2hb_node_event event =
702                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
703         struct o2nm_node *node;
704
705         node = o2nm_get_node_by_num(slot->ds_node_num);
706         if (!node)
707                 return;
708
709         spin_lock(&o2hb_live_lock);
710         if (!list_empty(&slot->ds_live_item)) {
711                 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
712                      slot->ds_node_num);
713
714                 list_del_init(&slot->ds_live_item);
715
716                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
717                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
718
719                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
720                                               slot->ds_node_num);
721                 }
722         }
723         spin_unlock(&o2hb_live_lock);
724
725         o2hb_run_event_list(&event);
726
727         o2nm_node_put(node);
728 }
729
730 static void o2hb_set_quorum_device(struct o2hb_region *reg)
731 {
732         if (!o2hb_global_heartbeat_active())
733                 return;
734
735         /* Prevent race with o2hb_heartbeat_group_drop_item() */
736         if (kthread_should_stop())
737                 return;
738
739         /* Tag region as quorum only after thread reaches steady state */
740         if (atomic_read(&reg->hr_steady_iterations) != 0)
741                 return;
742
743         spin_lock(&o2hb_live_lock);
744
745         if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
746                 goto unlock;
747
748         /*
749          * A region can be added to the quorum only when it sees all
750          * live nodes heartbeat on it. In other words, the region has been
751          * added to all nodes.
752          */
753         if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
754                    sizeof(o2hb_live_node_bitmap)))
755                 goto unlock;
756
757         printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n",
758                config_item_name(&reg->hr_item), reg->hr_dev_name);
759
760         set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
761
762         /*
763          * If global heartbeat active, unpin all regions if the
764          * region count > CUT_OFF
765          */
766         if (bitmap_weight(o2hb_quorum_region_bitmap,
767                            O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
768                 o2hb_region_unpin(NULL);
769 unlock:
770         spin_unlock(&o2hb_live_lock);
771 }
772
773 static int o2hb_check_slot(struct o2hb_region *reg,
774                            struct o2hb_disk_slot *slot)
775 {
776         int changed = 0, gen_changed = 0;
777         struct o2hb_node_event event =
778                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
779         struct o2nm_node *node;
780         struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
781         u64 cputime;
782         unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
783         unsigned int slot_dead_ms;
784         int tmp;
785
786         memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
787
788         /*
789          * If a node is no longer configured but is still in the livemap, we
790          * may need to clear that bit from the livemap.
791          */
792         node = o2nm_get_node_by_num(slot->ds_node_num);
793         if (!node) {
794                 spin_lock(&o2hb_live_lock);
795                 tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
796                 spin_unlock(&o2hb_live_lock);
797                 if (!tmp)
798                         return 0;
799         }
800
801         if (!o2hb_verify_crc(reg, hb_block)) {
802                 /* all paths from here will drop o2hb_live_lock for
803                  * us. */
804                 spin_lock(&o2hb_live_lock);
805
806                 /* Don't print an error on the console in this case -
807                  * a freshly formatted heartbeat area will not have a
808                  * crc set on it. */
809                 if (list_empty(&slot->ds_live_item))
810                         goto out;
811
812                 /* The node is live but pushed out a bad crc. We
813                  * consider it a transient miss but don't populate any
814                  * other values as they may be junk. */
815                 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
816                      slot->ds_node_num, reg->hr_dev_name);
817                 o2hb_dump_slot(hb_block);
818
819                 slot->ds_equal_samples++;
820                 goto fire_callbacks;
821         }
822
823         /* we don't care if these wrap.. the state transitions below
824          * clear at the right places */
825         cputime = le64_to_cpu(hb_block->hb_seq);
826         if (slot->ds_last_time != cputime)
827                 slot->ds_changed_samples++;
828         else
829                 slot->ds_equal_samples++;
830         slot->ds_last_time = cputime;
831
832         /* The node changed heartbeat generations. We assume this to
833          * mean it dropped off but came back before we timed out. We
834          * want to consider it down for the time being but don't want
835          * to lose any changed_samples state we might build up to
836          * considering it live again. */
837         if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
838                 gen_changed = 1;
839                 slot->ds_equal_samples = 0;
840                 mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
841                      "to 0x%llx)\n", slot->ds_node_num,
842                      (long long)slot->ds_last_generation,
843                      (long long)le64_to_cpu(hb_block->hb_generation));
844         }
845
846         slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
847
848         mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
849              "seq %llu last %llu changed %u equal %u\n",
850              slot->ds_node_num, (long long)slot->ds_last_generation,
851              le32_to_cpu(hb_block->hb_cksum),
852              (unsigned long long)le64_to_cpu(hb_block->hb_seq),
853              (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
854              slot->ds_equal_samples);
855
856         spin_lock(&o2hb_live_lock);
857
858 fire_callbacks:
859         /* dead nodes only come to life after some number of
860          * changes at any time during their dead time */
861         if (list_empty(&slot->ds_live_item) &&
862             slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
863                 mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
864                      slot->ds_node_num, (long long)slot->ds_last_generation);
865
866                 set_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
867
868                 /* first on the list generates a callback */
869                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
870                         mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes "
871                              "bitmap\n", slot->ds_node_num);
872                         set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
873
874                         o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
875                                               slot->ds_node_num);
876
877                         changed = 1;
878                 }
879
880                 list_add_tail(&slot->ds_live_item,
881                               &o2hb_live_slots[slot->ds_node_num]);
882
883                 slot->ds_equal_samples = 0;
884
885                 /* We want to be sure that all nodes agree on the
886                  * number of milliseconds before a node will be
887                  * considered dead. The self-fencing timeout is
888                  * computed from this value, and a discrepancy might
889                  * result in heartbeat calling a node dead when it
890                  * hasn't self-fenced yet. */
891                 slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
892                 if (slot_dead_ms && slot_dead_ms != dead_ms) {
893                         /* TODO: Perhaps we can fail the region here. */
894                         mlog(ML_ERROR, "Node %d on device %s has a dead count "
895                              "of %u ms, but our count is %u ms.\n"
896                              "Please double check your configuration values "
897                              "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
898                              slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
899                              dead_ms);
900                 }
901                 goto out;
902         }
903
904         /* if the list is dead, we're done.. */
905         if (list_empty(&slot->ds_live_item))
906                 goto out;
907
908         /* live nodes only go dead after enough consecutive missed
909          * samples..  reset the missed counter whenever we see
910          * activity */
911         if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
912                 mlog(ML_HEARTBEAT, "Node %d left my region\n",
913                      slot->ds_node_num);
914
915                 clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
916
917                 /* last off the live_slot generates a callback */
918                 list_del_init(&slot->ds_live_item);
919                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
920                         mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live "
921                              "nodes bitmap\n", slot->ds_node_num);
922                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
923
924                         /* node can be null */
925                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
926                                               node, slot->ds_node_num);
927
928                         changed = 1;
929                 }
930
931                 /* We don't clear this because the node is still
932                  * actually writing new blocks. */
933                 if (!gen_changed)
934                         slot->ds_changed_samples = 0;
935                 goto out;
936         }
937         if (slot->ds_changed_samples) {
938                 slot->ds_changed_samples = 0;
939                 slot->ds_equal_samples = 0;
940         }
941 out:
942         spin_unlock(&o2hb_live_lock);
943
944         o2hb_run_event_list(&event);
945
946         if (node)
947                 o2nm_node_put(node);
948         return changed;
949 }
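/*
 * Summary of the liveness state machine implemented above: a slot not on a
 * live list is brought up once it accumulates O2HB_LIVE_THRESHOLD changed
 * samples; a live slot is brought down after o2hb_dead_threshold equal
 * samples in a row, or immediately if its generation changes.  Only the
 * first slot added to, and the last slot removed from,
 * o2hb_live_slots[node] generates the node UP/DOWN callbacks.
 */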
950
951 static int o2hb_highest_node(unsigned long *nodes, int numbits)
952 {
953         return find_last_bit(nodes, numbits);
954 }
955
956 static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
957 {
958         int i, ret, highest_node;
959         int membership_change = 0, own_slot_ok = 0;
960         unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
961         unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
962         struct o2hb_bio_wait_ctxt write_wc;
963
964         ret = o2nm_configured_node_map(configured_nodes,
965                                        sizeof(configured_nodes));
966         if (ret) {
967                 mlog_errno(ret);
968                 goto bail;
969         }
970
971         /*
972          * If a node is not configured but is in the livemap, we still need
973          * to read the slot so as to be able to remove it from the livemap.
974          */
975         o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
976         i = -1;
977         while ((i = find_next_bit(live_node_bitmap,
978                                   O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
979                 set_bit(i, configured_nodes);
980         }
981
982         highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
983         if (highest_node >= O2NM_MAX_NODES) {
984                 mlog(ML_NOTICE, "o2hb: No configured nodes found!\n");
985                 ret = -EINVAL;
986                 goto bail;
987         }
988
989         /* No sense in reading the slots of nodes that don't exist
990          * yet. Of course, if the node definitions have holes in them
991          * then we're reading an empty slot anyway... Consider this
992          * best-effort. */
993         ret = o2hb_read_slots(reg, highest_node + 1);
994         if (ret < 0) {
995                 mlog_errno(ret);
996                 goto bail;
997         }
998
999         /* With an up to date view of the slots, we can check that no
1000          * other node has been improperly configured to heartbeat in
1001          * our slot. */
1002         own_slot_ok = o2hb_check_own_slot(reg);
1003
1004         /* fill in the proper info for our next heartbeat */
1005         o2hb_prepare_block(reg, reg->hr_generation);
1006
1007         ret = o2hb_issue_node_write(reg, &write_wc);
1008         if (ret < 0) {
1009                 mlog_errno(ret);
1010                 goto bail;
1011         }
1012
1013         i = -1;
1014         while((i = find_next_bit(configured_nodes,
1015                                  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1016                 membership_change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
1017         }
1018
1019         /*
1020          * We have to be sure we've advertised ourselves on disk
1021          * before we can go to steady state.  This ensures that
1022          * people we find in our steady state have seen us.
1023          */
1024         o2hb_wait_on_io(reg, &write_wc);
1025         if (write_wc.wc_error) {
1026                 /* Do not re-arm the write timeout on I/O error - we
1027                  * can't be sure that the new block ever made it to
1028                  * disk */
1029                 mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
1030                      write_wc.wc_error, reg->hr_dev_name);
1031                 ret = write_wc.wc_error;
1032                 goto bail;
1033         }
1034
1035         /* Skip re-arming the write timeout if own slot has stale/bad data */
1036         if (own_slot_ok) {
1037                 o2hb_set_quorum_device(reg);
1038                 o2hb_arm_write_timeout(reg);
1039         }
1040
1041 bail:
1042         /* let the person who launched us know when things are steady */
1043         if (atomic_read(&reg->hr_steady_iterations) != 0) {
1044                 if (!ret && own_slot_ok && !membership_change) {
1045                         if (atomic_dec_and_test(&reg->hr_steady_iterations))
1046                                 wake_up(&o2hb_steady_queue);
1047                 }
1048         }
1049
1050         if (atomic_read(&reg->hr_steady_iterations) != 0) {
1051                 if (atomic_dec_and_test(&reg->hr_unsteady_iterations)) {
1052                         printk(KERN_NOTICE "o2hb: Unable to stabilize "
1053                                "heartbeat on region %s (%s)\n",
1054                                config_item_name(&reg->hr_item),
1055                                reg->hr_dev_name);
1056                         atomic_set(&reg->hr_steady_iterations, 0);
1057                         reg->hr_aborted_start = 1;
1058                         wake_up(&o2hb_steady_queue);
1059                         ret = -EIO;
1060                 }
1061         }
1062
1063         return ret;
1064 }
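/*
 * Start-up accounting in the bail path above: hr_steady_iterations counts
 * down only on iterations with no I/O error, a clean own slot and no
 * membership change, and reaching zero wakes o2hb_steady_queue so the
 * process that started the region can return.  hr_unsteady_iterations
 * counts down on every iteration that is still unsteady; if it hits zero
 * first, the start is aborted via hr_aborted_start and the region thread's
 * main loop exits.
 */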
1065
1066 /* Subtract b from a, storing the result in a.  If a is earlier than
1067  * b, the result is clamped to zero. */
1068 static void o2hb_tv_subtract(struct timeval *a,
1069                              struct timeval *b)
1070 {
1071         /* clamp the result to zero when a is earlier than b */
1072         if (a->tv_sec < b->tv_sec ||
1073             (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) {
1074                 a->tv_sec = 0;
1075                 a->tv_usec = 0;
1076                 return;
1077         }
1078
1079         a->tv_sec -= b->tv_sec;
1080         a->tv_usec -= b->tv_usec;
1081         while ( a->tv_usec < 0 ) {
1082                 a->tv_sec--;
1083                 a->tv_usec += 1000000;
1084         }
1085 }
1086
1087 static unsigned int o2hb_elapsed_msecs(struct timeval *start,
1088                                        struct timeval *end)
1089 {
1090         struct timeval res = *end;
1091
1092         o2hb_tv_subtract(&res, start);
1093
1094         return res.tv_sec * 1000 + res.tv_usec / 1000;
1095 }
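/*
 * Example with illustrative values: before_hb = { .tv_sec = 10, .tv_usec =
 * 900000 } and after_hb = { .tv_sec = 12, .tv_usec = 100000 } subtract to
 * { 1, 200000 } after the usec borrow, so o2hb_elapsed_msecs() returns
 * 1 * 1000 + 200000 / 1000 = 1200 ms.
 */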
1096
1097 /*
1098  * we ride the region ref that the region dir holds.  before the region
1099  * dir is removed and drops its ref it will wait to tear down this
1100  * thread.
1101  */
1102 static int o2hb_thread(void *data)
1103 {
1104         int i, ret;
1105         struct o2hb_region *reg = data;
1106         struct o2hb_bio_wait_ctxt write_wc;
1107         struct timeval before_hb, after_hb;
1108         unsigned int elapsed_msec;
1109
1110         mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
1111
1112         set_user_nice(current, -20);
1113
1114         /* Pin node */
1115         o2nm_depend_this_node();
1116
1117         while (!kthread_should_stop() &&
1118                !reg->hr_unclean_stop && !reg->hr_aborted_start) {
1119                 /* We track the time spent inside
1120                  * o2hb_do_disk_heartbeat so that we avoid more than
1121                  * hr_timeout_ms between disk writes. On busy systems
1122                  * this should result in a heartbeat which is less
1123                  * likely to time itself out. */
1124                 do_gettimeofday(&before_hb);
1125
1126                 ret = o2hb_do_disk_heartbeat(reg);
1127
1128                 do_gettimeofday(&after_hb);
1129                 elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);
1130
1131                 mlog(ML_HEARTBEAT,
1132                      "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
1133                      before_hb.tv_sec, (unsigned long) before_hb.tv_usec,
1134                      after_hb.tv_sec, (unsigned long) after_hb.tv_usec,
1135                      elapsed_msec);
1136
1137                 if (!kthread_should_stop() &&
1138                     elapsed_msec < reg->hr_timeout_ms) {
1139                         /* the kthread api has blocked signals for us so no
1140                          * need to record the return value. */
1141                         msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
1142                 }
1143         }
1144
1145         o2hb_disarm_write_timeout(reg);
1146
1147         /* unclean stop is only used in very bad situations */
1148         for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
1149                 o2hb_shutdown_slot(&reg->hr_slots[i]);
1150
1151         /* Explicit down notification - avoid forcing the other nodes
1152          * to timeout on this region when we could just as easily
1153          * write a clear generation - thus indicating to them that
1154          * this node has left this region.
1155          */
1156         if (!reg->hr_unclean_stop && !reg->hr_aborted_start) {
1157                 o2hb_prepare_block(reg, 0);
1158                 ret = o2hb_issue_node_write(reg, &write_wc);
1159                 if (ret == 0)
1160                         o2hb_wait_on_io(reg, &write_wc);
1161                 else
1162                         mlog_errno(ret);
1163         }
1164
1165         /* Unpin node */
1166         o2nm_undepend_this_node();
1167
1168         mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n");
1169
1170         return 0;
1171 }
1172
1173 #ifdef CONFIG_DEBUG_FS
1174 static int o2hb_debug_open(struct inode *inode, struct file *file)
1175 {
1176         struct o2hb_debug_buf *db = inode->i_private;
1177         struct o2hb_region *reg;
1178         unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1179         unsigned long lts;
1180         char *buf = NULL;
1181         int i = -1;
1182         int out = 0;
1183
1184         /* max_nodes should be the largest bitmap we pass here */
1185         BUG_ON(sizeof(map) < db->db_size);
1186
1187         buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
1188         if (!buf)
1189                 goto bail;
1190
1191         switch (db->db_type) {
1192         case O2HB_DB_TYPE_LIVENODES:
1193         case O2HB_DB_TYPE_LIVEREGIONS:
1194         case O2HB_DB_TYPE_QUORUMREGIONS:
1195         case O2HB_DB_TYPE_FAILEDREGIONS:
1196                 spin_lock(&o2hb_live_lock);
1197                 memcpy(map, db->db_data, db->db_size);
1198                 spin_unlock(&o2hb_live_lock);
1199                 break;
1200
1201         case O2HB_DB_TYPE_REGION_LIVENODES:
1202                 spin_lock(&o2hb_live_lock);
1203                 reg = (struct o2hb_region *)db->db_data;
1204                 memcpy(map, reg->hr_live_node_bitmap, db->db_size);
1205                 spin_unlock(&o2hb_live_lock);
1206                 break;
1207
1208         case O2HB_DB_TYPE_REGION_NUMBER:
1209                 reg = (struct o2hb_region *)db->db_data;
1210                 out += snprintf(buf + out, PAGE_SIZE - out, "%d\n",
1211                                 reg->hr_region_num);
1212                 goto done;
1213
1214         case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
1215                 reg = (struct o2hb_region *)db->db_data;
1216                 lts = reg->hr_last_timeout_start;
1217                 /* If 0, it has never been set before */
1218                 if (lts)
1219                         lts = jiffies_to_msecs(jiffies - lts);
1220                 out += snprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts);
1221                 goto done;
1222
1223         case O2HB_DB_TYPE_REGION_PINNED:
1224                 reg = (struct o2hb_region *)db->db_data;
1225                 out += snprintf(buf + out, PAGE_SIZE - out, "%u\n",
1226                                 !!reg->hr_item_pinned);
1227                 goto done;
1228
1229         default:
1230                 goto done;
1231         }
1232
1233         while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len)
1234                 out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
1235         out += snprintf(buf + out, PAGE_SIZE - out, "\n");
1236
1237 done:
1238         i_size_write(inode, out);
1239
1240         file->private_data = buf;
1241
1242         return 0;
1243 bail:
1244         return -ENOMEM;
1245 }
1246
1247 static int o2hb_debug_release(struct inode *inode, struct file *file)
1248 {
1249         kfree(file->private_data);
1250         return 0;
1251 }
1252
1253 static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1254                                  size_t nbytes, loff_t *ppos)
1255 {
1256         return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
1257                                        i_size_read(file->f_mapping->host));
1258 }
1259 #else
1260 static int o2hb_debug_open(struct inode *inode, struct file *file)
1261 {
1262         return 0;
1263 }
1264 static int o2hb_debug_release(struct inode *inode, struct file *file)
1265 {
1266         return 0;
1267 }
1268 static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1269                                size_t nbytes, loff_t *ppos)
1270 {
1271         return 0;
1272 }
1273 #endif  /* CONFIG_DEBUG_FS */
1274
1275 static const struct file_operations o2hb_debug_fops = {
1276         .open =         o2hb_debug_open,
1277         .release =      o2hb_debug_release,
1278         .read =         o2hb_debug_read,
1279         .llseek =       generic_file_llseek,
1280 };
1281
1282 void o2hb_exit(void)
1283 {
1284         kfree(o2hb_db_livenodes);
1285         kfree(o2hb_db_liveregions);
1286         kfree(o2hb_db_quorumregions);
1287         kfree(o2hb_db_failedregions);
1288         debugfs_remove(o2hb_debug_failedregions);
1289         debugfs_remove(o2hb_debug_quorumregions);
1290         debugfs_remove(o2hb_debug_liveregions);
1291         debugfs_remove(o2hb_debug_livenodes);
1292         debugfs_remove(o2hb_debug_dir);
1293 }
1294
1295 static struct dentry *o2hb_debug_create(const char *name, struct dentry *dir,
1296                                         struct o2hb_debug_buf **db, int db_len,
1297                                         int type, int size, int len, void *data)
1298 {
1299         *db = kmalloc(db_len, GFP_KERNEL);
1300         if (!*db)
1301                 return NULL;
1302
1303         (*db)->db_type = type;
1304         (*db)->db_size = size;
1305         (*db)->db_len = len;
1306         (*db)->db_data = data;
1307
1308         return debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db,
1309                                    &o2hb_debug_fops);
1310 }
1311
1312 static int o2hb_debug_init(void)
1313 {
1314         int ret = -ENOMEM;
1315
1316         o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
1317         if (!o2hb_debug_dir) {
1318                 mlog_errno(ret);
1319                 goto bail;
1320         }
1321
1322         o2hb_debug_livenodes = o2hb_debug_create(O2HB_DEBUG_LIVENODES,
1323                                                  o2hb_debug_dir,
1324                                                  &o2hb_db_livenodes,
1325                                                  sizeof(*o2hb_db_livenodes),
1326                                                  O2HB_DB_TYPE_LIVENODES,
1327                                                  sizeof(o2hb_live_node_bitmap),
1328                                                  O2NM_MAX_NODES,
1329                                                  o2hb_live_node_bitmap);
1330         if (!o2hb_debug_livenodes) {
1331                 mlog_errno(ret);
1332                 goto bail;
1333         }
1334
1335         o2hb_debug_liveregions = o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS,
1336                                                    o2hb_debug_dir,
1337                                                    &o2hb_db_liveregions,
1338                                                    sizeof(*o2hb_db_liveregions),
1339                                                    O2HB_DB_TYPE_LIVEREGIONS,
1340                                                    sizeof(o2hb_live_region_bitmap),
1341                                                    O2NM_MAX_REGIONS,
1342                                                    o2hb_live_region_bitmap);
1343         if (!o2hb_debug_liveregions) {
1344                 mlog_errno(ret);
1345                 goto bail;
1346         }
1347
1348         o2hb_debug_quorumregions =
1349                         o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS,
1350                                           o2hb_debug_dir,
1351                                           &o2hb_db_quorumregions,
1352                                           sizeof(*o2hb_db_quorumregions),
1353                                           O2HB_DB_TYPE_QUORUMREGIONS,
1354                                           sizeof(o2hb_quorum_region_bitmap),
1355                                           O2NM_MAX_REGIONS,
1356                                           o2hb_quorum_region_bitmap);
1357         if (!o2hb_debug_quorumregions) {
1358                 mlog_errno(ret);
1359                 goto bail;
1360         }
1361
1362         o2hb_debug_failedregions =
1363                         o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS,
1364                                           o2hb_debug_dir,
1365                                           &o2hb_db_failedregions,
1366                                           sizeof(*o2hb_db_failedregions),
1367                                           O2HB_DB_TYPE_FAILEDREGIONS,
1368                                           sizeof(o2hb_failed_region_bitmap),
1369                                           O2NM_MAX_REGIONS,
1370                                           o2hb_failed_region_bitmap);
1371         if (!o2hb_debug_failedregions) {
1372                 mlog_errno(ret);
1373                 goto bail;
1374         }
1375
1376         ret = 0;
1377 bail:
1378         if (ret)
1379                 o2hb_exit();
1380
1381         return ret;
1382 }
1383
1384 int o2hb_init(void)
1385 {
1386         int i;
1387
1388         for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
1389                 INIT_LIST_HEAD(&o2hb_callbacks[i].list);
1390
1391         for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
1392                 INIT_LIST_HEAD(&o2hb_live_slots[i]);
1393
1394         INIT_LIST_HEAD(&o2hb_node_events);
1395
1396         memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
1397         memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap));
1398         memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap));
1399         memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap));
1400         memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap));
1401
1402         o2hb_dependent_users = 0;
1403
1404         return o2hb_debug_init();
1405 }
1406
1407 /* if we're already in a callback then we're already serialized by the sem */
1408 static void o2hb_fill_node_map_from_callback(unsigned long *map,
1409                                              unsigned bytes)
1410 {
1411         BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
1412
1413         memcpy(map, &o2hb_live_node_bitmap, bytes);
1414 }
1415
1416 /*
1417  * get a map of all nodes that are heartbeating in any region
1418  */
1419 void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
1420 {
1421         /* callers want to serialize this map and callbacks so that they
1422          * can trust that they don't miss nodes coming to the party */
1423         down_read(&o2hb_callback_sem);
1424         spin_lock(&o2hb_live_lock);
1425         o2hb_fill_node_map_from_callback(map, bytes);
1426         spin_unlock(&o2hb_live_lock);
1427         up_read(&o2hb_callback_sem);
1428 }
1429 EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
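
/*
 * Usage sketch for the exported helper above, mirroring what
 * o2hb_check_node_heartbeating() further down does; "node_num" is a
 * hypothetical u8 naming the node of interest:
 *
 *	unsigned long live[BITS_TO_LONGS(O2NM_MAX_NODES)];
 *
 *	o2hb_fill_node_map(live, sizeof(live));
 *	if (test_bit(node_num, live))
 *		... node_num is heartbeating in at least one region ...
 */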
1430
1431 /*
1432  * heartbeat configfs bits.  The heartbeat set is a default set under
1433  * the cluster set in nodemanager.c.
1434  */
1435
1436 static struct o2hb_region *to_o2hb_region(struct config_item *item)
1437 {
1438         return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
1439 }
1440
1441 /* drop_item only drops its ref after killing the thread, so nothing should
1442  * be using the region anymore.  This has to clean up any state that the
1443  * attributes might have built up. */
1444 static void o2hb_region_release(struct config_item *item)
1445 {
1446         int i;
1447         struct page *page;
1448         struct o2hb_region *reg = to_o2hb_region(item);
1449
1450         mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name);
1451
1452         /* kfree() handles a NULL pointer, so no need to test first */
1453         kfree(reg->hr_tmp_block);
1454
1455         if (reg->hr_slot_data) {
1456                 for (i = 0; i < reg->hr_num_pages; i++) {
1457                         page = reg->hr_slot_data[i];
1458                         if (page)
1459                                 __free_page(page);
1460                 }
1461                 kfree(reg->hr_slot_data);
1462         }
1463
1464         if (reg->hr_bdev)
1465                 blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1466
1467         /* likewise, no NULL check needed before kfree() */
1468         kfree(reg->hr_slots);
1469
1470         kfree(reg->hr_db_regnum);
1471         kfree(reg->hr_db_livenodes);
1472         debugfs_remove(reg->hr_debug_livenodes);
1473         debugfs_remove(reg->hr_debug_regnum);
1474         debugfs_remove(reg->hr_debug_elapsed_time);
1475         debugfs_remove(reg->hr_debug_pinned);
1476         debugfs_remove(reg->hr_debug_dir);
1477
1478         spin_lock(&o2hb_live_lock);
1479         list_del(&reg->hr_all_item);
1480         spin_unlock(&o2hb_live_lock);
1481
1482         kfree(reg);
1483 }
1484
1485 static int o2hb_read_block_input(struct o2hb_region *reg,
1486                                  const char *page,
1487                                  size_t count,
1488                                  unsigned long *ret_bytes,
1489                                  unsigned int *ret_bits)
1490 {
1491         unsigned long bytes;
1492         char *p = (char *)page;
1493
1494         bytes = simple_strtoul(p, &p, 0);
1495         if (!p || (*p && (*p != '\n')))
1496                 return -EINVAL;
1497
1498         /* Heartbeat and fs min / max block sizes are the same. */
1499         if (bytes > 4096 || bytes < 512)
1500                 return -ERANGE;
1501         if (hweight16(bytes) != 1)
1502                 return -EINVAL;
1503
1504         if (ret_bytes)
1505                 *ret_bytes = bytes;
1506         if (ret_bits)
1507                 *ret_bits = ffs(bytes) - 1;
1508
1509         return 0;
1510 }
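
/*
 * Worked example of the checks above: 512, 1024, 2048 and 4096 are
 * accepted (powers of two inside the 512..4096 window); 256 and 8192
 * fail with -ERANGE, and an in-range non-power-of-two such as 3072
 * fails with -EINVAL.  The bit count handed back is ffs(bytes) - 1,
 * e.g. 512 -> 9, 4096 -> 12.
 */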
1511
1512 static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg,
1513                                             char *page)
1514 {
1515         return sprintf(page, "%u\n", reg->hr_block_bytes);
1516 }
1517
1518 static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg,
1519                                              const char *page,
1520                                              size_t count)
1521 {
1522         int status;
1523         unsigned long block_bytes;
1524         unsigned int block_bits;
1525
1526         if (reg->hr_bdev)
1527                 return -EINVAL;
1528
1529         status = o2hb_read_block_input(reg, page, count,
1530                                        &block_bytes, &block_bits);
1531         if (status)
1532                 return status;
1533
1534         reg->hr_block_bytes = (unsigned int)block_bytes;
1535         reg->hr_block_bits = block_bits;
1536
1537         return count;
1538 }
1539
1540 static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg,
1541                                             char *page)
1542 {
1543         return sprintf(page, "%llu\n", reg->hr_start_block);
1544 }
1545
1546 static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg,
1547                                              const char *page,
1548                                              size_t count)
1549 {
1550         unsigned long long tmp;
1551         char *p = (char *)page;
1552
1553         if (reg->hr_bdev)
1554                 return -EINVAL;
1555
1556         tmp = simple_strtoull(p, &p, 0);
1557         if (!p || (*p && (*p != '\n')))
1558                 return -EINVAL;
1559
1560         reg->hr_start_block = tmp;
1561
1562         return count;
1563 }
1564
1565 static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg,
1566                                        char *page)
1567 {
1568         return sprintf(page, "%u\n", reg->hr_blocks);
1569 }
1570
1571 static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg,
1572                                         const char *page,
1573                                         size_t count)
1574 {
1575         unsigned long tmp;
1576         char *p = (char *)page;
1577
1578         if (reg->hr_bdev)
1579                 return -EINVAL;
1580
1581         tmp = simple_strtoul(p, &p, 0);
1582         if (!p || (*p && (*p != '\n')))
1583                 return -EINVAL;
1584
1585         if (tmp > O2NM_MAX_NODES || tmp == 0)
1586                 return -ERANGE;
1587
1588         reg->hr_blocks = (unsigned int)tmp;
1589
1590         return count;
1591 }
1592
1593 static ssize_t o2hb_region_dev_read(struct o2hb_region *reg,
1594                                     char *page)
1595 {
1596         unsigned int ret = 0;
1597
1598         if (reg->hr_bdev)
1599                 ret = sprintf(page, "%s\n", reg->hr_dev_name);
1600
1601         return ret;
1602 }
1603
1604 static void o2hb_init_region_params(struct o2hb_region *reg)
1605 {
1606         reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits;
1607         reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1608
1609         mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1610              reg->hr_start_block, reg->hr_blocks);
1611         mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1612              reg->hr_block_bytes, reg->hr_block_bits);
1613         mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1614         mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1615 }
1616
1617 static int o2hb_map_slot_data(struct o2hb_region *reg)
1618 {
1619         int i, j;
1620         unsigned int last_slot;
1621         unsigned int spp = reg->hr_slots_per_page;
1622         struct page *page;
1623         char *raw;
1624         struct o2hb_disk_slot *slot;
1625
1626         reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1627         if (reg->hr_tmp_block == NULL) {
1628                 mlog_errno(-ENOMEM);
1629                 return -ENOMEM;
1630         }
1631
1632         reg->hr_slots = kcalloc(reg->hr_blocks,
1633                                 sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1634         if (reg->hr_slots == NULL) {
1635                 mlog_errno(-ENOMEM);
1636                 return -ENOMEM;
1637         }
1638
1639         for(i = 0; i < reg->hr_blocks; i++) {
1640                 slot = &reg->hr_slots[i];
1641                 slot->ds_node_num = i;
1642                 INIT_LIST_HEAD(&slot->ds_live_item);
1643                 slot->ds_raw_block = NULL;
1644         }
1645
1646         reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1647         mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1648                            "at %u blocks per page\n",
1649              reg->hr_num_pages, reg->hr_blocks, spp);
1650
1651         reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1652                                     GFP_KERNEL);
1653         if (!reg->hr_slot_data) {
1654                 mlog_errno(-ENOMEM);
1655                 return -ENOMEM;
1656         }
1657
1658         for(i = 0; i < reg->hr_num_pages; i++) {
1659                 page = alloc_page(GFP_KERNEL);
1660                 if (!page) {
1661                         mlog_errno(-ENOMEM);
1662                         return -ENOMEM;
1663                 }
1664
1665                 reg->hr_slot_data[i] = page;
1666
1667                 last_slot = i * spp;
1668                 raw = page_address(page);
1669                 for (j = 0;
1670                      (j < spp) && ((j + last_slot) < reg->hr_blocks);
1671                      j++) {
1672                         BUG_ON((j + last_slot) >= reg->hr_blocks);
1673
1674                         slot = &reg->hr_slots[j + last_slot];
1675                         slot->ds_raw_block =
1676                                 (struct o2hb_disk_heartbeat_block *) raw;
1677
1678                         raw += reg->hr_block_bytes;
1679                 }
1680         }
1681
1682         return 0;
1683 }
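
/*
 * The slot-to-page arithmetic above, worked through with illustrative
 * numbers: with 512 byte heartbeat blocks and a 4096 byte
 * PAGE_CACHE_SIZE, hr_slots_per_page is 4096 >> 9 = 8, so a 16 slot
 * region needs (16 + 8 - 1) / 8 = 2 pages, and slot 10 lives on page 1
 * at byte offset (10 - 8) * 512 within that page.  Note that the loop
 * variable "last_slot" is really the index of the first slot mapped by
 * the current page.
 */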
1684
1685 /* Read in all the slots available and populate the tracking
1686  * structures so that we can start with a baseline idea of what's
1687  * there. */
1688 static int o2hb_populate_slot_data(struct o2hb_region *reg)
1689 {
1690         int ret, i;
1691         struct o2hb_disk_slot *slot;
1692         struct o2hb_disk_heartbeat_block *hb_block;
1693
1694         ret = o2hb_read_slots(reg, reg->hr_blocks);
1695         if (ret) {
1696                 mlog_errno(ret);
1697                 goto out;
1698         }
1699
1700         /* We only want to get an idea of the values initially in each
1701          * slot, so we do no verification - o2hb_check_slot will
1702          * actually determine if each configured slot is valid and
1703          * whether any values have changed. */
1704         for(i = 0; i < reg->hr_blocks; i++) {
1705                 slot = &reg->hr_slots[i];
1706                 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1707
1708                 /* Only fill the values that o2hb_check_slot uses to
1709                  * determine changing slots */
1710                 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1711                 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1712         }
1713
1714 out:
1715         return ret;
1716 }
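
/*
 * A note on the fields read above: hb_seq and hb_generation are stored
 * little-endian in struct o2hb_disk_heartbeat_block (the on-disk
 * heartbeat block format), which is why every access goes through
 * le64_to_cpu().
 */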
1717
1718 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
1719 static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
1720                                      const char *page,
1721                                      size_t count)
1722 {
1723         struct task_struct *hb_task;
1724         long fd;
1725         int sectsize;
1726         char *p = (char *)page;
1727         struct fd f;
1728         struct inode *inode;
1729         ssize_t ret = -EINVAL;
1730         int live_threshold;
1731
1732         if (reg->hr_bdev)
1733                 goto out;
1734
1735         /* We can't heartbeat until our local node number has been
1736          * configured. */
1737         if (o2nm_this_node() == O2NM_MAX_NODES)
1738                 goto out;
1739
1740         fd = simple_strtol(p, &p, 0);
1741         if (!p || (*p && (*p != '\n')))
1742                 goto out;
1743
1744         if (fd < 0 || fd >= INT_MAX)
1745                 goto out;
1746
1747         f = fdget(fd);
1748         if (f.file == NULL)
1749                 goto out;
1750
1751         if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1752             reg->hr_block_bytes == 0)
1753                 goto out2;
1754
1755         inode = igrab(f.file->f_mapping->host);
1756         if (inode == NULL)
1757                 goto out2;
1758
1759         if (!S_ISBLK(inode->i_mode))
1760                 goto out3;
1761
1762         reg->hr_bdev = I_BDEV(f.file->f_mapping->host);
1763         ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, NULL);
1764         if (ret) {
1765                 reg->hr_bdev = NULL;
1766                 goto out3;
1767         }
1768         inode = NULL;
1769
1770         bdevname(reg->hr_bdev, reg->hr_dev_name);
1771
1772         sectsize = bdev_logical_block_size(reg->hr_bdev);
1773         if (sectsize != reg->hr_block_bytes) {
1774                 mlog(ML_ERROR,
1775                      "blocksize %u incorrect for device, expected %d\n",
1776                      reg->hr_block_bytes, sectsize);
1777                 ret = -EINVAL;
1778                 goto out3;
1779         }
1780
1781         o2hb_init_region_params(reg);
1782
1783         /* Generation of zero is invalid */
1784         do {
1785                 get_random_bytes(&reg->hr_generation,
1786                                  sizeof(reg->hr_generation));
1787         } while (reg->hr_generation == 0);
1788
1789         ret = o2hb_map_slot_data(reg);
1790         if (ret) {
1791                 mlog_errno(ret);
1792                 goto out3;
1793         }
1794
1795         ret = o2hb_populate_slot_data(reg);
1796         if (ret) {
1797                 mlog_errno(ret);
1798                 goto out3;
1799         }
1800
1801         INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
1802
1803         /*
1804          * A node is considered live after it has beat LIVE_THRESHOLD
1805          * times.  We're not steady until we've given them a chance
1806          * _after_ our first read.
1807          * The default threshold is the bare minimum so as to limit the delay
1808          * during mounts. For global heartbeat, the threshold is doubled for
1809          * the first region.
1810          */
1811         live_threshold = O2HB_LIVE_THRESHOLD;
1812         if (o2hb_global_heartbeat_active()) {
1813                 spin_lock(&o2hb_live_lock);
1814                 if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1)
1815                         live_threshold <<= 1;
1816                 spin_unlock(&o2hb_live_lock);
1817         }
1818         ++live_threshold;
1819         atomic_set(&reg->hr_steady_iterations, live_threshold);
1820         /* unsteady_iterations is double the steady_iterations */
1821         atomic_set(&reg->hr_unsteady_iterations, (live_threshold << 1));
1822
1823         hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1824                               reg->hr_item.ci_name);
1825         if (IS_ERR(hb_task)) {
1826                 ret = PTR_ERR(hb_task);
1827                 mlog_errno(ret);
1828                 goto out3;
1829         }
1830
1831         spin_lock(&o2hb_live_lock);
1832         reg->hr_task = hb_task;
1833         spin_unlock(&o2hb_live_lock);
1834
1835         ret = wait_event_interruptible(o2hb_steady_queue,
1836                                 atomic_read(&reg->hr_steady_iterations) == 0);
1837         if (ret) {
1838                 atomic_set(&reg->hr_steady_iterations, 0);
1839                 reg->hr_aborted_start = 1;
1840         }
1841
1842         if (reg->hr_aborted_start) {
1843                 ret = -EIO;
1844                 goto out3;
1845         }
1846
1847         /* Ok, we were woken.  Make sure it wasn't by drop_item() */
1848         spin_lock(&o2hb_live_lock);
1849         hb_task = reg->hr_task;
1850         if (o2hb_global_heartbeat_active())
1851                 set_bit(reg->hr_region_num, o2hb_live_region_bitmap);
1852         spin_unlock(&o2hb_live_lock);
1853
1854         if (hb_task)
1855                 ret = count;
1856         else
1857                 ret = -EIO;
1858
1859         if (hb_task && o2hb_global_heartbeat_active())
1860                 printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n",
1861                        config_item_name(&reg->hr_item), reg->hr_dev_name);
1862
1863 out3:
1864         iput(inode);
1865 out2:
1866         fdput(f);
1867 out:
1868         if (ret < 0) {
1869                 if (reg->hr_bdev) {
1870                         blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1871                         reg->hr_bdev = NULL;
1872                 }
1873         }
1874         return ret;
1875 }
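
/*
 * For reference, the userspace half of the commit above, as a sketch
 * (the real tooling lives in ocfs2-tools; paths assume configfs mounted
 * at /sys/kernel/config with the usual o2cb layout, and <cluster>,
 * <region> and /dev/sdX are placeholders): once block_bytes,
 * start_block and blocks have been written, the caller opens the block
 * device and writes the open file descriptor number to the "dev"
 * attribute.  That write does not return until the region has gone
 * steady or the start has been aborted.
 *
 *	int fd = open("/dev/sdX", O_RDWR);
 *	FILE *f = fopen("/sys/kernel/config/cluster/<cluster>/heartbeat/"
 *			"<region>/dev", "w");
 *
 *	fprintf(f, "%d\n", fd);
 *	fclose(f);
 */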
1876
1877 static ssize_t o2hb_region_pid_read(struct o2hb_region *reg,
1878                                       char *page)
1879 {
1880         pid_t pid = 0;
1881
1882         spin_lock(&o2hb_live_lock);
1883         if (reg->hr_task)
1884                 pid = task_pid_nr(reg->hr_task);
1885         spin_unlock(&o2hb_live_lock);
1886
1887         if (!pid)
1888                 return 0;
1889
1890         return sprintf(page, "%u\n", pid);
1891 }
1892
1893 struct o2hb_region_attribute {
1894         struct configfs_attribute attr;
1895         ssize_t (*show)(struct o2hb_region *, char *);
1896         ssize_t (*store)(struct o2hb_region *, const char *, size_t);
1897 };
1898
1899 static struct o2hb_region_attribute o2hb_region_attr_block_bytes = {
1900         .attr   = { .ca_owner = THIS_MODULE,
1901                     .ca_name = "block_bytes",
1902                     .ca_mode = S_IRUGO | S_IWUSR },
1903         .show   = o2hb_region_block_bytes_read,
1904         .store  = o2hb_region_block_bytes_write,
1905 };
1906
1907 static struct o2hb_region_attribute o2hb_region_attr_start_block = {
1908         .attr   = { .ca_owner = THIS_MODULE,
1909                     .ca_name = "start_block",
1910                     .ca_mode = S_IRUGO | S_IWUSR },
1911         .show   = o2hb_region_start_block_read,
1912         .store  = o2hb_region_start_block_write,
1913 };
1914
1915 static struct o2hb_region_attribute o2hb_region_attr_blocks = {
1916         .attr   = { .ca_owner = THIS_MODULE,
1917                     .ca_name = "blocks",
1918                     .ca_mode = S_IRUGO | S_IWUSR },
1919         .show   = o2hb_region_blocks_read,
1920         .store  = o2hb_region_blocks_write,
1921 };
1922
1923 static struct o2hb_region_attribute o2hb_region_attr_dev = {
1924         .attr   = { .ca_owner = THIS_MODULE,
1925                     .ca_name = "dev",
1926                     .ca_mode = S_IRUGO | S_IWUSR },
1927         .show   = o2hb_region_dev_read,
1928         .store  = o2hb_region_dev_write,
1929 };
1930
1931 static struct o2hb_region_attribute o2hb_region_attr_pid = {
1932         .attr   = { .ca_owner = THIS_MODULE,
1933                     .ca_name = "pid",
1934                     .ca_mode = S_IRUGO },
1935         .show   = o2hb_region_pid_read,
1936 };
1937
1938 static struct configfs_attribute *o2hb_region_attrs[] = {
1939         &o2hb_region_attr_block_bytes.attr,
1940         &o2hb_region_attr_start_block.attr,
1941         &o2hb_region_attr_blocks.attr,
1942         &o2hb_region_attr_dev.attr,
1943         &o2hb_region_attr_pid.attr,
1944         NULL,
1945 };
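
/*
 * Taken together, the attributes above form the per-region configfs
 * interface.  A typical setup sequence from a shell looks roughly like
 * this (a sketch; <cluster> and <region-uuid> are placeholders and
 * configfs is assumed at /sys/kernel/config):
 *
 *	cd /sys/kernel/config/cluster/<cluster>/heartbeat
 *	mkdir <region-uuid>
 *	echo 512 > <region-uuid>/block_bytes
 *	echo 2   > <region-uuid>/start_block
 *	echo 32  > <region-uuid>/blocks
 *	# then open the device and write its fd number to <region-uuid>/dev,
 *	# as sketched after o2hb_region_dev_write() above
 *	cat <region-uuid>/pid
 *
 * block_bytes, start_block and blocks return -EINVAL once hr_bdev is
 * set, i.e. once the region has been committed via "dev", and pid reads
 * back empty until the o2hb thread is running.
 */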
1946
1947 static ssize_t o2hb_region_show(struct config_item *item,
1948                                 struct configfs_attribute *attr,
1949                                 char *page)
1950 {
1951         struct o2hb_region *reg = to_o2hb_region(item);
1952         struct o2hb_region_attribute *o2hb_region_attr =
1953                 container_of(attr, struct o2hb_region_attribute, attr);
1954         ssize_t ret = 0;
1955
1956         if (o2hb_region_attr->show)
1957                 ret = o2hb_region_attr->show(reg, page);
1958         return ret;
1959 }
1960
1961 static ssize_t o2hb_region_store(struct config_item *item,
1962                                  struct configfs_attribute *attr,
1963                                  const char *page, size_t count)
1964 {
1965         struct o2hb_region *reg = to_o2hb_region(item);
1966         struct o2hb_region_attribute *o2hb_region_attr =
1967                 container_of(attr, struct o2hb_region_attribute, attr);
1968         ssize_t ret = -EINVAL;
1969
1970         if (o2hb_region_attr->store)
1971                 ret = o2hb_region_attr->store(reg, page, count);
1972         return ret;
1973 }
1974
1975 static struct configfs_item_operations o2hb_region_item_ops = {
1976         .release                = o2hb_region_release,
1977         .show_attribute         = o2hb_region_show,
1978         .store_attribute        = o2hb_region_store,
1979 };
1980
1981 static struct config_item_type o2hb_region_type = {
1982         .ct_item_ops    = &o2hb_region_item_ops,
1983         .ct_attrs       = o2hb_region_attrs,
1984         .ct_owner       = THIS_MODULE,
1985 };
1986
1987 /* heartbeat set */
1988
1989 struct o2hb_heartbeat_group {
1990         struct config_group hs_group;
1991         /* some stuff? */
1992 };
1993
1994 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
1995 {
1996         return group ?
1997                 container_of(group, struct o2hb_heartbeat_group, hs_group)
1998                 : NULL;
1999 }
2000
2001 static int o2hb_debug_region_init(struct o2hb_region *reg, struct dentry *dir)
2002 {
2003         int ret = -ENOMEM;
2004
2005         reg->hr_debug_dir =
2006                 debugfs_create_dir(config_item_name(&reg->hr_item), dir);
2007         if (!reg->hr_debug_dir) {
2008                 mlog_errno(ret);
2009                 goto bail;
2010         }
2011
2012         reg->hr_debug_livenodes =
2013                         o2hb_debug_create(O2HB_DEBUG_LIVENODES,
2014                                           reg->hr_debug_dir,
2015                                           &(reg->hr_db_livenodes),
2016                                           sizeof(*(reg->hr_db_livenodes)),
2017                                           O2HB_DB_TYPE_REGION_LIVENODES,
2018                                           sizeof(reg->hr_live_node_bitmap),
2019                                           O2NM_MAX_NODES, reg);
2020         if (!reg->hr_debug_livenodes) {
2021                 mlog_errno(ret);
2022                 goto bail;
2023         }
2024
2025         reg->hr_debug_regnum =
2026                         o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER,
2027                                           reg->hr_debug_dir,
2028                                           &(reg->hr_db_regnum),
2029                                           sizeof(*(reg->hr_db_regnum)),
2030                                           O2HB_DB_TYPE_REGION_NUMBER,
2031                                           0, O2NM_MAX_NODES, reg);
2032         if (!reg->hr_debug_regnum) {
2033                 mlog_errno(ret);
2034                 goto bail;
2035         }
2036
2037         reg->hr_debug_elapsed_time =
2038                         o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME,
2039                                           reg->hr_debug_dir,
2040                                           &(reg->hr_db_elapsed_time),
2041                                           sizeof(*(reg->hr_db_elapsed_time)),
2042                                           O2HB_DB_TYPE_REGION_ELAPSED_TIME,
2043                                           0, 0, reg);
2044         if (!reg->hr_debug_elapsed_time) {
2045                 mlog_errno(ret);
2046                 goto bail;
2047         }
2048
2049         reg->hr_debug_pinned =
2050                         o2hb_debug_create(O2HB_DEBUG_REGION_PINNED,
2051                                           reg->hr_debug_dir,
2052                                           &(reg->hr_db_pinned),
2053                                           sizeof(*(reg->hr_db_pinned)),
2054                                           O2HB_DB_TYPE_REGION_PINNED,
2055                                           0, 0, reg);
2056         if (!reg->hr_debug_pinned) {
2057                 mlog_errno(ret);
2058                 goto bail;
2059         }
2060
2061         ret = 0;
2062 bail:
2063         return ret;
2064 }
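
/*
 * The net effect of the helper above: every region gets its own
 * directory under the o2hb debugfs root (o2hb_debug_dir), holding
 * read-only views of the region's live node bitmap, its region number,
 * an elapsed-time counter and its pinned state.  The file names come
 * from the O2HB_DEBUG_* defines earlier in this file.
 */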
2065
2066 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
2067                                                           const char *name)
2068 {
2069         struct o2hb_region *reg = NULL;
2070         int ret;
2071
2072         reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
2073         if (reg == NULL)
2074                 return ERR_PTR(-ENOMEM);
2075
2076         if (strlen(name) > O2HB_MAX_REGION_NAME_LEN) {
2077                 ret = -ENAMETOOLONG;
2078                 goto free;
2079         }
2080
2081         spin_lock(&o2hb_live_lock);
2082         reg->hr_region_num = 0;
2083         if (o2hb_global_heartbeat_active()) {
2084                 reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap,
2085                                                          O2NM_MAX_REGIONS);
2086                 if (reg->hr_region_num >= O2NM_MAX_REGIONS) {
2087                         spin_unlock(&o2hb_live_lock);
2088                         ret = -EFBIG;
2089                         goto free;
2090                 }
2091                 set_bit(reg->hr_region_num, o2hb_region_bitmap);
2092         }
2093         list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
2094         spin_unlock(&o2hb_live_lock);
2095
2096         config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
2097
2098         ret = o2hb_debug_region_init(reg, o2hb_debug_dir);
2099         if (ret) {
2100                 config_item_put(&reg->hr_item);
2101                 goto free;
2102         }
2103
2104         return &reg->hr_item;
2105 free:
2106         kfree(reg);
2107         return ERR_PTR(ret);
2108 }
2109
2110 static void o2hb_heartbeat_group_drop_item(struct config_group *group,
2111                                            struct config_item *item)
2112 {
2113         struct task_struct *hb_task;
2114         struct o2hb_region *reg = to_o2hb_region(item);
2115         int quorum_region = 0;
2116
2117         /* stop the thread when the user removes the region dir */
2118         spin_lock(&o2hb_live_lock);
2119         hb_task = reg->hr_task;
2120         reg->hr_task = NULL;
2121         reg->hr_item_dropped = 1;
2122         spin_unlock(&o2hb_live_lock);
2123
2124         if (hb_task)
2125                 kthread_stop(hb_task);
2126
2127         if (o2hb_global_heartbeat_active()) {
2128                 spin_lock(&o2hb_live_lock);
2129                 clear_bit(reg->hr_region_num, o2hb_region_bitmap);
2130                 clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
2131                 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
2132                         quorum_region = 1;
2133                 clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
2134                 spin_unlock(&o2hb_live_lock);
2135                 printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n",
2136                        ((atomic_read(&reg->hr_steady_iterations) == 0) ?
2137                         "stopped" : "start aborted"), config_item_name(item),
2138                        reg->hr_dev_name);
2139         }
2140
2141         /*
2142          * If we're racing a dev_write(), we need to wake them.  They will
2143          * check reg->hr_task.
2144          */
2145         if (atomic_read(&reg->hr_steady_iterations) != 0) {
2146                 reg->hr_aborted_start = 1;
2147                 atomic_set(&reg->hr_steady_iterations, 0);
2148                 wake_up(&o2hb_steady_queue);
2149         }
2150
2151         config_item_put(item);
2152
2153         if (!o2hb_global_heartbeat_active() || !quorum_region)
2154                 return;
2155
2156         /*
2157          * If global heartbeat is active and there are dependent users,
2158          * pin all regions if the quorum region count is <= O2HB_PIN_CUT_OFF.
2159          */
2160         spin_lock(&o2hb_live_lock);
2161
2162         if (!o2hb_dependent_users)
2163                 goto unlock;
2164
2165         if (bitmap_weight(o2hb_quorum_region_bitmap,
2166                            O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2167                 o2hb_region_pin(NULL);
2168
2169 unlock:
2170         spin_unlock(&o2hb_live_lock);
2171 }
2172
2173 struct o2hb_heartbeat_group_attribute {
2174         struct configfs_attribute attr;
2175         ssize_t (*show)(struct o2hb_heartbeat_group *, char *);
2176         ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t);
2177 };
2178
2179 static ssize_t o2hb_heartbeat_group_show(struct config_item *item,
2180                                          struct configfs_attribute *attr,
2181                                          char *page)
2182 {
2183         struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
2184         struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
2185                 container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
2186         ssize_t ret = 0;
2187
2188         if (o2hb_heartbeat_group_attr->show)
2189                 ret = o2hb_heartbeat_group_attr->show(reg, page);
2190         return ret;
2191 }
2192
2193 static ssize_t o2hb_heartbeat_group_store(struct config_item *item,
2194                                           struct configfs_attribute *attr,
2195                                           const char *page, size_t count)
2196 {
2197         struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
2198         struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
2199                 container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
2200         ssize_t ret = -EINVAL;
2201
2202         if (o2hb_heartbeat_group_attr->store)
2203                 ret = o2hb_heartbeat_group_attr->store(reg, page, count);
2204         return ret;
2205 }
2206
2207 static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group,
2208                                                      char *page)
2209 {
2210         return sprintf(page, "%u\n", o2hb_dead_threshold);
2211 }
2212
2213 static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group,
2214                                                     const char *page,
2215                                                     size_t count)
2216 {
2217         unsigned long tmp;
2218         char *p = (char *)page;
2219
2220         tmp = simple_strtoul(p, &p, 10);
2221         if (!p || (*p && (*p != '\n')))
2222                 return -EINVAL;
2223
2224         /* this will validate ranges for us. */
2225         o2hb_dead_threshold_set((unsigned int) tmp);
2226
2227         return count;
2228 }
2229
2230 static
2231 ssize_t o2hb_heartbeat_group_mode_show(struct o2hb_heartbeat_group *group,
2232                                        char *page)
2233 {
2234         return sprintf(page, "%s\n",
2235                        o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]);
2236 }
2237
2238 static
2239 ssize_t o2hb_heartbeat_group_mode_store(struct o2hb_heartbeat_group *group,
2240                                         const char *page, size_t count)
2241 {
2242         unsigned int i;
2243         int ret;
2244         size_t len;
2245
2246         len = (page[count - 1] == '\n') ? count - 1 : count;
2247         if (!len)
2248                 return -EINVAL;
2249
2250         for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) {
2251                 if (strnicmp(page, o2hb_heartbeat_mode_desc[i], len))
2252                         continue;
2253
2254                 ret = o2hb_global_hearbeat_mode_set(i);
2255                 if (!ret)
2256                         printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
2257                                o2hb_heartbeat_mode_desc[i]);
2258                 return count;
2259         }
2260
2261         return -EINVAL;
2262
2263 }
2264
2265 static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
2266         .attr   = { .ca_owner = THIS_MODULE,
2267                     .ca_name = "dead_threshold",
2268                     .ca_mode = S_IRUGO | S_IWUSR },
2269         .show   = o2hb_heartbeat_group_threshold_show,
2270         .store  = o2hb_heartbeat_group_threshold_store,
2271 };
2272
2273 static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_mode = {
2274         .attr   = { .ca_owner = THIS_MODULE,
2275                 .ca_name = "mode",
2276                 .ca_mode = S_IRUGO | S_IWUSR },
2277         .show   = o2hb_heartbeat_group_mode_show,
2278         .store  = o2hb_heartbeat_group_mode_store,
2279 };
2280
2281 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
2282         &o2hb_heartbeat_group_attr_threshold.attr,
2283         &o2hb_heartbeat_group_attr_mode.attr,
2284         NULL,
2285 };
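
/*
 * The two cluster-wide knobs above live directly in the heartbeat
 * group.  A sketch of their use from a shell (same path placeholders as
 * before):
 *
 *	echo 31     > /sys/kernel/config/cluster/<cluster>/heartbeat/dead_threshold
 *	echo global > /sys/kernel/config/cluster/<cluster>/heartbeat/mode
 *
 * dead_threshold is parsed as a decimal iteration count and
 * range-checked by o2hb_dead_threshold_set(); mode must match one of
 * the strings in o2hb_heartbeat_mode_desc[] (a trailing newline is
 * allowed).
 */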
2286
2287 static struct configfs_item_operations o2hb_heartbeat_group_item_ops = {
2288         .show_attribute         = o2hb_heartbeat_group_show,
2289         .store_attribute        = o2hb_heartbeat_group_store,
2290 };
2291
2292 static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
2293         .make_item      = o2hb_heartbeat_group_make_item,
2294         .drop_item      = o2hb_heartbeat_group_drop_item,
2295 };
2296
2297 static struct config_item_type o2hb_heartbeat_group_type = {
2298         .ct_group_ops   = &o2hb_heartbeat_group_group_ops,
2299         .ct_item_ops    = &o2hb_heartbeat_group_item_ops,
2300         .ct_attrs       = o2hb_heartbeat_group_attrs,
2301         .ct_owner       = THIS_MODULE,
2302 };
2303
2304 /* this is just here to avoid exposing struct config_group in heartbeat.h,
2305  * which the entire damn world #includes */
2306 struct config_group *o2hb_alloc_hb_set(void)
2307 {
2308         struct o2hb_heartbeat_group *hs = NULL;
2309         struct config_group *ret = NULL;
2310
2311         hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
2312         if (hs == NULL)
2313                 goto out;
2314
2315         config_group_init_type_name(&hs->hs_group, "heartbeat",
2316                                     &o2hb_heartbeat_group_type);
2317
2318         ret = &hs->hs_group;
2319 out:
2320         if (ret == NULL)
2321                 kfree(hs);
2322         return ret;
2323 }
2324
2325 void o2hb_free_hb_set(struct config_group *group)
2326 {
2327         struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
2328         kfree(hs);
2329 }
2330
2331 /* hb callback registration and issuing */
2332
2333 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
2334 {
2335         if (type == O2HB_NUM_CB)
2336                 return ERR_PTR(-EINVAL);
2337
2338         return &o2hb_callbacks[type];
2339 }
2340
2341 void o2hb_setup_callback(struct o2hb_callback_func *hc,
2342                          enum o2hb_callback_type type,
2343                          o2hb_cb_func *func,
2344                          void *data,
2345                          int priority)
2346 {
2347         INIT_LIST_HEAD(&hc->hc_item);
2348         hc->hc_func = func;
2349         hc->hc_data = data;
2350         hc->hc_priority = priority;
2351         hc->hc_type = type;
2352         hc->hc_magic = O2HB_CB_MAGIC;
2353 }
2354 EXPORT_SYMBOL_GPL(o2hb_setup_callback);
2355
2356 /*
2357  * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2358  * In global heartbeat mode, region_uuid passed is NULL.
2359  *
2360  * In local, we only pin the matching region. In global we pin all the active
2361  * regions.
2362  */
2363 static int o2hb_region_pin(const char *region_uuid)
2364 {
2365         int ret = 0, found = 0;
2366         struct o2hb_region *reg;
2367         char *uuid;
2368
2369         assert_spin_locked(&o2hb_live_lock);
2370
2371         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2372                 uuid = config_item_name(&reg->hr_item);
2373
2374                 /* local heartbeat */
2375                 if (region_uuid) {
2376                         if (strcmp(region_uuid, uuid))
2377                                 continue;
2378                         found = 1;
2379                 }
2380
2381                 if (reg->hr_item_pinned || reg->hr_item_dropped)
2382                         goto skip_pin;
2383
2384                 /* Ignore ENOENT only for local hb (userdlm domain) */
2385                 ret = o2nm_depend_item(&reg->hr_item);
2386                 if (!ret) {
2387                         mlog(ML_CLUSTER, "Pin region %s\n", uuid);
2388                         reg->hr_item_pinned = 1;
2389                 } else {
2390                         if (ret == -ENOENT && found)
2391                                 ret = 0;
2392                         else {
2393                                 mlog(ML_ERROR, "Pin region %s fails with %d\n",
2394                                      uuid, ret);
2395                                 break;
2396                         }
2397                 }
2398 skip_pin:
2399                 if (found)
2400                         break;
2401         }
2402
2403         return ret;
2404 }
2405
2406 /*
2407  * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2408  * In global heartbeat mode, region_uuid passed is NULL.
2409  *
2410  * In local, we only unpin the matching region. In global we unpin all the
2411  * active regions.
2412  */
2413 static void o2hb_region_unpin(const char *region_uuid)
2414 {
2415         struct o2hb_region *reg;
2416         char *uuid;
2417         int found = 0;
2418
2419         assert_spin_locked(&o2hb_live_lock);
2420
2421         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2422                 uuid = config_item_name(&reg->hr_item);
2423                 if (region_uuid) {
2424                         if (strcmp(region_uuid, uuid))
2425                                 continue;
2426                         found = 1;
2427                 }
2428
2429                 if (reg->hr_item_pinned) {
2430                         mlog(ML_CLUSTER, "Unpin region %s\n", uuid);
2431                         o2nm_undepend_item(&reg->hr_item);
2432                         reg->hr_item_pinned = 0;
2433                 }
2434                 if (found)
2435                         break;
2436         }
2437 }
2438
2439 static int o2hb_region_inc_user(const char *region_uuid)
2440 {
2441         int ret = 0;
2442
2443         spin_lock(&o2hb_live_lock);
2444
2445         /* local heartbeat */
2446         if (!o2hb_global_heartbeat_active()) {
2447                 ret = o2hb_region_pin(region_uuid);
2448                 goto unlock;
2449         }
2450
2451         /*
2452          * if global heartbeat is active and this is the first dependent user,
2453          * pin all regions if the quorum region count is <= O2HB_PIN_CUT_OFF
2454          */
2455         o2hb_dependent_users++;
2456         if (o2hb_dependent_users > 1)
2457                 goto unlock;
2458
2459         if (bitmap_weight(o2hb_quorum_region_bitmap,
2460                            O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2461                 ret = o2hb_region_pin(NULL);
2462
2463 unlock:
2464         spin_unlock(&o2hb_live_lock);
2465         return ret;
2466 }
2467
2468 void o2hb_region_dec_user(const char *region_uuid)
2469 {
2470         spin_lock(&o2hb_live_lock);
2471
2472         /* local heartbeat */
2473         if (!o2hb_global_heartbeat_active()) {
2474                 o2hb_region_unpin(region_uuid);
2475                 goto unlock;
2476         }
2477
2478         /*
2479          * if global heartbeat is active and there are no dependent users,
2480          * unpin all quorum regions
2481          */
2482         o2hb_dependent_users--;
2483         if (!o2hb_dependent_users)
2484                 o2hb_region_unpin(NULL);
2485
2486 unlock:
2487         spin_unlock(&o2hb_live_lock);
2488 }
2489
2490 int o2hb_register_callback(const char *region_uuid,
2491                            struct o2hb_callback_func *hc)
2492 {
2493         struct o2hb_callback_func *tmp;
2494         struct list_head *iter;
2495         struct o2hb_callback *hbcall;
2496         int ret;
2497
2498         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2499         BUG_ON(!list_empty(&hc->hc_item));
2500
2501         hbcall = hbcall_from_type(hc->hc_type);
2502         if (IS_ERR(hbcall)) {
2503                 ret = PTR_ERR(hbcall);
2504                 goto out;
2505         }
2506
2507         if (region_uuid) {
2508                 ret = o2hb_region_inc_user(region_uuid);
2509                 if (ret) {
2510                         mlog_errno(ret);
2511                         goto out;
2512                 }
2513         }
2514
2515         down_write(&o2hb_callback_sem);
2516
2517         list_for_each(iter, &hbcall->list) {
2518                 tmp = list_entry(iter, struct o2hb_callback_func, hc_item);
2519                 if (hc->hc_priority < tmp->hc_priority) {
2520                         list_add_tail(&hc->hc_item, iter);
2521                         break;
2522                 }
2523         }
2524         if (list_empty(&hc->hc_item))
2525                 list_add_tail(&hc->hc_item, &hbcall->list);
2526
2527         up_write(&o2hb_callback_sem);
2528         ret = 0;
2529 out:
2530         mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n",
2531              ret, __builtin_return_address(0), hc);
2532         return ret;
2533 }
2534 EXPORT_SYMBOL_GPL(o2hb_register_callback);
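
/*
 * Registration sketch for the callback API above, assuming the
 * o2hb_cb_func signature and the O2HB_NODE_DOWN_CB type declared in
 * heartbeat.h; everything named my_* is a hypothetical caller:
 *
 *	static void my_node_down(struct o2nm_node *node, int node_num,
 *				 void *data)
 *	{
 *		... react to the node going silent ...
 *	}
 *
 *	static struct o2hb_callback_func my_hb_down;
 *
 *	o2hb_setup_callback(&my_hb_down, O2HB_NODE_DOWN_CB, my_node_down,
 *			    NULL, 0);
 *	int ret = o2hb_register_callback(NULL, &my_hb_down);
 *
 * A NULL region_uuid skips o2hb_region_inc_user(); a non-NULL uuid bumps
 * the dependent user / pin accounting for the matching region (local
 * heartbeat) or for the whole heartbeat group (global heartbeat).
 */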
2535
2536 void o2hb_unregister_callback(const char *region_uuid,
2537                               struct o2hb_callback_func *hc)
2538 {
2539         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2540
2541         mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n",
2542              __builtin_return_address(0), hc);
2543
2544         /* XXX Can this happen _with_ a region reference? */
2545         if (list_empty(&hc->hc_item))
2546                 return;
2547
2548         if (region_uuid)
2549                 o2hb_region_dec_user(region_uuid);
2550
2551         down_write(&o2hb_callback_sem);
2552
2553         list_del_init(&hc->hc_item);
2554
2555         up_write(&o2hb_callback_sem);
2556 }
2557 EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
2558
2559 int o2hb_check_node_heartbeating(u8 node_num)
2560 {
2561         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2562
2563         o2hb_fill_node_map(testing_map, sizeof(testing_map));
2564         if (!test_bit(node_num, testing_map)) {
2565                 mlog(ML_HEARTBEAT,
2566                      "node (%u) does not have heartbeating enabled.\n",
2567                      node_num);
2568                 return 0;
2569         }
2570
2571         return 1;
2572 }
2573 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
2574
2575 int o2hb_check_node_heartbeating_from_callback(u8 node_num)
2576 {
2577         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2578
2579         o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2580         if (!test_bit(node_num, testing_map)) {
2581                 mlog(ML_HEARTBEAT,
2582                      "node (%u) does not have heartbeating enabled.\n",
2583                      node_num);
2584                 return 0;
2585         }
2586
2587         return 1;
2588 }
2589 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
2590
2591 /* Makes sure our local node is configured with a node number, and is
2592  * heartbeating. */
2593 int o2hb_check_local_node_heartbeating(void)
2594 {
2595         u8 node_num;
2596
2597         /* if this node was set then we have networking */
2598         node_num = o2nm_this_node();
2599         if (node_num == O2NM_MAX_NODES) {
2600                 mlog(ML_HEARTBEAT, "this node has not been configured.\n");
2601                 return 0;
2602         }
2603
2604         return o2hb_check_node_heartbeating(node_num);
2605 }
2606 EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
2607
2608 /*
2609  * this is just a hack until we get the plumbing which flips file systems
2610  * read-only and drops the hb ref instead of killing the node dead.
2611  */
2612 void o2hb_stop_all_regions(void)
2613 {
2614         struct o2hb_region *reg;
2615
2616         mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
2617
2618         spin_lock(&o2hb_live_lock);
2619
2620         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
2621                 reg->hr_unclean_stop = 1;
2622
2623         spin_unlock(&o2hb_live_lock);
2624 }
2625 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
2626
2627 int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
2628 {
2629         struct o2hb_region *reg;
2630         int numregs = 0;
2631         char *p;
2632
2633         spin_lock(&o2hb_live_lock);
2634
2635         p = region_uuids;
2636         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2637                 mlog(0, "Region: %s\n", config_item_name(&reg->hr_item));
2638                 if (numregs < max_regions) {
2639                         memcpy(p, config_item_name(&reg->hr_item),
2640                                O2HB_MAX_REGION_NAME_LEN);
2641                         p += O2HB_MAX_REGION_NAME_LEN;
2642                 }
2643                 numregs++;
2644         }
2645
2646         spin_unlock(&o2hb_live_lock);
2647
2648         return numregs;
2649 }
2650 EXPORT_SYMBOL_GPL(o2hb_get_all_regions);
2651
2652 int o2hb_global_heartbeat_active(void)
2653 {
2654         return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL);
2655 }
2656 EXPORT_SYMBOL(o2hb_global_heartbeat_active);