1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57
58 /* used for synchronous meta data and bitmap IO
59  * submitted by drbd_md_sync_page_io()
60  */
61 void drbd_md_endio(struct bio *bio)
62 {
63         struct drbd_device *device;
64
65         device = bio->bi_private;
66         device->md_io.error = bio->bi_error;
67
68         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
69          * to timeout on the lower level device, and eventually detach from it.
70          * If this io completion runs after that timeout expired, this
71          * drbd_md_put_buffer() may allow us to finally try and re-attach.
72          * During normal operation, this only puts that extra reference
73          * down to 1 again.
74          * Make sure we first drop the reference, and only then signal
75          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
76          * next drbd_md_sync_page_io(), that we trigger the
77          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
78          */
79         drbd_md_put_buffer(device);
80         device->md_io.done = 1;
81         wake_up(&device->misc_wait);
82         bio_put(bio);
83         if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
84                 put_ldev(device);
85 }
86
87 /* reads on behalf of the partner,
88  * "submitted" by the receiver
89  */
90 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
91 {
92         unsigned long flags = 0;
93         struct drbd_peer_device *peer_device = peer_req->peer_device;
94         struct drbd_device *device = peer_device->device;
95
96         spin_lock_irqsave(&device->resource->req_lock, flags);
97         device->read_cnt += peer_req->i.size >> 9;
98         list_del(&peer_req->w.list);
99         if (list_empty(&device->read_ee))
100                 wake_up(&device->ee_wait);
101         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
102                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
103         spin_unlock_irqrestore(&device->resource->req_lock, flags);
104
105         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
106         put_ldev(device);
107 }
108
109 /* writes on behalf of the partner, or resync writes,
110  * "submitted" by the receiver, final stage.  */
111 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
112 {
113         unsigned long flags = 0;
114         struct drbd_peer_device *peer_device = peer_req->peer_device;
115         struct drbd_device *device = peer_device->device;
116         struct drbd_connection *connection = peer_device->connection;
117         struct drbd_interval i;
118         int do_wake;
119         u64 block_id;
120         int do_al_complete_io;
121
122         /* after we moved peer_req to done_ee,
123          * we may no longer access it,
124          * it may be freed/reused already!
125          * (as soon as we release the req_lock) */
126         i = peer_req->i;
127         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
128         block_id = peer_req->block_id;
129         peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
130
131         spin_lock_irqsave(&device->resource->req_lock, flags);
132         device->writ_cnt += peer_req->i.size >> 9;
133         list_move_tail(&peer_req->w.list, &device->done_ee);
134
135         /*
136          * Do not remove from the write_requests tree here: we did not send the
137          * Ack yet and did not wake possibly waiting conflicting requests.
138          * The request is removed from the tree in "drbd_process_done_ee", within the
139          * appropriate dw.cb (e_end_block/e_end_resync_block), or in
140          * _drbd_clear_done_ee.
141          */
142
143         do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
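        /* Note: block_id == ID_SYNCER marks resync writes, which are tracked on
         * sync_ee; writes mirrored on behalf of the peer's application I/O live
         * on active_ee.  Waiters on ee_wait (e.g. drbd_wait_ee_list_empty() in
         * the receiver) are woken once the respective list has drained. */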
144
145         /* FIXME do we want to detach for failed REQ_DISCARD?
146          * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
147         if (peer_req->flags & EE_WAS_ERROR)
148                 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
149
150         if (connection->cstate >= C_WF_REPORT_PARAMS) {
151                 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
152                 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
153                         kref_put(&device->kref, drbd_destroy_device);
154         }
155         spin_unlock_irqrestore(&device->resource->req_lock, flags);
156
157         if (block_id == ID_SYNCER)
158                 drbd_rs_complete_io(device, i.sector);
159
160         if (do_wake)
161                 wake_up(&device->ee_wait);
162
163         if (do_al_complete_io)
164                 drbd_al_complete_io(device, &i);
165
166         put_ldev(device);
167 }
168
169 /* writes on behalf of the partner, or resync writes,
170  * "submitted" by the receiver.
171  */
172 void drbd_peer_request_endio(struct bio *bio)
173 {
174         struct drbd_peer_request *peer_req = bio->bi_private;
175         struct drbd_device *device = peer_req->peer_device->device;
176         int is_write = bio_data_dir(bio) == WRITE;
177         int is_discard = !!(bio_op(bio) == REQ_OP_DISCARD);
178
179         if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
180                 drbd_warn(device, "%s: error=%d s=%llus\n",
181                                 is_write ? (is_discard ? "discard" : "write")
182                                         : "read", bio->bi_error,
183                                 (unsigned long long)peer_req->i.sector);
184
185         if (bio->bi_error)
186                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
187
188         bio_put(bio); /* no need for the bio anymore */
189         if (atomic_dec_and_test(&peer_req->pending_bios)) {
190                 if (is_write)
191                         drbd_endio_write_sec_final(peer_req);
192                 else
193                         drbd_endio_read_sec_final(peer_req);
194         }
195 }
196
197 void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
198 {
199         panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
200                 device->minor, device->resource->name, device->vnr);
201 }
202
203 /* read, read-ahead or write requests on R_PRIMARY coming from drbd_make_request
204  */
205 void drbd_request_endio(struct bio *bio)
206 {
207         unsigned long flags;
208         struct drbd_request *req = bio->bi_private;
209         struct drbd_device *device = req->device;
210         struct bio_and_error m;
211         enum drbd_req_event what;
212
213         /* If this request was aborted locally before,
214          * but now was completed "successfully",
215          * chances are that this caused arbitrary data corruption.
216          *
217          * "aborting" requests, or force-detaching the disk, is intended for
218          * completely blocked/hung local backing devices which no longer
219          * complete requests at all, not even error completions.  In this
220          * situation, usually a hard-reset and failover is the only way out.
221          *
222          * By "aborting", basically faking a local error-completion,
223          * we allow for a more graceful switchover by cleanly migrating services.
224          * Still the affected node has to be rebooted "soon".
225          *
226          * By completing these requests, we allow the upper layers to re-use
227          * the associated data pages.
228          *
229          * If later the local backing device "recovers", and now DMAs some data
230          * from disk into the original request pages, in the best case it will
231          * just put random data into unused pages; but typically it will corrupt
232          * data that is meanwhile completely unrelated, causing all sorts of damage.
233          *
234          * Which means delayed successful completion,
235          * especially for READ requests,
236          * is a reason to panic().
237          *
238          * We assume that a delayed *error* completion is OK,
239          * though we still will complain noisily about it.
240          */
241         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
242                 if (__ratelimit(&drbd_ratelimit_state))
243                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
244
245                 if (!bio->bi_error)
246                         drbd_panic_after_delayed_completion_of_aborted_request(device);
247         }
248
249         /* to avoid recursion in __req_mod */
250         if (unlikely(bio->bi_error)) {
251                 if (bio_op(bio) == REQ_OP_DISCARD)
252                         what = (bio->bi_error == -EOPNOTSUPP)
253                                 ? DISCARD_COMPLETED_NOTSUPP
254                                 : DISCARD_COMPLETED_WITH_ERROR;
255                 else
256                         what = (bio_data_dir(bio) == WRITE)
257                         ? WRITE_COMPLETED_WITH_ERROR
258                         : (bio_rw(bio) == READ)
259                           ? READ_COMPLETED_WITH_ERROR
260                           : READ_AHEAD_COMPLETED_WITH_ERROR;
261         } else
262                 what = COMPLETED_OK;
263
264         bio_put(req->private_bio);
265         req->private_bio = ERR_PTR(bio->bi_error);
266
267         /* not req_mod(), we need irqsave here! */
268         spin_lock_irqsave(&device->resource->req_lock, flags);
269         __req_mod(req, what, &m);
270         spin_unlock_irqrestore(&device->resource->req_lock, flags);
271         put_ldev(device);
272
273         if (m.bio)
274                 complete_master_bio(device, &m);
275 }
276
277 void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
278 {
279         AHASH_REQUEST_ON_STACK(req, tfm);
280         struct scatterlist sg;
281         struct page *page = peer_req->pages;
282         struct page *tmp;
283         unsigned len;
284
285         ahash_request_set_tfm(req, tfm);
286         ahash_request_set_callback(req, 0, NULL, NULL);
287
288         sg_init_table(&sg, 1);
289         crypto_ahash_init(req);
290
291         while ((tmp = page_chain_next(page))) {
292                 /* all but the last page will be fully used */
293                 sg_set_page(&sg, page, PAGE_SIZE, 0);
294                 ahash_request_set_crypt(req, &sg, NULL, sg.length);
295                 crypto_ahash_update(req);
296                 page = tmp;
297         }
298         /* and now the last, possibly only partially used page */
299         len = peer_req->i.size & (PAGE_SIZE - 1);
300         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
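        /* Worked example, assuming 4 KiB pages: for i.size == 6144 (twelve
         * 512-byte sectors) the chain holds two pages; the first is hashed in
         * full by the loop above, and here len == 6144 & (PAGE_SIZE - 1) == 2048.
         * If i.size is an exact multiple of PAGE_SIZE, len is 0 and the
         * "len ?: PAGE_SIZE" above hashes the full last page instead. */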
301         ahash_request_set_crypt(req, &sg, digest, sg.length);
302         crypto_ahash_finup(req);
303         ahash_request_zero(req);
304 }
305
306 void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
307 {
308         AHASH_REQUEST_ON_STACK(req, tfm);
309         struct scatterlist sg;
310         struct bio_vec bvec;
311         struct bvec_iter iter;
312
313         ahash_request_set_tfm(req, tfm);
314         ahash_request_set_callback(req, 0, NULL, NULL);
315
316         sg_init_table(&sg, 1);
317         crypto_ahash_init(req);
318
319         bio_for_each_segment(bvec, bio, iter) {
320                 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
321                 ahash_request_set_crypt(req, &sg, NULL, sg.length);
322                 crypto_ahash_update(req);
323         }
324         ahash_request_set_crypt(req, NULL, digest, 0);
325         crypto_ahash_final(req);
326         ahash_request_zero(req);
327 }
328
329 /* MAYBE merge common code with w_e_end_ov_req */
330 static int w_e_send_csum(struct drbd_work *w, int cancel)
331 {
332         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
333         struct drbd_peer_device *peer_device = peer_req->peer_device;
334         struct drbd_device *device = peer_device->device;
335         int digest_size;
336         void *digest;
337         int err = 0;
338
339         if (unlikely(cancel))
340                 goto out;
341
342         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
343                 goto out;
344
345         digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
346         digest = kmalloc(digest_size, GFP_NOIO);
347         if (digest) {
348                 sector_t sector = peer_req->i.sector;
349                 unsigned int size = peer_req->i.size;
350                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
351                 /* Free peer_req and pages before send.
352                  * In case we block on congestion, we could otherwise run into
353                  * some distributed deadlock, if the other side blocks on
354                  * congestion as well, because our receiver blocks in
355                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
356                 drbd_free_peer_req(device, peer_req);
357                 peer_req = NULL;
358                 inc_rs_pending(device);
359                 err = drbd_send_drequest_csum(peer_device, sector, size,
360                                               digest, digest_size,
361                                               P_CSUM_RS_REQUEST);
362                 kfree(digest);
363         } else {
364                 drbd_err(device, "kmalloc() of digest failed.\n");
365                 err = -ENOMEM;
366         }
367
368 out:
369         if (peer_req)
370                 drbd_free_peer_req(device, peer_req);
371
372         if (unlikely(err))
373                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
374         return err;
375 }
376
377 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
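/* Best-effort allocation flags: no reclaim/wait bits and no allocation-failure
 * warning, so a failed attempt is cheap.  If the allocation fails,
 * read_for_csum() below simply defers (-EAGAIN) and the request is retried on
 * a later resync timer tick. */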
378
379 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
380 {
381         struct drbd_device *device = peer_device->device;
382         struct drbd_peer_request *peer_req;
383
384         if (!get_ldev(device))
385                 return -EIO;
386
387         /* GFP_TRY, because if there is no memory available right now, this may
388          * be rescheduled for later. It is "only" background resync, after all. */
389         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
390                                        size, true /* has real payload */, GFP_TRY);
391         if (!peer_req)
392                 goto defer;
393
394         peer_req->w.cb = w_e_send_csum;
395         spin_lock_irq(&device->resource->req_lock);
396         list_add_tail(&peer_req->w.list, &device->read_ee);
397         spin_unlock_irq(&device->resource->req_lock);
398
399         atomic_add(size >> 9, &device->rs_sect_ev);
400         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
401                                      DRBD_FAULT_RS_RD) == 0)
402                 return 0;
403
404         /* If it failed because of ENOMEM, retry should help.  If it failed
405          * because bio_add_page failed (probably broken lower level driver),
406          * retry may or may not help.
407          * If it does not, you may need to force disconnect. */
408         spin_lock_irq(&device->resource->req_lock);
409         list_del(&peer_req->w.list);
410         spin_unlock_irq(&device->resource->req_lock);
411
412         drbd_free_peer_req(device, peer_req);
413 defer:
414         put_ldev(device);
415         return -EAGAIN;
416 }
417
418 int w_resync_timer(struct drbd_work *w, int cancel)
419 {
420         struct drbd_device *device =
421                 container_of(w, struct drbd_device, resync_work);
422
423         switch (device->state.conn) {
424         case C_VERIFY_S:
425                 make_ov_request(device, cancel);
426                 break;
427         case C_SYNC_TARGET:
428                 make_resync_request(device, cancel);
429                 break;
430         }
431
432         return 0;
433 }
434
435 void resync_timer_fn(unsigned long data)
436 {
437         struct drbd_device *device = (struct drbd_device *) data;
438
439         drbd_queue_work_if_unqueued(
440                 &first_peer_device(device)->connection->sender_work,
441                 &device->resync_work);
442 }
443
444 static void fifo_set(struct fifo_buffer *fb, int value)
445 {
446         int i;
447
448         for (i = 0; i < fb->size; i++)
449                 fb->values[i] = value;
450 }
451
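/* Overwrite the slot at head_index with @value, advance the head, and return
 * the value that was previously stored there.  drbd_rs_controller() uses this
 * as a combined "pop the correction planned for this step, push the newest
 * plan-ahead value" on the ring. */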
452 static int fifo_push(struct fifo_buffer *fb, int value)
453 {
454         int ov;
455
456         ov = fb->values[fb->head_index];
457         fb->values[fb->head_index++] = value;
458
459         if (fb->head_index >= fb->size)
460                 fb->head_index = 0;
461
462         return ov;
463 }
464
465 static void fifo_add_val(struct fifo_buffer *fb, int value)
466 {
467         int i;
468
469         for (i = 0; i < fb->size; i++)
470                 fb->values[i] += value;
471 }
472
473 struct fifo_buffer *fifo_alloc(int fifo_size)
474 {
475         struct fifo_buffer *fb;
476
477         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
478         if (!fb)
479                 return NULL;
480
481         fb->head_index = 0;
482         fb->size = fifo_size;
483         fb->total = 0;
484
485         return fb;
486 }
487
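/* Resync rate controller, used when the plan-ahead FIFO has a non-zero size
 * (see drbd_rs_number_requests() below).  It is a simple feedback loop that
 * tries to keep a target amount of resync data in flight: the target is either
 * the fixed c_fill_target or derived from c_delay_target and the amount of
 * resync data (sect_in) that came back during the last tick (seeded from
 * resync_rate at the start of a resync).  The difference between the target
 * and what is currently in flight or already planned is spread evenly over the
 * plan-ahead FIFO; each tick pops one slot and requests sect_in plus that
 * correction, capped at c_max_rate.  All values are 512-byte sectors per
 * SLEEP_TIME tick. */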
488 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
489 {
490         struct disk_conf *dc;
491         unsigned int want;     /* The number of sectors we want in-flight */
492         int req_sect; /* Number of sectors to request in this turn */
493         int correction; /* Number of sectors more we need in-flight */
494         int cps; /* correction per invocation of drbd_rs_controller() */
495         int steps; /* Number of time steps to plan ahead */
496         int curr_corr;
497         int max_sect;
498         struct fifo_buffer *plan;
499
500         dc = rcu_dereference(device->ldev->disk_conf);
501         plan = rcu_dereference(device->rs_plan_s);
502
503         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
504
505         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
506                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
507         } else { /* normal path */
508                 want = dc->c_fill_target ? dc->c_fill_target :
509                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
510         }
511
512         correction = want - device->rs_in_flight - plan->total;
513
514         /* Plan ahead */
515         cps = correction / steps;
516         fifo_add_val(plan, cps);
517         plan->total += cps * steps;
518
519         /* What we do in this step */
520         curr_corr = fifo_push(plan, 0);
521         plan->total -= curr_corr;
522
523         req_sect = sect_in + curr_corr;
524         if (req_sect < 0)
525                 req_sect = 0;
526
527         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
528         if (req_sect > max_sect)
529                 req_sect = max_sect;
530
531         /*
532         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
533                  sect_in, device->rs_in_flight, want, correction,
534                  steps, cps, device->rs_planed, curr_corr, req_sect);
535         */
536
537         return req_sect;
538 }
539
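/* Decide how many resync requests to issue during the next SLEEP_TIME tick.
 * Units: the return value counts BM_BLOCK_SIZE (4k) requests, rs_sect_in and
 * rs_in_flight count 512-byte sectors (hence the ">> (BM_BLOCK_SHIFT - 9)" and
 * "/8" conversions below), and mxb counts pages.  Rough example, assuming
 * SLEEP_TIME is 100ms: if the controller asks for 2000 sectors this tick, that
 * is 2000 >> 3 = 250 requests of 4k, i.e. a c_sync_rate of
 * 250 * 4k * 10/s = 10000 KiB/s. */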
540 static int drbd_rs_number_requests(struct drbd_device *device)
541 {
542         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
543         int number, mxb;
544
545         sect_in = atomic_xchg(&device->rs_sect_in, 0);
546         device->rs_in_flight -= sect_in;
547
548         rcu_read_lock();
549         mxb = drbd_get_max_buffers(device) / 2;
550         if (rcu_dereference(device->rs_plan_s)->size) {
551                 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
552                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
553         } else {
554                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
555                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
556         }
557         rcu_read_unlock();
558
559         /* Don't have more than "max-buffers"/2 in-flight.
560          * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
561          * potentially causing a distributed deadlock on congestion during
562          * online-verify or (checksum-based) resync, if max-buffers,
563          * socket buffer sizes and resync rate settings are mis-configured. */
564
565         /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
566          * mxb (as used here, and in drbd_alloc_pages on the peer) is
567          * "number of pages" (typically also 4k),
568          * but "rs_in_flight" is in "sectors" (512 Byte). */
569         if (mxb - device->rs_in_flight/8 < number)
570                 number = mxb - device->rs_in_flight/8;
571
572         return number;
573 }
574
575 static int make_resync_request(struct drbd_device *const device, int cancel)
576 {
577         struct drbd_peer_device *const peer_device = first_peer_device(device);
578         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
579         unsigned long bit;
580         sector_t sector;
581         const sector_t capacity = drbd_get_capacity(device->this_bdev);
582         int max_bio_size;
583         int number, rollback_i, size;
584         int align, requeue = 0;
585         int i = 0;
586
587         if (unlikely(cancel))
588                 return 0;
589
590         if (device->rs_total == 0) {
591                 /* empty resync? */
592                 drbd_resync_finished(device);
593                 return 0;
594         }
595
596         if (!get_ldev(device)) {
597                 /* Since we only need to access device->rsync, a
598                    get_ldev_if_state(device, D_FAILED) would be sufficient; but
599                    continuing a resync with a broken disk makes no sense at
600                    all */
601                 drbd_err(device, "Disk broke down during resync!\n");
602                 return 0;
603         }
604
605         max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
606         number = drbd_rs_number_requests(device);
607         if (number <= 0)
608                 goto requeue;
609
610         for (i = 0; i < number; i++) {
611                 /* Stop generating RS requests when half of the send buffer is filled,
612                  * but notify TCP that we'd like to have more space. */
613                 mutex_lock(&connection->data.mutex);
614                 if (connection->data.socket) {
615                         struct sock *sk = connection->data.socket->sk;
616                         int queued = sk->sk_wmem_queued;
617                         int sndbuf = sk->sk_sndbuf;
618                         if (queued > sndbuf / 2) {
619                                 requeue = 1;
620                                 if (sk->sk_socket)
621                                         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
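                                /* SOCK_NOSPACE hints to the network layer that
                                 * we ran out of send buffer space and would
                                 * like to be notified when space frees up; the
                                 * actual retry here happens via the resync
                                 * timer re-armed at the "requeue" label below. */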
622                         }
623                 } else
624                         requeue = 1;
625                 mutex_unlock(&connection->data.mutex);
626                 if (requeue)
627                         goto requeue;
628
629 next_sector:
630                 size = BM_BLOCK_SIZE;
631                 bit  = drbd_bm_find_next(device, device->bm_resync_fo);
632
633                 if (bit == DRBD_END_OF_BITMAP) {
634                         device->bm_resync_fo = drbd_bm_bits(device);
635                         put_ldev(device);
636                         return 0;
637                 }
638
639                 sector = BM_BIT_TO_SECT(bit);
640
641                 if (drbd_try_rs_begin_io(device, sector)) {
642                         device->bm_resync_fo = bit;
643                         goto requeue;
644                 }
645                 device->bm_resync_fo = bit + 1;
646
647                 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
648                         drbd_rs_complete_io(device, sector);
649                         goto next_sector;
650                 }
651
652 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
653                 /* try to find some adjacent bits.
654                  * we stop once we have reached the maximum request size.
655                  *
656                  * Additionally always align bigger requests, in order to
657                  * be prepared for all stripe sizes of software RAIDs.
658                  */
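                /* "align" roughly tracks log2 of the current request size in
                 * BM_BLOCK_SIZE units; the "sector & ((1 << (align+3)) - 1)"
                 * test below stops the merge as soon as the start sector's
                 * alignment cannot support a larger aligned request
                 * (BM_BLOCK_SIZE is 2^3 sectors), so bigger requests always
                 * start on a power-of-two boundary at least as large as
                 * their size. */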
659                 align = 1;
660                 rollback_i = i;
661                 while (i < number) {
662                         if (size + BM_BLOCK_SIZE > max_bio_size)
663                                 break;
664
665                         /* Always stay aligned */
666                         if (sector & ((1<<(align+3))-1))
667                                 break;
668
669                         /* do not cross extent boundaries */
670                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
671                                 break;
672                         /* now, is it actually dirty, after all?
673                          * caution, drbd_bm_test_bit is tri-state for some
674                          * obscure reason; ( b == 0 ) would get the out-of-band
675                          * only accidentally right because of the "oddly sized"
676                          * adjustment below */
677                         if (drbd_bm_test_bit(device, bit+1) != 1)
678                                 break;
679                         bit++;
680                         size += BM_BLOCK_SIZE;
681                         if ((BM_BLOCK_SIZE << align) <= size)
682                                 align++;
683                         i++;
684                 }
685                 /* if we merged some,
686                  * reset the offset to start the next drbd_bm_find_next from */
687                 if (size > BM_BLOCK_SIZE)
688                         device->bm_resync_fo = bit + 1;
689 #endif
690
691                 /* adjust very last sectors, in case we are oddly sized */
692                 if (sector + (size>>9) > capacity)
693                         size = (capacity-sector)<<9;
694
695                 if (device->use_csums) {
696                         switch (read_for_csum(peer_device, sector, size)) {
697                         case -EIO: /* Disk failure */
698                                 put_ldev(device);
699                                 return -EIO;
700                         case -EAGAIN: /* allocation failed, or ldev busy */
701                                 drbd_rs_complete_io(device, sector);
702                                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
703                                 i = rollback_i;
704                                 goto requeue;
705                         case 0:
706                                 /* everything ok */
707                                 break;
708                         default:
709                                 BUG();
710                         }
711                 } else {
712                         int err;
713
714                         inc_rs_pending(device);
715                         err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
716                                                  sector, size, ID_SYNCER);
717                         if (err) {
718                                 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
719                                 dec_rs_pending(device);
720                                 put_ldev(device);
721                                 return err;
722                         }
723                 }
724         }
725
726         if (device->bm_resync_fo >= drbd_bm_bits(device)) {
727                 /* The last syncer _request_ was sent,
728                  * but the P_RS_DATA_REPLY has not yet been received.  Sync will end (and
729                  * the next sync group will resume) as soon as we receive the last
730                  * resync data block and the last bit is cleared.
731                  * Until then resync "work" is "inactive" ...
732                  */
733                 put_ldev(device);
734                 return 0;
735         }
736
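 /* More work remains (or we were throttled): account the requests issued in
  * this pass as in flight (i << (BM_BLOCK_SHIFT - 9) converts the request
  * count to 512-byte sectors) and re-arm the resync timer, which queues
  * w_resync_timer() again via resync_timer_fn(). */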
737  requeue:
738         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
739         mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
740         put_ldev(device);
741         return 0;
742 }
743
744 static int make_ov_request(struct drbd_device *device, int cancel)
745 {
746         int number, i, size;
747         sector_t sector;
748         const sector_t capacity = drbd_get_capacity(device->this_bdev);
749         bool stop_sector_reached = false;
750
751         if (unlikely(cancel))
752                 return 1;
753
754         number = drbd_rs_number_requests(device);
755
756         sector = device->ov_position;
757         for (i = 0; i < number; i++) {
758                 if (sector >= capacity)
759                         return 1;
760
761                 /* We check for "finished" only in the reply path:
762                  * w_e_end_ov_reply().
763                  * We need to send at least one request out. */
764                 stop_sector_reached = i > 0
765                         && verify_can_do_stop_sector(device)
766                         && sector >= device->ov_stop_sector;
767                 if (stop_sector_reached)
768                         break;
769
770                 size = BM_BLOCK_SIZE;
771
772                 if (drbd_try_rs_begin_io(device, sector)) {
773                         device->ov_position = sector;
774                         goto requeue;
775                 }
776
777                 if (sector + (size>>9) > capacity)
778                         size = (capacity-sector)<<9;
779
780                 inc_rs_pending(device);
781                 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
782                         dec_rs_pending(device);
783                         return 0;
784                 }
785                 sector += BM_SECT_PER_BIT;
786         }
787         device->ov_position = sector;
788
789  requeue:
790         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
791         if (i == 0 || !stop_sector_reached)
792                 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
793         return 1;
794 }
795
796 int w_ov_finished(struct drbd_work *w, int cancel)
797 {
798         struct drbd_device_work *dw =
799                 container_of(w, struct drbd_device_work, w);
800         struct drbd_device *device = dw->device;
801         kfree(dw);
802         ov_out_of_sync_print(device);
803         drbd_resync_finished(device);
804
805         return 0;
806 }
807
808 static int w_resync_finished(struct drbd_work *w, int cancel)
809 {
810         struct drbd_device_work *dw =
811                 container_of(w, struct drbd_device_work, w);
812         struct drbd_device *device = dw->device;
813         kfree(dw);
814
815         drbd_resync_finished(device);
816
817         return 0;
818 }
819
820 static void ping_peer(struct drbd_device *device)
821 {
822         struct drbd_connection *connection = first_peer_device(device)->connection;
823
824         clear_bit(GOT_PING_ACK, &connection->flags);
825         request_ping(connection);
826         wait_event(connection->ping_wait,
827                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
828 }
829
830 int drbd_resync_finished(struct drbd_device *device)
831 {
832         unsigned long db, dt, dbdt;
833         unsigned long n_oos;
834         union drbd_state os, ns;
835         struct drbd_device_work *dw;
836         char *khelper_cmd = NULL;
837         int verify_done = 0;
838
839         /* Remove all elements from the resync LRU. Since future actions
840          * might set bits in the (main) bitmap, the entries in the
841          * resync LRU would otherwise be wrong. */
842         if (drbd_rs_del_all(device)) {
843                 /* In case this is not possible right now, most probably because
844                  * there are P_RS_DATA_REPLY packets lingering on the worker's
845                  * queue (or even the read operations for those packets
846                  * are not finished by now).  Retry in 100ms. */
847
848                 schedule_timeout_interruptible(HZ / 10);
849                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
850                 if (dw) {
851                         dw->w.cb = w_resync_finished;
852                         dw->device = device;
853                         drbd_queue_work(&first_peer_device(device)->connection->sender_work,
854                                         &dw->w);
855                         return 1;
856                 }
857                 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
858         }
859
860         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
861         if (dt <= 0)
862                 dt = 1;
863
864         db = device->rs_total;
865         /* adjust for verify start and stop sectors, respectively the reached position */
866         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
867                 db -= device->ov_left;
868
869         dbdt = Bit2KB(db/dt);
870         device->rs_paused /= HZ;
871
872         if (!get_ldev(device))
873                 goto out;
874
875         ping_peer(device);
876
877         spin_lock_irq(&device->resource->req_lock);
878         os = drbd_read_state(device);
879
880         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
881
882         /* This protects us against multiple calls (that can happen in the presence
883            of application IO), and against connectivity loss just before we arrive here. */
884         if (os.conn <= C_CONNECTED)
885                 goto out_unlock;
886
887         ns = os;
888         ns.conn = C_CONNECTED;
889
890         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
891              verify_done ? "Online verify" : "Resync",
892              dt + device->rs_paused, device->rs_paused, dbdt);
893
894         n_oos = drbd_bm_total_weight(device);
895
896         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
897                 if (n_oos) {
898                         drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
899                               n_oos, Bit2KB(1));
900                         khelper_cmd = "out-of-sync";
901                 }
902         } else {
903                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
904
905                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
906                         khelper_cmd = "after-resync-target";
907
908                 if (device->use_csums && device->rs_total) {
909                         const unsigned long s = device->rs_same_csum;
910                         const unsigned long t = device->rs_total;
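                        /* Both branches compute (s * 100) / t; for large t, t
                         * is divided first, presumably to avoid overflowing
                         * s * 100 (s <= t), at the cost of some precision. */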
911                         const int ratio =
912                                 (t == 0)     ? 0 :
913                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
914                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
915                              "transferred %luK total %luK\n",
916                              ratio,
917                              Bit2KB(device->rs_same_csum),
918                              Bit2KB(device->rs_total - device->rs_same_csum),
919                              Bit2KB(device->rs_total));
920                 }
921         }
922
923         if (device->rs_failed) {
924                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
925
926                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
927                         ns.disk = D_INCONSISTENT;
928                         ns.pdsk = D_UP_TO_DATE;
929                 } else {
930                         ns.disk = D_UP_TO_DATE;
931                         ns.pdsk = D_INCONSISTENT;
932                 }
933         } else {
934                 ns.disk = D_UP_TO_DATE;
935                 ns.pdsk = D_UP_TO_DATE;
936
937                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
938                         if (device->p_uuid) {
939                                 int i;
940                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
941                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
942                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
943                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
944                         } else {
945                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
946                         }
947                 }
948
949                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
950                         /* for verify runs, we don't update uuids here,
951                          * so there would be nothing to report. */
952                         drbd_uuid_set_bm(device, 0UL);
953                         drbd_print_uuids(device, "updated UUIDs");
954                         if (device->p_uuid) {
955                                 /* Now the two UUID sets are equal, update what we
956                                  * know of the peer. */
957                                 int i;
958                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
959                                         device->p_uuid[i] = device->ldev->md.uuid[i];
960                         }
961                 }
962         }
963
964         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
965 out_unlock:
966         spin_unlock_irq(&device->resource->req_lock);
967         put_ldev(device);
968 out:
969         device->rs_total  = 0;
970         device->rs_failed = 0;
971         device->rs_paused = 0;
972
973         /* reset start sector, if we reached end of device */
974         if (verify_done && device->ov_left == 0)
975                 device->ov_start_sector = 0;
976
977         drbd_md_sync(device);
978
979         if (khelper_cmd)
980                 drbd_khelper(device, khelper_cmd);
981
982         return 1;
983 }
984
985 /* helper */
986 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
987 {
988         if (drbd_peer_req_has_active_page(peer_req)) {
989                 /* This might happen if sendpage() has not finished */
990                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
991                 atomic_add(i, &device->pp_in_use_by_net);
992                 atomic_sub(i, &device->pp_in_use);
993                 spin_lock_irq(&device->resource->req_lock);
994                 list_add_tail(&peer_req->w.list, &device->net_ee);
995                 spin_unlock_irq(&device->resource->req_lock);
996                 wake_up(&drbd_pp_wait);
997         } else
998                 drbd_free_peer_req(device, peer_req);
999 }
1000
1001 /**
1002  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1003  * @device:     DRBD device.
1004  * @w:          work object.
1005  * @cancel:     The connection will be closed anyway
1006  */
1007 int w_e_end_data_req(struct drbd_work *w, int cancel)
1008 {
1009         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1010         struct drbd_peer_device *peer_device = peer_req->peer_device;
1011         struct drbd_device *device = peer_device->device;
1012         int err;
1013
1014         if (unlikely(cancel)) {
1015                 drbd_free_peer_req(device, peer_req);
1016                 dec_unacked(device);
1017                 return 0;
1018         }
1019
1020         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1021                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1022         } else {
1023                 if (__ratelimit(&drbd_ratelimit_state))
1024                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1025                             (unsigned long long)peer_req->i.sector);
1026
1027                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1028         }
1029
1030         dec_unacked(device);
1031
1032         move_to_net_ee_or_free(device, peer_req);
1033
1034         if (unlikely(err))
1035                 drbd_err(device, "drbd_send_block() failed\n");
1036         return err;
1037 }
1038
1039 /**
1040  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1041  * @w:          work object.
1042  * @cancel:     The connection will be closed anyway
1043  */
1044 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1045 {
1046         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1047         struct drbd_peer_device *peer_device = peer_req->peer_device;
1048         struct drbd_device *device = peer_device->device;
1049         int err;
1050
1051         if (unlikely(cancel)) {
1052                 drbd_free_peer_req(device, peer_req);
1053                 dec_unacked(device);
1054                 return 0;
1055         }
1056
1057         if (get_ldev_if_state(device, D_FAILED)) {
1058                 drbd_rs_complete_io(device, peer_req->i.sector);
1059                 put_ldev(device);
1060         }
1061
1062         if (device->state.conn == C_AHEAD) {
1063                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1064         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1065                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1066                         inc_rs_pending(device);
1067                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1068                 } else {
1069                         if (__ratelimit(&drbd_ratelimit_state))
1070                                 drbd_err(device, "Not sending RSDataReply, "
1071                                     "partner DISKLESS!\n");
1072                         err = 0;
1073                 }
1074         } else {
1075                 if (__ratelimit(&drbd_ratelimit_state))
1076                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1077                             (unsigned long long)peer_req->i.sector);
1078
1079                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1080
1081                 /* update resync data with failure */
1082                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1083         }
1084
1085         dec_unacked(device);
1086
1087         move_to_net_ee_or_free(device, peer_req);
1088
1089         if (unlikely(err))
1090                 drbd_err(device, "drbd_send_block() failed\n");
1091         return err;
1092 }
1093
1094 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1095 {
1096         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1097         struct drbd_peer_device *peer_device = peer_req->peer_device;
1098         struct drbd_device *device = peer_device->device;
1099         struct digest_info *di;
1100         int digest_size;
1101         void *digest = NULL;
1102         int err, eq = 0;
1103
1104         if (unlikely(cancel)) {
1105                 drbd_free_peer_req(device, peer_req);
1106                 dec_unacked(device);
1107                 return 0;
1108         }
1109
1110         if (get_ldev(device)) {
1111                 drbd_rs_complete_io(device, peer_req->i.sector);
1112                 put_ldev(device);
1113         }
1114
1115         di = peer_req->digest;
1116
1117         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1118                 /* quick hack to try to avoid a race against reconfiguration.
1119                  * a real fix would be much more involved,
1120                  * introducing more locking mechanisms */
1121                 if (peer_device->connection->csums_tfm) {
1122                         digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1123                         D_ASSERT(device, digest_size == di->digest_size);
1124                         digest = kmalloc(digest_size, GFP_NOIO);
1125                 }
1126                 if (digest) {
1127                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1128                         eq = !memcmp(digest, di->digest, digest_size);
1129                         kfree(digest);
1130                 }
1131
1132                 if (eq) {
1133                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1134                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1135                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1136                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1137                 } else {
1138                         inc_rs_pending(device);
1139                         peer_req->block_id = ID_SYNCER; /* By setting block_id, the digest pointer becomes invalid! */
1140                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1141                         kfree(di);
1142                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1143                 }
1144         } else {
1145                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1146                 if (__ratelimit(&drbd_ratelimit_state))
1147                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1148         }
1149
1150         dec_unacked(device);
1151         move_to_net_ee_or_free(device, peer_req);
1152
1153         if (unlikely(err))
1154                 drbd_err(device, "drbd_send_block/ack() failed\n");
1155         return err;
1156 }
1157
1158 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1159 {
1160         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1161         struct drbd_peer_device *peer_device = peer_req->peer_device;
1162         struct drbd_device *device = peer_device->device;
1163         sector_t sector = peer_req->i.sector;
1164         unsigned int size = peer_req->i.size;
1165         int digest_size;
1166         void *digest;
1167         int err = 0;
1168
1169         if (unlikely(cancel))
1170                 goto out;
1171
1172         digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1173         digest = kmalloc(digest_size, GFP_NOIO);
1174         if (!digest) {
1175                 err = 1;        /* terminate the connection in case the allocation failed */
1176                 goto out;
1177         }
1178
1179         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1180                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1181         else
1182                 memset(digest, 0, digest_size);
1183
1184         /* Free e and pages before send.
1185          * In case we block on congestion, we could otherwise run into
1186          * some distributed deadlock, if the other side blocks on
1187          * congestion as well, because our receiver blocks in
1188          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1189         drbd_free_peer_req(device, peer_req);
1190         peer_req = NULL;
1191         inc_rs_pending(device);
1192         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1193         if (err)
1194                 dec_rs_pending(device);
1195         kfree(digest);
1196
1197 out:
1198         if (peer_req)
1199                 drbd_free_peer_req(device, peer_req);
1200         dec_unacked(device);
1201         return err;
1202 }
1203
1204 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1205 {
1206         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1207                 device->ov_last_oos_size += size>>9;
1208         } else {
1209                 device->ov_last_oos_start = sector;
1210                 device->ov_last_oos_size = size>>9;
1211         }
1212         drbd_set_out_of_sync(device, sector, size);
1213 }
1214
1215 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1216 {
1217         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1218         struct drbd_peer_device *peer_device = peer_req->peer_device;
1219         struct drbd_device *device = peer_device->device;
1220         struct digest_info *di;
1221         void *digest;
1222         sector_t sector = peer_req->i.sector;
1223         unsigned int size = peer_req->i.size;
1224         int digest_size;
1225         int err, eq = 0;
1226         bool stop_sector_reached = false;
1227
1228         if (unlikely(cancel)) {
1229                 drbd_free_peer_req(device, peer_req);
1230                 dec_unacked(device);
1231                 return 0;
1232         }
1233
1234         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1235          * the resync lru has been cleaned up already */
1236         if (get_ldev(device)) {
1237                 drbd_rs_complete_io(device, peer_req->i.sector);
1238                 put_ldev(device);
1239         }
1240
1241         di = peer_req->digest;
1242
1243         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1244                 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1245                 digest = kmalloc(digest_size, GFP_NOIO);
1246                 if (digest) {
1247                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1248
1249                         D_ASSERT(device, digest_size == di->digest_size);
1250                         eq = !memcmp(digest, di->digest, digest_size);
1251                         kfree(digest);
1252                 }
1253         }
1254
1255         /* Free peer_req and pages before send.
1256          * In case we block on congestion, we could otherwise run into
1257          * some distributed deadlock, if the other side blocks on
1258          * congestion as well, because our receiver blocks in
1259          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1260         drbd_free_peer_req(device, peer_req);
1261         if (!eq)
1262                 drbd_ov_out_of_sync_found(device, sector, size);
1263         else
1264                 ov_out_of_sync_print(device);
1265
1266         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1267                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1268
1269         dec_unacked(device);
1270
1271         --device->ov_left;
1272
1273         /* let's advance progress step marks only for every other megabyte */
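        /* ov_left counts bitmap bits (BM_BLOCK_SIZE, i.e. 4k each); bit 9
         * (0x200) of the counter flips every 512 bits, i.e. roughly every
         * 2 MiB of verified data. */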
1274         if ((device->ov_left & 0x200) == 0x200)
1275                 drbd_advance_rs_marks(device, device->ov_left);
1276
1277         stop_sector_reached = verify_can_do_stop_sector(device) &&
1278                 (sector + (size>>9)) >= device->ov_stop_sector;
1279
1280         if (device->ov_left == 0 || stop_sector_reached) {
1281                 ov_out_of_sync_print(device);
1282                 drbd_resync_finished(device);
1283         }
1284
1285         return err;
1286 }
1287
1288 /* FIXME
1289  * We need to track the number of pending barrier acks,
1290  * and to be able to wait for them.
1291  * See also comment in drbd_adm_attach before drbd_suspend_io.
1292  */
1293 static int drbd_send_barrier(struct drbd_connection *connection)
1294 {
1295         struct p_barrier *p;
1296         struct drbd_socket *sock;
1297
1298         sock = &connection->data;
1299         p = conn_prepare_command(connection, sock);
1300         if (!p)
1301                 return -EIO;
1302         p->barrier = connection->send.current_epoch_nr;
1303         p->pad = 0;
1304         connection->send.current_epoch_writes = 0;
1305         connection->send.last_sent_barrier_jif = jiffies;
1306
1307         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1308 }
1309
1310 int w_send_write_hint(struct drbd_work *w, int cancel)
1311 {
1312         struct drbd_device *device =
1313                 container_of(w, struct drbd_device, unplug_work);
1314         struct drbd_socket *sock;
1315
1316         if (cancel)
1317                 return 0;
1318         sock = &first_peer_device(device)->connection->data;
1319         if (!drbd_prepare_command(first_peer_device(device), sock))
1320                 return -EIO;
1321         return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1322 }
1323
1324 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1325 {
1326         if (!connection->send.seen_any_write_yet) {
1327                 connection->send.seen_any_write_yet = true;
1328                 connection->send.current_epoch_nr = epoch;
1329                 connection->send.current_epoch_writes = 0;
1330                 connection->send.last_sent_barrier_jif = jiffies;
1331         }
1332 }
1333
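/* Close the previous write epoch with a P_BARRIER if the epoch number has
 * changed and the old epoch actually contained writes.  Called before sending
 * data, read requests and out-of-sync notifications: see w_send_dblock(),
 * w_send_read_req() and w_send_out_of_sync() below. */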
1334 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1335 {
1336         /* re-init if first write on this connection */
1337         if (!connection->send.seen_any_write_yet)
1338                 return;
1339         if (connection->send.current_epoch_nr != epoch) {
1340                 if (connection->send.current_epoch_writes)
1341                         drbd_send_barrier(connection);
1342                 connection->send.current_epoch_nr = epoch;
1343         }
1344 }
1345
1346 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1347 {
1348         struct drbd_request *req = container_of(w, struct drbd_request, w);
1349         struct drbd_device *device = req->device;
1350         struct drbd_peer_device *const peer_device = first_peer_device(device);
1351         struct drbd_connection *const connection = peer_device->connection;
1352         int err;
1353
1354         if (unlikely(cancel)) {
1355                 req_mod(req, SEND_CANCELED);
1356                 return 0;
1357         }
1358         req->pre_send_jif = jiffies;
1359
1360         /* this time, no connection->send.current_epoch_writes++;
1361          * if a barrier gets sent here, it is the closing barrier for the last
1362          * replicated epoch, before we went into AHEAD mode.
1363          * No more barriers will be sent until we leave AHEAD mode again. */
1364         maybe_send_barrier(connection, req->epoch);
1365
1366         err = drbd_send_out_of_sync(peer_device, req);
1367         req_mod(req, OOS_HANDED_TO_NETWORK);
1368
1369         return err;
1370 }
1371
1372 /**
1373  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1374  * @w:          work object.
1375  * @cancel:     The connection will be closed anyway
1376  */
1377 int w_send_dblock(struct drbd_work *w, int cancel)
1378 {
1379         struct drbd_request *req = container_of(w, struct drbd_request, w);
1380         struct drbd_device *device = req->device;
1381         struct drbd_peer_device *const peer_device = first_peer_device(device);
1382         struct drbd_connection *connection = peer_device->connection;
1383         int err;
1384
1385         if (unlikely(cancel)) {
1386                 req_mod(req, SEND_CANCELED);
1387                 return 0;
1388         }
1389         req->pre_send_jif = jiffies;
1390
1391         re_init_if_first_write(connection, req->epoch);
1392         maybe_send_barrier(connection, req->epoch);
1393         connection->send.current_epoch_writes++;
1394
1395         err = drbd_send_dblock(peer_device, req);
1396         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1397
1398         return err;
1399 }
1400
1401 /**
1402  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1403  * @w:          work object.
1404  * @cancel:     The connection will be closed anyway
1405  */
1406 int w_send_read_req(struct drbd_work *w, int cancel)
1407 {
1408         struct drbd_request *req = container_of(w, struct drbd_request, w);
1409         struct drbd_device *device = req->device;
1410         struct drbd_peer_device *const peer_device = first_peer_device(device);
1411         struct drbd_connection *connection = peer_device->connection;
1412         int err;
1413
1414         if (unlikely(cancel)) {
1415                 req_mod(req, SEND_CANCELED);
1416                 return 0;
1417         }
1418         req->pre_send_jif = jiffies;
1419
1420         /* Even read requests may close a write epoch,
1421          * if any writes have been sent on this connection yet. */
1422         maybe_send_barrier(connection, req->epoch);
1423
1424         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1425                                  (unsigned long)req);
1426
1427         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1428
1429         return err;
1430 }
1431
1432 int w_restart_disk_io(struct drbd_work *w, int cancel)
1433 {
1434         struct drbd_request *req = container_of(w, struct drbd_request, w);
1435         struct drbd_device *device = req->device;
1436
1437         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1438                 drbd_al_begin_io(device, &req->i);
1439
1440         drbd_req_make_private_bio(req, req->master_bio);
1441         req->private_bio->bi_bdev = device->ldev->backing_bdev;
1442         generic_make_request(req->private_bio);
1443
1444         return 0;
1445 }
1446
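/* Walk the resync-after dependency chain of @device: return 0 (must not start
 * or continue a resync now) if any device we transitively depend on is in a
 * resync or paused-resync state or has a sync-pause flag set, 1 otherwise.
 * The dependency is configured per device via the resync-after disk option
 * (e.g. "resync-after r0" in a drbd.conf disk section; resource name
 * hypothetical) and stored as a minor number in disk_conf->resync_after,
 * where -1 means "no dependency". */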
1447 static int _drbd_may_sync_now(struct drbd_device *device)
1448 {
1449         struct drbd_device *odev = device;
1450         int resync_after;
1451
1452         while (1) {
1453                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1454                         return 1;
1455                 rcu_read_lock();
1456                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1457                 rcu_read_unlock();
1458                 if (resync_after == -1)
1459                         return 1;
1460                 odev = minor_to_device(resync_after);
1461                 if (!odev)
1462                         return 1;
1463                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1464                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1465                     odev->state.aftr_isp || odev->state.peer_isp ||
1466                     odev->state.user_isp)
1467                         return 0;
1468         }
1469 }
1470
1471 /**
1472  * drbd_pause_after() - Pause resync on all devices that may not resync now
1473  * @device:     DRBD device.
1474  *
1475  * Called from process context only (admin command and after_state_ch).
1476  */
1477 static bool drbd_pause_after(struct drbd_device *device)
1478 {
1479         bool changed = false;
1480         struct drbd_device *odev;
1481         int i;
1482
1483         rcu_read_lock();
1484         idr_for_each_entry(&drbd_devices, odev, i) {
1485                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1486                         continue;
1487                 if (!_drbd_may_sync_now(odev) &&
1488                     _drbd_set_state(_NS(odev, aftr_isp, 1),
1489                                     CS_HARD, NULL) != SS_NOTHING_TO_DO)
1490                         changed = true;
1491         }
1492         rcu_read_unlock();
1493
1494         return changed;
1495 }
1496
1497 /**
1498  * drbd_resume_next() - Resume resync on all devices that may resync now
1499  * @device:     DRBD device.
1500  *
1501  * Called from process context only (admin command and worker).
1502  */
1503 static bool drbd_resume_next(struct drbd_device *device)
1504 {
1505         bool changed = false;
1506         struct drbd_device *odev;
1507         int i;
1508
1509         rcu_read_lock();
1510         idr_for_each_entry(&drbd_devices, odev, i) {
1511                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1512                         continue;
1513                 if (odev->state.aftr_isp) {
1514                         if (_drbd_may_sync_now(odev) &&
1515                             _drbd_set_state(_NS(odev, aftr_isp, 0),
1516                                             CS_HARD, NULL) != SS_NOTHING_TO_DO)
1517                                 changed = true;
1518                 }
1519         }
1520         rcu_read_unlock();
1521         return changed;
1522 }
1523
1524 void resume_next_sg(struct drbd_device *device)
1525 {
1526         lock_all_resources();
1527         drbd_resume_next(device);
1528         unlock_all_resources();
1529 }
1530
1531 void suspend_other_sg(struct drbd_device *device)
1532 {
1533         lock_all_resources();
1534         drbd_pause_after(device);
1535         unlock_all_resources();
1536 }
1537
1538 /* caller must lock_all_resources() */
1539 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1540 {
1541         struct drbd_device *odev;
1542         int resync_after;
1543
1544         if (o_minor == -1)
1545                 return NO_ERROR;
1546         if (o_minor < -1 || o_minor > MINORMASK)
1547                 return ERR_RESYNC_AFTER;
1548
1549         /* check for loops */
1550         odev = minor_to_device(o_minor);
1551         while (1) {
1552                 if (odev == device)
1553                         return ERR_RESYNC_AFTER_CYCLE;
1554
1555                 /* You are free to depend on diskless, non-existing,
1556                  * or not yet/no longer existing minors.
1557                  * We only reject dependency loops.
1558                  * We cannot follow the dependency chain beyond a detached or
1559                  * missing minor.
1560                  */
1561                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1562                         return NO_ERROR;
1563
1564                 rcu_read_lock();
1565                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1566                 rcu_read_unlock();
1567                 /* dependency chain ends here, no cycles. */
1568                 if (resync_after == -1)
1569                         return NO_ERROR;
1570
1571                 /* follow the dependency chain */
1572                 odev = minor_to_device(resync_after);
1573         }
1574 }
1575
1576 /* caller must lock_all_resources() */
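/* After a resync-after dependency changed, alternate between pausing devices
 * that may no longer resync and resuming devices that now may, until a fixed
 * point is reached and no further state change happens. */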
1577 void drbd_resync_after_changed(struct drbd_device *device)
1578 {
1579         int changed;
1580
1581         do {
1582                 changed  = drbd_pause_after(device);
1583                 changed |= drbd_resume_next(device);
1584         } while (changed);
1585 }
1586
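/* Reset the dynamic resync-rate controller: clear rs_sect_in, rs_sect_ev and
 * rs_in_flight, take the backing device's current I/O statistics as the new
 * baseline for rs_last_events, and empty the rs_plan_s fifo, so the
 * controller starts from a clean state when a resync begins. */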
1587 void drbd_rs_controller_reset(struct drbd_device *device)
1588 {
1589         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1590         struct fifo_buffer *plan;
1591
1592         atomic_set(&device->rs_sect_in, 0);
1593         atomic_set(&device->rs_sect_ev, 0);
1594         device->rs_in_flight = 0;
1595         device->rs_last_events =
1596                 (int)part_stat_read(&disk->part0, sectors[0]) +
1597                 (int)part_stat_read(&disk->part0, sectors[1]);
1598
1599         /* Updating the RCU-protected object in place is necessary since
1600            this function gets called from atomic context.
1601            It is valid since all other updates also lead to a completely
1602            empty fifo. */
1603         rcu_read_lock();
1604         plan = rcu_dereference(device->rs_plan_s);
1605         plan->total = 0;
1606         fifo_set(plan, 0);
1607         rcu_read_unlock();
1608 }
1609
1610 void start_resync_timer_fn(unsigned long data)
1611 {
1612         struct drbd_device *device = (struct drbd_device *) data;
1613         drbd_device_post_work(device, RS_START);
1614 }
1615
1616 static void do_start_resync(struct drbd_device *device)
1617 {
1618         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1619                 drbd_warn(device, "postponing start_resync ...\n");
1620                 device->start_resync_timer.expires = jiffies + HZ/10;
1621                 add_timer(&device->start_resync_timer);
1622                 return;
1623         }
1624
1625         drbd_start_resync(device, C_SYNC_SOURCE);
1626         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1627 }
1628
1629 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1630 {
1631         bool csums_after_crash_only;
1632         rcu_read_lock();
1633         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1634         rcu_read_unlock();
1635         return connection->agreed_pro_version >= 89 &&          /* supported? */
1636                 connection->csums_tfm &&                        /* configured? */
1637                 (csums_after_crash_only == 0                    /* use for each resync? */
1638                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1639 }
1640
1641 /**
1642  * drbd_start_resync() - Start the resync process
1643  * @device:     DRBD device.
1644  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1645  *
1646  * This function might bring you directly into one of the
1647  * C_PAUSED_SYNC_* states.
1648  */
1649 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1650 {
1651         struct drbd_peer_device *peer_device = first_peer_device(device);
1652         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1653         union drbd_state ns;
1654         int r;
1655
1656         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1657                 drbd_err(device, "Resync already running!\n");
1658                 return;
1659         }
1660
1661         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1662                 if (side == C_SYNC_TARGET) {
1663                         /* Since application IO was locked out during C_WF_BITMAP_T and
1664                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1665                            we check whether we may make the data inconsistent. */
1666                         r = drbd_khelper(device, "before-resync-target");
1667                         r = (r >> 8) & 0xff;
1668                         if (r > 0) {
1669                                 drbd_info(device, "before-resync-target handler returned %d, "
1670                                          "dropping connection.\n", r);
1671                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1672                                 return;
1673                         }
1674                 } else /* C_SYNC_SOURCE */ {
1675                         r = drbd_khelper(device, "before-resync-source");
1676                         r = (r >> 8) & 0xff;
1677                         if (r > 0) {
1678                                 if (r == 3) {
1679                                         drbd_info(device, "before-resync-source handler returned %d, "
1680                                                  "ignoring. Old userland tools?\n", r);
1681                                 } else {
1682                                         drbd_info(device, "before-resync-source handler returned %d, "
1683                                                  "dropping connection.\n", r);
1684                                         conn_request_state(connection,
1685                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1686                                         return;
1687                                 }
1688                         }
1689                 }
1690         }
1691
1692         if (current == connection->worker.task) {
1693                 /* The worker should not sleep waiting for state_mutex,
1694                    because that can take a long time. */
1695                 if (!mutex_trylock(device->state_mutex)) {
1696                         set_bit(B_RS_H_DONE, &device->flags);
1697                         device->start_resync_timer.expires = jiffies + HZ/5;
1698                         add_timer(&device->start_resync_timer);
1699                         return;
1700                 }
1701         } else {
1702                 mutex_lock(device->state_mutex);
1703         }
1704
1705         lock_all_resources();
1706         clear_bit(B_RS_H_DONE, &device->flags);
1707         /* Did some connection breakage or IO error race with us? */
1708         if (device->state.conn < C_CONNECTED ||
1709             !get_ldev_if_state(device, D_NEGOTIATING)) {
1710                 unlock_all_resources();
1711                 goto out;
1712         }
1713
1714         ns = drbd_read_state(device);
1715
1716         ns.aftr_isp = !_drbd_may_sync_now(device);
1717
1718         ns.conn = side;
1719
1720         if (side == C_SYNC_TARGET)
1721                 ns.disk = D_INCONSISTENT;
1722         else /* side == C_SYNC_SOURCE */
1723                 ns.pdsk = D_INCONSISTENT;
1724
1725         r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1726         ns = drbd_read_state(device);
1727
1728         if (ns.conn < C_CONNECTED)
1729                 r = SS_UNKNOWN_ERROR;
1730
1731         if (r == SS_SUCCESS) {
1732                 unsigned long tw = drbd_bm_total_weight(device);
1733                 unsigned long now = jiffies;
1734                 int i;
1735
1736                 device->rs_failed    = 0;
1737                 device->rs_paused    = 0;
1738                 device->rs_same_csum = 0;
1739                 device->rs_last_sect_ev = 0;
1740                 device->rs_total     = tw;
1741                 device->rs_start     = now;
1742                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1743                         device->rs_mark_left[i] = tw;
1744                         device->rs_mark_time[i] = now;
1745                 }
1746                 drbd_pause_after(device);
1747                 /* Forget potentially stale cached per-resync-extent bit-counts.
1748                  * This is open-coded drbd_rs_cancel_all(device): we already have
1749                  * IRQs disabled, and know the disk state is ok. */
1750                 spin_lock(&device->al_lock);
1751                 lc_reset(device->resync);
1752                 device->resync_locked = 0;
1753                 device->resync_wenr = LC_FREE;
1754                 spin_unlock(&device->al_lock);
1755         }
1756         unlock_all_resources();
1757
1758         if (r == SS_SUCCESS) {
1759                 wake_up(&device->al_wait); /* for lc_reset() above */
1760                 /* reset rs_last_bcast when a resync or verify is started,
1761                  * to deal with potential jiffies wrap. */
1762                 device->rs_last_bcast = jiffies - HZ;
1763
1764                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1765                      drbd_conn_str(ns.conn),
1766                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1767                      (unsigned long) device->rs_total);
1768                 if (side == C_SYNC_TARGET) {
1769                         device->bm_resync_fo = 0;
1770                         device->use_csums = use_checksum_based_resync(connection, device);
1771                 } else {
1772                         device->use_csums = 0;
1773                 }
1774
1775                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1776                  * with w_send_oos, or the sync target will get confused as to
1777                  * how many bits to resync.  We cannot always do that, because for an
1778                  * empty resync and protocol < 95, we need to do it here, as we call
1779                  * drbd_resync_finished from here in that case.
1780                  * So we call drbd_gen_and_send_sync_uuid here for protocol < 96,
1781                  * and from after_state_ch otherwise. */
1782                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1783                         drbd_gen_and_send_sync_uuid(peer_device);
1784
1785                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1786                         /* This still has a race (about when exactly the peers
1787                          * detect connection loss) that can lead to a full sync
1788                          * on next handshake. In 8.3.9 we fixed this with explicit
1789                          * resync-finished notifications, but the fix
1790                          * introduces a protocol change.  Sleeping for some
1791                          * time longer than the ping interval + timeout on the
1792                          * SyncSource, to give the SyncTarget the chance to
1793                          * detect connection loss, then waiting for a ping
1794                          * response (implicit in drbd_resync_finished) reduces
1795                          * the race considerably, but does not solve it. */
1796                         if (side == C_SYNC_SOURCE) {
1797                                 struct net_conf *nc;
1798                                 int timeo;
1799
1800                                 rcu_read_lock();
1801                                 nc = rcu_dereference(connection->net_conf);
1802                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1803                                 rcu_read_unlock();
1804                                 schedule_timeout_interruptible(timeo);
1805                         }
1806                         drbd_resync_finished(device);
1807                 }
1808
1809                 drbd_rs_controller_reset(device);
1810                 /* ns.conn may already be != device->state.conn,
1811                  * we may have been paused in between, or become paused until
1812                  * the timer triggers.
1813                  * No matter, that is handled in resync_timer_fn() */
1814                 if (ns.conn == C_SYNC_TARGET)
1815                         mod_timer(&device->resync_timer, jiffies);
1816
1817                 drbd_md_sync(device);
1818         }
1819         put_ldev(device);
1820 out:
1821         mutex_unlock(device->state_mutex);
1822 }
1823
1824 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1825 {
1826         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1827         device->rs_last_bcast = jiffies;
1828
1829         if (!get_ldev(device))
1830                 return;
1831
1832         drbd_bm_write_lazy(device, 0);
1833         if (resync_done && is_sync_state(device->state.conn))
1834                 drbd_resync_finished(device);
1835
1836         drbd_bcast_event(device, &sib);
1837         /* update timestamp, in case it took a while to write out stuff */
1838         device->rs_last_bcast = jiffies;
1839         put_ldev(device);
1840 }
1841
1842 static void drbd_ldev_destroy(struct drbd_device *device)
1843 {
1844         lc_destroy(device->resync);
1845         device->resync = NULL;
1846         lc_destroy(device->act_log);
1847         device->act_log = NULL;
1848
1849         __acquire(local);
1850         drbd_backing_dev_free(device, device->ldev);
1851         device->ldev = NULL;
1852         __release(local);
1853
1854         clear_bit(GOING_DISKLESS, &device->flags);
1855         wake_up(&device->misc_wait);
1856 }
1857
1858 static void go_diskless(struct drbd_device *device)
1859 {
1860         D_ASSERT(device, device->state.disk == D_FAILED);
1861         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1862          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1863          * the protected members anymore, though, so once put_ldev drops
1864          * local_cnt to zero again, it will be safe to free them. */
1865
1866         /* Try to write changed bitmap pages; read errors may have just
1867          * set some bits outside the area covered by the activity log.
1868          *
1869          * If we have an IO error during the bitmap writeout,
1870          * we will want a full sync next time, just in case.
1871          * (Do we want a specific meta data flag for this?)
1872          *
1873          * If that does not make it to stable storage either,
1874          * we cannot do anything about that anymore.
1875          *
1876          * We still need to check if both bitmap and ldev are present; we may
1877          * end up here after a failed attach, before ldev was even assigned.
1878          */
1879         if (device->bitmap && device->ldev) {
1880                 /* An interrupted resync or similar is allowed to recount bits
1881                  * while we detach.
1882                  * Any modifications would not be expected anymore, though.
1883                  */
1884                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1885                                         "detach", BM_LOCKED_TEST_ALLOWED)) {
1886                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1887                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1888                                 drbd_md_sync(device);
1889                         }
1890                 }
1891         }
1892
1893         drbd_force_state(device, NS(disk, D_DISKLESS));
1894 }
1895
1896 static int do_md_sync(struct drbd_device *device)
1897 {
1898         drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1899         drbd_md_sync(device);
1900         return 0;
1901 }
1902
1903 /* only called from drbd_worker thread, no locking */
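/* Record the callback the worker is about to run in a small ring buffer of
 * DRBD_THREAD_DETAILS_HIST entries (start time, callback address, calling
 * function and line, sequence number).  The slot following the newest entry
 * is zeroed, so a reader of the history can tell where it currently ends. */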
1904 void __update_timing_details(
1905                 struct drbd_thread_timing_details *tdp,
1906                 unsigned int *cb_nr,
1907                 void *cb,
1908                 const char *fn, const unsigned int line)
1909 {
1910         unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1911         struct drbd_thread_timing_details *td = tdp + i;
1912
1913         td->start_jif = jiffies;
1914         td->cb_addr = cb;
1915         td->caller_fn = fn;
1916         td->line = line;
1917         td->cb_nr = *cb_nr;
1918
1919         i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1920         td = tdp + i;
1921         memset(td, 0, sizeof(*td));
1922
1923         ++(*cb_nr);
1924 }
1925
1926 static void do_device_work(struct drbd_device *device, const unsigned long todo)
1927 {
1928         if (test_bit(MD_SYNC, &todo))
1929                 do_md_sync(device);
1930         if (test_bit(RS_DONE, &todo) ||
1931             test_bit(RS_PROGRESS, &todo))
1932                 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1933         if (test_bit(GO_DISKLESS, &todo))
1934                 go_diskless(device);
1935         if (test_bit(DESTROY_DISK, &todo))
1936                 drbd_ldev_destroy(device);
1937         if (test_bit(RS_START, &todo))
1938                 do_start_resync(device);
1939 }
1940
1941 #define DRBD_DEVICE_WORK_MASK   \
1942         ((1UL << GO_DISKLESS)   \
1943         |(1UL << DESTROY_DISK)  \
1944         |(1UL << MD_SYNC)       \
1945         |(1UL << RS_START)      \
1946         |(1UL << RS_PROGRESS)   \
1947         |(1UL << RS_DONE)       \
1948         )
1949
1950 static unsigned long get_work_bits(unsigned long *flags)
1951 {
1952         unsigned long old, new;
1953         do {
1954                 old = *flags;
1955                 new = old & ~DRBD_DEVICE_WORK_MASK;
1956         } while (cmpxchg(flags, old, new) != old);
1957         return old & DRBD_DEVICE_WORK_MASK;
1958 }
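
/* get_work_bits() claims all pending device-work bits in one atomic step:
 * bits set concurrently by drbd_device_post_work() are either returned to the
 * caller or left in *flags for a later pass, but never lost.  A minimal
 * stand-alone sketch of the same technique (userspace only, not part of the
 * driver; the function name and the 6-bit mask are made up, and GCC/Clang
 * __atomic builtins stand in for the kernel's cmpxchg()):
 *
 *	#include <stdio.h>
 *
 *	#define WORK_MASK 0x3fUL
 *
 *	static unsigned long claim_work_bits(unsigned long *flags)
 *	{
 *		unsigned long old = __atomic_load_n(flags, __ATOMIC_RELAXED);
 *		unsigned long new;
 *
 *		do {	// retry until nothing else modified *flags in between
 *			new = old & ~WORK_MASK;
 *		} while (!__atomic_compare_exchange_n(flags, &old, new, 0,
 *						      __ATOMIC_ACQ_REL,
 *						      __ATOMIC_RELAXED));
 *		return old & WORK_MASK;		// the bits this caller now owns
 *	}
 *
 *	int main(void)
 *	{
 *		unsigned long flags = 0x15UL;	// three pending work bits
 *		unsigned long claimed = claim_work_bits(&flags);
 *
 *		printf("claimed 0x%lx, left 0x%lx\n", claimed, flags);
 *		return 0;
 *	}
 */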
1959
1960 static void do_unqueued_work(struct drbd_connection *connection)
1961 {
1962         struct drbd_peer_device *peer_device;
1963         int vnr;
1964
1965         rcu_read_lock();
1966         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1967                 struct drbd_device *device = peer_device->device;
1968                 unsigned long todo = get_work_bits(&device->flags);
1969                 if (!todo)
1970                         continue;
1971
1972                 kref_get(&device->kref);
1973                 rcu_read_unlock();
1974                 do_device_work(device, todo);
1975                 kref_put(&device->kref, drbd_destroy_device);
1976                 rcu_read_lock();
1977         }
1978         rcu_read_unlock();
1979 }
1980
1981 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1982 {
1983         spin_lock_irq(&queue->q_lock);
1984         list_splice_tail_init(&queue->q, work_list);
1985         spin_unlock_irq(&queue->q_lock);
1986         return !list_empty(work_list);
1987 }
1988
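/* Hand the sender its next batch of work: splice whatever is queued on
 * connection->sender_work into @work_list.  If nothing is queued, uncork the
 * data socket (when tcp_cork is configured) so already buffered TCP data goes
 * out, then wait for new work, a posted device-work bit, a signal, or the
 * worker being told to stop; while waiting, send the P_BARRIER closing the
 * last epoch once the transfer log has moved past it.  Before returning, the
 * configured cork state is restored on the data socket. */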
1989 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1990 {
1991         DEFINE_WAIT(wait);
1992         struct net_conf *nc;
1993         int uncork, cork;
1994
1995         dequeue_work_batch(&connection->sender_work, work_list);
1996         if (!list_empty(work_list))
1997                 return;
1998
1999         /* Still nothing to do?
2000          * Maybe we still need to close the current epoch,
2001          * even if no new requests are queued yet.
2002          *
2003          * Also, poke TCP, just in case.
2004          * Then wait for new work (or signal). */
2005         rcu_read_lock();
2006         nc = rcu_dereference(connection->net_conf);
2007         uncork = nc ? nc->tcp_cork : 0;
2008         rcu_read_unlock();
2009         if (uncork) {
2010                 mutex_lock(&connection->data.mutex);
2011                 if (connection->data.socket)
2012                         drbd_tcp_uncork(connection->data.socket);
2013                 mutex_unlock(&connection->data.mutex);
2014         }
2015
2016         for (;;) {
2017                 int send_barrier;
2018                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2019                 spin_lock_irq(&connection->resource->req_lock);
2020                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2021                 if (!list_empty(&connection->sender_work.q))
2022                         list_splice_tail_init(&connection->sender_work.q, work_list);
2023                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2024                 if (!list_empty(work_list) || signal_pending(current)) {
2025                         spin_unlock_irq(&connection->resource->req_lock);
2026                         break;
2027                 }
2028
2029                 /* We found nothing new to do, no to-be-communicated request,
2030                  * no other work item.  We may still need to close the last
2031                  * epoch.  Next incoming request epoch will be connection ->
2032                  * current transfer log epoch number.  If that is different
2033                  * from the epoch of the last request we communicated, it is
2034                  * safe to send the epoch separating barrier now.
2035                  */
2036                 send_barrier =
2037                         atomic_read(&connection->current_tle_nr) !=
2038                         connection->send.current_epoch_nr;
2039                 spin_unlock_irq(&connection->resource->req_lock);
2040
2041                 if (send_barrier)
2042                         maybe_send_barrier(connection,
2043                                         connection->send.current_epoch_nr + 1);
2044
2045                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2046                         break;
2047
2048                 /* drbd_send() may have called flush_signals() */
2049                 if (get_t_state(&connection->worker) != RUNNING)
2050                         break;
2051
2052                 schedule();
2053                 /* We may be woken up for things other than new work, too,
2054                  * e.g. if the current epoch got closed.
2055                  * In that case we send the barrier above. */
2056         }
2057         finish_wait(&connection->sender_work.q_wait, &wait);
2058
2059         /* someone may have changed the config while we have been waiting above. */
2060         rcu_read_lock();
2061         nc = rcu_dereference(connection->net_conf);
2062         cork = nc ? nc->tcp_cork : 0;
2063         rcu_read_unlock();
2064         mutex_lock(&connection->data.mutex);
2065         if (connection->data.socket) {
2066                 if (cork)
2067                         drbd_tcp_cork(connection->data.socket);
2068                 else if (!uncork)
2069                         drbd_tcp_uncork(connection->data.socket);
2070         }
2071         mutex_unlock(&connection->data.mutex);
2072 }
2073
2074 int drbd_worker(struct drbd_thread *thi)
2075 {
2076         struct drbd_connection *connection = thi->connection;
2077         struct drbd_work *w = NULL;
2078         struct drbd_peer_device *peer_device;
2079         LIST_HEAD(work_list);
2080         int vnr;
2081
2082         while (get_t_state(thi) == RUNNING) {
2083                 drbd_thread_current_set_cpu(thi);
2084
2085                 if (list_empty(&work_list)) {
2086                         update_worker_timing_details(connection, wait_for_work);
2087                         wait_for_work(connection, &work_list);
2088                 }
2089
2090                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2091                         update_worker_timing_details(connection, do_unqueued_work);
2092                         do_unqueued_work(connection);
2093                 }
2094
2095                 if (signal_pending(current)) {
2096                         flush_signals(current);
2097                         if (get_t_state(thi) == RUNNING) {
2098                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2099                                 continue;
2100                         }
2101                         break;
2102                 }
2103
2104                 if (get_t_state(thi) != RUNNING)
2105                         break;
2106
2107                 if (!list_empty(&work_list)) {
2108                         w = list_first_entry(&work_list, struct drbd_work, list);
2109                         list_del_init(&w->list);
2110                         update_worker_timing_details(connection, w->cb);
2111                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2112                                 continue;
2113                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2114                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2115                 }
2116         }
2117
2118         do {
2119                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2120                         update_worker_timing_details(connection, do_unqueued_work);
2121                         do_unqueued_work(connection);
2122                 }
2123                 if (!list_empty(&work_list)) {
2124                         w = list_first_entry(&work_list, struct drbd_work, list);
2125                         list_del_init(&w->list);
2126                         update_worker_timing_details(connection, w->cb);
2127                         w->cb(w, 1);
2128                 } else
2129                         dequeue_work_batch(&connection->sender_work, &work_list);
2130         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2131
2132         rcu_read_lock();
2133         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2134                 struct drbd_device *device = peer_device->device;
2135                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2136                 kref_get(&device->kref);
2137                 rcu_read_unlock();
2138                 drbd_device_cleanup(device);
2139                 kref_put(&device->kref, drbd_destroy_device);
2140                 rcu_read_lock();
2141         }
2142         rcu_read_unlock();
2143
2144         return 0;
2145 }