drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
78
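/* Illustration (editor's sketch, not part of the driver): a chain of
 * three pages linked through page->private looks like
 *
 *      *head --> [page A] --> [page B] --> [page C] --> end (private == 0)
 *
 * page_chain_next() (from the DRBD headers) follows the page->private
 * link, and a private value of 0 marks the end of the chain, as set up
 * by page_chain_del() below.
 */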
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
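/* Give every page of the chain back to the system (put_page());
 * returns the number of pages freed. */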
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
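/* Prepend the chain chain_first..chain_last to *head.  Like
 * page_chain_del(), this relies on the caller to hold the appropriate
 * lock (drbd_pp_lock when *head is the global pool). */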
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
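/* Grab @number pages: first try the pre-allocated drbd_pp_pool, then
 * fall back to alloc_page(GFP_TRY).  If not all @number pages can be
 * had, the partially allocated chain is returned to the pool and NULL
 * is returned; drbd_alloc_pages() below will retry. */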
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first one that has not
208            finished, we can stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
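/* Collect finished net_ee peer requests under the req_lock and free
 * them outside of it. */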
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyway. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (page == NULL)
299                 return;
300
301         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
302                 i = page_chain_free(page);
303         else {
304                 struct page *tmp;
305                 tmp = page_chain_tail(page, &i);
306                 spin_lock(&drbd_pp_lock);
307                 page_chain_add(&drbd_pp_pool, page, tmp);
308                 drbd_pp_vacant += i;
309                 spin_unlock(&drbd_pp_lock);
310         }
311         i = atomic_sub_return(i, a);
312         if (i < 0)
313                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
314                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
315         wake_up(&drbd_pp_wait);
316 }
317
318 /*
319 You need to hold the req_lock:
320  _drbd_wait_ee_list_empty()
321
322 You must not have the req_lock:
323  drbd_free_peer_req()
324  drbd_alloc_peer_req()
325  drbd_free_peer_reqs()
326  drbd_ee_fix_bhs()
327  drbd_finish_peer_reqs()
328  drbd_clear_done_ee()
329  drbd_wait_ee_list_empty()
330 */
331
332 struct drbd_peer_request *
333 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
334                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
335 {
336         struct drbd_peer_request *peer_req;
337         struct page *page = NULL;
338         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
339
340         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
341                 return NULL;
342
343         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
344         if (!peer_req) {
345                 if (!(gfp_mask & __GFP_NOWARN))
346                         dev_err(DEV, "%s: allocation failed\n", __func__);
347                 return NULL;
348         }
349
350         if (data_size) {
351                 page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
352                 if (!page)
353                         goto fail;
354         }
355
356         drbd_clear_interval(&peer_req->i);
357         peer_req->i.size = data_size;
358         peer_req->i.sector = sector;
359         peer_req->i.local = false;
360         peer_req->i.waiting = false;
361
362         peer_req->epoch = NULL;
363         peer_req->w.mdev = mdev;
364         peer_req->pages = page;
365         atomic_set(&peer_req->pending_bios, 0);
366         peer_req->flags = 0;
367         /*
368          * The block_id is opaque to the receiver.  It is not endianness
369          * converted, and sent back to the sender unchanged.
370          */
371         peer_req->block_id = id;
372
373         return peer_req;
374
375  fail:
376         mempool_free(peer_req, drbd_ee_mempool);
377         return NULL;
378 }
379
380 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
381                        int is_net)
382 {
383         if (peer_req->flags & EE_HAS_DIGEST)
384                 kfree(peer_req->digest);
385         drbd_free_pages(mdev, peer_req->pages, is_net);
386         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
387         D_ASSERT(drbd_interval_empty(&peer_req->i));
388         mempool_free(peer_req, drbd_ee_mempool);
389 }
390
391 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
392 {
393         LIST_HEAD(work_list);
394         struct drbd_peer_request *peer_req, *t;
395         int count = 0;
396         int is_net = list == &mdev->net_ee;
397
398         spin_lock_irq(&mdev->tconn->req_lock);
399         list_splice_init(list, &work_list);
400         spin_unlock_irq(&mdev->tconn->req_lock);
401
402         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
403                 __drbd_free_peer_req(mdev, peer_req, is_net);
404                 count++;
405         }
406         return count;
407 }
408
409 /*
410  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
411  */
412 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
413 {
414         LIST_HEAD(work_list);
415         LIST_HEAD(reclaimed);
416         struct drbd_peer_request *peer_req, *t;
417         int err = 0;
418
419         spin_lock_irq(&mdev->tconn->req_lock);
420         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
421         list_splice_init(&mdev->done_ee, &work_list);
422         spin_unlock_irq(&mdev->tconn->req_lock);
423
424         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
425                 drbd_free_net_peer_req(mdev, peer_req);
426
427         /* possible callbacks here:
428          * e_end_block, and e_end_resync_block, e_send_superseded.
429          * all ignore the last argument.
430          */
431         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
432                 int err2;
433
434                 /* list_del not necessary, next/prev members not touched */
435                 err2 = peer_req->w.cb(&peer_req->w, !!err);
436                 if (!err)
437                         err = err2;
438                 drbd_free_peer_req(mdev, peer_req);
439         }
440         wake_up(&mdev->ee_wait);
441
442         return err;
443 }
444
445 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
446                                      struct list_head *head)
447 {
448         DEFINE_WAIT(wait);
449
450         /* avoids spin_lock/unlock
451          * and calling prepare_to_wait in the fast path */
452         while (!list_empty(head)) {
453                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
454                 spin_unlock_irq(&mdev->tconn->req_lock);
455                 io_schedule();
456                 finish_wait(&mdev->ee_wait, &wait);
457                 spin_lock_irq(&mdev->tconn->req_lock);
458         }
459 }
460
461 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
462                                     struct list_head *head)
463 {
464         spin_lock_irq(&mdev->tconn->req_lock);
465         _drbd_wait_ee_list_empty(mdev, head);
466         spin_unlock_irq(&mdev->tconn->req_lock);
467 }
468
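/* Thin wrapper around kernel_recvmsg(): receive up to @size bytes into
 * @buf; with no @flags given, wait for the full @size (MSG_WAITALL). */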
469 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
470 {
471         struct kvec iov = {
472                 .iov_base = buf,
473                 .iov_len = size,
474         };
475         struct msghdr msg = {
476                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
477         };
478         return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
479 }
480
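/* Receive from the data socket, with connection bookkeeping: log peer
 * resets/shutdowns, and on a short read force C_BROKEN_PIPE unless we
 * initiated the disconnect ourselves (DISCONNECT_SENT). */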
481 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
482 {
483         int rv;
484
485         rv = drbd_recv_short(tconn->data.socket, buf, size, 0);
486
487         if (rv < 0) {
488                 if (rv == -ECONNRESET)
489                         conn_info(tconn, "sock was reset by peer\n");
490                 else if (rv != -ERESTARTSYS)
491                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
492         } else if (rv == 0) {
493                 if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
494                         long t;
495                         rcu_read_lock();
496                         t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
497                         rcu_read_unlock();
498
499                         t = wait_event_timeout(tconn->ping_wait, tconn->cstate < C_WF_REPORT_PARAMS, t);
500
501                         if (t)
502                                 goto out;
503                 }
504                 conn_info(tconn, "sock was shut down by peer\n");
505         }
506
507         if (rv != size)
508                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
509
510 out:
511         return rv;
512 }
513
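/* Like drbd_recv(), but only a read of exactly @size bytes counts as
 * success (returns 0); anything else becomes a negative error. */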
514 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
515 {
516         int err;
517
518         err = drbd_recv(tconn, buf, size);
519         if (err != size) {
520                 if (err >= 0)
521                         err = -EIO;
522         } else
523                 err = 0;
524         return err;
525 }
526
527 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
528 {
529         int err;
530
531         err = drbd_recv_all(tconn, buf, size);
532         if (err && !signal_pending(current))
533                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
534         return err;
535 }
536
537 /* quoting tcp(7):
538  *   On individual connections, the socket buffer size must be set prior to the
539  *   listen(2) or connect(2) calls in order to have it take effect.
540  * This is our wrapper to do so.
541  */
542 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
543                 unsigned int rcv)
544 {
545         /* open coded SO_SNDBUF, SO_RCVBUF */
546         if (snd) {
547                 sock->sk->sk_sndbuf = snd;
548                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
549         }
550         if (rcv) {
551                 sock->sk->sk_rcvbuf = rcv;
552                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
553         }
554 }
555
556 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
557 {
558         const char *what;
559         struct socket *sock;
560         struct sockaddr_in6 src_in6;
561         struct sockaddr_in6 peer_in6;
562         struct net_conf *nc;
563         int err, peer_addr_len, my_addr_len;
564         int sndbuf_size, rcvbuf_size, connect_int;
565         int disconnect_on_error = 1;
566
567         rcu_read_lock();
568         nc = rcu_dereference(tconn->net_conf);
569         if (!nc) {
570                 rcu_read_unlock();
571                 return NULL;
572         }
573         sndbuf_size = nc->sndbuf_size;
574         rcvbuf_size = nc->rcvbuf_size;
575         connect_int = nc->connect_int;
576         rcu_read_unlock();
577
578         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
579         memcpy(&src_in6, &tconn->my_addr, my_addr_len);
580
581         if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
582                 src_in6.sin6_port = 0;
583         else
584                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
585
586         peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
587         memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
588
589         what = "sock_create_kern";
590         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
591                                SOCK_STREAM, IPPROTO_TCP, &sock);
592         if (err < 0) {
593                 sock = NULL;
594                 goto out;
595         }
596
597         sock->sk->sk_rcvtimeo =
598         sock->sk->sk_sndtimeo = connect_int * HZ;
599         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
600
601         /* explicitly bind to the configured IP as source IP
602          * for the outgoing connections.
603          * This is needed for multihomed hosts and to be
604          * able to use lo: interfaces for drbd.
605          * Make sure to use 0 as port number, so Linux selects
606          * a free one dynamically.
607          */
608         what = "bind before connect";
609         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
610         if (err < 0)
611                 goto out;
612
613         /* connect may fail, peer not yet available.
614          * stay C_WF_CONNECTION, don't go Disconnecting! */
615         disconnect_on_error = 0;
616         what = "connect";
617         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
618
619 out:
620         if (err < 0) {
621                 if (sock) {
622                         sock_release(sock);
623                         sock = NULL;
624                 }
625                 switch (-err) {
626                         /* timeout, busy, signal pending */
627                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
628                 case EINTR: case ERESTARTSYS:
629                         /* peer not (yet) available, network problem */
630                 case ECONNREFUSED: case ENETUNREACH:
631                 case EHOSTDOWN:    case EHOSTUNREACH:
632                         disconnect_on_error = 0;
633                         break;
634                 default:
635                         conn_err(tconn, "%s failed, err = %d\n", what, err);
636                 }
637                 if (disconnect_on_error)
638                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
639         }
640
641         return sock;
642 }
643
644 struct accept_wait_data {
645         struct drbd_tconn *tconn;
646         struct socket *s_listen;
647         struct completion door_bell;
648         void (*original_sk_state_change)(struct sock *sk);
649
650 };
651
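/* Replacement sk_state_change callback, installed by
 * prepare_listen_socket(): ring the door_bell as soon as a connection
 * reaches TCP_ESTABLISHED, then chain to the original callback. */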
652 static void drbd_incoming_connection(struct sock *sk)
653 {
654         struct accept_wait_data *ad = sk->sk_user_data;
655         void (*state_change)(struct sock *sk);
656
657         state_change = ad->original_sk_state_change;
658         if (sk->sk_state == TCP_ESTABLISHED)
659                 complete(&ad->door_bell);
660         state_change(sk);
661 }
662
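/* Create, bind and listen on the configured local address, and hook
 * sk_state_change so that an incoming connection completes
 * ad->door_bell (see drbd_incoming_connection() above). */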
663 static int prepare_listen_socket(struct drbd_tconn *tconn, struct accept_wait_data *ad)
664 {
665         int err, sndbuf_size, rcvbuf_size, my_addr_len;
666         struct sockaddr_in6 my_addr;
667         struct socket *s_listen;
668         struct net_conf *nc;
669         const char *what;
670
671         rcu_read_lock();
672         nc = rcu_dereference(tconn->net_conf);
673         if (!nc) {
674                 rcu_read_unlock();
675                 return -EIO;
676         }
677         sndbuf_size = nc->sndbuf_size;
678         rcvbuf_size = nc->rcvbuf_size;
679         rcu_read_unlock();
680
681         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
682         memcpy(&my_addr, &tconn->my_addr, my_addr_len);
683
684         what = "sock_create_kern";
685         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
686                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
687         if (err) {
688                 s_listen = NULL;
689                 goto out;
690         }
691
692         s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
693         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
694
695         what = "bind before listen";
696         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
697         if (err < 0)
698                 goto out;
699
700         ad->s_listen = s_listen;
701         write_lock_bh(&s_listen->sk->sk_callback_lock);
702         ad->original_sk_state_change = s_listen->sk->sk_state_change;
703         s_listen->sk->sk_state_change = drbd_incoming_connection;
704         s_listen->sk->sk_user_data = ad;
705         write_unlock_bh(&s_listen->sk->sk_callback_lock);
706
707         what = "listen";
708         err = s_listen->ops->listen(s_listen, 5);
709         if (err < 0)
710                 goto out;
711
712         return 0;
713 out:
714         if (s_listen)
715                 sock_release(s_listen);
716         if (err < 0) {
717                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
718                         conn_err(tconn, "%s failed, err = %d\n", what, err);
719                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
720                 }
721         }
722
723         return -EIO;
724 }
725
726 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
727 {
728         write_lock_bh(&sk->sk_callback_lock);
729         sk->sk_state_change = ad->original_sk_state_change;
730         sk->sk_user_data = NULL;
731         write_unlock_bh(&sk->sk_callback_lock);
732 }
733
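/* Wait (a randomly jittered connect_int) for the door_bell completion,
 * then accept the pending connection, if any. */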
734 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn, struct accept_wait_data *ad)
735 {
736         int timeo, connect_int, err = 0;
737         struct socket *s_estab = NULL;
738         struct net_conf *nc;
739
740         rcu_read_lock();
741         nc = rcu_dereference(tconn->net_conf);
742         if (!nc) {
743                 rcu_read_unlock();
744                 return NULL;
745         }
746         connect_int = nc->connect_int;
747         rcu_read_unlock();
748
749         timeo = connect_int * HZ;
750         /* 28.5% random jitter */
751         timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
752
753         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
754         if (err <= 0)
755                 return NULL;
756
757         err = kernel_accept(ad->s_listen, &s_estab, 0);
758         if (err < 0) {
759                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
760                         conn_err(tconn, "accept failed, err = %d\n", err);
761                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
762                 }
763         }
764
765         if (s_estab)
766                 unregister_state_change(s_estab->sk, ad);
767
768         return s_estab;
769 }
770
771 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
772
773 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
774                              enum drbd_packet cmd)
775 {
776         if (!conn_prepare_command(tconn, sock))
777                 return -EIO;
778         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
779 }
780
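/* Read and decode only the header of the peer's first packet; returns
 * the packet command (e.g. P_INITIAL_DATA or P_INITIAL_META) or a
 * negative error. */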
781 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
782 {
783         unsigned int header_size = drbd_header_size(tconn);
784         struct packet_info pi;
785         int err;
786
787         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
788         if (err != header_size) {
789                 if (err >= 0)
790                         err = -EIO;
791                 return err;
792         }
793         err = decode_header(tconn, tconn->data.rbuf, &pi);
794         if (err)
795                 return err;
796         return pi.cmd;
797 }
798
799 /**
800  * drbd_socket_okay() - Free the socket if its connection is not okay
801  * @sock:       pointer to the pointer to the socket.
802  */
803 static int drbd_socket_okay(struct socket **sock)
804 {
805         int rr;
806         char tb[4];
807
808         if (!*sock)
809                 return false;
810
811         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
812
813         if (rr > 0 || rr == -EAGAIN) {
814                 return true;
815         } else {
816                 sock_release(*sock);
817                 *sock = NULL;
818                 return false;
819         }
820 }
821 /* Gets called if a connection is established, or if a new minor gets created
822    in a connection */
823 int drbd_connected(struct drbd_conf *mdev)
824 {
825         int err;
826
827         atomic_set(&mdev->packet_seq, 0);
828         mdev->peer_seq = 0;
829
830         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
831                 &mdev->tconn->cstate_mutex :
832                 &mdev->own_state_mutex;
833
834         err = drbd_send_sync_param(mdev);
835         if (!err)
836                 err = drbd_send_sizes(mdev, 0, 0);
837         if (!err)
838                 err = drbd_send_uuids(mdev);
839         if (!err)
840                 err = drbd_send_current_state(mdev);
841         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
842         clear_bit(RESIZE_PENDING, &mdev->flags);
843         atomic_set(&mdev->ap_in_flight, 0);
844         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
845         return err;
846 }
847
848 /*
849  * return values:
850  *   1 yes, we have a valid connection
851  *   0 oops, did not work out, please try again
852  *  -1 peer talks a different language,
853  *     no point in trying again, please go standalone.
854  *  -2 We do not have a network config...
855  */
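/* Rough sketch of the connection handshake (editor's illustration):
 *
 *    this node                                    peer
 *    drbd_try_connect()  ----P_INITIAL_DATA---->  drbd_wait_for_connect()
 *    drbd_try_connect()  ----P_INITIAL_META---->  drbd_wait_for_connect()
 *    drbd_wait_for_connect()  <--P_INITIAL_DATA--  drbd_try_connect()
 *    drbd_wait_for_connect()  <--P_INITIAL_META--  drbd_try_connect()
 *
 * Each side keeps connecting/accepting, randomly backing off when the
 * initial packets cross (see the randomize: label), until it holds one
 * data socket and one meta-data socket that both still look healthy.
 */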
856 static int conn_connect(struct drbd_tconn *tconn)
857 {
858         struct drbd_socket sock, msock;
859         struct drbd_conf *mdev;
860         struct net_conf *nc;
861         int vnr, timeout, h, ok;
862         bool discard_my_data;
863         enum drbd_state_rv rv;
864         struct accept_wait_data ad = {
865                 .tconn = tconn,
866                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
867         };
868
869         clear_bit(DISCONNECT_SENT, &tconn->flags);
870         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
871                 return -2;
872
873         mutex_init(&sock.mutex);
874         sock.sbuf = tconn->data.sbuf;
875         sock.rbuf = tconn->data.rbuf;
876         sock.socket = NULL;
877         mutex_init(&msock.mutex);
878         msock.sbuf = tconn->meta.sbuf;
879         msock.rbuf = tconn->meta.rbuf;
880         msock.socket = NULL;
881
882         /* Assume that the peer only understands protocol 80 until we know better.  */
883         tconn->agreed_pro_version = 80;
884
885         if (prepare_listen_socket(tconn, &ad))
886                 return 0;
887
888         do {
889                 struct socket *s;
890
891                 s = drbd_try_connect(tconn);
892                 if (s) {
893                         if (!sock.socket) {
894                                 sock.socket = s;
895                                 send_first_packet(tconn, &sock, P_INITIAL_DATA);
896                         } else if (!msock.socket) {
897                                 clear_bit(RESOLVE_CONFLICTS, &tconn->flags);
898                                 msock.socket = s;
899                                 send_first_packet(tconn, &msock, P_INITIAL_META);
900                         } else {
901                                 conn_err(tconn, "Logic error in conn_connect()\n");
902                                 goto out_release_sockets;
903                         }
904                 }
905
906                 if (sock.socket && msock.socket) {
907                         rcu_read_lock();
908                         nc = rcu_dereference(tconn->net_conf);
909                         timeout = nc->ping_timeo * HZ / 10;
910                         rcu_read_unlock();
911                         schedule_timeout_interruptible(timeout);
912                         ok = drbd_socket_okay(&sock.socket);
913                         ok = drbd_socket_okay(&msock.socket) && ok;
914                         if (ok)
915                                 break;
916                 }
917
918 retry:
919                 s = drbd_wait_for_connect(tconn, &ad);
920                 if (s) {
921                         int fp = receive_first_packet(tconn, s);
922                         drbd_socket_okay(&sock.socket);
923                         drbd_socket_okay(&msock.socket);
924                         switch (fp) {
925                         case P_INITIAL_DATA:
926                                 if (sock.socket) {
927                                         conn_warn(tconn, "initial packet S crossed\n");
928                                         sock_release(sock.socket);
929                                         sock.socket = s;
930                                         goto randomize;
931                                 }
932                                 sock.socket = s;
933                                 break;
934                         case P_INITIAL_META:
935                                 set_bit(RESOLVE_CONFLICTS, &tconn->flags);
936                                 if (msock.socket) {
937                                         conn_warn(tconn, "initial packet M crossed\n");
938                                         sock_release(msock.socket);
939                                         msock.socket = s;
940                                         goto randomize;
941                                 }
942                                 msock.socket = s;
943                                 break;
944                         default:
945                                 conn_warn(tconn, "Error receiving initial packet\n");
946                                 sock_release(s);
947 randomize:
948                                 if (prandom_u32() & 1)
949                                         goto retry;
950                         }
951                 }
952
953                 if (tconn->cstate <= C_DISCONNECTING)
954                         goto out_release_sockets;
955                 if (signal_pending(current)) {
956                         flush_signals(current);
957                         smp_rmb();
958                         if (get_t_state(&tconn->receiver) == EXITING)
959                                 goto out_release_sockets;
960                 }
961
962                 ok = drbd_socket_okay(&sock.socket);
963                 ok = drbd_socket_okay(&msock.socket) && ok;
964         } while (!ok);
965
966         if (ad.s_listen)
967                 sock_release(ad.s_listen);
968
969         sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
970         msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
971
972         sock.socket->sk->sk_allocation = GFP_NOIO;
973         msock.socket->sk->sk_allocation = GFP_NOIO;
974
975         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
976         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
977
978         /* NOT YET ...
979          * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
980          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
981          * first set it to the P_CONNECTION_FEATURES timeout,
982          * which we set to 4x the configured ping_timeout. */
983         rcu_read_lock();
984         nc = rcu_dereference(tconn->net_conf);
985
986         sock.socket->sk->sk_sndtimeo =
987         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
988
989         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
990         timeout = nc->timeout * HZ / 10;
991         discard_my_data = nc->discard_my_data;
992         rcu_read_unlock();
993
994         msock.socket->sk->sk_sndtimeo = timeout;
995
996         /* we don't want delays.
997          * we use TCP_CORK where appropriate, though */
998         drbd_tcp_nodelay(sock.socket);
999         drbd_tcp_nodelay(msock.socket);
1000
1001         tconn->data.socket = sock.socket;
1002         tconn->meta.socket = msock.socket;
1003         tconn->last_received = jiffies;
1004
1005         h = drbd_do_features(tconn);
1006         if (h <= 0)
1007                 return h;
1008
1009         if (tconn->cram_hmac_tfm) {
1010                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
1011                 switch (drbd_do_auth(tconn)) {
1012                 case -1:
1013                         conn_err(tconn, "Authentication of peer failed\n");
1014                         return -1;
1015                 case 0:
1016                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
1017                         return 0;
1018                 }
1019         }
1020
1021         tconn->data.socket->sk->sk_sndtimeo = timeout;
1022         tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1023
1024         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1025                 return -1;
1026
1027         set_bit(STATE_SENT, &tconn->flags);
1028
1029         rcu_read_lock();
1030         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1031                 kref_get(&mdev->kref);
1032                 rcu_read_unlock();
1033
1034                 /* Prevent a race between resync-handshake and
1035                  * being promoted to Primary.
1036                  *
1037                  * Grab and release the state mutex, so we know that any current
1038                  * drbd_set_role() is finished, and any incoming drbd_set_role
1039                  * will see the STATE_SENT flag, and wait for it to be cleared.
1040                  */
1041                 mutex_lock(mdev->state_mutex);
1042                 mutex_unlock(mdev->state_mutex);
1043
1044                 if (discard_my_data)
1045                         set_bit(DISCARD_MY_DATA, &mdev->flags);
1046                 else
1047                         clear_bit(DISCARD_MY_DATA, &mdev->flags);
1048
1049                 drbd_connected(mdev);
1050                 kref_put(&mdev->kref, &drbd_minor_destroy);
1051                 rcu_read_lock();
1052         }
1053         rcu_read_unlock();
1054
1055         rv = conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1056         if (rv < SS_SUCCESS || tconn->cstate != C_WF_REPORT_PARAMS) {
1057                 clear_bit(STATE_SENT, &tconn->flags);
1058                 return 0;
1059         }
1060
1061         drbd_thread_start(&tconn->asender);
1062
1063         mutex_lock(&tconn->conf_update);
1064         /* The discard_my_data flag is a single-shot modifier to the next
1065          * connection attempt, the handshake of which is now well underway.
1066          * No need for rcu style copying of the whole struct
1067          * just to clear a single value. */
1068         tconn->net_conf->discard_my_data = 0;
1069         mutex_unlock(&tconn->conf_update);
1070
1071         return h;
1072
1073 out_release_sockets:
1074         if (ad.s_listen)
1075                 sock_release(ad.s_listen);
1076         if (sock.socket)
1077                 sock_release(sock.socket);
1078         if (msock.socket)
1079                 sock_release(msock.socket);
1080         return -1;
1081 }
1082
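/* Parse an on-the-wire header into @pi.  Three formats are understood,
 * distinguished by header size and magic: p_header100 (protocol 100+,
 * carries a volume number), p_header95 and p_header80 (older protocols,
 * pi->vnr is forced to 0). */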
1083 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1084 {
1085         unsigned int header_size = drbd_header_size(tconn);
1086
1087         if (header_size == sizeof(struct p_header100) &&
1088             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1089                 struct p_header100 *h = header;
1090                 if (h->pad != 0) {
1091                         conn_err(tconn, "Header padding is not zero\n");
1092                         return -EINVAL;
1093                 }
1094                 pi->vnr = be16_to_cpu(h->volume);
1095                 pi->cmd = be16_to_cpu(h->command);
1096                 pi->size = be32_to_cpu(h->length);
1097         } else if (header_size == sizeof(struct p_header95) &&
1098                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1099                 struct p_header95 *h = header;
1100                 pi->cmd = be16_to_cpu(h->command);
1101                 pi->size = be32_to_cpu(h->length);
1102                 pi->vnr = 0;
1103         } else if (header_size == sizeof(struct p_header80) &&
1104                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1105                 struct p_header80 *h = header;
1106                 pi->cmd = be16_to_cpu(h->command);
1107                 pi->size = be16_to_cpu(h->length);
1108                 pi->vnr = 0;
1109         } else {
1110                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1111                          be32_to_cpu(*(__be32 *)header),
1112                          tconn->agreed_pro_version);
1113                 return -EINVAL;
1114         }
1115         pi->data = header + header_size;
1116         return 0;
1117 }
1118
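/* Receive one packet header from the data socket and decode it. */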
1119 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1120 {
1121         void *buffer = tconn->data.rbuf;
1122         int err;
1123
1124         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1125         if (err)
1126                 return err;
1127
1128         err = decode_header(tconn, buffer, pi);
1129         tconn->last_received = jiffies;
1130
1131         return err;
1132 }
1133
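/* Issue a disk flush to every attached backing device of this
 * connection; on failure, degrade the write ordering to WO_drain_io. */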
1134 static void drbd_flush(struct drbd_tconn *tconn)
1135 {
1136         int rv;
1137         struct drbd_conf *mdev;
1138         int vnr;
1139
1140         if (tconn->write_ordering >= WO_bdev_flush) {
1141                 rcu_read_lock();
1142                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1143                         if (!get_ldev(mdev))
1144                                 continue;
1145                         kref_get(&mdev->kref);
1146                         rcu_read_unlock();
1147
1148                         rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
1149                                         GFP_NOIO, NULL);
1150                         if (rv) {
1151                                 dev_info(DEV, "local disk flush failed with status %d\n", rv);
1152                                 /* would rather check on EOPNOTSUPP, but that is not reliable.
1153                                  * don't try again for ANY return value != 0
1154                                  * if (rv == -EOPNOTSUPP) */
1155                                 drbd_bump_write_ordering(tconn, WO_drain_io);
1156                         }
1157                         put_ldev(mdev);
1158                         kref_put(&mdev->kref, &drbd_minor_destroy);
1159
1160                         rcu_read_lock();
1161                         if (rv)
1162                                 break;
1163                 }
1164                 rcu_read_unlock();
1165         }
1166 }
1167
1168 /**
1169  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
1170  * @tconn:      DRBD connection.
1171  * @epoch:      Epoch object.
1172  * @ev:         Epoch event.
1173  */
1174 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
1175                                                struct drbd_epoch *epoch,
1176                                                enum epoch_event ev)
1177 {
1178         int epoch_size;
1179         struct drbd_epoch *next_epoch;
1180         enum finish_epoch rv = FE_STILL_LIVE;
1181
1182         spin_lock(&tconn->epoch_lock);
1183         do {
1184                 next_epoch = NULL;
1185
1186                 epoch_size = atomic_read(&epoch->epoch_size);
1187
1188                 switch (ev & ~EV_CLEANUP) {
1189                 case EV_PUT:
1190                         atomic_dec(&epoch->active);
1191                         break;
1192                 case EV_GOT_BARRIER_NR:
1193                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1194                         break;
1195                 case EV_BECAME_LAST:
1196                         /* nothing to do */
1197                         break;
1198                 }
1199
1200                 if (epoch_size != 0 &&
1201                     atomic_read(&epoch->active) == 0 &&
1202                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1203                         if (!(ev & EV_CLEANUP)) {
1204                                 spin_unlock(&tconn->epoch_lock);
1205                                 drbd_send_b_ack(epoch->tconn, epoch->barrier_nr, epoch_size);
1206                                 spin_lock(&tconn->epoch_lock);
1207                         }
1208 #if 0
1209                         /* FIXME: dec unacked on connection, once we have
1210                          * something to count pending connection packets in. */
1211                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1212                                 dec_unacked(epoch->tconn);
1213 #endif
1214
1215                         if (tconn->current_epoch != epoch) {
1216                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1217                                 list_del(&epoch->list);
1218                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1219                                 tconn->epochs--;
1220                                 kfree(epoch);
1221
1222                                 if (rv == FE_STILL_LIVE)
1223                                         rv = FE_DESTROYED;
1224                         } else {
1225                                 epoch->flags = 0;
1226                                 atomic_set(&epoch->epoch_size, 0);
1227                                 /* atomic_set(&epoch->active, 0); is already zero */
1228                                 if (rv == FE_STILL_LIVE)
1229                                         rv = FE_RECYCLED;
1230                         }
1231                 }
1232
1233                 if (!next_epoch)
1234                         break;
1235
1236                 epoch = next_epoch;
1237         } while (1);
1238
1239         spin_unlock(&tconn->epoch_lock);
1240
1241         return rv;
1242 }
1243
1244 /**
1245  * drbd_bump_write_ordering() - Fall back to another write ordering method
1246  * @tconn:      DRBD connection.
1247  * @wo:         Write ordering method to try.
1248  */
1249 void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
1250 {
1251         struct disk_conf *dc;
1252         struct drbd_conf *mdev;
1253         enum write_ordering_e pwo;
1254         int vnr;
1255         static char *write_ordering_str[] = {
1256                 [WO_none] = "none",
1257                 [WO_drain_io] = "drain",
1258                 [WO_bdev_flush] = "flush",
1259         };
1260
1261         pwo = tconn->write_ordering;
1262         wo = min(pwo, wo);
1263         rcu_read_lock();
1264         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1265                 if (!get_ldev_if_state(mdev, D_ATTACHING))
1266                         continue;
1267                 dc = rcu_dereference(mdev->ldev->disk_conf);
1268
1269                 if (wo == WO_bdev_flush && !dc->disk_flushes)
1270                         wo = WO_drain_io;
1271                 if (wo == WO_drain_io && !dc->disk_drain)
1272                         wo = WO_none;
1273                 put_ldev(mdev);
1274         }
1275         rcu_read_unlock();
1276         tconn->write_ordering = wo;
1277         if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
1278                 conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
1279 }
1280
1281 /**
1282  * drbd_submit_peer_request() - submit a peer request to the local backing device
1283  * @mdev:       DRBD device.
1284  * @peer_req:   peer request
1285  * @rw:         flag field, see bio->bi_rw
1286  *
1287  * May spread the pages to multiple bios,
1288  * depending on bio_add_page restrictions.
1289  *
1290  * Returns 0 if all bios have been submitted,
1291  * -ENOMEM if we could not allocate enough bios,
1292  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1293  *  single page to an empty bio (which should never happen and likely indicates
1294  *  that the lower level IO stack is in some way broken). This has been observed
1295  *  on certain Xen deployments.
1296  */
1297 /* TODO allocate from our own bio_set. */
1298 int drbd_submit_peer_request(struct drbd_conf *mdev,
1299                              struct drbd_peer_request *peer_req,
1300                              const unsigned rw, const int fault_type)
1301 {
1302         struct bio *bios = NULL;
1303         struct bio *bio;
1304         struct page *page = peer_req->pages;
1305         sector_t sector = peer_req->i.sector;
1306         unsigned ds = peer_req->i.size;
1307         unsigned n_bios = 0;
1308         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1309         int err = -ENOMEM;
1310
1311         /* In most cases, we will only need one bio.  But in case the lower
1312          * level restrictions happen to be different at this offset on this
1313          * side than those of the sending peer, we may need to submit the
1314          * request in more than one bio.
1315          *
1316          * Plain bio_alloc is good enough here, this is no DRBD internally
1317          * generated bio, but a bio allocated on behalf of the peer.
1318          */
1319 next_bio:
1320         bio = bio_alloc(GFP_NOIO, nr_pages);
1321         if (!bio) {
1322                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1323                 goto fail;
1324         }
1325         /* > peer_req->i.sector, unless this is the first bio */
1326         bio->bi_iter.bi_sector = sector;
1327         bio->bi_bdev = mdev->ldev->backing_bdev;
1328         bio->bi_rw = rw;
1329         bio->bi_private = peer_req;
1330         bio->bi_end_io = drbd_peer_request_endio;
1331
1332         bio->bi_next = bios;
1333         bios = bio;
1334         ++n_bios;
1335
1336         page_chain_for_each(page) {
1337                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1338                 if (!bio_add_page(bio, page, len, 0)) {
1339                         /* A single page must always be possible!
1340                          * But in case it fails anyway,
1341                          * we deal with it, and complain (below). */
1342                         if (bio->bi_vcnt == 0) {
1343                                 dev_err(DEV,
1344                                         "bio_add_page failed for len=%u, "
1345                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1346                                         len, (uint64_t)bio->bi_iter.bi_sector);
1347                                 err = -ENOSPC;
1348                                 goto fail;
1349                         }
1350                         goto next_bio;
1351                 }
1352                 ds -= len;
1353                 sector += len >> 9;
1354                 --nr_pages;
1355         }
1356         D_ASSERT(page == NULL);
1357         D_ASSERT(ds == 0);
1358
1359         atomic_set(&peer_req->pending_bios, n_bios);
1360         do {
1361                 bio = bios;
1362                 bios = bios->bi_next;
1363                 bio->bi_next = NULL;
1364
1365                 drbd_generic_make_request(mdev, fault_type, bio);
1366         } while (bios);
1367         return 0;
1368
1369 fail:
1370         while (bios) {
1371                 bio = bios;
1372                 bios = bios->bi_next;
1373                 bio_put(bio);
1374         }
1375         return err;
1376 }
1377
1378 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1379                                              struct drbd_peer_request *peer_req)
1380 {
1381         struct drbd_interval *i = &peer_req->i;
1382
1383         drbd_remove_interval(&mdev->write_requests, i);
1384         drbd_clear_interval(i);
1385
1386         /* Wake up any processes waiting for this peer request to complete.  */
1387         if (i->waiting)
1388                 wake_up(&mdev->misc_wait);
1389 }
1390
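/* Wait until no volume of this connection has active peer requests
 * (peer writes still in flight) left. */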
1391 void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
1392 {
1393         struct drbd_conf *mdev;
1394         int vnr;
1395
1396         rcu_read_lock();
1397         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1398                 kref_get(&mdev->kref);
1399                 rcu_read_unlock();
1400                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1401                 kref_put(&mdev->kref, &drbd_minor_destroy);
1402                 rcu_read_lock();
1403         }
1404         rcu_read_unlock();
1405 }
1406
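/* P_BARRIER: the peer closes its current write epoch.  Depending on
 * the configured write ordering we either just start a new epoch
 * (WO_none), or first drain and/or flush the outstanding writes so the
 * barrier ack is only sent once everything before it is stable. */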
1407 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1408 {
1409         int rv;
1410         struct p_barrier *p = pi->data;
1411         struct drbd_epoch *epoch;
1412
1413         /* FIXME these are unacked on connection,
1414          * not a specific (peer)device.
1415          */
1416         tconn->current_epoch->barrier_nr = p->barrier;
1417         tconn->current_epoch->tconn = tconn;
1418         rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);
1419
1420         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1421          * the activity log, which means it would not be resynced in case the
1422          * R_PRIMARY crashes now.
1423          * Therefore we must send the barrier_ack after the barrier request was
1424          * completed. */
1425         switch (tconn->write_ordering) {
1426         case WO_none:
1427                 if (rv == FE_RECYCLED)
1428                         return 0;
1429
1430                 /* receiver context, in the writeout path of the other node.
1431                  * avoid potential distributed deadlock */
1432                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1433                 if (epoch)
1434                         break;
1435                 else
1436                         conn_warn(tconn, "Allocation of an epoch failed, slowing down\n");
1437                         /* Fall through */
1438
1439         case WO_bdev_flush:
1440         case WO_drain_io:
1441                 conn_wait_active_ee_empty(tconn);
1442                 drbd_flush(tconn);
1443
1444                 if (atomic_read(&tconn->current_epoch->epoch_size)) {
1445                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1446                         if (epoch)
1447                                 break;
1448                 }
1449
1450                 return 0;
1451         default:
1452                 conn_err(tconn, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
1453                 return -EIO;
1454         }
1455
1456         epoch->flags = 0;
1457         atomic_set(&epoch->epoch_size, 0);
1458         atomic_set(&epoch->active, 0);
1459
1460         spin_lock(&tconn->epoch_lock);
1461         if (atomic_read(&tconn->current_epoch->epoch_size)) {
1462                 list_add(&epoch->list, &tconn->current_epoch->list);
1463                 tconn->current_epoch = epoch;
1464                 tconn->epochs++;
1465         } else {
1466                 /* The current_epoch got recycled while we allocated this one... */
1467                 kfree(epoch);
1468         }
1469         spin_unlock(&tconn->epoch_lock);
1470
1471         return 0;
1472 }
1473
1474 /* used from receive_RSDataReply (recv_resync_read)
1475  * and from receive_Data */
1476 static struct drbd_peer_request *
1477 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1478               int data_size) __must_hold(local)
1479 {
1480         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1481         struct drbd_peer_request *peer_req;
1482         struct page *page;
1483         int dgs, ds, err;
1484         void *dig_in = mdev->tconn->int_dig_in;
1485         void *dig_vv = mdev->tconn->int_dig_vv;
1486         unsigned long *data;
1487
1488         dgs = 0;
1489         if (mdev->tconn->peer_integrity_tfm) {
1490                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1491                 /*
1492                  * FIXME: Receive the incoming digest into the receive buffer
1493                  *        here, together with its struct p_data?
1494                  */
1495                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1496                 if (err)
1497                         return NULL;
1498                 data_size -= dgs;
1499         }
1500
1501         if (!expect(IS_ALIGNED(data_size, 512)))
1502                 return NULL;
1503         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1504                 return NULL;
1505
1506         /* even though we trust our peer,
1507          * we sometimes have to double check. */
1508         if (sector + (data_size>>9) > capacity) {
1509                 dev_err(DEV, "request from peer beyond end of local disk: "
1510                         "capacity: %llus < sector: %llus + size: %u\n",
1511                         (unsigned long long)capacity,
1512                         (unsigned long long)sector, data_size);
1513                 return NULL;
1514         }
1515
1516         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1517          * "criss-cross" setup, that might cause write-out on some other DRBD,
1518          * which in turn might block on the other node at this very place.  */
1519         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1520         if (!peer_req)
1521                 return NULL;
1522
1523         if (!data_size)
1524                 return peer_req;
1525
1526         ds = data_size;
1527         page = peer_req->pages;
1528         page_chain_for_each(page) {
1529                 unsigned len = min_t(int, ds, PAGE_SIZE);
1530                 data = kmap(page);
1531                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1532                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1533                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1534                         data[0] = data[0] ^ (unsigned long)-1;
1535                 }
1536                 kunmap(page);
1537                 if (err) {
1538                         drbd_free_peer_req(mdev, peer_req);
1539                         return NULL;
1540                 }
1541                 ds -= len;
1542         }
1543
1544         if (dgs) {
1545                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1546                 if (memcmp(dig_in, dig_vv, dgs)) {
1547                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1548                                 (unsigned long long)sector, data_size);
1549                         drbd_free_peer_req(mdev, peer_req);
1550                         return NULL;
1551                 }
1552         }
1553         mdev->recv_cnt += data_size>>9;
1554         return peer_req;
1555 }
1556
1557 /* drbd_drain_block() just takes a data block
1558  * out of the socket input buffer, and discards it.
1559  */
1560 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1561 {
1562         struct page *page;
1563         int err = 0;
1564         void *data;
1565
1566         if (!data_size)
1567                 return 0;
1568
1569         page = drbd_alloc_pages(mdev, 1, 1);
1570
1571         data = kmap(page);
1572         while (data_size) {
1573                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1574
1575                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1576                 if (err)
1577                         break;
1578                 data_size -= len;
1579         }
1580         kunmap(page);
1581         drbd_free_pages(mdev, page, 0);
1582         return err;
1583 }
1584
1585 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1586                            sector_t sector, int data_size)
1587 {
1588         struct bio_vec bvec;
1589         struct bvec_iter iter;
1590         struct bio *bio;
1591         int dgs, err, expect;
1592         void *dig_in = mdev->tconn->int_dig_in;
1593         void *dig_vv = mdev->tconn->int_dig_vv;
1594
1595         dgs = 0;
1596         if (mdev->tconn->peer_integrity_tfm) {
1597                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1598                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1599                 if (err)
1600                         return err;
1601                 data_size -= dgs;
1602         }
1603
1604         /* optimistically update recv_cnt.  if receiving fails below,
1605          * we disconnect anyways, and counters will be reset. */
1606         mdev->recv_cnt += data_size>>9;
1607
1608         bio = req->master_bio;
1609         D_ASSERT(sector == bio->bi_iter.bi_sector);
1610
1611         bio_for_each_segment(bvec, bio, iter) {
1612                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1613                 expect = min_t(int, data_size, bvec.bv_len);
1614                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1615                 kunmap(bvec.bv_page);
1616                 if (err)
1617                         return err;
1618                 data_size -= expect;
1619         }
1620
1621         if (dgs) {
1622                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1623                 if (memcmp(dig_in, dig_vv, dgs)) {
1624                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1625                         return -EINVAL;
1626                 }
1627         }
1628
1629         D_ASSERT(data_size == 0);
1630         return 0;
1631 }
1632
1633 /*
1634  * e_end_resync_block() is called in asender context via
1635  * drbd_finish_peer_reqs().
1636  */
1637 static int e_end_resync_block(struct drbd_work *w, int unused)
1638 {
1639         struct drbd_peer_request *peer_req =
1640                 container_of(w, struct drbd_peer_request, w);
1641         struct drbd_conf *mdev = w->mdev;
1642         sector_t sector = peer_req->i.sector;
1643         int err;
1644
1645         D_ASSERT(drbd_interval_empty(&peer_req->i));
1646
1647         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1648                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1649                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1650         } else {
1651                 /* Record failure to sync */
1652                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1653
1654                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1655         }
1656         dec_unacked(mdev);
1657
1658         return err;
1659 }
1660
1661 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1662 {
1663         struct drbd_peer_request *peer_req;
1664
1665         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1666         if (!peer_req)
1667                 goto fail;
1668
1669         dec_rs_pending(mdev);
1670
1671         inc_unacked(mdev);
1672         /* corresponding dec_unacked() in e_end_resync_block(),
1673          * or in _drbd_clear_done_ee() */
1674
1675         peer_req->w.cb = e_end_resync_block;
1676
1677         spin_lock_irq(&mdev->tconn->req_lock);
1678         list_add(&peer_req->w.list, &mdev->sync_ee);
1679         spin_unlock_irq(&mdev->tconn->req_lock);
1680
1681         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1682         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1683                 return 0;
1684
1685         /* don't care for the reason here */
1686         dev_err(DEV, "submit failed, triggering re-connect\n");
1687         spin_lock_irq(&mdev->tconn->req_lock);
1688         list_del(&peer_req->w.list);
1689         spin_unlock_irq(&mdev->tconn->req_lock);
1690
1691         drbd_free_peer_req(mdev, peer_req);
1692 fail:
1693         put_ldev(mdev);
1694         return -EIO;
1695 }
1696
1697 static struct drbd_request *
1698 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1699              sector_t sector, bool missing_ok, const char *func)
1700 {
1701         struct drbd_request *req;
1702
1703         /* Request object according to our peer */
1704         req = (struct drbd_request *)(unsigned long)id;
1705         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1706                 return req;
1707         if (!missing_ok) {
1708                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1709                         (unsigned long)id, (unsigned long long)sector);
1710         }
1711         return NULL;
1712 }
1713
1714 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1715 {
1716         struct drbd_conf *mdev;
1717         struct drbd_request *req;
1718         sector_t sector;
1719         int err;
1720         struct p_data *p = pi->data;
1721
1722         mdev = vnr_to_mdev(tconn, pi->vnr);
1723         if (!mdev)
1724                 return -EIO;
1725
1726         sector = be64_to_cpu(p->sector);
1727
1728         spin_lock_irq(&mdev->tconn->req_lock);
1729         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1730         spin_unlock_irq(&mdev->tconn->req_lock);
1731         if (unlikely(!req))
1732                 return -EIO;
1733
1734         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1735          * special casing it there for the various failure cases.
1736          * still no race with drbd_fail_pending_reads */
1737         err = recv_dless_read(mdev, req, sector, pi->size);
1738         if (!err)
1739                 req_mod(req, DATA_RECEIVED);
1740         /* else: nothing. handled from drbd_disconnect...
1741          * I don't think we may complete this just yet
1742          * in case we are "on-disconnect: freeze" */
1743
1744         return err;
1745 }
1746
1747 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1748 {
1749         struct drbd_conf *mdev;
1750         sector_t sector;
1751         int err;
1752         struct p_data *p = pi->data;
1753
1754         mdev = vnr_to_mdev(tconn, pi->vnr);
1755         if (!mdev)
1756                 return -EIO;
1757
1758         sector = be64_to_cpu(p->sector);
1759         D_ASSERT(p->block_id == ID_SYNCER);
1760
1761         if (get_ldev(mdev)) {
1762                 /* data is submitted to disk within recv_resync_read.
1763                  * corresponding put_ldev done below on error,
1764                  * or in drbd_peer_request_endio. */
1765                 err = recv_resync_read(mdev, sector, pi->size);
1766         } else {
1767                 if (__ratelimit(&drbd_ratelimit_state))
1768                         dev_err(DEV, "Can not write resync data to local disk.\n");
1769
1770                 err = drbd_drain_block(mdev, pi->size);
1771
1772                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1773         }
1774
1775         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1776
1777         return err;
1778 }
1779
1780 static void restart_conflicting_writes(struct drbd_conf *mdev,
1781                                        sector_t sector, int size)
1782 {
1783         struct drbd_interval *i;
1784         struct drbd_request *req;
1785
1786         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1787                 if (!i->local)
1788                         continue;
1789                 req = container_of(i, struct drbd_request, i);
1790                 if (req->rq_state & RQ_LOCAL_PENDING ||
1791                     !(req->rq_state & RQ_POSTPONED))
1792                         continue;
1793                 /* as it is RQ_POSTPONED, this will cause it to
1794                  * be queued on the retry workqueue. */
1795                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1796         }
1797 }
1798
1799 /*
1800  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1801  */
1802 static int e_end_block(struct drbd_work *w, int cancel)
1803 {
1804         struct drbd_peer_request *peer_req =
1805                 container_of(w, struct drbd_peer_request, w);
1806         struct drbd_conf *mdev = w->mdev;
1807         sector_t sector = peer_req->i.sector;
1808         int err = 0, pcmd;
1809
1810         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1811                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1812                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1813                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1814                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1815                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1816                         err = drbd_send_ack(mdev, pcmd, peer_req);
1817                         if (pcmd == P_RS_WRITE_ACK)
1818                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1819                 } else {
1820                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1821                         /* we expect it to be marked out of sync anyways...
1822                          * maybe assert this?  */
1823                 }
1824                 dec_unacked(mdev);
1825         }
1826         /* we delete from the conflict detection hash _after_ we sent out the
1827          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1828         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1829                 spin_lock_irq(&mdev->tconn->req_lock);
1830                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1831                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1832                 if (peer_req->flags & EE_RESTART_REQUESTS)
1833                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1834                 spin_unlock_irq(&mdev->tconn->req_lock);
1835         } else
1836                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1837
1838         drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1839
1840         return err;
1841 }
1842
1843 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1844 {
1845         struct drbd_conf *mdev = w->mdev;
1846         struct drbd_peer_request *peer_req =
1847                 container_of(w, struct drbd_peer_request, w);
1848         int err;
1849
1850         err = drbd_send_ack(mdev, ack, peer_req);
1851         dec_unacked(mdev);
1852
1853         return err;
1854 }
1855
1856 static int e_send_superseded(struct drbd_work *w, int unused)
1857 {
1858         return e_send_ack(w, P_SUPERSEDED);
1859 }
1860
1861 static int e_send_retry_write(struct drbd_work *w, int unused)
1862 {
1863         struct drbd_tconn *tconn = w->mdev->tconn;
1864
1865         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1866                              P_RETRY_WRITE : P_SUPERSEDED);
1867 }
1868
1869 static bool seq_greater(u32 a, u32 b)
1870 {
1871         /*
1872          * We assume 32-bit wrap-around here.
1873          * For 24-bit wrap-around, we would have to shift:
1874          *  a <<= 8; b <<= 8;
1875          */
1876         return (s32)a - (s32)b > 0;
1877 }
1878
1879 static u32 seq_max(u32 a, u32 b)
1880 {
1881         return seq_greater(a, b) ? a : b;
1882 }
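/*
 * Editor's illustration (not part of the original source): the signed
 * difference trick above means a sequence number just past the 32bit
 * wrap still compares as "newer" than one just before it.  A minimal
 * sketch, never compiled:
 */
#if 0
static void seq_wrap_example(void)
{
        /* (s32)2 - (s32)0xfffffffe == 2 - (-2) == 4 > 0 */
        BUG_ON(!seq_greater(2, 0xfffffffeU));
        /* seq_max() therefore picks the logically newer value */
        BUG_ON(seq_max(2, 0xfffffffeU) != 2);
}
#endif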
1883
1884 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1885 {
1886         unsigned int newest_peer_seq;
1887
1888         if (test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)) {
1889                 spin_lock(&mdev->peer_seq_lock);
1890                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1891                 mdev->peer_seq = newest_peer_seq;
1892                 spin_unlock(&mdev->peer_seq_lock);
1893                 /* wake up only if we actually changed mdev->peer_seq */
1894                 if (peer_seq == newest_peer_seq)
1895                         wake_up(&mdev->seq_wait);
1896         }
1897 }
1898
1899 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1900 {
1901         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1902 }
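/*
 * Editor's illustration (not part of the original source): the start
 * offsets are 512-byte sector numbers while the lengths are in bytes,
 * hence the ">> 9".  A minimal sketch, never compiled:
 */
#if 0
static void overlaps_example(void)
{
        /* sectors 0..7 (4096 bytes) vs. sector 8: adjacent, no overlap */
        BUG_ON(overlaps(0, 4096, 8, 512));
        /* sectors 0..7 vs. sector 7: the last sector is shared */
        BUG_ON(!overlaps(0, 4096, 7, 512));
}
#endif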
1903
1904 /* maybe change sync_ee into interval trees as well? */
1905 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
1906 {
1907         struct drbd_peer_request *rs_req;
1908         bool rv = false;
1909
1910         spin_lock_irq(&mdev->tconn->req_lock);
1911         list_for_each_entry(rs_req, &mdev->sync_ee, w.list) {
1912                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1913                              rs_req->i.sector, rs_req->i.size)) {
1914                         rv = true;
1915                         break;
1916                 }
1917         }
1918         spin_unlock_irq(&mdev->tconn->req_lock);
1919
1920         return rv;
1921 }
1922
1923 /* Called from receive_Data.
1924  * Synchronize packets on sock with packets on msock.
1925  *
1926  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1927  * packet traveling on msock, they are still processed in the order they have
1928  * been sent.
1929  *
1930  * Note: we don't care for Ack packets overtaking P_DATA packets.
1931  *
1932  * In case peer_seq is larger than mdev->peer_seq, there are
1933  * outstanding packets on the msock. We wait for them to arrive.
1934  * In case we are the logically next packet, we update mdev->peer_seq
1935  * ourselves. Correctly handles 32bit wrap around.
1936  *
1937  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1938  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1939  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1940  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1941  *
1942  * returns 0 if we may process the packet,
1943  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1944 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1945 {
1946         DEFINE_WAIT(wait);
1947         long timeout;
1948         int ret = 0, tp;
1949
1950         if (!test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags))
1951                 return 0;
1952
1953         spin_lock(&mdev->peer_seq_lock);
1954         for (;;) {
1955                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1956                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1957                         break;
1958                 }
1959
1960                 if (signal_pending(current)) {
1961                         ret = -ERESTARTSYS;
1962                         break;
1963                 }
1964
1965                 rcu_read_lock();
1966                 tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1967                 rcu_read_unlock();
1968
1969                 if (!tp)
1970                         break;
1971
1972                 /* Only need to wait if two_primaries is enabled */
1973                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1974                 spin_unlock(&mdev->peer_seq_lock);
1975                 rcu_read_lock();
1976                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1977                 rcu_read_unlock();
1978                 timeout = schedule_timeout(timeout);
1979                 spin_lock(&mdev->peer_seq_lock);
1980                 if (!timeout) {
1981                         ret = -ETIMEDOUT;
1982                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1983                         break;
1984                 }
1985         }
1986         spin_unlock(&mdev->peer_seq_lock);
1987         finish_wait(&mdev->seq_wait, &wait);
1988         return ret;
1989 }
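/*
 * Editor's illustration (not part of the original source): with a local
 * mdev->peer_seq of 7, a P_DATA packet carrying seq 8 is the logically
 * next one and may be processed right away, while one carrying seq 9
 * has to wait until the packet with seq 8 has been seen.  A minimal
 * sketch, never compiled:
 */
#if 0
static void peer_seq_example(struct drbd_conf *mdev)
{
        mdev->peer_seq = 7;
        BUG_ON(seq_greater(8 - 1, mdev->peer_seq));     /* seq 8: no wait */
        BUG_ON(!seq_greater(9 - 1, mdev->peer_seq));    /* seq 9: wait    */
}
#endif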
1990
1991 /* see also bio_flags_to_wire():
1992  * we need to semantically map the DP_* data packet flags to bio REQ_* flags
1993  * and back, because we may replicate to peers running other kernel versions. */
1994 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1995 {
1996         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1997                 (dpf & DP_FUA ? REQ_FUA : 0) |
1998                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1999                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2000 }
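/*
 * Editor's illustration (not part of the original source): a peer's
 * flush+FUA write arrives with DP_FLUSH | DP_FUA set and is resubmitted
 * locally with REQ_FLUSH | REQ_FUA.  Never compiled:
 */
#if 0
static void wire_flags_example(struct drbd_conf *mdev)
{
        BUG_ON(wire_flags_to_bio(mdev, DP_FLUSH | DP_FUA) !=
               (REQ_FLUSH | REQ_FUA));
}
#endif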
2001
2002 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
2003                                     unsigned int size)
2004 {
2005         struct drbd_interval *i;
2006
2007     repeat:
2008         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2009                 struct drbd_request *req;
2010                 struct bio_and_error m;
2011
2012                 if (!i->local)
2013                         continue;
2014                 req = container_of(i, struct drbd_request, i);
2015                 if (!(req->rq_state & RQ_POSTPONED))
2016                         continue;
2017                 req->rq_state &= ~RQ_POSTPONED;
2018                 __req_mod(req, NEG_ACKED, &m);
2019                 spin_unlock_irq(&mdev->tconn->req_lock);
2020                 if (m.bio)
2021                         complete_master_bio(mdev, &m);
2022                 spin_lock_irq(&mdev->tconn->req_lock);
2023                 goto repeat;
2024         }
2025 }
2026
2027 static int handle_write_conflicts(struct drbd_conf *mdev,
2028                                   struct drbd_peer_request *peer_req)
2029 {
2030         struct drbd_tconn *tconn = mdev->tconn;
2031         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &tconn->flags);
2032         sector_t sector = peer_req->i.sector;
2033         const unsigned int size = peer_req->i.size;
2034         struct drbd_interval *i;
2035         bool equal;
2036         int err;
2037
2038         /*
2039          * Inserting the peer request into the write_requests tree will prevent
2040          * new conflicting local requests from being added.
2041          */
2042         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
2043
2044     repeat:
2045         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2046                 if (i == &peer_req->i)
2047                         continue;
2048
2049                 if (!i->local) {
2050                         /*
2051                          * Our peer has sent a conflicting remote request; this
2052                          * should not happen in a two-node setup.  Wait for the
2053                          * earlier peer request to complete.
2054                          */
2055                         err = drbd_wait_misc(mdev, i);
2056                         if (err)
2057                                 goto out;
2058                         goto repeat;
2059                 }
2060
2061                 equal = i->sector == sector && i->size == size;
2062                 if (resolve_conflicts) {
2063                         /*
2064                          * If the peer request is fully contained within the
2065                          * overlapping request, it can be considered overwritten
2066                          * and thus superseded; otherwise, it will be retried
2067                          * once all overlapping requests have completed.
2068                          */
2069                         bool superseded = i->sector <= sector && i->sector +
2070                                        (i->size >> 9) >= sector + (size >> 9);
2071
2072                         if (!equal)
2073                                 dev_alert(DEV, "Concurrent writes detected: "
2074                                                "local=%llus +%u, remote=%llus +%u, "
2075                                                "assuming %s came first\n",
2076                                           (unsigned long long)i->sector, i->size,
2077                                           (unsigned long long)sector, size,
2078                                           superseded ? "local" : "remote");
2079
2080                         inc_unacked(mdev);
2081                         peer_req->w.cb = superseded ? e_send_superseded :
2082                                                    e_send_retry_write;
2083                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2084                         wake_asender(mdev->tconn);
2085
2086                         err = -ENOENT;
2087                         goto out;
2088                 } else {
2089                         struct drbd_request *req =
2090                                 container_of(i, struct drbd_request, i);
2091
2092                         if (!equal)
2093                                 dev_alert(DEV, "Concurrent writes detected: "
2094                                                "local=%llus +%u, remote=%llus +%u\n",
2095                                           (unsigned long long)i->sector, i->size,
2096                                           (unsigned long long)sector, size);
2097
2098                         if (req->rq_state & RQ_LOCAL_PENDING ||
2099                             !(req->rq_state & RQ_POSTPONED)) {
2100                                 /*
2101                                  * Wait for the node with the discard flag to
2102                                  * decide if this request has been superseded
2103                                  * or needs to be retried.
2104                                  * Requests that have been superseded will
2105                                  * disappear from the write_requests tree.
2106                                  *
2107                                  * In addition, wait for the conflicting
2108                                  * request to finish locally before submitting
2109                                  * the conflicting peer request.
2110                                  */
2111                                 err = drbd_wait_misc(mdev, &req->i);
2112                                 if (err) {
2113                                         _conn_request_state(mdev->tconn,
2114                                                             NS(conn, C_TIMEOUT),
2115                                                             CS_HARD);
2116                                         fail_postponed_requests(mdev, sector, size);
2117                                         goto out;
2118                                 }
2119                                 goto repeat;
2120                         }
2121                         /*
2122                          * Remember to restart the conflicting requests after
2123                          * the new peer request has completed.
2124                          */
2125                         peer_req->flags |= EE_RESTART_REQUESTS;
2126                 }
2127         }
2128         err = 0;
2129
2130     out:
2131         if (err)
2132                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2133         return err;
2134 }
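/*
 * Editor's illustration (not part of the original source) of the
 * "superseded" containment test used above: a local write covering
 * sectors 0..15 fully contains a peer write to sectors 4..11, so the
 * peer request counts as superseded.  Never compiled:
 */
#if 0
static void superseded_example(void)
{
        sector_t i_sector = 0;  unsigned int i_size = 16 << 9;  /* local */
        sector_t sector   = 4;  unsigned int size   =  8 << 9;  /* peer  */
        bool superseded = i_sector <= sector &&
                i_sector + (i_size >> 9) >= sector + (size >> 9);

        BUG_ON(!superseded);
}
#endif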
2135
2136 /* mirrored write */
2137 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2138 {
2139         struct drbd_conf *mdev;
2140         sector_t sector;
2141         struct drbd_peer_request *peer_req;
2142         struct p_data *p = pi->data;
2143         u32 peer_seq = be32_to_cpu(p->seq_num);
2144         int rw = WRITE;
2145         u32 dp_flags;
2146         int err, tp;
2147
2148         mdev = vnr_to_mdev(tconn, pi->vnr);
2149         if (!mdev)
2150                 return -EIO;
2151
2152         if (!get_ldev(mdev)) {
2153                 int err2;
2154
2155                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2156                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2157                 atomic_inc(&tconn->current_epoch->epoch_size);
2158                 err2 = drbd_drain_block(mdev, pi->size);
2159                 if (!err)
2160                         err = err2;
2161                 return err;
2162         }
2163
2164         /*
2165          * Corresponding put_ldev done either below (on various errors), or in
2166          * drbd_peer_request_endio, if we successfully submit the data at the
2167          * end of this function.
2168          */
2169
2170         sector = be64_to_cpu(p->sector);
2171         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2172         if (!peer_req) {
2173                 put_ldev(mdev);
2174                 return -EIO;
2175         }
2176
2177         peer_req->w.cb = e_end_block;
2178
2179         dp_flags = be32_to_cpu(p->dp_flags);
2180         rw |= wire_flags_to_bio(mdev, dp_flags);
2181         if (peer_req->pages == NULL) {
2182                 D_ASSERT(peer_req->i.size == 0);
2183                 D_ASSERT(dp_flags & DP_FLUSH);
2184         }
2185
2186         if (dp_flags & DP_MAY_SET_IN_SYNC)
2187                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2188
2189         spin_lock(&tconn->epoch_lock);
2190         peer_req->epoch = tconn->current_epoch;
2191         atomic_inc(&peer_req->epoch->epoch_size);
2192         atomic_inc(&peer_req->epoch->active);
2193         spin_unlock(&tconn->epoch_lock);
2194
2195         rcu_read_lock();
2196         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2197         rcu_read_unlock();
2198         if (tp) {
2199                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2200                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2201                 if (err)
2202                         goto out_interrupted;
2203                 spin_lock_irq(&mdev->tconn->req_lock);
2204                 err = handle_write_conflicts(mdev, peer_req);
2205                 if (err) {
2206                         spin_unlock_irq(&mdev->tconn->req_lock);
2207                         if (err == -ENOENT) {
2208                                 put_ldev(mdev);
2209                                 return 0;
2210                         }
2211                         goto out_interrupted;
2212                 }
2213         } else {
2214                 update_peer_seq(mdev, peer_seq);
2215                 spin_lock_irq(&mdev->tconn->req_lock);
2216         }
2217         list_add(&peer_req->w.list, &mdev->active_ee);
2218         spin_unlock_irq(&mdev->tconn->req_lock);
2219
2220         if (mdev->state.conn == C_SYNC_TARGET)
2221                 wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, peer_req));
2222
2223         if (mdev->tconn->agreed_pro_version < 100) {
2224                 rcu_read_lock();
2225                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2226                 case DRBD_PROT_C:
2227                         dp_flags |= DP_SEND_WRITE_ACK;
2228                         break;
2229                 case DRBD_PROT_B:
2230                         dp_flags |= DP_SEND_RECEIVE_ACK;
2231                         break;
2232                 }
2233                 rcu_read_unlock();
2234         }
2235
2236         if (dp_flags & DP_SEND_WRITE_ACK) {
2237                 peer_req->flags |= EE_SEND_WRITE_ACK;
2238                 inc_unacked(mdev);
2239                 /* corresponding dec_unacked() in e_end_block(),
2240                  * or in _drbd_clear_done_ee() */
2241         }
2242
2243         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2244                 /* I really don't like it that the receiver thread
2245                  * sends on the msock, but anyways */
2246                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2247         }
2248
2249         if (mdev->state.pdsk < D_INCONSISTENT) {
2250                 /* In case we have the only disk of the cluster, */
2251                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2252                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2253                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2254                 drbd_al_begin_io(mdev, &peer_req->i, true);
2255         }
2256
2257         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2258         if (!err)
2259                 return 0;
2260
2261         /* don't care for the reason here */
2262         dev_err(DEV, "submit failed, triggering re-connect\n");
2263         spin_lock_irq(&mdev->tconn->req_lock);
2264         list_del(&peer_req->w.list);
2265         drbd_remove_epoch_entry_interval(mdev, peer_req);
2266         spin_unlock_irq(&mdev->tconn->req_lock);
2267         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2268                 drbd_al_complete_io(mdev, &peer_req->i);
2269
2270 out_interrupted:
2271         drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2272         put_ldev(mdev);
2273         drbd_free_peer_req(mdev, peer_req);
2274         return err;
2275 }
2276
2277 /* We may throttle resync, if the lower device seems to be busy,
2278  * and current sync rate is above c_min_rate.
2279  *
2280  * To decide whether or not the lower device is busy, we use a scheme similar
2281  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2282  * amount (more than 64 sectors) of activity that we cannot account for with
2283  * our own resync activity, it obviously is "busy".
2284  *
2285  * The sync rate used here is computed from only the most recent two step
2286  * marks, to give a short time average so we can react faster.
2287  */
2288 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2289 {
2290         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2291         unsigned long db, dt, dbdt;
2292         struct lc_element *tmp;
2293         int curr_events;
2294         int throttle = 0;
2295         unsigned int c_min_rate;
2296
2297         rcu_read_lock();
2298         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2299         rcu_read_unlock();
2300
2301         /* feature disabled? */
2302         if (c_min_rate == 0)
2303                 return 0;
2304
2305         spin_lock_irq(&mdev->al_lock);
2306         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2307         if (tmp) {
2308                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2309                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2310                         spin_unlock_irq(&mdev->al_lock);
2311                         return 0;
2312                 }
2313                 /* Do not slow down if app IO is already waiting for this extent */
2314         }
2315         spin_unlock_irq(&mdev->al_lock);
2316
2317         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2318                       (int)part_stat_read(&disk->part0, sectors[1]) -
2319                         atomic_read(&mdev->rs_sect_ev);
2320
2321         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2322                 unsigned long rs_left;
2323                 int i;
2324
2325                 mdev->rs_last_events = curr_events;
2326
2327                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2328                  * approx. */
2329                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2330
2331                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2332                         rs_left = mdev->ov_left;
2333                 else
2334                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2335
2336                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2337                 if (!dt)
2338                         dt++;
2339                 db = mdev->rs_mark_left[i] - rs_left;
2340                 dbdt = Bit2KB(db/dt);
2341
2342                 if (dbdt > c_min_rate)
2343                         throttle = 1;
2344         }
2345         return throttle;
2346 }
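/*
 * Editor's illustration (not part of the original source), with made-up
 * numbers: 2048 bitmap bits synced since a sync mark that is 2 seconds
 * old correspond, at 4 KiB per bit, to Bit2KB(2048 / 2) == 4096 KiB/s.
 * Any configured c_min_rate below that (e.g. 250 KiB/s, an example
 * value) would then allow the resync request to be throttled.
 * Never compiled:
 */
#if 0
static int rs_rate_example(void)
{
        unsigned long db = 2048;                /* bits synced since mark */
        unsigned long dt = 2;                   /* mark age in seconds    */
        unsigned long dbdt = Bit2KB(db / dt);   /* ~4096 KiB/s            */
        unsigned int c_min_rate = 250;          /* example setting, KiB/s */

        return dbdt > c_min_rate;               /* 1: may throttle */
}
#endif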
2347
2348
2349 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2350 {
2351         struct drbd_conf *mdev;
2352         sector_t sector;
2353         sector_t capacity;
2354         struct drbd_peer_request *peer_req;
2355         struct digest_info *di = NULL;
2356         int size, verb;
2357         unsigned int fault_type;
2358         struct p_block_req *p = pi->data;
2359
2360         mdev = vnr_to_mdev(tconn, pi->vnr);
2361         if (!mdev)
2362                 return -EIO;
2363         capacity = drbd_get_capacity(mdev->this_bdev);
2364
2365         sector = be64_to_cpu(p->sector);
2366         size   = be32_to_cpu(p->blksize);
2367
2368         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2369                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2370                                 (unsigned long long)sector, size);
2371                 return -EINVAL;
2372         }
2373         if (sector + (size>>9) > capacity) {
2374                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2375                                 (unsigned long long)sector, size);
2376                 return -EINVAL;
2377         }
2378
2379         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2380                 verb = 1;
2381                 switch (pi->cmd) {
2382                 case P_DATA_REQUEST:
2383                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2384                         break;
2385                 case P_RS_DATA_REQUEST:
2386                 case P_CSUM_RS_REQUEST:
2387                 case P_OV_REQUEST:
2388                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2389                         break;
2390                 case P_OV_REPLY:
2391                         verb = 0;
2392                         dec_rs_pending(mdev);
2393                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2394                         break;
2395                 default:
2396                         BUG();
2397                 }
2398                 if (verb && __ratelimit(&drbd_ratelimit_state))
2399                         dev_err(DEV, "Can not satisfy peer's read request, "
2400                             "no local data.\n");
2401
2402                 /* drain possible payload */
2403                 return drbd_drain_block(mdev, pi->size);
2404         }
2405
2406         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2407          * "criss-cross" setup, that might cause write-out on some other DRBD,
2408          * which in turn might block on the other node at this very place.  */
2409         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2410         if (!peer_req) {
2411                 put_ldev(mdev);
2412                 return -ENOMEM;
2413         }
2414
2415         switch (pi->cmd) {
2416         case P_DATA_REQUEST:
2417                 peer_req->w.cb = w_e_end_data_req;
2418                 fault_type = DRBD_FAULT_DT_RD;
2419                 /* application IO, don't drbd_rs_begin_io */
2420                 goto submit;
2421
2422         case P_RS_DATA_REQUEST:
2423                 peer_req->w.cb = w_e_end_rsdata_req;
2424                 fault_type = DRBD_FAULT_RS_RD;
2425                 /* used in the sector offset progress display */
2426                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2427                 break;
2428
2429         case P_OV_REPLY:
2430         case P_CSUM_RS_REQUEST:
2431                 fault_type = DRBD_FAULT_RS_RD;
2432                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2433                 if (!di)
2434                         goto out_free_e;
2435
2436                 di->digest_size = pi->size;
2437                 di->digest = (((char *)di)+sizeof(struct digest_info));
2438
2439                 peer_req->digest = di;
2440                 peer_req->flags |= EE_HAS_DIGEST;
2441
2442                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2443                         goto out_free_e;
2444
2445                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2446                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2447                         peer_req->w.cb = w_e_end_csum_rs_req;
2448                         /* used in the sector offset progress display */
2449                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2450                 } else if (pi->cmd == P_OV_REPLY) {
2451                         /* track progress, we may need to throttle */
2452                         atomic_add(size >> 9, &mdev->rs_sect_in);
2453                         peer_req->w.cb = w_e_end_ov_reply;
2454                         dec_rs_pending(mdev);
2455                         /* drbd_rs_begin_io done when we sent this request,
2456                          * but accounting still needs to be done. */
2457                         goto submit_for_resync;
2458                 }
2459                 break;
2460
2461         case P_OV_REQUEST:
2462                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2463                     mdev->tconn->agreed_pro_version >= 90) {
2464                         unsigned long now = jiffies;
2465                         int i;
2466                         mdev->ov_start_sector = sector;
2467                         mdev->ov_position = sector;
2468                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2469                         mdev->rs_total = mdev->ov_left;
2470                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2471                                 mdev->rs_mark_left[i] = mdev->ov_left;
2472                                 mdev->rs_mark_time[i] = now;
2473                         }
2474                         dev_info(DEV, "Online Verify start sector: %llu\n",
2475                                         (unsigned long long)sector);
2476                 }
2477                 peer_req->w.cb = w_e_end_ov_req;
2478                 fault_type = DRBD_FAULT_RS_RD;
2479                 break;
2480
2481         default:
2482                 BUG();
2483         }
2484
2485         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2486          * wrt the receiver, but it is not as straightforward as it may seem.
2487          * Various places in the resync start and stop logic assume resync
2488          * requests are processed in order, requeuing this on the worker thread
2489          * introduces a bunch of new code for synchronization between threads.
2490          *
2491          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2492          * "forever", throttling after drbd_rs_begin_io will lock that extent
2493          * for application writes for the same time.  For now, just throttle
2494          * here, where the rest of the code expects the receiver to sleep for
2495          * a while, anyways.
2496          */
2497
2498         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2499          * this defers syncer requests for some time, before letting at least
2500          * one request through.  The resync controller on the receiving side
2501          * will adapt to the incoming rate accordingly.
2502          *
2503          * We cannot throttle here if remote is Primary/SyncTarget:
2504          * we would also throttle its application reads.
2505          * In that case, throttling is done on the SyncTarget only.
2506          */
2507         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2508                 schedule_timeout_uninterruptible(HZ/10);
2509         if (drbd_rs_begin_io(mdev, sector))
2510                 goto out_free_e;
2511
2512 submit_for_resync:
2513         atomic_add(size >> 9, &mdev->rs_sect_ev);
2514
2515 submit:
2516         inc_unacked(mdev);
2517         spin_lock_irq(&mdev->tconn->req_lock);
2518         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2519         spin_unlock_irq(&mdev->tconn->req_lock);
2520
2521         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2522                 return 0;
2523
2524         /* don't care for the reason here */
2525         dev_err(DEV, "submit failed, triggering re-connect\n");
2526         spin_lock_irq(&mdev->tconn->req_lock);
2527         list_del(&peer_req->w.list);
2528         spin_unlock_irq(&mdev->tconn->req_lock);
2529         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2530
2531 out_free_e:
2532         put_ldev(mdev);
2533         drbd_free_peer_req(mdev, peer_req);
2534         return -EIO;
2535 }
2536
2537 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2538 {
2539         int self, peer, rv = -100;
2540         unsigned long ch_self, ch_peer;
2541         enum drbd_after_sb_p after_sb_0p;
2542
2543         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2544         peer = mdev->p_uuid[UI_BITMAP] & 1;
2545
2546         ch_peer = mdev->p_uuid[UI_SIZE];
2547         ch_self = mdev->comm_bm_set;
2548
2549         rcu_read_lock();
2550         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2551         rcu_read_unlock();
2552         switch (after_sb_0p) {
2553         case ASB_CONSENSUS:
2554         case ASB_DISCARD_SECONDARY:
2555         case ASB_CALL_HELPER:
2556         case ASB_VIOLENTLY:
2557                 dev_err(DEV, "Configuration error.\n");
2558                 break;
2559         case ASB_DISCONNECT:
2560                 break;
2561         case ASB_DISCARD_YOUNGER_PRI:
2562                 if (self == 0 && peer == 1) {
2563                         rv = -1;
2564                         break;
2565                 }
2566                 if (self == 1 && peer == 0) {
2567                         rv =  1;
2568                         break;
2569                 }
2570                 /* Else fall through to one of the other strategies... */
2571         case ASB_DISCARD_OLDER_PRI:
2572                 if (self == 0 && peer == 1) {
2573                         rv = 1;
2574                         break;
2575                 }
2576                 if (self == 1 && peer == 0) {
2577                         rv = -1;
2578                         break;
2579                 }
2580                 /* Else fall through to one of the other strategies... */
2581                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2582                      "Using discard-least-changes instead\n");
2583         case ASB_DISCARD_ZERO_CHG:
2584                 if (ch_peer == 0 && ch_self == 0) {
2585                         rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2586                                 ? -1 : 1;
2587                         break;
2588                 } else {
2589                         if (ch_peer == 0) { rv =  1; break; }
2590                         if (ch_self == 0) { rv = -1; break; }
2591                 }
2592                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2593                         break;
2594         case ASB_DISCARD_LEAST_CHG:
2595                 if      (ch_self < ch_peer)
2596                         rv = -1;
2597                 else if (ch_self > ch_peer)
2598                         rv =  1;
2599                 else /* ( ch_self == ch_peer ) */
2600                      /* Well, then use something else. */
2601                         rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2602                                 ? -1 : 1;
2603                 break;
2604         case ASB_DISCARD_LOCAL:
2605                 rv = -1;
2606                 break;
2607         case ASB_DISCARD_REMOTE:
2608                 rv =  1;
2609         }
2610
2611         return rv;
2612 }
2613
2614 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2615 {
2616         int hg, rv = -100;
2617         enum drbd_after_sb_p after_sb_1p;
2618
2619         rcu_read_lock();
2620         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2621         rcu_read_unlock();
2622         switch (after_sb_1p) {
2623         case ASB_DISCARD_YOUNGER_PRI:
2624         case ASB_DISCARD_OLDER_PRI:
2625         case ASB_DISCARD_LEAST_CHG:
2626         case ASB_DISCARD_LOCAL:
2627         case ASB_DISCARD_REMOTE:
2628         case ASB_DISCARD_ZERO_CHG:
2629                 dev_err(DEV, "Configuration error.\n");
2630                 break;
2631         case ASB_DISCONNECT:
2632                 break;
2633         case ASB_CONSENSUS:
2634                 hg = drbd_asb_recover_0p(mdev);
2635                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2636                         rv = hg;
2637                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2638                         rv = hg;
2639                 break;
2640         case ASB_VIOLENTLY:
2641                 rv = drbd_asb_recover_0p(mdev);
2642                 break;
2643         case ASB_DISCARD_SECONDARY:
2644                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2645         case ASB_CALL_HELPER:
2646                 hg = drbd_asb_recover_0p(mdev);
2647                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2648                         enum drbd_state_rv rv2;
2649
2650                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2651                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2652                           * we do not need to wait for the after state change work either. */
2653                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2654                         if (rv2 != SS_SUCCESS) {
2655                                 drbd_khelper(mdev, "pri-lost-after-sb");
2656                         } else {
2657                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2658                                 rv = hg;
2659                         }
2660                 } else
2661                         rv = hg;
2662         }
2663
2664         return rv;
2665 }
2666
2667 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2668 {
2669         int hg, rv = -100;
2670         enum drbd_after_sb_p after_sb_2p;
2671
2672         rcu_read_lock();
2673         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2674         rcu_read_unlock();
2675         switch (after_sb_2p) {
2676         case ASB_DISCARD_YOUNGER_PRI:
2677         case ASB_DISCARD_OLDER_PRI:
2678         case ASB_DISCARD_LEAST_CHG:
2679         case ASB_DISCARD_LOCAL:
2680         case ASB_DISCARD_REMOTE:
2681         case ASB_CONSENSUS:
2682         case ASB_DISCARD_SECONDARY:
2683         case ASB_DISCARD_ZERO_CHG:
2684                 dev_err(DEV, "Configuration error.\n");
2685                 break;
2686         case ASB_VIOLENTLY:
2687                 rv = drbd_asb_recover_0p(mdev);
2688                 break;
2689         case ASB_DISCONNECT:
2690                 break;
2691         case ASB_CALL_HELPER:
2692                 hg = drbd_asb_recover_0p(mdev);
2693                 if (hg == -1) {
2694                         enum drbd_state_rv rv2;
2695
2696                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2697                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2698                           * we do not need to wait for the after state change work either. */
2699                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2700                         if (rv2 != SS_SUCCESS) {
2701                                 drbd_khelper(mdev, "pri-lost-after-sb");
2702                         } else {
2703                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2704                                 rv = hg;
2705                         }
2706                 } else
2707                         rv = hg;
2708         }
2709
2710         return rv;
2711 }
2712
2713 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2714                            u64 bits, u64 flags)
2715 {
2716         if (!uuid) {
2717                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2718                 return;
2719         }
2720         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2721              text,
2722              (unsigned long long)uuid[UI_CURRENT],
2723              (unsigned long long)uuid[UI_BITMAP],
2724              (unsigned long long)uuid[UI_HISTORY_START],
2725              (unsigned long long)uuid[UI_HISTORY_END],
2726              (unsigned long long)bits,
2727              (unsigned long long)flags);
2728 }
2729
2730 /*
2731   100   after split brain try auto recover
2732     2   C_SYNC_SOURCE set BitMap
2733     1   C_SYNC_SOURCE use BitMap
2734     0   no Sync
2735    -1   C_SYNC_TARGET use BitMap
2736    -2   C_SYNC_TARGET set BitMap
2737  -100   after split brain, disconnect
2738 -1000   unrelated data
2739 -1091   requires proto 91
2740 -1096   requires proto 96
2741  */
2742 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2743 {
2744         u64 self, peer;
2745         int i, j;
2746
2747         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2748         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2749
2750         *rule_nr = 10;
2751         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2752                 return 0;
2753
2754         *rule_nr = 20;
2755         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2756              peer != UUID_JUST_CREATED)
2757                 return -2;
2758
2759         *rule_nr = 30;
2760         if (self != UUID_JUST_CREATED &&
2761             (peer == UUID_JUST_CREATED || peer == (u64)0))
2762                 return 2;
2763
2764         if (self == peer) {
2765                 int rct, dc; /* roles at crash time */
2766
2767                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2768
2769                         if (mdev->tconn->agreed_pro_version < 91)
2770                                 return -1091;
2771
2772                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2773                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2774                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2775                                 drbd_uuid_move_history(mdev);
2776                                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2777                                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2778
2779                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2780                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2781                                 *rule_nr = 34;
2782                         } else {
2783                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2784                                 *rule_nr = 36;
2785                         }
2786
2787                         return 1;
2788                 }
2789
2790                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2791
2792                         if (mdev->tconn->agreed_pro_version < 91)
2793                                 return -1091;
2794
2795                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2796                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2797                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2798
2799                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2800                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2801                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2802
2803                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2804                                 *rule_nr = 35;
2805                         } else {
2806                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2807                                 *rule_nr = 37;
2808                         }
2809
2810                         return -1;
2811                 }
2812
2813                 /* Common power [off|failure] */
2814                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2815                         (mdev->p_uuid[UI_FLAGS] & 2);
2816                 /* lowest bit is set when we were primary,
2817                  * next bit (weight 2) is set when peer was primary */
2818                 *rule_nr = 40;
2819
2820                 switch (rct) {
2821                 case 0: /* !self_pri && !peer_pri */ return 0;
2822                 case 1: /*  self_pri && !peer_pri */ return 1;
2823                 case 2: /* !self_pri &&  peer_pri */ return -1;
2824                 case 3: /*  self_pri &&  peer_pri */
2825                         dc = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags);
2826                         return dc ? -1 : 1;
2827                 }
2828         }
2829
2830         *rule_nr = 50;
2831         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2832         if (self == peer)
2833                 return -1;
2834
2835         *rule_nr = 51;
2836         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2837         if (self == peer) {
2838                 if (mdev->tconn->agreed_pro_version < 96 ?
2839                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2840                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2841                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2842                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2843                            the peer made to its UUIDs when it last started a resync as sync source. */
2844
2845                         if (mdev->tconn->agreed_pro_version < 91)
2846                                 return -1091;
2847
2848                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2849                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2850
2851                         dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2852                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2853
2854                         return -1;
2855                 }
2856         }
2857
2858         *rule_nr = 60;
2859         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2860         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2861                 peer = mdev->p_uuid[i] & ~((u64)1);
2862                 if (self == peer)
2863                         return -2;
2864         }
2865
2866         *rule_nr = 70;
2867         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2868         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2869         if (self == peer)
2870                 return 1;
2871
2872         *rule_nr = 71;
2873         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2874         if (self == peer) {
2875                 if (mdev->tconn->agreed_pro_version < 96 ?
2876                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2877                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2878                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2879                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2880                            we made to our UUIDs when we last started a resync as sync source. */
2881
2882                         if (mdev->tconn->agreed_pro_version < 91)
2883                                 return -1091;
2884
2885                         __drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2886                         __drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2887
2888                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2889                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2890                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2891
2892                         return 1;
2893                 }
2894         }
2895
2896
2897         *rule_nr = 80;
2898         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2899         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2900                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2901                 if (self == peer)
2902                         return 2;
2903         }
2904
2905         *rule_nr = 90;
2906         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2907         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2908         if (self == peer && self != ((u64)0))
2909                 return 100;
2910
2911         *rule_nr = 100;
2912         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2913                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2914                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2915                         peer = mdev->p_uuid[j] & ~((u64)1);
2916                         if (self == peer)
2917                                 return -100;
2918                 }
2919         }
2920
2921         return -1000;
2922 }
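/*
 * Illustrative sketch, not part of drbd: the two helpers below restate, in
 * code form, conventions used throughout drbd_uuid_compare() above -- every
 * comparison masks off bit 0 of a UUID (it is a flag bit, not part of the
 * data-generation identity), and the return value hg maps to the actions
 * listed in the table before the function.  Both names are hypothetical.
 */
#if 0
static inline int uuids_match(u64 a, u64 b)
{
	/* compare only bits 63..1; bit 0 is a flag */
	return (a & ~(u64)1) == (b & ~(u64)1);
}

static const char *hg_meaning(int hg)
{
	switch (hg) {
	case   100: return "after split brain, try auto recover";
	case     2: return "C_SYNC_SOURCE, set bitmap (full sync)";
	case     1: return "C_SYNC_SOURCE, use bitmap";
	case     0: return "no sync";
	case    -1: return "C_SYNC_TARGET, use bitmap";
	case    -2: return "C_SYNC_TARGET, set bitmap (full sync)";
	case  -100: return "after split brain, disconnect";
	case -1000: return "unrelated data";
	default:    /* -1091, -1096, ... */
		return hg < -1000 ? "both sides must support protocol (-hg - 1000)" : "unknown";
	}
}
#endif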
2923
2924 /* drbd_sync_handshake() returns the new conn state on success, or
2925    C_MASK on failure.
2926  */
2927 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2928                                            enum drbd_disk_state peer_disk) __must_hold(local)
2929 {
2930         enum drbd_conns rv = C_MASK;
2931         enum drbd_disk_state mydisk;
2932         struct net_conf *nc;
2933         int hg, rule_nr, rr_conflict, tentative;
2934
2935         mydisk = mdev->state.disk;
2936         if (mydisk == D_NEGOTIATING)
2937                 mydisk = mdev->new_state_tmp.disk;
2938
2939         dev_info(DEV, "drbd_sync_handshake:\n");
2940
2941         spin_lock_irq(&mdev->ldev->md.uuid_lock);
2942         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2943         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2944                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2945
2946         hg = drbd_uuid_compare(mdev, &rule_nr);
2947         spin_unlock_irq(&mdev->ldev->md.uuid_lock);
2948
2949         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2950
2951         if (hg == -1000) {
2952                 dev_alert(DEV, "Unrelated data, aborting!\n");
2953                 return C_MASK;
2954         }
2955         if (hg < -1000) {
2956                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2957                 return C_MASK;
2958         }
2959
2960         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2961             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2962                 int f = (hg == -100) || abs(hg) == 2;
2963                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2964                 if (f)
2965                         hg = hg*2;
2966                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2967                      hg > 0 ? "source" : "target");
2968         }
2969
2970         if (abs(hg) == 100)
2971                 drbd_khelper(mdev, "initial-split-brain");
2972
2973         rcu_read_lock();
2974         nc = rcu_dereference(mdev->tconn->net_conf);
2975
2976         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2977                 int pcount = (mdev->state.role == R_PRIMARY)
2978                            + (peer_role == R_PRIMARY);
2979                 int forced = (hg == -100);
2980
2981                 switch (pcount) {
2982                 case 0:
2983                         hg = drbd_asb_recover_0p(mdev);
2984                         break;
2985                 case 1:
2986                         hg = drbd_asb_recover_1p(mdev);
2987                         break;
2988                 case 2:
2989                         hg = drbd_asb_recover_2p(mdev);
2990                         break;
2991                 }
2992                 if (abs(hg) < 100) {
2993                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2994                              "automatically solved. Sync from %s node\n",
2995                              pcount, (hg < 0) ? "peer" : "this");
2996                         if (forced) {
2997                                 dev_warn(DEV, "Doing a full sync, since"
2998                                      " UUIDs were ambiguous.\n");
2999                                 hg = hg*2;
3000                         }
3001                 }
3002         }
3003
3004         if (hg == -100) {
3005                 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
3006                         hg = -1;
3007                 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
3008                         hg = 1;
3009
3010                 if (abs(hg) < 100)
3011                         dev_warn(DEV, "Split-Brain detected, manually solved. "
3012                              "Sync from %s node\n",
3013                              (hg < 0) ? "peer" : "this");
3014         }
3015         rr_conflict = nc->rr_conflict;
3016         tentative = nc->tentative;
3017         rcu_read_unlock();
3018
3019         if (hg == -100) {
3020                 /* FIXME this log message is not correct if we end up here
3021                  * after an attempted attach on a diskless node.
3022                  * We just refuse to attach -- well, we drop the "connection"
3023                  * to that disk, in a way... */
3024                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
3025                 drbd_khelper(mdev, "split-brain");
3026                 return C_MASK;
3027         }
3028
3029         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3030                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
3031                 return C_MASK;
3032         }
3033
3034         if (hg < 0 && /* by intention we do not use mydisk here. */
3035             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
3036                 switch (rr_conflict) {
3037                 case ASB_CALL_HELPER:
3038                         drbd_khelper(mdev, "pri-lost");
3039                         /* fall through */
3040                 case ASB_DISCONNECT:
3041                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
3042                         return C_MASK;
3043                 case ASB_VIOLENTLY:
3044                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
3045                              " assumption\n");
3046                 }
3047         }
3048
3049         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
3050                 if (hg == 0)
3051                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3052                 else
3053                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
3054                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3055                                  abs(hg) >= 2 ? "full" : "bit-map based");
3056                 return C_MASK;
3057         }
3058
3059         if (abs(hg) >= 2) {
3060                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3061                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3062                                         BM_LOCKED_SET_ALLOWED))
3063                         return C_MASK;
3064         }
3065
3066         if (hg > 0) { /* become sync source. */
3067                 rv = C_WF_BITMAP_S;
3068         } else if (hg < 0) { /* become sync target */
3069                 rv = C_WF_BITMAP_T;
3070         } else {
3071                 rv = C_CONNECTED;
3072                 if (drbd_bm_total_weight(mdev)) {
3073                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3074                              drbd_bm_total_weight(mdev));
3075                 }
3076         }
3077
3078         return rv;
3079 }
3080
3081 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3082 {
3083         /* the peer's ASB_DISCARD_REMOTE paired with our ASB_DISCARD_LOCAL is valid */
3084         if (peer == ASB_DISCARD_REMOTE)
3085                 return ASB_DISCARD_LOCAL;
3086
3087         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3088         if (peer == ASB_DISCARD_LOCAL)
3089                 return ASB_DISCARD_REMOTE;
3090
3091         /* everything else is valid if they are equal on both sides. */
3092         return peer;
3093 }
3094
3095 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3096 {
3097         struct p_protocol *p = pi->data;
3098         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3099         int p_proto, p_discard_my_data, p_two_primaries, cf;
3100         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3101         char integrity_alg[SHARED_SECRET_MAX] = "";
3102         struct crypto_hash *peer_integrity_tfm = NULL;
3103         void *int_dig_in = NULL, *int_dig_vv = NULL;
3104
3105         p_proto         = be32_to_cpu(p->protocol);
3106         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3107         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3108         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3109         p_two_primaries = be32_to_cpu(p->two_primaries);
3110         cf              = be32_to_cpu(p->conn_flags);
3111         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3112
3113         if (tconn->agreed_pro_version >= 87) {
3114                 int err;
3115
3116                 if (pi->size > sizeof(integrity_alg))
3117                         return -EIO;
3118                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3119                 if (err)
3120                         return err;
3121                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3122         }
3123
3124         if (pi->cmd != P_PROTOCOL_UPDATE) {
3125                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3126
3127                 if (cf & CF_DRY_RUN)
3128                         set_bit(CONN_DRY_RUN, &tconn->flags);
3129
3130                 rcu_read_lock();
3131                 nc = rcu_dereference(tconn->net_conf);
3132
3133                 if (p_proto != nc->wire_protocol) {
3134                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3135                         goto disconnect_rcu_unlock;
3136                 }
3137
3138                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3139                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3140                         goto disconnect_rcu_unlock;
3141                 }
3142
3143                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3144                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3145                         goto disconnect_rcu_unlock;
3146                 }
3147
3148                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3149                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3150                         goto disconnect_rcu_unlock;
3151                 }
3152
3153                 if (p_discard_my_data && nc->discard_my_data) {
3154                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3155                         goto disconnect_rcu_unlock;
3156                 }
3157
3158                 if (p_two_primaries != nc->two_primaries) {
3159                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3160                         goto disconnect_rcu_unlock;
3161                 }
3162
3163                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3164                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3165                         goto disconnect_rcu_unlock;
3166                 }
3167
3168                 rcu_read_unlock();
3169         }
3170
3171         if (integrity_alg[0]) {
3172                 int hash_size;
3173
3174                 /*
3175                  * We can only change the peer data integrity algorithm
3176                  * here.  Changing our own data integrity algorithm
3177                  * requires that we send a P_PROTOCOL_UPDATE packet at
3178                  * the same time; otherwise, the peer has no way to
3179                  * tell between which packets the algorithm should
3180                  * change.
3181                  */
3182
3183                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3184                 if (IS_ERR(peer_integrity_tfm)) {
3185                         peer_integrity_tfm = NULL; /* do not free an ERR_PTR below */
3186                         conn_err(tconn, "peer data-integrity-alg %s not supported\n", integrity_alg);
3187                         goto disconnect;
3188                 }
3189
3190                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3191                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3192                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3193                 if (!(int_dig_in && int_dig_vv)) {
3194                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3195                         goto disconnect;
3196                 }
3197         }
3198
3199         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3200         if (!new_net_conf) {
3201                 conn_err(tconn, "Allocation of new net_conf failed\n");
3202                 goto disconnect;
3203         }
3204
3205         mutex_lock(&tconn->data.mutex);
3206         mutex_lock(&tconn->conf_update);
3207         old_net_conf = tconn->net_conf;
3208         *new_net_conf = *old_net_conf;
3209
3210         new_net_conf->wire_protocol = p_proto;
3211         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3212         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3213         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3214         new_net_conf->two_primaries = p_two_primaries;
3215
3216         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3217         mutex_unlock(&tconn->conf_update);
3218         mutex_unlock(&tconn->data.mutex);
3219
3220         crypto_free_hash(tconn->peer_integrity_tfm);
3221         kfree(tconn->int_dig_in);
3222         kfree(tconn->int_dig_vv);
3223         tconn->peer_integrity_tfm = peer_integrity_tfm;
3224         tconn->int_dig_in = int_dig_in;
3225         tconn->int_dig_vv = int_dig_vv;
3226
3227         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3228                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3229                           integrity_alg[0] ? integrity_alg : "(none)");
3230
3231         synchronize_rcu();
3232         kfree(old_net_conf);
3233         return 0;
3234
3235 disconnect_rcu_unlock:
3236         rcu_read_unlock();
3237 disconnect:
3238         crypto_free_hash(peer_integrity_tfm);
3239         kfree(int_dig_in);
3240         kfree(int_dig_vv);
3241         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3242         return -EIO;
3243 }
3244
3245 /* helper function
3246  * input: alg name, feature name
3247  * return: NULL (alg name was "")
3248  *         ERR_PTR(error) if something goes wrong
3249  *         or the crypto hash ptr, if it worked out ok. */
3250 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3251                 const char *alg, const char *name)
3252 {
3253         struct crypto_hash *tfm;
3254
3255         if (!alg[0])
3256                 return NULL;
3257
3258         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3259         if (IS_ERR(tfm)) {
3260                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3261                         alg, name, PTR_ERR(tfm));
3262                 return tfm;
3263         }
3264         return tfm;
3265 }
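/*
 * Illustrative sketch, not part of drbd: how a caller is expected to handle
 * the three return values documented above -- NULL when the algorithm name
 * is empty, an ERR_PTR() when allocation failed (already logged by the
 * helper), or a usable transform.  The function and parameter names here are
 * hypothetical; the real callers are in receive_SyncParam() below.
 */
#if 0
static int example_setup_digest(struct drbd_conf *mdev, const char *alg_name)
{
	struct crypto_hash *tfm;

	tfm = drbd_crypto_alloc_digest_safe(mdev, alg_name, "example-alg");
	if (tfm == NULL)
		return 0;		/* empty name: feature stays disabled */
	if (IS_ERR(tfm))
		return PTR_ERR(tfm);	/* failure was already reported */

	/* ... install tfm, or release it again with crypto_free_hash(tfm) ... */
	return 0;
}
#endif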
3266
3267 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3268 {
3269         void *buffer = tconn->data.rbuf;
3270         int size = pi->size;
3271
3272         while (size) {
3273                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3274                 s = drbd_recv(tconn, buffer, s);
3275                 if (s <= 0) {
3276                         if (s < 0)
3277                                 return s;
3278                         break;
3279                 }
3280                 size -= s;
3281         }
3282         if (size)
3283                 return -EIO;
3284         return 0;
3285 }
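/*
 * A worked example for the draining loop above (illustrative only, assuming
 * DRBD_SOCKET_BUFFER_SIZE is 4096): a packet with pi->size == 10000 is
 * consumed in chunks of 4096, 4096 and 1808 bytes into the scratch receive
 * buffer and discarded.  If drbd_recv() reports an error or a premature end
 * of stream, size stays non-zero and a negative value is returned, so the
 * byte stream is never silently left out of sync.
 */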
3286
3287 /*
3288  * config_unknown_volume  -  device configuration command for unknown volume
3289  *
3290  * When a device is added to an existing connection, the node on which the
3291  * device is added first will send configuration commands to its peer but the
3292  * peer will not know about the device yet.  It will warn and ignore these
3293  * commands.  Once the device is added on the second node, the second node will
3294  * send the same device configuration commands, but in the other direction.
3295  *
3296  * (We can also end up here if drbd is misconfigured.)
3297  */
3298 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3299 {
3300         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3301                   cmdname(pi->cmd), pi->vnr);
3302         return ignore_remaining_packet(tconn, pi);
3303 }
3304
3305 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3306 {
3307         struct drbd_conf *mdev;
3308         struct p_rs_param_95 *p;
3309         unsigned int header_size, data_size, exp_max_sz;
3310         struct crypto_hash *verify_tfm = NULL;
3311         struct crypto_hash *csums_tfm = NULL;
3312         struct net_conf *old_net_conf, *new_net_conf = NULL;
3313         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3314         const int apv = tconn->agreed_pro_version;
3315         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3316         int fifo_size = 0;
3317         int err;
3318
3319         mdev = vnr_to_mdev(tconn, pi->vnr);
3320         if (!mdev)
3321                 return config_unknown_volume(tconn, pi);
3322
3323         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3324                     : apv == 88 ? sizeof(struct p_rs_param)
3325                                         + SHARED_SECRET_MAX
3326                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3327                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3328
3329         if (pi->size > exp_max_sz) {
3330                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3331                     pi->size, exp_max_sz);
3332                 return -EIO;
3333         }
3334
3335         if (apv <= 88) {
3336                 header_size = sizeof(struct p_rs_param);
3337                 data_size = pi->size - header_size;
3338         } else if (apv <= 94) {
3339                 header_size = sizeof(struct p_rs_param_89);
3340                 data_size = pi->size - header_size;
3341                 D_ASSERT(data_size == 0);
3342         } else {
3343                 header_size = sizeof(struct p_rs_param_95);
3344                 data_size = pi->size - header_size;
3345                 D_ASSERT(data_size == 0);
3346         }
3347
3348         /* initialize verify_alg and csums_alg */
3349         p = pi->data;
3350         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3351
3352         err = drbd_recv_all(mdev->tconn, p, header_size);
3353         if (err)
3354                 return err;
3355
3356         mutex_lock(&mdev->tconn->conf_update);
3357         old_net_conf = mdev->tconn->net_conf;
3358         if (get_ldev(mdev)) {
3359                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3360                 if (!new_disk_conf) {
3361                         put_ldev(mdev);
3362                         mutex_unlock(&mdev->tconn->conf_update);
3363                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3364                         return -ENOMEM;
3365                 }
3366
3367                 old_disk_conf = mdev->ldev->disk_conf;
3368                 *new_disk_conf = *old_disk_conf;
3369
3370                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3371         }
3372
3373         if (apv >= 88) {
3374                 if (apv == 88) {
3375                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3376                                 dev_err(DEV, "verify-alg of wrong size, "
3377                                         "peer wants %u, accepting only up to %u byte\n",
3378                                         data_size, SHARED_SECRET_MAX);
3379                                 err = -EIO;
3380                                 goto reconnect;
3381                         }
3382
3383                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3384                         if (err)
3385                                 goto reconnect;
3386                         /* we expect NUL terminated string */
3387                         /* but just in case someone tries to be evil */
3388                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3389                         p->verify_alg[data_size-1] = 0;
3390
3391                 } else /* apv >= 89 */ {
3392                         /* we still expect NUL terminated strings */
3393                         /* but just in case someone tries to be evil */
3394                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3395                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3396                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3397                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3398                 }
3399
3400                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3401                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3402                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3403                                     old_net_conf->verify_alg, p->verify_alg);
3404                                 goto disconnect;
3405                         }
3406                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3407                                         p->verify_alg, "verify-alg");
3408                         if (IS_ERR(verify_tfm)) {
3409                                 verify_tfm = NULL;
3410                                 goto disconnect;
3411                         }
3412                 }
3413
3414                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3415                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3416                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3417                                     old_net_conf->csums_alg, p->csums_alg);
3418                                 goto disconnect;
3419                         }
3420                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3421                                         p->csums_alg, "csums-alg");
3422                         if (IS_ERR(csums_tfm)) {
3423                                 csums_tfm = NULL;
3424                                 goto disconnect;
3425                         }
3426                 }
3427
3428                 if (apv > 94 && new_disk_conf) {
3429                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3430                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3431                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3432                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3433
3434                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3435                         if (fifo_size != mdev->rs_plan_s->size) {
3436                                 new_plan = fifo_alloc(fifo_size);
3437                                 if (!new_plan) {
3438                                         dev_err(DEV, "kmalloc of fifo_buffer failed\n");
3439                                         put_ldev(mdev);
3440                                         goto disconnect;
3441                                 }
3442                         }
3443                 }
3444
3445                 if (verify_tfm || csums_tfm) {
3446                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3447                         if (!new_net_conf) {
3448                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3449                                 goto disconnect;
3450                         }
3451
3452                         *new_net_conf = *old_net_conf;
3453
3454                         if (verify_tfm) {
3455                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3456                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3457                                 crypto_free_hash(mdev->tconn->verify_tfm);
3458                                 mdev->tconn->verify_tfm = verify_tfm;
3459                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3460                         }
3461                         if (csums_tfm) {
3462                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3463                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3464                                 crypto_free_hash(mdev->tconn->csums_tfm);
3465                                 mdev->tconn->csums_tfm = csums_tfm;
3466                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3467                         }
3468                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3469                 }
3470         }
3471
3472         if (new_disk_conf) {
3473                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3474                 put_ldev(mdev);
3475         }
3476
3477         if (new_plan) {
3478                 old_plan = mdev->rs_plan_s;
3479                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3480         }
3481
3482         mutex_unlock(&mdev->tconn->conf_update);
3483         synchronize_rcu();
3484         if (new_net_conf)
3485                 kfree(old_net_conf);
3486         kfree(old_disk_conf);
3487         kfree(old_plan);
3488
3489         return 0;
3490
3491 reconnect:
3492         if (new_disk_conf) {
3493                 put_ldev(mdev);
3494                 kfree(new_disk_conf);
3495         }
3496         mutex_unlock(&mdev->tconn->conf_update);
3497         return -EIO;
3498
3499 disconnect:
3500         kfree(new_plan);
3501         if (new_disk_conf) {
3502                 put_ldev(mdev);
3503                 kfree(new_disk_conf);
3504         }
3505         mutex_unlock(&mdev->tconn->conf_update);
3506         /* free any transform that was allocated above but, because of a
3507          * later failure, never installed into mdev->tconn */
3508         crypto_free_hash(csums_tfm);
3509         /* same for verify_tfm */
3510         crypto_free_hash(verify_tfm);
3511         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3512         return -EIO;
3513 }
3514
3515 /* warn if the arguments differ by more than 12.5% */
3516 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3517         const char *s, sector_t a, sector_t b)
3518 {
3519         sector_t d;
3520         if (a == 0 || b == 0)
3521                 return;
3522         d = (a > b) ? (a - b) : (b - a);
3523         if (d > (a>>3) || d > (b>>3))
3524                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3525                      (unsigned long long)a, (unsigned long long)b);
3526 }
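/*
 * A worked example of the 12.5% check above: the threshold comes from the
 * right shift by 3 (x>>3 == x/8).  With a = 1000 and b = 900 sectors,
 * d = 100 exceeds neither a>>3 = 125 nor b>>3 = 112, so nothing is printed;
 * with b = 800, d = 200 exceeds both and the warning is issued.
 */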
3527
3528 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3529 {
3530         struct drbd_conf *mdev;
3531         struct p_sizes *p = pi->data;
3532         enum determine_dev_size dd = DS_UNCHANGED;
3533         sector_t p_size, p_usize, my_usize;
3534         int ldsc = 0; /* local disk size changed */
3535         enum dds_flags ddsf;
3536
3537         mdev = vnr_to_mdev(tconn, pi->vnr);
3538         if (!mdev)
3539                 return config_unknown_volume(tconn, pi);
3540
3541         p_size = be64_to_cpu(p->d_size);
3542         p_usize = be64_to_cpu(p->u_size);
3543
3544         /* just store the peer's disk size for now.
3545          * we still need to figure out whether we accept that. */
3546         mdev->p_size = p_size;
3547
3548         if (get_ldev(mdev)) {
3549                 rcu_read_lock();
3550                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3551                 rcu_read_unlock();
3552
3553                 warn_if_differ_considerably(mdev, "lower level device sizes",
3554                            p_size, drbd_get_max_capacity(mdev->ldev));
3555                 warn_if_differ_considerably(mdev, "user requested size",
3556                                             p_usize, my_usize);
3557
3558                 /* if this is the first connect, or an otherwise expected
3559                  * param exchange, choose the minimum */
3560                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3561                         p_usize = min_not_zero(my_usize, p_usize);
3562
3563                 /* Never shrink a device with usable data during connect.
3564                    But allow online shrinking if we are connected. */
3565                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3566                     drbd_get_capacity(mdev->this_bdev) &&
3567                     mdev->state.disk >= D_OUTDATED &&
3568                     mdev->state.conn < C_CONNECTED) {
3569                         dev_err(DEV, "The peer's disk size is too small!\n");
3570                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3571                         put_ldev(mdev);
3572                         return -EIO;
3573                 }
3574
3575                 if (my_usize != p_usize) {
3576                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3577
3578                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3579                         if (!new_disk_conf) {
3580                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3581                                 put_ldev(mdev);
3582                                 return -ENOMEM;
3583                         }
3584
3585                         mutex_lock(&mdev->tconn->conf_update);
3586                         old_disk_conf = mdev->ldev->disk_conf;
3587                         *new_disk_conf = *old_disk_conf;
3588                         new_disk_conf->disk_size = p_usize;
3589
3590                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3591                         mutex_unlock(&mdev->tconn->conf_update);
3592                         synchronize_rcu();
3593                         kfree(old_disk_conf);
3594
3595                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3596                                  (unsigned long)p_usize);
3597                 }
3598
3599                 put_ldev(mdev);
3600         }
3601
3602         ddsf = be16_to_cpu(p->dds_flags);
3603         if (get_ldev(mdev)) {
3604                 dd = drbd_determine_dev_size(mdev, ddsf, NULL);
3605                 put_ldev(mdev);
3606                 if (dd == DS_ERROR)
3607                         return -EIO;
3608                 drbd_md_sync(mdev);
3609         } else {
3610                 /* I am diskless, need to accept the peer's size. */
3611                 drbd_set_my_capacity(mdev, p_size);
3612         }
3613
3614         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3615         drbd_reconsider_max_bio_size(mdev);
3616
3617         if (get_ldev(mdev)) {
3618                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3619                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3620                         ldsc = 1;
3621                 }
3622
3623                 put_ldev(mdev);
3624         }
3625
3626         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3627                 if (be64_to_cpu(p->c_size) !=
3628                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3629                         /* we have different sizes, probably peer
3630                          * needs to know my new size... */
3631                         drbd_send_sizes(mdev, 0, ddsf);
3632                 }
3633                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3634                     (dd == DS_GREW && mdev->state.conn == C_CONNECTED)) {
3635                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3636                             mdev->state.disk >= D_INCONSISTENT) {
3637                                 if (ddsf & DDSF_NO_RESYNC)
3638                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3639                                 else
3640                                         resync_after_online_grow(mdev);
3641                         } else
3642                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3643                 }
3644         }
3645
3646         return 0;
3647 }
3648
3649 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3650 {
3651         struct drbd_conf *mdev;
3652         struct p_uuids *p = pi->data;
3653         u64 *p_uuid;
3654         int i, updated_uuids = 0;
3655
3656         mdev = vnr_to_mdev(tconn, pi->vnr);
3657         if (!mdev)
3658                 return config_unknown_volume(tconn, pi);
3659
3660         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3661         if (!p_uuid) {
3662                 dev_err(DEV, "kmalloc of p_uuid failed\n");
3663                 return -ENOMEM;
3664         }
3665
3666         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3667                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3668
3669         kfree(mdev->p_uuid);
3670         mdev->p_uuid = p_uuid;
3671
3672         if (mdev->state.conn < C_CONNECTED &&
3673             mdev->state.disk < D_INCONSISTENT &&
3674             mdev->state.role == R_PRIMARY &&
3675             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3676                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3677                     (unsigned long long)mdev->ed_uuid);
3678                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3679                 return -EIO;
3680         }
3681
3682         if (get_ldev(mdev)) {
3683                 int skip_initial_sync =
3684                         mdev->state.conn == C_CONNECTED &&
3685                         mdev->tconn->agreed_pro_version >= 90 &&
3686                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3687                         (p_uuid[UI_FLAGS] & 8);
3688                 if (skip_initial_sync) {
3689                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3690                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3691                                         "clear_n_write from receive_uuids",
3692                                         BM_LOCKED_TEST_ALLOWED);
3693                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3694                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3695                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3696                                         CS_VERBOSE, NULL);
3697                         drbd_md_sync(mdev);
3698                         updated_uuids = 1;
3699                 }
3700                 put_ldev(mdev);
3701         } else if (mdev->state.disk < D_INCONSISTENT &&
3702                    mdev->state.role == R_PRIMARY) {
3703                 /* I am a diskless primary, the peer just created a new current UUID
3704                    for me. */
3705                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3706         }
3707
3708         /* Before we test the disk state, we should wait until a possibly
3709            ongoing cluster-wide state change has finished. That is important if
3710            we are primary and are detaching from our disk. We need to see the
3711            new disk state... */
3712         mutex_lock(mdev->state_mutex);
3713         mutex_unlock(mdev->state_mutex);
3714         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3715                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3716
3717         if (updated_uuids)
3718                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3719
3720         return 0;
3721 }
3722
3723 /**
3724  * convert_state() - Converts the peer's view of the cluster state to our point of view
3725  * @ps:         The state as seen by the peer.
3726  */
3727 static union drbd_state convert_state(union drbd_state ps)
3728 {
3729         union drbd_state ms;
3730
3731         static enum drbd_conns c_tab[] = {
3732                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3733                 [C_CONNECTED] = C_CONNECTED,
3734
3735                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3736                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3737                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3738                 [C_VERIFY_S]       = C_VERIFY_T,
3739                 [C_MASK]   = C_MASK,
3740         };
3741
3742         ms.i = ps.i;
3743
3744         ms.conn = c_tab[ps.conn];
3745         ms.peer = ps.role;
3746         ms.role = ps.peer;
3747         ms.pdsk = ps.disk;
3748         ms.disk = ps.pdsk;
3749         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3750
3751         return ms;
3752 }
3753
3754 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3755 {
3756         struct drbd_conf *mdev;
3757         struct p_req_state *p = pi->data;
3758         union drbd_state mask, val;
3759         enum drbd_state_rv rv;
3760
3761         mdev = vnr_to_mdev(tconn, pi->vnr);
3762         if (!mdev)
3763                 return -EIO;
3764
3765         mask.i = be32_to_cpu(p->mask);
3766         val.i = be32_to_cpu(p->val);
3767
3768         if (test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags) &&
3769             mutex_is_locked(mdev->state_mutex)) {
3770                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3771                 return 0;
3772         }
3773
3774         mask = convert_state(mask);
3775         val = convert_state(val);
3776
3777         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3778         drbd_send_sr_reply(mdev, rv);
3779
3780         drbd_md_sync(mdev);
3781
3782         return 0;
3783 }
3784
3785 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3786 {
3787         struct p_req_state *p = pi->data;
3788         union drbd_state mask, val;
3789         enum drbd_state_rv rv;
3790
3791         mask.i = be32_to_cpu(p->mask);
3792         val.i = be32_to_cpu(p->val);
3793
3794         if (test_bit(RESOLVE_CONFLICTS, &tconn->flags) &&
3795             mutex_is_locked(&tconn->cstate_mutex)) {
3796                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3797                 return 0;
3798         }
3799
3800         mask = convert_state(mask);
3801         val = convert_state(val);
3802
3803         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3804         conn_send_sr_reply(tconn, rv);
3805
3806         return 0;
3807 }
3808
3809 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3810 {
3811         struct drbd_conf *mdev;
3812         struct p_state *p = pi->data;
3813         union drbd_state os, ns, peer_state;
3814         enum drbd_disk_state real_peer_disk;
3815         enum chg_state_flags cs_flags;
3816         int rv;
3817
3818         mdev = vnr_to_mdev(tconn, pi->vnr);
3819         if (!mdev)
3820                 return config_unknown_volume(tconn, pi);
3821
3822         peer_state.i = be32_to_cpu(p->state);
3823
3824         real_peer_disk = peer_state.disk;
3825         if (peer_state.disk == D_NEGOTIATING) {
3826                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3827                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3828         }
3829
3830         spin_lock_irq(&mdev->tconn->req_lock);
3831  retry:
3832         os = ns = drbd_read_state(mdev);
3833         spin_unlock_irq(&mdev->tconn->req_lock);
3834
3835         /* If some other part of the code (asender thread, timeout)
3836          * already decided to close the connection again,
3837          * we must not "re-establish" it here. */
3838         if (os.conn <= C_TEAR_DOWN)
3839                 return -ECONNRESET;
3840
3841         /* If this is the "end of sync" confirmation, usually the peer disk
3842          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For an empty (0 bits
3843          * set) resync that started in PausedSyncT, or if the timing of pause-/
3844          * unpause-sync events has been "just right", the peer disk may
3845          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3846          */
3847         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3848             real_peer_disk == D_UP_TO_DATE &&
3849             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3850                 /* If we are (becoming) SyncSource, but peer is still in sync
3851                  * preparation, ignore its uptodate-ness to avoid flapping, it
3852                  * will change to inconsistent once the peer reaches active
3853                  * syncing states.
3854                  * It may have changed syncer-paused flags, however, so we
3855                  * cannot ignore this completely. */
3856                 if (peer_state.conn > C_CONNECTED &&
3857                     peer_state.conn < C_SYNC_SOURCE)
3858                         real_peer_disk = D_INCONSISTENT;
3859
3860                 /* if peer_state changes to connected at the same time,
3861                  * it explicitly notifies us that it finished resync.
3862                  * Maybe we should finish it up, too? */
3863                 else if (os.conn >= C_SYNC_SOURCE &&
3864                          peer_state.conn == C_CONNECTED) {
3865                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3866                                 drbd_resync_finished(mdev);
3867                         return 0;
3868                 }
3869         }
3870
3871         /* explicit verify finished notification, stop sector reached. */
3872         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3873             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3874                 ov_out_of_sync_print(mdev);
3875                 drbd_resync_finished(mdev);
3876                 return 0;
3877         }
3878
3879         /* peer says his disk is inconsistent, while we think it is uptodate,
3880          * and this happens while the peer still thinks we have a sync going on,
3881          * but we think we are already done with the sync.
3882          * We ignore this to avoid flapping pdsk.
3883          * This should not happen, if the peer is a recent version of drbd. */
3884         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3885             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3886                 real_peer_disk = D_UP_TO_DATE;
3887
3888         if (ns.conn == C_WF_REPORT_PARAMS)
3889                 ns.conn = C_CONNECTED;
3890
3891         if (peer_state.conn == C_AHEAD)
3892                 ns.conn = C_BEHIND;
3893
3894         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3895             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3896                 int cr; /* consider resync */
3897
3898                 /* if we established a new connection */
3899                 cr  = (os.conn < C_CONNECTED);
3900                 /* if we had an established connection
3901                  * and one of the nodes newly attaches a disk */
3902                 cr |= (os.conn == C_CONNECTED &&
3903                        (peer_state.disk == D_NEGOTIATING ||
3904                         os.disk == D_NEGOTIATING));
3905                 /* if we have both been inconsistent, and the peer has been
3906                  * forced to be UpToDate with --overwrite-data */
3907                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3908                 /* if we had been plain connected, and the admin requested to
3909                  * start a sync by "invalidate" or "invalidate-remote" */
3910                 cr |= (os.conn == C_CONNECTED &&
3911                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3912                                  peer_state.conn <= C_WF_BITMAP_T));
3913
3914                 if (cr)
3915                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3916
3917                 put_ldev(mdev);
3918                 if (ns.conn == C_MASK) {
3919                         ns.conn = C_CONNECTED;
3920                         if (mdev->state.disk == D_NEGOTIATING) {
3921                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3922                         } else if (peer_state.disk == D_NEGOTIATING) {
3923                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3924                                 peer_state.disk = D_DISKLESS;
3925                                 real_peer_disk = D_DISKLESS;
3926                         } else {
3927                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3928                                         return -EIO;
3929                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3930                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3931                                 return -EIO;
3932                         }
3933                 }
3934         }
3935
3936         spin_lock_irq(&mdev->tconn->req_lock);
3937         if (os.i != drbd_read_state(mdev).i)
3938                 goto retry;
3939         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3940         ns.peer = peer_state.role;
3941         ns.pdsk = real_peer_disk;
3942         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3943         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3944                 ns.disk = mdev->new_state_tmp.disk;
3945         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3946         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3947             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3948                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3949                    for temporary network outages! */
3950                 spin_unlock_irq(&mdev->tconn->req_lock);
3951                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3952                 tl_clear(mdev->tconn);
3953                 drbd_uuid_new_current(mdev);
3954                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3955                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3956                 return -EIO;
3957         }
3958         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3959         ns = drbd_read_state(mdev);
3960         spin_unlock_irq(&mdev->tconn->req_lock);
3961
3962         if (rv < SS_SUCCESS) {
3963                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3964                 return -EIO;
3965         }
3966
3967         if (os.conn > C_WF_REPORT_PARAMS) {
3968                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3969                     peer_state.disk != D_NEGOTIATING ) {
3970                         /* we want resync, peer has not yet decided to sync... */
3971                         /* Nowadays only used when forcing a node into primary role and
3972                            setting its disk to UpToDate with that */
3973                         drbd_send_uuids(mdev);
3974                         drbd_send_current_state(mdev);
3975                 }
3976         }
3977
3978         clear_bit(DISCARD_MY_DATA, &mdev->flags);
3979
3980         drbd_md_sync(mdev); /* update connected indicator, la_size_sect, ... */
3981
3982         return 0;
3983 }
3984
3985 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3986 {
3987         struct drbd_conf *mdev;
3988         struct p_rs_uuid *p = pi->data;
3989
3990         mdev = vnr_to_mdev(tconn, pi->vnr);
3991         if (!mdev)
3992                 return -EIO;
3993
3994         wait_event(mdev->misc_wait,
3995                    mdev->state.conn == C_WF_SYNC_UUID ||
3996                    mdev->state.conn == C_BEHIND ||
3997                    mdev->state.conn < C_CONNECTED ||
3998                    mdev->state.disk < D_NEGOTIATING);
3999
4000         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
4001
4002         /* Here the _drbd_uuid_ functions are right, current should
4003            _not_ be rotated into the history */
4004         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
4005                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
4006                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
4007
4008                 drbd_print_uuids(mdev, "updated sync uuid");
4009                 drbd_start_resync(mdev, C_SYNC_TARGET);
4010
4011                 put_ldev(mdev);
4012         } else
4013                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
4014
4015         return 0;
4016 }
4017
4018 /**
4019  * receive_bitmap_plain
4020  *
4021  * Return 0 when done, 1 when another iteration is needed, and a negative error
4022  * code upon failure.
4023  */
4024 static int
4025 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
4026                      unsigned long *p, struct bm_xfer_ctx *c)
4027 {
4028         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4029                                  drbd_header_size(mdev->tconn);
4030         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4031                                        c->bm_words - c->word_offset);
4032         unsigned int want = num_words * sizeof(*p);
4033         int err;
4034
4035         if (want != size) {
4036                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
4037                 return -EIO;
4038         }
4039         if (want == 0)
4040                 return 0;
4041         err = drbd_recv_all(mdev->tconn, p, want);
4042         if (err)
4043                 return err;
4044
4045         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
4046
4047         c->word_offset += num_words;
4048         c->bit_offset = c->word_offset * BITS_PER_LONG;
4049         if (c->bit_offset > c->bm_bits)
4050                 c->bit_offset = c->bm_bits;
4051
4052         return 1;
4053 }
4054
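     /* Helpers to pick apart the "encoding" byte of a compressed bitmap packet,
      * as used by the decoder below: bits 0-3 carry the drbd_bitmap_code,
      * bits 4-6 the number of pad bits handed to bitstream_init(), and bit 7
      * whether the first run-length describes set bits. */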
4055 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4056 {
4057         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4058 }
4059
4060 static int dcbp_get_start(struct p_compressed_bm *p)
4061 {
4062         return (p->encoding & 0x80) != 0;
4063 }
4064
4065 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4066 {
4067         return (p->encoding >> 4) & 0x7;
4068 }
4069
4070 /**
4071  * recv_bm_rle_bits
4072  *
4073  * Return 0 when done, 1 when another iteration is needed, and a negative error
4074  * code upon failure.
4075  */
4076 static int
4077 recv_bm_rle_bits(struct drbd_conf *mdev,
4078                 struct p_compressed_bm *p,
4079                  struct p_compressed_bm *p,
4080                  unsigned int len)
4081 {
4082         struct bitstream bs;
4083         u64 look_ahead;
4084         u64 rl;
4085         u64 tmp;
4086         unsigned long s = c->bit_offset;
4087         unsigned long e;
4088         int toggle = dcbp_get_start(p);
4089         int have;
4090         int bits;
4091
4092         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4093
4094         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4095         if (bits < 0)
4096                 return -EIO;
4097
4098         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4099                 bits = vli_decode_bits(&rl, look_ahead);
4100                 if (bits <= 0)
4101                         return -EIO;
4102
4103                 if (toggle) {
4104                         e = s + rl - 1;
4105                         if (e >= c->bm_bits) {
4106                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4107                                 return -EIO;
4108                         }
4109                         _drbd_bm_set_bits(mdev, s, e);
4110                 }
4111
4112                 if (have < bits) {
4113                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4114                                 have, bits, look_ahead,
4115                                 (unsigned int)(bs.cur.b - p->code),
4116                                 (unsigned int)bs.buf_len);
4117                         return -EIO;
4118                 }
4119                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4120                 if (likely(bits < 64))
4121                         look_ahead >>= bits;
4122                 else
4123                         look_ahead = 0;
4124                 have -= bits;
4125
4126                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4127                 if (bits < 0)
4128                         return -EIO;
4129                 look_ahead |= tmp << have;
4130                 have += bits;
4131         }
4132
4133         c->bit_offset = s;
4134         bm_xfer_ctx_bit_to_word_offset(c);
4135
4136         return (s != c->bm_bits);
4137 }
4138
4139 /**
4140  * decode_bitmap_c
4141  *
4142  * Return 0 when done, 1 when another iteration is needed, and a negative error
4143  * code upon failure.
4144  */
4145 static int
4146 decode_bitmap_c(struct drbd_conf *mdev,
4147                 struct p_compressed_bm *p,
4148                 struct bm_xfer_ctx *c,
4149                 unsigned int len)
4150 {
4151         if (dcbp_get_code(p) == RLE_VLI_Bits)
4152                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4153
4154         /* other variants had been implemented for evaluation,
4155          * but have been dropped as this one turned out to be "best"
4156          * during all our tests. */
4157
4158         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4159         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4160         return -EIO;
4161 }
4162
4163 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4164                 const char *direction, struct bm_xfer_ctx *c)
4165 {
4166         /* what would it take to transfer it "plaintext" */
4167         unsigned int header_size = drbd_header_size(mdev->tconn);
4168         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4169         unsigned int plain =
4170                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4171                 c->bm_words * sizeof(unsigned long);
4172         unsigned int total = c->bytes[0] + c->bytes[1];
4173         unsigned int r;
4174
4175         /* total cannot be zero, but just in case: */
4176         if (total == 0)
4177                 return;
4178
4179         /* don't report if not compressed */
4180         if (total >= plain)
4181                 return;
4182
4183         /* total < plain. check for overflow, still */
4184         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4185                                     : (1000 * total / plain);
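             /* A made-up example: total = 300 and plain = 1200 give
              * r = 1000 * 300 / 1200 = 250, reported as "compression: 75.0%"
              * after the "r = 1000 - r" below. */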
4186
4187         if (r > 1000)
4188                 r = 1000;
4189
4190         r = 1000 - r;
4191         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4192              "total %u; compression: %u.%u%%\n",
4193                         direction,
4194                         c->bytes[1], c->packets[1],
4195                         c->bytes[0], c->packets[0],
4196                         total, r/10, r % 10);
4197 }
4198
4199 /* Since we are processing the bitfield from lower addresses to higher,
4200    it does not matter whether we process it in 32 bit or 64 bit chunks,
4201    as long as it is little endian. (Understand it as a byte stream,
4202    beginning with the lowest byte...) If we used big endian, we would
4203    need to process it from the highest address to the lowest in order
4204    to be agnostic to the 32 vs 64 bit issue.
4205
4206    Returns 0 on success, a negative error code otherwise. */
4207 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4208 {
4209         struct drbd_conf *mdev;
4210         struct bm_xfer_ctx c;
4211         int err;
4212
4213         mdev = vnr_to_mdev(tconn, pi->vnr);
4214         if (!mdev)
4215                 return -EIO;
4216
4217         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4218         /* you are supposed to send additional out-of-sync information
4219          * if you actually set bits during this phase */
4220
4221         c = (struct bm_xfer_ctx) {
4222                 .bm_bits = drbd_bm_bits(mdev),
4223                 .bm_words = drbd_bm_words(mdev),
4224         };
4225
4226         for(;;) {
4227                 if (pi->cmd == P_BITMAP)
4228                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4229                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4230                         /* MAYBE: sanity check that we speak proto >= 90,
4231                          * and the feature is enabled! */
4232                         struct p_compressed_bm *p = pi->data;
4233
4234                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4235                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4236                                 err = -EIO;
4237                                 goto out;
4238                         }
4239                         if (pi->size <= sizeof(*p)) {
4240                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4241                                 err = -EIO;
4242                                 goto out;
4243                         }
4244                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4245                         if (err)
4246                                 goto out;
4247                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4248                 } else {
4249                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4250                         err = -EIO;
4251                         goto out;
4252                 }
4253
4254                 c.packets[pi->cmd == P_BITMAP]++;
4255                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4256
4257                 if (err <= 0) {
4258                         if (err < 0)
4259                                 goto out;
4260                         break;
4261                 }
4262                 err = drbd_recv_header(mdev->tconn, pi);
4263                 if (err)
4264                         goto out;
4265         }
4266
4267         INFO_bm_xfer_stats(mdev, "receive", &c);
4268
4269         if (mdev->state.conn == C_WF_BITMAP_T) {
4270                 enum drbd_state_rv rv;
4271
4272                 err = drbd_send_bitmap(mdev);
4273                 if (err)
4274                         goto out;
4275                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4276                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4277                 D_ASSERT(rv == SS_SUCCESS);
4278         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4279                 /* admin may have requested C_DISCONNECTING,
4280                  * other threads may have noticed network errors */
4281                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4282                     drbd_conn_str(mdev->state.conn));
4283         }
4284         err = 0;
4285
4286  out:
4287         drbd_bm_unlock(mdev);
4288         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4289                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4290         return err;
4291 }
4292
4293 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4294 {
4295         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4296                  pi->cmd, pi->size);
4297
4298         return ignore_remaining_packet(tconn, pi);
4299 }
4300
4301 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4302 {
4303         /* Make sure we've acked all the TCP data associated
4304          * with the data requests being unplugged */
4305         drbd_tcp_quickack(tconn->data.socket);
4306
4307         return 0;
4308 }
4309
4310 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4311 {
4312         struct drbd_conf *mdev;
4313         struct p_block_desc *p = pi->data;
4314
4315         mdev = vnr_to_mdev(tconn, pi->vnr);
4316         if (!mdev)
4317                 return -EIO;
4318
4319         switch (mdev->state.conn) {
4320         case C_WF_SYNC_UUID:
4321         case C_WF_BITMAP_T:
4322         case C_BEHIND:
4323                 break;
4324         default:
4325                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4326                                 drbd_conn_str(mdev->state.conn));
4327         }
4328
4329         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4330
4331         return 0;
4332 }
4333
4334 struct data_cmd {
4335         int expect_payload;
4336         size_t pkt_size;
4337         int (*fn)(struct drbd_tconn *, struct packet_info *);
4338 };
4339
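     /* Dispatch table for packets arriving on the data socket, indexed by
      * packet type: drbdd() receives pkt_size bytes of (sub-)header into the
      * packet buffer before calling fn(); only commands with expect_payload
      * set may carry additional payload beyond that, which fn() reads itself. */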
4340 static struct data_cmd drbd_cmd_handler[] = {
4341         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4342         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4343         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4344         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4345         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4346         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4347         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4348         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4349         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4350         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4351         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4352         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4353         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4354         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4355         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4356         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4357         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4358         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4359         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4360         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4361         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4362         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4363         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4364         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4365 };
4366
4367 static void drbdd(struct drbd_tconn *tconn)
4368 {
4369         struct packet_info pi;
4370         size_t shs; /* sub header size */
4371         int err;
4372
4373         while (get_t_state(&tconn->receiver) == RUNNING) {
4374                 struct data_cmd *cmd;
4375
4376                 drbd_thread_current_set_cpu(&tconn->receiver);
4377                 if (drbd_recv_header(tconn, &pi))
4378                         goto err_out;
4379
4380                 cmd = &drbd_cmd_handler[pi.cmd];
4381                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4382                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4383                                  cmdname(pi.cmd), pi.cmd);
4384                         goto err_out;
4385                 }
4386
4387                 shs = cmd->pkt_size;
4388                 if (pi.size > shs && !cmd->expect_payload) {
4389                         conn_err(tconn, "No payload expected %s l:%d\n",
4390                                  cmdname(pi.cmd), pi.size);
4391                         goto err_out;
4392                 }
4393
4394                 if (shs) {
4395                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4396                         if (err)
4397                                 goto err_out;
4398                         pi.size -= shs;
4399                 }
4400
4401                 err = cmd->fn(tconn, &pi);
4402                 if (err) {
4403                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4404                                  cmdname(pi.cmd), err, pi.size);
4405                         goto err_out;
4406                 }
4407         }
4408         return;
4409
4410     err_out:
4411         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4412 }
4413
4414 void conn_flush_workqueue(struct drbd_tconn *tconn)
4415 {
4416         struct drbd_wq_barrier barr;
4417
4418         barr.w.cb = w_prev_work_done;
4419         barr.w.tconn = tconn;
4420         init_completion(&barr.done);
4421         drbd_queue_work(&tconn->sender_work, &barr.w);
4422         wait_for_completion(&barr.done);
4423 }
4424
4425 static void conn_disconnect(struct drbd_tconn *tconn)
4426 {
4427         struct drbd_conf *mdev;
4428         enum drbd_conns oc;
4429         int vnr;
4430
4431         if (tconn->cstate == C_STANDALONE)
4432                 return;
4433
4434         /* We are about to start the cleanup after connection loss.
4435          * Make sure drbd_make_request knows about that.
4436          * Usually we should be in some network failure state already,
4437          * but just in case we are not, we fix it up here.
4438          */
4439         conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4440
4441         /* asender does not clean up anything. it must not interfere, either */
4442         drbd_thread_stop(&tconn->asender);
4443         drbd_free_sock(tconn);
4444
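             /* Per-volume cleanup.  drbd_disconnected() may sleep, so take a
              * reference and drop the RCU read lock around each call, then
              * re-acquire it to continue the iteration. */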
4445         rcu_read_lock();
4446         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4447                 kref_get(&mdev->kref);
4448                 rcu_read_unlock();
4449                 drbd_disconnected(mdev);
4450                 kref_put(&mdev->kref, &drbd_minor_destroy);
4451                 rcu_read_lock();
4452         }
4453         rcu_read_unlock();
4454
4455         if (!list_empty(&tconn->current_epoch->list))
4456                 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4457         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4458         atomic_set(&tconn->current_epoch->epoch_size, 0);
4459         tconn->send.seen_any_write_yet = false;
4460
4461         conn_info(tconn, "Connection closed\n");
4462
4463         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4464                 conn_try_outdate_peer_async(tconn);
4465
4466         spin_lock_irq(&tconn->req_lock);
4467         oc = tconn->cstate;
4468         if (oc >= C_UNCONNECTED)
4469                 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4470
4471         spin_unlock_irq(&tconn->req_lock);
4472
4473         if (oc == C_DISCONNECTING)
4474                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4475 }
4476
4477 static int drbd_disconnected(struct drbd_conf *mdev)
4478 {
4479         unsigned int i;
4480
4481         /* wait for current activity to cease. */
4482         spin_lock_irq(&mdev->tconn->req_lock);
4483         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4484         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4485         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4486         spin_unlock_irq(&mdev->tconn->req_lock);
4487
4488         /* We do not have data structures that would allow us to
4489          * get the rs_pending_cnt down to 0 again.
4490          *  * On C_SYNC_TARGET we do not have any data structures describing
4491          *    the pending RSDataRequest's we have sent.
4492          *  * On C_SYNC_SOURCE there is no data structure that tracks
4493          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4494          *  And no, it is not the sum of the reference counts in the
4495          *  resync_LRU. The resync_LRU tracks the whole operation including
4496          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4497          *  on the fly. */
4498         drbd_rs_cancel_all(mdev);
4499         mdev->rs_total = 0;
4500         mdev->rs_failed = 0;
4501         atomic_set(&mdev->rs_pending_cnt, 0);
4502         wake_up(&mdev->misc_wait);
4503
4504         del_timer_sync(&mdev->resync_timer);
4505         resync_timer_fn((unsigned long)mdev);
4506
4507         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4508          * w_make_resync_request etc. which may still be on the worker queue
4509          * to be "canceled" */
4510         drbd_flush_workqueue(mdev);
4511
4512         drbd_finish_peer_reqs(mdev);
4513
4514         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4515            might have queued work again. The one before drbd_finish_peer_reqs() is
4516            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4517         drbd_flush_workqueue(mdev);
4518
4519         /* need to do it again, drbd_finish_peer_reqs() may have populated it
4520          * again via drbd_try_clear_on_disk_bm(). */
4521         drbd_rs_cancel_all(mdev);
4522
4523         kfree(mdev->p_uuid);
4524         mdev->p_uuid = NULL;
4525
4526         if (!drbd_suspended(mdev))
4527                 tl_clear(mdev->tconn);
4528
4529         drbd_md_sync(mdev);
4530
4531         /* serialize with bitmap writeout triggered by the state change,
4532          * if any. */
4533         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4534
4535         /* tcp_close and release of sendpage pages can be deferred.  I don't
4536          * want to use SO_LINGER, because apparently it can be deferred for
4537          * more than 20 seconds (longest time I checked).
4538          *
4539          * Actually we don't care for exactly when the network stack does its
4540          * put_page(), but release our reference on these pages right here.
4541          */
4542         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4543         if (i)
4544                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4545         i = atomic_read(&mdev->pp_in_use_by_net);
4546         if (i)
4547                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4548         i = atomic_read(&mdev->pp_in_use);
4549         if (i)
4550                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4551
4552         D_ASSERT(list_empty(&mdev->read_ee));
4553         D_ASSERT(list_empty(&mdev->active_ee));
4554         D_ASSERT(list_empty(&mdev->sync_ee));
4555         D_ASSERT(list_empty(&mdev->done_ee));
4556
4557         return 0;
4558 }
4559
4560 /*
4561  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4562  * we can agree on is stored in agreed_pro_version.
4563  *
4564  * feature flags and the reserved array should be enough room for future
4565  * enhancements of the handshake protocol, and possible plugins...
4566  *
4567  * for now, they are expected to be zero, but ignored.
4568  */
4569 static int drbd_send_features(struct drbd_tconn *tconn)
4570 {
4571         struct drbd_socket *sock;
4572         struct p_connection_features *p;
4573
4574         sock = &tconn->data;
4575         p = conn_prepare_command(tconn, sock);
4576         if (!p)
4577                 return -EIO;
4578         memset(p, 0, sizeof(*p));
4579         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4580         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4581         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4582 }
4583
4584 /*
4585  * return values:
4586  *   1 yes, we have a valid connection
4587  *   0 oops, did not work out, please try again
4588  *  -1 peer talks different language,
4589  *     no point in trying again, please go standalone.
4590  */
4591 static int drbd_do_features(struct drbd_tconn *tconn)
4592 {
4593         /* ASSERT current == tconn->receiver ... */
4594         struct p_connection_features *p;
4595         const int expect = sizeof(struct p_connection_features);
4596         struct packet_info pi;
4597         int err;
4598
4599         err = drbd_send_features(tconn);
4600         if (err)
4601                 return 0;
4602
4603         err = drbd_recv_header(tconn, &pi);
4604         if (err)
4605                 return 0;
4606
4607         if (pi.cmd != P_CONNECTION_FEATURES) {
4608                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4609                          cmdname(pi.cmd), pi.cmd);
4610                 return -1;
4611         }
4612
4613         if (pi.size != expect) {
4614                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4615                      expect, pi.size);
4616                 return -1;
4617         }
4618
4619         p = pi.data;
4620         err = drbd_recv_all_warn(tconn, p, expect);
4621         if (err)
4622                 return 0;
4623
4624         p->protocol_min = be32_to_cpu(p->protocol_min);
4625         p->protocol_max = be32_to_cpu(p->protocol_max);
4626         if (p->protocol_max == 0)
4627                 p->protocol_max = p->protocol_min;
4628
4629         if (PRO_VERSION_MAX < p->protocol_min ||
4630             PRO_VERSION_MIN > p->protocol_max)
4631                 goto incompat;
4632
4633         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
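             /* E.g. (hypothetical numbers): if we support 86..101 and the peer
              * announced 86..96, we agree on protocol version 96. */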
4634
4635         conn_info(tconn, "Handshake successful: "
4636              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4637
4638         return 1;
4639
4640  incompat:
4641         conn_err(tconn, "incompatible DRBD dialects: "
4642             "I support %d-%d, peer supports %d-%d\n",
4643             PRO_VERSION_MIN, PRO_VERSION_MAX,
4644             p->protocol_min, p->protocol_max);
4645         return -1;
4646 }
4647
4648 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4649 static int drbd_do_auth(struct drbd_tconn *tconn)
4650 {
4651         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4652         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4653         return -1;
4654 }
4655 #else
4656 #define CHALLENGE_LEN 64
4657
4658 /* Return value:
4659         1 - auth succeeded,
4660         0 - failed, try again (network error),
4661         -1 - auth failed, don't try again.
4662 */
4663
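     /* Rough outline of the exchange implemented below: each side sends a
      * random CHALLENGE_LEN byte challenge (P_AUTH_CHALLENGE), answers the
      * peer's challenge with an HMAC keyed by the shared secret
      * (P_AUTH_RESPONSE), and finally compares the response it received
      * against the one it computed over its own challenge. */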
4664 static int drbd_do_auth(struct drbd_tconn *tconn)
4665 {
4666         struct drbd_socket *sock;
4667         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4668         struct scatterlist sg;
4669         char *response = NULL;
4670         char *right_response = NULL;
4671         char *peers_ch = NULL;
4672         unsigned int key_len;
4673         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4674         unsigned int resp_size;
4675         struct hash_desc desc;
4676         struct packet_info pi;
4677         struct net_conf *nc;
4678         int err, rv;
4679
4680         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4681
4682         rcu_read_lock();
4683         nc = rcu_dereference(tconn->net_conf);
4684         key_len = strlen(nc->shared_secret);
4685         memcpy(secret, nc->shared_secret, key_len);
4686         rcu_read_unlock();
4687
4688         desc.tfm = tconn->cram_hmac_tfm;
4689         desc.flags = 0;
4690
4691         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4692         if (rv) {
4693                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4694                 rv = -1;
4695                 goto fail;
4696         }
4697
4698         get_random_bytes(my_challenge, CHALLENGE_LEN);
4699
4700         sock = &tconn->data;
4701         if (!conn_prepare_command(tconn, sock)) {
4702                 rv = 0;
4703                 goto fail;
4704         }
4705         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4706                                 my_challenge, CHALLENGE_LEN);
4707         if (!rv)
4708                 goto fail;
4709
4710         err = drbd_recv_header(tconn, &pi);
4711         if (err) {
4712                 rv = 0;
4713                 goto fail;
4714         }
4715
4716         if (pi.cmd != P_AUTH_CHALLENGE) {
4717                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4718                          cmdname(pi.cmd), pi.cmd);
4719                 rv = 0;
4720                 goto fail;
4721         }
4722
4723         if (pi.size > CHALLENGE_LEN * 2) {
4724                 conn_err(tconn, "AuthChallenge payload too big.\n");
4725                 rv = -1;
4726                 goto fail;
4727         }
4728
4729         peers_ch = kmalloc(pi.size, GFP_NOIO);
4730         if (peers_ch == NULL) {
4731                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4732                 rv = -1;
4733                 goto fail;
4734         }
4735
4736         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4737         if (err) {
4738                 rv = 0;
4739                 goto fail;
4740         }
4741
4742         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4743         response = kmalloc(resp_size, GFP_NOIO);
4744         if (response == NULL) {
4745                 conn_err(tconn, "kmalloc of response failed\n");
4746                 rv = -1;
4747                 goto fail;
4748         }
4749
4750         sg_init_table(&sg, 1);
4751         sg_set_buf(&sg, peers_ch, pi.size);
4752
4753         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4754         if (rv) {
4755                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4756                 rv = -1;
4757                 goto fail;
4758         }
4759
4760         if (!conn_prepare_command(tconn, sock)) {
4761                 rv = 0;
4762                 goto fail;
4763         }
4764         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4765                                 response, resp_size);
4766         if (!rv)
4767                 goto fail;
4768
4769         err = drbd_recv_header(tconn, &pi);
4770         if (err) {
4771                 rv = 0;
4772                 goto fail;
4773         }
4774
4775         if (pi.cmd != P_AUTH_RESPONSE) {
4776                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4777                          cmdname(pi.cmd), pi.cmd);
4778                 rv = 0;
4779                 goto fail;
4780         }
4781
4782         if (pi.size != resp_size) {
4783                 conn_err(tconn, "AuthResponse payload of wrong size\n");
4784                 rv = 0;
4785                 goto fail;
4786         }
4787
4788         err = drbd_recv_all_warn(tconn, response, resp_size);
4789         if (err) {
4790                 rv = 0;
4791                 goto fail;
4792         }
4793
4794         right_response = kmalloc(resp_size, GFP_NOIO);
4795         if (right_response == NULL) {
4796                 conn_err(tconn, "kmalloc of right_response failed\n");
4797                 rv = -1;
4798                 goto fail;
4799         }
4800
4801         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4802
4803         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4804         if (rv) {
4805                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4806                 rv = -1;
4807                 goto fail;
4808         }
4809
4810         rv = !memcmp(response, right_response, resp_size);
4811
4812         if (rv)
4813                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4814                      resp_size);
4815         else
4816                 rv = -1;
4817
4818  fail:
4819         kfree(peers_ch);
4820         kfree(response);
4821         kfree(right_response);
4822
4823         return rv;
4824 }
4825 #endif
4826
4827 int drbdd_init(struct drbd_thread *thi)
4828 {
4829         struct drbd_tconn *tconn = thi->tconn;
4830         int h;
4831
4832         conn_info(tconn, "receiver (re)started\n");
4833
4834         do {
4835                 h = conn_connect(tconn);
4836                 if (h == 0) {
4837                         conn_disconnect(tconn);
4838                         schedule_timeout_interruptible(HZ);
4839                 }
4840                 if (h == -1) {
4841                         conn_warn(tconn, "Discarding network configuration.\n");
4842                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4843                 }
4844         } while (h == 0);
4845
4846         if (h > 0)
4847                 drbdd(tconn);
4848
4849         conn_disconnect(tconn);
4850
4851         conn_info(tconn, "receiver terminated\n");
4852         return 0;
4853 }
4854
4855 /* ********* acknowledge sender ******** */
4856
4857 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4858 {
4859         struct p_req_state_reply *p = pi->data;
4860         int retcode = be32_to_cpu(p->retcode);
4861
4862         if (retcode >= SS_SUCCESS) {
4863                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4864         } else {
4865                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4866                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4867                          drbd_set_st_err_str(retcode), retcode);
4868         }
4869         wake_up(&tconn->ping_wait);
4870
4871         return 0;
4872 }
4873
4874 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4875 {
4876         struct drbd_conf *mdev;
4877         struct p_req_state_reply *p = pi->data;
4878         int retcode = be32_to_cpu(p->retcode);
4879
4880         mdev = vnr_to_mdev(tconn, pi->vnr);
4881         if (!mdev)
4882                 return -EIO;
4883
4884         if (test_bit(CONN_WD_ST_CHG_REQ, &tconn->flags)) {
4885                 D_ASSERT(tconn->agreed_pro_version < 100);
4886                 return got_conn_RqSReply(tconn, pi);
4887         }
4888
4889         if (retcode >= SS_SUCCESS) {
4890                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4891         } else {
4892                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4893                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4894                         drbd_set_st_err_str(retcode), retcode);
4895         }
4896         wake_up(&mdev->state_wait);
4897
4898         return 0;
4899 }
4900
4901 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4902 {
4903         return drbd_send_ping_ack(tconn);
4904
4905 }
4906
4907 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4908 {
4909         /* restore idle timeout */
4910         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4911         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4912                 wake_up(&tconn->ping_wait);
4913
4914         return 0;
4915 }
4916
4917 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4918 {
4919         struct drbd_conf *mdev;
4920         struct p_block_ack *p = pi->data;
4921         sector_t sector = be64_to_cpu(p->sector);
4922         int blksize = be32_to_cpu(p->blksize);
4923
4924         mdev = vnr_to_mdev(tconn, pi->vnr);
4925         if (!mdev)
4926                 return -EIO;
4927
4928         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4929
4930         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4931
4932         if (get_ldev(mdev)) {
4933                 drbd_rs_complete_io(mdev, sector);
4934                 drbd_set_in_sync(mdev, sector, blksize);
4935                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4936                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4937                 put_ldev(mdev);
4938         }
4939         dec_rs_pending(mdev);
4940         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4941
4942         return 0;
4943 }
4944
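     /* Look up the request identified by (id, sector) in the given tree and
      * feed the state machine event "what" to it; if that completed the
      * master bio, complete it outside the spinlock. */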
4945 static int
4946 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4947                               struct rb_root *root, const char *func,
4948                               enum drbd_req_event what, bool missing_ok)
4949 {
4950         struct drbd_request *req;
4951         struct bio_and_error m;
4952
4953         spin_lock_irq(&mdev->tconn->req_lock);
4954         req = find_request(mdev, root, id, sector, missing_ok, func);
4955         if (unlikely(!req)) {
4956                 spin_unlock_irq(&mdev->tconn->req_lock);
4957                 return -EIO;
4958         }
4959         __req_mod(req, what, &m);
4960         spin_unlock_irq(&mdev->tconn->req_lock);
4961
4962         if (m.bio)
4963                 complete_master_bio(mdev, &m);
4964         return 0;
4965 }
4966
4967 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4968 {
4969         struct drbd_conf *mdev;
4970         struct p_block_ack *p = pi->data;
4971         sector_t sector = be64_to_cpu(p->sector);
4972         int blksize = be32_to_cpu(p->blksize);
4973         enum drbd_req_event what;
4974
4975         mdev = vnr_to_mdev(tconn, pi->vnr);
4976         if (!mdev)
4977                 return -EIO;
4978
4979         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4980
4981         if (p->block_id == ID_SYNCER) {
4982                 drbd_set_in_sync(mdev, sector, blksize);
4983                 dec_rs_pending(mdev);
4984                 return 0;
4985         }
4986         switch (pi->cmd) {
4987         case P_RS_WRITE_ACK:
4988                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4989                 break;
4990         case P_WRITE_ACK:
4991                 what = WRITE_ACKED_BY_PEER;
4992                 break;
4993         case P_RECV_ACK:
4994                 what = RECV_ACKED_BY_PEER;
4995                 break;
4996         case P_SUPERSEDED:
4997                 what = CONFLICT_RESOLVED;
4998                 break;
4999         case P_RETRY_WRITE:
5000                 what = POSTPONE_WRITE;
5001                 break;
5002         default:
5003                 BUG();
5004         }
5005
5006         return validate_req_change_req_state(mdev, p->block_id, sector,
5007                                              &mdev->write_requests, __func__,
5008                                              what, false);
5009 }
5010
5011 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
5012 {
5013         struct drbd_conf *mdev;
5014         struct p_block_ack *p = pi->data;
5015         sector_t sector = be64_to_cpu(p->sector);
5016         int size = be32_to_cpu(p->blksize);
5017         int err;
5018
5019         mdev = vnr_to_mdev(tconn, pi->vnr);
5020         if (!mdev)
5021                 return -EIO;
5022
5023         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5024
5025         if (p->block_id == ID_SYNCER) {
5026                 dec_rs_pending(mdev);
5027                 drbd_rs_failed_io(mdev, sector, size);
5028                 return 0;
5029         }
5030
5031         err = validate_req_change_req_state(mdev, p->block_id, sector,
5032                                             &mdev->write_requests, __func__,
5033                                             NEG_ACKED, true);
5034         if (err) {
5035                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5036                    The master bio might already be completed, therefore the
5037                    request is no longer in the collision hash. */
5038                 /* In Protocol B we might already have got a P_RECV_ACK
5039                    but then get a P_NEG_ACK afterwards. */
5040                 drbd_set_out_of_sync(mdev, sector, size);
5041         }
5042         return 0;
5043 }
5044
5045 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5046 {
5047         struct drbd_conf *mdev;
5048         struct p_block_ack *p = pi->data;
5049         sector_t sector = be64_to_cpu(p->sector);
5050
5051         mdev = vnr_to_mdev(tconn, pi->vnr);
5052         if (!mdev)
5053                 return -EIO;
5054
5055         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5056
5057         dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
5058             (unsigned long long)sector, be32_to_cpu(p->blksize));
5059
5060         return validate_req_change_req_state(mdev, p->block_id, sector,
5061                                              &mdev->read_requests, __func__,
5062                                              NEG_ACKED, false);
5063 }
5064
5065 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5066 {
5067         struct drbd_conf *mdev;
5068         sector_t sector;
5069         int size;
5070         struct p_block_ack *p = pi->data;
5071
5072         mdev = vnr_to_mdev(tconn, pi->vnr);
5073         if (!mdev)
5074                 return -EIO;
5075
5076         sector = be64_to_cpu(p->sector);
5077         size = be32_to_cpu(p->blksize);
5078
5079         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5080
5081         dec_rs_pending(mdev);
5082
5083         if (get_ldev_if_state(mdev, D_FAILED)) {
5084                 drbd_rs_complete_io(mdev, sector);
5085                 switch (pi->cmd) {
5086                 case P_NEG_RS_DREPLY:
5087                         drbd_rs_failed_io(mdev, sector, size);
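                             /* fall through */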
5088                 case P_RS_CANCEL:
5089                         break;
5090                 default:
5091                         BUG();
5092                 }
5093                 put_ldev(mdev);
5094         }
5095
5096         return 0;
5097 }
5098
5099 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5100 {
5101         struct p_barrier_ack *p = pi->data;
5102         struct drbd_conf *mdev;
5103         int vnr;
5104
5105         tl_release(tconn, p->barrier, be32_to_cpu(p->set_size));
5106
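             /* A barrier ack for a device that sits in Ahead mode with no
              * application writes left in flight arms start_resync_timer
              * (one second from now), which, as the AHEAD_TO_SYNC_SOURCE flag
              * suggests, takes it back towards a resync. */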
5107         rcu_read_lock();
5108         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5109                 if (mdev->state.conn == C_AHEAD &&
5110                     atomic_read(&mdev->ap_in_flight) == 0 &&
5111                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5112                         mdev->start_resync_timer.expires = jiffies + HZ;
5113                         add_timer(&mdev->start_resync_timer);
5114                 }
5115         }
5116         rcu_read_unlock();
5117
5118         return 0;
5119 }
5120
5121 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5122 {
5123         struct drbd_conf *mdev;
5124         struct p_block_ack *p = pi->data;
5125         struct drbd_work *w;
5126         sector_t sector;
5127         int size;
5128
5129         mdev = vnr_to_mdev(tconn, pi->vnr);
5130         if (!mdev)
5131                 return -EIO;
5132
5133         sector = be64_to_cpu(p->sector);
5134         size = be32_to_cpu(p->blksize);
5135
5136         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5137
5138         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5139                 drbd_ov_out_of_sync_found(mdev, sector, size);
5140         else
5141                 ov_out_of_sync_print(mdev);
5142
5143         if (!get_ldev(mdev))
5144                 return 0;
5145
5146         drbd_rs_complete_io(mdev, sector);
5147         dec_rs_pending(mdev);
5148
5149         --mdev->ov_left;
5150
5151         /* let's advance progress step marks only for every other megabyte */
5152         if ((mdev->ov_left & 0x200) == 0x200)
5153                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5154
5155         if (mdev->ov_left == 0) {
5156                 w = kmalloc(sizeof(*w), GFP_NOIO);
5157                 if (w) {
5158                         w->cb = w_ov_finished;
5159                         w->mdev = mdev;
5160                         drbd_queue_work(&mdev->tconn->sender_work, w);
5161                 } else {
5162                         dev_err(DEV, "kmalloc(w) failed.\n");
5163                         ov_out_of_sync_print(mdev);
5164                         drbd_resync_finished(mdev);
5165                 }
5166         }
5167         put_ldev(mdev);
5168         return 0;
5169 }
5170
5171 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5172 {
5173         return 0;
5174 }
5175
5176 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5177 {
5178         struct drbd_conf *mdev;
5179         int vnr, not_empty = 0;
5180
5181         do {
5182                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5183                 flush_signals(current);
5184
5185                 rcu_read_lock();
5186                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5187                         kref_get(&mdev->kref);
5188                         rcu_read_unlock();
5189                         if (drbd_finish_peer_reqs(mdev)) {
5190                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5191                                 return 1;
5192                         }
5193                         kref_put(&mdev->kref, &drbd_minor_destroy);
5194                         rcu_read_lock();
5195                 }
5196                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5197
5198                 spin_lock_irq(&tconn->req_lock);
5199                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5200                         not_empty = !list_empty(&mdev->done_ee);
5201                         if (not_empty)
5202                                 break;
5203                 }
5204                 spin_unlock_irq(&tconn->req_lock);
5205                 rcu_read_unlock();
5206         } while (not_empty);
5207
5208         return 0;
5209 }
5210
5211 struct asender_cmd {
5212         size_t pkt_size;
5213         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5214 };
5215
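     /* Dispatch table for the meta socket (asender), indexed by packet type:
      * pkt_size is the payload expected after the header; fn() is called once
      * header and payload have been received completely. */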
5216 static struct asender_cmd asender_tbl[] = {
5217         [P_PING]            = { 0, got_Ping },
5218         [P_PING_ACK]        = { 0, got_PingAck },
5219         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5220         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5221         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5222         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5223         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5224         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5225         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5226         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5227         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5228         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5229         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5230         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5231         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5232         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5233         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5234 };
5235
5236 int drbd_asender(struct drbd_thread *thi)
5237 {
5238         struct drbd_tconn *tconn = thi->tconn;
5239         struct asender_cmd *cmd = NULL;
5240         struct packet_info pi;
5241         int rv;
5242         void *buf    = tconn->meta.rbuf;
5243         int received = 0;
5244         unsigned int header_size = drbd_header_size(tconn);
5245         int expect   = header_size;
5246         bool ping_timeout_active = false;
5247         struct net_conf *nc;
5248         int ping_timeo, tcp_cork, ping_int;
5249         struct sched_param param = { .sched_priority = 2 };
5250
5251         rv = sched_setscheduler(current, SCHED_RR, &param);
5252         if (rv < 0)
5253                 conn_err(tconn, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5254
5255         while (get_t_state(thi) == RUNNING) {
5256                 drbd_thread_current_set_cpu(thi);
5257
5258                 rcu_read_lock();
5259                 nc = rcu_dereference(tconn->net_conf);
5260                 ping_timeo = nc->ping_timeo;
5261                 tcp_cork = nc->tcp_cork;
5262                 ping_int = nc->ping_int;
5263                 rcu_read_unlock();
5264
5265                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5266                         if (drbd_send_ping(tconn)) {
5267                                 conn_err(tconn, "drbd_send_ping has failed\n");
5268                                 goto reconnect;
5269                         }
5270                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5271                         ping_timeout_active = true;
5272                 }
5273
5274                 /* TODO: conditionally cork; it may hurt latency if we cork without
5275                    much to send */
5276                 if (tcp_cork)
5277                         drbd_tcp_cork(tconn->meta.socket);
5278                 if (tconn_finish_peer_reqs(tconn)) {
5279                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5280                         goto reconnect;
5281                 }
5282                 /* but uncork again as soon as we are done, unless corking is disabled */
5283                 if (tcp_cork)
5284                         drbd_tcp_uncork(tconn->meta.socket);
5285
5286                 /* short circuit, recv_msg would return EINTR anyways. */
5287                 if (signal_pending(current))
5288                         continue;
5289
5290                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5291                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5292
5293                 flush_signals(current);
5294
5295                 /* Note:
5296                  * -EINTR        (on meta) we got a signal
5297                  * -EAGAIN       (on meta) rcvtimeo expired
5298                  * -ECONNRESET   other side closed the connection
5299                  * -ERESTARTSYS  (on data) we got a signal
5300                  * rv <  0       other than above: unexpected error!
5301                  * rv == expected: full header or command
5302                  * rv <  expected: "woken" by signal during receive
5303                  * rv == 0       : "connection shut down by peer"
5304                  */
5305                 if (likely(rv > 0)) {
5306                         received += rv;
5307                         buf      += rv;
5308                 } else if (rv == 0) {
5309                         if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
5310                                 long t;
5311                                 rcu_read_lock();
5312                                 t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
5313                                 rcu_read_unlock();
5314
5315                                 t = wait_event_timeout(tconn->ping_wait,
5316                                                        tconn->cstate < C_WF_REPORT_PARAMS,
5317                                                        t);
5318                                 if (t)
5319                                         break;
5320                         }
5321                         conn_err(tconn, "meta connection shut down by peer.\n");
5322                         goto reconnect;
5323                 } else if (rv == -EAGAIN) {
5324                         /* If the data socket received something meanwhile,
5325                          * that is good enough: peer is still alive. */
5326                         if (time_after(tconn->last_received,
5327                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5328                                 continue;
5329                         if (ping_timeout_active) {
5330                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5331                                 goto reconnect;
5332                         }
5333                         set_bit(SEND_PING, &tconn->flags);
5334                         continue;
5335                 } else if (rv == -EINTR) {
5336                         continue;
5337                 } else {
5338                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5339                         goto reconnect;
5340                 }
5341
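                     /* A complete header, not yet decoded: look up the handler
                      * and grow "expect" by its payload size before reading on. */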
5342                 if (received == expect && cmd == NULL) {
5343                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5344                                 goto reconnect;
5345                         cmd = &asender_tbl[pi.cmd];
5346                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5347                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5348                                          cmdname(pi.cmd), pi.cmd);
5349                                 goto disconnect;
5350                         }
5351                         expect = header_size + cmd->pkt_size;
5352                         if (pi.size != expect - header_size) {
5353                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5354                                         pi.cmd, pi.size);
5355                                 goto reconnect;
5356                         }
5357                 }
5358                 if (received == expect) {
5359                         int err;
5360
5361                         err = cmd->fn(tconn, &pi);
5362                         if (err) {
5363                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5364                                 goto reconnect;
5365                         }
5366
5367                         tconn->last_received = jiffies;
5368
5369                         if (cmd == &asender_tbl[P_PING_ACK]) {
5370                                 /* restore idle timeout */
5371                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5372                                 ping_timeout_active = false;
5373                         }
5374
5375                         buf      = tconn->meta.rbuf;
5376                         received = 0;
5377                         expect   = header_size;
5378                         cmd      = NULL;
5379                 }
5380         }
5381
5382         if (0) {
5383 reconnect:
5384                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5385                 conn_md_sync(tconn);
5386         }
5387         if (0) {
5388 disconnect:
5389                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5390         }
5391         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5392
5393         conn_info(tconn, "asender terminated\n");
5394
5395         return 0;
5396 }