4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
39 #include "drbd_protocol.h"
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
46 * drbd_md_endio (defined here)
47 * drbd_request_endio (defined here)
48 * drbd_peer_request_endio (defined here)
49 * drbd_bm_endio (defined in drbd_bitmap.c)
51 * For all these callbacks, note the following:
52 * The callbacks will be called in irq context by the IDE drivers,
53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54 * Try to get the locking right :)
58 /* used for synchronous meta data and bitmap IO
59 * submitted by drbd_md_sync_page_io()
61 void drbd_md_endio(struct bio *bio)
63 struct drbd_device *device;
65 device = bio->bi_private;
66 device->md_io.error = bio->bi_error;
68 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
69 * to time out on the lower level device, and eventually detach from it.
70 * If this io completion runs after that timeout expired, this
71 * drbd_md_put_buffer() may allow us to finally try and re-attach.
72 * During normal operation, this only puts that extra reference
74 * Make sure we first drop the reference, and only then signal
75 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
76 * next drbd_md_sync_page_io(), that we trigger the
77 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
79 drbd_md_put_buffer(device);
80 device->md_io.done = 1;
81 wake_up(&device->misc_wait);
83 if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
87 /* reads on behalf of the partner,
88 * "submitted" by the receiver
90 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
92 unsigned long flags = 0;
93 struct drbd_peer_device *peer_device = peer_req->peer_device;
94 struct drbd_device *device = peer_device->device;
96 spin_lock_irqsave(&device->resource->req_lock, flags);
97 device->read_cnt += peer_req->i.size >> 9;
98 list_del(&peer_req->w.list);
99 if (list_empty(&device->read_ee))
100 wake_up(&device->ee_wait);
101 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
102 __drbd_chk_io_error(device, DRBD_READ_ERROR);
103 spin_unlock_irqrestore(&device->resource->req_lock, flags);
105 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
109 /* writes on behalf of the partner, or resync writes,
110 * "submitted" by the receiver, final stage. */
111 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
113 unsigned long flags = 0;
114 struct drbd_peer_device *peer_device = peer_req->peer_device;
115 struct drbd_device *device = peer_device->device;
116 struct drbd_connection *connection = peer_device->connection;
117 struct drbd_interval i;
120 int do_al_complete_io;
122 /* after we moved peer_req to done_ee,
123 * we may no longer access it,
124 * it may be freed/reused already!
125 * (as soon as we release the req_lock) */
127 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
128 block_id = peer_req->block_id;
129 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
131 spin_lock_irqsave(&device->resource->req_lock, flags);
132 device->writ_cnt += peer_req->i.size >> 9;
133 list_move_tail(&peer_req->w.list, &device->done_ee);
136 * Do not remove from the write_requests tree here: we did not send the
137 * Ack yet and did not wake possibly waiting conflicting requests.
138 * It is removed from the tree in "drbd_process_done_ee", within the
139 * appropriate dw.cb (e_end_block/e_end_resync_block), or in
140 * _drbd_clear_done_ee.
143 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
145 /* FIXME do we want to detach for failed REQ_DISCARD?
146 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
147 if (peer_req->flags & EE_WAS_ERROR)
148 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
150 if (connection->cstate >= C_WF_REPORT_PARAMS) {
151 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
152 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
153 kref_put(&device->kref, drbd_destroy_device);
155 spin_unlock_irqrestore(&device->resource->req_lock, flags);
157 if (block_id == ID_SYNCER)
158 drbd_rs_complete_io(device, i.sector);
161 wake_up(&device->ee_wait);
163 if (do_al_complete_io)
164 drbd_al_complete_io(device, &i);
169 /* writes on behalf of the partner, or resync writes,
170 * "submitted" by the receiver.
172 void drbd_peer_request_endio(struct bio *bio)
174 struct drbd_peer_request *peer_req = bio->bi_private;
175 struct drbd_device *device = peer_req->peer_device->device;
176 int is_write = bio_data_dir(bio) == WRITE;
177 int is_discard = !!(bio_op(bio) == REQ_OP_DISCARD);
179 if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
180 drbd_warn(device, "%s: error=%d s=%llus\n",
181 is_write ? (is_discard ? "discard" : "write")
182 : "read", bio->bi_error,
183 (unsigned long long)peer_req->i.sector);
186 set_bit(__EE_WAS_ERROR, &peer_req->flags);
188 bio_put(bio); /* no need for the bio anymore */
189 if (atomic_dec_and_test(&peer_req->pending_bios)) {
191 drbd_endio_write_sec_final(peer_req);
193 drbd_endio_read_sec_final(peer_req);
197 void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199 panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
200 device->minor, device->resource->name, device->vnr);
203 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
205 void drbd_request_endio(struct bio *bio)
208 struct drbd_request *req = bio->bi_private;
209 struct drbd_device *device = req->device;
210 struct bio_and_error m;
211 enum drbd_req_event what;
213 /* If this request was aborted locally before,
214 * but now was completed "successfully",
215 * chances are that this caused arbitrary data corruption.
217 * "aborting" requests, or force-detaching the disk, is intended for
218 * completely blocked/hung local backing devices which no longer
219 * complete requests at all, not even with error completions. In this
220 * situation, usually a hard-reset and failover is the only way out.
222 * By "aborting", basically faking a local error-completion,
223 * we allow for a more graceful switchover by cleanly migrating services.
224 * Still the affected node has to be rebooted "soon".
226 * By completing these requests, we allow the upper layers to re-use
227 * the associated data pages.
229 * If later the local backing device "recovers", and now DMAs some data
230 * from disk into the original request pages, in the best case it will
231 * just put random data into unused pages; but typically it will corrupt
232 * meanwhile completely unrelated data, causing all sorts of damage.
234 * Which means delayed successful completion,
235 * especially for READ requests,
236 * is a reason to panic().
238 * We assume that a delayed *error* completion is OK,
239 * though we still will complain noisily about it.
241 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
242 if (__ratelimit(&drbd_ratelimit_state))
243 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
246 drbd_panic_after_delayed_completion_of_aborted_request(device);
249 /* to avoid recursion in __req_mod */
250 if (unlikely(bio->bi_error)) {
251 if (bio_op(bio) == REQ_OP_DISCARD)
252 what = (bio->bi_error == -EOPNOTSUPP)
253 ? DISCARD_COMPLETED_NOTSUPP
254 : DISCARD_COMPLETED_WITH_ERROR;
256 what = (bio_data_dir(bio) == WRITE)
257 ? WRITE_COMPLETED_WITH_ERROR
258 : (bio_rw(bio) == READ)
259 ? READ_COMPLETED_WITH_ERROR
260 : READ_AHEAD_COMPLETED_WITH_ERROR;
264 bio_put(req->private_bio);
265 req->private_bio = ERR_PTR(bio->bi_error);
267 /* not req_mod(), we need irqsave here! */
268 spin_lock_irqsave(&device->resource->req_lock, flags);
269 __req_mod(req, what, &m);
270 spin_unlock_irqrestore(&device->resource->req_lock, flags);
274 complete_master_bio(device, &m);
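/* Editor's note (added for clarity, not part of the original source):
 * drbd_csum_ee() hashes the page chain attached to a peer request,
 * drbd_csum_bio() hashes the segments of a bio.  The resulting digests
 * are used for checksum-based resync (csums_tfm) and online verify
 * (verify_tfm); see w_e_send_csum() and w_e_end_ov_req() below. */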
277 void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
279 AHASH_REQUEST_ON_STACK(req, tfm);
280 struct scatterlist sg;
281 struct page *page = peer_req->pages;
285 ahash_request_set_tfm(req, tfm);
286 ahash_request_set_callback(req, 0, NULL, NULL);
288 sg_init_table(&sg, 1);
289 crypto_ahash_init(req);
291 while ((tmp = page_chain_next(page))) {
292 /* all but the last page will be fully used */
293 sg_set_page(&sg, page, PAGE_SIZE, 0);
294 ahash_request_set_crypt(req, &sg, NULL, sg.length);
295 crypto_ahash_update(req);
298 /* and now the last, possibly only partially used page */
299 len = peer_req->i.size & (PAGE_SIZE - 1);
300 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
301 ahash_request_set_crypt(req, &sg, digest, sg.length);
302 crypto_ahash_finup(req);
303 ahash_request_zero(req);
306 void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
308 AHASH_REQUEST_ON_STACK(req, tfm);
309 struct scatterlist sg;
311 struct bvec_iter iter;
313 ahash_request_set_tfm(req, tfm);
314 ahash_request_set_callback(req, 0, NULL, NULL);
316 sg_init_table(&sg, 1);
317 crypto_ahash_init(req);
319 bio_for_each_segment(bvec, bio, iter) {
320 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
321 ahash_request_set_crypt(req, &sg, NULL, sg.length);
322 crypto_ahash_update(req);
324 ahash_request_set_crypt(req, NULL, digest, 0);
325 crypto_ahash_final(req);
326 ahash_request_zero(req);
329 /* MAYBE merge common code with w_e_end_ov_req */
330 static int w_e_send_csum(struct drbd_work *w, int cancel)
332 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
333 struct drbd_peer_device *peer_device = peer_req->peer_device;
334 struct drbd_device *device = peer_device->device;
339 if (unlikely(cancel))
342 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
345 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
346 digest = kmalloc(digest_size, GFP_NOIO);
348 sector_t sector = peer_req->i.sector;
349 unsigned int size = peer_req->i.size;
350 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
351 /* Free peer_req and pages before send.
352 * In case we block on congestion, we could otherwise run into
353 * some distributed deadlock, if the other side blocks on
354 * congestion as well, because our receiver blocks in
355 * drbd_alloc_pages due to pp_in_use > max_buffers. */
356 drbd_free_peer_req(device, peer_req);
358 inc_rs_pending(device);
359 err = drbd_send_drequest_csum(peer_device, sector, size,
364 drbd_err(device, "kmalloc() of digest failed.\n");
370 drbd_free_peer_req(device, peer_req);
373 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
377 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
379 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
381 struct drbd_device *device = peer_device->device;
382 struct drbd_peer_request *peer_req;
384 if (!get_ldev(device))
387 /* GFP_TRY, because if there is no memory available right now, this may
388 * be rescheduled for later. It is "only" background resync, after all. */
389 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
390 size, true /* has real payload */, GFP_TRY);
394 peer_req->w.cb = w_e_send_csum;
395 spin_lock_irq(&device->resource->req_lock);
396 list_add_tail(&peer_req->w.list, &device->read_ee);
397 spin_unlock_irq(&device->resource->req_lock);
399 atomic_add(size >> 9, &device->rs_sect_ev);
400 if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
401 DRBD_FAULT_RS_RD) == 0)
404 /* If it failed because of ENOMEM, retry should help. If it failed
405 * because bio_add_page failed (probably broken lower level driver),
406 * retry may or may not help.
407 * If it does not, you may need to force disconnect. */
408 spin_lock_irq(&device->resource->req_lock);
409 list_del(&peer_req->w.list);
410 spin_unlock_irq(&device->resource->req_lock);
412 drbd_free_peer_req(device, peer_req);
418 int w_resync_timer(struct drbd_work *w, int cancel)
420 struct drbd_device *device =
421 container_of(w, struct drbd_device, resync_work);
423 switch (device->state.conn) {
425 make_ov_request(device, cancel);
428 make_resync_request(device, cancel);
435 void resync_timer_fn(unsigned long data)
437 struct drbd_device *device = (struct drbd_device *) data;
439 drbd_queue_work_if_unqueued(
440 &first_peer_device(device)->connection->sender_work,
441 &device->resync_work);
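/* Editor's note (added for clarity, not part of the original source):
 * The fifo_buffer below is a small ring of planned per-step corrections
 * used by the dynamic resync-rate controller: fifo_push() returns the
 * oldest entry and stores a new one in its place, fifo_add_val() adds the
 * same correction to every planned step, and fifo_set() (re)initializes
 * all slots to one value. */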
444 static void fifo_set(struct fifo_buffer *fb, int value)
448 for (i = 0; i < fb->size; i++)
449 fb->values[i] = value;
452 static int fifo_push(struct fifo_buffer *fb, int value)
456 ov = fb->values[fb->head_index];
457 fb->values[fb->head_index++] = value;
459 if (fb->head_index >= fb->size)
465 static void fifo_add_val(struct fifo_buffer *fb, int value)
469 for (i = 0; i < fb->size; i++)
470 fb->values[i] += value;
473 struct fifo_buffer *fifo_alloc(int fifo_size)
475 struct fifo_buffer *fb;
477 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
482 fb->size = fifo_size;
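/* Editor's note (added for clarity, not part of the original source):
 * drbd_rs_controller() implements the dynamic resync speed control:
 * it computes how many sectors it wants in flight (either c-fill-target,
 * or derived from c-delay-target and the sectors acknowledged during the
 * last interval), spreads the difference to the current in-flight amount
 * over the plan-ahead fifo, and returns the number of sectors to request
 * in this SLEEP_TIME interval, capped according to c-max-rate. */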
488 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
490 struct disk_conf *dc;
491 unsigned int want; /* The number of sectors we want in-flight */
492 int req_sect; /* Number of sectors to request in this turn */
493 int correction; /* Number of sectors more we need in-flight */
494 int cps; /* correction per invocation of drbd_rs_controller() */
495 int steps; /* Number of time steps to plan ahead */
498 struct fifo_buffer *plan;
500 dc = rcu_dereference(device->ldev->disk_conf);
501 plan = rcu_dereference(device->rs_plan_s);
503 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
505 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
506 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
507 } else { /* normal path */
508 want = dc->c_fill_target ? dc->c_fill_target :
509 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
512 correction = want - device->rs_in_flight - plan->total;
515 cps = correction / steps;
516 fifo_add_val(plan, cps);
517 plan->total += cps * steps;
519 /* What we do in this step */
520 curr_corr = fifo_push(plan, 0);
521 plan->total -= curr_corr;
523 req_sect = sect_in + curr_corr;
527 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
528 if (req_sect > max_sect)
532 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
533 sect_in, device->rs_in_flight, want, correction,
534 steps, cps, device->rs_planed, curr_corr, req_sect);
540 static int drbd_rs_number_requests(struct drbd_device *device)
542 unsigned int sect_in; /* Number of sectors that came in since the last turn */
545 sect_in = atomic_xchg(&device->rs_sect_in, 0);
546 device->rs_in_flight -= sect_in;
549 mxb = drbd_get_max_buffers(device) / 2;
550 if (rcu_dereference(device->rs_plan_s)->size) {
551 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
552 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
554 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
555 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
559 /* Don't have more than "max-buffers"/2 in-flight.
560 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
561 * potentially causing a distributed deadlock on congestion during
562 * online-verify or (checksum-based) resync, if max-buffers,
563 * socket buffer sizes and resync rate settings are mis-configured. */
565 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
566 * mxb (as used here, and in drbd_alloc_pages on the peer) is
567 * "number of pages" (typically also 4k),
568 * but "rs_in_flight" is in "sectors" (512 Byte). */
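/* Illustrative example (editor's addition, values are hypothetical):
 * with max-buffers = 8000 pages, mxb = 4000; if rs_in_flight is
 * 16000 sectors, that is 16000/8 = 2000 4k blocks still in flight,
 * so "number" is clamped to at most 4000 - 2000 = 2000 requests. */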
569 if (mxb - device->rs_in_flight/8 < number)
570 number = mxb - device->rs_in_flight/8;
575 static int make_resync_request(struct drbd_device *const device, int cancel)
577 struct drbd_peer_device *const peer_device = first_peer_device(device);
578 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
581 const sector_t capacity = drbd_get_capacity(device->this_bdev);
583 int number, rollback_i, size;
584 int align, requeue = 0;
587 if (unlikely(cancel))
590 if (device->rs_total == 0) {
592 drbd_resync_finished(device);
596 if (!get_ldev(device)) {
597 /* Since we only need to access device->resync, a
598 get_ldev_if_state(device, D_FAILED) would be sufficient, but
599 continuing resync with a broken disk makes no sense at
601 drbd_err(device, "Disk broke down during resync!\n");
605 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
606 number = drbd_rs_number_requests(device);
610 for (i = 0; i < number; i++) {
611 /* Stop generating RS requests when half of the send buffer is filled,
612 * but notify TCP that we'd like to have more space. */
613 mutex_lock(&connection->data.mutex);
614 if (connection->data.socket) {
615 struct sock *sk = connection->data.socket->sk;
616 int queued = sk->sk_wmem_queued;
617 int sndbuf = sk->sk_sndbuf;
618 if (queued > sndbuf / 2) {
621 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
625 mutex_unlock(&connection->data.mutex);
630 size = BM_BLOCK_SIZE;
631 bit = drbd_bm_find_next(device, device->bm_resync_fo);
633 if (bit == DRBD_END_OF_BITMAP) {
634 device->bm_resync_fo = drbd_bm_bits(device);
639 sector = BM_BIT_TO_SECT(bit);
641 if (drbd_try_rs_begin_io(device, sector)) {
642 device->bm_resync_fo = bit;
645 device->bm_resync_fo = bit + 1;
647 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
648 drbd_rs_complete_io(device, sector);
652 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
653 /* try to find some adjacent bits.
654 * we stop if we already have the maximum req size.
656 * Additionally always align bigger requests, in order to
657 * be prepared for all stripe sizes of software RAIDs.
662 if (size + BM_BLOCK_SIZE > max_bio_size)
665 /* Be always aligned */
666 if (sector & ((1<<(align+3))-1))
669 /* do not cross extent boundaries */
670 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
672 /* now, is it actually dirty, after all?
673 * caution, drbd_bm_test_bit is tri-state for some
674 * obscure reason; ( b == 0 ) would get the out-of-band
675 * only accidentally right because of the "oddly sized"
676 * adjustment below */
677 if (drbd_bm_test_bit(device, bit+1) != 1)
680 size += BM_BLOCK_SIZE;
681 if ((BM_BLOCK_SIZE << align) <= size)
685 /* if we merged some,
686 * reset the offset to start the next drbd_bm_find_next from */
687 if (size > BM_BLOCK_SIZE)
688 device->bm_resync_fo = bit + 1;
691 /* adjust very last sectors, in case we are oddly sized */
692 if (sector + (size>>9) > capacity)
693 size = (capacity-sector)<<9;
695 if (device->use_csums) {
696 switch (read_for_csum(peer_device, sector, size)) {
697 case -EIO: /* Disk failure */
700 case -EAGAIN: /* allocation failed, or ldev busy */
701 drbd_rs_complete_io(device, sector);
702 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
714 inc_rs_pending(device);
715 err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
716 sector, size, ID_SYNCER);
718 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
719 dec_rs_pending(device);
726 if (device->bm_resync_fo >= drbd_bm_bits(device)) {
727 /* last syncer _request_ was sent,
728 * but the P_RS_DATA_REPLY not yet received. sync will end (and
729 * next sync group will resume), as soon as we receive the last
730 * resync data block, and the last bit is cleared.
731 * until then resync "work" is "inactive" ...
738 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
739 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
744 static int make_ov_request(struct drbd_device *device, int cancel)
748 const sector_t capacity = drbd_get_capacity(device->this_bdev);
749 bool stop_sector_reached = false;
751 if (unlikely(cancel))
754 number = drbd_rs_number_requests(device);
756 sector = device->ov_position;
757 for (i = 0; i < number; i++) {
758 if (sector >= capacity)
761 /* We check for "finished" only in the reply path:
762 * w_e_end_ov_reply().
763 * We need to send at least one request out. */
764 stop_sector_reached = i > 0
765 && verify_can_do_stop_sector(device)
766 && sector >= device->ov_stop_sector;
767 if (stop_sector_reached)
770 size = BM_BLOCK_SIZE;
772 if (drbd_try_rs_begin_io(device, sector)) {
773 device->ov_position = sector;
777 if (sector + (size>>9) > capacity)
778 size = (capacity-sector)<<9;
780 inc_rs_pending(device);
781 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
782 dec_rs_pending(device);
785 sector += BM_SECT_PER_BIT;
787 device->ov_position = sector;
790 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
791 if (i == 0 || !stop_sector_reached)
792 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
796 int w_ov_finished(struct drbd_work *w, int cancel)
798 struct drbd_device_work *dw =
799 container_of(w, struct drbd_device_work, w);
800 struct drbd_device *device = dw->device;
802 ov_out_of_sync_print(device);
803 drbd_resync_finished(device);
808 static int w_resync_finished(struct drbd_work *w, int cancel)
810 struct drbd_device_work *dw =
811 container_of(w, struct drbd_device_work, w);
812 struct drbd_device *device = dw->device;
815 drbd_resync_finished(device);
820 static void ping_peer(struct drbd_device *device)
822 struct drbd_connection *connection = first_peer_device(device)->connection;
824 clear_bit(GOT_PING_ACK, &connection->flags);
825 request_ping(connection);
826 wait_event(connection->ping_wait,
827 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
830 int drbd_resync_finished(struct drbd_device *device)
832 unsigned long db, dt, dbdt;
834 union drbd_state os, ns;
835 struct drbd_device_work *dw;
836 char *khelper_cmd = NULL;
839 /* Remove all elements from the resync LRU. Future actions
840 * might set bits in the (main) bitmap, which would make the entries
841 * in the resync LRU wrong.
842 if (drbd_rs_del_all(device)) {
843 /* In case this is not possible now, most probably because
844 * there are P_RS_DATA_REPLY packets lingering on the worker's
845 * queue (or even the read operations for those packets
846 * are not finished by now), retry in 100ms.
848 schedule_timeout_interruptible(HZ / 10);
849 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
851 dw->w.cb = w_resync_finished;
853 drbd_queue_work(&first_peer_device(device)->connection->sender_work,
857 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
860 dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
864 db = device->rs_total;
865 /* adjust for verify start and stop sectors, and the position actually reached */
866 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
867 db -= device->ov_left;
869 dbdt = Bit2KB(db/dt);
870 device->rs_paused /= HZ;
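/* Illustrative example (editor's addition, values are hypothetical):
 * a resync of db = 262144 bits (1 GiB at BM_BLOCK_SIZE = 4k) that took
 * dt = 64 seconds yields db/dt = 4096 bits/s, so the "done" message
 * reports dbdt = Bit2KB(4096) = 16384 K/sec. */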
872 if (!get_ldev(device))
877 spin_lock_irq(&device->resource->req_lock);
878 os = drbd_read_state(device);
880 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
882 /* This protects us against multiple calls (that can happen in the presence
883 of application IO), and against connectivity loss just before we arrive here. */
884 if (os.conn <= C_CONNECTED)
888 ns.conn = C_CONNECTED;
890 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
891 verify_done ? "Online verify" : "Resync",
892 dt + device->rs_paused, device->rs_paused, dbdt);
894 n_oos = drbd_bm_total_weight(device);
896 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
898 drbd_alert(device, "Online verify found %lu %dk blocks out of sync!\n",
900 khelper_cmd = "out-of-sync";
903 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
905 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
906 khelper_cmd = "after-resync-target";
908 if (device->use_csums && device->rs_total) {
909 const unsigned long s = device->rs_same_csum;
910 const unsigned long t = device->rs_total;
913 (t < 100000) ? ((s*100)/t) : (s/(t/100));
914 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
915 "transferred %luK total %luK\n",
917 Bit2KB(device->rs_same_csum),
918 Bit2KB(device->rs_total - device->rs_same_csum),
919 Bit2KB(device->rs_total));
923 if (device->rs_failed) {
924 drbd_info(device, " %lu failed blocks\n", device->rs_failed);
926 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
927 ns.disk = D_INCONSISTENT;
928 ns.pdsk = D_UP_TO_DATE;
930 ns.disk = D_UP_TO_DATE;
931 ns.pdsk = D_INCONSISTENT;
934 ns.disk = D_UP_TO_DATE;
935 ns.pdsk = D_UP_TO_DATE;
937 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
938 if (device->p_uuid) {
940 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
941 _drbd_uuid_set(device, i, device->p_uuid[i]);
942 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
943 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
945 drbd_err(device, "device->p_uuid is NULL! BUG\n");
949 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
950 /* for verify runs, we don't update uuids here,
951 * so there would be nothing to report. */
952 drbd_uuid_set_bm(device, 0UL);
953 drbd_print_uuids(device, "updated UUIDs");
954 if (device->p_uuid) {
955 /* Now the two UUID sets are equal, update what we
956 * know of the peer. */
958 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
959 device->p_uuid[i] = device->ldev->md.uuid[i];
964 _drbd_set_state(device, ns, CS_VERBOSE, NULL);
966 spin_unlock_irq(&device->resource->req_lock);
969 device->rs_total = 0;
970 device->rs_failed = 0;
971 device->rs_paused = 0;
973 /* reset start sector, if we reached end of device */
974 if (verify_done && device->ov_left == 0)
975 device->ov_start_sector = 0;
977 drbd_md_sync(device);
980 drbd_khelper(device, khelper_cmd);
986 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
988 if (drbd_peer_req_has_active_page(peer_req)) {
989 /* This might happen if sendpage() has not finished */
990 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
991 atomic_add(i, &device->pp_in_use_by_net);
992 atomic_sub(i, &device->pp_in_use);
993 spin_lock_irq(&device->resource->req_lock);
994 list_add_tail(&peer_req->w.list, &device->net_ee);
995 spin_unlock_irq(&device->resource->req_lock);
996 wake_up(&drbd_pp_wait);
998 drbd_free_peer_req(device, peer_req);
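/* Editor's note (added for clarity, not part of the original source):
 * Pages of a peer request may still be in flight in the network stack
 * after sendpage(); in that case the request is parked on net_ee and its
 * pages are re-accounted from pp_in_use to pp_in_use_by_net instead of
 * being freed immediately. */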
1002 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1003 * @device: DRBD device.
1005 * @cancel: The connection will be closed anyway
1007 int w_e_end_data_req(struct drbd_work *w, int cancel)
1009 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1010 struct drbd_peer_device *peer_device = peer_req->peer_device;
1011 struct drbd_device *device = peer_device->device;
1014 if (unlikely(cancel)) {
1015 drbd_free_peer_req(device, peer_req);
1016 dec_unacked(device);
1020 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1021 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1023 if (__ratelimit(&drbd_ratelimit_state))
1024 drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1025 (unsigned long long)peer_req->i.sector);
1027 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1030 dec_unacked(device);
1032 move_to_net_ee_or_free(device, peer_req);
1035 drbd_err(device, "drbd_send_block() failed\n");
1040 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1042 * @cancel: The connection will be closed anyway
1044 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1046 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1047 struct drbd_peer_device *peer_device = peer_req->peer_device;
1048 struct drbd_device *device = peer_device->device;
1051 if (unlikely(cancel)) {
1052 drbd_free_peer_req(device, peer_req);
1053 dec_unacked(device);
1057 if (get_ldev_if_state(device, D_FAILED)) {
1058 drbd_rs_complete_io(device, peer_req->i.sector);
1062 if (device->state.conn == C_AHEAD) {
1063 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1064 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1065 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1066 inc_rs_pending(device);
1067 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1069 if (__ratelimit(&drbd_ratelimit_state))
1070 drbd_err(device, "Not sending RSDataReply, "
1071 "partner DISKLESS!\n");
1075 if (__ratelimit(&drbd_ratelimit_state))
1076 drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1077 (unsigned long long)peer_req->i.sector);
1079 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1081 /* update resync data with failure */
1082 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1085 dec_unacked(device);
1087 move_to_net_ee_or_free(device, peer_req);
1090 drbd_err(device, "drbd_send_block() failed\n");
1094 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1096 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1097 struct drbd_peer_device *peer_device = peer_req->peer_device;
1098 struct drbd_device *device = peer_device->device;
1099 struct digest_info *di;
1101 void *digest = NULL;
1104 if (unlikely(cancel)) {
1105 drbd_free_peer_req(device, peer_req);
1106 dec_unacked(device);
1110 if (get_ldev(device)) {
1111 drbd_rs_complete_io(device, peer_req->i.sector);
1115 di = peer_req->digest;
1117 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1118 /* quick hack to try to avoid a race against reconfiguration.
1119 * a real fix would be much more involved,
1120 * introducing more locking mechanisms */
1121 if (peer_device->connection->csums_tfm) {
1122 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1123 D_ASSERT(device, digest_size == di->digest_size);
1124 digest = kmalloc(digest_size, GFP_NOIO);
1127 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1128 eq = !memcmp(digest, di->digest, digest_size);
1133 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1134 /* rs_same_csums unit is BM_BLOCK_SIZE */
1135 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1136 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1138 inc_rs_pending(device);
1139 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1140 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1142 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1145 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1146 if (__ratelimit(&drbd_ratelimit_state))
1147 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1150 dec_unacked(device);
1151 move_to_net_ee_or_free(device, peer_req);
1154 drbd_err(device, "drbd_send_block/ack() failed\n");
1158 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1160 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1161 struct drbd_peer_device *peer_device = peer_req->peer_device;
1162 struct drbd_device *device = peer_device->device;
1163 sector_t sector = peer_req->i.sector;
1164 unsigned int size = peer_req->i.size;
1169 if (unlikely(cancel))
1172 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1173 digest = kmalloc(digest_size, GFP_NOIO);
1175 err = 1; /* terminate the connection in case the allocation failed */
1179 if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1180 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1182 memset(digest, 0, digest_size);
1184 /* Free peer_req and pages before send.
1185 * In case we block on congestion, we could otherwise run into
1186 * some distributed deadlock, if the other side blocks on
1187 * congestion as well, because our receiver blocks in
1188 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1189 drbd_free_peer_req(device, peer_req);
1191 inc_rs_pending(device);
1192 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1194 dec_rs_pending(device);
1199 drbd_free_peer_req(device, peer_req);
1200 dec_unacked(device);
1204 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1206 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1207 device->ov_last_oos_size += size>>9;
1209 device->ov_last_oos_start = sector;
1210 device->ov_last_oos_size = size>>9;
1212 drbd_set_out_of_sync(device, sector, size);
1215 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1217 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1218 struct drbd_peer_device *peer_device = peer_req->peer_device;
1219 struct drbd_device *device = peer_device->device;
1220 struct digest_info *di;
1222 sector_t sector = peer_req->i.sector;
1223 unsigned int size = peer_req->i.size;
1226 bool stop_sector_reached = false;
1228 if (unlikely(cancel)) {
1229 drbd_free_peer_req(device, peer_req);
1230 dec_unacked(device);
1234 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1235 * the resync lru has been cleaned up already */
1236 if (get_ldev(device)) {
1237 drbd_rs_complete_io(device, peer_req->i.sector);
1241 di = peer_req->digest;
1243 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1244 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1245 digest = kmalloc(digest_size, GFP_NOIO);
1247 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1249 D_ASSERT(device, digest_size == di->digest_size);
1250 eq = !memcmp(digest, di->digest, digest_size);
1255 /* Free peer_req and pages before send.
1256 * In case we block on congestion, we could otherwise run into
1257 * some distributed deadlock, if the other side blocks on
1258 * congestion as well, because our receiver blocks in
1259 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1260 drbd_free_peer_req(device, peer_req);
1262 drbd_ov_out_of_sync_found(device, sector, size);
1264 ov_out_of_sync_print(device);
1266 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1267 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1269 dec_unacked(device);
1273 /* let's advance progress step marks only for every other megabyte */
1274 if ((device->ov_left & 0x200) == 0x200)
1275 drbd_advance_rs_marks(device, device->ov_left);
1277 stop_sector_reached = verify_can_do_stop_sector(device) &&
1278 (sector + (size>>9)) >= device->ov_stop_sector;
1280 if (device->ov_left == 0 || stop_sector_reached) {
1281 ov_out_of_sync_print(device);
1282 drbd_resync_finished(device);
1289 * We need to track the number of pending barrier acks,
1290 * and to be able to wait for them.
1291 * See also comment in drbd_adm_attach before drbd_suspend_io.
1293 static int drbd_send_barrier(struct drbd_connection *connection)
1295 struct p_barrier *p;
1296 struct drbd_socket *sock;
1298 sock = &connection->data;
1299 p = conn_prepare_command(connection, sock);
1302 p->barrier = connection->send.current_epoch_nr;
1304 connection->send.current_epoch_writes = 0;
1305 connection->send.last_sent_barrier_jif = jiffies;
1307 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1310 int w_send_write_hint(struct drbd_work *w, int cancel)
1312 struct drbd_device *device =
1313 container_of(w, struct drbd_device, unplug_work);
1314 struct drbd_socket *sock;
1318 sock = &first_peer_device(device)->connection->data;
1319 if (!drbd_prepare_command(first_peer_device(device), sock))
1321 return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1324 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1326 if (!connection->send.seen_any_write_yet) {
1327 connection->send.seen_any_write_yet = true;
1328 connection->send.current_epoch_nr = epoch;
1329 connection->send.current_epoch_writes = 0;
1330 connection->send.last_sent_barrier_jif = jiffies;
1334 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1336 /* re-init if first write on this connection */
1337 if (!connection->send.seen_any_write_yet)
1339 if (connection->send.current_epoch_nr != epoch) {
1340 if (connection->send.current_epoch_writes)
1341 drbd_send_barrier(connection);
1342 connection->send.current_epoch_nr = epoch;
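/* Editor's note (added for clarity, not part of the original source):
 * Epoch/barrier bookkeeping: the first write on a connection initializes
 * the counters via re_init_if_first_write(); whenever a request belongs
 * to a newer epoch than the last one sent, maybe_send_barrier() closes
 * the previous epoch with a P_BARRIER, but only if that epoch actually
 * contained writes.  w_send_dblock() then counts the new write into
 * current_epoch_writes. */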
1346 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1348 struct drbd_request *req = container_of(w, struct drbd_request, w);
1349 struct drbd_device *device = req->device;
1350 struct drbd_peer_device *const peer_device = first_peer_device(device);
1351 struct drbd_connection *const connection = peer_device->connection;
1354 if (unlikely(cancel)) {
1355 req_mod(req, SEND_CANCELED);
1358 req->pre_send_jif = jiffies;
1360 /* this time, no connection->send.current_epoch_writes++;
1361 * If it was sent, it was the closing barrier for the last
1362 * replicated epoch, before we went into AHEAD mode.
1363 * No more barriers will be sent, until we leave AHEAD mode again. */
1364 maybe_send_barrier(connection, req->epoch);
1366 err = drbd_send_out_of_sync(peer_device, req);
1367 req_mod(req, OOS_HANDED_TO_NETWORK);
1373 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1375 * @cancel: The connection will be closed anyway
1377 int w_send_dblock(struct drbd_work *w, int cancel)
1379 struct drbd_request *req = container_of(w, struct drbd_request, w);
1380 struct drbd_device *device = req->device;
1381 struct drbd_peer_device *const peer_device = first_peer_device(device);
1382 struct drbd_connection *connection = peer_device->connection;
1385 if (unlikely(cancel)) {
1386 req_mod(req, SEND_CANCELED);
1389 req->pre_send_jif = jiffies;
1391 re_init_if_first_write(connection, req->epoch);
1392 maybe_send_barrier(connection, req->epoch);
1393 connection->send.current_epoch_writes++;
1395 err = drbd_send_dblock(peer_device, req);
1396 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1402 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1404 * @cancel: The connection will be closed anyway
1406 int w_send_read_req(struct drbd_work *w, int cancel)
1408 struct drbd_request *req = container_of(w, struct drbd_request, w);
1409 struct drbd_device *device = req->device;
1410 struct drbd_peer_device *const peer_device = first_peer_device(device);
1411 struct drbd_connection *connection = peer_device->connection;
1414 if (unlikely(cancel)) {
1415 req_mod(req, SEND_CANCELED);
1418 req->pre_send_jif = jiffies;
1420 /* Even read requests may close a write epoch,
1421 * if there has been one. */
1422 maybe_send_barrier(connection, req->epoch);
1424 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1425 (unsigned long)req);
1427 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1432 int w_restart_disk_io(struct drbd_work *w, int cancel)
1434 struct drbd_request *req = container_of(w, struct drbd_request, w);
1435 struct drbd_device *device = req->device;
1437 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1438 drbd_al_begin_io(device, &req->i);
1440 drbd_req_make_private_bio(req, req->master_bio);
1441 req->private_bio->bi_bdev = device->ldev->backing_bdev;
1442 generic_make_request(req->private_bio);
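/* Editor's note (added for clarity, not part of the original source):
 * The functions below implement the "resync-after" ordering between
 * devices: _drbd_may_sync_now() walks the configured resync-after chain
 * and refuses to sync while a device earlier in the chain is itself
 * resyncing or paused; drbd_pause_after() and drbd_resume_next() set or
 * clear the aftr_isp ("paused due to resync-after dependency") bit on
 * all devices accordingly. */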
1447 static int _drbd_may_sync_now(struct drbd_device *device)
1449 struct drbd_device *odev = device;
1453 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1456 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1458 if (resync_after == -1)
1460 odev = minor_to_device(resync_after);
1463 if ((odev->state.conn >= C_SYNC_SOURCE &&
1464 odev->state.conn <= C_PAUSED_SYNC_T) ||
1465 odev->state.aftr_isp || odev->state.peer_isp ||
1466 odev->state.user_isp)
1472 * drbd_pause_after() - Pause resync on all devices that may not resync now
1473 * @device: DRBD device.
1475 * Called from process context only (admin command and after_state_ch).
1477 static bool drbd_pause_after(struct drbd_device *device)
1479 bool changed = false;
1480 struct drbd_device *odev;
1484 idr_for_each_entry(&drbd_devices, odev, i) {
1485 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1487 if (!_drbd_may_sync_now(odev) &&
1488 _drbd_set_state(_NS(odev, aftr_isp, 1),
1489 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1498 * drbd_resume_next() - Resume resync on all devices that may resync now
1499 * @device: DRBD device.
1501 * Called from process context only (admin command and worker).
1503 static bool drbd_resume_next(struct drbd_device *device)
1505 bool changed = false;
1506 struct drbd_device *odev;
1510 idr_for_each_entry(&drbd_devices, odev, i) {
1511 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1513 if (odev->state.aftr_isp) {
1514 if (_drbd_may_sync_now(odev) &&
1515 _drbd_set_state(_NS(odev, aftr_isp, 0),
1516 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1524 void resume_next_sg(struct drbd_device *device)
1526 lock_all_resources();
1527 drbd_resume_next(device);
1528 unlock_all_resources();
1531 void suspend_other_sg(struct drbd_device *device)
1533 lock_all_resources();
1534 drbd_pause_after(device);
1535 unlock_all_resources();
1538 /* caller must lock_all_resources() */
1539 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1541 struct drbd_device *odev;
1546 if (o_minor < -1 || o_minor > MINORMASK)
1547 return ERR_RESYNC_AFTER;
1549 /* check for loops */
1550 odev = minor_to_device(o_minor);
1553 return ERR_RESYNC_AFTER_CYCLE;
1555 /* You are free to depend on diskless, non-existing,
1556 * or not yet/no longer existing minors.
1557 * We only reject dependency loops.
1558 * We cannot follow the dependency chain beyond a detached or
1561 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1565 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1567 /* dependency chain ends here, no cycles. */
1568 if (resync_after == -1)
1571 /* follow the dependency chain */
1572 odev = minor_to_device(resync_after);
1576 /* caller must lock_all_resources() */
1577 void drbd_resync_after_changed(struct drbd_device *device)
1582 changed = drbd_pause_after(device);
1583 changed |= drbd_resume_next(device);
1587 void drbd_rs_controller_reset(struct drbd_device *device)
1589 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1590 struct fifo_buffer *plan;
1592 atomic_set(&device->rs_sect_in, 0);
1593 atomic_set(&device->rs_sect_ev, 0);
1594 device->rs_in_flight = 0;
1595 device->rs_last_events =
1596 (int)part_stat_read(&disk->part0, sectors[0]) +
1597 (int)part_stat_read(&disk->part0, sectors[1]);
1599 /* Updating the RCU protected object in place is necessary since
1600 this function gets called from atomic context.
1601 It is valid since all other updates also lead to a completely
1604 plan = rcu_dereference(device->rs_plan_s);
1610 void start_resync_timer_fn(unsigned long data)
1612 struct drbd_device *device = (struct drbd_device *) data;
1613 drbd_device_post_work(device, RS_START);
1616 static void do_start_resync(struct drbd_device *device)
1618 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1619 drbd_warn(device, "postponing start_resync ...\n");
1620 device->start_resync_timer.expires = jiffies + HZ/10;
1621 add_timer(&device->start_resync_timer);
1625 drbd_start_resync(device, C_SYNC_SOURCE);
1626 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1629 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1631 bool csums_after_crash_only;
1633 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1635 return connection->agreed_pro_version >= 89 && /* supported? */
1636 connection->csums_tfm && /* configured? */
1637 (csums_after_crash_only == 0 /* use for each resync? */
1638 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1642 * drbd_start_resync() - Start the resync process
1643 * @device: DRBD device.
1644 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1646 * This function might bring you directly into one of the
1647 * C_PAUSED_SYNC_* states.
1649 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1651 struct drbd_peer_device *peer_device = first_peer_device(device);
1652 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1653 union drbd_state ns;
1656 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1657 drbd_err(device, "Resync already running!\n");
1661 if (!test_bit(B_RS_H_DONE, &device->flags)) {
1662 if (side == C_SYNC_TARGET) {
1663 /* Since application IO was locked out during C_WF_BITMAP_T and
1664 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1665 we check whether we may make the data inconsistent. */
1666 r = drbd_khelper(device, "before-resync-target");
1667 r = (r >> 8) & 0xff;
1669 drbd_info(device, "before-resync-target handler returned %d, "
1670 "dropping connection.\n", r);
1671 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1674 } else /* C_SYNC_SOURCE */ {
1675 r = drbd_khelper(device, "before-resync-source");
1676 r = (r >> 8) & 0xff;
1679 drbd_info(device, "before-resync-source handler returned %d, "
1680 "ignoring. Old userland tools?", r);
1682 drbd_info(device, "before-resync-source handler returned %d, "
1683 "dropping connection.\n", r);
1684 conn_request_state(connection,
1685 NS(conn, C_DISCONNECTING), CS_HARD);
1692 if (current == connection->worker.task) {
1693 /* The worker should not sleep waiting for state_mutex,
1694 that can take long */
1695 if (!mutex_trylock(device->state_mutex)) {
1696 set_bit(B_RS_H_DONE, &device->flags);
1697 device->start_resync_timer.expires = jiffies + HZ/5;
1698 add_timer(&device->start_resync_timer);
1702 mutex_lock(device->state_mutex);
1705 lock_all_resources();
1706 clear_bit(B_RS_H_DONE, &device->flags);
1707 /* Did some connection breakage or IO error race with us? */
1708 if (device->state.conn < C_CONNECTED
1709 || !get_ldev_if_state(device, D_NEGOTIATING)) {
1710 unlock_all_resources();
1714 ns = drbd_read_state(device);
1716 ns.aftr_isp = !_drbd_may_sync_now(device);
1720 if (side == C_SYNC_TARGET)
1721 ns.disk = D_INCONSISTENT;
1722 else /* side == C_SYNC_SOURCE */
1723 ns.pdsk = D_INCONSISTENT;
1725 r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1726 ns = drbd_read_state(device);
1728 if (ns.conn < C_CONNECTED)
1729 r = SS_UNKNOWN_ERROR;
1731 if (r == SS_SUCCESS) {
1732 unsigned long tw = drbd_bm_total_weight(device);
1733 unsigned long now = jiffies;
1736 device->rs_failed = 0;
1737 device->rs_paused = 0;
1738 device->rs_same_csum = 0;
1739 device->rs_last_sect_ev = 0;
1740 device->rs_total = tw;
1741 device->rs_start = now;
1742 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1743 device->rs_mark_left[i] = tw;
1744 device->rs_mark_time[i] = now;
1746 drbd_pause_after(device);
1747 /* Forget potentially stale cached per resync extent bit-counts.
1748 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1749 * disabled, and know the disk state is ok. */
1750 spin_lock(&device->al_lock);
1751 lc_reset(device->resync);
1752 device->resync_locked = 0;
1753 device->resync_wenr = LC_FREE;
1754 spin_unlock(&device->al_lock);
1756 unlock_all_resources();
1758 if (r == SS_SUCCESS) {
1759 wake_up(&device->al_wait); /* for lc_reset() above */
1760 /* reset rs_last_bcast when a resync or verify is started,
1761 * to deal with potential jiffies wrap. */
1762 device->rs_last_bcast = jiffies - HZ;
1764 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1765 drbd_conn_str(ns.conn),
1766 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1767 (unsigned long) device->rs_total);
1768 if (side == C_SYNC_TARGET) {
1769 device->bm_resync_fo = 0;
1770 device->use_csums = use_checksum_based_resync(connection, device);
1772 device->use_csums = 0;
1775 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1776 * with w_send_oos, or the sync target will get confused as to
1777 * how many bits to resync. We cannot do that always, because for an
1778 * empty resync and protocol < 95, we need to do it here, as we call
1779 * drbd_resync_finished from here in that case.
1780 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1781 * and from after_state_ch otherwise. */
1782 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1783 drbd_gen_and_send_sync_uuid(peer_device);
1785 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1786 /* This still has a race (about when exactly the peers
1787 * detect connection loss) that can lead to a full sync
1788 * on next handshake. In 8.3.9 we fixed this with explicit
1789 * resync-finished notifications, but the fix
1790 * introduces a protocol change. Sleeping for some
1791 * time longer than the ping interval + timeout on the
1792 * SyncSource, to give the SyncTarget the chance to
1793 * detect connection loss, then waiting for a ping
1794 * response (implicit in drbd_resync_finished) reduces
1795 * the race considerably, but does not solve it. */
1796 if (side == C_SYNC_SOURCE) {
1797 struct net_conf *nc;
1801 nc = rcu_dereference(connection->net_conf);
1802 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1804 schedule_timeout_interruptible(timeo);
1806 drbd_resync_finished(device);
1809 drbd_rs_controller_reset(device);
1810 /* ns.conn may already be != device->state.conn,
1811 * we may have been paused in between, or become paused until
1812 * the timer triggers.
1813 * No matter, that is handled in resync_timer_fn() */
1814 if (ns.conn == C_SYNC_TARGET)
1815 mod_timer(&device->resync_timer, jiffies);
1817 drbd_md_sync(device);
1821 mutex_unlock(device->state_mutex);
1824 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1826 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1827 device->rs_last_bcast = jiffies;
1829 if (!get_ldev(device))
1832 drbd_bm_write_lazy(device, 0);
1833 if (resync_done && is_sync_state(device->state.conn))
1834 drbd_resync_finished(device);
1836 drbd_bcast_event(device, &sib);
1837 /* update timestamp, in case it took a while to write out stuff */
1838 device->rs_last_bcast = jiffies;
1842 static void drbd_ldev_destroy(struct drbd_device *device)
1844 lc_destroy(device->resync);
1845 device->resync = NULL;
1846 lc_destroy(device->act_log);
1847 device->act_log = NULL;
1850 drbd_backing_dev_free(device, device->ldev);
1851 device->ldev = NULL;
1854 clear_bit(GOING_DISKLESS, &device->flags);
1855 wake_up(&device->misc_wait);
1858 static void go_diskless(struct drbd_device *device)
1860 D_ASSERT(device, device->state.disk == D_FAILED);
1861 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1862 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1863 * the protected members anymore, though, so once put_ldev reaches zero
1864 * again, it will be safe to free them. */
1866 /* Try to write changed bitmap pages, read errors may have just
1867 * set some bits outside the area covered by the activity log.
1869 * If we have an IO error during the bitmap writeout,
1870 * we will want a full sync next time, just in case.
1871 * (Do we want a specific meta data flag for this?)
1873 * If that does not make it to stable storage either,
1874 * we cannot do anything about that anymore.
1876 * We still need to check if both bitmap and ldev are present, we may
1877 * end up here after a failed attach, before ldev was even assigned.
1879 if (device->bitmap && device->ldev) {
1880 /* An interrupted resync or similar is allowed to recount bits
1882 * Any modifications would not be expected anymore, though.
1884 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1885 "detach", BM_LOCKED_TEST_ALLOWED)) {
1886 if (test_bit(WAS_READ_ERROR, &device->flags)) {
1887 drbd_md_set_flag(device, MDF_FULL_SYNC);
1888 drbd_md_sync(device);
1893 drbd_force_state(device, NS(disk, D_DISKLESS));
1896 static int do_md_sync(struct drbd_device *device)
1898 drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1899 drbd_md_sync(device);
1903 /* only called from drbd_worker thread, no locking */
1904 void __update_timing_details(
1905 struct drbd_thread_timing_details *tdp,
1906 unsigned int *cb_nr,
1908 const char *fn, const unsigned int line)
1910 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1911 struct drbd_thread_timing_details *td = tdp + i;
1913 td->start_jif = jiffies;
1919 i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1921 memset(td, 0, sizeof(*td));
1926 static void do_device_work(struct drbd_device *device, const unsigned long todo)
1928 if (test_bit(MD_SYNC, &todo))
1930 if (test_bit(RS_DONE, &todo) ||
1931 test_bit(RS_PROGRESS, &todo))
1932 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1933 if (test_bit(GO_DISKLESS, &todo))
1934 go_diskless(device);
1935 if (test_bit(DESTROY_DISK, &todo))
1936 drbd_ldev_destroy(device);
1937 if (test_bit(RS_START, &todo))
1938 do_start_resync(device);
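/* Editor's note (added for clarity, not part of the original source):
 * "Device work" is signalled through bits in device->flags (MD_SYNC,
 * RS_START, RS_PROGRESS, RS_DONE, GO_DISKLESS, DESTROY_DISK).  They are
 * set via drbd_device_post_work() (e.g. from timers and state changes);
 * the worker harvests them atomically in get_work_bits() with a cmpxchg
 * loop and dispatches them in do_device_work() above. */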
1941 #define DRBD_DEVICE_WORK_MASK \
1942 ((1UL << GO_DISKLESS) \
1943 |(1UL << DESTROY_DISK) \
1945 |(1UL << RS_START) \
1946 |(1UL << RS_PROGRESS) \
1950 static unsigned long get_work_bits(unsigned long *flags)
1952 unsigned long old, new;
1955 new = old & ~DRBD_DEVICE_WORK_MASK;
1956 } while (cmpxchg(flags, old, new) != old);
1957 return old & DRBD_DEVICE_WORK_MASK;
1960 static void do_unqueued_work(struct drbd_connection *connection)
1962 struct drbd_peer_device *peer_device;
1966 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1967 struct drbd_device *device = peer_device->device;
1968 unsigned long todo = get_work_bits(&device->flags);
1972 kref_get(&device->kref);
1974 do_device_work(device, todo);
1975 kref_put(&device->kref, drbd_destroy_device);
1981 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1983 spin_lock_irq(&queue->q_lock);
1984 list_splice_tail_init(&queue->q, work_list);
1985 spin_unlock_irq(&queue->q_lock);
1986 return !list_empty(work_list);
1989 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1992 struct net_conf *nc;
1995 dequeue_work_batch(&connection->sender_work, work_list);
1996 if (!list_empty(work_list))
1999 /* Still nothing to do?
2000 * Maybe we still need to close the current epoch,
2001 * even if no new requests are queued yet.
2003 * Also, poke TCP, just in case.
2004 * Then wait for new work (or signal). */
2006 nc = rcu_dereference(connection->net_conf);
2007 uncork = nc ? nc->tcp_cork : 0;
2010 mutex_lock(&connection->data.mutex);
2011 if (connection->data.socket)
2012 drbd_tcp_uncork(connection->data.socket);
2013 mutex_unlock(&connection->data.mutex);
2018 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2019 spin_lock_irq(&connection->resource->req_lock);
2020 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2021 if (!list_empty(&connection->sender_work.q))
2022 list_splice_tail_init(&connection->sender_work.q, work_list);
2023 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2024 if (!list_empty(work_list) || signal_pending(current)) {
2025 spin_unlock_irq(&connection->resource->req_lock);
2029 /* We found nothing new to do, no to-be-communicated request,
2030 * no other work item. We may still need to close the last
2031 * epoch. Next incoming request epoch will be the connection's
2032 * current transfer log epoch number. If that is different
2033 * from the epoch of the last request we communicated, it is
2034 * safe to send the epoch separating barrier now.
2037 atomic_read(&connection->current_tle_nr) !=
2038 connection->send.current_epoch_nr;
2039 spin_unlock_irq(&connection->resource->req_lock);
2042 maybe_send_barrier(connection,
2043 connection->send.current_epoch_nr + 1);
2045 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2048 /* drbd_send() may have called flush_signals() */
2049 if (get_t_state(&connection->worker) != RUNNING)
2053 /* may be woken up for things other than new work, too,
2054 * e.g. if the current epoch got closed.
2055 * In which case we send the barrier above. */
2057 finish_wait(&connection->sender_work.q_wait, &wait);
2059 /* someone may have changed the config while we have been waiting above. */
2061 nc = rcu_dereference(connection->net_conf);
2062 cork = nc ? nc->tcp_cork : 0;
2064 mutex_lock(&connection->data.mutex);
2065 if (connection->data.socket) {
2067 drbd_tcp_cork(connection->data.socket);
2069 drbd_tcp_uncork(connection->data.socket);
2071 mutex_unlock(&connection->data.mutex);
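/* Editor's note (added for clarity, not part of the original source):
 * drbd_worker() below is the per-connection worker/sender thread.  It
 * waits for work in wait_for_work() (which may also close a dangling
 * epoch and cork/uncork the data socket), handles per-device work bits
 * when DEVICE_WORK_PENDING is set, and runs queued work callbacks; if a
 * callback fails while the connection is established, the connection is
 * forced into C_NETWORK_FAILURE. */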
2074 int drbd_worker(struct drbd_thread *thi)
2076 struct drbd_connection *connection = thi->connection;
2077 struct drbd_work *w = NULL;
2078 struct drbd_peer_device *peer_device;
2079 LIST_HEAD(work_list);
2082 while (get_t_state(thi) == RUNNING) {
2083 drbd_thread_current_set_cpu(thi);
2085 if (list_empty(&work_list)) {
2086 update_worker_timing_details(connection, wait_for_work);
2087 wait_for_work(connection, &work_list);
2090 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2091 update_worker_timing_details(connection, do_unqueued_work);
2092 do_unqueued_work(connection);
2095 if (signal_pending(current)) {
2096 flush_signals(current);
2097 if (get_t_state(thi) == RUNNING) {
2098 drbd_warn(connection, "Worker got an unexpected signal\n");
2104 if (get_t_state(thi) != RUNNING)
2107 if (!list_empty(&work_list)) {
2108 w = list_first_entry(&work_list, struct drbd_work, list);
2109 list_del_init(&w->list);
2110 update_worker_timing_details(connection, w->cb);
2111 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2113 if (connection->cstate >= C_WF_REPORT_PARAMS)
2114 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2119 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2120 update_worker_timing_details(connection, do_unqueued_work);
2121 do_unqueued_work(connection);
2123 if (!list_empty(&work_list)) {
2124 w = list_first_entry(&work_list, struct drbd_work, list);
2125 list_del_init(&w->list);
2126 update_worker_timing_details(connection, w->cb);
2129 dequeue_work_batch(&connection->sender_work, &work_list);
2130 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2133 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2134 struct drbd_device *device = peer_device->device;
2135 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2136 kref_get(&device->kref);
2138 drbd_device_cleanup(device);
2139 kref_put(&device->kref, drbd_destroy_device);