RDS: Explicitly allocate rm in sendmsg()
authorAndy Grover <andy.grover@oracle.com>
Tue, 12 Jan 2010 20:56:06 +0000 (12:56 -0800)
committerAndy Grover <andy.grover@oracle.com>
Thu, 9 Sep 2010 01:11:36 +0000 (18:11 -0700)
r_m_copy_from_user used to allocate the rm as well as kernel
buffers for the data, and then copy the data in. Now, sendmsg()
allocates the rm, although the data buffer alloc still happens
in r_m_copy_from_user.

SGs are still allocated with rm, but now r_m_alloc_sgs() is
used to reserve them. This allows multiple SG lists to be
allocated from the one rm -- this is important once we also
want to alloc our rdma sgl from this pool.

Signed-off-by: Andy Grover <andy.grover@oracle.com>
net/rds/message.c
net/rds/rds.h
net/rds/send.c

index 4421d160b1a4262aeca60a8844d3ff043d3f51b9..3498cbcc7542752af3d42c68d886c151fdde79df 100644 (file)
@@ -214,17 +214,22 @@ int rds_message_add_rdma_dest_extension(struct rds_header *hdr, u32 r_key, u32 o
 }
 EXPORT_SYMBOL_GPL(rds_message_add_rdma_dest_extension);
 
-struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp)
+/*
+ * Each rds_message is allocated with extra space for the scatterlist entries
+ * rds ops will need. This is to minimize memory allocation count. Then, each rds op
+ * can grab SGs when initializing its part of the rds_message.
+ */
+struct rds_message *rds_message_alloc(unsigned int extra_len, gfp_t gfp)
 {
        struct rds_message *rm;
 
-       rm = kzalloc(sizeof(struct rds_message) +
-                    (nents * sizeof(struct scatterlist)), gfp);
+       rm = kzalloc(sizeof(struct rds_message) + extra_len, gfp);
        if (!rm)
                goto out;
 
-       if (nents)
-               sg_init_table(rm->data.m_sg, nents);
+       rm->m_used_sgs = 0;
+       rm->m_total_sgs = extra_len / sizeof(struct scatterlist);
+
        atomic_set(&rm->m_refcount, 1);
        INIT_LIST_HEAD(&rm->m_sock_item);
        INIT_LIST_HEAD(&rm->m_conn_item);
@@ -234,6 +239,23 @@ out:
        return rm;
 }
 
+/*
+ * RDS ops use this to grab SG entries from the rm's sg pool.
+ */
+struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents)
+{
+       struct scatterlist *sg_first = (struct scatterlist *) &rm[1];
+       struct scatterlist *sg_ret;
+
+       WARN_ON(rm->m_used_sgs + nents > rm->m_total_sgs);
+
+       sg_ret = &sg_first[rm->m_used_sgs];
+
+       rm->m_used_sgs += nents;
+
+       return sg_ret;
+}
+
 struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len)
 {
        struct rds_message *rm;
@@ -256,22 +278,15 @@ struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned in
        return rm;
 }
 
-struct rds_message *rds_message_copy_from_user(struct iovec *first_iov,
+int rds_message_copy_from_user(struct rds_message *rm, struct iovec *first_iov,
                                               size_t total_len)
 {
        unsigned long to_copy;
        unsigned long iov_off;
        unsigned long sg_off;
-       struct rds_message *rm;
        struct iovec *iov;
        struct scatterlist *sg;
-       int ret;
-
-       rm = rds_message_alloc(ceil(total_len, PAGE_SIZE), GFP_KERNEL);
-       if (rm == NULL) {
-               ret = -ENOMEM;
-               goto out;
-       }
+       int ret = 0;
 
        rm->m_inc.i_hdr.h_len = cpu_to_be32(total_len);
 
@@ -320,14 +335,8 @@ struct rds_message *rds_message_copy_from_user(struct iovec *first_iov,
                        sg++;
        }
 
-       ret = 0;
 out:
-       if (ret) {
-               if (rm)
-                       rds_message_put(rm);
-               rm = ERR_PTR(ret);
-       }
-       return rm;
+       return ret;
 }
 
 int rds_message_inc_copy_to_user(struct rds_incoming *inc,
index 07a750b3fb312879b8b73d2f65ca64e8ebe4d491..d29c71aabbd4ac08216217b3165e38b2a99b5fee 100644 (file)
@@ -268,9 +268,11 @@ struct rds_message {
                struct {
                        unsigned int            m_nents;
                        unsigned int            m_count;
-                       struct scatterlist      m_sg[0];
+                       struct scatterlist      *m_sg;
                } data;
        };
+       unsigned int            m_used_sgs;
+       unsigned int            m_total_sgs;
 };
 
 /*
@@ -573,7 +575,8 @@ rds_conn_connecting(struct rds_connection *conn)
 
 /* message.c */
 struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp);
-struct rds_message *rds_message_copy_from_user(struct iovec *first_iov,
+struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents);
+int rds_message_copy_from_user(struct rds_message *rm, struct iovec *first_iov,
                                               size_t total_len);
 struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len);
 void rds_message_populate_header(struct rds_header *hdr, __be16 sport,
index 19dfd025498eadbd5560fddaf9b721b5de30bd19..28d09447207b8b765b3df543acbeffa544fde906 100644 (file)
@@ -758,6 +758,19 @@ out:
        return *queued;
 }
 
+/*
+ * rds_message is getting to be quite complicated, and we'd like to allocate
+ * it all in one go. This figures out how big it needs to be up front.
+ */
+static int rds_rm_size(struct msghdr *msg, int data_len)
+{
+       int size = 0;
+
+       size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);
+
+       return size;
+}
+
 static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
                         struct msghdr *msg, int *allocated_mr)
 {
@@ -845,13 +858,23 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
                goto out;
        }
 
-       rm = rds_message_copy_from_user(msg->msg_iov, payload_len);
-       if (IS_ERR(rm)) {
-               ret = PTR_ERR(rm);
-               rm = NULL;
+       /* size of rm including all sgs */
+       ret = rds_rm_size(msg, payload_len);
+       if (ret < 0)
+               goto out;
+
+       rm = rds_message_alloc(ret, GFP_KERNEL);
+       if (!rm) {
+               ret = -ENOMEM;
                goto out;
        }
 
+       rm->data.m_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
+       /* XXX fix this to not allocate memory */
+       ret = rds_message_copy_from_user(rm, msg->msg_iov, payload_len);
+       if (ret)
+               goto out;
+
        rm->m_daddr = daddr;
 
        /* rds_conn_create has a spinlock that runs with IRQ off.