/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"

#include "drbd_vli.h"

struct packet_info {
        enum drbd_packet cmd;
        unsigned int size;
        unsigned int vnr;
        void *data;
};

enum finish_epoch {
        FE_STILL_LIVE,
        FE_DESTROYED,
        FE_RECYCLED,
};

static int drbd_do_features(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
        struct page *page;
        struct page *tmp;

        BUG_ON(!n);
        BUG_ON(!head);

        page = *head;

        if (!page)
                return NULL;

        while (page) {
                tmp = page_chain_next(page);
                if (--n == 0)
                        break; /* found sufficient pages */
                if (tmp == NULL)
                        /* insufficient pages, don't use any of them. */
                        return NULL;
                page = tmp;
        }

        /* add end of list marker for the returned list */
        set_page_private(page, 0);
        /* actual return value, and adjustment of head */
        page = *head;
        *head = tmp;
        return page;
}
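
/* Illustrative example of the semantics above: with *head pointing at the
 * chain P1 -> P2 -> P3 and n == 2, page_chain_del() returns P1 -> P2
 * (terminated by setting P2's private pointer to 0) and leaves *head == P3. */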

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
        struct page *tmp;
        int i = 1;
        while ((tmp = page_chain_next(page)))
                ++i, page = tmp;
        if (len)
                *len = i;
        return page;
}

static int page_chain_free(struct page *page)
{
        struct page *tmp;
        int i = 0;
        page_chain_for_each_safe(page, tmp) {
                put_page(page);
                ++i;
        }
        return i;
}

static void page_chain_add(struct page **head,
                struct page *chain_first, struct page *chain_last)
{
#if 1
        struct page *tmp;
        tmp = page_chain_tail(chain_first, NULL);
        BUG_ON(tmp != chain_last);
#endif

        /* add chain to head */
        set_page_private(chain_last, (unsigned long)*head);
        *head = chain_first;
}
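
/* Typical usage of the two helpers above, as in __drbd_alloc_pages() and
 * drbd_free_pages() below: find the tail of a private chain outside the lock
 * with page_chain_tail(), then link the whole chain back to the global pool
 * head under drbd_pp_lock with page_chain_add(). */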

static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
                                       unsigned int number)
{
        struct page *page = NULL;
        struct page *tmp = NULL;
        unsigned int i = 0;

        /* Yes, testing drbd_pp_vacant outside the lock is racy.
         * So what. It saves a spin_lock. */
        if (drbd_pp_vacant >= number) {
                spin_lock(&drbd_pp_lock);
                page = page_chain_del(&drbd_pp_pool, number);
                if (page)
                        drbd_pp_vacant -= number;
                spin_unlock(&drbd_pp_lock);
                if (page)
                        return page;
        }

        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        for (i = 0; i < number; i++) {
                tmp = alloc_page(GFP_TRY);
                if (!tmp)
                        break;
                set_page_private(tmp, (unsigned long)page);
                page = tmp;
        }

        if (i == number)
                return page;

        /* Not enough pages immediately available this time.
         * No need to jump around here, drbd_alloc_pages will retry this
         * function "soon". */
        if (page) {
                tmp = page_chain_tail(page, NULL);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        return NULL;
}

static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
                                           struct list_head *to_be_freed)
{
        struct drbd_peer_request *peer_req;
        struct list_head *le, *tle;

        /* The EEs are always appended to the end of the list. Since
           they are sent in order over the wire, they have to finish
           in order. As soon as we see the first unfinished one, we
           can stop examining the list... */

        list_for_each_safe(le, tle, &mdev->net_ee) {
                peer_req = list_entry(le, struct drbd_peer_request, w.list);
                if (drbd_peer_req_has_active_page(peer_req))
                        break;
                list_move(le, to_be_freed);
        }
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;

        spin_lock_irq(&mdev->tconn->req_lock);
        reclaim_finished_net_peer_reqs(mdev, &reclaimed);
        spin_unlock_irq(&mdev->tconn->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(mdev, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @mdev:       DRBD device.
 * @number:     number of pages requested
 * @retry:      whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
                              bool retry)
{
        struct page *page = NULL;
        struct net_conf *nc;
        DEFINE_WAIT(wait);
        int mxb;

        /* Yes, we may run up to @number over max_buffers. If we
         * follow it strictly, the admin will get it wrong anyways. */
        rcu_read_lock();
        nc = rcu_dereference(mdev->tconn->net_conf);
        mxb = nc ? nc->max_buffers : 1000000;
        rcu_read_unlock();

        if (atomic_read(&mdev->pp_in_use) < mxb)
                page = __drbd_alloc_pages(mdev, number);

        while (page == NULL) {
                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

                drbd_kick_lo_and_reclaim_net(mdev);

                if (atomic_read(&mdev->pp_in_use) < mxb) {
                        page = __drbd_alloc_pages(mdev, number);
                        if (page)
                                break;
                }

                if (!retry)
                        break;

                if (signal_pending(current)) {
                        dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
                        break;
                }

                schedule();
        }
        finish_wait(&drbd_pp_wait, &wait);

        if (page)
                atomic_add(number, &mdev->pp_in_use);
        return page;
}
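
/* Illustrative (hypothetical) use of the allocator above: request a short
 * chain, use it, then hand it back to the pool.
 *
 *	struct page *chain = drbd_alloc_pages(mdev, 4, true);
 *	if (chain) {
 *		... fill the pages, walking the chain via page_chain_next() ...
 *		drbd_free_pages(mdev, chain, 0);
 *	}
 */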

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
{
        atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
        int i;

        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
                i = page_chain_free(page);
        else {
                struct page *tmp;
                tmp = page_chain_tail(page, &i);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        i = atomic_sub_return(i, a);
        if (i < 0)
                dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
        wake_up(&drbd_pp_wait);
}
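
/* The high-water mark above means the global pool keeps at most roughly one
 * maximal bio's worth of pages (DRBD_MAX_BIO_SIZE / PAGE_SIZE) per configured
 * minor; anything beyond that is handed straight back to the system. */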

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/
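
/* Illustrative (hypothetical) caller following the rules above:
 *
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 *
 * is equivalent to a plain drbd_wait_ee_list_empty(mdev, &mdev->active_ee),
 * which takes and releases req_lock itself (see the wrappers below). */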

struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
                    unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
        struct drbd_peer_request *peer_req;
        struct page *page;
        unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

        if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
                return NULL;

        peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
        if (!peer_req) {
                if (!(gfp_mask & __GFP_NOWARN))
                        dev_err(DEV, "%s: allocation failed\n", __func__);
                return NULL;
        }

        page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
        if (!page)
                goto fail;

        drbd_clear_interval(&peer_req->i);
        peer_req->i.size = data_size;
        peer_req->i.sector = sector;
        peer_req->i.local = false;
        peer_req->i.waiting = false;

        peer_req->epoch = NULL;
        peer_req->w.mdev = mdev;
        peer_req->pages = page;
        atomic_set(&peer_req->pending_bios, 0);
        peer_req->flags = 0;
        /*
         * The block_id is opaque to the receiver.  It is not endianness
         * converted, and sent back to the sender unchanged.
         */
        peer_req->block_id = id;

        return peer_req;

 fail:
        mempool_free(peer_req, drbd_ee_mempool);
        return NULL;
}

void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
                       int is_net)
{
        if (peer_req->flags & EE_HAS_DIGEST)
                kfree(peer_req->digest);
        drbd_free_pages(mdev, peer_req->pages, is_net);
        D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
        D_ASSERT(drbd_interval_empty(&peer_req->i));
        mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
{
        LIST_HEAD(work_list);
        struct drbd_peer_request *peer_req, *t;
        int count = 0;
        int is_net = list == &mdev->net_ee;

        spin_lock_irq(&mdev->tconn->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&mdev->tconn->req_lock);

        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                __drbd_free_peer_req(mdev, peer_req, is_net);
                count++;
        }
        return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
{
        LIST_HEAD(work_list);
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;
        int err = 0;

        spin_lock_irq(&mdev->tconn->req_lock);
        reclaim_finished_net_peer_reqs(mdev, &reclaimed);
        list_splice_init(&mdev->done_ee, &work_list);
        spin_unlock_irq(&mdev->tconn->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(mdev, peer_req);

        /* possible callbacks here:
         * e_end_block, and e_end_resync_block, e_send_discard_write.
         * all ignore the last argument.
         */
        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                int err2;

                /* list_del not necessary, next/prev members not touched */
                err2 = peer_req->w.cb(&peer_req->w, !!err);
                if (!err)
                        err = err2;
                drbd_free_peer_req(mdev, peer_req);
        }
        wake_up(&mdev->ee_wait);

        return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
                                     struct list_head *head)
{
        DEFINE_WAIT(wait);

        /* avoids spin_lock/unlock
         * and calling prepare_to_wait in the fast path */
        while (!list_empty(head)) {
                prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
                spin_unlock_irq(&mdev->tconn->req_lock);
                io_schedule();
                finish_wait(&mdev->ee_wait, &wait);
                spin_lock_irq(&mdev->tconn->req_lock);
        }
}

static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
                                    struct list_head *head)
{
        spin_lock_irq(&mdev->tconn->req_lock);
        _drbd_wait_ee_list_empty(mdev, head);
        spin_unlock_irq(&mdev->tconn->req_lock);
}

/* see also kernel_accept, which is only present since 2.6.18.
 * also we want to log which part of it failed, exactly */
static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
{
        struct sock *sk = sock->sk;
        int err = 0;

        *what = "listen";
        err = sock->ops->listen(sock, 5);
        if (err < 0)
                goto out;

        *what = "sock_create_lite";
        err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
                               newsock);
        if (err < 0)
                goto out;

        *what = "accept";
        err = sock->ops->accept(sock, *newsock, 0);
        if (err < 0) {
                sock_release(*newsock);
                *newsock = NULL;
                goto out;
        }
        (*newsock)->ops  = sock->ops;

out:
        return err;
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
        mm_segment_t oldfs;
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_iovlen = 1,
                .msg_iov = (struct iovec *)&iov,
                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
        };
        int rv;

        oldfs = get_fs();
        set_fs(KERNEL_DS);
        rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
        set_fs(oldfs);

        return rv;
}
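
/* The get_fs()/set_fs(KERNEL_DS) dance above is needed because on this kernel
 * generation sock_recvmsg() copies through the user-iovec path and checks the
 * buffers against the current address-space limit; temporarily widening the
 * limit lets it accept the on-stack kernel buffer in the kvec. */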

static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
{
        mm_segment_t oldfs;
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_iovlen = 1,
                .msg_iov = (struct iovec *)&iov,
                .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
        };
        int rv;

        oldfs = get_fs();
        set_fs(KERNEL_DS);

        for (;;) {
                rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
                if (rv == size)
                        break;

                /* Note:
                 * ECONNRESET   other side closed the connection
                 * ERESTARTSYS  (on  sock) we got a signal
                 */

                if (rv < 0) {
                        if (rv == -ECONNRESET)
                                conn_info(tconn, "sock was reset by peer\n");
                        else if (rv != -ERESTARTSYS)
                                conn_err(tconn, "sock_recvmsg returned %d\n", rv);
                        break;
                } else if (rv == 0) {
                        conn_info(tconn, "sock was shut down by peer\n");
                        break;
                } else {
                        /* signal came in, or peer/link went down,
                         * after we read a partial message
                         */
                        /* D_ASSERT(signal_pending(current)); */
                        break;
                }
        }

        set_fs(oldfs);

        if (rv != size)
                conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);

        return rv;
}

static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
{
        int err;

        err = drbd_recv(tconn, buf, size);
        if (err != size) {
                if (err >= 0)
                        err = -EIO;
        } else
                err = 0;
        return err;
}

static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
{
        int err;

        err = drbd_recv_all(tconn, buf, size);
        if (err && !signal_pending(current))
                conn_warn(tconn, "short read (expected size %d)\n", (int)size);
        return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
                unsigned int rcv)
{
        /* open coded SO_SNDBUF, SO_RCVBUF */
        if (snd) {
                sock->sk->sk_sndbuf = snd;
                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
        }
        if (rcv) {
                sock->sk->sk_rcvbuf = rcv;
                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
        }
}
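
/* Per the tcp(7) quote above, both connection paths call this helper early:
 * drbd_try_connect() before bind()/connect(), and drbd_wait_for_connect()
 * before bind()/listen(). */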

static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
{
        const char *what;
        struct socket *sock;
        struct sockaddr_in6 src_in6;
        struct sockaddr_in6 peer_in6;
        struct net_conf *nc;
        int err, peer_addr_len, my_addr_len;
        int sndbuf_size, rcvbuf_size, connect_int;
        int disconnect_on_error = 1;

        rcu_read_lock();
        nc = rcu_dereference(tconn->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }

        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        connect_int = nc->connect_int;

        my_addr_len = min_t(int, nc->my_addr_len, sizeof(src_in6));
        memcpy(&src_in6, nc->my_addr, my_addr_len);

        if (((struct sockaddr *)nc->my_addr)->sa_family == AF_INET6)
                src_in6.sin6_port = 0;
        else
                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

        peer_addr_len = min_t(int, nc->peer_addr_len, sizeof(src_in6));
        memcpy(&peer_in6, nc->peer_addr, peer_addr_len);

        rcu_read_unlock();

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0) {
                sock = NULL;
                goto out;
        }

        sock->sk->sk_rcvtimeo =
        sock->sk->sk_sndtimeo = connect_int * HZ;
        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

        /* explicitly bind to the configured IP as source IP
         * for the outgoing connections.
         * This is needed for multihomed hosts and to be
         * able to use lo: interfaces for drbd.
         * Make sure to use 0 as port number, so linux selects
         * a free one dynamically.
         */
        what = "bind before connect";
        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
        if (err < 0)
                goto out;

        /* connect may fail, peer not yet available.
         * stay C_WF_CONNECTION, don't go Disconnecting! */
        disconnect_on_error = 0;
        what = "connect";
        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
        if (err < 0) {
                if (sock) {
                        sock_release(sock);
                        sock = NULL;
                }
                switch (-err) {
                        /* timeout, busy, signal pending */
                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
                case EINTR: case ERESTARTSYS:
                        /* peer not (yet) available, network problem */
                case ECONNREFUSED: case ENETUNREACH:
                case EHOSTDOWN:    case EHOSTUNREACH:
                        disconnect_on_error = 0;
                        break;
                default:
                        conn_err(tconn, "%s failed, err = %d\n", what, err);
                }
                if (disconnect_on_error)
                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
        }

        return sock;
}

static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
{
        int timeo, err, my_addr_len;
        int sndbuf_size, rcvbuf_size, connect_int;
        struct socket *s_estab = NULL, *s_listen;
        struct sockaddr_in6 my_addr;
        struct net_conf *nc;
        const char *what;

        rcu_read_lock();
        nc = rcu_dereference(tconn->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }

        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        connect_int = nc->connect_int;

        my_addr_len = min_t(int, nc->my_addr_len, sizeof(struct sockaddr_in6));
        memcpy(&my_addr, nc->my_addr, my_addr_len);
        rcu_read_unlock();

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
                SOCK_STREAM, IPPROTO_TCP, &s_listen);
        if (err) {
                s_listen = NULL;
                goto out;
        }

        timeo = connect_int * HZ;
        timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

        s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
        s_listen->sk->sk_rcvtimeo = timeo;
        s_listen->sk->sk_sndtimeo = timeo;
        drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

        what = "bind before listen";
        err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
        if (err < 0)
                goto out;

        err = drbd_accept(&what, s_listen, &s_estab);

out:
        if (s_listen)
                sock_release(s_listen);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        conn_err(tconn, "%s failed, err = %d\n", what, err);
                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        return s_estab;
}

static int decode_header(struct drbd_tconn *, void *, struct packet_info *);

static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
                             enum drbd_packet cmd)
{
        if (!conn_prepare_command(tconn, sock))
                return -EIO;
        return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
{
        unsigned int header_size = drbd_header_size(tconn);
        struct packet_info pi;
        int err;

        err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
        if (err != header_size) {
                if (err >= 0)
                        err = -EIO;
                return err;
        }
        err = decode_header(tconn, tconn->data.rbuf, &pi);
        if (err)
                return err;
        return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:       pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
        int rr;
        char tb[4];

        if (!*sock)
                return false;

        rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

        if (rr > 0 || rr == -EAGAIN) {
                return true;
        } else {
                sock_release(*sock);
                *sock = NULL;
                return false;
        }
}
/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_conf *mdev)
{
        int err;

        atomic_set(&mdev->packet_seq, 0);
        mdev->peer_seq = 0;

        mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
                &mdev->tconn->cstate_mutex :
                &mdev->own_state_mutex;

        err = drbd_send_sync_param(mdev);
        if (!err)
                err = drbd_send_sizes(mdev, 0, 0);
        if (!err)
                err = drbd_send_uuids(mdev);
        if (!err)
                err = drbd_send_state(mdev);
        clear_bit(USE_DEGR_WFC_T, &mdev->flags);
        clear_bit(RESIZE_PENDING, &mdev->flags);
        mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
        return err;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_tconn *tconn)
{
        struct socket *sock, *msock;
        struct drbd_conf *mdev;
        struct net_conf *nc;
        int vnr, timeout, try, h, ok;

        if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
                return -2;

        clear_bit(DISCARD_CONCURRENT, &tconn->flags);

        /* Assume that the peer only understands protocol 80 until we know better.  */
        tconn->agreed_pro_version = 80;

        do {
                struct socket *s;

                for (try = 0;;) {
                        /* 3 tries, this should take less than a second! */
                        s = drbd_try_connect(tconn);
                        if (s || ++try >= 3)
                                break;
                        /* give the other side time to call bind() & listen() */
                        schedule_timeout_interruptible(HZ / 10);
                }

                if (s) {
                        if (!tconn->data.socket) {
                                tconn->data.socket = s;
                                send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
                        } else if (!tconn->meta.socket) {
                                tconn->meta.socket = s;
                                send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
                        } else {
                                conn_err(tconn, "Logic error in conn_connect()\n");
                                goto out_release_sockets;
                        }
                }

                if (tconn->data.socket && tconn->meta.socket) {
                        schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
                        ok = drbd_socket_okay(&tconn->data.socket);
                        ok = drbd_socket_okay(&tconn->meta.socket) && ok;
                        if (ok)
                                break;
                }

retry:
                s = drbd_wait_for_connect(tconn);
                if (s) {
                        try = receive_first_packet(tconn, s);
                        drbd_socket_okay(&tconn->data.socket);
                        drbd_socket_okay(&tconn->meta.socket);
                        switch (try) {
                        case P_INITIAL_DATA:
                                if (tconn->data.socket) {
                                        conn_warn(tconn, "initial packet S crossed\n");
                                        sock_release(tconn->data.socket);
                                }
                                tconn->data.socket = s;
                                break;
                        case P_INITIAL_META:
                                if (tconn->meta.socket) {
                                        conn_warn(tconn, "initial packet M crossed\n");
                                        sock_release(tconn->meta.socket);
                                }
                                tconn->meta.socket = s;
                                set_bit(DISCARD_CONCURRENT, &tconn->flags);
                                break;
                        default:
                                conn_warn(tconn, "Error receiving initial packet\n");
                                sock_release(s);
                                if (random32() & 1)
                                        goto retry;
                        }
                }

                if (tconn->cstate <= C_DISCONNECTING)
                        goto out_release_sockets;
                if (signal_pending(current)) {
                        flush_signals(current);
                        smp_rmb();
                        if (get_t_state(&tconn->receiver) == EXITING)
                                goto out_release_sockets;
                }

                if (tconn->data.socket && tconn->meta.socket) {
                        ok = drbd_socket_okay(&tconn->data.socket);
                        ok = drbd_socket_okay(&tconn->meta.socket) && ok;
                        if (ok)
                                break;
                }
        } while (1);

        sock  = tconn->data.socket;
        msock = tconn->meta.socket;

        msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
        sock->sk->sk_reuse = 1; /* SO_REUSEADDR */

        sock->sk->sk_allocation = GFP_NOIO;
        msock->sk->sk_allocation = GFP_NOIO;

        sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
        msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

        /* NOT YET ...
         * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
         * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
         * first set it to the P_CONNECTION_FEATURES timeout,
         * which we set to 4x the configured ping_timeout. */
        rcu_read_lock();
        nc = rcu_dereference(tconn->net_conf);

        sock->sk->sk_sndtimeo =
        sock->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

        msock->sk->sk_rcvtimeo = nc->ping_int*HZ;
        timeout = nc->timeout * HZ / 10;
        rcu_read_unlock();

        msock->sk->sk_sndtimeo = timeout;

        /* we don't want delays.
         * we use TCP_CORK where appropriate, though */
        drbd_tcp_nodelay(sock);
        drbd_tcp_nodelay(msock);

        tconn->last_received = jiffies;

        h = drbd_do_features(tconn);
        if (h <= 0)
                return h;

        if (tconn->cram_hmac_tfm) {
                /* drbd_request_state(mdev, NS(conn, WFAuth)); */
                switch (drbd_do_auth(tconn)) {
                case -1:
                        conn_err(tconn, "Authentication of peer failed\n");
                        return -1;
                case 0:
                        conn_err(tconn, "Authentication of peer failed, trying again.\n");
                        return 0;
                }
        }

        if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
                return 0;

        sock->sk->sk_sndtimeo = timeout;
        sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

        drbd_thread_start(&tconn->asender);

        if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
                return -1;

        rcu_read_lock();
        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
                kref_get(&mdev->kref);
                rcu_read_unlock();
                drbd_connected(mdev);
                kref_put(&mdev->kref, &drbd_minor_destroy);
                rcu_read_lock();
        }
        rcu_read_unlock();

        return h;

out_release_sockets:
        if (tconn->data.socket) {
                sock_release(tconn->data.socket);
                tconn->data.socket = NULL;
        }
        if (tconn->meta.socket) {
                sock_release(tconn->meta.socket);
                tconn->meta.socket = NULL;
        }
        return -1;
}

static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
{
        unsigned int header_size = drbd_header_size(tconn);

        if (header_size == sizeof(struct p_header100) &&
            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
                struct p_header100 *h = header;
                if (h->pad != 0) {
                        conn_err(tconn, "Header padding is not zero\n");
                        return -EINVAL;
                }
                pi->vnr = be16_to_cpu(h->volume);
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
        } else if (header_size == sizeof(struct p_header95) &&
                   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
                struct p_header95 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
                pi->vnr = 0;
        } else if (header_size == sizeof(struct p_header80) &&
                   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
                struct p_header80 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be16_to_cpu(h->length);
                pi->vnr = 0;
        } else {
                conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
                         be32_to_cpu(*(__be32 *)header),
                         tconn->agreed_pro_version);
                return -EINVAL;
        }
        pi->data = header + header_size;
        return 0;
}
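
/* For reference, the three header formats distinguished above (the structs
 * are declared in drbd_int.h; all fields are big-endian on the wire, sketched
 * here from how decode_header() reads them):
 *
 *	p_header80:  starts with u32 DRBD_MAGIC;     carries u16 command, u16 length
 *	p_header95:  starts with u16 DRBD_MAGIC_BIG; carries u16 command, u32 length
 *	p_header100: starts with u32 DRBD_MAGIC_100; carries u16 volume, u16 command,
 *	             u32 length, and a pad field that must be zero
 */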

static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
{
        void *buffer = tconn->data.rbuf;
        int err;

        err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
        if (err)
                return err;

        err = decode_header(tconn, buffer, pi);
        tconn->last_received = jiffies;

        return err;
}

static void drbd_flush(struct drbd_conf *mdev)
{
        int rv;

        if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
                rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
                                        NULL);
                if (rv) {
                        dev_err(DEV, "local disk flush failed with status %d\n", rv);
                        /* would rather check on EOPNOTSUPP, but that is not reliable.
                         * don't try again for ANY return value != 0
                         * if (rv == -EOPNOTSUPP) */
                        drbd_bump_write_ordering(mdev, WO_drain_io);
                }
                put_ldev(mdev);
        }
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @mdev:       DRBD device.
 * @epoch:      Epoch object.
 * @ev:         Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
{
        int epoch_size;
        struct drbd_epoch *next_epoch;
        enum finish_epoch rv = FE_STILL_LIVE;

        spin_lock(&mdev->epoch_lock);
        do {
                next_epoch = NULL;

                epoch_size = atomic_read(&epoch->epoch_size);

                switch (ev & ~EV_CLEANUP) {
                case EV_PUT:
                        atomic_dec(&epoch->active);
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do */
                        break;
                }

                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
                    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
                        if (!(ev & EV_CLEANUP)) {
                                spin_unlock(&mdev->epoch_lock);
                                drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
                                spin_lock(&mdev->epoch_lock);
                        }
                        dec_unacked(mdev);

                        if (mdev->current_epoch != epoch) {
                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                                list_del(&epoch->list);
                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                                mdev->epochs--;
                                kfree(epoch);

                                if (rv == FE_STILL_LIVE)
                                        rv = FE_DESTROYED;
                        } else {
                                epoch->flags = 0;
                                atomic_set(&epoch->epoch_size, 0);
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
                                wake_up(&mdev->ee_wait);
                        }
                }

                if (!next_epoch)
                        break;

                epoch = next_epoch;
        } while (1);

        spin_unlock(&mdev->epoch_lock);

        return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:       DRBD device.
 * @wo:         Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
{
        struct disk_conf *dc;
        enum write_ordering_e pwo;
        static char *write_ordering_str[] = {
                [WO_none] = "none",
                [WO_drain_io] = "drain",
                [WO_bdev_flush] = "flush",
        };

        pwo = mdev->write_ordering;
        wo = min(pwo, wo);
        rcu_read_lock();
        dc = rcu_dereference(mdev->ldev->disk_conf);

        if (wo == WO_bdev_flush && !dc->disk_flushes)
                wo = WO_drain_io;
        if (wo == WO_drain_io && !dc->disk_drain)
                wo = WO_none;
        rcu_read_unlock();
        mdev->write_ordering = wo;
        if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
                dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}

/**
 * drbd_submit_peer_request()
 * @mdev:       DRBD device.
 * @peer_req:   peer request
 * @rw:         flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_conf *mdev,
                             struct drbd_peer_request *peer_req,
                             const unsigned rw, const int fault_type)
{
        struct bio *bios = NULL;
        struct bio *bio;
        struct page *page = peer_req->pages;
        sector_t sector = peer_req->i.sector;
        unsigned ds = peer_req->i.size;
        unsigned n_bios = 0;
        unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
        int err = -ENOMEM;

        /* In most cases, we will only need one bio.  But in case the lower
         * level restrictions happen to be different at this offset on this
         * side than those of the sending peer, we may need to submit the
         * request in more than one bio.
         *
         * Plain bio_alloc is good enough here, this is no DRBD internally
         * generated bio, but a bio allocated on behalf of the peer.
         */
next_bio:
        bio = bio_alloc(GFP_NOIO, nr_pages);
        if (!bio) {
                dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
                goto fail;
        }
        /* > peer_req->i.sector, unless this is the first bio */
        bio->bi_sector = sector;
        bio->bi_bdev = mdev->ldev->backing_bdev;
        bio->bi_rw = rw;
        bio->bi_private = peer_req;
        bio->bi_end_io = drbd_peer_request_endio;

        bio->bi_next = bios;
        bios = bio;
        ++n_bios;

        page_chain_for_each(page) {
                unsigned len = min_t(unsigned, ds, PAGE_SIZE);
                if (!bio_add_page(bio, page, len, 0)) {
                        /* A single page must always be possible!
                         * But in case it fails anyways,
                         * we deal with it, and complain (below). */
                        if (bio->bi_vcnt == 0) {
                                dev_err(DEV,
                                        "bio_add_page failed for len=%u, "
                                        "bi_vcnt=0 (bi_sector=%llu)\n",
                                        len, (unsigned long long)bio->bi_sector);
                                err = -ENOSPC;
                                goto fail;
                        }
                        goto next_bio;
                }
                ds -= len;
                sector += len >> 9;
                --nr_pages;
        }
        D_ASSERT(page == NULL);
        D_ASSERT(ds == 0);

        atomic_set(&peer_req->pending_bios, n_bios);
        do {
                bio = bios;
                bios = bios->bi_next;
                bio->bi_next = NULL;

                drbd_generic_make_request(mdev, fault_type, bio);
        } while (bios);
        return 0;

fail:
        while (bios) {
                bio = bios;
                bios = bios->bi_next;
                bio_put(bio);
        }
        return err;
}
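
/* Note on the submission loop above: each new bio is pushed onto the front of
 * the singly linked "bios" list, so the bios are submitted in reverse order
 * of creation. The chunks cover disjoint, ascending sector ranges, so the
 * ordering between them does not matter for correctness here. */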

static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
                                             struct drbd_peer_request *peer_req)
{
        struct drbd_interval *i = &peer_req->i;

        drbd_remove_interval(&mdev->write_requests, i);
        drbd_clear_interval(i);

        /* Wake up any processes waiting for this peer request to complete.  */
        if (i->waiting)
                wake_up(&mdev->misc_wait);
}

static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
{
        struct drbd_conf *mdev;
        int rv;
        struct p_barrier *p = pi->data;
        struct drbd_epoch *epoch;

        mdev = vnr_to_mdev(tconn, pi->vnr);
        if (!mdev)
                return -EIO;

        inc_unacked(mdev);

        mdev->current_epoch->barrier_nr = p->barrier;
        rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

        /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
         * the activity log, which means it would not be resynced in case the
         * R_PRIMARY crashes now.
         * Therefore we must send the barrier_ack after the barrier request was
         * completed. */
        switch (mdev->write_ordering) {
        case WO_none:
                if (rv == FE_RECYCLED)
                        return 0;

                /* receiver context, in the writeout path of the other node.
                 * avoid potential distributed deadlock */
                epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
                if (epoch)
                        break;
                else
                        dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
                        /* Fall through */

        case WO_bdev_flush:
        case WO_drain_io:
                drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
                drbd_flush(mdev);

                if (atomic_read(&mdev->current_epoch->epoch_size)) {
                        epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
                        if (epoch)
                                break;
                }

                epoch = mdev->current_epoch;
                wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

                D_ASSERT(atomic_read(&epoch->active) == 0);
                D_ASSERT(epoch->flags == 0);

                return 0;
        default:
                dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
                return -EIO;
        }

        epoch->flags = 0;
        atomic_set(&epoch->epoch_size, 0);
        atomic_set(&epoch->active, 0);

        spin_lock(&mdev->epoch_lock);
        if (atomic_read(&mdev->current_epoch->epoch_size)) {
                list_add(&epoch->list, &mdev->current_epoch->list);
                mdev->current_epoch = epoch;
                mdev->epochs++;
        } else {
                /* The current_epoch got recycled while we allocated this one... */
                kfree(epoch);
        }
        spin_unlock(&mdev->epoch_lock);

        return 0;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
              int data_size) __must_hold(local)
{
        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
        struct drbd_peer_request *peer_req;
        struct page *page;
        int dgs, ds, err;
        void *dig_in = mdev->tconn->int_dig_in;
        void *dig_vv = mdev->tconn->int_dig_vv;
        unsigned long *data;

        dgs = 0;
        if (mdev->tconn->peer_integrity_tfm) {
                dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
                /*
                 * FIXME: Receive the incoming digest into the receive buffer
                 *        here, together with its struct p_data?
                 */
                err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
                if (err)
                        return NULL;
                data_size -= dgs;
        }

        if (!expect(data_size != 0))
                return NULL;
        if (!expect(IS_ALIGNED(data_size, 512)))
                return NULL;
        if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
                return NULL;

        /* even though we trust our peer,
         * we sometimes have to double check. */
        if (sector + (data_size>>9) > capacity) {
                dev_err(DEV, "request from peer beyond end of local disk: "
                        "capacity: %llus < sector: %llus + size: %u\n",
                        (unsigned long long)capacity,
                        (unsigned long long)sector, data_size);
                return NULL;
        }

        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
        if (!peer_req)
                return NULL;

        ds = data_size;
        page = peer_req->pages;
        page_chain_for_each(page) {
                unsigned len = min_t(int, ds, PAGE_SIZE);
                data = kmap(page);
                err = drbd_recv_all_warn(mdev->tconn, data, len);
                if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
                        dev_err(DEV, "Fault injection: Corrupting data on receive\n");
                        data[0] = data[0] ^ (unsigned long)-1;
                }
                kunmap(page);
                if (err) {
                        drbd_free_peer_req(mdev, peer_req);
                        return NULL;
                }
                ds -= len;
        }

        if (dgs) {
                drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
                if (memcmp(dig_in, dig_vv, dgs)) {
                        dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
                                (unsigned long long)sector, data_size);
                        drbd_free_peer_req(mdev, peer_req);
                        return NULL;
                }
        }
        mdev->recv_cnt += data_size>>9;
        return peer_req;
}
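
/* Note on the wire format handled above: when an integrity algorithm is
 * configured (peer_integrity_tfm), each data packet carries a digest in front
 * of the payload, so data_size on entry includes the digest size and is
 * reduced to the pure payload size before the pages are received. */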

/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
{
        struct page *page;
        int err = 0;
        void *data;

        if (!data_size)
                return 0;

        page = drbd_alloc_pages(mdev, 1, 1);
        /* even with retry set, drbd_alloc_pages() returns NULL if we got
         * signalled; don't kmap() a NULL page in that case */
        if (!page)
                return -EIO;

        data = kmap(page);
1482         while (data_size) {
1483                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1484
1485                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1486                 if (err)
1487                         break;
1488                 data_size -= len;
1489         }
1490         kunmap(page);
1491         drbd_free_pages(mdev, page, 0);
1492         return err;
1493 }
1494
1495 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1496                            sector_t sector, int data_size)
1497 {
1498         struct bio_vec *bvec;
1499         struct bio *bio;
1500         int dgs, err, i, expect;
1501         void *dig_in = mdev->tconn->int_dig_in;
1502         void *dig_vv = mdev->tconn->int_dig_vv;
1503
1504         dgs = 0;
1505         if (mdev->tconn->peer_integrity_tfm) {
1506                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1507                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1508                 if (err)
1509                         return err;
1510                 data_size -= dgs;
1511         }
1512
1513         /* optimistically update recv_cnt.  if receiving fails below,
1514          * we disconnect anyways, and counters will be reset. */
1515         mdev->recv_cnt += data_size>>9;
1516
1517         bio = req->master_bio;
1518         D_ASSERT(sector == bio->bi_sector);
1519
1520         bio_for_each_segment(bvec, bio, i) {
1521                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1522                 expect = min_t(int, data_size, bvec->bv_len);
1523                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1524                 kunmap(bvec->bv_page);
1525                 if (err)
1526                         return err;
1527                 data_size -= expect;
1528         }
1529
1530         if (dgs) {
1531                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1532                 if (memcmp(dig_in, dig_vv, dgs)) {
1533                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1534                         return -EINVAL;
1535                 }
1536         }
1537
1538         D_ASSERT(data_size == 0);
1539         return 0;
1540 }
1541
1542 /*
1543  * e_end_resync_block() is called in asender context via
1544  * drbd_finish_peer_reqs().
1545  */
1546 static int e_end_resync_block(struct drbd_work *w, int unused)
1547 {
1548         struct drbd_peer_request *peer_req =
1549                 container_of(w, struct drbd_peer_request, w);
1550         struct drbd_conf *mdev = w->mdev;
1551         sector_t sector = peer_req->i.sector;
1552         int err;
1553
1554         D_ASSERT(drbd_interval_empty(&peer_req->i));
1555
1556         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1557                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1558                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1559         } else {
1560                 /* Record failure to sync */
1561                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1562
1563                 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1564         }
1565         dec_unacked(mdev);
1566
1567         return err;
1568 }
1569
1570 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1571 {
1572         struct drbd_peer_request *peer_req;
1573
1574         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1575         if (!peer_req)
1576                 goto fail;
1577
1578         dec_rs_pending(mdev);
1579
1580         inc_unacked(mdev);
1581         /* corresponding dec_unacked() in e_end_resync_block()
1582          * respective _drbd_clear_done_ee */
1583
1584         peer_req->w.cb = e_end_resync_block;
1585
1586         spin_lock_irq(&mdev->tconn->req_lock);
1587         list_add(&peer_req->w.list, &mdev->sync_ee);
1588         spin_unlock_irq(&mdev->tconn->req_lock);
1589
1590         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1591         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1592                 return 0;
1593
1594         /* don't care for the reason here */
1595         dev_err(DEV, "submit failed, triggering re-connect\n");
1596         spin_lock_irq(&mdev->tconn->req_lock);
1597         list_del(&peer_req->w.list);
1598         spin_unlock_irq(&mdev->tconn->req_lock);
1599
1600         drbd_free_peer_req(mdev, peer_req);
1601 fail:
1602         put_ldev(mdev);
1603         return -EIO;
1604 }
1605
1606 static struct drbd_request *
1607 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1608              sector_t sector, bool missing_ok, const char *func)
1609 {
1610         struct drbd_request *req;
1611
1612         /* Request object according to our peer */
1613         req = (struct drbd_request *)(unsigned long)id;
1614         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1615                 return req;
1616         if (!missing_ok) {
1617                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1618                         (unsigned long)id, (unsigned long long)sector);
1619         }
1620         return NULL;
1621 }
1622
1623 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1624 {
1625         struct drbd_conf *mdev;
1626         struct drbd_request *req;
1627         sector_t sector;
1628         int err;
1629         struct p_data *p = pi->data;
1630
1631         mdev = vnr_to_mdev(tconn, pi->vnr);
1632         if (!mdev)
1633                 return -EIO;
1634
1635         sector = be64_to_cpu(p->sector);
1636
1637         spin_lock_irq(&mdev->tconn->req_lock);
1638         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1639         spin_unlock_irq(&mdev->tconn->req_lock);
1640         if (unlikely(!req))
1641                 return -EIO;
1642
1643         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1644          * special casing it there for the various failure cases.
1645          * still no race with drbd_fail_pending_reads */
1646         err = recv_dless_read(mdev, req, sector, pi->size);
1647         if (!err)
1648                 req_mod(req, DATA_RECEIVED);
1649         /* else: nothing. handled from drbd_disconnect...
1650          * I don't think we may complete this just yet
1651          * in case we are "on-disconnect: freeze" */
1652
1653         return err;
1654 }
1655
1656 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1657 {
1658         struct drbd_conf *mdev;
1659         sector_t sector;
1660         int err;
1661         struct p_data *p = pi->data;
1662
1663         mdev = vnr_to_mdev(tconn, pi->vnr);
1664         if (!mdev)
1665                 return -EIO;
1666
1667         sector = be64_to_cpu(p->sector);
1668         D_ASSERT(p->block_id == ID_SYNCER);
1669
1670         if (get_ldev(mdev)) {
1671                 /* data is submitted to disk within recv_resync_read.
1672                  * corresponding put_ldev done below on error,
1673                  * or in drbd_peer_request_endio. */
1674                 err = recv_resync_read(mdev, sector, pi->size);
1675         } else {
1676                 if (__ratelimit(&drbd_ratelimit_state))
1677                         dev_err(DEV, "Can not write resync data to local disk.\n");
1678
1679                 err = drbd_drain_block(mdev, pi->size);
1680
1681                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1682         }
1683
1684         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1685
1686         return err;
1687 }
1688
1689 static int w_restart_write(struct drbd_work *w, int cancel)
1690 {
1691         struct drbd_request *req = container_of(w, struct drbd_request, w);
1692         struct drbd_conf *mdev = w->mdev;
1693         struct bio *bio;
1694         unsigned long start_time;
1695         unsigned long flags;
1696
1697         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1698         if (!expect(req->rq_state & RQ_POSTPONED)) {
1699                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1700                 return -EIO;
1701         }
1702         bio = req->master_bio;
1703         start_time = req->start_time;
1704         /* Postponed requests will not have their master_bio completed!  */
1705         __req_mod(req, DISCARD_WRITE, NULL);
1706         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1707
1708         while (__drbd_make_request(mdev, bio, start_time))
1709                 /* retry */ ;
1710         return 0;
1711 }
1712
1713 static void restart_conflicting_writes(struct drbd_conf *mdev,
1714                                        sector_t sector, int size)
1715 {
1716         struct drbd_interval *i;
1717         struct drbd_request *req;
1718
1719         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1720                 if (!i->local)
1721                         continue;
1722                 req = container_of(i, struct drbd_request, i);
1723                 if (req->rq_state & RQ_LOCAL_PENDING ||
1724                     !(req->rq_state & RQ_POSTPONED))
1725                         continue;
1726                 if (expect(list_empty(&req->w.list))) {
1727                         req->w.mdev = mdev;
1728                         req->w.cb = w_restart_write;
1729                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1730                 }
1731         }
1732 }
1733
1734 /*
1735  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1736  */
1737 static int e_end_block(struct drbd_work *w, int cancel)
1738 {
1739         struct drbd_peer_request *peer_req =
1740                 container_of(w, struct drbd_peer_request, w);
1741         struct drbd_conf *mdev = w->mdev;
1742         sector_t sector = peer_req->i.sector;
1743         int err = 0, pcmd;
1744
1745         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1746                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1747                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1748                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1749                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1750                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1751                         err = drbd_send_ack(mdev, pcmd, peer_req);
1752                         if (pcmd == P_RS_WRITE_ACK)
1753                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1754                 } else {
1755                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1756                         /* we expect it to be marked out of sync anyways...
1757                          * maybe assert this?  */
1758                 }
1759                 dec_unacked(mdev);
1760         }
1761         /* we delete from the conflict detection hash _after_ we sent out the
1762          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1763         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1764                 spin_lock_irq(&mdev->tconn->req_lock);
1765                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1766                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1767                 if (peer_req->flags & EE_RESTART_REQUESTS)
1768                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1769                 spin_unlock_irq(&mdev->tconn->req_lock);
1770         } else
1771                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1772
1773         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1774
1775         return err;
1776 }
1777
1778 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1779 {
1780         struct drbd_conf *mdev = w->mdev;
1781         struct drbd_peer_request *peer_req =
1782                 container_of(w, struct drbd_peer_request, w);
1783         int err;
1784
1785         err = drbd_send_ack(mdev, ack, peer_req);
1786         dec_unacked(mdev);
1787
1788         return err;
1789 }
1790
1791 static int e_send_discard_write(struct drbd_work *w, int unused)
1792 {
1793         return e_send_ack(w, P_DISCARD_WRITE);
1794 }
1795
1796 static int e_send_retry_write(struct drbd_work *w, int unused)
1797 {
1798         struct drbd_tconn *tconn = w->mdev->tconn;
1799
1800         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1801                              P_RETRY_WRITE : P_DISCARD_WRITE);
1802 }
1803
1804 static bool seq_greater(u32 a, u32 b)
1805 {
1806         /*
1807          * We assume 32-bit wrap-around here.
1808          * For 24-bit wrap-around, we would have to shift:
1809          *  a <<= 8; b <<= 8;
1810          */
1811         return (s32)a - (s32)b > 0;
1812 }
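
/*
 * Editor's illustration (not part of the original source): with the
 * serial-number arithmetic above, seq_greater(1, 0xffffffff) is true,
 * since (s32)1 - (s32)0xffffffff == 1 - (-1) == 2 > 0.  So sequence
 * number 1 counts as newer than 0xffffffff once the 32-bit counter
 * has wrapped.
 */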
1813
1814 static u32 seq_max(u32 a, u32 b)
1815 {
1816         return seq_greater(a, b) ? a : b;
1817 }
1818
1819 static bool need_peer_seq(struct drbd_conf *mdev)
1820 {
1821         struct drbd_tconn *tconn = mdev->tconn;
1822         int tp;
1823
1824         /*
1825          * We only need to keep track of the last packet_seq number of our peer
1826          * if we are in dual-primary mode and we have the discard flag set; see
1827          * handle_write_conflicts().
1828          */
1829
1830         rcu_read_lock();
1831         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1832         rcu_read_unlock();
1833
1834         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1835 }
1836
1837 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1838 {
1839         unsigned int newest_peer_seq;
1840
1841         if (need_peer_seq(mdev)) {
1842                 spin_lock(&mdev->peer_seq_lock);
1843                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1844                 mdev->peer_seq = newest_peer_seq;
1845                 spin_unlock(&mdev->peer_seq_lock);
1846                 /* wake up only if we actually changed mdev->peer_seq */
1847                 if (peer_seq == newest_peer_seq)
1848                         wake_up(&mdev->seq_wait);
1849         }
1850 }
1851
1852 /* Called from receive_Data.
1853  * Synchronize packets on sock with packets on msock.
1854  *
1855  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1856  * packet traveling on msock, they are still processed in the order they have
1857  * been sent.
1858  *
1859  * Note: we don't care for Ack packets overtaking P_DATA packets.
1860  *
1861  * In case packet_seq is larger than mdev->peer_seq number, there are
1862  * outstanding packets on the msock. We wait for them to arrive.
1863  * In case we are the logically next packet, we update mdev->peer_seq
1864  * ourselves. Correctly handles 32bit wrap around.
1865  *
1866  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1867  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1868  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1869  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1870  *
1871  * returns 0 if we may process the packet,
1872  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1873 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1874 {
1875         DEFINE_WAIT(wait);
1876         long timeout;
1877         int ret;
1878
1879         if (!need_peer_seq(mdev))
1880                 return 0;
1881
1882         spin_lock(&mdev->peer_seq_lock);
1883         for (;;) {
1884                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1885                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1886                         ret = 0;
1887                         break;
1888                 }
1889                 if (signal_pending(current)) {
1890                         ret = -ERESTARTSYS;
1891                         break;
1892                 }
1893                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1894                 spin_unlock(&mdev->peer_seq_lock);
1895                 rcu_read_lock();
1896                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1897                 rcu_read_unlock();
1898                 timeout = schedule_timeout(timeout);
1899                 spin_lock(&mdev->peer_seq_lock);
1900                 if (!timeout) {
1901                         ret = -ETIMEDOUT;
1902                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1903                         break;
1904                 }
1905         }
1906         spin_unlock(&mdev->peer_seq_lock);
1907         finish_wait(&mdev->seq_wait, &wait);
1908         return ret;
1909 }
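
/*
 * Worked example (editor's note, not in the original source): assume
 * mdev->peer_seq is 7.  A P_DATA packet with peer_seq == 8 passes right
 * away, because seq_greater(7, 7) is false, and advances peer_seq to 8.
 * A packet with peer_seq == 9 finds seq_greater(8, 7) true and sleeps
 * until the ack for packet 8 arrives on the msock and update_peer_seq()
 * wakes us, or until the ping timeout expires.
 */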
1910
1911 /* see also bio_flags_to_wire()
1912  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1913  * flags and back. We may replicate to other kernel versions. */
1914 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1915 {
1916         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1917                 (dpf & DP_FUA ? REQ_FUA : 0) |
1918                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1919                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1920 }
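
/*
 * For instance (editor's illustration): a packet flagged
 * DP_RW_SYNC | DP_FUA maps to REQ_SYNC | REQ_FUA on the bio
 * submitted locally.
 */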
1921
1922 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1923                                     unsigned int size)
1924 {
1925         struct drbd_interval *i;
1926
1927     repeat:
1928         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1929                 struct drbd_request *req;
1930                 struct bio_and_error m;
1931
1932                 if (!i->local)
1933                         continue;
1934                 req = container_of(i, struct drbd_request, i);
1935                 if (!(req->rq_state & RQ_POSTPONED))
1936                         continue;
1937                 req->rq_state &= ~RQ_POSTPONED;
1938                 __req_mod(req, NEG_ACKED, &m);
1939                 spin_unlock_irq(&mdev->tconn->req_lock);
1940                 if (m.bio)
1941                         complete_master_bio(mdev, &m);
1942                 spin_lock_irq(&mdev->tconn->req_lock);
1943                 goto repeat;
1944         }
1945 }
1946
1947 static int handle_write_conflicts(struct drbd_conf *mdev,
1948                                   struct drbd_peer_request *peer_req)
1949 {
1950         struct drbd_tconn *tconn = mdev->tconn;
1951         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1952         sector_t sector = peer_req->i.sector;
1953         const unsigned int size = peer_req->i.size;
1954         struct drbd_interval *i;
1955         bool equal;
1956         int err;
1957
1958         /*
1959          * Inserting the peer request into the write_requests tree will prevent
1960          * new conflicting local requests from being added.
1961          */
1962         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1963
1964     repeat:
1965         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1966                 if (i == &peer_req->i)
1967                         continue;
1968
1969                 if (!i->local) {
1970                         /*
1971                          * Our peer has sent a conflicting remote request; this
1972                          * should not happen in a two-node setup.  Wait for the
1973                          * earlier peer request to complete.
1974                          */
1975                         err = drbd_wait_misc(mdev, i);
1976                         if (err)
1977                                 goto out;
1978                         goto repeat;
1979                 }
1980
1981                 equal = i->sector == sector && i->size == size;
1982                 if (resolve_conflicts) {
1983                         /*
1984                          * If the peer request is fully contained within the
1985                          * overlapping request, it can be discarded; otherwise,
1986                          * it will be retried once all overlapping requests
1987                          * have completed.
1988                          */
1989                         bool discard = i->sector <= sector && i->sector +
1990                                        (i->size >> 9) >= sector + (size >> 9);
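                        /*
                         * Illustrative numbers (editor's note, not in the
                         * original source): a local write at sector 1024 of
                         * 8192 bytes covers sectors [1024, 1040).  A peer
                         * write at sector 1028 of 4096 bytes covers
                         * [1028, 1036), is fully contained, and sets
                         * discard; a peer write at sector 1036 of 8192
                         * bytes reaches sector 1052 and is retried instead.
                         */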
1991
1992                         if (!equal)
1993                                 dev_alert(DEV, "Concurrent writes detected: "
1994                                                "local=%llus +%u, remote=%llus +%u, "
1995                                                "assuming %s came first\n",
1996                                           (unsigned long long)i->sector, i->size,
1997                                           (unsigned long long)sector, size,
1998                                           discard ? "local" : "remote");
1999
2000                         inc_unacked(mdev);
2001                         peer_req->w.cb = discard ? e_send_discard_write :
2002                                                    e_send_retry_write;
2003                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2004                         wake_asender(mdev->tconn);
2005
2006                         err = -ENOENT;
2007                         goto out;
2008                 } else {
2009                         struct drbd_request *req =
2010                                 container_of(i, struct drbd_request, i);
2011
2012                         if (!equal)
2013                                 dev_alert(DEV, "Concurrent writes detected: "
2014                                                "local=%llus +%u, remote=%llus +%u\n",
2015                                           (unsigned long long)i->sector, i->size,
2016                                           (unsigned long long)sector, size);
2017
2018                         if (req->rq_state & RQ_LOCAL_PENDING ||
2019                             !(req->rq_state & RQ_POSTPONED)) {
2020                                 /*
2021                                  * Wait for the node with the discard flag to
2022                                  * decide if this request will be discarded or
2023                                  * retried.  Requests that are discarded will
2024                                  * disappear from the write_requests tree.
2025                                  *
2026                                  * In addition, wait for the conflicting
2027                                  * request to finish locally before submitting
2028                                  * the conflicting peer request.
2029                                  */
2030                                 err = drbd_wait_misc(mdev, &req->i);
2031                                 if (err) {
2032                                         _conn_request_state(mdev->tconn,
2033                                                             NS(conn, C_TIMEOUT),
2034                                                             CS_HARD);
2035                                         fail_postponed_requests(mdev, sector, size);
2036                                         goto out;
2037                                 }
2038                                 goto repeat;
2039                         }
2040                         /*
2041                          * Remember to restart the conflicting requests after
2042                          * the new peer request has completed.
2043                          */
2044                         peer_req->flags |= EE_RESTART_REQUESTS;
2045                 }
2046         }
2047         err = 0;
2048
2049     out:
2050         if (err)
2051                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2052         return err;
2053 }
2054
2055 /* mirrored write */
2056 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2057 {
2058         struct drbd_conf *mdev;
2059         sector_t sector;
2060         struct drbd_peer_request *peer_req;
2061         struct p_data *p = pi->data;
2062         u32 peer_seq = be32_to_cpu(p->seq_num);
2063         int rw = WRITE;
2064         u32 dp_flags;
2065         int err, tp;
2066
2067         mdev = vnr_to_mdev(tconn, pi->vnr);
2068         if (!mdev)
2069                 return -EIO;
2070
2071         if (!get_ldev(mdev)) {
2072                 int err2;
2073
2074                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2075                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2076                 atomic_inc(&mdev->current_epoch->epoch_size);
2077                 err2 = drbd_drain_block(mdev, pi->size);
2078                 if (!err)
2079                         err = err2;
2080                 return err;
2081         }
2082
2083         /*
2084          * Corresponding put_ldev done either below (on various errors), or in
2085          * drbd_peer_request_endio, if we successfully submit the data at the
2086          * end of this function.
2087          */
2088
2089         sector = be64_to_cpu(p->sector);
2090         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2091         if (!peer_req) {
2092                 put_ldev(mdev);
2093                 return -EIO;
2094         }
2095
2096         peer_req->w.cb = e_end_block;
2097
2098         dp_flags = be32_to_cpu(p->dp_flags);
2099         rw |= wire_flags_to_bio(mdev, dp_flags);
2100
2101         if (dp_flags & DP_MAY_SET_IN_SYNC)
2102                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2103
2104         spin_lock(&mdev->epoch_lock);
2105         peer_req->epoch = mdev->current_epoch;
2106         atomic_inc(&peer_req->epoch->epoch_size);
2107         atomic_inc(&peer_req->epoch->active);
2108         spin_unlock(&mdev->epoch_lock);
2109
2110         rcu_read_lock();
2111         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2112         rcu_read_unlock();
2113         if (tp) {
2114                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2115                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2116                 if (err)
2117                         goto out_interrupted;
2118                 spin_lock_irq(&mdev->tconn->req_lock);
2119                 err = handle_write_conflicts(mdev, peer_req);
2120                 if (err) {
2121                         spin_unlock_irq(&mdev->tconn->req_lock);
2122                         if (err == -ENOENT) {
2123                                 put_ldev(mdev);
2124                                 return 0;
2125                         }
2126                         goto out_interrupted;
2127                 }
2128         } else
2129                 spin_lock_irq(&mdev->tconn->req_lock);
2130         list_add(&peer_req->w.list, &mdev->active_ee);
2131         spin_unlock_irq(&mdev->tconn->req_lock);
2132
2133         if (mdev->tconn->agreed_pro_version < 100) {
2134                 rcu_read_lock();
2135                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2136                 case DRBD_PROT_C:
2137                         dp_flags |= DP_SEND_WRITE_ACK;
2138                         break;
2139                 case DRBD_PROT_B:
2140                         dp_flags |= DP_SEND_RECEIVE_ACK;
2141                         break;
2142                 }
2143                 rcu_read_unlock();
2144         }
2145
2146         if (dp_flags & DP_SEND_WRITE_ACK) {
2147                 peer_req->flags |= EE_SEND_WRITE_ACK;
2148                 inc_unacked(mdev);
2149                 /* corresponding dec_unacked() in e_end_block()
2150                  * respective _drbd_clear_done_ee */
2151         }
2152
2153         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2154                 /* I really don't like it that the receiver thread
2155                  * sends on the msock, but anyways */
2156                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2157         }
2158
2159         if (mdev->state.pdsk < D_INCONSISTENT) {
2160                 /* In case we have the only disk of the cluster, note the area as out of sync for the (diskless) peer. */
2161                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2162                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2163                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2164                 drbd_al_begin_io(mdev, &peer_req->i);
2165         }
2166
2167         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2168         if (!err)
2169                 return 0;
2170
2171         /* don't care for the reason here */
2172         dev_err(DEV, "submit failed, triggering re-connect\n");
2173         spin_lock_irq(&mdev->tconn->req_lock);
2174         list_del(&peer_req->w.list);
2175         drbd_remove_epoch_entry_interval(mdev, peer_req);
2176         spin_unlock_irq(&mdev->tconn->req_lock);
2177         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2178                 drbd_al_complete_io(mdev, &peer_req->i);
2179
2180 out_interrupted:
2181         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2182         put_ldev(mdev);
2183         drbd_free_peer_req(mdev, peer_req);
2184         return err;
2185 }
2186
2187 /* We may throttle resync, if the lower device seems to be busy,
2188  * and current sync rate is above c_min_rate.
2189  *
2190  * To decide whether or not the lower device is busy, we use a scheme similar
2191  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2192  * (more than 64 sectors) of activity we cannot account for with our own resync
2193  * activity, it obviously is "busy".
2194  *
2195  * The current sync rate used here uses only the most recent two step marks,
2196  * to have a short time average so we can react faster.
2197  */
2198 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2199 {
2200         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2201         unsigned long db, dt, dbdt;
2202         struct lc_element *tmp;
2203         int curr_events;
2204         int throttle = 0;
2205         unsigned int c_min_rate;
2206
2207         rcu_read_lock();
2208         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2209         rcu_read_unlock();
2210
2211         /* feature disabled? */
2212         if (c_min_rate == 0)
2213                 return 0;
2214
2215         spin_lock_irq(&mdev->al_lock);
2216         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2217         if (tmp) {
2218                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2219                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2220                         spin_unlock_irq(&mdev->al_lock);
2221                         return 0;
2222                 }
2223                 /* Do not slow down if app IO is already waiting for this extent */
2224         }
2225         spin_unlock_irq(&mdev->al_lock);
2226
2227         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2228                       (int)part_stat_read(&disk->part0, sectors[1]) -
2229                         atomic_read(&mdev->rs_sect_ev);
2230
2231         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2232                 unsigned long rs_left;
2233                 int i;
2234
2235                 mdev->rs_last_events = curr_events;
2236
2237                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2238                  * approx. */
2239                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2240
2241                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2242                         rs_left = mdev->ov_left;
2243                 else
2244                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2245
2246                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2247                 if (!dt)
2248                         dt++;
2249                 db = mdev->rs_mark_left[i] - rs_left;
2250                 dbdt = Bit2KB(db/dt);
2251
2252                 if (dbdt > c_min_rate)
2253                         throttle = 1;
2254         }
2255         return throttle;
2256 }
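
/*
 * Rough numbers (editor's illustration, assuming the usual 4 KiB of data
 * per bitmap bit, i.e. Bit2KB(x) == x << 2): if the bitmap weight dropped
 * by db == 1024 bits during dt == 2 seconds, then dbdt == Bit2KB(512) ==
 * 2048 KiB/s.  With c_min_rate == 1000 (KiB/s) we throttle; with
 * c_min_rate == 4000 we do not.
 */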
2257
2258
2259 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2260 {
2261         struct drbd_conf *mdev;
2262         sector_t sector;
2263         sector_t capacity;
2264         struct drbd_peer_request *peer_req;
2265         struct digest_info *di = NULL;
2266         int size, verb;
2267         unsigned int fault_type;
2268         struct p_block_req *p = pi->data;
2269
2270         mdev = vnr_to_mdev(tconn, pi->vnr);
2271         if (!mdev)
2272                 return -EIO;
2273         capacity = drbd_get_capacity(mdev->this_bdev);
2274
2275         sector = be64_to_cpu(p->sector);
2276         size   = be32_to_cpu(p->blksize);
2277
2278         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2279                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2280                                 (unsigned long long)sector, size);
2281                 return -EINVAL;
2282         }
2283         if (sector + (size>>9) > capacity) {
2284                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2285                                 (unsigned long long)sector, size);
2286                 return -EINVAL;
2287         }
2288
2289         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2290                 verb = 1;
2291                 switch (pi->cmd) {
2292                 case P_DATA_REQUEST:
2293                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2294                         break;
2295                 case P_RS_DATA_REQUEST:
2296                 case P_CSUM_RS_REQUEST:
2297                 case P_OV_REQUEST:
2298                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2299                         break;
2300                 case P_OV_REPLY:
2301                         verb = 0;
2302                         dec_rs_pending(mdev);
2303                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2304                         break;
2305                 default:
2306                         BUG();
2307                 }
2308                 if (verb && __ratelimit(&drbd_ratelimit_state))
2309                         dev_err(DEV, "Can not satisfy peer's read request, "
2310                             "no local data.\n");
2311
2312                 /* drain possible payload */
2313                 return drbd_drain_block(mdev, pi->size);
2314         }
2315
2316         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2317          * "criss-cross" setup, that might cause write-out on some other DRBD,
2318          * which in turn might block on the other node at this very place.  */
2319         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2320         if (!peer_req) {
2321                 put_ldev(mdev);
2322                 return -ENOMEM;
2323         }
2324
2325         switch (pi->cmd) {
2326         case P_DATA_REQUEST:
2327                 peer_req->w.cb = w_e_end_data_req;
2328                 fault_type = DRBD_FAULT_DT_RD;
2329                 /* application IO, don't drbd_rs_begin_io */
2330                 goto submit;
2331
2332         case P_RS_DATA_REQUEST:
2333                 peer_req->w.cb = w_e_end_rsdata_req;
2334                 fault_type = DRBD_FAULT_RS_RD;
2335                 /* used in the sector offset progress display */
2336                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2337                 break;
2338
2339         case P_OV_REPLY:
2340         case P_CSUM_RS_REQUEST:
2341                 fault_type = DRBD_FAULT_RS_RD;
2342                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2343                 if (!di)
2344                         goto out_free_e;
2345
2346                 di->digest_size = pi->size;
2347                 di->digest = (((char *)di)+sizeof(struct digest_info));
2348
2349                 peer_req->digest = di;
2350                 peer_req->flags |= EE_HAS_DIGEST;
2351
2352                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2353                         goto out_free_e;
2354
2355                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2356                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2357                         peer_req->w.cb = w_e_end_csum_rs_req;
2358                         /* used in the sector offset progress display */
2359                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2360                 } else if (pi->cmd == P_OV_REPLY) {
2361                         /* track progress, we may need to throttle */
2362                         atomic_add(size >> 9, &mdev->rs_sect_in);
2363                         peer_req->w.cb = w_e_end_ov_reply;
2364                         dec_rs_pending(mdev);
2365                         /* drbd_rs_begin_io done when we sent this request,
2366                          * but accounting still needs to be done. */
2367                         goto submit_for_resync;
2368                 }
2369                 break;
2370
2371         case P_OV_REQUEST:
2372                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2373                     mdev->tconn->agreed_pro_version >= 90) {
2374                         unsigned long now = jiffies;
2375                         int i;
2376                         mdev->ov_start_sector = sector;
2377                         mdev->ov_position = sector;
2378                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2379                         mdev->rs_total = mdev->ov_left;
2380                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2381                                 mdev->rs_mark_left[i] = mdev->ov_left;
2382                                 mdev->rs_mark_time[i] = now;
2383                         }
2384                         dev_info(DEV, "Online Verify start sector: %llu\n",
2385                                         (unsigned long long)sector);
2386                 }
2387                 peer_req->w.cb = w_e_end_ov_req;
2388                 fault_type = DRBD_FAULT_RS_RD;
2389                 break;
2390
2391         default:
2392                 BUG();
2393         }
2394
2395         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2396          * wrt the receiver, but it is not as straightforward as it may seem.
2397          * Various places in the resync start and stop logic assume resync
2398          * requests are processed in order, requeuing this on the worker thread
2399          * introduces a bunch of new code for synchronization between threads.
2400          *
2401          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2402          * "forever", throttling after drbd_rs_begin_io will lock that extent
2403          * for application writes for the same time.  For now, just throttle
2404          * here, where the rest of the code expects the receiver to sleep for
2405          * a while, anyways.
2406          */
2407
2408         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2409          * this defers syncer requests for some time, before letting at least
2410  * one request through.  The resync controller on the receiving side
2411          * will adapt to the incoming rate accordingly.
2412          *
2413          * We cannot throttle here if remote is Primary/SyncTarget:
2414          * we would also throttle its application reads.
2415          * In that case, throttling is done on the SyncTarget only.
2416          */
2417         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2418                 schedule_timeout_uninterruptible(HZ/10);
2419         if (drbd_rs_begin_io(mdev, sector))
2420                 goto out_free_e;
2421
2422 submit_for_resync:
2423         atomic_add(size >> 9, &mdev->rs_sect_ev);
2424
2425 submit:
2426         inc_unacked(mdev);
2427         spin_lock_irq(&mdev->tconn->req_lock);
2428         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2429         spin_unlock_irq(&mdev->tconn->req_lock);
2430
2431         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2432                 return 0;
2433
2434         /* don't care for the reason here */
2435         dev_err(DEV, "submit failed, triggering re-connect\n");
2436         spin_lock_irq(&mdev->tconn->req_lock);
2437         list_del(&peer_req->w.list);
2438         spin_unlock_irq(&mdev->tconn->req_lock);
2439         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2440
2441 out_free_e:
2442         put_ldev(mdev);
2443         drbd_free_peer_req(mdev, peer_req);
2444         return -EIO;
2445 }
2446
2447 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2448 {
2449         int self, peer, rv = -100;
2450         unsigned long ch_self, ch_peer;
2451         enum drbd_after_sb_p after_sb_0p;
2452
2453         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2454         peer = mdev->p_uuid[UI_BITMAP] & 1;
2455
2456         ch_peer = mdev->p_uuid[UI_SIZE];
2457         ch_self = mdev->comm_bm_set;
2458
2459         rcu_read_lock();
2460         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2461         rcu_read_unlock();
2462         switch (after_sb_0p) {
2463         case ASB_CONSENSUS:
2464         case ASB_DISCARD_SECONDARY:
2465         case ASB_CALL_HELPER:
2466         case ASB_VIOLENTLY:
2467                 dev_err(DEV, "Configuration error.\n");
2468                 break;
2469         case ASB_DISCONNECT:
2470                 break;
2471         case ASB_DISCARD_YOUNGER_PRI:
2472                 if (self == 0 && peer == 1) {
2473                         rv = -1;
2474                         break;
2475                 }
2476                 if (self == 1 && peer == 0) {
2477                         rv =  1;
2478                         break;
2479                 }
2480                 /* Else fall through to one of the other strategies... */
2481         case ASB_DISCARD_OLDER_PRI:
2482                 if (self == 0 && peer == 1) {
2483                         rv = 1;
2484                         break;
2485                 }
2486                 if (self == 1 && peer == 0) {
2487                         rv = -1;
2488                         break;
2489                 }
2490                 /* Else fall through to one of the other strategies... */
2491                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2492                      "Using discard-least-changes instead\n");
2493         case ASB_DISCARD_ZERO_CHG:
2494                 if (ch_peer == 0 && ch_self == 0) {
2495                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2496                                 ? -1 : 1;
2497                         break;
2498                 } else {
2499                         if (ch_peer == 0) { rv =  1; break; }
2500                         if (ch_self == 0) { rv = -1; break; }
2501                 }
2502                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2503                         break;
2504         case ASB_DISCARD_LEAST_CHG:
2505                 if      (ch_self < ch_peer)
2506                         rv = -1;
2507                 else if (ch_self > ch_peer)
2508                         rv =  1;
2509                 else /* ( ch_self == ch_peer ) */
2510                      /* Well, then use something else. */
2511                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2512                                 ? -1 : 1;
2513                 break;
2514         case ASB_DISCARD_LOCAL:
2515                 rv = -1;
2516                 break;
2517         case ASB_DISCARD_REMOTE:
2518                 rv =  1;
2519         }
2520
2521         return rv;
2522 }
2523
2524 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2525 {
2526         int hg, rv = -100;
2527         enum drbd_after_sb_p after_sb_1p;
2528
2529         rcu_read_lock();
2530         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2531         rcu_read_unlock();
2532         switch (after_sb_1p) {
2533         case ASB_DISCARD_YOUNGER_PRI:
2534         case ASB_DISCARD_OLDER_PRI:
2535         case ASB_DISCARD_LEAST_CHG:
2536         case ASB_DISCARD_LOCAL:
2537         case ASB_DISCARD_REMOTE:
2538         case ASB_DISCARD_ZERO_CHG:
2539                 dev_err(DEV, "Configuration error.\n");
2540                 break;
2541         case ASB_DISCONNECT:
2542                 break;
2543         case ASB_CONSENSUS:
2544                 hg = drbd_asb_recover_0p(mdev);
2545                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2546                         rv = hg;
2547                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2548                         rv = hg;
2549                 break;
2550         case ASB_VIOLENTLY:
2551                 rv = drbd_asb_recover_0p(mdev);
2552                 break;
2553         case ASB_DISCARD_SECONDARY:
2554                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2555         case ASB_CALL_HELPER:
2556                 hg = drbd_asb_recover_0p(mdev);
2557                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2558                         enum drbd_state_rv rv2;
2559
2560                         drbd_set_role(mdev, R_SECONDARY, 0);
2561                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2562                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2563                           * we do not need to wait for the after state change work either. */
2564                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2565                         if (rv2 != SS_SUCCESS) {
2566                                 drbd_khelper(mdev, "pri-lost-after-sb");
2567                         } else {
2568                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2569                                 rv = hg;
2570                         }
2571                 } else
2572                         rv = hg;
2573         }
2574
2575         return rv;
2576 }
2577
2578 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2579 {
2580         int hg, rv = -100;
2581         enum drbd_after_sb_p after_sb_2p;
2582
2583         rcu_read_lock();
2584         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2585         rcu_read_unlock();
2586         switch (after_sb_2p) {
2587         case ASB_DISCARD_YOUNGER_PRI:
2588         case ASB_DISCARD_OLDER_PRI:
2589         case ASB_DISCARD_LEAST_CHG:
2590         case ASB_DISCARD_LOCAL:
2591         case ASB_DISCARD_REMOTE:
2592         case ASB_CONSENSUS:
2593         case ASB_DISCARD_SECONDARY:
2594         case ASB_DISCARD_ZERO_CHG:
2595                 dev_err(DEV, "Configuration error.\n");
2596                 break;
2597         case ASB_VIOLENTLY:
2598                 rv = drbd_asb_recover_0p(mdev);
2599                 break;
2600         case ASB_DISCONNECT:
2601                 break;
2602         case ASB_CALL_HELPER:
2603                 hg = drbd_asb_recover_0p(mdev);
2604                 if (hg == -1) {
2605                         enum drbd_state_rv rv2;
2606
2607                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2608                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2609                           * we do not need to wait for the after state change work either. */
2610                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2611                         if (rv2 != SS_SUCCESS) {
2612                                 drbd_khelper(mdev, "pri-lost-after-sb");
2613                         } else {
2614                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2615                                 rv = hg;
2616                         }
2617                 } else
2618                         rv = hg;
2619         }
2620
2621         return rv;
2622 }
2623
2624 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2625                            u64 bits, u64 flags)
2626 {
2627         if (!uuid) {
2628                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2629                 return;
2630         }
2631         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2632              text,
2633              (unsigned long long)uuid[UI_CURRENT],
2634              (unsigned long long)uuid[UI_BITMAP],
2635              (unsigned long long)uuid[UI_HISTORY_START],
2636              (unsigned long long)uuid[UI_HISTORY_END],
2637              (unsigned long long)bits,
2638              (unsigned long long)flags);
2639 }
2640
2641 /*
2642   100   after split brain try auto recover
2643     2   C_SYNC_SOURCE set BitMap
2644     1   C_SYNC_SOURCE use BitMap
2645     0   no Sync
2646    -1   C_SYNC_TARGET use BitMap
2647    -2   C_SYNC_TARGET set BitMap
2648  -100   after split brain, disconnect
2649 -1000   unrelated data
2650 -1091   requires proto 91
2651 -1096   requires proto 96
2652  */
2653 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2654 {
2655         u64 self, peer;
2656         int i, j;
2657
2658         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2659         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2660
2661         *rule_nr = 10;
2662         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2663                 return 0;
2664
2665         *rule_nr = 20;
2666         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2667              peer != UUID_JUST_CREATED)
2668                 return -2;
2669
2670         *rule_nr = 30;
2671         if (self != UUID_JUST_CREATED &&
2672             (peer == UUID_JUST_CREATED || peer == (u64)0))
2673                 return 2;
2674
2675         if (self == peer) {
2676                 int rct, dc; /* roles at crash time */
2677
2678                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2679
2680                         if (mdev->tconn->agreed_pro_version < 91)
2681                                 return -1091;
2682
2683                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2684                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2685                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2686                                 drbd_uuid_set_bm(mdev, 0UL);
2687
2688                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2689                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2690                                 *rule_nr = 34;
2691                         } else {
2692                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2693                                 *rule_nr = 36;
2694                         }
2695
2696                         return 1;
2697                 }
2698
2699                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2700
2701                         if (mdev->tconn->agreed_pro_version < 91)
2702                                 return -1091;
2703
2704                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2705                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2706                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2707
2708                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2709                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2710                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2711
2712                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2713                                 *rule_nr = 35;
2714                         } else {
2715                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2716                                 *rule_nr = 37;
2717                         }
2718
2719                         return -1;
2720                 }
2721
2722                 /* Common power [off|failure] */
2723                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2724                         (mdev->p_uuid[UI_FLAGS] & 2);
2725                 /* lowest bit is set when we were primary,
2726                  * next bit (weight 2) is set when peer was primary */
2727                 *rule_nr = 40;
2728
2729                 switch (rct) {
2730                 case 0: /* !self_pri && !peer_pri */ return 0;
2731                 case 1: /*  self_pri && !peer_pri */ return 1;
2732                 case 2: /* !self_pri &&  peer_pri */ return -1;
2733                 case 3: /*  self_pri &&  peer_pri */
2734                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2735                         return dc ? -1 : 1;
2736                 }
2737         }
2738
2739         *rule_nr = 50;
2740         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2741         if (self == peer)
2742                 return -1;
2743
2744         *rule_nr = 51;
2745         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2746         if (self == peer) {
2747                 if (mdev->tconn->agreed_pro_version < 96 ?
2748                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2749                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2750                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2751                         /* The last P_SYNC_UUID did not get through. Undo the peer's
2752                            UUID modifications from the last start of resync as sync source. */
2753
2754                         if (mdev->tconn->agreed_pro_version < 91)
2755                                 return -1091;
2756
2757                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2758                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2759
2760                         dev_info(DEV, "Did not get the last syncUUID packet, corrected:\n");
2761                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2762
2763                         return -1;
2764                 }
2765         }
2766
2767         *rule_nr = 60;
2768         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2769         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2770                 peer = mdev->p_uuid[i] & ~((u64)1);
2771                 if (self == peer)
2772                         return -2;
2773         }
2774
2775         *rule_nr = 70;
2776         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2777         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2778         if (self == peer)
2779                 return 1;
2780
2781         *rule_nr = 71;
2782         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2783         if (self == peer) {
2784                 if (mdev->tconn->agreed_pro_version < 96 ?
2785                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2786                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2787                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2788                         /* The last P_SYNC_UUID did not get through. Undo our own
2789                            UUID modifications from the last start of resync as sync source. */
2790
2791                         if (mdev->tconn->agreed_pro_version < 91)
2792                                 return -1091;
2793
2794                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2795                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2796
2797                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2798                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2799                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2800
2801                         return 1;
2802                 }
2803         }
2804
2805
2806         *rule_nr = 80;
2807         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2808         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2809                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2810                 if (self == peer)
2811                         return 2;
2812         }
2813
2814         *rule_nr = 90;
2815         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2816         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2817         if (self == peer && self != ((u64)0))
2818                 return 100;
2819
2820         *rule_nr = 100;
2821         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2822                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2823                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2824                         peer = mdev->p_uuid[j] & ~((u64)1);
2825                         if (self == peer)
2826                                 return -100;
2827                 }
2828         }
2829
2830         return -1000;
2831 }
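
/*
 * Illustrative summary (derived from the rules above and from how
 * drbd_sync_handshake() interprets the result; not original DRBD code):
 *
 *       0         equal data, no resync necessary
 *      +1 / -1    bitmap based resync; we are sync source (+) or target (-)
 *      +2 / -2    full resync necessary; we are sync source (+) or target (-)
 *    +100 / -100  split brain detected
 *     -1091       both sides need at least protocol version 91 to resolve this
 *     -1000       unrelated data, no common UUID found
 *
 * The pervasive "& ~((u64)1)" strips the lowest bit of each UUID, which
 * carries a flag and must not influence the equality comparisons.
 */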
2832
2833 /* drbd_sync_handshake() returns the new conn state on success, or
2834    C_MASK (-1) on failure.
2835  */
2836 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2837                                            enum drbd_disk_state peer_disk) __must_hold(local)
2838 {
2839         enum drbd_conns rv = C_MASK;
2840         enum drbd_disk_state mydisk;
2841         struct net_conf *nc;
2842         int hg, rule_nr, rr_conflict, dry_run;
2843
2844         mydisk = mdev->state.disk;
2845         if (mydisk == D_NEGOTIATING)
2846                 mydisk = mdev->new_state_tmp.disk;
2847
2848         dev_info(DEV, "drbd_sync_handshake:\n");
2849         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2850         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2851                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2852
2853         hg = drbd_uuid_compare(mdev, &rule_nr);
2854
2855         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2856
2857         if (hg == -1000) {
2858                 dev_alert(DEV, "Unrelated data, aborting!\n");
2859                 return C_MASK;
2860         }
2861         if (hg < -1000) {
2862                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2863                 return C_MASK;
2864         }
2865
2866         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2867             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2868                 int f = (hg == -100) || abs(hg) == 2;
2869                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2870                 if (f)
2871                         hg = hg*2;
2872                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2873                      hg > 0 ? "source" : "target");
2874         }
2875
2876         if (abs(hg) == 100)
2877                 drbd_khelper(mdev, "initial-split-brain");
2878
2879         rcu_read_lock();
2880         nc = rcu_dereference(mdev->tconn->net_conf);
2881
2882         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2883                 int pcount = (mdev->state.role == R_PRIMARY)
2884                            + (peer_role == R_PRIMARY);
2885                 int forced = (hg == -100);
2886
2887                 switch (pcount) {
2888                 case 0:
2889                         hg = drbd_asb_recover_0p(mdev);
2890                         break;
2891                 case 1:
2892                         hg = drbd_asb_recover_1p(mdev);
2893                         break;
2894                 case 2:
2895                         hg = drbd_asb_recover_2p(mdev);
2896                         break;
2897                 }
2898                 if (abs(hg) < 100) {
2899                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2900                              "automatically solved. Sync from %s node\n",
2901                              pcount, (hg < 0) ? "peer" : "this");
2902                         if (forced) {
2903                                 dev_warn(DEV, "Doing a full sync, since"
2904                                      " UUIDs were ambiguous.\n");
2905                                 hg = hg*2;
2906                         }
2907                 }
2908         }
2909
2910         if (hg == -100) {
2911                 if (nc->discard_my_data && !(mdev->p_uuid[UI_FLAGS]&1))
2912                         hg = -1;
2913                 if (!nc->discard_my_data && (mdev->p_uuid[UI_FLAGS]&1))
2914                         hg = 1;
2915
2916                 if (abs(hg) < 100)
2917                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2918                              "Sync from %s node\n",
2919                              (hg < 0) ? "peer" : "this");
2920         }
2921         rr_conflict = nc->rr_conflict;
2922         dry_run = nc->dry_run;
2923         rcu_read_unlock();
2924
2925         if (hg == -100) {
2926                 /* FIXME this log message is not correct if we end up here
2927                  * after an attempted attach on a diskless node.
2928                  * We just refuse to attach -- well, we drop the "connection"
2929                  * to that disk, in a way... */
2930                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2931                 drbd_khelper(mdev, "split-brain");
2932                 return C_MASK;
2933         }
2934
2935         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2936                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2937                 return C_MASK;
2938         }
2939
2940         if (hg < 0 && /* by intention we do not use mydisk here. */
2941             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2942                 switch (rr_conflict) {
2943                 case ASB_CALL_HELPER:
2944                         drbd_khelper(mdev, "pri-lost");
2945                         /* fall through */
2946                 case ASB_DISCONNECT:
2947                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2948                         return C_MASK;
2949                 case ASB_VIOLENTLY:
2950                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2951                              " assumption\n");
2952                 }
2953         }
2954
2955         if (dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2956                 if (hg == 0)
2957                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2958                 else
2959                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2960                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2961                                  abs(hg) >= 2 ? "full" : "bit-map based");
2962                 return C_MASK;
2963         }
2964
2965         if (abs(hg) >= 2) {
2966                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2967                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2968                                         BM_LOCKED_SET_ALLOWED))
2969                         return C_MASK;
2970         }
2971
2972         if (hg > 0) { /* become sync source. */
2973                 rv = C_WF_BITMAP_S;
2974         } else if (hg < 0) { /* become sync target */
2975                 rv = C_WF_BITMAP_T;
2976         } else {
2977                 rv = C_CONNECTED;
2978                 if (drbd_bm_total_weight(mdev)) {
2979                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2980                              drbd_bm_total_weight(mdev));
2981                 }
2982         }
2983
2984         return rv;
2985 }
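
/*
 * Illustrative restatement of the handshake outcome (not original DRBD
 * code): once hg is fixed up, the mapping to the next connection state is
 *
 *   hg > 0  ->  C_WF_BITMAP_S   (we become sync source)
 *   hg < 0  ->  C_WF_BITMAP_T   (we become sync target)
 *   hg == 0 ->  C_CONNECTED     (no resync)
 *
 * with |hg| >= 2 first setting the whole bitmap, turning the resync into
 * a full one.
 */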
2986
2987 /* returns 1 if invalid */
2988 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2989 {
2990         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2991         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2992             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2993                 return 0;
2994
2995         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2996         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2997             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2998                 return 1;
2999
3000         /* everything else is valid if they are equal on both sides. */
3001         if (peer == self)
3002                 return 0;
3003
3004         /* everything else is invalid. */
3005         return 1;
3006 }
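
/*
 * Worked examples for cmp_after_sb() (illustration, not original DRBD
 * code):
 *
 *   peer = ASB_DISCARD_REMOTE, self = ASB_DISCARD_LOCAL  -> 0 (valid)
 *   peer = ASB_DISCARD_REMOTE, self = ASB_DISCARD_REMOTE -> 1 (invalid)
 *   peer = ASB_DISCONNECT,     self = ASB_DISCONNECT     -> 0 (valid)
 *   peer = ASB_DISCONNECT,     self = ASB_CALL_HELPER    -> 1 (invalid)
 *
 * I.e. the discard-local/discard-remote pair is the only asymmetric
 * combination that both sides may agree on.
 */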
3007
3008 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3009 {
3010         struct p_protocol *p = pi->data;
3011         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3012         int p_discard_my_data, p_two_primaries, cf;
3013         struct net_conf *nc;
3014
3015         p_proto         = be32_to_cpu(p->protocol);
3016         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3017         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3018         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3019         p_two_primaries = be32_to_cpu(p->two_primaries);
3020         cf              = be32_to_cpu(p->conn_flags);
3021         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3022
3023         if (tconn->agreed_pro_version >= 87) {
3024                 char integrity_alg[SHARED_SECRET_MAX];
3025                 struct crypto_hash *tfm = NULL;
3026                 int err;
3027
3028                 if (pi->size > sizeof(integrity_alg))
3029                         return -EIO;
3030                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3031                 if (err)
3032                         return err;
3033                 integrity_alg[SHARED_SECRET_MAX-1] = 0;
3034
3035                 if (integrity_alg[0]) {
3036                         tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3037                         if (IS_ERR(tfm)) { /* crypto_alloc_hash() returns ERR_PTR(), never NULL */
3038                                 conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3039                                          integrity_alg);
3040                                 goto disconnect;
3041                         }
3042                         conn_info(tconn, "peer data-integrity-alg: %s\n", integrity_alg);
3043                 }
3044
3045                 if (tconn->peer_integrity_tfm)
3046                         crypto_free_hash(tconn->peer_integrity_tfm);
3047                 tconn->peer_integrity_tfm = tfm;
3048         }
3049
3050         clear_bit(CONN_DRY_RUN, &tconn->flags);
3051
3052         if (cf & CF_DRY_RUN)
3053                 set_bit(CONN_DRY_RUN, &tconn->flags);
3054
3055         rcu_read_lock();
3056         nc = rcu_dereference(tconn->net_conf);
3057
3058         if (p_proto != nc->wire_protocol && tconn->agreed_pro_version < 100) {
3059                 conn_err(tconn, "incompatible communication protocols\n");
3060                 goto disconnect_rcu_unlock;
3061         }
3062
3063         if (cmp_after_sb(p_after_sb_0p, nc->after_sb_0p)) {
3064                 conn_err(tconn, "incompatible after-sb-0pri settings\n");
3065                 goto disconnect_rcu_unlock;
3066         }
3067
3068         if (cmp_after_sb(p_after_sb_1p, nc->after_sb_1p)) {
3069                 conn_err(tconn, "incompatible after-sb-1pri settings\n");
3070                 goto disconnect_rcu_unlock;
3071         }
3072
3073         if (cmp_after_sb(p_after_sb_2p, nc->after_sb_2p)) {
3074                 conn_err(tconn, "incompatible after-sb-2pri settings\n");
3075                 goto disconnect_rcu_unlock;
3076         }
3077
3078         if (p_discard_my_data && nc->discard_my_data) {
3079                 conn_err(tconn, "both sides have the 'discard_my_data' flag set\n");
3080                 goto disconnect_rcu_unlock;
3081         }
3082
3083         if (p_two_primaries != nc->two_primaries) {
3084                 conn_err(tconn, "incompatible setting of the two-primaries options\n");
3085                 goto disconnect_rcu_unlock;
3086         }
3087
3088         rcu_read_unlock();
3089
3090         return 0;
3091
3092 disconnect_rcu_unlock:
3093         rcu_read_unlock();
3094 disconnect:
3095         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3096         return -EIO;
3097 }
3098
3099 /* helper function
3100  * input: alg name, feature name
3101  * return: NULL (alg name was "")
3102  *         ERR_PTR(error) if something goes wrong
3103  *         or the crypto hash ptr, if it worked out ok. */
3104 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3105                 const char *alg, const char *name)
3106 {
3107         struct crypto_hash *tfm;
3108
3109         if (!alg[0])
3110                 return NULL;
3111
3112         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3113         if (IS_ERR(tfm)) {
3114                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3115                         alg, name, PTR_ERR(tfm));
3116                 return tfm;
3117         }
3118         return tfm;
3119 }
3120
3121 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3122 {
3123         void *buffer = tconn->data.rbuf;
3124         int size = pi->size;
3125
3126         while (size) {
3127                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3128                 s = drbd_recv(tconn, buffer, s);
3129                 if (s <= 0) {
3130                         if (s < 0)
3131                                 return s;
3132                         break;
3133                 }
3134                 size -= s;
3135         }
3136         if (size)
3137                 return -EIO;
3138         return 0;
3139 }
3140
3141 /*
3142  * config_unknown_volume  -  device configuration command for unknown volume
3143  *
3144  * When a device is added to an existing connection, the node on which the
3145  * device is added first will send configuration commands to its peer but the
3146  * peer will not know about the device yet.  It will warn and ignore these
3147  * commands.  Once the device is added on the second node, the second node will
3148  * send the same device configuration commands, but in the other direction.
3149  *
3150  * (We can also end up here if drbd is misconfigured.)
3151  */
3152 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3153 {
3154         conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3155                   pi->vnr, cmdname(pi->cmd));
3156         return ignore_remaining_packet(tconn, pi);
3157 }
3158
3159 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3160 {
3161         struct drbd_conf *mdev;
3162         struct p_rs_param_95 *p;
3163         unsigned int header_size, data_size, exp_max_sz;
3164         struct crypto_hash *verify_tfm = NULL;
3165         struct crypto_hash *csums_tfm = NULL;
3166         struct net_conf *old_net_conf, *new_net_conf = NULL;
3167         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3168         const int apv = tconn->agreed_pro_version;
3169         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3170         int fifo_size = 0;
3171         int err;
3172
3173         mdev = vnr_to_mdev(tconn, pi->vnr);
3174         if (!mdev)
3175                 return config_unknown_volume(tconn, pi);
3176
3177         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3178                     : apv == 88 ? sizeof(struct p_rs_param)
3179                                         + SHARED_SECRET_MAX
3180                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3181                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3182
3183         if (pi->size > exp_max_sz) {
3184                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3185                     pi->size, exp_max_sz);
3186                 return -EIO;
3187         }
3188
3189         if (apv <= 88) {
3190                 header_size = sizeof(struct p_rs_param);
3191                 data_size = pi->size - header_size;
3192         } else if (apv <= 94) {
3193                 header_size = sizeof(struct p_rs_param_89);
3194                 data_size = pi->size - header_size;
3195                 D_ASSERT(data_size == 0);
3196         } else {
3197                 header_size = sizeof(struct p_rs_param_95);
3198                 data_size = pi->size - header_size;
3199                 D_ASSERT(data_size == 0);
3200         }
3201
3202         /* initialize verify_alg and csums_alg */
3203         p = pi->data;
3204         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3205
3206         err = drbd_recv_all(mdev->tconn, p, header_size);
3207         if (err)
3208                 return err;
3209
3210         mutex_lock(&mdev->tconn->conf_update);
3211         old_net_conf = mdev->tconn->net_conf;
3212         if (get_ldev(mdev)) {
3213                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3214                 if (!new_disk_conf) {
3215                         put_ldev(mdev);
3216                         mutex_unlock(&mdev->tconn->conf_update);
3217                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3218                         return -ENOMEM;
3219                 }
3220
3221                 old_disk_conf = mdev->ldev->disk_conf;
3222                 *new_disk_conf = *old_disk_conf;
3223
3224                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3225         }
3226
3227         if (apv >= 88) {
3228                 if (apv == 88) {
3229                         if (data_size > SHARED_SECRET_MAX) {
3230                                 dev_err(DEV, "verify-alg too long, "
3231                                     "peer wants %u, accepting only %u bytes\n",
3232                                                 data_size, SHARED_SECRET_MAX);
3233                                 err = -EIO;
3234                                 goto reconnect;
3235                         }
3236
3237                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3238                         if (err)
3239                                 goto reconnect;
3240                         /* we expect NUL terminated string */
3241                         /* but just in case someone tries to be evil */
3242                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3243                         p->verify_alg[data_size-1] = 0;
3244
3245                 } else /* apv >= 89 */ {
3246                         /* we still expect NUL terminated strings */
3247                         /* but just in case someone tries to be evil */
3248                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3249                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3250                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3251                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3252                 }
3253
3254                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3255                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3256                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3257                                     old_net_conf->verify_alg, p->verify_alg);
3258                                 goto disconnect;
3259                         }
3260                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3261                                         p->verify_alg, "verify-alg");
3262                         if (IS_ERR(verify_tfm)) {
3263                                 verify_tfm = NULL;
3264                                 goto disconnect;
3265                         }
3266                 }
3267
3268                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3269                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3270                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3271                                     old_net_conf->csums_alg, p->csums_alg);
3272                                 goto disconnect;
3273                         }
3274                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3275                                         p->csums_alg, "csums-alg");
3276                         if (IS_ERR(csums_tfm)) {
3277                                 csums_tfm = NULL;
3278                                 goto disconnect;
3279                         }
3280                 }
3281
3282                 if (apv > 94 && new_disk_conf) {
3283                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3284                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3285                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3286                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3287
3288                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3289                         if (fifo_size != mdev->rs_plan_s->size) {
3290                                 new_plan = fifo_alloc(fifo_size);
3291                                 if (!new_plan) {
3292                                         dev_err(DEV, "kmalloc of fifo_buffer failed\n");
3293                                         put_ldev(mdev);
3294                                         goto disconnect;
3295                                 }
3296                         }
3297                 }
3298
3299                 if (verify_tfm || csums_tfm) {
3300                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3301                         if (!new_net_conf) {
3302                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3303                                 goto disconnect;
3304                         }
3305
3306                         *new_net_conf = *old_net_conf;
3307
3308                         if (verify_tfm) {
3309                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3310                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3311                                 crypto_free_hash(mdev->tconn->verify_tfm);
3312                                 mdev->tconn->verify_tfm = verify_tfm;
3313                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3314                         }
3315                         if (csums_tfm) {
3316                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3317                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3318                                 crypto_free_hash(mdev->tconn->csums_tfm);
3319                                 mdev->tconn->csums_tfm = csums_tfm;
3320                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3321                         }
3322                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3323                 }
3324         }
3325
3326         if (new_disk_conf) {
3327                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3328                 put_ldev(mdev);
3329         }
3330
3331         if (new_plan) {
3332                 old_plan = mdev->rs_plan_s;
3333                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3334         }
3335
3336         mutex_unlock(&mdev->tconn->conf_update);
3337         synchronize_rcu();
3338         if (new_net_conf)
3339                 kfree(old_net_conf);
3340         kfree(old_disk_conf);
3341         kfree(old_plan);
3342
3343         return 0;
3344
3345 reconnect:
3346         if (new_disk_conf) {
3347                 put_ldev(mdev);
3348                 kfree(new_disk_conf);
3349         }
3350         mutex_unlock(&mdev->tconn->conf_update);
3351         return -EIO;
3352
3353 disconnect:
3354         kfree(new_plan);
3355         if (new_disk_conf) {
3356                 put_ldev(mdev);
3357                 kfree(new_disk_conf);
3358         }
3359         mutex_unlock(&mdev->tconn->conf_update);
3360         /* just for completeness: actually not needed,
3361          * as this is not reached if csums_tfm was ok. */
3362         crypto_free_hash(csums_tfm);
3363         /* but free the verify_tfm again, if csums_tfm did not work out */
3364         crypto_free_hash(verify_tfm);
3365         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3366         return -EIO;
3367 }
3368
3369 /* warn if the arguments differ by more than 12.5% */
3370 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3371         const char *s, sector_t a, sector_t b)
3372 {
3373         sector_t d;
3374         if (a == 0 || b == 0)
3375                 return;
3376         d = (a > b) ? (a - b) : (b - a);
3377         if (d > (a>>3) || d > (b>>3))
3378                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3379                      (unsigned long long)a, (unsigned long long)b);
3380 }
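
/*
 * Worked example (illustration, not original DRBD code): for a = 1000
 * and b = 900 sectors, d = 100 while a>>3 = 125 and b>>3 = 112, so no
 * warning is printed; for b = 800, d = 200 exceeds both eighths (12.5%)
 * and the warning fires.
 */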
3381
3382 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3383 {
3384         struct drbd_conf *mdev;
3385         struct p_sizes *p = pi->data;
3386         enum determine_dev_size dd = unchanged;
3387         sector_t p_size, p_usize, my_usize;
3388         int ldsc = 0; /* local disk size changed */
3389         enum dds_flags ddsf;
3390
3391         mdev = vnr_to_mdev(tconn, pi->vnr);
3392         if (!mdev)
3393                 return config_unknown_volume(tconn, pi);
3394
3395         p_size = be64_to_cpu(p->d_size);
3396         p_usize = be64_to_cpu(p->u_size);
3397
3398         /* just store the peer's disk size for now.
3399          * we still need to figure out whether we accept that. */
3400         mdev->p_size = p_size;
3401
3402         if (get_ldev(mdev)) {
3403                 rcu_read_lock();
3404                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3405                 rcu_read_unlock();
3406
3407                 warn_if_differ_considerably(mdev, "lower level device sizes",
3408                            p_size, drbd_get_max_capacity(mdev->ldev));
3409                 warn_if_differ_considerably(mdev, "user requested size",
3410                                             p_usize, my_usize);
3411
3412                 /* if this is the first connect, or an otherwise expected
3413                  * param exchange, choose the minimum */
3414                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3415                         p_usize = min_not_zero(my_usize, p_usize);
3416
3417                 /* Never shrink a device with usable data during connect.
3418                    But allow online shrinking if we are connected. */
3419                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3420                     drbd_get_capacity(mdev->this_bdev) &&
3421                     mdev->state.disk >= D_OUTDATED &&
3422                     mdev->state.conn < C_CONNECTED) {
3423                         dev_err(DEV, "The peer's disk size is too small!\n");
3424                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3425                         put_ldev(mdev);
3426                         return -EIO;
3427                 }
3428
3429                 if (my_usize != p_usize) {
3430                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3431
3432                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3433                         if (!new_disk_conf) {
3434                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3435                                 put_ldev(mdev);
3436                                 return -ENOMEM;
3437                         }
3438
3439                         mutex_lock(&mdev->tconn->conf_update);
3440                         old_disk_conf = mdev->ldev->disk_conf;
3441                         *new_disk_conf = *old_disk_conf;
3442                         new_disk_conf->disk_size = p_usize;
3443
3444                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3445                         mutex_unlock(&mdev->tconn->conf_update);
3446                         synchronize_rcu();
3447                         kfree(old_disk_conf);
3448
3449                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3450                                  (unsigned long)p_usize);
3451                 }
3452
3453                 put_ldev(mdev);
3454         }
3455
3456         ddsf = be16_to_cpu(p->dds_flags);
3457         if (get_ldev(mdev)) {
3458                 dd = drbd_determine_dev_size(mdev, ddsf);
3459                 put_ldev(mdev);
3460                 if (dd == dev_size_error)
3461                         return -EIO;
3462                 drbd_md_sync(mdev);
3463         } else {
3464                 /* I am diskless, need to accept the peer's size. */
3465                 drbd_set_my_capacity(mdev, p_size);
3466         }
3467
3468         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3469         drbd_reconsider_max_bio_size(mdev);
3470
3471         if (get_ldev(mdev)) {
3472                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3473                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3474                         ldsc = 1;
3475                 }
3476
3477                 put_ldev(mdev);
3478         }
3479
3480         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3481                 if (be64_to_cpu(p->c_size) !=
3482                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3483                         /* we have different sizes, the peer probably
3484                          * needs to know my new size... */
3485                         drbd_send_sizes(mdev, 0, ddsf);
3486                 }
3487                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3488                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3489                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3490                             mdev->state.disk >= D_INCONSISTENT) {
3491                                 if (ddsf & DDSF_NO_RESYNC)
3492                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3493                                 else
3494                                         resync_after_online_grow(mdev);
3495                         } else
3496                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3497                 }
3498         }
3499
3500         return 0;
3501 }
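
/*
 * Note on the u_size negotiation above (illustration, not original DRBD
 * code): during the initial handshake (C_WF_REPORT_PARAMS),
 * min_not_zero() picks the smaller of the two user-requested sizes while
 * treating 0 as "unlimited": my_usize = 0, p_usize = 1000 yields 1000;
 * my_usize = 500, p_usize = 1000 yields 500.
 */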
3502
3503 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3504 {
3505         struct drbd_conf *mdev;
3506         struct p_uuids *p = pi->data;
3507         u64 *p_uuid;
3508         int i, updated_uuids = 0;
3509
3510         mdev = vnr_to_mdev(tconn, pi->vnr);
3511         if (!mdev)
3512                 return config_unknown_volume(tconn, pi);
3513
3514         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     /* kmalloc() may fail; the -ENOMEM return here is an
                      * assumption about the caller's error handling */
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return -ENOMEM;
             }
3515
3516         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3517                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3518
3519         kfree(mdev->p_uuid);
3520         mdev->p_uuid = p_uuid;
3521
3522         if (mdev->state.conn < C_CONNECTED &&
3523             mdev->state.disk < D_INCONSISTENT &&
3524             mdev->state.role == R_PRIMARY &&
3525             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3526                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3527                     (unsigned long long)mdev->ed_uuid);
3528                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3529                 return -EIO;
3530         }
3531
3532         if (get_ldev(mdev)) {
3533                 int skip_initial_sync =
3534                         mdev->state.conn == C_CONNECTED &&
3535                         mdev->tconn->agreed_pro_version >= 90 &&
3536                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3537                         (p_uuid[UI_FLAGS] & 8);
3538                 if (skip_initial_sync) {
3539                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3540                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3541                                         "clear_n_write from receive_uuids",
3542                                         BM_LOCKED_TEST_ALLOWED);
3543                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3544                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3545                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3546                                         CS_VERBOSE, NULL);
3547                         drbd_md_sync(mdev);
3548                         updated_uuids = 1;
3549                 }
3550                 put_ldev(mdev);
3551         } else if (mdev->state.disk < D_INCONSISTENT &&
3552                    mdev->state.role == R_PRIMARY) {
3553                 /* I am a diskless primary, the peer just created a new current UUID
3554                    for me. */
3555                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3556         }
3557
3558         /* Before we test the disk state, we should wait until a possibly
3559            ongoing cluster-wide state change has finished. That is important if
3560            we are primary and are detaching from our disk. We need to see the
3561            new disk state... */
3562         mutex_lock(mdev->state_mutex);
3563         mutex_unlock(mdev->state_mutex);
3564         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3565                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3566
3567         if (updated_uuids)
3568                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3569
3570         return 0;
3571 }
3572
3573 /**
3574  * convert_state() - Converts the peer's view of the cluster state to our point of view
3575  * @ps:         The state as seen by the peer.
3576  */
3577 static union drbd_state convert_state(union drbd_state ps)
3578 {
3579         union drbd_state ms;
3580
3581         static enum drbd_conns c_tab[] = {
3582                 [C_CONNECTED] = C_CONNECTED,
3583
3584                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3585                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3586                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3587                 [C_VERIFY_S]       = C_VERIFY_T,
3588                 [C_MASK]   = C_MASK,
3589         };
3590
3591         ms.i = ps.i;
3592
3593         ms.conn = c_tab[ps.conn];
3594         ms.peer = ps.role;
3595         ms.role = ps.peer;
3596         ms.pdsk = ps.disk;
3597         ms.disk = ps.pdsk;
3598         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3599
3600         return ms;
3601 }
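
/*
 * Worked example (illustration, not original DRBD code): if the peer
 * reports { role = Primary, peer = Secondary, disk = UpToDate,
 * pdsk = Inconsistent }, convert_state() yields our view
 * { role = Secondary, peer = Primary, disk = Inconsistent,
 * pdsk = UpToDate }; roles and disk states swap sides, and the few
 * asymmetric connection states are mirrored via c_tab[].
 */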
3602
3603 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3604 {
3605         struct drbd_conf *mdev;
3606         struct p_req_state *p = pi->data;
3607         union drbd_state mask, val;
3608         enum drbd_state_rv rv;
3609
3610         mdev = vnr_to_mdev(tconn, pi->vnr);
3611         if (!mdev)
3612                 return -EIO;
3613
3614         mask.i = be32_to_cpu(p->mask);
3615         val.i = be32_to_cpu(p->val);
3616
3617         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3618             mutex_is_locked(mdev->state_mutex)) {
3619                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3620                 return 0;
3621         }
3622
3623         mask = convert_state(mask);
3624         val = convert_state(val);
3625
3626         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3627         drbd_send_sr_reply(mdev, rv);
3628
3629         drbd_md_sync(mdev);
3630
3631         return 0;
3632 }
3633
3634 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3635 {
3636         struct p_req_state *p = pi->data;
3637         union drbd_state mask, val;
3638         enum drbd_state_rv rv;
3639
3640         mask.i = be32_to_cpu(p->mask);
3641         val.i = be32_to_cpu(p->val);
3642
3643         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3644             mutex_is_locked(&tconn->cstate_mutex)) {
3645                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3646                 return 0;
3647         }
3648
3649         mask = convert_state(mask);
3650         val = convert_state(val);
3651
3652         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3653         conn_send_sr_reply(tconn, rv);
3654
3655         return 0;
3656 }
3657
3658 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3659 {
3660         struct drbd_conf *mdev;
3661         struct p_state *p = pi->data;
3662         union drbd_state os, ns, peer_state;
3663         enum drbd_disk_state real_peer_disk;
3664         enum chg_state_flags cs_flags;
3665         int rv;
3666
3667         mdev = vnr_to_mdev(tconn, pi->vnr);
3668         if (!mdev)
3669                 return config_unknown_volume(tconn, pi);
3670
3671         peer_state.i = be32_to_cpu(p->state);
3672
3673         real_peer_disk = peer_state.disk;
3674         if (peer_state.disk == D_NEGOTIATING) {
3675                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3676                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3677         }
3678
3679         spin_lock_irq(&mdev->tconn->req_lock);
3680  retry:
3681         os = ns = drbd_read_state(mdev);
3682         spin_unlock_irq(&mdev->tconn->req_lock);
3683
3684         /* peer says his disk is uptodate, while we think it is inconsistent,
3685          * and this happens while we think we have a sync going on. */
3686         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3687             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3688                 /* If we are (becoming) SyncSource, but peer is still in sync
3689                  * preparation, ignore its uptodate-ness to avoid flapping, it
3690                  * will change to inconsistent once the peer reaches active
3691                  * syncing states.
3692                  * It may have changed syncer-paused flags, however, so we
3693                  * cannot ignore this completely. */
3694                 if (peer_state.conn > C_CONNECTED &&
3695                     peer_state.conn < C_SYNC_SOURCE)
3696                         real_peer_disk = D_INCONSISTENT;
3697
3698                 /* if peer_state changes to connected at the same time,
3699                  * it explicitly notifies us that it finished resync.
3700                  * Maybe we should finish it up, too? */
3701                 else if (os.conn >= C_SYNC_SOURCE &&
3702                          peer_state.conn == C_CONNECTED) {
3703                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3704                                 drbd_resync_finished(mdev);
3705                         return 0;
3706                 }
3707         }
3708
3709         /* peer says his disk is inconsistent, while we think it is uptodate,
3710          * and this happens while the peer still thinks we have a sync going on,
3711          * but we think we are already done with the sync.
3712          * We ignore this to avoid flapping pdsk.
3713          * This should not happen if the peer is a recent version of drbd. */
3714         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3715             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3716                 real_peer_disk = D_UP_TO_DATE;
3717
3718         if (ns.conn == C_WF_REPORT_PARAMS)
3719                 ns.conn = C_CONNECTED;
3720
3721         if (peer_state.conn == C_AHEAD)
3722                 ns.conn = C_BEHIND;
3723
3724         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3725             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3726                 int cr; /* consider resync */
3727
3728                 /* if we established a new connection */
3729                 cr  = (os.conn < C_CONNECTED);
3730                 /* if we had an established connection
3731                  * and one of the nodes newly attaches a disk */
3732                 cr |= (os.conn == C_CONNECTED &&
3733                        (peer_state.disk == D_NEGOTIATING ||
3734                         os.disk == D_NEGOTIATING));
3735                 /* if we have both been inconsistent, and the peer has been
3736                  * forced to be UpToDate with --overwrite-data */
3737                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3738                 /* if we had been plain connected, and the admin requested to
3739                  * start a sync by "invalidate" or "invalidate-remote" */
3740                 cr |= (os.conn == C_CONNECTED &&
3741                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3742                                  peer_state.conn <= C_WF_BITMAP_T));
3743
3744                 if (cr)
3745                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3746
3747                 put_ldev(mdev);
3748                 if (ns.conn == C_MASK) {
3749                         ns.conn = C_CONNECTED;
3750                         if (mdev->state.disk == D_NEGOTIATING) {
3751                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3752                         } else if (peer_state.disk == D_NEGOTIATING) {
3753                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3754                                 peer_state.disk = D_DISKLESS;
3755                                 real_peer_disk = D_DISKLESS;
3756                         } else {
3757                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3758                                         return -EIO;
3759                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3760                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3761                                 return -EIO;
3762                         }
3763                 }
3764         }
3765
3766         spin_lock_irq(&mdev->tconn->req_lock);
3767         if (os.i != drbd_read_state(mdev).i)
3768                 goto retry;
3769         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3770         ns.peer = peer_state.role;
3771         ns.pdsk = real_peer_disk;
3772         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3773         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3774                 ns.disk = mdev->new_state_tmp.disk;
3775         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3776         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3777             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3778                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3779                    for temporary network outages! */
3780                 spin_unlock_irq(&mdev->tconn->req_lock);
3781                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3782                 tl_clear(mdev->tconn);
3783                 drbd_uuid_new_current(mdev);
3784                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3785                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3786                 return -EIO;
3787         }
3788         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3789         ns = drbd_read_state(mdev);
3790         spin_unlock_irq(&mdev->tconn->req_lock);
3791
3792         if (rv < SS_SUCCESS) {
3793                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3794                 return -EIO;
3795         }
3796
3797         if (os.conn > C_WF_REPORT_PARAMS) {
3798                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3799                     peer_state.disk != D_NEGOTIATING) {
3800                         /* we want resync, peer has not yet decided to sync... */
3801                         /* Nowadays only used when forcing a node into primary role and
3802                            setting its disk to UpToDate with that */
3803                         drbd_send_uuids(mdev);
3804                         drbd_send_state(mdev);
3805                 }
3806         }
3807
3808         mutex_lock(&mdev->tconn->conf_update);
3809         mdev->tconn->net_conf->discard_my_data = 0; /* without copy; single bit op is atomic */
3810         mutex_unlock(&mdev->tconn->conf_update);
3811
3812         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3813
3814         return 0;
3815 }
3816
3817 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3818 {
3819         struct drbd_conf *mdev;
3820         struct p_rs_uuid *p = pi->data;
3821
3822         mdev = vnr_to_mdev(tconn, pi->vnr);
3823         if (!mdev)
3824                 return -EIO;
3825
3826         wait_event(mdev->misc_wait,
3827                    mdev->state.conn == C_WF_SYNC_UUID ||
3828                    mdev->state.conn == C_BEHIND ||
3829                    mdev->state.conn < C_CONNECTED ||
3830                    mdev->state.disk < D_NEGOTIATING);
3831
3832         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3833
3834         /* Here the _drbd_uuid_ functions are right, current should
3835            _not_ be rotated into the history */
3836         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3837                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3838                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3839
3840                 drbd_print_uuids(mdev, "updated sync uuid");
3841                 drbd_start_resync(mdev, C_SYNC_TARGET);
3842
3843                 put_ldev(mdev);
3844         } else
3845                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3846
3847         return 0;
3848 }
3849
3850 /**
3851  * receive_bitmap_plain
3852  *
3853  * Return 0 when done, 1 when another iteration is needed, and a negative error
3854  * code upon failure.
3855  */
3856 static int
3857 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3858                      unsigned long *p, struct bm_xfer_ctx *c)
3859 {
3860         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3861                                  drbd_header_size(mdev->tconn);
3862         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3863                                        c->bm_words - c->word_offset);
3864         unsigned int want = num_words * sizeof(*p);
3865         int err;
3866
3867         if (want != size) {
3868                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3869                 return -EIO;
3870         }
3871         if (want == 0)
3872                 return 0;
3873         err = drbd_recv_all(mdev->tconn, p, want);
3874         if (err)
3875                 return err;
3876
3877         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3878
3879         c->word_offset += num_words;
3880         c->bit_offset = c->word_offset * BITS_PER_LONG;
3881         if (c->bit_offset > c->bm_bits)
3882                 c->bit_offset = c->bm_bits;
3883
3884         return 1;
3885 }
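
/*
 * Sizing note (assuming DRBD_SOCKET_BUFFER_SIZE = 4096 and an 8 byte
 * header; both values are assumptions, see drbd_int.h): data_size is
 * then 4088 bytes, i.e. up to 511 64-bit words per P_BITMAP packet, and
 * "want" must match pi->size exactly or the packet is rejected with
 * -EIO.
 */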
3886
3887 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3888 {
3889         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3890 }
3891
3892 static int dcbp_get_start(struct p_compressed_bm *p)
3893 {
3894         return (p->encoding & 0x80) != 0;
3895 }
3896
3897 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3898 {
3899         return (p->encoding >> 4) & 0x7;
3900 }
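
/*
 * Layout of the p_compressed_bm encoding byte, as implied by the three
 * accessors above (summary, not original DRBD code):
 *
 *   bit  7    : value of the first run          (dcbp_get_start)
 *   bits 6..4 : number of trailing padding bits (dcbp_get_pad_bits)
 *   bits 3..0 : enum drbd_bitmap_code           (dcbp_get_code)
 */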
3901
3902 /**
3903  * recv_bm_rle_bits
3904  *
3905  * Return 0 when done, 1 when another iteration is needed, and a negative error
3906  * code upon failure.
3907  */
3908 static int
3909 recv_bm_rle_bits(struct drbd_conf *mdev,
3910                 struct p_compressed_bm *p,
3911                  struct bm_xfer_ctx *c,
3912                  unsigned int len)
3913 {
3914         struct bitstream bs;
3915         u64 look_ahead;
3916         u64 rl;
3917         u64 tmp;
3918         unsigned long s = c->bit_offset;
3919         unsigned long e;
3920         int toggle = dcbp_get_start(p);
3921         int have;
3922         int bits;
3923
3924         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3925
3926         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3927         if (bits < 0)
3928                 return -EIO;
3929
3930         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3931                 bits = vli_decode_bits(&rl, look_ahead);
3932                 if (bits <= 0)
3933                         return -EIO;
3934
3935                 if (toggle) {
3936                         e = s + rl - 1;
3937                         if (e >= c->bm_bits) {
3938                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3939                                 return -EIO;
3940                         }
3941                         _drbd_bm_set_bits(mdev, s, e);
3942                 }
3943
3944                 if (have < bits) {
3945                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3946                                 have, bits, look_ahead,
3947                                 (unsigned int)(bs.cur.b - p->code),
3948                                 (unsigned int)bs.buf_len);
3949                         return -EIO;
3950                 }
3951                 look_ahead >>= bits;
3952                 have -= bits;
3953
3954                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3955                 if (bits < 0)
3956                         return -EIO;
3957                 look_ahead |= tmp << have;
3958                 have += bits;
3959         }
3960
3961         c->bit_offset = s;
3962         bm_xfer_ctx_bit_to_word_offset(c);
3963
3964         return (s != c->bm_bits);
3965 }
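
/*
 * Worked example (illustration, not original DRBD code): with
 * dcbp_get_start(p) == 0 and decoded run lengths 5, 3, 7, the stream
 * describes 5 clear bits, 3 set bits, then 7 clear bits; only the set
 * ("toggle") runs call _drbd_bm_set_bits(), here for bits s+5 .. s+7.
 */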
3966
3967 /**
3968  * decode_bitmap_c
3969  *
3970  * Return 0 when done, 1 when another iteration is needed, and a negative error
3971  * code upon failure.
3972  */
3973 static int
3974 decode_bitmap_c(struct drbd_conf *mdev,
3975                 struct p_compressed_bm *p,
3976                 struct bm_xfer_ctx *c,
3977                 unsigned int len)
3978 {
3979         if (dcbp_get_code(p) == RLE_VLI_Bits)
3980                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3981
3982         /* other variants had been implemented for evaluation,
3983          * but have been dropped as this one turned out to be "best"
3984          * during all our tests. */
3985
3986         dev_err(DEV, "decode_bitmap_c: unknown encoding %u\n", p->encoding);
3987         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3988         return -EIO;
3989 }
3990
3991 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3992                 const char *direction, struct bm_xfer_ctx *c)
3993 {
3994         /* what would it take to transfer it "plaintext" */
3995         unsigned int header_size = drbd_header_size(mdev->tconn);
3996         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3997         unsigned int plain =
3998                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3999                 c->bm_words * sizeof(unsigned long);
4000         unsigned int total = c->bytes[0] + c->bytes[1];
4001         unsigned int r;
4002
4003         /* total cannot be zero, but just in case: */
4004         if (total == 0)
4005                 return;
4006
4007         /* don't report if not compressed */
4008         if (total >= plain)
4009                 return;
4010
4011         /* total < plain. check for overflow, still */
4012         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4013                                     : (1000 * total / plain);
4014
4015         if (r > 1000)
4016                 r = 1000;
4017
4018         r = 1000 - r;
4019         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4020              "total %u; compression: %u.%u%%\n",
4021                         direction,
4022                         c->bytes[1], c->packets[1],
4023                         c->bytes[0], c->packets[0],
4024                         total, r/10, r % 10);
4025 }
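
/*
 * Worked example (illustration, not original DRBD code): with
 * plain = 100000 bytes and total = 2500 bytes actually transferred,
 * r = 1000 * 2500 / 100000 = 25, and 1000 - r = 975 is printed as
 * "compression: 97.5%".
 */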
4026
4027 /* Since we are processing the bitfield from lower addresses to higher,
4028    it does not matter whether we process it in 32 bit or 64 bit chunks,
4029    as long as it is little endian. (Understand it as a byte stream,
4030    beginning with the lowest byte...) If we used big endian,
4031    we would need to process it from the highest address to the lowest
4032    in order to be agnostic to the 32 vs. 64 bit issue.
4033
4034    Returns 0 on success, a negative error code otherwise. */
4035 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4036 {
4037         struct drbd_conf *mdev;
4038         struct bm_xfer_ctx c;
4039         int err;
4040
4041         mdev = vnr_to_mdev(tconn, pi->vnr);
4042         if (!mdev)
4043                 return -EIO;
4044
4045         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4046         /* you are supposed to send additional out-of-sync information
4047          * if you actually set bits during this phase */
4048
4049         c = (struct bm_xfer_ctx) {
4050                 .bm_bits = drbd_bm_bits(mdev),
4051                 .bm_words = drbd_bm_words(mdev),
4052         };
4053
4054         for (;;) {
4055                 if (pi->cmd == P_BITMAP)
4056                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4057                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4058                         /* MAYBE: sanity check that we speak proto >= 90,
4059                          * and the feature is enabled! */
4060                         struct p_compressed_bm *p = pi->data;
4061
4062                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4063                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4064                                 err = -EIO;
4065                                 goto out;
4066                         }
4067                         if (pi->size <= sizeof(*p)) {
4068                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4069                                 err = -EIO;
4070                                 goto out;
4071                         }
4072                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4073                         if (err)
4074                                 goto out;
4075                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4076                 } else {
4077                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4078                         err = -EIO;
4079                         goto out;
4080                 }
4081
4082                 c.packets[pi->cmd == P_BITMAP]++;
4083                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4084
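                     /* A positive return means more bitmap packets are
                      * expected, zero means this was the last one, and
                      * negative values are errors. */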
4085                 if (err <= 0) {
4086                         if (err < 0)
4087                                 goto out;
4088                         break;
4089                 }
4090                 err = drbd_recv_header(mdev->tconn, pi);
4091                 if (err)
4092                         goto out;
4093         }
4094
4095         INFO_bm_xfer_stats(mdev, "receive", &c);
4096
4097         if (mdev->state.conn == C_WF_BITMAP_T) {
4098                 enum drbd_state_rv rv;
4099
4100                 err = drbd_send_bitmap(mdev);
4101                 if (err)
4102                         goto out;
4103                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4104                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4105                 D_ASSERT(rv == SS_SUCCESS);
4106         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4107                 /* admin may have requested C_DISCONNECTING,
4108                  * other threads may have noticed network errors */
4109                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4110                     drbd_conn_str(mdev->state.conn));
4111         }
4112         err = 0;
4113
4114  out:
4115         drbd_bm_unlock(mdev);
4116         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4117                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4118         return err;
4119 }
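     /* Sketch of the bitmap exchange this function is part of:
      *
      *   WFBitMapS (sync source)            WFBitMapT (sync target)
      *   sends its bitmap     ----------->  merges it (this function)
      *   merges it            <-----------  sends its bitmap back
      *   starts resync as SyncSource        requests C_WF_SYNC_UUID
      *
      * Since received bits are merged in, both sides end up with the
      * union of the two out-of-sync bitmaps, and the resync covers it all. */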
4120
4121 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4122 {
4123         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4124                  pi->cmd, pi->size);
4125
4126         return ignore_remaining_packet(tconn, pi);
4127 }
4128
4129 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4130 {
4131         /* Make sure we've acked all the TCP data associated
4132          * with the data requests being unplugged */
4133         drbd_tcp_quickack(tconn->data.socket);
4134
4135         return 0;
4136 }
4137
4138 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4139 {
4140         struct drbd_conf *mdev;
4141         struct p_block_desc *p = pi->data;
4142
4143         mdev = vnr_to_mdev(tconn, pi->vnr);
4144         if (!mdev)
4145                 return -EIO;
4146
4147         switch (mdev->state.conn) {
4148         case C_WF_SYNC_UUID:
4149         case C_WF_BITMAP_T:
4150         case C_BEHIND:
4151                 break;
4152         default:
4153                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4154                                 drbd_conn_str(mdev->state.conn));
4155         }
4156
4157         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4158
4159         return 0;
4160 }
4161
4162 struct data_cmd {
4163         int expect_payload;
4164         size_t pkt_size;
4165         int (*fn)(struct drbd_tconn *, struct packet_info *);
4166 };
4167
4168 static struct data_cmd drbd_cmd_handler[] = {
4169         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4170         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4171         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4172         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4173         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4174         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4175         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4176         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4177         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4178         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4179         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4180         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4181         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4182         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4183         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4184         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4185         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4186         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4187         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4188         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4189         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4190         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4191         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4192 };
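     /* A pkt_size of 0 means the command has no fixed-size sub-header;
      * commands with expect_payload set have their variable-length payload
      * consumed inside the handler itself (e.g. receive_bitmap). */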
4193
4194 static void drbdd(struct drbd_tconn *tconn)
4195 {
4196         struct packet_info pi;
4197         size_t shs; /* sub header size */
4198         int err;
4199
4200         while (get_t_state(&tconn->receiver) == RUNNING) {
4201                 struct data_cmd *cmd;
4202
4203                 drbd_thread_current_set_cpu(&tconn->receiver);
4204                 if (drbd_recv_header(tconn, &pi))
4205                         goto err_out;
4206
4207                 cmd = &drbd_cmd_handler[pi.cmd];
4208                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4209                         conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4210                         goto err_out;
4211                 }
4212
4213                 shs = cmd->pkt_size;
4214                 if (pi.size > shs && !cmd->expect_payload) {
4215                         conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4216                         goto err_out;
4217                 }
4218
4219                 if (shs) {
4220                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4221                         if (err)
4222                                 goto err_out;
4223                         pi.size -= shs;
4224                 }
4225
4226                 err = cmd->fn(tconn, &pi);
4227                 if (err) {
4228                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4229                                  cmdname(pi.cmd), err, pi.size);
4230                         goto err_out;
4231                 }
4232         }
4233         return;
4234
4235     err_out:
4236         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4237 }
4238
4239 void conn_flush_workqueue(struct drbd_tconn *tconn)
4240 {
4241         struct drbd_wq_barrier barr;
4242
4243         barr.w.cb = w_prev_work_done;
4244         barr.w.tconn = tconn;
4245         init_completion(&barr.done);
4246         drbd_queue_work(&tconn->data.work, &barr.w);
4247         wait_for_completion(&barr.done);
4248 }
4249
4250 static void conn_disconnect(struct drbd_tconn *tconn)
4251 {
4252         struct drbd_conf *mdev;
4253         enum drbd_conns oc;
4254         int vnr, rv = SS_UNKNOWN_ERROR;
4255
4256         if (tconn->cstate == C_STANDALONE)
4257                 return;
4258
4259         /* asender does not clean up anything. it must not interfere, either */
4260         drbd_thread_stop(&tconn->asender);
4261         drbd_free_sock(tconn);
4262
4263         rcu_read_lock();
4264         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4265                 kref_get(&mdev->kref);
4266                 rcu_read_unlock();
4267                 drbd_disconnected(mdev);
4268                 kref_put(&mdev->kref, &drbd_minor_destroy);
4269                 rcu_read_lock();
4270         }
4271         rcu_read_unlock();
4272
4273         conn_info(tconn, "Connection closed\n");
4274
4275         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4276                 conn_try_outdate_peer_async(tconn);
4277
4278         spin_lock_irq(&tconn->req_lock);
4279         oc = tconn->cstate;
4280         if (oc >= C_UNCONNECTED)
4281                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4282
4283         spin_unlock_irq(&tconn->req_lock);
4284
4285         if (oc == C_DISCONNECTING)
4286                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4287 }
4288
4289 static int drbd_disconnected(struct drbd_conf *mdev)
4290 {
4291         enum drbd_fencing_p fp;
4292         unsigned int i;
4293
4294         /* wait for current activity to cease. */
4295         spin_lock_irq(&mdev->tconn->req_lock);
4296         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4297         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4298         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4299         spin_unlock_irq(&mdev->tconn->req_lock);
4300
4301         /* We do not have data structures that would allow us to
4302          * get the rs_pending_cnt down to 0 again.
4303          *  * On C_SYNC_TARGET we do not have any data structures describing
4304          *    the pending RSDataRequests we have sent.
4305          *  * On C_SYNC_SOURCE there is no data structure that tracks
4306          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4307          *  And no, it is not the sum of the reference counts in the
4308          *  resync_LRU. The resync_LRU tracks the whole operation including
4309          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4310          *  on the fly. */
4311         drbd_rs_cancel_all(mdev);
4312         mdev->rs_total = 0;
4313         mdev->rs_failed = 0;
4314         atomic_set(&mdev->rs_pending_cnt, 0);
4315         wake_up(&mdev->misc_wait);
4316
4317         del_timer(&mdev->request_timer);
4318
4319         del_timer_sync(&mdev->resync_timer);
4320         resync_timer_fn((unsigned long)mdev);
4321
4322         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4323          * w_make_resync_request etc. which may still be on the worker queue
4324          * to be "canceled" */
4325         drbd_flush_workqueue(mdev);
4326
4327         drbd_finish_peer_reqs(mdev);
4328
4329         kfree(mdev->p_uuid);
4330         mdev->p_uuid = NULL;
4331
4332         if (!drbd_suspended(mdev))
4333                 tl_clear(mdev->tconn);
4334
4335         drbd_md_sync(mdev);
4336
4337         fp = FP_DONT_CARE;
4338         if (get_ldev(mdev)) {
4339                 rcu_read_lock();
4340                 fp = rcu_dereference(mdev->ldev->disk_conf)->fencing;
4341                 rcu_read_unlock();
4342                 put_ldev(mdev);
4343         }
4344
4345         /* serialize with bitmap writeout triggered by the state change,
4346          * if any. */
4347         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4348
4349         /* tcp_close and release of sendpage pages can be deferred.  I don't
4350          * want to use SO_LINGER, because apparently it can be deferred for
4351          * more than 20 seconds (longest time I checked).
4352          *
4353          * Actually we don't care for exactly when the network stack does its
4354          * put_page(), but release our reference on these pages right here.
4355          */
4356         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4357         if (i)
4358                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4359         i = atomic_read(&mdev->pp_in_use_by_net);
4360         if (i)
4361                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4362         i = atomic_read(&mdev->pp_in_use);
4363         if (i)
4364                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4365
4366         D_ASSERT(list_empty(&mdev->read_ee));
4367         D_ASSERT(list_empty(&mdev->active_ee));
4368         D_ASSERT(list_empty(&mdev->sync_ee));
4369         D_ASSERT(list_empty(&mdev->done_ee));
4370
4371         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4372         atomic_set(&mdev->current_epoch->epoch_size, 0);
4373         D_ASSERT(list_empty(&mdev->current_epoch->list));
4374
4375         return 0;
4376 }
4377
4378 /*
4379  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4380  * we can agree on is stored in agreed_pro_version.
4381  *
4382  * feature flags and the reserved array should be enough room for future
4383  * enhancements of the handshake protocol, and possible plugins...
4384  *
4385  * for now, they are expected to be zero, but ignored.
4386  */
4387 static int drbd_send_features(struct drbd_tconn *tconn)
4388 {
4389         struct drbd_socket *sock;
4390         struct p_connection_features *p;
4391
4392         sock = &tconn->data;
4393         p = conn_prepare_command(tconn, sock);
4394         if (!p)
4395                 return -EIO;
4396         memset(p, 0, sizeof(*p));
4397         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4398         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4399         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4400 }
4401
4402 /*
4403  * return values:
4404  *   1 yes, we have a valid connection
4405  *   0 oops, did not work out, please try again
4406  *  -1 peer talks different language,
4407  *     no point in trying again, please go standalone.
4408  */
4409 static int drbd_do_features(struct drbd_tconn *tconn)
4410 {
4411         /* ASSERT current == tconn->receiver ... */
4412         struct p_connection_features *p;
4413         const int expect = sizeof(struct p_connection_features);
4414         struct packet_info pi;
4415         int err;
4416
4417         err = drbd_send_features(tconn);
4418         if (err)
4419                 return 0;
4420
4421         err = drbd_recv_header(tconn, &pi);
4422         if (err)
4423                 return 0;
4424
4425         if (pi.cmd != P_CONNECTION_FEATURES) {
4426                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4427                      cmdname(pi.cmd), pi.cmd);
4428                 return -1;
4429         }
4430
4431         if (pi.size != expect) {
4432                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4433                      expect, pi.size);
4434                 return -1;
4435         }
4436
4437         p = pi.data;
4438         err = drbd_recv_all_warn(tconn, p, expect);
4439         if (err)
4440                 return 0;
4441
4442         p->protocol_min = be32_to_cpu(p->protocol_min);
4443         p->protocol_max = be32_to_cpu(p->protocol_max);
4444         if (p->protocol_max == 0)
4445                 p->protocol_max = p->protocol_min;
4446
4447         if (PRO_VERSION_MAX < p->protocol_min ||
4448             PRO_VERSION_MIN > p->protocol_max)
4449                 goto incompat;
4450
4451         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4452
4453         conn_info(tconn, "Handshake successful: "
4454              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4455
4456         return 1;
4457
4458  incompat:
4459         conn_err(tconn, "incompatible DRBD dialects: "
4460             "I support %d-%d, peer supports %d-%d\n",
4461             PRO_VERSION_MIN, PRO_VERSION_MAX,
4462             p->protocol_min, p->protocol_max);
4463         return -1;
4464 }
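     /* Example with illustrative version numbers: if we support 86..101
      * and the peer announces 90..96, the ranges overlap and we agree on
      * min(PRO_VERSION_MAX, protocol_max) = 96.  A peer announcing 60..85
      * fails the PRO_VERSION_MIN > protocol_max test and we go standalone. */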
4465
4466 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4467 static int drbd_do_auth(struct drbd_tconn *tconn)
4468 {
4469         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4470         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4471         return -1;
4472 }
4473 #else
4474 #define CHALLENGE_LEN 64
4475
4476 /* Return value:
4477         1 - auth succeeded,
4478         0 - failed, try again (network error),
4479         -1 - auth failed, don't try again.
4480 */
4481
4482 static int drbd_do_auth(struct drbd_tconn *tconn)
4483 {
4484         struct drbd_socket *sock;
4485         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4486         struct scatterlist sg;
4487         char *response = NULL;
4488         char *right_response = NULL;
4489         char *peers_ch = NULL;
4490         unsigned int key_len;
4491         char secret[SHARED_SECRET_MAX]; /* 64 bytes */
4492         unsigned int resp_size;
4493         struct hash_desc desc;
4494         struct packet_info pi;
4495         struct net_conf *nc;
4496         int err, rv;
4497
4498         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4499
4500         rcu_read_lock();
4501         nc = rcu_dereference(tconn->net_conf);
4502         key_len = strlen(nc->shared_secret);
4503         memcpy(secret, nc->shared_secret, key_len);
4504         rcu_read_unlock();
4505
4506         desc.tfm = tconn->cram_hmac_tfm;
4507         desc.flags = 0;
4508
4509         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4510         if (rv) {
4511                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4512                 rv = -1;
4513                 goto fail;
4514         }
4515
4516         get_random_bytes(my_challenge, CHALLENGE_LEN);
4517
4518         sock = &tconn->data;
4519         if (!conn_prepare_command(tconn, sock)) {
4520                 rv = 0;
4521                 goto fail;
4522         }
4523         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4524                                 my_challenge, CHALLENGE_LEN);
4525         if (!rv)
4526                 goto fail;
4527
4528         err = drbd_recv_header(tconn, &pi);
4529         if (err) {
4530                 rv = 0;
4531                 goto fail;
4532         }
4533
4534         if (pi.cmd != P_AUTH_CHALLENGE) {
4535                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4536                     cmdname(pi.cmd), pi.cmd);
4537                 rv = 0;
4538                 goto fail;
4539         }
4540
4541         if (pi.size > CHALLENGE_LEN * 2) {
4542                 conn_err(tconn, "AuthChallenge payload too big.\n");
4543                 rv = -1;
4544                 goto fail;
4545         }
4546
4547         peers_ch = kmalloc(pi.size, GFP_NOIO);
4548         if (peers_ch == NULL) {
4549                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4550                 rv = -1;
4551                 goto fail;
4552         }
4553
4554         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4555         if (err) {
4556                 rv = 0;
4557                 goto fail;
4558         }
4559
4560         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4561         response = kmalloc(resp_size, GFP_NOIO);
4562         if (response == NULL) {
4563                 conn_err(tconn, "kmalloc of response failed\n");
4564                 rv = -1;
4565                 goto fail;
4566         }
4567
4568         sg_init_table(&sg, 1);
4569         sg_set_buf(&sg, peers_ch, pi.size);
4570
4571         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4572         if (rv) {
4573                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4574                 rv = -1;
4575                 goto fail;
4576         }
4577
4578         if (!conn_prepare_command(tconn, sock)) {
4579                 rv = 0;
4580                 goto fail;
4581         }
4582         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4583                                 response, resp_size);
4584         if (!rv)
4585                 goto fail;
4586
4587         err = drbd_recv_header(tconn, &pi);
4588         if (err) {
4589                 rv = 0;
4590                 goto fail;
4591         }
4592
4593         if (pi.cmd != P_AUTH_RESPONSE) {
4594                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4595                         cmdname(pi.cmd), pi.cmd);
4596                 rv = 0;
4597                 goto fail;
4598         }
4599
4600         if (pi.size != resp_size) {
4601                 conn_err(tconn, "AuthResponse payload of wrong size\n");
4602                 rv = 0;
4603                 goto fail;
4604         }
4605
4606         err = drbd_recv_all_warn(tconn, response, resp_size);
4607         if (err) {
4608                 rv = 0;
4609                 goto fail;
4610         }
4611
4612         right_response = kmalloc(resp_size, GFP_NOIO);
4613         if (right_response == NULL) {
4614                 conn_err(tconn, "kmalloc of right_response failed\n");
4615                 rv = -1;
4616                 goto fail;
4617         }
4618
4619         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4620
4621         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4622         if (rv) {
4623                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4624                 rv = -1;
4625                 goto fail;
4626         }
4627
4628         rv = !memcmp(response, right_response, resp_size);
4629
4630         if (rv)
4631                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4632                      resp_size);
4633         else
4634                 rv = -1;
4635
4636  fail:
4637         kfree(peers_ch);
4638         kfree(response);
4639         kfree(right_response);
4640
4641         return rv;
4642 }
4643 #endif
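     /* Summary of the symmetric challenge-response exchange above:
      *
      *   A -> B : P_AUTH_CHALLENGE, 64 random bytes  (A's my_challenge)
      *   B -> A : P_AUTH_CHALLENGE, 64 random bytes  (A's peers_ch)
      *   A -> B : P_AUTH_RESPONSE,  HMAC(secret, peers_ch)
      *   B -> A : P_AUTH_RESPONSE,  HMAC(secret, A's my_challenge)
      *
      * Each side recomputes the HMAC over its own challenge (right_response)
      * and memcmp()s it against the peer's answer; a match proves knowledge
      * of the shared secret without the secret ever crossing the wire. */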
4644
4645 int drbdd_init(struct drbd_thread *thi)
4646 {
4647         struct drbd_tconn *tconn = thi->tconn;
4648         int h;
4649
4650         conn_info(tconn, "receiver (re)started\n");
4651
4652         do {
4653                 h = conn_connect(tconn);
4654                 if (h == 0) {
4655                         conn_disconnect(tconn);
4656                         schedule_timeout_interruptible(HZ);
4657                 }
4658                 if (h == -1) {
4659                         conn_warn(tconn, "Discarding network configuration.\n");
4660                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4661                 }
4662         } while (h == 0);
4663
4664         if (h > 0)
4665                 drbdd(tconn);
4666
4667         conn_disconnect(tconn);
4668
4669         conn_info(tconn, "receiver terminated\n");
4670         return 0;
4671 }
4672
4673 /* ********* acknowledge sender ******** */
4674
4675 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4676 {
4677         struct p_req_state_reply *p = pi->data;
4678         int retcode = be32_to_cpu(p->retcode);
4679
4680         if (retcode >= SS_SUCCESS) {
4681                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4682         } else {
4683                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4684                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4685                          drbd_set_st_err_str(retcode), retcode);
4686         }
4687         wake_up(&tconn->ping_wait);
4688
4689         return 0;
4690 }
4691
4692 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4693 {
4694         struct drbd_conf *mdev;
4695         struct p_req_state_reply *p = pi->data;
4696         int retcode = be32_to_cpu(p->retcode);
4697
4698         mdev = vnr_to_mdev(tconn, pi->vnr);
4699         if (!mdev)
4700                 return -EIO;
4701
4702         if (retcode >= SS_SUCCESS) {
4703                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4704         } else {
4705                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4706                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4707                         drbd_set_st_err_str(retcode), retcode);
4708         }
4709         wake_up(&mdev->state_wait);
4710
4711         return 0;
4712 }
4713
4714 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4715 {
4716         return drbd_send_ping_ack(tconn);
4717 }
4718
4719
4720 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4721 {
4722         /* restore idle timeout */
4723         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4724         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4725                 wake_up(&tconn->ping_wait);
4726
4727         return 0;
4728 }
4729
4730 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4731 {
4732         struct drbd_conf *mdev;
4733         struct p_block_ack *p = pi->data;
4734         sector_t sector = be64_to_cpu(p->sector);
4735         int blksize = be32_to_cpu(p->blksize);
4736
4737         mdev = vnr_to_mdev(tconn, pi->vnr);
4738         if (!mdev)
4739                 return -EIO;
4740
4741         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4742
4743         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4744
4745         if (get_ldev(mdev)) {
4746                 drbd_rs_complete_io(mdev, sector);
4747                 drbd_set_in_sync(mdev, sector, blksize);
4748                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4749                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4750                 put_ldev(mdev);
4751         }
4752         dec_rs_pending(mdev);
4753         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4754
4755         return 0;
4756 }
4757
4758 static int
4759 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4760                               struct rb_root *root, const char *func,
4761                               enum drbd_req_event what, bool missing_ok)
4762 {
4763         struct drbd_request *req;
4764         struct bio_and_error m;
4765
4766         spin_lock_irq(&mdev->tconn->req_lock);
4767         req = find_request(mdev, root, id, sector, missing_ok, func);
4768         if (unlikely(!req)) {
4769                 spin_unlock_irq(&mdev->tconn->req_lock);
4770                 return -EIO;
4771         }
4772         __req_mod(req, what, &m);
4773         spin_unlock_irq(&mdev->tconn->req_lock);
4774
4775         if (m.bio)
4776                 complete_master_bio(mdev, &m);
4777         return 0;
4778 }
4779
4780 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4781 {
4782         struct drbd_conf *mdev;
4783         struct p_block_ack *p = pi->data;
4784         sector_t sector = be64_to_cpu(p->sector);
4785         int blksize = be32_to_cpu(p->blksize);
4786         enum drbd_req_event what;
4787
4788         mdev = vnr_to_mdev(tconn, pi->vnr);
4789         if (!mdev)
4790                 return -EIO;
4791
4792         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4793
4794         if (p->block_id == ID_SYNCER) {
4795                 drbd_set_in_sync(mdev, sector, blksize);
4796                 dec_rs_pending(mdev);
4797                 return 0;
4798         }
4799         switch (pi->cmd) {
4800         case P_RS_WRITE_ACK:
4801                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4802                 break;
4803         case P_WRITE_ACK:
4804                 what = WRITE_ACKED_BY_PEER;
4805                 break;
4806         case P_RECV_ACK:
4807                 what = RECV_ACKED_BY_PEER;
4808                 break;
4809         case P_DISCARD_WRITE:
4810                 what = DISCARD_WRITE;
4811                 break;
4812         case P_RETRY_WRITE:
4813                 what = POSTPONE_WRITE;
4814                 break;
4815         default:
4816                 BUG();
4817         }
4818
4819         return validate_req_change_req_state(mdev, p->block_id, sector,
4820                                              &mdev->write_requests, __func__,
4821                                              what, false);
4822 }
4823
4824 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4825 {
4826         struct drbd_conf *mdev;
4827         struct p_block_ack *p = pi->data;
4828         sector_t sector = be64_to_cpu(p->sector);
4829         int size = be32_to_cpu(p->blksize);
4830         int err;
4831
4832         mdev = vnr_to_mdev(tconn, pi->vnr);
4833         if (!mdev)
4834                 return -EIO;
4835
4836         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4837
4838         if (p->block_id == ID_SYNCER) {
4839                 dec_rs_pending(mdev);
4840                 drbd_rs_failed_io(mdev, sector, size);
4841                 return 0;
4842         }
4843
4844         err = validate_req_change_req_state(mdev, p->block_id, sector,
4845                                             &mdev->write_requests, __func__,
4846                                             NEG_ACKED, true);
4847         if (err) {
4848                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4849                    The master bio might already be completed, therefore the
4850                    request is no longer in the collision hash. */
4851                 /* In Protocol B we might already have got a P_RECV_ACK
4852                    but then get a P_NEG_ACK afterwards. */
4853                 drbd_set_out_of_sync(mdev, sector, size);
4854         }
4855         return 0;
4856 }
4857
4858 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4859 {
4860         struct drbd_conf *mdev;
4861         struct p_block_ack *p = pi->data;
4862         sector_t sector = be64_to_cpu(p->sector);
4863
4864         mdev = vnr_to_mdev(tconn, pi->vnr);
4865         if (!mdev)
4866                 return -EIO;
4867
4868         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4869
4870         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4871             (unsigned long long)sector, be32_to_cpu(p->blksize));
4872
4873         return validate_req_change_req_state(mdev, p->block_id, sector,
4874                                              &mdev->read_requests, __func__,
4875                                              NEG_ACKED, false);
4876 }
4877
4878 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4879 {
4880         struct drbd_conf *mdev;
4881         sector_t sector;
4882         int size;
4883         struct p_block_ack *p = pi->data;
4884
4885         mdev = vnr_to_mdev(tconn, pi->vnr);
4886         if (!mdev)
4887                 return -EIO;
4888
4889         sector = be64_to_cpu(p->sector);
4890         size = be32_to_cpu(p->blksize);
4891
4892         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4893
4894         dec_rs_pending(mdev);
4895
4896         if (get_ldev_if_state(mdev, D_FAILED)) {
4897                 drbd_rs_complete_io(mdev, sector);
4898                 switch (pi->cmd) {
4899                 case P_NEG_RS_DREPLY:
4900                         drbd_rs_failed_io(mdev, sector, size);
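                             /* fall through */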
4901                 case P_RS_CANCEL:
4902                         break;
4903                 default:
4904                         BUG();
4905                 }
4906                 put_ldev(mdev);
4907         }
4908
4909         return 0;
4910 }
4911
4912 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4913 {
4914         struct drbd_conf *mdev;
4915         struct p_barrier_ack *p = pi->data;
4916
4917         mdev = vnr_to_mdev(tconn, pi->vnr);
4918         if (!mdev)
4919                 return -EIO;
4920
4921         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4922
4923         if (mdev->state.conn == C_AHEAD &&
4924             atomic_read(&mdev->ap_in_flight) == 0 &&
4925             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4926                 mdev->start_resync_timer.expires = jiffies + HZ;
4927                 add_timer(&mdev->start_resync_timer);
4928         }
4929
4930         return 0;
4931 }
4932
4933 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4934 {
4935         struct drbd_conf *mdev;
4936         struct p_block_ack *p = pi->data;
4937         struct drbd_work *w;
4938         sector_t sector;
4939         int size;
4940
4941         mdev = vnr_to_mdev(tconn, pi->vnr);
4942         if (!mdev)
4943                 return -EIO;
4944
4945         sector = be64_to_cpu(p->sector);
4946         size = be32_to_cpu(p->blksize);
4947
4948         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4949
4950         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4951                 drbd_ov_out_of_sync_found(mdev, sector, size);
4952         else
4953                 ov_out_of_sync_print(mdev);
4954
4955         if (!get_ldev(mdev))
4956                 return 0;
4957
4958         drbd_rs_complete_io(mdev, sector);
4959         dec_rs_pending(mdev);
4960
4961         --mdev->ov_left;
4962
4963         /* let's advance progress step marks only for every other megabyte */
4964         if ((mdev->ov_left & 0x200) == 0x200)
4965                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4966
4967         if (mdev->ov_left == 0) {
4968                 w = kmalloc(sizeof(*w), GFP_NOIO);
4969                 if (w) {
4970                         w->cb = w_ov_finished;
4971                         w->mdev = mdev;
4972                         drbd_queue_work_front(&mdev->tconn->data.work, w);
4973                 } else {
4974                         dev_err(DEV, "kmalloc(w) failed.");
4975                         ov_out_of_sync_print(mdev);
4976                         drbd_resync_finished(mdev);
4977                 }
4978         }
4979         put_ldev(mdev);
4980         return 0;
4981 }
4982
4983 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4984 {
4985         return 0;
4986 }
4987
4988 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
4989 {
4990         struct drbd_conf *mdev;
4991         int vnr, not_empty = 0;
4992
4993         do {
4994                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4995                 flush_signals(current);
4996
4997                 rcu_read_lock();
4998                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4999                         kref_get(&mdev->kref);
5000                         rcu_read_unlock();
5001                         if (drbd_finish_peer_reqs(mdev)) {
5002                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5003                                 return 1;
5004                         }
5005                         kref_put(&mdev->kref, &drbd_minor_destroy);
5006                         rcu_read_lock();
5007                 }
5008                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5009
5010                 spin_lock_irq(&tconn->req_lock);
5011                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5012                         not_empty = !list_empty(&mdev->done_ee);
5013                         if (not_empty)
5014                                 break;
5015                 }
5016                 spin_unlock_irq(&tconn->req_lock);
5017                 rcu_read_unlock();
5018         } while (not_empty);
5019
5020         return 0;
5021 }
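     /* Note the ordering above: SIGNAL_ASENDER is cleared and signals are
      * flushed before draining done_ee, and every volume is re-checked
      * under req_lock afterwards.  That closes the race where a peer
      * request completes after its list was drained but before we go back
      * to sleep; if anything is pending again we simply take another pass. */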
5022
5023 struct asender_cmd {
5024         size_t pkt_size;
5025         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5026 };
5027
5028 static struct asender_cmd asender_tbl[] = {
5029         [P_PING]            = { 0, got_Ping },
5030         [P_PING_ACK]        = { 0, got_PingAck },
5031         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5032         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5033         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5034         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
5035         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5036         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5037         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5038         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5039         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5040         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5041         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5042         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5043         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5044         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5045         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5046 };
5047
5048 int drbd_asender(struct drbd_thread *thi)
5049 {
5050         struct drbd_tconn *tconn = thi->tconn;
5051         struct asender_cmd *cmd = NULL;
5052         struct packet_info pi;
5053         int rv;
5054         void *buf    = tconn->meta.rbuf;
5055         int received = 0;
5056         unsigned int header_size = drbd_header_size(tconn);
5057         int expect   = header_size;
5058         bool ping_timeout_active = false;
5059         struct net_conf *nc;
5060         int ping_timeo, tcp_cork, ping_int;
5061
5062         current->policy = SCHED_RR;  /* Make this a realtime task! */
5063         current->rt_priority = 2;    /* more important than all other tasks */
5064
5065         while (get_t_state(thi) == RUNNING) {
5066                 drbd_thread_current_set_cpu(thi);
5067
5068                 rcu_read_lock();
5069                 nc = rcu_dereference(tconn->net_conf);
5070                 ping_timeo = nc->ping_timeo;
5071                 tcp_cork = nc->tcp_cork;
5072                 ping_int = nc->ping_int;
5073                 rcu_read_unlock();
5074
5075                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5076                         if (drbd_send_ping(tconn)) {
5077                                 conn_err(tconn, "drbd_send_ping has failed\n");
5078                                 goto reconnect;
5079                         }
5080                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5081                         ping_timeout_active = true;
5082                 }
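                     /* ping_timeo is configured in tenths of a second, hence
                      * the "* HZ / 10"; ping_int, restored once the PingAck
                      * arrives, is in whole seconds. */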
5083
5084                 /* TODO: conditionally cork; it may hurt latency if we cork without
5085                    much to send */
5086                 if (tcp_cork)
5087                         drbd_tcp_cork(tconn->meta.socket);
5088                 if (tconn_finish_peer_reqs(tconn)) {
5089                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5090                         goto reconnect;
5091                 }
5092                 /* but unconditionally uncork unless disabled */
5093                 if (tcp_cork)
5094                         drbd_tcp_uncork(tconn->meta.socket);
5095
5096                 /* short circuit, recv_msg would return EINTR anyways. */
5097                 if (signal_pending(current))
5098                         continue;
5099
5100                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5101                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5102
5103                 flush_signals(current);
5104
5105                 /* Note:
5106                  * -EINTR        (on meta) we got a signal
5107                  * -EAGAIN       (on meta) rcvtimeo expired
5108                  * -ECONNRESET   other side closed the connection
5109                  * -ERESTARTSYS  (on data) we got a signal
5110                  * rv <  0       other than above: unexpected error!
5111                  * rv == expected: full header or command
5112                  * rv <  expected: "woken" by signal during receive
5113                  * rv == 0       : "connection shut down by peer"
5114                  */
5115                 if (likely(rv > 0)) {
5116                         received += rv;
5117                         buf      += rv;
5118                 } else if (rv == 0) {
5119                         conn_err(tconn, "meta connection shut down by peer.\n");
5120                         goto reconnect;
5121                 } else if (rv == -EAGAIN) {
5122                         /* If the data socket received something meanwhile,
5123                          * that is good enough: peer is still alive. */
5124                         if (time_after(tconn->last_received,
5125                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5126                                 continue;
5127                         if (ping_timeout_active) {
5128                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5129                                 goto reconnect;
5130                         }
5131                         set_bit(SEND_PING, &tconn->flags);
5132                         continue;
5133                 } else if (rv == -EINTR) {
5134                         continue;
5135                 } else {
5136                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5137                         goto reconnect;
5138                 }
5139
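                     /* Two-phase receive: when a full header has arrived,
                      * decode it and grow "expect" by the command's fixed
                      * packet size; once the payload is complete as well,
                      * dispatch to cmd->fn(). */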
5140                 if (received == expect && cmd == NULL) {
5141                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5142                                 goto reconnect;
5143                         cmd = &asender_tbl[pi.cmd];
5144                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5145                                 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
5146                                         pi.cmd, pi.size);
5147                                 goto disconnect;
5148                         }
5149                         expect = header_size + cmd->pkt_size;
5150                         if (pi.size != expect - header_size) {
5151                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5152                                         pi.cmd, pi.size);
5153                                 goto reconnect;
5154                         }
5155                 }
5156                 if (received == expect) {
5157                         int err;
5158
5159                         err = cmd->fn(tconn, &pi);
5160                         if (err) {
5161                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5162                                 goto reconnect;
5163                         }
5164
5165                         tconn->last_received = jiffies;
5166
5167                         if (cmd == &asender_tbl[P_PING_ACK]) {
5168                                 /* restore idle timeout */
5169                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5170                                 ping_timeout_active = false;
5171                         }
5172
5173                         buf      = tconn->meta.rbuf;
5174                         received = 0;
5175                         expect   = header_size;
5176                         cmd      = NULL;
5177                 }
5178         }
5179
5180         if (0) {
5181 reconnect:
5182                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5183         }
5184         if (0) {
5185 disconnect:
5186                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5187         }
5188         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5189
5190         conn_info(tconn, "asender terminated\n");
5191
5192         return 0;
5193 }