/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"

#include "drbd_vli.h"

struct packet_info {
        enum drbd_packet cmd;
        unsigned int size;
        unsigned int vnr;
        void *data;
};

enum finish_epoch {
        FE_STILL_LIVE,
        FE_DESTROYED,
        FE_RECYCLED,
};

static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
        struct page *page;
        struct page *tmp;

        BUG_ON(!n);
        BUG_ON(!head);

        page = *head;

        if (!page)
                return NULL;

        while (page) {
                tmp = page_chain_next(page);
                if (--n == 0)
                        break; /* found sufficient pages */
                if (tmp == NULL)
                        /* insufficient pages, don't use any of them. */
                        return NULL;
                page = tmp;
        }

        /* add end of list marker for the returned list */
        set_page_private(page, 0);
        /* actual return value, and adjustment of head */
        page = *head;
        *head = tmp;
        return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
        struct page *tmp;
        int i = 1;
        while ((tmp = page_chain_next(page)))
                ++i, page = tmp;
        if (len)
                *len = i;
        return page;
}

static int page_chain_free(struct page *page)
{
        struct page *tmp;
        int i = 0;
        page_chain_for_each_safe(page, tmp) {
                put_page(page);
                ++i;
        }
        return i;
}

static void page_chain_add(struct page **head,
                struct page *chain_first, struct page *chain_last)
{
#if 1
        struct page *tmp;
        tmp = page_chain_tail(chain_first, NULL);
        BUG_ON(tmp != chain_last);
#endif

        /* add chain to head */
        set_page_private(chain_last, (unsigned long)*head);
        *head = chain_first;
}

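/*
 * Illustrative sketch (not part of the original source): how the helpers
 * above compose when returning a private chain to the global pool.
 * page_chain_tail() runs lock-free; the splice happens under drbd_pp_lock,
 * exactly as __drbd_alloc_pages() and drbd_free_pages() below do it.
 * Kept inside #if 0, so it is never compiled.
 */
#if 0
static void example_return_chain_to_pool(struct page *chain)
{
        struct page *tail;
        int n;

        /* find the tail (and length) outside the lock ... */
        tail = page_chain_tail(chain, &n);

        /* ... then splice the whole chain onto the pool under the lock */
        spin_lock(&drbd_pp_lock);
        page_chain_add(&drbd_pp_pool, chain, tail);
        drbd_pp_vacant += n;
        spin_unlock(&drbd_pp_lock);
}
#endif
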
static struct page *__drbd_alloc_pages(struct drbd_device *device,
                                       unsigned int number)
{
        struct page *page = NULL;
        struct page *tmp = NULL;
        unsigned int i = 0;

        /* Yes, testing drbd_pp_vacant outside the lock is racy.
         * So what. It saves a spin_lock. */
        if (drbd_pp_vacant >= number) {
                spin_lock(&drbd_pp_lock);
                page = page_chain_del(&drbd_pp_pool, number);
                if (page)
                        drbd_pp_vacant -= number;
                spin_unlock(&drbd_pp_lock);
                if (page)
                        return page;
        }

        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        for (i = 0; i < number; i++) {
                tmp = alloc_page(GFP_TRY);
                if (!tmp)
                        break;
                set_page_private(tmp, (unsigned long)page);
                page = tmp;
        }

        if (i == number)
                return page;

        /* Not enough pages immediately available this time.
         * No need to jump around here, drbd_alloc_pages will retry this
         * function "soon". */
        if (page) {
                tmp = page_chain_tail(page, NULL);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        return NULL;
}

static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
                                           struct list_head *to_be_freed)
{
        struct drbd_peer_request *peer_req, *tmp;

        /* The EEs are always appended to the end of the list. Since
           they are sent in order over the wire, they have to finish
           in order. As soon as we see the first one that has not
           finished, we can stop examining the list... */

        list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
                if (drbd_peer_req_has_active_page(peer_req))
                        break;
                list_move(&peer_req->w.list, to_be_freed);
        }
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
{
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;

        spin_lock_irq(&device->resource->req_lock);
        reclaim_finished_net_peer_reqs(device, &reclaimed);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(device, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device: DRBD peer device.
 * @number:     number of pages requested
 * @retry:      whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
                              bool retry)
{
        struct drbd_device *device = peer_device->device;
        struct page *page = NULL;
        struct net_conf *nc;
        DEFINE_WAIT(wait);
        int mxb;

        /* Yes, we may run up to @number over max_buffers. If we
         * follow it strictly, the admin will get it wrong anyways. */
        rcu_read_lock();
        nc = rcu_dereference(peer_device->connection->net_conf);
        mxb = nc ? nc->max_buffers : 1000000;
        rcu_read_unlock();

        if (atomic_read(&device->pp_in_use) < mxb)
                page = __drbd_alloc_pages(device, number);

        while (page == NULL) {
                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

                drbd_kick_lo_and_reclaim_net(device);

                if (atomic_read(&device->pp_in_use) < mxb) {
                        page = __drbd_alloc_pages(device, number);
                        if (page)
                                break;
                }

                if (!retry)
                        break;

                if (signal_pending(current)) {
                        drbd_warn(device, "drbd_alloc_pages interrupted!\n");
                        break;
                }

                schedule();
        }
        finish_wait(&drbd_pp_wait, &wait);

        if (page)
                atomic_add(number, &device->pp_in_use);
        return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
        atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
        int i;

        if (page == NULL)
                return;

        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
                i = page_chain_free(page);
        else {
                struct page *tmp;
                tmp = page_chain_tail(page, &i);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        i = atomic_sub_return(i, a);
        if (i < 0)
                drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
        wake_up(&drbd_pp_wait);
}

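/*
 * Illustrative sketch (not part of the original source): the expected
 * pairing of drbd_alloc_pages() and drbd_free_pages().  pp_in_use is
 * charged on allocation and released on free; the free path is what
 * wakes other allocators sleeping on drbd_pp_wait.  Kept inside #if 0.
 */
#if 0
static int example_page_accounting(struct drbd_peer_device *peer_device)
{
        struct drbd_device *device = peer_device->device;
        struct page *page;

        /* may block until enough pages are reclaimed elsewhere */
        page = drbd_alloc_pages(peer_device, 4, true);
        if (!page)
                return -ENOMEM; /* e.g. interrupted by a signal */

        /* ... use the four pages linked via page->private ... */

        drbd_free_pages(device, page, 0);       /* also wakes drbd_pp_wait */
        return 0;
}
#endif
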
/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()

(See the illustrative locking sketch below.)
*/

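/*
 * Illustrative sketch (not part of the original source): the locking
 * rules listed above, spelled out.  drbd_wait_ee_list_empty() takes the
 * req_lock itself, so its caller must not hold it; the underscore
 * variant expects the caller to already hold it.  Kept inside #if 0.
 */
#if 0
static void example_lock_discipline(struct drbd_device *device)
{
        /* caller holds no locks here */
        drbd_wait_ee_list_empty(device, &device->active_ee);

        /* the underscore variant is only valid under the req_lock */
        spin_lock_irq(&device->resource->req_lock);
        _drbd_wait_ee_list_empty(device, &device->done_ee);
        spin_unlock_irq(&device->resource->req_lock);
}
#endif
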
struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
                    unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
        struct drbd_device *device = peer_device->device;
        struct drbd_peer_request *peer_req;
        struct page *page = NULL;
        unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

        if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
                return NULL;

        peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
        if (!peer_req) {
                if (!(gfp_mask & __GFP_NOWARN))
                        drbd_err(device, "%s: allocation failed\n", __func__);
                return NULL;
        }

        if (data_size) {
                page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
                if (!page)
                        goto fail;
        }

        drbd_clear_interval(&peer_req->i);
        peer_req->i.size = data_size;
        peer_req->i.sector = sector;
        peer_req->i.local = false;
        peer_req->i.waiting = false;

        peer_req->epoch = NULL;
        peer_req->peer_device = peer_device;
        peer_req->pages = page;
        atomic_set(&peer_req->pending_bios, 0);
        peer_req->flags = 0;
        /*
         * The block_id is opaque to the receiver.  It is not endianness
         * converted, and sent back to the sender unchanged.
         */
        peer_req->block_id = id;

        return peer_req;

 fail:
        mempool_free(peer_req, drbd_ee_mempool);
        return NULL;
}

void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
                       int is_net)
{
        if (peer_req->flags & EE_HAS_DIGEST)
                kfree(peer_req->digest);
        drbd_free_pages(device, peer_req->pages, is_net);
        D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
        mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
        LIST_HEAD(work_list);
        struct drbd_peer_request *peer_req, *t;
        int count = 0;
        int is_net = list == &device->net_ee;

        spin_lock_irq(&device->resource->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                __drbd_free_peer_req(device, peer_req, is_net);
                count++;
        }
        return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
        LIST_HEAD(work_list);
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;
        int err = 0;

        spin_lock_irq(&device->resource->req_lock);
        reclaim_finished_net_peer_reqs(device, &reclaimed);
        list_splice_init(&device->done_ee, &work_list);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(device, peer_req);

        /* possible callbacks here:
         * e_end_block, and e_end_resync_block, e_send_superseded.
         * all ignore the last argument.
         */
        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                int err2;

                /* list_del not necessary, next/prev members not touched */
                err2 = peer_req->w.cb(&peer_req->w, !!err);
                if (!err)
                        err = err2;
                drbd_free_peer_req(device, peer_req);
        }
        wake_up(&device->ee_wait);

        return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_device *device,
                                     struct list_head *head)
{
        DEFINE_WAIT(wait);

        /* avoids spin_lock/unlock
         * and calling prepare_to_wait in the fast path */
        while (!list_empty(head)) {
                prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
                spin_unlock_irq(&device->resource->req_lock);
                io_schedule();
                finish_wait(&device->ee_wait, &wait);
                spin_lock_irq(&device->resource->req_lock);
        }
}

static void drbd_wait_ee_list_empty(struct drbd_device *device,
                                    struct list_head *head)
{
        spin_lock_irq(&device->resource->req_lock);
        _drbd_wait_ee_list_empty(device, head);
        spin_unlock_irq(&device->resource->req_lock);
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
        };
        return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
}

static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
        int rv;

        rv = drbd_recv_short(connection->data.socket, buf, size, 0);

        if (rv < 0) {
                if (rv == -ECONNRESET)
                        drbd_info(connection, "sock was reset by peer\n");
                else if (rv != -ERESTARTSYS)
                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
        } else if (rv == 0) {
                if (test_bit(DISCONNECT_SENT, &connection->flags)) {
                        long t;
                        rcu_read_lock();
                        t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
                        rcu_read_unlock();

                        t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);

                        if (t)
                                goto out;
                }
                drbd_info(connection, "sock was shut down by peer\n");
        }

        if (rv != size)
                conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
        return rv;
}

static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
{
        int err;

        err = drbd_recv(connection, buf, size);
        if (err != size) {
                if (err >= 0)
                        err = -EIO;
        } else
                err = 0;
        return err;
}

static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
{
        int err;

        err = drbd_recv_all(connection, buf, size);
        if (err && !signal_pending(current))
                drbd_warn(connection, "short read (expected size %d)\n", (int)size);
        return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
                unsigned int rcv)
{
        /* open coded SO_SNDBUF, SO_RCVBUF */
        if (snd) {
                sock->sk->sk_sndbuf = snd;
                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
        }
        if (rcv) {
                sock->sk->sk_rcvbuf = rcv;
                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
        }
}

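/*
 * Illustrative sketch (not part of the original source): per the tcp(7)
 * note above, the buffer sizes only take effect if set between socket
 * creation and connect()/listen().  drbd_try_connect() and
 * prepare_listen_socket() below follow exactly this order.  Kept in #if 0.
 */
#if 0
static int example_bufsize_order(struct sockaddr *peer, int peer_len,
                                 unsigned int snd, unsigned int rcv)
{
        struct socket *sock;
        int err;

        err = sock_create_kern(peer->sa_family, SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0)
                return err;
        drbd_setbufsize(sock, snd, rcv);        /* must happen before connect() */
        return sock->ops->connect(sock, peer, peer_len, 0);
}
#endif
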
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
        const char *what;
        struct socket *sock;
        struct sockaddr_in6 src_in6;
        struct sockaddr_in6 peer_in6;
        struct net_conf *nc;
        int err, peer_addr_len, my_addr_len;
        int sndbuf_size, rcvbuf_size, connect_int;
        int disconnect_on_error = 1;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        connect_int = nc->connect_int;
        rcu_read_unlock();

        my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
        memcpy(&src_in6, &connection->my_addr, my_addr_len);

        if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
                src_in6.sin6_port = 0;
        else
                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

        peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
        memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0) {
                sock = NULL;
                goto out;
        }

        sock->sk->sk_rcvtimeo =
        sock->sk->sk_sndtimeo = connect_int * HZ;
        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

        /* explicitly bind to the configured IP as source IP
         * for the outgoing connections.
         * This is needed for multihomed hosts and to be
         * able to use lo: interfaces for drbd.
         * Make sure to use 0 as port number, so linux selects
         * a free one dynamically.
         */
        what = "bind before connect";
        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
        if (err < 0)
                goto out;

        /* connect may fail, peer not yet available.
         * stay C_WF_CONNECTION, don't go Disconnecting! */
        disconnect_on_error = 0;
        what = "connect";
        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
        if (err < 0) {
                if (sock) {
                        sock_release(sock);
                        sock = NULL;
                }
                switch (-err) {
                        /* timeout, busy, signal pending */
                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
                case EINTR: case ERESTARTSYS:
                        /* peer not (yet) available, network problem */
                case ECONNREFUSED: case ENETUNREACH:
                case EHOSTDOWN:    case EHOSTUNREACH:
                        disconnect_on_error = 0;
                        break;
                default:
                        drbd_err(connection, "%s failed, err = %d\n", what, err);
                }
                if (disconnect_on_error)
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
        }

        return sock;
}

struct accept_wait_data {
        struct drbd_connection *connection;
        struct socket *s_listen;
        struct completion door_bell;
        void (*original_sk_state_change)(struct sock *sk);
};

static void drbd_incoming_connection(struct sock *sk)
{
        struct accept_wait_data *ad = sk->sk_user_data;
        void (*state_change)(struct sock *sk);

        state_change = ad->original_sk_state_change;
        if (sk->sk_state == TCP_ESTABLISHED)
                complete(&ad->door_bell);
        state_change(sk);
}

static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
        int err, sndbuf_size, rcvbuf_size, my_addr_len;
        struct sockaddr_in6 my_addr;
        struct socket *s_listen;
        struct net_conf *nc;
        const char *what;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return -EIO;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        rcu_read_unlock();

        my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
        memcpy(&my_addr, &connection->my_addr, my_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &s_listen);
        if (err) {
                s_listen = NULL;
                goto out;
        }

        s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

        what = "bind before listen";
        err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
        if (err < 0)
                goto out;

        ad->s_listen = s_listen;
        write_lock_bh(&s_listen->sk->sk_callback_lock);
        ad->original_sk_state_change = s_listen->sk->sk_state_change;
        s_listen->sk->sk_state_change = drbd_incoming_connection;
        s_listen->sk->sk_user_data = ad;
        write_unlock_bh(&s_listen->sk->sk_callback_lock);

        what = "listen";
        err = s_listen->ops->listen(s_listen, 5);
        if (err < 0)
                goto out;

        return 0;
out:
        if (s_listen)
                sock_release(s_listen);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        drbd_err(connection, "%s failed, err = %d\n", what, err);
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        return -EIO;
}

static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
        write_lock_bh(&sk->sk_callback_lock);
        sk->sk_state_change = ad->original_sk_state_change;
        sk->sk_user_data = NULL;
        write_unlock_bh(&sk->sk_callback_lock);
}

static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
        int timeo, connect_int, err = 0;
        struct socket *s_estab = NULL;
        struct net_conf *nc;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        connect_int = nc->connect_int;
        rcu_read_unlock();

        timeo = connect_int * HZ;
        /* 28.5% random jitter */
        timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

        err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
        if (err <= 0)
                return NULL;

        err = kernel_accept(ad->s_listen, &s_estab, 0);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        drbd_err(connection, "accept failed, err = %d\n", err);
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        if (s_estab)
                unregister_state_change(s_estab->sk, ad);

        return s_estab;
}

static int decode_header(struct drbd_connection *, void *, struct packet_info *);

static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
                             enum drbd_packet cmd)
{
        if (!conn_prepare_command(connection, sock))
                return -EIO;
        return conn_send_command(connection, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
        unsigned int header_size = drbd_header_size(connection);
        struct packet_info pi;
        int err;

        err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
        if (err != header_size) {
                if (err >= 0)
                        err = -EIO;
                return err;
        }
        err = decode_header(connection, connection->data.rbuf, &pi);
        if (err)
                return err;
        return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:       pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
        int rr;
        char tb[4];

        if (!*sock)
                return false;

        rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

        if (rr > 0 || rr == -EAGAIN) {
                return true;
        } else {
                sock_release(*sock);
                *sock = NULL;
                return false;
        }
}

/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_peer_device *peer_device)
{
        struct drbd_device *device = peer_device->device;
        int err;

        atomic_set(&device->packet_seq, 0);
        device->peer_seq = 0;

        device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
                &peer_device->connection->cstate_mutex :
                &device->own_state_mutex;

        err = drbd_send_sync_param(peer_device);
        if (!err)
                err = drbd_send_sizes(peer_device, 0, 0);
        if (!err)
                err = drbd_send_uuids(peer_device);
        if (!err)
                err = drbd_send_current_state(peer_device);
        clear_bit(USE_DEGR_WFC_T, &device->flags);
        clear_bit(RESIZE_PENDING, &device->flags);
        atomic_set(&device->ap_in_flight, 0);
        mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
        return err;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_connection *connection)
{
        struct drbd_socket sock, msock;
        struct drbd_peer_device *peer_device;
        struct net_conf *nc;
        int vnr, timeout, h, ok;
        bool discard_my_data;
        enum drbd_state_rv rv;
        struct accept_wait_data ad = {
                .connection = connection,
                .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
        };

        clear_bit(DISCONNECT_SENT, &connection->flags);
        if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
                return -2;

        mutex_init(&sock.mutex);
        sock.sbuf = connection->data.sbuf;
        sock.rbuf = connection->data.rbuf;
        sock.socket = NULL;
        mutex_init(&msock.mutex);
        msock.sbuf = connection->meta.sbuf;
        msock.rbuf = connection->meta.rbuf;
        msock.socket = NULL;

        /* Assume that the peer only understands protocol 80 until we know better.  */
        connection->agreed_pro_version = 80;

        if (prepare_listen_socket(connection, &ad))
                return 0;

        do {
                struct socket *s;

                s = drbd_try_connect(connection);
                if (s) {
                        if (!sock.socket) {
                                sock.socket = s;
                                send_first_packet(connection, &sock, P_INITIAL_DATA);
                        } else if (!msock.socket) {
                                clear_bit(RESOLVE_CONFLICTS, &connection->flags);
                                msock.socket = s;
                                send_first_packet(connection, &msock, P_INITIAL_META);
                        } else {
                                drbd_err(connection, "Logic error in conn_connect()\n");
                                goto out_release_sockets;
                        }
                }

                if (sock.socket && msock.socket) {
                        rcu_read_lock();
                        nc = rcu_dereference(connection->net_conf);
                        timeout = nc->ping_timeo * HZ / 10;
                        rcu_read_unlock();
                        schedule_timeout_interruptible(timeout);
                        ok = drbd_socket_okay(&sock.socket);
                        ok = drbd_socket_okay(&msock.socket) && ok;
                        if (ok)
                                break;
                }

retry:
                s = drbd_wait_for_connect(connection, &ad);
                if (s) {
                        int fp = receive_first_packet(connection, s);
                        drbd_socket_okay(&sock.socket);
                        drbd_socket_okay(&msock.socket);
                        switch (fp) {
                        case P_INITIAL_DATA:
                                if (sock.socket) {
                                        drbd_warn(connection, "initial packet S crossed\n");
                                        sock_release(sock.socket);
                                        sock.socket = s;
                                        goto randomize;
                                }
                                sock.socket = s;
                                break;
                        case P_INITIAL_META:
                                set_bit(RESOLVE_CONFLICTS, &connection->flags);
                                if (msock.socket) {
                                        drbd_warn(connection, "initial packet M crossed\n");
                                        sock_release(msock.socket);
                                        msock.socket = s;
                                        goto randomize;
                                }
                                msock.socket = s;
                                break;
                        default:
                                drbd_warn(connection, "Error receiving initial packet\n");
                                sock_release(s);
randomize:
                                if (prandom_u32() & 1)
                                        goto retry;
                        }
                }

                if (connection->cstate <= C_DISCONNECTING)
                        goto out_release_sockets;
                if (signal_pending(current)) {
                        flush_signals(current);
                        smp_rmb();
                        if (get_t_state(&connection->receiver) == EXITING)
                                goto out_release_sockets;
                }

                ok = drbd_socket_okay(&sock.socket);
                ok = drbd_socket_okay(&msock.socket) && ok;
        } while (!ok);

        if (ad.s_listen)
                sock_release(ad.s_listen);

        sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

        sock.socket->sk->sk_allocation = GFP_NOIO;
        msock.socket->sk->sk_allocation = GFP_NOIO;

        sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
        msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

        /* NOT YET ...
         * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
         * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
         * first set it to the P_CONNECTION_FEATURES timeout,
         * which we set to 4x the configured ping_timeout. */
        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);

        sock.socket->sk->sk_sndtimeo =
        sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

        msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
        timeout = nc->timeout * HZ / 10;
        discard_my_data = nc->discard_my_data;
        rcu_read_unlock();

        msock.socket->sk->sk_sndtimeo = timeout;

        /* we don't want delays.
         * we use TCP_CORK where appropriate, though */
        drbd_tcp_nodelay(sock.socket);
        drbd_tcp_nodelay(msock.socket);

        connection->data.socket = sock.socket;
        connection->meta.socket = msock.socket;
        connection->last_received = jiffies;

        h = drbd_do_features(connection);
        if (h <= 0)
                return h;

        if (connection->cram_hmac_tfm) {
                /* drbd_request_state(device, NS(conn, WFAuth)); */
                switch (drbd_do_auth(connection)) {
                case -1:
                        drbd_err(connection, "Authentication of peer failed\n");
                        return -1;
                case 0:
                        drbd_err(connection, "Authentication of peer failed, trying again.\n");
                        return 0;
                }
        }

        connection->data.socket->sk->sk_sndtimeo = timeout;
        connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

        if (drbd_send_protocol(connection) == -EOPNOTSUPP)
                return -1;

        set_bit(STATE_SENT, &connection->flags);

        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;
                kref_get(&device->kref);
                rcu_read_unlock();

                /* Prevent a race between resync-handshake and
                 * being promoted to Primary.
                 *
                 * Grab and release the state mutex, so we know that any current
                 * drbd_set_role() is finished, and any incoming drbd_set_role
                 * will see the STATE_SENT flag, and wait for it to be cleared.
                 */
                mutex_lock(device->state_mutex);
                mutex_unlock(device->state_mutex);

                if (discard_my_data)
                        set_bit(DISCARD_MY_DATA, &device->flags);
                else
                        clear_bit(DISCARD_MY_DATA, &device->flags);

                drbd_connected(peer_device);
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();

        rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
        if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
                clear_bit(STATE_SENT, &connection->flags);
                return 0;
        }

        drbd_thread_start(&connection->asender);

        mutex_lock(&connection->resource->conf_update);
        /* The discard_my_data flag is a single-shot modifier to the next
         * connection attempt, the handshake of which is now well underway.
         * No need for rcu style copying of the whole struct
         * just to clear a single value. */
        connection->net_conf->discard_my_data = 0;
        mutex_unlock(&connection->resource->conf_update);

        return h;

out_release_sockets:
        if (ad.s_listen)
                sock_release(ad.s_listen);
        if (sock.socket)
                sock_release(sock.socket);
        if (msock.socket)
                sock_release(msock.socket);
        return -1;
}

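/*
 * Illustrative sketch (not part of the original source): how a caller
 * might act on conn_connect()'s documented return values.  In the real
 * driver this dispatch lives in the receiver thread; the loop below is
 * only a rough approximation of that contract.  Kept inside #if 0.
 */
#if 0
static void example_connect_until_settled(struct drbd_connection *connection)
{
        int h;

        do {
                h = conn_connect(connection);
                if (h < 0)      /* -1/-2: give up, go standalone */
                        conn_request_state(connection, NS(conn, C_DISCONNECTING),
                                           CS_HARD);
        } while (h == 0);       /* 0: did not work out, try again */
}
#endif
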
static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
{
        unsigned int header_size = drbd_header_size(connection);

        if (header_size == sizeof(struct p_header100) &&
            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
                struct p_header100 *h = header;
                if (h->pad != 0) {
                        drbd_err(connection, "Header padding is not zero\n");
                        return -EINVAL;
                }
                pi->vnr = be16_to_cpu(h->volume);
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
        } else if (header_size == sizeof(struct p_header95) &&
                   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
                struct p_header95 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
                pi->vnr = 0;
        } else if (header_size == sizeof(struct p_header80) &&
                   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
                struct p_header80 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be16_to_cpu(h->length);
                pi->vnr = 0;
        } else {
                drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
                         be32_to_cpu(*(__be32 *)header),
                         connection->agreed_pro_version);
                return -EINVAL;
        }
        pi->data = header + header_size;
        return 0;
}

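/*
 * Illustrative sketch (not part of the original source): what
 * decode_header() expects on the wire from a protocol >= 100 peer:
 * the DRBD_MAGIC_100 magic word first, zero padding, and big-endian
 * volume/command/length fields.  Only names visible in decode_header()
 * are used here.  Kept inside #if 0, never compiled.
 */
#if 0
static void example_build_header100(void *header,
                                    u16 volume, u16 command, u32 length)
{
        struct p_header100 *h = header;

        *(__be32 *)header = cpu_to_be32(DRBD_MAGIC_100);
        h->volume = cpu_to_be16(volume);
        h->command = cpu_to_be16(command);
        h->length = cpu_to_be32(length);
        h->pad = 0;     /* decode_header() rejects nonzero padding */
}
#endif
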
static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
{
        void *buffer = connection->data.rbuf;
        int err;

        err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
        if (err)
                return err;

        err = decode_header(connection, buffer, pi);
        connection->last_received = jiffies;

        return err;
}

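/*
 * Illustrative sketch (not part of the original source): the basic
 * receive pattern drbd_recv_header() supports: read one header, then
 * read pi.size bytes of payload at pi.data (which points just past the
 * header inside connection->data.rbuf).  The real dispatch on pi.cmd
 * is considerably more involved.  Kept inside #if 0.
 */
#if 0
static int example_receive_one_packet(struct drbd_connection *connection)
{
        struct packet_info pi;
        int err;

        err = drbd_recv_header(connection, &pi);
        if (err)
                return err;

        /* payload follows the header in the receive buffer */
        err = drbd_recv_all_warn(connection, pi.data, pi.size);
        if (err)
                return err;

        /* ... dispatch on pi.cmd / pi.vnr ... */
        return 0;
}
#endif
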
static void drbd_flush(struct drbd_connection *connection)
{
        int rv;
        struct drbd_peer_device *peer_device;
        int vnr;

        if (connection->write_ordering >= WO_bdev_flush) {
                rcu_read_lock();
                idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                        struct drbd_device *device = peer_device->device;

                        if (!get_ldev(device))
                                continue;
                        kref_get(&device->kref);
                        rcu_read_unlock();

                        rv = blkdev_issue_flush(device->ldev->backing_bdev,
                                        GFP_NOIO, NULL);
                        if (rv) {
                                drbd_info(device, "local disk flush failed with status %d\n", rv);
                                /* would rather check on EOPNOTSUPP, but that is not reliable.
                                 * don't try again for ANY return value != 0
                                 * if (rv == -EOPNOTSUPP) */
                                drbd_bump_write_ordering(connection, WO_drain_io);
                        }
                        put_ldev(device);
                        kref_put(&device->kref, drbd_destroy_device);

                        rcu_read_lock();
                        if (rv)
                                break;
                }
                rcu_read_unlock();
        }
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
 * @connection: DRBD connection.
 * @epoch:      Epoch object.
 * @ev:         Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
{
        int epoch_size;
        struct drbd_epoch *next_epoch;
        enum finish_epoch rv = FE_STILL_LIVE;

        spin_lock(&connection->epoch_lock);
        do {
                next_epoch = NULL;

                epoch_size = atomic_read(&epoch->epoch_size);

                switch (ev & ~EV_CLEANUP) {
                case EV_PUT:
                        atomic_dec(&epoch->active);
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do */
                        break;
                }

                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
                        if (!(ev & EV_CLEANUP)) {
                                spin_unlock(&connection->epoch_lock);
                                drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
                                spin_lock(&connection->epoch_lock);
                        }
#if 0
                        /* FIXME: dec unacked on connection, once we have
                         * something to count pending connection packets in. */
                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
                                dec_unacked(epoch->connection);
#endif

                        if (connection->current_epoch != epoch) {
                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                                list_del(&epoch->list);
                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                                connection->epochs--;
                                kfree(epoch);

                                if (rv == FE_STILL_LIVE)
                                        rv = FE_DESTROYED;
                        } else {
                                epoch->flags = 0;
                                atomic_set(&epoch->epoch_size, 0);
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
                        }
                }

                if (!next_epoch)
                        break;

                epoch = next_epoch;
        } while (1);

        spin_unlock(&connection->epoch_lock);

        return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @connection: DRBD connection.
 * @wo:         Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
{
        struct disk_conf *dc;
        struct drbd_peer_device *peer_device;
        enum write_ordering_e pwo;
        int vnr;
        static char *write_ordering_str[] = {
                [WO_none] = "none",
                [WO_drain_io] = "drain",
                [WO_bdev_flush] = "flush",
        };

        pwo = connection->write_ordering;
        wo = min(pwo, wo);
        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;

                if (!get_ldev_if_state(device, D_ATTACHING))
                        continue;
                dc = rcu_dereference(device->ldev->disk_conf);

                if (wo == WO_bdev_flush && !dc->disk_flushes)
                        wo = WO_drain_io;
                if (wo == WO_drain_io && !dc->disk_drain)
                        wo = WO_none;
                put_ldev(device);
        }
        rcu_read_unlock();
        connection->write_ordering = wo;
        if (pwo != connection->write_ordering || wo == WO_bdev_flush)
                drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
}

/**
 * drbd_submit_peer_request() - Submit the peer request to the local I/O stack
 * @device:     DRBD device.
 * @peer_req:   peer request
 * @rw:         flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_device *device,
                             struct drbd_peer_request *peer_req,
                             const unsigned rw, const int fault_type)
{
        struct bio *bios = NULL;
        struct bio *bio;
        struct page *page = peer_req->pages;
        sector_t sector = peer_req->i.sector;
        unsigned ds = peer_req->i.size;
        unsigned n_bios = 0;
        unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;
        int err = -ENOMEM;

        /* In most cases, we will only need one bio.  But in case the lower
         * level restrictions happen to be different at this offset on this
         * side than those of the sending peer, we may need to submit the
         * request in more than one bio.
         *
         * Plain bio_alloc is good enough here, this is no DRBD internally
         * generated bio, but a bio allocated on behalf of the peer.
         */
next_bio:
        bio = bio_alloc(GFP_NOIO, nr_pages);
        if (!bio) {
                drbd_err(device, "submit_ee: Allocation of a bio failed\n");
                goto fail;
        }
        /* > peer_req->i.sector, unless this is the first bio */
        bio->bi_iter.bi_sector = sector;
        bio->bi_bdev = device->ldev->backing_bdev;
        bio->bi_rw = rw;
        bio->bi_private = peer_req;
        bio->bi_end_io = drbd_peer_request_endio;

        bio->bi_next = bios;
        bios = bio;
        ++n_bios;

        page_chain_for_each(page) {
                unsigned len = min_t(unsigned, ds, PAGE_SIZE);
                if (!bio_add_page(bio, page, len, 0)) {
                        /* A single page must always be possible!
                         * But in case it fails anyways,
                         * we deal with it, and complain (below). */
                        if (bio->bi_vcnt == 0) {
                                drbd_err(device,
                                        "bio_add_page failed for len=%u, "
                                        "bi_vcnt=0 (bi_sector=%llu)\n",
                                        len, (uint64_t)bio->bi_iter.bi_sector);
                                err = -ENOSPC;
                                goto fail;
                        }
                        goto next_bio;
                }
                ds -= len;
                sector += len >> 9;
                --nr_pages;
        }
        D_ASSERT(device, page == NULL);
        D_ASSERT(device, ds == 0);

        atomic_set(&peer_req->pending_bios, n_bios);
        do {
                bio = bios;
                bios = bios->bi_next;
                bio->bi_next = NULL;

                drbd_generic_make_request(device, fault_type, bio);
        } while (bios);
        return 0;

fail:
        while (bios) {
                bio = bios;
                bios = bios->bi_next;
                bio_put(bio);
        }
        return err;
}

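/*
 * Illustrative sketch (not part of the original source): acting on the
 * documented return values of drbd_submit_peer_request().  A 0 return
 * only means all bios were handed to the lower layer; completion is
 * reported asynchronously via drbd_peer_request_endio.  Kept in #if 0.
 */
#if 0
static int example_submit_write(struct drbd_device *device,
                                struct drbd_peer_request *peer_req)
{
        int err;

        err = drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_DT_WR);
        if (err == -ENOMEM)
                return err;     /* nothing was submitted, may retry later */
        if (err == -ENOSPC)
                return err;     /* lower level IO stack is broken */
        return err;             /* 0: wait for the endio callbacks */
}
#endif
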
static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
                                             struct drbd_peer_request *peer_req)
{
        struct drbd_interval *i = &peer_req->i;

        drbd_remove_interval(&device->write_requests, i);
        drbd_clear_interval(i);

        /* Wake up any processes waiting for this peer request to complete.  */
        if (i->waiting)
                wake_up(&device->misc_wait);
}

static void conn_wait_active_ee_empty(struct drbd_connection *connection)
{
        struct drbd_peer_device *peer_device;
        int vnr;

        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;

                kref_get(&device->kref);
                rcu_read_unlock();
                drbd_wait_ee_list_empty(device, &device->active_ee);
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();
}

static struct drbd_peer_device *
conn_peer_device(struct drbd_connection *connection, int volume_number)
{
        return idr_find(&connection->peer_devices, volume_number);
}

static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
{
        int rv;
        struct p_barrier *p = pi->data;
        struct drbd_epoch *epoch;

        /* FIXME these are unacked on connection,
         * not a specific (peer)device.
         */
        connection->current_epoch->barrier_nr = p->barrier;
        connection->current_epoch->connection = connection;
        rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);

        /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
         * the activity log, which means it would not be resynced in case the
         * R_PRIMARY crashes now.
         * Therefore we must send the barrier_ack after the barrier request was
         * completed. */
        switch (connection->write_ordering) {
        case WO_none:
                if (rv == FE_RECYCLED)
                        return 0;

                /* receiver context, in the writeout path of the other node.
                 * avoid potential distributed deadlock */
                epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
                if (epoch)
                        break;
                else
                        drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
                        /* Fall through */

        case WO_bdev_flush:
        case WO_drain_io:
                conn_wait_active_ee_empty(connection);
                drbd_flush(connection);

                if (atomic_read(&connection->current_epoch->epoch_size)) {
                        epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
                        if (epoch)
                                break;
                }

                return 0;
        default:
                drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
                return -EIO;
        }

        epoch->flags = 0;
        atomic_set(&epoch->epoch_size, 0);
        atomic_set(&epoch->active, 0);

        spin_lock(&connection->epoch_lock);
        if (atomic_read(&connection->current_epoch->epoch_size)) {
                list_add(&epoch->list, &connection->current_epoch->list);
                connection->current_epoch = epoch;
                connection->epochs++;
        } else {
                /* The current_epoch got recycled while we allocated this one... */
                kfree(epoch);
        }
        spin_unlock(&connection->epoch_lock);

        return 0;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
              int data_size) __must_hold(local)
{
        struct drbd_device *device = peer_device->device;
        const sector_t capacity = drbd_get_capacity(device->this_bdev);
        struct drbd_peer_request *peer_req;
        struct page *page;
        int dgs, ds, err;
        void *dig_in = peer_device->connection->int_dig_in;
        void *dig_vv = peer_device->connection->int_dig_vv;
        unsigned long *data;

        dgs = 0;
        if (peer_device->connection->peer_integrity_tfm) {
                dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
                /*
                 * FIXME: Receive the incoming digest into the receive buffer
                 *        here, together with its struct p_data?
                 */
                err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
                if (err)
                        return NULL;
                data_size -= dgs;
        }

        if (!expect(IS_ALIGNED(data_size, 512)))
                return NULL;
        if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
                return NULL;

1522         /* even though we trust our peer,
1523          * we sometimes have to double check. */
1524         if (sector + (data_size>>9) > capacity) {
1525                 drbd_err(device, "request from peer beyond end of local disk: "
1526                         "capacity: %llus < sector: %llus + size: %u\n",
1527                         (unsigned long long)capacity,
1528                         (unsigned long long)sector, data_size);
1529                 return NULL;
1530         }
1531
1532         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1533          * "criss-cross" setup, that might cause write-out on some other DRBD,
1534          * which in turn might block on the other node at this very place.  */
1535         peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, GFP_NOIO);
1536         if (!peer_req)
1537                 return NULL;
1538
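        /* a zero-size request carries no payload pages; receive_Data()
         * accepts this only for pure-flush requests (DP_FLUSH, pages == NULL) */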
1539         if (!data_size)
1540                 return peer_req;
1541
1542         ds = data_size;
1543         page = peer_req->pages;
1544         page_chain_for_each(page) {
1545                 unsigned len = min_t(int, ds, PAGE_SIZE);
1546                 data = kmap(page);
1547                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1548                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1549                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1550                         data[0] = data[0] ^ (unsigned long)-1;
1551                 }
1552                 kunmap(page);
1553                 if (err) {
1554                         drbd_free_peer_req(device, peer_req);
1555                         return NULL;
1556                 }
1557                 ds -= len;
1558         }
1559
1560         if (dgs) {
1561                 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1562                 if (memcmp(dig_in, dig_vv, dgs)) {
1563                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1564                                 (unsigned long long)sector, data_size);
1565                         drbd_free_peer_req(device, peer_req);
1566                         return NULL;
1567                 }
1568         }
1569         device->recv_cnt += data_size>>9;
1570         return peer_req;
1571 }
1572
1573 /* drbd_drain_block() just takes a data block
1574  * out of the socket input buffer, and discards it.
1575  */
1576 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1577 {
1578         struct page *page;
1579         int err = 0;
1580         void *data;
1581
1582         if (!data_size)
1583                 return 0;
1584
1585         page = drbd_alloc_pages(peer_device, 1, 1);
1586
1587         data = kmap(page);
1588         while (data_size) {
1589                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1590
1591                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1592                 if (err)
1593                         break;
1594                 data_size -= len;
1595         }
1596         kunmap(page);
1597         drbd_free_pages(peer_device->device, page, 0);
1598         return err;
1599 }
1600
1601 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1602                            sector_t sector, int data_size)
1603 {
1604         struct bio_vec bvec;
1605         struct bvec_iter iter;
1606         struct bio *bio;
1607         int dgs, err, expect;
1608         void *dig_in = peer_device->connection->int_dig_in;
1609         void *dig_vv = peer_device->connection->int_dig_vv;
1610
1611         dgs = 0;
1612         if (peer_device->connection->peer_integrity_tfm) {
1613                 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1614                 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1615                 if (err)
1616                         return err;
1617                 data_size -= dgs;
1618         }
1619
1620         /* optimistically update recv_cnt.  if receiving fails below,
1621          * we disconnect anyway, and counters will be reset. */
1622         peer_device->device->recv_cnt += data_size>>9;
1623
1624         bio = req->master_bio;
1625         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1626
1627         bio_for_each_segment(bvec, bio, iter) {
1628                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1629                 expect = min_t(int, data_size, bvec.bv_len);
1630                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1631                 kunmap(bvec.bv_page);
1632                 if (err)
1633                         return err;
1634                 data_size -= expect;
1635         }
1636
1637         if (dgs) {
1638                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1639                 if (memcmp(dig_in, dig_vv, dgs)) {
1640                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1641                         return -EINVAL;
1642                 }
1643         }
1644
1645         D_ASSERT(peer_device->device, data_size == 0);
1646         return 0;
1647 }
1648
1649 /*
1650  * e_end_resync_block() is called in asender context via
1651  * drbd_finish_peer_reqs().
1652  */
1653 static int e_end_resync_block(struct drbd_work *w, int unused)
1654 {
1655         struct drbd_peer_request *peer_req =
1656                 container_of(w, struct drbd_peer_request, w);
1657         struct drbd_peer_device *peer_device = peer_req->peer_device;
1658         struct drbd_device *device = peer_device->device;
1659         sector_t sector = peer_req->i.sector;
1660         int err;
1661
1662         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1663
1664         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1665                 drbd_set_in_sync(device, sector, peer_req->i.size);
1666                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1667         } else {
1668                 /* Record failure to sync */
1669                 drbd_rs_failed_io(device, sector, peer_req->i.size);
1670
1671                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1672         }
1673         dec_unacked(device);
1674
1675         return err;
1676 }
1677
1678 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1679                             int data_size) __releases(local)
1680 {
1681         struct drbd_device *device = peer_device->device;
1682         struct drbd_peer_request *peer_req;
1683
1684         peer_req = read_in_block(peer_device, ID_SYNCER, sector, data_size);
1685         if (!peer_req)
1686                 goto fail;
1687
1688         dec_rs_pending(device);
1689
1690         inc_unacked(device);
1691         /* corresponding dec_unacked() in e_end_resync_block(),
1692          * or in _drbd_clear_done_ee() */
1693
1694         peer_req->w.cb = e_end_resync_block;
1695
1696         spin_lock_irq(&device->resource->req_lock);
1697         list_add(&peer_req->w.list, &device->sync_ee);
1698         spin_unlock_irq(&device->resource->req_lock);
1699
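        /* account the incoming sectors as resync-generated disk activity,
         * so drbd_rs_should_slow_down() can subtract them out again */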
1700         atomic_add(data_size >> 9, &device->rs_sect_ev);
1701         if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1702                 return 0;
1703
1704         /* don't care for the reason here */
1705         drbd_err(device, "submit failed, triggering re-connect\n");
1706         spin_lock_irq(&device->resource->req_lock);
1707         list_del(&peer_req->w.list);
1708         spin_unlock_irq(&device->resource->req_lock);
1709
1710         drbd_free_peer_req(device, peer_req);
1711 fail:
1712         put_ldev(device);
1713         return -EIO;
1714 }
1715
1716 static struct drbd_request *
1717 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1718              sector_t sector, bool missing_ok, const char *func)
1719 {
1720         struct drbd_request *req;
1721
1722         /* Our peer echoes back, as block_id, the request pointer we sent it */
1723         req = (struct drbd_request *)(unsigned long)id;
1724         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1725                 return req;
1726         if (!missing_ok) {
1727                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1728                         (unsigned long)id, (unsigned long long)sector);
1729         }
1730         return NULL;
1731 }
1732
1733 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1734 {
1735         struct drbd_peer_device *peer_device;
1736         struct drbd_device *device;
1737         struct drbd_request *req;
1738         sector_t sector;
1739         int err;
1740         struct p_data *p = pi->data;
1741
1742         peer_device = conn_peer_device(connection, pi->vnr);
1743         if (!peer_device)
1744                 return -EIO;
1745         device = peer_device->device;
1746
1747         sector = be64_to_cpu(p->sector);
1748
1749         spin_lock_irq(&device->resource->req_lock);
1750         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1751         spin_unlock_irq(&device->resource->req_lock);
1752         if (unlikely(!req))
1753                 return -EIO;
1754
1755         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1756          * special casing it there for the various failure cases.
1757          * still no race with drbd_fail_pending_reads */
1758         err = recv_dless_read(peer_device, req, sector, pi->size);
1759         if (!err)
1760                 req_mod(req, DATA_RECEIVED);
1761         /* else: nothing. handled from drbd_disconnect...
1762          * I don't think we may complete this just yet
1763          * in case we are "on-disconnect: freeze" */
1764
1765         return err;
1766 }
1767
1768 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1769 {
1770         struct drbd_peer_device *peer_device;
1771         struct drbd_device *device;
1772         sector_t sector;
1773         int err;
1774         struct p_data *p = pi->data;
1775
1776         peer_device = conn_peer_device(connection, pi->vnr);
1777         if (!peer_device)
1778                 return -EIO;
1779         device = peer_device->device;
1780
1781         sector = be64_to_cpu(p->sector);
1782         D_ASSERT(device, p->block_id == ID_SYNCER);
1783
1784         if (get_ldev(device)) {
1785                 /* data is submitted to disk within recv_resync_read.
1786                  * corresponding put_ldev done below on error,
1787                  * or in drbd_peer_request_endio. */
1788                 err = recv_resync_read(peer_device, sector, pi->size);
1789         } else {
1790                 if (__ratelimit(&drbd_ratelimit_state))
1791                         drbd_err(device, "Cannot write resync data to local disk.\n");
1792
1793                 err = drbd_drain_block(peer_device, pi->size);
1794
1795                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1796         }
1797
1798         atomic_add(pi->size >> 9, &device->rs_sect_in);
1799
1800         return err;
1801 }
1802
1803 static void restart_conflicting_writes(struct drbd_device *device,
1804                                        sector_t sector, int size)
1805 {
1806         struct drbd_interval *i;
1807         struct drbd_request *req;
1808
1809         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1810                 if (!i->local)
1811                         continue;
1812                 req = container_of(i, struct drbd_request, i);
1813                 if (req->rq_state & RQ_LOCAL_PENDING ||
1814                     !(req->rq_state & RQ_POSTPONED))
1815                         continue;
1816                 /* as it is RQ_POSTPONED, this will cause it to
1817                  * be queued on the retry workqueue. */
1818                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1819         }
1820 }
1821
1822 /*
1823  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1824  */
1825 static int e_end_block(struct drbd_work *w, int cancel)
1826 {
1827         struct drbd_peer_request *peer_req =
1828                 container_of(w, struct drbd_peer_request, w);
1829         struct drbd_peer_device *peer_device = peer_req->peer_device;
1830         struct drbd_device *device = peer_device->device;
1831         sector_t sector = peer_req->i.sector;
1832         int err = 0, pcmd;
1833
1834         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1835                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1836                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1837                                 device->state.conn <= C_PAUSED_SYNC_T &&
1838                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1839                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1840                         err = drbd_send_ack(peer_device, pcmd, peer_req);
1841                         if (pcmd == P_RS_WRITE_ACK)
1842                                 drbd_set_in_sync(device, sector, peer_req->i.size);
1843                 } else {
1844                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1845                         /* we expect it to be marked out of sync anyway...
1846                          * maybe assert this?  */
1847                 }
1848                 dec_unacked(device);
1849         }
1850         /* we delete from the conflict detection hash _after_ we sent out the
1851          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1852         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1853                 spin_lock_irq(&device->resource->req_lock);
1854                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1855                 drbd_remove_epoch_entry_interval(device, peer_req);
1856                 if (peer_req->flags & EE_RESTART_REQUESTS)
1857                         restart_conflicting_writes(device, sector, peer_req->i.size);
1858                 spin_unlock_irq(&device->resource->req_lock);
1859         } else
1860                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1861
1862         drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1863
1864         return err;
1865 }
1866
1867 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1868 {
1869         struct drbd_peer_request *peer_req =
1870                 container_of(w, struct drbd_peer_request, w);
1871         struct drbd_peer_device *peer_device = peer_req->peer_device;
1872         int err;
1873
1874         err = drbd_send_ack(peer_device, ack, peer_req);
1875         dec_unacked(peer_device->device);
1876
1877         return err;
1878 }
1879
1880 static int e_send_superseded(struct drbd_work *w, int unused)
1881 {
1882         return e_send_ack(w, P_SUPERSEDED);
1883 }
1884
1885 static int e_send_retry_write(struct drbd_work *w, int unused)
1886 {
1887         struct drbd_peer_request *peer_req =
1888                 container_of(w, struct drbd_peer_request, w);
1889         struct drbd_connection *connection = peer_req->peer_device->connection;
1890
1891         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1892                              P_RETRY_WRITE : P_SUPERSEDED);
1893 }
1894
1895 static bool seq_greater(u32 a, u32 b)
1896 {
1897         /*
1898          * We assume 32-bit wrap-around here.
1899          * For 24-bit wrap-around, we would have to shift:
1900          *  a <<= 8; b <<= 8;
1901          */
1902         return (s32)a - (s32)b > 0;
1903 }
1904
1905 static u32 seq_max(u32 a, u32 b)
1906 {
1907         return seq_greater(a, b) ? a : b;
1908 }
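
/*
 * Illustrative sketch (not part of the original file): how the wrap-around
 * comparison behaves right after a 32-bit overflow.
 */
static inline void seq_wraparound_example(void)
{
        /* (s32)(1 - 0xffffffff) == 2 > 0: 1 is "newer" across the wrap */
        BUG_ON(!seq_greater(1, 0xffffffff));
        /* hence seq_max() picks the post-wrap value */
        BUG_ON(seq_max(1, 0xffffffff) != 1);
}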
1909
1910 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
1911 {
1912         struct drbd_device *device = peer_device->device;
1913         unsigned int newest_peer_seq;
1914
1915         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
1916                 spin_lock(&device->peer_seq_lock);
1917                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
1918                 device->peer_seq = newest_peer_seq;
1919                 spin_unlock(&device->peer_seq_lock);
1920                 /* wake up only if we actually changed device->peer_seq */
1921                 if (peer_seq == newest_peer_seq)
1922                         wake_up(&device->seq_wait);
1923         }
1924 }
1925
1926 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1927 {
1928         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1929 }
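
/*
 * Illustrative sketch (not part of the original file): lengths are in bytes,
 * sectors in 512-byte units, hence the >>9 above.
 */
static inline void overlaps_example(void)
{
        /* 4 KiB at sector 0 and 4 KiB at sector 8 are adjacent, not overlapping */
        BUG_ON(overlaps(0, 4096, 8, 4096));
        /* moved down one sector, the second request now shares sector 7 */
        BUG_ON(!overlaps(0, 4096, 7, 4096));
}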
1930
1931 /* maybe change sync_ee into interval trees as well? */
1932 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
1933 {
1934         struct drbd_peer_request *rs_req;
1935         bool rv = false;
1936
1937         spin_lock_irq(&device->resource->req_lock);
1938         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
1939                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1940                              rs_req->i.sector, rs_req->i.size)) {
1941                         rv = true;
1942                         break;
1943                 }
1944         }
1945         spin_unlock_irq(&device->resource->req_lock);
1946
1947         return rv;
1948 }
1949
1950 /* Called from receive_Data.
1951  * Synchronize packets on sock with packets on msock.
1952  *
1953  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1954  * packet traveling on msock, they are still processed in the order they have
1955  * been sent.
1956  *
1957  * Note: we don't care for Ack packets overtaking P_DATA packets.
1958  *
1959  * In case packet_seq is larger than device->peer_seq number, there are
1960  * outstanding packets on the msock. We wait for them to arrive.
1961  * In case we are the logically next packet, we update device->peer_seq
1962  * ourselves. Correctly handles 32bit wrap around.
1963  *
1964  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1965  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1966  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1967  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1968  *
1969  * returns 0 if we may process the packet,
1970  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1971 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
1972 {
1973         struct drbd_device *device = peer_device->device;
1974         DEFINE_WAIT(wait);
1975         long timeout;
1976         int ret = 0, tp;
1977
1978         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
1979                 return 0;
1980
1981         spin_lock(&device->peer_seq_lock);
1982         for (;;) {
1983                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
1984                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
1985                         break;
1986                 }
1987
1988                 if (signal_pending(current)) {
1989                         ret = -ERESTARTSYS;
1990                         break;
1991                 }
1992
1993                 rcu_read_lock();
1994                 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
1995                 rcu_read_unlock();
1996
1997                 if (!tp)
1998                         break;
1999
2000                 /* Only need to wait if two_primaries is enabled */
2001                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2002                 spin_unlock(&device->peer_seq_lock);
2003                 rcu_read_lock();
2004                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2005                 rcu_read_unlock();
2006                 timeout = schedule_timeout(timeout);
2007                 spin_lock(&device->peer_seq_lock);
2008                 if (!timeout) {
2009                         ret = -ETIMEDOUT;
2010                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2011                         break;
2012                 }
2013         }
2014         spin_unlock(&device->peer_seq_lock);
2015         finish_wait(&device->seq_wait, &wait);
2016         return ret;
2017 }
2018
2019 /* see also bio_flags_to_wire().
2020  * We need to semantically map DP_* data packet flags to REQ_* bio flags
2021  * and back, since we may replicate to peers on other kernel versions. */
2022 static unsigned long wire_flags_to_bio(u32 dpf)
2023 {
2024         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2025                 (dpf & DP_FUA ? REQ_FUA : 0) |
2026                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2027                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2028 }
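
/*
 * A sketch of the inverse direction, for illustration only: the real
 * bio_flags_to_wire() lives elsewhere in DRBD (and may consider more state);
 * the name below is made up.
 */
static inline u32 example_bio_flags_to_wire(unsigned long bi_rw)
{
        return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
                (bi_rw & REQ_FUA ? DP_FUA : 0) |
                (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
                (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
}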
2029
2030 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2031                                     unsigned int size)
2032 {
2033         struct drbd_interval *i;
2034
2035     repeat:
2036         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2037                 struct drbd_request *req;
2038                 struct bio_and_error m;
2039
2040                 if (!i->local)
2041                         continue;
2042                 req = container_of(i, struct drbd_request, i);
2043                 if (!(req->rq_state & RQ_POSTPONED))
2044                         continue;
2045                 req->rq_state &= ~RQ_POSTPONED;
2046                 __req_mod(req, NEG_ACKED, &m);
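                /* completing the master bio must happen outside the req_lock;
                 * the tree may change while the lock is dropped, so rescan
                 * from the top afterwards */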
2047                 spin_unlock_irq(&device->resource->req_lock);
2048                 if (m.bio)
2049                         complete_master_bio(device, &m);
2050                 spin_lock_irq(&device->resource->req_lock);
2051                 goto repeat;
2052         }
2053 }
2054
2055 static int handle_write_conflicts(struct drbd_device *device,
2056                                   struct drbd_peer_request *peer_req)
2057 {
2058         struct drbd_connection *connection = peer_req->peer_device->connection;
2059         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2060         sector_t sector = peer_req->i.sector;
2061         const unsigned int size = peer_req->i.size;
2062         struct drbd_interval *i;
2063         bool equal;
2064         int err;
2065
2066         /*
2067          * Inserting the peer request into the write_requests tree will prevent
2068          * new conflicting local requests from being added.
2069          */
2070         drbd_insert_interval(&device->write_requests, &peer_req->i);
2071
2072     repeat:
2073         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2074                 if (i == &peer_req->i)
2075                         continue;
2076
2077                 if (!i->local) {
2078                         /*
2079                          * Our peer has sent a conflicting remote request; this
2080                          * should not happen in a two-node setup.  Wait for the
2081                          * earlier peer request to complete.
2082                          */
2083                         err = drbd_wait_misc(device, i);
2084                         if (err)
2085                                 goto out;
2086                         goto repeat;
2087                 }
2088
2089                 equal = i->sector == sector && i->size == size;
2090                 if (resolve_conflicts) {
2091                         /*
2092                          * If the peer request is fully contained within the
2093                          * overlapping request, it can be considered overwritten
2094                          * and thus superseded; otherwise, it will be retried
2095                          * once all overlapping requests have completed.
2096                          */
2097                         bool superseded = i->sector <= sector && i->sector +
2098                                        (i->size >> 9) >= sector + (size >> 9);
2099
2100                         if (!equal)
2101                                 drbd_alert(device, "Concurrent writes detected: "
2102                                                "local=%llus +%u, remote=%llus +%u, "
2103                                                "assuming %s came first\n",
2104                                           (unsigned long long)i->sector, i->size,
2105                                           (unsigned long long)sector, size,
2106                                           superseded ? "local" : "remote");
2107
2108                         inc_unacked(device);
2109                         peer_req->w.cb = superseded ? e_send_superseded :
2110                                                    e_send_retry_write;
2111                         list_add_tail(&peer_req->w.list, &device->done_ee);
2112                         wake_asender(connection);
2113
2114                         err = -ENOENT;
2115                         goto out;
2116                 } else {
2117                         struct drbd_request *req =
2118                                 container_of(i, struct drbd_request, i);
2119
2120                         if (!equal)
2121                                 drbd_alert(device, "Concurrent writes detected: "
2122                                                "local=%llus +%u, remote=%llus +%u\n",
2123                                           (unsigned long long)i->sector, i->size,
2124                                           (unsigned long long)sector, size);
2125
2126                         if (req->rq_state & RQ_LOCAL_PENDING ||
2127                             !(req->rq_state & RQ_POSTPONED)) {
2128                                 /*
2129                                  * Wait for the node with the discard flag to
2130                                  * decide if this request has been superseded
2131                                  * or needs to be retried.
2132                                  * Requests that have been superseded will
2133                                  * disappear from the write_requests tree.
2134                                  *
2135                                  * In addition, wait for the conflicting
2136                                  * request to finish locally before submitting
2137                                  * the conflicting peer request.
2138                                  */
2139                                 err = drbd_wait_misc(device, &req->i);
2140                                 if (err) {
2141                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2142                                         fail_postponed_requests(device, sector, size);
2143                                         goto out;
2144                                 }
2145                                 goto repeat;
2146                         }
2147                         /*
2148                          * Remember to restart the conflicting requests after
2149                          * the new peer request has completed.
2150                          */
2151                         peer_req->flags |= EE_RESTART_REQUESTS;
2152                 }
2153         }
2154         err = 0;
2155
2156     out:
2157         if (err)
2158                 drbd_remove_epoch_entry_interval(device, peer_req);
2159         return err;
2160 }
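
/*
 * Containment example for the "superseded" test above (illustrative values):
 * a local write at sector 0, size 8192 covers sectors 0..15; a peer write at
 * sector 4, size 2048 covers sectors 4..7 and is fully contained, so with
 * RESOLVE_CONFLICTS set it is answered with P_SUPERSEDED instead of being
 * submitted.
 */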
2161
2162 /* mirrored write */
2163 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2164 {
2165         struct drbd_peer_device *peer_device;
2166         struct drbd_device *device;
2167         sector_t sector;
2168         struct drbd_peer_request *peer_req;
2169         struct p_data *p = pi->data;
2170         u32 peer_seq = be32_to_cpu(p->seq_num);
2171         int rw = WRITE;
2172         u32 dp_flags;
2173         int err, tp;
2174
2175         peer_device = conn_peer_device(connection, pi->vnr);
2176         if (!peer_device)
2177                 return -EIO;
2178         device = peer_device->device;
2179
2180         if (!get_ldev(device)) {
2181                 int err2;
2182
2183                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2184                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2185                 atomic_inc(&connection->current_epoch->epoch_size);
2186                 err2 = drbd_drain_block(peer_device, pi->size);
2187                 if (!err)
2188                         err = err2;
2189                 return err;
2190         }
2191
2192         /*
2193          * Corresponding put_ldev done either below (on various errors), or in
2194          * drbd_peer_request_endio, if we successfully submit the data at the
2195          * end of this function.
2196          */
2197
2198         sector = be64_to_cpu(p->sector);
2199         peer_req = read_in_block(peer_device, p->block_id, sector, pi->size);
2200         if (!peer_req) {
2201                 put_ldev(device);
2202                 return -EIO;
2203         }
2204
2205         peer_req->w.cb = e_end_block;
2206
2207         dp_flags = be32_to_cpu(p->dp_flags);
2208         rw |= wire_flags_to_bio(dp_flags);
2209         if (peer_req->pages == NULL) {
2210                 D_ASSERT(device, peer_req->i.size == 0);
2211                 D_ASSERT(device, dp_flags & DP_FLUSH);
2212         }
2213
2214         if (dp_flags & DP_MAY_SET_IN_SYNC)
2215                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2216
2217         spin_lock(&connection->epoch_lock);
2218         peer_req->epoch = connection->current_epoch;
2219         atomic_inc(&peer_req->epoch->epoch_size);
2220         atomic_inc(&peer_req->epoch->active);
2221         spin_unlock(&connection->epoch_lock);
2222
2223         rcu_read_lock();
2224         tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2225         rcu_read_unlock();
2226         if (tp) {
2227                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2228                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2229                 if (err)
2230                         goto out_interrupted;
2231                 spin_lock_irq(&device->resource->req_lock);
2232                 err = handle_write_conflicts(device, peer_req);
2233                 if (err) {
2234                         spin_unlock_irq(&device->resource->req_lock);
2235                         if (err == -ENOENT) {
2236                                 put_ldev(device);
2237                                 return 0;
2238                         }
2239                         goto out_interrupted;
2240                 }
2241         } else {
2242                 update_peer_seq(peer_device, peer_seq);
2243                 spin_lock_irq(&device->resource->req_lock);
2244         }
2245         list_add(&peer_req->w.list, &device->active_ee);
2246         spin_unlock_irq(&device->resource->req_lock);
2247
2248         if (device->state.conn == C_SYNC_TARGET)
2249                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2250
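        /* Pre-protocol-100 peers do not set the DP_SEND_*_ACK hints in
         * dp_flags, so derive the ack policy from the configured wire
         * protocol (C: write ack, B: receive ack, A: neither). */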
2251         if (peer_device->connection->agreed_pro_version < 100) {
2252                 rcu_read_lock();
2253                 switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) {
2254                 case DRBD_PROT_C:
2255                         dp_flags |= DP_SEND_WRITE_ACK;
2256                         break;
2257                 case DRBD_PROT_B:
2258                         dp_flags |= DP_SEND_RECEIVE_ACK;
2259                         break;
2260                 }
2261                 rcu_read_unlock();
2262         }
2263
2264         if (dp_flags & DP_SEND_WRITE_ACK) {
2265                 peer_req->flags |= EE_SEND_WRITE_ACK;
2266                 inc_unacked(device);
2267                 /* corresponding dec_unacked() in e_end_block(),
2268                  * or in _drbd_clear_done_ee() */
2269         }
2270
2271         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2272                 /* I really don't like it that the receiver thread
2273                  * sends on the msock, but anyway */
2274                 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2275         }
2276
2277         if (device->state.pdsk < D_INCONSISTENT) {
2278                 /* The peer has no usable disk, so ours is the only copy in the cluster: */
2279                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2280                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2281                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2282                 drbd_al_begin_io(device, &peer_req->i, true);
2283         }
2284
2285         err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2286         if (!err)
2287                 return 0;
2288
2289         /* don't care for the reason here */
2290         drbd_err(device, "submit failed, triggering re-connect\n");
2291         spin_lock_irq(&device->resource->req_lock);
2292         list_del(&peer_req->w.list);
2293         drbd_remove_epoch_entry_interval(device, peer_req);
2294         spin_unlock_irq(&device->resource->req_lock);
2295         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2296                 drbd_al_complete_io(device, &peer_req->i);
2297
2298 out_interrupted:
2299         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2300         put_ldev(device);
2301         drbd_free_peer_req(device, peer_req);
2302         return err;
2303 }
2304
2305 /* We may throttle resync, if the lower device seems to be busy,
2306  * and current sync rate is above c_min_rate.
2307  *
2308  * To decide whether or not the lower device is busy, we use a scheme similar
2309  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2310  * (more than 64 sectors) activity we cannot account for with our own resync
2311  * activity, it obviously is "busy".
2312  *
2313  * The current sync rate used here uses only the most recent two step marks,
2314  * to have a short time average so we can react faster.
2315  */
2316 int drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
2317 {
2318         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2319         unsigned long db, dt, dbdt;
2320         struct lc_element *tmp;
2321         int curr_events;
2322         int throttle = 0;
2323         unsigned int c_min_rate;
2324
2325         rcu_read_lock();
2326         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2327         rcu_read_unlock();
2328
2329         /* feature disabled? */
2330         if (c_min_rate == 0)
2331                 return 0;
2332
2333         spin_lock_irq(&device->al_lock);
2334         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2335         if (tmp) {
2336                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2337                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2338                         spin_unlock_irq(&device->al_lock);
2339                         return 0;
2340                 }
2341                 /* Do not slow down if app IO is already waiting for this extent */
2342         }
2343         spin_unlock_irq(&device->al_lock);
2344
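        /* everything the backing device has done (reads + writes, in sectors),
         * minus what we submitted for resync ourselves: the remainder is
         * application activity we cannot account for */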
2345         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2346                       (int)part_stat_read(&disk->part0, sectors[1]) -
2347                         atomic_read(&device->rs_sect_ev);
2348
2349         if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
2350                 unsigned long rs_left;
2351                 int i;
2352
2353                 device->rs_last_events = curr_events;
2354
2355                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2356                  * approx. */
2357                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2358
2359                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2360                         rs_left = device->ov_left;
2361                 else
2362                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2363
2364                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2365                 if (!dt)
2366                         dt++;
2367                 db = device->rs_mark_left[i] - rs_left;
2368                 dbdt = Bit2KB(db/dt);
2369
2370                 if (dbdt > c_min_rate)
2371                         throttle = 1;
2372         }
2373         return throttle;
2374 }
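
/*
 * Worked example with made-up numbers: suppose sync mark i is 3 seconds old
 * (dt == 3) and 768 bitmap bits were cleared since (db == 768).  At 4 KiB per
 * bitmap bit, dbdt == Bit2KB(768 / 3) == 1024 KiB/s; with c_min_rate == 1000
 * this resync is already "fast enough" and we throttle it.
 */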
2375
2376
2377 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2378 {
2379         struct drbd_peer_device *peer_device;
2380         struct drbd_device *device;
2381         sector_t sector;
2382         sector_t capacity;
2383         struct drbd_peer_request *peer_req;
2384         struct digest_info *di = NULL;
2385         int size, verb;
2386         unsigned int fault_type;
2387         struct p_block_req *p = pi->data;
2388
2389         peer_device = conn_peer_device(connection, pi->vnr);
2390         if (!peer_device)
2391                 return -EIO;
2392         device = peer_device->device;
2393         capacity = drbd_get_capacity(device->this_bdev);
2394
2395         sector = be64_to_cpu(p->sector);
2396         size   = be32_to_cpu(p->blksize);
2397
2398         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2399                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2400                                 (unsigned long long)sector, size);
2401                 return -EINVAL;
2402         }
2403         if (sector + (size>>9) > capacity) {
2404                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2405                                 (unsigned long long)sector, size);
2406                 return -EINVAL;
2407         }
2408
2409         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2410                 verb = 1;
2411                 switch (pi->cmd) {
2412                 case P_DATA_REQUEST:
2413                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2414                         break;
2415                 case P_RS_DATA_REQUEST:
2416                 case P_CSUM_RS_REQUEST:
2417                 case P_OV_REQUEST:
2418                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2419                         break;
2420                 case P_OV_REPLY:
2421                         verb = 0;
2422                         dec_rs_pending(device);
2423                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2424                         break;
2425                 default:
2426                         BUG();
2427                 }
2428                 if (verb && __ratelimit(&drbd_ratelimit_state))
2429                         drbd_err(device, "Cannot satisfy peer's read request, "
2430                             "no local data.\n");
2431
2432                 /* drain the payload, if any */
2433                 return drbd_drain_block(peer_device, pi->size);
2434         }
2435
2436         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2437          * "criss-cross" setup, that might cause write-out on some other DRBD,
2438          * which in turn might block on the other node at this very place.  */
2439         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size, GFP_NOIO);
2440         if (!peer_req) {
2441                 put_ldev(device);
2442                 return -ENOMEM;
2443         }
2444
2445         switch (pi->cmd) {
2446         case P_DATA_REQUEST:
2447                 peer_req->w.cb = w_e_end_data_req;
2448                 fault_type = DRBD_FAULT_DT_RD;
2449                 /* application IO, don't drbd_rs_begin_io */
2450                 goto submit;
2451
2452         case P_RS_DATA_REQUEST:
2453                 peer_req->w.cb = w_e_end_rsdata_req;
2454                 fault_type = DRBD_FAULT_RS_RD;
2455                 /* used in the sector offset progress display */
2456                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2457                 break;
2458
2459         case P_OV_REPLY:
2460         case P_CSUM_RS_REQUEST:
2461                 fault_type = DRBD_FAULT_RS_RD;
2462                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2463                 if (!di)
2464                         goto out_free_e;
2465
2466                 di->digest_size = pi->size;
2467                 di->digest = (((char *)di)+sizeof(struct digest_info));
2468
2469                 peer_req->digest = di;
2470                 peer_req->flags |= EE_HAS_DIGEST;
2471
2472                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2473                         goto out_free_e;
2474
2475                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2476                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2477                         peer_req->w.cb = w_e_end_csum_rs_req;
2478                         /* used in the sector offset progress display */
2479                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2480                 } else if (pi->cmd == P_OV_REPLY) {
2481                         /* track progress, we may need to throttle */
2482                         atomic_add(size >> 9, &device->rs_sect_in);
2483                         peer_req->w.cb = w_e_end_ov_reply;
2484                         dec_rs_pending(device);
2485                         /* drbd_rs_begin_io done when we sent this request,
2486                          * but accounting still needs to be done. */
2487                         goto submit_for_resync;
2488                 }
2489                 break;
2490
2491         case P_OV_REQUEST:
2492                 if (device->ov_start_sector == ~(sector_t)0 &&
2493                     peer_device->connection->agreed_pro_version >= 90) {
2494                         unsigned long now = jiffies;
2495                         int i;
2496                         device->ov_start_sector = sector;
2497                         device->ov_position = sector;
2498                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2499                         device->rs_total = device->ov_left;
2500                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2501                                 device->rs_mark_left[i] = device->ov_left;
2502                                 device->rs_mark_time[i] = now;
2503                         }
2504                         drbd_info(device, "Online Verify start sector: %llu\n",
2505                                         (unsigned long long)sector);
2506                 }
2507                 peer_req->w.cb = w_e_end_ov_req;
2508                 fault_type = DRBD_FAULT_RS_RD;
2509                 break;
2510
2511         default:
2512                 BUG();
2513         }
2514
2515         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2516          * wrt the receiver, but it is not as straightforward as it may seem.
2517          * Various places in the resync start and stop logic assume resync
2518          * requests are processed in order, requeuing this on the worker thread
2519          * introduces a bunch of new code for synchronization between threads.
2520          *
2521          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2522          * "forever", throttling after drbd_rs_begin_io will lock that extent
2523          * for application writes for the same time.  For now, just throttle
2524          * here, where the rest of the code expects the receiver to sleep for
2525          * a while, anyway.
2526          */
2527
2528         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2529          * this defers syncer requests for some time, before letting at least
2530          * one request through.  The resync controller on the receiving side
2531          * will adapt to the incoming rate accordingly.
2532          *
2533          * We cannot throttle here if remote is Primary/SyncTarget:
2534          * we would also throttle its application reads.
2535          * In that case, throttling is done on the SyncTarget only.
2536          */
2537         if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
2538                 schedule_timeout_uninterruptible(HZ/10);
2539         if (drbd_rs_begin_io(device, sector))
2540                 goto out_free_e;
2541
2542 submit_for_resync:
2543         atomic_add(size >> 9, &device->rs_sect_ev);
2544
2545 submit:
2546         inc_unacked(device);
2547         spin_lock_irq(&device->resource->req_lock);
2548         list_add_tail(&peer_req->w.list, &device->read_ee);
2549         spin_unlock_irq(&device->resource->req_lock);
2550
2551         if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2552                 return 0;
2553
2554         /* don't care for the reason here */
2555         drbd_err(device, "submit failed, triggering re-connect\n");
2556         spin_lock_irq(&device->resource->req_lock);
2557         list_del(&peer_req->w.list);
2558         spin_unlock_irq(&device->resource->req_lock);
2559         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2560
2561 out_free_e:
2562         put_ldev(device);
2563         drbd_free_peer_req(device, peer_req);
2564         return -EIO;
2565 }
2566
2567 /**
2568  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2569  */
2570 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2571 {
2572         struct drbd_device *device = peer_device->device;
2573         int self, peer, rv = -100;
2574         unsigned long ch_self, ch_peer;
2575         enum drbd_after_sb_p after_sb_0p;
2576
2577         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2578         peer = device->p_uuid[UI_BITMAP] & 1;
2579
2580         ch_peer = device->p_uuid[UI_SIZE];
2581         ch_self = device->comm_bm_set;
2582
2583         rcu_read_lock();
2584         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2585         rcu_read_unlock();
2586         switch (after_sb_0p) {
2587         case ASB_CONSENSUS:
2588         case ASB_DISCARD_SECONDARY:
2589         case ASB_CALL_HELPER:
2590         case ASB_VIOLENTLY:
2591                 drbd_err(device, "Configuration error.\n");
2592                 break;
2593         case ASB_DISCONNECT:
2594                 break;
2595         case ASB_DISCARD_YOUNGER_PRI:
2596                 if (self == 0 && peer == 1) {
2597                         rv = -1;
2598                         break;
2599                 }
2600                 if (self == 1 && peer == 0) {
2601                         rv =  1;
2602                         break;
2603                 }
2604                 /* Else fall through to one of the other strategies... */
2605         case ASB_DISCARD_OLDER_PRI:
2606                 if (self == 0 && peer == 1) {
2607                         rv = 1;
2608                         break;
2609                 }
2610                 if (self == 1 && peer == 0) {
2611                         rv = -1;
2612                         break;
2613                 }
2614                 /* Else fall through to one of the other strategies... */
2615                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2616                      "Using discard-least-changes instead\n");
2617         case ASB_DISCARD_ZERO_CHG:
2618                 if (ch_peer == 0 && ch_self == 0) {
2619                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2620                                 ? -1 : 1;
2621                         break;
2622                 } else {
2623                         if (ch_peer == 0) { rv =  1; break; }
2624                         if (ch_self == 0) { rv = -1; break; }
2625                 }
2626                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2627                         break;
2628         case ASB_DISCARD_LEAST_CHG:
2629                 if      (ch_self < ch_peer)
2630                         rv = -1;
2631                 else if (ch_self > ch_peer)
2632                         rv =  1;
2633                 else /* ( ch_self == ch_peer ) */
2634                      /* Well, then use something else. */
2635                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2636                                 ? -1 : 1;
2637                 break;
2638         case ASB_DISCARD_LOCAL:
2639                 rv = -1;
2640                 break;
2641         case ASB_DISCARD_REMOTE:
2642                 rv =  1;
2643         }
2644
2645         return rv;
2646 }
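
/*
 * Illustrative reading of the return convention (values made up): with
 * after-sb-0pri set to discard-least-changes, ch_self == 10 and
 * ch_peer == 1000 give rv == -1, i.e. this node discards its few changed
 * blocks and resyncs from the peer.
 */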
2647
2648 /**
2649  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2650  */
2651 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2652 {
2653         struct drbd_device *device = peer_device->device;
2654         int hg, rv = -100;
2655         enum drbd_after_sb_p after_sb_1p;
2656
2657         rcu_read_lock();
2658         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2659         rcu_read_unlock();
2660         switch (after_sb_1p) {
2661         case ASB_DISCARD_YOUNGER_PRI:
2662         case ASB_DISCARD_OLDER_PRI:
2663         case ASB_DISCARD_LEAST_CHG:
2664         case ASB_DISCARD_LOCAL:
2665         case ASB_DISCARD_REMOTE:
2666         case ASB_DISCARD_ZERO_CHG:
2667                 drbd_err(device, "Configuration error.\n");
2668                 break;
2669         case ASB_DISCONNECT:
2670                 break;
2671         case ASB_CONSENSUS:
2672                 hg = drbd_asb_recover_0p(peer_device);
2673                 if (hg == -1 && device->state.role == R_SECONDARY)
2674                         rv = hg;
2675                 if (hg == 1  && device->state.role == R_PRIMARY)
2676                         rv = hg;
2677                 break;
2678         case ASB_VIOLENTLY:
2679                 rv = drbd_asb_recover_0p(peer_device);
2680                 break;
2681         case ASB_DISCARD_SECONDARY:
2682                 return device->state.role == R_PRIMARY ? 1 : -1;
2683         case ASB_CALL_HELPER:
2684                 hg = drbd_asb_recover_0p(peer_device);
2685                 if (hg == -1 && device->state.role == R_PRIMARY) {
2686                         enum drbd_state_rv rv2;
2687
2688                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2689                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2690                           * we do not need to wait for the after state change work either. */
2691                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2692                         if (rv2 != SS_SUCCESS) {
2693                                 drbd_khelper(device, "pri-lost-after-sb");
2694                         } else {
2695                                 drbd_warn(device, "Successfully gave up primary role.\n");
2696                                 rv = hg;
2697                         }
2698                 } else
2699                         rv = hg;
2700         }
2701
2702         return rv;
2703 }
2704
2705 /**
2706  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2707  */
2708 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2709 {
2710         struct drbd_device *device = peer_device->device;
2711         int hg, rv = -100;
2712         enum drbd_after_sb_p after_sb_2p;
2713
2714         rcu_read_lock();
2715         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2716         rcu_read_unlock();
2717         switch (after_sb_2p) {
2718         case ASB_DISCARD_YOUNGER_PRI:
2719         case ASB_DISCARD_OLDER_PRI:
2720         case ASB_DISCARD_LEAST_CHG:
2721         case ASB_DISCARD_LOCAL:
2722         case ASB_DISCARD_REMOTE:
2723         case ASB_CONSENSUS:
2724         case ASB_DISCARD_SECONDARY:
2725         case ASB_DISCARD_ZERO_CHG:
2726                 drbd_err(device, "Configuration error.\n");
2727                 break;
2728         case ASB_VIOLENTLY:
2729                 rv = drbd_asb_recover_0p(peer_device);
2730                 break;
2731         case ASB_DISCONNECT:
2732                 break;
2733         case ASB_CALL_HELPER:
2734                 hg = drbd_asb_recover_0p(peer_device);
2735                 if (hg == -1) {
2736                         enum drbd_state_rv rv2;
2737
2738                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE;
2739                           * we might be here in C_WF_REPORT_PARAMS, which is transient.
2740                           * We do not need to wait for the after state change work either. */
2741                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2742                         if (rv2 != SS_SUCCESS) {
2743                                 drbd_khelper(device, "pri-lost-after-sb");
2744                         } else {
2745                                 drbd_warn(device, "Successfully gave up primary role.\n");
2746                                 rv = hg;
2747                         }
2748                 } else
2749                         rv = hg;
2750         }
2751
2752         return rv;
2753 }
2754
2755 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2756                            u64 bits, u64 flags)
2757 {
2758         if (!uuid) {
2759                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2760                 return;
2761         }
2762         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2763              text,
2764              (unsigned long long)uuid[UI_CURRENT],
2765              (unsigned long long)uuid[UI_BITMAP],
2766              (unsigned long long)uuid[UI_HISTORY_START],
2767              (unsigned long long)uuid[UI_HISTORY_END],
2768              (unsigned long long)bits,
2769              (unsigned long long)flags);
2770 }
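
/*
 * Example output of the helper above (UUID values are made up):
 *
 *   self ABCDEF0123456788:0000000000000004:FEDCBA9876543210:0123456789ABCDEE bits:42 flags:2
 *
 * i.e. current:bitmap:history-start:history-end, followed by a bit count
 * (typically the out-of-sync bits) and the uuid flags.
 */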
2771
2772 /*
2773   100   after split brain try auto recover
2774     2   C_SYNC_SOURCE set BitMap
2775     1   C_SYNC_SOURCE use BitMap
2776     0   no Sync
2777    -1   C_SYNC_TARGET use BitMap
2778    -2   C_SYNC_TARGET set BitMap
2779  -100   after split brain, disconnect
2780 -1000   unrelated data
2781 -1091   requires proto 91
2782 -1096   requires proto 96
2783  */
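/*
 * For example, a return value of -1091 is decoded by drbd_sync_handshake()
 * below as "both sides must support at least protocol 91", via -hg - 1000:
 * -(-1091) - 1000 == 91.
 */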
2784 static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
2785 {
2786         u64 self, peer;
2787         int i, j;
2788
2789         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2790         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2791
2792         *rule_nr = 10;
2793         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2794                 return 0;
2795
2796         *rule_nr = 20;
2797         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2798              peer != UUID_JUST_CREATED)
2799                 return -2;
2800
2801         *rule_nr = 30;
2802         if (self != UUID_JUST_CREATED &&
2803             (peer == UUID_JUST_CREATED || peer == (u64)0))
2804                 return 2;
2805
2806         if (self == peer) {
2807                 int rct, dc; /* roles at crash time */
2808
2809                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2810
2811                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2812                                 return -1091;
2813
2814                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2815                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2816                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2817                                 drbd_uuid_move_history(device);
2818                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2819                                 device->ldev->md.uuid[UI_BITMAP] = 0;
2820
2821                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2822                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2823                                 *rule_nr = 34;
2824                         } else {
2825                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2826                                 *rule_nr = 36;
2827                         }
2828
2829                         return 1;
2830                 }
2831
2832                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2833
2834                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2835                                 return -1091;
2836
2837                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2838                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2839                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2840
2841                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2842                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2843                                 device->p_uuid[UI_BITMAP] = 0UL;
2844
2845                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2846                                 *rule_nr = 35;
2847                         } else {
2848                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
2849                                 *rule_nr = 37;
2850                         }
2851
2852                         return -1;
2853                 }
2854
2855                 /* Common power [off|failure] */
2856                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
2857                         (device->p_uuid[UI_FLAGS] & 2);
2858                 /* lowest bit is set when we were primary,
2859                  * next bit (weight 2) is set when peer was primary */
2860                 *rule_nr = 40;
2861
2862                 switch (rct) {
2863                 case 0: /* !self_pri && !peer_pri */ return 0;
2864                 case 1: /*  self_pri && !peer_pri */ return 1;
2865                 case 2: /* !self_pri &&  peer_pri */ return -1;
2866                 case 3: /*  self_pri &&  peer_pri */
2867                         dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
2868                         return dc ? -1 : 1;
2869                 }
2870         }
2871
2872         *rule_nr = 50;
2873         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2874         if (self == peer)
2875                 return -1;
2876
2877         *rule_nr = 51;
2878         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
2879         if (self == peer) {
2880                 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2881                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2882                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2883                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
2884                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2885                            that the last start of resync as sync source made to the peer's UUIDs. */
2886
2887                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2888                                 return -1091;
2889
2890                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
2891                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
2892
2893                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
2894                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2895
2896                         return -1;
2897                 }
2898         }
2899
2900         *rule_nr = 60;
2901         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2902         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2903                 peer = device->p_uuid[i] & ~((u64)1);
2904                 if (self == peer)
2905                         return -2;
2906         }
2907
2908         *rule_nr = 70;
2909         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2910         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2911         if (self == peer)
2912                 return 1;
2913
2914         *rule_nr = 71;
2915         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2916         if (self == peer) {
2917                 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2918                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2919                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2920                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2921                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2922                            that the last start of resync as sync source made to our UUIDs. */
2923
2924                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2925                                 return -1091;
2926
2927                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
2928                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
2929
2930                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
2931                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2932                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2933
2934                         return 1;
2935                 }
2936         }
2937
2938
2939         *rule_nr = 80;
2940         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2941         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2942                 self = device->ldev->md.uuid[i] & ~((u64)1);
2943                 if (self == peer)
2944                         return 2;
2945         }
2946
2947         *rule_nr = 90;
2948         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2949         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2950         if (self == peer && self != ((u64)0))
2951                 return 100;
2952
2953         *rule_nr = 100;
2954         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2955                 self = device->ldev->md.uuid[i] & ~((u64)1);
2956                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2957                         peer = device->p_uuid[j] & ~((u64)1);
2958                         if (self == peer)
2959                                 return -100;
2960                 }
2961         }
2962
2963         return -1000;
2964 }
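
/*
 * Worked example (hypothetical): both nodes were Primary when the power
 * failed.  After reconnect the current UUIDs still match, so rule 40 applies
 * with rct == 1 + 2 == 3 (CRASHED_PRIMARY set locally, bit 2 set in the
 * peer's uuid flags); the connection's RESOLVE_CONFLICTS bit then decides
 * between sync target (-1) and sync source (1).
 */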
2965
2966 /* drbd_sync_handshake() returns the new conn state on success, or
2967    C_MASK on failure.
2968  */
2969 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
2970                                            enum drbd_role peer_role,
2971                                            enum drbd_disk_state peer_disk) __must_hold(local)
2972 {
2973         struct drbd_device *device = peer_device->device;
2974         enum drbd_conns rv = C_MASK;
2975         enum drbd_disk_state mydisk;
2976         struct net_conf *nc;
2977         int hg, rule_nr, rr_conflict, tentative;
2978
2979         mydisk = device->state.disk;
2980         if (mydisk == D_NEGOTIATING)
2981                 mydisk = device->new_state_tmp.disk;
2982
2983         drbd_info(device, "drbd_sync_handshake:\n");
2984
2985         spin_lock_irq(&device->ldev->md.uuid_lock);
2986         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
2987         drbd_uuid_dump(device, "peer", device->p_uuid,
2988                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2989
2990         hg = drbd_uuid_compare(device, &rule_nr);
2991         spin_unlock_irq(&device->ldev->md.uuid_lock);
2992
2993         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2994
2995         if (hg == -1000) {
2996                 drbd_alert(device, "Unrelated data, aborting!\n");
2997                 return C_MASK;
2998         }
2999         if (hg < -1000) {
3000                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3001                 return C_MASK;
3002         }
3003
3004         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3005             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3006                 int f = (hg == -100) || abs(hg) == 2;
3007                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3008                 if (f)
3009                         hg = hg*2;
3010                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3011                      hg > 0 ? "source" : "target");
3012         }
3013
3014         if (abs(hg) == 100)
3015                 drbd_khelper(device, "initial-split-brain");
3016
3017         rcu_read_lock();
3018         nc = rcu_dereference(peer_device->connection->net_conf);
3019
3020         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3021                 int pcount = (device->state.role == R_PRIMARY)
3022                            + (peer_role == R_PRIMARY);
3023                 int forced = (hg == -100);
3024
3025                 switch (pcount) {
3026                 case 0:
3027                         hg = drbd_asb_recover_0p(peer_device);
3028                         break;
3029                 case 1:
3030                         hg = drbd_asb_recover_1p(peer_device);
3031                         break;
3032                 case 2:
3033                         hg = drbd_asb_recover_2p(peer_device);
3034                         break;
3035                 }
3036                 if (abs(hg) < 100) {
3037                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3038                              "automatically solved. Sync from %s node\n",
3039                              pcount, (hg < 0) ? "peer" : "this");
3040                         if (forced) {
3041                                 drbd_warn(device, "Doing a full sync, since"
3042                                      " UUIDs were ambiguous.\n");
3043                                 hg = hg*2;
3044                         }
3045                 }
3046         }
3047
3048         if (hg == -100) {
3049                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3050                         hg = -1;
3051                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3052                         hg = 1;
3053
3054                 if (abs(hg) < 100)
3055                         drbd_warn(device, "Split-Brain detected, manually solved. "
3056                              "Sync from %s node\n",
3057                              (hg < 0) ? "peer" : "this");
3058         }
3059         rr_conflict = nc->rr_conflict;
3060         tentative = nc->tentative;
3061         rcu_read_unlock();
3062
3063         if (hg == -100) {
3064                 /* FIXME this log message is not correct if we end up here
3065                  * after an attempted attach on a diskless node.
3066                  * We just refuse to attach -- well, we drop the "connection"
3067                  * to that disk, in a way... */
3068                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3069                 drbd_khelper(device, "split-brain");
3070                 return C_MASK;
3071         }
3072
3073         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3074                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3075                 return C_MASK;
3076         }
3077
3078         if (hg < 0 && /* by intention we do not use mydisk here. */
3079             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3080                 switch (rr_conflict) {
3081                 case ASB_CALL_HELPER:
3082                         drbd_khelper(device, "pri-lost");
3083                         /* fall through */
3084                 case ASB_DISCONNECT:
3085                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3086                         return C_MASK;
3087                 case ASB_VIOLENTLY:
3088                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3089                              " assumption\n");
3090                 }
3091         }
3092
3093         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3094                 if (hg == 0)
3095                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3096                 else
3097                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3098                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3099                                  abs(hg) >= 2 ? "full" : "bit-map based");
3100                 return C_MASK;
3101         }
3102
3103         if (abs(hg) >= 2) {
3104                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3105                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3106                                         BM_LOCKED_SET_ALLOWED))
3107                         return C_MASK;
3108         }
3109
3110         if (hg > 0) { /* become sync source. */
3111                 rv = C_WF_BITMAP_S;
3112         } else if (hg < 0) { /* become sync target */
3113                 rv = C_WF_BITMAP_T;
3114         } else {
3115                 rv = C_CONNECTED;
3116                 if (drbd_bm_total_weight(device)) {
3117                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3118                              drbd_bm_total_weight(device));
3119                 }
3120         }
3121
3122         return rv;
3123 }
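
/*
 * Example (hypothetical): hg == -2 from drbd_uuid_compare() means "sync
 * target, full sync": abs(hg) >= 2 makes the handshake above set the whole
 * bitmap via drbd_bmio_set_n_write(), and hg < 0 then yields C_WF_BITMAP_T.
 */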
3124
3125 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3126 {
3127         /* the peer's ASB_DISCARD_REMOTE pairs with our ASB_DISCARD_LOCAL */
3128         if (peer == ASB_DISCARD_REMOTE)
3129                 return ASB_DISCARD_LOCAL;
3130
3131         /* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
3132         if (peer == ASB_DISCARD_LOCAL)
3133                 return ASB_DISCARD_REMOTE;
3134
3135         /* everything else is valid if they are equal on both sides. */
3136         return peer;
3137 }
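
/*
 * E.g. if our after-sb-0pri is ASB_DISCARD_LOCAL and the peer sends
 * ASB_DISCARD_REMOTE, convert_after_sb() maps the peer's value to
 * ASB_DISCARD_LOCAL and the compatibility check in receive_protocol()
 * passes; any asymmetric combination of the two settings fails it.
 */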
3138
3139 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3140 {
3141         struct p_protocol *p = pi->data;
3142         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3143         int p_proto, p_discard_my_data, p_two_primaries, cf;
3144         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3145         char integrity_alg[SHARED_SECRET_MAX] = "";
3146         struct crypto_hash *peer_integrity_tfm = NULL;
3147         void *int_dig_in = NULL, *int_dig_vv = NULL;
3148
3149         p_proto         = be32_to_cpu(p->protocol);
3150         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3151         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3152         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3153         p_two_primaries = be32_to_cpu(p->two_primaries);
3154         cf              = be32_to_cpu(p->conn_flags);
3155         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3156
3157         if (connection->agreed_pro_version >= 87) {
3158                 int err;
3159
3160                 if (pi->size > sizeof(integrity_alg))
3161                         return -EIO;
3162                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3163                 if (err)
3164                         return err;
3165                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3166         }
3167
3168         if (pi->cmd != P_PROTOCOL_UPDATE) {
3169                 clear_bit(CONN_DRY_RUN, &connection->flags);
3170
3171                 if (cf & CF_DRY_RUN)
3172                         set_bit(CONN_DRY_RUN, &connection->flags);
3173
3174                 rcu_read_lock();
3175                 nc = rcu_dereference(connection->net_conf);
3176
3177                 if (p_proto != nc->wire_protocol) {
3178                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3179                         goto disconnect_rcu_unlock;
3180                 }
3181
3182                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3183                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3184                         goto disconnect_rcu_unlock;
3185                 }
3186
3187                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3188                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3189                         goto disconnect_rcu_unlock;
3190                 }
3191
3192                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3193                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3194                         goto disconnect_rcu_unlock;
3195                 }
3196
3197                 if (p_discard_my_data && nc->discard_my_data) {
3198                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3199                         goto disconnect_rcu_unlock;
3200                 }
3201
3202                 if (p_two_primaries != nc->two_primaries) {
3203                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3204                         goto disconnect_rcu_unlock;
3205                 }
3206
3207                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3208                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3209                         goto disconnect_rcu_unlock;
3210                 }
3211
3212                 rcu_read_unlock();
3213         }
3214
3215         if (integrity_alg[0]) {
3216                 int hash_size;
3217
3218                 /*
3219                  * We can only change the peer data integrity algorithm
3220                  * here.  Changing our own data integrity algorithm
3221                  * requires that we send a P_PROTOCOL_UPDATE packet at
3222                  * the same time; otherwise, the peer has no way to
3223                  * tell between which packets the algorithm should
3224                  * tell at which packet boundary the algorithm should
3225                  */
3226
3227                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3228                 if (!peer_integrity_tfm) {
3229                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3230                                  integrity_alg);
3231                         goto disconnect;
3232                 }
3233
3234                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3235                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3236                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3237                 if (!(int_dig_in && int_dig_vv)) {
3238                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3239                         goto disconnect;
3240                 }
3241         }
3242
3243         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3244         if (!new_net_conf) {
3245                 drbd_err(connection, "Allocation of new net_conf failed\n");
3246                 goto disconnect;
3247         }
3248
3249         mutex_lock(&connection->data.mutex);
3250         mutex_lock(&connection->resource->conf_update);
3251         old_net_conf = connection->net_conf;
3252         *new_net_conf = *old_net_conf;
3253
3254         new_net_conf->wire_protocol = p_proto;
3255         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3256         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3257         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3258         new_net_conf->two_primaries = p_two_primaries;
3259
3260         rcu_assign_pointer(connection->net_conf, new_net_conf);
3261         mutex_unlock(&connection->resource->conf_update);
3262         mutex_unlock(&connection->data.mutex);
3263
3264         crypto_free_hash(connection->peer_integrity_tfm);
3265         kfree(connection->int_dig_in);
3266         kfree(connection->int_dig_vv);
3267         connection->peer_integrity_tfm = peer_integrity_tfm;
3268         connection->int_dig_in = int_dig_in;
3269         connection->int_dig_vv = int_dig_vv;
3270
3271         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3272                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3273                           integrity_alg[0] ? integrity_alg : "(none)");
3274
3275         synchronize_rcu();
3276         kfree(old_net_conf);
3277         return 0;
3278
3279 disconnect_rcu_unlock:
3280         rcu_read_unlock();
3281 disconnect:
3282         crypto_free_hash(peer_integrity_tfm);
3283         kfree(int_dig_in);
3284         kfree(int_dig_vv);
3285         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3286         return -EIO;
3287 }
3288
3289 /* helper function
3290  * input: alg name, feature name
3291  * return: NULL (alg name was "")
3292  *         ERR_PTR(error) if something goes wrong
3293  *         or the crypto hash ptr, if it worked out ok. */
3294 static
3295 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3296                 const char *alg, const char *name)
3297 {
3298         struct crypto_hash *tfm;
3299
3300         if (!alg[0])
3301                 return NULL;
3302
3303         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3304         if (IS_ERR(tfm)) {
3305                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3306                         alg, name, PTR_ERR(tfm));
3307                 return tfm;
3308         }
3309         return tfm;
3310 }
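
/*
 * Typical use, mirroring receive_SyncParam() below ("crc32c" is only an
 * example algorithm name):
 *
 *	tfm = drbd_crypto_alloc_digest_safe(device, "crc32c", "csums-alg");
 *	if (IS_ERR(tfm)) {
 *		tfm = NULL;
 *		goto disconnect;
 *	}
 *
 * A NULL return (empty algorithm name) means the feature is disabled.
 */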
3311
3312 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3313 {
3314         void *buffer = connection->data.rbuf;
3315         int size = pi->size;
3316
3317         while (size) {
3318                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3319                 s = drbd_recv(connection, buffer, s);
3320                 if (s <= 0) {
3321                         if (s < 0)
3322                                 return s;
3323                         break;
3324                 }
3325                 size -= s;
3326         }
3327         if (size)
3328                 return -EIO;
3329         return 0;
3330 }
3331
3332 /*
3333  * config_unknown_volume  -  device configuration command for unknown volume
3334  *
3335  * When a device is added to an existing connection, the node on which the
3336  * device is added first will send configuration commands to its peer but the
3337  * peer will not know about the device yet.  It will warn and ignore these
3338  * commands.  Once the device is added on the second node, the second node will
3339  * send the same device configuration commands, but in the other direction.
3340  *
3341  * (We can also end up here if drbd is misconfigured.)
3342  */
3343 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3344 {
3345         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3346                   cmdname(pi->cmd), pi->vnr);
3347         return ignore_remaining_packet(connection, pi);
3348 }
3349
3350 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3351 {
3352         struct drbd_peer_device *peer_device;
3353         struct drbd_device *device;
3354         struct p_rs_param_95 *p;
3355         unsigned int header_size, data_size, exp_max_sz;
3356         struct crypto_hash *verify_tfm = NULL;
3357         struct crypto_hash *csums_tfm = NULL;
3358         struct net_conf *old_net_conf, *new_net_conf = NULL;
3359         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3360         const int apv = connection->agreed_pro_version;
3361         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3362         int fifo_size = 0;
3363         int err;
3364
3365         peer_device = conn_peer_device(connection, pi->vnr);
3366         if (!peer_device)
3367                 return config_unknown_volume(connection, pi);
3368         device = peer_device->device;
3369
3370         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3371                     : apv == 88 ? sizeof(struct p_rs_param)
3372                                         + SHARED_SECRET_MAX
3373                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3374                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3375
3376         if (pi->size > exp_max_sz) {
3377                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3378                     pi->size, exp_max_sz);
3379                 return -EIO;
3380         }
3381
3382         if (apv <= 88) {
3383                 header_size = sizeof(struct p_rs_param);
3384                 data_size = pi->size - header_size;
3385         } else if (apv <= 94) {
3386                 header_size = sizeof(struct p_rs_param_89);
3387                 data_size = pi->size - header_size;
3388                 D_ASSERT(device, data_size == 0);
3389         } else {
3390                 header_size = sizeof(struct p_rs_param_95);
3391                 data_size = pi->size - header_size;
3392                 D_ASSERT(device, data_size == 0);
3393         }
3394
3395         /* initialize verify_alg and csums_alg */
3396         p = pi->data;
3397         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3398
3399         err = drbd_recv_all(peer_device->connection, p, header_size);
3400         if (err)
3401                 return err;
3402
3403         mutex_lock(&connection->resource->conf_update);
3404         old_net_conf = peer_device->connection->net_conf;
3405         if (get_ldev(device)) {
3406                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3407                 if (!new_disk_conf) {
3408                         put_ldev(device);
3409                         mutex_unlock(&connection->resource->conf_update);
3410                         drbd_err(device, "Allocation of new disk_conf failed\n");
3411                         return -ENOMEM;
3412                 }
3413
3414                 old_disk_conf = device->ldev->disk_conf;
3415                 *new_disk_conf = *old_disk_conf;
3416
3417                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3418         }
3419
3420         if (apv >= 88) {
3421                 if (apv == 88) {
3422                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3423                                 drbd_err(device, "verify-alg of wrong size, "
3424                                         "peer wants %u, accepting only up to %u bytes\n",
3425                                         data_size, SHARED_SECRET_MAX);
3426                                 err = -EIO;
3427                                 goto reconnect;
3428                         }
3429
3430                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3431                         if (err)
3432                                 goto reconnect;
3433                         /* we expect NUL terminated string */
3434                         /* but just in case someone tries to be evil */
3435                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3436                         p->verify_alg[data_size-1] = 0;
3437
3438                 } else /* apv >= 89 */ {
3439                         /* we still expect NUL terminated strings */
3440                         /* but just in case someone tries to be evil */
3441                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3442                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3443                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3444                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3445                 }
3446
3447                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3448                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3449                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3450                                     old_net_conf->verify_alg, p->verify_alg);
3451                                 goto disconnect;
3452                         }
3453                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3454                                         p->verify_alg, "verify-alg");
3455                         if (IS_ERR(verify_tfm)) {
3456                                 verify_tfm = NULL;
3457                                 goto disconnect;
3458                         }
3459                 }
3460
3461                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3462                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3463                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3464                                     old_net_conf->csums_alg, p->csums_alg);
3465                                 goto disconnect;
3466                         }
3467                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3468                                         p->csums_alg, "csums-alg");
3469                         if (IS_ERR(csums_tfm)) {
3470                                 csums_tfm = NULL;
3471                                 goto disconnect;
3472                         }
3473                 }
3474
3475                 if (apv > 94 && new_disk_conf) {
3476                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3477                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3478                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3479                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3480
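			/* If SLEEP_TIME is HZ/10 (one planning step per 100ms),
			 * this reduces to one fifo slot per c_plan_ahead step,
			 * c_plan_ahead being configured in 0.1s units. */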
3481                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3482                         if (fifo_size != device->rs_plan_s->size) {
3483                                 new_plan = fifo_alloc(fifo_size);
3484                                 if (!new_plan) {
3485                                         drbd_err(device, "allocation of fifo_buffer failed\n");
3486                                         put_ldev(device);
3487                                         goto disconnect;
3488                                 }
3489                         }
3490                 }
3491
3492                 if (verify_tfm || csums_tfm) {
3493                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3494                         if (!new_net_conf) {
3495                                 drbd_err(device, "Allocation of new net_conf failed\n");
3496                                 goto disconnect;
3497                         }
3498
3499                         *new_net_conf = *old_net_conf;
3500
3501                         if (verify_tfm) {
3502                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3503                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3504                                 crypto_free_hash(peer_device->connection->verify_tfm);
3505                                 peer_device->connection->verify_tfm = verify_tfm;
3506                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3507                         }
3508                         if (csums_tfm) {
3509                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3510                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3511                                 crypto_free_hash(peer_device->connection->csums_tfm);
3512                                 peer_device->connection->csums_tfm = csums_tfm;
3513                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3514                         }
3515                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3516                 }
3517         }
3518
3519         if (new_disk_conf) {
3520                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3521                 put_ldev(device);
3522         }
3523
3524         if (new_plan) {
3525                 old_plan = device->rs_plan_s;
3526                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3527         }
3528
3529         mutex_unlock(&connection->resource->conf_update);
3530         synchronize_rcu();
3531         if (new_net_conf)
3532                 kfree(old_net_conf);
3533         kfree(old_disk_conf);
3534         kfree(old_plan);
3535
3536         return 0;
3537
3538 reconnect:
3539         if (new_disk_conf) {
3540                 put_ldev(device);
3541                 kfree(new_disk_conf);
3542         }
3543         mutex_unlock(&connection->resource->conf_update);
3544         return -EIO;
3545
3546 disconnect:
3547         kfree(new_plan);
3548         if (new_disk_conf) {
3549                 put_ldev(device);
3550                 kfree(new_disk_conf);
3551         }
3552         mutex_unlock(&connection->resource->conf_update);
3553         /* just for completeness: actually not needed,
3554          * as this is not reached if csums_tfm was ok. */
3555         crypto_free_hash(csums_tfm);
3556         /* but free the verify_tfm again, if csums_tfm did not work out */
3557         crypto_free_hash(verify_tfm);
3558         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3559         return -EIO;
3560 }
3561
3562 /* warn if the arguments differ by more than 12.5% */
3563 static void warn_if_differ_considerably(struct drbd_device *device,
3564         const char *s, sector_t a, sector_t b)
3565 {
3566         sector_t d;
3567         if (a == 0 || b == 0)
3568                 return;
3569         d = (a > b) ? (a - b) : (b - a);
3570         if (d > (a>>3) || d > (b>>3))
3571                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3572                      (unsigned long long)a, (unsigned long long)b);
3573 }
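
/*
 * Worked example (made-up sizes): a = 1000s and b = 1200s give d = 200,
 * which exceeds a>>3 == 125, so the warning fires; a = 1000s and b = 1100s
 * give d = 100, below both a>>3 and b>>3, so nothing is logged.
 */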
3574
3575 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3576 {
3577         struct drbd_peer_device *peer_device;
3578         struct drbd_device *device;
3579         struct p_sizes *p = pi->data;
3580         enum determine_dev_size dd = DS_UNCHANGED;
3581         sector_t p_size, p_usize, my_usize;
3582         int ldsc = 0; /* local disk size changed */
3583         enum dds_flags ddsf;
3584
3585         peer_device = conn_peer_device(connection, pi->vnr);
3586         if (!peer_device)
3587                 return config_unknown_volume(connection, pi);
3588         device = peer_device->device;
3589
3590         p_size = be64_to_cpu(p->d_size);
3591         p_usize = be64_to_cpu(p->u_size);
3592
3593         /* just store the peer's disk size for now.
3594          * we still need to figure out whether we accept that. */
3595         device->p_size = p_size;
3596
3597         if (get_ldev(device)) {
3598                 rcu_read_lock();
3599                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3600                 rcu_read_unlock();
3601
3602                 warn_if_differ_considerably(device, "lower level device sizes",
3603                            p_size, drbd_get_max_capacity(device->ldev));
3604                 warn_if_differ_considerably(device, "user requested size",
3605                                             p_usize, my_usize);
3606
3607                 /* if this is the first connect, or an otherwise expected
3608                  * param exchange, choose the minimum */
3609                 if (device->state.conn == C_WF_REPORT_PARAMS)
3610                         p_usize = min_not_zero(my_usize, p_usize);
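			/* e.g. my_usize == 0 (no user-requested size locally)
			 * and p_usize == 1048576 keeps 1048576; only when both
			 * are non-zero does the smaller value win. */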
3611
3612                 /* Never shrink a device with usable data during connect.
3613                    But allow online shrinking if we are connected. */
3614                 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3615                     drbd_get_capacity(device->this_bdev) &&
3616                     device->state.disk >= D_OUTDATED &&
3617                     device->state.conn < C_CONNECTED) {
3618                         drbd_err(device, "The peer's disk size is too small!\n");
3619                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3620                         put_ldev(device);
3621                         return -EIO;
3622                 }
3623
3624                 if (my_usize != p_usize) {
3625                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3626
3627                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3628                         if (!new_disk_conf) {
3629                                 drbd_err(device, "Allocation of new disk_conf failed\n");
3630                                 put_ldev(device);
3631                                 return -ENOMEM;
3632                         }
3633
3634                         mutex_lock(&connection->resource->conf_update);
3635                         old_disk_conf = device->ldev->disk_conf;
3636                         *new_disk_conf = *old_disk_conf;
3637                         new_disk_conf->disk_size = p_usize;
3638
3639                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3640                         mutex_unlock(&connection->resource->conf_update);
3641                         synchronize_rcu();
3642                         kfree(old_disk_conf);
3643
3644                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
3645                                  (unsigned long)p_usize);
3646                 }
3647
3648                 put_ldev(device);
3649         }
3650
3651         ddsf = be16_to_cpu(p->dds_flags);
3652         if (get_ldev(device)) {
3653                 dd = drbd_determine_dev_size(device, ddsf, NULL);
3654                 put_ldev(device);
3655                 if (dd == DS_ERROR)
3656                         return -EIO;
3657                 drbd_md_sync(device);
3658         } else {
3659                 /* I am diskless, need to accept the peer's size. */
3660                 drbd_set_my_capacity(device, p_size);
3661         }
3662
3663         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3664         drbd_reconsider_max_bio_size(device);
3665
3666         if (get_ldev(device)) {
3667                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3668                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3669                         ldsc = 1;
3670                 }
3671
3672                 put_ldev(device);
3673         }
3674
3675         if (device->state.conn > C_WF_REPORT_PARAMS) {
3676                 if (be64_to_cpu(p->c_size) !=
3677                     drbd_get_capacity(device->this_bdev) || ldsc) {
3678                         /* we have different sizes; probably the peer
3679                          * needs to know my new size... */
3680                         drbd_send_sizes(peer_device, 0, ddsf);
3681                 }
3682                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3683                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3684                         if (device->state.pdsk >= D_INCONSISTENT &&
3685                             device->state.disk >= D_INCONSISTENT) {
3686                                 if (ddsf & DDSF_NO_RESYNC)
3687                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3688                                 else
3689                                         resync_after_online_grow(device);
3690                         } else
3691                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
3692                 }
3693         }
3694
3695         return 0;
3696 }
3697
3698 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3699 {
3700         struct drbd_peer_device *peer_device;
3701         struct drbd_device *device;
3702         struct p_uuids *p = pi->data;
3703         u64 *p_uuid;
3704         int i, updated_uuids = 0;
3705
3706         peer_device = conn_peer_device(connection, pi->vnr);
3707         if (!peer_device)
3708                 return config_unknown_volume(connection, pi);
3709         device = peer_device->device;
3710
3711         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3712         if (!p_uuid) {
3713                 drbd_err(device, "kmalloc of p_uuid failed\n");
3714                 return -ENOMEM;
3715         }
3716
3717         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3718                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3719
3720         kfree(device->p_uuid);
3721         device->p_uuid = p_uuid;
3722
3723         if (device->state.conn < C_CONNECTED &&
3724             device->state.disk < D_INCONSISTENT &&
3725             device->state.role == R_PRIMARY &&
3726             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3727                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3728                     (unsigned long long)device->ed_uuid);
3729                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3730                 return -EIO;
3731         }
3732
3733         if (get_ldev(device)) {
3734                 int skip_initial_sync =
3735                         device->state.conn == C_CONNECTED &&
3736                         peer_device->connection->agreed_pro_version >= 90 &&
3737                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3738                         (p_uuid[UI_FLAGS] & 8);
3739                 if (skip_initial_sync) {
3740                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3741                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3742                                         "clear_n_write from receive_uuids",
3743                                         BM_LOCKED_TEST_ALLOWED);
3744                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3745                         _drbd_uuid_set(device, UI_BITMAP, 0);
3746                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3747                                         CS_VERBOSE, NULL);
3748                         drbd_md_sync(device);
3749                         updated_uuids = 1;
3750                 }
3751                 put_ldev(device);
3752         } else if (device->state.disk < D_INCONSISTENT &&
3753                    device->state.role == R_PRIMARY) {
3754                 /* I am a diskless primary, the peer just created a new current UUID
3755                    for me. */
3756                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3757         }
3758
3759         /* Before we test the disk state we should wait until any possibly
3760            ongoing cluster-wide state change has finished. That is important if
3761            we are primary and are detaching from our disk. We need to see the
3762            new disk state... */
3763         mutex_lock(device->state_mutex);
3764         mutex_unlock(device->state_mutex);
3765         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3766                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3767
3768         if (updated_uuids)
3769                 drbd_print_uuids(device, "receiver updated UUIDs to");
3770
3771         return 0;
3772 }
3773
3774 /**
3775  * convert_state() - Converts the peer's view of the cluster state to our point of view
3776  * @ps:         The state as seen by the peer.
3777  */
3778 static union drbd_state convert_state(union drbd_state ps)
3779 {
3780         union drbd_state ms;
3781
3782         static enum drbd_conns c_tab[] = {
3783                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3784                 [C_CONNECTED] = C_CONNECTED,
3785
3786                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3787                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3788                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3789                 [C_VERIFY_S]       = C_VERIFY_T,
3790                 [C_MASK]   = C_MASK,
3791         };
3792
3793         ms.i = ps.i;
3794
3795         ms.conn = c_tab[ps.conn];
3796         ms.peer = ps.role;
3797         ms.role = ps.peer;
3798         ms.pdsk = ps.disk;
3799         ms.disk = ps.pdsk;
3800         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3801
3802         return ms;
3803 }
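
/*
 * Example: the peer reports { role = R_PRIMARY, peer = R_SECONDARY,
 * conn = C_STARTING_SYNC_S, disk = D_UP_TO_DATE, pdsk = D_INCONSISTENT }.
 * Seen from this node that becomes role = R_SECONDARY, peer = R_PRIMARY,
 * conn = C_STARTING_SYNC_T, disk = D_INCONSISTENT, pdsk = D_UP_TO_DATE.
 */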
3804
3805 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3806 {
3807         struct drbd_peer_device *peer_device;
3808         struct drbd_device *device;
3809         struct p_req_state *p = pi->data;
3810         union drbd_state mask, val;
3811         enum drbd_state_rv rv;
3812
3813         peer_device = conn_peer_device(connection, pi->vnr);
3814         if (!peer_device)
3815                 return -EIO;
3816         device = peer_device->device;
3817
3818         mask.i = be32_to_cpu(p->mask);
3819         val.i = be32_to_cpu(p->val);
3820
3821         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3822             mutex_is_locked(device->state_mutex)) {
3823                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3824                 return 0;
3825         }
3826
3827         mask = convert_state(mask);
3828         val = convert_state(val);
3829
3830         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3831         drbd_send_sr_reply(peer_device, rv);
3832
3833         drbd_md_sync(device);
3834
3835         return 0;
3836 }
3837
3838 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
3839 {
3840         struct p_req_state *p = pi->data;
3841         union drbd_state mask, val;
3842         enum drbd_state_rv rv;
3843
3844         mask.i = be32_to_cpu(p->mask);
3845         val.i = be32_to_cpu(p->val);
3846
3847         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
3848             mutex_is_locked(&connection->cstate_mutex)) {
3849                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
3850                 return 0;
3851         }
3852
3853         mask = convert_state(mask);
3854         val = convert_state(val);
3855
3856         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3857         conn_send_sr_reply(connection, rv);
3858
3859         return 0;
3860 }
3861
3862 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
3863 {
3864         struct drbd_peer_device *peer_device;
3865         struct drbd_device *device;
3866         struct p_state *p = pi->data;
3867         union drbd_state os, ns, peer_state;
3868         enum drbd_disk_state real_peer_disk;
3869         enum chg_state_flags cs_flags;
3870         int rv;
3871
3872         peer_device = conn_peer_device(connection, pi->vnr);
3873         if (!peer_device)
3874                 return config_unknown_volume(connection, pi);
3875         device = peer_device->device;
3876
3877         peer_state.i = be32_to_cpu(p->state);
3878
3879         real_peer_disk = peer_state.disk;
3880         if (peer_state.disk == D_NEGOTIATING) {
3881                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3882                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3883         }
3884
3885         spin_lock_irq(&device->resource->req_lock);
3886  retry:
3887         os = ns = drbd_read_state(device);
3888         spin_unlock_irq(&device->resource->req_lock);
3889
3890         /* If some other part of the code (asender thread, timeout)
3891          * already decided to close the connection again,
3892          * we must not "re-establish" it here. */
3893         if (os.conn <= C_TEAR_DOWN)
3894                 return -ECONNRESET;
3895
3896         /* If this is the "end of sync" confirmation, usually the peer disk
3897          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For an empty resync
3898          * (0 bits set) started in PausedSyncT, or if the timing of pause-/
3899          * unpause-sync events has been "just right", the peer disk may
3900          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3901          */
3902         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3903             real_peer_disk == D_UP_TO_DATE &&
3904             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3905                 /* If we are (becoming) SyncSource, but peer is still in sync
3906                  * preparation, ignore its uptodate-ness to avoid flapping, it
3907                  * will change to inconsistent once the peer reaches active
3908                  * syncing states.
3909                  * It may have changed syncer-paused flags, however, so we
3910                  * cannot ignore this completely. */
3911                 if (peer_state.conn > C_CONNECTED &&
3912                     peer_state.conn < C_SYNC_SOURCE)
3913                         real_peer_disk = D_INCONSISTENT;
3914
3915                 /* if peer_state changes to connected at the same time,
3916                  * it explicitly notifies us that it finished resync.
3917                  * Maybe we should finish it up, too? */
3918                 else if (os.conn >= C_SYNC_SOURCE &&
3919                          peer_state.conn == C_CONNECTED) {
3920                         if (drbd_bm_total_weight(device) <= device->rs_failed)
3921                                 drbd_resync_finished(device);
3922                         return 0;
3923                 }
3924         }
3925
3926         /* explicit verify finished notification, stop sector reached. */
3927         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3928             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3929                 ov_out_of_sync_print(device);
3930                 drbd_resync_finished(device);
3931                 return 0;
3932         }
3933
3934         /* peer says its disk is inconsistent, while we think it is uptodate,
3935          * and this happens while the peer still thinks we have a sync going on,
3936          * but we think we are already done with the sync.
3937          * We ignore this to avoid flapping pdsk.
3938          * This should not happen if the peer runs a recent version of drbd. */
3939         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3940             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3941                 real_peer_disk = D_UP_TO_DATE;
3942
3943         if (ns.conn == C_WF_REPORT_PARAMS)
3944                 ns.conn = C_CONNECTED;
3945
3946         if (peer_state.conn == C_AHEAD)
3947                 ns.conn = C_BEHIND;
3948
3949         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3950             get_ldev_if_state(device, D_NEGOTIATING)) {
3951                 int cr; /* consider resync */
3952
3953                 /* if we established a new connection */
3954                 cr  = (os.conn < C_CONNECTED);
3955                 /* if we had an established connection
3956                  * and one of the nodes newly attaches a disk */
3957                 cr |= (os.conn == C_CONNECTED &&
3958                        (peer_state.disk == D_NEGOTIATING ||
3959                         os.disk == D_NEGOTIATING));
3960                 /* if we have both been inconsistent, and the peer has been
3961                  * forced to be UpToDate with --overwrite-data */
3962                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
3963                 /* if we had been plain connected, and the admin requested to
3964                  * start a sync by "invalidate" or "invalidate-remote" */
3965                 cr |= (os.conn == C_CONNECTED &&
3966                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3967                                  peer_state.conn <= C_WF_BITMAP_T));
3968
3969                 if (cr)
3970                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
3971
3972                 put_ldev(device);
3973                 if (ns.conn == C_MASK) {
3974                         ns.conn = C_CONNECTED;
3975                         if (device->state.disk == D_NEGOTIATING) {
3976                                 drbd_force_state(device, NS(disk, D_FAILED));
3977                         } else if (peer_state.disk == D_NEGOTIATING) {
3978                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
3979                                 peer_state.disk = D_DISKLESS;
3980                                 real_peer_disk = D_DISKLESS;
3981                         } else {
3982                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
3983                                         return -EIO;
3984                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
3985                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3986                                 return -EIO;
3987                         }
3988                 }
3989         }
3990
3991         spin_lock_irq(&device->resource->req_lock);
3992         if (os.i != drbd_read_state(device).i)
3993                 goto retry;
3994         clear_bit(CONSIDER_RESYNC, &device->flags);
3995         ns.peer = peer_state.role;
3996         ns.pdsk = real_peer_disk;
3997         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3998         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3999                 ns.disk = device->new_state_tmp.disk;
4000         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4001         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4002             test_bit(NEW_CUR_UUID, &device->flags)) {
4003                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4004                    for temporary network outages! */
4005                 spin_unlock_irq(&device->resource->req_lock);
4006                 drbd_err(device, "Aborting Connect, cannot thaw IO with a merely Consistent peer\n");
4007                 tl_clear(peer_device->connection);
4008                 drbd_uuid_new_current(device);
4009                 clear_bit(NEW_CUR_UUID, &device->flags);
4010                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4011                 return -EIO;
4012         }
4013         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4014         ns = drbd_read_state(device);
4015         spin_unlock_irq(&device->resource->req_lock);
4016
4017         if (rv < SS_SUCCESS) {
4018                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4019                 return -EIO;
4020         }
4021
4022         if (os.conn > C_WF_REPORT_PARAMS) {
4023                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4024             peer_state.disk != D_NEGOTIATING) {
4025                         /* we want resync, peer has not yet decided to sync... */
4026                         /* Nowadays only used when forcing a node into primary role and
4027                            setting its disk to UpToDate with that */
4028                         drbd_send_uuids(peer_device);
4029                         drbd_send_current_state(peer_device);
4030                 }
4031         }
4032
4033         clear_bit(DISCARD_MY_DATA, &device->flags);
4034
4035         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4036
4037         return 0;
4038 }
4039
4040 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4041 {
4042         struct drbd_peer_device *peer_device;
4043         struct drbd_device *device;
4044         struct p_rs_uuid *p = pi->data;
4045
4046         peer_device = conn_peer_device(connection, pi->vnr);
4047         if (!peer_device)
4048                 return -EIO;
4049         device = peer_device->device;
4050
4051         wait_event(device->misc_wait,
4052                    device->state.conn == C_WF_SYNC_UUID ||
4053                    device->state.conn == C_BEHIND ||
4054                    device->state.conn < C_CONNECTED ||
4055                    device->state.disk < D_NEGOTIATING);
4056
4057         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4058
4059         /* Here the _drbd_uuid_ functions are right, current should
4060            _not_ be rotated into the history */
4061         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4062                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4063                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4064
4065                 drbd_print_uuids(device, "updated sync uuid");
4066                 drbd_start_resync(device, C_SYNC_TARGET);
4067
4068                 put_ldev(device);
4069         } else
4070                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4071
4072         return 0;
4073 }
4074
4075 /**
4076  * receive_bitmap_plain
4077  *
4078  * Return 0 when done, 1 when another iteration is needed, and a negative error
4079  * code upon failure.
4080  */
4081 static int
4082 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4083                      unsigned long *p, struct bm_xfer_ctx *c)
4084 {
4085         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4086                                  drbd_header_size(peer_device->connection);
4087         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4088                                        c->bm_words - c->word_offset);
4089         unsigned int want = num_words * sizeof(*p);
4090         int err;
4091
4092         if (want != size) {
4093                 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4094                 return -EIO;
4095         }
4096         if (want == 0)
4097                 return 0;
4098         err = drbd_recv_all(peer_device->connection, p, want);
4099         if (err)
4100                 return err;
4101
4102         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4103
4104         c->word_offset += num_words;
4105         c->bit_offset = c->word_offset * BITS_PER_LONG;
4106         if (c->bit_offset > c->bm_bits)
4107                 c->bit_offset = c->bm_bits;
4108
4109         return 1;
4110 }
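/* Rough sizing (illustrative numbers only; the actual
 * DRBD_SOCKET_BUFFER_SIZE and header size depend on the build and the
 * agreed protocol version): with a 4 KiB buffer and a 16 byte header,
 * each P_BITMAP packet carries up to (4096 - 16) / 8 = 510 long words
 * on a 64 bit host, so the loop in receive_bitmap() below iterates
 * roughly bm_words / 510 times for a plain transfer. */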
4111
4112 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4113 {
4114         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4115 }
4116
4117 static int dcbp_get_start(struct p_compressed_bm *p)
4118 {
4119         return (p->encoding & 0x80) != 0;
4120 }
4121
4122 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4123 {
4124         return (p->encoding >> 4) & 0x7;
4125 }
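/* Layout of the 'encoding' byte, as implied by the three accessors
 * above (a reading aid, not an authoritative protocol definition):
 *
 *   bit  7    : start value of the RLE toggle      (dcbp_get_start)
 *   bits 6..4 : pad bits in the last code byte     (dcbp_get_pad_bits)
 *   bits 3..0 : bitmap encoding, e.g. RLE_VLI_Bits (dcbp_get_code)
 */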
4126
4127 /**
4128  * recv_bm_rle_bits
4129  *
4130  * Return 0 when done, 1 when another iteration is needed, and a negative error
4131  * code upon failure.
4132  */
4133 static int
4134 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4135                  struct p_compressed_bm *p,
4136                  struct bm_xfer_ctx *c,
4137                  unsigned int len)
4138 {
4139         struct bitstream bs;
4140         u64 look_ahead;
4141         u64 rl;
4142         u64 tmp;
4143         unsigned long s = c->bit_offset;
4144         unsigned long e;
4145         int toggle = dcbp_get_start(p);
4146         int have;
4147         int bits;
4148
4149         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4150
4151         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4152         if (bits < 0)
4153                 return -EIO;
4154
4155         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4156                 bits = vli_decode_bits(&rl, look_ahead);
4157                 if (bits <= 0)
4158                         return -EIO;
4159
4160                 if (toggle) {
4161                         e = s + rl - 1;
4162                         if (e >= c->bm_bits) {
4163                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4164                                 return -EIO;
4165                         }
4166                         _drbd_bm_set_bits(peer_device->device, s, e);
4167                 }
4168
4169                 if (have < bits) {
4170                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4171                                 have, bits, look_ahead,
4172                                 (unsigned int)(bs.cur.b - p->code),
4173                                 (unsigned int)bs.buf_len);
4174                         return -EIO;
4175                 }
4176                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4177                 if (likely(bits < 64))
4178                         look_ahead >>= bits;
4179                 else
4180                         look_ahead = 0;
4181                 have -= bits;
4182
4183                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4184                 if (bits < 0)
4185                         return -EIO;
4186                 look_ahead |= tmp << have;
4187                 have += bits;
4188         }
4189
4190         c->bit_offset = s;
4191         bm_xfer_ctx_bit_to_word_offset(c);
4192
4193         return (s != c->bm_bits);
4194 }
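/* Worked example for the loop above (a hedged sketch, not taken from a
 * real packet): with dcbp_get_start(p) == 0 and decoded run lengths
 * 5, 3, 7, the runs cover bits 0..4 (toggle 0, left clear), bits 5..7
 * (toggle 1, set via _drbd_bm_set_bits) and bits 8..14 (toggle 0, left
 * clear); c->bit_offset ends up at bit 15. */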
4195
4196 /**
4197  * decode_bitmap_c
4198  *
4199  * Return 0 when done, 1 when another iteration is needed, and a negative error
4200  * code upon failure.
4201  */
4202 static int
4203 decode_bitmap_c(struct drbd_peer_device *peer_device,
4204                 struct p_compressed_bm *p,
4205                 struct bm_xfer_ctx *c,
4206                 unsigned int len)
4207 {
4208         if (dcbp_get_code(p) == RLE_VLI_Bits)
4209                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4210
4211         /* other variants had been implemented for evaluation,
4212          * but have been dropped as this one turned out to be "best"
4213          * during all our tests. */
4214
4215         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4216         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4217         return -EIO;
4218 }
4219
4220 void INFO_bm_xfer_stats(struct drbd_device *device,
4221                 const char *direction, struct bm_xfer_ctx *c)
4222 {
4223         /* what would it take to transfer it "plaintext" */
4224         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4225         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4226         unsigned int plain =
4227                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4228                 c->bm_words * sizeof(unsigned long);
4229         unsigned int total = c->bytes[0] + c->bytes[1];
4230         unsigned int r;
4231
4232         /* total cannot be zero, but just in case: */
4233         if (total == 0)
4234                 return;
4235
4236         /* don't report if not compressed */
4237         if (total >= plain)
4238                 return;
4239
4240         /* total < plain. check for overflow, still */
4241         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4242                                     : (1000 * total / plain);
4243
4244         if (r > 1000)
4245                 r = 1000;
4246
4247         r = 1000 - r;
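        /* Worked example (numbers are illustrative only): total = 300 and
         * plain = 1200 give r = 1000 * 300 / 1200 = 250 per mille used,
         * so 1000 - 250 = 750 is printed below as "compression: 75.0%". */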
4248         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4249              "total %u; compression: %u.%u%%\n",
4250                         direction,
4251                         c->bytes[1], c->packets[1],
4252                         c->bytes[0], c->packets[0],
4253                         total, r/10, r % 10);
4254 }
4255
4256 /* Since we are processing the bitfield from lower addresses to higher,
4257    it does not matter if we process it in 32 bit chunks or 64 bit
4258    chunks as long as it is little endian. (Understand it as a byte stream,
4259    beginning with the lowest byte...) If we used big endian,
4260    we would need to process it from the highest address to the lowest
4261    in order to be agnostic to the 32 vs 64 bit issue.
4262
4263    returns 0 on failure, 1 if we successfully received it. */
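/* For example (an illustrative aside): on little endian, the byte
 * sequence 01 02 03 04 05 06 07 08 reads back as the single 64 bit word
 * 0x0807060504030201 or as the two 32 bit words 0x04030201, 0x08070605;
 * either way the bits arrive in the same order, which is why the chunk
 * size does not matter here. */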
4264 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4265 {
4266         struct drbd_peer_device *peer_device;
4267         struct drbd_device *device;
4268         struct bm_xfer_ctx c;
4269         int err;
4270
4271         peer_device = conn_peer_device(connection, pi->vnr);
4272         if (!peer_device)
4273                 return -EIO;
4274         device = peer_device->device;
4275
4276         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4277         /* you are supposed to send additional out-of-sync information
4278          * if you actually set bits during this phase */
4279
4280         c = (struct bm_xfer_ctx) {
4281                 .bm_bits = drbd_bm_bits(device),
4282                 .bm_words = drbd_bm_words(device),
4283         };
4284
4285         for (;;) {
4286                 if (pi->cmd == P_BITMAP)
4287                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4288                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4289                         /* MAYBE: sanity check that we speak proto >= 90,
4290                          * and the feature is enabled! */
4291                         struct p_compressed_bm *p = pi->data;
4292
4293                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4294                                 drbd_err(device, "ReportCBitmap packet too large\n");
4295                                 err = -EIO;
4296                                 goto out;
4297                         }
4298                         if (pi->size <= sizeof(*p)) {
4299                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4300                                 err = -EIO;
4301                                 goto out;
4302                         }
4303                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4304                         if (err)
4305                                 goto out;
4306                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4307                 } else {
4308                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4309                         err = -EIO;
4310                         goto out;
4311                 }
4312
4313                 c.packets[pi->cmd == P_BITMAP]++;
4314                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
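                /* Index convention (cf. INFO_bm_xfer_stats above): slot 1
                 * accumulates plain P_BITMAP traffic, slot 0 the compressed
                 * P_COMPRESSED_BITMAP traffic. */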
4315
4316                 if (err <= 0) {
4317                         if (err < 0)
4318                                 goto out;
4319                         break;
4320                 }
4321                 err = drbd_recv_header(peer_device->connection, pi);
4322                 if (err)
4323                         goto out;
4324         }
4325
4326         INFO_bm_xfer_stats(device, "receive", &c);
4327
4328         if (device->state.conn == C_WF_BITMAP_T) {
4329                 enum drbd_state_rv rv;
4330
4331                 err = drbd_send_bitmap(device);
4332                 if (err)
4333                         goto out;
4334                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4335                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4336                 D_ASSERT(device, rv == SS_SUCCESS);
4337         } else if (device->state.conn != C_WF_BITMAP_S) {
4338                 /* admin may have requested C_DISCONNECTING,
4339                  * other threads may have noticed network errors */
4340                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4341                     drbd_conn_str(device->state.conn));
4342         }
4343         err = 0;
4344
4345  out:
4346         drbd_bm_unlock(device);
4347         if (!err && device->state.conn == C_WF_BITMAP_S)
4348                 drbd_start_resync(device, C_SYNC_SOURCE);
4349         return err;
4350 }
4351
4352 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4353 {
4354         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4355                  pi->cmd, pi->size);
4356
4357         return ignore_remaining_packet(connection, pi);
4358 }
4359
4360 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4361 {
4362         /* Make sure we've acked all the TCP data associated
4363          * with the data requests being unplugged */
4364         drbd_tcp_quickack(connection->data.socket);
4365
4366         return 0;
4367 }
4368
4369 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4370 {
4371         struct drbd_peer_device *peer_device;
4372         struct drbd_device *device;
4373         struct p_block_desc *p = pi->data;
4374
4375         peer_device = conn_peer_device(connection, pi->vnr);
4376         if (!peer_device)
4377                 return -EIO;
4378         device = peer_device->device;
4379
4380         switch (device->state.conn) {
4381         case C_WF_SYNC_UUID:
4382         case C_WF_BITMAP_T:
4383         case C_BEHIND:
4384                 break;
4385         default:
4386                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4387                                 drbd_conn_str(device->state.conn));
4388         }
4389
4390         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4391
4392         return 0;
4393 }
4394
4395 struct data_cmd {
4396         int expect_payload;
4397         size_t pkt_size;
4398         int (*fn)(struct drbd_connection *, struct packet_info *);
4399 };
4400
4401 static struct data_cmd drbd_cmd_handler[] = {
4402         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4403         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4404         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4405         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4406         [P_BITMAP]          = { 1, 0, receive_bitmap },
4407         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4408         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4409         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4410         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4411         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4412         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4413         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4414         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4415         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4416         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4417         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4418         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4419         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4420         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4421         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4422         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4423         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4424         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4425         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4426 };
4427
4428 static void drbdd(struct drbd_connection *connection)
4429 {
4430         struct packet_info pi;
4431         size_t shs; /* sub header size */
4432         int err;
4433
4434         while (get_t_state(&connection->receiver) == RUNNING) {
4435                 struct data_cmd *cmd;
4436
4437                 drbd_thread_current_set_cpu(&connection->receiver);
4438                 if (drbd_recv_header(connection, &pi))
4439                         goto err_out;
4440
4441                 cmd = &drbd_cmd_handler[pi.cmd];
4442                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4443                         drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4444                                  cmdname(pi.cmd), pi.cmd);
4445                         goto err_out;
4446                 }
4447
4448                 shs = cmd->pkt_size;
4449                 if (pi.size > shs && !cmd->expect_payload) {
4450                         drbd_err(connection, "No payload expected %s l:%d\n",
4451                                  cmdname(pi.cmd), pi.size);
4452                         goto err_out;
4453                 }
4454
4455                 if (shs) {
4456                         err = drbd_recv_all_warn(connection, pi.data, shs);
4457                         if (err)
4458                                 goto err_out;
4459                         pi.size -= shs;
4460                 }
4461
4462                 err = cmd->fn(connection, &pi);
4463                 if (err) {
4464                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4465                                  cmdname(pi.cmd), err, pi.size);
4466                         goto err_out;
4467                 }
4468         }
4469         return;
4470
4471     err_out:
4472         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4473 }
4474
4475 static void conn_disconnect(struct drbd_connection *connection)
4476 {
4477         struct drbd_peer_device *peer_device;
4478         enum drbd_conns oc;
4479         int vnr;
4480
4481         if (connection->cstate == C_STANDALONE)
4482                 return;
4483
4484         /* We are about to start the cleanup after connection loss.
4485          * Make sure drbd_make_request knows about that.
4486          * Usually we should be in some network failure state already,
4487          * but just in case we are not, we fix it up here.
4488          */
4489         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4490
4491         /* asender does not clean up anything. it must not interfere, either */
4492         drbd_thread_stop(&connection->asender);
4493         drbd_free_sock(connection);
4494
4495         rcu_read_lock();
4496         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4497                 struct drbd_device *device = peer_device->device;
4498                 kref_get(&device->kref);
4499                 rcu_read_unlock();
4500                 drbd_disconnected(peer_device);
4501                 kref_put(&device->kref, drbd_destroy_device);
4502                 rcu_read_lock();
4503         }
4504         rcu_read_unlock();
4505
4506         if (!list_empty(&connection->current_epoch->list))
4507                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4508         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4509         atomic_set(&connection->current_epoch->epoch_size, 0);
4510         connection->send.seen_any_write_yet = false;
4511
4512         drbd_info(connection, "Connection closed\n");
4513
4514         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4515                 conn_try_outdate_peer_async(connection);
4516
4517         spin_lock_irq(&connection->resource->req_lock);
4518         oc = connection->cstate;
4519         if (oc >= C_UNCONNECTED)
4520                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4521
4522         spin_unlock_irq(&connection->resource->req_lock);
4523
4524         if (oc == C_DISCONNECTING)
4525                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4526 }
4527
4528 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4529 {
4530         struct drbd_device *device = peer_device->device;
4531         unsigned int i;
4532
4533         /* wait for current activity to cease. */
4534         spin_lock_irq(&device->resource->req_lock);
4535         _drbd_wait_ee_list_empty(device, &device->active_ee);
4536         _drbd_wait_ee_list_empty(device, &device->sync_ee);
4537         _drbd_wait_ee_list_empty(device, &device->read_ee);
4538         spin_unlock_irq(&device->resource->req_lock);
4539
4540         /* We do not have data structures that would allow us to
4541          * get the rs_pending_cnt down to 0 again.
4542          *  * On C_SYNC_TARGET we do not have any data structures describing
4543          *    the pending RSDataRequests we have sent.
4544          *  * On C_SYNC_SOURCE there is no data structure that tracks
4545          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4546          *  And no, it is not the sum of the reference counts in the
4547          *  resync_LRU. The resync_LRU tracks the whole operation including
4548          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4549          *  on the fly. */
4550         drbd_rs_cancel_all(device);
4551         device->rs_total = 0;
4552         device->rs_failed = 0;
4553         atomic_set(&device->rs_pending_cnt, 0);
4554         wake_up(&device->misc_wait);
4555
4556         del_timer_sync(&device->resync_timer);
4557         resync_timer_fn((unsigned long)device);
4558
4559         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4560          * w_make_resync_request etc. which may still be on the worker queue
4561          * to be "canceled" */
4562         drbd_flush_workqueue(&peer_device->connection->sender_work);
4563
4564         drbd_finish_peer_reqs(device);
4565
4566         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4567            might have queued work again. The one before drbd_finish_peer_reqs() is
4568            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4569         drbd_flush_workqueue(&peer_device->connection->sender_work);
4570
4571         /* need to do it again, drbd_finish_peer_reqs() may have populated it
4572          * again via drbd_try_clear_on_disk_bm(). */
4573         drbd_rs_cancel_all(device);
4574
4575         kfree(device->p_uuid);
4576         device->p_uuid = NULL;
4577
4578         if (!drbd_suspended(device))
4579                 tl_clear(peer_device->connection);
4580
4581         drbd_md_sync(device);
4582
4583         /* serialize with bitmap writeout triggered by the state change,
4584          * if any. */
4585         wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4586
4587         /* tcp_close and release of sendpage pages can be deferred.  I don't
4588          * want to use SO_LINGER, because apparently it can be deferred for
4589          * more than 20 seconds (longest time I checked).
4590          *
4591          * Actually we don't care for exactly when the network stack does its
4592          * put_page(), but release our reference on these pages right here.
4593          */
4594         i = drbd_free_peer_reqs(device, &device->net_ee);
4595         if (i)
4596                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4597         i = atomic_read(&device->pp_in_use_by_net);
4598         if (i)
4599                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4600         i = atomic_read(&device->pp_in_use);
4601         if (i)
4602                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4603
4604         D_ASSERT(device, list_empty(&device->read_ee));
4605         D_ASSERT(device, list_empty(&device->active_ee));
4606         D_ASSERT(device, list_empty(&device->sync_ee));
4607         D_ASSERT(device, list_empty(&device->done_ee));
4608
4609         return 0;
4610 }
4611
4612 /*
4613  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4614  * we can agree on is stored in agreed_pro_version.
4615  *
4616  * feature flags and the reserved array should be enough room for future
4617  * enhancements of the handshake protocol, and possible plugins...
4618  *
4619  * for now, they are expected to be zero, but ignored.
4620  */
4621 static int drbd_send_features(struct drbd_connection *connection)
4622 {
4623         struct drbd_socket *sock;
4624         struct p_connection_features *p;
4625
4626         sock = &connection->data;
4627         p = conn_prepare_command(connection, sock);
4628         if (!p)
4629                 return -EIO;
4630         memset(p, 0, sizeof(*p));
4631         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4632         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4633         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4634 }
4635
4636 /*
4637  * return values:
4638  *   1 yes, we have a valid connection
4639  *   0 oops, did not work out, please try again
4640  *  -1 peer talks different language,
4641  *     no point in trying again, please go standalone.
4642  */
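/* Version overlap, illustrated (the concrete numbers are made up;
 * PRO_VERSION_MIN/MAX depend on the build): if we support 86..101 and
 * the peer advertises 90..96, the ranges overlap and we agree on
 * min(101, 96) = 96; if the peer advertises 80..85, the ranges are
 * disjoint and we take the "incompat" path below. */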
4643 static int drbd_do_features(struct drbd_connection *connection)
4644 {
4645         /* ASSERT current == connection->receiver ... */
4646         struct p_connection_features *p;
4647         const int expect = sizeof(struct p_connection_features);
4648         struct packet_info pi;
4649         int err;
4650
4651         err = drbd_send_features(connection);
4652         if (err)
4653                 return 0;
4654
4655         err = drbd_recv_header(connection, &pi);
4656         if (err)
4657                 return 0;
4658
4659         if (pi.cmd != P_CONNECTION_FEATURES) {
4660                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4661                          cmdname(pi.cmd), pi.cmd);
4662                 return -1;
4663         }
4664
4665         if (pi.size != expect) {
4666                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4667                      expect, pi.size);
4668                 return -1;
4669         }
4670
4671         p = pi.data;
4672         err = drbd_recv_all_warn(connection, p, expect);
4673         if (err)
4674                 return 0;
4675
4676         p->protocol_min = be32_to_cpu(p->protocol_min);
4677         p->protocol_max = be32_to_cpu(p->protocol_max);
4678         if (p->protocol_max == 0)
4679                 p->protocol_max = p->protocol_min;
4680
4681         if (PRO_VERSION_MAX < p->protocol_min ||
4682             PRO_VERSION_MIN > p->protocol_max)
4683                 goto incompat;
4684
4685         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4686
4687         drbd_info(connection, "Handshake successful: "
4688              "Agreed network protocol version %d\n", connection->agreed_pro_version);
4689
4690         return 1;
4691
4692  incompat:
4693         drbd_err(connection, "incompatible DRBD dialects: "
4694             "I support %d-%d, peer supports %d-%d\n",
4695             PRO_VERSION_MIN, PRO_VERSION_MAX,
4696             p->protocol_min, p->protocol_max);
4697         return -1;
4698 }
4699
4700 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4701 static int drbd_do_auth(struct drbd_connection *connection)
4702 {
4703         drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4704         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4705         return -1;
4706 }
4707 #else
4708 #define CHALLENGE_LEN 64
4709
4710 /* Return value:
4711         1 - auth succeeded,
4712         0 - failed, try again (network error),
4713         -1 - auth failed, don't try again.
4714 */
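/* Shape of the exchange below, in outline (a summary of this function,
 * not a protocol specification): each side sends a random challenge
 * (P_AUTH_CHALLENGE), answers the peer's challenge with
 * HMAC(shared_secret, peers_challenge) in P_AUTH_RESPONSE, and finally
 * compares the response it received against the HMAC it computed over
 * its own challenge ("right_response"). */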
4715
4716 static int drbd_do_auth(struct drbd_connection *connection)
4717 {
4718         struct drbd_socket *sock;
4719         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4720         struct scatterlist sg;
4721         char *response = NULL;
4722         char *right_response = NULL;
4723         char *peers_ch = NULL;
4724         unsigned int key_len;
4725         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4726         unsigned int resp_size;
4727         struct hash_desc desc;
4728         struct packet_info pi;
4729         struct net_conf *nc;
4730         int err, rv;
4731
4732         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4733
4734         rcu_read_lock();
4735         nc = rcu_dereference(connection->net_conf);
4736         key_len = strlen(nc->shared_secret);
4737         memcpy(secret, nc->shared_secret, key_len);
4738         rcu_read_unlock();
4739
4740         desc.tfm = connection->cram_hmac_tfm;
4741         desc.flags = 0;
4742
4743         rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4744         if (rv) {
4745                 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4746                 rv = -1;
4747                 goto fail;
4748         }
4749
4750         get_random_bytes(my_challenge, CHALLENGE_LEN);
4751
4752         sock = &connection->data;
4753         if (!conn_prepare_command(connection, sock)) {
4754                 rv = 0;
4755                 goto fail;
4756         }
4757         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4758                                 my_challenge, CHALLENGE_LEN);
4759         if (!rv)
4760                 goto fail;
4761
4762         err = drbd_recv_header(connection, &pi);
4763         if (err) {
4764                 rv = 0;
4765                 goto fail;
4766         }
4767
4768         if (pi.cmd != P_AUTH_CHALLENGE) {
4769                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4770                          cmdname(pi.cmd), pi.cmd);
4771                 rv = 0;
4772                 goto fail;
4773         }
4774
4775         if (pi.size > CHALLENGE_LEN * 2) {
4776                 drbd_err(connection, "AuthChallenge payload too big.\n");
4777                 rv = -1;
4778                 goto fail;
4779         }
4780
4781         peers_ch = kmalloc(pi.size, GFP_NOIO);
4782         if (peers_ch == NULL) {
4783                 drbd_err(connection, "kmalloc of peers_ch failed\n");
4784                 rv = -1;
4785                 goto fail;
4786         }
4787
4788         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4789         if (err) {
4790                 rv = 0;
4791                 goto fail;
4792         }
4793
4794         resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4795         response = kmalloc(resp_size, GFP_NOIO);
4796         if (response == NULL) {
4797                 drbd_err(connection, "kmalloc of response failed\n");
4798                 rv = -1;
4799                 goto fail;
4800         }
4801
4802         sg_init_table(&sg, 1);
4803         sg_set_buf(&sg, peers_ch, pi.size);
4804
4805         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4806         if (rv) {
4807                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4808                 rv = -1;
4809                 goto fail;
4810         }
4811
4812         if (!conn_prepare_command(connection, sock)) {
4813                 rv = 0;
4814                 goto fail;
4815         }
4816         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
4817                                 response, resp_size);
4818         if (!rv)
4819                 goto fail;
4820
4821         err = drbd_recv_header(connection, &pi);
4822         if (err) {
4823                 rv = 0;
4824                 goto fail;
4825         }
4826
4827         if (pi.cmd != P_AUTH_RESPONSE) {
4828                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
4829                          cmdname(pi.cmd), pi.cmd);
4830                 rv = 0;
4831                 goto fail;
4832         }
4833
4834         if (pi.size != resp_size) {
4835                 drbd_err(connection, "AuthResponse payload of wrong size\n");
4836                 rv = 0;
4837                 goto fail;
4838         }
4839
4840         err = drbd_recv_all_warn(connection, response, resp_size);
4841         if (err) {
4842                 rv = 0;
4843                 goto fail;
4844         }
4845
4846         right_response = kmalloc(resp_size, GFP_NOIO);
4847         if (right_response == NULL) {
4848                 drbd_err(connection, "kmalloc of right_response failed\n");
4849                 rv = -1;
4850                 goto fail;
4851         }
4852
4853         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4854
4855         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4856         if (rv) {
4857                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4858                 rv = -1;
4859                 goto fail;
4860         }
4861
4862         rv = !memcmp(response, right_response, resp_size);
4863
4864         if (rv)
4865                 drbd_info(connection, "Peer authenticated using %d bytes of HMAC\n",
4866                      resp_size);
4867         else
4868                 rv = -1;
4869
4870  fail:
4871         kfree(peers_ch);
4872         kfree(response);
4873         kfree(right_response);
4874
4875         return rv;
4876 }
4877 #endif
4878
4879 int drbd_receiver(struct drbd_thread *thi)
4880 {
4881         struct drbd_connection *connection = thi->connection;
4882         int h;
4883
4884         drbd_info(connection, "receiver (re)started\n");
4885
4886         do {
4887                 h = conn_connect(connection);
4888                 if (h == 0) {
4889                         conn_disconnect(connection);
4890                         schedule_timeout_interruptible(HZ);
4891                 }
4892                 if (h == -1) {
4893                         drbd_warn(connection, "Discarding network configuration.\n");
4894                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
4895                 }
4896         } while (h == 0);
4897
4898         if (h > 0)
4899                 drbdd(connection);
4900
4901         conn_disconnect(connection);
4902
4903         drbd_info(connection, "receiver terminated\n");
4904         return 0;
4905 }
4906
4907 /* ********* acknowledge sender ******** */
4908
4909 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4910 {
4911         struct p_req_state_reply *p = pi->data;
4912         int retcode = be32_to_cpu(p->retcode);
4913
4914         if (retcode >= SS_SUCCESS) {
4915                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
4916         } else {
4917                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
4918                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
4919                          drbd_set_st_err_str(retcode), retcode);
4920         }
4921         wake_up(&connection->ping_wait);
4922
4923         return 0;
4924 }
4925
4926 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4927 {
4928         struct drbd_peer_device *peer_device;
4929         struct drbd_device *device;
4930         struct p_req_state_reply *p = pi->data;
4931         int retcode = be32_to_cpu(p->retcode);
4932
4933         peer_device = conn_peer_device(connection, pi->vnr);
4934         if (!peer_device)
4935                 return -EIO;
4936         device = peer_device->device;
4937
4938         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
4939                 D_ASSERT(device, connection->agreed_pro_version < 100);
4940                 return got_conn_RqSReply(connection, pi);
4941         }
4942
4943         if (retcode >= SS_SUCCESS) {
4944                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
4945         } else {
4946                 set_bit(CL_ST_CHG_FAIL, &device->flags);
4947                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
4948                         drbd_set_st_err_str(retcode), retcode);
4949         }
4950         wake_up(&device->state_wait);
4951
4952         return 0;
4953 }
4954
4955 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
4956 {
4957         return drbd_send_ping_ack(connection);
4959 }
4960
4961 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
4962 {
4963         /* restore idle timeout */
4964         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
4965         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
4966                 wake_up(&connection->ping_wait);
4967
4968         return 0;
4969 }
4970
4971 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
4972 {
4973         struct drbd_peer_device *peer_device;
4974         struct drbd_device *device;
4975         struct p_block_ack *p = pi->data;
4976         sector_t sector = be64_to_cpu(p->sector);
4977         int blksize = be32_to_cpu(p->blksize);
4978
4979         peer_device = conn_peer_device(connection, pi->vnr);
4980         if (!peer_device)
4981                 return -EIO;
4982         device = peer_device->device;
4983
4984         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
4985
4986         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
4987
4988         if (get_ldev(device)) {
4989                 drbd_rs_complete_io(device, sector);
4990                 drbd_set_in_sync(device, sector, blksize);
4991                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4992                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4993                 put_ldev(device);
4994         }
4995         dec_rs_pending(device);
4996         atomic_add(blksize >> 9, &device->rs_sect_in);
4997
4998         return 0;
4999 }
5000
5001 static int
5002 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5003                               struct rb_root *root, const char *func,
5004                               enum drbd_req_event what, bool missing_ok)
5005 {
5006         struct drbd_request *req;
5007         struct bio_and_error m;
5008
5009         spin_lock_irq(&device->resource->req_lock);
5010         req = find_request(device, root, id, sector, missing_ok, func);
5011         if (unlikely(!req)) {
5012                 spin_unlock_irq(&device->resource->req_lock);
5013                 return -EIO;
5014         }
5015         __req_mod(req, what, &m);
5016         spin_unlock_irq(&device->resource->req_lock);
5017
5018         if (m.bio)
5019                 complete_master_bio(device, &m);
5020         return 0;
5021 }
5022
5023 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5024 {
5025         struct drbd_peer_device *peer_device;
5026         struct drbd_device *device;
5027         struct p_block_ack *p = pi->data;
5028         sector_t sector = be64_to_cpu(p->sector);
5029         int blksize = be32_to_cpu(p->blksize);
5030         enum drbd_req_event what;
5031
5032         peer_device = conn_peer_device(connection, pi->vnr);
5033         if (!peer_device)
5034                 return -EIO;
5035         device = peer_device->device;
5036
5037         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5038
5039         if (p->block_id == ID_SYNCER) {
5040                 drbd_set_in_sync(device, sector, blksize);
5041                 dec_rs_pending(device);
5042                 return 0;
5043         }
5044         switch (pi->cmd) {
5045         case P_RS_WRITE_ACK:
5046                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5047                 break;
5048         case P_WRITE_ACK:
5049                 what = WRITE_ACKED_BY_PEER;
5050                 break;
5051         case P_RECV_ACK:
5052                 what = RECV_ACKED_BY_PEER;
5053                 break;
5054         case P_SUPERSEDED:
5055                 what = CONFLICT_RESOLVED;
5056                 break;
5057         case P_RETRY_WRITE:
5058                 what = POSTPONE_WRITE;
5059                 break;
5060         default:
5061                 BUG();
5062         }
5063
5064         return validate_req_change_req_state(device, p->block_id, sector,
5065                                              &device->write_requests, __func__,
5066                                              what, false);
5067 }
5068
5069 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5070 {
5071         struct drbd_peer_device *peer_device;
5072         struct drbd_device *device;
5073         struct p_block_ack *p = pi->data;
5074         sector_t sector = be64_to_cpu(p->sector);
5075         int size = be32_to_cpu(p->blksize);
5076         int err;
5077
5078         peer_device = conn_peer_device(connection, pi->vnr);
5079         if (!peer_device)
5080                 return -EIO;
5081         device = peer_device->device;
5082
5083         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5084
5085         if (p->block_id == ID_SYNCER) {
5086                 dec_rs_pending(device);
5087                 drbd_rs_failed_io(device, sector, size);
5088                 return 0;
5089         }
5090
5091         err = validate_req_change_req_state(device, p->block_id, sector,
5092                                             &device->write_requests, __func__,
5093                                             NEG_ACKED, true);
5094         if (err) {
5095                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5096                    The master bio might already be completed, therefore the
5097                    request is no longer in the collision hash. */
5098                 /* In Protocol B we might already have got a P_RECV_ACK
5099                    but then get a P_NEG_ACK afterwards. */
5100                 drbd_set_out_of_sync(device, sector, size);
5101         }
5102         return 0;
5103 }
5104
5105 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5106 {
5107         struct drbd_peer_device *peer_device;
5108         struct drbd_device *device;
5109         struct p_block_ack *p = pi->data;
5110         sector_t sector = be64_to_cpu(p->sector);
5111
5112         peer_device = conn_peer_device(connection, pi->vnr);
5113         if (!peer_device)
5114                 return -EIO;
5115         device = peer_device->device;
5116
5117         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5118
5119         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5120             (unsigned long long)sector, be32_to_cpu(p->blksize));
5121
5122         return validate_req_change_req_state(device, p->block_id, sector,
5123                                              &device->read_requests, __func__,
5124                                              NEG_ACKED, false);
5125 }
5126
5127 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5128 {
5129         struct drbd_peer_device *peer_device;
5130         struct drbd_device *device;
5131         sector_t sector;
5132         int size;
5133         struct p_block_ack *p = pi->data;
5134
5135         peer_device = conn_peer_device(connection, pi->vnr);
5136         if (!peer_device)
5137                 return -EIO;
5138         device = peer_device->device;
5139
5140         sector = be64_to_cpu(p->sector);
5141         size = be32_to_cpu(p->blksize);
5142
5143         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5144
5145         dec_rs_pending(device);
5146
5147         if (get_ldev_if_state(device, D_FAILED)) {
5148                 drbd_rs_complete_io(device, sector);
5149                 switch (pi->cmd) {
5150                 case P_NEG_RS_DREPLY:
5151                         drbd_rs_failed_io(device, sector, size); /* fall through */
5152                 case P_RS_CANCEL:
5153                         break;
5154                 default:
5155                         BUG();
5156                 }
5157                 put_ldev(device);
5158         }
5159
5160         return 0;
5161 }
5162
5163 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5164 {
5165         struct p_barrier_ack *p = pi->data;
5166         struct drbd_peer_device *peer_device;
5167         int vnr;
5168
5169         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5170
5171         rcu_read_lock();
5172         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5173                 struct drbd_device *device = peer_device->device;
5174
5175                 if (device->state.conn == C_AHEAD &&
5176                     atomic_read(&device->ap_in_flight) == 0 &&
5177                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5178                         device->start_resync_timer.expires = jiffies + HZ;
5179                         add_timer(&device->start_resync_timer);
5180                 }
5181         }
5182         rcu_read_unlock();
5183
5184         return 0;
5185 }
5186
5187 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5188 {
5189         struct drbd_peer_device *peer_device;
5190         struct drbd_device *device;
5191         struct p_block_ack *p = pi->data;
5192         struct drbd_device_work *dw;
5193         sector_t sector;
5194         int size;
5195
5196         peer_device = conn_peer_device(connection, pi->vnr);
5197         if (!peer_device)
5198                 return -EIO;
5199         device = peer_device->device;
5200
5201         sector = be64_to_cpu(p->sector);
5202         size = be32_to_cpu(p->blksize);
5203
5204         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5205
5206         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5207                 drbd_ov_out_of_sync_found(device, sector, size);
5208         else
5209                 ov_out_of_sync_print(device);
5210
5211         if (!get_ldev(device))
5212                 return 0;
5213
5214         drbd_rs_complete_io(device, sector);
5215         dec_rs_pending(device);
5216
5217         --device->ov_left;
5218
5219         /* let's advance progress step marks only for every other megabyte */
5220         if ((device->ov_left & 0x200) == 0x200)
5221                 drbd_advance_rs_marks(device, device->ov_left);
5222
5223         if (device->ov_left == 0) {
5224                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5225                 if (dw) {
5226                         dw->w.cb = w_ov_finished;
5227                         dw->device = device;
5228                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5229                 } else {
5230                         drbd_err(device, "kmalloc(dw) failed.\n");
5231                         ov_out_of_sync_print(device);
5232                         drbd_resync_finished(device);
5233                 }
5234         }
5235         put_ldev(device);
5236         return 0;
5237 }
5238
5239 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5240 {
5241         return 0;
5242 }
5243
5244 static int connection_finish_peer_reqs(struct drbd_connection *connection)
5245 {
5246         struct drbd_peer_device *peer_device;
5247         int vnr, not_empty = 0;
5248
5249         do {
5250                 clear_bit(SIGNAL_ASENDER, &connection->flags);
5251                 flush_signals(current);
5252
5253                 rcu_read_lock();
5254                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5255                         struct drbd_device *device = peer_device->device;
5256                         kref_get(&device->kref);
5257                         rcu_read_unlock();
5258                         if (drbd_finish_peer_reqs(device)) {
5259                                 kref_put(&device->kref, drbd_destroy_device);
5260                                 return 1;
5261                         }
5262                         kref_put(&device->kref, drbd_destroy_device);
5263                         rcu_read_lock();
5264                 }
5265                 set_bit(SIGNAL_ASENDER, &connection->flags);
5266
5267                 spin_lock_irq(&connection->resource->req_lock);
5268                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5269                         struct drbd_device *device = peer_device->device;
5270                         not_empty = !list_empty(&device->done_ee);
5271                         if (not_empty)
5272                                 break;
5273                 }
5274                 spin_unlock_irq(&connection->resource->req_lock);
5275                 rcu_read_unlock();
5276         } while (not_empty);
5277
5278         return 0;
5279 }
5280
5281 struct asender_cmd {
5282         size_t pkt_size;
5283         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5284 };
5285
5286 static struct asender_cmd asender_tbl[] = {
5287         [P_PING]            = { 0, got_Ping },
5288         [P_PING_ACK]        = { 0, got_PingAck },
5289         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5290         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5291         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5292         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5293         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5294         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5295         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5296         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5297         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5298         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5299         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5300         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5301         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5302         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5303         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5304 };
5305
int drbd_asender(struct drbd_thread *thi)
{
        struct drbd_connection *connection = thi->connection;
        struct asender_cmd *cmd = NULL;
        struct packet_info pi;
        int rv;
        void *buf    = connection->meta.rbuf;
        int received = 0;
        unsigned int header_size = drbd_header_size(connection);
        int expect   = header_size;
        bool ping_timeout_active = false;
        struct net_conf *nc;
        int ping_timeo, tcp_cork, ping_int;
        struct sched_param param = { .sched_priority = 2 };

        rv = sched_setscheduler(current, SCHED_RR, &param);
        if (rv < 0)
                drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);

        while (get_t_state(thi) == RUNNING) {
                drbd_thread_current_set_cpu(thi);

                rcu_read_lock();
                nc = rcu_dereference(connection->net_conf);
                ping_timeo = nc->ping_timeo;
                tcp_cork = nc->tcp_cork;
                ping_int = nc->ping_int;
                rcu_read_unlock();

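                /* A ping was requested: send it and arm the (shorter) ping
                 * timeout; ping_timeo is configured in tenths of a second,
                 * hence the HZ / 10 conversion to jiffies. */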
                if (test_and_clear_bit(SEND_PING, &connection->flags)) {
                        if (drbd_send_ping(connection)) {
                                drbd_err(connection, "drbd_send_ping has failed\n");
                                goto reconnect;
                        }
                        connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
                        ping_timeout_active = true;
                }

                /* TODO: conditionally cork; it may hurt latency if we cork without
                   much to send */
                if (tcp_cork)
                        drbd_tcp_cork(connection->meta.socket);
                if (connection_finish_peer_reqs(connection)) {
                        drbd_err(connection, "connection_finish_peer_reqs() failed\n");
                        goto reconnect;
                }
                /* but unconditionally uncork unless disabled */
                if (tcp_cork)
                        drbd_tcp_uncork(connection->meta.socket);

                /* short circuit, recv_msg would return EINTR anyway. */
                if (signal_pending(current))
                        continue;

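                /* Receive whatever is still missing of the current header or
                 * packet; blocks until data arrives, a signal fires, or
                 * sk_rcvtimeo expires (-EAGAIN). */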
                rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
                clear_bit(SIGNAL_ASENDER, &connection->flags);

                flush_signals(current);

                /* Note:
                 * -EINTR        (on meta) we got a signal
                 * -EAGAIN       (on meta) rcvtimeo expired
                 * -ECONNRESET   other side closed the connection
                 * -ERESTARTSYS  (on data) we got a signal
                 * rv <  0       other than above: unexpected error!
                 * rv == expected: full header or command
                 * rv <  expected: "woken" by signal during receive
                 * rv == 0       : "connection shut down by peer"
                 */
                if (likely(rv > 0)) {
                        received += rv;
                        buf      += rv;
                } else if (rv == 0) {
                        if (test_bit(DISCONNECT_SENT, &connection->flags)) {
                                long t;
                                rcu_read_lock();
                                t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
                                rcu_read_unlock();

                                t = wait_event_timeout(connection->ping_wait,
                                                       connection->cstate < C_WF_REPORT_PARAMS,
                                                       t);
                                if (t)
                                        break;
                        }
                        drbd_err(connection, "meta connection shut down by peer.\n");
                        goto reconnect;
                } else if (rv == -EAGAIN) {
                        /* If the data socket received something meanwhile,
                         * that is good enough: peer is still alive. */
                        if (time_after(connection->last_received,
                                jiffies - connection->meta.socket->sk->sk_rcvtimeo))
                                continue;
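                        /* The receive timeout expired with nothing on the
                         * data socket either.  If a ping is already
                         * outstanding, the peer missed its deadline;
                         * otherwise request one and keep waiting. */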
                        if (ping_timeout_active) {
                                drbd_err(connection, "PingAck did not arrive in time.\n");
                                goto reconnect;
                        }
                        set_bit(SEND_PING, &connection->flags);
                        continue;
                } else if (rv == -EINTR) {
                        continue;
                } else {
                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
                        goto reconnect;
                }

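                /* Once the full header is in, decode it, validate the packet
                 * type against asender_tbl[], and learn the payload size. */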
                if (received == expect && cmd == NULL) {
                        if (decode_header(connection, connection->meta.rbuf, &pi))
                                goto reconnect;
                        /* validate pi.cmd before indexing the table */
                        if (pi.cmd >= ARRAY_SIZE(asender_tbl) ||
                            !asender_tbl[pi.cmd].fn) {
                                drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
                                         cmdname(pi.cmd), pi.cmd);
                                goto disconnect;
                        }
                        cmd = &asender_tbl[pi.cmd];
                        expect = header_size + cmd->pkt_size;
                        if (pi.size != expect - header_size) {
                                drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
                                        pi.cmd, pi.size);
                                goto reconnect;
                        }
                }
                if (received == expect) {
                        int err;

                        err = cmd->fn(connection, &pi);
                        if (err) {
                                drbd_err(connection, "%pf failed\n", cmd->fn);
                                goto reconnect;
                        }

                        connection->last_received = jiffies;

                        if (cmd == &asender_tbl[P_PING_ACK]) {
                                /* restore idle timeout */
                                connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
                                ping_timeout_active = false;
                        }

                        buf      = connection->meta.rbuf;
                        received = 0;
                        expect   = header_size;
                        cmd      = NULL;
                }
        }

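        /* The if (0) blocks below are reachable only via the reconnect and
         * disconnect labels above; a regular loop exit falls through them. */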
        if (0) {
reconnect:
                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
                conn_md_sync(connection);
        }
        if (0) {
disconnect:
                conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
        }
        clear_bit(SIGNAL_ASENDER, &connection->flags);

        drbd_info(connection, "asender terminated\n");

        return 0;
}