/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"
#include "drbd_vli.h"

#define PRO_FEATURES (FF_TRIM)

struct packet_info {
        enum drbd_packet cmd;
        unsigned int size;
        unsigned int vnr;
        void *data;
};

enum finish_epoch {
        FE_STILL_LIVE,
        FE_DESTROYED,
        FE_RECYCLED,
};

static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
        struct page *page;
        struct page *tmp;

        BUG_ON(!n);
        BUG_ON(!head);

        page = *head;

        if (!page)
                return NULL;

        while (page) {
                tmp = page_chain_next(page);
                if (--n == 0)
                        break; /* found sufficient pages */
                if (tmp == NULL)
                        /* insufficient pages, don't use any of them. */
                        return NULL;
                page = tmp;
        }
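        /* Here, page is the n-th page of the chain, and tmp points at the
         * first page beyond it (or is NULL if page was the last one). */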

        /* add end of list marker for the returned list */
        set_page_private(page, 0);
        /* actual return value, and adjustment of head */
        page = *head;
        *head = tmp;
        return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
        struct page *tmp;
        int i = 1;
        while ((tmp = page_chain_next(page)))
                ++i, page = tmp;
        if (len)
                *len = i;
        return page;
}

static int page_chain_free(struct page *page)
{
        struct page *tmp;
        int i = 0;
        page_chain_for_each_safe(page, tmp) {
                put_page(page);
                ++i;
        }
        return i;
}

static void page_chain_add(struct page **head,
                struct page *chain_first, struct page *chain_last)
{
#if 1
        struct page *tmp;
        tmp = page_chain_tail(chain_first, NULL);
        BUG_ON(tmp != chain_last);
#endif

        /* add chain to head */
        set_page_private(chain_last, (unsigned long)*head);
        *head = chain_first;
}

static struct page *__drbd_alloc_pages(struct drbd_device *device,
                                       unsigned int number)
{
        struct page *page = NULL;
        struct page *tmp = NULL;
        unsigned int i = 0;

        /* Yes, testing drbd_pp_vacant outside the lock is racy.
         * So what. It saves a spin_lock. */
        if (drbd_pp_vacant >= number) {
                spin_lock(&drbd_pp_lock);
                page = page_chain_del(&drbd_pp_pool, number);
                if (page)
                        drbd_pp_vacant -= number;
                spin_unlock(&drbd_pp_lock);
                if (page)
                        return page;
        }

        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        for (i = 0; i < number; i++) {
                tmp = alloc_page(GFP_TRY);
                if (!tmp)
                        break;
                set_page_private(tmp, (unsigned long)page);
                page = tmp;
        }

        if (i == number)
                return page;

        /* Not enough pages immediately available this time.
         * No need to jump around here, drbd_alloc_pages will retry this
         * function "soon". */
        if (page) {
                tmp = page_chain_tail(page, NULL);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        return NULL;
}

static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
                                           struct list_head *to_be_freed)
{
        struct drbd_peer_request *peer_req, *tmp;

        /* The EEs are always appended to the end of the list. Since
           they are sent in order over the wire, they have to finish
           in order. As soon as we see the first unfinished one, we
           can stop examining the list... */

        list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
                if (drbd_peer_req_has_active_page(peer_req))
                        break;
                list_move(&peer_req->w.list, to_be_freed);
        }
}

static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
{
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;

        spin_lock_irq(&device->resource->req_lock);
        reclaim_finished_net_peer_reqs(device, &reclaimed);
        spin_unlock_irq(&device->resource->req_lock);
        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(device, peer_req);
}

static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
{
        struct drbd_peer_device *peer_device;
        int vnr;

        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;
                if (!atomic_read(&device->pp_in_use_by_net))
                        continue;

                kref_get(&device->kref);
                rcu_read_unlock();
                drbd_reclaim_net_peer_reqs(device);
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device: DRBD device's peer device.
 * @number:     number of pages requested
 * @retry:      whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
                              bool retry)
{
        struct drbd_device *device = peer_device->device;
        struct page *page = NULL;
        struct net_conf *nc;
        DEFINE_WAIT(wait);
        unsigned int mxb;

        rcu_read_lock();
        nc = rcu_dereference(peer_device->connection->net_conf);
        mxb = nc ? nc->max_buffers : 1000000;
        rcu_read_unlock();

        if (atomic_read(&device->pp_in_use) < mxb)
                page = __drbd_alloc_pages(device, number);

        /* Try to keep the fast path fast, but occasionally we need
         * to reclaim the pages we lent to the network stack. */
        if (page && atomic_read(&device->pp_in_use_by_net) > 512)
                drbd_reclaim_net_peer_reqs(device);

        while (page == NULL) {
                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

                drbd_reclaim_net_peer_reqs(device);

                if (atomic_read(&device->pp_in_use) < mxb) {
                        page = __drbd_alloc_pages(device, number);
                        if (page)
                                break;
                }

                if (!retry)
                        break;

                if (signal_pending(current)) {
                        drbd_warn(device, "drbd_alloc_pages interrupted!\n");
                        break;
                }

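                /* If we slept the full 100 ms without being woken via
                 * drbd_pp_wait, stop honoring max-buffers on the next
                 * iteration and try the allocation anyway. */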
                if (schedule_timeout(HZ/10) == 0)
                        mxb = UINT_MAX;
        }
        finish_wait(&drbd_pp_wait, &wait);

        if (page)
                atomic_add(number, &device->pp_in_use);
        return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * It is also called from inside another spin_lock_irq(&resource->req_lock) section.
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
        atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
        int i;

        if (page == NULL)
                return;

        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
                i = page_chain_free(page);
        else {
                struct page *tmp;
                tmp = page_chain_tail(page, &i);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
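        /* Either branch left the number of pages handed back in i;
         * account for them against the in-use counter. */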
        i = atomic_sub_return(i, a);
        if (i < 0)
                drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
        wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
                    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
{
        struct drbd_device *device = peer_device->device;
        struct drbd_peer_request *peer_req;
        struct page *page = NULL;
        unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

        if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
                return NULL;

        peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
        if (!peer_req) {
                if (!(gfp_mask & __GFP_NOWARN))
                        drbd_err(device, "%s: allocation failed\n", __func__);
                return NULL;
        }

        if (has_payload && data_size) {
                page = drbd_alloc_pages(peer_device, nr_pages,
                                        gfpflags_allow_blocking(gfp_mask));
                if (!page)
                        goto fail;
        }

        memset(peer_req, 0, sizeof(*peer_req));
        INIT_LIST_HEAD(&peer_req->w.list);
        drbd_clear_interval(&peer_req->i);
        peer_req->i.size = data_size;
        peer_req->i.sector = sector;
        peer_req->submit_jif = jiffies;
        peer_req->peer_device = peer_device;
        peer_req->pages = page;
        /*
         * The block_id is opaque to the receiver.  It is not endianness
         * converted, and sent back to the sender unchanged.
         */
        peer_req->block_id = id;

        return peer_req;

 fail:
        mempool_free(peer_req, drbd_ee_mempool);
        return NULL;
}

void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
                       int is_net)
{
        might_sleep();
        if (peer_req->flags & EE_HAS_DIGEST)
                kfree(peer_req->digest);
        drbd_free_pages(device, peer_req->pages, is_net);
        D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
        if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
                peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
                drbd_al_complete_io(device, &peer_req->i);
        }
        mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
        LIST_HEAD(work_list);
        struct drbd_peer_request *peer_req, *t;
        int count = 0;
        int is_net = list == &device->net_ee;

        spin_lock_irq(&device->resource->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                __drbd_free_peer_req(device, peer_req, is_net);
                count++;
        }
        return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
        LIST_HEAD(work_list);
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;
        int err = 0;

        spin_lock_irq(&device->resource->req_lock);
        reclaim_finished_net_peer_reqs(device, &reclaimed);
        list_splice_init(&device->done_ee, &work_list);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(device, peer_req);

        /* possible callbacks here:
         * e_end_block, and e_end_resync_block, e_send_superseded.
         * all ignore the last argument.
         */
        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                int err2;

                /* list_del not necessary, next/prev members not touched */
                err2 = peer_req->w.cb(&peer_req->w, !!err);
                if (!err)
                        err = err2;
                drbd_free_peer_req(device, peer_req);
        }
        wake_up(&device->ee_wait);

        return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_device *device,
                                     struct list_head *head)
{
        DEFINE_WAIT(wait);

        /* avoids spin_lock/unlock
         * and calling prepare_to_wait in the fast path */
        while (!list_empty(head)) {
                prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
                spin_unlock_irq(&device->resource->req_lock);
                io_schedule();
                finish_wait(&device->ee_wait, &wait);
                spin_lock_irq(&device->resource->req_lock);
        }
}

static void drbd_wait_ee_list_empty(struct drbd_device *device,
                                    struct list_head *head)
{
        spin_lock_irq(&device->resource->req_lock);
        _drbd_wait_ee_list_empty(device, head);
        spin_unlock_irq(&device->resource->req_lock);
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
        };
        return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
}

static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
        int rv;

        rv = drbd_recv_short(connection->data.socket, buf, size, 0);

        if (rv < 0) {
                if (rv == -ECONNRESET)
                        drbd_info(connection, "sock was reset by peer\n");
                else if (rv != -ERESTARTSYS)
                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
        } else if (rv == 0) {
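                /* rv == 0: the peer performed an orderly shutdown. If we
                 * sent the disconnect ourselves, give the state machine a
                 * chance to leave C_WF_REPORT_PARAMS before reporting it. */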
                if (test_bit(DISCONNECT_SENT, &connection->flags)) {
                        long t;
                        rcu_read_lock();
                        t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
                        rcu_read_unlock();

                        t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);

                        if (t)
                                goto out;
                }
                drbd_info(connection, "sock was shut down by peer\n");
        }

        if (rv != size)
                conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
        return rv;
}

static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
{
        int err;

        err = drbd_recv(connection, buf, size);
        if (err != size) {
                if (err >= 0)
                        err = -EIO;
        } else
                err = 0;
        return err;
}

static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
{
        int err;

        err = drbd_recv_all(connection, buf, size);
        if (err && !signal_pending(current))
                drbd_warn(connection, "short read (expected size %d)\n", (int)size);
        return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
                unsigned int rcv)
{
        /* open coded SO_SNDBUF, SO_RCVBUF */
        if (snd) {
                sock->sk->sk_sndbuf = snd;
                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
        }
        if (rcv) {
                sock->sk->sk_rcvbuf = rcv;
                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
        }
}

static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
        const char *what;
        struct socket *sock;
        struct sockaddr_in6 src_in6;
        struct sockaddr_in6 peer_in6;
        struct net_conf *nc;
        int err, peer_addr_len, my_addr_len;
        int sndbuf_size, rcvbuf_size, connect_int;
        int disconnect_on_error = 1;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        connect_int = nc->connect_int;
        rcu_read_unlock();

        my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
        memcpy(&src_in6, &connection->my_addr, my_addr_len);

        if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
                src_in6.sin6_port = 0;
        else
                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

        peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
        memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0) {
                sock = NULL;
                goto out;
        }

        sock->sk->sk_rcvtimeo =
        sock->sk->sk_sndtimeo = connect_int * HZ;
        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

        /* explicitly bind to the configured IP as source IP
         * for the outgoing connections.
         * This is needed for multihomed hosts and to be
         * able to use lo: interfaces for drbd.
         * Make sure to use 0 as port number, so linux selects
         * a free one dynamically.
         */
        what = "bind before connect";
        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
        if (err < 0)
                goto out;

        /* connect may fail, peer not yet available.
         * stay C_WF_CONNECTION, don't go Disconnecting! */
        disconnect_on_error = 0;
        what = "connect";
        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
        if (err < 0) {
                if (sock) {
                        sock_release(sock);
                        sock = NULL;
                }
                switch (-err) {
                        /* timeout, busy, signal pending */
                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
                case EINTR: case ERESTARTSYS:
                        /* peer not (yet) available, network problem */
                case ECONNREFUSED: case ENETUNREACH:
                case EHOSTDOWN:    case EHOSTUNREACH:
                        disconnect_on_error = 0;
                        break;
                default:
                        drbd_err(connection, "%s failed, err = %d\n", what, err);
                }
                if (disconnect_on_error)
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
        }

        return sock;
}

struct accept_wait_data {
        struct drbd_connection *connection;
        struct socket *s_listen;
        struct completion door_bell;
        void (*original_sk_state_change)(struct sock *sk);
};

static void drbd_incoming_connection(struct sock *sk)
{
        struct accept_wait_data *ad = sk->sk_user_data;
        void (*state_change)(struct sock *sk);

        state_change = ad->original_sk_state_change;
        if (sk->sk_state == TCP_ESTABLISHED)
                complete(&ad->door_bell);
        state_change(sk);
}

static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
        int err, sndbuf_size, rcvbuf_size, my_addr_len;
        struct sockaddr_in6 my_addr;
        struct socket *s_listen;
        struct net_conf *nc;
        const char *what;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return -EIO;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        rcu_read_unlock();

        my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
        memcpy(&my_addr, &connection->my_addr, my_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &s_listen);
        if (err) {
                s_listen = NULL;
                goto out;
        }

        s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

        what = "bind before listen";
        err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
        if (err < 0)
                goto out;

        ad->s_listen = s_listen;
        write_lock_bh(&s_listen->sk->sk_callback_lock);
        ad->original_sk_state_change = s_listen->sk->sk_state_change;
        s_listen->sk->sk_state_change = drbd_incoming_connection;
        s_listen->sk->sk_user_data = ad;
        write_unlock_bh(&s_listen->sk->sk_callback_lock);

        what = "listen";
        err = s_listen->ops->listen(s_listen, 5);
        if (err < 0)
                goto out;

        return 0;
out:
        if (s_listen)
                sock_release(s_listen);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        drbd_err(connection, "%s failed, err = %d\n", what, err);
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        return -EIO;
}

static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
        write_lock_bh(&sk->sk_callback_lock);
        sk->sk_state_change = ad->original_sk_state_change;
        sk->sk_user_data = NULL;
        write_unlock_bh(&sk->sk_callback_lock);
}

static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
        int timeo, connect_int, err = 0;
        struct socket *s_estab = NULL;
        struct net_conf *nc;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        connect_int = nc->connect_int;
        rcu_read_unlock();

        timeo = connect_int * HZ;
        /* 28.5% random jitter */
        timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

        err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
        if (err <= 0)
                return NULL;

        err = kernel_accept(ad->s_listen, &s_estab, 0);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        drbd_err(connection, "accept failed, err = %d\n", err);
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        if (s_estab)
                unregister_state_change(s_estab->sk, ad);

        return s_estab;
}

static int decode_header(struct drbd_connection *, void *, struct packet_info *);

static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
                             enum drbd_packet cmd)
{
        if (!conn_prepare_command(connection, sock))
                return -EIO;
        return conn_send_command(connection, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
        unsigned int header_size = drbd_header_size(connection);
        struct packet_info pi;
        struct net_conf *nc;
        int err;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return -EIO;
        }
        sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
        rcu_read_unlock();

        err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
        if (err != header_size) {
                if (err >= 0)
                        err = -EIO;
                return err;
        }
        err = decode_header(connection, connection->data.rbuf, &pi);
        if (err)
                return err;
        return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:       pointer to the pointer to the socket.
 */
static bool drbd_socket_okay(struct socket **sock)
{
        int rr;
        char tb[4];

        if (!*sock)
                return false;

        rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

        if (rr > 0 || rr == -EAGAIN) {
                return true;
        } else {
                sock_release(*sock);
                *sock = NULL;
                return false;
        }
}

static bool connection_established(struct drbd_connection *connection,
                                   struct socket **sock1,
                                   struct socket **sock2)
{
        struct net_conf *nc;
        int timeout;
        bool ok;

        if (!*sock1 || !*sock2)
                return false;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
        rcu_read_unlock();
        schedule_timeout_interruptible(timeout);
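        /* Give a crossing connect from the peer a moment to arrive
         * before we judge whether both sockets are usable. */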

        ok = drbd_socket_okay(sock1);
        ok = drbd_socket_okay(sock2) && ok;

        return ok;
}

/* Called when a connection is established, or when a new minor gets created
   in a connection */
int drbd_connected(struct drbd_peer_device *peer_device)
{
        struct drbd_device *device = peer_device->device;
        int err;

        atomic_set(&device->packet_seq, 0);
        device->peer_seq = 0;

        device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
                &peer_device->connection->cstate_mutex :
                &device->own_state_mutex;

        err = drbd_send_sync_param(peer_device);
        if (!err)
                err = drbd_send_sizes(peer_device, 0, 0);
        if (!err)
                err = drbd_send_uuids(peer_device);
        if (!err)
                err = drbd_send_current_state(peer_device);
        clear_bit(USE_DEGR_WFC_T, &device->flags);
        clear_bit(RESIZE_PENDING, &device->flags);
        atomic_set(&device->ap_in_flight, 0);
        mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
        return err;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_connection *connection)
{
        struct drbd_socket sock, msock;
        struct drbd_peer_device *peer_device;
        struct net_conf *nc;
        int vnr, timeout, h;
        bool discard_my_data, ok;
        enum drbd_state_rv rv;
        struct accept_wait_data ad = {
                .connection = connection,
                .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
        };

        clear_bit(DISCONNECT_SENT, &connection->flags);
        if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
                return -2;

        mutex_init(&sock.mutex);
        sock.sbuf = connection->data.sbuf;
        sock.rbuf = connection->data.rbuf;
        sock.socket = NULL;
        mutex_init(&msock.mutex);
        msock.sbuf = connection->meta.sbuf;
        msock.rbuf = connection->meta.rbuf;
        msock.socket = NULL;

        /* Assume that the peer only understands protocol 80 until we know better.  */
        connection->agreed_pro_version = 80;

        if (prepare_listen_socket(connection, &ad))
                return 0;

        do {
                struct socket *s;

                s = drbd_try_connect(connection);
                if (s) {
                        if (!sock.socket) {
                                sock.socket = s;
                                send_first_packet(connection, &sock, P_INITIAL_DATA);
                        } else if (!msock.socket) {
                                clear_bit(RESOLVE_CONFLICTS, &connection->flags);
                                msock.socket = s;
                                send_first_packet(connection, &msock, P_INITIAL_META);
                        } else {
                                drbd_err(connection, "Logic error in conn_connect()\n");
                                goto out_release_sockets;
                        }
                }

                if (connection_established(connection, &sock.socket, &msock.socket))
                        break;

retry:
                s = drbd_wait_for_connect(connection, &ad);
                if (s) {
                        int fp = receive_first_packet(connection, s);
                        drbd_socket_okay(&sock.socket);
                        drbd_socket_okay(&msock.socket);
                        switch (fp) {
                        case P_INITIAL_DATA:
                                if (sock.socket) {
                                        drbd_warn(connection, "initial packet S crossed\n");
                                        sock_release(sock.socket);
                                        sock.socket = s;
                                        goto randomize;
                                }
                                sock.socket = s;
                                break;
                        case P_INITIAL_META:
                                set_bit(RESOLVE_CONFLICTS, &connection->flags);
                                if (msock.socket) {
                                        drbd_warn(connection, "initial packet M crossed\n");
                                        sock_release(msock.socket);
                                        msock.socket = s;
                                        goto randomize;
                                }
                                msock.socket = s;
                                break;
                        default:
                                drbd_warn(connection, "Error receiving initial packet\n");
                                sock_release(s);
randomize:
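                                /* Initial packets crossed: flip a coin to
                                 * decide whether to wait for yet another
                                 * incoming connection, so the two symmetric
                                 * connection attempts eventually converge. */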
                                if (prandom_u32() & 1)
                                        goto retry;
                        }
                }

                if (connection->cstate <= C_DISCONNECTING)
                        goto out_release_sockets;
                if (signal_pending(current)) {
                        flush_signals(current);
                        smp_rmb();
                        if (get_t_state(&connection->receiver) == EXITING)
                                goto out_release_sockets;
                }

                ok = connection_established(connection, &sock.socket, &msock.socket);
        } while (!ok);

        if (ad.s_listen)
                sock_release(ad.s_listen);

        sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

        sock.socket->sk->sk_allocation = GFP_NOIO;
        msock.socket->sk->sk_allocation = GFP_NOIO;

        sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
        msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

        /* NOT YET ...
         * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
         * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
         * first set it to the P_CONNECTION_FEATURES timeout,
         * which we set to 4x the configured ping_timeout. */
        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);

        sock.socket->sk->sk_sndtimeo =
        sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

        msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
        timeout = nc->timeout * HZ / 10;
        discard_my_data = nc->discard_my_data;
        rcu_read_unlock();

        msock.socket->sk->sk_sndtimeo = timeout;

        /* we don't want delays.
         * we use TCP_CORK where appropriate, though */
        drbd_tcp_nodelay(sock.socket);
        drbd_tcp_nodelay(msock.socket);

        connection->data.socket = sock.socket;
        connection->meta.socket = msock.socket;
        connection->last_received = jiffies;

        h = drbd_do_features(connection);
        if (h <= 0)
                return h;

        if (connection->cram_hmac_tfm) {
                /* drbd_request_state(device, NS(conn, WFAuth)); */
                switch (drbd_do_auth(connection)) {
                case -1:
                        drbd_err(connection, "Authentication of peer failed\n");
                        return -1;
                case 0:
                        drbd_err(connection, "Authentication of peer failed, trying again.\n");
                        return 0;
                }
        }

        connection->data.socket->sk->sk_sndtimeo = timeout;
        connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

        if (drbd_send_protocol(connection) == -EOPNOTSUPP)
                return -1;

        /* Prevent a race between resync-handshake and
         * being promoted to Primary.
         *
         * Grab and release the state mutex, so we know that any current
         * drbd_set_role() is finished, and any incoming drbd_set_role
         * will see the STATE_SENT flag, and wait for it to be cleared.
         */
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
                mutex_lock(peer_device->device->state_mutex);

        set_bit(STATE_SENT, &connection->flags);

        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
                mutex_unlock(peer_device->device->state_mutex);

        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;
                kref_get(&device->kref);
                rcu_read_unlock();

                if (discard_my_data)
                        set_bit(DISCARD_MY_DATA, &device->flags);
                else
                        clear_bit(DISCARD_MY_DATA, &device->flags);

                drbd_connected(peer_device);
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();

        rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
        if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
                clear_bit(STATE_SENT, &connection->flags);
                return 0;
        }

        drbd_thread_start(&connection->ack_receiver);
        /* opencoded create_singlethread_workqueue(),
         * to be able to use format string arguments */
        connection->ack_sender =
                alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
        if (!connection->ack_sender) {
                drbd_err(connection, "Failed to create workqueue ack_sender\n");
                return 0;
        }

        mutex_lock(&connection->resource->conf_update);
        /* The discard_my_data flag is a single-shot modifier to the next
         * connection attempt, the handshake of which is now well underway.
         * No need for rcu style copying of the whole struct
         * just to clear a single value. */
        connection->net_conf->discard_my_data = 0;
        mutex_unlock(&connection->resource->conf_update);

        return h;

out_release_sockets:
        if (ad.s_listen)
                sock_release(ad.s_listen);
        if (sock.socket)
                sock_release(sock.socket);
        if (msock.socket)
                sock_release(msock.socket);
        return -1;
}

static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
{
        unsigned int header_size = drbd_header_size(connection);

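        /* The expected header format follows from the agreed protocol
         * version (via drbd_header_size()); the magic value verifies that
         * the peer actually sent that format. */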
        if (header_size == sizeof(struct p_header100) &&
            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
                struct p_header100 *h = header;
                if (h->pad != 0) {
                        drbd_err(connection, "Header padding is not zero\n");
                        return -EINVAL;
                }
                pi->vnr = be16_to_cpu(h->volume);
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
        } else if (header_size == sizeof(struct p_header95) &&
                   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
                struct p_header95 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
                pi->vnr = 0;
        } else if (header_size == sizeof(struct p_header80) &&
                   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
                struct p_header80 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be16_to_cpu(h->length);
                pi->vnr = 0;
        } else {
                drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
                         be32_to_cpu(*(__be32 *)header),
                         connection->agreed_pro_version);
                return -EINVAL;
        }
        pi->data = header + header_size;
        return 0;
}

static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
{
        void *buffer = connection->data.rbuf;
        int err;

        err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
        if (err)
                return err;

        err = decode_header(connection, buffer, pi);
        connection->last_received = jiffies;

        return err;
}

static void drbd_flush(struct drbd_connection *connection)
{
        int rv;
        struct drbd_peer_device *peer_device;
        int vnr;

        if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
                rcu_read_lock();
                idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                        struct drbd_device *device = peer_device->device;

                        if (!get_ldev(device))
                                continue;
                        kref_get(&device->kref);
                        rcu_read_unlock();

                        /* Right now, we have only this one synchronous code path
                         * for flushes between request epochs.
                         * We may want to make those asynchronous,
                         * or at least parallelize the flushes to the volume devices.
                         */
                        device->flush_jif = jiffies;
                        set_bit(FLUSH_PENDING, &device->flags);
                        rv = blkdev_issue_flush(device->ldev->backing_bdev,
                                        GFP_NOIO, NULL);
                        clear_bit(FLUSH_PENDING, &device->flags);
                        if (rv) {
                                drbd_info(device, "local disk flush failed with status %d\n", rv);
                                /* would rather check on EOPNOTSUPP, but that is not reliable.
                                 * don't try again for ANY return value != 0
                                 * if (rv == -EOPNOTSUPP) */
                                drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
                        }
                        put_ldev(device);
                        kref_put(&device->kref, drbd_destroy_device);

                        rcu_read_lock();
                        if (rv)
                                break;
                }
                rcu_read_unlock();
        }
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @connection: DRBD connection.
 * @epoch:      Epoch object.
 * @ev:         Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
{
        int epoch_size;
        struct drbd_epoch *next_epoch;
        enum finish_epoch rv = FE_STILL_LIVE;

        spin_lock(&connection->epoch_lock);
        do {
                next_epoch = NULL;

                epoch_size = atomic_read(&epoch->epoch_size);

                switch (ev & ~EV_CLEANUP) {
                case EV_PUT:
                        atomic_dec(&epoch->active);
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do */
                        break;
                }

                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
                        if (!(ev & EV_CLEANUP)) {
                                spin_unlock(&connection->epoch_lock);
                                drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
                                spin_lock(&connection->epoch_lock);
                        }
#if 0
                        /* FIXME: dec unacked on connection, once we have
                         * something to count pending connection packets in. */
                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
                                dec_unacked(epoch->connection);
#endif

                        if (connection->current_epoch != epoch) {
                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                                list_del(&epoch->list);
                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                                connection->epochs--;
                                kfree(epoch);

                                if (rv == FE_STILL_LIVE)
                                        rv = FE_DESTROYED;
                        } else {
                                epoch->flags = 0;
                                atomic_set(&epoch->epoch_size, 0);
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
                        }
                }

                if (!next_epoch)
                        break;

                epoch = next_epoch;
        } while (1);

        spin_unlock(&connection->epoch_lock);

        return rv;
}

static enum write_ordering_e
max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
{
        struct disk_conf *dc;

        dc = rcu_dereference(bdev->disk_conf);

        if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
                wo = WO_DRAIN_IO;
        if (wo == WO_DRAIN_IO && !dc->disk_drain)
                wo = WO_NONE;

        return wo;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @resource:   DRBD resource.
 * @bdev:       Backing device to consider in addition, may be NULL.
 * @wo:         Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
                              enum write_ordering_e wo)
{
        struct drbd_device *device;
        enum write_ordering_e pwo;
        int vnr;
        static char *write_ordering_str[] = {
                [WO_NONE] = "none",
                [WO_DRAIN_IO] = "drain",
                [WO_BDEV_FLUSH] = "flush",
        };

        pwo = resource->write_ordering;
        if (wo != WO_BDEV_FLUSH)
                wo = min(pwo, wo);
        rcu_read_lock();
        idr_for_each_entry(&resource->devices, device, vnr) {
                if (get_ldev(device)) {
                        wo = max_allowed_wo(device->ldev, wo);
                        if (device->ldev == bdev)
                                bdev = NULL;
                        put_ldev(device);
                }
        }

        if (bdev)
                wo = max_allowed_wo(bdev, wo);

        rcu_read_unlock();

        resource->write_ordering = wo;
        if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
                drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
}

/**
 * drbd_submit_peer_request() - Submit a peer request to the local backing device
 * @device:     DRBD device.
 * @peer_req:   peer request
 * @rw:         flag field, see bio->bi_rw
 * @fault_type: fault injection type, passed through to drbd_generic_make_request()
1387  *
1388  * May spread the pages to multiple bios,
1389  * depending on bio_add_page restrictions.
1390  *
1391  * Returns 0 if all bios have been submitted,
1392  * -ENOMEM if we could not allocate enough bios,
1393  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1394  *  single page to an empty bio (which should never happen and likely indicates
1395  *  that the lower level IO stack is in some way broken). This has been observed
1396  *  on certain Xen deployments.
1397  */
1398 /* TODO allocate from our own bio_set. */
1399 int drbd_submit_peer_request(struct drbd_device *device,
1400                              struct drbd_peer_request *peer_req,
1401                              const unsigned rw, const int fault_type)
1402 {
1403         struct bio *bios = NULL;
1404         struct bio *bio;
1405         struct page *page = peer_req->pages;
1406         sector_t sector = peer_req->i.sector;
1407         unsigned data_size = peer_req->i.size;
1408         unsigned n_bios = 0;
1409         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1410         int err = -ENOMEM;
1411
1412         if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1413                 /* wait for all pending IO completions, before we start
1414                  * zeroing things out. */
1415                 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1416                 /* add it to the active list now,
1417                  * so we can find it to present it in debugfs */
1418                 peer_req->submit_jif = jiffies;
1419                 peer_req->flags |= EE_SUBMITTED;
1420                 spin_lock_irq(&device->resource->req_lock);
1421                 list_add_tail(&peer_req->w.list, &device->active_ee);
1422                 spin_unlock_irq(&device->resource->req_lock);
1423                 if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1424                         sector, data_size >> 9, GFP_NOIO, false))
1425                         peer_req->flags |= EE_WAS_ERROR;
1426                 drbd_endio_write_sec_final(peer_req);
1427                 return 0;
1428         }
1429
1430         /* Discards don't have any payload.
1431          * But the scsi layer still expects a bio_vec it can use internally,
1432          * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1433         if (peer_req->flags & EE_IS_TRIM)
1434                 nr_pages = 1;
1435
1436         /* In most cases, we will only need one bio.  But in case the lower
1437          * level restrictions happen to be different at this offset on this
1438          * side than those of the sending peer, we may need to submit the
1439          * request in more than one bio.
1440          *
1441          * Plain bio_alloc is good enough here: this is not a bio generated
1442          * by DRBD internally, but one allocated on behalf of the peer.
1443          */
1444 next_bio:
1445         bio = bio_alloc(GFP_NOIO, nr_pages);
1446         if (!bio) {
1447                 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1448                 goto fail;
1449         }
1450         /* > peer_req->i.sector, unless this is the first bio */
1451         bio->bi_iter.bi_sector = sector;
1452         bio->bi_bdev = device->ldev->backing_bdev;
1453         bio->bi_rw = rw;
1454         bio->bi_private = peer_req;
1455         bio->bi_end_io = drbd_peer_request_endio;
1456
1457         bio->bi_next = bios;
1458         bios = bio;
1459         ++n_bios;
1460
1461         if (rw & REQ_DISCARD) {
1462                 bio->bi_iter.bi_size = data_size;
1463                 goto submit;
1464         }
1465
1466         page_chain_for_each(page) {
1467                 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1468                 if (!bio_add_page(bio, page, len, 0)) {
1469                         /* A single page must always be possible!
1470                          * But in case it fails anyways,
1471                          * we deal with it, and complain (below). */
1472                         if (bio->bi_vcnt == 0) {
1473                                 drbd_err(device,
1474                                         "bio_add_page failed for len=%u, "
1475                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1476                                         len, (uint64_t)bio->bi_iter.bi_sector);
1477                                 err = -ENOSPC;
1478                                 goto fail;
1479                         }
1480                         goto next_bio;
1481                 }
1482                 data_size -= len;
1483                 sector += len >> 9;
1484                 --nr_pages;
1485         }
1486         D_ASSERT(device, data_size == 0);
1487 submit:
1488         D_ASSERT(device, page == NULL);
1489
1490         atomic_set(&peer_req->pending_bios, n_bios);
1491         /* for debugfs: update timestamp, mark as submitted */
1492         peer_req->submit_jif = jiffies;
1493         peer_req->flags |= EE_SUBMITTED;
1494         do {
1495                 bio = bios;
1496                 bios = bios->bi_next;
1497                 bio->bi_next = NULL;
1498
1499                 drbd_generic_make_request(device, fault_type, bio);
1500         } while (bios);
1501         return 0;
1502
1503 fail:
1504         while (bios) {
1505                 bio = bios;
1506                 bios = bios->bi_next;
1507                 bio_put(bio);
1508         }
1509         return err;
1510 }
1511
1512 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1513                                              struct drbd_peer_request *peer_req)
1514 {
1515         struct drbd_interval *i = &peer_req->i;
1516
1517         drbd_remove_interval(&device->write_requests, i);
1518         drbd_clear_interval(i);
1519
1520         /* Wake up any processes waiting for this peer request to complete.  */
1521         if (i->waiting)
1522                 wake_up(&device->misc_wait);
1523 }
1524
1525 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1526 {
1527         struct drbd_peer_device *peer_device;
1528         int vnr;
1529
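        /* Note: drbd_wait_ee_list_empty() may sleep, which is not allowed
         * under rcu_read_lock().  So, per device, we take a kref to keep the
         * device alive, drop the RCU read lock around the wait, and
         * re-acquire it before the idr iteration continues. */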
1530         rcu_read_lock();
1531         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1532                 struct drbd_device *device = peer_device->device;
1533
1534                 kref_get(&device->kref);
1535                 rcu_read_unlock();
1536                 drbd_wait_ee_list_empty(device, &device->active_ee);
1537                 kref_put(&device->kref, drbd_destroy_device);
1538                 rcu_read_lock();
1539         }
1540         rcu_read_unlock();
1541 }
1542
1543 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1544 {
1545         int rv;
1546         struct p_barrier *p = pi->data;
1547         struct drbd_epoch *epoch;
1548
1549         /* FIXME these are unacked on connection,
1550          * not a specific (peer)device.
1551          */
1552         connection->current_epoch->barrier_nr = p->barrier;
1553         connection->current_epoch->connection = connection;
1554         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1555
1556         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1557          * the activity log, which means it would not be resynced in case the
1558          * R_PRIMARY crashes now.
1559          * Therefore we must send the barrier_ack after the barrier request was
1560          * completed. */
1561         switch (connection->resource->write_ordering) {
1562         case WO_NONE:
1563                 if (rv == FE_RECYCLED)
1564                         return 0;
1565
1566                 /* receiver context, in the writeout path of the other node.
1567                  * avoid potential distributed deadlock */
1568                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1569                 if (epoch)
1570                         break;
1571                 else
1572                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1573                 /* Fall through */
1574
1575         case WO_BDEV_FLUSH:
1576         case WO_DRAIN_IO:
1577                 conn_wait_active_ee_empty(connection);
1578                 drbd_flush(connection);
1579
1580                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1581                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1582                         if (epoch)
1583                                 break;
1584                 }
1585
1586                 return 0;
1587         default:
1588                 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1589                          connection->resource->write_ordering);
1590                 return -EIO;
1591         }
1592
1593         epoch->flags = 0;
1594         atomic_set(&epoch->epoch_size, 0);
1595         atomic_set(&epoch->active, 0);
1596
1597         spin_lock(&connection->epoch_lock);
1598         if (atomic_read(&connection->current_epoch->epoch_size)) {
1599                 list_add(&epoch->list, &connection->current_epoch->list);
1600                 connection->current_epoch = epoch;
1601                 connection->epochs++;
1602         } else {
1603                 /* The current_epoch got recycled while we allocated this one... */
1604                 kfree(epoch);
1605         }
1606         spin_unlock(&connection->epoch_lock);
1607
1608         return 0;
1609 }
1610
1611 /* used from receive_RSDataReply (recv_resync_read)
1612  * and from receive_Data */
1613 static struct drbd_peer_request *
1614 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1615               struct packet_info *pi) __must_hold(local)
1616 {
1617         struct drbd_device *device = peer_device->device;
1618         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1619         struct drbd_peer_request *peer_req;
1620         struct page *page;
1621         int digest_size, err;
1622         unsigned int data_size = pi->size, ds;
1623         void *dig_in = peer_device->connection->int_dig_in;
1624         void *dig_vv = peer_device->connection->int_dig_vv;
1625         unsigned long *data;
1626         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1627
1628         digest_size = 0;
1629         if (!trim && peer_device->connection->peer_integrity_tfm) {
1630                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1631                 /*
1632                  * FIXME: Receive the incoming digest into the receive buffer
1633                  *        here, together with its struct p_data?
1634                  */
1635                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1636                 if (err)
1637                         return NULL;
1638                 data_size -= digest_size;
1639         }
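
        /* On-wire layout consumed so far: packet header, then the optional
         * integrity digest (just read above), then the payload pages, if
         * any; from here on, data_size counts payload bytes only. */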
1640
1641         if (trim) {
1642                 D_ASSERT(peer_device, data_size == 0);
1643                 data_size = be32_to_cpu(trim->size);
1644         }
1645
1646         if (!expect(IS_ALIGNED(data_size, 512)))
1647                 return NULL;
1648         /* prepare for larger trim requests. */
1649         if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1650                 return NULL;
1651
1652         /* even though we trust our peer,
1653          * we sometimes have to double check. */
1654         if (sector + (data_size>>9) > capacity) {
1655                 drbd_err(device, "request from peer beyond end of local disk: "
1656                         "capacity: %llus < sector: %llus + size: %u\n",
1657                         (unsigned long long)capacity,
1658                         (unsigned long long)sector, data_size);
1659                 return NULL;
1660         }
1661
1662         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1663          * "criss-cross" setup, that might cause write-out on some other DRBD,
1664          * which in turn might block on the other node at this very place.  */
1665         peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1666         if (!peer_req)
1667                 return NULL;
1668
1669         peer_req->flags |= EE_WRITE;
1670         if (trim)
1671                 return peer_req;
1672
1673         ds = data_size;
1674         page = peer_req->pages;
1675         page_chain_for_each(page) {
1676                 unsigned len = min_t(int, ds, PAGE_SIZE);
1677                 data = kmap(page);
1678                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1679                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1680                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1681                         data[0] = data[0] ^ (unsigned long)-1;
1682                 }
1683                 kunmap(page);
1684                 if (err) {
1685                         drbd_free_peer_req(device, peer_req);
1686                         return NULL;
1687                 }
1688                 ds -= len;
1689         }
1690
1691         if (digest_size) {
1692                 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1693                 if (memcmp(dig_in, dig_vv, digest_size)) {
1694                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1695                                 (unsigned long long)sector, data_size);
1696                         drbd_free_peer_req(device, peer_req);
1697                         return NULL;
1698                 }
1699         }
1700         device->recv_cnt += data_size >> 9;
1701         return peer_req;
1702 }
1703
1704 /* drbd_drain_block() just takes a data block
1705  * out of the socket input buffer, and discards it.
1706  */
1707 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1708 {
1709         struct page *page;
1710         int err = 0;
1711         void *data;
1712
1713         if (!data_size)
1714                 return 0;
1715
1716         page = drbd_alloc_pages(peer_device, 1, 1);
1717
1718         data = kmap(page);
1719         while (data_size) {
1720                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1721
1722                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1723                 if (err)
1724                         break;
1725                 data_size -= len;
1726         }
1727         kunmap(page);
1728         drbd_free_pages(peer_device->device, page, 0);
1729         return err;
1730 }
1731
1732 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1733                            sector_t sector, int data_size)
1734 {
1735         struct bio_vec bvec;
1736         struct bvec_iter iter;
1737         struct bio *bio;
1738         int digest_size, err, expect;
1739         void *dig_in = peer_device->connection->int_dig_in;
1740         void *dig_vv = peer_device->connection->int_dig_vv;
1741
1742         digest_size = 0;
1743         if (peer_device->connection->peer_integrity_tfm) {
1744                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1745                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1746                 if (err)
1747                         return err;
1748                 data_size -= digest_size;
1749         }
1750
1751         /* optimistically update recv_cnt.  if receiving fails below,
1752          * we disconnect anyways, and counters will be reset. */
1753         peer_device->device->recv_cnt += data_size>>9;
1754
1755         bio = req->master_bio;
1756         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1757
1758         bio_for_each_segment(bvec, bio, iter) {
1759                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1760                 expect = min_t(int, data_size, bvec.bv_len);
1761                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1762                 kunmap(bvec.bv_page);
1763                 if (err)
1764                         return err;
1765                 data_size -= expect;
1766         }
1767
1768         if (digest_size) {
1769                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1770                 if (memcmp(dig_in, dig_vv, digest_size)) {
1771                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1772                         return -EINVAL;
1773                 }
1774         }
1775
1776         D_ASSERT(peer_device->device, data_size == 0);
1777         return 0;
1778 }
1779
1780 /*
1781  * e_end_resync_block() is called in ack_sender context via
1782  * drbd_finish_peer_reqs().
1783  */
1784 static int e_end_resync_block(struct drbd_work *w, int unused)
1785 {
1786         struct drbd_peer_request *peer_req =
1787                 container_of(w, struct drbd_peer_request, w);
1788         struct drbd_peer_device *peer_device = peer_req->peer_device;
1789         struct drbd_device *device = peer_device->device;
1790         sector_t sector = peer_req->i.sector;
1791         int err;
1792
1793         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1794
1795         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1796                 drbd_set_in_sync(device, sector, peer_req->i.size);
1797                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1798         } else {
1799                 /* Record failure to sync */
1800                 drbd_rs_failed_io(device, sector, peer_req->i.size);
1801
1802                 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1803         }
1804         dec_unacked(device);
1805
1806         return err;
1807 }
1808
1809 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1810                             struct packet_info *pi) __releases(local)
1811 {
1812         struct drbd_device *device = peer_device->device;
1813         struct drbd_peer_request *peer_req;
1814
1815         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1816         if (!peer_req)
1817                 goto fail;
1818
1819         dec_rs_pending(device);
1820
1821         inc_unacked(device);
1822         /* corresponding dec_unacked() in e_end_resync_block()
1823          * respective _drbd_clear_done_ee */
1824
1825         peer_req->w.cb = e_end_resync_block;
1826         peer_req->submit_jif = jiffies;
1827
1828         spin_lock_irq(&device->resource->req_lock);
1829         list_add_tail(&peer_req->w.list, &device->sync_ee);
1830         spin_unlock_irq(&device->resource->req_lock);
1831
1832         atomic_add(pi->size >> 9, &device->rs_sect_ev);
1833         if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1834                 return 0;
1835
1836         /* don't care for the reason here */
1837         drbd_err(device, "submit failed, triggering re-connect\n");
1838         spin_lock_irq(&device->resource->req_lock);
1839         list_del(&peer_req->w.list);
1840         spin_unlock_irq(&device->resource->req_lock);
1841
1842         drbd_free_peer_req(device, peer_req);
1843 fail:
1844         put_ldev(device);
1845         return -EIO;
1846 }
1847
1848 static struct drbd_request *
1849 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1850              sector_t sector, bool missing_ok, const char *func)
1851 {
1852         struct drbd_request *req;
1853
1854         /* Request object according to our peer */
1855         req = (struct drbd_request *)(unsigned long)id;
1856         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1857                 return req;
1858         if (!missing_ok) {
1859                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1860                         (unsigned long)id, (unsigned long long)sector);
1861         }
1862         return NULL;
1863 }
1864
1865 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1866 {
1867         struct drbd_peer_device *peer_device;
1868         struct drbd_device *device;
1869         struct drbd_request *req;
1870         sector_t sector;
1871         int err;
1872         struct p_data *p = pi->data;
1873
1874         peer_device = conn_peer_device(connection, pi->vnr);
1875         if (!peer_device)
1876                 return -EIO;
1877         device = peer_device->device;
1878
1879         sector = be64_to_cpu(p->sector);
1880
1881         spin_lock_irq(&device->resource->req_lock);
1882         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1883         spin_unlock_irq(&device->resource->req_lock);
1884         if (unlikely(!req))
1885                 return -EIO;
1886
1887         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1888          * special casing it there for the various failure cases.
1889          * still no race with drbd_fail_pending_reads */
1890         err = recv_dless_read(peer_device, req, sector, pi->size);
1891         if (!err)
1892                 req_mod(req, DATA_RECEIVED);
1893         /* else: nothing. handled from drbd_disconnect...
1894          * I don't think we may complete this just yet
1895          * in case we are "on-disconnect: freeze" */
1896
1897         return err;
1898 }
1899
1900 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1901 {
1902         struct drbd_peer_device *peer_device;
1903         struct drbd_device *device;
1904         sector_t sector;
1905         int err;
1906         struct p_data *p = pi->data;
1907
1908         peer_device = conn_peer_device(connection, pi->vnr);
1909         if (!peer_device)
1910                 return -EIO;
1911         device = peer_device->device;
1912
1913         sector = be64_to_cpu(p->sector);
1914         D_ASSERT(device, p->block_id == ID_SYNCER);
1915
1916         if (get_ldev(device)) {
1917                 /* data is submitted to disk within recv_resync_read.
1918                  * corresponding put_ldev done below on error,
1919                  * or in drbd_peer_request_endio. */
1920                 err = recv_resync_read(peer_device, sector, pi);
1921         } else {
1922                 if (__ratelimit(&drbd_ratelimit_state))
1923                         drbd_err(device, "Can not write resync data to local disk.\n");
1924
1925                 err = drbd_drain_block(peer_device, pi->size);
1926
1927                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1928         }
1929
1930         atomic_add(pi->size >> 9, &device->rs_sect_in);
1931
1932         return err;
1933 }
1934
1935 static void restart_conflicting_writes(struct drbd_device *device,
1936                                        sector_t sector, int size)
1937 {
1938         struct drbd_interval *i;
1939         struct drbd_request *req;
1940
1941         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1942                 if (!i->local)
1943                         continue;
1944                 req = container_of(i, struct drbd_request, i);
1945                 if (req->rq_state & RQ_LOCAL_PENDING ||
1946                     !(req->rq_state & RQ_POSTPONED))
1947                         continue;
1948                 /* as it is RQ_POSTPONED, this will cause it to
1949                  * be queued on the retry workqueue. */
1950                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1951         }
1952 }
1953
1954 /*
1955  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
1956  */
1957 static int e_end_block(struct drbd_work *w, int cancel)
1958 {
1959         struct drbd_peer_request *peer_req =
1960                 container_of(w, struct drbd_peer_request, w);
1961         struct drbd_peer_device *peer_device = peer_req->peer_device;
1962         struct drbd_device *device = peer_device->device;
1963         sector_t sector = peer_req->i.sector;
1964         int err = 0, pcmd;
1965
1966         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1967                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1968                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1969                                 device->state.conn <= C_PAUSED_SYNC_T &&
1970                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1971                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1972                         err = drbd_send_ack(peer_device, pcmd, peer_req);
1973                         if (pcmd == P_RS_WRITE_ACK)
1974                                 drbd_set_in_sync(device, sector, peer_req->i.size);
1975                 } else {
1976                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1977                         /* we expect it to be marked out of sync anyways...
1978                          * maybe assert this?  */
1979                 }
1980                 dec_unacked(device);
1981         }
1982
1983         /* we delete from the conflict detection hash _after_ we sent out the
1984          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1985         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1986                 spin_lock_irq(&device->resource->req_lock);
1987                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1988                 drbd_remove_epoch_entry_interval(device, peer_req);
1989                 if (peer_req->flags & EE_RESTART_REQUESTS)
1990                         restart_conflicting_writes(device, sector, peer_req->i.size);
1991                 spin_unlock_irq(&device->resource->req_lock);
1992         } else
1993                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1994
1995         drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1996
1997         return err;
1998 }
1999
2000 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2001 {
2002         struct drbd_peer_request *peer_req =
2003                 container_of(w, struct drbd_peer_request, w);
2004         struct drbd_peer_device *peer_device = peer_req->peer_device;
2005         int err;
2006
2007         err = drbd_send_ack(peer_device, ack, peer_req);
2008         dec_unacked(peer_device->device);
2009
2010         return err;
2011 }
2012
2013 static int e_send_superseded(struct drbd_work *w, int unused)
2014 {
2015         return e_send_ack(w, P_SUPERSEDED);
2016 }
2017
2018 static int e_send_retry_write(struct drbd_work *w, int unused)
2019 {
2020         struct drbd_peer_request *peer_req =
2021                 container_of(w, struct drbd_peer_request, w);
2022         struct drbd_connection *connection = peer_req->peer_device->connection;
2023
2024         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2025                              P_RETRY_WRITE : P_SUPERSEDED);
2026 }
2027
2028 static bool seq_greater(u32 a, u32 b)
2029 {
2030         /*
2031          * We assume 32-bit wrap-around here.
2032          * For 24-bit wrap-around, we would have to shift:
2033          *  a <<= 8; b <<= 8;
2034          */
2035         return (s32)a - (s32)b > 0;
2036 }
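
/* Worked example of the wrap-safe comparison: seq_greater(1, 0xffffffff)
 * computes (s32)1 - (s32)0xffffffff == 1 - (-1) == 2 > 0, so sequence 1
 * counts as "newer" than 0xffffffff across the 32-bit wrap; conversely,
 * seq_greater(0x80000000, 0) is false, as the signed difference is
 * negative. */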
2037
2038 static u32 seq_max(u32 a, u32 b)
2039 {
2040         return seq_greater(a, b) ? a : b;
2041 }
2042
2043 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2044 {
2045         struct drbd_device *device = peer_device->device;
2046         unsigned int newest_peer_seq;
2047
2048         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2049                 spin_lock(&device->peer_seq_lock);
2050                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2051                 device->peer_seq = newest_peer_seq;
2052                 spin_unlock(&device->peer_seq_lock);
2053                 /* wake up only if we actually changed device->peer_seq */
2054                 if (peer_seq == newest_peer_seq)
2055                         wake_up(&device->seq_wait);
2056         }
2057 }
2058
2059 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2060 {
2061         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2062 }
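
/* Example: a 4 KiB write at sector 0 (s1 == 0, l1 == 4096, i.e. sectors
 * 0..7) overlaps a request starting at sector 7, but not one starting at
 * sector 8, where 0 + (4096 >> 9) <= 8 and the intervals merely touch.
 * Lengths are in bytes, hence the >> 9. */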
2063
2064 /* maybe change sync_ee into interval trees as well? */
2065 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2066 {
2067         struct drbd_peer_request *rs_req;
2068         bool rv = false;
2069
2070         spin_lock_irq(&device->resource->req_lock);
2071         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2072                 if (overlaps(peer_req->i.sector, peer_req->i.size,
2073                              rs_req->i.sector, rs_req->i.size)) {
2074                         rv = true;
2075                         break;
2076                 }
2077         }
2078         spin_unlock_irq(&device->resource->req_lock);
2079
2080         return rv;
2081 }
2082
2083 /* Called from receive_Data.
2084  * Synchronize packets on sock with packets on msock.
2085  *
2086  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2087  * packet traveling on msock, they are still processed in the order they have
2088  * been sent.
2089  *
2090  * Note: we don't care for Ack packets overtaking P_DATA packets.
2091  *
2092  * In case packet_seq is larger than device->peer_seq number, there are
2093  * outstanding packets on the msock. We wait for them to arrive.
2094  * In case we are the logically next packet, we update device->peer_seq
2095  * ourselves. Correctly handles 32bit wrap around.
2096  *
2097  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2098  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2099  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2100  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
2101  *
2102  * returns 0 if we may process the packet,
2103  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2104 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2105 {
2106         struct drbd_device *device = peer_device->device;
2107         DEFINE_WAIT(wait);
2108         long timeout;
2109         int ret = 0, tp;
2110
2111         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2112                 return 0;
2113
2114         spin_lock(&device->peer_seq_lock);
2115         for (;;) {
2116                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2117                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2118                         break;
2119                 }
2120
2121                 if (signal_pending(current)) {
2122                         ret = -ERESTARTSYS;
2123                         break;
2124                 }
2125
2126                 rcu_read_lock();
2127                 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2128                 rcu_read_unlock();
2129
2130                 if (!tp)
2131                         break;
2132
2133                 /* Only need to wait if two_primaries is enabled */
2134                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2135                 spin_unlock(&device->peer_seq_lock);
2136                 rcu_read_lock();
2137                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2138                 rcu_read_unlock();
2139                 timeout = schedule_timeout(timeout);
2140                 spin_lock(&device->peer_seq_lock);
2141                 if (!timeout) {
2142                         ret = -ETIMEDOUT;
2143                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2144                         break;
2145                 }
2146         }
2147         spin_unlock(&device->peer_seq_lock);
2148         finish_wait(&device->seq_wait, &wait);
2149         return ret;
2150 }
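
/* Example for the wait condition above: with device->peer_seq == 9, a
 * P_DATA packet carrying peer_seq == 11 makes seq_greater(11 - 1, 9) true,
 * telling us the packet with sequence 10 is still in flight on msock; we
 * sleep on seq_wait until update_peer_seq() has advanced peer_seq. */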
2151
2152 /* see also bio_flags_to_wire() and the DRBD_REQ_* definitions:
2153  * we need to map the flags to data packet flags and back by meaning, not
2154  * by value, because we may replicate to peers running other kernel versions. */
2155 static unsigned long wire_flags_to_bio(u32 dpf)
2156 {
2157         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2158                 (dpf & DP_FUA ? REQ_FUA : 0) |
2159                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2160                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2161 }
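
/* For instance, a peer write sent with DP_FUA | DP_FLUSH is re-issued
 * locally with REQ_FUA | REQ_FLUSH; only the DP_* values are part of the
 * wire protocol, the REQ_* values may differ between kernel versions. */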
2162
2163 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2164                                     unsigned int size)
2165 {
2166         struct drbd_interval *i;
2167
2168     repeat:
2169         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2170                 struct drbd_request *req;
2171                 struct bio_and_error m;
2172
2173                 if (!i->local)
2174                         continue;
2175                 req = container_of(i, struct drbd_request, i);
2176                 if (!(req->rq_state & RQ_POSTPONED))
2177                         continue;
2178                 req->rq_state &= ~RQ_POSTPONED;
2179                 __req_mod(req, NEG_ACKED, &m);
2180                 spin_unlock_irq(&device->resource->req_lock);
2181                 if (m.bio)
2182                         complete_master_bio(device, &m);
2183                 spin_lock_irq(&device->resource->req_lock);
2184                 goto repeat;
2185         }
2186 }
2187
2188 static int handle_write_conflicts(struct drbd_device *device,
2189                                   struct drbd_peer_request *peer_req)
2190 {
2191         struct drbd_connection *connection = peer_req->peer_device->connection;
2192         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2193         sector_t sector = peer_req->i.sector;
2194         const unsigned int size = peer_req->i.size;
2195         struct drbd_interval *i;
2196         bool equal;
2197         int err;
2198
2199         /*
2200          * Inserting the peer request into the write_requests tree will prevent
2201          * new conflicting local requests from being added.
2202          */
2203         drbd_insert_interval(&device->write_requests, &peer_req->i);
2204
2205     repeat:
2206         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2207                 if (i == &peer_req->i)
2208                         continue;
2209                 if (i->completed)
2210                         continue;
2211
2212                 if (!i->local) {
2213                         /*
2214                          * Our peer has sent a conflicting remote request; this
2215                          * should not happen in a two-node setup.  Wait for the
2216                          * earlier peer request to complete.
2217                          */
2218                         err = drbd_wait_misc(device, i);
2219                         if (err)
2220                                 goto out;
2221                         goto repeat;
2222                 }
2223
2224                 equal = i->sector == sector && i->size == size;
2225                 if (resolve_conflicts) {
2226                         /*
2227                          * If the peer request is fully contained within the
2228                          * overlapping request, it can be considered overwritten
2229                          * and thus superseded; otherwise, it will be retried
2230                          * once all overlapping requests have completed.
2231                          */
2232                         bool superseded = i->sector <= sector && i->sector +
2233                                        (i->size >> 9) >= sector + (size >> 9);
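                        /* i.e. the peer request [sector, sector + size) lies
                         * entirely inside the local request starting at
                         * i->sector: e.g. local 0..16 KiB vs. remote
                         * 4..8 KiB is superseded, while a partial overlap
                         * is retried instead. */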
2234
2235                         if (!equal)
2236                                 drbd_alert(device, "Concurrent writes detected: "
2237                                                "local=%llus +%u, remote=%llus +%u, "
2238                                                "assuming %s came first\n",
2239                                           (unsigned long long)i->sector, i->size,
2240                                           (unsigned long long)sector, size,
2241                                           superseded ? "local" : "remote");
2242
2243                         peer_req->w.cb = superseded ? e_send_superseded :
2244                                                    e_send_retry_write;
2245                         list_add_tail(&peer_req->w.list, &device->done_ee);
2246                         queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2247
2248                         err = -ENOENT;
2249                         goto out;
2250                 } else {
2251                         struct drbd_request *req =
2252                                 container_of(i, struct drbd_request, i);
2253
2254                         if (!equal)
2255                                 drbd_alert(device, "Concurrent writes detected: "
2256                                                "local=%llus +%u, remote=%llus +%u\n",
2257                                           (unsigned long long)i->sector, i->size,
2258                                           (unsigned long long)sector, size);
2259
2260                         if (req->rq_state & RQ_LOCAL_PENDING ||
2261                             !(req->rq_state & RQ_POSTPONED)) {
2262                                 /*
2263                                  * Wait for the node with the discard flag to
2264                                  * decide if this request has been superseded
2265                                  * or needs to be retried.
2266                                  * Requests that have been superseded will
2267                                  * disappear from the write_requests tree.
2268                                  *
2269                                  * In addition, wait for the conflicting
2270                                  * request to finish locally before submitting
2271                                  * the conflicting peer request.
2272                                  */
2273                                 err = drbd_wait_misc(device, &req->i);
2274                                 if (err) {
2275                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2276                                         fail_postponed_requests(device, sector, size);
2277                                         goto out;
2278                                 }
2279                                 goto repeat;
2280                         }
2281                         /*
2282                          * Remember to restart the conflicting requests after
2283                          * the new peer request has completed.
2284                          */
2285                         peer_req->flags |= EE_RESTART_REQUESTS;
2286                 }
2287         }
2288         err = 0;
2289
2290     out:
2291         if (err)
2292                 drbd_remove_epoch_entry_interval(device, peer_req);
2293         return err;
2294 }
2295
2296 /* mirrored write */
2297 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2298 {
2299         struct drbd_peer_device *peer_device;
2300         struct drbd_device *device;
2301         struct net_conf *nc;
2302         sector_t sector;
2303         struct drbd_peer_request *peer_req;
2304         struct p_data *p = pi->data;
2305         u32 peer_seq = be32_to_cpu(p->seq_num);
2306         int rw = WRITE;
2307         u32 dp_flags;
2308         int err, tp;
2309
2310         peer_device = conn_peer_device(connection, pi->vnr);
2311         if (!peer_device)
2312                 return -EIO;
2313         device = peer_device->device;
2314
2315         if (!get_ldev(device)) {
2316                 int err2;
2317
2318                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2319                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2320                 atomic_inc(&connection->current_epoch->epoch_size);
2321                 err2 = drbd_drain_block(peer_device, pi->size);
2322                 if (!err)
2323                         err = err2;
2324                 return err;
2325         }
2326
2327         /*
2328          * Corresponding put_ldev done either below (on various errors), or in
2329          * drbd_peer_request_endio, if we successfully submit the data at the
2330          * end of this function.
2331          */
2332
2333         sector = be64_to_cpu(p->sector);
2334         peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2335         if (!peer_req) {
2336                 put_ldev(device);
2337                 return -EIO;
2338         }
2339
2340         peer_req->w.cb = e_end_block;
2341         peer_req->submit_jif = jiffies;
2342         peer_req->flags |= EE_APPLICATION;
2343
2344         dp_flags = be32_to_cpu(p->dp_flags);
2345         rw |= wire_flags_to_bio(dp_flags);
2346         if (pi->cmd == P_TRIM) {
2347                 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2348                 peer_req->flags |= EE_IS_TRIM;
2349                 if (!blk_queue_discard(q))
2350                         peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2351                 D_ASSERT(peer_device, peer_req->i.size > 0);
2352                 D_ASSERT(peer_device, rw & REQ_DISCARD);
2353                 D_ASSERT(peer_device, peer_req->pages == NULL);
2354         } else if (peer_req->pages == NULL) {
2355                 D_ASSERT(device, peer_req->i.size == 0);
2356                 D_ASSERT(device, dp_flags & DP_FLUSH);
2357         }
2358
2359         if (dp_flags & DP_MAY_SET_IN_SYNC)
2360                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2361
2362         spin_lock(&connection->epoch_lock);
2363         peer_req->epoch = connection->current_epoch;
2364         atomic_inc(&peer_req->epoch->epoch_size);
2365         atomic_inc(&peer_req->epoch->active);
2366         spin_unlock(&connection->epoch_lock);
2367
2368         rcu_read_lock();
2369         nc = rcu_dereference(peer_device->connection->net_conf);
2370         tp = nc->two_primaries;
2371         if (peer_device->connection->agreed_pro_version < 100) {
2372                 switch (nc->wire_protocol) {
2373                 case DRBD_PROT_C:
2374                         dp_flags |= DP_SEND_WRITE_ACK;
2375                         break;
2376                 case DRBD_PROT_B:
2377                         dp_flags |= DP_SEND_RECEIVE_ACK;
2378                         break;
2379                 }
2380         }
2381         rcu_read_unlock();
2382
2383         if (dp_flags & DP_SEND_WRITE_ACK) {
2384                 peer_req->flags |= EE_SEND_WRITE_ACK;
2385                 inc_unacked(device);
2386                 /* corresponding dec_unacked() in e_end_block()
2387                  * respective _drbd_clear_done_ee */
2388         }
2389
2390         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2391                 /* I really don't like it that the receiver thread
2392                  * sends on the msock, but anyways */
2393                 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2394         }
2395
2396         if (tp) {
2397                 /* two primaries implies protocol C */
2398                 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2399                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2400                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2401                 if (err)
2402                         goto out_interrupted;
2403                 spin_lock_irq(&device->resource->req_lock);
2404                 err = handle_write_conflicts(device, peer_req);
2405                 if (err) {
2406                         spin_unlock_irq(&device->resource->req_lock);
2407                         if (err == -ENOENT) {
2408                                 put_ldev(device);
2409                                 return 0;
2410                         }
2411                         goto out_interrupted;
2412                 }
2413         } else {
2414                 update_peer_seq(peer_device, peer_seq);
2415                 spin_lock_irq(&device->resource->req_lock);
2416         }
2417         /* if we use the zeroout fallback code, we process synchronously,
2418          * and drbd_submit_peer_request() itself waits for all pending
2419          * requests, i.e. for active_ee to become empty;
2420          * better not add ourselves here. */
2421         if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2422                 list_add_tail(&peer_req->w.list, &device->active_ee);
2423         spin_unlock_irq(&device->resource->req_lock);
2424
2425         if (device->state.conn == C_SYNC_TARGET)
2426                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2427
2428         if (device->state.pdsk < D_INCONSISTENT) {
2429                 /* In case we have the only disk of the cluster, */
2430                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2431                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2432                 drbd_al_begin_io(device, &peer_req->i);
2433                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2434         }
2435
2436         err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2437         if (!err)
2438                 return 0;
2439
2440         /* don't care for the reason here */
2441         drbd_err(device, "submit failed, triggering re-connect\n");
2442         spin_lock_irq(&device->resource->req_lock);
2443         list_del(&peer_req->w.list);
2444         drbd_remove_epoch_entry_interval(device, peer_req);
2445         spin_unlock_irq(&device->resource->req_lock);
2446         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2447                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2448                 drbd_al_complete_io(device, &peer_req->i);
2449         }
2450
2451 out_interrupted:
2452         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2453         put_ldev(device);
2454         drbd_free_peer_req(device, peer_req);
2455         return err;
2456 }
2457
2458 /* We may throttle resync, if the lower device seems to be busy,
2459  * and current sync rate is above c_min_rate.
2460  *
2461  * To decide whether or not the lower device is busy, we use a scheme similar
2462  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2463  * (more than 64 sectors) of activity we cannot account for with our own resync
2464  * activity, it obviously is "busy".
2465  *
2466  * The current sync rate used here uses only the most recent two step marks,
2467  * to have a short time average so we can react faster.
2468  */
2469 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2470                 bool throttle_if_app_is_waiting)
2471 {
2472         struct lc_element *tmp;
2473         bool throttle = drbd_rs_c_min_rate_throttle(device);
2474
2475         if (!throttle || throttle_if_app_is_waiting)
2476                 return throttle;
2477
2478         spin_lock_irq(&device->al_lock);
2479         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2480         if (tmp) {
2481                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2482                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2483                         throttle = false;
2484                 /* Do not slow down if app IO is already waiting for this extent,
2485                  * and our progress is necessary for application IO to complete. */
2486         }
2487         spin_unlock_irq(&device->al_lock);
2488
2489         return throttle;
2490 }
2491
2492 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2493 {
2494         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2495         unsigned long db, dt, dbdt;
2496         unsigned int c_min_rate;
2497         int curr_events;
2498
2499         rcu_read_lock();
2500         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2501         rcu_read_unlock();
2502
2503         /* feature disabled? */
2504         if (c_min_rate == 0)
2505                 return false;
2506
2507         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2508                       (int)part_stat_read(&disk->part0, sectors[1]) -
2509                         atomic_read(&device->rs_sect_ev);
2510
2511         if (atomic_read(&device->ap_actlog_cnt) ||
2512             curr_events - device->rs_last_events > 64) {
2513                 unsigned long rs_left;
2514                 int i;
2515
2516                 device->rs_last_events = curr_events;
2517
2518                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2519                  * approx. */
2520                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2521
2522                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2523                         rs_left = device->ov_left;
2524                 else
2525                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2526
2527                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2528                 if (!dt)
2529                         dt++;
2530                 db = device->rs_mark_left[i] - rs_left;
2531                 dbdt = Bit2KB(db/dt);
2532
2533                 if (dbdt > c_min_rate)
2534                         return true;
2535         }
2536         return false;
2537 }
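
/* Plugging in illustrative numbers: if the bitmap weight dropped by
 * db == 25600 bits between the two most recent sync marks taken dt == 2
 * seconds apart, then dbdt == Bit2KB(25600 / 2) == 51200 KiB/s, since one
 * bitmap bit covers a 4 KiB block.  With c_min_rate configured below that,
 * the function reports "above the minimum rate", i.e. throttling is
 * allowed. */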
2538
2539 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2540 {
2541         struct drbd_peer_device *peer_device;
2542         struct drbd_device *device;
2543         sector_t sector;
2544         sector_t capacity;
2545         struct drbd_peer_request *peer_req;
2546         struct digest_info *di = NULL;
2547         int size, verb;
2548         unsigned int fault_type;
2549         struct p_block_req *p = pi->data;
2550
2551         peer_device = conn_peer_device(connection, pi->vnr);
2552         if (!peer_device)
2553                 return -EIO;
2554         device = peer_device->device;
2555         capacity = drbd_get_capacity(device->this_bdev);
2556
2557         sector = be64_to_cpu(p->sector);
2558         size   = be32_to_cpu(p->blksize);
2559
2560         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2561                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2562                                 (unsigned long long)sector, size);
2563                 return -EINVAL;
2564         }
2565         if (sector + (size>>9) > capacity) {
2566                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2567                                 (unsigned long long)sector, size);
2568                 return -EINVAL;
2569         }
2570
2571         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2572                 verb = 1;
2573                 switch (pi->cmd) {
2574                 case P_DATA_REQUEST:
2575                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2576                         break;
2577                 case P_RS_DATA_REQUEST:
2578                 case P_CSUM_RS_REQUEST:
2579                 case P_OV_REQUEST:
2580                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2581                         break;
2582                 case P_OV_REPLY:
2583                         verb = 0;
2584                         dec_rs_pending(device);
2585                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2586                         break;
2587                 default:
2588                         BUG();
2589                 }
2590                 if (verb && __ratelimit(&drbd_ratelimit_state))
2591                         drbd_err(device, "Can not satisfy peer's read request, "
2592                             "no local data.\n");
2593
2594                 /* drain the payload, if any */
2595                 return drbd_drain_block(peer_device, pi->size);
2596         }
2597
2598         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2599          * "criss-cross" setup, that might cause write-out on some other DRBD,
2600          * which in turn might block on the other node at this very place.  */
2601         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2602                         true /* has real payload */, GFP_NOIO);
2603         if (!peer_req) {
2604                 put_ldev(device);
2605                 return -ENOMEM;
2606         }
2607
2608         switch (pi->cmd) {
2609         case P_DATA_REQUEST:
2610                 peer_req->w.cb = w_e_end_data_req;
2611                 fault_type = DRBD_FAULT_DT_RD;
2612                 /* application IO, don't drbd_rs_begin_io */
2613                 peer_req->flags |= EE_APPLICATION;
2614                 goto submit;
2615
2616         case P_RS_DATA_REQUEST:
2617                 peer_req->w.cb = w_e_end_rsdata_req;
2618                 fault_type = DRBD_FAULT_RS_RD;
2619                 /* used in the sector offset progress display */
2620                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2621                 break;
2622
2623         case P_OV_REPLY:
2624         case P_CSUM_RS_REQUEST:
2625                 fault_type = DRBD_FAULT_RS_RD;
2626                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2627                 if (!di)
2628                         goto out_free_e;
2629
2630                 di->digest_size = pi->size;
2631                 di->digest = (((char *)di)+sizeof(struct digest_info));
2632
2633                 peer_req->digest = di;
2634                 peer_req->flags |= EE_HAS_DIGEST;
2635
2636                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2637                         goto out_free_e;
2638
2639                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2640                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2641                         peer_req->w.cb = w_e_end_csum_rs_req;
2642                         /* used in the sector offset progress display */
2643                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2644                         /* remember to report stats in drbd_resync_finished */
2645                         device->use_csums = true;
2646                 } else if (pi->cmd == P_OV_REPLY) {
2647                         /* track progress, we may need to throttle */
2648                         atomic_add(size >> 9, &device->rs_sect_in);
2649                         peer_req->w.cb = w_e_end_ov_reply;
2650                         dec_rs_pending(device);
2651                         /* drbd_rs_begin_io done when we sent this request,
2652                          * but accounting still needs to be done. */
2653                         goto submit_for_resync;
2654                 }
2655                 break;
2656
2657         case P_OV_REQUEST:
2658                 if (device->ov_start_sector == ~(sector_t)0 &&
2659                     peer_device->connection->agreed_pro_version >= 90) {
2660                         unsigned long now = jiffies;
2661                         int i;
2662                         device->ov_start_sector = sector;
2663                         device->ov_position = sector;
2664                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2665                         device->rs_total = device->ov_left;
2666                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2667                                 device->rs_mark_left[i] = device->ov_left;
2668                                 device->rs_mark_time[i] = now;
2669                         }
2670                         drbd_info(device, "Online Verify start sector: %llu\n",
2671                                         (unsigned long long)sector);
2672                 }
2673                 peer_req->w.cb = w_e_end_ov_req;
2674                 fault_type = DRBD_FAULT_RS_RD;
2675                 break;
2676
2677         default:
2678                 BUG();
2679         }
2680
2681         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2682          * wrt the receiver, but it is not as straightforward as it may seem.
2683          * Various places in the resync start and stop logic assume resync
2684          * requests are processed in order, requeuing this on the worker thread
2685          * introduces a bunch of new code for synchronization between threads.
2686          *
2687          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2688          * "forever", throttling after drbd_rs_begin_io will lock that extent
2689          * for application writes for the same time.  For now, just throttle
2690          * here, where the rest of the code expects the receiver to sleep for
2691          * a while, anyways.
2692          */
2693
2694         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2695          * this defers syncer requests for some time, before letting at least
2696          * one request through.  The resync controller on the receiving side
2697          * will adapt to the incoming rate accordingly.
2698          *
2699          * We cannot throttle here if remote is Primary/SyncTarget:
2700          * we would also throttle its application reads.
2701          * In that case, throttling is done on the SyncTarget only.
2702          */
2703
2704         /* Even though this may be a resync request, we do add to "read_ee";
2705          * "sync_ee" is only used for resync WRITEs.
2706          * Add to list early, so debugfs can find this request
2707          * even if we have to sleep below. */
2708         spin_lock_irq(&device->resource->req_lock);
2709         list_add_tail(&peer_req->w.list, &device->read_ee);
2710         spin_unlock_irq(&device->resource->req_lock);
2711
2712         update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2713         if (device->state.peer != R_PRIMARY &&
2714             drbd_rs_should_slow_down(device, sector, false))
2715                 schedule_timeout_uninterruptible(HZ/10);
2716         update_receiver_timing_details(connection, drbd_rs_begin_io);
2717         if (drbd_rs_begin_io(device, sector))
2718                 goto out_free_e;
2719
2720 submit_for_resync:
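        /* editorial note: "size" above is in bytes; >> 9 converts it to
         * 512-byte sectors, e.g. a 4 KiB request adds 8 to rs_sect_ev. */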
2721         atomic_add(size >> 9, &device->rs_sect_ev);
2722
2723 submit:
2724         update_receiver_timing_details(connection, drbd_submit_peer_request);
2725         inc_unacked(device);
2726         if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2727                 return 0;
2728
2729         /* don't care for the reason here */
2730         drbd_err(device, "submit failed, triggering re-connect\n");
2731
2732 out_free_e:
2733         spin_lock_irq(&device->resource->req_lock);
2734         list_del(&peer_req->w.list);
2735         spin_unlock_irq(&device->resource->req_lock);
2736         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2737
2738         put_ldev(device);
2739         drbd_free_peer_req(device, peer_req);
2740         return -EIO;
2741 }
2742
2743 /**
2744  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2745  */
2746 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2747 {
2748         struct drbd_device *device = peer_device->device;
2749         int self, peer, rv = -100;
2750         unsigned long ch_self, ch_peer;
2751         enum drbd_after_sb_p after_sb_0p;
2752
2753         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2754         peer = device->p_uuid[UI_BITMAP] & 1;
2755
2756         ch_peer = device->p_uuid[UI_SIZE];
2757         ch_self = device->comm_bm_set;
2758
2759         rcu_read_lock();
2760         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2761         rcu_read_unlock();
2762         switch (after_sb_0p) {
2763         case ASB_CONSENSUS:
2764         case ASB_DISCARD_SECONDARY:
2765         case ASB_CALL_HELPER:
2766         case ASB_VIOLENTLY:
2767                 drbd_err(device, "Configuration error.\n");
2768                 break;
2769         case ASB_DISCONNECT:
2770                 break;
2771         case ASB_DISCARD_YOUNGER_PRI:
2772                 if (self == 0 && peer == 1) {
2773                         rv = -1;
2774                         break;
2775                 }
2776                 if (self == 1 && peer == 0) {
2777                         rv =  1;
2778                         break;
2779                 }
2780                 /* Else fall through to one of the other strategies... */
2781         case ASB_DISCARD_OLDER_PRI:
2782                 if (self == 0 && peer == 1) {
2783                         rv = 1;
2784                         break;
2785                 }
2786                 if (self == 1 && peer == 0) {
2787                         rv = -1;
2788                         break;
2789                 }
2790                 /* Else fall through to one of the other strategies... */
2791                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2792                      "Using discard-least-changes instead\n");
2793         case ASB_DISCARD_ZERO_CHG:
2794                 if (ch_peer == 0 && ch_self == 0) {
2795                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2796                                 ? -1 : 1;
2797                         break;
2798                 } else {
2799                         if (ch_peer == 0) { rv =  1; break; }
2800                         if (ch_self == 0) { rv = -1; break; }
2801                 }
2802                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2803                         break;
2804         case ASB_DISCARD_LEAST_CHG:
2805                 if      (ch_self < ch_peer)
2806                         rv = -1;
2807                 else if (ch_self > ch_peer)
2808                         rv =  1;
2809                 else /* ( ch_self == ch_peer ) */
2810                      /* Well, then use something else. */
2811                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2812                                 ? -1 : 1;
2813                 break;
2814         case ASB_DISCARD_LOCAL:
2815                 rv = -1;
2816                 break;
2817         case ASB_DISCARD_REMOTE:
2818                 rv =  1;
2819         }
2820
2821         return rv;
2822 }
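/* Editor's note (inferred from how drbd_sync_handshake() consumes these
 * values; an illustrative summary, not an authoritative contract): the
 * recover_{0,1,2}p helpers return -1 for "discard local data, sync from
 * the peer", 1 for "discard the peer's data, sync from this node", and
 * -100 for "no automatic decision, keep the split brain". */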
2823
2824 /**
2825  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2826  */
2827 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2828 {
2829         struct drbd_device *device = peer_device->device;
2830         int hg, rv = -100;
2831         enum drbd_after_sb_p after_sb_1p;
2832
2833         rcu_read_lock();
2834         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2835         rcu_read_unlock();
2836         switch (after_sb_1p) {
2837         case ASB_DISCARD_YOUNGER_PRI:
2838         case ASB_DISCARD_OLDER_PRI:
2839         case ASB_DISCARD_LEAST_CHG:
2840         case ASB_DISCARD_LOCAL:
2841         case ASB_DISCARD_REMOTE:
2842         case ASB_DISCARD_ZERO_CHG:
2843                 drbd_err(device, "Configuration error.\n");
2844                 break;
2845         case ASB_DISCONNECT:
2846                 break;
2847         case ASB_CONSENSUS:
2848                 hg = drbd_asb_recover_0p(peer_device);
2849                 if (hg == -1 && device->state.role == R_SECONDARY)
2850                         rv = hg;
2851                 if (hg == 1  && device->state.role == R_PRIMARY)
2852                         rv = hg;
2853                 break;
2854         case ASB_VIOLENTLY:
2855                 rv = drbd_asb_recover_0p(peer_device);
2856                 break;
2857         case ASB_DISCARD_SECONDARY:
2858                 return device->state.role == R_PRIMARY ? 1 : -1;
2859         case ASB_CALL_HELPER:
2860                 hg = drbd_asb_recover_0p(peer_device);
2861                 if (hg == -1 && device->state.role == R_PRIMARY) {
2862                         enum drbd_state_rv rv2;
2863
2864                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2865                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2866                           * we do not need to wait for the after state change work either. */
2867                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2868                         if (rv2 != SS_SUCCESS) {
2869                                 drbd_khelper(device, "pri-lost-after-sb");
2870                         } else {
2871                                 drbd_warn(device, "Successfully gave up primary role.\n");
2872                                 rv = hg;
2873                         }
2874                 } else
2875                         rv = hg;
2876         }
2877
2878         return rv;
2879 }
2880
2881 /**
2882  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2883  */
2884 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2885 {
2886         struct drbd_device *device = peer_device->device;
2887         int hg, rv = -100;
2888         enum drbd_after_sb_p after_sb_2p;
2889
2890         rcu_read_lock();
2891         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2892         rcu_read_unlock();
2893         switch (after_sb_2p) {
2894         case ASB_DISCARD_YOUNGER_PRI:
2895         case ASB_DISCARD_OLDER_PRI:
2896         case ASB_DISCARD_LEAST_CHG:
2897         case ASB_DISCARD_LOCAL:
2898         case ASB_DISCARD_REMOTE:
2899         case ASB_CONSENSUS:
2900         case ASB_DISCARD_SECONDARY:
2901         case ASB_DISCARD_ZERO_CHG:
2902                 drbd_err(device, "Configuration error.\n");
2903                 break;
2904         case ASB_VIOLENTLY:
2905                 rv = drbd_asb_recover_0p(peer_device);
2906                 break;
2907         case ASB_DISCONNECT:
2908                 break;
2909         case ASB_CALL_HELPER:
2910                 hg = drbd_asb_recover_0p(peer_device);
2911                 if (hg == -1) {
2912                         enum drbd_state_rv rv2;
2913
2914                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2915                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2916                           * we do not need to wait for the after state change work either. */
2917                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2918                         if (rv2 != SS_SUCCESS) {
2919                                 drbd_khelper(device, "pri-lost-after-sb");
2920                         } else {
2921                                 drbd_warn(device, "Successfully gave up primary role.\n");
2922                                 rv = hg;
2923                         }
2924                 } else
2925                         rv = hg;
2926         }
2927
2928         return rv;
2929 }
2930
2931 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2932                            u64 bits, u64 flags)
2933 {
2934         if (!uuid) {
2935                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2936                 return;
2937         }
2938         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2939              text,
2940              (unsigned long long)uuid[UI_CURRENT],
2941              (unsigned long long)uuid[UI_BITMAP],
2942              (unsigned long long)uuid[UI_HISTORY_START],
2943              (unsigned long long)uuid[UI_HISTORY_END],
2944              (unsigned long long)bits,
2945              (unsigned long long)flags);
2946 }
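/* Example of the resulting log line (editorial; values invented):
 *   self 0004000000000004:0000000000000000:0000000000000000:0000000000000000 bits:0 flags:0
 */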
2947
2948 /*
2949   100   after split brain, try auto recover
2950     2   C_SYNC_SOURCE set BitMap
2951     1   C_SYNC_SOURCE use BitMap
2952     0   no Sync
2953    -1   C_SYNC_TARGET use BitMap
2954    -2   C_SYNC_TARGET set BitMap
2955  -100   after split brain, disconnect
2956 -1000   unrelated data
2957 -1091   requires proto 91
2958 -1096   requires proto 96
2959  */
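/* Editor's note: the lowest bit of each UUID only flags whether the node
 * was primary when that UUID was generated, which is why every comparison
 * below masks it off with & ~((u64)1).  A helper expressing that idiom
 * might look like this (illustrative sketch only, not part of the driver):
 */
#if 0
static inline bool drbd_uuid_equal(u64 a, u64 b)
{
        return (a & ~(u64)1) == (b & ~(u64)1);  /* ignore the "was primary" bit */
}
#endif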
2960 static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
2961 {
2962         struct drbd_peer_device *const peer_device = first_peer_device(device);
2963         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
2964         u64 self, peer;
2965         int i, j;
2966
2967         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2968         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2969
2970         *rule_nr = 10;
2971         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2972                 return 0;
2973
2974         *rule_nr = 20;
2975         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2976              peer != UUID_JUST_CREATED)
2977                 return -2;
2978
2979         *rule_nr = 30;
2980         if (self != UUID_JUST_CREATED &&
2981             (peer == UUID_JUST_CREATED || peer == (u64)0))
2982                 return 2;
2983
2984         if (self == peer) {
2985                 int rct, dc; /* roles at crash time */
2986
2987                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2988
2989                         if (connection->agreed_pro_version < 91)
2990                                 return -1091;
2991
2992                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2993                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2994                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2995                                 drbd_uuid_move_history(device);
2996                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2997                                 device->ldev->md.uuid[UI_BITMAP] = 0;
2998
2999                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3000                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3001                                 *rule_nr = 34;
3002                         } else {
3003                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3004                                 *rule_nr = 36;
3005                         }
3006
3007                         return 1;
3008                 }
3009
3010                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3011
3012                         if (connection->agreed_pro_version < 91)
3013                                 return -1091;
3014
3015                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3016                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3017                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3018
3019                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3020                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3021                                 device->p_uuid[UI_BITMAP] = 0UL;
3022
3023                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3024                                 *rule_nr = 35;
3025                         } else {
3026                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3027                                 *rule_nr = 37;
3028                         }
3029
3030                         return -1;
3031                 }
3032
3033                 /* Common power [off|failure] */
3034                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3035                         (device->p_uuid[UI_FLAGS] & 2);
3036                 /* lowest bit is set when we were primary,
3037                  * next bit (weight 2) is set when peer was primary */
3038                 *rule_nr = 40;
3039
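                /* Worked example (editorial): we crashed while primary
                 * (CRASHED_PRIMARY set -> +1) and the peer's UI_FLAGS bit 1
                 * is set (peer was primary -> +2), so rct == 3 and the
                 * RESOLVE_CONFLICTS tie-breaker below picks the direction. */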
3040                 switch (rct) {
3041                 case 0: /* !self_pri && !peer_pri */ return 0;
3042                 case 1: /*  self_pri && !peer_pri */ return 1;
3043                 case 2: /* !self_pri &&  peer_pri */ return -1;
3044                 case 3: /*  self_pri &&  peer_pri */
3045                         dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3046                         return dc ? -1 : 1;
3047                 }
3048         }
3049
3050         *rule_nr = 50;
3051         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3052         if (self == peer)
3053                 return -1;
3054
3055         *rule_nr = 51;
3056         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3057         if (self == peer) {
3058                 if (connection->agreed_pro_version < 96 ?
3059                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3060                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3061                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3062                         /* The last P_SYNC_UUID did not get through. Undo the peer's
3063                            UUID modifications from its last resync start as sync source. */
3064
3065                         if (connection->agreed_pro_version < 91)
3066                                 return -1091;
3067
3068                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3069                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3070
3071                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3072                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3073
3074                         return -1;
3075                 }
3076         }
3077
3078         *rule_nr = 60;
3079         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3080         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3081                 peer = device->p_uuid[i] & ~((u64)1);
3082                 if (self == peer)
3083                         return -2;
3084         }
3085
3086         *rule_nr = 70;
3087         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3088         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3089         if (self == peer)
3090                 return 1;
3091
3092         *rule_nr = 71;
3093         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3094         if (self == peer) {
3095                 if (connection->agreed_pro_version < 96 ?
3096                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3097                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3098                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3099                         /* The last P_SYNC_UUID did not get through. Undo our own
3100                            UUID modifications from our last resync start as sync source. */
3101
3102                         if (connection->agreed_pro_version < 91)
3103                                 return -1091;
3104
3105                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3106                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3107
3108                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3109                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3110                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3111
3112                         return 1;
3113                 }
3114         }
3115
3116
3117         *rule_nr = 80;
3118         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3119         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3120                 self = device->ldev->md.uuid[i] & ~((u64)1);
3121                 if (self == peer)
3122                         return 2;
3123         }
3124
3125         *rule_nr = 90;
3126         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3127         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3128         if (self == peer && self != ((u64)0))
3129                 return 100;
3130
3131         *rule_nr = 100;
3132         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3133                 self = device->ldev->md.uuid[i] & ~((u64)1);
3134                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3135                         peer = device->p_uuid[j] & ~((u64)1);
3136                         if (self == peer)
3137                                 return -100;
3138                 }
3139         }
3140
3141         return -1000;
3142 }
3143
3144 /* drbd_sync_handshake() returns the new conn state on success, or
3145    C_MASK (-1) on failure.
3146  */
3147 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3148                                            enum drbd_role peer_role,
3149                                            enum drbd_disk_state peer_disk) __must_hold(local)
3150 {
3151         struct drbd_device *device = peer_device->device;
3152         enum drbd_conns rv = C_MASK;
3153         enum drbd_disk_state mydisk;
3154         struct net_conf *nc;
3155         int hg, rule_nr, rr_conflict, tentative;
3156
3157         mydisk = device->state.disk;
3158         if (mydisk == D_NEGOTIATING)
3159                 mydisk = device->new_state_tmp.disk;
3160
3161         drbd_info(device, "drbd_sync_handshake:\n");
3162
3163         spin_lock_irq(&device->ldev->md.uuid_lock);
3164         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3165         drbd_uuid_dump(device, "peer", device->p_uuid,
3166                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3167
3168         hg = drbd_uuid_compare(device, &rule_nr);
3169         spin_unlock_irq(&device->ldev->md.uuid_lock);
3170
3171         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3172
3173         if (hg == -1000) {
3174                 drbd_alert(device, "Unrelated data, aborting!\n");
3175                 return C_MASK;
3176         }
3177         if (hg < -1000) {
3178                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3179                 return C_MASK;
3180         }
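        /* Worked example (editorial): "requires proto 91" is encoded as
         * hg == -1091, so the message above prints -hg - 1000 == 91. */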
3181
3182         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3183             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3184                 int f = (hg == -100) || abs(hg) == 2;
3185                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3186                 if (f)
3187                         hg = hg*2;
3188                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3189                      hg > 0 ? "source" : "target");
3190         }
3191
3192         if (abs(hg) == 100)
3193                 drbd_khelper(device, "initial-split-brain");
3194
3195         rcu_read_lock();
3196         nc = rcu_dereference(peer_device->connection->net_conf);
3197
3198         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3199                 int pcount = (device->state.role == R_PRIMARY)
3200                            + (peer_role == R_PRIMARY);
3201                 int forced = (hg == -100);
3202
3203                 switch (pcount) {
3204                 case 0:
3205                         hg = drbd_asb_recover_0p(peer_device);
3206                         break;
3207                 case 1:
3208                         hg = drbd_asb_recover_1p(peer_device);
3209                         break;
3210                 case 2:
3211                         hg = drbd_asb_recover_2p(peer_device);
3212                         break;
3213                 }
3214                 if (abs(hg) < 100) {
3215                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3216                              "automatically solved. Sync from %s node\n",
3217                              pcount, (hg < 0) ? "peer" : "this");
3218                         if (forced) {
3219                                 drbd_warn(device, "Doing a full sync, since"
3220                                      " UUIDs were ambiguous.\n");
3221                                 hg = hg*2;
3222                         }
3223                 }
3224         }
3225
3226         if (hg == -100) {
3227                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3228                         hg = -1;
3229                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3230                         hg = 1;
3231
3232                 if (abs(hg) < 100)
3233                         drbd_warn(device, "Split-Brain detected, manually solved. "
3234                              "Sync from %s node\n",
3235                              (hg < 0) ? "peer" : "this");
3236         }
3237         rr_conflict = nc->rr_conflict;
3238         tentative = nc->tentative;
3239         rcu_read_unlock();
3240
3241         if (hg == -100) {
3242                 /* FIXME this log message is not correct if we end up here
3243                  * after an attempted attach on a diskless node.
3244                  * We just refuse to attach -- well, we drop the "connection"
3245                  * to that disk, in a way... */
3246                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3247                 drbd_khelper(device, "split-brain");
3248                 return C_MASK;
3249         }
3250
3251         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3252                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3253                 return C_MASK;
3254         }
3255
3256         if (hg < 0 && /* by intention we do not use mydisk here. */
3257             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3258                 switch (rr_conflict) {
3259                 case ASB_CALL_HELPER:
3260                         drbd_khelper(device, "pri-lost");
3261                         /* fall through */
3262                 case ASB_DISCONNECT:
3263                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3264                         return C_MASK;
3265                 case ASB_VIOLENTLY:
3266                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data "
3267                              "assumption\n");
3268                 }
3269         }
3270
3271         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3272                 if (hg == 0)
3273                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3274                 else
3275                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3276                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3277                                  abs(hg) >= 2 ? "full" : "bit-map based");
3278                 return C_MASK;
3279         }
3280
3281         if (abs(hg) >= 2) {
3282                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3283                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3284                                         BM_LOCKED_SET_ALLOWED))
3285                         return C_MASK;
3286         }
3287
3288         if (hg > 0) { /* become sync source. */
3289                 rv = C_WF_BITMAP_S;
3290         } else if (hg < 0) { /* become sync target */
3291                 rv = C_WF_BITMAP_T;
3292         } else {
3293                 rv = C_CONNECTED;
3294                 if (drbd_bm_total_weight(device)) {
3295                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3296                              drbd_bm_total_weight(device));
3297                 }
3298         }
3299
3300         return rv;
3301 }
3302
3303 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3304 {
3305         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3306         if (peer == ASB_DISCARD_REMOTE)
3307                 return ASB_DISCARD_LOCAL;
3308
3309         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3310         if (peer == ASB_DISCARD_LOCAL)
3311                 return ASB_DISCARD_REMOTE;
3312
3313         /* everything else is valid if they are equal on both sides. */
3314         return peer;
3315 }
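/* Editor's note: e.g. a peer configured with ASB_DISCARD_REMOTE means *our*
 * data; receive_protocol() below therefore compares
 * convert_after_sb(ASB_DISCARD_REMOTE) == ASB_DISCARD_LOCAL against the
 * local setting instead of the raw value from the wire. */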
3316
3317 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3318 {
3319         struct p_protocol *p = pi->data;
3320         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3321         int p_proto, p_discard_my_data, p_two_primaries, cf;
3322         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3323         char integrity_alg[SHARED_SECRET_MAX] = "";
3324         struct crypto_ahash *peer_integrity_tfm = NULL;
3325         void *int_dig_in = NULL, *int_dig_vv = NULL;
3326
3327         p_proto         = be32_to_cpu(p->protocol);
3328         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3329         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3330         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3331         p_two_primaries = be32_to_cpu(p->two_primaries);
3332         cf              = be32_to_cpu(p->conn_flags);
3333         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3334
3335         if (connection->agreed_pro_version >= 87) {
3336                 int err;
3337
3338                 if (pi->size > sizeof(integrity_alg))
3339                         return -EIO;
3340                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3341                 if (err)
3342                         return err;
3343                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3344         }
3345
3346         if (pi->cmd != P_PROTOCOL_UPDATE) {
3347                 clear_bit(CONN_DRY_RUN, &connection->flags);
3348
3349                 if (cf & CF_DRY_RUN)
3350                         set_bit(CONN_DRY_RUN, &connection->flags);
3351
3352                 rcu_read_lock();
3353                 nc = rcu_dereference(connection->net_conf);
3354
3355                 if (p_proto != nc->wire_protocol) {
3356                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3357                         goto disconnect_rcu_unlock;
3358                 }
3359
3360                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3361                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3362                         goto disconnect_rcu_unlock;
3363                 }
3364
3365                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3366                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3367                         goto disconnect_rcu_unlock;
3368                 }
3369
3370                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3371                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3372                         goto disconnect_rcu_unlock;
3373                 }
3374
3375                 if (p_discard_my_data && nc->discard_my_data) {
3376                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3377                         goto disconnect_rcu_unlock;
3378                 }
3379
3380                 if (p_two_primaries != nc->two_primaries) {
3381                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3382                         goto disconnect_rcu_unlock;
3383                 }
3384
3385                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3386                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3387                         goto disconnect_rcu_unlock;
3388                 }
3389
3390                 rcu_read_unlock();
3391         }
3392
3393         if (integrity_alg[0]) {
3394                 int hash_size;
3395
3396                 /*
3397                  * We can only change the peer data integrity algorithm
3398                  * here.  Changing our own data integrity algorithm
3399                  * requires that we send a P_PROTOCOL_UPDATE packet at
3400                  * the same time; otherwise, the peer has no way to
3401                  * tell between which packets the algorithm should
3402                  * change.
3403                  */
3404
3405                 peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3406                 if (!peer_integrity_tfm) {
3407                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3408                                  integrity_alg);
3409                         goto disconnect;
3410                 }
3411
3412                 hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3413                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3414                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3415                 if (!(int_dig_in && int_dig_vv)) {
3416                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3417                         goto disconnect;
3418                 }
3419         }
3420
3421         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3422         if (!new_net_conf) {
3423                 drbd_err(connection, "Allocation of new net_conf failed\n");
3424                 goto disconnect;
3425         }
3426
3427         mutex_lock(&connection->data.mutex);
3428         mutex_lock(&connection->resource->conf_update);
3429         old_net_conf = connection->net_conf;
3430         *new_net_conf = *old_net_conf;
3431
3432         new_net_conf->wire_protocol = p_proto;
3433         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3434         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3435         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3436         new_net_conf->two_primaries = p_two_primaries;
3437
3438         rcu_assign_pointer(connection->net_conf, new_net_conf);
3439         mutex_unlock(&connection->resource->conf_update);
3440         mutex_unlock(&connection->data.mutex);
3441
3442         crypto_free_ahash(connection->peer_integrity_tfm);
3443         kfree(connection->int_dig_in);
3444         kfree(connection->int_dig_vv);
3445         connection->peer_integrity_tfm = peer_integrity_tfm;
3446         connection->int_dig_in = int_dig_in;
3447         connection->int_dig_vv = int_dig_vv;
3448
3449         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3450                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3451                           integrity_alg[0] ? integrity_alg : "(none)");
3452
3453         synchronize_rcu();
3454         kfree(old_net_conf);
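        /* editorial note: classic RCU replace-then-reclaim -- readers that
         * picked up old_net_conf under rcu_read_lock() are guaranteed to be
         * done once synchronize_rcu() above returns, making kfree() safe. */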
3455         return 0;
3456
3457 disconnect_rcu_unlock:
3458         rcu_read_unlock();
3459 disconnect:
3460         crypto_free_ahash(peer_integrity_tfm);
3461         kfree(int_dig_in);
3462         kfree(int_dig_vv);
3463         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3464         return -EIO;
3465 }
3466
3467 /* helper function
3468  * input: alg name, feature name
3469  * return: NULL (alg name was "")
3470  *         ERR_PTR(error) if something goes wrong
3471  *         or the crypto hash ptr, if it worked out ok. */
3472 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3473                 const char *alg, const char *name)
3474 {
3475         struct crypto_ahash *tfm;
3476
3477         if (!alg[0])
3478                 return NULL;
3479
3480         tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3481         if (IS_ERR(tfm)) {
3482                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3483                         alg, name, PTR_ERR(tfm));
3484                 return tfm;
3485         }
3486         return tfm;
3487 }
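/* Typical use (editorial sketch; mirrors receive_SyncParam() further down):
 */
#if 0
        tfm = drbd_crypto_alloc_digest_safe(device, p->verify_alg, "verify-alg");
        if (IS_ERR(tfm))        /* NULL just means no algorithm was requested */
                goto disconnect;
#endif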
3488
3489 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3490 {
3491         void *buffer = connection->data.rbuf;
3492         int size = pi->size;
3493
3494         while (size) {
3495                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3496                 s = drbd_recv(connection, buffer, s);
3497                 if (s <= 0) {
3498                         if (s < 0)
3499                                 return s;
3500                         break;
3501                 }
3502                 size -= s;
3503         }
3504         if (size)
3505                 return -EIO;
3506         return 0;
3507 }
3508
3509 /*
3510  * config_unknown_volume  -  device configuration command for unknown volume
3511  *
3512  * When a device is added to an existing connection, the node on which the
3513  * device is added first will send configuration commands to its peer but the
3514  * peer will not know about the device yet.  It will warn and ignore these
3515  * commands.  Once the device is added on the second node, the second node will
3516  * send the same device configuration commands, but in the other direction.
3517  *
3518  * (We can also end up here if drbd is misconfigured.)
3519  */
3520 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3521 {
3522         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3523                   cmdname(pi->cmd), pi->vnr);
3524         return ignore_remaining_packet(connection, pi);
3525 }
3526
3527 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3528 {
3529         struct drbd_peer_device *peer_device;
3530         struct drbd_device *device;
3531         struct p_rs_param_95 *p;
3532         unsigned int header_size, data_size, exp_max_sz;
3533         struct crypto_ahash *verify_tfm = NULL;
3534         struct crypto_ahash *csums_tfm = NULL;
3535         struct net_conf *old_net_conf, *new_net_conf = NULL;
3536         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3537         const int apv = connection->agreed_pro_version;
3538         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3539         int fifo_size = 0;
3540         int err;
3541
3542         peer_device = conn_peer_device(connection, pi->vnr);
3543         if (!peer_device)
3544                 return config_unknown_volume(connection, pi);
3545         device = peer_device->device;
3546
3547         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3548                     : apv == 88 ? sizeof(struct p_rs_param)
3549                                         + SHARED_SECRET_MAX
3550                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3551                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3552
3553         if (pi->size > exp_max_sz) {
3554                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3555                     pi->size, exp_max_sz);
3556                 return -EIO;
3557         }
3558
3559         if (apv <= 88) {
3560                 header_size = sizeof(struct p_rs_param);
3561                 data_size = pi->size - header_size;
3562         } else if (apv <= 94) {
3563                 header_size = sizeof(struct p_rs_param_89);
3564                 data_size = pi->size - header_size;
3565                 D_ASSERT(device, data_size == 0);
3566         } else {
3567                 header_size = sizeof(struct p_rs_param_95);
3568                 data_size = pi->size - header_size;
3569                 D_ASSERT(device, data_size == 0);
3570         }
3571
3572         /* initialize verify_alg and csums_alg */
3573         p = pi->data;
3574         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3575
3576         err = drbd_recv_all(peer_device->connection, p, header_size);
3577         if (err)
3578                 return err;
3579
3580         mutex_lock(&connection->resource->conf_update);
3581         old_net_conf = peer_device->connection->net_conf;
3582         if (get_ldev(device)) {
3583                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3584                 if (!new_disk_conf) {
3585                         put_ldev(device);
3586                         mutex_unlock(&connection->resource->conf_update);
3587                         drbd_err(device, "Allocation of new disk_conf failed\n");
3588                         return -ENOMEM;
3589                 }
3590
3591                 old_disk_conf = device->ldev->disk_conf;
3592                 *new_disk_conf = *old_disk_conf;
3593
3594                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3595         }
3596
3597         if (apv >= 88) {
3598                 if (apv == 88) {
3599                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3600                                 drbd_err(device, "verify-alg of wrong size, "
3601                                         "peer wants %u, accepting only up to %u bytes\n",
3602                                         data_size, SHARED_SECRET_MAX);
3603                                 err = -EIO;
3604                                 goto reconnect;
3605                         }
3606
3607                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3608                         if (err)
3609                                 goto reconnect;
3610                         /* we expect NUL terminated string */
3611                         /* but just in case someone tries to be evil */
3612                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3613                         p->verify_alg[data_size-1] = 0;
3614
3615                 } else /* apv >= 89 */ {
3616                         /* we still expect NUL terminated strings */
3617                         /* but just in case someone tries to be evil */
3618                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3619                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3620                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3621                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3622                 }
3623
3624                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3625                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3626                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3627                                     old_net_conf->verify_alg, p->verify_alg);
3628                                 goto disconnect;
3629                         }
3630                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3631                                         p->verify_alg, "verify-alg");
3632                         if (IS_ERR(verify_tfm)) {
3633                                 verify_tfm = NULL;
3634                                 goto disconnect;
3635                         }
3636                 }
3637
3638                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3639                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3640                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3641                                     old_net_conf->csums_alg, p->csums_alg);
3642                                 goto disconnect;
3643                         }
3644                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3645                                         p->csums_alg, "csums-alg");
3646                         if (IS_ERR(csums_tfm)) {
3647                                 csums_tfm = NULL;
3648                                 goto disconnect;
3649                         }
3650                 }
3651
3652                 if (apv > 94 && new_disk_conf) {
3653                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3654                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3655                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3656                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3657
3658                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3659                         if (fifo_size != device->rs_plan_s->size) {
3660                                 new_plan = fifo_alloc(fifo_size);
3661                                 if (!new_plan) {
3662                                         drbd_err(device, "kmalloc of fifo_buffer failed\n");
3663                                         put_ldev(device);
3664                                         goto disconnect;
3665                                 }
3666                         }
3667                 }
3668
3669                 if (verify_tfm || csums_tfm) {
3670                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3671                         if (!new_net_conf) {
3672                                 drbd_err(device, "Allocation of new net_conf failed\n");
3673                                 goto disconnect;
3674                         }
3675
3676                         *new_net_conf = *old_net_conf;
3677
3678                         if (verify_tfm) {
3679                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3680                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3681                                 crypto_free_ahash(peer_device->connection->verify_tfm);
3682                                 peer_device->connection->verify_tfm = verify_tfm;
3683                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3684                         }
3685                         if (csums_tfm) {
3686                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3687                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3688                                 crypto_free_ahash(peer_device->connection->csums_tfm);
3689                                 peer_device->connection->csums_tfm = csums_tfm;
3690                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3691                         }
3692                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3693                 }
3694         }
3695
3696         if (new_disk_conf) {
3697                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3698                 put_ldev(device);
3699         }
3700
3701         if (new_plan) {
3702                 old_plan = device->rs_plan_s;
3703                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3704         }
3705
3706         mutex_unlock(&connection->resource->conf_update);
3707         synchronize_rcu();
3708         if (new_net_conf)
3709                 kfree(old_net_conf);
3710         kfree(old_disk_conf);
3711         kfree(old_plan);
3712
3713         return 0;
3714
3715 reconnect:
3716         if (new_disk_conf) {
3717                 put_ldev(device);
3718                 kfree(new_disk_conf);
3719         }
3720         mutex_unlock(&connection->resource->conf_update);
3721         return -EIO;
3722
3723 disconnect:
3724         kfree(new_plan);
3725         if (new_disk_conf) {
3726                 put_ldev(device);
3727                 kfree(new_disk_conf);
3728         }
3729         mutex_unlock(&connection->resource->conf_update);
3730         /* free tfms that were allocated above but not yet installed into
3731          * the connection; this path can be reached with either of them
3732          * still pending, and crypto_free_ahash(NULL) is a no-op. */
3733         crypto_free_ahash(csums_tfm);
3734         crypto_free_ahash(verify_tfm);
3735         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3736         return -EIO;
3737 }
3738
3739 /* warn if the arguments differ by more than 12.5% */
3740 static void warn_if_differ_considerably(struct drbd_device *device,
3741         const char *s, sector_t a, sector_t b)
3742 {
3743         sector_t d;
3744         if (a == 0 || b == 0)
3745                 return;
3746         d = (a > b) ? (a - b) : (b - a);
3747         if (d > (a>>3) || d > (b>>3))
3748                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3749                      (unsigned long long)a, (unsigned long long)b);
3750 }
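/* Worked example (editorial): with a == 1000 and b == 850 sectors, the
 * difference d == 150 exceeds a>>3 == 125 (one eighth, i.e. 12.5%), so the
 * warning fires; with b == 900, d == 100 stays below both thresholds. */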
3751
3752 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3753 {
3754         struct drbd_peer_device *peer_device;
3755         struct drbd_device *device;
3756         struct p_sizes *p = pi->data;
3757         enum determine_dev_size dd = DS_UNCHANGED;
3758         sector_t p_size, p_usize, p_csize, my_usize;
3759         int ldsc = 0; /* local disk size changed */
3760         enum dds_flags ddsf;
3761
3762         peer_device = conn_peer_device(connection, pi->vnr);
3763         if (!peer_device)
3764                 return config_unknown_volume(connection, pi);
3765         device = peer_device->device;
3766
3767         p_size = be64_to_cpu(p->d_size);
3768         p_usize = be64_to_cpu(p->u_size);
3769         p_csize = be64_to_cpu(p->c_size);
3770
3771         /* just store the peer's disk size for now.
3772          * we still need to figure out whether we accept that. */
3773         device->p_size = p_size;
3774
3775         if (get_ldev(device)) {
3776                 rcu_read_lock();
3777                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3778                 rcu_read_unlock();
3779
3780                 warn_if_differ_considerably(device, "lower level device sizes",
3781                            p_size, drbd_get_max_capacity(device->ldev));
3782                 warn_if_differ_considerably(device, "user requested size",
3783                                             p_usize, my_usize);
3784
3785                 /* if this is the first connect, or an otherwise expected
3786                  * param exchange, choose the minimum */
3787                 if (device->state.conn == C_WF_REPORT_PARAMS)
3788                         p_usize = min_not_zero(my_usize, p_usize);
3789
3790                 /* Never shrink a device with usable data during connect.
3791                    But allow online shrinking if we are connected. */
3792                 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3793                     drbd_get_capacity(device->this_bdev) &&
3794                     device->state.disk >= D_OUTDATED &&
3795                     device->state.conn < C_CONNECTED) {
3796                         drbd_err(device, "The peer's disk size is too small!\n");
3797                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3798                         put_ldev(device);
3799                         return -EIO;
3800                 }
3801
3802                 if (my_usize != p_usize) {
3803                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3804
3805                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3806                         if (!new_disk_conf) {
3807                                 drbd_err(device, "Allocation of new disk_conf failed\n");
3808                                 put_ldev(device);
3809                                 return -ENOMEM;
3810                         }
3811
3812                         mutex_lock(&connection->resource->conf_update);
3813                         old_disk_conf = device->ldev->disk_conf;
3814                         *new_disk_conf = *old_disk_conf;
3815                         new_disk_conf->disk_size = p_usize;
3816
3817                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3818                         mutex_unlock(&connection->resource->conf_update);
3819                         synchronize_rcu();
3820                         kfree(old_disk_conf);
3821
3822                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
3823                                  (unsigned long)p_usize);
3824                 }
3825
3826                 put_ldev(device);
3827         }
3828
3829         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3830         /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3831            In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3832            drbd_reconsider_max_bio_size(), we can be sure that after
3833            drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3834
3835         ddsf = be16_to_cpu(p->dds_flags);
3836         if (get_ldev(device)) {
3837                 drbd_reconsider_max_bio_size(device, device->ldev);
3838                 dd = drbd_determine_dev_size(device, ddsf, NULL);
3839                 put_ldev(device);
3840                 if (dd == DS_ERROR)
3841                         return -EIO;
3842                 drbd_md_sync(device);
3843         } else {
3844                 /*
3845                  * I am diskless, need to accept the peer's *current* size.
3846                  * I must NOT accept the peer's backing disk size,
3847                  * it may have been larger than mine all along...
3848                  *
3849                  * At this point, the peer knows more about my disk, or at
3850                  * least about what we last agreed upon, than myself.
3851                  * So if his c_size is less than his d_size, the most likely
3852                  * reason is that *my* d_size was smaller last time we checked.
3853                  *
3854                  * However, if he sends a zero current size,
3855                  * take his (user-capped or) backing disk size anyways.
3856                  */
3857                 drbd_reconsider_max_bio_size(device, NULL);
3858                 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
3859         }
3860
3861         if (get_ldev(device)) {
3862                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3863                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3864                         ldsc = 1;
3865                 }
3866
3867                 put_ldev(device);
3868         }
3869
3870         if (device->state.conn > C_WF_REPORT_PARAMS) {
3871                 if (be64_to_cpu(p->c_size) !=
3872                     drbd_get_capacity(device->this_bdev) || ldsc) {
3873                         /* we have different sizes, probably peer
3874                          * needs to know my new size... */
3875                         drbd_send_sizes(peer_device, 0, ddsf);
3876                 }
3877                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3878                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3879                         if (device->state.pdsk >= D_INCONSISTENT &&
3880                             device->state.disk >= D_INCONSISTENT) {
3881                                 if (ddsf & DDSF_NO_RESYNC)
3882                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3883                                 else
3884                                         resync_after_online_grow(device);
3885                         } else
3886                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
3887                 }
3888         }
3889
3890         return 0;
3891 }
3892
3893 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3894 {
3895         struct drbd_peer_device *peer_device;
3896         struct drbd_device *device;
3897         struct p_uuids *p = pi->data;
3898         u64 *p_uuid;
3899         int i, updated_uuids = 0;
3900
3901         peer_device = conn_peer_device(connection, pi->vnr);
3902         if (!peer_device)
3903                 return config_unknown_volume(connection, pi);
3904         device = peer_device->device;
3905
3906         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3907         if (!p_uuid) {
3908                 drbd_err(device, "kmalloc of p_uuid failed\n");
3909                 return -ENOMEM;
3910         }
3911
3912         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3913                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3914
3915         kfree(device->p_uuid);
3916         device->p_uuid = p_uuid;
3917
3918         if (device->state.conn < C_CONNECTED &&
3919             device->state.disk < D_INCONSISTENT &&
3920             device->state.role == R_PRIMARY &&
3921             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3922                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3923                     (unsigned long long)device->ed_uuid);
3924                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3925                 return -EIO;
3926         }
3927
3928         if (get_ldev(device)) {
3929                 int skip_initial_sync =
3930                         device->state.conn == C_CONNECTED &&
3931                         peer_device->connection->agreed_pro_version >= 90 &&
3932                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3933                         (p_uuid[UI_FLAGS] & 8);
3934                 if (skip_initial_sync) {
3935                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3936                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3937                                         "clear_n_write from receive_uuids",
3938                                         BM_LOCKED_TEST_ALLOWED);
3939                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3940                         _drbd_uuid_set(device, UI_BITMAP, 0);
3941                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3942                                         CS_VERBOSE, NULL);
3943                         drbd_md_sync(device);
3944                         updated_uuids = 1;
3945                 }
3946                 put_ldev(device);
3947         } else if (device->state.disk < D_INCONSISTENT &&
3948                    device->state.role == R_PRIMARY) {
3949                 /* I am a diskless primary, the peer just created a new current UUID
3950                    for me. */
3951                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3952         }
3953
3954         /* Before we test for the disk state, we should wait until a possibly
3955            ongoing cluster wide state change has finished. That is important if
3956            we are primary and are detaching from our disk. We need to see the
3957            new disk state... */
3958         mutex_lock(device->state_mutex);
3959         mutex_unlock(device->state_mutex);
3960         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3961                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3962
3963         if (updated_uuids)
3964                 drbd_print_uuids(device, "receiver updated UUIDs to");
3965
3966         return 0;
3967 }
3968
3969 /**
3970  * convert_state() - Converts the peer's view of the cluster state to our point of view
3971  * @ps:         The state as seen by the peer.
3972  */
3973 static union drbd_state convert_state(union drbd_state ps)
3974 {
3975         union drbd_state ms;
3976
3977         static enum drbd_conns c_tab[] = {
3978                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3979                 [C_CONNECTED] = C_CONNECTED,
3980
3981                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3982                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3983                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3984                 [C_VERIFY_S]       = C_VERIFY_T,
3985                 [C_MASK]   = C_MASK,
3986         };
3987
3988         ms.i = ps.i;
3989
3990         ms.conn = c_tab[ps.conn];
3991         ms.peer = ps.role;
3992         ms.role = ps.peer;
3993         ms.pdsk = ps.disk;
3994         ms.disk = ps.pdsk;
3995         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3996
3997         return ms;
3998 }
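/* For illustration, a worked example of the view swap above, assuming a
 * simple two-node state: if the peer reports
 *
 *     role = R_PRIMARY, peer = R_SECONDARY,
 *     disk = D_UP_TO_DATE, pdsk = D_INCONSISTENT,
 *     conn = C_STARTING_SYNC_S,
 *
 * then convert_state() yields, from our point of view,
 *
 *     role = R_SECONDARY, peer = R_PRIMARY,
 *     disk = D_INCONSISTENT, pdsk = D_UP_TO_DATE,
 *     conn = C_STARTING_SYNC_T;
 *
 * role/peer and disk/pdsk are mirrored, and directed connection states
 * are flipped via c_tab.
 */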
3999
4000 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4001 {
4002         struct drbd_peer_device *peer_device;
4003         struct drbd_device *device;
4004         struct p_req_state *p = pi->data;
4005         union drbd_state mask, val;
4006         enum drbd_state_rv rv;
4007
4008         peer_device = conn_peer_device(connection, pi->vnr);
4009         if (!peer_device)
4010                 return -EIO;
4011         device = peer_device->device;
4012
4013         mask.i = be32_to_cpu(p->mask);
4014         val.i = be32_to_cpu(p->val);
4015
4016         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4017             mutex_is_locked(device->state_mutex)) {
4018                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4019                 return 0;
4020         }
4021
4022         mask = convert_state(mask);
4023         val = convert_state(val);
4024
4025         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4026         drbd_send_sr_reply(peer_device, rv);
4027
4028         drbd_md_sync(device);
4029
4030         return 0;
4031 }
4032
4033 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4034 {
4035         struct p_req_state *p = pi->data;
4036         union drbd_state mask, val;
4037         enum drbd_state_rv rv;
4038
4039         mask.i = be32_to_cpu(p->mask);
4040         val.i = be32_to_cpu(p->val);
4041
4042         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4043             mutex_is_locked(&connection->cstate_mutex)) {
4044                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4045                 return 0;
4046         }
4047
4048         mask = convert_state(mask);
4049         val = convert_state(val);
4050
4051         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4052         conn_send_sr_reply(connection, rv);
4053
4054         return 0;
4055 }
4056
4057 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4058 {
4059         struct drbd_peer_device *peer_device;
4060         struct drbd_device *device;
4061         struct p_state *p = pi->data;
4062         union drbd_state os, ns, peer_state;
4063         enum drbd_disk_state real_peer_disk;
4064         enum chg_state_flags cs_flags;
4065         int rv;
4066
4067         peer_device = conn_peer_device(connection, pi->vnr);
4068         if (!peer_device)
4069                 return config_unknown_volume(connection, pi);
4070         device = peer_device->device;
4071
4072         peer_state.i = be32_to_cpu(p->state);
4073
4074         real_peer_disk = peer_state.disk;
4075         if (peer_state.disk == D_NEGOTIATING) {
4076                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4077                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4078         }
4079
4080         spin_lock_irq(&device->resource->req_lock);
4081  retry:
4082         os = ns = drbd_read_state(device);
4083         spin_unlock_irq(&device->resource->req_lock);
4084
4085         /* If some other part of the code (ack_receiver thread, timeout)
4086          * already decided to close the connection again,
4087          * we must not "re-establish" it here. */
4088         if (os.conn <= C_TEAR_DOWN)
4089                 return -ECONNRESET;
4090
4091         /* If this is the "end of sync" confirmation, usually the peer disk
4092          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For an empty (0 bits
4093          * set) resync that started in PausedSyncT, or if the timing of pause-/
4094          * unpause-sync events has been "just right", the peer disk may
4095          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4096          */
4097         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4098             real_peer_disk == D_UP_TO_DATE &&
4099             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4100                 /* If we are (becoming) SyncSource, but peer is still in sync
4101                  * preparation, ignore its uptodate-ness to avoid flapping; it
4102                  * will change to inconsistent once the peer reaches active
4103                  * syncing states.
4104                  * It may have changed syncer-paused flags, however, so we
4105                  * cannot ignore this completely. */
4106                 if (peer_state.conn > C_CONNECTED &&
4107                     peer_state.conn < C_SYNC_SOURCE)
4108                         real_peer_disk = D_INCONSISTENT;
4109
4110                 /* if peer_state changes to connected at the same time,
4111                  * it explicitly notifies us that it finished resync.
4112                  * Maybe we should finish it up, too? */
4113                 else if (os.conn >= C_SYNC_SOURCE &&
4114                          peer_state.conn == C_CONNECTED) {
4115                         if (drbd_bm_total_weight(device) <= device->rs_failed)
4116                                 drbd_resync_finished(device);
4117                         return 0;
4118                 }
4119         }
4120
4121         /* explicit verify finished notification, stop sector reached. */
4122         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4123             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4124                 ov_out_of_sync_print(device);
4125                 drbd_resync_finished(device);
4126                 return 0;
4127         }
4128
4129         /* peer says his disk is inconsistent, while we think it is uptodate,
4130          * and this happens while the peer still thinks we have a sync going on,
4131          * but we think we are already done with the sync.
4132          * We ignore this to avoid flapping pdsk.
4133          * This should not happen if the peer is a recent version of drbd. */
4134         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4135             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4136                 real_peer_disk = D_UP_TO_DATE;
4137
4138         if (ns.conn == C_WF_REPORT_PARAMS)
4139                 ns.conn = C_CONNECTED;
4140
4141         if (peer_state.conn == C_AHEAD)
4142                 ns.conn = C_BEHIND;
4143
4144         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4145             get_ldev_if_state(device, D_NEGOTIATING)) {
4146                 int cr; /* consider resync */
4147
4148                 /* if we established a new connection */
4149                 cr  = (os.conn < C_CONNECTED);
4150                 /* if we had an established connection
4151                  * and one of the nodes newly attaches a disk */
4152                 cr |= (os.conn == C_CONNECTED &&
4153                        (peer_state.disk == D_NEGOTIATING ||
4154                         os.disk == D_NEGOTIATING));
4155                 /* if we have both been inconsistent, and the peer has been
4156                  * forced to be UpToDate with --overwrite-data */
4157                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4158                 /* if we had been plain connected, and the admin requested to
4159                  * start a sync by "invalidate" or "invalidate-remote" */
4160                 cr |= (os.conn == C_CONNECTED &&
4161                                 (peer_state.conn >= C_STARTING_SYNC_S &&
4162                                  peer_state.conn <= C_WF_BITMAP_T));
4163
4164                 if (cr)
4165                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4166
4167                 put_ldev(device);
4168                 if (ns.conn == C_MASK) {
4169                         ns.conn = C_CONNECTED;
4170                         if (device->state.disk == D_NEGOTIATING) {
4171                                 drbd_force_state(device, NS(disk, D_FAILED));
4172                         } else if (peer_state.disk == D_NEGOTIATING) {
4173                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4174                                 peer_state.disk = D_DISKLESS;
4175                                 real_peer_disk = D_DISKLESS;
4176                         } else {
4177                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4178                                         return -EIO;
4179                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4180                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4181                                 return -EIO;
4182                         }
4183                 }
4184         }
4185
4186         spin_lock_irq(&device->resource->req_lock);
4187         if (os.i != drbd_read_state(device).i)
4188                 goto retry;
4189         clear_bit(CONSIDER_RESYNC, &device->flags);
4190         ns.peer = peer_state.role;
4191         ns.pdsk = real_peer_disk;
4192         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4193         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4194                 ns.disk = device->new_state_tmp.disk;
4195         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4196         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4197             test_bit(NEW_CUR_UUID, &device->flags)) {
4198                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4199                    for temporary network outages! */
4200                 spin_unlock_irq(&device->resource->req_lock);
4201                 drbd_err(device, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
4202                 tl_clear(peer_device->connection);
4203                 drbd_uuid_new_current(device);
4204                 clear_bit(NEW_CUR_UUID, &device->flags);
4205                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4206                 return -EIO;
4207         }
4208         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4209         ns = drbd_read_state(device);
4210         spin_unlock_irq(&device->resource->req_lock);
4211
4212         if (rv < SS_SUCCESS) {
4213                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4214                 return -EIO;
4215         }
4216
4217         if (os.conn > C_WF_REPORT_PARAMS) {
4218                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4219                     peer_state.disk != D_NEGOTIATING) {
4220                         /* we want resync, peer has not yet decided to sync... */
4221                         /* Nowadays only used when forcing a node into primary role and
4222                            setting its disk to UpToDate with that */
4223                         drbd_send_uuids(peer_device);
4224                         drbd_send_current_state(peer_device);
4225                 }
4226         }
4227
4228         clear_bit(DISCARD_MY_DATA, &device->flags);
4229
4230         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4231
4232         return 0;
4233 }
4234
4235 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4236 {
4237         struct drbd_peer_device *peer_device;
4238         struct drbd_device *device;
4239         struct p_rs_uuid *p = pi->data;
4240
4241         peer_device = conn_peer_device(connection, pi->vnr);
4242         if (!peer_device)
4243                 return -EIO;
4244         device = peer_device->device;
4245
4246         wait_event(device->misc_wait,
4247                    device->state.conn == C_WF_SYNC_UUID ||
4248                    device->state.conn == C_BEHIND ||
4249                    device->state.conn < C_CONNECTED ||
4250                    device->state.disk < D_NEGOTIATING);
4251
4252         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4253
4254         /* Here the _drbd_uuid_ functions are right, current should
4255            _not_ be rotated into the history */
4256         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4257                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4258                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4259
4260                 drbd_print_uuids(device, "updated sync uuid");
4261                 drbd_start_resync(device, C_SYNC_TARGET);
4262
4263                 put_ldev(device);
4264         } else
4265                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4266
4267         return 0;
4268 }
4269
4270 /**
4271  * receive_bitmap_plain() - Receive an uncompressed chunk of the bitmap
4272  *
4273  * Return 0 when done, 1 when another iteration is needed, and a negative error
4274  * code upon failure.
4275  */
4276 static int
4277 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4278                      unsigned long *p, struct bm_xfer_ctx *c)
4279 {
4280         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4281                                  drbd_header_size(peer_device->connection);
4282         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4283                                        c->bm_words - c->word_offset);
4284         unsigned int want = num_words * sizeof(*p);
4285         int err;
4286
4287         if (want != size) {
4288                 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4289                 return -EIO;
4290         }
4291         if (want == 0)
4292                 return 0;
4293         err = drbd_recv_all(peer_device->connection, p, want);
4294         if (err)
4295                 return err;
4296
4297         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4298
4299         c->word_offset += num_words;
4300         c->bit_offset = c->word_offset * BITS_PER_LONG;
4301         if (c->bit_offset > c->bm_bits)
4302                 c->bit_offset = c->bm_bits;
4303
4304         return 1;
4305 }
4306
4307 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4308 {
4309         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4310 }
4311
4312 static int dcbp_get_start(struct p_compressed_bm *p)
4313 {
4314         return (p->encoding & 0x80) != 0;
4315 }
4316
4317 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4318 {
4319         return (p->encoding >> 4) & 0x7;
4320 }
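/* Layout of the p_compressed_bm encoding byte, as decoded by the three
 * helpers above:
 *
 *     bit  7     start toggle: the first run describes set bits if 1
 *     bits 6..4  number of padding bits at the end of the bit stream
 *     bits 3..0  enum drbd_bitmap_code (currently only RLE_VLI_Bits)
 */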
4321
4322 /**
4323  * recv_bm_rle_bits() - Decode one RLE/VLI compressed chunk of the bitmap
4324  *
4325  * Return 0 when done, 1 when another iteration is needed, and a negative error
4326  * code upon failure.
4327  */
4328 static int
4329 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4330                 struct p_compressed_bm *p,
4331                  struct bm_xfer_ctx *c,
4332                  unsigned int len)
4333 {
4334         struct bitstream bs;
4335         u64 look_ahead;
4336         u64 rl;
4337         u64 tmp;
4338         unsigned long s = c->bit_offset;
4339         unsigned long e;
4340         int toggle = dcbp_get_start(p);
4341         int have;
4342         int bits;
4343
4344         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4345
4346         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4347         if (bits < 0)
4348                 return -EIO;
4349
4350         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4351                 bits = vli_decode_bits(&rl, look_ahead);
4352                 if (bits <= 0)
4353                         return -EIO;
4354
4355                 if (toggle) {
4356                         e = s + rl - 1;
4357                         if (e >= c->bm_bits) {
4358                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4359                                 return -EIO;
4360                         }
4361                         _drbd_bm_set_bits(peer_device->device, s, e);
4362                 }
4363
4364                 if (have < bits) {
4365                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4366                                 have, bits, look_ahead,
4367                                 (unsigned int)(bs.cur.b - p->code),
4368                                 (unsigned int)bs.buf_len);
4369                         return -EIO;
4370                 }
4371                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4372                 if (likely(bits < 64))
4373                         look_ahead >>= bits;
4374                 else
4375                         look_ahead = 0;
4376                 have -= bits;
4377
4378                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4379                 if (bits < 0)
4380                         return -EIO;
4381                 look_ahead |= tmp << have;
4382                 have += bits;
4383         }
4384
4385         c->bit_offset = s;
4386         bm_xfer_ctx_bit_to_word_offset(c);
4387
4388         return (s != c->bm_bits);
4389 }
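/* For illustration, a sketch of one decode pass, assuming dcbp_get_start()
 * returned 0 and the VLI stream decodes to the run lengths 5, 3, 10:
 * the first 5 bits starting at c->bit_offset stay clear, the next 3 are
 * set via _drbd_bm_set_bits(), the next 10 stay clear again, and
 * c->bit_offset advances by 18 in total.  The 64 bit look_ahead window
 * is refilled from the bit stream after every decoded run.
 */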
4390
4391 /**
4392  * decode_bitmap_c
4393  * decode_bitmap_c() - Dispatch on the compressed bitmap encoding
4394  * Return 0 when done, 1 when another iteration is needed, and a negative error
4395  * code upon failure.
4396  */
4397 static int
4398 decode_bitmap_c(struct drbd_peer_device *peer_device,
4399                 struct p_compressed_bm *p,
4400                 struct bm_xfer_ctx *c,
4401                 unsigned int len)
4402 {
4403         if (dcbp_get_code(p) == RLE_VLI_Bits)
4404                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4405
4406         /* other variants had been implemented for evaluation,
4407          * but have been dropped as this one turned out to be "best"
4408          * during all our tests. */
4409
4410         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4411         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4412         return -EIO;
4413 }
4414
4415 void INFO_bm_xfer_stats(struct drbd_device *device,
4416                 const char *direction, struct bm_xfer_ctx *c)
4417 {
4418         /* what would it take to transfer it "plaintext" */
4419         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4420         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4421         unsigned int plain =
4422                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4423                 c->bm_words * sizeof(unsigned long);
4424         unsigned int total = c->bytes[0] + c->bytes[1];
4425         unsigned int r;
4426
4427         /* total cannot be zero, but just in case: */
4428         if (total == 0)
4429                 return;
4430
4431         /* don't report if not compressed */
4432         if (total >= plain)
4433                 return;
4434
4435         /* total < plain. still, check for overflow: */
4436         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4437                                     : (1000 * total / plain);
4438
4439         if (r > 1000)
4440                 r = 1000;
4441
4442         r = 1000 - r;
4443         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4444              "total %u; compression: %u.%u%%\n",
4445                         direction,
4446                         c->bytes[1], c->packets[1],
4447                         c->bytes[0], c->packets[0],
4448                         total, r/10, r % 10);
4449 }
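/* For illustration, assuming a plain transfer of this bitmap would take
 * 100000 bytes while 1500 bytes actually went over the wire:
 * r = 1000 * 1500 / 100000 = 15, so after r = 1000 - r the function
 * prints "compression: 98.5%" (permille, split into r/10 and r%10).
 */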
4450
4451 /* Since we are processing the bitfield from lower addresses to higher,
4452    it does not matter whether we process it in 32 bit chunks or 64 bit
4453    chunks as long as it is little endian. (Understand it as a byte stream,
4454    beginning with the lowest byte...) If we used big endian
4455    we would need to process it from the highest address to the lowest,
4456    in order to be agnostic to the 32 vs 64 bits issue.
4457
4458    Returns 0 on success, and a negative error code otherwise. */
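/* For illustration: a 128 bit field stored as bytes b0..b15 is read as
   the little endian words (b3..b0), (b7..b4), ... in 32 bit mode, or
   (b7..b0), (b15..b8) in 64 bit mode.  Either way, bit 0 of the field
   is bit 0 of the word starting at b0, so merging word by word from
   word_offset upward gives identical results for both word sizes. */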
4459 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4460 {
4461         struct drbd_peer_device *peer_device;
4462         struct drbd_device *device;
4463         struct bm_xfer_ctx c;
4464         int err;
4465
4466         peer_device = conn_peer_device(connection, pi->vnr);
4467         if (!peer_device)
4468                 return -EIO;
4469         device = peer_device->device;
4470
4471         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4472         /* you are supposed to send additional out-of-sync information
4473          * if you actually set bits during this phase */
4474
4475         c = (struct bm_xfer_ctx) {
4476                 .bm_bits = drbd_bm_bits(device),
4477                 .bm_words = drbd_bm_words(device),
4478         };
4479
4480         for (;;) {
4481                 if (pi->cmd == P_BITMAP)
4482                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4483                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4484                         /* MAYBE: sanity check that we speak proto >= 90,
4485                          * and the feature is enabled! */
4486                         struct p_compressed_bm *p = pi->data;
4487
4488                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4489                                 drbd_err(device, "ReportCBitmap packet too large\n");
4490                                 err = -EIO;
4491                                 goto out;
4492                         }
4493                         if (pi->size <= sizeof(*p)) {
4494                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4495                                 err = -EIO;
4496                                 goto out;
4497                         }
4498                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4499                         if (err)
4500                                 goto out;
4501                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4502                 } else {
4503                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4504                         err = -EIO;
4505                         goto out;
4506                 }
4507
4508                 c.packets[pi->cmd == P_BITMAP]++;
4509                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4510
4511                 if (err <= 0) {
4512                         if (err < 0)
4513                                 goto out;
4514                         break;
4515                 }
4516                 err = drbd_recv_header(peer_device->connection, pi);
4517                 if (err)
4518                         goto out;
4519         }
4520
4521         INFO_bm_xfer_stats(device, "receive", &c);
4522
4523         if (device->state.conn == C_WF_BITMAP_T) {
4524                 enum drbd_state_rv rv;
4525
4526                 err = drbd_send_bitmap(device);
4527                 if (err)
4528                         goto out;
4529                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4530                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4531                 D_ASSERT(device, rv == SS_SUCCESS);
4532         } else if (device->state.conn != C_WF_BITMAP_S) {
4533                 /* admin may have requested C_DISCONNECTING,
4534                  * other threads may have noticed network errors */
4535                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4536                     drbd_conn_str(device->state.conn));
4537         }
4538         err = 0;
4539
4540  out:
4541         drbd_bm_unlock(device);
4542         if (!err && device->state.conn == C_WF_BITMAP_S)
4543                 drbd_start_resync(device, C_SYNC_SOURCE);
4544         return err;
4545 }
4546
4547 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4548 {
4549         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4550                  pi->cmd, pi->size);
4551
4552         return ignore_remaining_packet(connection, pi);
4553 }
4554
4555 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4556 {
4557         /* Make sure we've acked all the TCP data associated
4558          * with the data requests being unplugged */
4559         drbd_tcp_quickack(connection->data.socket);
4560
4561         return 0;
4562 }
4563
4564 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4565 {
4566         struct drbd_peer_device *peer_device;
4567         struct drbd_device *device;
4568         struct p_block_desc *p = pi->data;
4569
4570         peer_device = conn_peer_device(connection, pi->vnr);
4571         if (!peer_device)
4572                 return -EIO;
4573         device = peer_device->device;
4574
4575         switch (device->state.conn) {
4576         case C_WF_SYNC_UUID:
4577         case C_WF_BITMAP_T:
4578         case C_BEHIND:
4579                 break;
4580         default:
4581                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4582                                 drbd_conn_str(device->state.conn));
4583         }
4584
4585         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4586
4587         return 0;
4588 }
4589
4590 struct data_cmd {
4591         int expect_payload;
4592         size_t pkt_size;
4593         int (*fn)(struct drbd_connection *, struct packet_info *);
4594 };
4595
4596 static struct data_cmd drbd_cmd_handler[] = {
4597         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4598         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4599         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4600         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4601         [P_BITMAP]          = { 1, 0, receive_bitmap },
4602         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4603         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4604         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4605         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4606         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4607         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4608         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4609         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4610         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4611         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4612         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4613         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4614         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4615         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4616         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4617         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4618         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4619         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4620         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4621         [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4622 };
4623
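/* drbdd() below drives the table above: it reads a packet header, bounds
 * checks pi.cmd against the array, receives the pkt_size bytes of fixed
 * sub-header into the preallocated socket buffer, and only then calls
 * fn().  expect_payload marks commands that may carry additional payload
 * beyond that fixed part; anything else with excess length is rejected.
 */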
4624 static void drbdd(struct drbd_connection *connection)
4625 {
4626         struct packet_info pi;
4627         size_t shs; /* sub header size */
4628         int err;
4629
4630         while (get_t_state(&connection->receiver) == RUNNING) {
4631                 struct data_cmd *cmd;
4632
4633                 drbd_thread_current_set_cpu(&connection->receiver);
4634                 update_receiver_timing_details(connection, drbd_recv_header);
4635                 if (drbd_recv_header(connection, &pi))
4636                         goto err_out;
4637
4638                 cmd = &drbd_cmd_handler[pi.cmd];
4639                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4640                         drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
4641                                  cmdname(pi.cmd), pi.cmd);
4642                         goto err_out;
4643                 }
4644
4645                 shs = cmd->pkt_size;
4646                 if (pi.size > shs && !cmd->expect_payload) {
4647                         drbd_err(connection, "No payload expected %s l:%d\n",
4648                                  cmdname(pi.cmd), pi.size);
4649                         goto err_out;
4650                 }
4651
4652                 if (shs) {
4653                         update_receiver_timing_details(connection, drbd_recv_all_warn);
4654                         err = drbd_recv_all_warn(connection, pi.data, shs);
4655                         if (err)
4656                                 goto err_out;
4657                         pi.size -= shs;
4658                 }
4659
4660                 update_receiver_timing_details(connection, cmd->fn);
4661                 err = cmd->fn(connection, &pi);
4662                 if (err) {
4663                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4664                                  cmdname(pi.cmd), err, pi.size);
4665                         goto err_out;
4666                 }
4667         }
4668         return;
4669
4670     err_out:
4671         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4672 }
4673
4674 static void conn_disconnect(struct drbd_connection *connection)
4675 {
4676         struct drbd_peer_device *peer_device;
4677         enum drbd_conns oc;
4678         int vnr;
4679
4680         if (connection->cstate == C_STANDALONE)
4681                 return;
4682
4683         /* We are about to start the cleanup after connection loss.
4684          * Make sure drbd_make_request knows about that.
4685          * Usually we should be in some network failure state already,
4686          * but just in case we are not, we fix it up here.
4687          */
4688         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4689
4690         /* ack_receiver does not clean up anything. it must not interfere, either */
4691         drbd_thread_stop(&connection->ack_receiver);
4692         if (connection->ack_sender) {
4693                 destroy_workqueue(connection->ack_sender);
4694                 connection->ack_sender = NULL;
4695         }
4696         drbd_free_sock(connection);
4697
4698         rcu_read_lock();
4699         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4700                 struct drbd_device *device = peer_device->device;
4701                 kref_get(&device->kref);
4702                 rcu_read_unlock();
4703                 drbd_disconnected(peer_device);
4704                 kref_put(&device->kref, drbd_destroy_device);
4705                 rcu_read_lock();
4706         }
4707         rcu_read_unlock();
4708
4709         if (!list_empty(&connection->current_epoch->list))
4710                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4711         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4712         atomic_set(&connection->current_epoch->epoch_size, 0);
4713         connection->send.seen_any_write_yet = false;
4714
4715         drbd_info(connection, "Connection closed\n");
4716
4717         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4718                 conn_try_outdate_peer_async(connection);
4719
4720         spin_lock_irq(&connection->resource->req_lock);
4721         oc = connection->cstate;
4722         if (oc >= C_UNCONNECTED)
4723                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4724
4725         spin_unlock_irq(&connection->resource->req_lock);
4726
4727         if (oc == C_DISCONNECTING)
4728                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4729 }
4730
4731 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4732 {
4733         struct drbd_device *device = peer_device->device;
4734         unsigned int i;
4735
4736         /* wait for current activity to cease. */
4737         spin_lock_irq(&device->resource->req_lock);
4738         _drbd_wait_ee_list_empty(device, &device->active_ee);
4739         _drbd_wait_ee_list_empty(device, &device->sync_ee);
4740         _drbd_wait_ee_list_empty(device, &device->read_ee);
4741         spin_unlock_irq(&device->resource->req_lock);
4742
4743         /* We do not have data structures that would allow us to
4744          * get the rs_pending_cnt down to 0 again.
4745          *  * On C_SYNC_TARGET we do not have any data structures describing
4746          *    the pending RSDataRequest's we have sent.
4747          *  * On C_SYNC_SOURCE there is no data structure that tracks
4748          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4749          *  And no, it is not the sum of the reference counts in the
4750          *  resync_LRU. The resync_LRU tracks the whole operation including
4751          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4752          *  on the fly. */
4753         drbd_rs_cancel_all(device);
4754         device->rs_total = 0;
4755         device->rs_failed = 0;
4756         atomic_set(&device->rs_pending_cnt, 0);
4757         wake_up(&device->misc_wait);
4758
4759         del_timer_sync(&device->resync_timer);
4760         resync_timer_fn((unsigned long)device);
4761
4762         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4763          * w_make_resync_request etc. which may still be on the worker queue
4764          * to be "canceled" */
4765         drbd_flush_workqueue(&peer_device->connection->sender_work);
4766
4767         drbd_finish_peer_reqs(device);
4768
4769         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4770            might have queued work again. The one before drbd_finish_peer_reqs() is
4771            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4772         drbd_flush_workqueue(&peer_device->connection->sender_work);
4773
4774         /* need to do it again, drbd_finish_peer_reqs() may have populated it
4775          * again via drbd_try_clear_on_disk_bm(). */
4776         drbd_rs_cancel_all(device);
4777
4778         kfree(device->p_uuid);
4779         device->p_uuid = NULL;
4780
4781         if (!drbd_suspended(device))
4782                 tl_clear(peer_device->connection);
4783
4784         drbd_md_sync(device);
4785
4786         /* serialize with bitmap writeout triggered by the state change,
4787          * if any. */
4788         wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4789
4790         /* tcp_close and release of sendpage pages can be deferred.  I don't
4791          * want to use SO_LINGER, because apparently it can be deferred for
4792          * more than 20 seconds (longest time I checked).
4793          *
4794          * Actually we don't care for exactly when the network stack does its
4795          * put_page(), but release our reference on these pages right here.
4796          */
4797         i = drbd_free_peer_reqs(device, &device->net_ee);
4798         if (i)
4799                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4800         i = atomic_read(&device->pp_in_use_by_net);
4801         if (i)
4802                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4803         i = atomic_read(&device->pp_in_use);
4804         if (i)
4805                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4806
4807         D_ASSERT(device, list_empty(&device->read_ee));
4808         D_ASSERT(device, list_empty(&device->active_ee));
4809         D_ASSERT(device, list_empty(&device->sync_ee));
4810         D_ASSERT(device, list_empty(&device->done_ee));
4811
4812         return 0;
4813 }
4814
4815 /*
4816  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4817  * we can agree on is stored in agreed_pro_version.
4818  *
4819  * feature flags and the reserved array should be enough room for future
4820  * enhancements of the handshake protocol, and possible plugins...
4821  *
4822  * for now, they are expected to be zero, but ignored.
4823  */
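/* For illustration, assuming (for the sake of the example only)
 * PRO_VERSION_MIN = 86 and PRO_VERSION_MAX = 96, and a peer announcing
 * 86..101 with FF_TRIM set: the ranges overlap, agreed_pro_version
 * becomes min_t(int, 96, 101) = 96, and agreed_features becomes
 * PRO_FEATURES & FF_TRIM = FF_TRIM.  Disjoint ranges make
 * drbd_do_features() below fail with -1 ("incompatible DRBD dialects").
 */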
4824 static int drbd_send_features(struct drbd_connection *connection)
4825 {
4826         struct drbd_socket *sock;
4827         struct p_connection_features *p;
4828
4829         sock = &connection->data;
4830         p = conn_prepare_command(connection, sock);
4831         if (!p)
4832                 return -EIO;
4833         memset(p, 0, sizeof(*p));
4834         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4835         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4836         p->feature_flags = cpu_to_be32(PRO_FEATURES);
4837         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4838 }
4839
4840 /*
4841  * return values:
4842  *   1 yes, we have a valid connection
4843  *   0 oops, did not work out, please try again
4844  *  -1 peer talks different language,
4845  *     no point in trying again, please go standalone.
4846  */
4847 static int drbd_do_features(struct drbd_connection *connection)
4848 {
4849         /* ASSERT current == connection->receiver ... */
4850         struct p_connection_features *p;
4851         const int expect = sizeof(struct p_connection_features);
4852         struct packet_info pi;
4853         int err;
4854
4855         err = drbd_send_features(connection);
4856         if (err)
4857                 return 0;
4858
4859         err = drbd_recv_header(connection, &pi);
4860         if (err)
4861                 return 0;
4862
4863         if (pi.cmd != P_CONNECTION_FEATURES) {
4864                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4865                          cmdname(pi.cmd), pi.cmd);
4866                 return -1;
4867         }
4868
4869         if (pi.size != expect) {
4870                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4871                      expect, pi.size);
4872                 return -1;
4873         }
4874
4875         p = pi.data;
4876         err = drbd_recv_all_warn(connection, p, expect);
4877         if (err)
4878                 return 0;
4879
4880         p->protocol_min = be32_to_cpu(p->protocol_min);
4881         p->protocol_max = be32_to_cpu(p->protocol_max);
4882         if (p->protocol_max == 0)
4883                 p->protocol_max = p->protocol_min;
4884
4885         if (PRO_VERSION_MAX < p->protocol_min ||
4886             PRO_VERSION_MIN > p->protocol_max)
4887                 goto incompat;
4888
4889         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4890         connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4891
4892         drbd_info(connection, "Handshake successful: "
4893              "Agreed network protocol version %d\n", connection->agreed_pro_version);
4894
4895         drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4896                   connection->agreed_features & FF_TRIM ? " " : " not ");
4897
4898         return 1;
4899
4900  incompat:
4901         drbd_err(connection, "incompatible DRBD dialects: "
4902             "I support %d-%d, peer supports %d-%d\n",
4903             PRO_VERSION_MIN, PRO_VERSION_MAX,
4904             p->protocol_min, p->protocol_max);
4905         return -1;
4906 }
4907
4908 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4909 static int drbd_do_auth(struct drbd_connection *connection)
4910 {
4911         drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4912         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4913         return -1;
4914 }
4915 #else
4916 #define CHALLENGE_LEN 64
4917
4918 /* Return value:
4919         1 - auth succeeded,
4920         0 - failed, try again (network error),
4921         -1 - auth failed, don't try again.
4922 */
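/* The exchange below is a symmetric challenge-response using the
 * configured cram-hmac-alg and the shared secret, roughly:
 *
 *   1. send my_challenge (CHALLENGE_LEN random bytes) as P_AUTH_CHALLENGE
 *   2. receive the peer's challenge (it must differ from our own)
 *   3. reply with P_AUTH_RESPONSE = HMAC(secret, peers_ch)
 *   4. receive the peer's response and compare it against
 *      HMAC(secret, my_challenge)
 *
 * Both sides run this concurrently, so each proves knowledge of the
 * secret without ever sending it over the wire.
 */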
4923
4924 static int drbd_do_auth(struct drbd_connection *connection)
4925 {
4926         struct drbd_socket *sock;
4927         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4928         char *response = NULL;
4929         char *right_response = NULL;
4930         char *peers_ch = NULL;
4931         unsigned int key_len;
4932         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4933         unsigned int resp_size;
4934         SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
4935         struct packet_info pi;
4936         struct net_conf *nc;
4937         int err, rv;
4938
4939         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4940
4941         rcu_read_lock();
4942         nc = rcu_dereference(connection->net_conf);
4943         key_len = strlen(nc->shared_secret);
4944         memcpy(secret, nc->shared_secret, key_len);
4945         rcu_read_unlock();
4946
4947         desc->tfm = connection->cram_hmac_tfm;
4948         desc->flags = 0;
4949
4950         rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4951         if (rv) {
4952                 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
4953                 rv = -1;
4954                 goto fail;
4955         }
4956
4957         get_random_bytes(my_challenge, CHALLENGE_LEN);
4958
4959         sock = &connection->data;
4960         if (!conn_prepare_command(connection, sock)) {
4961                 rv = 0;
4962                 goto fail;
4963         }
4964         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4965                                 my_challenge, CHALLENGE_LEN);
4966         if (!rv)
4967                 goto fail;
4968
4969         err = drbd_recv_header(connection, &pi);
4970         if (err) {
4971                 rv = 0;
4972                 goto fail;
4973         }
4974
4975         if (pi.cmd != P_AUTH_CHALLENGE) {
4976                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4977                          cmdname(pi.cmd), pi.cmd);
4978                 rv = 0;
4979                 goto fail;
4980         }
4981
4982         if (pi.size > CHALLENGE_LEN * 2) {
4983                 drbd_err(connection, "AuthChallenge payload too big.\n");
4984                 rv = -1;
4985                 goto fail;
4986         }
4987
4988         if (pi.size < CHALLENGE_LEN) {
4989                 drbd_err(connection, "AuthChallenge payload too small.\n");
4990                 rv = -1;
4991                 goto fail;
4992         }
4993
4994         peers_ch = kmalloc(pi.size, GFP_NOIO);
4995         if (peers_ch == NULL) {
4996                 drbd_err(connection, "kmalloc of peers_ch failed\n");
4997                 rv = -1;
4998                 goto fail;
4999         }
5000
5001         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5002         if (err) {
5003                 rv = 0;
5004                 goto fail;
5005         }
5006
5007         if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5008                 drbd_err(connection, "Peer presented the same challenge!\n");
5009                 rv = -1;
5010                 goto fail;
5011         }
5012
5013         resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5014         response = kmalloc(resp_size, GFP_NOIO);
5015         if (response == NULL) {
5016                 drbd_err(connection, "kmalloc of response failed\n");
5017                 rv = -1;
5018                 goto fail;
5019         }
5020
5021         rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5022         if (rv) {
5023                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5024                 rv = -1;
5025                 goto fail;
5026         }
5027
5028         if (!conn_prepare_command(connection, sock)) {
5029                 rv = 0;
5030                 goto fail;
5031         }
5032         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5033                                 response, resp_size);
5034         if (!rv)
5035                 goto fail;
5036
5037         err = drbd_recv_header(connection, &pi);
5038         if (err) {
5039                 rv = 0;
5040                 goto fail;
5041         }
5042
5043         if (pi.cmd != P_AUTH_RESPONSE) {
5044                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5045                          cmdname(pi.cmd), pi.cmd);
5046                 rv = 0;
5047                 goto fail;
5048         }
5049
5050         if (pi.size != resp_size) {
5051                 drbd_err(connection, "AuthResponse payload of wrong size\n");
5052                 rv = 0;
5053                 goto fail;
5054         }
5055
5056         err = drbd_recv_all_warn(connection, response, resp_size);
5057         if (err) {
5058                 rv = 0;
5059                 goto fail;
5060         }
5061
5062         right_response = kmalloc(resp_size, GFP_NOIO);
5063         if (right_response == NULL) {
5064                 drbd_err(connection, "kmalloc of right_response failed\n");
5065                 rv = -1;
5066                 goto fail;
5067         }
5068
5069         rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5070                                  right_response);
5071         if (rv) {
5072                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5073                 rv = -1;
5074                 goto fail;
5075         }
5076
5077         rv = !memcmp(response, right_response, resp_size);
5078
5079         if (rv)
5080                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5081                      resp_size);
5082         else
5083                 rv = -1;
5084
5085  fail:
5086         kfree(peers_ch);
5087         kfree(response);
5088         kfree(right_response);
5089         shash_desc_zero(desc);
5090
5091         return rv;
5092 }
5093 #endif
5094
5095 int drbd_receiver(struct drbd_thread *thi)
5096 {
5097         struct drbd_connection *connection = thi->connection;
5098         int h;
5099
5100         drbd_info(connection, "receiver (re)started\n");
5101
5102         do {
5103                 h = conn_connect(connection);
5104                 if (h == 0) {
5105                         conn_disconnect(connection);
5106                         schedule_timeout_interruptible(HZ);
5107                 }
5108                 if (h == -1) {
5109                         drbd_warn(connection, "Discarding network configuration.\n");
5110                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5111                 }
5112         } while (h == 0);
5113
5114         if (h > 0)
5115                 drbdd(connection);
5116
5117         conn_disconnect(connection);
5118
5119         drbd_info(connection, "receiver terminated\n");
5120         return 0;
5121 }
5122
5123 /* ********* acknowledge sender ******** */
5124
5125 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5126 {
5127         struct p_req_state_reply *p = pi->data;
5128         int retcode = be32_to_cpu(p->retcode);
5129
5130         if (retcode >= SS_SUCCESS) {
5131                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5132         } else {
5133                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5134                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5135                          drbd_set_st_err_str(retcode), retcode);
5136         }
5137         wake_up(&connection->ping_wait);
5138
5139         return 0;
5140 }
5141
5142 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5143 {
5144         struct drbd_peer_device *peer_device;
5145         struct drbd_device *device;
5146         struct p_req_state_reply *p = pi->data;
5147         int retcode = be32_to_cpu(p->retcode);
5148
5149         peer_device = conn_peer_device(connection, pi->vnr);
5150         if (!peer_device)
5151                 return -EIO;
5152         device = peer_device->device;
5153
5154         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5155                 D_ASSERT(device, connection->agreed_pro_version < 100);
5156                 return got_conn_RqSReply(connection, pi);
5157         }
5158
5159         if (retcode >= SS_SUCCESS) {
5160                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5161         } else {
5162                 set_bit(CL_ST_CHG_FAIL, &device->flags);
5163                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5164                         drbd_set_st_err_str(retcode), retcode);
5165         }
5166         wake_up(&device->state_wait);
5167
5168         return 0;
5169 }
5170
5171 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5172 {
5173         return drbd_send_ping_ack(connection);
5175 }
5176
5177 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5178 {
5179         /* restore idle timeout */
5180         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int * HZ;
5181         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5182                 wake_up(&connection->ping_wait);
5183
5184         return 0;
5185 }
5186
5187 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5188 {
5189         struct drbd_peer_device *peer_device;
5190         struct drbd_device *device;
5191         struct p_block_ack *p = pi->data;
5192         sector_t sector = be64_to_cpu(p->sector);
5193         int blksize = be32_to_cpu(p->blksize);
5194
5195         peer_device = conn_peer_device(connection, pi->vnr);
5196         if (!peer_device)
5197                 return -EIO;
5198         device = peer_device->device;
5199
5200         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5201
5202         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5203
5204         if (get_ldev(device)) {
5205                 drbd_rs_complete_io(device, sector);
5206                 drbd_set_in_sync(device, sector, blksize);
5207                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5208                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5209                 put_ldev(device);
5210         }
5211         dec_rs_pending(device);
5212         atomic_add(blksize >> 9, &device->rs_sect_in);
5213
5214         return 0;
5215 }
5216
5217 static int
5218 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5219                               struct rb_root *root, const char *func,
5220                               enum drbd_req_event what, bool missing_ok)
5221 {
5222         struct drbd_request *req;
5223         struct bio_and_error m;
5224
5225         spin_lock_irq(&device->resource->req_lock);
5226         req = find_request(device, root, id, sector, missing_ok, func);
5227         if (unlikely(!req)) {
5228                 spin_unlock_irq(&device->resource->req_lock);
5229                 return -EIO;
5230         }
5231         __req_mod(req, what, &m);
5232         spin_unlock_irq(&device->resource->req_lock);
5233
5234         if (m.bio)
5235                 complete_master_bio(device, &m);
5236         return 0;
5237 }
5238
5239 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5240 {
5241         struct drbd_peer_device *peer_device;
5242         struct drbd_device *device;
5243         struct p_block_ack *p = pi->data;
5244         sector_t sector = be64_to_cpu(p->sector);
5245         int blksize = be32_to_cpu(p->blksize);
5246         enum drbd_req_event what;
5247
5248         peer_device = conn_peer_device(connection, pi->vnr);
5249         if (!peer_device)
5250                 return -EIO;
5251         device = peer_device->device;
5252
5253         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5254
5255         if (p->block_id == ID_SYNCER) {
5256                 drbd_set_in_sync(device, sector, blksize);
5257                 dec_rs_pending(device);
5258                 return 0;
5259         }
5260         switch (pi->cmd) {
5261         case P_RS_WRITE_ACK:
5262                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5263                 break;
5264         case P_WRITE_ACK:
5265                 what = WRITE_ACKED_BY_PEER;
5266                 break;
5267         case P_RECV_ACK:
5268                 what = RECV_ACKED_BY_PEER;
5269                 break;
5270         case P_SUPERSEDED:
5271                 what = CONFLICT_RESOLVED;
5272                 break;
5273         case P_RETRY_WRITE:
5274                 what = POSTPONE_WRITE;
5275                 break;
5276         default:
5277                 BUG();
5278         }
5279
5280         return validate_req_change_req_state(device, p->block_id, sector,
5281                                              &device->write_requests, __func__,
5282                                              what, false);
5283 }
5284
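     /*
      * The peer could not satisfy a write.  For resync requests (ID_SYNCER)
      * just account the failed I/O; for application writes feed NEG_ACKED to
      * the request, and if it is already gone, at least mark the area out of
      * sync so it gets resynced later.
      */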
5285 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5286 {
5287         struct drbd_peer_device *peer_device;
5288         struct drbd_device *device;
5289         struct p_block_ack *p = pi->data;
5290         sector_t sector = be64_to_cpu(p->sector);
5291         int size = be32_to_cpu(p->blksize);
5292         int err;
5293
5294         peer_device = conn_peer_device(connection, pi->vnr);
5295         if (!peer_device)
5296                 return -EIO;
5297         device = peer_device->device;
5298
5299         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5300
5301         if (p->block_id == ID_SYNCER) {
5302                 dec_rs_pending(device);
5303                 drbd_rs_failed_io(device, sector, size);
5304                 return 0;
5305         }
5306
5307         err = validate_req_change_req_state(device, p->block_id, sector,
5308                                             &device->write_requests, __func__,
5309                                             NEG_ACKED, true);
5310         if (err) {
5311                 /* Protocol A has no P_WRITE_ACKs, but it does have P_NEG_ACKs.
5312                    The master bio may already have completed, in which case the
5313                    request is no longer in the collision hash.
5314                    In protocol B, we may already have received a P_RECV_ACK,
5315                    but then get a P_NEG_ACK afterwards. */
5316                 drbd_set_out_of_sync(device, sector, size);
5317         }
5318         return 0;
5319 }
5320
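     /* The peer could not satisfy a read request: fail it. */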
5321 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5322 {
5323         struct drbd_peer_device *peer_device;
5324         struct drbd_device *device;
5325         struct p_block_ack *p = pi->data;
5326         sector_t sector = be64_to_cpu(p->sector);
5327
5328         peer_device = conn_peer_device(connection, pi->vnr);
5329         if (!peer_device)
5330                 return -EIO;
5331         device = peer_device->device;
5332
5333         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5334
5335         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5336             (unsigned long long)sector, be32_to_cpu(p->blksize));
5337
5338         return validate_req_change_req_state(device, p->block_id, sector,
5339                                              &device->read_requests, __func__,
5340                                              NEG_ACKED, false);
5341 }
5342
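     /*
      * The peer either failed a resync read (P_NEG_RS_DREPLY) or canceled it
      * (P_RS_CANCEL).  Only the failure counts against the resync; a cancel
      * just completes the in-flight accounting.
      */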
5343 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5344 {
5345         struct drbd_peer_device *peer_device;
5346         struct drbd_device *device;
5347         sector_t sector;
5348         int size;
5349         struct p_block_ack *p = pi->data;
5350
5351         peer_device = conn_peer_device(connection, pi->vnr);
5352         if (!peer_device)
5353                 return -EIO;
5354         device = peer_device->device;
5355
5356         sector = be64_to_cpu(p->sector);
5357         size = be32_to_cpu(p->blksize);
5358
5359         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5360
5361         dec_rs_pending(device);
5362
5363         if (get_ldev_if_state(device, D_FAILED)) {
5364                 drbd_rs_complete_io(device, sector);
5365                 switch (pi->cmd) {
5366                 case P_NEG_RS_DREPLY:
5367                         drbd_rs_failed_io(device, sector, size);
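                              /* fall through */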
5368                 case P_RS_CANCEL:
5369                         break;
5370                 default:
5371                         BUG();
5372                 }
5373                 put_ldev(device);
5374         }
5375
5376         return 0;
5377 }
5378
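     /*
      * The peer has written out everything up to the given barrier: release
      * that epoch from the transfer log.  Also, if a congested device
      * (C_AHEAD) has no application writes in flight anymore, arm the timer
      * that turns it back into a sync source.
      */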
5379 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5380 {
5381         struct p_barrier_ack *p = pi->data;
5382         struct drbd_peer_device *peer_device;
5383         int vnr;
5384
5385         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5386
5387         rcu_read_lock();
5388         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5389                 struct drbd_device *device = peer_device->device;
5390
5391                 if (device->state.conn == C_AHEAD &&
5392                     atomic_read(&device->ap_in_flight) == 0 &&
5393                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5394                         device->start_resync_timer.expires = jiffies + HZ;
5395                         add_timer(&device->start_resync_timer);
5396                 }
5397         }
5398         rcu_read_unlock();
5399
5400         return 0;
5401 }
5402
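     /*
      * One online-verify reply from the peer.  Record a mismatch if the peer
      * flagged the block, advance the progress marks, and once the last
      * reply is in, hand the final bookkeeping (w_ov_finished) over to the
      * worker.
      */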
5403 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5404 {
5405         struct drbd_peer_device *peer_device;
5406         struct drbd_device *device;
5407         struct p_block_ack *p = pi->data;
5408         struct drbd_device_work *dw;
5409         sector_t sector;
5410         int size;
5411
5412         peer_device = conn_peer_device(connection, pi->vnr);
5413         if (!peer_device)
5414                 return -EIO;
5415         device = peer_device->device;
5416
5417         sector = be64_to_cpu(p->sector);
5418         size = be32_to_cpu(p->blksize);
5419
5420         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5421
5422         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5423                 drbd_ov_out_of_sync_found(device, sector, size);
5424         else
5425                 ov_out_of_sync_print(device);
5426
5427         if (!get_ldev(device))
5428                 return 0;
5429
5430         drbd_rs_complete_io(device, sector);
5431         dec_rs_pending(device);
5432
5433         --device->ov_left;
5434
5435         /* advance the progress step marks only during every other 2 MiB:
              * ov_left counts 4 KiB bitmap blocks, so bit 9 toggles every 512 blocks */
5436         if ((device->ov_left & 0x200) == 0x200)
5437                 drbd_advance_rs_marks(device, device->ov_left);
5438
5439         if (device->ov_left == 0) {
5440                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5441                 if (dw) {
5442                         dw->w.cb = w_ov_finished;
5443                         dw->device = device;
5444                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5445                 } else {
5446                         drbd_err(device, "kmalloc(dw) failed.\n");
5447                         ov_out_of_sync_print(device);
5448                         drbd_resync_finished(device);
5449                 }
5450         }
5451         put_ldev(device);
5452         return 0;
5453 }
5454
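     /* The payload was already received by our caller; for packets that
      * need no action (currently only P_DELAY_PROBE) there is nothing
      * left to do. */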
5455 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5456 {
5457         return 0;
5458 }
5459
5460 struct meta_sock_cmd {
5461         size_t pkt_size;
5462         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5463 };
5464
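     /*
      * The receive timeout on the meta socket doubles as the ping interval
      * (while idle) and as the ping timeout (while a ping is outstanding).
      * ping_timeo is configured in tenths of a second, hence the /10.
      */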
5465 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5466 {
5467         long t;
5468         struct net_conf *nc;
5469
5470         rcu_read_lock();
5471         nc = rcu_dereference(connection->net_conf);
5472         t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5473         rcu_read_unlock();
5474
5475         t *= HZ;
5476         if (ping_timeout)
5477                 t /= 10;
5478
5479         connection->meta.socket->sk->sk_rcvtimeo = t;
5480 }
5481
5482 static void set_ping_timeout(struct drbd_connection *connection)
5483 {
5484         set_rcvtimeo(connection, true);
5485 }
5486
5487 static void set_idle_timeout(struct drbd_connection *connection)
5488 {
5489         set_rcvtimeo(connection, false);
5490 }
5491
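     /*
      * Dispatch table for the meta socket, indexed by packet type.
      * pkt_size is the payload expected after the header; a packet type
      * without a handler is rejected as an unexpected meta packet.
      */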
5492 static struct meta_sock_cmd ack_receiver_tbl[] = {
5493         [P_PING]            = { 0, got_Ping },
5494         [P_PING_ACK]        = { 0, got_PingAck },
5495         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5496         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5497         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5498         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5499         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5500         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5501         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5502         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5503         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5504         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5505         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5506         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5507         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5508         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5509         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5510 };
5511
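     /*
      * The ack receiver thread.  It runs with realtime priority and owns the
      * receiving side of the meta socket: it sends a ping whenever SEND_PING
      * is set, enforces the ping timeout, and dispatches complete packets
      * through ack_receiver_tbl[].
      */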
5512 int drbd_ack_receiver(struct drbd_thread *thi)
5513 {
5514         struct drbd_connection *connection = thi->connection;
5515         struct meta_sock_cmd *cmd = NULL;
5516         struct packet_info pi;
5517         unsigned long pre_recv_jif;
5518         int rv;
5519         void *buf    = connection->meta.rbuf;
5520         int received = 0;
5521         unsigned int header_size = drbd_header_size(connection);
5522         int expect   = header_size;
5523         bool ping_timeout_active = false;
5524         struct sched_param param = { .sched_priority = 2 };
5525
5526         rv = sched_setscheduler(current, SCHED_RR, &param);
5527         if (rv < 0)
5528                 drbd_err(connection, "drbd_ack_receiver: failed to set SCHED_RR priority, ret=%d\n", rv);
5529
5530         while (get_t_state(thi) == RUNNING) {
5531                 drbd_thread_current_set_cpu(thi);
5532
5533                 conn_reclaim_net_peer_reqs(connection);
5534
5535                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5536                         if (drbd_send_ping(connection)) {
5537                                 drbd_err(connection, "drbd_send_ping has failed\n");
5538                                 goto reconnect;
5539                         }
5540                         set_ping_timeout(connection);
5541                         ping_timeout_active = true;
5542                 }
5543
5544                 pre_recv_jif = jiffies;
5545                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5546
5547                 /* Note:
5548                  * -EINTR        (on meta) we got a signal
5549                  * -EAGAIN       (on meta) rcvtimeo expired
5550                  * -ECONNRESET   other side closed the connection
5551                  * -ERESTARTSYS  (on data) we got a signal
5552                  * rv <  0       other than above: unexpected error!
5553                  * rv == expected: full header or command
5554                  * rv <  expected: "woken" by signal during receive
5555                  * rv == 0       : "connection shut down by peer"
5556                  */
5557                 if (likely(rv > 0)) {
5558                         received += rv;
5559                         buf      += rv;
5560                 } else if (rv == 0) {
5561                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5562                                 long t;
5563                                 rcu_read_lock();
5564                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5565                                 rcu_read_unlock();
5566
5567                                 t = wait_event_timeout(connection->ping_wait,
5568                                                        connection->cstate < C_WF_REPORT_PARAMS,
5569                                                        t);
5570                                 if (t)
5571                                         break;
5572                         }
5573                         drbd_err(connection, "meta connection shut down by peer.\n");
5574                         goto reconnect;
5575                 } else if (rv == -EAGAIN) {
5576                         /* If the data socket received something meanwhile,
5577                          * that is good enough: peer is still alive. */
5578                         if (time_after(connection->last_received, pre_recv_jif))
5579                                 continue;
5580                         if (ping_timeout_active) {
5581                                 drbd_err(connection, "PingAck did not arrive in time.\n");
5582                                 goto reconnect;
5583                         }
5584                         set_bit(SEND_PING, &connection->flags);
5585                         continue;
5586                 } else if (rv == -EINTR) {
5587                         /* maybe drbd_thread_stop(): the while condition will notice.
5588                          * maybe woken for send_ping: we'll send a ping above,
5589                          * and change the rcvtimeo */
5590                         flush_signals(current);
5591                         continue;
5592                 } else {
5593                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5594                         goto reconnect;
5595                 }
5596
5597                 if (received == expect && cmd == NULL) {
5598                         if (decode_header(connection, connection->meta.rbuf, &pi))
5599                                 goto reconnect;
5600                         cmd = &ack_receiver_tbl[pi.cmd];
5601                         if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5602                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5603                                          cmdname(pi.cmd), pi.cmd);
5604                                 goto disconnect;
5605                         }
5606                         expect = header_size + cmd->pkt_size;
5607                         if (pi.size != expect - header_size) {
5608                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5609                                         pi.cmd, pi.size);
5610                                 goto reconnect;
5611                         }
5612                 }
5613                 if (received == expect) {
5614                         int err;
5615
5616                         err = cmd->fn(connection, &pi);
5617                         if (err) {
5618                                 drbd_err(connection, "%pf failed\n", cmd->fn);
5619                                 goto reconnect;
5620                         }
5621
5622                         connection->last_received = jiffies;
5623
5624                         if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5625                                 set_idle_timeout(connection);
5626                                 ping_timeout_active = false;
5627                         }
5628
5629                         buf      = connection->meta.rbuf;
5630                         received = 0;
5631                         expect   = header_size;
5632                         cmd      = NULL;
5633                 }
5634         }
5635
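        /* A normal exit (thread told to stop) falls through to the
         * termination path; the error handling above jumps directly
         * into the if (0) label blocks instead. */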
5636         if (0) {
5637 reconnect:
5638                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5639                 conn_md_sync(connection);
5640         }
5641         if (0) {
5642 disconnect:
5643                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5644         }
5645
5646         drbd_info(connection, "ack_receiver terminated\n");
5647
5648         return 0;
5649 }
5650
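     /*
      * Work item, queued from the secondary write completion path, that
      * sends the pending acks for one device from process context.  With
      * tcp_cork enabled, cork the meta socket around the burst so the acks
      * are batched into fewer segments.
      */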
5651 void drbd_send_acks_wf(struct work_struct *ws)
5652 {
5653         struct drbd_peer_device *peer_device =
5654                 container_of(ws, struct drbd_peer_device, send_acks_work);
5655         struct drbd_connection *connection = peer_device->connection;
5656         struct drbd_device *device = peer_device->device;
5657         struct net_conf *nc;
5658         int tcp_cork, err;
5659
5660         rcu_read_lock();
5661         nc = rcu_dereference(connection->net_conf);
5662         tcp_cork = nc->tcp_cork;
5663         rcu_read_unlock();
5664
5665         if (tcp_cork)
5666                 drbd_tcp_cork(connection->meta.socket);
5667
5668         err = drbd_finish_peer_reqs(device);
5669         kref_put(&device->kref, drbd_destroy_device);
5670         /* The matching kref_get() is in drbd_endio_write_sec_final(); it keeps
5671            send_acks_work (a member of the peer_device object) alive until here. */
5672
5673         if (err) {
5674                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5675                 return;
5676         }
5677
5678         if (tcp_cork)
5679                 drbd_tcp_uncork(connection->meta.socket);
5682 }