Merge remote-tracking branches 'regulator/fix/88pm800', 'regulator/fix/max8973',...
[linux-drm-fsl-dcu.git] / net / ceph / messenger.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/crc32c.h>
4 #include <linux/ctype.h>
5 #include <linux/highmem.h>
6 #include <linux/inet.h>
7 #include <linux/kthread.h>
8 #include <linux/net.h>
9 #include <linux/nsproxy.h>
10 #include <linux/slab.h>
11 #include <linux/socket.h>
12 #include <linux/string.h>
13 #ifdef  CONFIG_BLOCK
14 #include <linux/bio.h>
15 #endif  /* CONFIG_BLOCK */
16 #include <linux/dns_resolver.h>
17 #include <net/tcp.h>
18
19 #include <linux/ceph/ceph_features.h>
20 #include <linux/ceph/libceph.h>
21 #include <linux/ceph/messenger.h>
22 #include <linux/ceph/decode.h>
23 #include <linux/ceph/pagelist.h>
24 #include <linux/export.h>
25
26 #define list_entry_next(pos, member)                                    \
27         list_entry(pos->member.next, typeof(*pos), member)
28
29 /*
30  * Ceph uses the messenger to exchange ceph_msg messages with other
31  * hosts in the system.  The messenger provides ordered and reliable
32  * delivery.  We tolerate TCP disconnects by reconnecting (with
33  * exponential backoff) in the case of a fault (disconnection, bad
34  * crc, protocol error).  Acks allow sent messages to be discarded by
35  * the sender.
36  */
37
38 /*
39  * We track the state of the socket on a given connection using
40  * values defined below.  The transition to a new socket state is
41  * handled by a function which verifies we aren't coming from an
42  * unexpected state.
43  *
44  *      --------
45  *      | NEW* |  transient initial state
46  *      --------
47  *          | con_sock_state_init()
48  *          v
49  *      ----------
50  *      | CLOSED |  initialized, but no socket (and no
51  *      ----------  TCP connection)
52  *       ^      \
53  *       |       \ con_sock_state_connecting()
54  *       |        ----------------------
55  *       |                              \
56  *       + con_sock_state_closed()       \
57  *       |+---------------------------    \
58  *       | \                          \    \
59  *       |  -----------                \    \
60  *       |  | CLOSING |  socket event;  \    \
61  *       |  -----------  await close     \    \
62  *       |       ^                        \   |
63  *       |       |                         \  |
64  *       |       + con_sock_state_closing() \ |
65  *       |      / \                         | |
66  *       |     /   ---------------          | |
67  *       |    /                   \         v v
68  *       |   /                    --------------
69  *       |  /    -----------------| CONNECTING |  socket created, TCP
70  *       |  |   /                 --------------  connect initiated
71  *       |  |   | con_sock_state_connected()
72  *       |  |   v
73  *      -------------
74  *      | CONNECTED |  TCP connection established
75  *      -------------
76  *
77  * State values for ceph_connection->sock_state; NEW is assumed to be 0.
78  */
79
80 #define CON_SOCK_STATE_NEW              0       /* -> CLOSED */
81 #define CON_SOCK_STATE_CLOSED           1       /* -> CONNECTING */
82 #define CON_SOCK_STATE_CONNECTING       2       /* -> CONNECTED or -> CLOSING */
83 #define CON_SOCK_STATE_CONNECTED        3       /* -> CLOSING or -> CLOSED */
84 #define CON_SOCK_STATE_CLOSING          4       /* -> CLOSED */
85
86 /*
87  * connection states
88  */
89 #define CON_STATE_CLOSED        1  /* -> PREOPEN */
90 #define CON_STATE_PREOPEN       2  /* -> CONNECTING, CLOSED */
91 #define CON_STATE_CONNECTING    3  /* -> NEGOTIATING, CLOSED */
92 #define CON_STATE_NEGOTIATING   4  /* -> OPEN, CLOSED */
93 #define CON_STATE_OPEN          5  /* -> STANDBY, CLOSED */
94 #define CON_STATE_STANDBY       6  /* -> PREOPEN, CLOSED */
95
96 /*
97  * ceph_connection flag bits
98  */
99 #define CON_FLAG_LOSSYTX           0  /* we can close channel or drop
100                                        * messages on errors */
101 #define CON_FLAG_KEEPALIVE_PENDING 1  /* we need to send a keepalive */
102 #define CON_FLAG_WRITE_PENDING     2  /* we have data ready to send */
103 #define CON_FLAG_SOCK_CLOSED       3  /* socket state changed to closed */
104 #define CON_FLAG_BACKOFF           4  /* need to retry queuing delayed work */
105
106 static bool con_flag_valid(unsigned long con_flag)
107 {
108         switch (con_flag) {
109         case CON_FLAG_LOSSYTX:
110         case CON_FLAG_KEEPALIVE_PENDING:
111         case CON_FLAG_WRITE_PENDING:
112         case CON_FLAG_SOCK_CLOSED:
113         case CON_FLAG_BACKOFF:
114                 return true;
115         default:
116                 return false;
117         }
118 }
119
120 static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
121 {
122         BUG_ON(!con_flag_valid(con_flag));
123
124         clear_bit(con_flag, &con->flags);
125 }
126
127 static void con_flag_set(struct ceph_connection *con, unsigned long con_flag)
128 {
129         BUG_ON(!con_flag_valid(con_flag));
130
131         set_bit(con_flag, &con->flags);
132 }
133
134 static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag)
135 {
136         BUG_ON(!con_flag_valid(con_flag));
137
138         return test_bit(con_flag, &con->flags);
139 }
140
141 static bool con_flag_test_and_clear(struct ceph_connection *con,
142                                         unsigned long con_flag)
143 {
144         BUG_ON(!con_flag_valid(con_flag));
145
146         return test_and_clear_bit(con_flag, &con->flags);
147 }
148
149 static bool con_flag_test_and_set(struct ceph_connection *con,
150                                         unsigned long con_flag)
151 {
152         BUG_ON(!con_flag_valid(con_flag));
153
154         return test_and_set_bit(con_flag, &con->flags);
155 }
156
157 /* Slab caches for frequently-allocated structures */
158
159 static struct kmem_cache        *ceph_msg_cache;
160 static struct kmem_cache        *ceph_msg_data_cache;
161
162 /* static tag bytes (protocol control messages) */
163 static char tag_msg = CEPH_MSGR_TAG_MSG;
164 static char tag_ack = CEPH_MSGR_TAG_ACK;
165 static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
166
167 #ifdef CONFIG_LOCKDEP
168 static struct lock_class_key socket_class;
169 #endif
170
171 /*
172  * When skipping (ignoring) a block of input we read it into a "skip
173  * buffer," which is this many bytes in size.
174  */
175 #define SKIP_BUF_SIZE   1024
176
177 static void queue_con(struct ceph_connection *con);
178 static void cancel_con(struct ceph_connection *con);
179 static void con_work(struct work_struct *);
180 static void con_fault(struct ceph_connection *con);
181
182 /*
183  * Nicely render a sockaddr as a string.  An array of formatted
184  * strings is used, to approximate reentrancy.
185  */
186 #define ADDR_STR_COUNT_LOG      5       /* log2(# address strings in array) */
187 #define ADDR_STR_COUNT          (1 << ADDR_STR_COUNT_LOG)
188 #define ADDR_STR_COUNT_MASK     (ADDR_STR_COUNT - 1)
189 #define MAX_ADDR_STR_LEN        64      /* 54 is enough */
190
191 static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
192 static atomic_t addr_str_seq = ATOMIC_INIT(0);
193
194 static struct page *zero_page;          /* used in certain error cases */
195
196 const char *ceph_pr_addr(const struct sockaddr_storage *ss)
197 {
198         int i;
199         char *s;
200         struct sockaddr_in *in4 = (struct sockaddr_in *) ss;
201         struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss;
202
203         i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
204         s = addr_str[i];
205
206         switch (ss->ss_family) {
207         case AF_INET:
208                 snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%hu", &in4->sin_addr,
209                          ntohs(in4->sin_port));
210                 break;
211
212         case AF_INET6:
213                 snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%hu", &in6->sin6_addr,
214                          ntohs(in6->sin6_port));
215                 break;
216
217         default:
218                 snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
219                          ss->ss_family);
220         }
221
222         return s;
223 }
224 EXPORT_SYMBOL(ceph_pr_addr);
225
226 static void encode_my_addr(struct ceph_messenger *msgr)
227 {
228         memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr));
229         ceph_encode_addr(&msgr->my_enc_addr);
230 }
231
232 /*
233  * work queue for all reading and writing to/from the socket.
234  */
235 static struct workqueue_struct *ceph_msgr_wq;
236
237 static int ceph_msgr_slab_init(void)
238 {
239         BUG_ON(ceph_msg_cache);
240         ceph_msg_cache = kmem_cache_create("ceph_msg",
241                                         sizeof (struct ceph_msg),
242                                         __alignof__(struct ceph_msg), 0, NULL);
243
244         if (!ceph_msg_cache)
245                 return -ENOMEM;
246
247         BUG_ON(ceph_msg_data_cache);
248         ceph_msg_data_cache = kmem_cache_create("ceph_msg_data",
249                                         sizeof (struct ceph_msg_data),
250                                         __alignof__(struct ceph_msg_data),
251                                         0, NULL);
252         if (ceph_msg_data_cache)
253                 return 0;
254
255         kmem_cache_destroy(ceph_msg_cache);
256         ceph_msg_cache = NULL;
257
258         return -ENOMEM;
259 }
260
261 static void ceph_msgr_slab_exit(void)
262 {
263         BUG_ON(!ceph_msg_data_cache);
264         kmem_cache_destroy(ceph_msg_data_cache);
265         ceph_msg_data_cache = NULL;
266
267         BUG_ON(!ceph_msg_cache);
268         kmem_cache_destroy(ceph_msg_cache);
269         ceph_msg_cache = NULL;
270 }
271
272 static void _ceph_msgr_exit(void)
273 {
274         if (ceph_msgr_wq) {
275                 destroy_workqueue(ceph_msgr_wq);
276                 ceph_msgr_wq = NULL;
277         }
278
279         ceph_msgr_slab_exit();
280
281         BUG_ON(zero_page == NULL);
282         page_cache_release(zero_page);
283         zero_page = NULL;
284 }
285
286 int ceph_msgr_init(void)
287 {
288         BUG_ON(zero_page != NULL);
289         zero_page = ZERO_PAGE(0);
290         page_cache_get(zero_page);
291
292         if (ceph_msgr_slab_init())
293                 return -ENOMEM;
294
295         /*
296          * The number of active work items is limited by the number of
297          * connections, so leave @max_active at default.
298          */
299         ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_MEM_RECLAIM, 0);
300         if (ceph_msgr_wq)
301                 return 0;
302
303         pr_err("msgr_init failed to create workqueue\n");
304         _ceph_msgr_exit();
305
306         return -ENOMEM;
307 }
308 EXPORT_SYMBOL(ceph_msgr_init);
309
310 void ceph_msgr_exit(void)
311 {
312         BUG_ON(ceph_msgr_wq == NULL);
313
314         _ceph_msgr_exit();
315 }
316 EXPORT_SYMBOL(ceph_msgr_exit);
317
318 void ceph_msgr_flush(void)
319 {
320         flush_workqueue(ceph_msgr_wq);
321 }
322 EXPORT_SYMBOL(ceph_msgr_flush);
323
324 /* Connection socket state transition functions */
325
326 static void con_sock_state_init(struct ceph_connection *con)
327 {
328         int old_state;
329
330         old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
331         if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
332                 printk("%s: unexpected old state %d\n", __func__, old_state);
333         dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
334              CON_SOCK_STATE_CLOSED);
335 }
336
337 static void con_sock_state_connecting(struct ceph_connection *con)
338 {
339         int old_state;
340
341         old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
342         if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
343                 printk("%s: unexpected old state %d\n", __func__, old_state);
344         dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
345              CON_SOCK_STATE_CONNECTING);
346 }
347
348 static void con_sock_state_connected(struct ceph_connection *con)
349 {
350         int old_state;
351
352         old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
353         if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
354                 printk("%s: unexpected old state %d\n", __func__, old_state);
355         dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
356              CON_SOCK_STATE_CONNECTED);
357 }
358
359 static void con_sock_state_closing(struct ceph_connection *con)
360 {
361         int old_state;
362
363         old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
364         if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
365                         old_state != CON_SOCK_STATE_CONNECTED &&
366                         old_state != CON_SOCK_STATE_CLOSING))
367                 printk("%s: unexpected old state %d\n", __func__, old_state);
368         dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
369              CON_SOCK_STATE_CLOSING);
370 }
371
372 static void con_sock_state_closed(struct ceph_connection *con)
373 {
374         int old_state;
375
376         old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
377         if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
378                     old_state != CON_SOCK_STATE_CLOSING &&
379                     old_state != CON_SOCK_STATE_CONNECTING &&
380                     old_state != CON_SOCK_STATE_CLOSED))
381                 printk("%s: unexpected old state %d\n", __func__, old_state);
382         dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
383              CON_SOCK_STATE_CLOSED);
384 }
385
386 /*
387  * socket callback functions
388  */
389
390 /* data available on socket, or listen socket received a connect */
391 static void ceph_sock_data_ready(struct sock *sk)
392 {
393         struct ceph_connection *con = sk->sk_user_data;
394         if (atomic_read(&con->msgr->stopping)) {
395                 return;
396         }
397
398         if (sk->sk_state != TCP_CLOSE_WAIT) {
399                 dout("%s on %p state = %lu, queueing work\n", __func__,
400                      con, con->state);
401                 queue_con(con);
402         }
403 }
404
405 /* socket has buffer space for writing */
406 static void ceph_sock_write_space(struct sock *sk)
407 {
408         struct ceph_connection *con = sk->sk_user_data;
409
410         /* only queue to workqueue if there is data we want to write,
411          * and there is sufficient space in the socket buffer to accept
412          * more data.  clear SOCK_NOSPACE so that ceph_sock_write_space()
413          * doesn't get called again until try_write() fills the socket
414          * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
415          * and net/core/stream.c:sk_stream_write_space().
416          */
417         if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) {
418                 if (sk_stream_is_writeable(sk)) {
419                         dout("%s %p queueing write work\n", __func__, con);
420                         clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
421                         queue_con(con);
422                 }
423         } else {
424                 dout("%s %p nothing to write\n", __func__, con);
425         }
426 }
427
428 /* socket's state has changed */
429 static void ceph_sock_state_change(struct sock *sk)
430 {
431         struct ceph_connection *con = sk->sk_user_data;
432
433         dout("%s %p state = %lu sk_state = %u\n", __func__,
434              con, con->state, sk->sk_state);
435
436         switch (sk->sk_state) {
437         case TCP_CLOSE:
438                 dout("%s TCP_CLOSE\n", __func__);
439         case TCP_CLOSE_WAIT:
440                 dout("%s TCP_CLOSE_WAIT\n", __func__);
441                 con_sock_state_closing(con);
442                 con_flag_set(con, CON_FLAG_SOCK_CLOSED);
443                 queue_con(con);
444                 break;
445         case TCP_ESTABLISHED:
446                 dout("%s TCP_ESTABLISHED\n", __func__);
447                 con_sock_state_connected(con);
448                 queue_con(con);
449                 break;
450         default:        /* Everything else is uninteresting */
451                 break;
452         }
453 }
454
455 /*
456  * set up socket callbacks
457  */
458 static void set_sock_callbacks(struct socket *sock,
459                                struct ceph_connection *con)
460 {
461         struct sock *sk = sock->sk;
462         sk->sk_user_data = con;
463         sk->sk_data_ready = ceph_sock_data_ready;
464         sk->sk_write_space = ceph_sock_write_space;
465         sk->sk_state_change = ceph_sock_state_change;
466 }
467
468
469 /*
470  * socket helpers
471  */
472
473 /*
474  * initiate connection to a remote socket.
475  */
476 static int ceph_tcp_connect(struct ceph_connection *con)
477 {
478         struct sockaddr_storage *paddr = &con->peer_addr.in_addr;
479         struct socket *sock;
480         int ret;
481
482         BUG_ON(con->sock);
483         ret = sock_create_kern(read_pnet(&con->msgr->net), paddr->ss_family,
484                                SOCK_STREAM, IPPROTO_TCP, &sock);
485         if (ret)
486                 return ret;
487         sock->sk->sk_allocation = GFP_NOFS;
488
489 #ifdef CONFIG_LOCKDEP
490         lockdep_set_class(&sock->sk->sk_lock, &socket_class);
491 #endif
492
493         set_sock_callbacks(sock, con);
494
495         dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));
496
497         con_sock_state_connecting(con);
498         ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
499                                  O_NONBLOCK);
500         if (ret == -EINPROGRESS) {
501                 dout("connect %s EINPROGRESS sk_state = %u\n",
502                      ceph_pr_addr(&con->peer_addr.in_addr),
503                      sock->sk->sk_state);
504         } else if (ret < 0) {
505                 pr_err("connect %s error %d\n",
506                        ceph_pr_addr(&con->peer_addr.in_addr), ret);
507                 sock_release(sock);
508                 return ret;
509         }
510
511         if (con->msgr->tcp_nodelay) {
512                 int optval = 1;
513
514                 ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
515                                         (char *)&optval, sizeof(optval));
516                 if (ret)
517                         pr_err("kernel_setsockopt(TCP_NODELAY) failed: %d",
518                                ret);
519         }
520
521         con->sock = sock;
522         return 0;
523 }
524
525 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
526 {
527         struct kvec iov = {buf, len};
528         struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
529         int r;
530
531         r = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
532         if (r == -EAGAIN)
533                 r = 0;
534         return r;
535 }
536
537 static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
538                      int page_offset, size_t length)
539 {
540         void *kaddr;
541         int ret;
542
543         BUG_ON(page_offset + length > PAGE_SIZE);
544
545         kaddr = kmap(page);
546         BUG_ON(!kaddr);
547         ret = ceph_tcp_recvmsg(sock, kaddr + page_offset, length);
548         kunmap(page);
549
550         return ret;
551 }
552
553 /*
554  * write something.  @more is true if caller will be sending more data
555  * shortly.
556  */
557 static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
558                      size_t kvlen, size_t len, int more)
559 {
560         struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
561         int r;
562
563         if (more)
564                 msg.msg_flags |= MSG_MORE;
565         else
566                 msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
567
568         r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
569         if (r == -EAGAIN)
570                 r = 0;
571         return r;
572 }
573
574 static int __ceph_tcp_sendpage(struct socket *sock, struct page *page,
575                      int offset, size_t size, bool more)
576 {
577         int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
578         int ret;
579
580         ret = kernel_sendpage(sock, page, offset, size, flags);
581         if (ret == -EAGAIN)
582                 ret = 0;
583
584         return ret;
585 }
586
587 static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
588                      int offset, size_t size, bool more)
589 {
590         int ret;
591         struct kvec iov;
592
593         /* sendpage cannot properly handle pages with page_count == 0,
594          * we need to fallback to sendmsg if that's the case */
595         if (page_count(page) >= 1)
596                 return __ceph_tcp_sendpage(sock, page, offset, size, more);
597
598         iov.iov_base = kmap(page) + offset;
599         iov.iov_len = size;
600         ret = ceph_tcp_sendmsg(sock, &iov, 1, size, more);
601         kunmap(page);
602
603         return ret;
604 }
605
606 /*
607  * Shutdown/close the socket for the given connection.
608  */
609 static int con_close_socket(struct ceph_connection *con)
610 {
611         int rc = 0;
612
613         dout("con_close_socket on %p sock %p\n", con, con->sock);
614         if (con->sock) {
615                 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
616                 sock_release(con->sock);
617                 con->sock = NULL;
618         }
619
620         /*
621          * Forcibly clear the SOCK_CLOSED flag.  It gets set
622          * independent of the connection mutex, and we could have
623          * received a socket close event before we had the chance to
624          * shut the socket down.
625          */
626         con_flag_clear(con, CON_FLAG_SOCK_CLOSED);
627
628         con_sock_state_closed(con);
629         return rc;
630 }
631
632 /*
633  * Reset a connection.  Discard all incoming and outgoing messages
634  * and clear *_seq state.
635  */
636 static void ceph_msg_remove(struct ceph_msg *msg)
637 {
638         list_del_init(&msg->list_head);
639         BUG_ON(msg->con == NULL);
640         msg->con->ops->put(msg->con);
641         msg->con = NULL;
642
643         ceph_msg_put(msg);
644 }
645 static void ceph_msg_remove_list(struct list_head *head)
646 {
647         while (!list_empty(head)) {
648                 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
649                                                         list_head);
650                 ceph_msg_remove(msg);
651         }
652 }
653
654 static void reset_connection(struct ceph_connection *con)
655 {
656         /* reset connection, out_queue, msg_ and connect_seq */
657         /* discard existing out_queue and msg_seq */
658         dout("reset_connection %p\n", con);
659         ceph_msg_remove_list(&con->out_queue);
660         ceph_msg_remove_list(&con->out_sent);
661
662         if (con->in_msg) {
663                 BUG_ON(con->in_msg->con != con);
664                 con->in_msg->con = NULL;
665                 ceph_msg_put(con->in_msg);
666                 con->in_msg = NULL;
667                 con->ops->put(con);
668         }
669
670         con->connect_seq = 0;
671         con->out_seq = 0;
672         if (con->out_msg) {
673                 ceph_msg_put(con->out_msg);
674                 con->out_msg = NULL;
675         }
676         con->in_seq = 0;
677         con->in_seq_acked = 0;
678 }
679
680 /*
681  * mark a peer down.  drop any open connections.
682  */
683 void ceph_con_close(struct ceph_connection *con)
684 {
685         mutex_lock(&con->mutex);
686         dout("con_close %p peer %s\n", con,
687              ceph_pr_addr(&con->peer_addr.in_addr));
688         con->state = CON_STATE_CLOSED;
689
690         con_flag_clear(con, CON_FLAG_LOSSYTX);  /* so we retry next connect */
691         con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING);
692         con_flag_clear(con, CON_FLAG_WRITE_PENDING);
693         con_flag_clear(con, CON_FLAG_BACKOFF);
694
695         reset_connection(con);
696         con->peer_global_seq = 0;
697         cancel_con(con);
698         con_close_socket(con);
699         mutex_unlock(&con->mutex);
700 }
701 EXPORT_SYMBOL(ceph_con_close);
702
703 /*
704  * Reopen a closed connection, with a new peer address.
705  */
706 void ceph_con_open(struct ceph_connection *con,
707                    __u8 entity_type, __u64 entity_num,
708                    struct ceph_entity_addr *addr)
709 {
710         mutex_lock(&con->mutex);
711         dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
712
713         WARN_ON(con->state != CON_STATE_CLOSED);
714         con->state = CON_STATE_PREOPEN;
715
716         con->peer_name.type = (__u8) entity_type;
717         con->peer_name.num = cpu_to_le64(entity_num);
718
719         memcpy(&con->peer_addr, addr, sizeof(*addr));
720         con->delay = 0;      /* reset backoff memory */
721         mutex_unlock(&con->mutex);
722         queue_con(con);
723 }
724 EXPORT_SYMBOL(ceph_con_open);
725
726 /*
727  * return true if this connection ever successfully opened
728  */
729 bool ceph_con_opened(struct ceph_connection *con)
730 {
731         return con->connect_seq > 0;
732 }
733
734 /*
735  * initialize a new connection.
736  */
737 void ceph_con_init(struct ceph_connection *con, void *private,
738         const struct ceph_connection_operations *ops,
739         struct ceph_messenger *msgr)
740 {
741         dout("con_init %p\n", con);
742         memset(con, 0, sizeof(*con));
743         con->private = private;
744         con->ops = ops;
745         con->msgr = msgr;
746
747         con_sock_state_init(con);
748
749         mutex_init(&con->mutex);
750         INIT_LIST_HEAD(&con->out_queue);
751         INIT_LIST_HEAD(&con->out_sent);
752         INIT_DELAYED_WORK(&con->work, con_work);
753
754         con->state = CON_STATE_CLOSED;
755 }
756 EXPORT_SYMBOL(ceph_con_init);
757
758
759 /*
760  * We maintain a global counter to order connection attempts.  Get
761  * a unique seq greater than @gt.
762  */
763 static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
764 {
765         u32 ret;
766
767         spin_lock(&msgr->global_seq_lock);
768         if (msgr->global_seq < gt)
769                 msgr->global_seq = gt;
770         ret = ++msgr->global_seq;
771         spin_unlock(&msgr->global_seq_lock);
772         return ret;
773 }
774
775 static void con_out_kvec_reset(struct ceph_connection *con)
776 {
777         con->out_kvec_left = 0;
778         con->out_kvec_bytes = 0;
779         con->out_kvec_cur = &con->out_kvec[0];
780 }
781
782 static void con_out_kvec_add(struct ceph_connection *con,
783                                 size_t size, void *data)
784 {
785         int index;
786
787         index = con->out_kvec_left;
788         BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
789
790         con->out_kvec[index].iov_len = size;
791         con->out_kvec[index].iov_base = data;
792         con->out_kvec_left++;
793         con->out_kvec_bytes += size;
794 }
795
796 #ifdef CONFIG_BLOCK
797
798 /*
799  * For a bio data item, a piece is whatever remains of the next
800  * entry in the current bio iovec, or the first entry in the next
801  * bio in the list.
802  */
803 static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
804                                         size_t length)
805 {
806         struct ceph_msg_data *data = cursor->data;
807         struct bio *bio;
808
809         BUG_ON(data->type != CEPH_MSG_DATA_BIO);
810
811         bio = data->bio;
812         BUG_ON(!bio);
813
814         cursor->resid = min(length, data->bio_length);
815         cursor->bio = bio;
816         cursor->bvec_iter = bio->bi_iter;
817         cursor->last_piece =
818                 cursor->resid <= bio_iter_len(bio, cursor->bvec_iter);
819 }
820
821 static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
822                                                 size_t *page_offset,
823                                                 size_t *length)
824 {
825         struct ceph_msg_data *data = cursor->data;
826         struct bio *bio;
827         struct bio_vec bio_vec;
828
829         BUG_ON(data->type != CEPH_MSG_DATA_BIO);
830
831         bio = cursor->bio;
832         BUG_ON(!bio);
833
834         bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);
835
836         *page_offset = (size_t) bio_vec.bv_offset;
837         BUG_ON(*page_offset >= PAGE_SIZE);
838         if (cursor->last_piece) /* pagelist offset is always 0 */
839                 *length = cursor->resid;
840         else
841                 *length = (size_t) bio_vec.bv_len;
842         BUG_ON(*length > cursor->resid);
843         BUG_ON(*page_offset + *length > PAGE_SIZE);
844
845         return bio_vec.bv_page;
846 }
847
848 static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
849                                         size_t bytes)
850 {
851         struct bio *bio;
852         struct bio_vec bio_vec;
853
854         BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
855
856         bio = cursor->bio;
857         BUG_ON(!bio);
858
859         bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);
860
861         /* Advance the cursor offset */
862
863         BUG_ON(cursor->resid < bytes);
864         cursor->resid -= bytes;
865
866         bio_advance_iter(bio, &cursor->bvec_iter, bytes);
867
868         if (bytes < bio_vec.bv_len)
869                 return false;   /* more bytes to process in this segment */
870
871         /* Move on to the next segment, and possibly the next bio */
872
873         if (!cursor->bvec_iter.bi_size) {
874                 bio = bio->bi_next;
875                 cursor->bio = bio;
876                 if (bio)
877                         cursor->bvec_iter = bio->bi_iter;
878                 else
879                         memset(&cursor->bvec_iter, 0,
880                                sizeof(cursor->bvec_iter));
881         }
882
883         if (!cursor->last_piece) {
884                 BUG_ON(!cursor->resid);
885                 BUG_ON(!bio);
886                 /* A short read is OK, so use <= rather than == */
887                 if (cursor->resid <= bio_iter_len(bio, cursor->bvec_iter))
888                         cursor->last_piece = true;
889         }
890
891         return true;
892 }
893 #endif /* CONFIG_BLOCK */
894
895 /*
896  * For a page array, a piece comes from the first page in the array
897  * that has not already been fully consumed.
898  */
899 static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
900                                         size_t length)
901 {
902         struct ceph_msg_data *data = cursor->data;
903         int page_count;
904
905         BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
906
907         BUG_ON(!data->pages);
908         BUG_ON(!data->length);
909
910         cursor->resid = min(length, data->length);
911         page_count = calc_pages_for(data->alignment, (u64)data->length);
912         cursor->page_offset = data->alignment & ~PAGE_MASK;
913         cursor->page_index = 0;
914         BUG_ON(page_count > (int)USHRT_MAX);
915         cursor->page_count = (unsigned short)page_count;
916         BUG_ON(length > SIZE_MAX - cursor->page_offset);
917         cursor->last_piece = cursor->page_offset + cursor->resid <= PAGE_SIZE;
918 }
919
920 static struct page *
921 ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
922                                         size_t *page_offset, size_t *length)
923 {
924         struct ceph_msg_data *data = cursor->data;
925
926         BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
927
928         BUG_ON(cursor->page_index >= cursor->page_count);
929         BUG_ON(cursor->page_offset >= PAGE_SIZE);
930
931         *page_offset = cursor->page_offset;
932         if (cursor->last_piece)
933                 *length = cursor->resid;
934         else
935                 *length = PAGE_SIZE - *page_offset;
936
937         return data->pages[cursor->page_index];
938 }
939
940 static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
941                                                 size_t bytes)
942 {
943         BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
944
945         BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
946
947         /* Advance the cursor page offset */
948
949         cursor->resid -= bytes;
950         cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
951         if (!bytes || cursor->page_offset)
952                 return false;   /* more bytes to process in the current page */
953
954         if (!cursor->resid)
955                 return false;   /* no more data */
956
957         /* Move on to the next page; offset is already at 0 */
958
959         BUG_ON(cursor->page_index >= cursor->page_count);
960         cursor->page_index++;
961         cursor->last_piece = cursor->resid <= PAGE_SIZE;
962
963         return true;
964 }
965
966 /*
967  * For a pagelist, a piece is whatever remains to be consumed in the
968  * first page in the list, or the front of the next page.
969  */
970 static void
971 ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
972                                         size_t length)
973 {
974         struct ceph_msg_data *data = cursor->data;
975         struct ceph_pagelist *pagelist;
976         struct page *page;
977
978         BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
979
980         pagelist = data->pagelist;
981         BUG_ON(!pagelist);
982
983         if (!length)
984                 return;         /* pagelist can be assigned but empty */
985
986         BUG_ON(list_empty(&pagelist->head));
987         page = list_first_entry(&pagelist->head, struct page, lru);
988
989         cursor->resid = min(length, pagelist->length);
990         cursor->page = page;
991         cursor->offset = 0;
992         cursor->last_piece = cursor->resid <= PAGE_SIZE;
993 }
994
995 static struct page *
996 ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
997                                 size_t *page_offset, size_t *length)
998 {
999         struct ceph_msg_data *data = cursor->data;
1000         struct ceph_pagelist *pagelist;
1001
1002         BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
1003
1004         pagelist = data->pagelist;
1005         BUG_ON(!pagelist);
1006
1007         BUG_ON(!cursor->page);
1008         BUG_ON(cursor->offset + cursor->resid != pagelist->length);
1009
1010         /* offset of first page in pagelist is always 0 */
1011         *page_offset = cursor->offset & ~PAGE_MASK;
1012         if (cursor->last_piece)
1013                 *length = cursor->resid;
1014         else
1015                 *length = PAGE_SIZE - *page_offset;
1016
1017         return cursor->page;
1018 }
1019
1020 static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
1021                                                 size_t bytes)
1022 {
1023         struct ceph_msg_data *data = cursor->data;
1024         struct ceph_pagelist *pagelist;
1025
1026         BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
1027
1028         pagelist = data->pagelist;
1029         BUG_ON(!pagelist);
1030
1031         BUG_ON(cursor->offset + cursor->resid != pagelist->length);
1032         BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
1033
1034         /* Advance the cursor offset */
1035
1036         cursor->resid -= bytes;
1037         cursor->offset += bytes;
1038         /* offset of first page in pagelist is always 0 */
1039         if (!bytes || cursor->offset & ~PAGE_MASK)
1040                 return false;   /* more bytes to process in the current page */
1041
1042         if (!cursor->resid)
1043                 return false;   /* no more data */
1044
1045         /* Move on to the next page */
1046
1047         BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
1048         cursor->page = list_entry_next(cursor->page, lru);
1049         cursor->last_piece = cursor->resid <= PAGE_SIZE;
1050
1051         return true;
1052 }
1053
1054 /*
1055  * Message data is handled (sent or received) in pieces, where each
1056  * piece resides on a single page.  The network layer might not
1057  * consume an entire piece at once.  A data item's cursor keeps
1058  * track of which piece is next to process and how much remains to
1059  * be processed in that piece.  It also tracks whether the current
1060  * piece is the last one in the data item.
1061  */
1062 static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
1063 {
1064         size_t length = cursor->total_resid;
1065
1066         switch (cursor->data->type) {
1067         case CEPH_MSG_DATA_PAGELIST:
1068                 ceph_msg_data_pagelist_cursor_init(cursor, length);
1069                 break;
1070         case CEPH_MSG_DATA_PAGES:
1071                 ceph_msg_data_pages_cursor_init(cursor, length);
1072                 break;
1073 #ifdef CONFIG_BLOCK
1074         case CEPH_MSG_DATA_BIO:
1075                 ceph_msg_data_bio_cursor_init(cursor, length);
1076                 break;
1077 #endif /* CONFIG_BLOCK */
1078         case CEPH_MSG_DATA_NONE:
1079         default:
1080                 /* BUG(); */
1081                 break;
1082         }
1083         cursor->need_crc = true;
1084 }
1085
1086 static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
1087 {
1088         struct ceph_msg_data_cursor *cursor = &msg->cursor;
1089         struct ceph_msg_data *data;
1090
1091         BUG_ON(!length);
1092         BUG_ON(length > msg->data_length);
1093         BUG_ON(list_empty(&msg->data));
1094
1095         cursor->data_head = &msg->data;
1096         cursor->total_resid = length;
1097         data = list_first_entry(&msg->data, struct ceph_msg_data, links);
1098         cursor->data = data;
1099
1100         __ceph_msg_data_cursor_init(cursor);
1101 }
1102
1103 /*
1104  * Return the page containing the next piece to process for a given
1105  * data item, and supply the page offset and length of that piece.
1106  * Indicate whether this is the last piece in this data item.
1107  */
1108 static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
1109                                         size_t *page_offset, size_t *length,
1110                                         bool *last_piece)
1111 {
1112         struct page *page;
1113
1114         switch (cursor->data->type) {
1115         case CEPH_MSG_DATA_PAGELIST:
1116                 page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
1117                 break;
1118         case CEPH_MSG_DATA_PAGES:
1119                 page = ceph_msg_data_pages_next(cursor, page_offset, length);
1120                 break;
1121 #ifdef CONFIG_BLOCK
1122         case CEPH_MSG_DATA_BIO:
1123                 page = ceph_msg_data_bio_next(cursor, page_offset, length);
1124                 break;
1125 #endif /* CONFIG_BLOCK */
1126         case CEPH_MSG_DATA_NONE:
1127         default:
1128                 page = NULL;
1129                 break;
1130         }
1131         BUG_ON(!page);
1132         BUG_ON(*page_offset + *length > PAGE_SIZE);
1133         BUG_ON(!*length);
1134         if (last_piece)
1135                 *last_piece = cursor->last_piece;
1136
1137         return page;
1138 }
1139
1140 /*
1141  * Returns true if the result moves the cursor on to the next piece
1142  * of the data item.
1143  */
1144 static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
1145                                 size_t bytes)
1146 {
1147         bool new_piece;
1148
1149         BUG_ON(bytes > cursor->resid);
1150         switch (cursor->data->type) {
1151         case CEPH_MSG_DATA_PAGELIST:
1152                 new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
1153                 break;
1154         case CEPH_MSG_DATA_PAGES:
1155                 new_piece = ceph_msg_data_pages_advance(cursor, bytes);
1156                 break;
1157 #ifdef CONFIG_BLOCK
1158         case CEPH_MSG_DATA_BIO:
1159                 new_piece = ceph_msg_data_bio_advance(cursor, bytes);
1160                 break;
1161 #endif /* CONFIG_BLOCK */
1162         case CEPH_MSG_DATA_NONE:
1163         default:
1164                 BUG();
1165                 break;
1166         }
1167         cursor->total_resid -= bytes;
1168
1169         if (!cursor->resid && cursor->total_resid) {
1170                 WARN_ON(!cursor->last_piece);
1171                 BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
1172                 cursor->data = list_entry_next(cursor->data, links);
1173                 __ceph_msg_data_cursor_init(cursor);
1174                 new_piece = true;
1175         }
1176         cursor->need_crc = new_piece;
1177
1178         return new_piece;
1179 }
1180
1181 static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
1182 {
1183         BUG_ON(!msg);
1184         BUG_ON(!data_len);
1185
1186         /* Initialize data cursor */
1187
1188         ceph_msg_data_cursor_init(msg, (size_t)data_len);
1189 }
1190
1191 /*
1192  * Prepare footer for currently outgoing message, and finish things
1193  * off.  Assumes out_kvec* are already valid.. we just add on to the end.
1194  */
1195 static void prepare_write_message_footer(struct ceph_connection *con)
1196 {
1197         struct ceph_msg *m = con->out_msg;
1198         int v = con->out_kvec_left;
1199
1200         m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
1201
1202         dout("prepare_write_message_footer %p\n", con);
1203         con->out_kvec_is_msg = true;
1204         con->out_kvec[v].iov_base = &m->footer;
1205         if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
1206                 if (con->ops->sign_message)
1207                         con->ops->sign_message(con, m);
1208                 else
1209                         m->footer.sig = 0;
1210                 con->out_kvec[v].iov_len = sizeof(m->footer);
1211                 con->out_kvec_bytes += sizeof(m->footer);
1212         } else {
1213                 m->old_footer.flags = m->footer.flags;
1214                 con->out_kvec[v].iov_len = sizeof(m->old_footer);
1215                 con->out_kvec_bytes += sizeof(m->old_footer);
1216         }
1217         con->out_kvec_left++;
1218         con->out_more = m->more_to_follow;
1219         con->out_msg_done = true;
1220 }
1221
1222 /*
1223  * Prepare headers for the next outgoing message.
1224  */
1225 static void prepare_write_message(struct ceph_connection *con)
1226 {
1227         struct ceph_msg *m;
1228         u32 crc;
1229
1230         con_out_kvec_reset(con);
1231         con->out_kvec_is_msg = true;
1232         con->out_msg_done = false;
1233
1234         /* Sneak an ack in there first?  If we can get it into the same
1235          * TCP packet that's a good thing. */
1236         if (con->in_seq > con->in_seq_acked) {
1237                 con->in_seq_acked = con->in_seq;
1238                 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
1239                 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
1240                 con_out_kvec_add(con, sizeof (con->out_temp_ack),
1241                         &con->out_temp_ack);
1242         }
1243
1244         BUG_ON(list_empty(&con->out_queue));
1245         m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
1246         con->out_msg = m;
1247         BUG_ON(m->con != con);
1248
1249         /* put message on sent list */
1250         ceph_msg_get(m);
1251         list_move_tail(&m->list_head, &con->out_sent);
1252
1253         /*
1254          * only assign outgoing seq # if we haven't sent this message
1255          * yet.  if it is requeued, resend with it's original seq.
1256          */
1257         if (m->needs_out_seq) {
1258                 m->hdr.seq = cpu_to_le64(++con->out_seq);
1259                 m->needs_out_seq = false;
1260         }
1261         WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
1262
1263         dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
1264              m, con->out_seq, le16_to_cpu(m->hdr.type),
1265              le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
1266              m->data_length);
1267         BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
1268
1269         /* tag + hdr + front + middle */
1270         con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
1271         con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
1272         con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
1273
1274         if (m->middle)
1275                 con_out_kvec_add(con, m->middle->vec.iov_len,
1276                         m->middle->vec.iov_base);
1277
1278         /* fill in crc (except data pages), footer */
1279         crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
1280         con->out_msg->hdr.crc = cpu_to_le32(crc);
1281         con->out_msg->footer.flags = 0;
1282
1283         crc = crc32c(0, m->front.iov_base, m->front.iov_len);
1284         con->out_msg->footer.front_crc = cpu_to_le32(crc);
1285         if (m->middle) {
1286                 crc = crc32c(0, m->middle->vec.iov_base,
1287                                 m->middle->vec.iov_len);
1288                 con->out_msg->footer.middle_crc = cpu_to_le32(crc);
1289         } else
1290                 con->out_msg->footer.middle_crc = 0;
1291         dout("%s front_crc %u middle_crc %u\n", __func__,
1292              le32_to_cpu(con->out_msg->footer.front_crc),
1293              le32_to_cpu(con->out_msg->footer.middle_crc));
1294
1295         /* is there a data payload? */
1296         con->out_msg->footer.data_crc = 0;
1297         if (m->data_length) {
1298                 prepare_message_data(con->out_msg, m->data_length);
1299                 con->out_more = 1;  /* data + footer will follow */
1300         } else {
1301                 /* no, queue up footer too and be done */
1302                 prepare_write_message_footer(con);
1303         }
1304
1305         con_flag_set(con, CON_FLAG_WRITE_PENDING);
1306 }
1307
1308 /*
1309  * Prepare an ack.
1310  */
1311 static void prepare_write_ack(struct ceph_connection *con)
1312 {
1313         dout("prepare_write_ack %p %llu -> %llu\n", con,
1314              con->in_seq_acked, con->in_seq);
1315         con->in_seq_acked = con->in_seq;
1316
1317         con_out_kvec_reset(con);
1318
1319         con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
1320
1321         con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
1322         con_out_kvec_add(con, sizeof (con->out_temp_ack),
1323                                 &con->out_temp_ack);
1324
1325         con->out_more = 1;  /* more will follow.. eventually.. */
1326         con_flag_set(con, CON_FLAG_WRITE_PENDING);
1327 }
1328
1329 /*
1330  * Prepare to share the seq during handshake
1331  */
1332 static void prepare_write_seq(struct ceph_connection *con)
1333 {
1334         dout("prepare_write_seq %p %llu -> %llu\n", con,
1335              con->in_seq_acked, con->in_seq);
1336         con->in_seq_acked = con->in_seq;
1337
1338         con_out_kvec_reset(con);
1339
1340         con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
1341         con_out_kvec_add(con, sizeof (con->out_temp_ack),
1342                          &con->out_temp_ack);
1343
1344         con_flag_set(con, CON_FLAG_WRITE_PENDING);
1345 }
1346
1347 /*
1348  * Prepare to write keepalive byte.
1349  */
1350 static void prepare_write_keepalive(struct ceph_connection *con)
1351 {
1352         dout("prepare_write_keepalive %p\n", con);
1353         con_out_kvec_reset(con);
1354         con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
1355         con_flag_set(con, CON_FLAG_WRITE_PENDING);
1356 }
1357
1358 /*
1359  * Connection negotiation.
1360  */
1361
1362 static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con,
1363                                                 int *auth_proto)
1364 {
1365         struct ceph_auth_handshake *auth;
1366
1367         if (!con->ops->get_authorizer) {
1368                 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
1369                 con->out_connect.authorizer_len = 0;
1370                 return NULL;
1371         }
1372
1373         /* Can't hold the mutex while getting authorizer */
1374         mutex_unlock(&con->mutex);
1375         auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
1376         mutex_lock(&con->mutex);
1377
1378         if (IS_ERR(auth))
1379                 return auth;
1380         if (con->state != CON_STATE_NEGOTIATING)
1381                 return ERR_PTR(-EAGAIN);
1382
1383         con->auth_reply_buf = auth->authorizer_reply_buf;
1384         con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
1385         return auth;
1386 }
1387
1388 /*
1389  * We connected to a peer and are saying hello.
1390  */
1391 static void prepare_write_banner(struct ceph_connection *con)
1392 {
1393         con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
1394         con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
1395                                         &con->msgr->my_enc_addr);
1396
1397         con->out_more = 0;
1398         con_flag_set(con, CON_FLAG_WRITE_PENDING);
1399 }
1400
1401 static int prepare_write_connect(struct ceph_connection *con)
1402 {
1403         unsigned int global_seq = get_global_seq(con->msgr, 0);
1404         int proto;
1405         int auth_proto;
1406         struct ceph_auth_handshake *auth;
1407
1408         switch (con->peer_name.type) {
1409         case CEPH_ENTITY_TYPE_MON:
1410                 proto = CEPH_MONC_PROTOCOL;
1411                 break;
1412         case CEPH_ENTITY_TYPE_OSD:
1413                 proto = CEPH_OSDC_PROTOCOL;
1414                 break;
1415         case CEPH_ENTITY_TYPE_MDS:
1416                 proto = CEPH_MDSC_PROTOCOL;
1417                 break;
1418         default:
1419                 BUG();
1420         }
1421
1422         dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
1423              con->connect_seq, global_seq, proto);
1424
1425         con->out_connect.features = cpu_to_le64(con->msgr->supported_features);
1426         con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
1427         con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
1428         con->out_connect.global_seq = cpu_to_le32(global_seq);
1429         con->out_connect.protocol_version = cpu_to_le32(proto);
1430         con->out_connect.flags = 0;
1431
1432         auth_proto = CEPH_AUTH_UNKNOWN;
1433         auth = get_connect_authorizer(con, &auth_proto);
1434         if (IS_ERR(auth))
1435                 return PTR_ERR(auth);
1436
1437         con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
1438         con->out_connect.authorizer_len = auth ?
1439                 cpu_to_le32(auth->authorizer_buf_len) : 0;
1440
1441         con_out_kvec_add(con, sizeof (con->out_connect),
1442                                         &con->out_connect);
1443         if (auth && auth->authorizer_buf_len)
1444                 con_out_kvec_add(con, auth->authorizer_buf_len,
1445                                         auth->authorizer_buf);
1446
1447         con->out_more = 0;
1448         con_flag_set(con, CON_FLAG_WRITE_PENDING);
1449
1450         return 0;
1451 }
1452
1453 /*
1454  * write as much of pending kvecs to the socket as we can.
1455  *  1 -> done
1456  *  0 -> socket full, but more to do
1457  * <0 -> error
1458  */
1459 static int write_partial_kvec(struct ceph_connection *con)
1460 {
1461         int ret;
1462
1463         dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
1464         while (con->out_kvec_bytes > 0) {
1465                 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
1466                                        con->out_kvec_left, con->out_kvec_bytes,
1467                                        con->out_more);
1468                 if (ret <= 0)
1469                         goto out;
1470                 con->out_kvec_bytes -= ret;
1471                 if (con->out_kvec_bytes == 0)
1472                         break;            /* done */
1473
1474                 /* account for full iov entries consumed */
1475                 while (ret >= con->out_kvec_cur->iov_len) {
1476                         BUG_ON(!con->out_kvec_left);
1477                         ret -= con->out_kvec_cur->iov_len;
1478                         con->out_kvec_cur++;
1479                         con->out_kvec_left--;
1480                 }
1481                 /* and for a partially-consumed entry */
1482                 if (ret) {
1483                         con->out_kvec_cur->iov_len -= ret;
1484                         con->out_kvec_cur->iov_base += ret;
1485                 }
1486         }
1487         con->out_kvec_left = 0;
1488         con->out_kvec_is_msg = false;
1489         ret = 1;
1490 out:
1491         dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
1492              con->out_kvec_bytes, con->out_kvec_left, ret);
1493         return ret;  /* done! */
1494 }
1495
1496 static u32 ceph_crc32c_page(u32 crc, struct page *page,
1497                                 unsigned int page_offset,
1498                                 unsigned int length)
1499 {
1500         char *kaddr;
1501
1502         kaddr = kmap(page);
1503         BUG_ON(kaddr == NULL);
1504         crc = crc32c(crc, kaddr + page_offset, length);
1505         kunmap(page);
1506
1507         return crc;
1508 }
1509 /*
1510  * Write as much message data payload as we can.  If we finish, queue
1511  * up the footer.
1512  *  1 -> done, footer is now queued in out_kvec[].
1513  *  0 -> socket full, but more to do
1514  * <0 -> error
1515  */
1516 static int write_partial_message_data(struct ceph_connection *con)
1517 {
1518         struct ceph_msg *msg = con->out_msg;
1519         struct ceph_msg_data_cursor *cursor = &msg->cursor;
1520         bool do_datacrc = !con->msgr->nocrc;
1521         u32 crc;
1522
1523         dout("%s %p msg %p\n", __func__, con, msg);
1524
1525         if (list_empty(&msg->data))
1526                 return -EINVAL;
1527
1528         /*
1529          * Iterate through each page that contains data to be
1530          * written, and send as much as possible for each.
1531          *
1532          * If we are calculating the data crc (the default), we will
1533          * need to map the page.  If we have no pages, they have
1534          * been revoked, so use the zero page.
1535          */
1536         crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
1537         while (cursor->resid) {
1538                 struct page *page;
1539                 size_t page_offset;
1540                 size_t length;
1541                 bool last_piece;
1542                 bool need_crc;
1543                 int ret;
1544
1545                 page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
1546                                                         &last_piece);
1547                 ret = ceph_tcp_sendpage(con->sock, page, page_offset,
1548                                         length, !last_piece);
1549                 if (ret <= 0) {
1550                         if (do_datacrc)
1551                                 msg->footer.data_crc = cpu_to_le32(crc);
1552
1553                         return ret;
1554                 }
1555                 if (do_datacrc && cursor->need_crc)
1556                         crc = ceph_crc32c_page(crc, page, page_offset, length);
1557                 need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
1558         }
1559
1560         dout("%s %p msg %p done\n", __func__, con, msg);
1561
1562         /* prepare and queue up footer, too */
1563         if (do_datacrc)
1564                 msg->footer.data_crc = cpu_to_le32(crc);
1565         else
1566                 msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
1567         con_out_kvec_reset(con);
1568         prepare_write_message_footer(con);
1569
1570         return 1;       /* must return > 0 to indicate success */
1571 }
1572
1573 /*
1574  * write some zeros
1575  */
1576 static int write_partial_skip(struct ceph_connection *con)
1577 {
1578         int ret;
1579
1580         while (con->out_skip > 0) {
1581                 size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
1582
1583                 ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, true);
1584                 if (ret <= 0)
1585                         goto out;
1586                 con->out_skip -= ret;
1587         }
1588         ret = 1;
1589 out:
1590         return ret;
1591 }
1592
1593 /*
1594  * Prepare to read connection handshake, or an ack.
1595  */
1596 static void prepare_read_banner(struct ceph_connection *con)
1597 {
1598         dout("prepare_read_banner %p\n", con);
1599         con->in_base_pos = 0;
1600 }
1601
1602 static void prepare_read_connect(struct ceph_connection *con)
1603 {
1604         dout("prepare_read_connect %p\n", con);
1605         con->in_base_pos = 0;
1606 }
1607
1608 static void prepare_read_ack(struct ceph_connection *con)
1609 {
1610         dout("prepare_read_ack %p\n", con);
1611         con->in_base_pos = 0;
1612 }
1613
1614 static void prepare_read_seq(struct ceph_connection *con)
1615 {
1616         dout("prepare_read_seq %p\n", con);
1617         con->in_base_pos = 0;
1618         con->in_tag = CEPH_MSGR_TAG_SEQ;
1619 }
1620
1621 static void prepare_read_tag(struct ceph_connection *con)
1622 {
1623         dout("prepare_read_tag %p\n", con);
1624         con->in_base_pos = 0;
1625         con->in_tag = CEPH_MSGR_TAG_READY;
1626 }
1627
1628 /*
1629  * Prepare to read a message.
1630  */
1631 static int prepare_read_message(struct ceph_connection *con)
1632 {
1633         dout("prepare_read_message %p\n", con);
1634         BUG_ON(con->in_msg != NULL);
1635         con->in_base_pos = 0;
1636         con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
1637         return 0;
1638 }
1639
1640
1641 static int read_partial(struct ceph_connection *con,
1642                         int end, int size, void *object)
1643 {
1644         while (con->in_base_pos < end) {
1645                 int left = end - con->in_base_pos;
1646                 int have = size - left;
1647                 int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
1648                 if (ret <= 0)
1649                         return ret;
1650                 con->in_base_pos += ret;
1651         }
1652         return 1;
1653 }
1654
1655
1656 /*
1657  * Read all or part of the connect-side handshake on a new connection
1658  */
1659 static int read_partial_banner(struct ceph_connection *con)
1660 {
1661         int size;
1662         int end;
1663         int ret;
1664
1665         dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
1666
1667         /* peer's banner */
1668         size = strlen(CEPH_BANNER);
1669         end = size;
1670         ret = read_partial(con, end, size, con->in_banner);
1671         if (ret <= 0)
1672                 goto out;
1673
1674         size = sizeof (con->actual_peer_addr);
1675         end += size;
1676         ret = read_partial(con, end, size, &con->actual_peer_addr);
1677         if (ret <= 0)
1678                 goto out;
1679
1680         size = sizeof (con->peer_addr_for_me);
1681         end += size;
1682         ret = read_partial(con, end, size, &con->peer_addr_for_me);
1683         if (ret <= 0)
1684                 goto out;
1685
1686 out:
1687         return ret;
1688 }
1689
1690 static int read_partial_connect(struct ceph_connection *con)
1691 {
1692         int size;
1693         int end;
1694         int ret;
1695
1696         dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
1697
1698         size = sizeof (con->in_reply);
1699         end = size;
1700         ret = read_partial(con, end, size, &con->in_reply);
1701         if (ret <= 0)
1702                 goto out;
1703
1704         size = le32_to_cpu(con->in_reply.authorizer_len);
1705         end += size;
1706         ret = read_partial(con, end, size, con->auth_reply_buf);
1707         if (ret <= 0)
1708                 goto out;
1709
1710         dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
1711              con, (int)con->in_reply.tag,
1712              le32_to_cpu(con->in_reply.connect_seq),
1713              le32_to_cpu(con->in_reply.global_seq));
1714 out:
1715         return ret;
1716
1717 }
1718
1719 /*
1720  * Verify the hello banner looks okay.
1721  */
1722 static int verify_hello(struct ceph_connection *con)
1723 {
1724         if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
1725                 pr_err("connect to %s got bad banner\n",
1726                        ceph_pr_addr(&con->peer_addr.in_addr));
1727                 con->error_msg = "protocol error, bad banner";
1728                 return -1;
1729         }
1730         return 0;
1731 }
1732
1733 static bool addr_is_blank(struct sockaddr_storage *ss)
1734 {
1735         struct in_addr *addr = &((struct sockaddr_in *)ss)->sin_addr;
1736         struct in6_addr *addr6 = &((struct sockaddr_in6 *)ss)->sin6_addr;
1737
1738         switch (ss->ss_family) {
1739         case AF_INET:
1740                 return addr->s_addr == htonl(INADDR_ANY);
1741         case AF_INET6:
1742                 return ipv6_addr_any(addr6);
1743         default:
1744                 return true;
1745         }
1746 }
1747
1748 static int addr_port(struct sockaddr_storage *ss)
1749 {
1750         switch (ss->ss_family) {
1751         case AF_INET:
1752                 return ntohs(((struct sockaddr_in *)ss)->sin_port);
1753         case AF_INET6:
1754                 return ntohs(((struct sockaddr_in6 *)ss)->sin6_port);
1755         }
1756         return 0;
1757 }
1758
1759 static void addr_set_port(struct sockaddr_storage *ss, int p)
1760 {
1761         switch (ss->ss_family) {
1762         case AF_INET:
1763                 ((struct sockaddr_in *)ss)->sin_port = htons(p);
1764                 break;
1765         case AF_INET6:
1766                 ((struct sockaddr_in6 *)ss)->sin6_port = htons(p);
1767                 break;
1768         }
1769 }
1770
1771 /*
1772  * Unlike other *_pton function semantics, zero indicates success.
1773  */
1774 static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss,
1775                 char delim, const char **ipend)
1776 {
1777         struct sockaddr_in *in4 = (struct sockaddr_in *) ss;
1778         struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss;
1779
1780         memset(ss, 0, sizeof(*ss));
1781
1782         if (in4_pton(str, len, (u8 *)&in4->sin_addr.s_addr, delim, ipend)) {
1783                 ss->ss_family = AF_INET;
1784                 return 0;
1785         }
1786
1787         if (in6_pton(str, len, (u8 *)&in6->sin6_addr.s6_addr, delim, ipend)) {
1788                 ss->ss_family = AF_INET6;
1789                 return 0;
1790         }
1791
1792         return -EINVAL;
1793 }
1794
1795 /*
1796  * Extract hostname string and resolve using kernel DNS facility.
1797  */
1798 #ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
1799 static int ceph_dns_resolve_name(const char *name, size_t namelen,
1800                 struct sockaddr_storage *ss, char delim, const char **ipend)
1801 {
1802         const char *end, *delim_p;
1803         char *colon_p, *ip_addr = NULL;
1804         int ip_len, ret;
1805
1806         /*
1807          * The end of the hostname occurs immediately preceding the delimiter or
1808          * the port marker (':') where the delimiter takes precedence.
1809          */
1810         delim_p = memchr(name, delim, namelen);
1811         colon_p = memchr(name, ':', namelen);
1812
1813         if (delim_p && colon_p)
1814                 end = delim_p < colon_p ? delim_p : colon_p;
1815         else if (!delim_p && colon_p)
1816                 end = colon_p;
1817         else {
1818                 end = delim_p;
1819                 if (!end) /* case: hostname:/ */
1820                         end = name + namelen;
1821         }
1822
1823         if (end <= name)
1824                 return -EINVAL;
1825
1826         /* do dns_resolve upcall */
1827         ip_len = dns_query(NULL, name, end - name, NULL, &ip_addr, NULL);
1828         if (ip_len > 0)
1829                 ret = ceph_pton(ip_addr, ip_len, ss, -1, NULL);
1830         else
1831                 ret = -ESRCH;
1832
1833         kfree(ip_addr);
1834
1835         *ipend = end;
1836
1837         pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
1838                         ret, ret ? "failed" : ceph_pr_addr(ss));
1839
1840         return ret;
1841 }
1842 #else
1843 static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
1844                 struct sockaddr_storage *ss, char delim, const char **ipend)
1845 {
1846         return -EINVAL;
1847 }
1848 #endif
1849
1850 /*
1851  * Parse a server name (IP or hostname). If a valid IP address is not found
1852  * then try to extract a hostname to resolve using userspace DNS upcall.
1853  */
1854 static int ceph_parse_server_name(const char *name, size_t namelen,
1855                         struct sockaddr_storage *ss, char delim, const char **ipend)
1856 {
1857         int ret;
1858
1859         ret = ceph_pton(name, namelen, ss, delim, ipend);
1860         if (ret)
1861                 ret = ceph_dns_resolve_name(name, namelen, ss, delim, ipend);
1862
1863         return ret;
1864 }
1865
1866 /*
1867  * Parse an ip[:port] list into an addr array.  Use the default
1868  * monitor port if a port isn't specified.
1869  */
1870 int ceph_parse_ips(const char *c, const char *end,
1871                    struct ceph_entity_addr *addr,
1872                    int max_count, int *count)
1873 {
1874         int i, ret = -EINVAL;
1875         const char *p = c;
1876
1877         dout("parse_ips on '%.*s'\n", (int)(end-c), c);
1878         for (i = 0; i < max_count; i++) {
1879                 const char *ipend;
1880                 struct sockaddr_storage *ss = &addr[i].in_addr;
1881                 int port;
1882                 char delim = ',';
1883
1884                 if (*p == '[') {
1885                         delim = ']';
1886                         p++;
1887                 }
1888
1889                 ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend);
1890                 if (ret)
1891                         goto bad;
1892                 ret = -EINVAL;
1893
1894                 p = ipend;
1895
1896                 if (delim == ']') {
1897                         if (*p != ']') {
1898                                 dout("missing matching ']'\n");
1899                                 goto bad;
1900                         }
1901                         p++;
1902                 }
1903
1904                 /* port? */
1905                 if (p < end && *p == ':') {
1906                         port = 0;
1907                         p++;
1908                         while (p < end && *p >= '0' && *p <= '9') {
1909                                 port = (port * 10) + (*p - '0');
1910                                 p++;
1911                         }
1912                         if (port == 0)
1913                                 port = CEPH_MON_PORT;
1914                         else if (port > 65535)
1915                                 goto bad;
1916                 } else {
1917                         port = CEPH_MON_PORT;
1918                 }
1919
1920                 addr_set_port(ss, port);
1921
1922                 dout("parse_ips got %s\n", ceph_pr_addr(ss));
1923
1924                 if (p == end)
1925                         break;
1926                 if (*p != ',')
1927                         goto bad;
1928                 p++;
1929         }
1930
1931         if (p != end)
1932                 goto bad;
1933
1934         if (count)
1935                 *count = i + 1;
1936         return 0;
1937
1938 bad:
1939         pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c);
1940         return ret;
1941 }
1942 EXPORT_SYMBOL(ceph_parse_ips);
1943
1944 static int process_banner(struct ceph_connection *con)
1945 {
1946         dout("process_banner on %p\n", con);
1947
1948         if (verify_hello(con) < 0)
1949                 return -1;
1950
1951         ceph_decode_addr(&con->actual_peer_addr);
1952         ceph_decode_addr(&con->peer_addr_for_me);
1953
1954         /*
1955          * Make sure the other end is who we wanted.  note that the other
1956          * end may not yet know their ip address, so if it's 0.0.0.0, give
1957          * them the benefit of the doubt.
1958          */
1959         if (memcmp(&con->peer_addr, &con->actual_peer_addr,
1960                    sizeof(con->peer_addr)) != 0 &&
1961             !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
1962               con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
1963                 pr_warn("wrong peer, want %s/%d, got %s/%d\n",
1964                         ceph_pr_addr(&con->peer_addr.in_addr),
1965                         (int)le32_to_cpu(con->peer_addr.nonce),
1966                         ceph_pr_addr(&con->actual_peer_addr.in_addr),
1967                         (int)le32_to_cpu(con->actual_peer_addr.nonce));
1968                 con->error_msg = "wrong peer at address";
1969                 return -1;
1970         }
1971
1972         /*
1973          * did we learn our address?
1974          */
1975         if (addr_is_blank(&con->msgr->inst.addr.in_addr)) {
1976                 int port = addr_port(&con->msgr->inst.addr.in_addr);
1977
1978                 memcpy(&con->msgr->inst.addr.in_addr,
1979                        &con->peer_addr_for_me.in_addr,
1980                        sizeof(con->peer_addr_for_me.in_addr));
1981                 addr_set_port(&con->msgr->inst.addr.in_addr, port);
1982                 encode_my_addr(con->msgr);
1983                 dout("process_banner learned my addr is %s\n",
1984                      ceph_pr_addr(&con->msgr->inst.addr.in_addr));
1985         }
1986
1987         return 0;
1988 }
1989
1990 static int process_connect(struct ceph_connection *con)
1991 {
1992         u64 sup_feat = con->msgr->supported_features;
1993         u64 req_feat = con->msgr->required_features;
1994         u64 server_feat = ceph_sanitize_features(
1995                                 le64_to_cpu(con->in_reply.features));
1996         int ret;
1997
1998         dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
1999
2000         switch (con->in_reply.tag) {
2001         case CEPH_MSGR_TAG_FEATURES:
2002                 pr_err("%s%lld %s feature set mismatch,"
2003                        " my %llx < server's %llx, missing %llx\n",
2004                        ENTITY_NAME(con->peer_name),
2005                        ceph_pr_addr(&con->peer_addr.in_addr),
2006                        sup_feat, server_feat, server_feat & ~sup_feat);
2007                 con->error_msg = "missing required protocol features";
2008                 reset_connection(con);
2009                 return -1;
2010
2011         case CEPH_MSGR_TAG_BADPROTOVER:
2012                 pr_err("%s%lld %s protocol version mismatch,"
2013                        " my %d != server's %d\n",
2014                        ENTITY_NAME(con->peer_name),
2015                        ceph_pr_addr(&con->peer_addr.in_addr),
2016                        le32_to_cpu(con->out_connect.protocol_version),
2017                        le32_to_cpu(con->in_reply.protocol_version));
2018                 con->error_msg = "protocol version mismatch";
2019                 reset_connection(con);
2020                 return -1;
2021
2022         case CEPH_MSGR_TAG_BADAUTHORIZER:
2023                 con->auth_retry++;
2024                 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
2025                      con->auth_retry);
2026                 if (con->auth_retry == 2) {
2027                         con->error_msg = "connect authorization failure";
2028                         return -1;
2029                 }
2030                 con_out_kvec_reset(con);
2031                 ret = prepare_write_connect(con);
2032                 if (ret < 0)
2033                         return ret;
2034                 prepare_read_connect(con);
2035                 break;
2036
2037         case CEPH_MSGR_TAG_RESETSESSION:
2038                 /*
2039                  * If we connected with a large connect_seq but the peer
2040                  * has no record of a session with us (no connection, or
2041                  * connect_seq == 0), they will send RESETSESION to indicate
2042                  * that they must have reset their session, and may have
2043                  * dropped messages.
2044                  */
2045                 dout("process_connect got RESET peer seq %u\n",
2046                      le32_to_cpu(con->in_reply.connect_seq));
2047                 pr_err("%s%lld %s connection reset\n",
2048                        ENTITY_NAME(con->peer_name),
2049                        ceph_pr_addr(&con->peer_addr.in_addr));
2050                 reset_connection(con);
2051                 con_out_kvec_reset(con);
2052                 ret = prepare_write_connect(con);
2053                 if (ret < 0)
2054                         return ret;
2055                 prepare_read_connect(con);
2056
2057                 /* Tell ceph about it. */
2058                 mutex_unlock(&con->mutex);
2059                 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
2060                 if (con->ops->peer_reset)
2061                         con->ops->peer_reset(con);
2062                 mutex_lock(&con->mutex);
2063                 if (con->state != CON_STATE_NEGOTIATING)
2064                         return -EAGAIN;
2065                 break;
2066
2067         case CEPH_MSGR_TAG_RETRY_SESSION:
2068                 /*
2069                  * If we sent a smaller connect_seq than the peer has, try
2070                  * again with a larger value.
2071                  */
2072                 dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
2073                      le32_to_cpu(con->out_connect.connect_seq),
2074                      le32_to_cpu(con->in_reply.connect_seq));
2075                 con->connect_seq = le32_to_cpu(con->in_reply.connect_seq);
2076                 con_out_kvec_reset(con);
2077                 ret = prepare_write_connect(con);
2078                 if (ret < 0)
2079                         return ret;
2080                 prepare_read_connect(con);
2081                 break;
2082
2083         case CEPH_MSGR_TAG_RETRY_GLOBAL:
2084                 /*
2085                  * If we sent a smaller global_seq than the peer has, try
2086                  * again with a larger value.
2087                  */
2088                 dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
2089                      con->peer_global_seq,
2090                      le32_to_cpu(con->in_reply.global_seq));
2091                 get_global_seq(con->msgr,
2092                                le32_to_cpu(con->in_reply.global_seq));
2093                 con_out_kvec_reset(con);
2094                 ret = prepare_write_connect(con);
2095                 if (ret < 0)
2096                         return ret;
2097                 prepare_read_connect(con);
2098                 break;
2099
2100         case CEPH_MSGR_TAG_SEQ:
2101         case CEPH_MSGR_TAG_READY:
2102                 if (req_feat & ~server_feat) {
2103                         pr_err("%s%lld %s protocol feature mismatch,"
2104                                " my required %llx > server's %llx, need %llx\n",
2105                                ENTITY_NAME(con->peer_name),
2106                                ceph_pr_addr(&con->peer_addr.in_addr),
2107                                req_feat, server_feat, req_feat & ~server_feat);
2108                         con->error_msg = "missing required protocol features";
2109                         reset_connection(con);
2110                         return -1;
2111                 }
2112
2113                 WARN_ON(con->state != CON_STATE_NEGOTIATING);
2114                 con->state = CON_STATE_OPEN;
2115                 con->auth_retry = 0;    /* we authenticated; clear flag */
2116                 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
2117                 con->connect_seq++;
2118                 con->peer_features = server_feat;
2119                 dout("process_connect got READY gseq %d cseq %d (%d)\n",
2120                      con->peer_global_seq,
2121                      le32_to_cpu(con->in_reply.connect_seq),
2122                      con->connect_seq);
2123                 WARN_ON(con->connect_seq !=
2124                         le32_to_cpu(con->in_reply.connect_seq));
2125
2126                 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
2127                         con_flag_set(con, CON_FLAG_LOSSYTX);
2128
2129                 con->delay = 0;      /* reset backoff memory */
2130
2131                 if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
2132                         prepare_write_seq(con);
2133                         prepare_read_seq(con);
2134                 } else {
2135                         prepare_read_tag(con);
2136                 }
2137                 break;
2138
2139         case CEPH_MSGR_TAG_WAIT:
2140                 /*
2141                  * If there is a connection race (we are opening
2142                  * connections to each other), one of us may just have
2143                  * to WAIT.  This shouldn't happen if we are the
2144                  * client.
2145                  */
2146                 con->error_msg = "protocol error, got WAIT as client";
2147                 return -1;
2148
2149         default:
2150                 con->error_msg = "protocol error, garbage tag during connect";
2151                 return -1;
2152         }
2153         return 0;
2154 }
2155
2156
2157 /*
2158  * read (part of) an ack
2159  */
2160 static int read_partial_ack(struct ceph_connection *con)
2161 {
2162         int size = sizeof (con->in_temp_ack);
2163         int end = size;
2164
2165         return read_partial(con, end, size, &con->in_temp_ack);
2166 }
2167
2168 /*
2169  * We can finally discard anything that's been acked.
2170  */
2171 static void process_ack(struct ceph_connection *con)
2172 {
2173         struct ceph_msg *m;
2174         u64 ack = le64_to_cpu(con->in_temp_ack);
2175         u64 seq;
2176
2177         while (!list_empty(&con->out_sent)) {
2178                 m = list_first_entry(&con->out_sent, struct ceph_msg,
2179                                      list_head);
2180                 seq = le64_to_cpu(m->hdr.seq);
2181                 if (seq > ack)
2182                         break;
2183                 dout("got ack for seq %llu type %d at %p\n", seq,
2184                      le16_to_cpu(m->hdr.type), m);
2185                 m->ack_stamp = jiffies;
2186                 ceph_msg_remove(m);
2187         }
2188         prepare_read_tag(con);
2189 }
2190
2191
2192 static int read_partial_message_section(struct ceph_connection *con,
2193                                         struct kvec *section,
2194                                         unsigned int sec_len, u32 *crc)
2195 {
2196         int ret, left;
2197
2198         BUG_ON(!section);
2199
2200         while (section->iov_len < sec_len) {
2201                 BUG_ON(section->iov_base == NULL);
2202                 left = sec_len - section->iov_len;
2203                 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
2204                                        section->iov_len, left);
2205                 if (ret <= 0)
2206                         return ret;
2207                 section->iov_len += ret;
2208         }
2209         if (section->iov_len == sec_len)
2210                 *crc = crc32c(0, section->iov_base, section->iov_len);
2211
2212         return 1;
2213 }
2214
2215 static int read_partial_msg_data(struct ceph_connection *con)
2216 {
2217         struct ceph_msg *msg = con->in_msg;
2218         struct ceph_msg_data_cursor *cursor = &msg->cursor;
2219         const bool do_datacrc = !con->msgr->nocrc;
2220         struct page *page;
2221         size_t page_offset;
2222         size_t length;
2223         u32 crc = 0;
2224         int ret;
2225
2226         BUG_ON(!msg);
2227         if (list_empty(&msg->data))
2228                 return -EIO;
2229
2230         if (do_datacrc)
2231                 crc = con->in_data_crc;
2232         while (cursor->resid) {
2233                 page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
2234                                                         NULL);
2235                 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
2236                 if (ret <= 0) {
2237                         if (do_datacrc)
2238                                 con->in_data_crc = crc;
2239
2240                         return ret;
2241                 }
2242
2243                 if (do_datacrc)
2244                         crc = ceph_crc32c_page(crc, page, page_offset, ret);
2245                 (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
2246         }
2247         if (do_datacrc)
2248                 con->in_data_crc = crc;
2249
2250         return 1;       /* must return > 0 to indicate success */
2251 }
2252
2253 /*
2254  * read (part of) a message.
2255  */
2256 static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
2257
2258 static int read_partial_message(struct ceph_connection *con)
2259 {
2260         struct ceph_msg *m = con->in_msg;
2261         int size;
2262         int end;
2263         int ret;
2264         unsigned int front_len, middle_len, data_len;
2265         bool do_datacrc = !con->msgr->nocrc;
2266         bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
2267         u64 seq;
2268         u32 crc;
2269
2270         dout("read_partial_message con %p msg %p\n", con, m);
2271
2272         /* header */
2273         size = sizeof (con->in_hdr);
2274         end = size;
2275         ret = read_partial(con, end, size, &con->in_hdr);
2276         if (ret <= 0)
2277                 return ret;
2278
2279         crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
2280         if (cpu_to_le32(crc) != con->in_hdr.crc) {
2281                 pr_err("read_partial_message bad hdr crc %u != expected %u\n",
2282                        crc, con->in_hdr.crc);
2283                 return -EBADMSG;
2284         }
2285
2286         front_len = le32_to_cpu(con->in_hdr.front_len);
2287         if (front_len > CEPH_MSG_MAX_FRONT_LEN)
2288                 return -EIO;
2289         middle_len = le32_to_cpu(con->in_hdr.middle_len);
2290         if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
2291                 return -EIO;
2292         data_len = le32_to_cpu(con->in_hdr.data_len);
2293         if (data_len > CEPH_MSG_MAX_DATA_LEN)
2294                 return -EIO;
2295
2296         /* verify seq# */
2297         seq = le64_to_cpu(con->in_hdr.seq);
2298         if ((s64)seq - (s64)con->in_seq < 1) {
2299                 pr_info("skipping %s%lld %s seq %lld expected %lld\n",
2300                         ENTITY_NAME(con->peer_name),
2301                         ceph_pr_addr(&con->peer_addr.in_addr),
2302                         seq, con->in_seq + 1);
2303                 con->in_base_pos = -front_len - middle_len - data_len -
2304                         sizeof(m->footer);
2305                 con->in_tag = CEPH_MSGR_TAG_READY;
2306                 return 0;
2307         } else if ((s64)seq - (s64)con->in_seq > 1) {
2308                 pr_err("read_partial_message bad seq %lld expected %lld\n",
2309                        seq, con->in_seq + 1);
2310                 con->error_msg = "bad message sequence # for incoming message";
2311                 return -EBADE;
2312         }
2313
2314         /* allocate message? */
2315         if (!con->in_msg) {
2316                 int skip = 0;
2317
2318                 dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
2319                      front_len, data_len);
2320                 ret = ceph_con_in_msg_alloc(con, &skip);
2321                 if (ret < 0)
2322                         return ret;
2323
2324                 BUG_ON(!con->in_msg ^ skip);
2325                 if (con->in_msg && data_len > con->in_msg->data_length) {
2326                         pr_warn("%s skipping long message (%u > %zd)\n",
2327                                 __func__, data_len, con->in_msg->data_length);
2328                         ceph_msg_put(con->in_msg);
2329                         con->in_msg = NULL;
2330                         skip = 1;
2331                 }
2332                 if (skip) {
2333                         /* skip this message */
2334                         dout("alloc_msg said skip message\n");
2335                         con->in_base_pos = -front_len - middle_len - data_len -
2336                                 sizeof(m->footer);
2337                         con->in_tag = CEPH_MSGR_TAG_READY;
2338                         con->in_seq++;
2339                         return 0;
2340                 }
2341
2342                 BUG_ON(!con->in_msg);
2343                 BUG_ON(con->in_msg->con != con);
2344                 m = con->in_msg;
2345                 m->front.iov_len = 0;    /* haven't read it yet */
2346                 if (m->middle)
2347                         m->middle->vec.iov_len = 0;
2348
2349                 /* prepare for data payload, if any */
2350
2351                 if (data_len)
2352                         prepare_message_data(con->in_msg, data_len);
2353         }
2354
2355         /* front */
2356         ret = read_partial_message_section(con, &m->front, front_len,
2357                                            &con->in_front_crc);
2358         if (ret <= 0)
2359                 return ret;
2360
2361         /* middle */
2362         if (m->middle) {
2363                 ret = read_partial_message_section(con, &m->middle->vec,
2364                                                    middle_len,
2365                                                    &con->in_middle_crc);
2366                 if (ret <= 0)
2367                         return ret;
2368         }
2369
2370         /* (page) data */
2371         if (data_len) {
2372                 ret = read_partial_msg_data(con);
2373                 if (ret <= 0)
2374                         return ret;
2375         }
2376
2377         /* footer */
2378         if (need_sign)
2379                 size = sizeof(m->footer);
2380         else
2381                 size = sizeof(m->old_footer);
2382
2383         end += size;
2384         ret = read_partial(con, end, size, &m->footer);
2385         if (ret <= 0)
2386                 return ret;
2387
2388         if (!need_sign) {
2389                 m->footer.flags = m->old_footer.flags;
2390                 m->footer.sig = 0;
2391         }
2392
2393         dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
2394              m, front_len, m->footer.front_crc, middle_len,
2395              m->footer.middle_crc, data_len, m->footer.data_crc);
2396
2397         /* crc ok? */
2398         if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
2399                 pr_err("read_partial_message %p front crc %u != exp. %u\n",
2400                        m, con->in_front_crc, m->footer.front_crc);
2401                 return -EBADMSG;
2402         }
2403         if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
2404                 pr_err("read_partial_message %p middle crc %u != exp %u\n",
2405                        m, con->in_middle_crc, m->footer.middle_crc);
2406                 return -EBADMSG;
2407         }
2408         if (do_datacrc &&
2409             (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
2410             con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
2411                 pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
2412                        con->in_data_crc, le32_to_cpu(m->footer.data_crc));
2413                 return -EBADMSG;
2414         }
2415
2416         if (need_sign && con->ops->check_message_signature &&
2417             con->ops->check_message_signature(con, m)) {
2418                 pr_err("read_partial_message %p signature check failed\n", m);
2419                 return -EBADMSG;
2420         }
2421
2422         return 1; /* done! */
2423 }
2424
2425 /*
2426  * Process message.  This happens in the worker thread.  The callback should
2427  * be careful not to do anything that waits on other incoming messages or it
2428  * may deadlock.
2429  */
2430 static void process_message(struct ceph_connection *con)
2431 {
2432         struct ceph_msg *msg;
2433
2434         BUG_ON(con->in_msg->con != con);
2435         con->in_msg->con = NULL;
2436         msg = con->in_msg;
2437         con->in_msg = NULL;
2438         con->ops->put(con);
2439
2440         /* if first message, set peer_name */
2441         if (con->peer_name.type == 0)
2442                 con->peer_name = msg->hdr.src;
2443
2444         con->in_seq++;
2445         mutex_unlock(&con->mutex);
2446
2447         dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
2448              msg, le64_to_cpu(msg->hdr.seq),
2449              ENTITY_NAME(msg->hdr.src),
2450              le16_to_cpu(msg->hdr.type),
2451              ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
2452              le32_to_cpu(msg->hdr.front_len),
2453              le32_to_cpu(msg->hdr.data_len),
2454              con->in_front_crc, con->in_middle_crc, con->in_data_crc);
2455         con->ops->dispatch(con, msg);
2456
2457         mutex_lock(&con->mutex);
2458 }
2459
2460
2461 /*
2462  * Write something to the socket.  Called in a worker thread when the
2463  * socket appears to be writeable and we have something ready to send.
2464  */
2465 static int try_write(struct ceph_connection *con)
2466 {
2467         int ret = 1;
2468
2469         dout("try_write start %p state %lu\n", con, con->state);
2470
2471 more:
2472         dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
2473
2474         /* open the socket first? */
2475         if (con->state == CON_STATE_PREOPEN) {
2476                 BUG_ON(con->sock);
2477                 con->state = CON_STATE_CONNECTING;
2478
2479                 con_out_kvec_reset(con);
2480                 prepare_write_banner(con);
2481                 prepare_read_banner(con);
2482
2483                 BUG_ON(con->in_msg);
2484                 con->in_tag = CEPH_MSGR_TAG_READY;
2485                 dout("try_write initiating connect on %p new state %lu\n",
2486                      con, con->state);
2487                 ret = ceph_tcp_connect(con);
2488                 if (ret < 0) {
2489                         con->error_msg = "connect error";
2490                         goto out;
2491                 }
2492         }
2493
2494 more_kvec:
2495         /* kvec data queued? */
2496         if (con->out_skip) {
2497                 ret = write_partial_skip(con);
2498                 if (ret <= 0)
2499                         goto out;
2500         }
2501         if (con->out_kvec_left) {
2502                 ret = write_partial_kvec(con);
2503                 if (ret <= 0)
2504                         goto out;
2505         }
2506
2507         /* msg pages? */
2508         if (con->out_msg) {
2509                 if (con->out_msg_done) {
2510                         ceph_msg_put(con->out_msg);
2511                         con->out_msg = NULL;   /* we're done with this one */
2512                         goto do_next;
2513                 }
2514
2515                 ret = write_partial_message_data(con);
2516                 if (ret == 1)
2517                         goto more_kvec;  /* we need to send the footer, too! */
2518                 if (ret == 0)
2519                         goto out;
2520                 if (ret < 0) {
2521                         dout("try_write write_partial_message_data err %d\n",
2522                              ret);
2523                         goto out;
2524                 }
2525         }
2526
2527 do_next:
2528         if (con->state == CON_STATE_OPEN) {
2529                 /* is anything else pending? */
2530                 if (!list_empty(&con->out_queue)) {
2531                         prepare_write_message(con);
2532                         goto more;
2533                 }
2534                 if (con->in_seq > con->in_seq_acked) {
2535                         prepare_write_ack(con);
2536                         goto more;
2537                 }
2538                 if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
2539                         prepare_write_keepalive(con);
2540                         goto more;
2541                 }
2542         }
2543
2544         /* Nothing to do! */
2545         con_flag_clear(con, CON_FLAG_WRITE_PENDING);
2546         dout("try_write nothing else to write.\n");
2547         ret = 0;
2548 out:
2549         dout("try_write done on %p ret %d\n", con, ret);
2550         return ret;
2551 }
2552
2553
2554
2555 /*
2556  * Read what we can from the socket.
2557  */
2558 static int try_read(struct ceph_connection *con)
2559 {
2560         int ret = -1;
2561
2562 more:
2563         dout("try_read start on %p state %lu\n", con, con->state);
2564         if (con->state != CON_STATE_CONNECTING &&
2565             con->state != CON_STATE_NEGOTIATING &&
2566             con->state != CON_STATE_OPEN)
2567                 return 0;
2568
2569         BUG_ON(!con->sock);
2570
2571         dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
2572              con->in_base_pos);
2573
2574         if (con->state == CON_STATE_CONNECTING) {
2575                 dout("try_read connecting\n");
2576                 ret = read_partial_banner(con);
2577                 if (ret <= 0)
2578                         goto out;
2579                 ret = process_banner(con);
2580                 if (ret < 0)
2581                         goto out;
2582
2583                 con->state = CON_STATE_NEGOTIATING;
2584
2585                 /*
2586                  * Received banner is good, exchange connection info.
2587                  * Do not reset out_kvec, as sending our banner raced
2588                  * with receiving peer banner after connect completed.
2589                  */
2590                 ret = prepare_write_connect(con);
2591                 if (ret < 0)
2592                         goto out;
2593                 prepare_read_connect(con);
2594
2595                 /* Send connection info before awaiting response */
2596                 goto out;
2597         }
2598
2599         if (con->state == CON_STATE_NEGOTIATING) {
2600                 dout("try_read negotiating\n");
2601                 ret = read_partial_connect(con);
2602                 if (ret <= 0)
2603                         goto out;
2604                 ret = process_connect(con);
2605                 if (ret < 0)
2606                         goto out;
2607                 goto more;
2608         }
2609
2610         WARN_ON(con->state != CON_STATE_OPEN);
2611
2612         if (con->in_base_pos < 0) {
2613                 /*
2614                  * skipping + discarding content.
2615                  *
2616                  * FIXME: there must be a better way to do this!
2617                  */
2618                 static char buf[SKIP_BUF_SIZE];
2619                 int skip = min((int) sizeof (buf), -con->in_base_pos);
2620
2621                 dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
2622                 ret = ceph_tcp_recvmsg(con->sock, buf, skip);
2623                 if (ret <= 0)
2624                         goto out;
2625                 con->in_base_pos += ret;
2626                 if (con->in_base_pos)
2627                         goto more;
2628         }
2629         if (con->in_tag == CEPH_MSGR_TAG_READY) {
2630                 /*
2631                  * what's next?
2632                  */
2633                 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
2634                 if (ret <= 0)
2635                         goto out;
2636                 dout("try_read got tag %d\n", (int)con->in_tag);
2637                 switch (con->in_tag) {
2638                 case CEPH_MSGR_TAG_MSG:
2639                         prepare_read_message(con);
2640                         break;
2641                 case CEPH_MSGR_TAG_ACK:
2642                         prepare_read_ack(con);
2643                         break;
2644                 case CEPH_MSGR_TAG_CLOSE:
2645                         con_close_socket(con);
2646                         con->state = CON_STATE_CLOSED;
2647                         goto out;
2648                 default:
2649                         goto bad_tag;
2650                 }
2651         }
2652         if (con->in_tag == CEPH_MSGR_TAG_MSG) {
2653                 ret = read_partial_message(con);
2654                 if (ret <= 0) {
2655                         switch (ret) {
2656                         case -EBADMSG:
2657                                 con->error_msg = "bad crc";
2658                                 /* fall through */
2659                         case -EBADE:
2660                                 ret = -EIO;
2661                                 break;
2662                         case -EIO:
2663                                 con->error_msg = "io error";
2664                                 break;
2665                         }
2666                         goto out;
2667                 }
2668                 if (con->in_tag == CEPH_MSGR_TAG_READY)
2669                         goto more;
2670                 process_message(con);
2671                 if (con->state == CON_STATE_OPEN)
2672                         prepare_read_tag(con);
2673                 goto more;
2674         }
2675         if (con->in_tag == CEPH_MSGR_TAG_ACK ||
2676             con->in_tag == CEPH_MSGR_TAG_SEQ) {
2677                 /*
2678                  * the final handshake seq exchange is semantically
2679                  * equivalent to an ACK
2680                  */
2681                 ret = read_partial_ack(con);
2682                 if (ret <= 0)
2683                         goto out;
2684                 process_ack(con);
2685                 goto more;
2686         }
2687
2688 out:
2689         dout("try_read done on %p ret %d\n", con, ret);
2690         return ret;
2691
2692 bad_tag:
2693         pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
2694         con->error_msg = "protocol error, garbage tag";
2695         ret = -1;
2696         goto out;
2697 }
2698
2699
2700 /*
2701  * Atomically queue work on a connection after the specified delay.
2702  * Bump @con reference to avoid races with connection teardown.
2703  * Returns 0 if work was queued, or an error code otherwise.
2704  */
2705 static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
2706 {
2707         if (!con->ops->get(con)) {
2708                 dout("%s %p ref count 0\n", __func__, con);
2709                 return -ENOENT;
2710         }
2711
2712         if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
2713                 dout("%s %p - already queued\n", __func__, con);
2714                 con->ops->put(con);
2715                 return -EBUSY;
2716         }
2717
2718         dout("%s %p %lu\n", __func__, con, delay);
2719         return 0;
2720 }
2721
2722 static void queue_con(struct ceph_connection *con)
2723 {
2724         (void) queue_con_delay(con, 0);
2725 }
2726
2727 static void cancel_con(struct ceph_connection *con)
2728 {
2729         if (cancel_delayed_work(&con->work)) {
2730                 dout("%s %p\n", __func__, con);
2731                 con->ops->put(con);
2732         }
2733 }
2734
2735 static bool con_sock_closed(struct ceph_connection *con)
2736 {
2737         if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED))
2738                 return false;
2739
2740 #define CASE(x)                                                         \
2741         case CON_STATE_ ## x:                                           \
2742                 con->error_msg = "socket closed (con state " #x ")";    \
2743                 break;
2744
2745         switch (con->state) {
2746         CASE(CLOSED);
2747         CASE(PREOPEN);
2748         CASE(CONNECTING);
2749         CASE(NEGOTIATING);
2750         CASE(OPEN);
2751         CASE(STANDBY);
2752         default:
2753                 pr_warn("%s con %p unrecognized state %lu\n",
2754                         __func__, con, con->state);
2755                 con->error_msg = "unrecognized con state";
2756                 BUG();
2757                 break;
2758         }
2759 #undef CASE
2760
2761         return true;
2762 }
2763
2764 static bool con_backoff(struct ceph_connection *con)
2765 {
2766         int ret;
2767
2768         if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF))
2769                 return false;
2770
2771         ret = queue_con_delay(con, round_jiffies_relative(con->delay));
2772         if (ret) {
2773                 dout("%s: con %p FAILED to back off %lu\n", __func__,
2774                         con, con->delay);
2775                 BUG_ON(ret == -ENOENT);
2776                 con_flag_set(con, CON_FLAG_BACKOFF);
2777         }
2778
2779         return true;
2780 }
2781
2782 /* Finish fault handling; con->mutex must *not* be held here */
2783
2784 static void con_fault_finish(struct ceph_connection *con)
2785 {
2786         /*
2787          * in case we faulted due to authentication, invalidate our
2788          * current tickets so that we can get new ones.
2789          */
2790         if (con->auth_retry && con->ops->invalidate_authorizer) {
2791                 dout("calling invalidate_authorizer()\n");
2792                 con->ops->invalidate_authorizer(con);
2793         }
2794
2795         if (con->ops->fault)
2796                 con->ops->fault(con);
2797 }
2798
2799 /*
2800  * Do some work on a connection.  Drop a connection ref when we're done.
2801  */
2802 static void con_work(struct work_struct *work)
2803 {
2804         struct ceph_connection *con = container_of(work, struct ceph_connection,
2805                                                    work.work);
2806         bool fault;
2807
2808         mutex_lock(&con->mutex);
2809         while (true) {
2810                 int ret;
2811
2812                 if ((fault = con_sock_closed(con))) {
2813                         dout("%s: con %p SOCK_CLOSED\n", __func__, con);
2814                         break;
2815                 }
2816                 if (con_backoff(con)) {
2817                         dout("%s: con %p BACKOFF\n", __func__, con);
2818                         break;
2819                 }
2820                 if (con->state == CON_STATE_STANDBY) {
2821                         dout("%s: con %p STANDBY\n", __func__, con);
2822                         break;
2823                 }
2824                 if (con->state == CON_STATE_CLOSED) {
2825                         dout("%s: con %p CLOSED\n", __func__, con);
2826                         BUG_ON(con->sock);
2827                         break;
2828                 }
2829                 if (con->state == CON_STATE_PREOPEN) {
2830                         dout("%s: con %p PREOPEN\n", __func__, con);
2831                         BUG_ON(con->sock);
2832                 }
2833
2834                 ret = try_read(con);
2835                 if (ret < 0) {
2836                         if (ret == -EAGAIN)
2837                                 continue;
2838                         if (!con->error_msg)
2839                                 con->error_msg = "socket error on read";
2840                         fault = true;
2841                         break;
2842                 }
2843
2844                 ret = try_write(con);
2845                 if (ret < 0) {
2846                         if (ret == -EAGAIN)
2847                                 continue;
2848                         if (!con->error_msg)
2849                                 con->error_msg = "socket error on write";
2850                         fault = true;
2851                 }
2852
2853                 break;  /* If we make it to here, we're done */
2854         }
2855         if (fault)
2856                 con_fault(con);
2857         mutex_unlock(&con->mutex);
2858
2859         if (fault)
2860                 con_fault_finish(con);
2861
2862         con->ops->put(con);
2863 }
2864
2865 /*
2866  * Generic error/fault handler.  A retry mechanism is used with
2867  * exponential backoff
2868  */
2869 static void con_fault(struct ceph_connection *con)
2870 {
2871         dout("fault %p state %lu to peer %s\n",
2872              con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
2873
2874         pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
2875                 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
2876         con->error_msg = NULL;
2877
2878         WARN_ON(con->state != CON_STATE_CONNECTING &&
2879                con->state != CON_STATE_NEGOTIATING &&
2880                con->state != CON_STATE_OPEN);
2881
2882         con_close_socket(con);
2883
2884         if (con_flag_test(con, CON_FLAG_LOSSYTX)) {
2885                 dout("fault on LOSSYTX channel, marking CLOSED\n");
2886                 con->state = CON_STATE_CLOSED;
2887                 return;
2888         }
2889
2890         if (con->in_msg) {
2891                 BUG_ON(con->in_msg->con != con);
2892                 con->in_msg->con = NULL;
2893                 ceph_msg_put(con->in_msg);
2894                 con->in_msg = NULL;
2895                 con->ops->put(con);
2896         }
2897
2898         /* Requeue anything that hasn't been acked */
2899         list_splice_init(&con->out_sent, &con->out_queue);
2900
2901         /* If there are no messages queued or keepalive pending, place
2902          * the connection in a STANDBY state */
2903         if (list_empty(&con->out_queue) &&
2904             !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) {
2905                 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
2906                 con_flag_clear(con, CON_FLAG_WRITE_PENDING);
2907                 con->state = CON_STATE_STANDBY;
2908         } else {
2909                 /* retry after a delay. */
2910                 con->state = CON_STATE_PREOPEN;
2911                 if (con->delay == 0)
2912                         con->delay = BASE_DELAY_INTERVAL;
2913                 else if (con->delay < MAX_DELAY_INTERVAL)
2914                         con->delay *= 2;
2915                 con_flag_set(con, CON_FLAG_BACKOFF);
2916                 queue_con(con);
2917         }
2918 }
2919
2920
2921
2922 /*
2923  * initialize a new messenger instance
2924  */
2925 void ceph_messenger_init(struct ceph_messenger *msgr,
2926                         struct ceph_entity_addr *myaddr,
2927                         u64 supported_features,
2928                         u64 required_features,
2929                         bool nocrc,
2930                         bool tcp_nodelay)
2931 {
2932         msgr->supported_features = supported_features;
2933         msgr->required_features = required_features;
2934
2935         spin_lock_init(&msgr->global_seq_lock);
2936
2937         if (myaddr)
2938                 msgr->inst.addr = *myaddr;
2939
2940         /* select a random nonce */
2941         msgr->inst.addr.type = 0;
2942         get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
2943         encode_my_addr(msgr);
2944         msgr->nocrc = nocrc;
2945         msgr->tcp_nodelay = tcp_nodelay;
2946
2947         atomic_set(&msgr->stopping, 0);
2948         write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));
2949
2950         dout("%s %p\n", __func__, msgr);
2951 }
2952 EXPORT_SYMBOL(ceph_messenger_init);
2953
2954 void ceph_messenger_fini(struct ceph_messenger *msgr)
2955 {
2956         put_net(read_pnet(&msgr->net));
2957 }
2958 EXPORT_SYMBOL(ceph_messenger_fini);
2959
2960 static void clear_standby(struct ceph_connection *con)
2961 {
2962         /* come back from STANDBY? */
2963         if (con->state == CON_STATE_STANDBY) {
2964                 dout("clear_standby %p and ++connect_seq\n", con);
2965                 con->state = CON_STATE_PREOPEN;
2966                 con->connect_seq++;
2967                 WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING));
2968                 WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING));
2969         }
2970 }
2971
2972 /*
2973  * Queue up an outgoing message on the given connection.
2974  */
2975 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
2976 {
2977         /* set src+dst */
2978         msg->hdr.src = con->msgr->inst.name;
2979         BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
2980         msg->needs_out_seq = true;
2981
2982         mutex_lock(&con->mutex);
2983
2984         if (con->state == CON_STATE_CLOSED) {
2985                 dout("con_send %p closed, dropping %p\n", con, msg);
2986                 ceph_msg_put(msg);
2987                 mutex_unlock(&con->mutex);
2988                 return;
2989         }
2990
2991         BUG_ON(msg->con != NULL);
2992         msg->con = con->ops->get(con);
2993         BUG_ON(msg->con == NULL);
2994
2995         BUG_ON(!list_empty(&msg->list_head));
2996         list_add_tail(&msg->list_head, &con->out_queue);
2997         dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
2998              ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
2999              ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
3000              le32_to_cpu(msg->hdr.front_len),
3001              le32_to_cpu(msg->hdr.middle_len),
3002              le32_to_cpu(msg->hdr.data_len));
3003
3004         clear_standby(con);
3005         mutex_unlock(&con->mutex);
3006
3007         /* if there wasn't anything waiting to send before, queue
3008          * new work */
3009         if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
3010                 queue_con(con);
3011 }
3012 EXPORT_SYMBOL(ceph_con_send);
3013
3014 /*
3015  * Revoke a message that was previously queued for send
3016  */
3017 void ceph_msg_revoke(struct ceph_msg *msg)
3018 {
3019         struct ceph_connection *con = msg->con;
3020
3021         if (!con)
3022                 return;         /* Message not in our possession */
3023
3024         mutex_lock(&con->mutex);
3025         if (!list_empty(&msg->list_head)) {
3026                 dout("%s %p msg %p - was on queue\n", __func__, con, msg);
3027                 list_del_init(&msg->list_head);
3028                 BUG_ON(msg->con == NULL);
3029                 msg->con->ops->put(msg->con);
3030                 msg->con = NULL;
3031                 msg->hdr.seq = 0;
3032
3033                 ceph_msg_put(msg);
3034         }
3035         if (con->out_msg == msg) {
3036                 dout("%s %p msg %p - was sending\n", __func__, con, msg);
3037                 con->out_msg = NULL;
3038                 if (con->out_kvec_is_msg) {
3039                         con->out_skip = con->out_kvec_bytes;
3040                         con->out_kvec_is_msg = false;
3041                 }
3042                 msg->hdr.seq = 0;
3043
3044                 ceph_msg_put(msg);
3045         }
3046         mutex_unlock(&con->mutex);
3047 }
3048
3049 /*
3050  * Revoke a message that we may be reading data into
3051  */
3052 void ceph_msg_revoke_incoming(struct ceph_msg *msg)
3053 {
3054         struct ceph_connection *con;
3055
3056         BUG_ON(msg == NULL);
3057         if (!msg->con) {
3058                 dout("%s msg %p null con\n", __func__, msg);
3059
3060                 return;         /* Message not in our possession */
3061         }
3062
3063         con = msg->con;
3064         mutex_lock(&con->mutex);
3065         if (con->in_msg == msg) {
3066                 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
3067                 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
3068                 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
3069
3070                 /* skip rest of message */
3071                 dout("%s %p msg %p revoked\n", __func__, con, msg);
3072                 con->in_base_pos = con->in_base_pos -
3073                                 sizeof(struct ceph_msg_header) -
3074                                 front_len -
3075                                 middle_len -
3076                                 data_len -
3077                                 sizeof(struct ceph_msg_footer);
3078                 ceph_msg_put(con->in_msg);
3079                 con->in_msg = NULL;
3080                 con->in_tag = CEPH_MSGR_TAG_READY;
3081                 con->in_seq++;
3082         } else {
3083                 dout("%s %p in_msg %p msg %p no-op\n",
3084                      __func__, con, con->in_msg, msg);
3085         }
3086         mutex_unlock(&con->mutex);
3087 }
3088
3089 /*
3090  * Queue a keepalive byte to ensure the tcp connection is alive.
3091  */
3092 void ceph_con_keepalive(struct ceph_connection *con)
3093 {
3094         dout("con_keepalive %p\n", con);
3095         mutex_lock(&con->mutex);
3096         clear_standby(con);
3097         mutex_unlock(&con->mutex);
3098         if (con_flag_test_and_set(con, CON_FLAG_KEEPALIVE_PENDING) == 0 &&
3099             con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
3100                 queue_con(con);
3101 }
3102 EXPORT_SYMBOL(ceph_con_keepalive);
3103
3104 static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
3105 {
3106         struct ceph_msg_data *data;
3107
3108         if (WARN_ON(!ceph_msg_data_type_valid(type)))
3109                 return NULL;
3110
3111         data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS);
3112         if (data)
3113                 data->type = type;
3114         INIT_LIST_HEAD(&data->links);
3115
3116         return data;
3117 }
3118
3119 static void ceph_msg_data_destroy(struct ceph_msg_data *data)
3120 {
3121         if (!data)
3122                 return;
3123
3124         WARN_ON(!list_empty(&data->links));
3125         if (data->type == CEPH_MSG_DATA_PAGELIST)
3126                 ceph_pagelist_release(data->pagelist);
3127         kmem_cache_free(ceph_msg_data_cache, data);
3128 }
3129
3130 void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
3131                 size_t length, size_t alignment)
3132 {
3133         struct ceph_msg_data *data;
3134
3135         BUG_ON(!pages);
3136         BUG_ON(!length);
3137
3138         data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
3139         BUG_ON(!data);
3140         data->pages = pages;
3141         data->length = length;
3142         data->alignment = alignment & ~PAGE_MASK;
3143
3144         list_add_tail(&data->links, &msg->data);
3145         msg->data_length += length;
3146 }
3147 EXPORT_SYMBOL(ceph_msg_data_add_pages);
3148
3149 void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
3150                                 struct ceph_pagelist *pagelist)
3151 {
3152         struct ceph_msg_data *data;
3153
3154         BUG_ON(!pagelist);
3155         BUG_ON(!pagelist->length);
3156
3157         data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
3158         BUG_ON(!data);
3159         data->pagelist = pagelist;
3160
3161         list_add_tail(&data->links, &msg->data);
3162         msg->data_length += pagelist->length;
3163 }
3164 EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
3165
3166 #ifdef  CONFIG_BLOCK
3167 void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
3168                 size_t length)
3169 {
3170         struct ceph_msg_data *data;
3171
3172         BUG_ON(!bio);
3173
3174         data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
3175         BUG_ON(!data);
3176         data->bio = bio;
3177         data->bio_length = length;
3178
3179         list_add_tail(&data->links, &msg->data);
3180         msg->data_length += length;
3181 }
3182 EXPORT_SYMBOL(ceph_msg_data_add_bio);
3183 #endif  /* CONFIG_BLOCK */
3184
3185 /*
3186  * construct a new message with given type, size
3187  * the new msg has a ref count of 1.
3188  */
3189 struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
3190                               bool can_fail)
3191 {
3192         struct ceph_msg *m;
3193
3194         m = kmem_cache_zalloc(ceph_msg_cache, flags);
3195         if (m == NULL)
3196                 goto out;
3197
3198         m->hdr.type = cpu_to_le16(type);
3199         m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
3200         m->hdr.front_len = cpu_to_le32(front_len);
3201
3202         INIT_LIST_HEAD(&m->list_head);
3203         kref_init(&m->kref);
3204         INIT_LIST_HEAD(&m->data);
3205
3206         /* front */
3207         if (front_len) {
3208                 m->front.iov_base = ceph_kvmalloc(front_len, flags);
3209                 if (m->front.iov_base == NULL) {
3210                         dout("ceph_msg_new can't allocate %d bytes\n",
3211                              front_len);
3212                         goto out2;
3213                 }
3214         } else {
3215                 m->front.iov_base = NULL;
3216         }
3217         m->front_alloc_len = m->front.iov_len = front_len;
3218
3219         dout("ceph_msg_new %p front %d\n", m, front_len);
3220         return m;
3221
3222 out2:
3223         ceph_msg_put(m);
3224 out:
3225         if (!can_fail) {
3226                 pr_err("msg_new can't create type %d front %d\n", type,
3227                        front_len);
3228                 WARN_ON(1);
3229         } else {
3230                 dout("msg_new can't create type %d front %d\n", type,
3231                      front_len);
3232         }
3233         return NULL;
3234 }
3235 EXPORT_SYMBOL(ceph_msg_new);
3236
3237 /*
3238  * Allocate "middle" portion of a message, if it is needed and wasn't
3239  * allocated by alloc_msg.  This allows us to read a small fixed-size
3240  * per-type header in the front and then gracefully fail (i.e.,
3241  * propagate the error to the caller based on info in the front) when
3242  * the middle is too large.
3243  */
3244 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
3245 {
3246         int type = le16_to_cpu(msg->hdr.type);
3247         int middle_len = le32_to_cpu(msg->hdr.middle_len);
3248
3249         dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
3250              ceph_msg_type_name(type), middle_len);
3251         BUG_ON(!middle_len);
3252         BUG_ON(msg->middle);
3253
3254         msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
3255         if (!msg->middle)
3256                 return -ENOMEM;
3257         return 0;
3258 }
3259
3260 /*
3261  * Allocate a message for receiving an incoming message on a
3262  * connection, and save the result in con->in_msg.  Uses the
3263  * connection's private alloc_msg op if available.
3264  *
3265  * Returns 0 on success, or a negative error code.
3266  *
3267  * On success, if we set *skip = 1:
3268  *  - the next message should be skipped and ignored.
3269  *  - con->in_msg == NULL
3270  * or if we set *skip = 0:
3271  *  - con->in_msg is non-null.
3272  * On error (ENOMEM, EAGAIN, ...),
3273  *  - con->in_msg == NULL
3274  */
3275 static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
3276 {
3277         struct ceph_msg_header *hdr = &con->in_hdr;
3278         int middle_len = le32_to_cpu(hdr->middle_len);
3279         struct ceph_msg *msg;
3280         int ret = 0;
3281
3282         BUG_ON(con->in_msg != NULL);
3283         BUG_ON(!con->ops->alloc_msg);
3284
3285         mutex_unlock(&con->mutex);
3286         msg = con->ops->alloc_msg(con, hdr, skip);
3287         mutex_lock(&con->mutex);
3288         if (con->state != CON_STATE_OPEN) {
3289                 if (msg)
3290                         ceph_msg_put(msg);
3291                 return -EAGAIN;
3292         }
3293         if (msg) {
3294                 BUG_ON(*skip);
3295                 con->in_msg = msg;
3296                 con->in_msg->con = con->ops->get(con);
3297                 BUG_ON(con->in_msg->con == NULL);
3298         } else {
3299                 /*
3300                  * Null message pointer means either we should skip
3301                  * this message or we couldn't allocate memory.  The
3302                  * former is not an error.
3303                  */
3304                 if (*skip)
3305                         return 0;
3306
3307                 con->error_msg = "error allocating memory for incoming message";
3308                 return -ENOMEM;
3309         }
3310         memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
3311
3312         if (middle_len && !con->in_msg->middle) {
3313                 ret = ceph_alloc_middle(con, con->in_msg);
3314                 if (ret < 0) {
3315                         ceph_msg_put(con->in_msg);
3316                         con->in_msg = NULL;
3317                 }
3318         }
3319
3320         return ret;
3321 }
3322
3323
3324 /*
3325  * Free a generically kmalloc'd message.
3326  */
3327 static void ceph_msg_free(struct ceph_msg *m)
3328 {
3329         dout("%s %p\n", __func__, m);
3330         kvfree(m->front.iov_base);
3331         kmem_cache_free(ceph_msg_cache, m);
3332 }
3333
3334 static void ceph_msg_release(struct kref *kref)
3335 {
3336         struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
3337         LIST_HEAD(data);
3338         struct list_head *links;
3339         struct list_head *next;
3340
3341         dout("%s %p\n", __func__, m);
3342         WARN_ON(!list_empty(&m->list_head));
3343
3344         /* drop middle, data, if any */
3345         if (m->middle) {
3346                 ceph_buffer_put(m->middle);
3347                 m->middle = NULL;
3348         }
3349
3350         list_splice_init(&m->data, &data);
3351         list_for_each_safe(links, next, &data) {
3352                 struct ceph_msg_data *data;
3353
3354                 data = list_entry(links, struct ceph_msg_data, links);
3355                 list_del_init(links);
3356                 ceph_msg_data_destroy(data);
3357         }
3358         m->data_length = 0;
3359
3360         if (m->pool)
3361                 ceph_msgpool_put(m->pool, m);
3362         else
3363                 ceph_msg_free(m);
3364 }
3365
3366 struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
3367 {
3368         dout("%s %p (was %d)\n", __func__, msg,
3369              atomic_read(&msg->kref.refcount));
3370         kref_get(&msg->kref);
3371         return msg;
3372 }
3373 EXPORT_SYMBOL(ceph_msg_get);
3374
3375 void ceph_msg_put(struct ceph_msg *msg)
3376 {
3377         dout("%s %p (was %d)\n", __func__, msg,
3378              atomic_read(&msg->kref.refcount));
3379         kref_put(&msg->kref, ceph_msg_release);
3380 }
3381 EXPORT_SYMBOL(ceph_msg_put);
3382
3383 void ceph_msg_dump(struct ceph_msg *msg)
3384 {
3385         pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg,
3386                  msg->front_alloc_len, msg->data_length);
3387         print_hex_dump(KERN_DEBUG, "header: ",
3388                        DUMP_PREFIX_OFFSET, 16, 1,
3389                        &msg->hdr, sizeof(msg->hdr), true);
3390         print_hex_dump(KERN_DEBUG, " front: ",
3391                        DUMP_PREFIX_OFFSET, 16, 1,
3392                        msg->front.iov_base, msg->front.iov_len, true);
3393         if (msg->middle)
3394                 print_hex_dump(KERN_DEBUG, "middle: ",
3395                                DUMP_PREFIX_OFFSET, 16, 1,
3396                                msg->middle->vec.iov_base,
3397                                msg->middle->vec.iov_len, true);
3398         print_hex_dump(KERN_DEBUG, "footer: ",
3399                        DUMP_PREFIX_OFFSET, 16, 1,
3400                        &msg->footer, sizeof(msg->footer), true);
3401 }
3402 EXPORT_SYMBOL(ceph_msg_dump);