/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"

#include "drbd_vli.h"

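/*
 * Decoded form of a packet header, as filled in by decode_header()
 * below: command, payload size, volume number, and a pointer just
 * past the header into the receive buffer.
 */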
struct packet_info {
        enum drbd_packet cmd;
        unsigned int size;
        unsigned int vnr;
        void *data;
};

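/* What drbd_may_finish_epoch() ended up doing with the epoch it was
 * handed: left it alone, freed it, or reset it for reuse. */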
enum finish_epoch {
        FE_STILL_LIVE,
        FE_DESTROYED,
        FE_RECYCLED,
};

static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */
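
/*
 * Illustration only, not from the original source: a chain of three
 * pages looks like
 *
 *      *head -> page0 -> page1 -> page2
 *
 * where each arrow is the page's ->private field, and a value of 0
 * marks the end of the chain.
 */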

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
        struct page *page;
        struct page *tmp;

        BUG_ON(!n);
        BUG_ON(!head);

        page = *head;

        if (!page)
                return NULL;

        while (page) {
                tmp = page_chain_next(page);
                if (--n == 0)
                        break; /* found sufficient pages */
                if (tmp == NULL)
                        /* insufficient pages, don't use any of them. */
                        return NULL;
                page = tmp;
        }

        /* add end of list marker for the returned list */
        set_page_private(page, 0);
        /* actual return value, and adjustment of head */
        page = *head;
        *head = tmp;
        return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
        struct page *tmp;
        int i = 1;
        while ((tmp = page_chain_next(page)))
                ++i, page = tmp;
        if (len)
                *len = i;
        return page;
}

static int page_chain_free(struct page *page)
{
        struct page *tmp;
        int i = 0;
        page_chain_for_each_safe(page, tmp) {
                put_page(page);
                ++i;
        }
        return i;
}

static void page_chain_add(struct page **head,
                struct page *chain_first, struct page *chain_last)
{
#if 1
        struct page *tmp;
        tmp = page_chain_tail(chain_first, NULL);
        BUG_ON(tmp != chain_last);
#endif

        /* add chain to head */
        set_page_private(chain_last, (unsigned long)*head);
        *head = chain_first;
}

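/*
 * Typical usage of the helpers above, as in __drbd_alloc_pages() and
 * drbd_free_pages() below (a sketch of the pattern, not extra API):
 *
 *      spin_lock(&drbd_pp_lock);
 *      chain = page_chain_del(&drbd_pp_pool, n);    <- take n pages
 *      spin_unlock(&drbd_pp_lock);
 *      ...
 *      tail = page_chain_tail(chain, &n);           <- outside the lock
 *      spin_lock(&drbd_pp_lock);
 *      page_chain_add(&drbd_pp_pool, chain, tail);  <- give them back
 *      spin_unlock(&drbd_pp_lock);
 */
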
static struct page *__drbd_alloc_pages(struct drbd_device *device,
                                       unsigned int number)
{
        struct page *page = NULL;
        struct page *tmp = NULL;
        unsigned int i = 0;

        /* Yes, testing drbd_pp_vacant outside the lock is racy.
         * So what. It saves a spin_lock. */
        if (drbd_pp_vacant >= number) {
                spin_lock(&drbd_pp_lock);
                page = page_chain_del(&drbd_pp_pool, number);
                if (page)
                        drbd_pp_vacant -= number;
                spin_unlock(&drbd_pp_lock);
                if (page)
                        return page;
        }

        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        for (i = 0; i < number; i++) {
                tmp = alloc_page(GFP_TRY);
                if (!tmp)
                        break;
                set_page_private(tmp, (unsigned long)page);
                page = tmp;
        }

        if (i == number)
                return page;

        /* Not enough pages immediately available this time.
         * No need to jump around here, drbd_alloc_pages will retry this
         * function "soon". */
        if (page) {
                tmp = page_chain_tail(page, NULL);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        return NULL;
}

static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
                                           struct list_head *to_be_freed)
{
        struct drbd_peer_request *peer_req, *tmp;

        /* The EEs are always appended to the end of the list. Since
           they are sent in order over the wire, they have to finish
           in order. As soon as we see the first unfinished one, we
           can stop examining the list... */

        list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
                if (drbd_peer_req_has_active_page(peer_req))
                        break;
                list_move(&peer_req->w.list, to_be_freed);
        }
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
{
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;

        spin_lock_irq(&device->resource->req_lock);
        reclaim_finished_net_peer_reqs(device, &reclaimed);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(device, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device: DRBD peer device the pages are allocated for.
 * @number:     number of pages requested
 * @retry:      whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
                              bool retry)
{
        struct drbd_device *device = peer_device->device;
        struct page *page = NULL;
        struct net_conf *nc;
        DEFINE_WAIT(wait);
        int mxb;

        /* Yes, we may run up to @number over max_buffers. If we
         * follow it strictly, the admin will get it wrong anyway. */
        rcu_read_lock();
        nc = rcu_dereference(peer_device->connection->net_conf);
        mxb = nc ? nc->max_buffers : 1000000;
        rcu_read_unlock();

        if (atomic_read(&device->pp_in_use) < mxb)
                page = __drbd_alloc_pages(device, number);

        while (page == NULL) {
                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

                drbd_kick_lo_and_reclaim_net(device);

                if (atomic_read(&device->pp_in_use) < mxb) {
                        page = __drbd_alloc_pages(device, number);
                        if (page)
                                break;
                }

                if (!retry)
                        break;

                if (signal_pending(current)) {
                        drbd_warn(device, "drbd_alloc_pages interrupted!\n");
                        break;
                }

                schedule();
        }
        finish_wait(&drbd_pp_wait, &wait);

        if (page)
                atomic_add(number, &device->pp_in_use);
        return page;
}
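
/* Pages handed out here are accounted in device->pp_in_use and must
 * eventually come back via drbd_free_pages() below, which also wakes
 * up anyone throttled in the wait loop above. */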

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
        atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
        int i;

        if (page == NULL)
                return;

        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
                i = page_chain_free(page);
        else {
                struct page *tmp;
                tmp = page_chain_tail(page, &i);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        i = atomic_sub_return(i, a);
        if (i < 0)
                drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
        wake_up(&drbd_pp_wait);
}
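
/* The wake_up(&drbd_pp_wait) above pairs with the prepare_to_wait()
 * in drbd_alloc_pages(): a waiter throttled by max_buffers gets
 * another chance as soon as pages are returned. */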

/*
 * You need to hold the req_lock:
 *  _drbd_wait_ee_list_empty()
 *
 * You must not have the req_lock:
 *  drbd_free_peer_req()
 *  drbd_alloc_peer_req()
 *  drbd_free_peer_reqs()
 *  drbd_ee_fix_bhs()
 *  drbd_finish_peer_reqs()
 *  drbd_clear_done_ee()
 *  drbd_wait_ee_list_empty()
 */

struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
                    unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
        struct drbd_device *device = peer_device->device;
        struct drbd_peer_request *peer_req;
        struct page *page = NULL;
        unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

        if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
                return NULL;

        peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
        if (!peer_req) {
                if (!(gfp_mask & __GFP_NOWARN))
                        drbd_err(device, "%s: allocation failed\n", __func__);
                return NULL;
        }

        if (data_size) {
                page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
                if (!page)
                        goto fail;
        }

        drbd_clear_interval(&peer_req->i);
        peer_req->i.size = data_size;
        peer_req->i.sector = sector;
        peer_req->i.local = false;
        peer_req->i.waiting = false;

        peer_req->epoch = NULL;
        peer_req->peer_device = peer_device;
        peer_req->pages = page;
        atomic_set(&peer_req->pending_bios, 0);
        peer_req->flags = 0;
        /*
         * The block_id is opaque to the receiver.  It is not endianness
         * converted, and sent back to the sender unchanged.
         */
        peer_req->block_id = id;

        return peer_req;

 fail:
        mempool_free(peer_req, drbd_ee_mempool);
        return NULL;
}

void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
                       int is_net)
{
        if (peer_req->flags & EE_HAS_DIGEST)
                kfree(peer_req->digest);
        drbd_free_pages(device, peer_req->pages, is_net);
        D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
        mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
        LIST_HEAD(work_list);
        struct drbd_peer_request *peer_req, *t;
        int count = 0;
        int is_net = list == &device->net_ee;

        spin_lock_irq(&device->resource->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                __drbd_free_peer_req(device, peer_req, is_net);
                count++;
        }
        return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
        LIST_HEAD(work_list);
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;
        int err = 0;

        spin_lock_irq(&device->resource->req_lock);
        reclaim_finished_net_peer_reqs(device, &reclaimed);
        list_splice_init(&device->done_ee, &work_list);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(device, peer_req);

        /* possible callbacks here:
         * e_end_block, and e_end_resync_block, e_send_superseded.
         * all ignore the last argument.
         */
        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                int err2;

                /* list_del not necessary, next/prev members not touched */
                err2 = peer_req->w.cb(&peer_req->w, !!err);
                if (!err)
                        err = err2;
                drbd_free_peer_req(device, peer_req);
        }
        wake_up(&device->ee_wait);

        return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_device *device,
                                     struct list_head *head)
{
        DEFINE_WAIT(wait);

        /* avoids spin_lock/unlock
         * and calling prepare_to_wait in the fast path */
        while (!list_empty(head)) {
                prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
                spin_unlock_irq(&device->resource->req_lock);
                io_schedule();
                finish_wait(&device->ee_wait, &wait);
                spin_lock_irq(&device->resource->req_lock);
        }
}

static void drbd_wait_ee_list_empty(struct drbd_device *device,
                                    struct list_head *head)
{
        spin_lock_irq(&device->resource->req_lock);
        _drbd_wait_ee_list_empty(device, head);
        spin_unlock_irq(&device->resource->req_lock);
}

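/*
 * Receive into a kernel buffer.  The get_fs()/set_fs(KERNEL_DS) dance
 * is needed because sock_recvmsg() expects the iovec to point into
 * user space by default.
 */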
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
        mm_segment_t oldfs;
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_iovlen = 1,
                .msg_iov = (struct iovec *)&iov,
                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
        };
        int rv;

        oldfs = get_fs();
        set_fs(KERNEL_DS);
        rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
        set_fs(oldfs);

        return rv;
}

static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
        int rv;

        rv = drbd_recv_short(connection->data.socket, buf, size, 0);

        if (rv < 0) {
                if (rv == -ECONNRESET)
                        drbd_info(connection, "sock was reset by peer\n");
                else if (rv != -ERESTARTSYS)
                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
        } else if (rv == 0) {
                if (test_bit(DISCONNECT_SENT, &connection->flags)) {
                        long t;
                        rcu_read_lock();
                        t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
                        rcu_read_unlock();

                        t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);

                        if (t)
                                goto out;
                }
                drbd_info(connection, "sock was shut down by peer\n");
        }

        if (rv != size)
                conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
        return rv;
}

static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
{
        int err;

        err = drbd_recv(connection, buf, size);
        if (err != size) {
                if (err >= 0)
                        err = -EIO;
        } else
                err = 0;
        return err;
}

static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
{
        int err;

        err = drbd_recv_all(connection, buf, size);
        if (err && !signal_pending(current))
                drbd_warn(connection, "short read (expected size %d)\n", (int)size);
        return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
                unsigned int rcv)
{
        /* open coded SO_SNDBUF, SO_RCVBUF */
        if (snd) {
                sock->sk->sk_sndbuf = snd;
                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
        }
        if (rcv) {
                sock->sk->sk_rcvbuf = rcv;
                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
        }
}

static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
        const char *what;
        struct socket *sock;
        struct sockaddr_in6 src_in6;
        struct sockaddr_in6 peer_in6;
        struct net_conf *nc;
        int err, peer_addr_len, my_addr_len;
        int sndbuf_size, rcvbuf_size, connect_int;
        int disconnect_on_error = 1;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        connect_int = nc->connect_int;
        rcu_read_unlock();

        my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
        memcpy(&src_in6, &connection->my_addr, my_addr_len);

        if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
                src_in6.sin6_port = 0;
        else
                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

        peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
        memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0) {
                sock = NULL;
                goto out;
        }

        sock->sk->sk_rcvtimeo =
        sock->sk->sk_sndtimeo = connect_int * HZ;
        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

        /* explicitly bind to the configured IP as source IP
         * for the outgoing connections.
         * This is needed for multihomed hosts and to be
         * able to use lo: interfaces for drbd.
         * Make sure to use 0 as port number, so Linux selects
         * a free one dynamically.
         */
        what = "bind before connect";
        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
        if (err < 0)
                goto out;

        /* connect may fail, peer not yet available.
         * stay C_WF_CONNECTION, don't go Disconnecting! */
        disconnect_on_error = 0;
        what = "connect";
        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
        if (err < 0) {
                if (sock) {
                        sock_release(sock);
                        sock = NULL;
                }
                switch (-err) {
                        /* timeout, busy, signal pending */
                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
                case EINTR: case ERESTARTSYS:
                        /* peer not (yet) available, network problem */
                case ECONNREFUSED: case ENETUNREACH:
                case EHOSTDOWN:    case EHOSTUNREACH:
                        disconnect_on_error = 0;
                        break;
                default:
                        drbd_err(connection, "%s failed, err = %d\n", what, err);
                }
                if (disconnect_on_error)
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
        }

        return sock;
}

struct accept_wait_data {
        struct drbd_connection *connection;
        struct socket *s_listen;
        struct completion door_bell;
        void (*original_sk_state_change)(struct sock *sk);
};

static void drbd_incoming_connection(struct sock *sk)
{
        struct accept_wait_data *ad = sk->sk_user_data;
        void (*state_change)(struct sock *sk);

        state_change = ad->original_sk_state_change;
        if (sk->sk_state == TCP_ESTABLISHED)
                complete(&ad->door_bell);
        state_change(sk);
}

static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
        int err, sndbuf_size, rcvbuf_size, my_addr_len;
        struct sockaddr_in6 my_addr;
        struct socket *s_listen;
        struct net_conf *nc;
        const char *what;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return -EIO;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        rcu_read_unlock();

        my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
        memcpy(&my_addr, &connection->my_addr, my_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &s_listen);
        if (err) {
                s_listen = NULL;
                goto out;
        }

        s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

        what = "bind before listen";
        err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
        if (err < 0)
                goto out;

        ad->s_listen = s_listen;
        write_lock_bh(&s_listen->sk->sk_callback_lock);
        ad->original_sk_state_change = s_listen->sk->sk_state_change;
        s_listen->sk->sk_state_change = drbd_incoming_connection;
        s_listen->sk->sk_user_data = ad;
        write_unlock_bh(&s_listen->sk->sk_callback_lock);

        what = "listen";
        err = s_listen->ops->listen(s_listen, 5);
        if (err < 0)
                goto out;

        return 0;
out:
        if (s_listen)
                sock_release(s_listen);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        drbd_err(connection, "%s failed, err = %d\n", what, err);
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        return -EIO;
}

static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
        write_lock_bh(&sk->sk_callback_lock);
        sk->sk_state_change = ad->original_sk_state_change;
        sk->sk_user_data = NULL;
        write_unlock_bh(&sk->sk_callback_lock);
}

static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
        int timeo, connect_int, err = 0;
        struct socket *s_estab = NULL;
        struct net_conf *nc;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        connect_int = nc->connect_int;
        rcu_read_unlock();

        timeo = connect_int * HZ;
        /* 28.5% random jitter */
        timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

        err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
        if (err <= 0)
                return NULL;

        err = kernel_accept(ad->s_listen, &s_estab, 0);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        drbd_err(connection, "accept failed, err = %d\n", err);
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        if (s_estab)
                unregister_state_change(s_estab->sk, ad);

        return s_estab;
}

static int decode_header(struct drbd_connection *, void *, struct packet_info *);

static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
                             enum drbd_packet cmd)
{
        if (!conn_prepare_command(connection, sock))
                return -EIO;
        return conn_send_command(connection, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
        unsigned int header_size = drbd_header_size(connection);
        struct packet_info pi;
        int err;

        err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
        if (err != header_size) {
                if (err >= 0)
                        err = -EIO;
                return err;
        }
        err = decode_header(connection, connection->data.rbuf, &pi);
        if (err)
                return err;
        return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:       pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
        int rr;
        char tb[4];

        if (!*sock)
                return false;

        rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

        if (rr > 0 || rr == -EAGAIN) {
                return true;
        } else {
                sock_release(*sock);
                *sock = NULL;
                return false;
        }
}

/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_peer_device *peer_device)
{
        struct drbd_device *device = peer_device->device;
        int err;

        atomic_set(&device->packet_seq, 0);
        device->peer_seq = 0;

        device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
                &peer_device->connection->cstate_mutex :
                &device->own_state_mutex;

        err = drbd_send_sync_param(peer_device);
        if (!err)
                err = drbd_send_sizes(peer_device, 0, 0);
        if (!err)
                err = drbd_send_uuids(peer_device);
        if (!err)
                err = drbd_send_current_state(peer_device);
        clear_bit(USE_DEGR_WFC_T, &device->flags);
        clear_bit(RESIZE_PENDING, &device->flags);
        atomic_set(&device->ap_in_flight, 0);
        mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
        return err;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
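
/*
 * DRBD uses two TCP connections per peer: a "data" socket for the bulk
 * replication stream and a "meta" socket for acks and pings.  Both
 * nodes try to connect and listen at the same time; the first packet
 * (P_INITIAL_DATA or P_INITIAL_META) tells the receiving side which of
 * the two sockets it has just accepted, and a coin flip
 * (prandom_u32() & 1) breaks the tie when connection attempts cross.
 */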
static int conn_connect(struct drbd_connection *connection)
{
        struct drbd_socket sock, msock;
        struct drbd_peer_device *peer_device;
        struct net_conf *nc;
        int vnr, timeout, h, ok;
        bool discard_my_data;
        enum drbd_state_rv rv;
        struct accept_wait_data ad = {
                .connection = connection,
                .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
        };

        clear_bit(DISCONNECT_SENT, &connection->flags);
        if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
                return -2;

        mutex_init(&sock.mutex);
        sock.sbuf = connection->data.sbuf;
        sock.rbuf = connection->data.rbuf;
        sock.socket = NULL;
        mutex_init(&msock.mutex);
        msock.sbuf = connection->meta.sbuf;
        msock.rbuf = connection->meta.rbuf;
        msock.socket = NULL;

        /* Assume that the peer only understands protocol 80 until we know better.  */
        connection->agreed_pro_version = 80;

        if (prepare_listen_socket(connection, &ad))
                return 0;

        do {
                struct socket *s;

                s = drbd_try_connect(connection);
                if (s) {
                        if (!sock.socket) {
                                sock.socket = s;
                                send_first_packet(connection, &sock, P_INITIAL_DATA);
                        } else if (!msock.socket) {
                                clear_bit(RESOLVE_CONFLICTS, &connection->flags);
                                msock.socket = s;
                                send_first_packet(connection, &msock, P_INITIAL_META);
                        } else {
                                drbd_err(connection, "Logic error in conn_connect()\n");
                                goto out_release_sockets;
                        }
                }

                if (sock.socket && msock.socket) {
                        rcu_read_lock();
                        nc = rcu_dereference(connection->net_conf);
                        timeout = nc->ping_timeo * HZ / 10;
                        rcu_read_unlock();
                        schedule_timeout_interruptible(timeout);
                        ok = drbd_socket_okay(&sock.socket);
                        ok = drbd_socket_okay(&msock.socket) && ok;
                        if (ok)
                                break;
                }

retry:
                s = drbd_wait_for_connect(connection, &ad);
                if (s) {
                        int fp = receive_first_packet(connection, s);
                        drbd_socket_okay(&sock.socket);
                        drbd_socket_okay(&msock.socket);
                        switch (fp) {
                        case P_INITIAL_DATA:
                                if (sock.socket) {
                                        drbd_warn(connection, "initial packet S crossed\n");
                                        sock_release(sock.socket);
                                        sock.socket = s;
                                        goto randomize;
                                }
                                sock.socket = s;
                                break;
                        case P_INITIAL_META:
                                set_bit(RESOLVE_CONFLICTS, &connection->flags);
                                if (msock.socket) {
                                        drbd_warn(connection, "initial packet M crossed\n");
                                        sock_release(msock.socket);
                                        msock.socket = s;
                                        goto randomize;
                                }
                                msock.socket = s;
                                break;
                        default:
                                drbd_warn(connection, "Error receiving initial packet\n");
                                sock_release(s);
randomize:
                                if (prandom_u32() & 1)
                                        goto retry;
                        }
                }

                if (connection->cstate <= C_DISCONNECTING)
                        goto out_release_sockets;
                if (signal_pending(current)) {
                        flush_signals(current);
                        smp_rmb();
                        if (get_t_state(&connection->receiver) == EXITING)
                                goto out_release_sockets;
                }

                ok = drbd_socket_okay(&sock.socket);
                ok = drbd_socket_okay(&msock.socket) && ok;
        } while (!ok);

        if (ad.s_listen)
                sock_release(ad.s_listen);

        sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

        sock.socket->sk->sk_allocation = GFP_NOIO;
        msock.socket->sk->sk_allocation = GFP_NOIO;

        sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
        msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

        /* NOT YET ...
         * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
         * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
         * first set it to the P_CONNECTION_FEATURES timeout,
         * which we set to 4x the configured ping_timeout. */
        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);

        sock.socket->sk->sk_sndtimeo =
        sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

        msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
        timeout = nc->timeout * HZ / 10;
        discard_my_data = nc->discard_my_data;
        rcu_read_unlock();

        msock.socket->sk->sk_sndtimeo = timeout;

        /* we don't want delays.
         * we use TCP_CORK where appropriate, though */
        drbd_tcp_nodelay(sock.socket);
        drbd_tcp_nodelay(msock.socket);

        connection->data.socket = sock.socket;
        connection->meta.socket = msock.socket;
        connection->last_received = jiffies;

        h = drbd_do_features(connection);
        if (h <= 0)
                return h;

        if (connection->cram_hmac_tfm) {
                /* drbd_request_state(device, NS(conn, WFAuth)); */
                switch (drbd_do_auth(connection)) {
                case -1:
                        drbd_err(connection, "Authentication of peer failed\n");
                        return -1;
                case 0:
                        drbd_err(connection, "Authentication of peer failed, trying again.\n");
                        return 0;
                }
        }

        connection->data.socket->sk->sk_sndtimeo = timeout;
        connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

        if (drbd_send_protocol(connection) == -EOPNOTSUPP)
                return -1;

        set_bit(STATE_SENT, &connection->flags);

        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;
                kref_get(&device->kref);
                rcu_read_unlock();

                /* Prevent a race between resync-handshake and
                 * being promoted to Primary.
                 *
                 * Grab and release the state mutex, so we know that any current
                 * drbd_set_role() is finished, and any incoming drbd_set_role
                 * will see the STATE_SENT flag, and wait for it to be cleared.
                 */
                mutex_lock(device->state_mutex);
                mutex_unlock(device->state_mutex);

                if (discard_my_data)
                        set_bit(DISCARD_MY_DATA, &device->flags);
                else
                        clear_bit(DISCARD_MY_DATA, &device->flags);

                drbd_connected(peer_device);
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();

        rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
        if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
                clear_bit(STATE_SENT, &connection->flags);
                return 0;
        }

        drbd_thread_start(&connection->asender);

        mutex_lock(&connection->resource->conf_update);
        /* The discard_my_data flag is a single-shot modifier to the next
         * connection attempt, the handshake of which is now well underway.
         * No need for rcu style copying of the whole struct
         * just to clear a single value. */
        connection->net_conf->discard_my_data = 0;
        mutex_unlock(&connection->resource->conf_update);

        return h;

out_release_sockets:
        if (ad.s_listen)
                sock_release(ad.s_listen);
        if (sock.socket)
                sock_release(sock.socket);
        if (msock.socket)
                sock_release(msock.socket);
        return -1;
}

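/*
 * Three on-the-wire header formats exist, told apart by their magic
 * value and by the header size implied by the agreed protocol version:
 * struct p_header100 (carries a volume number), struct p_header95
 * (16-bit magic, 32-bit length), and the original struct p_header80
 * (16-bit length).
 */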
static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
{
        unsigned int header_size = drbd_header_size(connection);

        if (header_size == sizeof(struct p_header100) &&
            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
                struct p_header100 *h = header;
                if (h->pad != 0) {
                        drbd_err(connection, "Header padding is not zero\n");
                        return -EINVAL;
                }
                pi->vnr = be16_to_cpu(h->volume);
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
        } else if (header_size == sizeof(struct p_header95) &&
                   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
                struct p_header95 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
                pi->vnr = 0;
        } else if (header_size == sizeof(struct p_header80) &&
                   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
                struct p_header80 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be16_to_cpu(h->length);
                pi->vnr = 0;
        } else {
                drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
                         be32_to_cpu(*(__be32 *)header),
                         connection->agreed_pro_version);
                return -EINVAL;
        }
        pi->data = header + header_size;
        return 0;
}

static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
{
        void *buffer = connection->data.rbuf;
        int err;

        err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
        if (err)
                return err;

        err = decode_header(connection, buffer, pi);
        connection->last_received = jiffies;

        return err;
}

static void drbd_flush(struct drbd_connection *connection)
{
        int rv;
        struct drbd_peer_device *peer_device;
        int vnr;

        if (connection->write_ordering >= WO_bdev_flush) {
                rcu_read_lock();
                idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                        struct drbd_device *device = peer_device->device;

                        if (!get_ldev(device))
                                continue;
                        kref_get(&device->kref);
                        rcu_read_unlock();

                        rv = blkdev_issue_flush(device->ldev->backing_bdev,
                                        GFP_NOIO, NULL);
                        if (rv) {
                                drbd_info(device, "local disk flush failed with status %d\n", rv);
                                /* would rather check on EOPNOTSUPP, but that is not reliable.
                                 * don't try again for ANY return value != 0
                                 * if (rv == -EOPNOTSUPP) */
                                drbd_bump_write_ordering(connection, WO_drain_io);
                        }
                        put_ldev(device);
                        kref_put(&device->kref, drbd_destroy_device);

                        rcu_read_lock();
                        if (rv)
                                break;
                }
                rcu_read_unlock();
        }
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @connection: DRBD connection.
 * @epoch:      Epoch object.
 * @ev:         Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
{
        int epoch_size;
        struct drbd_epoch *next_epoch;
        enum finish_epoch rv = FE_STILL_LIVE;

        spin_lock(&connection->epoch_lock);
        do {
                next_epoch = NULL;

                epoch_size = atomic_read(&epoch->epoch_size);

                switch (ev & ~EV_CLEANUP) {
                case EV_PUT:
                        atomic_dec(&epoch->active);
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do */
                        break;
                }

                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
                        if (!(ev & EV_CLEANUP)) {
                                spin_unlock(&connection->epoch_lock);
                                drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
                                spin_lock(&connection->epoch_lock);
                        }
#if 0
                        /* FIXME: dec unacked on connection, once we have
                         * something to count pending connection packets in. */
                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
                                dec_unacked(epoch->connection);
#endif

                        if (connection->current_epoch != epoch) {
                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                                list_del(&epoch->list);
                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                                connection->epochs--;
                                kfree(epoch);

                                if (rv == FE_STILL_LIVE)
                                        rv = FE_DESTROYED;
                        } else {
                                epoch->flags = 0;
                                atomic_set(&epoch->epoch_size, 0);
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
                        }
                }

                if (!next_epoch)
                        break;

                epoch = next_epoch;
        } while (1);

        spin_unlock(&connection->epoch_lock);

        return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @connection: DRBD connection.
 * @wo:         Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
{
        struct disk_conf *dc;
        struct drbd_peer_device *peer_device;
        enum write_ordering_e pwo;
        int vnr;
        static char *write_ordering_str[] = {
                [WO_none] = "none",
                [WO_drain_io] = "drain",
                [WO_bdev_flush] = "flush",
        };

        pwo = connection->write_ordering;
        wo = min(pwo, wo);
        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;

                if (!get_ldev_if_state(device, D_ATTACHING))
                        continue;
                dc = rcu_dereference(device->ldev->disk_conf);

                if (wo == WO_bdev_flush && !dc->disk_flushes)
                        wo = WO_drain_io;
                if (wo == WO_drain_io && !dc->disk_drain)
                        wo = WO_none;
                put_ldev(device);
        }
        rcu_read_unlock();
        connection->write_ordering = wo;
        if (pwo != connection->write_ordering || wo == WO_bdev_flush)
                drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
}
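
/* Note the min() in drbd_bump_write_ordering() above: the method is
 * only ever demoted at runtime (flush -> drain -> none, also honoring
 * the disk_flushes and disk_drain settings), never promoted. */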

/**
 * drbd_submit_peer_request() - Submit the pages of a peer request as one or more bios
 * @device:     DRBD device.
 * @peer_req:   peer request
 * @rw:         flag field, see bio->bi_rw
 * @fault_type: fault injection category, see drbd_insert_fault()
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_device *device,
                             struct drbd_peer_request *peer_req,
                             const unsigned rw, const int fault_type)
{
        struct bio *bios = NULL;
        struct bio *bio;
        struct page *page = peer_req->pages;
        sector_t sector = peer_req->i.sector;
        unsigned ds = peer_req->i.size;
        unsigned n_bios = 0;
        unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;
        int err = -ENOMEM;

        /* In most cases, we will only need one bio.  But in case the lower
         * level restrictions happen to be different at this offset on this
         * side than those of the sending peer, we may need to submit the
         * request in more than one bio.
         *
         * Plain bio_alloc is good enough here, this is no DRBD internally
         * generated bio, but a bio allocated on behalf of the peer.
         */
next_bio:
        bio = bio_alloc(GFP_NOIO, nr_pages);
        if (!bio) {
                drbd_err(device, "submit_ee: Allocation of a bio failed\n");
                goto fail;
        }
        /* > peer_req->i.sector, unless this is the first bio */
        bio->bi_iter.bi_sector = sector;
        bio->bi_bdev = device->ldev->backing_bdev;
        bio->bi_rw = rw;
        bio->bi_private = peer_req;
        bio->bi_end_io = drbd_peer_request_endio;

        bio->bi_next = bios;
        bios = bio;
        ++n_bios;

        page_chain_for_each(page) {
                unsigned len = min_t(unsigned, ds, PAGE_SIZE);
                if (!bio_add_page(bio, page, len, 0)) {
                        /* A single page must always be possible!
                         * But in case it fails anyway,
                         * we deal with it, and complain (below). */
                        if (bio->bi_vcnt == 0) {
                                drbd_err(device,
                                        "bio_add_page failed for len=%u, "
                                        "bi_vcnt=0 (bi_sector=%llu)\n",
                                        len, (uint64_t)bio->bi_iter.bi_sector);
                                err = -ENOSPC;
                                goto fail;
                        }
                        goto next_bio;
                }
                ds -= len;
                sector += len >> 9;
                --nr_pages;
        }
        D_ASSERT(device, page == NULL);
        D_ASSERT(device, ds == 0);

        atomic_set(&peer_req->pending_bios, n_bios);
        do {
                bio = bios;
                bios = bios->bi_next;
                bio->bi_next = NULL;

                drbd_generic_make_request(device, fault_type, bio);
        } while (bios);
        return 0;

fail:
        while (bios) {
                bio = bios;
                bios = bios->bi_next;
                bio_put(bio);
        }
        return err;
}

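/* The pending_bios count set above lets drbd_peer_request_endio()
 * (the bi_end_io of every bio submitted here) recognize the completion
 * of the last bio belonging to this peer request, however many bios
 * the page chain was split into. */
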
static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
                                             struct drbd_peer_request *peer_req)
{
        struct drbd_interval *i = &peer_req->i;

        drbd_remove_interval(&device->write_requests, i);
        drbd_clear_interval(i);

        /* Wake up any processes waiting for this peer request to complete.  */
        if (i->waiting)
                wake_up(&device->misc_wait);
}

static void conn_wait_active_ee_empty(struct drbd_connection *connection)
{
        struct drbd_peer_device *peer_device;
        int vnr;

        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;

                kref_get(&device->kref);
                rcu_read_unlock();
                drbd_wait_ee_list_empty(device, &device->active_ee);
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();
}

static struct drbd_peer_device *
conn_peer_device(struct drbd_connection *connection, int volume_number)
{
        return idr_find(&connection->peer_devices, volume_number);
}

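/*
 * A P_BARRIER from the peer closes the current write epoch: everything
 * received before it must reach stable storage before the matching
 * P_BARRIER_ACK goes back.  Depending on the agreed write ordering
 * method this means flushing the backing devices, draining in-flight
 * IO, or (for WO_none) nothing beyond the epoch bookkeeping.
 */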
static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
{
        int rv;
        struct p_barrier *p = pi->data;
        struct drbd_epoch *epoch;

        /* FIXME these are unacked on connection,
         * not a specific (peer)device.
         */
        connection->current_epoch->barrier_nr = p->barrier;
        connection->current_epoch->connection = connection;
        rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);

        /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
         * the activity log, which means it would not be resynced in case the
         * R_PRIMARY crashes now.
         * Therefore we must send the barrier_ack after the barrier request was
         * completed. */
        switch (connection->write_ordering) {
        case WO_none:
                if (rv == FE_RECYCLED)
                        return 0;

                /* receiver context, in the writeout path of the other node.
                 * avoid potential distributed deadlock */
                epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
                if (epoch)
                        break;

                drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
                /* Fall through */

        case WO_bdev_flush:
        case WO_drain_io:
                conn_wait_active_ee_empty(connection);
                drbd_flush(connection);

                if (atomic_read(&connection->current_epoch->epoch_size)) {
                        epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
                        if (epoch)
                                break;
                }

                return 0;
        default:
                drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
                return -EIO;
        }

        epoch->flags = 0;
        atomic_set(&epoch->epoch_size, 0);
        atomic_set(&epoch->active, 0);

        spin_lock(&connection->epoch_lock);
        if (atomic_read(&connection->current_epoch->epoch_size)) {
                list_add(&epoch->list, &connection->current_epoch->list);
                connection->current_epoch = epoch;
                connection->epochs++;
        } else {
                /* The current_epoch got recycled while we allocated this one... */
                kfree(epoch);
        }
        spin_unlock(&connection->epoch_lock);

        return 0;
}

1499 /* used from recv_resync_read (on behalf of receive_RSDataReply)
1500  * and from receive_Data */
1501 static struct drbd_peer_request *
1502 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1503               int data_size) __must_hold(local)
1504 {
1505         struct drbd_device *device = peer_device->device;
1506         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1507         struct drbd_peer_request *peer_req;
1508         struct page *page;
1509         int dgs, ds, err;
1510         void *dig_in = peer_device->connection->int_dig_in;
1511         void *dig_vv = peer_device->connection->int_dig_vv;
1512         unsigned long *data;
1513
1514         dgs = 0;
1515         if (peer_device->connection->peer_integrity_tfm) {
1516                 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1517                 /*
1518                  * FIXME: Receive the incoming digest into the receive buffer
1519                  *        here, together with its struct p_data?
1520                  */
1521                 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1522                 if (err)
1523                         return NULL;
1524                 data_size -= dgs;
1525         }
1526
1527         if (!expect(IS_ALIGNED(data_size, 512)))
1528                 return NULL;
1529         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1530                 return NULL;
1531
1532         /* even though we trust our peer,
1533          * we sometimes have to double check. */
1534         if (sector + (data_size>>9) > capacity) {
1535                 drbd_err(device, "request from peer beyond end of local disk: "
1536                         "capacity: %llus < sector: %llus + size: %u\n",
1537                         (unsigned long long)capacity,
1538                         (unsigned long long)sector, data_size);
1539                 return NULL;
1540         }
1541
1542         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1543          * "criss-cross" setup, that might cause write-out on some other DRBD,
1544          * which in turn might block on the other node at this very place.  */
1545         peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, GFP_NOIO);
1546         if (!peer_req)
1547                 return NULL;
1548
1549         if (!data_size)
1550                 return peer_req;
1551
1552         ds = data_size;
1553         page = peer_req->pages;
1554         page_chain_for_each(page) {
1555                 unsigned len = min_t(int, ds, PAGE_SIZE);
1556                 data = kmap(page);
1557                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1558                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1559                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1560                         data[0] = data[0] ^ (unsigned long)-1;
1561                 }
1562                 kunmap(page);
1563                 if (err) {
1564                         drbd_free_peer_req(device, peer_req);
1565                         return NULL;
1566                 }
1567                 ds -= len;
1568         }
1569
1570         if (dgs) {
1571                 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1572                 if (memcmp(dig_in, dig_vv, dgs)) {
1573                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1574                                 (unsigned long long)sector, data_size);
1575                         drbd_free_peer_req(device, peer_req);
1576                         return NULL;
1577                 }
1578         }
1579         device->recv_cnt += data_size>>9;
1580         return peer_req;
1581 }
1582
1583 /* drbd_drain_block() just takes a data block
1584  * out of the socket input buffer, and discards it.
1585  */
1586 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1587 {
1588         struct page *page;
1589         int err = 0;
1590         void *data;
1591
1592         if (!data_size)
1593                 return 0;
1594
1595         page = drbd_alloc_pages(peer_device, 1, 1); /* may block until a page is free */
             if (!page) /* NULL here means we were interrupted while waiting */
                     return -EIO;
1596
1597         data = kmap(page);
1598         while (data_size) {
1599                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1600
1601                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1602                 if (err)
1603                         break;
1604                 data_size -= len;
1605         }
1606         kunmap(page);
1607         drbd_free_pages(peer_device->device, page, 0);
1608         return err;
1609 }
1610
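/* Receive the payload of a P_DATA_REPLY directly into the pages of the
 * original application bio ("diskless" read: a read that had to be
 * served by the peer).  An optional integrity digest precedes the data
 * and is verified over the whole bio afterwards. */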
1611 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1612                            sector_t sector, int data_size)
1613 {
1614         struct bio_vec bvec;
1615         struct bvec_iter iter;
1616         struct bio *bio;
1617         int dgs, err, expect;
1618         void *dig_in = peer_device->connection->int_dig_in;
1619         void *dig_vv = peer_device->connection->int_dig_vv;
1620
1621         dgs = 0;
1622         if (peer_device->connection->peer_integrity_tfm) {
1623                 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1624                 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1625                 if (err)
1626                         return err;
1627                 data_size -= dgs;
1628         }
1629
1630         /* optimistically update recv_cnt.  if receiving fails below,
1631          * we disconnect anyways, and counters will be reset. */
1632         peer_device->device->recv_cnt += data_size>>9;
1633
1634         bio = req->master_bio;
1635         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1636
1637         bio_for_each_segment(bvec, bio, iter) {
1638                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1639                 expect = min_t(int, data_size, bvec.bv_len);
1640                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1641                 kunmap(bvec.bv_page);
1642                 if (err)
1643                         return err;
1644                 data_size -= expect;
1645         }
1646
1647         if (dgs) {
1648                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1649                 if (memcmp(dig_in, dig_vv, dgs)) {
1650                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1651                         return -EINVAL;
1652                 }
1653         }
1654
1655         D_ASSERT(peer_device->device, data_size == 0);
1656         return 0;
1657 }
1658
1659 /*
1660  * e_end_resync_block() is called in asender context via
1661  * drbd_finish_peer_reqs().
1662  */
1663 static int e_end_resync_block(struct drbd_work *w, int unused)
1664 {
1665         struct drbd_peer_request *peer_req =
1666                 container_of(w, struct drbd_peer_request, w);
1667         struct drbd_peer_device *peer_device = peer_req->peer_device;
1668         struct drbd_device *device = peer_device->device;
1669         sector_t sector = peer_req->i.sector;
1670         int err;
1671
1672         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1673
1674         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1675                 drbd_set_in_sync(device, sector, peer_req->i.size);
1676                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1677         } else {
1678                 /* Record failure to sync */
1679                 drbd_rs_failed_io(device, sector, peer_req->i.size);
1680
1681                 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1682         }
1683         dec_unacked(device);
1684
1685         return err;
1686 }
1687
1688 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1689                             int data_size) __releases(local)
1690 {
1691         struct drbd_device *device = peer_device->device;
1692         struct drbd_peer_request *peer_req;
1693
1694         peer_req = read_in_block(peer_device, ID_SYNCER, sector, data_size);
1695         if (!peer_req)
1696                 goto fail;
1697
1698         dec_rs_pending(device);
1699
1700         inc_unacked(device);
1701         /* corresponding dec_unacked() in e_end_resync_block()
1702          * respective _drbd_clear_done_ee */
1703
1704         peer_req->w.cb = e_end_resync_block;
1705
1706         spin_lock_irq(&device->resource->req_lock);
1707         list_add(&peer_req->w.list, &device->sync_ee);
1708         spin_unlock_irq(&device->resource->req_lock);
1709
1710         atomic_add(data_size >> 9, &device->rs_sect_ev);
1711         if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1712                 return 0;
1713
1714         /* don't care for the reason here */
1715         drbd_err(device, "submit failed, triggering re-connect\n");
1716         spin_lock_irq(&device->resource->req_lock);
1717         list_del(&peer_req->w.list);
1718         spin_unlock_irq(&device->resource->req_lock);
1719
1720         drbd_free_peer_req(device, peer_req);
1721 fail:
1722         put_ldev(device);
1723         return -EIO;
1724 }
1725
1726 static struct drbd_request *
1727 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1728              sector_t sector, bool missing_ok, const char *func)
1729 {
1730         struct drbd_request *req;
1731
1732         /* Request object according to our peer: the peer echoes back as
              * block_id the request pointer we sent it; validate it against
              * the interval tree below before trusting it. */
1733         req = (struct drbd_request *)(unsigned long)id;
1734         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1735                 return req;
1736         if (!missing_ok) {
1737                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1738                         (unsigned long)id, (unsigned long long)sector);
1739         }
1740         return NULL;
1741 }
1742
1743 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1744 {
1745         struct drbd_peer_device *peer_device;
1746         struct drbd_device *device;
1747         struct drbd_request *req;
1748         sector_t sector;
1749         int err;
1750         struct p_data *p = pi->data;
1751
1752         peer_device = conn_peer_device(connection, pi->vnr);
1753         if (!peer_device)
1754                 return -EIO;
1755         device = peer_device->device;
1756
1757         sector = be64_to_cpu(p->sector);
1758
1759         spin_lock_irq(&device->resource->req_lock);
1760         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1761         spin_unlock_irq(&device->resource->req_lock);
1762         if (unlikely(!req))
1763                 return -EIO;
1764
1765         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1766          * special casing it there for the various failure cases.
1767          * still no race with drbd_fail_pending_reads */
1768         err = recv_dless_read(peer_device, req, sector, pi->size);
1769         if (!err)
1770                 req_mod(req, DATA_RECEIVED);
1771         /* else: nothing. handled from drbd_disconnect...
1772          * I don't think we may complete this just yet
1773          * in case we are "on-disconnect: freeze" */
1774
1775         return err;
1776 }
1777
1778 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1779 {
1780         struct drbd_peer_device *peer_device;
1781         struct drbd_device *device;
1782         sector_t sector;
1783         int err;
1784         struct p_data *p = pi->data;
1785
1786         peer_device = conn_peer_device(connection, pi->vnr);
1787         if (!peer_device)
1788                 return -EIO;
1789         device = peer_device->device;
1790
1791         sector = be64_to_cpu(p->sector);
1792         D_ASSERT(device, p->block_id == ID_SYNCER);
1793
1794         if (get_ldev(device)) {
1795                 /* data is submitted to disk within recv_resync_read.
1796                  * corresponding put_ldev done below on error,
1797                  * or in drbd_peer_request_endio. */
1798                 err = recv_resync_read(peer_device, sector, pi->size);
1799         } else {
1800                 if (__ratelimit(&drbd_ratelimit_state))
1801                         drbd_err(device, "Can not write resync data to local disk.\n");
1802
1803                 err = drbd_drain_block(peer_device, pi->size);
1804
1805                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1806         }
1807
1808         atomic_add(pi->size >> 9, &device->rs_sect_in);
1809
1810         return err;
1811 }
1812
1813 static void restart_conflicting_writes(struct drbd_device *device,
1814                                        sector_t sector, int size)
1815 {
1816         struct drbd_interval *i;
1817         struct drbd_request *req;
1818
1819         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1820                 if (!i->local)
1821                         continue;
1822                 req = container_of(i, struct drbd_request, i);
1823                 if (req->rq_state & RQ_LOCAL_PENDING ||
1824                     !(req->rq_state & RQ_POSTPONED))
1825                         continue;
1826                 /* as it is RQ_POSTPONED, this will cause it to
1827                  * be queued on the retry workqueue. */
1828                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1829         }
1830 }
1831
1832 /*
1833  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1834  */
1835 static int e_end_block(struct drbd_work *w, int cancel)
1836 {
1837         struct drbd_peer_request *peer_req =
1838                 container_of(w, struct drbd_peer_request, w);
1839         struct drbd_peer_device *peer_device = peer_req->peer_device;
1840         struct drbd_device *device = peer_device->device;
1841         sector_t sector = peer_req->i.sector;
1842         int err = 0, pcmd;
1843
1844         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1845                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1846                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1847                                 device->state.conn <= C_PAUSED_SYNC_T &&
1848                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1849                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1850                         err = drbd_send_ack(peer_device, pcmd, peer_req);
1851                         if (pcmd == P_RS_WRITE_ACK)
1852                                 drbd_set_in_sync(device, sector, peer_req->i.size);
1853                 } else {
1854                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1855                         /* we expect it to be marked out of sync anyways...
1856                          * maybe assert this?  */
1857                 }
1858                 dec_unacked(device);
1859         }
1860         /* we delete from the conflict detection hash _after_ we sent out the
1861          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1862         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1863                 spin_lock_irq(&device->resource->req_lock);
1864                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1865                 drbd_remove_epoch_entry_interval(device, peer_req);
1866                 if (peer_req->flags & EE_RESTART_REQUESTS)
1867                         restart_conflicting_writes(device, sector, peer_req->i.size);
1868                 spin_unlock_irq(&device->resource->req_lock);
1869         } else
1870                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1871
1872         drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1873
1874         return err;
1875 }
1876
1877 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1878 {
1879         struct drbd_peer_request *peer_req =
1880                 container_of(w, struct drbd_peer_request, w);
1881         struct drbd_peer_device *peer_device = peer_req->peer_device;
1882         int err;
1883
1884         err = drbd_send_ack(peer_device, ack, peer_req);
1885         dec_unacked(peer_device->device);
1886
1887         return err;
1888 }
1889
1890 static int e_send_superseded(struct drbd_work *w, int unused)
1891 {
1892         return e_send_ack(w, P_SUPERSEDED);
1893 }
1894
1895 static int e_send_retry_write(struct drbd_work *w, int unused)
1896 {
1897         struct drbd_peer_request *peer_req =
1898                 container_of(w, struct drbd_peer_request, w);
1899         struct drbd_connection *connection = peer_req->peer_device->connection;
1900
1901         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1902                              P_RETRY_WRITE : P_SUPERSEDED);
1903 }
1904
1905 static bool seq_greater(u32 a, u32 b)
1906 {
1907         /*
1908          * We assume 32-bit wrap-around here.
1909          * For 24-bit wrap-around, we would have to shift:
1910          *  a <<= 8; b <<= 8;
1911          */
1912         return (s32)a - (s32)b > 0;
1913 }
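
/* Wrap-around example: seq_greater(1, 0xffffffff) is true, because
 * (s32)1 - (s32)0xffffffff == 2 > 0; a freshly wrapped sequence number
 * compares as newer than one just below the wrap point. */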
1914
1915 static u32 seq_max(u32 a, u32 b)
1916 {
1917         return seq_greater(a, b) ? a : b;
1918 }
1919
1920 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
1921 {
1922         struct drbd_device *device = peer_device->device;
1923         unsigned int newest_peer_seq;
1924
1925         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
1926                 spin_lock(&device->peer_seq_lock);
1927                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
1928                 device->peer_seq = newest_peer_seq;
1929                 spin_unlock(&device->peer_seq_lock);
1930                 /* wake up only if we actually changed device->peer_seq */
1931                 if (peer_seq == newest_peer_seq)
1932                         wake_up(&device->seq_wait);
1933         }
1934 }
1935
1936 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1937 {
1938         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1939 }
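
/* Units in overlaps(): s1 and s2 are 512-byte sector numbers, l1 and l2
 * are lengths in bytes (hence the >>9).  The expression is true exactly
 * when neither interval ends at or before the start of the other. */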
1940
1941 /* maybe change sync_ee into interval trees as well? */
1942 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
1943 {
1944         struct drbd_peer_request *rs_req;
1945         bool rv = false;
1946
1947         spin_lock_irq(&device->resource->req_lock);
1948         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
1949                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1950                              rs_req->i.sector, rs_req->i.size)) {
1951                         rv = true;
1952                         break;
1953                 }
1954         }
1955         spin_unlock_irq(&device->resource->req_lock);
1956
1957         return rv;
1958 }
1959
1960 /* Called from receive_Data.
1961  * Synchronize packets on sock with packets on msock.
1962  *
1963  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1964  * packet traveling on msock, they are still processed in the order they have
1965  * been sent.
1966  *
1967  * Note: we don't care for Ack packets overtaking P_DATA packets.
1968  *
1969  * In case packet_seq is larger than device->peer_seq number, there are
1970  * outstanding packets on the msock. We wait for them to arrive.
1971  * In case we are the logically next packet, we update device->peer_seq
1972  * ourselves. Correctly handles 32bit wrap around.
1973  *
1974  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1975  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1976  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1977  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1978  *
1979  * returns 0 if we may process the packet, -ETIMEDOUT on ack timeout,
1980  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1981 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
1982 {
1983         struct drbd_device *device = peer_device->device;
1984         DEFINE_WAIT(wait);
1985         long timeout;
1986         int ret = 0, tp;
1987
1988         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
1989                 return 0;
1990
1991         spin_lock(&device->peer_seq_lock);
1992         for (;;) {
1993                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
1994                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
1995                         break;
1996                 }
1997
1998                 if (signal_pending(current)) {
1999                         ret = -ERESTARTSYS;
2000                         break;
2001                 }
2002
2003                 rcu_read_lock();
2004                 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2005                 rcu_read_unlock();
2006
2007                 if (!tp)
2008                         break;
2009
2010                 /* Only need to wait if two_primaries is enabled */
2011                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2012                 spin_unlock(&device->peer_seq_lock);
2013                 rcu_read_lock();
2014                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2015                 rcu_read_unlock();
2016                 timeout = schedule_timeout(timeout);
2017                 spin_lock(&device->peer_seq_lock);
2018                 if (!timeout) {
2019                         ret = -ETIMEDOUT;
2020                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2021                         break;
2022                 }
2023         }
2024         spin_unlock(&device->peer_seq_lock);
2025         finish_wait(&device->seq_wait, &wait);
2026         return ret;
2027 }
2028
2029 /* see also bio_flags_to_wire():
2030  * we need to semantically map the wire DP_* flags to bio REQ_* flags
2031  * and back, because we may replicate to other kernel versions. */
2032 static unsigned long wire_flags_to_bio(u32 dpf)
2033 {
2034         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2035                 (dpf & DP_FUA ? REQ_FUA : 0) |
2036                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2037                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2038 }
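
/* For example, a P_DATA packet flagged DP_FLUSH | DP_FUA is submitted
 * locally as a bio with REQ_FLUSH | REQ_FUA, preserving the peer's
 * write-ordering semantics across the wire. */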
2039
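/* Fail all POSTPONED requests that overlap [sector, sector + size).
 * Called with req_lock held; the lock is dropped to complete each
 * master bio, so the overlap walk restarts from the top after every
 * completion (the "repeat" label below). */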
2040 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2041                                     unsigned int size)
2042 {
2043         struct drbd_interval *i;
2044
2045     repeat:
2046         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2047                 struct drbd_request *req;
2048                 struct bio_and_error m;
2049
2050                 if (!i->local)
2051                         continue;
2052                 req = container_of(i, struct drbd_request, i);
2053                 if (!(req->rq_state & RQ_POSTPONED))
2054                         continue;
2055                 req->rq_state &= ~RQ_POSTPONED;
2056                 __req_mod(req, NEG_ACKED, &m);
2057                 spin_unlock_irq(&device->resource->req_lock);
2058                 if (m.bio)
2059                         complete_master_bio(device, &m);
2060                 spin_lock_irq(&device->resource->req_lock);
2061                 goto repeat;
2062         }
2063 }
2064
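/* Check a new peer write against all overlapping intervals in the
 * write_requests tree.  Called with req_lock held.  Returns 0 if the
 * request may be submitted, -ENOENT if it was queued on done_ee as
 * superseded (or to be retried), or another error if waiting for a
 * conflicting request failed. */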
2065 static int handle_write_conflicts(struct drbd_device *device,
2066                                   struct drbd_peer_request *peer_req)
2067 {
2068         struct drbd_connection *connection = peer_req->peer_device->connection;
2069         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2070         sector_t sector = peer_req->i.sector;
2071         const unsigned int size = peer_req->i.size;
2072         struct drbd_interval *i;
2073         bool equal;
2074         int err;
2075
2076         /*
2077          * Inserting the peer request into the write_requests tree will prevent
2078          * new conflicting local requests from being added.
2079          */
2080         drbd_insert_interval(&device->write_requests, &peer_req->i);
2081
2082     repeat:
2083         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2084                 if (i == &peer_req->i)
2085                         continue;
2086
2087                 if (!i->local) {
2088                         /*
2089                          * Our peer has sent a conflicting remote request; this
2090                          * should not happen in a two-node setup.  Wait for the
2091                          * earlier peer request to complete.
2092                          */
2093                         err = drbd_wait_misc(device, i);
2094                         if (err)
2095                                 goto out;
2096                         goto repeat;
2097                 }
2098
2099                 equal = i->sector == sector && i->size == size;
2100                 if (resolve_conflicts) {
2101                         /*
2102                          * If the peer request is fully contained within the
2103                          * overlapping request, it can be considered overwritten
2104                          * and thus superseded; otherwise, it will be retried
2105                          * once all overlapping requests have completed.
2106                          */
2107                         bool superseded = i->sector <= sector && i->sector +
2108                                        (i->size >> 9) >= sector + (size >> 9);
2109
2110                         if (!equal)
2111                                 drbd_alert(device, "Concurrent writes detected: "
2112                                                "local=%llus +%u, remote=%llus +%u, "
2113                                                "assuming %s came first\n",
2114                                           (unsigned long long)i->sector, i->size,
2115                                           (unsigned long long)sector, size,
2116                                           superseded ? "local" : "remote");
2117
2118                         inc_unacked(device);
2119                         peer_req->w.cb = superseded ? e_send_superseded :
2120                                                    e_send_retry_write;
2121                         list_add_tail(&peer_req->w.list, &device->done_ee);
2122                         wake_asender(connection);
2123
2124                         err = -ENOENT;
2125                         goto out;
2126                 } else {
2127                         struct drbd_request *req =
2128                                 container_of(i, struct drbd_request, i);
2129
2130                         if (!equal)
2131                                 drbd_alert(device, "Concurrent writes detected: "
2132                                                "local=%llus +%u, remote=%llus +%u\n",
2133                                           (unsigned long long)i->sector, i->size,
2134                                           (unsigned long long)sector, size);
2135
2136                         if (req->rq_state & RQ_LOCAL_PENDING ||
2137                             !(req->rq_state & RQ_POSTPONED)) {
2138                                 /*
2139                                  * Wait for the node with the discard flag to
2140                                  * decide if this request has been superseded
2141                                  * or needs to be retried.
2142                                  * Requests that have been superseded will
2143                                  * disappear from the write_requests tree.
2144                                  *
2145                                  * In addition, wait for the conflicting
2146                                  * request to finish locally before submitting
2147                                  * the conflicting peer request.
2148                                  */
2149                                 err = drbd_wait_misc(device, &req->i);
2150                                 if (err) {
2151                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2152                                         fail_postponed_requests(device, sector, size);
2153                                         goto out;
2154                                 }
2155                                 goto repeat;
2156                         }
2157                         /*
2158                          * Remember to restart the conflicting requests after
2159                          * the new peer request has completed.
2160                          */
2161                         peer_req->flags |= EE_RESTART_REQUESTS;
2162                 }
2163         }
2164         err = 0;
2165
2166     out:
2167         if (err)
2168                 drbd_remove_epoch_entry_interval(device, peer_req);
2169         return err;
2170 }
2171
2172 /* mirrored write: receive a P_DATA packet and submit it to the local disk */
2173 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2174 {
2175         struct drbd_peer_device *peer_device;
2176         struct drbd_device *device;
2177         sector_t sector;
2178         struct drbd_peer_request *peer_req;
2179         struct p_data *p = pi->data;
2180         u32 peer_seq = be32_to_cpu(p->seq_num);
2181         int rw = WRITE;
2182         u32 dp_flags;
2183         int err, tp;
2184
2185         peer_device = conn_peer_device(connection, pi->vnr);
2186         if (!peer_device)
2187                 return -EIO;
2188         device = peer_device->device;
2189
2190         if (!get_ldev(device)) {
2191                 int err2;
2192
2193                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2194                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2195                 atomic_inc(&connection->current_epoch->epoch_size);
2196                 err2 = drbd_drain_block(peer_device, pi->size);
2197                 if (!err)
2198                         err = err2;
2199                 return err;
2200         }
2201
2202         /*
2203          * Corresponding put_ldev done either below (on various errors), or in
2204          * drbd_peer_request_endio, if we successfully submit the data at the
2205          * end of this function.
2206          */
2207
2208         sector = be64_to_cpu(p->sector);
2209         peer_req = read_in_block(peer_device, p->block_id, sector, pi->size);
2210         if (!peer_req) {
2211                 put_ldev(device);
2212                 return -EIO;
2213         }
2214
2215         peer_req->w.cb = e_end_block;
2216
2217         dp_flags = be32_to_cpu(p->dp_flags);
2218         rw |= wire_flags_to_bio(dp_flags);
2219         if (peer_req->pages == NULL) {
2220                 D_ASSERT(device, peer_req->i.size == 0);
2221                 D_ASSERT(device, dp_flags & DP_FLUSH);
2222         }
2223
2224         if (dp_flags & DP_MAY_SET_IN_SYNC)
2225                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2226
2227         spin_lock(&connection->epoch_lock);
2228         peer_req->epoch = connection->current_epoch;
2229         atomic_inc(&peer_req->epoch->epoch_size);
2230         atomic_inc(&peer_req->epoch->active);
2231         spin_unlock(&connection->epoch_lock);
2232
2233         rcu_read_lock();
2234         tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2235         rcu_read_unlock();
2236         if (tp) {
2237                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2238                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2239                 if (err)
2240                         goto out_interrupted;
2241                 spin_lock_irq(&device->resource->req_lock);
2242                 err = handle_write_conflicts(device, peer_req);
2243                 if (err) {
2244                         spin_unlock_irq(&device->resource->req_lock);
2245                         if (err == -ENOENT) {
2246                                 put_ldev(device);
2247                                 return 0;
2248                         }
2249                         goto out_interrupted;
2250                 }
2251         } else {
2252                 update_peer_seq(peer_device, peer_seq);
2253                 spin_lock_irq(&device->resource->req_lock);
2254         }
2255         list_add(&peer_req->w.list, &device->active_ee);
2256         spin_unlock_irq(&device->resource->req_lock);
2257
2258         if (device->state.conn == C_SYNC_TARGET)
2259                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2260
2261         if (peer_device->connection->agreed_pro_version < 100) {
2262                 rcu_read_lock();
2263                 switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) {
2264                 case DRBD_PROT_C:
2265                         dp_flags |= DP_SEND_WRITE_ACK;
2266                         break;
2267                 case DRBD_PROT_B:
2268                         dp_flags |= DP_SEND_RECEIVE_ACK;
2269                         break;
2270                 }
2271                 rcu_read_unlock();
2272         }
2273
2274         if (dp_flags & DP_SEND_WRITE_ACK) {
2275                 peer_req->flags |= EE_SEND_WRITE_ACK;
2276                 inc_unacked(device);
2277                 /* corresponding dec_unacked() in e_end_block()
2278                  * respective _drbd_clear_done_ee */
2279         }
2280
2281         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2282                 /* I really don't like it that the receiver thread
2283                  * sends on the msock, but anyways */
2284                 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2285         }
2286
2287         if (device->state.pdsk < D_INCONSISTENT) {
2288                 /* In case we have the only disk of the cluster,
                      * mark the block out of sync and cover it by the activity log. */
2289                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2290                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2291                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2292                 drbd_al_begin_io(device, &peer_req->i, true);
2293         }
2294
2295         err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2296         if (!err)
2297                 return 0;
2298
2299         /* don't care for the reason here */
2300         drbd_err(device, "submit failed, triggering re-connect\n");
2301         spin_lock_irq(&device->resource->req_lock);
2302         list_del(&peer_req->w.list);
2303         drbd_remove_epoch_entry_interval(device, peer_req);
2304         spin_unlock_irq(&device->resource->req_lock);
2305         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2306                 drbd_al_complete_io(device, &peer_req->i);
2307
2308 out_interrupted:
2309         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2310         put_ldev(device);
2311         drbd_free_peer_req(device, peer_req);
2312         return err;
2313 }
2314
2315 /* We may throttle resync, if the lower device seems to be busy,
2316  * and current sync rate is above c_min_rate.
2317  *
2318  * To decide whether or not the lower device is busy, we use a scheme similar
2319  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2320  * amount of activity (more than 64 sectors) that we cannot account for with
2321  * our own resync
2322  *
2323  * The current sync rate used here is based only on the most recent two step marks,
2324  * to have a short time average so we can react faster.
2325  */
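/* Rough example of the rate check, with DRBD's 4 KiB bitmap granularity:
 * if the last two marks show db = 1024 bits (4 MiB) cleared over dt = 2
 * seconds, then dbdt = Bit2KB(1024 / 2) = 2048 KiB/s; with, say,
 * c_min_rate = 250 KiB/s, the resync is fast enough to be throttled
 * while the backing device shows unaccounted activity. */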
2326 int drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
2327 {
2328         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2329         unsigned long db, dt, dbdt;
2330         struct lc_element *tmp;
2331         int curr_events;
2332         int throttle = 0;
2333         unsigned int c_min_rate;
2334
2335         rcu_read_lock();
2336         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2337         rcu_read_unlock();
2338
2339         /* feature disabled? */
2340         if (c_min_rate == 0)
2341                 return 0;
2342
2343         spin_lock_irq(&device->al_lock);
2344         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2345         if (tmp) {
2346                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2347                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2348                         spin_unlock_irq(&device->al_lock);
2349                         return 0;
2350                 }
2351                 /* Do not slow down if app IO is already waiting for this extent */
2352         }
2353         spin_unlock_irq(&device->al_lock);
2354
2355         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2356                       (int)part_stat_read(&disk->part0, sectors[1]) -
2357                         atomic_read(&device->rs_sect_ev);
2358
2359         if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
2360                 unsigned long rs_left;
2361                 int i;
2362
2363                 device->rs_last_events = curr_events;
2364
2365                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2366                  * approx. */
2367                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2368
2369                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2370                         rs_left = device->ov_left;
2371                 else
2372                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2373
2374                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2375                 if (!dt)
2376                         dt++;
2377                 db = device->rs_mark_left[i] - rs_left;
2378                 dbdt = Bit2KB(db/dt);
2379
2380                 if (dbdt > c_min_rate)
2381                         throttle = 1;
2382         }
2383         return throttle;
2384 }
2385
2386
2387 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2388 {
2389         struct drbd_peer_device *peer_device;
2390         struct drbd_device *device;
2391         sector_t sector;
2392         sector_t capacity;
2393         struct drbd_peer_request *peer_req;
2394         struct digest_info *di = NULL;
2395         int size, verb;
2396         unsigned int fault_type;
2397         struct p_block_req *p = pi->data;
2398
2399         peer_device = conn_peer_device(connection, pi->vnr);
2400         if (!peer_device)
2401                 return -EIO;
2402         device = peer_device->device;
2403         capacity = drbd_get_capacity(device->this_bdev);
2404
2405         sector = be64_to_cpu(p->sector);
2406         size   = be32_to_cpu(p->blksize);
2407
2408         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2409                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2410                                 (unsigned long long)sector, size);
2411                 return -EINVAL;
2412         }
2413         if (sector + (size>>9) > capacity) {
2414                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2415                                 (unsigned long long)sector, size);
2416                 return -EINVAL;
2417         }
2418
2419         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2420                 verb = 1;
2421                 switch (pi->cmd) {
2422                 case P_DATA_REQUEST:
2423                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2424                         break;
2425                 case P_RS_DATA_REQUEST:
2426                 case P_CSUM_RS_REQUEST:
2427                 case P_OV_REQUEST:
2428                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2429                         break;
2430                 case P_OV_REPLY:
2431                         verb = 0;
2432                         dec_rs_pending(device);
2433                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2434                         break;
2435                 default:
2436                         BUG();
2437                 }
2438                 if (verb && __ratelimit(&drbd_ratelimit_state))
2439                         drbd_err(device, "Can not satisfy peer's read request, "
2440                             "no local data.\n");
2441
2442                 /* drain the possibly present payload */
2443                 return drbd_drain_block(peer_device, pi->size);
2444         }
2445
2446         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2447          * "criss-cross" setup, that might cause write-out on some other DRBD,
2448          * which in turn might block on the other node at this very place.  */
2449         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size, GFP_NOIO);
2450         if (!peer_req) {
2451                 put_ldev(device);
2452                 return -ENOMEM;
2453         }
2454
2455         switch (pi->cmd) {
2456         case P_DATA_REQUEST:
2457                 peer_req->w.cb = w_e_end_data_req;
2458                 fault_type = DRBD_FAULT_DT_RD;
2459                 /* application IO, don't drbd_rs_begin_io */
2460                 goto submit;
2461
2462         case P_RS_DATA_REQUEST:
2463                 peer_req->w.cb = w_e_end_rsdata_req;
2464                 fault_type = DRBD_FAULT_RS_RD;
2465                 /* used in the sector offset progress display */
2466                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2467                 break;
2468
2469         case P_OV_REPLY:
2470         case P_CSUM_RS_REQUEST:
2471                 fault_type = DRBD_FAULT_RS_RD;
2472                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2473                 if (!di)
2474                         goto out_free_e;
2475
2476                 di->digest_size = pi->size;
2477                 di->digest = (((char *)di)+sizeof(struct digest_info));
2478
2479                 peer_req->digest = di;
2480                 peer_req->flags |= EE_HAS_DIGEST;
2481
2482                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2483                         goto out_free_e;
2484
2485                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2486                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2487                         peer_req->w.cb = w_e_end_csum_rs_req;
2488                         /* used in the sector offset progress display */
2489                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2490                 } else if (pi->cmd == P_OV_REPLY) {
2491                         /* track progress, we may need to throttle */
2492                         atomic_add(size >> 9, &device->rs_sect_in);
2493                         peer_req->w.cb = w_e_end_ov_reply;
2494                         dec_rs_pending(device);
2495                         /* drbd_rs_begin_io done when we sent this request,
2496                          * but accounting still needs to be done. */
2497                         goto submit_for_resync;
2498                 }
2499                 break;
2500
2501         case P_OV_REQUEST:
2502                 if (device->ov_start_sector == ~(sector_t)0 &&
2503                     peer_device->connection->agreed_pro_version >= 90) {
2504                         unsigned long now = jiffies;
2505                         int i;
2506                         device->ov_start_sector = sector;
2507                         device->ov_position = sector;
2508                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2509                         device->rs_total = device->ov_left;
2510                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2511                                 device->rs_mark_left[i] = device->ov_left;
2512                                 device->rs_mark_time[i] = now;
2513                         }
2514                         drbd_info(device, "Online Verify start sector: %llu\n",
2515                                         (unsigned long long)sector);
2516                 }
2517                 peer_req->w.cb = w_e_end_ov_req;
2518                 fault_type = DRBD_FAULT_RS_RD;
2519                 break;
2520
2521         default:
2522                 BUG();
2523         }
2524
2525         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2526          * wrt the receiver, but it is not as straightforward as it may seem.
2527          * Various places in the resync start and stop logic assume resync
2528          * requests are processed in order, requeuing this on the worker thread
2529          * introduces a bunch of new code for synchronization between threads.
2530          *
2531          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2532          * "forever", throttling after drbd_rs_begin_io will lock that extent
2533          * for application writes for the same time.  For now, just throttle
2534          * here, where the rest of the code expects the receiver to sleep for
2535          * a while, anyways.
2536          */
2537
2538         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2539          * this defers syncer requests for some time, before letting at least
2540          * one request through.  The resync controller on the receiving side
2541          * will adapt to the incoming rate accordingly.
2542          *
2543          * We cannot throttle here if remote is Primary/SyncTarget:
2544          * we would also throttle its application reads.
2545          * In that case, throttling is done on the SyncTarget only.
2546          */
2547         if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
2548                 schedule_timeout_uninterruptible(HZ/10);
2549         if (drbd_rs_begin_io(device, sector))
2550                 goto out_free_e;
2551
2552 submit_for_resync:
2553         atomic_add(size >> 9, &device->rs_sect_ev);
2554
2555 submit:
2556         inc_unacked(device);
2557         spin_lock_irq(&device->resource->req_lock);
2558         list_add_tail(&peer_req->w.list, &device->read_ee);
2559         spin_unlock_irq(&device->resource->req_lock);
2560
2561         if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2562                 return 0;
2563
2564         /* don't care for the reason here */
2565         drbd_err(device, "submit failed, triggering re-connect\n");
2566         spin_lock_irq(&device->resource->req_lock);
2567         list_del(&peer_req->w.list);
2568         spin_unlock_irq(&device->resource->req_lock);
2569         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2570
2571 out_free_e:
2572         put_ldev(device);
2573         drbd_free_peer_req(device, peer_req);
2574         return -EIO;
2575 }
2576
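/* Return convention shared by the drbd_asb_recover_*p() helpers below:
 *    1  discard the peer's data (we become sync source),
 *   -1  discard the local data (we become sync target),
 * -100  no automatic resolution was possible.
 */
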
2577 /**
2578  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2579  */
2580 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2581 {
2582         struct drbd_device *device = peer_device->device;
2583         int self, peer, rv = -100;
2584         unsigned long ch_self, ch_peer;
2585         enum drbd_after_sb_p after_sb_0p;
2586
2587         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2588         peer = device->p_uuid[UI_BITMAP] & 1;
2589
2590         ch_peer = device->p_uuid[UI_SIZE];
2591         ch_self = device->comm_bm_set;
2592
2593         rcu_read_lock();
2594         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2595         rcu_read_unlock();
2596         switch (after_sb_0p) {
2597         case ASB_CONSENSUS:
2598         case ASB_DISCARD_SECONDARY:
2599         case ASB_CALL_HELPER:
2600         case ASB_VIOLENTLY:
2601                 drbd_err(device, "Configuration error.\n");
2602                 break;
2603         case ASB_DISCONNECT:
2604                 break;
2605         case ASB_DISCARD_YOUNGER_PRI:
2606                 if (self == 0 && peer == 1) {
2607                         rv = -1;
2608                         break;
2609                 }
2610                 if (self == 1 && peer == 0) {
2611                         rv =  1;
2612                         break;
2613                 }
2614                 /* Else fall through to one of the other strategies... */
2615         case ASB_DISCARD_OLDER_PRI:
2616                 if (self == 0 && peer == 1) {
2617                         rv = 1;
2618                         break;
2619                 }
2620                 if (self == 1 && peer == 0) {
2621                         rv = -1;
2622                         break;
2623                 }
2624                 /* Else fall through to one of the other strategies... */
2625                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2626                      "Using discard-least-changes instead\n");
2627         case ASB_DISCARD_ZERO_CHG:
2628                 if (ch_peer == 0 && ch_self == 0) {
2629                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2630                                 ? -1 : 1;
2631                         break;
2632                 } else {
2633                         if (ch_peer == 0) { rv =  1; break; }
2634                         if (ch_self == 0) { rv = -1; break; }
2635                 }
2636                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2637                         break;
2638         case ASB_DISCARD_LEAST_CHG:
2639                 if      (ch_self < ch_peer)
2640                         rv = -1;
2641                 else if (ch_self > ch_peer)
2642                         rv =  1;
2643                 else /* ( ch_self == ch_peer ) */
2644                      /* Well, then use something else. */
2645                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2646                                 ? -1 : 1;
2647                 break;
2648         case ASB_DISCARD_LOCAL:
2649                 rv = -1;
2650                 break;
2651         case ASB_DISCARD_REMOTE:
2652                 rv =  1;
2653         }
2654
2655         return rv;
2656 }
2657
2658 /**
2659  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2660  */
2661 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2662 {
2663         struct drbd_device *device = peer_device->device;
2664         int hg, rv = -100;
2665         enum drbd_after_sb_p after_sb_1p;
2666
2667         rcu_read_lock();
2668         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2669         rcu_read_unlock();
2670         switch (after_sb_1p) {
2671         case ASB_DISCARD_YOUNGER_PRI:
2672         case ASB_DISCARD_OLDER_PRI:
2673         case ASB_DISCARD_LEAST_CHG:
2674         case ASB_DISCARD_LOCAL:
2675         case ASB_DISCARD_REMOTE:
2676         case ASB_DISCARD_ZERO_CHG:
2677                 drbd_err(device, "Configuration error.\n");
2678                 break;
2679         case ASB_DISCONNECT:
2680                 break;
2681         case ASB_CONSENSUS:
2682                 hg = drbd_asb_recover_0p(peer_device);
2683                 if (hg == -1 && device->state.role == R_SECONDARY)
2684                         rv = hg;
2685                 if (hg == 1  && device->state.role == R_PRIMARY)
2686                         rv = hg;
2687                 break;
2688         case ASB_VIOLENTLY:
2689                 rv = drbd_asb_recover_0p(peer_device);
2690                 break;
2691         case ASB_DISCARD_SECONDARY:
2692                 return device->state.role == R_PRIMARY ? 1 : -1;
2693         case ASB_CALL_HELPER:
2694                 hg = drbd_asb_recover_0p(peer_device);
2695                 if (hg == -1 && device->state.role == R_PRIMARY) {
2696                         enum drbd_state_rv rv2;
2697
2698                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2699                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2700                           * we do not need to wait for the after state change work either. */
2701                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2702                         if (rv2 != SS_SUCCESS) {
2703                                 drbd_khelper(device, "pri-lost-after-sb");
2704                         } else {
2705                                 drbd_warn(device, "Successfully gave up primary role.\n");
2706                                 rv = hg;
2707                         }
2708                 } else
2709                         rv = hg;
2710         }
2711
2712         return rv;
2713 }
2714
2715 /**
2716  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2717  */
2718 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2719 {
2720         struct drbd_device *device = peer_device->device;
2721         int hg, rv = -100;
2722         enum drbd_after_sb_p after_sb_2p;
2723
2724         rcu_read_lock();
2725         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2726         rcu_read_unlock();
2727         switch (after_sb_2p) {
2728         case ASB_DISCARD_YOUNGER_PRI:
2729         case ASB_DISCARD_OLDER_PRI:
2730         case ASB_DISCARD_LEAST_CHG:
2731         case ASB_DISCARD_LOCAL:
2732         case ASB_DISCARD_REMOTE:
2733         case ASB_CONSENSUS:
2734         case ASB_DISCARD_SECONDARY:
2735         case ASB_DISCARD_ZERO_CHG:
2736                 drbd_err(device, "Configuration error.\n");
2737                 break;
2738         case ASB_VIOLENTLY:
2739                 rv = drbd_asb_recover_0p(peer_device);
2740                 break;
2741         case ASB_DISCONNECT:
2742                 break;
2743         case ASB_CALL_HELPER:
2744                 hg = drbd_asb_recover_0p(peer_device);
2745                 if (hg == -1) {
2746                         enum drbd_state_rv rv2;
2747
2748                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2749                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2750                           * we do not need to wait for the after state change work either. */
2751                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2752                         if (rv2 != SS_SUCCESS) {
2753                                 drbd_khelper(device, "pri-lost-after-sb");
2754                         } else {
2755                                 drbd_warn(device, "Successfully gave up primary role.\n");
2756                                 rv = hg;
2757                         }
2758                 } else
2759                         rv = hg;
2760         }
2761
2762         return rv;
2763 }
2764
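/* Log one UUID set (current, bitmap, history start/end) together with the
 * bitmap bit count and flags that accompany it. */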
2765 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2766                            u64 bits, u64 flags)
2767 {
2768         if (!uuid) {
2769                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2770                 return;
2771         }
2772         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2773              text,
2774              (unsigned long long)uuid[UI_CURRENT],
2775              (unsigned long long)uuid[UI_BITMAP],
2776              (unsigned long long)uuid[UI_HISTORY_START],
2777              (unsigned long long)uuid[UI_HISTORY_END],
2778              (unsigned long long)bits,
2779              (unsigned long long)flags);
2780 }
2781
2782 /*
2783   100   after split brain try auto recover
2784     2   C_SYNC_SOURCE set BitMap
2785     1   C_SYNC_SOURCE use BitMap
2786     0   no Sync
2787    -1   C_SYNC_TARGET use BitMap
2788    -2   C_SYNC_TARGET set BitMap
2789  -100   after split brain, disconnect
2790 -1000   unrelated data
2791 -1091   requires proto 91
2792 -1096   requires proto 96
2793  */
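/* Walk the rules summarized above: compare our on-disk UUID set against the
 * UUIDs most recently received from the peer (device->p_uuid) and decide who
 * syncs from whom.  *rule_nr records which rule decided the outcome, for the
 * log line printed by drbd_sync_handshake(). */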
2794 static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
2795 {
2796         u64 self, peer;
2797         int i, j;
2798
2799         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2800         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2801
2802         *rule_nr = 10;
2803         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2804                 return 0;
2805
2806         *rule_nr = 20;
2807         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2808              peer != UUID_JUST_CREATED)
2809                 return -2;
2810
2811         *rule_nr = 30;
2812         if (self != UUID_JUST_CREATED &&
2813             (peer == UUID_JUST_CREATED || peer == (u64)0))
2814                 return 2;
2815
2816         if (self == peer) {
2817                 int rct, dc; /* roles at crash time */
2818
2819                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2820
2821                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2822                                 return -1091;
2823
2824                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2825                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2826                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2827                                 drbd_uuid_move_history(device);
2828                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2829                                 device->ldev->md.uuid[UI_BITMAP] = 0;
2830
2831                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2832                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2833                                 *rule_nr = 34;
2834                         } else {
2835                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2836                                 *rule_nr = 36;
2837                         }
2838
2839                         return 1;
2840                 }
2841
2842                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2843
2844                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2845                                 return -1091;
2846
2847                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2848                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2849                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2850
2851                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2852                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2853                                 device->p_uuid[UI_BITMAP] = 0UL;
2854
2855                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2856                                 *rule_nr = 35;
2857                         } else {
2858                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
2859                                 *rule_nr = 37;
2860                         }
2861
2862                         return -1;
2863                 }
2864
2865                 /* Common power [off|failure] */
2866                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
2867                         (device->p_uuid[UI_FLAGS] & 2);
2868                 /* lowest bit is set when we were primary,
2869                  * next bit (weight 2) is set when peer was primary */
2870                 *rule_nr = 40;
2871
2872                 switch (rct) {
2873                 case 0: /* !self_pri && !peer_pri */ return 0;
2874                 case 1: /*  self_pri && !peer_pri */ return 1;
2875                 case 2: /* !self_pri &&  peer_pri */ return -1;
2876                 case 3: /*  self_pri &&  peer_pri */
2877                         dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
2878                         return dc ? -1 : 1;
2879                 }
2880         }
2881
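        /* Rules 50/51: our current UUID matches the peer's bitmap or most
         * recent history UUID -- the peer has newer data, we become sync
         * target.  Rules 70/71 below are the mirrored cases. */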
2882         *rule_nr = 50;
2883         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2884         if (self == peer)
2885                 return -1;
2886
2887         *rule_nr = 51;
2888         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
2889         if (self == peer) {
2890                 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2891                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2892                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2893                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
2894                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2895                            the sync source made to the peer's UUIDs at the last resync start. */
2896
2897                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2898                                 return -1091;
2899
2900                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
2901                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
2902
2903                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
2904                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2905
2906                         return -1;
2907                 }
2908         }
2909
2910         *rule_nr = 60;
2911         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2912         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2913                 peer = device->p_uuid[i] & ~((u64)1);
2914                 if (self == peer)
2915                         return -2;
2916         }
2917
2918         *rule_nr = 70;
2919         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2920         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2921         if (self == peer)
2922                 return 1;
2923
2924         *rule_nr = 71;
2925         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2926         if (self == peer) {
2927                 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2928                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2929                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2930                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2931                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2932                            we made to our own UUIDs as sync source at the last resync start. */
2933
2934                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2935                                 return -1091;
2936
2937                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
2938                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
2939
2940                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
2941                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2942                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2943
2944                         return 1;
2945                 }
2946         }
2947
2948
2949         *rule_nr = 80;
2950         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2951         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2952                 self = device->ldev->md.uuid[i] & ~((u64)1);
2953                 if (self == peer)
2954                         return 2;
2955         }
2956
2957         *rule_nr = 90;
2958         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2959         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2960         if (self == peer && self != ((u64)0))
2961                 return 100;
2962
2963         *rule_nr = 100;
2964         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2965                 self = device->ldev->md.uuid[i] & ~((u64)1);
2966                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2967                         peer = device->p_uuid[j] & ~((u64)1);
2968                         if (self == peer)
2969                                 return -100;
2970                 }
2971         }
2972
2973         return -1000;
2974 }
2975
2976 /* drbd_sync_handshake() returns the new conn state on success, or
2977    C_MASK on failure.
2978  */
2979 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
2980                                            enum drbd_role peer_role,
2981                                            enum drbd_disk_state peer_disk) __must_hold(local)
2982 {
2983         struct drbd_device *device = peer_device->device;
2984         enum drbd_conns rv = C_MASK;
2985         enum drbd_disk_state mydisk;
2986         struct net_conf *nc;
2987         int hg, rule_nr, rr_conflict, tentative;
2988
2989         mydisk = device->state.disk;
2990         if (mydisk == D_NEGOTIATING)
2991                 mydisk = device->new_state_tmp.disk;
2992
2993         drbd_info(device, "drbd_sync_handshake:\n");
2994
2995         spin_lock_irq(&device->ldev->md.uuid_lock);
2996         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
2997         drbd_uuid_dump(device, "peer", device->p_uuid,
2998                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2999
3000         hg = drbd_uuid_compare(device, &rule_nr);
3001         spin_unlock_irq(&device->ldev->md.uuid_lock);
3002
3003         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3004
3005         if (hg == -1000) {
3006                 drbd_alert(device, "Unrelated data, aborting!\n");
3007                 return C_MASK;
3008         }
3009         if (hg < -1000) {
3010                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3011                 return C_MASK;
3012         }
3013
3014         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3015             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3016                 int f = (hg == -100) || abs(hg) == 2;
3017                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3018                 if (f)
3019                         hg = hg*2;
3020                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3021                      hg > 0 ? "source" : "target");
3022         }
3023
3024         if (abs(hg) == 100)
3025                 drbd_khelper(device, "initial-split-brain");
3026
3027         rcu_read_lock();
3028         nc = rcu_dereference(peer_device->connection->net_conf);
3029
3030         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3031                 int pcount = (device->state.role == R_PRIMARY)
3032                            + (peer_role == R_PRIMARY);
3033                 int forced = (hg == -100);
3034
3035                 switch (pcount) {
3036                 case 0:
3037                         hg = drbd_asb_recover_0p(peer_device);
3038                         break;
3039                 case 1:
3040                         hg = drbd_asb_recover_1p(peer_device);
3041                         break;
3042                 case 2:
3043                         hg = drbd_asb_recover_2p(peer_device);
3044                         break;
3045                 }
3046                 if (abs(hg) < 100) {
3047                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3048                              "automatically solved. Sync from %s node\n",
3049                              pcount, (hg < 0) ? "peer" : "this");
3050                         if (forced) {
3051                                 drbd_warn(device, "Doing a full sync, since"
3052                                      " UUIDs were ambiguous.\n");
3053                                 hg = hg*2;
3054                         }
3055                 }
3056         }
3057
3058         if (hg == -100) {
3059                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3060                         hg = -1;
3061                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3062                         hg = 1;
3063
3064                 if (abs(hg) < 100)
3065                         drbd_warn(device, "Split-Brain detected, manually solved. "
3066                              "Sync from %s node\n",
3067                              (hg < 0) ? "peer" : "this");
3068         }
3069         rr_conflict = nc->rr_conflict;
3070         tentative = nc->tentative;
3071         rcu_read_unlock();
3072
3073         if (hg == -100) {
3074                 /* FIXME this log message is not correct if we end up here
3075                  * after an attempted attach on a diskless node.
3076                  * We just refuse to attach -- well, we drop the "connection"
3077                  * to that disk, in a way... */
3078                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3079                 drbd_khelper(device, "split-brain");
3080                 return C_MASK;
3081         }
3082
3083         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3084                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3085                 return C_MASK;
3086         }
3087
3088         if (hg < 0 && /* intentionally not using mydisk here */
3089             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3090                 switch (rr_conflict) {
3091                 case ASB_CALL_HELPER:
3092                         drbd_khelper(device, "pri-lost");
3093                         /* fall through */
3094                 case ASB_DISCONNECT:
3095                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3096                         return C_MASK;
3097                 case ASB_VIOLENTLY:
3098                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3099                              " assumption\n");
3100                 }
3101         }
3102
3103         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3104                 if (hg == 0)
3105                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3106                 else
3107                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3108                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3109                                  abs(hg) >= 2 ? "full" : "bit-map based");
3110                 return C_MASK;
3111         }
3112
3113         if (abs(hg) >= 2) {
3114                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3115                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3116                                         BM_LOCKED_SET_ALLOWED))
3117                         return C_MASK;
3118         }
3119
3120         if (hg > 0) { /* become sync source. */
3121                 rv = C_WF_BITMAP_S;
3122         } else if (hg < 0) { /* become sync target */
3123                 rv = C_WF_BITMAP_T;
3124         } else {
3125                 rv = C_CONNECTED;
3126                 if (drbd_bm_total_weight(device)) {
3127                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3128                              drbd_bm_total_weight(device));
3129                 }
3130         }
3131
3132         return rv;
3133 }
3134
3135 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3136 {
3137         /* the peer's ASB_DISCARD_REMOTE paired with our ASB_DISCARD_LOCAL is valid */
3138         if (peer == ASB_DISCARD_REMOTE)
3139                 return ASB_DISCARD_LOCAL;
3140
3141         /* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
3142         if (peer == ASB_DISCARD_LOCAL)
3143                 return ASB_DISCARD_REMOTE;
3144
3145         /* everything else is valid if they are equal on both sides. */
3146         return peer;
3147 }
3148
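/* Validate the peer's P_PROTOCOL packet against our own net_conf: wire
 * protocol, after-split-brain policies, allow-two-primaries, discard-my-data
 * and data-integrity-alg must be compatible, or we drop the connection.
 * On success, install an updated net_conf and the peer's integrity hash
 * transform plus its digest buffers. */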
3149 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3150 {
3151         struct p_protocol *p = pi->data;
3152         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3153         int p_proto, p_discard_my_data, p_two_primaries, cf;
3154         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3155         char integrity_alg[SHARED_SECRET_MAX] = "";
3156         struct crypto_hash *peer_integrity_tfm = NULL;
3157         void *int_dig_in = NULL, *int_dig_vv = NULL;
3158
3159         p_proto         = be32_to_cpu(p->protocol);
3160         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3161         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3162         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3163         p_two_primaries = be32_to_cpu(p->two_primaries);
3164         cf              = be32_to_cpu(p->conn_flags);
3165         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3166
3167         if (connection->agreed_pro_version >= 87) {
3168                 int err;
3169
3170                 if (pi->size > sizeof(integrity_alg))
3171                         return -EIO;
3172                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3173                 if (err)
3174                         return err;
3175                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3176         }
3177
3178         if (pi->cmd != P_PROTOCOL_UPDATE) {
3179                 clear_bit(CONN_DRY_RUN, &connection->flags);
3180
3181                 if (cf & CF_DRY_RUN)
3182                         set_bit(CONN_DRY_RUN, &connection->flags);
3183
3184                 rcu_read_lock();
3185                 nc = rcu_dereference(connection->net_conf);
3186
3187                 if (p_proto != nc->wire_protocol) {
3188                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3189                         goto disconnect_rcu_unlock;
3190                 }
3191
3192                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3193                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3194                         goto disconnect_rcu_unlock;
3195                 }
3196
3197                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3198                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3199                         goto disconnect_rcu_unlock;
3200                 }
3201
3202                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3203                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3204                         goto disconnect_rcu_unlock;
3205                 }
3206
3207                 if (p_discard_my_data && nc->discard_my_data) {
3208                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3209                         goto disconnect_rcu_unlock;
3210                 }
3211
3212                 if (p_two_primaries != nc->two_primaries) {
3213                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3214                         goto disconnect_rcu_unlock;
3215                 }
3216
3217                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3218                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3219                         goto disconnect_rcu_unlock;
3220                 }
3221
3222                 rcu_read_unlock();
3223         }
3224
3225         if (integrity_alg[0]) {
3226                 int hash_size;
3227
3228                 /*
3229                  * We can only change the peer data integrity algorithm
3230                  * here.  Changing our own data integrity algorithm
3231                  * requires that we send a P_PROTOCOL_UPDATE packet at
3232                  * the same time; otherwise, the peer has no way to
3233                  * tell between which packets the algorithm should
3234                  * change.
3235                  */
3236
3237                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3238                 if (!peer_integrity_tfm) {
3239                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3240                                  integrity_alg);
3241                         goto disconnect;
3242                 }
3243
3244                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3245                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3246                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3247                 if (!(int_dig_in && int_dig_vv)) {
3248                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3249                         goto disconnect;
3250                 }
3251         }
3252
3253         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3254         if (!new_net_conf) {
3255                 drbd_err(connection, "Allocation of new net_conf failed\n");
3256                 goto disconnect;
3257         }
3258
3259         mutex_lock(&connection->data.mutex);
3260         mutex_lock(&connection->resource->conf_update);
3261         old_net_conf = connection->net_conf;
3262         *new_net_conf = *old_net_conf;
3263
3264         new_net_conf->wire_protocol = p_proto;
3265         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3266         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3267         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3268         new_net_conf->two_primaries = p_two_primaries;
3269
3270         rcu_assign_pointer(connection->net_conf, new_net_conf);
3271         mutex_unlock(&connection->resource->conf_update);
3272         mutex_unlock(&connection->data.mutex);
3273
3274         crypto_free_hash(connection->peer_integrity_tfm);
3275         kfree(connection->int_dig_in);
3276         kfree(connection->int_dig_vv);
3277         connection->peer_integrity_tfm = peer_integrity_tfm;
3278         connection->int_dig_in = int_dig_in;
3279         connection->int_dig_vv = int_dig_vv;
3280
3281         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3282                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3283                           integrity_alg[0] ? integrity_alg : "(none)");
3284
3285         synchronize_rcu();
3286         kfree(old_net_conf);
3287         return 0;
3288
3289 disconnect_rcu_unlock:
3290         rcu_read_unlock();
3291 disconnect:
3292         crypto_free_hash(peer_integrity_tfm);
3293         kfree(int_dig_in);
3294         kfree(int_dig_vv);
3295         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3296         return -EIO;
3297 }
3298
3299 /* helper function
3300  * input: alg name, feature name
3301  * return: NULL (alg name was "")
3302  *         ERR_PTR(error) if something goes wrong
3303  *         or the crypto hash ptr, if it worked out ok. */
3304 static
3305 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3306                 const char *alg, const char *name)
3307 {
3308         struct crypto_hash *tfm;
3309
3310         if (!alg[0])
3311                 return NULL;
3312
3313         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3314         if (IS_ERR(tfm))
3315                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3316                         alg, name, PTR_ERR(tfm));
3317         return tfm;
3320 }
3321
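/* Drain the remaining pi->size payload bytes from the data socket in
 * DRBD_SOCKET_BUFFER_SIZE chunks, so we stay in sync with the packet stream.
 * Returns 0 when fully drained, a negative error from drbd_recv(), or -EIO
 * if the stream ended early. */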
3322 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3323 {
3324         void *buffer = connection->data.rbuf;
3325         int size = pi->size;
3326
3327         while (size) {
3328                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3329                 s = drbd_recv(connection, buffer, s);
3330                 if (s <= 0) {
3331                         if (s < 0)
3332                                 return s;
3333                         break;
3334                 }
3335                 size -= s;
3336         }
3337         if (size)
3338                 return -EIO;
3339         return 0;
3340 }
3341
3342 /*
3343  * config_unknown_volume  -  device configuration command for unknown volume
3344  *
3345  * When a device is added to an existing connection, the node on which the
3346  * device is added first will send configuration commands to its peer but the
3347  * peer will not know about the device yet.  It will warn and ignore these
3348  * commands.  Once the device is added on the second node, the second node will
3349  * send the same device configuration commands, but in the other direction.
3350  *
3351  * (We can also end up here if drbd is misconfigured.)
3352  */
3353 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3354 {
3355         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3356                   cmdname(pi->cmd), pi->vnr);
3357         return ignore_remaining_packet(connection, pi);
3358 }
3359
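/* Handle P_SYNC_PARAM (and its apv 89/95 layouts): adopt the peer's resync
 * rate, verify-alg and csums-alg, and, for apv >= 95, the dynamic resync
 * controller settings, re-allocating the fifo plan if c_plan_ahead changed. */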
3360 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3361 {
3362         struct drbd_peer_device *peer_device;
3363         struct drbd_device *device;
3364         struct p_rs_param_95 *p;
3365         unsigned int header_size, data_size, exp_max_sz;
3366         struct crypto_hash *verify_tfm = NULL;
3367         struct crypto_hash *csums_tfm = NULL;
3368         struct net_conf *old_net_conf, *new_net_conf = NULL;
3369         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3370         const int apv = connection->agreed_pro_version;
3371         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3372         int fifo_size = 0;
3373         int err;
3374
3375         peer_device = conn_peer_device(connection, pi->vnr);
3376         if (!peer_device)
3377                 return config_unknown_volume(connection, pi);
3378         device = peer_device->device;
3379
3380         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3381                     : apv == 88 ? sizeof(struct p_rs_param)
3382                                         + SHARED_SECRET_MAX
3383                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3384                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3385
3386         if (pi->size > exp_max_sz) {
3387                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3388                     pi->size, exp_max_sz);
3389                 return -EIO;
3390         }
3391
3392         if (apv <= 88) {
3393                 header_size = sizeof(struct p_rs_param);
3394                 data_size = pi->size - header_size;
3395         } else if (apv <= 94) {
3396                 header_size = sizeof(struct p_rs_param_89);
3397                 data_size = pi->size - header_size;
3398                 D_ASSERT(device, data_size == 0);
3399         } else {
3400                 header_size = sizeof(struct p_rs_param_95);
3401                 data_size = pi->size - header_size;
3402                 D_ASSERT(device, data_size == 0);
3403         }
3404
3405         /* initialize verify_alg and csums_alg */
3406         p = pi->data;
3407         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3408
3409         err = drbd_recv_all(peer_device->connection, p, header_size);
3410         if (err)
3411                 return err;
3412
3413         mutex_lock(&connection->resource->conf_update);
3414         old_net_conf = peer_device->connection->net_conf;
3415         if (get_ldev(device)) {
3416                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3417                 if (!new_disk_conf) {
3418                         put_ldev(device);
3419                         mutex_unlock(&connection->resource->conf_update);
3420                         drbd_err(device, "Allocation of new disk_conf failed\n");
3421                         return -ENOMEM;
3422                 }
3423
3424                 old_disk_conf = device->ldev->disk_conf;
3425                 *new_disk_conf = *old_disk_conf;
3426
3427                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3428         }
3429
3430         if (apv >= 88) {
3431                 if (apv == 88) {
3432                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3433                                 drbd_err(device, "verify-alg of wrong size, "
3434                                         "peer wants %u, accepting only up to %u bytes\n",
3435                                         data_size, SHARED_SECRET_MAX);
3436                                 err = -EIO;
3437                                 goto reconnect;
3438                         }
3439
3440                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3441                         if (err)
3442                                 goto reconnect;
3443                         /* we expect NUL terminated string */
3444                         /* but just in case someone tries to be evil */
3445                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3446                         p->verify_alg[data_size-1] = 0;
3447
3448                 } else /* apv >= 89 */ {
3449                         /* we still expect NUL terminated strings */
3450                         /* but just in case someone tries to be evil */
3451                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3452                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3453                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3454                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3455                 }
3456
3457                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3458                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3459                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3460                                     old_net_conf->verify_alg, p->verify_alg);
3461                                 goto disconnect;
3462                         }
3463                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3464                                         p->verify_alg, "verify-alg");
3465                         if (IS_ERR(verify_tfm)) {
3466                                 verify_tfm = NULL;
3467                                 goto disconnect;
3468                         }
3469                 }
3470
3471                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3472                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3473                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3474                                     old_net_conf->csums_alg, p->csums_alg);
3475                                 goto disconnect;
3476                         }
3477                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3478                                         p->csums_alg, "csums-alg");
3479                         if (IS_ERR(csums_tfm)) {
3480                                 csums_tfm = NULL;
3481                                 goto disconnect;
3482                         }
3483                 }
3484
3485                 if (apv > 94 && new_disk_conf) {
3486                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3487                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3488                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3489                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3490
3491                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3492                         if (fifo_size != device->rs_plan_s->size) {
3493                                 new_plan = fifo_alloc(fifo_size);
3494                                 if (!new_plan) {
3495                                         drbd_err(device, "kmalloc of fifo_buffer failed\n");
3496                                         put_ldev(device);
3497                                         goto disconnect;
3498                                 }
3499                         }
3500                 }
3501
3502                 if (verify_tfm || csums_tfm) {
3503                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3504                         if (!new_net_conf) {
3505                                 drbd_err(device, "Allocation of new net_conf failed\n");
3506                                 goto disconnect;
3507                         }
3508
3509                         *new_net_conf = *old_net_conf;
3510
3511                         if (verify_tfm) {
3512                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3513                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3514                                 crypto_free_hash(peer_device->connection->verify_tfm);
3515                                 peer_device->connection->verify_tfm = verify_tfm;
3516                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3517                         }
3518                         if (csums_tfm) {
3519                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3520                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3521                                 crypto_free_hash(peer_device->connection->csums_tfm);
3522                                 peer_device->connection->csums_tfm = csums_tfm;
3523                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3524                         }
3525                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3526                 }
3527         }
3528
3529         if (new_disk_conf) {
3530                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3531                 put_ldev(device);
3532         }
3533
3534         if (new_plan) {
3535                 old_plan = device->rs_plan_s;
3536                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3537         }
3538
3539         mutex_unlock(&connection->resource->conf_update);
3540         synchronize_rcu();
3541         if (new_net_conf)
3542                 kfree(old_net_conf);
3543         kfree(old_disk_conf);
3544         kfree(old_plan);
3545
3546         return 0;
3547
3548 reconnect:
3549         if (new_disk_conf) {
3550                 put_ldev(device);
3551                 kfree(new_disk_conf);
3552         }
3553         mutex_unlock(&connection->resource->conf_update);
3554         return -EIO;
3555
3556 disconnect:
3557         kfree(new_plan);
3558         if (new_disk_conf) {
3559                 put_ldev(device);
3560                 kfree(new_disk_conf);
3561         }
3562         mutex_unlock(&connection->resource->conf_update);
3563         /* just for completeness: actually not needed,
3564          * as this is not reached if csums_tfm was ok. */
3565         crypto_free_hash(csums_tfm);
3566         /* but free the verify_tfm again, if csums_tfm did not work out */
3567         crypto_free_hash(verify_tfm);
3568         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3569         return -EIO;
3570 }
3571
3572 /* warn if the arguments differ by more than 12.5% */
3573 static void warn_if_differ_considerably(struct drbd_device *device,
3574         const char *s, sector_t a, sector_t b)
3575 {
3576         sector_t d;
3577         if (a == 0 || b == 0)
3578                 return;
3579         d = (a > b) ? (a - b) : (b - a);
3580         if (d > (a>>3) || d > (b>>3))
3581                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3582                      (unsigned long long)a, (unsigned long long)b);
3583 }
3584
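/* Handle P_SIZES: record the peer's disk and requested user sizes, refuse to
 * shrink below our usable data while connecting, adopt a changed u_size, and
 * re-run drbd_determine_dev_size() if we have a local disk. */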
3585 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3586 {
3587         struct drbd_peer_device *peer_device;
3588         struct drbd_device *device;
3589         struct p_sizes *p = pi->data;
3590         enum determine_dev_size dd = DS_UNCHANGED;
3591         sector_t p_size, p_usize, my_usize;
3592         int ldsc = 0; /* local disk size changed */
3593         enum dds_flags ddsf;
3594
3595         peer_device = conn_peer_device(connection, pi->vnr);
3596         if (!peer_device)
3597                 return config_unknown_volume(connection, pi);
3598         device = peer_device->device;
3599
3600         p_size = be64_to_cpu(p->d_size);
3601         p_usize = be64_to_cpu(p->u_size);
3602
3603         /* just store the peer's disk size for now.
3604          * we still need to figure out whether we accept that. */
3605         device->p_size = p_size;
3606
3607         if (get_ldev(device)) {
3608                 rcu_read_lock();
3609                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3610                 rcu_read_unlock();
3611
3612                 warn_if_differ_considerably(device, "lower level device sizes",
3613                            p_size, drbd_get_max_capacity(device->ldev));
3614                 warn_if_differ_considerably(device, "user requested size",
3615                                             p_usize, my_usize);
3616
3617                 /* if this is the first connect, or an otherwise expected
3618                  * param exchange, choose the minimum */
3619                 if (device->state.conn == C_WF_REPORT_PARAMS)
3620                         p_usize = min_not_zero(my_usize, p_usize);
3621
3622                 /* Never shrink a device with usable data during connect.
3623                    But allow online shrinking if we are connected. */
3624                 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3625                     drbd_get_capacity(device->this_bdev) &&
3626                     device->state.disk >= D_OUTDATED &&
3627                     device->state.conn < C_CONNECTED) {
3628                         drbd_err(device, "The peer's disk size is too small!\n");
3629                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3630                         put_ldev(device);
3631                         return -EIO;
3632                 }
3633
3634                 if (my_usize != p_usize) {
3635                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3636
3637                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3638                         if (!new_disk_conf) {
3639                                 drbd_err(device, "Allocation of new disk_conf failed\n");
3640                                 put_ldev(device);
3641                                 return -ENOMEM;
3642                         }
3643
3644                         mutex_lock(&connection->resource->conf_update);
3645                         old_disk_conf = device->ldev->disk_conf;
3646                         *new_disk_conf = *old_disk_conf;
3647                         new_disk_conf->disk_size = p_usize;
3648
3649                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3650                         mutex_unlock(&connection->resource->conf_update);
3651                         synchronize_rcu();
3652                         kfree(old_disk_conf);
3653
3654                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
3655                                  (unsigned long)p_usize);
3656                 }
3657
3658                 put_ldev(device);
3659         }
3660
3661         ddsf = be16_to_cpu(p->dds_flags);
3662         if (get_ldev(device)) {
3663                 dd = drbd_determine_dev_size(device, ddsf, NULL);
3664                 put_ldev(device);
3665                 if (dd == DS_ERROR)
3666                         return -EIO;
3667                 drbd_md_sync(device);
3668         } else {
3669                 /* I am diskless, need to accept the peer's size. */
3670                 drbd_set_my_capacity(device, p_size);
3671         }
3672
3673         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3674         drbd_reconsider_max_bio_size(device);
3675
3676         if (get_ldev(device)) {
3677                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3678                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3679                         ldsc = 1;
3680                 }
3681
3682                 put_ldev(device);
3683         }
3684
3685         if (device->state.conn > C_WF_REPORT_PARAMS) {
3686                 if (be64_to_cpu(p->c_size) !=
3687                     drbd_get_capacity(device->this_bdev) || ldsc) {
3688                         /* we have different sizes, probably peer
3689                          * needs to know my new size... */
3690                         drbd_send_sizes(peer_device, 0, ddsf);
3691                 }
3692                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3693                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3694                         if (device->state.pdsk >= D_INCONSISTENT &&
3695                             device->state.disk >= D_INCONSISTENT) {
3696                                 if (ddsf & DDSF_NO_RESYNC)
3697                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3698                                 else
3699                                         resync_after_online_grow(device);
3700                         } else
3701                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
3702                 }
3703         }
3704
3705         return 0;
3706 }
3707
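/* Handle P_UUIDS: take over the peer's UUID set, possibly skipping the
 * initial sync of a just-created pair, and update our exposed data UUID
 * (ed_uuid) when we have no usable local disk. */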
3708 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3709 {
3710         struct drbd_peer_device *peer_device;
3711         struct drbd_device *device;
3712         struct p_uuids *p = pi->data;
3713         u64 *p_uuid;
3714         int i, updated_uuids = 0;
3715
3716         peer_device = conn_peer_device(connection, pi->vnr);
3717         if (!peer_device)
3718                 return config_unknown_volume(connection, pi);
3719         device = peer_device->device;
3720
3721         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3722         if (!p_uuid) {
3723                 drbd_err(device, "kmalloc of p_uuid failed\n");
3724                 return -ENOMEM;
3725         }
3726
3727         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3728                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3729
3730         kfree(device->p_uuid);
3731         device->p_uuid = p_uuid;
3732
3733         if (device->state.conn < C_CONNECTED &&
3734             device->state.disk < D_INCONSISTENT &&
3735             device->state.role == R_PRIMARY &&
3736             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3737                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3738                     (unsigned long long)device->ed_uuid);
3739                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3740                 return -EIO;
3741         }
3742
3743         if (get_ldev(device)) {
3744                 int skip_initial_sync =
3745                         device->state.conn == C_CONNECTED &&
3746                         peer_device->connection->agreed_pro_version >= 90 &&
3747                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3748                         (p_uuid[UI_FLAGS] & 8);
3749                 if (skip_initial_sync) {
3750                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3751                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3752                                         "clear_n_write from receive_uuids",
3753                                         BM_LOCKED_TEST_ALLOWED);
3754                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3755                         _drbd_uuid_set(device, UI_BITMAP, 0);
3756                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3757                                         CS_VERBOSE, NULL);
3758                         drbd_md_sync(device);
3759                         updated_uuids = 1;
3760                 }
3761                 put_ldev(device);
3762         } else if (device->state.disk < D_INCONSISTENT &&
3763                    device->state.role == R_PRIMARY) {
3764                 /* I am a diskless primary, the peer just created a new current UUID
3765                    for me. */
3766                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3767         }
3768
3769         /* Before we test the disk state, we should wait until a possibly
3770            ongoing cluster-wide state change has finished. That is important if
3771            we are primary and are detaching from our disk. We need to see the
3772            new disk state... */
3773         mutex_lock(device->state_mutex);
3774         mutex_unlock(device->state_mutex);
3775         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3776                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3777
3778         if (updated_uuids)
3779                 drbd_print_uuids(device, "receiver updated UUIDs to");
3780
3781         return 0;
3782 }
3783
3784 /**
3785  * convert_state() - Converts the peer's view of the cluster state to our point of view
3786  * @ps:         The state as seen by the peer.
3787  */
3788 static union drbd_state convert_state(union drbd_state ps)
3789 {
3790         union drbd_state ms;
3791
3792         static enum drbd_conns c_tab[] = {
3793                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3794                 [C_CONNECTED] = C_CONNECTED,
3795
3796                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3797                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3798                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3799                 [C_VERIFY_S]       = C_VERIFY_T,
3800                 [C_MASK]   = C_MASK,
3801         };
3802
3803         ms.i = ps.i;
3804
3805         ms.conn = c_tab[ps.conn];
3806         ms.peer = ps.role;
3807         ms.role = ps.peer;
3808         ms.pdsk = ps.disk;
3809         ms.disk = ps.pdsk;
3810         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3811
3812         return ms;
3813 }
3814
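/* Handle P_STATE_CHG_REQ: apply the state change the peer requests,
 * converted to our point of view, and send back the result. */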
3815 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3816 {
3817         struct drbd_peer_device *peer_device;
3818         struct drbd_device *device;
3819         struct p_req_state *p = pi->data;
3820         union drbd_state mask, val;
3821         enum drbd_state_rv rv;
3822
3823         peer_device = conn_peer_device(connection, pi->vnr);
3824         if (!peer_device)
3825                 return -EIO;
3826         device = peer_device->device;
3827
3828         mask.i = be32_to_cpu(p->mask);
3829         val.i = be32_to_cpu(p->val);
3830
3831         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3832             mutex_is_locked(device->state_mutex)) {
3833                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3834                 return 0;
3835         }
3836
3837         mask = convert_state(mask);
3838         val = convert_state(val);
3839
3840         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3841         drbd_send_sr_reply(peer_device, rv);
3842
3843         drbd_md_sync(device);
3844
3845         return 0;
3846 }
3847
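/* Like receive_req_state(), but for connection-wide state change requests,
 * handled on the connection rather than on a single volume. */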
3848 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
3849 {
3850         struct p_req_state *p = pi->data;
3851         union drbd_state mask, val;
3852         enum drbd_state_rv rv;
3853
3854         mask.i = be32_to_cpu(p->mask);
3855         val.i = be32_to_cpu(p->val);
3856
3857         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
3858             mutex_is_locked(&connection->cstate_mutex)) {
3859                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
3860                 return 0;
3861         }
3862
3863         mask = convert_state(mask);
3864         val = convert_state(val);
3865
3866         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3867         conn_send_sr_reply(connection, rv);
3868
3869         return 0;
3870 }
3871
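/* Handle P_STATE: fold the peer's reported state into our own, running the
 * resync handshake first when a new connection, a fresh disk attach, or an
 * invalidate requires deciding about resync. */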
3872 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
3873 {
3874         struct drbd_peer_device *peer_device;
3875         struct drbd_device *device;
3876         struct p_state *p = pi->data;
3877         union drbd_state os, ns, peer_state;
3878         enum drbd_disk_state real_peer_disk;
3879         enum chg_state_flags cs_flags;
3880         int rv;
3881
3882         peer_device = conn_peer_device(connection, pi->vnr);
3883         if (!peer_device)
3884                 return config_unknown_volume(connection, pi);
3885         device = peer_device->device;
3886
3887         peer_state.i = be32_to_cpu(p->state);
3888
3889         real_peer_disk = peer_state.disk;
3890         if (peer_state.disk == D_NEGOTIATING) {
3891                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3892                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3893         }
3894
3895         spin_lock_irq(&device->resource->req_lock);
3896  retry:
3897         os = ns = drbd_read_state(device);
3898         spin_unlock_irq(&device->resource->req_lock);
3899
3900         /* If some other part of the code (asender thread, timeout)
3901          * already decided to close the connection again,
3902          * we must not "re-establish" it here. */
3903         if (os.conn <= C_TEAR_DOWN)
3904                 return -ECONNRESET;
3905
3906         /* If this is the "end of sync" confirmation, usually the peer disk
3907          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3908          * set) resync started in PausedSyncT, or if the timing of pause-/
3909          * unpause-sync events has been "just right", the peer disk may
3910          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3911          */
3912         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3913             real_peer_disk == D_UP_TO_DATE &&
3914             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3915                 /* If we are (becoming) SyncSource, but peer is still in sync
3916                  * preparation, ignore its uptodate-ness to avoid flapping, it
3917                  * will change to inconsistent once the peer reaches active
3918                  * syncing states.
3919                  * It may have changed syncer-paused flags, however, so we
3920                  * cannot ignore this completely. */
3921                 if (peer_state.conn > C_CONNECTED &&
3922                     peer_state.conn < C_SYNC_SOURCE)
3923                         real_peer_disk = D_INCONSISTENT;
3924
3925                 /* if peer_state changes to connected at the same time,
3926                  * it explicitly notifies us that it finished resync.
3927                  * Maybe we should finish it up, too? */
3928                 else if (os.conn >= C_SYNC_SOURCE &&
3929                          peer_state.conn == C_CONNECTED) {
3930                         if (drbd_bm_total_weight(device) <= device->rs_failed)
3931                                 drbd_resync_finished(device);
3932                         return 0;
3933                 }
3934         }
3935
3936         /* explicit verify finished notification, stop sector reached. */
3937         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3938             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3939                 ov_out_of_sync_print(device);
3940                 drbd_resync_finished(device);
3941                 return 0;
3942         }
3943
3944         /* peer says his disk is inconsistent, while we think it is uptodate,
3945          * and this happens while the peer still thinks we have a sync going on,
3946          * but we think we are already done with the sync.
3947          * We ignore this to avoid flapping pdsk.
3948          * This should not happen, if the peer is a recent version of drbd. */
3949         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3950             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3951                 real_peer_disk = D_UP_TO_DATE;
3952
3953         if (ns.conn == C_WF_REPORT_PARAMS)
3954                 ns.conn = C_CONNECTED;
3955
3956         if (peer_state.conn == C_AHEAD)
3957                 ns.conn = C_BEHIND;
3958
3959         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3960             get_ldev_if_state(device, D_NEGOTIATING)) {
3961                 int cr; /* consider resync */
3962
3963                 /* if we established a new connection */
3964                 cr  = (os.conn < C_CONNECTED);
3965                 /* if we had an established connection
3966                  * and one of the nodes newly attaches a disk */
3967                 cr |= (os.conn == C_CONNECTED &&
3968                        (peer_state.disk == D_NEGOTIATING ||
3969                         os.disk == D_NEGOTIATING));
3970                 /* if we have both been inconsistent, and the peer has been
3971                  * forced to be UpToDate with --overwrite-data */
3972                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
3973                 /* if we had been plain connected, and the admin requested to
3974                  * start a sync by "invalidate" or "invalidate-remote" */
3975                 cr |= (os.conn == C_CONNECTED &&
3976                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3977                                  peer_state.conn <= C_WF_BITMAP_T));
3978
3979                 if (cr)
3980                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
3981
3982                 put_ldev(device);
3983                 if (ns.conn == C_MASK) {
3984                         ns.conn = C_CONNECTED;
3985                         if (device->state.disk == D_NEGOTIATING) {
3986                                 drbd_force_state(device, NS(disk, D_FAILED));
3987                         } else if (peer_state.disk == D_NEGOTIATING) {
3988                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
3989                                 peer_state.disk = D_DISKLESS;
3990                                 real_peer_disk = D_DISKLESS;
3991                         } else {
3992                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
3993                                         return -EIO;
3994                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
3995                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3996                                 return -EIO;
3997                         }
3998                 }
3999         }
4000
4001         spin_lock_irq(&device->resource->req_lock);
4002         if (os.i != drbd_read_state(device).i)
4003                 goto retry;
4004         clear_bit(CONSIDER_RESYNC, &device->flags);
4005         ns.peer = peer_state.role;
4006         ns.pdsk = real_peer_disk;
4007         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4008         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4009                 ns.disk = device->new_state_tmp.disk;
4010         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4011         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4012             test_bit(NEW_CUR_UUID, &device->flags)) {
4013                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4014                    for temporary network outages! */
4015                 spin_unlock_irq(&device->resource->req_lock);
4016                 drbd_err(device, "Aborting Connect, cannot thaw IO with an only Consistent peer\n");
4017                 tl_clear(peer_device->connection);
4018                 drbd_uuid_new_current(device);
4019                 clear_bit(NEW_CUR_UUID, &device->flags);
4020                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4021                 return -EIO;
4022         }
4023         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4024         ns = drbd_read_state(device);
4025         spin_unlock_irq(&device->resource->req_lock);
4026
4027         if (rv < SS_SUCCESS) {
4028                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4029                 return -EIO;
4030         }
4031
4032         if (os.conn > C_WF_REPORT_PARAMS) {
4033                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4034                     peer_state.disk != D_NEGOTIATING) {
4035                         /* we want resync, peer has not yet decided to sync... */
4036                         /* Nowadays this is only used when forcing a node into the primary
4037                            role and setting its disk to UpToDate at the same time */
4038                         drbd_send_uuids(peer_device);
4039                         drbd_send_current_state(peer_device);
4040                 }
4041         }
4042
4043         clear_bit(DISCARD_MY_DATA, &device->flags);
4044
4045         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4046
4047         return 0;
4048 }
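
/*
 * Editorial sketch (not driver code, names are hypothetical): the
 * optimistic read-compute-retry pattern receive_state() uses above.
 * The state is sampled under the req_lock, the new state is computed
 * without holding the lock (the sync handshake may sleep), and the
 * commit is retried from scratch if the state changed underneath us.
 */
static void state_retry_pattern_example(struct drbd_device *device)
{
        union drbd_state os, ns;

retry:
        spin_lock_irq(&device->resource->req_lock);
        os = ns = drbd_read_state(device);
        spin_unlock_irq(&device->resource->req_lock);

        /* ... derive ns from os and the received packet; may sleep ... */

        spin_lock_irq(&device->resource->req_lock);
        if (os.i != drbd_read_state(device).i)
                goto retry;     /* concurrent change: recompute from scratch */
        /* ... commit ns, e.g. via _drbd_set_state(), under the lock ... */
        spin_unlock_irq(&device->resource->req_lock);
}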
4049
4050 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4051 {
4052         struct drbd_peer_device *peer_device;
4053         struct drbd_device *device;
4054         struct p_rs_uuid *p = pi->data;
4055
4056         peer_device = conn_peer_device(connection, pi->vnr);
4057         if (!peer_device)
4058                 return -EIO;
4059         device = peer_device->device;
4060
4061         wait_event(device->misc_wait,
4062                    device->state.conn == C_WF_SYNC_UUID ||
4063                    device->state.conn == C_BEHIND ||
4064                    device->state.conn < C_CONNECTED ||
4065                    device->state.disk < D_NEGOTIATING);
4066
4067         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4068
4069         /* Here the _drbd_uuid_ functions are right; the current UUID should
4070            _not_ be rotated into the history */
4071         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4072                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4073                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4074
4075                 drbd_print_uuids(device, "updated sync uuid");
4076                 drbd_start_resync(device, C_SYNC_TARGET);
4077
4078                 put_ldev(device);
4079         } else
4080                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4081
4082         return 0;
4083 }
4084
4085 /**
4086  * receive_bitmap_plain() - receive one plain (uncompressed) chunk of the bitmap
4087  *
4088  * Return 0 when done, 1 when another iteration is needed, and a negative error
4089  * code upon failure.
4090  */
4091 static int
4092 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4093                      unsigned long *p, struct bm_xfer_ctx *c)
4094 {
4095         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4096                                  drbd_header_size(peer_device->connection);
4097         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4098                                        c->bm_words - c->word_offset);
4099         unsigned int want = num_words * sizeof(*p);
4100         int err;
4101
4102         if (want != size) {
4103                 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4104                 return -EIO;
4105         }
4106         if (want == 0)
4107                 return 0;
4108         err = drbd_recv_all(peer_device->connection, p, want);
4109         if (err)
4110                 return err;
4111
4112         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4113
4114         c->word_offset += num_words;
4115         c->bit_offset = c->word_offset * BITS_PER_LONG;
4116         if (c->bit_offset > c->bm_bits)
4117                 c->bit_offset = c->bm_bits;
4118
4119         return 1;
4120 }
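
/*
 * Editorial sketch (not driver code): the chunking arithmetic that
 * receive_bitmap_plain() verifies above.  Each plain packet carries at
 * most (socket buffer - header) / sizeof(long) bitmap words; the final
 * packet carries the remainder.  Hypothetical helper:
 */
static unsigned int plain_words_expected_example(unsigned int data_size,
                                                 unsigned long bm_words,
                                                 unsigned long word_offset)
{
        unsigned long per_packet = data_size / sizeof(unsigned long);
        unsigned long left = bm_words - word_offset;

        return left < per_packet ? left : per_packet;
}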
4121
4122 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4123 {
4124         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4125 }
4126
4127 static int dcbp_get_start(struct p_compressed_bm *p)
4128 {
4129         return (p->encoding & 0x80) != 0;
4130 }
4131
4132 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4133 {
4134         return (p->encoding >> 4) & 0x7;
4135 }
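
/*
 * Editorial note: the three accessors above imply this layout of the
 * single "encoding" byte -- bits 0-3: bitmap code, bits 4-6: count of
 * padding bits in the last byte, bit 7: value of the first run.  A
 * hypothetical encoder doing the inverse packing (illustration only):
 */
static u8 dcbp_pack_example(enum drbd_bitmap_code code, int pad_bits, int start)
{
        return (code & 0x0f) | ((pad_bits & 0x7) << 4) | (start ? 0x80 : 0);
}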
4136
4137 /**
4138  * recv_bm_rle_bits() - decode one compressed (VLI/RLE) bitmap packet
4139  *
4140  * Return 0 when done, 1 when another iteration is needed, and a negative error
4141  * code upon failure.
4142  */
4143 static int
4144 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4145                  struct p_compressed_bm *p,
4146                  struct bm_xfer_ctx *c,
4147                  unsigned int len)
4148 {
4149         struct bitstream bs;
4150         u64 look_ahead;
4151         u64 rl;
4152         u64 tmp;
4153         unsigned long s = c->bit_offset;
4154         unsigned long e;
4155         int toggle = dcbp_get_start(p);
4156         int have;
4157         int bits;
4158
4159         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4160
4161         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4162         if (bits < 0)
4163                 return -EIO;
4164
4165         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4166                 bits = vli_decode_bits(&rl, look_ahead);
4167                 if (bits <= 0)
4168                         return -EIO;
4169
4170                 if (toggle) {
4171                         e = s + rl - 1;
4172                         if (e >= c->bm_bits) {
4173                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4174                                 return -EIO;
4175                         }
4176                         _drbd_bm_set_bits(peer_device->device, s, e);
4177                 }
4178
4179                 if (have < bits) {
4180                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4181                                 have, bits, look_ahead,
4182                                 (unsigned int)(bs.cur.b - p->code),
4183                                 (unsigned int)bs.buf_len);
4184                         return -EIO;
4185                 }
4186                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4187                 if (likely(bits < 64))
4188                         look_ahead >>= bits;
4189                 else
4190                         look_ahead = 0;
4191                 have -= bits;
4192
4193                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4194                 if (bits < 0)
4195                         return -EIO;
4196                 look_ahead |= tmp << have;
4197                 have += bits;
4198         }
4199
4200         c->bit_offset = s;
4201         bm_xfer_ctx_bit_to_word_offset(c);
4202
4203         return (s != c->bm_bits);
4204 }
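
/*
 * Editorial sketch (not driver code): the run-length/toggle principle
 * behind recv_bm_rle_bits(), with the VLI bitstream stripped away.  The
 * stream is a sequence of run lengths; every second run (starting with
 * the run selected by the start toggle) describes set bits.  Hypothetical:
 */
static void rle_toggle_decode_example(const unsigned long *run, int n_runs,
                                      int toggle, unsigned long *bitmap)
{
        unsigned long s = 0;
        int i;

        for (i = 0; i < n_runs; i++, toggle = !toggle) {
                unsigned long e = s + run[i];

                if (toggle)                     /* a run of "1" bits */
                        for (; s < e; s++)
                                __set_bit(s, bitmap);
                else                            /* a run of "0" bits: just skip */
                        s = e;
        }
}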
4205
4206 /**
4207  * decode_bitmap_c() - dispatch on the compressed bitmap encoding
4208  *
4209  * Return 0 when done, 1 when another iteration is needed, and a negative error
4210  * code upon failure.
4211  */
4212 static int
4213 decode_bitmap_c(struct drbd_peer_device *peer_device,
4214                 struct p_compressed_bm *p,
4215                 struct bm_xfer_ctx *c,
4216                 unsigned int len)
4217 {
4218         if (dcbp_get_code(p) == RLE_VLI_Bits)
4219                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4220
4221         /* other variants had been implemented for evaluation,
4222          * but have been dropped as this one turned out to be "best"
4223          * during all our tests. */
4224
4225         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4226         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4227         return -EIO;
4228 }
4229
4230 void INFO_bm_xfer_stats(struct drbd_device *device,
4231                 const char *direction, struct bm_xfer_ctx *c)
4232 {
4233         /* what would it take to transfer it "plaintext" */
4234         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4235         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4236         unsigned int plain =
4237                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4238                 c->bm_words * sizeof(unsigned long);
4239         unsigned int total = c->bytes[0] + c->bytes[1];
4240         unsigned int r;
4241
4242         /* total cannot be zero, but just in case: */
4243         if (total == 0)
4244                 return;
4245
4246         /* don't report if not compressed */
4247         if (total >= plain)
4248                 return;
4249
4250         /* total < plain; still, check for overflow */
4251         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4252                                     : (1000 * total / plain);
4253
4254         if (r > 1000)
4255                 r = 1000;
4256
4257         r = 1000 - r;
4258         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4259              "total %u; compression: %u.%u%%\n",
4260                         direction,
4261                         c->bytes[1], c->packets[1],
4262                         c->bytes[0], c->packets[0],
4263                         total, r/10, r % 10);
4264 }
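
/*
 * Editorial sketch (not driver code): the overflow-safe per-mille ratio
 * computed above.  1000 * total overflows unsigned int once total exceeds
 * UINT_MAX / 1000, so in that case the divisor is rescaled instead, at
 * the cost of a little precision.  Hypothetical helper:
 */
static unsigned int per_mille_example(unsigned int part, unsigned int whole)
{
        return (part > UINT_MAX / 1000) ? part / (whole / 1000)
                                        : 1000 * part / whole;
}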
4265
4266 /* Since we are processing the bitfield from lower addresses to higher,
4267    it does not matter if we process it in 32 bit chunks or 64 bit
4268    chunks as long as it is little endian. (Understand it as a byte stream,
4269    beginning with the lowest byte...) If we used big endian,
4270    we would need to process it from the highest address to the lowest,
4271    in order to be agnostic to the 32 vs 64 bit issue.
4272
4273    Returns 0 on success, a negative error code otherwise. */
4274 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4275 {
4276         struct drbd_peer_device *peer_device;
4277         struct drbd_device *device;
4278         struct bm_xfer_ctx c;
4279         int err;
4280
4281         peer_device = conn_peer_device(connection, pi->vnr);
4282         if (!peer_device)
4283                 return -EIO;
4284         device = peer_device->device;
4285
4286         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4287         /* you are supposed to send additional out-of-sync information
4288          * if you actually set bits during this phase */
4289
4290         c = (struct bm_xfer_ctx) {
4291                 .bm_bits = drbd_bm_bits(device),
4292                 .bm_words = drbd_bm_words(device),
4293         };
4294
4295         for (;;) {
4296                 if (pi->cmd == P_BITMAP)
4297                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4298                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4299                         /* MAYBE: sanity check that we speak proto >= 90,
4300                          * and the feature is enabled! */
4301                         struct p_compressed_bm *p = pi->data;
4302
4303                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4304                                 drbd_err(device, "ReportCBitmap packet too large\n");
4305                                 err = -EIO;
4306                                 goto out;
4307                         }
4308                         if (pi->size <= sizeof(*p)) {
4309                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4310                                 err = -EIO;
4311                                 goto out;
4312                         }
4313                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4314                         if (err)
4315                                 goto out;
4316                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4317                 } else {
4318                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4319                         err = -EIO;
4320                         goto out;
4321                 }
4322
4323                 c.packets[pi->cmd == P_BITMAP]++;
4324                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4325
4326                 if (err <= 0) {
4327                         if (err < 0)
4328                                 goto out;
4329                         break;
4330                 }
4331                 err = drbd_recv_header(peer_device->connection, pi);
4332                 if (err)
4333                         goto out;
4334         }
4335
4336         INFO_bm_xfer_stats(device, "receive", &c);
4337
4338         if (device->state.conn == C_WF_BITMAP_T) {
4339                 enum drbd_state_rv rv;
4340
4341                 err = drbd_send_bitmap(device);
4342                 if (err)
4343                         goto out;
4344                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4345                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4346                 D_ASSERT(device, rv == SS_SUCCESS);
4347         } else if (device->state.conn != C_WF_BITMAP_S) {
4348                 /* admin may have requested C_DISCONNECTING,
4349                  * other threads may have noticed network errors */
4350                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4351                     drbd_conn_str(device->state.conn));
4352         }
4353         err = 0;
4354
4355  out:
4356         drbd_bm_unlock(device);
4357         if (!err && device->state.conn == C_WF_BITMAP_S)
4358                 drbd_start_resync(device, C_SYNC_SOURCE);
4359         return err;
4360 }
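
/*
 * Editorial note: the receive loop above counts traffic with a boolean
 * index -- (pi->cmd == P_BITMAP) evaluates to 0 or 1, so slot 0
 * accumulates compressed and slot 1 plain packets, matching the order
 * INFO_bm_xfer_stats() prints.  Spelled out long-hand (hypothetical
 * helper, illustration only):
 */
static void bm_xfer_account_example(struct bm_xfer_ctx *c, bool plain,
                                    unsigned int on_wire_bytes)
{
        int idx = plain ? 1 : 0;

        c->packets[idx]++;
        c->bytes[idx] += on_wire_bytes;
}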
4361
4362 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4363 {
4364         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4365                  pi->cmd, pi->size);
4366
4367         return ignore_remaining_packet(connection, pi);
4368 }
4369
4370 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4371 {
4372         /* Make sure we've acked all the TCP data associated
4373          * with the data requests being unplugged */
4374         drbd_tcp_quickack(connection->data.socket);
4375
4376         return 0;
4377 }
4378
4379 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4380 {
4381         struct drbd_peer_device *peer_device;
4382         struct drbd_device *device;
4383         struct p_block_desc *p = pi->data;
4384
4385         peer_device = conn_peer_device(connection, pi->vnr);
4386         if (!peer_device)
4387                 return -EIO;
4388         device = peer_device->device;
4389
4390         switch (device->state.conn) {
4391         case C_WF_SYNC_UUID:
4392         case C_WF_BITMAP_T:
4393         case C_BEHIND:
4394                 break;
4395         default:
4396                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4397                                 drbd_conn_str(device->state.conn));
4398         }
4399
4400         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4401
4402         return 0;
4403 }
4404
4405 struct data_cmd {
4406         int expect_payload;
4407         size_t pkt_size;
4408         int (*fn)(struct drbd_connection *, struct packet_info *);
4409 };
4410
4411 static struct data_cmd drbd_cmd_handler[] = {
4412         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4413         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4414         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4415         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4416         [P_BITMAP]          = { 1, 0, receive_bitmap },
4417         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4418         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4419         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4420         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4421         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4422         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4423         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4424         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4425         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4426         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4427         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4428         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4429         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4430         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4431         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4432         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4433         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4434         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4435         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4436 };
4437
4438 static void drbdd(struct drbd_connection *connection)
4439 {
4440         struct packet_info pi;
4441         size_t shs; /* sub header size */
4442         int err;
4443
4444         while (get_t_state(&connection->receiver) == RUNNING) {
4445                 struct data_cmd *cmd;
4446
4447                 drbd_thread_current_set_cpu(&connection->receiver);
4448                 if (drbd_recv_header(connection, &pi))
4449                         goto err_out;
4450
4451                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !drbd_cmd_handler[pi.cmd].fn)) {
4452                         drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
4453                                  cmdname(pi.cmd), pi.cmd);
4454                         goto err_out;
4455                 }
4456                 cmd = &drbd_cmd_handler[pi.cmd];
4457
4458                 shs = cmd->pkt_size;
4459                 if (pi.size > shs && !cmd->expect_payload) {
4460                         drbd_err(connection, "No payload expected %s l:%d\n",
4461                                  cmdname(pi.cmd), pi.size);
4462                         goto err_out;
4463                 }
4464
4465                 if (shs) {
4466                         err = drbd_recv_all_warn(connection, pi.data, shs);
4467                         if (err)
4468                                 goto err_out;
4469                         pi.size -= shs;
4470                 }
4471
4472                 err = cmd->fn(connection, &pi);
4473                 if (err) {
4474                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4475                                  cmdname(pi.cmd), err, pi.size);
4476                         goto err_out;
4477                 }
4478         }
4479         return;
4480
4481     err_out:
4482         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4483 }
4484
4485 static void conn_disconnect(struct drbd_connection *connection)
4486 {
4487         struct drbd_peer_device *peer_device;
4488         enum drbd_conns oc;
4489         int vnr;
4490
4491         if (connection->cstate == C_STANDALONE)
4492                 return;
4493
4494         /* We are about to start the cleanup after connection loss.
4495          * Make sure drbd_make_request knows about that.
4496          * Usually we should be in some network failure state already,
4497          * but just in case we are not, we fix it up here.
4498          */
4499         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4500
4501         /* The asender does not clean up anything; it must not interfere, either. */
4502         drbd_thread_stop(&connection->asender);
4503         drbd_free_sock(connection);
4504
4505         rcu_read_lock();
4506         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4507                 struct drbd_device *device = peer_device->device;
4508                 kref_get(&device->kref);
4509                 rcu_read_unlock();
4510                 drbd_disconnected(peer_device);
4511                 kref_put(&device->kref, drbd_destroy_device);
4512                 rcu_read_lock();
4513         }
4514         rcu_read_unlock();
4515
4516         if (!list_empty(&connection->current_epoch->list))
4517                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4518         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4519         atomic_set(&connection->current_epoch->epoch_size, 0);
4520         connection->send.seen_any_write_yet = false;
4521
4522         drbd_info(connection, "Connection closed\n");
4523
4524         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4525                 conn_try_outdate_peer_async(connection);
4526
4527         spin_lock_irq(&connection->resource->req_lock);
4528         oc = connection->cstate;
4529         if (oc >= C_UNCONNECTED)
4530                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4531
4532         spin_unlock_irq(&connection->resource->req_lock);
4533
4534         if (oc == C_DISCONNECTING)
4535                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4536 }
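
/*
 * Editorial sketch (not driver code): the iteration pattern used in
 * conn_disconnect() above.  drbd_disconnected() may sleep, which is not
 * allowed under rcu_read_lock(), so each device is pinned with a kref,
 * the RCU read lock is dropped around the sleeping call, then re-taken
 * before advancing the idr walk.  Hypothetical skeleton:
 */
static void for_each_device_may_sleep_example(struct drbd_connection *connection,
                                              int (*fn)(struct drbd_peer_device *))
{
        struct drbd_peer_device *peer_device;
        int vnr;

        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;

                kref_get(&device->kref);        /* keep device alive outside RCU */
                rcu_read_unlock();
                fn(peer_device);                /* may sleep */
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();
}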
4537
4538 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4539 {
4540         struct drbd_device *device = peer_device->device;
4541         unsigned int i;
4542
4543         /* wait for current activity to cease. */
4544         spin_lock_irq(&device->resource->req_lock);
4545         _drbd_wait_ee_list_empty(device, &device->active_ee);
4546         _drbd_wait_ee_list_empty(device, &device->sync_ee);
4547         _drbd_wait_ee_list_empty(device, &device->read_ee);
4548         spin_unlock_irq(&device->resource->req_lock);
4549
4550         /* We do not have data structures that would allow us to
4551          * get the rs_pending_cnt down to 0 again.
4552          *  * On C_SYNC_TARGET we do not have any data structures describing
4553          *    the pending RSDataRequest's we have sent.
4554          *  * On C_SYNC_SOURCE there is no data structure that tracks
4555          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4556          *  And no, it is not the sum of the reference counts in the
4557          *  resync_LRU. The resync_LRU tracks the whole operation including
4558          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4559          *  on the fly. */
4560         drbd_rs_cancel_all(device);
4561         device->rs_total = 0;
4562         device->rs_failed = 0;
4563         atomic_set(&device->rs_pending_cnt, 0);
4564         wake_up(&device->misc_wait);
4565
4566         del_timer_sync(&device->resync_timer);
4567         resync_timer_fn((unsigned long)device);
4568
4569         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4570          * w_make_resync_request etc. which may still be on the worker queue
4571          * to be "canceled" */
4572         drbd_flush_workqueue(&peer_device->connection->sender_work);
4573
4574         drbd_finish_peer_reqs(device);
4575
4576         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4577            might have queued new work. The flush before drbd_finish_peer_reqs() is
4578            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4579         drbd_flush_workqueue(&peer_device->connection->sender_work);
4580
4581         /* need to do it again, drbd_finish_peer_reqs() may have populated it
4582          * again via drbd_try_clear_on_disk_bm(). */
4583         drbd_rs_cancel_all(device);
4584
4585         kfree(device->p_uuid);
4586         device->p_uuid = NULL;
4587
4588         if (!drbd_suspended(device))
4589                 tl_clear(peer_device->connection);
4590
4591         drbd_md_sync(device);
4592
4593         /* serialize with bitmap writeout triggered by the state change,
4594          * if any. */
4595         wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4596
4597         /* tcp_close and release of sendpage pages can be deferred.  I don't
4598          * want to use SO_LINGER, because apparently it can be deferred for
4599          * more than 20 seconds (longest time I checked).
4600          *
4601          * Actually we don't care for exactly when the network stack does its
4602          * put_page(), but release our reference on these pages right here.
4603          */
4604         i = drbd_free_peer_reqs(device, &device->net_ee);
4605         if (i)
4606                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4607         i = atomic_read(&device->pp_in_use_by_net);
4608         if (i)
4609                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4610         i = atomic_read(&device->pp_in_use);
4611         if (i)
4612                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4613
4614         D_ASSERT(device, list_empty(&device->read_ee));
4615         D_ASSERT(device, list_empty(&device->active_ee));
4616         D_ASSERT(device, list_empty(&device->sync_ee));
4617         D_ASSERT(device, list_empty(&device->done_ee));
4618
4619         return 0;
4620 }
4621
4622 /*
4623  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4624  * we can agree on is stored in agreed_pro_version.
4625  *
4626  * Feature flags and the reserved array should be enough room for future
4627  * enhancements of the handshake protocol, and possible plugins...
4628  *
4629  * For now, they are expected to be zero, but are ignored either way.
4630  */
4631 static int drbd_send_features(struct drbd_connection *connection)
4632 {
4633         struct drbd_socket *sock;
4634         struct p_connection_features *p;
4635
4636         sock = &connection->data;
4637         p = conn_prepare_command(connection, sock);
4638         if (!p)
4639                 return -EIO;
4640         memset(p, 0, sizeof(*p));
4641         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4642         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4643         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4644 }
4645
4646 /*
4647  * return values:
4648  *   1 yes, we have a valid connection
4649  *   0 oops, did not work out, please try again
4650  *  -1 peer talks different language,
4651  *     no point in trying again, please go standalone.
4652  */
4653 static int drbd_do_features(struct drbd_connection *connection)
4654 {
4655         /* ASSERT current == connection->receiver ... */
4656         struct p_connection_features *p;
4657         const int expect = sizeof(struct p_connection_features);
4658         struct packet_info pi;
4659         int err;
4660
4661         err = drbd_send_features(connection);
4662         if (err)
4663                 return 0;
4664
4665         err = drbd_recv_header(connection, &pi);
4666         if (err)
4667                 return 0;
4668
4669         if (pi.cmd != P_CONNECTION_FEATURES) {
4670                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4671                          cmdname(pi.cmd), pi.cmd);
4672                 return -1;
4673         }
4674
4675         if (pi.size != expect) {
4676                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4677                      expect, pi.size);
4678                 return -1;
4679         }
4680
4681         p = pi.data;
4682         err = drbd_recv_all_warn(connection, p, expect);
4683         if (err)
4684                 return 0;
4685
4686         p->protocol_min = be32_to_cpu(p->protocol_min);
4687         p->protocol_max = be32_to_cpu(p->protocol_max);
4688         if (p->protocol_max == 0)
4689                 p->protocol_max = p->protocol_min;
4690
4691         if (PRO_VERSION_MAX < p->protocol_min ||
4692             PRO_VERSION_MIN > p->protocol_max)
4693                 goto incompat;
4694
4695         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4696
4697         drbd_info(connection, "Handshake successful: "
4698              "Agreed network protocol version %d\n", connection->agreed_pro_version);
4699
4700         return 1;
4701
4702  incompat:
4703         drbd_err(connection, "incompatible DRBD dialects: "
4704             "I support %d-%d, peer supports %d-%d\n",
4705             PRO_VERSION_MIN, PRO_VERSION_MAX,
4706             p->protocol_min, p->protocol_max);
4707         return -1;
4708 }
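
/*
 * Editorial sketch (not driver code): the negotiation rule
 * drbd_do_features() applies above.  Two [min, max] version ranges are
 * compatible iff they overlap, and the agreed version is the highest
 * one both sides support.  Hypothetical helper:
 */
static int negotiate_version_example(int my_min, int my_max,
                                     int peer_min, int peer_max)
{
        if (my_max < peer_min || my_min > peer_max)
                return -1;      /* disjoint ranges: incompatible dialects */
        return min(my_max, peer_max);
}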
4709
4710 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4711 static int drbd_do_auth(struct drbd_connection *connection)
4712 {
4713         drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4714         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4715         return -1;
4716 }
4717 #else
4718 #define CHALLENGE_LEN 64
4719
4720 /* Return value:
4721         1 - auth succeeded,
4722         0 - failed, try again (network error),
4723         -1 - auth failed, don't try again.
4724 */
4725
4726 static int drbd_do_auth(struct drbd_connection *connection)
4727 {
4728         struct drbd_socket *sock;
4729         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4730         struct scatterlist sg;
4731         char *response = NULL;
4732         char *right_response = NULL;
4733         char *peers_ch = NULL;
4734         unsigned int key_len;
4735         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4736         unsigned int resp_size;
4737         struct hash_desc desc;
4738         struct packet_info pi;
4739         struct net_conf *nc;
4740         int err, rv;
4741
4742         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4743
4744         rcu_read_lock();
4745         nc = rcu_dereference(connection->net_conf);
4746         key_len = strlen(nc->shared_secret);
4747         memcpy(secret, nc->shared_secret, key_len);
4748         rcu_read_unlock();
4749
4750         desc.tfm = connection->cram_hmac_tfm;
4751         desc.flags = 0;
4752
4753         rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4754         if (rv) {
4755                 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4756                 rv = -1;
4757                 goto fail;
4758         }
4759
4760         get_random_bytes(my_challenge, CHALLENGE_LEN);
4761
4762         sock = &connection->data;
4763         if (!conn_prepare_command(connection, sock)) {
4764                 rv = 0;
4765                 goto fail;
4766         }
4767         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4768                                 my_challenge, CHALLENGE_LEN);
4769         if (!rv)
4770                 goto fail;
4771
4772         err = drbd_recv_header(connection, &pi);
4773         if (err) {
4774                 rv = 0;
4775                 goto fail;
4776         }
4777
4778         if (pi.cmd != P_AUTH_CHALLENGE) {
4779                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4780                          cmdname(pi.cmd), pi.cmd);
4781                 rv = 0;
4782                 goto fail;
4783         }
4784
4785         if (pi.size > CHALLENGE_LEN * 2) {
4786                 drbd_err(connection, "AuthChallenge payload too big\n");
4787                 rv = -1;
4788                 goto fail;
4789         }
4790
4791         peers_ch = kmalloc(pi.size, GFP_NOIO);
4792         if (peers_ch == NULL) {
4793                 drbd_err(connection, "kmalloc of peers_ch failed\n");
4794                 rv = -1;
4795                 goto fail;
4796         }
4797
4798         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4799         if (err) {
4800                 rv = 0;
4801                 goto fail;
4802         }
4803
4804         resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4805         response = kmalloc(resp_size, GFP_NOIO);
4806         if (response == NULL) {
4807                 drbd_err(connection, "kmalloc of response failed\n");
4808                 rv = -1;
4809                 goto fail;
4810         }
4811
4812         sg_init_table(&sg, 1);
4813         sg_set_buf(&sg, peers_ch, pi.size);
4814
4815         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4816         if (rv) {
4817                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4818                 rv = -1;
4819                 goto fail;
4820         }
4821
4822         if (!conn_prepare_command(connection, sock)) {
4823                 rv = 0;
4824                 goto fail;
4825         }
4826         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
4827                                 response, resp_size);
4828         if (!rv)
4829                 goto fail;
4830
4831         err = drbd_recv_header(connection, &pi);
4832         if (err) {
4833                 rv = 0;
4834                 goto fail;
4835         }
4836
4837         if (pi.cmd != P_AUTH_RESPONSE) {
4838                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
4839                          cmdname(pi.cmd), pi.cmd);
4840                 rv = 0;
4841                 goto fail;
4842         }
4843
4844         if (pi.size != resp_size) {
4845                 drbd_err(connection, "AuthResponse payload of wrong size\n");
4846                 rv = 0;
4847                 goto fail;
4848         }
4849
4850         err = drbd_recv_all_warn(connection, response, resp_size);
4851         if (err) {
4852                 rv = 0;
4853                 goto fail;
4854         }
4855
4856         right_response = kmalloc(resp_size, GFP_NOIO);
4857         if (right_response == NULL) {
4858                 drbd_err(connection, "kmalloc of right_response failed\n");
4859                 rv = -1;
4860                 goto fail;
4861         }
4862
4863         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4864
4865         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4866         if (rv) {
4867                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4868                 rv = -1;
4869                 goto fail;
4870         }
4871
4872         rv = !memcmp(response, right_response, resp_size);
4873
4874         if (rv)
4875                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
4876                      resp_size);
4877         else
4878                 rv = -1;
4879
4880  fail:
4881         kfree(peers_ch);
4882         kfree(response);
4883         kfree(right_response);
4884
4885         return rv;
4886 }
4887 #endif
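
/*
 * Editorial sketch (userspace, not driver code): the challenge/response
 * idea implemented above, written against OpenSSL for illustration only
 * (build with -lcrypto; SHA-256 stands in for whatever cram-hmac-alg is
 * configured).  Each side proves knowledge of the shared secret by
 * HMACing the challenge chosen by its peer.
 */
#if 0   /* standalone example, not compiled into the driver */
#include <openssl/evp.h>
#include <openssl/hmac.h>
#include <string.h>

static int verify_peer_response_example(const void *secret, int key_len,
                                        const unsigned char *my_challenge,
                                        size_t challenge_len,
                                        const unsigned char *peer_response,
                                        unsigned int response_len)
{
        unsigned char expected[EVP_MAX_MD_SIZE];
        unsigned int expected_len;

        /* what an honest peer must have computed over our challenge */
        HMAC(EVP_sha256(), secret, key_len, my_challenge, challenge_len,
             expected, &expected_len);

        return response_len == expected_len &&
               memcmp(expected, peer_response, response_len) == 0;
}
#endif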
4888
4889 int drbd_receiver(struct drbd_thread *thi)
4890 {
4891         struct drbd_connection *connection = thi->connection;
4892         int h;
4893
4894         drbd_info(connection, "receiver (re)started\n");
4895
4896         do {
4897                 h = conn_connect(connection);
4898                 if (h == 0) {
4899                         conn_disconnect(connection);
4900                         schedule_timeout_interruptible(HZ);
4901                 }
4902                 if (h == -1) {
4903                         drbd_warn(connection, "Discarding network configuration.\n");
4904                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
4905                 }
4906         } while (h == 0);
4907
4908         if (h > 0)
4909                 drbdd(connection);
4910
4911         conn_disconnect(connection);
4912
4913         drbd_info(connection, "receiver terminated\n");
4914         return 0;
4915 }
4916
4917 /* ********* acknowledge sender ******** */
4918
4919 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4920 {
4921         struct p_req_state_reply *p = pi->data;
4922         int retcode = be32_to_cpu(p->retcode);
4923
4924         if (retcode >= SS_SUCCESS) {
4925                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
4926         } else {
4927                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
4928                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
4929                          drbd_set_st_err_str(retcode), retcode);
4930         }
4931         wake_up(&connection->ping_wait);
4932
4933         return 0;
4934 }
4935
4936 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4937 {
4938         struct drbd_peer_device *peer_device;
4939         struct drbd_device *device;
4940         struct p_req_state_reply *p = pi->data;
4941         int retcode = be32_to_cpu(p->retcode);
4942
4943         peer_device = conn_peer_device(connection, pi->vnr);
4944         if (!peer_device)
4945                 return -EIO;
4946         device = peer_device->device;
4947
4948         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
4949                 D_ASSERT(device, connection->agreed_pro_version < 100);
4950                 return got_conn_RqSReply(connection, pi);
4951         }
4952
4953         if (retcode >= SS_SUCCESS) {
4954                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
4955         } else {
4956                 set_bit(CL_ST_CHG_FAIL, &device->flags);
4957                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
4958                         drbd_set_st_err_str(retcode), retcode);
4959         }
4960         wake_up(&device->state_wait);
4961
4962         return 0;
4963 }
4964
4965 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
4966 {
4967         return drbd_send_ping_ack(connection);
4969 }
4970
4971 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
4972 {
4973         /* restore idle timeout */
4974         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
4975         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
4976                 wake_up(&connection->ping_wait);
4977
4978         return 0;
4979 }
4980
4981 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
4982 {
4983         struct drbd_peer_device *peer_device;
4984         struct drbd_device *device;
4985         struct p_block_ack *p = pi->data;
4986         sector_t sector = be64_to_cpu(p->sector);
4987         int blksize = be32_to_cpu(p->blksize);
4988
4989         peer_device = conn_peer_device(connection, pi->vnr);
4990         if (!peer_device)
4991                 return -EIO;
4992         device = peer_device->device;
4993
4994         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
4995
4996         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
4997
4998         if (get_ldev(device)) {
4999                 drbd_rs_complete_io(device, sector);
5000                 drbd_set_in_sync(device, sector, blksize);
5001                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5002                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5003                 put_ldev(device);
5004         }
5005         dec_rs_pending(device);
5006         atomic_add(blksize >> 9, &device->rs_sect_in);
5007
5008         return 0;
5009 }
5010
5011 static int
5012 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5013                               struct rb_root *root, const char *func,
5014                               enum drbd_req_event what, bool missing_ok)
5015 {
5016         struct drbd_request *req;
5017         struct bio_and_error m;
5018
5019         spin_lock_irq(&device->resource->req_lock);
5020         req = find_request(device, root, id, sector, missing_ok, func);
5021         if (unlikely(!req)) {
5022                 spin_unlock_irq(&device->resource->req_lock);
5023                 return -EIO;
5024         }
5025         __req_mod(req, what, &m);
5026         spin_unlock_irq(&device->resource->req_lock);
5027
5028         if (m.bio)
5029                 complete_master_bio(device, &m);
5030         return 0;
5031 }
5032
5033 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5034 {
5035         struct drbd_peer_device *peer_device;
5036         struct drbd_device *device;
5037         struct p_block_ack *p = pi->data;
5038         sector_t sector = be64_to_cpu(p->sector);
5039         int blksize = be32_to_cpu(p->blksize);
5040         enum drbd_req_event what;
5041
5042         peer_device = conn_peer_device(connection, pi->vnr);
5043         if (!peer_device)
5044                 return -EIO;
5045         device = peer_device->device;
5046
5047         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5048
5049         if (p->block_id == ID_SYNCER) {
5050                 drbd_set_in_sync(device, sector, blksize);
5051                 dec_rs_pending(device);
5052                 return 0;
5053         }
5054         switch (pi->cmd) {
5055         case P_RS_WRITE_ACK:
5056                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5057                 break;
5058         case P_WRITE_ACK:
5059                 what = WRITE_ACKED_BY_PEER;
5060                 break;
5061         case P_RECV_ACK:
5062                 what = RECV_ACKED_BY_PEER;
5063                 break;
5064         case P_SUPERSEDED:
5065                 what = CONFLICT_RESOLVED;
5066                 break;
5067         case P_RETRY_WRITE:
5068                 what = POSTPONE_WRITE;
5069                 break;
5070         default:
5071                 BUG();
5072         }
5073
5074         return validate_req_change_req_state(device, p->block_id, sector,
5075                                              &device->write_requests, __func__,
5076                                              what, false);
5077 }
5078
5079 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5080 {
5081         struct drbd_peer_device *peer_device;
5082         struct drbd_device *device;
5083         struct p_block_ack *p = pi->data;
5084         sector_t sector = be64_to_cpu(p->sector);
5085         int size = be32_to_cpu(p->blksize);
5086         int err;
5087
5088         peer_device = conn_peer_device(connection, pi->vnr);
5089         if (!peer_device)
5090                 return -EIO;
5091         device = peer_device->device;
5092
5093         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5094
5095         if (p->block_id == ID_SYNCER) {
5096                 dec_rs_pending(device);
5097                 drbd_rs_failed_io(device, sector, size);
5098                 return 0;
5099         }
5100
5101         err = validate_req_change_req_state(device, p->block_id, sector,
5102                                             &device->write_requests, __func__,
5103                                             NEG_ACKED, true);
5104         if (err) {
5105                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5106                    The master bio might already be completed; in that case the
5107                    request is no longer in the collision hash. */
5108                 /* In Protocol B we might already have got a P_RECV_ACK
5109                    but then get a P_NEG_ACK afterwards. */
5110                 drbd_set_out_of_sync(device, sector, size);
5111         }
5112         return 0;
5113 }
5114
5115 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5116 {
5117         struct drbd_peer_device *peer_device;
5118         struct drbd_device *device;
5119         struct p_block_ack *p = pi->data;
5120         sector_t sector = be64_to_cpu(p->sector);
5121
5122         peer_device = conn_peer_device(connection, pi->vnr);
5123         if (!peer_device)
5124                 return -EIO;
5125         device = peer_device->device;
5126
5127         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5128
5129         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5130             (unsigned long long)sector, be32_to_cpu(p->blksize));
5131
5132         return validate_req_change_req_state(device, p->block_id, sector,
5133                                              &device->read_requests, __func__,
5134                                              NEG_ACKED, false);
5135 }
5136
5137 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5138 {
5139         struct drbd_peer_device *peer_device;
5140         struct drbd_device *device;
5141         sector_t sector;
5142         int size;
5143         struct p_block_ack *p = pi->data;
5144
5145         peer_device = conn_peer_device(connection, pi->vnr);
5146         if (!peer_device)
5147                 return -EIO;
5148         device = peer_device->device;
5149
5150         sector = be64_to_cpu(p->sector);
5151         size = be32_to_cpu(p->blksize);
5152
5153         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5154
5155         dec_rs_pending(device);
5156
5157         if (get_ldev_if_state(device, D_FAILED)) {
5158                 drbd_rs_complete_io(device, sector);
5159                 switch (pi->cmd) {
5160                 case P_NEG_RS_DREPLY:
5161                         drbd_rs_failed_io(device, sector, size);
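                        /* fall through */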
5162                 case P_RS_CANCEL:
5163                         break;
5164                 default:
5165                         BUG();
5166                 }
5167                 put_ldev(device);
5168         }
5169
5170         return 0;
5171 }
5172
5173 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5174 {
5175         struct p_barrier_ack *p = pi->data;
5176         struct drbd_peer_device *peer_device;
5177         int vnr;
5178
5179         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5180
5181         rcu_read_lock();
5182         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5183                 struct drbd_device *device = peer_device->device;
5184
5185                 if (device->state.conn == C_AHEAD &&
5186                     atomic_read(&device->ap_in_flight) == 0 &&
5187                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5188                         device->start_resync_timer.expires = jiffies + HZ;
5189                         add_timer(&device->start_resync_timer);
5190                 }
5191         }
5192         rcu_read_unlock();
5193
5194         return 0;
5195 }
5196
5197 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5198 {
5199         struct drbd_peer_device *peer_device;
5200         struct drbd_device *device;
5201         struct p_block_ack *p = pi->data;
5202         struct drbd_device_work *dw;
5203         sector_t sector;
5204         int size;
5205
5206         peer_device = conn_peer_device(connection, pi->vnr);
5207         if (!peer_device)
5208                 return -EIO;
5209         device = peer_device->device;
5210
5211         sector = be64_to_cpu(p->sector);
5212         size = be32_to_cpu(p->blksize);
5213
5214         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5215
5216         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5217                 drbd_ov_out_of_sync_found(device, sector, size);
5218         else
5219                 ov_out_of_sync_print(device);
5220
5221         if (!get_ldev(device))
5222                 return 0;
5223
5224         drbd_rs_complete_io(device, sector);
5225         dec_rs_pending(device);
5226
5227         --device->ov_left;
5228
5229         /* let's advance progress step marks only for every other megabyte */
5230         if ((device->ov_left & 0x200) == 0x200)
5231                 drbd_advance_rs_marks(device, device->ov_left);
5232
5233         if (device->ov_left == 0) {
5234                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5235                 if (dw) {
5236                         dw->w.cb = w_ov_finished;
5237                         dw->device = device;
5238                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5239                 } else {
5240                         drbd_err(device, "kmalloc(dw) failed.");
5241                         ov_out_of_sync_print(device);
5242                         drbd_resync_finished(device);
5243                 }
5244         }
5245         put_ldev(device);
5246         return 0;
5247 }
5248
5249 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5250 {
5251         return 0;
5252 }
5253
5254 static int connection_finish_peer_reqs(struct drbd_connection *connection)
5255 {
5256         struct drbd_peer_device *peer_device;
5257         int vnr, not_empty = 0;
5258
5259         do {
5260                 clear_bit(SIGNAL_ASENDER, &connection->flags);
5261                 flush_signals(current);
5262
5263                 rcu_read_lock();
5264                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5265                         struct drbd_device *device = peer_device->device;
5266                         kref_get(&device->kref);
5267                         rcu_read_unlock();
5268                         if (drbd_finish_peer_reqs(device)) {
5269                                 kref_put(&device->kref, drbd_destroy_device);
5270                                 return 1;
5271                         }
5272                         kref_put(&device->kref, drbd_destroy_device);
5273                         rcu_read_lock();
5274                 }
5275                 set_bit(SIGNAL_ASENDER, &connection->flags);
5276
5277                 spin_lock_irq(&connection->resource->req_lock);
5278                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5279                         struct drbd_device *device = peer_device->device;
5280                         not_empty = !list_empty(&device->done_ee);
5281                         if (not_empty)
5282                                 break;
5283                 }
5284                 spin_unlock_irq(&connection->resource->req_lock);
5285                 rcu_read_unlock();
5286         } while (not_empty);
5287
5288         return 0;
5289 }
5290
5291 struct asender_cmd {
5292         size_t pkt_size;
5293         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5294 };
5295
5296 static struct asender_cmd asender_tbl[] = {
5297         [P_PING]            = { 0, got_Ping },
5298         [P_PING_ACK]        = { 0, got_PingAck },
5299         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5300         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5301         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5302         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5303         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5304         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5305         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5306         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5307         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5308         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5309         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5310         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5311         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5312         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5313         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5314 };
5315
5316 int drbd_asender(struct drbd_thread *thi)
5317 {
5318         struct drbd_connection *connection = thi->connection;
5319         struct asender_cmd *cmd = NULL;
5320         struct packet_info pi;
5321         int rv;
5322         void *buf    = connection->meta.rbuf;
5323         int received = 0;
5324         unsigned int header_size = drbd_header_size(connection);
5325         int expect   = header_size;
5326         bool ping_timeout_active = false;
5327         struct net_conf *nc;
5328         int ping_timeo, tcp_cork, ping_int;
5329         struct sched_param param = { .sched_priority = 2 };
5330
5331         rv = sched_setscheduler(current, SCHED_RR, &param);
5332         if (rv < 0)
5333                 drbd_err(connection, "drbd_asender: failed to set scheduling priority, ret=%d\n", rv);
5334
5335         while (get_t_state(thi) == RUNNING) {
5336                 drbd_thread_current_set_cpu(thi);
5337
5338                 rcu_read_lock();
5339                 nc = rcu_dereference(connection->net_conf);
5340                 ping_timeo = nc->ping_timeo;
5341                 tcp_cork = nc->tcp_cork;
5342                 ping_int = nc->ping_int;
5343                 rcu_read_unlock();
5344
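                 /* ping_timeo is configured in tenths of a second, hence the
                  * HZ / 10 scaling when (re)arming sk_rcvtimeo below. */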
5345                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5346                         if (drbd_send_ping(connection)) {
5347                         drbd_err(connection, "drbd_send_ping() failed\n");
5348                                 goto reconnect;
5349                         }
5350                         connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5351                         ping_timeout_active = true;
5352                 }
5353
5354                 /* TODO: conditionally cork; it may hurt latency if we cork without
5355                    much to send */
5356                 if (tcp_cork)
5357                         drbd_tcp_cork(connection->meta.socket);
5358                 if (connection_finish_peer_reqs(connection)) {
5359                         drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5360                         goto reconnect;
5361                 }
5362                 /* uncork again in any case, unless corking is disabled altogether */
5363                 if (tcp_cork)
5364                         drbd_tcp_uncork(connection->meta.socket);
5365
5366                 /* short circuit: recv_msg would return -EINTR anyway. */
5367                 if (signal_pending(current))
5368                         continue;
5369
5370                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5371                 clear_bit(SIGNAL_ASENDER, &connection->flags);
5372
5373                 flush_signals(current);
5374
5375                 /* Note:
5376                  * -EINTR        (on meta) we got a signal
5377                  * -EAGAIN       (on meta) rcvtimeo expired
5378                  * -ECONNRESET   other side closed the connection
5379                  * -ERESTARTSYS  (on data) we got a signal
5380                  * rv <  0       other than above: unexpected error!
5381                  * rv == expect  full header or command received
5382                  * rv <  expect  "woken" by a signal during receive
5383                  * rv == 0       connection shut down by peer
5384                  */
5385                 if (likely(rv > 0)) {
5386                         received += rv;
5387                         buf      += rv;
5388                 } else if (rv == 0) {
5389                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5390                                 long t;
5391                                 rcu_read_lock();
5392                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5393                                 rcu_read_unlock();
5394
5395                                 t = wait_event_timeout(connection->ping_wait,
5396                                                        connection->cstate < C_WF_REPORT_PARAMS,
5397                                                        t);
5398                                 if (t)
5399                                         break;
5400                         }
5401                         drbd_err(connection, "meta connection shut down by peer.\n");
5402                         goto reconnect;
5403                 } else if (rv == -EAGAIN) {
5404                         /* If the data socket received something meanwhile,
5405                          * that is good enough: peer is still alive. */
5406                         if (time_after(connection->last_received,
5407                                 jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5408                                 continue;
5409                         if (ping_timeout_active) {
5410                                 drbd_err(connection, "PingAck did not arrive in time.\n");
5411                                 goto reconnect;
5412                         }
5413                         set_bit(SEND_PING, &connection->flags);
5414                         continue;
5415                 } else if (rv == -EINTR) {
5416                         continue;
5417                 } else {
5418                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5419                         goto reconnect;
5420                 }
5421
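                 /* Receive state machine: accumulate a full header first,
                  * decode it (validating pi.cmd against asender_tbl) to learn
                  * the payload size, then accumulate the payload and dispatch
                  * to cmd->fn once received == expect again. */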
5422                 if (received == expect && cmd == NULL) {
5423                         if (decode_header(connection, connection->meta.rbuf, &pi))
5424                                 goto reconnect;
5425                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !asender_tbl[pi.cmd].fn) {
5426                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5427                                          cmdname(pi.cmd), pi.cmd);
5428                                 goto disconnect;
5429                         }
5430                         cmd = &asender_tbl[pi.cmd];
5431                         expect = header_size + cmd->pkt_size;
5432                         if (pi.size != expect - header_size) {
5433                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5434                                         pi.cmd, pi.size);
5435                                 goto reconnect;
5436                         }
5437                 }
5438                 if (received == expect) {
5439                         bool err;
5440
5441                         err = cmd->fn(connection, &pi);
5442                         if (err) {
5443                                 drbd_err(connection, "%pf failed\n", cmd->fn);
5444                                 goto reconnect;
5445                         }
5446
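                             /* A fully processed packet is proof of life:
                              * the -EAGAIN timeout path above consults
                              * last_received before declaring the peer dead. */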
5447                         connection->last_received = jiffies;
5448
5449                         if (cmd == &asender_tbl[P_PING_ACK]) {
5450                                 /* restore idle timeout */
5451                                 connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5452                                 ping_timeout_active = false;
5453                         }
5454
5455                         buf      = connection->meta.rbuf;
5456                         received = 0;
5457                         expect   = header_size;
5458                         cmd      = NULL;
5459                 }
5460         }
5461
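         /* These labels are reachable only via goto; the if (0) wrappers keep
          * their bodies out of the normal fall-through path taken when the
          * thread stops cleanly. */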
5462         if (0) {
5463 reconnect:
5464                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5465                 conn_md_sync(connection);
5466         }
5467         if (0) {
5468 disconnect:
5469                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5470         }
5471         clear_bit(SIGNAL_ASENDER, &connection->flags);
5472
5473         drbd_info(connection, "asender terminated\n");
5474
5475         return 0;
5476 }