]> git.kernelconcepts.de Git - karo-tx-linux.git/blob - drivers/block/drbd/drbd_receiver.c
Merge branch 'for-3.5/drivers' of git://git.kernel.dk/linux-block
[karo-tx-linux.git] / drivers / block / drbd / drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 enum finish_epoch {
52         FE_STILL_LIVE,
53         FE_DESTROYED,
54         FE_RECYCLED,
55 };
56
57 static int drbd_do_handshake(struct drbd_conf *mdev);
58 static int drbd_do_auth(struct drbd_conf *mdev);
59
60 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62
63
64 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
65
66 /*
67  * some helper functions to deal with single linked page lists,
68  * page->private being our "next" pointer.
69  */
70
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77         struct page *page;
78         struct page *tmp;
79
80         BUG_ON(!n);
81         BUG_ON(!head);
82
83         page = *head;
84
85         if (!page)
86                 return NULL;
87
88         while (page) {
89                 tmp = page_chain_next(page);
90                 if (--n == 0)
91                         break; /* found sufficient pages */
92                 if (tmp == NULL)
93                         /* insufficient pages, don't use any of them. */
94                         return NULL;
95                 page = tmp;
96         }
97
98         /* add end of list marker for the returned list */
99         set_page_private(page, 0);
100         /* actual return value, and adjustment of head */
101         page = *head;
102         *head = tmp;
103         return page;
104 }
105
106 /* may be used outside of locks to find the tail of a (usually short)
107  * "private" page chain, before adding it back to a global chain head
108  * with page_chain_add() under a spinlock. */
109 static struct page *page_chain_tail(struct page *page, int *len)
110 {
111         struct page *tmp;
112         int i = 1;
113         while ((tmp = page_chain_next(page)))
114                 ++i, page = tmp;
115         if (len)
116                 *len = i;
117         return page;
118 }
119
120 static int page_chain_free(struct page *page)
121 {
122         struct page *tmp;
123         int i = 0;
124         page_chain_for_each_safe(page, tmp) {
125                 put_page(page);
126                 ++i;
127         }
128         return i;
129 }
130
131 static void page_chain_add(struct page **head,
132                 struct page *chain_first, struct page *chain_last)
133 {
134 #if 1
135         struct page *tmp;
136         tmp = page_chain_tail(chain_first, NULL);
137         BUG_ON(tmp != chain_last);
138 #endif
139
140         /* add chain to head */
141         set_page_private(chain_last, (unsigned long)*head);
142         *head = chain_first;
143 }
144
145 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146 {
147         struct page *page = NULL;
148         struct page *tmp = NULL;
149         int i = 0;
150
151         /* Yes, testing drbd_pp_vacant outside the lock is racy.
152          * So what. It saves a spin_lock. */
153         if (drbd_pp_vacant >= number) {
154                 spin_lock(&drbd_pp_lock);
155                 page = page_chain_del(&drbd_pp_pool, number);
156                 if (page)
157                         drbd_pp_vacant -= number;
158                 spin_unlock(&drbd_pp_lock);
159                 if (page)
160                         return page;
161         }
162
163         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164          * "criss-cross" setup, that might cause write-out on some other DRBD,
165          * which in turn might block on the other node at this very place.  */
166         for (i = 0; i < number; i++) {
167                 tmp = alloc_page(GFP_TRY);
168                 if (!tmp)
169                         break;
170                 set_page_private(tmp, (unsigned long)page);
171                 page = tmp;
172         }
173
174         if (i == number)
175                 return page;
176
177         /* Not enough pages immediately available this time.
178          * No need to jump around here, drbd_pp_alloc will retry this
179          * function "soon". */
180         if (page) {
181                 tmp = page_chain_tail(page, NULL);
182                 spin_lock(&drbd_pp_lock);
183                 page_chain_add(&drbd_pp_pool, page, tmp);
184                 drbd_pp_vacant += i;
185                 spin_unlock(&drbd_pp_lock);
186         }
187         return NULL;
188 }
189
190 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191 {
192         struct drbd_epoch_entry *e;
193         struct list_head *le, *tle;
194
195         /* The EEs are always appended to the end of the list. Since
196            they are sent in order over the wire, they have to finish
197            in order. As soon as we see the first not finished we can
198            stop to examine the list... */
199
200         list_for_each_safe(le, tle, &mdev->net_ee) {
201                 e = list_entry(le, struct drbd_epoch_entry, w.list);
202                 if (drbd_ee_has_active_page(e))
203                         break;
204                 list_move(le, to_be_freed);
205         }
206 }
207
208 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209 {
210         LIST_HEAD(reclaimed);
211         struct drbd_epoch_entry *e, *t;
212
213         spin_lock_irq(&mdev->req_lock);
214         reclaim_net_ee(mdev, &reclaimed);
215         spin_unlock_irq(&mdev->req_lock);
216
217         list_for_each_entry_safe(e, t, &reclaimed, w.list)
218                 drbd_free_net_ee(mdev, e);
219 }
220
221 /**
222  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
223  * @mdev:       DRBD device.
224  * @number:     number of pages requested
225  * @retry:      whether to retry, if not enough pages are available right now
226  *
227  * Tries to allocate number pages, first from our own page pool, then from
228  * the kernel, unless this allocation would exceed the max_buffers setting.
229  * Possibly retry until DRBD frees sufficient pages somewhere else.
230  *
231  * Returns a page chain linked via page->private.
232  */
233 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
234 {
235         struct page *page = NULL;
236         DEFINE_WAIT(wait);
237
238         /* Yes, we may run up to @number over max_buffers. If we
239          * follow it strictly, the admin will get it wrong anyways. */
240         if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
241                 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
242
243         while (page == NULL) {
244                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
245
246                 drbd_kick_lo_and_reclaim_net(mdev);
247
248                 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
249                         page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250                         if (page)
251                                 break;
252                 }
253
254                 if (!retry)
255                         break;
256
257                 if (signal_pending(current)) {
258                         dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259                         break;
260                 }
261
262                 schedule();
263         }
264         finish_wait(&drbd_pp_wait, &wait);
265
266         if (page)
267                 atomic_add(number, &mdev->pp_in_use);
268         return page;
269 }
270
271 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
272  * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
273  * Either links the page chain back to the global pool,
274  * or returns all pages to the system. */
275 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
276 {
277         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
278         int i;
279
280         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
281                 i = page_chain_free(page);
282         else {
283                 struct page *tmp;
284                 tmp = page_chain_tail(page, &i);
285                 spin_lock(&drbd_pp_lock);
286                 page_chain_add(&drbd_pp_pool, page, tmp);
287                 drbd_pp_vacant += i;
288                 spin_unlock(&drbd_pp_lock);
289         }
290         i = atomic_sub_return(i, a);
291         if (i < 0)
292                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
293                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
294         wake_up(&drbd_pp_wait);
295 }
296
297 /*
298 You need to hold the req_lock:
299  _drbd_wait_ee_list_empty()
300
301 You must not have the req_lock:
302  drbd_free_ee()
303  drbd_alloc_ee()
304  drbd_init_ee()
305  drbd_release_ee()
306  drbd_ee_fix_bhs()
307  drbd_process_done_ee()
308  drbd_clear_done_ee()
309  drbd_wait_ee_list_empty()
310 */
311
312 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
313                                      u64 id,
314                                      sector_t sector,
315                                      unsigned int data_size,
316                                      gfp_t gfp_mask) __must_hold(local)
317 {
318         struct drbd_epoch_entry *e;
319         struct page *page;
320         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
321
322         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
323                 return NULL;
324
325         e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
326         if (!e) {
327                 if (!(gfp_mask & __GFP_NOWARN))
328                         dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
329                 return NULL;
330         }
331
332         page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
333         if (!page)
334                 goto fail;
335
336         INIT_HLIST_NODE(&e->collision);
337         e->epoch = NULL;
338         e->mdev = mdev;
339         e->pages = page;
340         atomic_set(&e->pending_bios, 0);
341         e->size = data_size;
342         e->flags = 0;
343         e->sector = sector;
344         e->block_id = id;
345
346         return e;
347
348  fail:
349         mempool_free(e, drbd_ee_mempool);
350         return NULL;
351 }
352
353 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
354 {
355         if (e->flags & EE_HAS_DIGEST)
356                 kfree(e->digest);
357         drbd_pp_free(mdev, e->pages, is_net);
358         D_ASSERT(atomic_read(&e->pending_bios) == 0);
359         D_ASSERT(hlist_unhashed(&e->collision));
360         mempool_free(e, drbd_ee_mempool);
361 }
362
363 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
364 {
365         LIST_HEAD(work_list);
366         struct drbd_epoch_entry *e, *t;
367         int count = 0;
368         int is_net = list == &mdev->net_ee;
369
370         spin_lock_irq(&mdev->req_lock);
371         list_splice_init(list, &work_list);
372         spin_unlock_irq(&mdev->req_lock);
373
374         list_for_each_entry_safe(e, t, &work_list, w.list) {
375                 drbd_free_some_ee(mdev, e, is_net);
376                 count++;
377         }
378         return count;
379 }
380
381
382 /*
383  * This function is called from _asender only_
384  * but see also comments in _req_mod(,barrier_acked)
385  * and receive_Barrier.
386  *
387  * Move entries from net_ee to done_ee, if ready.
388  * Grab done_ee, call all callbacks, free the entries.
389  * The callbacks typically send out ACKs.
390  */
391 static int drbd_process_done_ee(struct drbd_conf *mdev)
392 {
393         LIST_HEAD(work_list);
394         LIST_HEAD(reclaimed);
395         struct drbd_epoch_entry *e, *t;
396         int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
397
398         spin_lock_irq(&mdev->req_lock);
399         reclaim_net_ee(mdev, &reclaimed);
400         list_splice_init(&mdev->done_ee, &work_list);
401         spin_unlock_irq(&mdev->req_lock);
402
403         list_for_each_entry_safe(e, t, &reclaimed, w.list)
404                 drbd_free_net_ee(mdev, e);
405
406         /* possible callbacks here:
407          * e_end_block, and e_end_resync_block, e_send_discard_ack.
408          * all ignore the last argument.
409          */
410         list_for_each_entry_safe(e, t, &work_list, w.list) {
411                 /* list_del not necessary, next/prev members not touched */
412                 ok = e->w.cb(mdev, &e->w, !ok) && ok;
413                 drbd_free_ee(mdev, e);
414         }
415         wake_up(&mdev->ee_wait);
416
417         return ok;
418 }
419
420 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
421 {
422         DEFINE_WAIT(wait);
423
424         /* avoids spin_lock/unlock
425          * and calling prepare_to_wait in the fast path */
426         while (!list_empty(head)) {
427                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
428                 spin_unlock_irq(&mdev->req_lock);
429                 io_schedule();
430                 finish_wait(&mdev->ee_wait, &wait);
431                 spin_lock_irq(&mdev->req_lock);
432         }
433 }
434
435 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
436 {
437         spin_lock_irq(&mdev->req_lock);
438         _drbd_wait_ee_list_empty(mdev, head);
439         spin_unlock_irq(&mdev->req_lock);
440 }
441
442 /* see also kernel_accept; which is only present since 2.6.18.
443  * also we want to log which part of it failed, exactly */
444 static int drbd_accept(struct drbd_conf *mdev, const char **what,
445                 struct socket *sock, struct socket **newsock)
446 {
447         struct sock *sk = sock->sk;
448         int err = 0;
449
450         *what = "listen";
451         err = sock->ops->listen(sock, 5);
452         if (err < 0)
453                 goto out;
454
455         *what = "sock_create_lite";
456         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
457                                newsock);
458         if (err < 0)
459                 goto out;
460
461         *what = "accept";
462         err = sock->ops->accept(sock, *newsock, 0);
463         if (err < 0) {
464                 sock_release(*newsock);
465                 *newsock = NULL;
466                 goto out;
467         }
468         (*newsock)->ops  = sock->ops;
469         __module_get((*newsock)->ops->owner);
470
471 out:
472         return err;
473 }
474
475 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
476                     void *buf, size_t size, int flags)
477 {
478         mm_segment_t oldfs;
479         struct kvec iov = {
480                 .iov_base = buf,
481                 .iov_len = size,
482         };
483         struct msghdr msg = {
484                 .msg_iovlen = 1,
485                 .msg_iov = (struct iovec *)&iov,
486                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
487         };
488         int rv;
489
490         oldfs = get_fs();
491         set_fs(KERNEL_DS);
492         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
493         set_fs(oldfs);
494
495         return rv;
496 }
497
498 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
499 {
500         mm_segment_t oldfs;
501         struct kvec iov = {
502                 .iov_base = buf,
503                 .iov_len = size,
504         };
505         struct msghdr msg = {
506                 .msg_iovlen = 1,
507                 .msg_iov = (struct iovec *)&iov,
508                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
509         };
510         int rv;
511
512         oldfs = get_fs();
513         set_fs(KERNEL_DS);
514
515         for (;;) {
516                 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
517                 if (rv == size)
518                         break;
519
520                 /* Note:
521                  * ECONNRESET   other side closed the connection
522                  * ERESTARTSYS  (on  sock) we got a signal
523                  */
524
525                 if (rv < 0) {
526                         if (rv == -ECONNRESET)
527                                 dev_info(DEV, "sock was reset by peer\n");
528                         else if (rv != -ERESTARTSYS)
529                                 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
530                         break;
531                 } else if (rv == 0) {
532                         dev_info(DEV, "sock was shut down by peer\n");
533                         break;
534                 } else  {
535                         /* signal came in, or peer/link went down,
536                          * after we read a partial message
537                          */
538                         /* D_ASSERT(signal_pending(current)); */
539                         break;
540                 }
541         };
542
543         set_fs(oldfs);
544
545         if (rv != size)
546                 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
547
548         return rv;
549 }
550
551 /* quoting tcp(7):
552  *   On individual connections, the socket buffer size must be set prior to the
553  *   listen(2) or connect(2) calls in order to have it take effect.
554  * This is our wrapper to do so.
555  */
556 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
557                 unsigned int rcv)
558 {
559         /* open coded SO_SNDBUF, SO_RCVBUF */
560         if (snd) {
561                 sock->sk->sk_sndbuf = snd;
562                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
563         }
564         if (rcv) {
565                 sock->sk->sk_rcvbuf = rcv;
566                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
567         }
568 }
569
570 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
571 {
572         const char *what;
573         struct socket *sock;
574         struct sockaddr_in6 src_in6;
575         int err;
576         int disconnect_on_error = 1;
577
578         if (!get_net_conf(mdev))
579                 return NULL;
580
581         what = "sock_create_kern";
582         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
583                 SOCK_STREAM, IPPROTO_TCP, &sock);
584         if (err < 0) {
585                 sock = NULL;
586                 goto out;
587         }
588
589         sock->sk->sk_rcvtimeo =
590         sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
591         drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
592                         mdev->net_conf->rcvbuf_size);
593
594        /* explicitly bind to the configured IP as source IP
595         *  for the outgoing connections.
596         *  This is needed for multihomed hosts and to be
597         *  able to use lo: interfaces for drbd.
598         * Make sure to use 0 as port number, so linux selects
599         *  a free one dynamically.
600         */
601         memcpy(&src_in6, mdev->net_conf->my_addr,
602                min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
603         if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
604                 src_in6.sin6_port = 0;
605         else
606                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
607
608         what = "bind before connect";
609         err = sock->ops->bind(sock,
610                               (struct sockaddr *) &src_in6,
611                               mdev->net_conf->my_addr_len);
612         if (err < 0)
613                 goto out;
614
615         /* connect may fail, peer not yet available.
616          * stay C_WF_CONNECTION, don't go Disconnecting! */
617         disconnect_on_error = 0;
618         what = "connect";
619         err = sock->ops->connect(sock,
620                                  (struct sockaddr *)mdev->net_conf->peer_addr,
621                                  mdev->net_conf->peer_addr_len, 0);
622
623 out:
624         if (err < 0) {
625                 if (sock) {
626                         sock_release(sock);
627                         sock = NULL;
628                 }
629                 switch (-err) {
630                         /* timeout, busy, signal pending */
631                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
632                 case EINTR: case ERESTARTSYS:
633                         /* peer not (yet) available, network problem */
634                 case ECONNREFUSED: case ENETUNREACH:
635                 case EHOSTDOWN:    case EHOSTUNREACH:
636                         disconnect_on_error = 0;
637                         break;
638                 default:
639                         dev_err(DEV, "%s failed, err = %d\n", what, err);
640                 }
641                 if (disconnect_on_error)
642                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
643         }
644         put_net_conf(mdev);
645         return sock;
646 }
647
648 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
649 {
650         int timeo, err;
651         struct socket *s_estab = NULL, *s_listen;
652         const char *what;
653
654         if (!get_net_conf(mdev))
655                 return NULL;
656
657         what = "sock_create_kern";
658         err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
659                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
660         if (err) {
661                 s_listen = NULL;
662                 goto out;
663         }
664
665         timeo = mdev->net_conf->try_connect_int * HZ;
666         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
667
668         s_listen->sk->sk_reuse    = SK_CAN_REUSE; /* SO_REUSEADDR */
669         s_listen->sk->sk_rcvtimeo = timeo;
670         s_listen->sk->sk_sndtimeo = timeo;
671         drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
672                         mdev->net_conf->rcvbuf_size);
673
674         what = "bind before listen";
675         err = s_listen->ops->bind(s_listen,
676                               (struct sockaddr *) mdev->net_conf->my_addr,
677                               mdev->net_conf->my_addr_len);
678         if (err < 0)
679                 goto out;
680
681         err = drbd_accept(mdev, &what, s_listen, &s_estab);
682
683 out:
684         if (s_listen)
685                 sock_release(s_listen);
686         if (err < 0) {
687                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
688                         dev_err(DEV, "%s failed, err = %d\n", what, err);
689                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
690                 }
691         }
692         put_net_conf(mdev);
693
694         return s_estab;
695 }
696
697 static int drbd_send_fp(struct drbd_conf *mdev,
698         struct socket *sock, enum drbd_packets cmd)
699 {
700         struct p_header80 *h = &mdev->data.sbuf.header.h80;
701
702         return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
703 }
704
705 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
706 {
707         struct p_header80 *h = &mdev->data.rbuf.header.h80;
708         int rr;
709
710         rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
711
712         if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
713                 return be16_to_cpu(h->command);
714
715         return 0xffff;
716 }
717
718 /**
719  * drbd_socket_okay() - Free the socket if its connection is not okay
720  * @mdev:       DRBD device.
721  * @sock:       pointer to the pointer to the socket.
722  */
723 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
724 {
725         int rr;
726         char tb[4];
727
728         if (!*sock)
729                 return false;
730
731         rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
732
733         if (rr > 0 || rr == -EAGAIN) {
734                 return true;
735         } else {
736                 sock_release(*sock);
737                 *sock = NULL;
738                 return false;
739         }
740 }
741
742 /*
743  * return values:
744  *   1 yes, we have a valid connection
745  *   0 oops, did not work out, please try again
746  *  -1 peer talks different language,
747  *     no point in trying again, please go standalone.
748  *  -2 We do not have a network config...
749  */
750 static int drbd_connect(struct drbd_conf *mdev)
751 {
752         struct socket *s, *sock, *msock;
753         int try, h, ok;
754         enum drbd_state_rv rv;
755
756         D_ASSERT(!mdev->data.socket);
757
758         if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
759                 return -2;
760
761         clear_bit(DISCARD_CONCURRENT, &mdev->flags);
762
763         sock  = NULL;
764         msock = NULL;
765
766         do {
767                 for (try = 0;;) {
768                         /* 3 tries, this should take less than a second! */
769                         s = drbd_try_connect(mdev);
770                         if (s || ++try >= 3)
771                                 break;
772                         /* give the other side time to call bind() & listen() */
773                         schedule_timeout_interruptible(HZ / 10);
774                 }
775
776                 if (s) {
777                         if (!sock) {
778                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
779                                 sock = s;
780                                 s = NULL;
781                         } else if (!msock) {
782                                 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
783                                 msock = s;
784                                 s = NULL;
785                         } else {
786                                 dev_err(DEV, "Logic error in drbd_connect()\n");
787                                 goto out_release_sockets;
788                         }
789                 }
790
791                 if (sock && msock) {
792                         schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);
793                         ok = drbd_socket_okay(mdev, &sock);
794                         ok = drbd_socket_okay(mdev, &msock) && ok;
795                         if (ok)
796                                 break;
797                 }
798
799 retry:
800                 s = drbd_wait_for_connect(mdev);
801                 if (s) {
802                         try = drbd_recv_fp(mdev, s);
803                         drbd_socket_okay(mdev, &sock);
804                         drbd_socket_okay(mdev, &msock);
805                         switch (try) {
806                         case P_HAND_SHAKE_S:
807                                 if (sock) {
808                                         dev_warn(DEV, "initial packet S crossed\n");
809                                         sock_release(sock);
810                                 }
811                                 sock = s;
812                                 break;
813                         case P_HAND_SHAKE_M:
814                                 if (msock) {
815                                         dev_warn(DEV, "initial packet M crossed\n");
816                                         sock_release(msock);
817                                 }
818                                 msock = s;
819                                 set_bit(DISCARD_CONCURRENT, &mdev->flags);
820                                 break;
821                         default:
822                                 dev_warn(DEV, "Error receiving initial packet\n");
823                                 sock_release(s);
824                                 if (random32() & 1)
825                                         goto retry;
826                         }
827                 }
828
829                 if (mdev->state.conn <= C_DISCONNECTING)
830                         goto out_release_sockets;
831                 if (signal_pending(current)) {
832                         flush_signals(current);
833                         smp_rmb();
834                         if (get_t_state(&mdev->receiver) == Exiting)
835                                 goto out_release_sockets;
836                 }
837
838                 if (sock && msock) {
839                         ok = drbd_socket_okay(mdev, &sock);
840                         ok = drbd_socket_okay(mdev, &msock) && ok;
841                         if (ok)
842                                 break;
843                 }
844         } while (1);
845
846         msock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
847         sock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
848
849         sock->sk->sk_allocation = GFP_NOIO;
850         msock->sk->sk_allocation = GFP_NOIO;
851
852         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
853         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
854
855         /* NOT YET ...
856          * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
857          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
858          * first set it to the P_HAND_SHAKE timeout,
859          * which we set to 4x the configured ping_timeout. */
860         sock->sk->sk_sndtimeo =
861         sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
862
863         msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
864         msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
865
866         /* we don't want delays.
867          * we use TCP_CORK where appropriate, though */
868         drbd_tcp_nodelay(sock);
869         drbd_tcp_nodelay(msock);
870
871         mdev->data.socket = sock;
872         mdev->meta.socket = msock;
873         mdev->last_received = jiffies;
874
875         D_ASSERT(mdev->asender.task == NULL);
876
877         h = drbd_do_handshake(mdev);
878         if (h <= 0)
879                 return h;
880
881         if (mdev->cram_hmac_tfm) {
882                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
883                 switch (drbd_do_auth(mdev)) {
884                 case -1:
885                         dev_err(DEV, "Authentication of peer failed\n");
886                         return -1;
887                 case 0:
888                         dev_err(DEV, "Authentication of peer failed, trying again.\n");
889                         return 0;
890                 }
891         }
892
893         sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
894         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
895
896         atomic_set(&mdev->packet_seq, 0);
897         mdev->peer_seq = 0;
898
899         if (drbd_send_protocol(mdev) == -1)
900                 return -1;
901         set_bit(STATE_SENT, &mdev->flags);
902         drbd_send_sync_param(mdev, &mdev->sync_conf);
903         drbd_send_sizes(mdev, 0, 0);
904         drbd_send_uuids(mdev);
905         drbd_send_current_state(mdev);
906         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
907         clear_bit(RESIZE_PENDING, &mdev->flags);
908
909         spin_lock_irq(&mdev->req_lock);
910         rv = _drbd_set_state(_NS(mdev, conn, C_WF_REPORT_PARAMS), CS_VERBOSE, NULL);
911         if (mdev->state.conn != C_WF_REPORT_PARAMS)
912                 clear_bit(STATE_SENT, &mdev->flags);
913         spin_unlock_irq(&mdev->req_lock);
914
915         if (rv < SS_SUCCESS)
916                 return 0;
917
918         drbd_thread_start(&mdev->asender);
919         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
920
921         return 1;
922
923 out_release_sockets:
924         if (sock)
925                 sock_release(sock);
926         if (msock)
927                 sock_release(msock);
928         return -1;
929 }
930
931 static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
932 {
933         union p_header *h = &mdev->data.rbuf.header;
934         int r;
935
936         r = drbd_recv(mdev, h, sizeof(*h));
937         if (unlikely(r != sizeof(*h))) {
938                 if (!signal_pending(current))
939                         dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
940                 return false;
941         }
942
943         if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
944                 *cmd = be16_to_cpu(h->h80.command);
945                 *packet_size = be16_to_cpu(h->h80.length);
946         } else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
947                 *cmd = be16_to_cpu(h->h95.command);
948                 *packet_size = be32_to_cpu(h->h95.length);
949         } else {
950                 dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
951                     be32_to_cpu(h->h80.magic),
952                     be16_to_cpu(h->h80.command),
953                     be16_to_cpu(h->h80.length));
954                 return false;
955         }
956         mdev->last_received = jiffies;
957
958         return true;
959 }
960
961 static void drbd_flush(struct drbd_conf *mdev)
962 {
963         int rv;
964
965         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
966                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
967                                         NULL);
968                 if (rv) {
969                         dev_info(DEV, "local disk flush failed with status %d\n", rv);
970                         /* would rather check on EOPNOTSUPP, but that is not reliable.
971                          * don't try again for ANY return value != 0
972                          * if (rv == -EOPNOTSUPP) */
973                         drbd_bump_write_ordering(mdev, WO_drain_io);
974                 }
975                 put_ldev(mdev);
976         }
977 }
978
979 /**
980  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
981  * @mdev:       DRBD device.
982  * @epoch:      Epoch object.
983  * @ev:         Epoch event.
984  */
985 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
986                                                struct drbd_epoch *epoch,
987                                                enum epoch_event ev)
988 {
989         int epoch_size;
990         struct drbd_epoch *next_epoch;
991         enum finish_epoch rv = FE_STILL_LIVE;
992
993         spin_lock(&mdev->epoch_lock);
994         do {
995                 next_epoch = NULL;
996
997                 epoch_size = atomic_read(&epoch->epoch_size);
998
999                 switch (ev & ~EV_CLEANUP) {
1000                 case EV_PUT:
1001                         atomic_dec(&epoch->active);
1002                         break;
1003                 case EV_GOT_BARRIER_NR:
1004                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1005                         break;
1006                 case EV_BECAME_LAST:
1007                         /* nothing to do*/
1008                         break;
1009                 }
1010
1011                 if (epoch_size != 0 &&
1012                     atomic_read(&epoch->active) == 0 &&
1013                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1014                         if (!(ev & EV_CLEANUP)) {
1015                                 spin_unlock(&mdev->epoch_lock);
1016                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1017                                 spin_lock(&mdev->epoch_lock);
1018                         }
1019                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1020                                 dec_unacked(mdev);
1021
1022                         if (mdev->current_epoch != epoch) {
1023                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1024                                 list_del(&epoch->list);
1025                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1026                                 mdev->epochs--;
1027                                 kfree(epoch);
1028
1029                                 if (rv == FE_STILL_LIVE)
1030                                         rv = FE_DESTROYED;
1031                         } else {
1032                                 epoch->flags = 0;
1033                                 atomic_set(&epoch->epoch_size, 0);
1034                                 /* atomic_set(&epoch->active, 0); is already zero */
1035                                 if (rv == FE_STILL_LIVE)
1036                                         rv = FE_RECYCLED;
1037                                 wake_up(&mdev->ee_wait);
1038                         }
1039                 }
1040
1041                 if (!next_epoch)
1042                         break;
1043
1044                 epoch = next_epoch;
1045         } while (1);
1046
1047         spin_unlock(&mdev->epoch_lock);
1048
1049         return rv;
1050 }
1051
1052 /**
1053  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1054  * @mdev:       DRBD device.
1055  * @wo:         Write ordering method to try.
1056  */
1057 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1058 {
1059         enum write_ordering_e pwo;
1060         static char *write_ordering_str[] = {
1061                 [WO_none] = "none",
1062                 [WO_drain_io] = "drain",
1063                 [WO_bdev_flush] = "flush",
1064         };
1065
1066         pwo = mdev->write_ordering;
1067         wo = min(pwo, wo);
1068         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1069                 wo = WO_drain_io;
1070         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1071                 wo = WO_none;
1072         mdev->write_ordering = wo;
1073         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1074                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1075 }
1076
1077 /**
1078  * drbd_submit_ee()
1079  * @mdev:       DRBD device.
1080  * @e:          epoch entry
1081  * @rw:         flag field, see bio->bi_rw
1082  *
1083  * May spread the pages to multiple bios,
1084  * depending on bio_add_page restrictions.
1085  *
1086  * Returns 0 if all bios have been submitted,
1087  * -ENOMEM if we could not allocate enough bios,
1088  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1089  *  single page to an empty bio (which should never happen and likely indicates
1090  *  that the lower level IO stack is in some way broken). This has been observed
1091  *  on certain Xen deployments.
1092  */
1093 /* TODO allocate from our own bio_set. */
1094 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1095                 const unsigned rw, const int fault_type)
1096 {
1097         struct bio *bios = NULL;
1098         struct bio *bio;
1099         struct page *page = e->pages;
1100         sector_t sector = e->sector;
1101         unsigned ds = e->size;
1102         unsigned n_bios = 0;
1103         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1104         int err = -ENOMEM;
1105
1106         /* In most cases, we will only need one bio.  But in case the lower
1107          * level restrictions happen to be different at this offset on this
1108          * side than those of the sending peer, we may need to submit the
1109          * request in more than one bio.
1110          *
1111          * Plain bio_alloc is good enough here, this is no DRBD internally
1112          * generated bio, but a bio allocated on behalf of the peer.
1113          */
1114 next_bio:
1115         bio = bio_alloc(GFP_NOIO, nr_pages);
1116         if (!bio) {
1117                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1118                 goto fail;
1119         }
1120         /* > e->sector, unless this is the first bio */
1121         bio->bi_sector = sector;
1122         bio->bi_bdev = mdev->ldev->backing_bdev;
1123         bio->bi_rw = rw;
1124         bio->bi_private = e;
1125         bio->bi_end_io = drbd_endio_sec;
1126
1127         bio->bi_next = bios;
1128         bios = bio;
1129         ++n_bios;
1130
1131         page_chain_for_each(page) {
1132                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1133                 if (!bio_add_page(bio, page, len, 0)) {
1134                         /* A single page must always be possible!
1135                          * But in case it fails anyways,
1136                          * we deal with it, and complain (below). */
1137                         if (bio->bi_vcnt == 0) {
1138                                 dev_err(DEV,
1139                                         "bio_add_page failed for len=%u, "
1140                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1141                                         len, (unsigned long long)bio->bi_sector);
1142                                 err = -ENOSPC;
1143                                 goto fail;
1144                         }
1145                         goto next_bio;
1146                 }
1147                 ds -= len;
1148                 sector += len >> 9;
1149                 --nr_pages;
1150         }
1151         D_ASSERT(page == NULL);
1152         D_ASSERT(ds == 0);
1153
1154         atomic_set(&e->pending_bios, n_bios);
1155         do {
1156                 bio = bios;
1157                 bios = bios->bi_next;
1158                 bio->bi_next = NULL;
1159
1160                 drbd_generic_make_request(mdev, fault_type, bio);
1161         } while (bios);
1162         return 0;
1163
1164 fail:
1165         while (bios) {
1166                 bio = bios;
1167                 bios = bios->bi_next;
1168                 bio_put(bio);
1169         }
1170         return err;
1171 }
1172
1173 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1174 {
1175         int rv;
1176         struct p_barrier *p = &mdev->data.rbuf.barrier;
1177         struct drbd_epoch *epoch;
1178
1179         inc_unacked(mdev);
1180
1181         mdev->current_epoch->barrier_nr = p->barrier;
1182         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1183
1184         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1185          * the activity log, which means it would not be resynced in case the
1186          * R_PRIMARY crashes now.
1187          * Therefore we must send the barrier_ack after the barrier request was
1188          * completed. */
1189         switch (mdev->write_ordering) {
1190         case WO_none:
1191                 if (rv == FE_RECYCLED)
1192                         return true;
1193
1194                 /* receiver context, in the writeout path of the other node.
1195                  * avoid potential distributed deadlock */
1196                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1197                 if (epoch)
1198                         break;
1199                 else
1200                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1201                         /* Fall through */
1202
1203         case WO_bdev_flush:
1204         case WO_drain_io:
1205                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1206                 drbd_flush(mdev);
1207
1208                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1209                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1210                         if (epoch)
1211                                 break;
1212                 }
1213
1214                 epoch = mdev->current_epoch;
1215                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1216
1217                 D_ASSERT(atomic_read(&epoch->active) == 0);
1218                 D_ASSERT(epoch->flags == 0);
1219
1220                 return true;
1221         default:
1222                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1223                 return false;
1224         }
1225
1226         epoch->flags = 0;
1227         atomic_set(&epoch->epoch_size, 0);
1228         atomic_set(&epoch->active, 0);
1229
1230         spin_lock(&mdev->epoch_lock);
1231         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1232                 list_add(&epoch->list, &mdev->current_epoch->list);
1233                 mdev->current_epoch = epoch;
1234                 mdev->epochs++;
1235         } else {
1236                 /* The current_epoch got recycled while we allocated this one... */
1237                 kfree(epoch);
1238         }
1239         spin_unlock(&mdev->epoch_lock);
1240
1241         return true;
1242 }
1243
1244 /* used from receive_RSDataReply (recv_resync_read)
1245  * and from receive_Data */
1246 static struct drbd_epoch_entry *
1247 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1248 {
1249         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1250         struct drbd_epoch_entry *e;
1251         struct page *page;
1252         int dgs, ds, rr;
1253         void *dig_in = mdev->int_dig_in;
1254         void *dig_vv = mdev->int_dig_vv;
1255         unsigned long *data;
1256
1257         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1258                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1259
1260         if (dgs) {
1261                 rr = drbd_recv(mdev, dig_in, dgs);
1262                 if (rr != dgs) {
1263                         if (!signal_pending(current))
1264                                 dev_warn(DEV,
1265                                         "short read receiving data digest: read %d expected %d\n",
1266                                         rr, dgs);
1267                         return NULL;
1268                 }
1269         }
1270
1271         data_size -= dgs;
1272
1273         ERR_IF(data_size == 0) return NULL;
1274         ERR_IF(data_size &  0x1ff) return NULL;
1275         ERR_IF(data_size >  DRBD_MAX_BIO_SIZE) return NULL;
1276
1277         /* even though we trust out peer,
1278          * we sometimes have to double check. */
1279         if (sector + (data_size>>9) > capacity) {
1280                 dev_err(DEV, "request from peer beyond end of local disk: "
1281                         "capacity: %llus < sector: %llus + size: %u\n",
1282                         (unsigned long long)capacity,
1283                         (unsigned long long)sector, data_size);
1284                 return NULL;
1285         }
1286
1287         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1288          * "criss-cross" setup, that might cause write-out on some other DRBD,
1289          * which in turn might block on the other node at this very place.  */
1290         e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1291         if (!e)
1292                 return NULL;
1293
1294         ds = data_size;
1295         page = e->pages;
1296         page_chain_for_each(page) {
1297                 unsigned len = min_t(int, ds, PAGE_SIZE);
1298                 data = kmap(page);
1299                 rr = drbd_recv(mdev, data, len);
1300                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1301                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1302                         data[0] = data[0] ^ (unsigned long)-1;
1303                 }
1304                 kunmap(page);
1305                 if (rr != len) {
1306                         drbd_free_ee(mdev, e);
1307                         if (!signal_pending(current))
1308                                 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1309                                 rr, len);
1310                         return NULL;
1311                 }
1312                 ds -= rr;
1313         }
1314
1315         if (dgs) {
1316                 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1317                 if (memcmp(dig_in, dig_vv, dgs)) {
1318                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1319                                 (unsigned long long)sector, data_size);
1320                         drbd_bcast_ee(mdev, "digest failed",
1321                                         dgs, dig_in, dig_vv, e);
1322                         drbd_free_ee(mdev, e);
1323                         return NULL;
1324                 }
1325         }
1326         mdev->recv_cnt += data_size>>9;
1327         return e;
1328 }
1329
1330 /* drbd_drain_block() just takes a data block
1331  * out of the socket input buffer, and discards it.
1332  */
1333 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1334 {
1335         struct page *page;
1336         int rr, rv = 1;
1337         void *data;
1338
1339         if (!data_size)
1340                 return true;
1341
1342         page = drbd_pp_alloc(mdev, 1, 1);
1343
1344         data = kmap(page);
1345         while (data_size) {
1346                 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1347                 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1348                         rv = 0;
1349                         if (!signal_pending(current))
1350                                 dev_warn(DEV,
1351                                         "short read receiving data: read %d expected %d\n",
1352                                         rr, min_t(int, data_size, PAGE_SIZE));
1353                         break;
1354                 }
1355                 data_size -= rr;
1356         }
1357         kunmap(page);
1358         drbd_pp_free(mdev, page, 0);
1359         return rv;
1360 }
1361
1362 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1363                            sector_t sector, int data_size)
1364 {
1365         struct bio_vec *bvec;
1366         struct bio *bio;
1367         int dgs, rr, i, expect;
1368         void *dig_in = mdev->int_dig_in;
1369         void *dig_vv = mdev->int_dig_vv;
1370
1371         dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1372                 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1373
1374         if (dgs) {
1375                 rr = drbd_recv(mdev, dig_in, dgs);
1376                 if (rr != dgs) {
1377                         if (!signal_pending(current))
1378                                 dev_warn(DEV,
1379                                         "short read receiving data reply digest: read %d expected %d\n",
1380                                         rr, dgs);
1381                         return 0;
1382                 }
1383         }
1384
1385         data_size -= dgs;
1386
1387         /* optimistically update recv_cnt.  if receiving fails below,
1388          * we disconnect anyways, and counters will be reset. */
1389         mdev->recv_cnt += data_size>>9;
1390
1391         bio = req->master_bio;
1392         D_ASSERT(sector == bio->bi_sector);
1393
1394         bio_for_each_segment(bvec, bio, i) {
1395                 expect = min_t(int, data_size, bvec->bv_len);
1396                 rr = drbd_recv(mdev,
1397                              kmap(bvec->bv_page)+bvec->bv_offset,
1398                              expect);
1399                 kunmap(bvec->bv_page);
1400                 if (rr != expect) {
1401                         if (!signal_pending(current))
1402                                 dev_warn(DEV, "short read receiving data reply: "
1403                                         "read %d expected %d\n",
1404                                         rr, expect);
1405                         return 0;
1406                 }
1407                 data_size -= rr;
1408         }
1409
1410         if (dgs) {
1411                 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1412                 if (memcmp(dig_in, dig_vv, dgs)) {
1413                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1414                         return 0;
1415                 }
1416         }
1417
1418         D_ASSERT(data_size == 0);
1419         return 1;
1420 }
1421
1422 /* e_end_resync_block() is called via
1423  * drbd_process_done_ee() by asender only */
1424 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1425 {
1426         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1427         sector_t sector = e->sector;
1428         int ok;
1429
1430         D_ASSERT(hlist_unhashed(&e->collision));
1431
1432         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1433                 drbd_set_in_sync(mdev, sector, e->size);
1434                 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1435         } else {
1436                 /* Record failure to sync */
1437                 drbd_rs_failed_io(mdev, sector, e->size);
1438
1439                 ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1440         }
1441         dec_unacked(mdev);
1442
1443         return ok;
1444 }
1445
1446 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1447 {
1448         struct drbd_epoch_entry *e;
1449
1450         e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1451         if (!e)
1452                 goto fail;
1453
1454         dec_rs_pending(mdev);
1455
1456         inc_unacked(mdev);
1457         /* corresponding dec_unacked() in e_end_resync_block()
1458          * respective _drbd_clear_done_ee */
1459
1460         e->w.cb = e_end_resync_block;
1461
1462         spin_lock_irq(&mdev->req_lock);
1463         list_add(&e->w.list, &mdev->sync_ee);
1464         spin_unlock_irq(&mdev->req_lock);
1465
1466         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1467         if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1468                 return true;
1469
1470         /* don't care for the reason here */
1471         dev_err(DEV, "submit failed, triggering re-connect\n");
1472         spin_lock_irq(&mdev->req_lock);
1473         list_del(&e->w.list);
1474         spin_unlock_irq(&mdev->req_lock);
1475
1476         drbd_free_ee(mdev, e);
1477 fail:
1478         put_ldev(mdev);
1479         return false;
1480 }
1481
1482 static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1483 {
1484         struct drbd_request *req;
1485         sector_t sector;
1486         int ok;
1487         struct p_data *p = &mdev->data.rbuf.data;
1488
1489         sector = be64_to_cpu(p->sector);
1490
1491         spin_lock_irq(&mdev->req_lock);
1492         req = _ar_id_to_req(mdev, p->block_id, sector);
1493         spin_unlock_irq(&mdev->req_lock);
1494         if (unlikely(!req)) {
1495                 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1496                 return false;
1497         }
1498
1499         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1500          * special casing it there for the various failure cases.
1501          * still no race with drbd_fail_pending_reads */
1502         ok = recv_dless_read(mdev, req, sector, data_size);
1503
1504         if (ok)
1505                 req_mod(req, data_received);
1506         /* else: nothing. handled from drbd_disconnect...
1507          * I don't think we may complete this just yet
1508          * in case we are "on-disconnect: freeze" */
1509
1510         return ok;
1511 }
1512
1513 static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1514 {
1515         sector_t sector;
1516         int ok;
1517         struct p_data *p = &mdev->data.rbuf.data;
1518
1519         sector = be64_to_cpu(p->sector);
1520         D_ASSERT(p->block_id == ID_SYNCER);
1521
1522         if (get_ldev(mdev)) {
1523                 /* data is submitted to disk within recv_resync_read.
1524                  * corresponding put_ldev done below on error,
1525                  * or in drbd_endio_write_sec. */
1526                 ok = recv_resync_read(mdev, sector, data_size);
1527         } else {
1528                 if (__ratelimit(&drbd_ratelimit_state))
1529                         dev_err(DEV, "Can not write resync data to local disk.\n");
1530
1531                 ok = drbd_drain_block(mdev, data_size);
1532
1533                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1534         }
1535
1536         atomic_add(data_size >> 9, &mdev->rs_sect_in);
1537
1538         return ok;
1539 }
1540
1541 /* e_end_block() is called via drbd_process_done_ee().
1542  * this means this function only runs in the asender thread
1543  */
1544 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1545 {
1546         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1547         sector_t sector = e->sector;
1548         int ok = 1, pcmd;
1549
1550         if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1551                 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1552                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1553                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1554                                 e->flags & EE_MAY_SET_IN_SYNC) ?
1555                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1556                         ok &= drbd_send_ack(mdev, pcmd, e);
1557                         if (pcmd == P_RS_WRITE_ACK)
1558                                 drbd_set_in_sync(mdev, sector, e->size);
1559                 } else {
1560                         ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1561                         /* we expect it to be marked out of sync anyways...
1562                          * maybe assert this?  */
1563                 }
1564                 dec_unacked(mdev);
1565         }
1566         /* we delete from the conflict detection hash _after_ we sent out the
1567          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1568         if (mdev->net_conf->two_primaries) {
1569                 spin_lock_irq(&mdev->req_lock);
1570                 D_ASSERT(!hlist_unhashed(&e->collision));
1571                 hlist_del_init(&e->collision);
1572                 spin_unlock_irq(&mdev->req_lock);
1573         } else {
1574                 D_ASSERT(hlist_unhashed(&e->collision));
1575         }
1576
1577         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1578
1579         return ok;
1580 }
1581
1582 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1583 {
1584         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1585         int ok = 1;
1586
1587         D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1588         ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1589
1590         spin_lock_irq(&mdev->req_lock);
1591         D_ASSERT(!hlist_unhashed(&e->collision));
1592         hlist_del_init(&e->collision);
1593         spin_unlock_irq(&mdev->req_lock);
1594
1595         dec_unacked(mdev);
1596
1597         return ok;
1598 }
1599
1600 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_epoch_entry *data_e)
1601 {
1602
1603         struct drbd_epoch_entry *rs_e;
1604         bool rv = 0;
1605
1606         spin_lock_irq(&mdev->req_lock);
1607         list_for_each_entry(rs_e, &mdev->sync_ee, w.list) {
1608                 if (overlaps(data_e->sector, data_e->size, rs_e->sector, rs_e->size)) {
1609                         rv = 1;
1610                         break;
1611                 }
1612         }
1613         spin_unlock_irq(&mdev->req_lock);
1614
1615         return rv;
1616 }
1617
1618 /* Called from receive_Data.
1619  * Synchronize packets on sock with packets on msock.
1620  *
1621  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1622  * packet traveling on msock, they are still processed in the order they have
1623  * been sent.
1624  *
1625  * Note: we don't care for Ack packets overtaking P_DATA packets.
1626  *
1627  * In case packet_seq is larger than mdev->peer_seq number, there are
1628  * outstanding packets on the msock. We wait for them to arrive.
1629  * In case we are the logically next packet, we update mdev->peer_seq
1630  * ourselves. Correctly handles 32bit wrap around.
1631  *
1632  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1633  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1634  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1635  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1636  *
1637  * returns 0 if we may process the packet,
1638  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1639 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1640 {
1641         DEFINE_WAIT(wait);
1642         unsigned int p_seq;
1643         long timeout;
1644         int ret = 0;
1645         spin_lock(&mdev->peer_seq_lock);
1646         for (;;) {
1647                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1648                 if (seq_le(packet_seq, mdev->peer_seq+1))
1649                         break;
1650                 if (signal_pending(current)) {
1651                         ret = -ERESTARTSYS;
1652                         break;
1653                 }
1654                 p_seq = mdev->peer_seq;
1655                 spin_unlock(&mdev->peer_seq_lock);
1656                 timeout = schedule_timeout(30*HZ);
1657                 spin_lock(&mdev->peer_seq_lock);
1658                 if (timeout == 0 && p_seq == mdev->peer_seq) {
1659                         ret = -ETIMEDOUT;
1660                         dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1661                         break;
1662                 }
1663         }
1664         finish_wait(&mdev->seq_wait, &wait);
1665         if (mdev->peer_seq+1 == packet_seq)
1666                 mdev->peer_seq++;
1667         spin_unlock(&mdev->peer_seq_lock);
1668         return ret;
1669 }
1670
1671 /* see also bio_flags_to_wire()
1672  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1673  * flags and back. We may replicate to other kernel versions. */
1674 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1675 {
1676         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1677                 (dpf & DP_FUA ? REQ_FUA : 0) |
1678                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1679                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1680 }
1681
1682 /* mirrored write */
1683 static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1684 {
1685         sector_t sector;
1686         struct drbd_epoch_entry *e;
1687         struct p_data *p = &mdev->data.rbuf.data;
1688         int rw = WRITE;
1689         u32 dp_flags;
1690
1691         if (!get_ldev(mdev)) {
1692                 spin_lock(&mdev->peer_seq_lock);
1693                 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1694                         mdev->peer_seq++;
1695                 spin_unlock(&mdev->peer_seq_lock);
1696
1697                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1698                 atomic_inc(&mdev->current_epoch->epoch_size);
1699                 return drbd_drain_block(mdev, data_size);
1700         }
1701
1702         /* get_ldev(mdev) successful.
1703          * Corresponding put_ldev done either below (on various errors),
1704          * or in drbd_endio_write_sec, if we successfully submit the data at
1705          * the end of this function. */
1706
1707         sector = be64_to_cpu(p->sector);
1708         e = read_in_block(mdev, p->block_id, sector, data_size);
1709         if (!e) {
1710                 put_ldev(mdev);
1711                 return false;
1712         }
1713
1714         e->w.cb = e_end_block;
1715
1716         dp_flags = be32_to_cpu(p->dp_flags);
1717         rw |= wire_flags_to_bio(mdev, dp_flags);
1718
1719         if (dp_flags & DP_MAY_SET_IN_SYNC)
1720                 e->flags |= EE_MAY_SET_IN_SYNC;
1721
1722         spin_lock(&mdev->epoch_lock);
1723         e->epoch = mdev->current_epoch;
1724         atomic_inc(&e->epoch->epoch_size);
1725         atomic_inc(&e->epoch->active);
1726         spin_unlock(&mdev->epoch_lock);
1727
1728         /* I'm the receiver, I do hold a net_cnt reference. */
1729         if (!mdev->net_conf->two_primaries) {
1730                 spin_lock_irq(&mdev->req_lock);
1731         } else {
1732                 /* don't get the req_lock yet,
1733                  * we may sleep in drbd_wait_peer_seq */
1734                 const int size = e->size;
1735                 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1736                 DEFINE_WAIT(wait);
1737                 struct drbd_request *i;
1738                 struct hlist_node *n;
1739                 struct hlist_head *slot;
1740                 int first;
1741
1742                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1743                 BUG_ON(mdev->ee_hash == NULL);
1744                 BUG_ON(mdev->tl_hash == NULL);
1745
1746                 /* conflict detection and handling:
1747                  * 1. wait on the sequence number,
1748                  *    in case this data packet overtook ACK packets.
1749                  * 2. check our hash tables for conflicting requests.
1750                  *    we only need to walk the tl_hash, since an ee can not
1751                  *    have a conflict with an other ee: on the submitting
1752                  *    node, the corresponding req had already been conflicting,
1753                  *    and a conflicting req is never sent.
1754                  *
1755                  * Note: for two_primaries, we are protocol C,
1756                  * so there cannot be any request that is DONE
1757                  * but still on the transfer log.
1758                  *
1759                  * unconditionally add to the ee_hash.
1760                  *
1761                  * if no conflicting request is found:
1762                  *    submit.
1763                  *
1764                  * if any conflicting request is found
1765                  * that has not yet been acked,
1766                  * AND I have the "discard concurrent writes" flag:
1767                  *       queue (via done_ee) the P_DISCARD_ACK; OUT.
1768                  *
1769                  * if any conflicting request is found:
1770                  *       block the receiver, waiting on misc_wait
1771                  *       until no more conflicting requests are there,
1772                  *       or we get interrupted (disconnect).
1773                  *
1774                  *       we do not just write after local io completion of those
1775                  *       requests, but only after req is done completely, i.e.
1776                  *       we wait for the P_DISCARD_ACK to arrive!
1777                  *
1778                  *       then proceed normally, i.e. submit.
1779                  */
1780                 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1781                         goto out_interrupted;
1782
1783                 spin_lock_irq(&mdev->req_lock);
1784
1785                 hlist_add_head(&e->collision, ee_hash_slot(mdev, sector));
1786
1787 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1788                 slot = tl_hash_slot(mdev, sector);
1789                 first = 1;
1790                 for (;;) {
1791                         int have_unacked = 0;
1792                         int have_conflict = 0;
1793                         prepare_to_wait(&mdev->misc_wait, &wait,
1794                                 TASK_INTERRUPTIBLE);
1795                         hlist_for_each_entry(i, n, slot, collision) {
1796                                 if (OVERLAPS) {
1797                                         /* only ALERT on first iteration,
1798                                          * we may be woken up early... */
1799                                         if (first)
1800                                                 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1801                                                       " new: %llus +%u; pending: %llus +%u\n",
1802                                                       current->comm, current->pid,
1803                                                       (unsigned long long)sector, size,
1804                                                       (unsigned long long)i->sector, i->size);
1805                                         if (i->rq_state & RQ_NET_PENDING)
1806                                                 ++have_unacked;
1807                                         ++have_conflict;
1808                                 }
1809                         }
1810 #undef OVERLAPS
1811                         if (!have_conflict)
1812                                 break;
1813
1814                         /* Discard Ack only for the _first_ iteration */
1815                         if (first && discard && have_unacked) {
1816                                 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1817                                      (unsigned long long)sector);
1818                                 inc_unacked(mdev);
1819                                 e->w.cb = e_send_discard_ack;
1820                                 list_add_tail(&e->w.list, &mdev->done_ee);
1821
1822                                 spin_unlock_irq(&mdev->req_lock);
1823
1824                                 /* we could probably send that P_DISCARD_ACK ourselves,
1825                                  * but I don't like the receiver using the msock */
1826
1827                                 put_ldev(mdev);
1828                                 wake_asender(mdev);
1829                                 finish_wait(&mdev->misc_wait, &wait);
1830                                 return true;
1831                         }
1832
1833                         if (signal_pending(current)) {
1834                                 hlist_del_init(&e->collision);
1835
1836                                 spin_unlock_irq(&mdev->req_lock);
1837
1838                                 finish_wait(&mdev->misc_wait, &wait);
1839                                 goto out_interrupted;
1840                         }
1841
1842                         spin_unlock_irq(&mdev->req_lock);
1843                         if (first) {
1844                                 first = 0;
1845                                 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1846                                      "sec=%llus\n", (unsigned long long)sector);
1847                         } else if (discard) {
1848                                 /* we had none on the first iteration.
1849                                  * there must be none now. */
1850                                 D_ASSERT(have_unacked == 0);
1851                         }
1852                         schedule();
1853                         spin_lock_irq(&mdev->req_lock);
1854                 }
1855                 finish_wait(&mdev->misc_wait, &wait);
1856         }
1857
1858         list_add(&e->w.list, &mdev->active_ee);
1859         spin_unlock_irq(&mdev->req_lock);
1860
1861         if (mdev->state.conn == C_SYNC_TARGET)
1862                 wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, e));
1863
1864         switch (mdev->net_conf->wire_protocol) {
1865         case DRBD_PROT_C:
1866                 inc_unacked(mdev);
1867                 /* corresponding dec_unacked() in e_end_block()
1868                  * respective _drbd_clear_done_ee */
1869                 break;
1870         case DRBD_PROT_B:
1871                 /* I really don't like it that the receiver thread
1872                  * sends on the msock, but anyways */
1873                 drbd_send_ack(mdev, P_RECV_ACK, e);
1874                 break;
1875         case DRBD_PROT_A:
1876                 /* nothing to do */
1877                 break;
1878         }
1879
1880         if (mdev->state.pdsk < D_INCONSISTENT) {
1881                 /* In case we have the only disk of the cluster, */
1882                 drbd_set_out_of_sync(mdev, e->sector, e->size);
1883                 e->flags |= EE_CALL_AL_COMPLETE_IO;
1884                 e->flags &= ~EE_MAY_SET_IN_SYNC;
1885                 drbd_al_begin_io(mdev, e->sector);
1886         }
1887
1888         if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1889                 return true;
1890
1891         /* don't care for the reason here */
1892         dev_err(DEV, "submit failed, triggering re-connect\n");
1893         spin_lock_irq(&mdev->req_lock);
1894         list_del(&e->w.list);
1895         hlist_del_init(&e->collision);
1896         spin_unlock_irq(&mdev->req_lock);
1897         if (e->flags & EE_CALL_AL_COMPLETE_IO)
1898                 drbd_al_complete_io(mdev, e->sector);
1899
1900 out_interrupted:
1901         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
1902         put_ldev(mdev);
1903         drbd_free_ee(mdev, e);
1904         return false;
1905 }
1906
1907 /* We may throttle resync, if the lower device seems to be busy,
1908  * and current sync rate is above c_min_rate.
1909  *
1910  * To decide whether or not the lower device is busy, we use a scheme similar
1911  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
1912  * (more than 64 sectors) of activity we cannot account for with our own resync
1913  * activity, it obviously is "busy".
1914  *
1915  * The current sync rate used here uses only the most recent two step marks,
1916  * to have a short time average so we can react faster.
1917  */
1918 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
1919 {
1920         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1921         unsigned long db, dt, dbdt;
1922         struct lc_element *tmp;
1923         int curr_events;
1924         int throttle = 0;
1925
1926         /* feature disabled? */
1927         if (mdev->sync_conf.c_min_rate == 0)
1928                 return 0;
1929
1930         spin_lock_irq(&mdev->al_lock);
1931         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1932         if (tmp) {
1933                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1934                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1935                         spin_unlock_irq(&mdev->al_lock);
1936                         return 0;
1937                 }
1938                 /* Do not slow down if app IO is already waiting for this extent */
1939         }
1940         spin_unlock_irq(&mdev->al_lock);
1941
1942         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1943                       (int)part_stat_read(&disk->part0, sectors[1]) -
1944                         atomic_read(&mdev->rs_sect_ev);
1945
1946         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1947                 unsigned long rs_left;
1948                 int i;
1949
1950                 mdev->rs_last_events = curr_events;
1951
1952                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1953                  * approx. */
1954                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1955
1956                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1957                         rs_left = mdev->ov_left;
1958                 else
1959                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1960
1961                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1962                 if (!dt)
1963                         dt++;
1964                 db = mdev->rs_mark_left[i] - rs_left;
1965                 dbdt = Bit2KB(db/dt);
1966
1967                 if (dbdt > mdev->sync_conf.c_min_rate)
1968                         throttle = 1;
1969         }
1970         return throttle;
1971 }
1972
1973
1974 static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1975 {
1976         sector_t sector;
1977         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1978         struct drbd_epoch_entry *e;
1979         struct digest_info *di = NULL;
1980         int size, verb;
1981         unsigned int fault_type;
1982         struct p_block_req *p = &mdev->data.rbuf.block_req;
1983
1984         sector = be64_to_cpu(p->sector);
1985         size   = be32_to_cpu(p->blksize);
1986
1987         if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
1988                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1989                                 (unsigned long long)sector, size);
1990                 return false;
1991         }
1992         if (sector + (size>>9) > capacity) {
1993                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1994                                 (unsigned long long)sector, size);
1995                 return false;
1996         }
1997
1998         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1999                 verb = 1;
2000                 switch (cmd) {
2001                 case P_DATA_REQUEST:
2002                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2003                         break;
2004                 case P_RS_DATA_REQUEST:
2005                 case P_CSUM_RS_REQUEST:
2006                 case P_OV_REQUEST:
2007                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2008                         break;
2009                 case P_OV_REPLY:
2010                         verb = 0;
2011                         dec_rs_pending(mdev);
2012                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2013                         break;
2014                 default:
2015                         dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2016                                 cmdname(cmd));
2017                 }
2018                 if (verb && __ratelimit(&drbd_ratelimit_state))
2019                         dev_err(DEV, "Can not satisfy peer's read request, "
2020                             "no local data.\n");
2021
2022                 /* drain possibly payload */
2023                 return drbd_drain_block(mdev, digest_size);
2024         }
2025
2026         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2027          * "criss-cross" setup, that might cause write-out on some other DRBD,
2028          * which in turn might block on the other node at this very place.  */
2029         e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2030         if (!e) {
2031                 put_ldev(mdev);
2032                 return false;
2033         }
2034
2035         switch (cmd) {
2036         case P_DATA_REQUEST:
2037                 e->w.cb = w_e_end_data_req;
2038                 fault_type = DRBD_FAULT_DT_RD;
2039                 /* application IO, don't drbd_rs_begin_io */
2040                 goto submit;
2041
2042         case P_RS_DATA_REQUEST:
2043                 e->w.cb = w_e_end_rsdata_req;
2044                 fault_type = DRBD_FAULT_RS_RD;
2045                 /* used in the sector offset progress display */
2046                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2047                 break;
2048
2049         case P_OV_REPLY:
2050         case P_CSUM_RS_REQUEST:
2051                 fault_type = DRBD_FAULT_RS_RD;
2052                 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2053                 if (!di)
2054                         goto out_free_e;
2055
2056                 di->digest_size = digest_size;
2057                 di->digest = (((char *)di)+sizeof(struct digest_info));
2058
2059                 e->digest = di;
2060                 e->flags |= EE_HAS_DIGEST;
2061
2062                 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2063                         goto out_free_e;
2064
2065                 if (cmd == P_CSUM_RS_REQUEST) {
2066                         D_ASSERT(mdev->agreed_pro_version >= 89);
2067                         e->w.cb = w_e_end_csum_rs_req;
2068                         /* used in the sector offset progress display */
2069                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2070                 } else if (cmd == P_OV_REPLY) {
2071                         /* track progress, we may need to throttle */
2072                         atomic_add(size >> 9, &mdev->rs_sect_in);
2073                         e->w.cb = w_e_end_ov_reply;
2074                         dec_rs_pending(mdev);
2075                         /* drbd_rs_begin_io done when we sent this request,
2076                          * but accounting still needs to be done. */
2077                         goto submit_for_resync;
2078                 }
2079                 break;
2080
2081         case P_OV_REQUEST:
2082                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2083                     mdev->agreed_pro_version >= 90) {
2084                         unsigned long now = jiffies;
2085                         int i;
2086                         mdev->ov_start_sector = sector;
2087                         mdev->ov_position = sector;
2088                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2089                         mdev->rs_total = mdev->ov_left;
2090                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2091                                 mdev->rs_mark_left[i] = mdev->ov_left;
2092                                 mdev->rs_mark_time[i] = now;
2093                         }
2094                         dev_info(DEV, "Online Verify start sector: %llu\n",
2095                                         (unsigned long long)sector);
2096                 }
2097                 e->w.cb = w_e_end_ov_req;
2098                 fault_type = DRBD_FAULT_RS_RD;
2099                 break;
2100
2101         default:
2102                 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2103                     cmdname(cmd));
2104                 fault_type = DRBD_FAULT_MAX;
2105                 goto out_free_e;
2106         }
2107
2108         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2109          * wrt the receiver, but it is not as straightforward as it may seem.
2110          * Various places in the resync start and stop logic assume resync
2111          * requests are processed in order, requeuing this on the worker thread
2112          * introduces a bunch of new code for synchronization between threads.
2113          *
2114          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2115          * "forever", throttling after drbd_rs_begin_io will lock that extent
2116          * for application writes for the same time.  For now, just throttle
2117          * here, where the rest of the code expects the receiver to sleep for
2118          * a while, anyways.
2119          */
2120
2121         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2122          * this defers syncer requests for some time, before letting at least
2123          * on request through.  The resync controller on the receiving side
2124          * will adapt to the incoming rate accordingly.
2125          *
2126          * We cannot throttle here if remote is Primary/SyncTarget:
2127          * we would also throttle its application reads.
2128          * In that case, throttling is done on the SyncTarget only.
2129          */
2130         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2131                 schedule_timeout_uninterruptible(HZ/10);
2132         if (drbd_rs_begin_io(mdev, sector))
2133                 goto out_free_e;
2134
2135 submit_for_resync:
2136         atomic_add(size >> 9, &mdev->rs_sect_ev);
2137
2138 submit:
2139         inc_unacked(mdev);
2140         spin_lock_irq(&mdev->req_lock);
2141         list_add_tail(&e->w.list, &mdev->read_ee);
2142         spin_unlock_irq(&mdev->req_lock);
2143
2144         if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2145                 return true;
2146
2147         /* don't care for the reason here */
2148         dev_err(DEV, "submit failed, triggering re-connect\n");
2149         spin_lock_irq(&mdev->req_lock);
2150         list_del(&e->w.list);
2151         spin_unlock_irq(&mdev->req_lock);
2152         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2153
2154 out_free_e:
2155         put_ldev(mdev);
2156         drbd_free_ee(mdev, e);
2157         return false;
2158 }
2159
2160 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2161 {
2162         int self, peer, rv = -100;
2163         unsigned long ch_self, ch_peer;
2164
2165         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2166         peer = mdev->p_uuid[UI_BITMAP] & 1;
2167
2168         ch_peer = mdev->p_uuid[UI_SIZE];
2169         ch_self = mdev->comm_bm_set;
2170
2171         switch (mdev->net_conf->after_sb_0p) {
2172         case ASB_CONSENSUS:
2173         case ASB_DISCARD_SECONDARY:
2174         case ASB_CALL_HELPER:
2175                 dev_err(DEV, "Configuration error.\n");
2176                 break;
2177         case ASB_DISCONNECT:
2178                 break;
2179         case ASB_DISCARD_YOUNGER_PRI:
2180                 if (self == 0 && peer == 1) {
2181                         rv = -1;
2182                         break;
2183                 }
2184                 if (self == 1 && peer == 0) {
2185                         rv =  1;
2186                         break;
2187                 }
2188                 /* Else fall through to one of the other strategies... */
2189         case ASB_DISCARD_OLDER_PRI:
2190                 if (self == 0 && peer == 1) {
2191                         rv = 1;
2192                         break;
2193                 }
2194                 if (self == 1 && peer == 0) {
2195                         rv = -1;
2196                         break;
2197                 }
2198                 /* Else fall through to one of the other strategies... */
2199                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2200                      "Using discard-least-changes instead\n");
2201         case ASB_DISCARD_ZERO_CHG:
2202                 if (ch_peer == 0 && ch_self == 0) {
2203                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2204                                 ? -1 : 1;
2205                         break;
2206                 } else {
2207                         if (ch_peer == 0) { rv =  1; break; }
2208                         if (ch_self == 0) { rv = -1; break; }
2209                 }
2210                 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2211                         break;
2212         case ASB_DISCARD_LEAST_CHG:
2213                 if      (ch_self < ch_peer)
2214                         rv = -1;
2215                 else if (ch_self > ch_peer)
2216                         rv =  1;
2217                 else /* ( ch_self == ch_peer ) */
2218                      /* Well, then use something else. */
2219                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2220                                 ? -1 : 1;
2221                 break;
2222         case ASB_DISCARD_LOCAL:
2223                 rv = -1;
2224                 break;
2225         case ASB_DISCARD_REMOTE:
2226                 rv =  1;
2227         }
2228
2229         return rv;
2230 }
2231
2232 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2233 {
2234         int hg, rv = -100;
2235
2236         switch (mdev->net_conf->after_sb_1p) {
2237         case ASB_DISCARD_YOUNGER_PRI:
2238         case ASB_DISCARD_OLDER_PRI:
2239         case ASB_DISCARD_LEAST_CHG:
2240         case ASB_DISCARD_LOCAL:
2241         case ASB_DISCARD_REMOTE:
2242                 dev_err(DEV, "Configuration error.\n");
2243                 break;
2244         case ASB_DISCONNECT:
2245                 break;
2246         case ASB_CONSENSUS:
2247                 hg = drbd_asb_recover_0p(mdev);
2248                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2249                         rv = hg;
2250                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2251                         rv = hg;
2252                 break;
2253         case ASB_VIOLENTLY:
2254                 rv = drbd_asb_recover_0p(mdev);
2255                 break;
2256         case ASB_DISCARD_SECONDARY:
2257                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2258         case ASB_CALL_HELPER:
2259                 hg = drbd_asb_recover_0p(mdev);
2260                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2261                         enum drbd_state_rv rv2;
2262
2263                         drbd_set_role(mdev, R_SECONDARY, 0);
2264                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2265                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2266                           * we do not need to wait for the after state change work either. */
2267                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2268                         if (rv2 != SS_SUCCESS) {
2269                                 drbd_khelper(mdev, "pri-lost-after-sb");
2270                         } else {
2271                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2272                                 rv = hg;
2273                         }
2274                 } else
2275                         rv = hg;
2276         }
2277
2278         return rv;
2279 }
2280
2281 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2282 {
2283         int hg, rv = -100;
2284
2285         switch (mdev->net_conf->after_sb_2p) {
2286         case ASB_DISCARD_YOUNGER_PRI:
2287         case ASB_DISCARD_OLDER_PRI:
2288         case ASB_DISCARD_LEAST_CHG:
2289         case ASB_DISCARD_LOCAL:
2290         case ASB_DISCARD_REMOTE:
2291         case ASB_CONSENSUS:
2292         case ASB_DISCARD_SECONDARY:
2293                 dev_err(DEV, "Configuration error.\n");
2294                 break;
2295         case ASB_VIOLENTLY:
2296                 rv = drbd_asb_recover_0p(mdev);
2297                 break;
2298         case ASB_DISCONNECT:
2299                 break;
2300         case ASB_CALL_HELPER:
2301                 hg = drbd_asb_recover_0p(mdev);
2302                 if (hg == -1) {
2303                         enum drbd_state_rv rv2;
2304
2305                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2306                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2307                           * we do not need to wait for the after state change work either. */
2308                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2309                         if (rv2 != SS_SUCCESS) {
2310                                 drbd_khelper(mdev, "pri-lost-after-sb");
2311                         } else {
2312                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2313                                 rv = hg;
2314                         }
2315                 } else
2316                         rv = hg;
2317         }
2318
2319         return rv;
2320 }
2321
2322 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2323                            u64 bits, u64 flags)
2324 {
2325         if (!uuid) {
2326                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2327                 return;
2328         }
2329         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2330              text,
2331              (unsigned long long)uuid[UI_CURRENT],
2332              (unsigned long long)uuid[UI_BITMAP],
2333              (unsigned long long)uuid[UI_HISTORY_START],
2334              (unsigned long long)uuid[UI_HISTORY_END],
2335              (unsigned long long)bits,
2336              (unsigned long long)flags);
2337 }
2338
2339 /*
2340   100   after split brain try auto recover
2341     2   C_SYNC_SOURCE set BitMap
2342     1   C_SYNC_SOURCE use BitMap
2343     0   no Sync
2344    -1   C_SYNC_TARGET use BitMap
2345    -2   C_SYNC_TARGET set BitMap
2346  -100   after split brain, disconnect
2347 -1000   unrelated data
2348 -1091   requires proto 91
2349 -1096   requires proto 96
2350  */
2351 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2352 {
2353         u64 self, peer;
2354         int i, j;
2355
2356         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2357         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2358
2359         *rule_nr = 10;
2360         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2361                 return 0;
2362
2363         *rule_nr = 20;
2364         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2365              peer != UUID_JUST_CREATED)
2366                 return -2;
2367
2368         *rule_nr = 30;
2369         if (self != UUID_JUST_CREATED &&
2370             (peer == UUID_JUST_CREATED || peer == (u64)0))
2371                 return 2;
2372
2373         if (self == peer) {
2374                 int rct, dc; /* roles at crash time */
2375
2376                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2377
2378                         if (mdev->agreed_pro_version < 91)
2379                                 return -1091;
2380
2381                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2382                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2383                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2384                                 drbd_uuid_set_bm(mdev, 0UL);
2385
2386                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2387                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2388                                 *rule_nr = 34;
2389                         } else {
2390                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2391                                 *rule_nr = 36;
2392                         }
2393
2394                         return 1;
2395                 }
2396
2397                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2398
2399                         if (mdev->agreed_pro_version < 91)
2400                                 return -1091;
2401
2402                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2403                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2404                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2405
2406                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2407                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2408                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2409
2410                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2411                                 *rule_nr = 35;
2412                         } else {
2413                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2414                                 *rule_nr = 37;
2415                         }
2416
2417                         return -1;
2418                 }
2419
2420                 /* Common power [off|failure] */
2421                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2422                         (mdev->p_uuid[UI_FLAGS] & 2);
2423                 /* lowest bit is set when we were primary,
2424                  * next bit (weight 2) is set when peer was primary */
2425                 *rule_nr = 40;
2426
2427                 switch (rct) {
2428                 case 0: /* !self_pri && !peer_pri */ return 0;
2429                 case 1: /*  self_pri && !peer_pri */ return 1;
2430                 case 2: /* !self_pri &&  peer_pri */ return -1;
2431                 case 3: /*  self_pri &&  peer_pri */
2432                         dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2433                         return dc ? -1 : 1;
2434                 }
2435         }
2436
2437         *rule_nr = 50;
2438         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2439         if (self == peer)
2440                 return -1;
2441
2442         *rule_nr = 51;
2443         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2444         if (self == peer) {
2445                 if (mdev->agreed_pro_version < 96 ?
2446                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2447                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2448                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2449                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2450                            resync as sync source modifications of the peer's UUIDs. */
2451
2452                         if (mdev->agreed_pro_version < 91)
2453                                 return -1091;
2454
2455                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2456                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2457
2458                         dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2459                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2460
2461                         return -1;
2462                 }
2463         }
2464
2465         *rule_nr = 60;
2466         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2467         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2468                 peer = mdev->p_uuid[i] & ~((u64)1);
2469                 if (self == peer)
2470                         return -2;
2471         }
2472
2473         *rule_nr = 70;
2474         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2475         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2476         if (self == peer)
2477                 return 1;
2478
2479         *rule_nr = 71;
2480         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2481         if (self == peer) {
2482                 if (mdev->agreed_pro_version < 96 ?
2483                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2484                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2485                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2486                         /* The last P_SYNC_UUID did not get though. Undo the last start of
2487                            resync as sync source modifications of our UUIDs. */
2488
2489                         if (mdev->agreed_pro_version < 91)
2490                                 return -1091;
2491
2492                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2493                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2494
2495                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2496                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2497                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2498
2499                         return 1;
2500                 }
2501         }
2502
2503
2504         *rule_nr = 80;
2505         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2506         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2507                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2508                 if (self == peer)
2509                         return 2;
2510         }
2511
2512         *rule_nr = 90;
2513         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2514         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2515         if (self == peer && self != ((u64)0))
2516                 return 100;
2517
2518         *rule_nr = 100;
2519         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2520                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2521                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2522                         peer = mdev->p_uuid[j] & ~((u64)1);
2523                         if (self == peer)
2524                                 return -100;
2525                 }
2526         }
2527
2528         return -1000;
2529 }
2530
2531 /* drbd_sync_handshake() returns the new conn state on success, or
2532    CONN_MASK (-1) on failure.
2533  */
2534 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2535                                            enum drbd_disk_state peer_disk) __must_hold(local)
2536 {
2537         int hg, rule_nr;
2538         enum drbd_conns rv = C_MASK;
2539         enum drbd_disk_state mydisk;
2540
2541         mydisk = mdev->state.disk;
2542         if (mydisk == D_NEGOTIATING)
2543                 mydisk = mdev->new_state_tmp.disk;
2544
2545         dev_info(DEV, "drbd_sync_handshake:\n");
2546         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2547         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2548                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2549
2550         hg = drbd_uuid_compare(mdev, &rule_nr);
2551
2552         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2553
2554         if (hg == -1000) {
2555                 dev_alert(DEV, "Unrelated data, aborting!\n");
2556                 return C_MASK;
2557         }
2558         if (hg < -1000) {
2559                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2560                 return C_MASK;
2561         }
2562
2563         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2564             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2565                 int f = (hg == -100) || abs(hg) == 2;
2566                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2567                 if (f)
2568                         hg = hg*2;
2569                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2570                      hg > 0 ? "source" : "target");
2571         }
2572
2573         if (abs(hg) == 100)
2574                 drbd_khelper(mdev, "initial-split-brain");
2575
2576         if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2577                 int pcount = (mdev->state.role == R_PRIMARY)
2578                            + (peer_role == R_PRIMARY);
2579                 int forced = (hg == -100);
2580
2581                 switch (pcount) {
2582                 case 0:
2583                         hg = drbd_asb_recover_0p(mdev);
2584                         break;
2585                 case 1:
2586                         hg = drbd_asb_recover_1p(mdev);
2587                         break;
2588                 case 2:
2589                         hg = drbd_asb_recover_2p(mdev);
2590                         break;
2591                 }
2592                 if (abs(hg) < 100) {
2593                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2594                              "automatically solved. Sync from %s node\n",
2595                              pcount, (hg < 0) ? "peer" : "this");
2596                         if (forced) {
2597                                 dev_warn(DEV, "Doing a full sync, since"
2598                                      " UUIDs where ambiguous.\n");
2599                                 hg = hg*2;
2600                         }
2601                 }
2602         }
2603
2604         if (hg == -100) {
2605                 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2606                         hg = -1;
2607                 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2608                         hg = 1;
2609
2610                 if (abs(hg) < 100)
2611                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2612                              "Sync from %s node\n",
2613                              (hg < 0) ? "peer" : "this");
2614         }
2615
2616         if (hg == -100) {
2617                 /* FIXME this log message is not correct if we end up here
2618                  * after an attempted attach on a diskless node.
2619                  * We just refuse to attach -- well, we drop the "connection"
2620                  * to that disk, in a way... */
2621                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2622                 drbd_khelper(mdev, "split-brain");
2623                 return C_MASK;
2624         }
2625
2626         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2627                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2628                 return C_MASK;
2629         }
2630
2631         if (hg < 0 && /* by intention we do not use mydisk here. */
2632             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2633                 switch (mdev->net_conf->rr_conflict) {
2634                 case ASB_CALL_HELPER:
2635                         drbd_khelper(mdev, "pri-lost");
2636                         /* fall through */
2637                 case ASB_DISCONNECT:
2638                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2639                         return C_MASK;
2640                 case ASB_VIOLENTLY:
2641                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2642                              "assumption\n");
2643                 }
2644         }
2645
2646         if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2647                 if (hg == 0)
2648                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2649                 else
2650                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2651                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2652                                  abs(hg) >= 2 ? "full" : "bit-map based");
2653                 return C_MASK;
2654         }
2655
2656         if (abs(hg) >= 2) {
2657                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2658                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2659                                         BM_LOCKED_SET_ALLOWED))
2660                         return C_MASK;
2661         }
2662
2663         if (hg > 0) { /* become sync source. */
2664                 rv = C_WF_BITMAP_S;
2665         } else if (hg < 0) { /* become sync target */
2666                 rv = C_WF_BITMAP_T;
2667         } else {
2668                 rv = C_CONNECTED;
2669                 if (drbd_bm_total_weight(mdev)) {
2670                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2671                              drbd_bm_total_weight(mdev));
2672                 }
2673         }
2674
2675         return rv;
2676 }
2677
2678 /* returns 1 if invalid */
2679 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2680 {
2681         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2682         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2683             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2684                 return 0;
2685
2686         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2687         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2688             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2689                 return 1;
2690
2691         /* everything else is valid if they are equal on both sides. */
2692         if (peer == self)
2693                 return 0;
2694
2695         /* everything es is invalid. */
2696         return 1;
2697 }
2698
2699 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2700 {
2701         struct p_protocol *p = &mdev->data.rbuf.protocol;
2702         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2703         int p_want_lose, p_two_primaries, cf;
2704         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2705
2706         p_proto         = be32_to_cpu(p->protocol);
2707         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2708         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2709         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2710         p_two_primaries = be32_to_cpu(p->two_primaries);
2711         cf              = be32_to_cpu(p->conn_flags);
2712         p_want_lose = cf & CF_WANT_LOSE;
2713
2714         clear_bit(CONN_DRY_RUN, &mdev->flags);
2715
2716         if (cf & CF_DRY_RUN)
2717                 set_bit(CONN_DRY_RUN, &mdev->flags);
2718
2719         if (p_proto != mdev->net_conf->wire_protocol) {
2720                 dev_err(DEV, "incompatible communication protocols\n");
2721                 goto disconnect;
2722         }
2723
2724         if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2725                 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2726                 goto disconnect;
2727         }
2728
2729         if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2730                 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2731                 goto disconnect;
2732         }
2733
2734         if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2735                 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2736                 goto disconnect;
2737         }
2738
2739         if (p_want_lose && mdev->net_conf->want_lose) {
2740                 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2741                 goto disconnect;
2742         }
2743
2744         if (p_two_primaries != mdev->net_conf->two_primaries) {
2745                 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2746                 goto disconnect;
2747         }
2748
2749         if (mdev->agreed_pro_version >= 87) {
2750                 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2751
2752                 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2753                         return false;
2754
2755                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2756                 if (strcmp(p_integrity_alg, my_alg)) {
2757                         dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2758                         goto disconnect;
2759                 }
2760                 dev_info(DEV, "data-integrity-alg: %s\n",
2761                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2762         }
2763
2764         return true;
2765
2766 disconnect:
2767         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2768         return false;
2769 }
2770
2771 /* helper function
2772  * input: alg name, feature name
2773  * return: NULL (alg name was "")
2774  *         ERR_PTR(error) if something goes wrong
2775  *         or the crypto hash ptr, if it worked out ok. */
2776 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2777                 const char *alg, const char *name)
2778 {
2779         struct crypto_hash *tfm;
2780
2781         if (!alg[0])
2782                 return NULL;
2783
2784         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2785         if (IS_ERR(tfm)) {
2786                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2787                         alg, name, PTR_ERR(tfm));
2788                 return tfm;
2789         }
2790         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2791                 crypto_free_hash(tfm);
2792                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2793                 return ERR_PTR(-EINVAL);
2794         }
2795         return tfm;
2796 }
2797
2798 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2799 {
2800         int ok = true;
2801         struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2802         unsigned int header_size, data_size, exp_max_sz;
2803         struct crypto_hash *verify_tfm = NULL;
2804         struct crypto_hash *csums_tfm = NULL;
2805         const int apv = mdev->agreed_pro_version;
2806         int *rs_plan_s = NULL;
2807         int fifo_size = 0;
2808
2809         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2810                     : apv == 88 ? sizeof(struct p_rs_param)
2811                                         + SHARED_SECRET_MAX
2812                     : apv <= 94 ? sizeof(struct p_rs_param_89)
2813                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2814
2815         if (packet_size > exp_max_sz) {
2816                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2817                     packet_size, exp_max_sz);
2818                 return false;
2819         }
2820
2821         if (apv <= 88) {
2822                 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2823                 data_size   = packet_size  - header_size;
2824         } else if (apv <= 94) {
2825                 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2826                 data_size   = packet_size  - header_size;
2827                 D_ASSERT(data_size == 0);
2828         } else {
2829                 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2830                 data_size   = packet_size  - header_size;
2831                 D_ASSERT(data_size == 0);
2832         }
2833
2834         /* initialize verify_alg and csums_alg */
2835         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2836
2837         if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2838                 return false;
2839
2840         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2841
2842         if (apv >= 88) {
2843                 if (apv == 88) {
2844                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
2845                                 dev_err(DEV, "verify-alg of wrong size, "
2846                                         "peer wants %u, accepting only up to %u byte\n",
2847                                         data_size, SHARED_SECRET_MAX);
2848                                 return false;
2849                         }
2850
2851                         if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2852                                 return false;
2853
2854                         /* we expect NUL terminated string */
2855                         /* but just in case someone tries to be evil */
2856                         D_ASSERT(p->verify_alg[data_size-1] == 0);
2857                         p->verify_alg[data_size-1] = 0;
2858
2859                 } else /* apv >= 89 */ {
2860                         /* we still expect NUL terminated strings */
2861                         /* but just in case someone tries to be evil */
2862                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2863                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2864                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2865                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2866                 }
2867
2868                 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2869                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2870                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2871                                     mdev->sync_conf.verify_alg, p->verify_alg);
2872                                 goto disconnect;
2873                         }
2874                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2875                                         p->verify_alg, "verify-alg");
2876                         if (IS_ERR(verify_tfm)) {
2877                                 verify_tfm = NULL;
2878                                 goto disconnect;
2879                         }
2880                 }
2881
2882                 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2883                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2884                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2885                                     mdev->sync_conf.csums_alg, p->csums_alg);
2886                                 goto disconnect;
2887                         }
2888                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2889                                         p->csums_alg, "csums-alg");
2890                         if (IS_ERR(csums_tfm)) {
2891                                 csums_tfm = NULL;
2892                                 goto disconnect;
2893                         }
2894                 }
2895
2896                 if (apv > 94) {
2897                         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2898                         mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2899                         mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2900                         mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2901                         mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2902
2903                         fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2904                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2905                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2906                                 if (!rs_plan_s) {
2907                                         dev_err(DEV, "kmalloc of fifo_buffer failed");
2908                                         goto disconnect;
2909                                 }
2910                         }
2911                 }
2912
2913                 spin_lock(&mdev->peer_seq_lock);
2914                 /* lock against drbd_nl_syncer_conf() */
2915                 if (verify_tfm) {
2916                         strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2917                         mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2918                         crypto_free_hash(mdev->verify_tfm);
2919                         mdev->verify_tfm = verify_tfm;
2920                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2921                 }
2922                 if (csums_tfm) {
2923                         strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2924                         mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2925                         crypto_free_hash(mdev->csums_tfm);
2926                         mdev->csums_tfm = csums_tfm;
2927                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2928                 }
2929                 if (fifo_size != mdev->rs_plan_s.size) {
2930                         kfree(mdev->rs_plan_s.values);
2931                         mdev->rs_plan_s.values = rs_plan_s;
2932                         mdev->rs_plan_s.size   = fifo_size;
2933                         mdev->rs_planed = 0;
2934                 }
2935                 spin_unlock(&mdev->peer_seq_lock);
2936         }
2937
2938         return ok;
2939 disconnect:
2940         /* just for completeness: actually not needed,
2941          * as this is not reached if csums_tfm was ok. */
2942         crypto_free_hash(csums_tfm);
2943         /* but free the verify_tfm again, if csums_tfm did not work out */
2944         crypto_free_hash(verify_tfm);
2945         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2946         return false;
2947 }
2948
2949 /* warn if the arguments differ by more than 12.5% */
2950 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2951         const char *s, sector_t a, sector_t b)
2952 {
2953         sector_t d;
2954         if (a == 0 || b == 0)
2955                 return;
2956         d = (a > b) ? (a - b) : (b - a);
2957         if (d > (a>>3) || d > (b>>3))
2958                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2959                      (unsigned long long)a, (unsigned long long)b);
2960 }
2961
2962 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2963 {
2964         struct p_sizes *p = &mdev->data.rbuf.sizes;
2965         enum determine_dev_size dd = unchanged;
2966         sector_t p_size, p_usize, my_usize;
2967         int ldsc = 0; /* local disk size changed */
2968         enum dds_flags ddsf;
2969
2970         p_size = be64_to_cpu(p->d_size);
2971         p_usize = be64_to_cpu(p->u_size);
2972
2973         if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2974                 dev_err(DEV, "some backing storage is needed\n");
2975                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2976                 return false;
2977         }
2978
2979         /* just store the peer's disk size for now.
2980          * we still need to figure out whether we accept that. */
2981         mdev->p_size = p_size;
2982
2983         if (get_ldev(mdev)) {
2984                 warn_if_differ_considerably(mdev, "lower level device sizes",
2985                            p_size, drbd_get_max_capacity(mdev->ldev));
2986                 warn_if_differ_considerably(mdev, "user requested size",
2987                                             p_usize, mdev->ldev->dc.disk_size);
2988
2989                 /* if this is the first connect, or an otherwise expected
2990                  * param exchange, choose the minimum */
2991                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2992                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2993                                              p_usize);
2994
2995                 my_usize = mdev->ldev->dc.disk_size;
2996
2997                 if (mdev->ldev->dc.disk_size != p_usize) {
2998                         mdev->ldev->dc.disk_size = p_usize;
2999                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3000                              (unsigned long)mdev->ldev->dc.disk_size);
3001                 }
3002
3003                 /* Never shrink a device with usable data during connect.
3004                    But allow online shrinking if we are connected. */
3005                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3006                    drbd_get_capacity(mdev->this_bdev) &&
3007                    mdev->state.disk >= D_OUTDATED &&
3008                    mdev->state.conn < C_CONNECTED) {
3009                         dev_err(DEV, "The peer's disk size is too small!\n");
3010                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3011                         mdev->ldev->dc.disk_size = my_usize;
3012                         put_ldev(mdev);
3013                         return false;
3014                 }
3015                 put_ldev(mdev);
3016         }
3017
3018         ddsf = be16_to_cpu(p->dds_flags);
3019         if (get_ldev(mdev)) {
3020                 dd = drbd_determine_dev_size(mdev, ddsf);
3021                 put_ldev(mdev);
3022                 if (dd == dev_size_error)
3023                         return false;
3024                 drbd_md_sync(mdev);
3025         } else {
3026                 /* I am diskless, need to accept the peer's size. */
3027                 drbd_set_my_capacity(mdev, p_size);
3028         }
3029
3030         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3031         drbd_reconsider_max_bio_size(mdev);
3032
3033         if (get_ldev(mdev)) {
3034                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3035                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3036                         ldsc = 1;
3037                 }
3038
3039                 put_ldev(mdev);
3040         }
3041
3042         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3043                 if (be64_to_cpu(p->c_size) !=
3044                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3045                         /* we have different sizes, probably peer
3046                          * needs to know my new size... */
3047                         drbd_send_sizes(mdev, 0, ddsf);
3048                 }
3049                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3050                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3051                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3052                             mdev->state.disk >= D_INCONSISTENT) {
3053                                 if (ddsf & DDSF_NO_RESYNC)
3054                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3055                                 else
3056                                         resync_after_online_grow(mdev);
3057                         } else
3058                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3059                 }
3060         }
3061
3062         return true;
3063 }
3064
3065 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3066 {
3067         struct p_uuids *p = &mdev->data.rbuf.uuids;
3068         u64 *p_uuid;
3069         int i, updated_uuids = 0;
3070
3071         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3072
3073         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3074                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3075
3076         kfree(mdev->p_uuid);
3077         mdev->p_uuid = p_uuid;
3078
3079         if (mdev->state.conn < C_CONNECTED &&
3080             mdev->state.disk < D_INCONSISTENT &&
3081             mdev->state.role == R_PRIMARY &&
3082             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3083                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3084                     (unsigned long long)mdev->ed_uuid);
3085                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3086                 return false;
3087         }
3088
3089         if (get_ldev(mdev)) {
3090                 int skip_initial_sync =
3091                         mdev->state.conn == C_CONNECTED &&
3092                         mdev->agreed_pro_version >= 90 &&
3093                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3094                         (p_uuid[UI_FLAGS] & 8);
3095                 if (skip_initial_sync) {
3096                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3097                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3098                                         "clear_n_write from receive_uuids",
3099                                         BM_LOCKED_TEST_ALLOWED);
3100                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3101                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3102                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3103                                         CS_VERBOSE, NULL);
3104                         drbd_md_sync(mdev);
3105                         updated_uuids = 1;
3106                 }
3107                 put_ldev(mdev);
3108         } else if (mdev->state.disk < D_INCONSISTENT &&
3109                    mdev->state.role == R_PRIMARY) {
3110                 /* I am a diskless primary, the peer just created a new current UUID
3111                    for me. */
3112                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3113         }
3114
3115         /* Before we test for the disk state, we should wait until an eventually
3116            ongoing cluster wide state change is finished. That is important if
3117            we are primary and are detaching from our disk. We need to see the
3118            new disk state... */
3119         wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3120         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3121                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3122
3123         if (updated_uuids)
3124                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3125
3126         return true;
3127 }
3128
3129 /**
3130  * convert_state() - Converts the peer's view of the cluster state to our point of view
3131  * @ps:         The state as seen by the peer.
3132  */
3133 static union drbd_state convert_state(union drbd_state ps)
3134 {
3135         union drbd_state ms;
3136
3137         static enum drbd_conns c_tab[] = {
3138                 [C_CONNECTED] = C_CONNECTED,
3139
3140                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3141                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3142                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3143                 [C_VERIFY_S]       = C_VERIFY_T,
3144                 [C_MASK]   = C_MASK,
3145         };
3146
3147         ms.i = ps.i;
3148
3149         ms.conn = c_tab[ps.conn];
3150         ms.peer = ps.role;
3151         ms.role = ps.peer;
3152         ms.pdsk = ps.disk;
3153         ms.disk = ps.pdsk;
3154         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3155
3156         return ms;
3157 }
3158
3159 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3160 {
3161         struct p_req_state *p = &mdev->data.rbuf.req_state;
3162         union drbd_state mask, val;
3163         enum drbd_state_rv rv;
3164
3165         mask.i = be32_to_cpu(p->mask);
3166         val.i = be32_to_cpu(p->val);
3167
3168         if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3169             test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3170                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3171                 return true;
3172         }
3173
3174         mask = convert_state(mask);
3175         val = convert_state(val);
3176
3177         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3178
3179         drbd_send_sr_reply(mdev, rv);
3180         drbd_md_sync(mdev);
3181
3182         return true;
3183 }
3184
3185 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3186 {
3187         struct p_state *p = &mdev->data.rbuf.state;
3188         union drbd_state os, ns, peer_state;
3189         enum drbd_disk_state real_peer_disk;
3190         enum chg_state_flags cs_flags;
3191         int rv;
3192
3193         peer_state.i = be32_to_cpu(p->state);
3194
3195         real_peer_disk = peer_state.disk;
3196         if (peer_state.disk == D_NEGOTIATING) {
3197                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3198                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3199         }
3200
3201         spin_lock_irq(&mdev->req_lock);
3202  retry:
3203         os = ns = mdev->state;
3204         spin_unlock_irq(&mdev->req_lock);
3205
3206         /* If some other part of the code (asender thread, timeout)
3207          * already decided to close the connection again,
3208          * we must not "re-establish" it here. */
3209         if (os.conn <= C_TEAR_DOWN)
3210                 return false;
3211
3212         /* If this is the "end of sync" confirmation, usually the peer disk
3213          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3214          * set) resync started in PausedSyncT, or if the timing of pause-/
3215          * unpause-sync events has been "just right", the peer disk may
3216          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3217          */
3218         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3219             real_peer_disk == D_UP_TO_DATE &&
3220             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3221                 /* If we are (becoming) SyncSource, but peer is still in sync
3222                  * preparation, ignore its uptodate-ness to avoid flapping, it
3223                  * will change to inconsistent once the peer reaches active
3224                  * syncing states.
3225                  * It may have changed syncer-paused flags, however, so we
3226                  * cannot ignore this completely. */
3227                 if (peer_state.conn > C_CONNECTED &&
3228                     peer_state.conn < C_SYNC_SOURCE)
3229                         real_peer_disk = D_INCONSISTENT;
3230
3231                 /* if peer_state changes to connected at the same time,
3232                  * it explicitly notifies us that it finished resync.
3233                  * Maybe we should finish it up, too? */
3234                 else if (os.conn >= C_SYNC_SOURCE &&
3235                          peer_state.conn == C_CONNECTED) {
3236                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3237                                 drbd_resync_finished(mdev);
3238                         return true;
3239                 }
3240         }
3241
3242         /* peer says his disk is inconsistent, while we think it is uptodate,
3243          * and this happens while the peer still thinks we have a sync going on,
3244          * but we think we are already done with the sync.
3245          * We ignore this to avoid flapping pdsk.
3246          * This should not happen, if the peer is a recent version of drbd. */
3247         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3248             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3249                 real_peer_disk = D_UP_TO_DATE;
3250
3251         if (ns.conn == C_WF_REPORT_PARAMS)
3252                 ns.conn = C_CONNECTED;
3253
3254         if (peer_state.conn == C_AHEAD)
3255                 ns.conn = C_BEHIND;
3256
3257         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3258             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3259                 int cr; /* consider resync */
3260
3261                 /* if we established a new connection */
3262                 cr  = (os.conn < C_CONNECTED);
3263                 /* if we had an established connection
3264                  * and one of the nodes newly attaches a disk */
3265                 cr |= (os.conn == C_CONNECTED &&
3266                        (peer_state.disk == D_NEGOTIATING ||
3267                         os.disk == D_NEGOTIATING));
3268                 /* if we have both been inconsistent, and the peer has been
3269                  * forced to be UpToDate with --overwrite-data */
3270                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3271                 /* if we had been plain connected, and the admin requested to
3272                  * start a sync by "invalidate" or "invalidate-remote" */
3273                 cr |= (os.conn == C_CONNECTED &&
3274                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3275                                  peer_state.conn <= C_WF_BITMAP_T));
3276
3277                 if (cr)
3278                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3279
3280                 put_ldev(mdev);
3281                 if (ns.conn == C_MASK) {
3282                         ns.conn = C_CONNECTED;
3283                         if (mdev->state.disk == D_NEGOTIATING) {
3284                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3285                         } else if (peer_state.disk == D_NEGOTIATING) {
3286                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3287                                 peer_state.disk = D_DISKLESS;
3288                                 real_peer_disk = D_DISKLESS;
3289                         } else {
3290                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3291                                         return false;
3292                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3293                                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3294                                 return false;
3295                         }
3296                 }
3297         }
3298
3299         spin_lock_irq(&mdev->req_lock);
3300         if (mdev->state.i != os.i)
3301                 goto retry;
3302         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3303         ns.peer = peer_state.role;
3304         ns.pdsk = real_peer_disk;
3305         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3306         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3307                 ns.disk = mdev->new_state_tmp.disk;
3308         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3309         if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3310             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3311                 /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3312                    for temporal network outages! */
3313                 spin_unlock_irq(&mdev->req_lock);
3314                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3315                 tl_clear(mdev);
3316                 drbd_uuid_new_current(mdev);
3317                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3318                 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3319                 return false;
3320         }
3321         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3322         ns = mdev->state;
3323         spin_unlock_irq(&mdev->req_lock);
3324
3325         if (rv < SS_SUCCESS) {
3326                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3327                 return false;
3328         }
3329
3330         if (os.conn > C_WF_REPORT_PARAMS) {
3331                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3332                     peer_state.disk != D_NEGOTIATING ) {
3333                         /* we want resync, peer has not yet decided to sync... */
3334                         /* Nowadays only used when forcing a node into primary role and
3335                            setting its disk to UpToDate with that */
3336                         drbd_send_uuids(mdev);
3337                         drbd_send_current_state(mdev);
3338                 }
3339         }
3340
3341         mdev->net_conf->want_lose = 0;
3342
3343         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3344
3345         return true;
3346 }
3347
3348 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3349 {
3350         struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3351
3352         wait_event(mdev->misc_wait,
3353                    mdev->state.conn == C_WF_SYNC_UUID ||
3354                    mdev->state.conn == C_BEHIND ||
3355                    mdev->state.conn < C_CONNECTED ||
3356                    mdev->state.disk < D_NEGOTIATING);
3357
3358         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3359
3360         /* Here the _drbd_uuid_ functions are right, current should
3361            _not_ be rotated into the history */
3362         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3363                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3364                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3365
3366                 drbd_print_uuids(mdev, "updated sync uuid");
3367                 drbd_start_resync(mdev, C_SYNC_TARGET);
3368
3369                 put_ldev(mdev);
3370         } else
3371                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3372
3373         return true;
3374 }
3375
3376 /**
3377  * receive_bitmap_plain
3378  *
3379  * Return 0 when done, 1 when another iteration is needed, and a negative error
3380  * code upon failure.
3381  */
3382 static int
3383 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3384                      unsigned long *buffer, struct bm_xfer_ctx *c)
3385 {
3386         unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3387         unsigned want = num_words * sizeof(long);
3388         int err;
3389
3390         if (want != data_size) {
3391                 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3392                 return -EIO;
3393         }
3394         if (want == 0)
3395                 return 0;
3396         err = drbd_recv(mdev, buffer, want);
3397         if (err != want) {
3398                 if (err >= 0)
3399                         err = -EIO;
3400                 return err;
3401         }
3402
3403         drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3404
3405         c->word_offset += num_words;
3406         c->bit_offset = c->word_offset * BITS_PER_LONG;
3407         if (c->bit_offset > c->bm_bits)
3408                 c->bit_offset = c->bm_bits;
3409
3410         return 1;
3411 }
3412
3413 /**
3414  * recv_bm_rle_bits
3415  *
3416  * Return 0 when done, 1 when another iteration is needed, and a negative error
3417  * code upon failure.
3418  */
3419 static int
3420 recv_bm_rle_bits(struct drbd_conf *mdev,
3421                 struct p_compressed_bm *p,
3422                 struct bm_xfer_ctx *c)
3423 {
3424         struct bitstream bs;
3425         u64 look_ahead;
3426         u64 rl;
3427         u64 tmp;
3428         unsigned long s = c->bit_offset;
3429         unsigned long e;
3430         int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3431         int toggle = DCBP_get_start(p);
3432         int have;
3433         int bits;
3434
3435         bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3436
3437         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3438         if (bits < 0)
3439                 return -EIO;
3440
3441         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3442                 bits = vli_decode_bits(&rl, look_ahead);
3443                 if (bits <= 0)
3444                         return -EIO;
3445
3446                 if (toggle) {
3447                         e = s + rl -1;
3448                         if (e >= c->bm_bits) {
3449                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3450                                 return -EIO;
3451                         }
3452                         _drbd_bm_set_bits(mdev, s, e);
3453                 }
3454
3455                 if (have < bits) {
3456                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3457                                 have, bits, look_ahead,
3458                                 (unsigned int)(bs.cur.b - p->code),
3459                                 (unsigned int)bs.buf_len);
3460                         return -EIO;
3461                 }
3462                 look_ahead >>= bits;
3463                 have -= bits;
3464
3465                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3466                 if (bits < 0)
3467                         return -EIO;
3468                 look_ahead |= tmp << have;
3469                 have += bits;
3470         }
3471
3472         c->bit_offset = s;
3473         bm_xfer_ctx_bit_to_word_offset(c);
3474
3475         return (s != c->bm_bits);
3476 }
3477
3478 /**
3479  * decode_bitmap_c
3480  *
3481  * Return 0 when done, 1 when another iteration is needed, and a negative error
3482  * code upon failure.
3483  */
3484 static int
3485 decode_bitmap_c(struct drbd_conf *mdev,
3486                 struct p_compressed_bm *p,
3487                 struct bm_xfer_ctx *c)
3488 {
3489         if (DCBP_get_code(p) == RLE_VLI_Bits)
3490                 return recv_bm_rle_bits(mdev, p, c);
3491
3492         /* other variants had been implemented for evaluation,
3493          * but have been dropped as this one turned out to be "best"
3494          * during all our tests. */
3495
3496         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3497         drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3498         return -EIO;
3499 }
3500
3501 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3502                 const char *direction, struct bm_xfer_ctx *c)
3503 {
3504         /* what would it take to transfer it "plaintext" */
3505         unsigned plain = sizeof(struct p_header80) *
3506                 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3507                 + c->bm_words * sizeof(long);
3508         unsigned total = c->bytes[0] + c->bytes[1];
3509         unsigned r;
3510
3511         /* total can not be zero. but just in case: */
3512         if (total == 0)
3513                 return;
3514
3515         /* don't report if not compressed */
3516         if (total >= plain)
3517                 return;
3518
3519         /* total < plain. check for overflow, still */
3520         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3521                                     : (1000 * total / plain);
3522
3523         if (r > 1000)
3524                 r = 1000;
3525
3526         r = 1000 - r;
3527         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3528              "total %u; compression: %u.%u%%\n",
3529                         direction,
3530                         c->bytes[1], c->packets[1],
3531                         c->bytes[0], c->packets[0],
3532                         total, r/10, r % 10);
3533 }
3534
3535 /* Since we are processing the bitfield from lower addresses to higher,
3536    it does not matter if the process it in 32 bit chunks or 64 bit
3537    chunks as long as it is little endian. (Understand it as byte stream,
3538    beginning with the lowest byte...) If we would use big endian
3539    we would need to process it from the highest address to the lowest,
3540    in order to be agnostic to the 32 vs 64 bits issue.
3541
3542    returns 0 on failure, 1 if we successfully received it. */
3543 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3544 {
3545         struct bm_xfer_ctx c;
3546         void *buffer;
3547         int err;
3548         int ok = false;
3549         struct p_header80 *h = &mdev->data.rbuf.header.h80;
3550
3551         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3552         /* you are supposed to send additional out-of-sync information
3553          * if you actually set bits during this phase */
3554
3555         /* maybe we should use some per thread scratch page,
3556          * and allocate that during initial device creation? */
3557         buffer   = (unsigned long *) __get_free_page(GFP_NOIO);
3558         if (!buffer) {
3559                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3560                 goto out;
3561         }
3562
3563         c = (struct bm_xfer_ctx) {
3564                 .bm_bits = drbd_bm_bits(mdev),
3565                 .bm_words = drbd_bm_words(mdev),
3566         };
3567
3568         for(;;) {
3569                 if (cmd == P_BITMAP) {
3570                         err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3571                 } else if (cmd == P_COMPRESSED_BITMAP) {
3572                         /* MAYBE: sanity check that we speak proto >= 90,
3573                          * and the feature is enabled! */
3574                         struct p_compressed_bm *p;
3575
3576                         if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3577                                 dev_err(DEV, "ReportCBitmap packet too large\n");
3578                                 goto out;
3579                         }
3580                         /* use the page buff */
3581                         p = buffer;
3582                         memcpy(p, h, sizeof(*h));
3583                         if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3584                                 goto out;
3585                         if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3586                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3587                                 goto out;
3588                         }
3589                         err = decode_bitmap_c(mdev, p, &c);
3590                 } else {
3591                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
3592                         goto out;
3593                 }
3594
3595                 c.packets[cmd == P_BITMAP]++;
3596                 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3597
3598                 if (err <= 0) {
3599                         if (err < 0)
3600                                 goto out;
3601                         break;
3602                 }
3603                 if (!drbd_recv_header(mdev, &cmd, &data_size))
3604                         goto out;
3605         }
3606
3607         INFO_bm_xfer_stats(mdev, "receive", &c);
3608
3609         if (mdev->state.conn == C_WF_BITMAP_T) {
3610                 enum drbd_state_rv rv;
3611
3612                 ok = !drbd_send_bitmap(mdev);
3613                 if (!ok)
3614                         goto out;
3615                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3616                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3617                 D_ASSERT(rv == SS_SUCCESS);
3618         } else if (mdev->state.conn != C_WF_BITMAP_S) {
3619                 /* admin may have requested C_DISCONNECTING,
3620                  * other threads may have noticed network errors */
3621                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3622                     drbd_conn_str(mdev->state.conn));
3623         }
3624
3625         ok = true;
3626  out:
3627         drbd_bm_unlock(mdev);
3628         if (ok && mdev->state.conn == C_WF_BITMAP_S)
3629                 drbd_start_resync(mdev, C_SYNC_SOURCE);
3630         free_page((unsigned long) buffer);
3631         return ok;
3632 }
3633
3634 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3635 {
3636         /* TODO zero copy sink :) */
3637         static char sink[128];
3638         int size, want, r;
3639
3640         dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3641                  cmd, data_size);
3642
3643         size = data_size;
3644         while (size > 0) {
3645                 want = min_t(int, size, sizeof(sink));
3646                 r = drbd_recv(mdev, sink, want);
3647                 ERR_IF(r <= 0) break;
3648                 size -= r;
3649         }
3650         return size == 0;
3651 }
3652
3653 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3654 {
3655         /* Make sure we've acked all the TCP data associated
3656          * with the data requests being unplugged */
3657         drbd_tcp_quickack(mdev->data.socket);
3658
3659         return true;
3660 }
3661
3662 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3663 {
3664         struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3665
3666         switch (mdev->state.conn) {
3667         case C_WF_SYNC_UUID:
3668         case C_WF_BITMAP_T:
3669         case C_BEHIND:
3670                         break;
3671         default:
3672                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3673                                 drbd_conn_str(mdev->state.conn));
3674         }
3675
3676         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3677
3678         return true;
3679 }
3680
3681 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3682
3683 struct data_cmd {
3684         int expect_payload;
3685         size_t pkt_size;
3686         drbd_cmd_handler_f function;
3687 };
3688
3689 static struct data_cmd drbd_cmd_handler[] = {
3690         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3691         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3692         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3693         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3694         [P_BITMAP]          = { 1, sizeof(struct p_header80), receive_bitmap } ,
3695         [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3696         [P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3697         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3698         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3699         [P_SYNC_PARAM]      = { 1, sizeof(struct p_header80), receive_SyncParam },
3700         [P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
3701         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3702         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
3703         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
3704         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
3705         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3706         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3707         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3708         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3709         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3710         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3711         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3712         /* anything missing from this table is in
3713          * the asender_tbl, see get_asender_cmd */
3714         [P_MAX_CMD]         = { 0, 0, NULL },
3715 };
3716
3717 /* All handler functions that expect a sub-header get that sub-heder in
3718    mdev->data.rbuf.header.head.payload.
3719
3720    Usually in mdev->data.rbuf.header.head the callback can find the usual
3721    p_header, but they may not rely on that. Since there is also p_header95 !
3722  */
3723
3724 static void drbdd(struct drbd_conf *mdev)
3725 {
3726         union p_header *header = &mdev->data.rbuf.header;
3727         unsigned int packet_size;
3728         enum drbd_packets cmd;
3729         size_t shs; /* sub header size */
3730         int rv;
3731
3732         while (get_t_state(&mdev->receiver) == Running) {
3733                 drbd_thread_current_set_cpu(mdev);
3734                 if (!drbd_recv_header(mdev, &cmd, &packet_size))
3735                         goto err_out;
3736
3737                 if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3738                         dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3739                         goto err_out;
3740                 }
3741
3742                 shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3743                 if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3744                         dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3745                         goto err_out;
3746                 }
3747
3748                 if (shs) {
3749                         rv = drbd_recv(mdev, &header->h80.payload, shs);
3750                         if (unlikely(rv != shs)) {
3751                                 if (!signal_pending(current))
3752                                         dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3753                                 goto err_out;
3754                         }
3755                 }
3756
3757                 rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3758
3759                 if (unlikely(!rv)) {
3760                         dev_err(DEV, "error receiving %s, l: %d!\n",
3761                             cmdname(cmd), packet_size);
3762                         goto err_out;
3763                 }
3764         }
3765
3766         if (0) {
3767         err_out:
3768                 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3769         }
3770         /* If we leave here, we probably want to update at least the
3771          * "Connected" indicator on stable storage. Do so explicitly here. */
3772         drbd_md_sync(mdev);
3773 }
3774
3775 void drbd_flush_workqueue(struct drbd_conf *mdev)
3776 {
3777         struct drbd_wq_barrier barr;
3778
3779         barr.w.cb = w_prev_work_done;
3780         init_completion(&barr.done);
3781         drbd_queue_work(&mdev->data.work, &barr.w);
3782         wait_for_completion(&barr.done);
3783 }
3784
3785 void drbd_free_tl_hash(struct drbd_conf *mdev)
3786 {
3787         struct hlist_head *h;
3788
3789         spin_lock_irq(&mdev->req_lock);
3790
3791         if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3792                 spin_unlock_irq(&mdev->req_lock);
3793                 return;
3794         }
3795         /* paranoia code */
3796         for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3797                 if (h->first)
3798                         dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3799                                 (int)(h - mdev->ee_hash), h->first);
3800         kfree(mdev->ee_hash);
3801         mdev->ee_hash = NULL;
3802         mdev->ee_hash_s = 0;
3803
3804         /* paranoia code */
3805         for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3806                 if (h->first)
3807                         dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3808                                 (int)(h - mdev->tl_hash), h->first);
3809         kfree(mdev->tl_hash);
3810         mdev->tl_hash = NULL;
3811         mdev->tl_hash_s = 0;
3812         spin_unlock_irq(&mdev->req_lock);
3813 }
3814
3815 static void drbd_disconnect(struct drbd_conf *mdev)
3816 {
3817         enum drbd_fencing_p fp;
3818         union drbd_state os, ns;
3819         int rv = SS_UNKNOWN_ERROR;
3820         unsigned int i;
3821
3822         if (mdev->state.conn == C_STANDALONE)
3823                 return;
3824
3825         /* We are about to start the cleanup after connection loss.
3826          * Make sure drbd_make_request knows about that.
3827          * Usually we should be in some network failure state already,
3828          * but just in case we are not, we fix it up here.
3829          */
3830         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
3831
3832         /* asender does not clean up anything. it must not interfere, either */
3833         drbd_thread_stop(&mdev->asender);
3834         drbd_free_sock(mdev);
3835
3836         /* wait for current activity to cease. */
3837         spin_lock_irq(&mdev->req_lock);
3838         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3839         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3840         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3841         spin_unlock_irq(&mdev->req_lock);
3842
3843         /* We do not have data structures that would allow us to
3844          * get the rs_pending_cnt down to 0 again.
3845          *  * On C_SYNC_TARGET we do not have any data structures describing
3846          *    the pending RSDataRequest's we have sent.
3847          *  * On C_SYNC_SOURCE there is no data structure that tracks
3848          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3849          *  And no, it is not the sum of the reference counts in the
3850          *  resync_LRU. The resync_LRU tracks the whole operation including
3851          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3852          *  on the fly. */
3853         drbd_rs_cancel_all(mdev);
3854         mdev->rs_total = 0;
3855         mdev->rs_failed = 0;
3856         atomic_set(&mdev->rs_pending_cnt, 0);
3857         wake_up(&mdev->misc_wait);
3858
3859         /* make sure syncer is stopped and w_resume_next_sg queued */
3860         del_timer_sync(&mdev->resync_timer);
3861         resync_timer_fn((unsigned long)mdev);
3862
3863         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3864          * w_make_resync_request etc. which may still be on the worker queue
3865          * to be "canceled" */
3866         drbd_flush_workqueue(mdev);
3867
3868         /* This also does reclaim_net_ee().  If we do this too early, we might
3869          * miss some resync ee and pages.*/
3870         drbd_process_done_ee(mdev);
3871
3872         kfree(mdev->p_uuid);
3873         mdev->p_uuid = NULL;
3874
3875         if (!is_susp(mdev->state))
3876                 tl_clear(mdev);
3877
3878         dev_info(DEV, "Connection closed\n");
3879
3880         drbd_md_sync(mdev);
3881
3882         fp = FP_DONT_CARE;
3883         if (get_ldev(mdev)) {
3884                 fp = mdev->ldev->dc.fencing;
3885                 put_ldev(mdev);
3886         }
3887
3888         if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3889                 drbd_try_outdate_peer_async(mdev);
3890
3891         spin_lock_irq(&mdev->req_lock);
3892         os = mdev->state;
3893         if (os.conn >= C_UNCONNECTED) {
3894                 /* Do not restart in case we are C_DISCONNECTING */
3895                 ns = os;
3896                 ns.conn = C_UNCONNECTED;
3897                 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3898         }
3899         spin_unlock_irq(&mdev->req_lock);
3900
3901         if (os.conn == C_DISCONNECTING) {
3902                 wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3903
3904                 crypto_free_hash(mdev->cram_hmac_tfm);
3905                 mdev->cram_hmac_tfm = NULL;
3906
3907                 kfree(mdev->net_conf);
3908                 mdev->net_conf = NULL;
3909                 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3910         }
3911
3912         /* serialize with bitmap writeout triggered by the state change,
3913          * if any. */
3914         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3915
3916         /* tcp_close and release of sendpage pages can be deferred.  I don't
3917          * want to use SO_LINGER, because apparently it can be deferred for
3918          * more than 20 seconds (longest time I checked).
3919          *
3920          * Actually we don't care for exactly when the network stack does its
3921          * put_page(), but release our reference on these pages right here.
3922          */
3923         i = drbd_release_ee(mdev, &mdev->net_ee);
3924         if (i)
3925                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3926         i = atomic_read(&mdev->pp_in_use_by_net);
3927         if (i)
3928                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3929         i = atomic_read(&mdev->pp_in_use);
3930         if (i)
3931                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3932
3933         D_ASSERT(list_empty(&mdev->read_ee));
3934         D_ASSERT(list_empty(&mdev->active_ee));
3935         D_ASSERT(list_empty(&mdev->sync_ee));
3936         D_ASSERT(list_empty(&mdev->done_ee));
3937
3938         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3939         atomic_set(&mdev->current_epoch->epoch_size, 0);
3940         D_ASSERT(list_empty(&mdev->current_epoch->list));
3941 }
3942
3943 /*
3944  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3945  * we can agree on is stored in agreed_pro_version.
3946  *
3947  * feature flags and the reserved array should be enough room for future
3948  * enhancements of the handshake protocol, and possible plugins...
3949  *
3950  * for now, they are expected to be zero, but ignored.
3951  */
3952 static int drbd_send_handshake(struct drbd_conf *mdev)
3953 {
3954         /* ASSERT current == mdev->receiver ... */
3955         struct p_handshake *p = &mdev->data.sbuf.handshake;
3956         int ok;
3957
3958         if (mutex_lock_interruptible(&mdev->data.mutex)) {
3959                 dev_err(DEV, "interrupted during initial handshake\n");
3960                 return 0; /* interrupted. not ok. */
3961         }
3962
3963         if (mdev->data.socket == NULL) {
3964                 mutex_unlock(&mdev->data.mutex);
3965                 return 0;
3966         }
3967
3968         memset(p, 0, sizeof(*p));
3969         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3970         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3971         ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3972                              (struct p_header80 *)p, sizeof(*p), 0 );
3973         mutex_unlock(&mdev->data.mutex);
3974         return ok;
3975 }
3976
3977 /*
3978  * return values:
3979  *   1 yes, we have a valid connection
3980  *   0 oops, did not work out, please try again
3981  *  -1 peer talks different language,
3982  *     no point in trying again, please go standalone.
3983  */
3984 static int drbd_do_handshake(struct drbd_conf *mdev)
3985 {
3986         /* ASSERT current == mdev->receiver ... */
3987         struct p_handshake *p = &mdev->data.rbuf.handshake;
3988         const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3989         unsigned int length;
3990         enum drbd_packets cmd;
3991         int rv;
3992
3993         rv = drbd_send_handshake(mdev);
3994         if (!rv)
3995                 return 0;
3996
3997         rv = drbd_recv_header(mdev, &cmd, &length);
3998         if (!rv)
3999                 return 0;
4000
4001         if (cmd != P_HAND_SHAKE) {
4002                 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
4003                      cmdname(cmd), cmd);
4004                 return -1;
4005         }
4006
4007         if (length != expect) {
4008                 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
4009                      expect, length);
4010                 return -1;
4011         }
4012
4013         rv = drbd_recv(mdev, &p->head.payload, expect);
4014
4015         if (rv != expect) {
4016                 if (!signal_pending(current))
4017                         dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
4018                 return 0;
4019         }
4020
4021         p->protocol_min = be32_to_cpu(p->protocol_min);
4022         p->protocol_max = be32_to_cpu(p->protocol_max);
4023         if (p->protocol_max == 0)
4024                 p->protocol_max = p->protocol_min;
4025
4026         if (PRO_VERSION_MAX < p->protocol_min ||
4027             PRO_VERSION_MIN > p->protocol_max)
4028                 goto incompat;
4029
4030         mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4031
4032         dev_info(DEV, "Handshake successful: "
4033              "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4034
4035         return 1;
4036
4037  incompat:
4038         dev_err(DEV, "incompatible DRBD dialects: "
4039             "I support %d-%d, peer supports %d-%d\n",
4040             PRO_VERSION_MIN, PRO_VERSION_MAX,
4041             p->protocol_min, p->protocol_max);
4042         return -1;
4043 }
4044
4045 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4046 static int drbd_do_auth(struct drbd_conf *mdev)
4047 {
4048         dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4049         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4050         return -1;
4051 }
4052 #else
4053 #define CHALLENGE_LEN 64
4054
4055 /* Return value:
4056         1 - auth succeeded,
4057         0 - failed, try again (network error),
4058         -1 - auth failed, don't try again.
4059 */
4060
4061 static int drbd_do_auth(struct drbd_conf *mdev)
4062 {
4063         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4064         struct scatterlist sg;
4065         char *response = NULL;
4066         char *right_response = NULL;
4067         char *peers_ch = NULL;
4068         unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4069         unsigned int resp_size;
4070         struct hash_desc desc;
4071         enum drbd_packets cmd;
4072         unsigned int length;
4073         int rv;
4074
4075         desc.tfm = mdev->cram_hmac_tfm;
4076         desc.flags = 0;
4077
4078         rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4079                                 (u8 *)mdev->net_conf->shared_secret, key_len);
4080         if (rv) {
4081                 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4082                 rv = -1;
4083                 goto fail;
4084         }
4085
4086         get_random_bytes(my_challenge, CHALLENGE_LEN);
4087
4088         rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4089         if (!rv)
4090                 goto fail;
4091
4092         rv = drbd_recv_header(mdev, &cmd, &length);
4093         if (!rv)
4094                 goto fail;
4095
4096         if (cmd != P_AUTH_CHALLENGE) {
4097                 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4098                     cmdname(cmd), cmd);
4099                 rv = 0;
4100                 goto fail;
4101         }
4102
4103         if (length > CHALLENGE_LEN * 2) {
4104                 dev_err(DEV, "expected AuthChallenge payload too big.\n");
4105                 rv = -1;
4106                 goto fail;
4107         }
4108
4109         peers_ch = kmalloc(length, GFP_NOIO);
4110         if (peers_ch == NULL) {
4111                 dev_err(DEV, "kmalloc of peers_ch failed\n");
4112                 rv = -1;
4113                 goto fail;
4114         }
4115
4116         rv = drbd_recv(mdev, peers_ch, length);
4117
4118         if (rv != length) {
4119                 if (!signal_pending(current))
4120                         dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4121                 rv = 0;
4122                 goto fail;
4123         }
4124
4125         resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4126         response = kmalloc(resp_size, GFP_NOIO);
4127         if (response == NULL) {
4128                 dev_err(DEV, "kmalloc of response failed\n");
4129                 rv = -1;
4130                 goto fail;
4131         }
4132
4133         sg_init_table(&sg, 1);
4134         sg_set_buf(&sg, peers_ch, length);
4135
4136         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4137         if (rv) {
4138                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4139                 rv = -1;
4140                 goto fail;
4141         }
4142
4143         rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4144         if (!rv)
4145                 goto fail;
4146
4147         rv = drbd_recv_header(mdev, &cmd, &length);
4148         if (!rv)
4149                 goto fail;
4150
4151         if (cmd != P_AUTH_RESPONSE) {
4152                 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4153                         cmdname(cmd), cmd);
4154                 rv = 0;
4155                 goto fail;
4156         }
4157
4158         if (length != resp_size) {
4159                 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4160                 rv = 0;
4161                 goto fail;
4162         }
4163
4164         rv = drbd_recv(mdev, response , resp_size);
4165
4166         if (rv != resp_size) {
4167                 if (!signal_pending(current))
4168                         dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4169                 rv = 0;
4170                 goto fail;
4171         }
4172
4173         right_response = kmalloc(resp_size, GFP_NOIO);
4174         if (right_response == NULL) {
4175                 dev_err(DEV, "kmalloc of right_response failed\n");
4176                 rv = -1;
4177                 goto fail;
4178         }
4179
4180         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4181
4182         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4183         if (rv) {
4184                 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4185                 rv = -1;
4186                 goto fail;
4187         }
4188
4189         rv = !memcmp(response, right_response, resp_size);
4190
4191         if (rv)
4192                 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4193                      resp_size, mdev->net_conf->cram_hmac_alg);
4194         else
4195                 rv = -1;
4196
4197  fail:
4198         kfree(peers_ch);
4199         kfree(response);
4200         kfree(right_response);
4201
4202         return rv;
4203 }
4204 #endif
4205
4206 int drbdd_init(struct drbd_thread *thi)
4207 {
4208         struct drbd_conf *mdev = thi->mdev;
4209         unsigned int minor = mdev_to_minor(mdev);
4210         int h;
4211
4212         sprintf(current->comm, "drbd%d_receiver", minor);
4213
4214         dev_info(DEV, "receiver (re)started\n");
4215
4216         do {
4217                 h = drbd_connect(mdev);
4218                 if (h == 0) {
4219                         drbd_disconnect(mdev);
4220                         schedule_timeout_interruptible(HZ);
4221                 }
4222                 if (h == -1) {
4223                         dev_warn(DEV, "Discarding network configuration.\n");
4224                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4225                 }
4226         } while (h == 0);
4227
4228         if (h > 0) {
4229                 if (get_net_conf(mdev)) {
4230                         drbdd(mdev);
4231                         put_net_conf(mdev);
4232                 }
4233         }
4234
4235         drbd_disconnect(mdev);
4236
4237         dev_info(DEV, "receiver terminated\n");
4238         return 0;
4239 }
4240
4241 /* ********* acknowledge sender ******** */
4242
4243 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4244 {
4245         struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4246
4247         int retcode = be32_to_cpu(p->retcode);
4248
4249         if (retcode >= SS_SUCCESS) {
4250                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4251         } else {
4252                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4253                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4254                     drbd_set_st_err_str(retcode), retcode);
4255         }
4256         wake_up(&mdev->state_wait);
4257
4258         return true;
4259 }
4260
4261 static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
4262 {
4263         return drbd_send_ping_ack(mdev);
4264
4265 }
4266
4267 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4268 {
4269         /* restore idle timeout */
4270         mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4271         if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4272                 wake_up(&mdev->misc_wait);
4273
4274         return true;
4275 }
4276
4277 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4278 {
4279         struct p_block_ack *p = (struct p_block_ack *)h;
4280         sector_t sector = be64_to_cpu(p->sector);
4281         int blksize = be32_to_cpu(p->blksize);
4282
4283         D_ASSERT(mdev->agreed_pro_version >= 89);
4284
4285         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4286
4287         if (get_ldev(mdev)) {
4288                 drbd_rs_complete_io(mdev, sector);
4289                 drbd_set_in_sync(mdev, sector, blksize);
4290                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4291                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4292                 put_ldev(mdev);
4293         }
4294         dec_rs_pending(mdev);
4295         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4296
4297         return true;
4298 }
4299
4300 /* when we receive the ACK for a write request,
4301  * verify that we actually know about it */
4302 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4303         u64 id, sector_t sector)
4304 {
4305         struct hlist_head *slot = tl_hash_slot(mdev, sector);
4306         struct hlist_node *n;
4307         struct drbd_request *req;
4308
4309         hlist_for_each_entry(req, n, slot, collision) {
4310                 if ((unsigned long)req == (unsigned long)id) {
4311                         if (req->sector != sector) {
4312                                 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4313                                     "wrong sector (%llus versus %llus)\n", req,
4314                                     (unsigned long long)req->sector,
4315                                     (unsigned long long)sector);
4316                                 break;
4317                         }
4318                         return req;
4319                 }
4320         }
4321         return NULL;
4322 }
4323
4324 typedef struct drbd_request *(req_validator_fn)
4325         (struct drbd_conf *mdev, u64 id, sector_t sector);
4326
4327 static int validate_req_change_req_state(struct drbd_conf *mdev,
4328         u64 id, sector_t sector, req_validator_fn validator,
4329         const char *func, enum drbd_req_event what)
4330 {
4331         struct drbd_request *req;
4332         struct bio_and_error m;
4333
4334         spin_lock_irq(&mdev->req_lock);
4335         req = validator(mdev, id, sector);
4336         if (unlikely(!req)) {
4337                 spin_unlock_irq(&mdev->req_lock);
4338
4339                 dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4340                         (void *)(unsigned long)id, (unsigned long long)sector);
4341                 return false;
4342         }
4343         __req_mod(req, what, &m);
4344         spin_unlock_irq(&mdev->req_lock);
4345
4346         if (m.bio)
4347                 complete_master_bio(mdev, &m);
4348         return true;
4349 }
4350
4351 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4352 {
4353         struct p_block_ack *p = (struct p_block_ack *)h;
4354         sector_t sector = be64_to_cpu(p->sector);
4355         int blksize = be32_to_cpu(p->blksize);
4356         enum drbd_req_event what;
4357
4358         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4359
4360         if (is_syncer_block_id(p->block_id)) {
4361                 drbd_set_in_sync(mdev, sector, blksize);
4362                 dec_rs_pending(mdev);
4363                 return true;
4364         }
4365         switch (be16_to_cpu(h->command)) {
4366         case P_RS_WRITE_ACK:
4367                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4368                 what = write_acked_by_peer_and_sis;
4369                 break;
4370         case P_WRITE_ACK:
4371                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4372                 what = write_acked_by_peer;
4373                 break;
4374         case P_RECV_ACK:
4375                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4376                 what = recv_acked_by_peer;
4377                 break;
4378         case P_DISCARD_ACK:
4379                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4380                 what = conflict_discarded_by_peer;
4381                 break;
4382         default:
4383                 D_ASSERT(0);
4384                 return false;
4385         }
4386
4387         return validate_req_change_req_state(mdev, p->block_id, sector,
4388                 _ack_id_to_req, __func__ , what);
4389 }
4390
4391 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4392 {
4393         struct p_block_ack *p = (struct p_block_ack *)h;
4394         sector_t sector = be64_to_cpu(p->sector);
4395         int size = be32_to_cpu(p->blksize);
4396         struct drbd_request *req;
4397         struct bio_and_error m;
4398
4399         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4400
4401         if (is_syncer_block_id(p->block_id)) {
4402                 dec_rs_pending(mdev);
4403                 drbd_rs_failed_io(mdev, sector, size);
4404                 return true;
4405         }
4406
4407         spin_lock_irq(&mdev->req_lock);
4408         req = _ack_id_to_req(mdev, p->block_id, sector);
4409         if (!req) {
4410                 spin_unlock_irq(&mdev->req_lock);
4411                 if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4412                     mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4413                         /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4414                            The master bio might already be completed, therefore the
4415                            request is no longer in the collision hash.
4416                            => Do not try to validate block_id as request. */
4417                         /* In Protocol B we might already have got a P_RECV_ACK
4418                            but then get a P_NEG_ACK after wards. */
4419                         drbd_set_out_of_sync(mdev, sector, size);
4420                         return true;
4421                 } else {
4422                         dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
4423                                 (void *)(unsigned long)p->block_id, (unsigned long long)sector);
4424                         return false;
4425                 }
4426         }
4427         __req_mod(req, neg_acked, &m);
4428         spin_unlock_irq(&mdev->req_lock);
4429
4430         if (m.bio)
4431                 complete_master_bio(mdev, &m);
4432         return true;
4433 }
4434
4435 static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4436 {
4437         struct p_block_ack *p = (struct p_block_ack *)h;
4438         sector_t sector = be64_to_cpu(p->sector);
4439
4440         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4441         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4442             (unsigned long long)sector, be32_to_cpu(p->blksize));
4443
4444         return validate_req_change_req_state(mdev, p->block_id, sector,
4445                 _ar_id_to_req, __func__ , neg_acked);
4446 }
4447
4448 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4449 {
4450         sector_t sector;
4451         int size;
4452         struct p_block_ack *p = (struct p_block_ack *)h;
4453
4454         sector = be64_to_cpu(p->sector);
4455         size = be32_to_cpu(p->blksize);
4456
4457         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4458
4459         dec_rs_pending(mdev);
4460
4461         if (get_ldev_if_state(mdev, D_FAILED)) {
4462                 drbd_rs_complete_io(mdev, sector);
4463                 switch (be16_to_cpu(h->command)) {
4464                 case P_NEG_RS_DREPLY:
4465                         drbd_rs_failed_io(mdev, sector, size);
4466                 case P_RS_CANCEL:
4467                         break;
4468                 default:
4469                         D_ASSERT(0);
4470                         put_ldev(mdev);
4471                         return false;
4472                 }
4473                 put_ldev(mdev);
4474         }
4475
4476         return true;
4477 }
4478
4479 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4480 {
4481         struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4482
4483         tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4484
4485         if (mdev->state.conn == C_AHEAD &&
4486             atomic_read(&mdev->ap_in_flight) == 0 &&
4487             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
4488                 mdev->start_resync_timer.expires = jiffies + HZ;
4489                 add_timer(&mdev->start_resync_timer);
4490         }
4491
4492         return true;
4493 }
4494
4495 static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4496 {
4497         struct p_block_ack *p = (struct p_block_ack *)h;
4498         struct drbd_work *w;
4499         sector_t sector;
4500         int size;
4501
4502         sector = be64_to_cpu(p->sector);
4503         size = be32_to_cpu(p->blksize);
4504
4505         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4506
4507         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4508                 drbd_ov_oos_found(mdev, sector, size);
4509         else
4510                 ov_oos_print(mdev);
4511
4512         if (!get_ldev(mdev))
4513                 return true;
4514
4515         drbd_rs_complete_io(mdev, sector);
4516         dec_rs_pending(mdev);
4517
4518         --mdev->ov_left;
4519
4520         /* let's advance progress step marks only for every other megabyte */
4521         if ((mdev->ov_left & 0x200) == 0x200)
4522                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4523
4524         if (mdev->ov_left == 0) {
4525                 w = kmalloc(sizeof(*w), GFP_NOIO);
4526                 if (w) {
4527                         w->cb = w_ov_finished;
4528                         drbd_queue_work_front(&mdev->data.work, w);
4529                 } else {
4530                         dev_err(DEV, "kmalloc(w) failed.");
4531                         ov_oos_print(mdev);
4532                         drbd_resync_finished(mdev);
4533                 }
4534         }
4535         put_ldev(mdev);
4536         return true;
4537 }
4538
4539 static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4540 {
4541         return true;
4542 }
4543
4544 struct asender_cmd {
4545         size_t pkt_size;
4546         int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4547 };
4548
4549 static struct asender_cmd *get_asender_cmd(int cmd)
4550 {
4551         static struct asender_cmd asender_tbl[] = {
4552                 /* anything missing from this table is in
4553                  * the drbd_cmd_handler (drbd_default_handler) table,
4554                  * see the beginning of drbdd() */
4555         [P_PING]            = { sizeof(struct p_header80), got_Ping },
4556         [P_PING_ACK]        = { sizeof(struct p_header80), got_PingAck },
4557         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4558         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4559         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4560         [P_DISCARD_ACK]     = { sizeof(struct p_block_ack), got_BlockAck },
4561         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4562         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4563         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4564         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4565         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4566         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4567         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4568         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4569         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply},
4570         [P_MAX_CMD]         = { 0, NULL },
4571         };
4572         if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4573                 return NULL;
4574         return &asender_tbl[cmd];
4575 }
4576
4577 int drbd_asender(struct drbd_thread *thi)
4578 {
4579         struct drbd_conf *mdev = thi->mdev;
4580         struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4581         struct asender_cmd *cmd = NULL;
4582
4583         int rv, len;
4584         void *buf    = h;
4585         int received = 0;
4586         int expect   = sizeof(struct p_header80);
4587         int empty;
4588         int ping_timeout_active = 0;
4589
4590         sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4591
4592         current->policy = SCHED_RR;  /* Make this a realtime task! */
4593         current->rt_priority = 2;    /* more important than all other tasks */
4594
4595         while (get_t_state(thi) == Running) {
4596                 drbd_thread_current_set_cpu(mdev);
4597                 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4598                         ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4599                         mdev->meta.socket->sk->sk_rcvtimeo =
4600                                 mdev->net_conf->ping_timeo*HZ/10;
4601                         ping_timeout_active = 1;
4602                 }
4603
4604                 /* conditionally cork;
4605                  * it may hurt latency if we cork without much to send */
4606                 if (!mdev->net_conf->no_cork &&
4607                         3 < atomic_read(&mdev->unacked_cnt))
4608                         drbd_tcp_cork(mdev->meta.socket);
4609                 while (1) {
4610                         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4611                         flush_signals(current);
4612                         if (!drbd_process_done_ee(mdev))
4613                                 goto reconnect;
4614                         /* to avoid race with newly queued ACKs */
4615                         set_bit(SIGNAL_ASENDER, &mdev->flags);
4616                         spin_lock_irq(&mdev->req_lock);
4617                         empty = list_empty(&mdev->done_ee);
4618                         spin_unlock_irq(&mdev->req_lock);
4619                         /* new ack may have been queued right here,
4620                          * but then there is also a signal pending,
4621                          * and we start over... */
4622                         if (empty)
4623                                 break;
4624                 }
4625                 /* but unconditionally uncork unless disabled */
4626                 if (!mdev->net_conf->no_cork)
4627                         drbd_tcp_uncork(mdev->meta.socket);
4628
4629                 /* short circuit, recv_msg would return EINTR anyways. */
4630                 if (signal_pending(current))
4631                         continue;
4632
4633                 rv = drbd_recv_short(mdev, mdev->meta.socket,
4634                                      buf, expect-received, 0);
4635                 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4636
4637                 flush_signals(current);
4638
4639                 /* Note:
4640                  * -EINTR        (on meta) we got a signal
4641                  * -EAGAIN       (on meta) rcvtimeo expired
4642                  * -ECONNRESET   other side closed the connection
4643                  * -ERESTARTSYS  (on data) we got a signal
4644                  * rv <  0       other than above: unexpected error!
4645                  * rv == expected: full header or command
4646                  * rv <  expected: "woken" by signal during receive
4647                  * rv == 0       : "connection shut down by peer"
4648                  */
4649                 if (likely(rv > 0)) {
4650                         received += rv;
4651                         buf      += rv;
4652                 } else if (rv == 0) {
4653                         dev_err(DEV, "meta connection shut down by peer.\n");
4654                         goto reconnect;
4655                 } else if (rv == -EAGAIN) {
4656                         /* If the data socket received something meanwhile,
4657                          * that is good enough: peer is still alive. */
4658                         if (time_after(mdev->last_received,
4659                                 jiffies - mdev->meta.socket->sk->sk_rcvtimeo))
4660                                 continue;
4661                         if (ping_timeout_active) {
4662                                 dev_err(DEV, "PingAck did not arrive in time.\n");
4663                                 goto reconnect;
4664                         }
4665                         set_bit(SEND_PING, &mdev->flags);
4666                         continue;
4667                 } else if (rv == -EINTR) {
4668                         continue;
4669                 } else {
4670                         dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4671                         goto reconnect;
4672                 }
4673
4674                 if (received == expect && cmd == NULL) {
4675                         if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4676                                 dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4677                                     be32_to_cpu(h->magic),
4678                                     be16_to_cpu(h->command),
4679                                     be16_to_cpu(h->length));
4680                                 goto reconnect;
4681                         }
4682                         cmd = get_asender_cmd(be16_to_cpu(h->command));
4683                         len = be16_to_cpu(h->length);
4684                         if (unlikely(cmd == NULL)) {
4685                                 dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4686                                     be32_to_cpu(h->magic),
4687                                     be16_to_cpu(h->command),
4688                                     be16_to_cpu(h->length));
4689                                 goto disconnect;
4690                         }
4691                         expect = cmd->pkt_size;
4692                         ERR_IF(len != expect-sizeof(struct p_header80))
4693                                 goto reconnect;
4694                 }
4695                 if (received == expect) {
4696                         mdev->last_received = jiffies;
4697                         D_ASSERT(cmd != NULL);
4698                         if (!cmd->process(mdev, h))
4699                                 goto reconnect;
4700
4701                         /* the idle_timeout (ping-int)
4702                          * has been restored in got_PingAck() */
4703                         if (cmd == get_asender_cmd(P_PING_ACK))
4704                                 ping_timeout_active = 0;
4705
4706                         buf      = h;
4707                         received = 0;
4708                         expect   = sizeof(struct p_header80);
4709                         cmd      = NULL;
4710                 }
4711         }
4712
4713         if (0) {
4714 reconnect:
4715                 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4716                 drbd_md_sync(mdev);
4717         }
4718         if (0) {
4719 disconnect:
4720                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4721                 drbd_md_sync(mdev);
4722         }
4723         clear_bit(SIGNAL_ASENDER, &mdev->flags);
4724
4725         D_ASSERT(mdev->state.conn < C_CONNECTED);
4726         dev_info(DEV, "asender terminated\n");
4727
4728         return 0;
4729 }