1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49 #include "drbd_vli.h"
50
51 #define PRO_FEATURES (FF_TRIM)
52
53 struct packet_info {
54         enum drbd_packet cmd;
55         unsigned int size;
56         unsigned int vnr;
57         void *data;
58 };
59
60 enum finish_epoch {
61         FE_STILL_LIVE,
62         FE_DESTROYED,
63         FE_RECYCLED,
64 };
65
66 static int drbd_do_features(struct drbd_connection *connection);
67 static int drbd_do_auth(struct drbd_connection *connection);
68 static int drbd_disconnected(struct drbd_peer_device *);
69 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_work *, int);
72
73
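/* Opportunistic allocation flags: highmem pages are fine, allocation failure
 * is expected (no warning), and without __GFP_RECLAIM the allocator never
 * enters direct reclaim -- see the write-out comment in __drbd_alloc_pages(). */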
74 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
75
76 /*
77  * some helper functions to deal with singly linked page lists,
78  * page->private being our "next" pointer.
79  */
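/*
 * A chain of three pages looks like this:
 *
 *   *head -> P1 -> P2 -> P3 -> (0)
 *
 * where each arrow is page_private(page) interpreted as a struct page *,
 * and a private value of 0 marks the end of the chain.
 */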
80
81 /* If at least n pages are linked at head, get n pages off.
82  * Otherwise, don't modify head, and return NULL.
83  * Locking is the responsibility of the caller.
84  */
85 static struct page *page_chain_del(struct page **head, int n)
86 {
87         struct page *page;
88         struct page *tmp;
89
90         BUG_ON(!n);
91         BUG_ON(!head);
92
93         page = *head;
94
95         if (!page)
96                 return NULL;
97
98         while (page) {
99                 tmp = page_chain_next(page);
100                 if (--n == 0)
101                         break; /* found sufficient pages */
102                 if (tmp == NULL)
103                         /* insufficient pages, don't use any of them. */
104                         return NULL;
105                 page = tmp;
106         }
107
108         /* add end of list marker for the returned list */
109         set_page_private(page, 0);
110         /* actual return value, and adjustment of head */
111         page = *head;
112         *head = tmp;
113         return page;
114 }
115
116 /* may be used outside of locks to find the tail of a (usually short)
117  * "private" page chain, before adding it back to a global chain head
118  * with page_chain_add() under a spinlock. */
119 static struct page *page_chain_tail(struct page *page, int *len)
120 {
121         struct page *tmp;
122         int i = 1;
123         while ((tmp = page_chain_next(page)))
124                 ++i, page = tmp;
125         if (len)
126                 *len = i;
127         return page;
128 }
129
130 static int page_chain_free(struct page *page)
131 {
132         struct page *tmp;
133         int i = 0;
134         page_chain_for_each_safe(page, tmp) {
135                 put_page(page);
136                 ++i;
137         }
138         return i;
139 }
140
141 static void page_chain_add(struct page **head,
142                 struct page *chain_first, struct page *chain_last)
143 {
144 #if 1
145         struct page *tmp;
146         tmp = page_chain_tail(chain_first, NULL);
147         BUG_ON(tmp != chain_last);
148 #endif
149
150         /* add chain to head */
151         set_page_private(chain_last, (unsigned long)*head);
152         *head = chain_first;
153 }
154
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
156                                        unsigned int number)
157 {
158         struct page *page = NULL;
159         struct page *tmp = NULL;
160         unsigned int i = 0;
161
162         /* Yes, testing drbd_pp_vacant outside the lock is racy.
163          * So what. It saves a spin_lock. */
164         if (drbd_pp_vacant >= number) {
165                 spin_lock(&drbd_pp_lock);
166                 page = page_chain_del(&drbd_pp_pool, number);
167                 if (page)
168                         drbd_pp_vacant -= number;
169                 spin_unlock(&drbd_pp_lock);
170                 if (page)
171                         return page;
172         }
173
174         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175          * "criss-cross" setup, that might cause write-out on some other DRBD,
176          * which in turn might block on the other node at this very place.  */
177         for (i = 0; i < number; i++) {
178                 tmp = alloc_page(GFP_TRY);
179                 if (!tmp)
180                         break;
181                 set_page_private(tmp, (unsigned long)page);
182                 page = tmp;
183         }
184
185         if (i == number)
186                 return page;
187
188         /* Not enough pages immediately available this time.
189          * No need to jump around here, drbd_alloc_pages will retry this
190          * function "soon". */
191         if (page) {
192                 tmp = page_chain_tail(page, NULL);
193                 spin_lock(&drbd_pp_lock);
194                 page_chain_add(&drbd_pp_pool, page, tmp);
195                 drbd_pp_vacant += i;
196                 spin_unlock(&drbd_pp_lock);
197         }
198         return NULL;
199 }
200
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202                                            struct list_head *to_be_freed)
203 {
204         struct drbd_peer_request *peer_req, *tmp;
205
206         /* The EEs are always appended to the end of the list. Since
207            they are sent in order over the wire, they have to finish
208            in order. As soon as we see the first unfinished one we can
209            stop examining the list... */
210
211         list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(&peer_req->w.list, to_be_freed);
215         }
216 }
217
218 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&device->resource->req_lock);
224         reclaim_finished_net_peer_reqs(device, &reclaimed);
225         spin_unlock_irq(&device->resource->req_lock);
226         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
227                 drbd_free_net_peer_req(device, peer_req);
228 }
229
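/* For every volume on this connection that still has pages lent to the
 * network stack, reclaim its finished net peer requests.  The kref is taken
 * and the RCU read lock dropped around the call, because
 * drbd_reclaim_net_peer_reqs() takes req_lock and may sleep while freeing. */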
230 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
231 {
232         struct drbd_peer_device *peer_device;
233         int vnr;
234
235         rcu_read_lock();
236         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
237                 struct drbd_device *device = peer_device->device;
238                 if (!atomic_read(&device->pp_in_use_by_net))
239                         continue;
240
241                 kref_get(&device->kref);
242                 rcu_read_unlock();
243                 drbd_reclaim_net_peer_reqs(device);
244                 kref_put(&device->kref, drbd_destroy_device);
245                 rcu_read_lock();
246         }
247         rcu_read_unlock();
248 }
249
250 /**
251  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
252  * @peer_device: peer device of the DRBD device to allocate for.
253  * @number:     number of pages requested
254  * @retry:      whether to retry, if not enough pages are available right now
255  *
256  * Tries to allocate number pages, first from our own page pool, then from
257  * the kernel.
258  * Possibly retry until DRBD frees sufficient pages somewhere else.
259  *
260  * If this allocation would exceed the max_buffers setting, we throttle
261  * allocation (schedule_timeout) to give the system some room to breathe.
262  *
263  * We do not use max-buffers as a hard limit, because it could lead to
264  * congestion and further to a distributed deadlock during online-verify or
265  * (checksum based) resync, if the max-buffers, socket buffer sizes and
266  * resync-rate settings are mis-configured.
267  *
268  * Returns a page chain linked via page->private.
269  */
270 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
271                               bool retry)
272 {
273         struct drbd_device *device = peer_device->device;
274         struct page *page = NULL;
275         struct net_conf *nc;
276         DEFINE_WAIT(wait);
277         unsigned int mxb;
278
279         rcu_read_lock();
280         nc = rcu_dereference(peer_device->connection->net_conf);
281         mxb = nc ? nc->max_buffers : 1000000;
282         rcu_read_unlock();
283
284         if (atomic_read(&device->pp_in_use) < mxb)
285                 page = __drbd_alloc_pages(device, number);
286
287         /* Try to keep the fast path fast, but occasionally we need
288          * to reclaim the pages we lent to the network stack. */
289         if (page && atomic_read(&device->pp_in_use_by_net) > 512)
290                 drbd_reclaim_net_peer_reqs(device);
291
292         while (page == NULL) {
293                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
294
295                 drbd_reclaim_net_peer_reqs(device);
296
297                 if (atomic_read(&device->pp_in_use) < mxb) {
298                         page = __drbd_alloc_pages(device, number);
299                         if (page)
300                                 break;
301                 }
302
303                 if (!retry)
304                         break;
305
306                 if (signal_pending(current)) {
307                         drbd_warn(device, "drbd_alloc_pages interrupted!\n");
308                         break;
309                 }
310
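                /* Sleep for up to HZ/10.  If the full timeout elapses without
                 * a wake-up from drbd_free_pages(), stop treating max_buffers
                 * as a hard limit (see the deadlock note in the kernel-doc
                 * above). */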
311                 if (schedule_timeout(HZ/10) == 0)
312                         mxb = UINT_MAX;
313         }
314         finish_wait(&drbd_pp_wait, &wait);
315
316         if (page)
317                 atomic_add(number, &device->pp_in_use);
318         return page;
319 }
320
321 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
322  * Is also used from inside another spin_lock_irq(&resource->req_lock);
323  * Either links the page chain back to the global pool,
324  * or returns all pages to the system. */
325 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
326 {
327         atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
328         int i;
329
330         if (page == NULL)
331                 return;
332
333         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
334                 i = page_chain_free(page);
335         else {
336                 struct page *tmp;
337                 tmp = page_chain_tail(page, &i);
338                 spin_lock(&drbd_pp_lock);
339                 page_chain_add(&drbd_pp_pool, page, tmp);
340                 drbd_pp_vacant += i;
341                 spin_unlock(&drbd_pp_lock);
342         }
343         i = atomic_sub_return(i, a);
344         if (i < 0)
345                 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
346                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
347         wake_up(&drbd_pp_wait);
348 }
349
350 /*
351 You need to hold the req_lock:
352  _drbd_wait_ee_list_empty()
353
354 You must not have the req_lock:
355  drbd_free_peer_req()
356  drbd_alloc_peer_req()
357  drbd_free_peer_reqs()
358  drbd_ee_fix_bhs()
359  drbd_finish_peer_reqs()
360  drbd_clear_done_ee()
361  drbd_wait_ee_list_empty()
362 */
363
364 struct drbd_peer_request *
365 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
366                     unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
367 {
368         struct drbd_device *device = peer_device->device;
369         struct drbd_peer_request *peer_req;
370         struct page *page = NULL;
371         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
372
373         if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
374                 return NULL;
375
376         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
377         if (!peer_req) {
378                 if (!(gfp_mask & __GFP_NOWARN))
379                         drbd_err(device, "%s: allocation failed\n", __func__);
380                 return NULL;
381         }
382
383         if (has_payload && data_size) {
384                 page = drbd_alloc_pages(peer_device, nr_pages,
385                                         gfpflags_allow_blocking(gfp_mask));
386                 if (!page)
387                         goto fail;
388         }
389
390         memset(peer_req, 0, sizeof(*peer_req));
391         INIT_LIST_HEAD(&peer_req->w.list);
392         drbd_clear_interval(&peer_req->i);
393         peer_req->i.size = data_size;
394         peer_req->i.sector = sector;
395         peer_req->submit_jif = jiffies;
396         peer_req->peer_device = peer_device;
397         peer_req->pages = page;
398         /*
399          * The block_id is opaque to the receiver.  It is not endianness
400          * converted, and sent back to the sender unchanged.
401          */
402         peer_req->block_id = id;
403
404         return peer_req;
405
406  fail:
407         mempool_free(peer_req, drbd_ee_mempool);
408         return NULL;
409 }
410
411 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
412                        int is_net)
413 {
414         might_sleep();
415         if (peer_req->flags & EE_HAS_DIGEST)
416                 kfree(peer_req->digest);
417         drbd_free_pages(device, peer_req->pages, is_net);
418         D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
419         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
420         if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
421                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
422                 drbd_al_complete_io(device, &peer_req->i);
423         }
424         mempool_free(peer_req, drbd_ee_mempool);
425 }
426
427 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
428 {
429         LIST_HEAD(work_list);
430         struct drbd_peer_request *peer_req, *t;
431         int count = 0;
432         int is_net = list == &device->net_ee;
433
434         spin_lock_irq(&device->resource->req_lock);
435         list_splice_init(list, &work_list);
436         spin_unlock_irq(&device->resource->req_lock);
437
438         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
439                 __drbd_free_peer_req(device, peer_req, is_net);
440                 count++;
441         }
442         return count;
443 }
444
445 /*
446  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
447  */
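/* Run the completion callback (e_end_block, e_end_resync_block or
 * e_send_superseded) for everything on done_ee, free those peer requests,
 * and reclaim finished net_ee entries on the way.  Returns the first error
 * reported by any callback. */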
448 static int drbd_finish_peer_reqs(struct drbd_device *device)
449 {
450         LIST_HEAD(work_list);
451         LIST_HEAD(reclaimed);
452         struct drbd_peer_request *peer_req, *t;
453         int err = 0;
454
455         spin_lock_irq(&device->resource->req_lock);
456         reclaim_finished_net_peer_reqs(device, &reclaimed);
457         list_splice_init(&device->done_ee, &work_list);
458         spin_unlock_irq(&device->resource->req_lock);
459
460         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
461                 drbd_free_net_peer_req(device, peer_req);
462
463         /* possible callbacks here:
464          * e_end_block, and e_end_resync_block, e_send_superseded.
465          * all ignore the last argument.
466          */
467         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
468                 int err2;
469
470                 /* list_del not necessary, next/prev members not touched */
471                 err2 = peer_req->w.cb(&peer_req->w, !!err);
472                 if (!err)
473                         err = err2;
474                 drbd_free_peer_req(device, peer_req);
475         }
476         wake_up(&device->ee_wait);
477
478         return err;
479 }
480
481 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
482                                      struct list_head *head)
483 {
484         DEFINE_WAIT(wait);
485
486         /* avoids spin_lock/unlock
487          * and calling prepare_to_wait in the fast path */
488         while (!list_empty(head)) {
489                 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
490                 spin_unlock_irq(&device->resource->req_lock);
491                 io_schedule();
492                 finish_wait(&device->ee_wait, &wait);
493                 spin_lock_irq(&device->resource->req_lock);
494         }
495 }
496
497 static void drbd_wait_ee_list_empty(struct drbd_device *device,
498                                     struct list_head *head)
499 {
500         spin_lock_irq(&device->resource->req_lock);
501         _drbd_wait_ee_list_empty(device, head);
502         spin_unlock_irq(&device->resource->req_lock);
503 }
504
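/* Thin wrapper around kernel_recvmsg().  With flags == 0 it blocks until the
 * whole buffer has been received (MSG_WAITALL | MSG_NOSIGNAL); callers such as
 * drbd_socket_okay() instead pass MSG_DONTWAIT | MSG_PEEK to merely probe. */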
505 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
506 {
507         struct kvec iov = {
508                 .iov_base = buf,
509                 .iov_len = size,
510         };
511         struct msghdr msg = {
512                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
513         };
514         return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
515 }
516
517 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
518 {
519         int rv;
520
521         rv = drbd_recv_short(connection->data.socket, buf, size, 0);
522
523         if (rv < 0) {
524                 if (rv == -ECONNRESET)
525                         drbd_info(connection, "sock was reset by peer\n");
526                 else if (rv != -ERESTARTSYS)
527                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
528         } else if (rv == 0) {
529                 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
530                         long t;
531                         rcu_read_lock();
532                         t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
533                         rcu_read_unlock();
534
535                         t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
536
537                         if (t)
538                                 goto out;
539                 }
540                 drbd_info(connection, "sock was shut down by peer\n");
541         }
542
543         if (rv != size)
544                 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
545
546 out:
547         return rv;
548 }
549
550 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
551 {
552         int err;
553
554         err = drbd_recv(connection, buf, size);
555         if (err != size) {
556                 if (err >= 0)
557                         err = -EIO;
558         } else
559                 err = 0;
560         return err;
561 }
562
563 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
564 {
565         int err;
566
567         err = drbd_recv_all(connection, buf, size);
568         if (err && !signal_pending(current))
569                 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
570         return err;
571 }
572
573 /* quoting tcp(7):
574  *   On individual connections, the socket buffer size must be set prior to the
575  *   listen(2) or connect(2) calls in order to have it take effect.
576  * This is our wrapper to do so.
577  */
578 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
579                 unsigned int rcv)
580 {
581         /* open coded SO_SNDBUF, SO_RCVBUF */
582         if (snd) {
583                 sock->sk->sk_sndbuf = snd;
584                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
585         }
586         if (rcv) {
587                 sock->sk->sk_rcvbuf = rcv;
588                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
589         }
590 }
591
592 static struct socket *drbd_try_connect(struct drbd_connection *connection)
593 {
594         const char *what;
595         struct socket *sock;
596         struct sockaddr_in6 src_in6;
597         struct sockaddr_in6 peer_in6;
598         struct net_conf *nc;
599         int err, peer_addr_len, my_addr_len;
600         int sndbuf_size, rcvbuf_size, connect_int;
601         int disconnect_on_error = 1;
602
603         rcu_read_lock();
604         nc = rcu_dereference(connection->net_conf);
605         if (!nc) {
606                 rcu_read_unlock();
607                 return NULL;
608         }
609         sndbuf_size = nc->sndbuf_size;
610         rcvbuf_size = nc->rcvbuf_size;
611         connect_int = nc->connect_int;
612         rcu_read_unlock();
613
614         my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
615         memcpy(&src_in6, &connection->my_addr, my_addr_len);
616
617         if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
618                 src_in6.sin6_port = 0;
619         else
620                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
621
622         peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
623         memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
624
625         what = "sock_create_kern";
626         err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
627                                SOCK_STREAM, IPPROTO_TCP, &sock);
628         if (err < 0) {
629                 sock = NULL;
630                 goto out;
631         }
632
633         sock->sk->sk_rcvtimeo =
634         sock->sk->sk_sndtimeo = connect_int * HZ;
635         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
636
637         /* explicitly bind to the configured IP as source IP
638         *  for the outgoing connections.
639         *  This is needed for multihomed hosts and to be
640         *  able to use lo: interfaces for drbd.
641         * Make sure to use 0 as port number, so linux selects
642         *  a free one dynamically.
643         */
644         what = "bind before connect";
645         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
646         if (err < 0)
647                 goto out;
648
649         /* connect may fail, peer not yet available.
650          * stay C_WF_CONNECTION, don't go Disconnecting! */
651         disconnect_on_error = 0;
652         what = "connect";
653         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
654
655 out:
656         if (err < 0) {
657                 if (sock) {
658                         sock_release(sock);
659                         sock = NULL;
660                 }
661                 switch (-err) {
662                         /* timeout, busy, signal pending */
663                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
664                 case EINTR: case ERESTARTSYS:
665                         /* peer not (yet) available, network problem */
666                 case ECONNREFUSED: case ENETUNREACH:
667                 case EHOSTDOWN:    case EHOSTUNREACH:
668                         disconnect_on_error = 0;
669                         break;
670                 default:
671                         drbd_err(connection, "%s failed, err = %d\n", what, err);
672                 }
673                 if (disconnect_on_error)
674                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
675         }
676
677         return sock;
678 }
679
680 struct accept_wait_data {
681         struct drbd_connection *connection;
682         struct socket *s_listen;
683         struct completion door_bell;
684         void (*original_sk_state_change)(struct sock *sk);
685
686 };
687
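/* sk_state_change callback installed on the listen socket by
 * prepare_listen_socket(): once the TCP state reaches ESTABLISHED, ring the
 * door_bell completion so drbd_wait_for_connect() can accept(), then chain
 * to the original callback. */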
688 static void drbd_incoming_connection(struct sock *sk)
689 {
690         struct accept_wait_data *ad = sk->sk_user_data;
691         void (*state_change)(struct sock *sk);
692
693         state_change = ad->original_sk_state_change;
694         if (sk->sk_state == TCP_ESTABLISHED)
695                 complete(&ad->door_bell);
696         state_change(sk);
697 }
698
699 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
700 {
701         int err, sndbuf_size, rcvbuf_size, my_addr_len;
702         struct sockaddr_in6 my_addr;
703         struct socket *s_listen;
704         struct net_conf *nc;
705         const char *what;
706
707         rcu_read_lock();
708         nc = rcu_dereference(connection->net_conf);
709         if (!nc) {
710                 rcu_read_unlock();
711                 return -EIO;
712         }
713         sndbuf_size = nc->sndbuf_size;
714         rcvbuf_size = nc->rcvbuf_size;
715         rcu_read_unlock();
716
717         my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
718         memcpy(&my_addr, &connection->my_addr, my_addr_len);
719
720         what = "sock_create_kern";
721         err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
722                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
723         if (err) {
724                 s_listen = NULL;
725                 goto out;
726         }
727
728         s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
729         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
730
731         what = "bind before listen";
732         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
733         if (err < 0)
734                 goto out;
735
736         ad->s_listen = s_listen;
737         write_lock_bh(&s_listen->sk->sk_callback_lock);
738         ad->original_sk_state_change = s_listen->sk->sk_state_change;
739         s_listen->sk->sk_state_change = drbd_incoming_connection;
740         s_listen->sk->sk_user_data = ad;
741         write_unlock_bh(&s_listen->sk->sk_callback_lock);
742
743         what = "listen";
744         err = s_listen->ops->listen(s_listen, 5);
745         if (err < 0)
746                 goto out;
747
748         return 0;
749 out:
750         if (s_listen)
751                 sock_release(s_listen);
752         if (err < 0) {
753                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
754                         drbd_err(connection, "%s failed, err = %d\n", what, err);
755                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
756                 }
757         }
758
759         return -EIO;
760 }
761
762 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
763 {
764         write_lock_bh(&sk->sk_callback_lock);
765         sk->sk_state_change = ad->original_sk_state_change;
766         sk->sk_user_data = NULL;
767         write_unlock_bh(&sk->sk_callback_lock);
768 }
769
770 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
771 {
772         int timeo, connect_int, err = 0;
773         struct socket *s_estab = NULL;
774         struct net_conf *nc;
775
776         rcu_read_lock();
777         nc = rcu_dereference(connection->net_conf);
778         if (!nc) {
779                 rcu_read_unlock();
780                 return NULL;
781         }
782         connect_int = nc->connect_int;
783         rcu_read_unlock();
784
785         timeo = connect_int * HZ;
786         /* 28.5% random jitter */
787         timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
788
789         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
790         if (err <= 0)
791                 return NULL;
792
793         err = kernel_accept(ad->s_listen, &s_estab, 0);
794         if (err < 0) {
795                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
796                         drbd_err(connection, "accept failed, err = %d\n", err);
797                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
798                 }
799         }
800
801         if (s_estab)
802                 unregister_state_change(s_estab->sk, ad);
803
804         return s_estab;
805 }
806
807 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
808
809 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
810                              enum drbd_packet cmd)
811 {
812         if (!conn_prepare_command(connection, sock))
813                 return -EIO;
814         return conn_send_command(connection, sock, cmd, 0, NULL, 0);
815 }
816
817 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
818 {
819         unsigned int header_size = drbd_header_size(connection);
820         struct packet_info pi;
821         struct net_conf *nc;
822         int err;
823
824         rcu_read_lock();
825         nc = rcu_dereference(connection->net_conf);
826         if (!nc) {
827                 rcu_read_unlock();
828                 return -EIO;
829         }
830         sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
831         rcu_read_unlock();
832
833         err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
834         if (err != header_size) {
835                 if (err >= 0)
836                         err = -EIO;
837                 return err;
838         }
839         err = decode_header(connection, connection->data.rbuf, &pi);
840         if (err)
841                 return err;
842         return pi.cmd;
843 }
844
845 /**
846  * drbd_socket_okay() - Free the socket if its connection is not okay
847  * @sock:       pointer to the pointer to the socket.
848  */
849 static bool drbd_socket_okay(struct socket **sock)
850 {
851         int rr;
852         char tb[4];
853
854         if (!*sock)
855                 return false;
856
857         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
858
859         if (rr > 0 || rr == -EAGAIN) {
860                 return true;
861         } else {
862                 sock_release(*sock);
863                 *sock = NULL;
864                 return false;
865         }
866 }
867
868 static bool connection_established(struct drbd_connection *connection,
869                                    struct socket **sock1,
870                                    struct socket **sock2)
871 {
872         struct net_conf *nc;
873         int timeout;
874         bool ok;
875
876         if (!*sock1 || !*sock2)
877                 return false;
878
879         rcu_read_lock();
880         nc = rcu_dereference(connection->net_conf);
881         timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
882         rcu_read_unlock();
883         schedule_timeout_interruptible(timeout);
884
885         ok = drbd_socket_okay(sock1);
886         ok = drbd_socket_okay(sock2) && ok;
887
888         return ok;
889 }
890
891 /* Gets called when a connection is established, or when a new minor gets created
892    in a connection */
893 int drbd_connected(struct drbd_peer_device *peer_device)
894 {
895         struct drbd_device *device = peer_device->device;
896         int err;
897
898         atomic_set(&device->packet_seq, 0);
899         device->peer_seq = 0;
900
901         device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
902                 &peer_device->connection->cstate_mutex :
903                 &device->own_state_mutex;
904
905         err = drbd_send_sync_param(peer_device);
906         if (!err)
907                 err = drbd_send_sizes(peer_device, 0, 0);
908         if (!err)
909                 err = drbd_send_uuids(peer_device);
910         if (!err)
911                 err = drbd_send_current_state(peer_device);
912         clear_bit(USE_DEGR_WFC_T, &device->flags);
913         clear_bit(RESIZE_PENDING, &device->flags);
914         atomic_set(&device->ap_in_flight, 0);
915         mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
916         return err;
917 }
918
919 /*
920  * return values:
921  *   1 yes, we have a valid connection
922  *   0 oops, did not work out, please try again
923  *  -1 peer talks different language,
924  *     no point in trying again, please go standalone.
925  *  -2 We do not have a network config...
926  */
927 static int conn_connect(struct drbd_connection *connection)
928 {
929         struct drbd_socket sock, msock;
930         struct drbd_peer_device *peer_device;
931         struct net_conf *nc;
932         int vnr, timeout, h;
933         bool discard_my_data, ok;
934         enum drbd_state_rv rv;
935         struct accept_wait_data ad = {
936                 .connection = connection,
937                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
938         };
939
940         clear_bit(DISCONNECT_SENT, &connection->flags);
941         if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
942                 return -2;
943
944         mutex_init(&sock.mutex);
945         sock.sbuf = connection->data.sbuf;
946         sock.rbuf = connection->data.rbuf;
947         sock.socket = NULL;
948         mutex_init(&msock.mutex);
949         msock.sbuf = connection->meta.sbuf;
950         msock.rbuf = connection->meta.rbuf;
951         msock.socket = NULL;
952
953         /* Assume that the peer only understands protocol 80 until we know better.  */
954         connection->agreed_pro_version = 80;
955
956         if (prepare_listen_socket(connection, &ad))
957                 return 0;
958
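        /* DRBD uses two TCP sockets per connection: "sock" for data and
         * "msock" for meta data (acks, pings).  Both sides actively connect
         * and listen at the same time; the first packet (P_INITIAL_DATA or
         * P_INITIAL_META) tells the accepting side which role an incoming
         * socket plays.  Loop until both sockets are established and healthy. */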
959         do {
960                 struct socket *s;
961
962                 s = drbd_try_connect(connection);
963                 if (s) {
964                         if (!sock.socket) {
965                                 sock.socket = s;
966                                 send_first_packet(connection, &sock, P_INITIAL_DATA);
967                         } else if (!msock.socket) {
968                                 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
969                                 msock.socket = s;
970                                 send_first_packet(connection, &msock, P_INITIAL_META);
971                         } else {
972                                 drbd_err(connection, "Logic error in conn_connect()\n");
973                                 goto out_release_sockets;
974                         }
975                 }
976
977                 if (connection_established(connection, &sock.socket, &msock.socket))
978                         break;
979
980 retry:
981                 s = drbd_wait_for_connect(connection, &ad);
982                 if (s) {
983                         int fp = receive_first_packet(connection, s);
984                         drbd_socket_okay(&sock.socket);
985                         drbd_socket_okay(&msock.socket);
986                         switch (fp) {
987                         case P_INITIAL_DATA:
988                                 if (sock.socket) {
989                                         drbd_warn(connection, "initial packet S crossed\n");
990                                         sock_release(sock.socket);
991                                         sock.socket = s;
992                                         goto randomize;
993                                 }
994                                 sock.socket = s;
995                                 break;
996                         case P_INITIAL_META:
997                                 set_bit(RESOLVE_CONFLICTS, &connection->flags);
998                                 if (msock.socket) {
999                                         drbd_warn(connection, "initial packet M crossed\n");
1000                                         sock_release(msock.socket);
1001                                         msock.socket = s;
1002                                         goto randomize;
1003                                 }
1004                                 msock.socket = s;
1005                                 break;
1006                         default:
1007                                 drbd_warn(connection, "Error receiving initial packet\n");
1008                                 sock_release(s);
1009 randomize:
1010                                 if (prandom_u32() & 1)
1011                                         goto retry;
1012                         }
1013                 }
1014
1015                 if (connection->cstate <= C_DISCONNECTING)
1016                         goto out_release_sockets;
1017                 if (signal_pending(current)) {
1018                         flush_signals(current);
1019                         smp_rmb();
1020                         if (get_t_state(&connection->receiver) == EXITING)
1021                                 goto out_release_sockets;
1022                 }
1023
1024                 ok = connection_established(connection, &sock.socket, &msock.socket);
1025         } while (!ok);
1026
1027         if (ad.s_listen)
1028                 sock_release(ad.s_listen);
1029
1030         sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1031         msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1032
1033         sock.socket->sk->sk_allocation = GFP_NOIO;
1034         msock.socket->sk->sk_allocation = GFP_NOIO;
1035
1036         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1037         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1038
1039         /* NOT YET ...
1040          * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1041          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1042          * first set it to the P_CONNECTION_FEATURES timeout,
1043          * which we set to 4x the configured ping_timeout. */
1044         rcu_read_lock();
1045         nc = rcu_dereference(connection->net_conf);
1046
1047         sock.socket->sk->sk_sndtimeo =
1048         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1049
1050         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1051         timeout = nc->timeout * HZ / 10;
1052         discard_my_data = nc->discard_my_data;
1053         rcu_read_unlock();
1054
1055         msock.socket->sk->sk_sndtimeo = timeout;
1056
1057         /* we don't want delays.
1058          * we use TCP_CORK where appropriate, though */
1059         drbd_tcp_nodelay(sock.socket);
1060         drbd_tcp_nodelay(msock.socket);
1061
1062         connection->data.socket = sock.socket;
1063         connection->meta.socket = msock.socket;
1064         connection->last_received = jiffies;
1065
1066         h = drbd_do_features(connection);
1067         if (h <= 0)
1068                 return h;
1069
1070         if (connection->cram_hmac_tfm) {
1071                 /* drbd_request_state(device, NS(conn, WFAuth)); */
1072                 switch (drbd_do_auth(connection)) {
1073                 case -1:
1074                         drbd_err(connection, "Authentication of peer failed\n");
1075                         return -1;
1076                 case 0:
1077                         drbd_err(connection, "Authentication of peer failed, trying again.\n");
1078                         return 0;
1079                 }
1080         }
1081
1082         connection->data.socket->sk->sk_sndtimeo = timeout;
1083         connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1084
1085         if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1086                 return -1;
1087
1088         /* Prevent a race between resync-handshake and
1089          * being promoted to Primary.
1090          *
1091          * Grab and release the state mutex, so we know that any current
1092          * drbd_set_role() is finished, and any incoming drbd_set_role
1093          * will see the STATE_SENT flag, and wait for it to be cleared.
1094          */
1095         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1096                 mutex_lock(peer_device->device->state_mutex);
1097
1098         set_bit(STATE_SENT, &connection->flags);
1099
1100         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1101                 mutex_unlock(peer_device->device->state_mutex);
1102
1103         rcu_read_lock();
1104         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1105                 struct drbd_device *device = peer_device->device;
1106                 kref_get(&device->kref);
1107                 rcu_read_unlock();
1108
1109                 if (discard_my_data)
1110                         set_bit(DISCARD_MY_DATA, &device->flags);
1111                 else
1112                         clear_bit(DISCARD_MY_DATA, &device->flags);
1113
1114                 drbd_connected(peer_device);
1115                 kref_put(&device->kref, drbd_destroy_device);
1116                 rcu_read_lock();
1117         }
1118         rcu_read_unlock();
1119
1120         rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1121         if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1122                 clear_bit(STATE_SENT, &connection->flags);
1123                 return 0;
1124         }
1125
1126         drbd_thread_start(&connection->ack_receiver);
1127         /* opencoded create_singlethread_workqueue(),
1128          * to be able to use format string arguments */
1129         connection->ack_sender =
1130                 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1131         if (!connection->ack_sender) {
1132                 drbd_err(connection, "Failed to create workqueue ack_sender\n");
1133                 return 0;
1134         }
1135
1136         mutex_lock(&connection->resource->conf_update);
1137         /* The discard_my_data flag is a single-shot modifier to the next
1138          * connection attempt, the handshake of which is now well underway.
1139          * No need for rcu style copying of the whole struct
1140          * just to clear a single value. */
1141         connection->net_conf->discard_my_data = 0;
1142         mutex_unlock(&connection->resource->conf_update);
1143
1144         return h;
1145
1146 out_release_sockets:
1147         if (ad.s_listen)
1148                 sock_release(ad.s_listen);
1149         if (sock.socket)
1150                 sock_release(sock.socket);
1151         if (msock.socket)
1152                 sock_release(msock.socket);
1153         return -1;
1154 }
1155
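/* The on-the-wire header layout depends on the agreed protocol version:
 * struct p_header100 (magic DRBD_MAGIC_100, carries a volume number),
 * struct p_header95 (magic DRBD_MAGIC_BIG, 32-bit length) or the legacy
 * struct p_header80 (magic DRBD_MAGIC, 16-bit length).  Whichever matches
 * drbd_header_size() is decoded into the version-independent packet_info. */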
1156 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1157 {
1158         unsigned int header_size = drbd_header_size(connection);
1159
1160         if (header_size == sizeof(struct p_header100) &&
1161             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1162                 struct p_header100 *h = header;
1163                 if (h->pad != 0) {
1164                         drbd_err(connection, "Header padding is not zero\n");
1165                         return -EINVAL;
1166                 }
1167                 pi->vnr = be16_to_cpu(h->volume);
1168                 pi->cmd = be16_to_cpu(h->command);
1169                 pi->size = be32_to_cpu(h->length);
1170         } else if (header_size == sizeof(struct p_header95) &&
1171                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1172                 struct p_header95 *h = header;
1173                 pi->cmd = be16_to_cpu(h->command);
1174                 pi->size = be32_to_cpu(h->length);
1175                 pi->vnr = 0;
1176         } else if (header_size == sizeof(struct p_header80) &&
1177                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1178                 struct p_header80 *h = header;
1179                 pi->cmd = be16_to_cpu(h->command);
1180                 pi->size = be16_to_cpu(h->length);
1181                 pi->vnr = 0;
1182         } else {
1183                 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1184                          be32_to_cpu(*(__be32 *)header),
1185                          connection->agreed_pro_version);
1186                 return -EINVAL;
1187         }
1188         pi->data = header + header_size;
1189         return 0;
1190 }
1191
1192 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1193 {
1194         void *buffer = connection->data.rbuf;
1195         int err;
1196
1197         err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1198         if (err)
1199                 return err;
1200
1201         err = decode_header(connection, buffer, pi);
1202         connection->last_received = jiffies;
1203
1204         return err;
1205 }
1206
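/* If the resource's write ordering requires it, issue a synchronous flush to
 * every volume's backing device, one after the other.  A failed flush demotes
 * the write ordering to WO_DRAIN_IO via drbd_bump_write_ordering(). */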
1207 static void drbd_flush(struct drbd_connection *connection)
1208 {
1209         int rv;
1210         struct drbd_peer_device *peer_device;
1211         int vnr;
1212
1213         if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1214                 rcu_read_lock();
1215                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1216                         struct drbd_device *device = peer_device->device;
1217
1218                         if (!get_ldev(device))
1219                                 continue;
1220                         kref_get(&device->kref);
1221                         rcu_read_unlock();
1222
1223                         /* Right now, we have only this one synchronous code path
1224                          * for flushes between request epochs.
1225                          * We may want to make those asynchronous,
1226                          * or at least parallelize the flushes to the volume devices.
1227                          */
1228                         device->flush_jif = jiffies;
1229                         set_bit(FLUSH_PENDING, &device->flags);
1230                         rv = blkdev_issue_flush(device->ldev->backing_bdev,
1231                                         GFP_NOIO, NULL);
1232                         clear_bit(FLUSH_PENDING, &device->flags);
1233                         if (rv) {
1234                                 drbd_info(device, "local disk flush failed with status %d\n", rv);
1235                                 /* would rather check on EOPNOTSUPP, but that is not reliable.
1236                                  * don't try again for ANY return value != 0
1237                                  * if (rv == -EOPNOTSUPP) */
1238                                 drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1239                         }
1240                         put_ldev(device);
1241                         kref_put(&device->kref, drbd_destroy_device);
1242
1243                         rcu_read_lock();
1244                         if (rv)
1245                                 break;
1246                 }
1247                 rcu_read_unlock();
1248         }
1249 }
1250
1251 /**
1252  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
1253  * @connection: DRBD connection.
1254  * @epoch:      Epoch object.
1255  * @ev:         Epoch event.
1256  */
1257 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1258                                                struct drbd_epoch *epoch,
1259                                                enum epoch_event ev)
1260 {
1261         int epoch_size;
1262         struct drbd_epoch *next_epoch;
1263         enum finish_epoch rv = FE_STILL_LIVE;
1264
1265         spin_lock(&connection->epoch_lock);
1266         do {
1267                 next_epoch = NULL;
1268
1269                 epoch_size = atomic_read(&epoch->epoch_size);
1270
1271                 switch (ev & ~EV_CLEANUP) {
1272                 case EV_PUT:
1273                         atomic_dec(&epoch->active);
1274                         break;
1275                 case EV_GOT_BARRIER_NR:
1276                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1277                         break;
1278                 case EV_BECAME_LAST:
1279                         /* nothing to do*/
1280                         break;
1281                 }
1282
1283                 if (epoch_size != 0 &&
1284                     atomic_read(&epoch->active) == 0 &&
1285                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1286                         if (!(ev & EV_CLEANUP)) {
1287                                 spin_unlock(&connection->epoch_lock);
1288                                 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1289                                 spin_lock(&connection->epoch_lock);
1290                         }
1291 #if 0
1292                         /* FIXME: dec unacked on connection, once we have
1293                          * something to count pending connection packets in. */
1294                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1295                                 dec_unacked(epoch->connection);
1296 #endif
1297
1298                         if (connection->current_epoch != epoch) {
1299                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1300                                 list_del(&epoch->list);
1301                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1302                                 connection->epochs--;
1303                                 kfree(epoch);
1304
1305                                 if (rv == FE_STILL_LIVE)
1306                                         rv = FE_DESTROYED;
1307                         } else {
1308                                 epoch->flags = 0;
1309                                 atomic_set(&epoch->epoch_size, 0);
1310                                 /* atomic_set(&epoch->active, 0); is already zero */
1311                                 if (rv == FE_STILL_LIVE)
1312                                         rv = FE_RECYCLED;
1313                         }
1314                 }
1315
1316                 if (!next_epoch)
1317                         break;
1318
1319                 epoch = next_epoch;
1320         } while (1);
1321
1322         spin_unlock(&connection->epoch_lock);
1323
1324         return rv;
1325 }
1326
1327 static enum write_ordering_e
1328 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1329 {
1330         struct disk_conf *dc;
1331
1332         dc = rcu_dereference(bdev->disk_conf);
1333
1334         if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1335                 wo = WO_DRAIN_IO;
1336         if (wo == WO_DRAIN_IO && !dc->disk_drain)
1337                 wo = WO_NONE;
1338
1339         return wo;
1340 }
1341
1342 /**
1343  * drbd_bump_write_ordering() - Fall back to another write ordering method
1344  * @resource:   DRBD resource.
1345  * @wo:         Write ordering method to try.
1346  */
1347 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1348                               enum write_ordering_e wo)
1349 {
1350         struct drbd_device *device;
1351         enum write_ordering_e pwo;
1352         int vnr;
1353         static char *write_ordering_str[] = {
1354                 [WO_NONE] = "none",
1355                 [WO_DRAIN_IO] = "drain",
1356                 [WO_BDEV_FLUSH] = "flush",
1357         };
1358
1359         pwo = resource->write_ordering;
1360         if (wo != WO_BDEV_FLUSH)
1361                 wo = min(pwo, wo);
1362         rcu_read_lock();
1363         idr_for_each_entry(&resource->devices, device, vnr) {
1364                 if (get_ldev(device)) {
1365                         wo = max_allowed_wo(device->ldev, wo);
1366                         if (device->ldev == bdev)
1367                                 bdev = NULL;
1368                         put_ldev(device);
1369                 }
1370         }
1371
1372         if (bdev)
1373                 wo = max_allowed_wo(bdev, wo);
1374
1375         rcu_read_unlock();
1376
1377         resource->write_ordering = wo;
1378         if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1379                 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1380 }
1381
1382 /**
1383  * drbd_submit_peer_request()
1384  * @device:     DRBD device.
1385  * @peer_req:   peer request
1386  * @op, @op_flags: operation and flags for the bios (see bio_set_op_attrs())
1387  *
1388  * May spread the pages to multiple bios,
1389  * depending on bio_add_page restrictions.
1390  *
1391  * Returns 0 if all bios have been submitted,
1392  * -ENOMEM if we could not allocate enough bios,
1393  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1394  *  single page to an empty bio (which should never happen and likely indicates
1395  *  that the lower level IO stack is in some way broken). This has been observed
1396  *  on certain Xen deployments.
1397  */
1398 /* TODO allocate from our own bio_set. */
1399 int drbd_submit_peer_request(struct drbd_device *device,
1400                              struct drbd_peer_request *peer_req,
1401                              const unsigned op, const unsigned op_flags,
1402                              const int fault_type)
1403 {
1404         struct bio *bios = NULL;
1405         struct bio *bio;
1406         struct page *page = peer_req->pages;
1407         sector_t sector = peer_req->i.sector;
1408         unsigned data_size = peer_req->i.size;
1409         unsigned n_bios = 0;
1410         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1411         int err = -ENOMEM;
1412
1413         if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1414                 /* wait for all pending IO completions, before we start
1415                  * zeroing things out. */
1416                 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1417                 /* add it to the active list now,
1418                  * so we can find it to present it in debugfs */
1419                 peer_req->submit_jif = jiffies;
1420                 peer_req->flags |= EE_SUBMITTED;
1421                 spin_lock_irq(&device->resource->req_lock);
1422                 list_add_tail(&peer_req->w.list, &device->active_ee);
1423                 spin_unlock_irq(&device->resource->req_lock);
1424                 if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1425                         sector, data_size >> 9, GFP_NOIO, false))
1426                         peer_req->flags |= EE_WAS_ERROR;
1427                 drbd_endio_write_sec_final(peer_req);
1428                 return 0;
1429         }
1430
1431         /* Discards don't have any payload.
1432          * But the scsi layer still expects a bio_vec it can use internally,
1433          * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1434         if (peer_req->flags & EE_IS_TRIM)
1435                 nr_pages = 1;
1436
1437         /* In most cases, we will only need one bio.  But in case the lower
1438          * level restrictions happen to be different at this offset on this
1439          * side than those of the sending peer, we may need to submit the
1440          * request in more than one bio.
1441          *
1442          * Plain bio_alloc is good enough here; this is not a DRBD-internally
1443          * generated bio, but a bio allocated on behalf of the peer.
1444          */
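        /* For example: with 4 KiB pages, a 128 KiB peer request arrives here as
         * a chain of 32 pages.  If the local queue limits (e.g. max_segments of
         * the backing device) are tighter than those on the sending side,
         * bio_add_page() below will refuse a page at some point; we then jump
         * back to next_bio and continue the remaining pages in a freshly
         * allocated bio, counting each bio in n_bios. */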
1445 next_bio:
1446         bio = bio_alloc(GFP_NOIO, nr_pages);
1447         if (!bio) {
1448                 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1449                 goto fail;
1450         }
1451         /* > peer_req->i.sector, unless this is the first bio */
1452         bio->bi_iter.bi_sector = sector;
1453         bio->bi_bdev = device->ldev->backing_bdev;
1454         bio_set_op_attrs(bio, op, op_flags);
1455         bio->bi_private = peer_req;
1456         bio->bi_end_io = drbd_peer_request_endio;
1457
1458         bio->bi_next = bios;
1459         bios = bio;
1460         ++n_bios;
1461
1462         if (op == REQ_OP_DISCARD) {
1463                 bio->bi_iter.bi_size = data_size;
1464                 goto submit;
1465         }
1466
1467         page_chain_for_each(page) {
1468                 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1469                 if (!bio_add_page(bio, page, len, 0)) {
1470                         /* A single page must always be possible!
1471                          * But in case it fails anyway,
1472                          * we deal with it, and complain (below). */
1473                         if (bio->bi_vcnt == 0) {
1474                                 drbd_err(device,
1475                                         "bio_add_page failed for len=%u, "
1476                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1477                                         len, (uint64_t)bio->bi_iter.bi_sector);
1478                                 err = -ENOSPC;
1479                                 goto fail;
1480                         }
1481                         goto next_bio;
1482                 }
1483                 data_size -= len;
1484                 sector += len >> 9;
1485                 --nr_pages;
1486         }
1487         D_ASSERT(device, data_size == 0);
1488 submit:
1489         D_ASSERT(device, page == NULL);
1490
1491         atomic_set(&peer_req->pending_bios, n_bios);
1492         /* for debugfs: update timestamp, mark as submitted */
1493         peer_req->submit_jif = jiffies;
1494         peer_req->flags |= EE_SUBMITTED;
1495         do {
1496                 bio = bios;
1497                 bios = bios->bi_next;
1498                 bio->bi_next = NULL;
1499
1500                 drbd_generic_make_request(device, fault_type, bio);
1501         } while (bios);
1502         return 0;
1503
1504 fail:
1505         while (bios) {
1506                 bio = bios;
1507                 bios = bios->bi_next;
1508                 bio_put(bio);
1509         }
1510         return err;
1511 }
1512
1513 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1514                                              struct drbd_peer_request *peer_req)
1515 {
1516         struct drbd_interval *i = &peer_req->i;
1517
1518         drbd_remove_interval(&device->write_requests, i);
1519         drbd_clear_interval(i);
1520
1521         /* Wake up any processes waiting for this peer request to complete.  */
1522         if (i->waiting)
1523                 wake_up(&device->misc_wait);
1524 }
1525
1526 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1527 {
1528         struct drbd_peer_device *peer_device;
1529         int vnr;
1530
1531         rcu_read_lock();
1532         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1533                 struct drbd_device *device = peer_device->device;
1534
1535                 kref_get(&device->kref);
1536                 rcu_read_unlock();
1537                 drbd_wait_ee_list_empty(device, &device->active_ee);
1538                 kref_put(&device->kref, drbd_destroy_device);
1539                 rcu_read_lock();
1540         }
1541         rcu_read_unlock();
1542 }
1543
1544 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1545 {
1546         int rv;
1547         struct p_barrier *p = pi->data;
1548         struct drbd_epoch *epoch;
1549
1550         /* FIXME these are unacked on connection,
1551          * not a specific (peer)device.
1552          */
1553         connection->current_epoch->barrier_nr = p->barrier;
1554         connection->current_epoch->connection = connection;
1555         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1556
1557         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1558          * the activity log, which means it would not be resynced in case the
1559          * R_PRIMARY crashes now.
1560          * Therefore we must send the barrier_ack after the barrier request was
1561          * completed. */
1562         switch (connection->resource->write_ordering) {
1563         case WO_NONE:
1564                 if (rv == FE_RECYCLED)
1565                         return 0;
1566
1567                 /* receiver context, in the writeout path of the other node.
1568                  * avoid potential distributed deadlock */
1569                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1570                 if (epoch)
1571                         break;
1572                 else
1573                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1574                         /* Fall through */
1575
1576         case WO_BDEV_FLUSH:
1577         case WO_DRAIN_IO:
1578                 conn_wait_active_ee_empty(connection);
1579                 drbd_flush(connection);
1580
1581                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1582                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1583                         if (epoch)
1584                                 break;
1585                 }
1586
1587                 return 0;
1588         default:
1589                 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1590                          connection->resource->write_ordering);
1591                 return -EIO;
1592         }
1593
1594         epoch->flags = 0;
1595         atomic_set(&epoch->epoch_size, 0);
1596         atomic_set(&epoch->active, 0);
1597
1598         spin_lock(&connection->epoch_lock);
1599         if (atomic_read(&connection->current_epoch->epoch_size)) {
1600                 list_add(&epoch->list, &connection->current_epoch->list);
1601                 connection->current_epoch = epoch;
1602                 connection->epochs++;
1603         } else {
1604                 /* The current_epoch got recycled while we allocated this one... */
1605                 kfree(epoch);
1606         }
1607         spin_unlock(&connection->epoch_lock);
1608
1609         return 0;
1610 }
1611
1612 /* used from receive_RSDataReply (recv_resync_read)
1613  * and from receive_Data */
1614 static struct drbd_peer_request *
1615 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1616               struct packet_info *pi) __must_hold(local)
1617 {
1618         struct drbd_device *device = peer_device->device;
1619         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1620         struct drbd_peer_request *peer_req;
1621         struct page *page;
1622         int digest_size, err;
1623         unsigned int data_size = pi->size, ds;
1624         void *dig_in = peer_device->connection->int_dig_in;
1625         void *dig_vv = peer_device->connection->int_dig_vv;
1626         unsigned long *data;
1627         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1628
1629         digest_size = 0;
1630         if (!trim && peer_device->connection->peer_integrity_tfm) {
1631                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1632                 /*
1633                  * FIXME: Receive the incoming digest into the receive buffer
1634                  *        here, together with its struct p_data?
1635                  */
1636                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1637                 if (err)
1638                         return NULL;
1639                 data_size -= digest_size;
1640         }
1641
1642         if (trim) {
1643                 D_ASSERT(peer_device, data_size == 0);
1644                 data_size = be32_to_cpu(trim->size);
1645         }
1646
1647         if (!expect(IS_ALIGNED(data_size, 512)))
1648                 return NULL;
1649         /* prepare for larger trim requests. */
1650         if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1651                 return NULL;
1652
1653         /* even though we trust our peer,
1654          * we sometimes have to double check. */
1655         if (sector + (data_size>>9) > capacity) {
1656                 drbd_err(device, "request from peer beyond end of local disk: "
1657                         "capacity: %llus < sector: %llus + size: %u\n",
1658                         (unsigned long long)capacity,
1659                         (unsigned long long)sector, data_size);
1660                 return NULL;
1661         }
1662
1663         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1664          * "criss-cross" setup, that might cause write-out on some other DRBD,
1665          * which in turn might block on the other node at this very place.  */
1666         peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1667         if (!peer_req)
1668                 return NULL;
1669
1670         peer_req->flags |= EE_WRITE;
1671         if (trim)
1672                 return peer_req;
1673
1674         ds = data_size;
1675         page = peer_req->pages;
1676         page_chain_for_each(page) {
1677                 unsigned len = min_t(int, ds, PAGE_SIZE);
1678                 data = kmap(page);
1679                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1680                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1681                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1682                         data[0] = data[0] ^ (unsigned long)-1;
1683                 }
1684                 kunmap(page);
1685                 if (err) {
1686                         drbd_free_peer_req(device, peer_req);
1687                         return NULL;
1688                 }
1689                 ds -= len;
1690         }
1691
1692         if (digest_size) {
1693                 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1694                 if (memcmp(dig_in, dig_vv, digest_size)) {
1695                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1696                                 (unsigned long long)sector, data_size);
1697                         drbd_free_peer_req(device, peer_req);
1698                         return NULL;
1699                 }
1700         }
1701         device->recv_cnt += data_size >> 9;
1702         return peer_req;
1703 }
1704
1705 /* drbd_drain_block() just takes a data block
1706  * out of the socket input buffer, and discards it.
1707  */
1708 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1709 {
1710         struct page *page;
1711         int err = 0;
1712         void *data;
1713
1714         if (!data_size)
1715                 return 0;
1716
1717         page = drbd_alloc_pages(peer_device, 1, 1);
1718
1719         data = kmap(page);
1720         while (data_size) {
1721                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1722
1723                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1724                 if (err)
1725                         break;
1726                 data_size -= len;
1727         }
1728         kunmap(page);
1729         drbd_free_pages(peer_device->device, page, 0);
1730         return err;
1731 }
1732
1733 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1734                            sector_t sector, int data_size)
1735 {
1736         struct bio_vec bvec;
1737         struct bvec_iter iter;
1738         struct bio *bio;
1739         int digest_size, err, expect;
1740         void *dig_in = peer_device->connection->int_dig_in;
1741         void *dig_vv = peer_device->connection->int_dig_vv;
1742
1743         digest_size = 0;
1744         if (peer_device->connection->peer_integrity_tfm) {
1745                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1746                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1747                 if (err)
1748                         return err;
1749                 data_size -= digest_size;
1750         }
1751
1752         /* optimistically update recv_cnt.  if receiving fails below,
1753          * we disconnect anyway, and counters will be reset. */
1754         peer_device->device->recv_cnt += data_size>>9;
1755
1756         bio = req->master_bio;
1757         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1758
1759         bio_for_each_segment(bvec, bio, iter) {
1760                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1761                 expect = min_t(int, data_size, bvec.bv_len);
1762                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1763                 kunmap(bvec.bv_page);
1764                 if (err)
1765                         return err;
1766                 data_size -= expect;
1767         }
1768
1769         if (digest_size) {
1770                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1771                 if (memcmp(dig_in, dig_vv, digest_size)) {
1772                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1773                         return -EINVAL;
1774                 }
1775         }
1776
1777         D_ASSERT(peer_device->device, data_size == 0);
1778         return 0;
1779 }
1780
1781 /*
1782  * e_end_resync_block() is called in ack_sender context via
1783  * drbd_finish_peer_reqs().
1784  */
1785 static int e_end_resync_block(struct drbd_work *w, int unused)
1786 {
1787         struct drbd_peer_request *peer_req =
1788                 container_of(w, struct drbd_peer_request, w);
1789         struct drbd_peer_device *peer_device = peer_req->peer_device;
1790         struct drbd_device *device = peer_device->device;
1791         sector_t sector = peer_req->i.sector;
1792         int err;
1793
1794         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1795
1796         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1797                 drbd_set_in_sync(device, sector, peer_req->i.size);
1798                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1799         } else {
1800                 /* Record failure to sync */
1801                 drbd_rs_failed_io(device, sector, peer_req->i.size);
1802
1803                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1804         }
1805         dec_unacked(device);
1806
1807         return err;
1808 }
1809
1810 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1811                             struct packet_info *pi) __releases(local)
1812 {
1813         struct drbd_device *device = peer_device->device;
1814         struct drbd_peer_request *peer_req;
1815
1816         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1817         if (!peer_req)
1818                 goto fail;
1819
1820         dec_rs_pending(device);
1821
1822         inc_unacked(device);
1823         /* corresponding dec_unacked() in e_end_resync_block()
1824          * respective _drbd_clear_done_ee */
1825
1826         peer_req->w.cb = e_end_resync_block;
1827         peer_req->submit_jif = jiffies;
1828
1829         spin_lock_irq(&device->resource->req_lock);
1830         list_add_tail(&peer_req->w.list, &device->sync_ee);
1831         spin_unlock_irq(&device->resource->req_lock);
1832
1833         atomic_add(pi->size >> 9, &device->rs_sect_ev);
1834         if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
1835                                      DRBD_FAULT_RS_WR) == 0)
1836                 return 0;
1837
1838         /* don't care for the reason here */
1839         drbd_err(device, "submit failed, triggering re-connect\n");
1840         spin_lock_irq(&device->resource->req_lock);
1841         list_del(&peer_req->w.list);
1842         spin_unlock_irq(&device->resource->req_lock);
1843
1844         drbd_free_peer_req(device, peer_req);
1845 fail:
1846         put_ldev(device);
1847         return -EIO;
1848 }
1849
1850 static struct drbd_request *
1851 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1852              sector_t sector, bool missing_ok, const char *func)
1853 {
1854         struct drbd_request *req;
1855
1856         /* Request object according to our peer */
1857         req = (struct drbd_request *)(unsigned long)id;
1858         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1859                 return req;
1860         if (!missing_ok) {
1861                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1862                         (unsigned long)id, (unsigned long long)sector);
1863         }
1864         return NULL;
1865 }
1866
1867 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1868 {
1869         struct drbd_peer_device *peer_device;
1870         struct drbd_device *device;
1871         struct drbd_request *req;
1872         sector_t sector;
1873         int err;
1874         struct p_data *p = pi->data;
1875
1876         peer_device = conn_peer_device(connection, pi->vnr);
1877         if (!peer_device)
1878                 return -EIO;
1879         device = peer_device->device;
1880
1881         sector = be64_to_cpu(p->sector);
1882
1883         spin_lock_irq(&device->resource->req_lock);
1884         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1885         spin_unlock_irq(&device->resource->req_lock);
1886         if (unlikely(!req))
1887                 return -EIO;
1888
1889         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1890          * special casing it there for the various failure cases.
1891          * still no race with drbd_fail_pending_reads */
1892         err = recv_dless_read(peer_device, req, sector, pi->size);
1893         if (!err)
1894                 req_mod(req, DATA_RECEIVED);
1895         /* else: nothing. handled from drbd_disconnect...
1896          * I don't think we may complete this just yet
1897          * in case we are "on-disconnect: freeze" */
1898
1899         return err;
1900 }
1901
1902 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1903 {
1904         struct drbd_peer_device *peer_device;
1905         struct drbd_device *device;
1906         sector_t sector;
1907         int err;
1908         struct p_data *p = pi->data;
1909
1910         peer_device = conn_peer_device(connection, pi->vnr);
1911         if (!peer_device)
1912                 return -EIO;
1913         device = peer_device->device;
1914
1915         sector = be64_to_cpu(p->sector);
1916         D_ASSERT(device, p->block_id == ID_SYNCER);
1917
1918         if (get_ldev(device)) {
1919                 /* data is submitted to disk within recv_resync_read.
1920                  * corresponding put_ldev done below on error,
1921                  * or in drbd_peer_request_endio. */
1922                 err = recv_resync_read(peer_device, sector, pi);
1923         } else {
1924                 if (__ratelimit(&drbd_ratelimit_state))
1925                         drbd_err(device, "Can not write resync data to local disk.\n");
1926
1927                 err = drbd_drain_block(peer_device, pi->size);
1928
1929                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1930         }
1931
1932         atomic_add(pi->size >> 9, &device->rs_sect_in);
1933
1934         return err;
1935 }
1936
1937 static void restart_conflicting_writes(struct drbd_device *device,
1938                                        sector_t sector, int size)
1939 {
1940         struct drbd_interval *i;
1941         struct drbd_request *req;
1942
1943         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1944                 if (!i->local)
1945                         continue;
1946                 req = container_of(i, struct drbd_request, i);
1947                 if (req->rq_state & RQ_LOCAL_PENDING ||
1948                     !(req->rq_state & RQ_POSTPONED))
1949                         continue;
1950                 /* as it is RQ_POSTPONED, this will cause it to
1951                  * be queued on the retry workqueue. */
1952                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1953         }
1954 }
1955
1956 /*
1957  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
1958  */
1959 static int e_end_block(struct drbd_work *w, int cancel)
1960 {
1961         struct drbd_peer_request *peer_req =
1962                 container_of(w, struct drbd_peer_request, w);
1963         struct drbd_peer_device *peer_device = peer_req->peer_device;
1964         struct drbd_device *device = peer_device->device;
1965         sector_t sector = peer_req->i.sector;
1966         int err = 0, pcmd;
1967
1968         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1969                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1970                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1971                                 device->state.conn <= C_PAUSED_SYNC_T &&
1972                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1973                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1974                         err = drbd_send_ack(peer_device, pcmd, peer_req);
1975                         if (pcmd == P_RS_WRITE_ACK)
1976                                 drbd_set_in_sync(device, sector, peer_req->i.size);
1977                 } else {
1978                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1979                         /* we expect it to be marked out of sync anyway...
1980                          * maybe assert this?  */
1981                 }
1982                 dec_unacked(device);
1983         }
1984
1985         /* we delete from the conflict detection hash _after_ we sent out the
1986          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1987         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1988                 spin_lock_irq(&device->resource->req_lock);
1989                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1990                 drbd_remove_epoch_entry_interval(device, peer_req);
1991                 if (peer_req->flags & EE_RESTART_REQUESTS)
1992                         restart_conflicting_writes(device, sector, peer_req->i.size);
1993                 spin_unlock_irq(&device->resource->req_lock);
1994         } else
1995                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1996
1997         drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1998
1999         return err;
2000 }
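/*
 * On the ack selection above, by way of example: while the connection is in
 * one of the resync-related states (C_SYNC_SOURCE through C_PAUSED_SYNC_T)
 * and the request carries EE_MAY_SET_IN_SYNC, a successful write is
 * acknowledged with P_RS_WRITE_ACK and the range is immediately marked in
 * sync locally; any other successful write gets a plain P_WRITE_ACK, and a
 * failed one a P_NEG_ACK.  All of this only applies when the peer asked for
 * write acks in the first place (EE_SEND_WRITE_ACK, cf. the DP_SEND_WRITE_ACK
 * handling in receive_Data()).
 */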
2001
2002 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2003 {
2004         struct drbd_peer_request *peer_req =
2005                 container_of(w, struct drbd_peer_request, w);
2006         struct drbd_peer_device *peer_device = peer_req->peer_device;
2007         int err;
2008
2009         err = drbd_send_ack(peer_device, ack, peer_req);
2010         dec_unacked(peer_device->device);
2011
2012         return err;
2013 }
2014
2015 static int e_send_superseded(struct drbd_work *w, int unused)
2016 {
2017         return e_send_ack(w, P_SUPERSEDED);
2018 }
2019
2020 static int e_send_retry_write(struct drbd_work *w, int unused)
2021 {
2022         struct drbd_peer_request *peer_req =
2023                 container_of(w, struct drbd_peer_request, w);
2024         struct drbd_connection *connection = peer_req->peer_device->connection;
2025
2026         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2027                              P_RETRY_WRITE : P_SUPERSEDED);
2028 }
2029
2030 static bool seq_greater(u32 a, u32 b)
2031 {
2032         /*
2033          * We assume 32-bit wrap-around here.
2034          * For 24-bit wrap-around, we would have to shift:
2035          *  a <<= 8; b <<= 8;
2036          */
2037         return (s32)a - (s32)b > 0;
2038 }
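/*
 * Wrap-around examples for seq_greater() (values purely illustrative):
 *
 *	seq_greater(5, 0xfffffffe) -> (s32)(5 - 0xfffffffe) ==  7 > 0 -> true
 *	seq_greater(0xfffffffe, 5) -> (s32)(0xfffffffe - 5) == -7 > 0 -> false
 *
 * so a sequence number that recently wrapped is still considered "newer"
 * than one from just before the wrap.
 */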
2039
2040 static u32 seq_max(u32 a, u32 b)
2041 {
2042         return seq_greater(a, b) ? a : b;
2043 }
2044
2045 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2046 {
2047         struct drbd_device *device = peer_device->device;
2048         unsigned int newest_peer_seq;
2049
2050         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2051                 spin_lock(&device->peer_seq_lock);
2052                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2053                 device->peer_seq = newest_peer_seq;
2054                 spin_unlock(&device->peer_seq_lock);
2055                 /* wake up only if we actually changed device->peer_seq */
2056                 if (peer_seq == newest_peer_seq)
2057                         wake_up(&device->seq_wait);
2058         }
2059 }
2060
2061 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2062 {
2063         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2064 }
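/*
 * Note that overlaps() takes start sectors but byte lengths, e.g.
 * (illustrative values): overlaps(0, 4096, 8, 4096) == 0, because the first
 * range covers sectors 0..7 and the second sectors 8..15; starting the
 * second range one sector earlier would make the two overlap.
 */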
2065
2066 /* maybe change sync_ee into interval trees as well? */
2067 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2068 {
2069         struct drbd_peer_request *rs_req;
2070         bool rv = false;
2071
2072         spin_lock_irq(&device->resource->req_lock);
2073         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2074                 if (overlaps(peer_req->i.sector, peer_req->i.size,
2075                              rs_req->i.sector, rs_req->i.size)) {
2076                         rv = true;
2077                         break;
2078                 }
2079         }
2080         spin_unlock_irq(&device->resource->req_lock);
2081
2082         return rv;
2083 }
2084
2085 /* Called from receive_Data.
2086  * Synchronize packets on sock with packets on msock.
2087  *
2088  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2089  * packet traveling on msock, they are still processed in the order they have
2090  * been sent.
2091  *
2092  * Note: we don't care for Ack packets overtaking P_DATA packets.
2093  *
2094  * In case packet_seq is larger than device->peer_seq number, there are
2095  * outstanding packets on the msock. We wait for them to arrive.
2096  * In case we are the logically next packet, we update device->peer_seq
2097  * ourselves. Correctly handles 32bit wrap around.
2098  *
2099  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2100  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2101  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2102  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2103  *
2104  * returns 0 if we may process the packet,
2105  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2106 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2107 {
2108         struct drbd_device *device = peer_device->device;
2109         DEFINE_WAIT(wait);
2110         long timeout;
2111         int ret = 0, tp;
2112
2113         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2114                 return 0;
2115
2116         spin_lock(&device->peer_seq_lock);
2117         for (;;) {
2118                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2119                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2120                         break;
2121                 }
2122
2123                 if (signal_pending(current)) {
2124                         ret = -ERESTARTSYS;
2125                         break;
2126                 }
2127
2128                 rcu_read_lock();
2129                 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2130                 rcu_read_unlock();
2131
2132                 if (!tp)
2133                         break;
2134
2135                 /* Only need to wait if two_primaries is enabled */
2136                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2137                 spin_unlock(&device->peer_seq_lock);
2138                 rcu_read_lock();
2139                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2140                 rcu_read_unlock();
2141                 timeout = schedule_timeout(timeout);
2142                 spin_lock(&device->peer_seq_lock);
2143                 if (!timeout) {
2144                         ret = -ETIMEDOUT;
2145                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2146                         break;
2147                 }
2148         }
2149         spin_unlock(&device->peer_seq_lock);
2150         finish_wait(&device->seq_wait, &wait);
2151         return ret;
2152 }
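/*
 * Worked example for the sequence check above (numbers are illustrative):
 * with device->peer_seq == 41, an incoming write with peer_seq == 42 passes
 * !seq_greater(41, 41) immediately and bumps device->peer_seq to 42.  An
 * incoming write with peer_seq == 44 means the packets for 42 and 43 are
 * still in flight on the msock, so we sleep on seq_wait until
 * update_peer_seq() catches up, a signal arrives, or ping_timeo expires.
 */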
2153
2154 /* see also bio_flags_to_wire():
2155  * we map the wire DP_* flags to bio REQ_* flags semantically rather than
2156  * bit for bit, because the peer may run a different kernel version. */
2157 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2158 {
2159         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2160                 (dpf & DP_FUA ? REQ_FUA : 0) |
2161                 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2162 }
2163
2164 static unsigned long wire_flags_to_bio_op(u32 dpf)
2165 {
2166         if (dpf & DP_DISCARD)
2167                 return REQ_OP_DISCARD;
2168         else
2169                 return REQ_OP_WRITE;
2170 }
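/*
 * Example mapping (illustrative): dp_flags == (DP_RW_SYNC | DP_FUA) yields
 * op REQ_OP_WRITE with op_flags (REQ_SYNC | REQ_FUA); DP_DISCARD selects
 * REQ_OP_DISCARD instead, and DP_FLUSH adds REQ_PREFLUSH to op_flags.
 */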
2171
2172 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2173                                     unsigned int size)
2174 {
2175         struct drbd_interval *i;
2176
2177     repeat:
2178         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2179                 struct drbd_request *req;
2180                 struct bio_and_error m;
2181
2182                 if (!i->local)
2183                         continue;
2184                 req = container_of(i, struct drbd_request, i);
2185                 if (!(req->rq_state & RQ_POSTPONED))
2186                         continue;
2187                 req->rq_state &= ~RQ_POSTPONED;
2188                 __req_mod(req, NEG_ACKED, &m);
2189                 spin_unlock_irq(&device->resource->req_lock);
2190                 if (m.bio)
2191                         complete_master_bio(device, &m);
2192                 spin_lock_irq(&device->resource->req_lock);
2193                 goto repeat;
2194         }
2195 }
2196
2197 static int handle_write_conflicts(struct drbd_device *device,
2198                                   struct drbd_peer_request *peer_req)
2199 {
2200         struct drbd_connection *connection = peer_req->peer_device->connection;
2201         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2202         sector_t sector = peer_req->i.sector;
2203         const unsigned int size = peer_req->i.size;
2204         struct drbd_interval *i;
2205         bool equal;
2206         int err;
2207
2208         /*
2209          * Inserting the peer request into the write_requests tree will prevent
2210          * new conflicting local requests from being added.
2211          */
2212         drbd_insert_interval(&device->write_requests, &peer_req->i);
2213
2214     repeat:
2215         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2216                 if (i == &peer_req->i)
2217                         continue;
2218                 if (i->completed)
2219                         continue;
2220
2221                 if (!i->local) {
2222                         /*
2223                          * Our peer has sent a conflicting remote request; this
2224                          * should not happen in a two-node setup.  Wait for the
2225                          * earlier peer request to complete.
2226                          */
2227                         err = drbd_wait_misc(device, i);
2228                         if (err)
2229                                 goto out;
2230                         goto repeat;
2231                 }
2232
2233                 equal = i->sector == sector && i->size == size;
2234                 if (resolve_conflicts) {
2235                         /*
2236                          * If the peer request is fully contained within the
2237                          * overlapping request, it can be considered overwritten
2238                          * and thus superseded; otherwise, it will be retried
2239                          * once all overlapping requests have completed.
2240                          */
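                        /* For instance (illustrative numbers): a local request
                         * covering sectors 0..15 (i->sector == 0, i->size ==
                         * 8192) fully contains a peer write of sectors 4..7
                         * (sector == 4, size == 2048), so that peer write is
                         * considered superseded; had it reached past sector 15,
                         * it would be retried instead. */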
2241                         bool superseded = i->sector <= sector && i->sector +
2242                                        (i->size >> 9) >= sector + (size >> 9);
2243
2244                         if (!equal)
2245                                 drbd_alert(device, "Concurrent writes detected: "
2246                                                "local=%llus +%u, remote=%llus +%u, "
2247                                                "assuming %s came first\n",
2248                                           (unsigned long long)i->sector, i->size,
2249                                           (unsigned long long)sector, size,
2250                                           superseded ? "local" : "remote");
2251
2252                         peer_req->w.cb = superseded ? e_send_superseded :
2253                                                    e_send_retry_write;
2254                         list_add_tail(&peer_req->w.list, &device->done_ee);
2255                         queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2256
2257                         err = -ENOENT;
2258                         goto out;
2259                 } else {
2260                         struct drbd_request *req =
2261                                 container_of(i, struct drbd_request, i);
2262
2263                         if (!equal)
2264                                 drbd_alert(device, "Concurrent writes detected: "
2265                                                "local=%llus +%u, remote=%llus +%u\n",
2266                                           (unsigned long long)i->sector, i->size,
2267                                           (unsigned long long)sector, size);
2268
2269                         if (req->rq_state & RQ_LOCAL_PENDING ||
2270                             !(req->rq_state & RQ_POSTPONED)) {
2271                                 /*
2272                                  * Wait for the node with the discard flag to
2273                                  * decide if this request has been superseded
2274                                  * or needs to be retried.
2275                                  * Requests that have been superseded will
2276                                  * disappear from the write_requests tree.
2277                                  *
2278                                  * In addition, wait for the conflicting
2279                                  * request to finish locally before submitting
2280                                  * the conflicting peer request.
2281                                  */
2282                                 err = drbd_wait_misc(device, &req->i);
2283                                 if (err) {
2284                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2285                                         fail_postponed_requests(device, sector, size);
2286                                         goto out;
2287                                 }
2288                                 goto repeat;
2289                         }
2290                         /*
2291                          * Remember to restart the conflicting requests after
2292                          * the new peer request has completed.
2293                          */
2294                         peer_req->flags |= EE_RESTART_REQUESTS;
2295                 }
2296         }
2297         err = 0;
2298
2299     out:
2300         if (err)
2301                 drbd_remove_epoch_entry_interval(device, peer_req);
2302         return err;
2303 }
2304
2305 /* mirrored write */
2306 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2307 {
2308         struct drbd_peer_device *peer_device;
2309         struct drbd_device *device;
2310         struct net_conf *nc;
2311         sector_t sector;
2312         struct drbd_peer_request *peer_req;
2313         struct p_data *p = pi->data;
2314         u32 peer_seq = be32_to_cpu(p->seq_num);
2315         int op, op_flags;
2316         u32 dp_flags;
2317         int err, tp;
2318
2319         peer_device = conn_peer_device(connection, pi->vnr);
2320         if (!peer_device)
2321                 return -EIO;
2322         device = peer_device->device;
2323
2324         if (!get_ldev(device)) {
2325                 int err2;
2326
2327                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2328                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2329                 atomic_inc(&connection->current_epoch->epoch_size);
2330                 err2 = drbd_drain_block(peer_device, pi->size);
2331                 if (!err)
2332                         err = err2;
2333                 return err;
2334         }
2335
2336         /*
2337          * Corresponding put_ldev done either below (on various errors), or in
2338          * drbd_peer_request_endio, if we successfully submit the data at the
2339          * end of this function.
2340          */
2341
2342         sector = be64_to_cpu(p->sector);
2343         peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2344         if (!peer_req) {
2345                 put_ldev(device);
2346                 return -EIO;
2347         }
2348
2349         peer_req->w.cb = e_end_block;
2350         peer_req->submit_jif = jiffies;
2351         peer_req->flags |= EE_APPLICATION;
2352
2353         dp_flags = be32_to_cpu(p->dp_flags);
2354         op = wire_flags_to_bio_op(dp_flags);
2355         op_flags = wire_flags_to_bio_flags(dp_flags);
2356         if (pi->cmd == P_TRIM) {
2357                 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2358                 peer_req->flags |= EE_IS_TRIM;
2359                 if (!blk_queue_discard(q))
2360                         peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2361                 D_ASSERT(peer_device, peer_req->i.size > 0);
2362                 D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2363                 D_ASSERT(peer_device, peer_req->pages == NULL);
2364         } else if (peer_req->pages == NULL) {
2365                 D_ASSERT(device, peer_req->i.size == 0);
2366                 D_ASSERT(device, dp_flags & DP_FLUSH);
2367         }
2368
2369         if (dp_flags & DP_MAY_SET_IN_SYNC)
2370                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2371
2372         spin_lock(&connection->epoch_lock);
2373         peer_req->epoch = connection->current_epoch;
2374         atomic_inc(&peer_req->epoch->epoch_size);
2375         atomic_inc(&peer_req->epoch->active);
2376         spin_unlock(&connection->epoch_lock);
2377
2378         rcu_read_lock();
2379         nc = rcu_dereference(peer_device->connection->net_conf);
2380         tp = nc->two_primaries;
2381         if (peer_device->connection->agreed_pro_version < 100) {
2382                 switch (nc->wire_protocol) {
2383                 case DRBD_PROT_C:
2384                         dp_flags |= DP_SEND_WRITE_ACK;
2385                         break;
2386                 case DRBD_PROT_B:
2387                         dp_flags |= DP_SEND_RECEIVE_ACK;
2388                         break;
2389                 }
2390         }
2391         rcu_read_unlock();
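        /* With agreed_pro_version >= 100 the peer is expected to have set
         * DP_SEND_WRITE_ACK / DP_SEND_RECEIVE_ACK in dp_flags itself; for older
         * peers we derive them from the configured wire protocol above
         * (protocol C wants an ack once the write reached stable storage,
         * protocol B only an ack for reception). */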
2392
2393         if (dp_flags & DP_SEND_WRITE_ACK) {
2394                 peer_req->flags |= EE_SEND_WRITE_ACK;
2395                 inc_unacked(device);
2396                 /* corresponding dec_unacked() in e_end_block()
2397                  * respective _drbd_clear_done_ee */
2398         }
2399
2400         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2401                 /* I really don't like it that the receiver thread
2402                  * sends on the msock, but anyway */
2403                 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2404         }
2405
2406         if (tp) {
2407                 /* two primaries implies protocol C */
2408                 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2409                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2410                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2411                 if (err)
2412                         goto out_interrupted;
2413                 spin_lock_irq(&device->resource->req_lock);
2414                 err = handle_write_conflicts(device, peer_req);
2415                 if (err) {
2416                         spin_unlock_irq(&device->resource->req_lock);
2417                         if (err == -ENOENT) {
2418                                 put_ldev(device);
2419                                 return 0;
2420                         }
2421                         goto out_interrupted;
2422                 }
2423         } else {
2424                 update_peer_seq(peer_device, peer_seq);
2425                 spin_lock_irq(&device->resource->req_lock);
2426         }
2427         /* if we use the zeroout fallback code, we process synchronously
2428          * and we wait for all pending requests, respectively wait for
2429          * active_ee to become empty in drbd_submit_peer_request();
2430          * better not add ourselves here. */
2431         if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2432                 list_add_tail(&peer_req->w.list, &device->active_ee);
2433         spin_unlock_irq(&device->resource->req_lock);
2434
2435         if (device->state.conn == C_SYNC_TARGET)
2436                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2437
2438         if (device->state.pdsk < D_INCONSISTENT) {
2439                 /* In case we have the only disk of the cluster: note the range as out of sync (the peer has no usable disk to store it), never set it in sync here, and cover it by the activity log. */
2440                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2441                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2442                 drbd_al_begin_io(device, &peer_req->i);
2443                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2444         }
2445
2446         err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2447                                        DRBD_FAULT_DT_WR);
2448         if (!err)
2449                 return 0;
2450
2451         /* don't care for the reason here */
2452         drbd_err(device, "submit failed, triggering re-connect\n");
2453         spin_lock_irq(&device->resource->req_lock);
2454         list_del(&peer_req->w.list);
2455         drbd_remove_epoch_entry_interval(device, peer_req);
2456         spin_unlock_irq(&device->resource->req_lock);
2457         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2458                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2459                 drbd_al_complete_io(device, &peer_req->i);
2460         }
2461
2462 out_interrupted:
2463         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2464         put_ldev(device);
2465         drbd_free_peer_req(device, peer_req);
2466         return err;
2467 }
2468
2469 /* We may throttle resync, if the lower device seems to be busy,
2470  * and current sync rate is above c_min_rate.
2471  *
2472  * To decide whether or not the lower device is busy, we use a scheme similar
2473  * to MD RAID's is_mddev_idle(): if the partition stats reveal a "significant"
2474  * amount (more than 64 sectors) of activity that we cannot account for with
2475  * our own resync activity, it obviously is "busy".
2476  *
2477  * The current sync rate used here uses only the most recent two step marks,
2478  * to have a short time average so we can react faster.
2479  */
2480 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2481                 bool throttle_if_app_is_waiting)
2482 {
2483         struct lc_element *tmp;
2484         bool throttle = drbd_rs_c_min_rate_throttle(device);
2485
2486         if (!throttle || throttle_if_app_is_waiting)
2487                 return throttle;
2488
2489         spin_lock_irq(&device->al_lock);
2490         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2491         if (tmp) {
2492                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2493                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2494                         throttle = false;
2495                 /* Do not slow down if app IO is already waiting for this extent,
2496                  * and our progress is necessary for application IO to complete. */
2497         }
2498         spin_unlock_irq(&device->al_lock);
2499
2500         return throttle;
2501 }
2502
2503 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2504 {
2505         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2506         unsigned long db, dt, dbdt;
2507         unsigned int c_min_rate;
2508         int curr_events;
2509
2510         rcu_read_lock();
2511         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2512         rcu_read_unlock();
2513
2514         /* feature disabled? */
2515         if (c_min_rate == 0)
2516                 return false;
2517
2518         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2519                       (int)part_stat_read(&disk->part0, sectors[1]) -
2520                         atomic_read(&device->rs_sect_ev);
2521
2522         if (atomic_read(&device->ap_actlog_cnt)
2523             || curr_events - device->rs_last_events > 64) {
2524                 unsigned long rs_left;
2525                 int i;
2526
2527                 device->rs_last_events = curr_events;
2528
2529                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2530                  * approx. */
2531                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2532
2533                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2534                         rs_left = device->ov_left;
2535                 else
2536                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2537
2538                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2539                 if (!dt)
2540                         dt++;
2541                 db = device->rs_mark_left[i] - rs_left;
2542                 dbdt = Bit2KB(db/dt);
2543
2544                 if (dbdt > c_min_rate)
2545                         return true;
2546         }
2547         return false;
2548 }
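/*
 * Rough worked example for the rate check above (numbers are illustrative;
 * Bit2KB() assumes the 4 KiB resync bitmap granularity): if the two most
 * recent sync marks are dt == 4 seconds apart and db == 10240 bitmap bits
 * were cleared in between, then dbdt == Bit2KB(10240 / 4) == 10240 KiB/s.
 * Only if that exceeds the configured c_min_rate do we report that resync
 * should be throttled in favour of application IO.
 */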
2549
2550 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2551 {
2552         struct drbd_peer_device *peer_device;
2553         struct drbd_device *device;
2554         sector_t sector;
2555         sector_t capacity;
2556         struct drbd_peer_request *peer_req;
2557         struct digest_info *di = NULL;
2558         int size, verb;
2559         unsigned int fault_type;
2560         struct p_block_req *p = pi->data;
2561
2562         peer_device = conn_peer_device(connection, pi->vnr);
2563         if (!peer_device)
2564                 return -EIO;
2565         device = peer_device->device;
2566         capacity = drbd_get_capacity(device->this_bdev);
2567
2568         sector = be64_to_cpu(p->sector);
2569         size   = be32_to_cpu(p->blksize);
2570
2571         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2572                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2573                                 (unsigned long long)sector, size);
2574                 return -EINVAL;
2575         }
2576         if (sector + (size>>9) > capacity) {
2577                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2578                                 (unsigned long long)sector, size);
2579                 return -EINVAL;
2580         }
2581
2582         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2583                 verb = 1;
2584                 switch (pi->cmd) {
2585                 case P_DATA_REQUEST:
2586                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2587                         break;
2588                 case P_RS_DATA_REQUEST:
2589                 case P_CSUM_RS_REQUEST:
2590                 case P_OV_REQUEST:
2591                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2592                         break;
2593                 case P_OV_REPLY:
2594                         verb = 0;
2595                         dec_rs_pending(device);
2596                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2597                         break;
2598                 default:
2599                         BUG();
2600                 }
2601                 if (verb && __ratelimit(&drbd_ratelimit_state))
2602                         drbd_err(device, "Can not satisfy peer's read request, "
2603                             "no local data.\n");
2604
2605                 /* drain the payload, if any */
2606                 return drbd_drain_block(peer_device, pi->size);
2607         }
2608
2609         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2610          * "criss-cross" setup, that might cause write-out on some other DRBD,
2611          * which in turn might block on the other node at this very place.  */
2612         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2613                         true /* has real payload */, GFP_NOIO);
2614         if (!peer_req) {
2615                 put_ldev(device);
2616                 return -ENOMEM;
2617         }
2618
2619         switch (pi->cmd) {
2620         case P_DATA_REQUEST:
2621                 peer_req->w.cb = w_e_end_data_req;
2622                 fault_type = DRBD_FAULT_DT_RD;
2623                 /* application IO, don't drbd_rs_begin_io */
2624                 peer_req->flags |= EE_APPLICATION;
2625                 goto submit;
2626
2627         case P_RS_DATA_REQUEST:
2628                 peer_req->w.cb = w_e_end_rsdata_req;
2629                 fault_type = DRBD_FAULT_RS_RD;
2630                 /* used in the sector offset progress display */
2631                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2632                 break;
2633
2634         case P_OV_REPLY:
2635         case P_CSUM_RS_REQUEST:
2636                 fault_type = DRBD_FAULT_RS_RD;
2637                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2638                 if (!di)
2639                         goto out_free_e;
2640
2641                 di->digest_size = pi->size;
2642                 di->digest = (((char *)di)+sizeof(struct digest_info));
2643
2644                 peer_req->digest = di;
2645                 peer_req->flags |= EE_HAS_DIGEST;
2646
2647                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2648                         goto out_free_e;
2649
2650                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2651                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2652                         peer_req->w.cb = w_e_end_csum_rs_req;
2653                         /* used in the sector offset progress display */
2654                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2655                         /* remember to report stats in drbd_resync_finished */
2656                         device->use_csums = true;
2657                 } else if (pi->cmd == P_OV_REPLY) {
2658                         /* track progress, we may need to throttle */
2659                         atomic_add(size >> 9, &device->rs_sect_in);
2660                         peer_req->w.cb = w_e_end_ov_reply;
2661                         dec_rs_pending(device);
2662                         /* drbd_rs_begin_io done when we sent this request,
2663                          * but accounting still needs to be done. */
2664                         goto submit_for_resync;
2665                 }
2666                 break;
2667
2668         case P_OV_REQUEST:
2669                 if (device->ov_start_sector == ~(sector_t)0 &&
2670                     peer_device->connection->agreed_pro_version >= 90) {
2671                         unsigned long now = jiffies;
2672                         int i;
2673                         device->ov_start_sector = sector;
2674                         device->ov_position = sector;
2675                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2676                         device->rs_total = device->ov_left;
2677                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2678                                 device->rs_mark_left[i] = device->ov_left;
2679                                 device->rs_mark_time[i] = now;
2680                         }
2681                         drbd_info(device, "Online Verify start sector: %llu\n",
2682                                         (unsigned long long)sector);
2683                 }
2684                 peer_req->w.cb = w_e_end_ov_req;
2685                 fault_type = DRBD_FAULT_RS_RD;
2686                 break;
2687
2688         default:
2689                 BUG();
2690         }
2691
2692         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2693          * wrt the receiver, but it is not as straightforward as it may seem.
2694          * Various places in the resync start and stop logic assume resync
2695          * requests are processed in order, requeuing this on the worker thread
2696          * introduces a bunch of new code for synchronization between threads.
2697          *
2698          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2699          * "forever", throttling after drbd_rs_begin_io will lock that extent
2700          * for application writes for the same time.  For now, just throttle
2701          * here, where the rest of the code expects the receiver to sleep for
2702          * a while, anyway.
2703          */
2704
2705         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2706          * this defers syncer requests for some time, before letting at least
2707          * one request through.  The resync controller on the receiving side
2708          * will adapt to the incoming rate accordingly.
2709          *
2710          * We cannot throttle here if remote is Primary/SyncTarget:
2711          * we would also throttle its application reads.
2712          * In that case, throttling is done on the SyncTarget only.
2713          */
2714
2715         /* Even though this may be a resync request, we do add to "read_ee";
2716          * "sync_ee" is only used for resync WRITEs.
2717          * Add to list early, so debugfs can find this request
2718          * even if we have to sleep below. */
2719         spin_lock_irq(&device->resource->req_lock);
2720         list_add_tail(&peer_req->w.list, &device->read_ee);
2721         spin_unlock_irq(&device->resource->req_lock);
2722
2723         update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2724         if (device->state.peer != R_PRIMARY
2725         && drbd_rs_should_slow_down(device, sector, false))
2726                 schedule_timeout_uninterruptible(HZ/10);
2727         update_receiver_timing_details(connection, drbd_rs_begin_io);
2728         if (drbd_rs_begin_io(device, sector))
2729                 goto out_free_e;
2730
2731 submit_for_resync:
2732         atomic_add(size >> 9, &device->rs_sect_ev);
2733
2734 submit:
2735         update_receiver_timing_details(connection, drbd_submit_peer_request);
2736         inc_unacked(device);
2737         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2738                                      fault_type) == 0)
2739                 return 0;
2740
2741         /* don't care for the reason here */
2742         drbd_err(device, "submit failed, triggering re-connect\n");
2743
2744 out_free_e:
2745         spin_lock_irq(&device->resource->req_lock);
2746         list_del(&peer_req->w.list);
2747         spin_unlock_irq(&device->resource->req_lock);
2748         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2749
2750         put_ldev(device);
2751         drbd_free_peer_req(device, peer_req);
2752         return -EIO;
2753 }
2754
2755 /**
2756  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
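 *
 * Return: 1 if the peer's data should be discarded (this node would become
 * sync source), -1 if the local data should be discarded (this node would
 * become sync target), or -100 if no automatic decision was reached.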
2757  */
2758 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2759 {
2760         struct drbd_device *device = peer_device->device;
2761         int self, peer, rv = -100;
2762         unsigned long ch_self, ch_peer;
2763         enum drbd_after_sb_p after_sb_0p;
2764
2765         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2766         peer = device->p_uuid[UI_BITMAP] & 1;
2767
2768         ch_peer = device->p_uuid[UI_SIZE];
2769         ch_self = device->comm_bm_set;
2770
2771         rcu_read_lock();
2772         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2773         rcu_read_unlock();
2774         switch (after_sb_0p) {
2775         case ASB_CONSENSUS:
2776         case ASB_DISCARD_SECONDARY:
2777         case ASB_CALL_HELPER:
2778         case ASB_VIOLENTLY:
2779                 drbd_err(device, "Configuration error.\n");
2780                 break;
2781         case ASB_DISCONNECT:
2782                 break;
2783         case ASB_DISCARD_YOUNGER_PRI:
2784                 if (self == 0 && peer == 1) {
2785                         rv = -1;
2786                         break;
2787                 }
2788                 if (self == 1 && peer == 0) {
2789                         rv =  1;
2790                         break;
2791                 }
2792                 /* Else fall through to one of the other strategies... */
2793         case ASB_DISCARD_OLDER_PRI:
2794                 if (self == 0 && peer == 1) {
2795                         rv = 1;
2796                         break;
2797                 }
2798                 if (self == 1 && peer == 0) {
2799                         rv = -1;
2800                         break;
2801                 }
2802                 /* Else fall through to one of the other strategies... */
2803                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2804                      "Using discard-least-changes instead\n");
2805         case ASB_DISCARD_ZERO_CHG:
2806                 if (ch_peer == 0 && ch_self == 0) {
2807                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2808                                 ? -1 : 1;
2809                         break;
2810                 } else {
2811                         if (ch_peer == 0) { rv =  1; break; }
2812                         if (ch_self == 0) { rv = -1; break; }
2813                 }
2814                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2815                         break;
2816         case ASB_DISCARD_LEAST_CHG:
2817                 if      (ch_self < ch_peer)
2818                         rv = -1;
2819                 else if (ch_self > ch_peer)
2820                         rv =  1;
2821                 else /* ( ch_self == ch_peer ) */
2822                      /* Well, then use something else. */
2823                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2824                                 ? -1 : 1;
2825                 break;
2826         case ASB_DISCARD_LOCAL:
2827                 rv = -1;
2828                 break;
2829         case ASB_DISCARD_REMOTE:
2830                 rv =  1;
2831         }
2832
2833         return rv;
2834 }
2835
2836 /**
2837  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
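 *
 * Return: same convention as drbd_asb_recover_0p().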
2838  */
2839 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2840 {
2841         struct drbd_device *device = peer_device->device;
2842         int hg, rv = -100;
2843         enum drbd_after_sb_p after_sb_1p;
2844
2845         rcu_read_lock();
2846         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2847         rcu_read_unlock();
2848         switch (after_sb_1p) {
2849         case ASB_DISCARD_YOUNGER_PRI:
2850         case ASB_DISCARD_OLDER_PRI:
2851         case ASB_DISCARD_LEAST_CHG:
2852         case ASB_DISCARD_LOCAL:
2853         case ASB_DISCARD_REMOTE:
2854         case ASB_DISCARD_ZERO_CHG:
2855                 drbd_err(device, "Configuration error.\n");
2856                 break;
2857         case ASB_DISCONNECT:
2858                 break;
2859         case ASB_CONSENSUS:
2860                 hg = drbd_asb_recover_0p(peer_device);
2861                 if (hg == -1 && device->state.role == R_SECONDARY)
2862                         rv = hg;
2863                 if (hg == 1  && device->state.role == R_PRIMARY)
2864                         rv = hg;
2865                 break;
2866         case ASB_VIOLENTLY:
2867                 rv = drbd_asb_recover_0p(peer_device);
2868                 break;
2869         case ASB_DISCARD_SECONDARY:
2870                 return device->state.role == R_PRIMARY ? 1 : -1;
2871         case ASB_CALL_HELPER:
2872                 hg = drbd_asb_recover_0p(peer_device);
2873                 if (hg == -1 && device->state.role == R_PRIMARY) {
2874                         enum drbd_state_rv rv2;
2875
2876                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2877                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2878                           * we do not need to wait for the after state change work either. */
2879                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2880                         if (rv2 != SS_SUCCESS) {
2881                                 drbd_khelper(device, "pri-lost-after-sb");
2882                         } else {
2883                                 drbd_warn(device, "Successfully gave up primary role.\n");
2884                                 rv = hg;
2885                         }
2886                 } else
2887                         rv = hg;
2888         }
2889
2890         return rv;
2891 }
2892
2893 /**
2894  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
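 *
 * Return: same convention as drbd_asb_recover_0p().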
2895  */
2896 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2897 {
2898         struct drbd_device *device = peer_device->device;
2899         int hg, rv = -100;
2900         enum drbd_after_sb_p after_sb_2p;
2901
2902         rcu_read_lock();
2903         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2904         rcu_read_unlock();
2905         switch (after_sb_2p) {
2906         case ASB_DISCARD_YOUNGER_PRI:
2907         case ASB_DISCARD_OLDER_PRI:
2908         case ASB_DISCARD_LEAST_CHG:
2909         case ASB_DISCARD_LOCAL:
2910         case ASB_DISCARD_REMOTE:
2911         case ASB_CONSENSUS:
2912         case ASB_DISCARD_SECONDARY:
2913         case ASB_DISCARD_ZERO_CHG:
2914                 drbd_err(device, "Configuration error.\n");
2915                 break;
2916         case ASB_VIOLENTLY:
2917                 rv = drbd_asb_recover_0p(peer_device);
2918                 break;
2919         case ASB_DISCONNECT:
2920                 break;
2921         case ASB_CALL_HELPER:
2922                 hg = drbd_asb_recover_0p(peer_device);
2923                 if (hg == -1) {
2924                         enum drbd_state_rv rv2;
2925
2926                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2927                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2928                           * we do not need to wait for the after state change work either. */
2929                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2930                         if (rv2 != SS_SUCCESS) {
2931                                 drbd_khelper(device, "pri-lost-after-sb");
2932                         } else {
2933                                 drbd_warn(device, "Successfully gave up primary role.\n");
2934                                 rv = hg;
2935                         }
2936                 } else
2937                         rv = hg;
2938         }
2939
2940         return rv;
2941 }
2942
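/* Log a node's UUID set (current, bitmap, history) plus the given
 * bit count and flags. */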
2943 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2944                            u64 bits, u64 flags)
2945 {
2946         if (!uuid) {
2947                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2948                 return;
2949         }
2950         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2951              text,
2952              (unsigned long long)uuid[UI_CURRENT],
2953              (unsigned long long)uuid[UI_BITMAP],
2954              (unsigned long long)uuid[UI_HISTORY_START],
2955              (unsigned long long)uuid[UI_HISTORY_END],
2956              (unsigned long long)bits,
2957              (unsigned long long)flags);
2958 }
2959
2960 /*
2961   100   after split brain try auto recover
2962     2   C_SYNC_SOURCE set BitMap
2963     1   C_SYNC_SOURCE use BitMap
2964     0   no Sync
2965    -1   C_SYNC_TARGET use BitMap
2966    -2   C_SYNC_TARGET set BitMap
2967  -100   after split brain, disconnect
2968 -1000   unrelated data
2969 -1091   requires proto 91
2970 -1096   requires proto 96
2971  */
2972 static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
2973 {
2974         struct drbd_peer_device *const peer_device = first_peer_device(device);
2975         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
2976         u64 self, peer;
2977         int i, j;
2978
2979         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2980         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2981
2982         *rule_nr = 10;
2983         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2984                 return 0;
2985
2986         *rule_nr = 20;
2987         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2988              peer != UUID_JUST_CREATED)
2989                 return -2;
2990
2991         *rule_nr = 30;
2992         if (self != UUID_JUST_CREATED &&
2993             (peer == UUID_JUST_CREATED || peer == (u64)0))
2994                 return 2;
2995
2996         if (self == peer) {
2997                 int rct, dc; /* roles at crash time */
2998
2999                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3000
3001                         if (connection->agreed_pro_version < 91)
3002                                 return -1091;
3003
3004                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3005                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3006                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3007                                 drbd_uuid_move_history(device);
3008                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3009                                 device->ldev->md.uuid[UI_BITMAP] = 0;
3010
3011                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3012                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3013                                 *rule_nr = 34;
3014                         } else {
3015                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3016                                 *rule_nr = 36;
3017                         }
3018
3019                         return 1;
3020                 }
3021
3022                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3023
3024                         if (connection->agreed_pro_version < 91)
3025                                 return -1091;
3026
3027                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3028                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3029                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3030
3031                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3032                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3033                                 device->p_uuid[UI_BITMAP] = 0UL;
3034
3035                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3036                                 *rule_nr = 35;
3037                         } else {
3038                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3039                                 *rule_nr = 37;
3040                         }
3041
3042                         return -1;
3043                 }
3044
3045                 /* Common power [off|failure] */
3046                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3047                         (device->p_uuid[UI_FLAGS] & 2);
3048                 /* lowest bit is set when we were primary,
3049                  * next bit (weight 2) is set when peer was primary */
3050                 *rule_nr = 40;
3051
3052                 switch (rct) {
3053                 case 0: /* !self_pri && !peer_pri */ return 0;
3054                 case 1: /*  self_pri && !peer_pri */ return 1;
3055                 case 2: /* !self_pri &&  peer_pri */ return -1;
3056                 case 3: /*  self_pri &&  peer_pri */
3057                         dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3058                         return dc ? -1 : 1;
3059                 }
3060         }
3061
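        /* Our current UUID matches the peer's bitmap UUID: the peer has newer
         * data and tracks our missing changes in its bitmap; become sync target. */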
3062         *rule_nr = 50;
3063         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3064         if (self == peer)
3065                 return -1;
3066
3067         *rule_nr = 51;
3068         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3069         if (self == peer) {
3070                 if (connection->agreed_pro_version < 96 ?
3071                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3072                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3073                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3074                         /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
3075                            changes from its last start of a resync as sync source. */
3076
3077                         if (connection->agreed_pro_version < 91)
3078                                 return -1091;
3079
3080                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3081                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3082
3083                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3084                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3085
3086                         return -1;
3087                 }
3088         }
3089
3090         *rule_nr = 60;
3091         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3092         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3093                 peer = device->p_uuid[i] & ~((u64)1);
3094                 if (self == peer)
3095                         return -2;
3096         }
3097
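        /* Our bitmap UUID matches the peer's current UUID: we have newer data
         * and track the peer's missing changes in our bitmap; become sync source. */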
3098         *rule_nr = 70;
3099         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3100         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3101         if (self == peer)
3102                 return 1;
3103
3104         *rule_nr = 71;
3105         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3106         if (self == peer) {
3107                 if (connection->agreed_pro_version < 96 ?
3108                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3109                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3110                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3111                         /* The last P_SYNC_UUID did not get through. Undo our own UUID
3112                            changes from our last start of a resync as sync source. */
3113
3114                         if (connection->agreed_pro_version < 91)
3115                                 return -1091;
3116
3117                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3118                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3119
3120                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3121                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3122                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3123
3124                         return 1;
3125                 }
3126         }
3127
3128
3129         *rule_nr = 80;
3130         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3131         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3132                 self = device->ldev->md.uuid[i] & ~((u64)1);
3133                 if (self == peer)
3134                         return 2;
3135         }
3136
3137         *rule_nr = 90;
3138         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3139         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3140         if (self == peer && self != ((u64)0))
3141                 return 100;
3142
3143         *rule_nr = 100;
3144         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3145                 self = device->ldev->md.uuid[i] & ~((u64)1);
3146                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3147                         peer = device->p_uuid[j] & ~((u64)1);
3148                         if (self == peer)
3149                                 return -100;
3150                 }
3151         }
3152
3153         return -1000;
3154 }
3155
3156 /* drbd_sync_handshake() returns the new conn state on success, or
3157    C_MASK on failure.
3158  */
3159 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3160                                            enum drbd_role peer_role,
3161                                            enum drbd_disk_state peer_disk) __must_hold(local)
3162 {
3163         struct drbd_device *device = peer_device->device;
3164         enum drbd_conns rv = C_MASK;
3165         enum drbd_disk_state mydisk;
3166         struct net_conf *nc;
3167         int hg, rule_nr, rr_conflict, tentative;
3168
3169         mydisk = device->state.disk;
3170         if (mydisk == D_NEGOTIATING)
3171                 mydisk = device->new_state_tmp.disk;
3172
3173         drbd_info(device, "drbd_sync_handshake:\n");
3174
3175         spin_lock_irq(&device->ldev->md.uuid_lock);
3176         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3177         drbd_uuid_dump(device, "peer", device->p_uuid,
3178                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3179
3180         hg = drbd_uuid_compare(device, &rule_nr);
3181         spin_unlock_irq(&device->ldev->md.uuid_lock);
3182
3183         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3184
3185         if (hg == -1000) {
3186                 drbd_alert(device, "Unrelated data, aborting!\n");
3187                 return C_MASK;
3188         }
3189         if (hg < -1000) {
3190                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3191                 return C_MASK;
3192         }
3193
3194         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3195             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3196                 int f = (hg == -100) || abs(hg) == 2;
3197                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3198                 if (f)
3199                         hg = hg*2;
3200                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3201                      hg > 0 ? "source" : "target");
3202         }
3203
3204         if (abs(hg) == 100)
3205                 drbd_khelper(device, "initial-split-brain");
3206
3207         rcu_read_lock();
3208         nc = rcu_dereference(peer_device->connection->net_conf);
3209
3210         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3211                 int pcount = (device->state.role == R_PRIMARY)
3212                            + (peer_role == R_PRIMARY);
3213                 int forced = (hg == -100);
3214
3215                 switch (pcount) {
3216                 case 0:
3217                         hg = drbd_asb_recover_0p(peer_device);
3218                         break;
3219                 case 1:
3220                         hg = drbd_asb_recover_1p(peer_device);
3221                         break;
3222                 case 2:
3223                         hg = drbd_asb_recover_2p(peer_device);
3224                         break;
3225                 }
3226                 if (abs(hg) < 100) {
3227                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3228                              "automatically solved. Sync from %s node\n",
3229                              pcount, (hg < 0) ? "peer" : "this");
3230                         if (forced) {
3231                                 drbd_warn(device, "Doing a full sync, since"
3232                                      " UUIDs were ambiguous.\n");
3233                                 hg = hg*2;
3234                         }
3235                 }
3236         }
3237
3238         if (hg == -100) {
3239                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3240                         hg = -1;
3241                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3242                         hg = 1;
3243
3244                 if (abs(hg) < 100)
3245                         drbd_warn(device, "Split-Brain detected, manually solved. "
3246                              "Sync from %s node\n",
3247                              (hg < 0) ? "peer" : "this");
3248         }
3249         rr_conflict = nc->rr_conflict;
3250         tentative = nc->tentative;
3251         rcu_read_unlock();
3252
3253         if (hg == -100) {
3254                 /* FIXME this log message is not correct if we end up here
3255                  * after an attempted attach on a diskless node.
3256                  * We just refuse to attach -- well, we drop the "connection"
3257                  * to that disk, in a way... */
3258                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3259                 drbd_khelper(device, "split-brain");
3260                 return C_MASK;
3261         }
3262
3263         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3264                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3265                 return C_MASK;
3266         }
3267
3268         if (hg < 0 && /* by intention we do not use mydisk here. */
3269             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3270                 switch (rr_conflict) {
3271                 case ASB_CALL_HELPER:
3272                         drbd_khelper(device, "pri-lost");
3273                         /* fall through */
3274                 case ASB_DISCONNECT:
3275                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3276                         return C_MASK;
3277                 case ASB_VIOLENTLY:
3278                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3279                              " assumption\n");
3280                 }
3281         }
3282
3283         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3284                 if (hg == 0)
3285                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3286                 else
3287                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3288                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3289                                  abs(hg) >= 2 ? "full" : "bit-map based");
3290                 return C_MASK;
3291         }
3292
3293         if (abs(hg) >= 2) {
3294                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3295                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3296                                         BM_LOCKED_SET_ALLOWED))
3297                         return C_MASK;
3298         }
3299
3300         if (hg > 0) { /* become sync source. */
3301                 rv = C_WF_BITMAP_S;
3302         } else if (hg < 0) { /* become sync target */
3303                 rv = C_WF_BITMAP_T;
3304         } else {
3305                 rv = C_CONNECTED;
3306                 if (drbd_bm_total_weight(device)) {
3307                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3308                              drbd_bm_total_weight(device));
3309                 }
3310         }
3311
3312         return rv;
3313 }
3314
3315 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3316 {
3317         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3318         if (peer == ASB_DISCARD_REMOTE)
3319                 return ASB_DISCARD_LOCAL;
3320
3321         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3322         if (peer == ASB_DISCARD_LOCAL)
3323                 return ASB_DISCARD_REMOTE;
3324
3325         /* everything else is valid if they are equal on both sides. */
3326         return peer;
3327 }
3328
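/* Handle P_PROTOCOL (and P_PROTOCOL_UPDATE): check that the peer's
 * protocol-level settings are compatible with ours (or, for updates, adopt
 * them) and set up the peer data integrity digest. */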
3329 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3330 {
3331         struct p_protocol *p = pi->data;
3332         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3333         int p_proto, p_discard_my_data, p_two_primaries, cf;
3334         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3335         char integrity_alg[SHARED_SECRET_MAX] = "";
3336         struct crypto_ahash *peer_integrity_tfm = NULL;
3337         void *int_dig_in = NULL, *int_dig_vv = NULL;
3338
3339         p_proto         = be32_to_cpu(p->protocol);
3340         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3341         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3342         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3343         p_two_primaries = be32_to_cpu(p->two_primaries);
3344         cf              = be32_to_cpu(p->conn_flags);
3345         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3346
3347         if (connection->agreed_pro_version >= 87) {
3348                 int err;
3349
3350                 if (pi->size > sizeof(integrity_alg))
3351                         return -EIO;
3352                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3353                 if (err)
3354                         return err;
3355                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3356         }
3357
3358         if (pi->cmd != P_PROTOCOL_UPDATE) {
3359                 clear_bit(CONN_DRY_RUN, &connection->flags);
3360
3361                 if (cf & CF_DRY_RUN)
3362                         set_bit(CONN_DRY_RUN, &connection->flags);
3363
3364                 rcu_read_lock();
3365                 nc = rcu_dereference(connection->net_conf);
3366
3367                 if (p_proto != nc->wire_protocol) {
3368                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3369                         goto disconnect_rcu_unlock;
3370                 }
3371
3372                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3373                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3374                         goto disconnect_rcu_unlock;
3375                 }
3376
3377                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3378                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3379                         goto disconnect_rcu_unlock;
3380                 }
3381
3382                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3383                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3384                         goto disconnect_rcu_unlock;
3385                 }
3386
3387                 if (p_discard_my_data && nc->discard_my_data) {
3388                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3389                         goto disconnect_rcu_unlock;
3390                 }
3391
3392                 if (p_two_primaries != nc->two_primaries) {
3393                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3394                         goto disconnect_rcu_unlock;
3395                 }
3396
3397                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3398                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3399                         goto disconnect_rcu_unlock;
3400                 }
3401
3402                 rcu_read_unlock();
3403         }
3404
3405         if (integrity_alg[0]) {
3406                 int hash_size;
3407
3408                 /*
3409                  * We can only change the peer data integrity algorithm
3410                  * here.  Changing our own data integrity algorithm
3411                  * requires that we send a P_PROTOCOL_UPDATE packet at
3412                  * the same time; otherwise, the peer has no way to
3413                  * tell between which packets the algorithm should
3414                  * change.
3415                  */
3416
3417                 peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3418                 if (!peer_integrity_tfm) {
3419                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3420                                  integrity_alg);
3421                         goto disconnect;
3422                 }
3423
3424                 hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3425                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3426                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3427                 if (!(int_dig_in && int_dig_vv)) {
3428                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3429                         goto disconnect;
3430                 }
3431         }
3432
3433         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3434         if (!new_net_conf) {
3435                 drbd_err(connection, "Allocation of new net_conf failed\n");
3436                 goto disconnect;
3437         }
3438
3439         mutex_lock(&connection->data.mutex);
3440         mutex_lock(&connection->resource->conf_update);
3441         old_net_conf = connection->net_conf;
3442         *new_net_conf = *old_net_conf;
3443
3444         new_net_conf->wire_protocol = p_proto;
3445         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3446         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3447         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3448         new_net_conf->two_primaries = p_two_primaries;
3449
3450         rcu_assign_pointer(connection->net_conf, new_net_conf);
3451         mutex_unlock(&connection->resource->conf_update);
3452         mutex_unlock(&connection->data.mutex);
3453
3454         crypto_free_ahash(connection->peer_integrity_tfm);
3455         kfree(connection->int_dig_in);
3456         kfree(connection->int_dig_vv);
3457         connection->peer_integrity_tfm = peer_integrity_tfm;
3458         connection->int_dig_in = int_dig_in;
3459         connection->int_dig_vv = int_dig_vv;
3460
3461         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3462                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3463                           integrity_alg[0] ? integrity_alg : "(none)");
3464
3465         synchronize_rcu();
3466         kfree(old_net_conf);
3467         return 0;
3468
3469 disconnect_rcu_unlock:
3470         rcu_read_unlock();
3471 disconnect:
3472         crypto_free_ahash(peer_integrity_tfm);
3473         kfree(int_dig_in);
3474         kfree(int_dig_vv);
3475         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3476         return -EIO;
3477 }
3478
3479 /* helper function
3480  * input: alg name, feature name
3481  * return: NULL (alg name was "")
3482  *         ERR_PTR(error) if something goes wrong
3483  *         or the crypto hash ptr, if it worked out ok. */
3484 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3485                 const char *alg, const char *name)
3486 {
3487         struct crypto_ahash *tfm;
3488
3489         if (!alg[0])
3490                 return NULL;
3491
3492         tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3493         if (IS_ERR(tfm)) {
3494                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3495                         alg, name, PTR_ERR(tfm));
3496                 return tfm;
3497         }
3498         return tfm;
3499 }
3500
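/* Read and discard the remaining pi->size bytes of packet payload,
 * reusing the regular receive buffer. */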
3501 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3502 {
3503         void *buffer = connection->data.rbuf;
3504         int size = pi->size;
3505
3506         while (size) {
3507                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3508                 s = drbd_recv(connection, buffer, s);
3509                 if (s <= 0) {
3510                         if (s < 0)
3511                                 return s;
3512                         break;
3513                 }
3514                 size -= s;
3515         }
3516         if (size)
3517                 return -EIO;
3518         return 0;
3519 }
3520
3521 /*
3522  * config_unknown_volume  -  device configuration command for unknown volume
3523  *
3524  * When a device is added to an existing connection, the node on which the
3525  * device is added first will send configuration commands to its peer but the
3526  * peer will not know about the device yet.  It will warn and ignore these
3527  * commands.  Once the device is added on the second node, the second node will
3528  * send the same device configuration commands, but in the other direction.
3529  *
3530  * (We can also end up here if drbd is misconfigured.)
3531  */
3532 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3533 {
3534         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3535                   cmdname(pi->cmd), pi->vnr);
3536         return ignore_remaining_packet(connection, pi);
3537 }
3538
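/* Handle P_SYNC_PARAM and its later variants: receive the peer's resync rate,
 * verify-alg/csums-alg and (apv >= 95) resync controller parameters. */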
3539 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3540 {
3541         struct drbd_peer_device *peer_device;
3542         struct drbd_device *device;
3543         struct p_rs_param_95 *p;
3544         unsigned int header_size, data_size, exp_max_sz;
3545         struct crypto_ahash *verify_tfm = NULL;
3546         struct crypto_ahash *csums_tfm = NULL;
3547         struct net_conf *old_net_conf, *new_net_conf = NULL;
3548         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3549         const int apv = connection->agreed_pro_version;
3550         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3551         int fifo_size = 0;
3552         int err;
3553
3554         peer_device = conn_peer_device(connection, pi->vnr);
3555         if (!peer_device)
3556                 return config_unknown_volume(connection, pi);
3557         device = peer_device->device;
3558
3559         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3560                     : apv == 88 ? sizeof(struct p_rs_param)
3561                                         + SHARED_SECRET_MAX
3562                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3563                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3564
3565         if (pi->size > exp_max_sz) {
3566                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3567                     pi->size, exp_max_sz);
3568                 return -EIO;
3569         }
3570
3571         if (apv <= 88) {
3572                 header_size = sizeof(struct p_rs_param);
3573                 data_size = pi->size - header_size;
3574         } else if (apv <= 94) {
3575                 header_size = sizeof(struct p_rs_param_89);
3576                 data_size = pi->size - header_size;
3577                 D_ASSERT(device, data_size == 0);
3578         } else {
3579                 header_size = sizeof(struct p_rs_param_95);
3580                 data_size = pi->size - header_size;
3581                 D_ASSERT(device, data_size == 0);
3582         }
3583
3584         /* initialize verify_alg and csums_alg */
3585         p = pi->data;
3586         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3587
3588         err = drbd_recv_all(peer_device->connection, p, header_size);
3589         if (err)
3590                 return err;
3591
3592         mutex_lock(&connection->resource->conf_update);
3593         old_net_conf = peer_device->connection->net_conf;
3594         if (get_ldev(device)) {
3595                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3596                 if (!new_disk_conf) {
3597                         put_ldev(device);
3598                         mutex_unlock(&connection->resource->conf_update);
3599                         drbd_err(device, "Allocation of new disk_conf failed\n");
3600                         return -ENOMEM;
3601                 }
3602
3603                 old_disk_conf = device->ldev->disk_conf;
3604                 *new_disk_conf = *old_disk_conf;
3605
3606                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3607         }
3608
3609         if (apv >= 88) {
3610                 if (apv == 88) {
3611                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3612                                 drbd_err(device, "verify-alg of wrong size, "
3613                                         "peer wants %u, accepting only up to %u bytes\n",
3614                                         data_size, SHARED_SECRET_MAX);
3615                                 err = -EIO;
3616                                 goto reconnect;
3617                         }
3618
3619                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3620                         if (err)
3621                                 goto reconnect;
3622                         /* we expect NUL terminated string */
3623                         /* but just in case someone tries to be evil */
3624                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3625                         p->verify_alg[data_size-1] = 0;
3626
3627                 } else /* apv >= 89 */ {
3628                         /* we still expect NUL terminated strings */
3629                         /* but just in case someone tries to be evil */
3630                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3631                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3632                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3633                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3634                 }
3635
3636                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3637                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3638                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3639                                     old_net_conf->verify_alg, p->verify_alg);
3640                                 goto disconnect;
3641                         }
3642                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3643                                         p->verify_alg, "verify-alg");
3644                         if (IS_ERR(verify_tfm)) {
3645                                 verify_tfm = NULL;
3646                                 goto disconnect;
3647                         }
3648                 }
3649
3650                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3651                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3652                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3653                                     old_net_conf->csums_alg, p->csums_alg);
3654                                 goto disconnect;
3655                         }
3656                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3657                                         p->csums_alg, "csums-alg");
3658                         if (IS_ERR(csums_tfm)) {
3659                                 csums_tfm = NULL;
3660                                 goto disconnect;
3661                         }
3662                 }
3663
3664                 if (apv > 94 && new_disk_conf) {
3665                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3666                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3667                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3668                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3669
3670                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3671                         if (fifo_size != device->rs_plan_s->size) {
3672                                 new_plan = fifo_alloc(fifo_size);
3673                                 if (!new_plan) {
3674                                         drbd_err(device, "kmalloc of fifo_buffer failed\n");
3675                                         put_ldev(device);
3676                                         goto disconnect;
3677                                 }
3678                         }
3679                 }
3680
3681                 if (verify_tfm || csums_tfm) {
3682                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3683                         if (!new_net_conf) {
3684                                 drbd_err(device, "Allocation of new net_conf failed\n");
3685                                 goto disconnect;
3686                         }
3687
3688                         *new_net_conf = *old_net_conf;
3689
3690                         if (verify_tfm) {
3691                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3692                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3693                                 crypto_free_ahash(peer_device->connection->verify_tfm);
3694                                 peer_device->connection->verify_tfm = verify_tfm;
3695                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3696                         }
3697                         if (csums_tfm) {
3698                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3699                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3700                                 crypto_free_ahash(peer_device->connection->csums_tfm);
3701                                 peer_device->connection->csums_tfm = csums_tfm;
3702                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3703                         }
3704                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3705                 }
3706         }
3707
3708         if (new_disk_conf) {
3709                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3710                 put_ldev(device);
3711         }
3712
3713         if (new_plan) {
3714                 old_plan = device->rs_plan_s;
3715                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3716         }
3717
3718         mutex_unlock(&connection->resource->conf_update);
3719         synchronize_rcu();
3720         if (new_net_conf)
3721                 kfree(old_net_conf);
3722         kfree(old_disk_conf);
3723         kfree(old_plan);
3724
3725         return 0;
3726
3727 reconnect:
3728         if (new_disk_conf) {
3729                 put_ldev(device);
3730                 kfree(new_disk_conf);
3731         }
3732         mutex_unlock(&connection->resource->conf_update);
3733         return -EIO;
3734
3735 disconnect:
3736         kfree(new_plan);
3737         if (new_disk_conf) {
3738                 put_ldev(device);
3739                 kfree(new_disk_conf);
3740         }
3741         mutex_unlock(&connection->resource->conf_update);
3742         /* just for completeness: actually not needed,
3743          * as this is not reached if csums_tfm was ok. */
3744         crypto_free_ahash(csums_tfm);
3745         /* but free the verify_tfm again, if csums_tfm did not work out */
3746         crypto_free_ahash(verify_tfm);
3747         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3748         return -EIO;
3749 }
3750
3751 /* warn if the arguments differ by more than 12.5% */
3752 static void warn_if_differ_considerably(struct drbd_device *device,
3753         const char *s, sector_t a, sector_t b)
3754 {
3755         sector_t d;
3756         if (a == 0 || b == 0)
3757                 return;
3758         d = (a > b) ? (a - b) : (b - a);
3759         if (d > (a>>3) || d > (b>>3))
3760                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3761                      (unsigned long long)a, (unsigned long long)b);
3762 }
3763
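/* Handle P_SIZES: the peer reports its backing disk, requested user and
 * current device sizes; agree on a common device size and trigger a resize
 * or resync where necessary. */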
3764 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3765 {
3766         struct drbd_peer_device *peer_device;
3767         struct drbd_device *device;
3768         struct p_sizes *p = pi->data;
3769         enum determine_dev_size dd = DS_UNCHANGED;
3770         sector_t p_size, p_usize, p_csize, my_usize;
3771         int ldsc = 0; /* local disk size changed */
3772         enum dds_flags ddsf;
3773
3774         peer_device = conn_peer_device(connection, pi->vnr);
3775         if (!peer_device)
3776                 return config_unknown_volume(connection, pi);
3777         device = peer_device->device;
3778
3779         p_size = be64_to_cpu(p->d_size);
3780         p_usize = be64_to_cpu(p->u_size);
3781         p_csize = be64_to_cpu(p->c_size);
3782
3783         /* just store the peer's disk size for now.
3784          * we still need to figure out whether we accept that. */
3785         device->p_size = p_size;
3786
3787         if (get_ldev(device)) {
3788                 rcu_read_lock();
3789                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3790                 rcu_read_unlock();
3791
3792                 warn_if_differ_considerably(device, "lower level device sizes",
3793                            p_size, drbd_get_max_capacity(device->ldev));
3794                 warn_if_differ_considerably(device, "user requested size",
3795                                             p_usize, my_usize);
3796
3797                 /* if this is the first connect, or an otherwise expected
3798                  * param exchange, choose the minimum */
3799                 if (device->state.conn == C_WF_REPORT_PARAMS)
3800                         p_usize = min_not_zero(my_usize, p_usize);
3801
3802                 /* Never shrink a device with usable data during connect.
3803                    But allow online shrinking if we are connected. */
3804                 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3805                     drbd_get_capacity(device->this_bdev) &&
3806                     device->state.disk >= D_OUTDATED &&
3807                     device->state.conn < C_CONNECTED) {
3808                         drbd_err(device, "The peer's disk size is too small!\n");
3809                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3810                         put_ldev(device);
3811                         return -EIO;
3812                 }
3813
3814                 if (my_usize != p_usize) {
3815                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3816
3817                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3818                         if (!new_disk_conf) {
3819                                 drbd_err(device, "Allocation of new disk_conf failed\n");
3820                                 put_ldev(device);
3821                                 return -ENOMEM;
3822                         }
3823
3824                         mutex_lock(&connection->resource->conf_update);
3825                         old_disk_conf = device->ldev->disk_conf;
3826                         *new_disk_conf = *old_disk_conf;
3827                         new_disk_conf->disk_size = p_usize;
3828
3829                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3830                         mutex_unlock(&connection->resource->conf_update);
3831                         synchronize_rcu();
3832                         kfree(old_disk_conf);
3833
3834                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
3835                                  (unsigned long)p_usize);
3836                 }
3837
3838                 put_ldev(device);
3839         }
3840
3841         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3842         /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3843            In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3844            drbd_reconsider_max_bio_size(), we can be sure that after
3845            drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3846
3847         ddsf = be16_to_cpu(p->dds_flags);
3848         if (get_ldev(device)) {
3849                 drbd_reconsider_max_bio_size(device, device->ldev);
3850                 dd = drbd_determine_dev_size(device, ddsf, NULL);
3851                 put_ldev(device);
3852                 if (dd == DS_ERROR)
3853                         return -EIO;
3854                 drbd_md_sync(device);
3855         } else {
3856                 /*
3857                  * I am diskless, need to accept the peer's *current* size.
3858                  * I must NOT accept the peer's backing disk size,
3859                  * it may have been larger than mine all along...
3860                  *
3861                  * At this point, the peer knows more about my disk, or at
3862                  * least about what we last agreed upon, than myself.
3863                  * So if his c_size is less than his d_size, the most likely
3864                  * reason is that *my* d_size was smaller last time we checked.
3865                  *
3866                  * However, if he sends a zero current size,
3867                  * take his (user-capped or) backing disk size anyways.
3868                  */
3869                 drbd_reconsider_max_bio_size(device, NULL);
3870                 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
3871         }
3872
3873         if (get_ldev(device)) {
3874                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3875                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3876                         ldsc = 1;
3877                 }
3878
3879                 put_ldev(device);
3880         }
3881
3882         if (device->state.conn > C_WF_REPORT_PARAMS) {
3883                 if (be64_to_cpu(p->c_size) !=
3884                     drbd_get_capacity(device->this_bdev) || ldsc) {
3885                         /* we have different sizes, probably peer
3886                          * needs to know my new size... */
3887                         drbd_send_sizes(peer_device, 0, ddsf);
3888                 }
3889                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3890                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3891                         if (device->state.pdsk >= D_INCONSISTENT &&
3892                             device->state.disk >= D_INCONSISTENT) {
3893                                 if (ddsf & DDSF_NO_RESYNC)
3894                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3895                                 else
3896                                         resync_after_online_grow(device);
3897                         } else
3898                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
3899                 }
3900         }
3901
3902         return 0;
3903 }
3904
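/* Handle P_UUIDS: take over the peer's UUID set; this may allow skipping the
 * initial sync, or, while diskless, adopt the peer's current UUID as our
 * exposed data UUID. */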
3905 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3906 {
3907         struct drbd_peer_device *peer_device;
3908         struct drbd_device *device;
3909         struct p_uuids *p = pi->data;
3910         u64 *p_uuid;
3911         int i, updated_uuids = 0;
3912
3913         peer_device = conn_peer_device(connection, pi->vnr);
3914         if (!peer_device)
3915                 return config_unknown_volume(connection, pi);
3916         device = peer_device->device;
3917
3918         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3919         if (!p_uuid) {
3920                 drbd_err(device, "kmalloc of p_uuid failed\n");
3921                 return -ENOMEM;
3922         }
3923
3924         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3925                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3926
3927         kfree(device->p_uuid);
3928         device->p_uuid = p_uuid;
3929
3930         if (device->state.conn < C_CONNECTED &&
3931             device->state.disk < D_INCONSISTENT &&
3932             device->state.role == R_PRIMARY &&
3933             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3934                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3935                     (unsigned long long)device->ed_uuid);
3936                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3937                 return -EIO;
3938         }
3939
3940         if (get_ldev(device)) {
3941                 int skip_initial_sync =
3942                         device->state.conn == C_CONNECTED &&
3943                         peer_device->connection->agreed_pro_version >= 90 &&
3944                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3945                         (p_uuid[UI_FLAGS] & 8);
3946                 if (skip_initial_sync) {
3947                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3948                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3949                                         "clear_n_write from receive_uuids",
3950                                         BM_LOCKED_TEST_ALLOWED);
3951                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3952                         _drbd_uuid_set(device, UI_BITMAP, 0);
3953                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3954                                         CS_VERBOSE, NULL);
3955                         drbd_md_sync(device);
3956                         updated_uuids = 1;
3957                 }
3958                 put_ldev(device);
3959         } else if (device->state.disk < D_INCONSISTENT &&
3960                    device->state.role == R_PRIMARY) {
3961                 /* I am a diskless primary, the peer just created a new current UUID
3962                    for me. */
3963                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3964         }
3965
3966         /* Before we test the disk state, we should wait until a possibly
3967            ongoing cluster wide state change has finished. That is important if
3968            we are primary and are detaching from our disk: we need to see the
3969            new disk state... */
3970         mutex_lock(device->state_mutex);
3971         mutex_unlock(device->state_mutex);
3972         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3973                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3974
3975         if (updated_uuids)
3976                 drbd_print_uuids(device, "receiver updated UUIDs to");
3977
3978         return 0;
3979 }
3980
3981 /**
3982  * convert_state() - Converts the peer's view of the cluster state to our point of view
3983  * @ps:         The state as seen by the peer.
3984  */
3985 static union drbd_state convert_state(union drbd_state ps)
3986 {
3987         union drbd_state ms;
3988
3989         static enum drbd_conns c_tab[] = {
3990                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3991                 [C_CONNECTED] = C_CONNECTED,
3992
3993                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3994                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3995                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3996                 [C_VERIFY_S]       = C_VERIFY_T,
3997                 [C_MASK]   = C_MASK,
3998         };
3999
4000         ms.i = ps.i;
4001
4002         ms.conn = c_tab[ps.conn];
4003         ms.peer = ps.role;
4004         ms.role = ps.peer;
4005         ms.pdsk = ps.disk;
4006         ms.disk = ps.pdsk;
4007         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4008
4009         return ms;
4010 }
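
/*
 * Worked example (hypothetical values): if the peer reports
 * { role = R_PRIMARY, peer = R_SECONDARY, disk = D_UP_TO_DATE,
 *   pdsk = D_INCONSISTENT, conn = C_STARTING_SYNC_S }, then seen from our
 * side convert_state() yields { peer = R_PRIMARY, role = R_SECONDARY,
 * pdsk = D_UP_TO_DATE, disk = D_INCONSISTENT, conn = C_STARTING_SYNC_T }:
 * role/peer and disk/pdsk swap places, and asymmetric connection states
 * are mapped to their counterpart via c_tab.
 */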
4011
4012 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4013 {
4014         struct drbd_peer_device *peer_device;
4015         struct drbd_device *device;
4016         struct p_req_state *p = pi->data;
4017         union drbd_state mask, val;
4018         enum drbd_state_rv rv;
4019
4020         peer_device = conn_peer_device(connection, pi->vnr);
4021         if (!peer_device)
4022                 return -EIO;
4023         device = peer_device->device;
4024
4025         mask.i = be32_to_cpu(p->mask);
4026         val.i = be32_to_cpu(p->val);
4027
4028         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4029             mutex_is_locked(device->state_mutex)) {
4030                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4031                 return 0;
4032         }
4033
4034         mask = convert_state(mask);
4035         val = convert_state(val);
4036
4037         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4038         drbd_send_sr_reply(peer_device, rv);
4039
4040         drbd_md_sync(device);
4041
4042         return 0;
4043 }
4044
4045 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4046 {
4047         struct p_req_state *p = pi->data;
4048         union drbd_state mask, val;
4049         enum drbd_state_rv rv;
4050
4051         mask.i = be32_to_cpu(p->mask);
4052         val.i = be32_to_cpu(p->val);
4053
4054         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4055             mutex_is_locked(&connection->cstate_mutex)) {
4056                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4057                 return 0;
4058         }
4059
4060         mask = convert_state(mask);
4061         val = convert_state(val);
4062
4063         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4064         conn_send_sr_reply(connection, rv);
4065
4066         return 0;
4067 }
4068
4069 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4070 {
4071         struct drbd_peer_device *peer_device;
4072         struct drbd_device *device;
4073         struct p_state *p = pi->data;
4074         union drbd_state os, ns, peer_state;
4075         enum drbd_disk_state real_peer_disk;
4076         enum chg_state_flags cs_flags;
4077         int rv;
4078
4079         peer_device = conn_peer_device(connection, pi->vnr);
4080         if (!peer_device)
4081                 return config_unknown_volume(connection, pi);
4082         device = peer_device->device;
4083
4084         peer_state.i = be32_to_cpu(p->state);
4085
4086         real_peer_disk = peer_state.disk;
4087         if (peer_state.disk == D_NEGOTIATING) {
4088                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4089                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4090         }
4091
4092         spin_lock_irq(&device->resource->req_lock);
4093  retry:
4094         os = ns = drbd_read_state(device);
4095         spin_unlock_irq(&device->resource->req_lock);
4096
4097         /* If some other part of the code (ack_receiver thread, timeout)
4098          * already decided to close the connection again,
4099          * we must not "re-establish" it here. */
4100         if (os.conn <= C_TEAR_DOWN)
4101                 return -ECONNRESET;
4102
4103         /* If this is the "end of sync" confirmation, usually the peer disk
4104          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4105          * set) resync started in PausedSyncT, or if the timing of pause-/
4106          * unpause-sync events has been "just right", the peer disk may
4107          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4108          */
4109         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4110             real_peer_disk == D_UP_TO_DATE &&
4111             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4112                 /* If we are (becoming) SyncSource, but peer is still in sync
4113                  * preparation, ignore its uptodate-ness to avoid flapping, it
4114                  * will change to inconsistent once the peer reaches active
4115                  * syncing states.
4116                  * It may have changed syncer-paused flags, however, so we
4117                  * cannot ignore this completely. */
4118                 if (peer_state.conn > C_CONNECTED &&
4119                     peer_state.conn < C_SYNC_SOURCE)
4120                         real_peer_disk = D_INCONSISTENT;
4121
4122                 /* if peer_state changes to connected at the same time,
4123                  * it explicitly notifies us that it finished resync.
4124                  * Maybe we should finish it up, too? */
4125                 else if (os.conn >= C_SYNC_SOURCE &&
4126                          peer_state.conn == C_CONNECTED) {
4127                         if (drbd_bm_total_weight(device) <= device->rs_failed)
4128                                 drbd_resync_finished(device);
4129                         return 0;
4130                 }
4131         }
4132
4133         /* explicit verify finished notification, stop sector reached. */
4134         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4135             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4136                 ov_out_of_sync_print(device);
4137                 drbd_resync_finished(device);
4138                 return 0;
4139         }
4140
4141         /* peer says his disk is inconsistent, while we think it is uptodate,
4142          * and this happens while the peer still thinks we have a sync going on,
4143          * but we think we are already done with the sync.
4144          * We ignore this to avoid flapping pdsk.
4145          * This should not happen, if the peer is a recent version of drbd. */
4146         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4147             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4148                 real_peer_disk = D_UP_TO_DATE;
4149
4150         if (ns.conn == C_WF_REPORT_PARAMS)
4151                 ns.conn = C_CONNECTED;
4152
4153         if (peer_state.conn == C_AHEAD)
4154                 ns.conn = C_BEHIND;
4155
4156         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4157             get_ldev_if_state(device, D_NEGOTIATING)) {
4158                 int cr; /* consider resync */
4159
4160                 /* if we established a new connection */
4161                 cr  = (os.conn < C_CONNECTED);
4162                 /* if we had an established connection
4163                  * and one of the nodes newly attaches a disk */
4164                 cr |= (os.conn == C_CONNECTED &&
4165                        (peer_state.disk == D_NEGOTIATING ||
4166                         os.disk == D_NEGOTIATING));
4167                 /* if we have both been inconsistent, and the peer has been
4168                  * forced to be UpToDate with --overwrite-data */
4169                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4170                 /* if we had been plain connected, and the admin requested to
4171                  * start a sync by "invalidate" or "invalidate-remote" */
4172                 cr |= (os.conn == C_CONNECTED &&
4173                                 (peer_state.conn >= C_STARTING_SYNC_S &&
4174                                  peer_state.conn <= C_WF_BITMAP_T));
4175
4176                 if (cr)
4177                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4178
4179                 put_ldev(device);
4180                 if (ns.conn == C_MASK) {
4181                         ns.conn = C_CONNECTED;
4182                         if (device->state.disk == D_NEGOTIATING) {
4183                                 drbd_force_state(device, NS(disk, D_FAILED));
4184                         } else if (peer_state.disk == D_NEGOTIATING) {
4185                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4186                                 peer_state.disk = D_DISKLESS;
4187                                 real_peer_disk = D_DISKLESS;
4188                         } else {
4189                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4190                                         return -EIO;
4191                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4192                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4193                                 return -EIO;
4194                         }
4195                 }
4196         }
4197
4198         spin_lock_irq(&device->resource->req_lock);
4199         if (os.i != drbd_read_state(device).i)
4200                 goto retry;
4201         clear_bit(CONSIDER_RESYNC, &device->flags);
4202         ns.peer = peer_state.role;
4203         ns.pdsk = real_peer_disk;
4204         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4205         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4206                 ns.disk = device->new_state_tmp.disk;
4207         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4208         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4209             test_bit(NEW_CUR_UUID, &device->flags)) {
4210                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4211                    for temporary network outages! */
4212                 spin_unlock_irq(&device->resource->req_lock);
4213                 drbd_err(device, "Aborting Connect, cannot thaw I/O with a peer that is only Consistent\n");
4214                 tl_clear(peer_device->connection);
4215                 drbd_uuid_new_current(device);
4216                 clear_bit(NEW_CUR_UUID, &device->flags);
4217                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4218                 return -EIO;
4219         }
4220         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4221         ns = drbd_read_state(device);
4222         spin_unlock_irq(&device->resource->req_lock);
4223
4224         if (rv < SS_SUCCESS) {
4225                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4226                 return -EIO;
4227         }
4228
4229         if (os.conn > C_WF_REPORT_PARAMS) {
4230                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4231                     peer_state.disk != D_NEGOTIATING ) {
4232                         /* we want resync, peer has not yet decided to sync... */
4233                         /* Nowadays only used when forcing a node into primary role and
4234                            setting its disk to UpToDate with that */
4235                         drbd_send_uuids(peer_device);
4236                         drbd_send_current_state(peer_device);
4237                 }
4238         }
4239
4240         clear_bit(DISCARD_MY_DATA, &device->flags);
4241
4242         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4243
4244         return 0;
4245 }
4246
4247 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4248 {
4249         struct drbd_peer_device *peer_device;
4250         struct drbd_device *device;
4251         struct p_rs_uuid *p = pi->data;
4252
4253         peer_device = conn_peer_device(connection, pi->vnr);
4254         if (!peer_device)
4255                 return -EIO;
4256         device = peer_device->device;
4257
4258         wait_event(device->misc_wait,
4259                    device->state.conn == C_WF_SYNC_UUID ||
4260                    device->state.conn == C_BEHIND ||
4261                    device->state.conn < C_CONNECTED ||
4262                    device->state.disk < D_NEGOTIATING);
4263
4264         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4265
4266         /* Here the _drbd_uuid_ functions are right, current should
4267            _not_ be rotated into the history */
4268         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4269                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4270                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4271
4272                 drbd_print_uuids(device, "updated sync uuid");
4273                 drbd_start_resync(device, C_SYNC_TARGET);
4274
4275                 put_ldev(device);
4276         } else
4277                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4278
4279         return 0;
4280 }
4281
4282 /**
4283  * receive_bitmap_plain
4284  *
4285  * Return 0 when done, 1 when another iteration is needed, and a negative error
4286  * code upon failure.
4287  */
4288 static int
4289 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4290                      unsigned long *p, struct bm_xfer_ctx *c)
4291 {
4292         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4293                                  drbd_header_size(peer_device->connection);
4294         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4295                                        c->bm_words - c->word_offset);
4296         unsigned int want = num_words * sizeof(*p);
4297         int err;
4298
4299         if (want != size) {
4300                 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4301                 return -EIO;
4302         }
4303         if (want == 0)
4304                 return 0;
4305         err = drbd_recv_all(peer_device->connection, p, want);
4306         if (err)
4307                 return err;
4308
4309         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4310
4311         c->word_offset += num_words;
4312         c->bit_offset = c->word_offset * BITS_PER_LONG;
4313         if (c->bit_offset > c->bm_bits)
4314                 c->bit_offset = c->bm_bits;
4315
4316         return 1;
4317 }
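
/*
 * Sizing sketch (assuming a 4096 byte DRBD_SOCKET_BUFFER_SIZE, a 16 byte
 * header and 8 byte longs): data_size is 4080, so a full plain bitmap
 * packet carries 510 little endian words; once word_offset has reached
 * bm_words, want becomes 0 and the function reports completion.
 */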
4318
4319 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4320 {
4321         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4322 }
4323
4324 static int dcbp_get_start(struct p_compressed_bm *p)
4325 {
4326         return (p->encoding & 0x80) != 0;
4327 }
4328
4329 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4330 {
4331         return (p->encoding >> 4) & 0x7;
4332 }
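
/*
 * Layout of the p->encoding byte, as implied by the accessors above:
 *   bit  7    - value of the first run of bits (dcbp_get_start)
 *   bits 6..4 - number of pad bits at the end of the bit stream
 *               (dcbp_get_pad_bits)
 *   bits 3..0 - encoding, e.g. RLE_VLI_Bits (dcbp_get_code)
 */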
4333
4334 /**
4335  * recv_bm_rle_bits
4336  *
4337  * Return 0 when done, 1 when another iteration is needed, and a negative error
4338  * code upon failure.
4339  */
4340 static int
4341 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4342                 struct p_compressed_bm *p,
4343                  struct bm_xfer_ctx *c,
4344                  unsigned int len)
4345 {
4346         struct bitstream bs;
4347         u64 look_ahead;
4348         u64 rl;
4349         u64 tmp;
4350         unsigned long s = c->bit_offset;
4351         unsigned long e;
4352         int toggle = dcbp_get_start(p);
4353         int have;
4354         int bits;
4355
4356         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4357
4358         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4359         if (bits < 0)
4360                 return -EIO;
4361
4362         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4363                 bits = vli_decode_bits(&rl, look_ahead);
4364                 if (bits <= 0)
4365                         return -EIO;
4366
4367                 if (toggle) {
4368                         e = s + rl - 1;
4369                         if (e >= c->bm_bits) {
4370                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4371                                 return -EIO;
4372                         }
4373                         _drbd_bm_set_bits(peer_device->device, s, e);
4374                 }
4375
4376                 if (have < bits) {
4377                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4378                                 have, bits, look_ahead,
4379                                 (unsigned int)(bs.cur.b - p->code),
4380                                 (unsigned int)bs.buf_len);
4381                         return -EIO;
4382                 }
4383                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4384                 if (likely(bits < 64))
4385                         look_ahead >>= bits;
4386                 else
4387                         look_ahead = 0;
4388                 have -= bits;
4389
4390                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4391                 if (bits < 0)
4392                         return -EIO;
4393                 look_ahead |= tmp << have;
4394                 have += bits;
4395         }
4396
4397         c->bit_offset = s;
4398         bm_xfer_ctx_bit_to_word_offset(c);
4399
4400         return (s != c->bm_bits);
4401 }
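
/*
 * Decoding sketch (hypothetical run lengths): starting at bit_offset 0
 * with dcbp_get_start(p) == 0 and decoded runs of 5, 3 and 8 bits, the
 * stream describes 5 clear bits, then 3 set bits (bits 5..7 are marked
 * out of sync via _drbd_bm_set_bits()), then 8 clear bits; bit_offset
 * advances by 16.  Only the "toggle on" runs touch the bitmap, runs of
 * clear bits merely advance the offset.
 */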
4402
4403 /**
4404  * decode_bitmap_c
4405  *
4406  * Return 0 when done, 1 when another iteration is needed, and a negative error
4407  * code upon failure.
4408  */
4409 static int
4410 decode_bitmap_c(struct drbd_peer_device *peer_device,
4411                 struct p_compressed_bm *p,
4412                 struct bm_xfer_ctx *c,
4413                 unsigned int len)
4414 {
4415         if (dcbp_get_code(p) == RLE_VLI_Bits)
4416                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4417
4418         /* other variants had been implemented for evaluation,
4419          * but have been dropped as this one turned out to be "best"
4420          * during all our tests. */
4421
4422         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4423         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4424         return -EIO;
4425 }
4426
4427 void INFO_bm_xfer_stats(struct drbd_device *device,
4428                 const char *direction, struct bm_xfer_ctx *c)
4429 {
4430         /* what would it take to transfer it "plaintext" */
4431         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4432         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4433         unsigned int plain =
4434                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4435                 c->bm_words * sizeof(unsigned long);
4436         unsigned int total = c->bytes[0] + c->bytes[1];
4437         unsigned int r;
4438
4439         /* total cannot be zero, but just in case: */
4440         if (total == 0)
4441                 return;
4442
4443         /* don't report if not compressed */
4444         if (total >= plain)
4445                 return;
4446
4447         /* total < plain. check for overflow, still */
4448         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4449                                     : (1000 * total / plain);
4450
4451         if (r > 1000)
4452                 r = 1000;
4453
4454         r = 1000 - r;
4455         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4456              "total %u; compression: %u.%u%%\n",
4457                         direction,
4458                         c->bytes[1], c->packets[1],
4459                         c->bytes[0], c->packets[0],
4460                         total, r/10, r % 10);
4461 }
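
/*
 * Worked example for the ratio above (made-up numbers): with plain ==
 * 100000 bytes and total == 2500 bytes actually transferred,
 * r = 1000 - (1000 * 2500 / 100000) = 975, printed as "compression: 97.5%".
 */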
4462
4463 /* Since we are processing the bitfield from lower addresses to higher,
4464    it does not matter whether we process it in 32 bit chunks or 64 bit
4465    chunks, as long as it is little endian. (Understand it as a byte stream,
4466    beginning with the lowest byte...) If we used big endian
4467    we would need to process it from the highest address to the lowest,
4468    in order to be agnostic to the 32 vs 64 bits issue.
4469
4470    Returns 0 on success, a negative error code otherwise. */
4471 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4472 {
4473         struct drbd_peer_device *peer_device;
4474         struct drbd_device *device;
4475         struct bm_xfer_ctx c;
4476         int err;
4477
4478         peer_device = conn_peer_device(connection, pi->vnr);
4479         if (!peer_device)
4480                 return -EIO;
4481         device = peer_device->device;
4482
4483         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4484         /* you are supposed to send additional out-of-sync information
4485          * if you actually set bits during this phase */
4486
4487         c = (struct bm_xfer_ctx) {
4488                 .bm_bits = drbd_bm_bits(device),
4489                 .bm_words = drbd_bm_words(device),
4490         };
4491
4492         for(;;) {
4493                 if (pi->cmd == P_BITMAP)
4494                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4495                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4496                         /* MAYBE: sanity check that we speak proto >= 90,
4497                          * and the feature is enabled! */
4498                         struct p_compressed_bm *p = pi->data;
4499
4500                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4501                                 drbd_err(device, "ReportCBitmap packet too large\n");
4502                                 err = -EIO;
4503                                 goto out;
4504                         }
4505                         if (pi->size <= sizeof(*p)) {
4506                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4507                                 err = -EIO;
4508                                 goto out;
4509                         }
4510                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4511                         if (err)
4512                                goto out;
4513                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4514                 } else {
4515                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4516                         err = -EIO;
4517                         goto out;
4518                 }
4519
4520                 c.packets[pi->cmd == P_BITMAP]++;
4521                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4522
4523                 if (err <= 0) {
4524                         if (err < 0)
4525                                 goto out;
4526                         break;
4527                 }
4528                 err = drbd_recv_header(peer_device->connection, pi);
4529                 if (err)
4530                         goto out;
4531         }
4532
4533         INFO_bm_xfer_stats(device, "receive", &c);
4534
4535         if (device->state.conn == C_WF_BITMAP_T) {
4536                 enum drbd_state_rv rv;
4537
4538                 err = drbd_send_bitmap(device);
4539                 if (err)
4540                         goto out;
4541                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4542                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4543                 D_ASSERT(device, rv == SS_SUCCESS);
4544         } else if (device->state.conn != C_WF_BITMAP_S) {
4545                 /* admin may have requested C_DISCONNECTING,
4546                  * other threads may have noticed network errors */
4547                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4548                     drbd_conn_str(device->state.conn));
4549         }
4550         err = 0;
4551
4552  out:
4553         drbd_bm_unlock(device);
4554         if (!err && device->state.conn == C_WF_BITMAP_S)
4555                 drbd_start_resync(device, C_SYNC_SOURCE);
4556         return err;
4557 }
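
/*
 * In short: the bitmap received above is merged into our own out-of-sync
 * bitmap.  If we are C_WF_BITMAP_T (about to become sync target) we send
 * our bitmap back and request C_WF_SYNC_UUID; if we are C_WF_BITMAP_S we
 * start the resync as sync source once the transfer completed without
 * error.
 */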
4558
4559 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4560 {
4561         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4562                  pi->cmd, pi->size);
4563
4564         return ignore_remaining_packet(connection, pi);
4565 }
4566
4567 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4568 {
4569         /* Make sure we've acked all the TCP data associated
4570          * with the data requests being unplugged */
4571         drbd_tcp_quickack(connection->data.socket);
4572
4573         return 0;
4574 }
4575
4576 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4577 {
4578         struct drbd_peer_device *peer_device;
4579         struct drbd_device *device;
4580         struct p_block_desc *p = pi->data;
4581
4582         peer_device = conn_peer_device(connection, pi->vnr);
4583         if (!peer_device)
4584                 return -EIO;
4585         device = peer_device->device;
4586
4587         switch (device->state.conn) {
4588         case C_WF_SYNC_UUID:
4589         case C_WF_BITMAP_T:
4590         case C_BEHIND:
4591                 break;
4592         default:
4593                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4594                                 drbd_conn_str(device->state.conn));
4595         }
4596
4597         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4598
4599         return 0;
4600 }
4601
4602 struct data_cmd {
4603         int expect_payload;
4604         size_t pkt_size;
4605         int (*fn)(struct drbd_connection *, struct packet_info *);
4606 };
4607
4608 static struct data_cmd drbd_cmd_handler[] = {
4609         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4610         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4611         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4612         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4613         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4614         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4615         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4616         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4617         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4618         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4619         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4620         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4621         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4622         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4623         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4624         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4625         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4626         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4627         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4628         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4629         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4630         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4631         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4632         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4633         [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4634 };
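
/*
 * Reading the table: for P_DATA the entry { 1, sizeof(struct p_data),
 * receive_Data } means that drbdd() below first reads the fixed sub-header
 * (pkt_size bytes) into the socket buffer and then lets receive_Data()
 * read the remaining payload -- the actual block data -- itself; a packet
 * that carries a payload although expect_payload is 0 is treated as a
 * protocol error.
 */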
4635
4636 static void drbdd(struct drbd_connection *connection)
4637 {
4638         struct packet_info pi;
4639         size_t shs; /* sub header size */
4640         int err;
4641
4642         while (get_t_state(&connection->receiver) == RUNNING) {
4643                 struct data_cmd *cmd;
4644
4645                 drbd_thread_current_set_cpu(&connection->receiver);
4646                 update_receiver_timing_details(connection, drbd_recv_header);
4647                 if (drbd_recv_header(connection, &pi))
4648                         goto err_out;
4649
4650                 cmd = &drbd_cmd_handler[pi.cmd];
4651                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4652                         drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
4653                                  cmdname(pi.cmd), pi.cmd);
4654                         goto err_out;
4655                 }
4656
4657                 shs = cmd->pkt_size;
4658                 if (pi.size > shs && !cmd->expect_payload) {
4659                         drbd_err(connection, "No payload expected %s l:%d\n",
4660                                  cmdname(pi.cmd), pi.size);
4661                         goto err_out;
4662                 }
4663
4664                 if (shs) {
4665                         update_receiver_timing_details(connection, drbd_recv_all_warn);
4666                         err = drbd_recv_all_warn(connection, pi.data, shs);
4667                         if (err)
4668                                 goto err_out;
4669                         pi.size -= shs;
4670                 }
4671
4672                 update_receiver_timing_details(connection, cmd->fn);
4673                 err = cmd->fn(connection, &pi);
4674                 if (err) {
4675                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4676                                  cmdname(pi.cmd), err, pi.size);
4677                         goto err_out;
4678                 }
4679         }
4680         return;
4681
4682     err_out:
4683         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4684 }
4685
4686 static void conn_disconnect(struct drbd_connection *connection)
4687 {
4688         struct drbd_peer_device *peer_device;
4689         enum drbd_conns oc;
4690         int vnr;
4691
4692         if (connection->cstate == C_STANDALONE)
4693                 return;
4694
4695         /* We are about to start the cleanup after connection loss.
4696          * Make sure drbd_make_request knows about that.
4697          * Usually we should be in some network failure state already,
4698          * but just in case we are not, we fix it up here.
4699          */
4700         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4701
4702         /* ack_receiver does not clean up anything. it must not interfere, either */
4703         drbd_thread_stop(&connection->ack_receiver);
4704         if (connection->ack_sender) {
4705                 destroy_workqueue(connection->ack_sender);
4706                 connection->ack_sender = NULL;
4707         }
4708         drbd_free_sock(connection);
4709
4710         rcu_read_lock();
4711         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4712                 struct drbd_device *device = peer_device->device;
4713                 kref_get(&device->kref);
4714                 rcu_read_unlock();
4715                 drbd_disconnected(peer_device);
4716                 kref_put(&device->kref, drbd_destroy_device);
4717                 rcu_read_lock();
4718         }
4719         rcu_read_unlock();
4720
4721         if (!list_empty(&connection->current_epoch->list))
4722                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4723         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4724         atomic_set(&connection->current_epoch->epoch_size, 0);
4725         connection->send.seen_any_write_yet = false;
4726
4727         drbd_info(connection, "Connection closed\n");
4728
4729         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4730                 conn_try_outdate_peer_async(connection);
4731
4732         spin_lock_irq(&connection->resource->req_lock);
4733         oc = connection->cstate;
4734         if (oc >= C_UNCONNECTED)
4735                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4736
4737         spin_unlock_irq(&connection->resource->req_lock);
4738
4739         if (oc == C_DISCONNECTING)
4740                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4741 }
4742
4743 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4744 {
4745         struct drbd_device *device = peer_device->device;
4746         unsigned int i;
4747
4748         /* wait for current activity to cease. */
4749         spin_lock_irq(&device->resource->req_lock);
4750         _drbd_wait_ee_list_empty(device, &device->active_ee);
4751         _drbd_wait_ee_list_empty(device, &device->sync_ee);
4752         _drbd_wait_ee_list_empty(device, &device->read_ee);
4753         spin_unlock_irq(&device->resource->req_lock);
4754
4755         /* We do not have data structures that would allow us to
4756          * get the rs_pending_cnt down to 0 again.
4757          *  * On C_SYNC_TARGET we do not have any data structures describing
4758          *    the pending RSDataRequest's we have sent.
4759          *  * On C_SYNC_SOURCE there is no data structure that tracks
4760          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4761          *  And no, it is not the sum of the reference counts in the
4762          *  resync_LRU. The resync_LRU tracks the whole operation including
4763          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4764          *  on the fly. */
4765         drbd_rs_cancel_all(device);
4766         device->rs_total = 0;
4767         device->rs_failed = 0;
4768         atomic_set(&device->rs_pending_cnt, 0);
4769         wake_up(&device->misc_wait);
4770
4771         del_timer_sync(&device->resync_timer);
4772         resync_timer_fn((unsigned long)device);
4773
4774         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4775          * w_make_resync_request etc. which may still be on the worker queue
4776          * to be "canceled" */
4777         drbd_flush_workqueue(&peer_device->connection->sender_work);
4778
4779         drbd_finish_peer_reqs(device);
4780
4781         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4782            might have queued work again. The one before drbd_finish_peer_reqs() is
4783            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4784         drbd_flush_workqueue(&peer_device->connection->sender_work);
4785
4786         /* need to do it again, drbd_finish_peer_reqs() may have populated it
4787          * again via drbd_try_clear_on_disk_bm(). */
4788         drbd_rs_cancel_all(device);
4789
4790         kfree(device->p_uuid);
4791         device->p_uuid = NULL;
4792
4793         if (!drbd_suspended(device))
4794                 tl_clear(peer_device->connection);
4795
4796         drbd_md_sync(device);
4797
4798         /* serialize with bitmap writeout triggered by the state change,
4799          * if any. */
4800         wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4801
4802         /* tcp_close and release of sendpage pages can be deferred.  I don't
4803          * want to use SO_LINGER, because apparently it can be deferred for
4804          * more than 20 seconds (longest time I checked).
4805          *
4806          * Actually we don't care for exactly when the network stack does its
4807          * put_page(), but release our reference on these pages right here.
4808          */
4809         i = drbd_free_peer_reqs(device, &device->net_ee);
4810         if (i)
4811                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4812         i = atomic_read(&device->pp_in_use_by_net);
4813         if (i)
4814                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4815         i = atomic_read(&device->pp_in_use);
4816         if (i)
4817                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4818
4819         D_ASSERT(device, list_empty(&device->read_ee));
4820         D_ASSERT(device, list_empty(&device->active_ee));
4821         D_ASSERT(device, list_empty(&device->sync_ee));
4822         D_ASSERT(device, list_empty(&device->done_ee));
4823
4824         return 0;
4825 }
4826
4827 /*
4828  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4829  * we can agree on is stored in agreed_pro_version.
4830  *
4831  * feature flags and the reserved array should be enough room for future
4832  * enhancements of the handshake protocol, and possible plugins...
4833  *
4834  * for now, they are expected to be zero, but ignored.
4835  */
4836 static int drbd_send_features(struct drbd_connection *connection)
4837 {
4838         struct drbd_socket *sock;
4839         struct p_connection_features *p;
4840
4841         sock = &connection->data;
4842         p = conn_prepare_command(connection, sock);
4843         if (!p)
4844                 return -EIO;
4845         memset(p, 0, sizeof(*p));
4846         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4847         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4848         p->feature_flags = cpu_to_be32(PRO_FEATURES);
4849         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4850 }
4851
4852 /*
4853  * return values:
4854  *   1 yes, we have a valid connection
4855  *   0 oops, did not work out, please try again
4856  *  -1 peer talks different language,
4857  *     no point in trying again, please go standalone.
4858  */
4859 static int drbd_do_features(struct drbd_connection *connection)
4860 {
4861         /* ASSERT current == connection->receiver ... */
4862         struct p_connection_features *p;
4863         const int expect = sizeof(struct p_connection_features);
4864         struct packet_info pi;
4865         int err;
4866
4867         err = drbd_send_features(connection);
4868         if (err)
4869                 return 0;
4870
4871         err = drbd_recv_header(connection, &pi);
4872         if (err)
4873                 return 0;
4874
4875         if (pi.cmd != P_CONNECTION_FEATURES) {
4876                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4877                          cmdname(pi.cmd), pi.cmd);
4878                 return -1;
4879         }
4880
4881         if (pi.size != expect) {
4882                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4883                      expect, pi.size);
4884                 return -1;
4885         }
4886
4887         p = pi.data;
4888         err = drbd_recv_all_warn(connection, p, expect);
4889         if (err)
4890                 return 0;
4891
4892         p->protocol_min = be32_to_cpu(p->protocol_min);
4893         p->protocol_max = be32_to_cpu(p->protocol_max);
4894         if (p->protocol_max == 0)
4895                 p->protocol_max = p->protocol_min;
4896
4897         if (PRO_VERSION_MAX < p->protocol_min ||
4898             PRO_VERSION_MIN > p->protocol_max)
4899                 goto incompat;
4900
4901         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4902         connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4903
4904         drbd_info(connection, "Handshake successful: "
4905              "Agreed network protocol version %d\n", connection->agreed_pro_version);
4906
4907         drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4908                   connection->agreed_features & FF_TRIM ? " " : " not ");
4909
4910         return 1;
4911
4912  incompat:
4913         drbd_err(connection, "incompatible DRBD dialects: "
4914             "I support %d-%d, peer supports %d-%d\n",
4915             PRO_VERSION_MIN, PRO_VERSION_MAX,
4916             p->protocol_min, p->protocol_max);
4917         return -1;
4918 }
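
/*
 * Negotiation sketch (hypothetical version numbers): if we support
 * protocol versions 86..101 and the peer announces min 90, max 96, the
 * ranges overlap and agreed_pro_version becomes min(101, 96) = 96;
 * agreed_features is the intersection of our compiled-in PRO_FEATURES
 * and the feature flags announced by the peer.
 */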
4919
4920 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4921 static int drbd_do_auth(struct drbd_connection *connection)
4922 {
4923         drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4924         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4925         return -1;
4926 }
4927 #else
4928 #define CHALLENGE_LEN 64
4929
4930 /* Return value:
4931         1 - auth succeeded,
4932         0 - failed, try again (network error),
4933         -1 - auth failed, don't try again.
4934 */
4935
4936 static int drbd_do_auth(struct drbd_connection *connection)
4937 {
4938         struct drbd_socket *sock;
4939         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4940         char *response = NULL;
4941         char *right_response = NULL;
4942         char *peers_ch = NULL;
4943         unsigned int key_len;
4944         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4945         unsigned int resp_size;
4946         SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
4947         struct packet_info pi;
4948         struct net_conf *nc;
4949         int err, rv;
4950
4951         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4952
4953         rcu_read_lock();
4954         nc = rcu_dereference(connection->net_conf);
4955         key_len = strlen(nc->shared_secret);
4956         memcpy(secret, nc->shared_secret, key_len);
4957         rcu_read_unlock();
4958
4959         desc->tfm = connection->cram_hmac_tfm;
4960         desc->flags = 0;
4961
4962         rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4963         if (rv) {
4964                 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
4965                 rv = -1;
4966                 goto fail;
4967         }
4968
4969         get_random_bytes(my_challenge, CHALLENGE_LEN);
4970
4971         sock = &connection->data;
4972         if (!conn_prepare_command(connection, sock)) {
4973                 rv = 0;
4974                 goto fail;
4975         }
4976         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4977                                 my_challenge, CHALLENGE_LEN);
4978         if (!rv)
4979                 goto fail;
4980
4981         err = drbd_recv_header(connection, &pi);
4982         if (err) {
4983                 rv = 0;
4984                 goto fail;
4985         }
4986
4987         if (pi.cmd != P_AUTH_CHALLENGE) {
4988                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4989                          cmdname(pi.cmd), pi.cmd);
4990                 rv = 0;
4991                 goto fail;
4992         }
4993
4994         if (pi.size > CHALLENGE_LEN * 2) {
4995                 drbd_err(connection, "AuthChallenge payload too big.\n");
4996                 rv = -1;
4997                 goto fail;
4998         }
4999
5000         if (pi.size < CHALLENGE_LEN) {
5001                 drbd_err(connection, "AuthChallenge payload too small.\n");
5002                 rv = -1;
5003                 goto fail;
5004         }
5005
5006         peers_ch = kmalloc(pi.size, GFP_NOIO);
5007         if (peers_ch == NULL) {
5008                 drbd_err(connection, "kmalloc of peers_ch failed\n");
5009                 rv = -1;
5010                 goto fail;
5011         }
5012
5013         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5014         if (err) {
5015                 rv = 0;
5016                 goto fail;
5017         }
5018
5019         if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5020                 drbd_err(connection, "Peer presented the same challenge!\n");
5021                 rv = -1;
5022                 goto fail;
5023         }
5024
5025         resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5026         response = kmalloc(resp_size, GFP_NOIO);
5027         if (response == NULL) {
5028                 drbd_err(connection, "kmalloc of response failed\n");
5029                 rv = -1;
5030                 goto fail;
5031         }
5032
5033         rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5034         if (rv) {
5035                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5036                 rv = -1;
5037                 goto fail;
5038         }
5039
5040         if (!conn_prepare_command(connection, sock)) {
5041                 rv = 0;
5042                 goto fail;
5043         }
5044         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5045                                 response, resp_size);
5046         if (!rv)
5047                 goto fail;
5048
5049         err = drbd_recv_header(connection, &pi);
5050         if (err) {
5051                 rv = 0;
5052                 goto fail;
5053         }
5054
5055         if (pi.cmd != P_AUTH_RESPONSE) {
5056                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5057                          cmdname(pi.cmd), pi.cmd);
5058                 rv = 0;
5059                 goto fail;
5060         }
5061
5062         if (pi.size != resp_size) {
5063                 drbd_err(connection, "AuthResponse payload of wrong size\n");
5064                 rv = 0;
5065                 goto fail;
5066         }
5067
5068         err = drbd_recv_all_warn(connection, response, resp_size);
5069         if (err) {
5070                 rv = 0;
5071                 goto fail;
5072         }
5073
5074         right_response = kmalloc(resp_size, GFP_NOIO);
5075         if (right_response == NULL) {
5076                 drbd_err(connection, "kmalloc of right_response failed\n");
5077                 rv = -1;
5078                 goto fail;
5079         }
5080
5081         rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5082                                  right_response);
5083         if (rv) {
5084                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5085                 rv = -1;
5086                 goto fail;
5087         }
5088
5089         rv = !memcmp(response, right_response, resp_size);
5090
5091         if (rv)
5092                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5093                      resp_size);
5094         else
5095                 rv = -1;
5096
5097  fail:
5098         kfree(peers_ch);
5099         kfree(response);
5100         kfree(right_response);
5101         shash_desc_zero(desc);
5102
5103         return rv;
5104 }
5105 #endif
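
/*
 * In rough outline, the authentication above works like this: each side
 * sends a CHALLENGE_LEN byte random challenge, rejects a peer that merely
 * echoes our own challenge, answers the peer's challenge with
 * HMAC(shared_secret, peers_challenge), and finally compares the peer's
 * response against the HMAC it computed over its own challenge.
 */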
5106
5107 int drbd_receiver(struct drbd_thread *thi)
5108 {
5109         struct drbd_connection *connection = thi->connection;
5110         int h;
5111
5112         drbd_info(connection, "receiver (re)started\n");
5113
5114         do {
5115                 h = conn_connect(connection);
5116                 if (h == 0) {
5117                         conn_disconnect(connection);
5118                         schedule_timeout_interruptible(HZ);
5119                 }
5120                 if (h == -1) {
5121                         drbd_warn(connection, "Discarding network configuration.\n");
5122                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5123                 }
5124         } while (h == 0);
5125
5126         if (h > 0)
5127                 drbdd(connection);
5128
5129         conn_disconnect(connection);
5130
5131         drbd_info(connection, "receiver terminated\n");
5132         return 0;
5133 }
5134
5135 /* ********* acknowledge sender ******** */
5136
5137 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5138 {
5139         struct p_req_state_reply *p = pi->data;
5140         int retcode = be32_to_cpu(p->retcode);
5141
5142         if (retcode >= SS_SUCCESS) {
5143                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5144         } else {
5145                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5146                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5147                          drbd_set_st_err_str(retcode), retcode);
5148         }
5149         wake_up(&connection->ping_wait);
5150
5151         return 0;
5152 }
5153
5154 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5155 {
5156         struct drbd_peer_device *peer_device;
5157         struct drbd_device *device;
5158         struct p_req_state_reply *p = pi->data;
5159         int retcode = be32_to_cpu(p->retcode);
5160
5161         peer_device = conn_peer_device(connection, pi->vnr);
5162         if (!peer_device)
5163                 return -EIO;
5164         device = peer_device->device;
5165
5166         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5167                 D_ASSERT(device, connection->agreed_pro_version < 100);
5168                 return got_conn_RqSReply(connection, pi);
5169         }
5170
5171         if (retcode >= SS_SUCCESS) {
5172                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5173         } else {
5174                 set_bit(CL_ST_CHG_FAIL, &device->flags);
5175                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5176                         drbd_set_st_err_str(retcode), retcode);
5177         }
5178         wake_up(&device->state_wait);
5179
5180         return 0;
5181 }
5182
5183 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5184 {
5185         return drbd_send_ping_ack(connection);
5186
5187 }
5188
5189 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5190 {
5191         /* restore idle timeout */
5192         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5193         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5194                 wake_up(&connection->ping_wait);
5195
5196         return 0;
5197 }
5198
5199 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5200 {
5201         struct drbd_peer_device *peer_device;
5202         struct drbd_device *device;
5203         struct p_block_ack *p = pi->data;
5204         sector_t sector = be64_to_cpu(p->sector);
5205         int blksize = be32_to_cpu(p->blksize);
5206
5207         peer_device = conn_peer_device(connection, pi->vnr);
5208         if (!peer_device)
5209                 return -EIO;
5210         device = peer_device->device;
5211
5212         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5213
5214         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5215
5216         if (get_ldev(device)) {
5217                 drbd_rs_complete_io(device, sector);
5218                 drbd_set_in_sync(device, sector, blksize);
5219                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5220                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5221                 put_ldev(device);
5222         }
5223         dec_rs_pending(device);
5224         atomic_add(blksize >> 9, &device->rs_sect_in);
5225
5226         return 0;
5227 }
5228
5229 static int
5230 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5231                               struct rb_root *root, const char *func,
5232                               enum drbd_req_event what, bool missing_ok)
5233 {
5234         struct drbd_request *req;
5235         struct bio_and_error m;
5236
5237         spin_lock_irq(&device->resource->req_lock);
5238         req = find_request(device, root, id, sector, missing_ok, func);
5239         if (unlikely(!req)) {
5240                 spin_unlock_irq(&device->resource->req_lock);
5241                 return -EIO;
5242         }
5243         __req_mod(req, what, &m);
5244         spin_unlock_irq(&device->resource->req_lock);
5245
5246         if (m.bio)
5247                 complete_master_bio(device, &m);
5248         return 0;
5249 }
5250
5251 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5252 {
5253         struct drbd_peer_device *peer_device;
5254         struct drbd_device *device;
5255         struct p_block_ack *p = pi->data;
5256         sector_t sector = be64_to_cpu(p->sector);
5257         int blksize = be32_to_cpu(p->blksize);
5258         enum drbd_req_event what;
5259
5260         peer_device = conn_peer_device(connection, pi->vnr);
5261         if (!peer_device)
5262                 return -EIO;
5263         device = peer_device->device;
5264
5265         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5266
5267         if (p->block_id == ID_SYNCER) {
5268                 drbd_set_in_sync(device, sector, blksize);
5269                 dec_rs_pending(device);
5270                 return 0;
5271         }
5272         switch (pi->cmd) {
5273         case P_RS_WRITE_ACK:
5274                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5275                 break;
5276         case P_WRITE_ACK:
5277                 what = WRITE_ACKED_BY_PEER;
5278                 break;
5279         case P_RECV_ACK:
5280                 what = RECV_ACKED_BY_PEER;
5281                 break;
5282         case P_SUPERSEDED:
5283                 what = CONFLICT_RESOLVED;
5284                 break;
5285         case P_RETRY_WRITE:
5286                 what = POSTPONE_WRITE;
5287                 break;
5288         default:
5289                 BUG();
5290         }
5291
5292         return validate_req_change_req_state(device, p->block_id, sector,
5293                                              &device->write_requests, __func__,
5294                                              what, false);
5295 }
5296
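/* Negative acknowledgement: the peer could not satisfy the write. For resync
 * requests record the failed range; for application writes mark the range out
 * of sync so it gets resynced later. */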
5297 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5298 {
5299         struct drbd_peer_device *peer_device;
5300         struct drbd_device *device;
5301         struct p_block_ack *p = pi->data;
5302         sector_t sector = be64_to_cpu(p->sector);
5303         int size = be32_to_cpu(p->blksize);
5304         int err;
5305
5306         peer_device = conn_peer_device(connection, pi->vnr);
5307         if (!peer_device)
5308                 return -EIO;
5309         device = peer_device->device;
5310
5311         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5312
5313         if (p->block_id == ID_SYNCER) {
5314                 dec_rs_pending(device);
5315                 drbd_rs_failed_io(device, sector, size);
5316                 return 0;
5317         }
5318
5319         err = validate_req_change_req_state(device, p->block_id, sector,
5320                                             &device->write_requests, __func__,
5321                                             NEG_ACKED, true);
5322         if (err) {
5323                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5324                    The master bio might already be completed, therefore the
5325                    request is no longer in the collision hash. */
5326                 /* In Protocol B we might already have got a P_RECV_ACK
5327                    but then get a P_NEG_ACK afterwards. */
5328                 drbd_set_out_of_sync(device, sector, size);
5329         }
5330         return 0;
5331 }
5332
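/* The peer could not serve a read request (P_NEG_DREPLY): fail the
 * corresponding request from the read_requests tree. */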
5333 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5334 {
5335         struct drbd_peer_device *peer_device;
5336         struct drbd_device *device;
5337         struct p_block_ack *p = pi->data;
5338         sector_t sector = be64_to_cpu(p->sector);
5339
5340         peer_device = conn_peer_device(connection, pi->vnr);
5341         if (!peer_device)
5342                 return -EIO;
5343         device = peer_device->device;
5344
5345         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5346
5347         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5348             (unsigned long long)sector, be32_to_cpu(p->blksize));
5349
5350         return validate_req_change_req_state(device, p->block_id, sector,
5351                                              &device->read_requests, __func__,
5352                                              NEG_ACKED, false);
5353 }
5354
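/* Negative reply (or cancellation) for a resync read request. P_NEG_RS_DREPLY
 * marks the range as failed, P_RS_CANCEL only completes the pending I/O. */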
5355 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5356 {
5357         struct drbd_peer_device *peer_device;
5358         struct drbd_device *device;
5359         sector_t sector;
5360         int size;
5361         struct p_block_ack *p = pi->data;
5362
5363         peer_device = conn_peer_device(connection, pi->vnr);
5364         if (!peer_device)
5365                 return -EIO;
5366         device = peer_device->device;
5367
5368         sector = be64_to_cpu(p->sector);
5369         size = be32_to_cpu(p->blksize);
5370
5371         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5372
5373         dec_rs_pending(device);
5374
5375         if (get_ldev_if_state(device, D_FAILED)) {
5376                 drbd_rs_complete_io(device, sector);
5377                 switch (pi->cmd) {
5378                 case P_NEG_RS_DREPLY:
5379                         drbd_rs_failed_io(device, sector, size);
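                        /* fall through */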
5380                 case P_RS_CANCEL:
5381                         break;
5382                 default:
5383                         BUG();
5384                 }
5385                 put_ldev(device);
5386         }
5387
5388         return 0;
5389 }
5390
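/* The peer completed all writes of an epoch: release that epoch from the
 * transfer log. For devices that ran ahead of the peer (C_AHEAD) and have no
 * application I/O in flight, arm the timer that switches them back to
 * becoming a sync source. */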
5391 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5392 {
5393         struct p_barrier_ack *p = pi->data;
5394         struct drbd_peer_device *peer_device;
5395         int vnr;
5396
5397         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5398
5399         rcu_read_lock();
5400         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5401                 struct drbd_device *device = peer_device->device;
5402
5403                 if (device->state.conn == C_AHEAD &&
5404                     atomic_read(&device->ap_in_flight) == 0 &&
5405                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5406                         device->start_resync_timer.expires = jiffies + HZ;
5407                         add_timer(&device->start_resync_timer);
5408                 }
5409         }
5410         rcu_read_unlock();
5411
5412         return 0;
5413 }
5414
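/* Result of one online verify request. Record blocks reported out of sync,
 * update progress, and queue the finish work once the last reply came in. */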
5415 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5416 {
5417         struct drbd_peer_device *peer_device;
5418         struct drbd_device *device;
5419         struct p_block_ack *p = pi->data;
5420         struct drbd_device_work *dw;
5421         sector_t sector;
5422         int size;
5423
5424         peer_device = conn_peer_device(connection, pi->vnr);
5425         if (!peer_device)
5426                 return -EIO;
5427         device = peer_device->device;
5428
5429         sector = be64_to_cpu(p->sector);
5430         size = be32_to_cpu(p->blksize);
5431
5432         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5433
5434         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5435                 drbd_ov_out_of_sync_found(device, sector, size);
5436         else
5437                 ov_out_of_sync_print(device);
5438
5439         if (!get_ldev(device))
5440                 return 0;
5441
5442         drbd_rs_complete_io(device, sector);
5443         dec_rs_pending(device);
5444
5445         --device->ov_left;
5446
5447         /* let's advance progress step marks only for every other megabyte */
5448         if ((device->ov_left & 0x200) == 0x200)
5449                 drbd_advance_rs_marks(device, device->ov_left);
5450
5451         if (device->ov_left == 0) {
5452                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5453                 if (dw) {
5454                         dw->w.cb = w_ov_finished;
5455                         dw->device = device;
5456                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5457                 } else {
5458                         drbd_err(device, "kmalloc(dw) failed.");
5459                         ov_out_of_sync_print(device);
5460                         drbd_resync_finished(device);
5461                 }
5462         }
5463         put_ldev(device);
5464         return 0;
5465 }
5466
5467 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5468 {
5469         return 0;
5470 }
5471
5472 struct meta_sock_cmd {
5473         size_t pkt_size;
5474         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5475 };
5476
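/* ping_int is configured in seconds, ping_timeo in tenths of a second,
 * hence the division by ten for the ping timeout below. */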
5477 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5478 {
5479         long t;
5480         struct net_conf *nc;
5481
5482         rcu_read_lock();
5483         nc = rcu_dereference(connection->net_conf);
5484         t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5485         rcu_read_unlock();
5486
5487         t *= HZ;
5488         if (ping_timeout)
5489                 t /= 10;
5490
5491         connection->meta.socket->sk->sk_rcvtimeo = t;
5492 }
5493
5494 static void set_ping_timeout(struct drbd_connection *connection)
5495 {
5496         set_rcvtimeo(connection, 1);
5497 }
5498
5499 static void set_idle_timeout(struct drbd_connection *connection)
5500 {
5501         set_rcvtimeo(connection, 0);
5502 }
5503
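/* Dispatch table for the ack receiver, indexed by packet type: pkt_size is
 * the expected payload size following the header, fn the handler to call. */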
5504 static struct meta_sock_cmd ack_receiver_tbl[] = {
5505         [P_PING]            = { 0, got_Ping },
5506         [P_PING_ACK]        = { 0, got_PingAck },
5507         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5508         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5509         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5510         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5511         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5512         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5513         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5514         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5515         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5516         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5517         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5518         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5519         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5520         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5521         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5522 };
5523
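/* Ack receiver thread: runs with realtime priority on the meta socket, sends
 * pings when asked to, and dispatches incoming meta packets through
 * ack_receiver_tbl until the connection goes down. */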
5524 int drbd_ack_receiver(struct drbd_thread *thi)
5525 {
5526         struct drbd_connection *connection = thi->connection;
5527         struct meta_sock_cmd *cmd = NULL;
5528         struct packet_info pi;
5529         unsigned long pre_recv_jif;
5530         int rv;
5531         void *buf    = connection->meta.rbuf;
5532         int received = 0;
5533         unsigned int header_size = drbd_header_size(connection);
5534         int expect   = header_size;
5535         bool ping_timeout_active = false;
5536         struct sched_param param = { .sched_priority = 2 };
5537
5538         rv = sched_setscheduler(current, SCHED_RR, &param);
5539         if (rv < 0)
5540                 drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5541
5542         while (get_t_state(thi) == RUNNING) {
5543                 drbd_thread_current_set_cpu(thi);
5544
5545                 conn_reclaim_net_peer_reqs(connection);
5546
5547                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5548                         if (drbd_send_ping(connection)) {
5549                                 drbd_err(connection, "drbd_send_ping has failed\n");
5550                                 goto reconnect;
5551                         }
5552                         set_ping_timeout(connection);
5553                         ping_timeout_active = true;
5554                 }
5555
5556                 pre_recv_jif = jiffies;
5557                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5558
5559                 /* Note:
5560                  * -EINTR        (on meta) we got a signal
5561                  * -EAGAIN       (on meta) rcvtimeo expired
5562                  * -ECONNRESET   other side closed the connection
5563                  * -ERESTARTSYS  (on data) we got a signal
5564                  * rv <  0       other than above: unexpected error!
5565                  * rv == expected: full header or command
5566                  * rv <  expected: "woken" by signal during receive
5567                  * rv == 0       : "connection shut down by peer"
5568                  */
5569                 if (likely(rv > 0)) {
5570                         received += rv;
5571                         buf      += rv;
5572                 } else if (rv == 0) {
5573                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5574                                 long t;
5575                                 rcu_read_lock();
5576                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5577                                 rcu_read_unlock();
5578
5579                                 t = wait_event_timeout(connection->ping_wait,
5580                                                        connection->cstate < C_WF_REPORT_PARAMS,
5581                                                        t);
5582                                 if (t)
5583                                         break;
5584                         }
5585                         drbd_err(connection, "meta connection shut down by peer.\n");
5586                         goto reconnect;
5587                 } else if (rv == -EAGAIN) {
5588                         /* If the data socket received something meanwhile,
5589                          * that is good enough: peer is still alive. */
5590                         if (time_after(connection->last_received, pre_recv_jif))
5591                                 continue;
5592                         if (ping_timeout_active) {
5593                                 drbd_err(connection, "PingAck did not arrive in time.\n");
5594                                 goto reconnect;
5595                         }
5596                         set_bit(SEND_PING, &connection->flags);
5597                         continue;
5598                 } else if (rv == -EINTR) {
5599                         /* maybe drbd_thread_stop(): the while condition will notice.
5600                          * maybe woken for send_ping: we'll send a ping above,
5601                          * and change the rcvtimeo */
5602                         flush_signals(current);
5603                         continue;
5604                 } else {
5605                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5606                         goto reconnect;
5607                 }
5608
5609                 if (received == expect && cmd == NULL) {
5610                         if (decode_header(connection, connection->meta.rbuf, &pi))
5611                                 goto reconnect;
5612                         cmd = &ack_receiver_tbl[pi.cmd];
5613                         if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5614                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5615                                          cmdname(pi.cmd), pi.cmd);
5616                                 goto disconnect;
5617                         }
5618                         expect = header_size + cmd->pkt_size;
5619                         if (pi.size != expect - header_size) {
5620                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5621                                         pi.cmd, pi.size);
5622                                 goto reconnect;
5623                         }
5624                 }
5625                 if (received == expect) {
5626                         int err;
5627
5628                         err = cmd->fn(connection, &pi);
5629                         if (err) {
5630                                 drbd_err(connection, "%pf failed\n", cmd->fn);
5631                                 goto reconnect;
5632                         }
5633
5634                         connection->last_received = jiffies;
5635
5636                         if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5637                                 set_idle_timeout(connection);
5638                                 ping_timeout_active = false;
5639                         }
5640
5641                         buf      = connection->meta.rbuf;
5642                         received = 0;
5643                         expect   = header_size;
5644                         cmd      = NULL;
5645                 }
5646         }
5647
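        /* The if (0) blocks below are only entered via goto from the receive
         * loop above. */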
5648         if (0) {
5649 reconnect:
5650                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5651                 conn_md_sync(connection);
5652         }
5653         if (0) {
5654 disconnect:
5655                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5656         }
5657
5658         drbd_info(connection, "ack_receiver terminated\n");
5659
5660         return 0;
5661 }
5662
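/* Work callback that flushes the acks for completed peer requests of one
 * device. With tcp_cork enabled, the meta socket is corked around the batch
 * so several acks can share a TCP segment. */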
5663 void drbd_send_acks_wf(struct work_struct *ws)
5664 {
5665         struct drbd_peer_device *peer_device =
5666                 container_of(ws, struct drbd_peer_device, send_acks_work);
5667         struct drbd_connection *connection = peer_device->connection;
5668         struct drbd_device *device = peer_device->device;
5669         struct net_conf *nc;
5670         int tcp_cork, err;
5671
5672         rcu_read_lock();
5673         nc = rcu_dereference(connection->net_conf);
5674         tcp_cork = nc->tcp_cork;
5675         rcu_read_unlock();
5676
5677         if (tcp_cork)
5678                 drbd_tcp_cork(connection->meta.socket);
5679
5680         err = drbd_finish_peer_reqs(device);
5681         kref_put(&device->kref, drbd_destroy_device);
5682         /* The matching get is in drbd_endio_write_sec_final(). It is necessary to keep
5683            the struct work_struct send_acks_work, which lives in the peer_device object, alive. */
5684
5685         if (err) {
5686                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5687                 return;
5688         }
5689
5690         if (tcp_cork)
5691                 drbd_tcp_uncork(connection->meta.socket);
5692
5693         return;
5694 }