adc12e2273032f8eeb9c19b099510562d9607837
[linux.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "port.h"
39
40 #include <linux/export.h>
41
42 #define SS_LISTENING    -1      /* socket is listening */
43 #define SS_READY        -2      /* socket is connectionless */
44
45 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
46
47 static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
48 static void tipc_data_ready(struct sock *sk, int len);
49 static void tipc_write_space(struct sock *sk);
50 static int tipc_release(struct socket *sock);
51 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
52
53 static const struct proto_ops packet_ops;
54 static const struct proto_ops stream_ops;
55 static const struct proto_ops msg_ops;
56
57 static struct proto tipc_proto;
58 static struct proto tipc_proto_kern;
59
60 /*
61  * Revised TIPC socket locking policy:
62  *
63  * Most socket operations take the standard socket lock when they start
64  * and hold it until they finish (or until they need to sleep).  Acquiring
65  * this lock grants the owner exclusive access to the fields of the socket
66  * data structures, with the exception of the backlog queue.  A few socket
67  * operations can be done without taking the socket lock because they only
68  * read socket information that never changes during the life of the socket.
69  *
70  * Socket operations may acquire the lock for the associated TIPC port if they
71  * need to perform an operation on the port.  If any routine needs to acquire
72  * both the socket lock and the port lock it must take the socket lock first
73  * to avoid the risk of deadlock.
74  *
75  * The dispatcher handling incoming messages cannot grab the socket lock in
76  * the standard fashion, since invoked it runs at the BH level and cannot block.
77  * Instead, it checks to see if the socket lock is currently owned by someone,
78  * and either handles the message itself or adds it to the socket's backlog
79  * queue; in the latter case the queued message is processed once the process
80  * owning the socket lock releases it.
81  *
82  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
83  * the problem of a blocked socket operation preventing any other operations
84  * from occurring.  However, applications must be careful if they have
85  * multiple threads trying to send (or receive) on the same socket, as these
86  * operations might interfere with each other.  For example, doing a connect
87  * and a receive at the same time might allow the receive to consume the
88  * ACK message meant for the connect.  While additional work could be done
89  * to try and overcome this, it doesn't seem to be worthwhile at the present.
90  *
91  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
92  * that another operation that must be performed in a non-blocking manner is
93  * not delayed for very long because the lock has already been taken.
94  *
95  * NOTE: This code assumes that certain fields of a port/socket pair are
96  * constant over its lifetime; such fields can be examined without taking
97  * the socket lock and/or port lock, and do not need to be re-read even
98  * after resuming processing after waiting.  These fields include:
99  *   - socket type
100  *   - pointer to socket sk structure (aka tipc_sock structure)
101  *   - pointer to port structure
102  *   - port reference
103  */
104
105 #include "socket.h"
106
107 /**
108  * advance_rx_queue - discard first buffer in socket receive queue
109  *
110  * Caller must hold socket lock
111  */
112 static void advance_rx_queue(struct sock *sk)
113 {
114         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
115 }
116
117 /**
118  * reject_rx_queue - reject all buffers in socket receive queue
119  *
120  * Caller must hold socket lock
121  */
122 static void reject_rx_queue(struct sock *sk)
123 {
124         struct sk_buff *buf;
125
126         while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
127                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
128 }
129
130 /**
131  * tipc_sk_create - create a TIPC socket
132  * @net: network namespace (must be default network)
133  * @sock: pre-allocated socket structure
134  * @protocol: protocol indicator (must be 0)
135  * @kern: caused by kernel or by userspace?
136  *
137  * This routine creates additional data structures used by the TIPC socket,
138  * initializes them, and links them together.
139  *
140  * Returns 0 on success, errno otherwise
141  */
142 static int tipc_sk_create(struct net *net, struct socket *sock,
143                           int protocol, int kern)
144 {
145         const struct proto_ops *ops;
146         socket_state state;
147         struct sock *sk;
148         struct tipc_sock *tsk;
149         struct tipc_port *port;
150         u32 ref;
151
152         /* Validate arguments */
153         if (unlikely(protocol != 0))
154                 return -EPROTONOSUPPORT;
155
156         switch (sock->type) {
157         case SOCK_STREAM:
158                 ops = &stream_ops;
159                 state = SS_UNCONNECTED;
160                 break;
161         case SOCK_SEQPACKET:
162                 ops = &packet_ops;
163                 state = SS_UNCONNECTED;
164                 break;
165         case SOCK_DGRAM:
166         case SOCK_RDM:
167                 ops = &msg_ops;
168                 state = SS_READY;
169                 break;
170         default:
171                 return -EPROTOTYPE;
172         }
173
174         /* Allocate socket's protocol area */
175         if (!kern)
176                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
177         else
178                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
179
180         if (sk == NULL)
181                 return -ENOMEM;
182
183         tsk = tipc_sk(sk);
184         port = &tsk->port;
185
186         ref = tipc_port_init(port, TIPC_LOW_IMPORTANCE);
187         if (!ref) {
188                 pr_warn("Socket registration failed, ref. table exhausted\n");
189                 sk_free(sk);
190                 return -ENOMEM;
191         }
192
193         /* Finish initializing socket data structures */
194         sock->ops = ops;
195         sock->state = state;
196
197         sock_init_data(sock, sk);
198         sk->sk_backlog_rcv = backlog_rcv;
199         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
200         sk->sk_data_ready = tipc_data_ready;
201         sk->sk_write_space = tipc_write_space;
202         tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
203         tipc_port_unlock(port);
204
205         if (sock->state == SS_READY) {
206                 tipc_port_set_unreturnable(port, true);
207                 if (sock->type == SOCK_DGRAM)
208                         tipc_port_set_unreliable(port, true);
209         }
210         return 0;
211 }
212
213 /**
214  * tipc_sock_create_local - create TIPC socket from inside TIPC module
215  * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
216  *
217  * We cannot use sock_creat_kern here because it bumps module user count.
218  * Since socket owner and creator is the same module we must make sure
219  * that module count remains zero for module local sockets, otherwise
220  * we cannot do rmmod.
221  *
222  * Returns 0 on success, errno otherwise
223  */
224 int tipc_sock_create_local(int type, struct socket **res)
225 {
226         int rc;
227
228         rc = sock_create_lite(AF_TIPC, type, 0, res);
229         if (rc < 0) {
230                 pr_err("Failed to create kernel socket\n");
231                 return rc;
232         }
233         tipc_sk_create(&init_net, *res, 0, 1);
234
235         return 0;
236 }
237
238 /**
239  * tipc_sock_release_local - release socket created by tipc_sock_create_local
240  * @sock: the socket to be released.
241  *
242  * Module reference count is not incremented when such sockets are created,
243  * so we must keep it from being decremented when they are released.
244  */
245 void tipc_sock_release_local(struct socket *sock)
246 {
247         tipc_release(sock);
248         sock->ops = NULL;
249         sock_release(sock);
250 }
251
252 /**
253  * tipc_sock_accept_local - accept a connection on a socket created
254  * with tipc_sock_create_local. Use this function to avoid that
255  * module reference count is inadvertently incremented.
256  *
257  * @sock:    the accepting socket
258  * @newsock: reference to the new socket to be created
259  * @flags:   socket flags
260  */
261
262 int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
263                            int flags)
264 {
265         struct sock *sk = sock->sk;
266         int ret;
267
268         ret = sock_create_lite(sk->sk_family, sk->sk_type,
269                                sk->sk_protocol, newsock);
270         if (ret < 0)
271                 return ret;
272
273         ret = tipc_accept(sock, *newsock, flags);
274         if (ret < 0) {
275                 sock_release(*newsock);
276                 return ret;
277         }
278         (*newsock)->ops = sock->ops;
279         return ret;
280 }
281
282 /**
283  * tipc_release - destroy a TIPC socket
284  * @sock: socket to destroy
285  *
286  * This routine cleans up any messages that are still queued on the socket.
287  * For DGRAM and RDM socket types, all queued messages are rejected.
288  * For SEQPACKET and STREAM socket types, the first message is rejected
289  * and any others are discarded.  (If the first message on a STREAM socket
290  * is partially-read, it is discarded and the next one is rejected instead.)
291  *
292  * NOTE: Rejected messages are not necessarily returned to the sender!  They
293  * are returned or discarded according to the "destination droppable" setting
294  * specified for the message by the sender.
295  *
296  * Returns 0 on success, errno otherwise
297  */
298 static int tipc_release(struct socket *sock)
299 {
300         struct sock *sk = sock->sk;
301         struct tipc_sock *tsk;
302         struct tipc_port *port;
303         struct sk_buff *buf;
304
305         /*
306          * Exit if socket isn't fully initialized (occurs when a failed accept()
307          * releases a pre-allocated child socket that was never used)
308          */
309         if (sk == NULL)
310                 return 0;
311
312         tsk = tipc_sk(sk);
313         port = &tsk->port;
314         lock_sock(sk);
315
316         /*
317          * Reject all unreceived messages, except on an active connection
318          * (which disconnects locally & sends a 'FIN+' to peer)
319          */
320         while (sock->state != SS_DISCONNECTING) {
321                 buf = __skb_dequeue(&sk->sk_receive_queue);
322                 if (buf == NULL)
323                         break;
324                 if (TIPC_SKB_CB(buf)->handle != NULL)
325                         kfree_skb(buf);
326                 else {
327                         if ((sock->state == SS_CONNECTING) ||
328                             (sock->state == SS_CONNECTED)) {
329                                 sock->state = SS_DISCONNECTING;
330                                 tipc_port_disconnect(port->ref);
331                         }
332                         tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
333                 }
334         }
335
336         /* Destroy TIPC port; also disconnects an active connection and
337          * sends a 'FIN-' to peer.
338          */
339         tipc_port_destroy(port);
340
341         /* Discard any remaining (connection-based) messages in receive queue */
342         __skb_queue_purge(&sk->sk_receive_queue);
343
344         /* Reject any messages that accumulated in backlog queue */
345         sock->state = SS_DISCONNECTING;
346         release_sock(sk);
347
348         sock_put(sk);
349         sock->sk = NULL;
350
351         return 0;
352 }
353
354 /**
355  * tipc_bind - associate or disassocate TIPC name(s) with a socket
356  * @sock: socket structure
357  * @uaddr: socket address describing name(s) and desired operation
358  * @uaddr_len: size of socket address data structure
359  *
360  * Name and name sequence binding is indicated using a positive scope value;
361  * a negative scope value unbinds the specified name.  Specifying no name
362  * (i.e. a socket address length of 0) unbinds all names from the socket.
363  *
364  * Returns 0 on success, errno otherwise
365  *
366  * NOTE: This routine doesn't need to take the socket lock since it doesn't
367  *       access any non-constant socket information.
368  */
369 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
370                      int uaddr_len)
371 {
372         struct sock *sk = sock->sk;
373         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
374         struct tipc_sock *tsk = tipc_sk(sk);
375         int res = -EINVAL;
376
377         lock_sock(sk);
378         if (unlikely(!uaddr_len)) {
379                 res = tipc_withdraw(&tsk->port, 0, NULL);
380                 goto exit;
381         }
382
383         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
384                 res = -EINVAL;
385                 goto exit;
386         }
387         if (addr->family != AF_TIPC) {
388                 res = -EAFNOSUPPORT;
389                 goto exit;
390         }
391
392         if (addr->addrtype == TIPC_ADDR_NAME)
393                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
394         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
395                 res = -EAFNOSUPPORT;
396                 goto exit;
397         }
398
399         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
400             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
401             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
402                 res = -EACCES;
403                 goto exit;
404         }
405
406         res = (addr->scope > 0) ?
407                 tipc_publish(&tsk->port, addr->scope, &addr->addr.nameseq) :
408                 tipc_withdraw(&tsk->port, -addr->scope, &addr->addr.nameseq);
409 exit:
410         release_sock(sk);
411         return res;
412 }
413
414 /**
415  * tipc_getname - get port ID of socket or peer socket
416  * @sock: socket structure
417  * @uaddr: area for returned socket address
418  * @uaddr_len: area for returned length of socket address
419  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
420  *
421  * Returns 0 on success, errno otherwise
422  *
423  * NOTE: This routine doesn't need to take the socket lock since it only
424  *       accesses socket information that is unchanging (or which changes in
425  *       a completely predictable manner).
426  */
427 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
428                         int *uaddr_len, int peer)
429 {
430         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
431         struct tipc_sock *tsk = tipc_sk(sock->sk);
432
433         memset(addr, 0, sizeof(*addr));
434         if (peer) {
435                 if ((sock->state != SS_CONNECTED) &&
436                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
437                         return -ENOTCONN;
438                 addr->addr.id.ref = tipc_port_peerport(&tsk->port);
439                 addr->addr.id.node = tipc_port_peernode(&tsk->port);
440         } else {
441                 addr->addr.id.ref = tsk->port.ref;
442                 addr->addr.id.node = tipc_own_addr;
443         }
444
445         *uaddr_len = sizeof(*addr);
446         addr->addrtype = TIPC_ADDR_ID;
447         addr->family = AF_TIPC;
448         addr->scope = 0;
449         addr->addr.name.domain = 0;
450
451         return 0;
452 }
453
454 /**
455  * tipc_poll - read and possibly block on pollmask
456  * @file: file structure associated with the socket
457  * @sock: socket for which to calculate the poll bits
458  * @wait: ???
459  *
460  * Returns pollmask value
461  *
462  * COMMENTARY:
463  * It appears that the usual socket locking mechanisms are not useful here
464  * since the pollmask info is potentially out-of-date the moment this routine
465  * exits.  TCP and other protocols seem to rely on higher level poll routines
466  * to handle any preventable race conditions, so TIPC will do the same ...
467  *
468  * TIPC sets the returned events as follows:
469  *
470  * socket state         flags set
471  * ------------         ---------
472  * unconnected          no read flags
473  *                      POLLOUT if port is not congested
474  *
475  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
476  *                      no write flags
477  *
478  * connected            POLLIN/POLLRDNORM if data in rx queue
479  *                      POLLOUT if port is not congested
480  *
481  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
482  *                      no write flags
483  *
484  * listening            POLLIN if SYN in rx queue
485  *                      no write flags
486  *
487  * ready                POLLIN/POLLRDNORM if data in rx queue
488  * [connectionless]     POLLOUT (since port cannot be congested)
489  *
490  * IMPORTANT: The fact that a read or write operation is indicated does NOT
491  * imply that the operation will succeed, merely that it should be performed
492  * and will not block.
493  */
494 static unsigned int tipc_poll(struct file *file, struct socket *sock,
495                               poll_table *wait)
496 {
497         struct sock *sk = sock->sk;
498         struct tipc_sock *tsk = tipc_sk(sk);
499         u32 mask = 0;
500
501         sock_poll_wait(file, sk_sleep(sk), wait);
502
503         switch ((int)sock->state) {
504         case SS_UNCONNECTED:
505                 if (!tsk->port.congested)
506                         mask |= POLLOUT;
507                 break;
508         case SS_READY:
509         case SS_CONNECTED:
510                 if (!tsk->port.congested)
511                         mask |= POLLOUT;
512                 /* fall thru' */
513         case SS_CONNECTING:
514         case SS_LISTENING:
515                 if (!skb_queue_empty(&sk->sk_receive_queue))
516                         mask |= (POLLIN | POLLRDNORM);
517                 break;
518         case SS_DISCONNECTING:
519                 mask = (POLLIN | POLLRDNORM | POLLHUP);
520                 break;
521         }
522
523         return mask;
524 }
525
526 /**
527  * dest_name_check - verify user is permitted to send to specified port name
528  * @dest: destination address
529  * @m: descriptor for message to be sent
530  *
531  * Prevents restricted configuration commands from being issued by
532  * unauthorized users.
533  *
534  * Returns 0 if permission is granted, otherwise errno
535  */
536 static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
537 {
538         struct tipc_cfg_msg_hdr hdr;
539
540         if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
541                 return 0;
542         if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
543                 return 0;
544         if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
545                 return -EACCES;
546
547         if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
548                 return -EMSGSIZE;
549         if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
550                 return -EFAULT;
551         if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
552                 return -EACCES;
553
554         return 0;
555 }
556
557 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
558 {
559         struct sock *sk = sock->sk;
560         struct tipc_sock *tsk = tipc_sk(sk);
561         DEFINE_WAIT(wait);
562         int done;
563
564         do {
565                 int err = sock_error(sk);
566                 if (err)
567                         return err;
568                 if (sock->state == SS_DISCONNECTING)
569                         return -EPIPE;
570                 if (!*timeo_p)
571                         return -EAGAIN;
572                 if (signal_pending(current))
573                         return sock_intr_errno(*timeo_p);
574
575                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
576                 done = sk_wait_event(sk, timeo_p, !tsk->port.congested);
577                 finish_wait(sk_sleep(sk), &wait);
578         } while (!done);
579         return 0;
580 }
581
582
583 /**
584  * tipc_sendmsg - send message in connectionless manner
585  * @iocb: if NULL, indicates that socket lock is already held
586  * @sock: socket structure
587  * @m: message to send
588  * @total_len: length of message
589  *
590  * Message must have an destination specified explicitly.
591  * Used for SOCK_RDM and SOCK_DGRAM messages,
592  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
593  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
594  *
595  * Returns the number of bytes sent on success, or errno otherwise
596  */
597 static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
598                         struct msghdr *m, size_t total_len)
599 {
600         struct sock *sk = sock->sk;
601         struct tipc_sock *tsk = tipc_sk(sk);
602         struct tipc_port *port = &tsk->port;
603         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
604         int needs_conn;
605         long timeo;
606         int res = -EINVAL;
607
608         if (unlikely(!dest))
609                 return -EDESTADDRREQ;
610         if (unlikely((m->msg_namelen < sizeof(*dest)) ||
611                      (dest->family != AF_TIPC)))
612                 return -EINVAL;
613         if (total_len > TIPC_MAX_USER_MSG_SIZE)
614                 return -EMSGSIZE;
615
616         if (iocb)
617                 lock_sock(sk);
618
619         needs_conn = (sock->state != SS_READY);
620         if (unlikely(needs_conn)) {
621                 if (sock->state == SS_LISTENING) {
622                         res = -EPIPE;
623                         goto exit;
624                 }
625                 if (sock->state != SS_UNCONNECTED) {
626                         res = -EISCONN;
627                         goto exit;
628                 }
629                 if (tsk->port.published) {
630                         res = -EOPNOTSUPP;
631                         goto exit;
632                 }
633                 if (dest->addrtype == TIPC_ADDR_NAME) {
634                         tsk->port.conn_type = dest->addr.name.name.type;
635                         tsk->port.conn_instance = dest->addr.name.name.instance;
636                 }
637
638                 /* Abort any pending connection attempts (very unlikely) */
639                 reject_rx_queue(sk);
640         }
641
642         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
643         do {
644                 if (dest->addrtype == TIPC_ADDR_NAME) {
645                         res = dest_name_check(dest, m);
646                         if (res)
647                                 break;
648                         res = tipc_send2name(port,
649                                              &dest->addr.name.name,
650                                              dest->addr.name.domain,
651                                              m->msg_iov,
652                                              total_len);
653                 } else if (dest->addrtype == TIPC_ADDR_ID) {
654                         res = tipc_send2port(port,
655                                              &dest->addr.id,
656                                              m->msg_iov,
657                                              total_len);
658                 } else if (dest->addrtype == TIPC_ADDR_MCAST) {
659                         if (needs_conn) {
660                                 res = -EOPNOTSUPP;
661                                 break;
662                         }
663                         res = dest_name_check(dest, m);
664                         if (res)
665                                 break;
666                         res = tipc_port_mcast_xmit(port,
667                                                    &dest->addr.nameseq,
668                                                    m->msg_iov,
669                                                    total_len);
670                 }
671                 if (likely(res != -ELINKCONG)) {
672                         if (needs_conn && (res >= 0))
673                                 sock->state = SS_CONNECTING;
674                         break;
675                 }
676                 res = tipc_wait_for_sndmsg(sock, &timeo);
677                 if (res)
678                         break;
679         } while (1);
680
681 exit:
682         if (iocb)
683                 release_sock(sk);
684         return res;
685 }
686
687 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
688 {
689         struct sock *sk = sock->sk;
690         struct tipc_sock *tsk = tipc_sk(sk);
691         struct tipc_port *port = &tsk->port;
692         DEFINE_WAIT(wait);
693         int done;
694
695         do {
696                 int err = sock_error(sk);
697                 if (err)
698                         return err;
699                 if (sock->state == SS_DISCONNECTING)
700                         return -EPIPE;
701                 else if (sock->state != SS_CONNECTED)
702                         return -ENOTCONN;
703                 if (!*timeo_p)
704                         return -EAGAIN;
705                 if (signal_pending(current))
706                         return sock_intr_errno(*timeo_p);
707
708                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
709                 done = sk_wait_event(sk, timeo_p,
710                                      (!port->congested || !port->connected));
711                 finish_wait(sk_sleep(sk), &wait);
712         } while (!done);
713         return 0;
714 }
715
716 /**
717  * tipc_send_packet - send a connection-oriented message
718  * @iocb: if NULL, indicates that socket lock is already held
719  * @sock: socket structure
720  * @m: message to send
721  * @total_len: length of message
722  *
723  * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
724  *
725  * Returns the number of bytes sent on success, or errno otherwise
726  */
727 static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
728                             struct msghdr *m, size_t total_len)
729 {
730         struct sock *sk = sock->sk;
731         struct tipc_sock *tsk = tipc_sk(sk);
732         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
733         int res = -EINVAL;
734         long timeo;
735
736         /* Handle implied connection establishment */
737         if (unlikely(dest))
738                 return tipc_sendmsg(iocb, sock, m, total_len);
739
740         if (total_len > TIPC_MAX_USER_MSG_SIZE)
741                 return -EMSGSIZE;
742
743         if (iocb)
744                 lock_sock(sk);
745
746         if (unlikely(sock->state != SS_CONNECTED)) {
747                 if (sock->state == SS_DISCONNECTING)
748                         res = -EPIPE;
749                 else
750                         res = -ENOTCONN;
751                 goto exit;
752         }
753
754         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
755         do {
756                 res = tipc_send(&tsk->port, m->msg_iov, total_len);
757                 if (likely(res != -ELINKCONG))
758                         break;
759                 res = tipc_wait_for_sndpkt(sock, &timeo);
760                 if (res)
761                         break;
762         } while (1);
763 exit:
764         if (iocb)
765                 release_sock(sk);
766         return res;
767 }
768
769 /**
770  * tipc_send_stream - send stream-oriented data
771  * @iocb: (unused)
772  * @sock: socket structure
773  * @m: data to send
774  * @total_len: total length of data to be sent
775  *
776  * Used for SOCK_STREAM data.
777  *
778  * Returns the number of bytes sent on success (or partial success),
779  * or errno if no data sent
780  */
781 static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
782                             struct msghdr *m, size_t total_len)
783 {
784         struct sock *sk = sock->sk;
785         struct tipc_sock *tsk = tipc_sk(sk);
786         struct msghdr my_msg;
787         struct iovec my_iov;
788         struct iovec *curr_iov;
789         int curr_iovlen;
790         char __user *curr_start;
791         u32 hdr_size;
792         int curr_left;
793         int bytes_to_send;
794         int bytes_sent;
795         int res;
796
797         lock_sock(sk);
798
799         /* Handle special cases where there is no connection */
800         if (unlikely(sock->state != SS_CONNECTED)) {
801                 if (sock->state == SS_UNCONNECTED)
802                         res = tipc_send_packet(NULL, sock, m, total_len);
803                 else
804                         res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN;
805                 goto exit;
806         }
807
808         if (unlikely(m->msg_name)) {
809                 res = -EISCONN;
810                 goto exit;
811         }
812
813         if (total_len > (unsigned int)INT_MAX) {
814                 res = -EMSGSIZE;
815                 goto exit;
816         }
817
818         /*
819          * Send each iovec entry using one or more messages
820          *
821          * Note: This algorithm is good for the most likely case
822          * (i.e. one large iovec entry), but could be improved to pass sets
823          * of small iovec entries into send_packet().
824          */
825         curr_iov = m->msg_iov;
826         curr_iovlen = m->msg_iovlen;
827         my_msg.msg_iov = &my_iov;
828         my_msg.msg_iovlen = 1;
829         my_msg.msg_flags = m->msg_flags;
830         my_msg.msg_name = NULL;
831         bytes_sent = 0;
832
833         hdr_size = msg_hdr_sz(&tsk->port.phdr);
834
835         while (curr_iovlen--) {
836                 curr_start = curr_iov->iov_base;
837                 curr_left = curr_iov->iov_len;
838
839                 while (curr_left) {
840                         bytes_to_send = tsk->port.max_pkt - hdr_size;
841                         if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
842                                 bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
843                         if (curr_left < bytes_to_send)
844                                 bytes_to_send = curr_left;
845                         my_iov.iov_base = curr_start;
846                         my_iov.iov_len = bytes_to_send;
847                         res = tipc_send_packet(NULL, sock, &my_msg,
848                                                bytes_to_send);
849                         if (res < 0) {
850                                 if (bytes_sent)
851                                         res = bytes_sent;
852                                 goto exit;
853                         }
854                         curr_left -= bytes_to_send;
855                         curr_start += bytes_to_send;
856                         bytes_sent += bytes_to_send;
857                 }
858
859                 curr_iov++;
860         }
861         res = bytes_sent;
862 exit:
863         release_sock(sk);
864         return res;
865 }
866
867 /**
868  * auto_connect - complete connection setup to a remote port
869  * @tsk: tipc socket structure
870  * @msg: peer's response message
871  *
872  * Returns 0 on success, errno otherwise
873  */
874 static int auto_connect(struct tipc_sock *tsk, struct tipc_msg *msg)
875 {
876         struct tipc_port *port = &tsk->port;
877         struct socket *sock = tsk->sk.sk_socket;
878         struct tipc_portid peer;
879
880         peer.ref = msg_origport(msg);
881         peer.node = msg_orignode(msg);
882
883         __tipc_port_connect(port->ref, port, &peer);
884
885         if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)
886                 return -EINVAL;
887         msg_set_importance(&port->phdr, (u32)msg_importance(msg));
888         sock->state = SS_CONNECTED;
889         return 0;
890 }
891
892 /**
893  * set_orig_addr - capture sender's address for received message
894  * @m: descriptor for message info
895  * @msg: received message header
896  *
897  * Note: Address is not captured if not requested by receiver.
898  */
899 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
900 {
901         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
902
903         if (addr) {
904                 addr->family = AF_TIPC;
905                 addr->addrtype = TIPC_ADDR_ID;
906                 memset(&addr->addr, 0, sizeof(addr->addr));
907                 addr->addr.id.ref = msg_origport(msg);
908                 addr->addr.id.node = msg_orignode(msg);
909                 addr->addr.name.domain = 0;     /* could leave uninitialized */
910                 addr->scope = 0;                /* could leave uninitialized */
911                 m->msg_namelen = sizeof(struct sockaddr_tipc);
912         }
913 }
914
915 /**
916  * anc_data_recv - optionally capture ancillary data for received message
917  * @m: descriptor for message info
918  * @msg: received message header
919  * @tport: TIPC port associated with message
920  *
921  * Note: Ancillary data is not captured if not requested by receiver.
922  *
923  * Returns 0 if successful, otherwise errno
924  */
925 static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
926                          struct tipc_port *tport)
927 {
928         u32 anc_data[3];
929         u32 err;
930         u32 dest_type;
931         int has_name;
932         int res;
933
934         if (likely(m->msg_controllen == 0))
935                 return 0;
936
937         /* Optionally capture errored message object(s) */
938         err = msg ? msg_errcode(msg) : 0;
939         if (unlikely(err)) {
940                 anc_data[0] = err;
941                 anc_data[1] = msg_data_sz(msg);
942                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
943                 if (res)
944                         return res;
945                 if (anc_data[1]) {
946                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
947                                        msg_data(msg));
948                         if (res)
949                                 return res;
950                 }
951         }
952
953         /* Optionally capture message destination object */
954         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
955         switch (dest_type) {
956         case TIPC_NAMED_MSG:
957                 has_name = 1;
958                 anc_data[0] = msg_nametype(msg);
959                 anc_data[1] = msg_namelower(msg);
960                 anc_data[2] = msg_namelower(msg);
961                 break;
962         case TIPC_MCAST_MSG:
963                 has_name = 1;
964                 anc_data[0] = msg_nametype(msg);
965                 anc_data[1] = msg_namelower(msg);
966                 anc_data[2] = msg_nameupper(msg);
967                 break;
968         case TIPC_CONN_MSG:
969                 has_name = (tport->conn_type != 0);
970                 anc_data[0] = tport->conn_type;
971                 anc_data[1] = tport->conn_instance;
972                 anc_data[2] = tport->conn_instance;
973                 break;
974         default:
975                 has_name = 0;
976         }
977         if (has_name) {
978                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
979                 if (res)
980                         return res;
981         }
982
983         return 0;
984 }
985
986 static int tipc_wait_for_rcvmsg(struct socket *sock, long timeo)
987 {
988         struct sock *sk = sock->sk;
989         DEFINE_WAIT(wait);
990         int err;
991
992         for (;;) {
993                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
994                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
995                         if (sock->state == SS_DISCONNECTING) {
996                                 err = -ENOTCONN;
997                                 break;
998                         }
999                         release_sock(sk);
1000                         timeo = schedule_timeout(timeo);
1001                         lock_sock(sk);
1002                 }
1003                 err = 0;
1004                 if (!skb_queue_empty(&sk->sk_receive_queue))
1005                         break;
1006                 err = sock_intr_errno(timeo);
1007                 if (signal_pending(current))
1008                         break;
1009                 err = -EAGAIN;
1010                 if (!timeo)
1011                         break;
1012         }
1013         finish_wait(sk_sleep(sk), &wait);
1014         return err;
1015 }
1016
1017 /**
1018  * tipc_recvmsg - receive packet-oriented message
1019  * @iocb: (unused)
1020  * @m: descriptor for message info
1021  * @buf_len: total size of user buffer area
1022  * @flags: receive flags
1023  *
1024  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1025  * If the complete message doesn't fit in user area, truncate it.
1026  *
1027  * Returns size of returned message data, errno otherwise
1028  */
1029 static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
1030                         struct msghdr *m, size_t buf_len, int flags)
1031 {
1032         struct sock *sk = sock->sk;
1033         struct tipc_sock *tsk = tipc_sk(sk);
1034         struct tipc_port *port = &tsk->port;
1035         struct sk_buff *buf;
1036         struct tipc_msg *msg;
1037         long timeo;
1038         unsigned int sz;
1039         u32 err;
1040         int res;
1041
1042         /* Catch invalid receive requests */
1043         if (unlikely(!buf_len))
1044                 return -EINVAL;
1045
1046         lock_sock(sk);
1047
1048         if (unlikely(sock->state == SS_UNCONNECTED)) {
1049                 res = -ENOTCONN;
1050                 goto exit;
1051         }
1052
1053         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1054 restart:
1055
1056         /* Look for a message in receive queue; wait if necessary */
1057         res = tipc_wait_for_rcvmsg(sock, timeo);
1058         if (res)
1059                 goto exit;
1060
1061         /* Look at first message in receive queue */
1062         buf = skb_peek(&sk->sk_receive_queue);
1063         msg = buf_msg(buf);
1064         sz = msg_data_sz(msg);
1065         err = msg_errcode(msg);
1066
1067         /* Discard an empty non-errored message & try again */
1068         if ((!sz) && (!err)) {
1069                 advance_rx_queue(sk);
1070                 goto restart;
1071         }
1072
1073         /* Capture sender's address (optional) */
1074         set_orig_addr(m, msg);
1075
1076         /* Capture ancillary data (optional) */
1077         res = anc_data_recv(m, msg, port);
1078         if (res)
1079                 goto exit;
1080
1081         /* Capture message data (if valid) & compute return value (always) */
1082         if (!err) {
1083                 if (unlikely(buf_len < sz)) {
1084                         sz = buf_len;
1085                         m->msg_flags |= MSG_TRUNC;
1086                 }
1087                 res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
1088                                               m->msg_iov, sz);
1089                 if (res)
1090                         goto exit;
1091                 res = sz;
1092         } else {
1093                 if ((sock->state == SS_READY) ||
1094                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1095                         res = 0;
1096                 else
1097                         res = -ECONNRESET;
1098         }
1099
1100         /* Consume received message (optional) */
1101         if (likely(!(flags & MSG_PEEK))) {
1102                 if ((sock->state != SS_READY) &&
1103                     (++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1104                         tipc_acknowledge(port->ref, port->conn_unacked);
1105                 advance_rx_queue(sk);
1106         }
1107 exit:
1108         release_sock(sk);
1109         return res;
1110 }
1111
1112 /**
1113  * tipc_recv_stream - receive stream-oriented data
1114  * @iocb: (unused)
1115  * @m: descriptor for message info
1116  * @buf_len: total size of user buffer area
1117  * @flags: receive flags
1118  *
1119  * Used for SOCK_STREAM messages only.  If not enough data is available
1120  * will optionally wait for more; never truncates data.
1121  *
1122  * Returns size of returned message data, errno otherwise
1123  */
1124 static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
1125                             struct msghdr *m, size_t buf_len, int flags)
1126 {
1127         struct sock *sk = sock->sk;
1128         struct tipc_sock *tsk = tipc_sk(sk);
1129         struct tipc_port *port = &tsk->port;
1130         struct sk_buff *buf;
1131         struct tipc_msg *msg;
1132         long timeo;
1133         unsigned int sz;
1134         int sz_to_copy, target, needed;
1135         int sz_copied = 0;
1136         u32 err;
1137         int res = 0;
1138
1139         /* Catch invalid receive attempts */
1140         if (unlikely(!buf_len))
1141                 return -EINVAL;
1142
1143         lock_sock(sk);
1144
1145         if (unlikely(sock->state == SS_UNCONNECTED)) {
1146                 res = -ENOTCONN;
1147                 goto exit;
1148         }
1149
1150         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1151         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1152
1153 restart:
1154         /* Look for a message in receive queue; wait if necessary */
1155         res = tipc_wait_for_rcvmsg(sock, timeo);
1156         if (res)
1157                 goto exit;
1158
1159         /* Look at first message in receive queue */
1160         buf = skb_peek(&sk->sk_receive_queue);
1161         msg = buf_msg(buf);
1162         sz = msg_data_sz(msg);
1163         err = msg_errcode(msg);
1164
1165         /* Discard an empty non-errored message & try again */
1166         if ((!sz) && (!err)) {
1167                 advance_rx_queue(sk);
1168                 goto restart;
1169         }
1170
1171         /* Optionally capture sender's address & ancillary data of first msg */
1172         if (sz_copied == 0) {
1173                 set_orig_addr(m, msg);
1174                 res = anc_data_recv(m, msg, port);
1175                 if (res)
1176                         goto exit;
1177         }
1178
1179         /* Capture message data (if valid) & compute return value (always) */
1180         if (!err) {
1181                 u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1182
1183                 sz -= offset;
1184                 needed = (buf_len - sz_copied);
1185                 sz_to_copy = (sz <= needed) ? sz : needed;
1186
1187                 res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
1188                                               m->msg_iov, sz_to_copy);
1189                 if (res)
1190                         goto exit;
1191
1192                 sz_copied += sz_to_copy;
1193
1194                 if (sz_to_copy < sz) {
1195                         if (!(flags & MSG_PEEK))
1196                                 TIPC_SKB_CB(buf)->handle =
1197                                 (void *)(unsigned long)(offset + sz_to_copy);
1198                         goto exit;
1199                 }
1200         } else {
1201                 if (sz_copied != 0)
1202                         goto exit; /* can't add error msg to valid data */
1203
1204                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1205                         res = 0;
1206                 else
1207                         res = -ECONNRESET;
1208         }
1209
1210         /* Consume received message (optional) */
1211         if (likely(!(flags & MSG_PEEK))) {
1212                 if (unlikely(++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1213                         tipc_acknowledge(port->ref, port->conn_unacked);
1214                 advance_rx_queue(sk);
1215         }
1216
1217         /* Loop around if more data is required */
1218         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1219             (!skb_queue_empty(&sk->sk_receive_queue) ||
1220             (sz_copied < target)) &&    /* and more is ready or required */
1221             (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1222             (!err))                     /* and haven't reached a FIN */
1223                 goto restart;
1224
1225 exit:
1226         release_sock(sk);
1227         return sz_copied ? sz_copied : res;
1228 }
1229
1230 /**
1231  * tipc_write_space - wake up thread if port congestion is released
1232  * @sk: socket
1233  */
1234 static void tipc_write_space(struct sock *sk)
1235 {
1236         struct socket_wq *wq;
1237
1238         rcu_read_lock();
1239         wq = rcu_dereference(sk->sk_wq);
1240         if (wq_has_sleeper(wq))
1241                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1242                                                 POLLWRNORM | POLLWRBAND);
1243         rcu_read_unlock();
1244 }
1245
1246 /**
1247  * tipc_data_ready - wake up threads to indicate messages have been received
1248  * @sk: socket
1249  * @len: the length of messages
1250  */
1251 static void tipc_data_ready(struct sock *sk, int len)
1252 {
1253         struct socket_wq *wq;
1254
1255         rcu_read_lock();
1256         wq = rcu_dereference(sk->sk_wq);
1257         if (wq_has_sleeper(wq))
1258                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1259                                                 POLLRDNORM | POLLRDBAND);
1260         rcu_read_unlock();
1261 }
1262
1263 /**
1264  * filter_connect - Handle all incoming messages for a connection-based socket
1265  * @tsk: TIPC socket
1266  * @msg: message
1267  *
1268  * Returns TIPC error status code and socket error status code
1269  * once it encounters some errors
1270  */
1271 static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1272 {
1273         struct sock *sk = &tsk->sk;
1274         struct tipc_port *port = &tsk->port;
1275         struct socket *sock = sk->sk_socket;
1276         struct tipc_msg *msg = buf_msg(*buf);
1277
1278         u32 retval = TIPC_ERR_NO_PORT;
1279         int res;
1280
1281         if (msg_mcast(msg))
1282                 return retval;
1283
1284         switch ((int)sock->state) {
1285         case SS_CONNECTED:
1286                 /* Accept only connection-based messages sent by peer */
1287                 if (msg_connected(msg) && tipc_port_peer_msg(port, msg)) {
1288                         if (unlikely(msg_errcode(msg))) {
1289                                 sock->state = SS_DISCONNECTING;
1290                                 __tipc_port_disconnect(port);
1291                         }
1292                         retval = TIPC_OK;
1293                 }
1294                 break;
1295         case SS_CONNECTING:
1296                 /* Accept only ACK or NACK message */
1297                 if (unlikely(msg_errcode(msg))) {
1298                         sock->state = SS_DISCONNECTING;
1299                         sk->sk_err = ECONNREFUSED;
1300                         retval = TIPC_OK;
1301                         break;
1302                 }
1303
1304                 if (unlikely(!msg_connected(msg)))
1305                         break;
1306
1307                 res = auto_connect(tsk, msg);
1308                 if (res) {
1309                         sock->state = SS_DISCONNECTING;
1310                         sk->sk_err = -res;
1311                         retval = TIPC_OK;
1312                         break;
1313                 }
1314
1315                 /* If an incoming message is an 'ACK-', it should be
1316                  * discarded here because it doesn't contain useful
1317                  * data. In addition, we should try to wake up
1318                  * connect() routine if sleeping.
1319                  */
1320                 if (msg_data_sz(msg) == 0) {
1321                         kfree_skb(*buf);
1322                         *buf = NULL;
1323                         if (waitqueue_active(sk_sleep(sk)))
1324                                 wake_up_interruptible(sk_sleep(sk));
1325                 }
1326                 retval = TIPC_OK;
1327                 break;
1328         case SS_LISTENING:
1329         case SS_UNCONNECTED:
1330                 /* Accept only SYN message */
1331                 if (!msg_connected(msg) && !(msg_errcode(msg)))
1332                         retval = TIPC_OK;
1333                 break;
1334         case SS_DISCONNECTING:
1335                 break;
1336         default:
1337                 pr_err("Unknown socket state %u\n", sock->state);
1338         }
1339         return retval;
1340 }
1341
1342 /**
1343  * rcvbuf_limit - get proper overload limit of socket receive queue
1344  * @sk: socket
1345  * @buf: message
1346  *
1347  * For all connection oriented messages, irrespective of importance,
1348  * the default overload value (i.e. 67MB) is set as limit.
1349  *
1350  * For all connectionless messages, by default new queue limits are
1351  * as belows:
1352  *
1353  * TIPC_LOW_IMPORTANCE       (4 MB)
1354  * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1355  * TIPC_HIGH_IMPORTANCE      (16 MB)
1356  * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1357  *
1358  * Returns overload limit according to corresponding message importance
1359  */
1360 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1361 {
1362         struct tipc_msg *msg = buf_msg(buf);
1363
1364         if (msg_connected(msg))
1365                 return sysctl_tipc_rmem[2];
1366
1367         return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1368                 msg_importance(msg);
1369 }
1370
1371 /**
1372  * filter_rcv - validate incoming message
1373  * @sk: socket
1374  * @buf: message
1375  *
1376  * Enqueues message on receive queue if acceptable; optionally handles
1377  * disconnect indication for a connected socket.
1378  *
1379  * Called with socket lock already taken; port lock may also be taken.
1380  *
1381  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1382  */
1383 static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1384 {
1385         struct socket *sock = sk->sk_socket;
1386         struct tipc_sock *tsk = tipc_sk(sk);
1387         struct tipc_msg *msg = buf_msg(buf);
1388         unsigned int limit = rcvbuf_limit(sk, buf);
1389         u32 res = TIPC_OK;
1390
1391         /* Reject message if it is wrong sort of message for socket */
1392         if (msg_type(msg) > TIPC_DIRECT_MSG)
1393                 return TIPC_ERR_NO_PORT;
1394
1395         if (sock->state == SS_READY) {
1396                 if (msg_connected(msg))
1397                         return TIPC_ERR_NO_PORT;
1398         } else {
1399                 res = filter_connect(tsk, &buf);
1400                 if (res != TIPC_OK || buf == NULL)
1401                         return res;
1402         }
1403
1404         /* Reject message if there isn't room to queue it */
1405         if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1406                 return TIPC_ERR_OVERLOAD;
1407
1408         /* Enqueue message */
1409         TIPC_SKB_CB(buf)->handle = NULL;
1410         __skb_queue_tail(&sk->sk_receive_queue, buf);
1411         skb_set_owner_r(buf, sk);
1412
1413         sk->sk_data_ready(sk, 0);
1414         return TIPC_OK;
1415 }
1416
1417 /**
1418  * backlog_rcv - handle incoming message from backlog queue
1419  * @sk: socket
1420  * @buf: message
1421  *
1422  * Caller must hold socket lock, but not port lock.
1423  *
1424  * Returns 0
1425  */
1426 static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
1427 {
1428         u32 res;
1429
1430         res = filter_rcv(sk, buf);
1431         if (res)
1432                 tipc_reject_msg(buf, res);
1433         return 0;
1434 }
1435
1436 /**
1437  * tipc_sk_rcv - handle incoming message
1438  * @sk:  socket receiving message
1439  * @buf: message
1440  *
1441  * Called with port lock already taken.
1442  *
1443  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1444  */
1445 u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf)
1446 {
1447         u32 res;
1448
1449         /*
1450          * Process message if socket is unlocked; otherwise add to backlog queue
1451          *
1452          * This code is based on sk_receive_skb(), but must be distinct from it
1453          * since a TIPC-specific filter/reject mechanism is utilized
1454          */
1455         bh_lock_sock(sk);
1456         if (!sock_owned_by_user(sk)) {
1457                 res = filter_rcv(sk, buf);
1458         } else {
1459                 if (sk_add_backlog(sk, buf, rcvbuf_limit(sk, buf)))
1460                         res = TIPC_ERR_OVERLOAD;
1461                 else
1462                         res = TIPC_OK;
1463         }
1464         bh_unlock_sock(sk);
1465
1466         return res;
1467 }
1468
1469 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1470 {
1471         struct sock *sk = sock->sk;
1472         DEFINE_WAIT(wait);
1473         int done;
1474
1475         do {
1476                 int err = sock_error(sk);
1477                 if (err)
1478                         return err;
1479                 if (!*timeo_p)
1480                         return -ETIMEDOUT;
1481                 if (signal_pending(current))
1482                         return sock_intr_errno(*timeo_p);
1483
1484                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1485                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1486                 finish_wait(sk_sleep(sk), &wait);
1487         } while (!done);
1488         return 0;
1489 }
1490
1491 /**
1492  * tipc_connect - establish a connection to another TIPC port
1493  * @sock: socket structure
1494  * @dest: socket address for destination port
1495  * @destlen: size of socket address data structure
1496  * @flags: file-related flags associated with socket
1497  *
1498  * Returns 0 on success, errno otherwise
1499  */
1500 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1501                         int destlen, int flags)
1502 {
1503         struct sock *sk = sock->sk;
1504         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1505         struct msghdr m = {NULL,};
1506         long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1507         socket_state previous;
1508         int res;
1509
1510         lock_sock(sk);
1511
1512         /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1513         if (sock->state == SS_READY) {
1514                 res = -EOPNOTSUPP;
1515                 goto exit;
1516         }
1517
1518         /*
1519          * Reject connection attempt using multicast address
1520          *
1521          * Note: send_msg() validates the rest of the address fields,
1522          *       so there's no need to do it here
1523          */
1524         if (dst->addrtype == TIPC_ADDR_MCAST) {
1525                 res = -EINVAL;
1526                 goto exit;
1527         }
1528
1529         previous = sock->state;
1530         switch (sock->state) {
1531         case SS_UNCONNECTED:
1532                 /* Send a 'SYN-' to destination */
1533                 m.msg_name = dest;
1534                 m.msg_namelen = destlen;
1535
1536                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1537                  * indicate send_msg() is never blocked.
1538                  */
1539                 if (!timeout)
1540                         m.msg_flags = MSG_DONTWAIT;
1541
1542                 res = tipc_sendmsg(NULL, sock, &m, 0);
1543                 if ((res < 0) && (res != -EWOULDBLOCK))
1544                         goto exit;
1545
1546                 /* Just entered SS_CONNECTING state; the only
1547                  * difference is that return value in non-blocking
1548                  * case is EINPROGRESS, rather than EALREADY.
1549                  */
1550                 res = -EINPROGRESS;
1551         case SS_CONNECTING:
1552                 if (previous == SS_CONNECTING)
1553                         res = -EALREADY;
1554                 if (!timeout)
1555                         goto exit;
1556                 timeout = msecs_to_jiffies(timeout);
1557                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1558                 res = tipc_wait_for_connect(sock, &timeout);
1559                 break;
1560         case SS_CONNECTED:
1561                 res = -EISCONN;
1562                 break;
1563         default:
1564                 res = -EINVAL;
1565                 break;
1566         }
1567 exit:
1568         release_sock(sk);
1569         return res;
1570 }
1571
1572 /**
1573  * tipc_listen - allow socket to listen for incoming connections
1574  * @sock: socket structure
1575  * @len: (unused)
1576  *
1577  * Returns 0 on success, errno otherwise
1578  */
1579 static int tipc_listen(struct socket *sock, int len)
1580 {
1581         struct sock *sk = sock->sk;
1582         int res;
1583
1584         lock_sock(sk);
1585
1586         if (sock->state != SS_UNCONNECTED)
1587                 res = -EINVAL;
1588         else {
1589                 sock->state = SS_LISTENING;
1590                 res = 0;
1591         }
1592
1593         release_sock(sk);
1594         return res;
1595 }
1596
1597 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1598 {
1599         struct sock *sk = sock->sk;
1600         DEFINE_WAIT(wait);
1601         int err;
1602
1603         /* True wake-one mechanism for incoming connections: only
1604          * one process gets woken up, not the 'whole herd'.
1605          * Since we do not 'race & poll' for established sockets
1606          * anymore, the common case will execute the loop only once.
1607         */
1608         for (;;) {
1609                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1610                                           TASK_INTERRUPTIBLE);
1611                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1612                         release_sock(sk);
1613                         timeo = schedule_timeout(timeo);
1614                         lock_sock(sk);
1615                 }
1616                 err = 0;
1617                 if (!skb_queue_empty(&sk->sk_receive_queue))
1618                         break;
1619                 err = -EINVAL;
1620                 if (sock->state != SS_LISTENING)
1621                         break;
1622                 err = sock_intr_errno(timeo);
1623                 if (signal_pending(current))
1624                         break;
1625                 err = -EAGAIN;
1626                 if (!timeo)
1627                         break;
1628         }
1629         finish_wait(sk_sleep(sk), &wait);
1630         return err;
1631 }
1632
1633 /**
1634  * tipc_accept - wait for connection request
1635  * @sock: listening socket
1636  * @newsock: new socket that is to be connected
1637  * @flags: file-related flags associated with socket
1638  *
1639  * Returns 0 on success, errno otherwise
1640  */
1641 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1642 {
1643         struct sock *new_sk, *sk = sock->sk;
1644         struct sk_buff *buf;
1645         struct tipc_port *new_port;
1646         struct tipc_msg *msg;
1647         struct tipc_portid peer;
1648         u32 new_ref;
1649         long timeo;
1650         int res;
1651
1652         lock_sock(sk);
1653
1654         if (sock->state != SS_LISTENING) {
1655                 res = -EINVAL;
1656                 goto exit;
1657         }
1658         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
1659         res = tipc_wait_for_accept(sock, timeo);
1660         if (res)
1661                 goto exit;
1662
1663         buf = skb_peek(&sk->sk_receive_queue);
1664
1665         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
1666         if (res)
1667                 goto exit;
1668
1669         new_sk = new_sock->sk;
1670         new_port = &tipc_sk(new_sk)->port;
1671         new_ref = new_port->ref;
1672         msg = buf_msg(buf);
1673
1674         /* we lock on new_sk; but lockdep sees the lock on sk */
1675         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
1676
1677         /*
1678          * Reject any stray messages received by new socket
1679          * before the socket lock was taken (very, very unlikely)
1680          */
1681         reject_rx_queue(new_sk);
1682
1683         /* Connect new socket to it's peer */
1684         peer.ref = msg_origport(msg);
1685         peer.node = msg_orignode(msg);
1686         tipc_port_connect(new_ref, &peer);
1687         new_sock->state = SS_CONNECTED;
1688
1689         tipc_port_set_importance(new_port, msg_importance(msg));
1690         if (msg_named(msg)) {
1691                 new_port->conn_type = msg_nametype(msg);
1692                 new_port->conn_instance = msg_nameinst(msg);
1693         }
1694
1695         /*
1696          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1697          * Respond to 'SYN+' by queuing it on new socket.
1698          */
1699         if (!msg_data_sz(msg)) {
1700                 struct msghdr m = {NULL,};
1701
1702                 advance_rx_queue(sk);
1703                 tipc_send_packet(NULL, new_sock, &m, 0);
1704         } else {
1705                 __skb_dequeue(&sk->sk_receive_queue);
1706                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
1707                 skb_set_owner_r(buf, new_sk);
1708         }
1709         release_sock(new_sk);
1710 exit:
1711         release_sock(sk);
1712         return res;
1713 }
1714
1715 /**
1716  * tipc_shutdown - shutdown socket connection
1717  * @sock: socket structure
1718  * @how: direction to close (must be SHUT_RDWR)
1719  *
1720  * Terminates connection (if necessary), then purges socket's receive queue.
1721  *
1722  * Returns 0 on success, errno otherwise
1723  */
1724 static int tipc_shutdown(struct socket *sock, int how)
1725 {
1726         struct sock *sk = sock->sk;
1727         struct tipc_sock *tsk = tipc_sk(sk);
1728         struct tipc_port *port = &tsk->port;
1729         struct sk_buff *buf;
1730         int res;
1731
1732         if (how != SHUT_RDWR)
1733                 return -EINVAL;
1734
1735         lock_sock(sk);
1736
1737         switch (sock->state) {
1738         case SS_CONNECTING:
1739         case SS_CONNECTED:
1740
1741 restart:
1742                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1743                 buf = __skb_dequeue(&sk->sk_receive_queue);
1744                 if (buf) {
1745                         if (TIPC_SKB_CB(buf)->handle != NULL) {
1746                                 kfree_skb(buf);
1747                                 goto restart;
1748                         }
1749                         tipc_port_disconnect(port->ref);
1750                         tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1751                 } else {
1752                         tipc_port_shutdown(port->ref);
1753                 }
1754
1755                 sock->state = SS_DISCONNECTING;
1756
1757                 /* fall through */
1758
1759         case SS_DISCONNECTING:
1760
1761                 /* Discard any unreceived messages */
1762                 __skb_queue_purge(&sk->sk_receive_queue);
1763
1764                 /* Wake up anyone sleeping in poll */
1765                 sk->sk_state_change(sk);
1766                 res = 0;
1767                 break;
1768
1769         default:
1770                 res = -ENOTCONN;
1771         }
1772
1773         release_sock(sk);
1774         return res;
1775 }
1776
1777 /**
1778  * tipc_setsockopt - set socket option
1779  * @sock: socket structure
1780  * @lvl: option level
1781  * @opt: option identifier
1782  * @ov: pointer to new option value
1783  * @ol: length of option value
1784  *
1785  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
1786  * (to ease compatibility).
1787  *
1788  * Returns 0 on success, errno otherwise
1789  */
1790 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
1791                            char __user *ov, unsigned int ol)
1792 {
1793         struct sock *sk = sock->sk;
1794         struct tipc_sock *tsk = tipc_sk(sk);
1795         struct tipc_port *port = &tsk->port;
1796         u32 value;
1797         int res;
1798
1799         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1800                 return 0;
1801         if (lvl != SOL_TIPC)
1802                 return -ENOPROTOOPT;
1803         if (ol < sizeof(value))
1804                 return -EINVAL;
1805         res = get_user(value, (u32 __user *)ov);
1806         if (res)
1807                 return res;
1808
1809         lock_sock(sk);
1810
1811         switch (opt) {
1812         case TIPC_IMPORTANCE:
1813                 tipc_port_set_importance(port, value);
1814                 break;
1815         case TIPC_SRC_DROPPABLE:
1816                 if (sock->type != SOCK_STREAM)
1817                         tipc_port_set_unreliable(port, value);
1818                 else
1819                         res = -ENOPROTOOPT;
1820                 break;
1821         case TIPC_DEST_DROPPABLE:
1822                 tipc_port_set_unreturnable(port, value);
1823                 break;
1824         case TIPC_CONN_TIMEOUT:
1825                 tipc_sk(sk)->conn_timeout = value;
1826                 /* no need to set "res", since already 0 at this point */
1827                 break;
1828         default:
1829                 res = -EINVAL;
1830         }
1831
1832         release_sock(sk);
1833
1834         return res;
1835 }
1836
1837 /**
1838  * tipc_getsockopt - get socket option
1839  * @sock: socket structure
1840  * @lvl: option level
1841  * @opt: option identifier
1842  * @ov: receptacle for option value
1843  * @ol: receptacle for length of option value
1844  *
1845  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
1846  * (to ease compatibility).
1847  *
1848  * Returns 0 on success, errno otherwise
1849  */
1850 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
1851                            char __user *ov, int __user *ol)
1852 {
1853         struct sock *sk = sock->sk;
1854         struct tipc_sock *tsk = tipc_sk(sk);
1855         struct tipc_port *port = &tsk->port;
1856         int len;
1857         u32 value;
1858         int res;
1859
1860         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1861                 return put_user(0, ol);
1862         if (lvl != SOL_TIPC)
1863                 return -ENOPROTOOPT;
1864         res = get_user(len, ol);
1865         if (res)
1866                 return res;
1867
1868         lock_sock(sk);
1869
1870         switch (opt) {
1871         case TIPC_IMPORTANCE:
1872                 value = tipc_port_importance(port);
1873                 break;
1874         case TIPC_SRC_DROPPABLE:
1875                 value = tipc_port_unreliable(port);
1876                 break;
1877         case TIPC_DEST_DROPPABLE:
1878                 value = tipc_port_unreturnable(port);
1879                 break;
1880         case TIPC_CONN_TIMEOUT:
1881                 value = tipc_sk(sk)->conn_timeout;
1882                 /* no need to set "res", since already 0 at this point */
1883                 break;
1884         case TIPC_NODE_RECVQ_DEPTH:
1885                 value = 0; /* was tipc_queue_size, now obsolete */
1886                 break;
1887         case TIPC_SOCK_RECVQ_DEPTH:
1888                 value = skb_queue_len(&sk->sk_receive_queue);
1889                 break;
1890         default:
1891                 res = -EINVAL;
1892         }
1893
1894         release_sock(sk);
1895
1896         if (res)
1897                 return res;     /* "get" failed */
1898
1899         if (len < sizeof(value))
1900                 return -EINVAL;
1901
1902         if (copy_to_user(ov, &value, sizeof(value)))
1903                 return -EFAULT;
1904
1905         return put_user(sizeof(value), ol);
1906 }
1907
1908 /* Protocol switches for the various types of TIPC sockets */
1909
1910 static const struct proto_ops msg_ops = {
1911         .owner          = THIS_MODULE,
1912         .family         = AF_TIPC,
1913         .release        = tipc_release,
1914         .bind           = tipc_bind,
1915         .connect        = tipc_connect,
1916         .socketpair     = sock_no_socketpair,
1917         .accept         = sock_no_accept,
1918         .getname        = tipc_getname,
1919         .poll           = tipc_poll,
1920         .ioctl          = sock_no_ioctl,
1921         .listen         = sock_no_listen,
1922         .shutdown       = tipc_shutdown,
1923         .setsockopt     = tipc_setsockopt,
1924         .getsockopt     = tipc_getsockopt,
1925         .sendmsg        = tipc_sendmsg,
1926         .recvmsg        = tipc_recvmsg,
1927         .mmap           = sock_no_mmap,
1928         .sendpage       = sock_no_sendpage
1929 };
1930
1931 static const struct proto_ops packet_ops = {
1932         .owner          = THIS_MODULE,
1933         .family         = AF_TIPC,
1934         .release        = tipc_release,
1935         .bind           = tipc_bind,
1936         .connect        = tipc_connect,
1937         .socketpair     = sock_no_socketpair,
1938         .accept         = tipc_accept,
1939         .getname        = tipc_getname,
1940         .poll           = tipc_poll,
1941         .ioctl          = sock_no_ioctl,
1942         .listen         = tipc_listen,
1943         .shutdown       = tipc_shutdown,
1944         .setsockopt     = tipc_setsockopt,
1945         .getsockopt     = tipc_getsockopt,
1946         .sendmsg        = tipc_send_packet,
1947         .recvmsg        = tipc_recvmsg,
1948         .mmap           = sock_no_mmap,
1949         .sendpage       = sock_no_sendpage
1950 };
1951
1952 static const struct proto_ops stream_ops = {
1953         .owner          = THIS_MODULE,
1954         .family         = AF_TIPC,
1955         .release        = tipc_release,
1956         .bind           = tipc_bind,
1957         .connect        = tipc_connect,
1958         .socketpair     = sock_no_socketpair,
1959         .accept         = tipc_accept,
1960         .getname        = tipc_getname,
1961         .poll           = tipc_poll,
1962         .ioctl          = sock_no_ioctl,
1963         .listen         = tipc_listen,
1964         .shutdown       = tipc_shutdown,
1965         .setsockopt     = tipc_setsockopt,
1966         .getsockopt     = tipc_getsockopt,
1967         .sendmsg        = tipc_send_stream,
1968         .recvmsg        = tipc_recv_stream,
1969         .mmap           = sock_no_mmap,
1970         .sendpage       = sock_no_sendpage
1971 };
1972
1973 static const struct net_proto_family tipc_family_ops = {
1974         .owner          = THIS_MODULE,
1975         .family         = AF_TIPC,
1976         .create         = tipc_sk_create
1977 };
1978
1979 static struct proto tipc_proto = {
1980         .name           = "TIPC",
1981         .owner          = THIS_MODULE,
1982         .obj_size       = sizeof(struct tipc_sock),
1983         .sysctl_rmem    = sysctl_tipc_rmem
1984 };
1985
1986 static struct proto tipc_proto_kern = {
1987         .name           = "TIPC",
1988         .obj_size       = sizeof(struct tipc_sock),
1989         .sysctl_rmem    = sysctl_tipc_rmem
1990 };
1991
1992 /**
1993  * tipc_socket_init - initialize TIPC socket interface
1994  *
1995  * Returns 0 on success, errno otherwise
1996  */
1997 int tipc_socket_init(void)
1998 {
1999         int res;
2000
2001         res = proto_register(&tipc_proto, 1);
2002         if (res) {
2003                 pr_err("Failed to register TIPC protocol type\n");
2004                 goto out;
2005         }
2006
2007         res = sock_register(&tipc_family_ops);
2008         if (res) {
2009                 pr_err("Failed to register TIPC socket type\n");
2010                 proto_unregister(&tipc_proto);
2011                 goto out;
2012         }
2013  out:
2014         return res;
2015 }
2016
2017 /**
2018  * tipc_socket_stop - stop TIPC socket interface
2019  */
2020 void tipc_socket_stop(void)
2021 {
2022         sock_unregister(tipc_family_ops.family);
2023         proto_unregister(&tipc_proto);
2024 }