Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
authorLinus Torvalds <torvalds@linux-foundation.org>
Sun, 24 Jan 2016 20:34:13 +0000 (12:34 -0800)
committerLinus Torvalds <torvalds@linux-foundation.org>
Sun, 24 Jan 2016 20:34:13 +0000 (12:34 -0800)
Pull Ceph updates from Sage Weil:
 "The two main changes are aio support in CephFS, and a series that
  fixes several issues in the authentication key timeout/renewal code.

  On top of that are a variety of cleanups and minor bug fixes"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  libceph: remove outdated comment
  libceph: kill off ceph_x_ticket_handler::validity
  libceph: invalidate AUTH in addition to a service ticket
  libceph: fix authorizer invalidation, take 2
  libceph: clear messenger auth_retry flag if we fault
  libceph: fix ceph_msg_revoke()
  libceph: use list_for_each_entry_safe
  ceph: use i_size_{read,write} to get/set i_size
  ceph: re-send AIO write request when getting -EOLDSNAP error
  ceph: Asynchronous IO support
  ceph: Avoid to propagate the invalid page point
  ceph: fix double page_unlock() in page_mkwrite()
  rbd: delete an unnecessary check before rbd_dev_destroy()
  libceph: use list_next_entry instead of list_entry_next
  ceph: ceph_frag_contains_value can be boolean
  ceph: remove unused functions in ceph_frag.h

drivers/block/rbd.c
fs/ceph/addr.c
fs/ceph/cache.c
fs/ceph/file.c
fs/ceph/inode.c
include/linux/ceph/ceph_frag.h
include/linux/ceph/messenger.h
net/ceph/auth_x.c
net/ceph/auth_x.h
net/ceph/messenger.c
net/ceph/mon_client.c

index 81ea69fee7ca183313b8e8322833062262279187..4a876785b68cd5c550dbbb1d2b63071446454081 100644 (file)
@@ -5185,8 +5185,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
 
 out_err:
        rbd_dev_unparent(rbd_dev);
-       if (parent)
-               rbd_dev_destroy(parent);
+       rbd_dev_destroy(parent);
        return ret;
 }
 
index b7d218a168fb81c2c028ea38ffccf2a17a1d68d5..c22213789090f975b3680b10e022a1e5f2f9fdb4 100644 (file)
@@ -1108,7 +1108,7 @@ retry_locked:
                return 0;
 
        /* past end of file? */
-       i_size = inode->i_size;   /* caller holds i_mutex */
+       i_size = i_size_read(inode);
 
        if (page_off >= i_size ||
            (pos_in_page == 0 && (pos+len) >= i_size &&
@@ -1149,7 +1149,6 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
                page = grab_cache_page_write_begin(mapping, index, 0);
                if (!page)
                        return -ENOMEM;
-               *pagep = page;
 
                dout("write_begin file %p inode %p page %p %d~%d\n", file,
                     inode, page, (int)pos, (int)len);
@@ -1184,8 +1183,7 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
                zero_user_segment(page, from+copied, len);
 
        /* did file size increase? */
-       /* (no need for i_size_read(); we caller holds i_mutex */
-       if (pos+copied > inode->i_size)
+       if (pos+copied > i_size_read(inode))
                check_cap = ceph_inode_set_size(inode, pos+copied);
 
        if (!PageUptodate(page))
@@ -1378,11 +1376,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
 
        ret = VM_FAULT_NOPAGE;
        if ((off > size) ||
-           (page->mapping != inode->i_mapping))
+           (page->mapping != inode->i_mapping)) {
+               unlock_page(page);
                goto out;
+       }
 
        ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
-       if (ret == 0) {
+       if (ret >= 0) {
                /* success.  we'll keep the page locked. */
                set_page_dirty(page);
                ret = VM_FAULT_LOCKED;
@@ -1393,8 +1393,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
                        ret = VM_FAULT_SIGBUS;
        }
 out:
-       if (ret != VM_FAULT_LOCKED)
-               unlock_page(page);
        if (ret == VM_FAULT_LOCKED ||
            ci->i_inline_version != CEPH_INLINE_NONE) {
                int dirty;
index 7680e2626815d133d3b5e9998e529535bd9231f9..a351480dbabc95891e4b61f83fb92485f8ea7b18 100644 (file)
@@ -106,7 +106,7 @@ static uint16_t ceph_fscache_inode_get_aux(const void *cookie_netfs_data,
 
        memset(&aux, 0, sizeof(aux));
        aux.mtime = inode->i_mtime;
-       aux.size = inode->i_size;
+       aux.size = i_size_read(inode);
 
        memcpy(buffer, &aux, sizeof(aux));
 
@@ -117,9 +117,7 @@ static void ceph_fscache_inode_get_attr(const void *cookie_netfs_data,
                                        uint64_t *size)
 {
        const struct ceph_inode_info* ci = cookie_netfs_data;
-       const struct inode* inode = &ci->vfs_inode;
-
-       *size = inode->i_size;
+       *size = i_size_read(&ci->vfs_inode);
 }
 
 static enum fscache_checkaux ceph_fscache_inode_check_aux(
@@ -134,7 +132,7 @@ static enum fscache_checkaux ceph_fscache_inode_check_aux(
 
        memset(&aux, 0, sizeof(aux));
        aux.mtime = inode->i_mtime;
-       aux.size = inode->i_size;
+       aux.size = i_size_read(inode);
 
        if (memcmp(data, &aux, sizeof(aux)) != 0)
                return FSCACHE_CHECKAUX_OBSOLETE;
index 10c5ae79696ee860a1cebb6f99dec2d2bde4717a..86a9c383955e56037eb38419b1e4617317d32237 100644 (file)
@@ -397,8 +397,9 @@ int ceph_release(struct inode *inode, struct file *file)
 }
 
 enum {
-       CHECK_EOF = 1,
-       READ_INLINE = 2,
+       HAVE_RETRIED = 1,
+       CHECK_EOF =    2,
+       READ_INLINE =  3,
 };
 
 /*
@@ -411,17 +412,15 @@ enum {
 static int striped_read(struct inode *inode,
                        u64 off, u64 len,
                        struct page **pages, int num_pages,
-                       int *checkeof, bool o_direct,
-                       unsigned long buf_align)
+                       int *checkeof)
 {
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 pos, this_len, left;
-       int io_align, page_align;
-       int pages_left;
-       int read;
+       loff_t i_size;
+       int page_align, pages_left;
+       int read, ret;
        struct page **page_pos;
-       int ret;
        bool hit_stripe, was_short;
 
        /*
@@ -432,13 +431,9 @@ static int striped_read(struct inode *inode,
        page_pos = pages;
        pages_left = num_pages;
        read = 0;
-       io_align = off & ~PAGE_MASK;
 
 more:
-       if (o_direct)
-               page_align = (pos - io_align + buf_align) & ~PAGE_MASK;
-       else
-               page_align = pos & ~PAGE_MASK;
+       page_align = pos & ~PAGE_MASK;
        this_len = left;
        ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
                                  &ci->i_layout, pos, &this_len,
@@ -452,13 +447,12 @@ more:
        dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
             ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
 
+       i_size = i_size_read(inode);
        if (ret >= 0) {
                int didpages;
-               if (was_short && (pos + ret < inode->i_size)) {
-                       int zlen = min(this_len - ret,
-                                      inode->i_size - pos - ret);
-                       int zoff = (o_direct ? buf_align : io_align) +
-                                   read + ret;
+               if (was_short && (pos + ret < i_size)) {
+                       int zlen = min(this_len - ret, i_size - pos - ret);
+                       int zoff = (off & ~PAGE_MASK) + read + ret;
                        dout(" zero gap %llu to %llu\n",
                                pos + ret, pos + ret + zlen);
                        ceph_zero_page_vector_range(zoff, zlen, pages);
@@ -473,14 +467,14 @@ more:
                pages_left -= didpages;
 
                /* hit stripe and need continue*/
-               if (left && hit_stripe && pos < inode->i_size)
+               if (left && hit_stripe && pos < i_size)
                        goto more;
        }
 
        if (read > 0) {
                ret = read;
                /* did we bounce off eof? */
-               if (pos + left > inode->i_size)
+               if (pos + left > i_size)
                        *checkeof = CHECK_EOF;
        }
 
@@ -521,54 +515,28 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
        if (ret < 0)
                return ret;
 
-       if (iocb->ki_flags & IOCB_DIRECT) {
-               while (iov_iter_count(i)) {
-                       size_t start;
-                       ssize_t n;
-
-                       n = dio_get_pagev_size(i);
-                       pages = dio_get_pages_alloc(i, n, &start, &num_pages);
-                       if (IS_ERR(pages))
-                               return PTR_ERR(pages);
-
-                       ret = striped_read(inode, off, n,
-                                          pages, num_pages, checkeof,
-                                          1, start);
-
-                       ceph_put_page_vector(pages, num_pages, true);
-
-                       if (ret <= 0)
-                               break;
-                       off += ret;
-                       iov_iter_advance(i, ret);
-                       if (ret < n)
+       num_pages = calc_pages_for(off, len);
+       pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+       ret = striped_read(inode, off, len, pages,
+                               num_pages, checkeof);
+       if (ret > 0) {
+               int l, k = 0;
+               size_t left = ret;
+
+               while (left) {
+                       size_t page_off = off & ~PAGE_MASK;
+                       size_t copy = min_t(size_t, left,
+                                           PAGE_SIZE - page_off);
+                       l = copy_page_to_iter(pages[k++], page_off, copy, i);
+                       off += l;
+                       left -= l;
+                       if (l < copy)
                                break;
                }
-       } else {
-               num_pages = calc_pages_for(off, len);
-               pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
-               if (IS_ERR(pages))
-                       return PTR_ERR(pages);
-               ret = striped_read(inode, off, len, pages,
-                                       num_pages, checkeof, 0, 0);
-               if (ret > 0) {
-                       int l, k = 0;
-                       size_t left = ret;
-
-                       while (left) {
-                               size_t page_off = off & ~PAGE_MASK;
-                               size_t copy = min_t(size_t,
-                                                   PAGE_SIZE - page_off, left);
-                               l = copy_page_to_iter(pages[k++], page_off,
-                                                     copy, i);
-                               off += l;
-                               left -= l;
-                               if (l < copy)
-                                       break;
-                       }
-               }
-               ceph_release_page_vector(pages, num_pages);
        }
+       ceph_release_page_vector(pages, num_pages);
 
        if (off > iocb->ki_pos) {
                ret = off - iocb->ki_pos;
@@ -579,6 +547,193 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
        return ret;
 }
 
+struct ceph_aio_request {
+       struct kiocb *iocb;
+       size_t total_len;
+       int write;
+       int error;
+       struct list_head osd_reqs;
+       unsigned num_reqs;
+       atomic_t pending_reqs;
+       struct timespec mtime;
+       struct ceph_cap_flush *prealloc_cf;
+};
+
+struct ceph_aio_work {
+       struct work_struct work;
+       struct ceph_osd_request *req;
+};
+
+static void ceph_aio_retry_work(struct work_struct *work);
+
+static void ceph_aio_complete(struct inode *inode,
+                             struct ceph_aio_request *aio_req)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       int ret;
+
+       if (!atomic_dec_and_test(&aio_req->pending_reqs))
+               return;
+
+       ret = aio_req->error;
+       if (!ret)
+               ret = aio_req->total_len;
+
+       dout("ceph_aio_complete %p rc %d\n", inode, ret);
+
+       if (ret >= 0 && aio_req->write) {
+               int dirty;
+
+               loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len;
+               if (endoff > i_size_read(inode)) {
+                       if (ceph_inode_set_size(inode, endoff))
+                               ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
+               }
+
+               spin_lock(&ci->i_ceph_lock);
+               ci->i_inline_version = CEPH_INLINE_NONE;
+               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+                                              &aio_req->prealloc_cf);
+               spin_unlock(&ci->i_ceph_lock);
+               if (dirty)
+                       __mark_inode_dirty(inode, dirty);
+
+       }
+
+       ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR :
+                                               CEPH_CAP_FILE_RD));
+
+       aio_req->iocb->ki_complete(aio_req->iocb, ret, 0);
+
+       ceph_free_cap_flush(aio_req->prealloc_cf);
+       kfree(aio_req);
+}
+
+static void ceph_aio_complete_req(struct ceph_osd_request *req,
+                                 struct ceph_msg *msg)
+{
+       int rc = req->r_result;
+       struct inode *inode = req->r_inode;
+       struct ceph_aio_request *aio_req = req->r_priv;
+       struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
+       int num_pages = calc_pages_for((u64)osd_data->alignment,
+                                      osd_data->length);
+
+       dout("ceph_aio_complete_req %p rc %d bytes %llu\n",
+            inode, rc, osd_data->length);
+
+       if (rc == -EOLDSNAPC) {
+               struct ceph_aio_work *aio_work;
+               BUG_ON(!aio_req->write);
+
+               aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS);
+               if (aio_work) {
+                       INIT_WORK(&aio_work->work, ceph_aio_retry_work);
+                       aio_work->req = req;
+                       queue_work(ceph_inode_to_client(inode)->wb_wq,
+                                  &aio_work->work);
+                       return;
+               }
+               rc = -ENOMEM;
+       } else if (!aio_req->write) {
+               if (rc == -ENOENT)
+                       rc = 0;
+               if (rc >= 0 && osd_data->length > rc) {
+                       int zoff = osd_data->alignment + rc;
+                       int zlen = osd_data->length - rc;
+                       /*
+                        * If read is satisfied by single OSD request,
+                        * it can pass EOF. Otherwise read is within
+                        * i_size.
+                        */
+                       if (aio_req->num_reqs == 1) {
+                               loff_t i_size = i_size_read(inode);
+                               loff_t endoff = aio_req->iocb->ki_pos + rc;
+                               if (endoff < i_size)
+                                       zlen = min_t(size_t, zlen,
+                                                    i_size - endoff);
+                               aio_req->total_len = rc + zlen;
+                       }
+
+                       if (zlen > 0)
+                               ceph_zero_page_vector_range(zoff, zlen,
+                                                           osd_data->pages);
+               }
+       }
+
+       ceph_put_page_vector(osd_data->pages, num_pages, false);
+       ceph_osdc_put_request(req);
+
+       if (rc < 0)
+               cmpxchg(&aio_req->error, 0, rc);
+
+       ceph_aio_complete(inode, aio_req);
+       return;
+}
+
+static void ceph_aio_retry_work(struct work_struct *work)
+{
+       struct ceph_aio_work *aio_work =
+               container_of(work, struct ceph_aio_work, work);
+       struct ceph_osd_request *orig_req = aio_work->req;
+       struct ceph_aio_request *aio_req = orig_req->r_priv;
+       struct inode *inode = orig_req->r_inode;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_snap_context *snapc;
+       struct ceph_osd_request *req;
+       int ret;
+
+       spin_lock(&ci->i_ceph_lock);
+       if (__ceph_have_pending_cap_snap(ci)) {
+               struct ceph_cap_snap *capsnap =
+                       list_last_entry(&ci->i_cap_snaps,
+                                       struct ceph_cap_snap,
+                                       ci_item);
+               snapc = ceph_get_snap_context(capsnap->context);
+       } else {
+               BUG_ON(!ci->i_head_snapc);
+               snapc = ceph_get_snap_context(ci->i_head_snapc);
+       }
+       spin_unlock(&ci->i_ceph_lock);
+
+       req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
+                       false, GFP_NOFS);
+       if (IS_ERR(req)) {
+               ret = PTR_ERR(req);
+               req = orig_req;
+               goto out;
+       }
+
+       req->r_flags =  CEPH_OSD_FLAG_ORDERSNAP |
+                       CEPH_OSD_FLAG_ONDISK |
+                       CEPH_OSD_FLAG_WRITE;
+       req->r_base_oloc = orig_req->r_base_oloc;
+       req->r_base_oid = orig_req->r_base_oid;
+
+       req->r_ops[0] = orig_req->r_ops[0];
+       osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
+
+       ceph_osdc_build_request(req, req->r_ops[0].extent.offset,
+                               snapc, CEPH_NOSNAP, &aio_req->mtime);
+
+       ceph_put_snap_context(snapc);
+       ceph_osdc_put_request(orig_req);
+
+       req->r_callback = ceph_aio_complete_req;
+       req->r_inode = inode;
+       req->r_priv = aio_req;
+
+       ret = ceph_osdc_start_request(req->r_osdc, req, false);
+out:
+       if (ret < 0) {
+               BUG_ON(ret == -EOLDSNAPC);
+               req->r_result = ret;
+               ceph_aio_complete_req(req, NULL);
+       }
+
+       kfree(aio_work);
+}
+
 /*
  * Write commit request unsafe callback, called to tell us when a
  * request is unsafe (that is, in flight--has been handed to the
@@ -612,16 +767,10 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
 }
 
 
-/*
- * Synchronous write, straight from __user pointer or user pages.
- *
- * If write spans object boundary, just do multiple writes.  (For a
- * correct atomic write, we should e.g. take write locks on all
- * objects, rollback on failure, etc.)
- */
 static ssize_t
-ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
-                      struct ceph_snap_context *snapc)
+ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
+                      struct ceph_snap_context *snapc,
+                      struct ceph_cap_flush **pcf)
 {
        struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
@@ -630,44 +779,52 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
        struct ceph_vino vino;
        struct ceph_osd_request *req;
        struct page **pages;
-       int num_pages;
-       int written = 0;
+       struct ceph_aio_request *aio_req = NULL;
+       int num_pages = 0;
        int flags;
-       int check_caps = 0;
        int ret;
        struct timespec mtime = CURRENT_TIME;
-       size_t count = iov_iter_count(from);
+       size_t count = iov_iter_count(iter);
+       loff_t pos = iocb->ki_pos;
+       bool write = iov_iter_rw(iter) == WRITE;
 
-       if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
+       if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP)
                return -EROFS;
 
-       dout("sync_direct_write on file %p %lld~%u\n", file, pos,
-            (unsigned)count);
+       dout("sync_direct_read_write (%s) on file %p %lld~%u\n",
+            (write ? "write" : "read"), file, pos, (unsigned)count);
 
        ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
        if (ret < 0)
                return ret;
 
-       ret = invalidate_inode_pages2_range(inode->i_mapping,
-                                           pos >> PAGE_CACHE_SHIFT,
-                                           (pos + count) >> PAGE_CACHE_SHIFT);
-       if (ret < 0)
-               dout("invalidate_inode_pages2_range returned %d\n", ret);
+       if (write) {
+               ret = invalidate_inode_pages2_range(inode->i_mapping,
+                                       pos >> PAGE_CACHE_SHIFT,
+                                       (pos + count) >> PAGE_CACHE_SHIFT);
+               if (ret < 0)
+                       dout("invalidate_inode_pages2_range returned %d\n", ret);
 
-       flags = CEPH_OSD_FLAG_ORDERSNAP |
-               CEPH_OSD_FLAG_ONDISK |
-               CEPH_OSD_FLAG_WRITE;
+               flags = CEPH_OSD_FLAG_ORDERSNAP |
+                       CEPH_OSD_FLAG_ONDISK |
+                       CEPH_OSD_FLAG_WRITE;
+       } else {
+               flags = CEPH_OSD_FLAG_READ;
+       }
 
-       while (iov_iter_count(from) > 0) {
-               u64 len = dio_get_pagev_size(from);
-               size_t start;
-               ssize_t n;
+       while (iov_iter_count(iter) > 0) {
+               u64 size = dio_get_pagev_size(iter);
+               size_t start = 0;
+               ssize_t len;
 
                vino = ceph_vino(inode);
                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-                                           vino, pos, &len, 0,
-                                           2,/*include a 'startsync' command*/
-                                           CEPH_OSD_OP_WRITE, flags, snapc,
+                                           vino, pos, &size, 0,
+                                           /*include a 'startsync' command*/
+                                           write ? 2 : 1,
+                                           write ? CEPH_OSD_OP_WRITE :
+                                                   CEPH_OSD_OP_READ,
+                                           flags, snapc,
                                            ci->i_truncate_seq,
                                            ci->i_truncate_size,
                                            false);
@@ -676,10 +833,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
                        break;
                }
 
-               osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
-
-               n = len;
-               pages = dio_get_pages_alloc(from, len, &start, &num_pages);
+               len = size;
+               pages = dio_get_pages_alloc(iter, len, &start, &num_pages);
                if (IS_ERR(pages)) {
                        ceph_osdc_put_request(req);
                        ret = PTR_ERR(pages);
@@ -687,47 +842,128 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
                }
 
                /*
-                * throw out any page cache pages in this range. this
-                * may block.
+                * To simplify error handling, allow AIO when IO within i_size
+                * or IO can be satisfied by single OSD request.
                 */
-               truncate_inode_pages_range(inode->i_mapping, pos,
-                                  (pos+n) | (PAGE_CACHE_SIZE-1));
-               osd_req_op_extent_osd_data_pages(req, 0, pages, n, start,
-                                               false, false);
+               if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) &&
+                   (len == count || pos + count <= i_size_read(inode))) {
+                       aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL);
+                       if (aio_req) {
+                               aio_req->iocb = iocb;
+                               aio_req->write = write;
+                               INIT_LIST_HEAD(&aio_req->osd_reqs);
+                               if (write) {
+                                       aio_req->mtime = mtime;
+                                       swap(aio_req->prealloc_cf, *pcf);
+                               }
+                       }
+                       /* ignore error */
+               }
+
+               if (write) {
+                       /*
+                        * throw out any page cache pages in this range. this
+                        * may block.
+                        */
+                       truncate_inode_pages_range(inode->i_mapping, pos,
+                                       (pos+len) | (PAGE_CACHE_SIZE - 1));
+
+                       osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
+               }
+
+
+               osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,
+                                                false, false);
 
-               /* BUG_ON(vino.snap != CEPH_NOSNAP); */
                ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
 
-               ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+               if (aio_req) {
+                       aio_req->total_len += len;
+                       aio_req->num_reqs++;
+                       atomic_inc(&aio_req->pending_reqs);
+
+                       req->r_callback = ceph_aio_complete_req;
+                       req->r_inode = inode;
+                       req->r_priv = aio_req;
+                       list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs);
+
+                       pos += len;
+                       iov_iter_advance(iter, len);
+                       continue;
+               }
+
+               ret = ceph_osdc_start_request(req->r_osdc, req, false);
                if (!ret)
                        ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
 
+               size = i_size_read(inode);
+               if (!write) {
+                       if (ret == -ENOENT)
+                               ret = 0;
+                       if (ret >= 0 && ret < len && pos + ret < size) {
+                               int zlen = min_t(size_t, len - ret,
+                                                size - pos - ret);
+                               ceph_zero_page_vector_range(start + ret, zlen,
+                                                           pages);
+                               ret += zlen;
+                       }
+                       if (ret >= 0)
+                               len = ret;
+               }
+
                ceph_put_page_vector(pages, num_pages, false);
 
                ceph_osdc_put_request(req);
-               if (ret)
+               if (ret < 0)
                        break;
-               pos += n;
-               written += n;
-               iov_iter_advance(from, n);
 
-               if (pos > i_size_read(inode)) {
-                       check_caps = ceph_inode_set_size(inode, pos);
-                       if (check_caps)
+               pos += len;
+               iov_iter_advance(iter, len);
+
+               if (!write && pos >= size)
+                       break;
+
+               if (write && pos > size) {
+                       if (ceph_inode_set_size(inode, pos))
                                ceph_check_caps(ceph_inode(inode),
                                                CHECK_CAPS_AUTHONLY,
                                                NULL);
                }
        }
 
-       if (ret != -EOLDSNAPC && written > 0) {
+       if (aio_req) {
+               if (aio_req->num_reqs == 0) {
+                       kfree(aio_req);
+                       return ret;
+               }
+
+               ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR :
+                                             CEPH_CAP_FILE_RD);
+
+               while (!list_empty(&aio_req->osd_reqs)) {
+                       req = list_first_entry(&aio_req->osd_reqs,
+                                              struct ceph_osd_request,
+                                              r_unsafe_item);
+                       list_del_init(&req->r_unsafe_item);
+                       if (ret >= 0)
+                               ret = ceph_osdc_start_request(req->r_osdc,
+                                                             req, false);
+                       if (ret < 0) {
+                               BUG_ON(ret == -EOLDSNAPC);
+                               req->r_result = ret;
+                               ceph_aio_complete_req(req, NULL);
+                       }
+               }
+               return -EIOCBQUEUED;
+       }
+
+       if (ret != -EOLDSNAPC && pos > iocb->ki_pos) {
+               ret = pos - iocb->ki_pos;
                iocb->ki_pos = pos;
-               ret = written;
        }
        return ret;
 }
 
-
 /*
  * Synchronous write, straight from __user pointer or user pages.
  *
@@ -897,8 +1133,14 @@ again:
                     ceph_cap_string(got));
 
                if (ci->i_inline_version == CEPH_INLINE_NONE) {
-                       /* hmm, this isn't really async... */
-                       ret = ceph_sync_read(iocb, to, &retry_op);
+                       if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
+                               ret = ceph_direct_read_write(iocb, to,
+                                                            NULL, NULL);
+                               if (ret >= 0 && ret < len)
+                                       retry_op = CHECK_EOF;
+                       } else {
+                               ret = ceph_sync_read(iocb, to, &retry_op);
+                       }
                } else {
                        retry_op = READ_INLINE;
                }
@@ -916,7 +1158,7 @@ again:
                pinned_page = NULL;
        }
        ceph_put_cap_refs(ci, got);
-       if (retry_op && ret >= 0) {
+       if (retry_op > HAVE_RETRIED && ret >= 0) {
                int statret;
                struct page *page = NULL;
                loff_t i_size;
@@ -968,12 +1210,11 @@ again:
                if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
                    ret < len) {
                        dout("sync_read hit hole, ppos %lld < size %lld"
-                            ", reading more\n", iocb->ki_pos,
-                            inode->i_size);
+                            ", reading more\n", iocb->ki_pos, i_size);
 
                        read += ret;
                        len -= ret;
-                       retry_op = 0;
+                       retry_op = HAVE_RETRIED;
                        goto again;
                }
        }
@@ -1052,7 +1293,7 @@ retry_snap:
        }
 
        dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
-            inode, ceph_vinop(inode), pos, count, inode->i_size);
+            inode, ceph_vinop(inode), pos, count, i_size_read(inode));
        if (fi->fmode & CEPH_FILE_MODE_LAZY)
                want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
        else
@@ -1088,8 +1329,8 @@ retry_snap:
                /* we might need to revert back to that point */
                data = *from;
                if (iocb->ki_flags & IOCB_DIRECT)
-                       written = ceph_sync_direct_write(iocb, &data, pos,
-                                                        snapc);
+                       written = ceph_direct_read_write(iocb, &data, snapc,
+                                                        &prealloc_cf);
                else
                        written = ceph_sync_write(iocb, &data, pos, snapc);
                if (written == -EOLDSNAPC) {
@@ -1104,7 +1345,7 @@ retry_snap:
                        iov_iter_advance(from, written);
                ceph_put_snap_context(snapc);
        } else {
-               loff_t old_size = inode->i_size;
+               loff_t old_size = i_size_read(inode);
                /*
                 * No need to acquire the i_truncate_mutex. Because
                 * the MDS revokes Fwb caps before sending truncate
@@ -1115,7 +1356,7 @@ retry_snap:
                written = generic_perform_write(file, from, pos);
                if (likely(written >= 0))
                        iocb->ki_pos = pos + written;
-               if (inode->i_size > old_size)
+               if (i_size_read(inode) > old_size)
                        ceph_fscache_update_objectsize(inode);
                inode_unlock(inode);
        }
@@ -1160,6 +1401,7 @@ out_unlocked:
 static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
 {
        struct inode *inode = file->f_mapping->host;
+       loff_t i_size;
        int ret;
 
        inode_lock(inode);
@@ -1172,9 +1414,10 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
                }
        }
 
+       i_size = i_size_read(inode);
        switch (whence) {
        case SEEK_END:
-               offset += inode->i_size;
+               offset += i_size;
                break;
        case SEEK_CUR:
                /*
@@ -1190,17 +1433,17 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
                offset += file->f_pos;
                break;
        case SEEK_DATA:
-               if (offset >= inode->i_size) {
+               if (offset >= i_size) {
                        ret = -ENXIO;
                        goto out;
                }
                break;
        case SEEK_HOLE:
-               if (offset >= inode->i_size) {
+               if (offset >= i_size) {
                        ret = -ENXIO;
                        goto out;
                }
-               offset = inode->i_size;
+               offset = i_size;
                break;
        }
 
index da55eb8bcffab89755baf5229b92ededf49dd484..fb4ba2e4e2a5fa5c5d62afa3b94f758c7906687b 100644 (file)
@@ -548,7 +548,7 @@ int ceph_fill_file_size(struct inode *inode, int issued,
        if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
            (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
                dout("size %lld -> %llu\n", inode->i_size, size);
-               inode->i_size = size;
+               i_size_write(inode, size);
                inode->i_blocks = (size + (1<<9) - 1) >> 9;
                ci->i_reported_size = size;
                if (truncate_seq != ci->i_truncate_seq) {
@@ -808,7 +808,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
                        spin_unlock(&ci->i_ceph_lock);
 
                        err = -EINVAL;
-                       if (WARN_ON(symlen != inode->i_size))
+                       if (WARN_ON(symlen != i_size_read(inode)))
                                goto out;
 
                        err = -ENOMEM;
@@ -1549,7 +1549,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)
 
        spin_lock(&ci->i_ceph_lock);
        dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
-       inode->i_size = size;
+       i_size_write(inode, size);
        inode->i_blocks = (size + (1 << 9) - 1) >> 9;
 
        /* tell the MDS if we are approaching max_size */
@@ -1911,7 +1911,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
                     inode->i_size, attr->ia_size);
                if ((issued & CEPH_CAP_FILE_EXCL) &&
                    attr->ia_size > inode->i_size) {
-                       inode->i_size = attr->ia_size;
+                       i_size_write(inode, attr->ia_size);
                        inode->i_blocks =
                                (attr->ia_size + (1 << 9) - 1) >> 9;
                        inode->i_ctime = attr->ia_ctime;
index 5babb8e95352a6a7468feca70a343fafb56bddad..b827e066e55a198ec13f41d52b52964bd72edf4d 100644 (file)
@@ -40,46 +40,11 @@ static inline __u32 ceph_frag_mask_shift(__u32 f)
        return 24 - ceph_frag_bits(f);
 }
 
-static inline int ceph_frag_contains_value(__u32 f, __u32 v)
+static inline bool ceph_frag_contains_value(__u32 f, __u32 v)
 {
        return (v & ceph_frag_mask(f)) == ceph_frag_value(f);
 }
-static inline int ceph_frag_contains_frag(__u32 f, __u32 sub)
-{
-       /* is sub as specific as us, and contained by us? */
-       return ceph_frag_bits(sub) >= ceph_frag_bits(f) &&
-              (ceph_frag_value(sub) & ceph_frag_mask(f)) == ceph_frag_value(f);
-}
 
-static inline __u32 ceph_frag_parent(__u32 f)
-{
-       return ceph_frag_make(ceph_frag_bits(f) - 1,
-                        ceph_frag_value(f) & (ceph_frag_mask(f) << 1));
-}
-static inline int ceph_frag_is_left_child(__u32 f)
-{
-       return ceph_frag_bits(f) > 0 &&
-               (ceph_frag_value(f) & (0x1000000 >> ceph_frag_bits(f))) == 0;
-}
-static inline int ceph_frag_is_right_child(__u32 f)
-{
-       return ceph_frag_bits(f) > 0 &&
-               (ceph_frag_value(f) & (0x1000000 >> ceph_frag_bits(f))) == 1;
-}
-static inline __u32 ceph_frag_sibling(__u32 f)
-{
-       return ceph_frag_make(ceph_frag_bits(f),
-                     ceph_frag_value(f) ^ (0x1000000 >> ceph_frag_bits(f)));
-}
-static inline __u32 ceph_frag_left_child(__u32 f)
-{
-       return ceph_frag_make(ceph_frag_bits(f)+1, ceph_frag_value(f));
-}
-static inline __u32 ceph_frag_right_child(__u32 f)
-{
-       return ceph_frag_make(ceph_frag_bits(f)+1,
-             ceph_frag_value(f) | (0x1000000 >> (1+ceph_frag_bits(f))));
-}
 static inline __u32 ceph_frag_make_child(__u32 f, int by, int i)
 {
        int newbits = ceph_frag_bits(f) + by;
index 71b1d6cdcb5d1fc53dd45fa3950e138cfac19dfe..8dbd7879fdc6b74719d9cb65b85d8c60f589bde6 100644 (file)
@@ -220,6 +220,7 @@ struct ceph_connection {
        struct ceph_entity_addr actual_peer_addr;
 
        /* message out temps */
+       struct ceph_msg_header out_hdr;
        struct ceph_msg *out_msg;        /* sending message (== tail of
                                            out_sent) */
        bool out_msg_done;
@@ -229,7 +230,6 @@ struct ceph_connection {
        int out_kvec_left;   /* kvec's left in out_kvec */
        int out_skip;        /* skip this many bytes */
        int out_kvec_bytes;  /* total bytes left */
-       bool out_kvec_is_msg; /* kvec refers to out_msg */
        int out_more;        /* there is more data after the kvecs */
        __le64 out_temp_ack; /* for writing an ack */
        struct ceph_timespec out_temp_keepalive2; /* for writing keepalive2
index 10d87753ed8737329c244b1ae7718e95c8abd118..9e43a315e6622028ba2b61fd8d36ab20305d611b 100644 (file)
@@ -152,7 +152,6 @@ static int process_one_ticket(struct ceph_auth_client *ac,
        void *ticket_buf = NULL;
        void *tp, *tpend;
        void **ptp;
-       struct ceph_timespec new_validity;
        struct ceph_crypto_key new_session_key;
        struct ceph_buffer *new_ticket_blob;
        unsigned long new_expires, new_renew_after;
@@ -193,8 +192,8 @@ static int process_one_ticket(struct ceph_auth_client *ac,
        if (ret)
                goto out;
 
-       ceph_decode_copy(&dp, &new_validity, sizeof(new_validity));
-       ceph_decode_timespec(&validity, &new_validity);
+       ceph_decode_timespec(&validity, dp);
+       dp += sizeof(struct ceph_timespec);
        new_expires = get_seconds() + validity.tv_sec;
        new_renew_after = new_expires - (validity.tv_sec / 4);
        dout(" expires=%lu renew_after=%lu\n", new_expires,
@@ -233,10 +232,10 @@ static int process_one_ticket(struct ceph_auth_client *ac,
                ceph_buffer_put(th->ticket_blob);
        th->session_key = new_session_key;
        th->ticket_blob = new_ticket_blob;
-       th->validity = new_validity;
        th->secret_id = new_secret_id;
        th->expires = new_expires;
        th->renew_after = new_renew_after;
+       th->have_key = true;
        dout(" got ticket service %d (%s) secret_id %lld len %d\n",
             type, ceph_entity_type_name(type), th->secret_id,
             (int)th->ticket_blob->vec.iov_len);
@@ -384,6 +383,24 @@ bad:
        return -ERANGE;
 }
 
+static bool need_key(struct ceph_x_ticket_handler *th)
+{
+       if (!th->have_key)
+               return true;
+
+       return get_seconds() >= th->renew_after;
+}
+
+static bool have_key(struct ceph_x_ticket_handler *th)
+{
+       if (th->have_key) {
+               if (get_seconds() >= th->expires)
+                       th->have_key = false;
+       }
+
+       return th->have_key;
+}
+
 static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed)
 {
        int want = ac->want_keys;
@@ -402,20 +419,18 @@ static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed)
                        continue;
 
                th = get_ticket_handler(ac, service);
-
                if (IS_ERR(th)) {
                        *pneed |= service;
                        continue;
                }
 
-               if (get_seconds() >= th->renew_after)
+               if (need_key(th))
                        *pneed |= service;
-               if (get_seconds() >= th->expires)
+               if (!have_key(th))
                        xi->have_keys &= ~service;
        }
 }
 
-
 static int ceph_x_build_request(struct ceph_auth_client *ac,
                                void *buf, void *end)
 {
@@ -667,14 +682,26 @@ static void ceph_x_destroy(struct ceph_auth_client *ac)
        ac->private = NULL;
 }
 
-static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
-                                  int peer_type)
+static void invalidate_ticket(struct ceph_auth_client *ac, int peer_type)
 {
        struct ceph_x_ticket_handler *th;
 
        th = get_ticket_handler(ac, peer_type);
        if (!IS_ERR(th))
-               memset(&th->validity, 0, sizeof(th->validity));
+               th->have_key = false;
+}
+
+static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
+                                        int peer_type)
+{
+       /*
+        * We are to invalidate a service ticket in the hopes of
+        * getting a new, hopefully more valid, one.  But, we won't get
+        * it unless our AUTH ticket is good, so invalidate AUTH ticket
+        * as well, just in case.
+        */
+       invalidate_ticket(ac, peer_type);
+       invalidate_ticket(ac, CEPH_ENTITY_TYPE_AUTH);
 }
 
 static int calcu_signature(struct ceph_x_authorizer *au,
index e8b7c6917d472f69d356252415efcb76a4afd3cf..40b1a3cf7397352e4673becccdbbb083c88f04d8 100644 (file)
@@ -16,7 +16,7 @@ struct ceph_x_ticket_handler {
        unsigned int service;
 
        struct ceph_crypto_key session_key;
-       struct ceph_timespec validity;
+       bool have_key;
 
        u64 secret_id;
        struct ceph_buffer *ticket_blob;
index 9981039ef4ffcca29081b83a5e06a81ce6871f20..9cfedf565f5b236b5ede7974466cf62af6ad995c 100644 (file)
@@ -23,9 +23,6 @@
 #include <linux/ceph/pagelist.h>
 #include <linux/export.h>
 
-#define list_entry_next(pos, member)                                   \
-       list_entry(pos->member.next, typeof(*pos), member)
-
 /*
  * Ceph uses the messenger to exchange ceph_msg messages with other
  * hosts in the system.  The messenger provides ordered and reliable
@@ -672,6 +669,8 @@ static void reset_connection(struct ceph_connection *con)
        }
        con->in_seq = 0;
        con->in_seq_acked = 0;
+
+       con->out_skip = 0;
 }
 
 /*
@@ -771,6 +770,8 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
 
 static void con_out_kvec_reset(struct ceph_connection *con)
 {
+       BUG_ON(con->out_skip);
+
        con->out_kvec_left = 0;
        con->out_kvec_bytes = 0;
        con->out_kvec_cur = &con->out_kvec[0];
@@ -779,9 +780,9 @@ static void con_out_kvec_reset(struct ceph_connection *con)
 static void con_out_kvec_add(struct ceph_connection *con,
                                size_t size, void *data)
 {
-       int index;
+       int index = con->out_kvec_left;
 
-       index = con->out_kvec_left;
+       BUG_ON(con->out_skip);
        BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
 
        con->out_kvec[index].iov_len = size;
@@ -790,6 +791,27 @@ static void con_out_kvec_add(struct ceph_connection *con,
        con->out_kvec_bytes += size;
 }
 
+/*
+ * Chop off a kvec from the end.  Return residual number of bytes for
+ * that kvec, i.e. how many bytes would have been written if the kvec
+ * hadn't been nuked.
+ */
+static int con_out_kvec_skip(struct ceph_connection *con)
+{
+       int off = con->out_kvec_cur - con->out_kvec;
+       int skip = 0;
+
+       if (con->out_kvec_bytes > 0) {
+               skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len;
+               BUG_ON(con->out_kvec_bytes < skip);
+               BUG_ON(!con->out_kvec_left);
+               con->out_kvec_bytes -= skip;
+               con->out_kvec_left--;
+       }
+
+       return skip;
+}
+
 #ifdef CONFIG_BLOCK
 
 /*
@@ -1042,7 +1064,7 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
        /* Move on to the next page */
 
        BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
-       cursor->page = list_entry_next(cursor->page, lru);
+       cursor->page = list_next_entry(cursor->page, lru);
        cursor->last_piece = cursor->resid <= PAGE_SIZE;
 
        return true;
@@ -1166,7 +1188,7 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
        if (!cursor->resid && cursor->total_resid) {
                WARN_ON(!cursor->last_piece);
                BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
-               cursor->data = list_entry_next(cursor->data, links);
+               cursor->data = list_next_entry(cursor->data, links);
                __ceph_msg_data_cursor_init(cursor);
                new_piece = true;
        }
@@ -1197,7 +1219,6 @@ static void prepare_write_message_footer(struct ceph_connection *con)
        m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
 
        dout("prepare_write_message_footer %p\n", con);
-       con->out_kvec_is_msg = true;
        con->out_kvec[v].iov_base = &m->footer;
        if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
                if (con->ops->sign_message)
@@ -1225,7 +1246,6 @@ static void prepare_write_message(struct ceph_connection *con)
        u32 crc;
 
        con_out_kvec_reset(con);
-       con->out_kvec_is_msg = true;
        con->out_msg_done = false;
 
        /* Sneak an ack in there first?  If we can get it into the same
@@ -1265,18 +1285,19 @@ static void prepare_write_message(struct ceph_connection *con)
 
        /* tag + hdr + front + middle */
        con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
-       con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
+       con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr);
        con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
 
        if (m->middle)
                con_out_kvec_add(con, m->middle->vec.iov_len,
                        m->middle->vec.iov_base);
 
-       /* fill in crc (except data pages), footer */
+       /* fill in hdr crc and finalize hdr */
        crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
        con->out_msg->hdr.crc = cpu_to_le32(crc);
-       con->out_msg->footer.flags = 0;
+       memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr));
 
+       /* fill in front and middle crc, footer */
        crc = crc32c(0, m->front.iov_base, m->front.iov_len);
        con->out_msg->footer.front_crc = cpu_to_le32(crc);
        if (m->middle) {
@@ -1288,6 +1309,7 @@ static void prepare_write_message(struct ceph_connection *con)
        dout("%s front_crc %u middle_crc %u\n", __func__,
             le32_to_cpu(con->out_msg->footer.front_crc),
             le32_to_cpu(con->out_msg->footer.middle_crc));
+       con->out_msg->footer.flags = 0;
 
        /* is there a data payload? */
        con->out_msg->footer.data_crc = 0;
@@ -1492,7 +1514,6 @@ static int write_partial_kvec(struct ceph_connection *con)
                }
        }
        con->out_kvec_left = 0;
-       con->out_kvec_is_msg = false;
        ret = 1;
 out:
        dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
@@ -1584,6 +1605,7 @@ static int write_partial_skip(struct ceph_connection *con)
 {
        int ret;
 
+       dout("%s %p %d left\n", __func__, con, con->out_skip);
        while (con->out_skip > 0) {
                size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
 
@@ -2506,13 +2528,13 @@ more:
 
 more_kvec:
        /* kvec data queued? */
-       if (con->out_skip) {
-               ret = write_partial_skip(con);
+       if (con->out_kvec_left) {
+               ret = write_partial_kvec(con);
                if (ret <= 0)
                        goto out;
        }
-       if (con->out_kvec_left) {
-               ret = write_partial_kvec(con);
+       if (con->out_skip) {
+               ret = write_partial_skip(con);
                if (ret <= 0)
                        goto out;
        }
@@ -2805,13 +2827,17 @@ static bool con_backoff(struct ceph_connection *con)
 
 static void con_fault_finish(struct ceph_connection *con)
 {
+       dout("%s %p\n", __func__, con);
+
        /*
         * in case we faulted due to authentication, invalidate our
         * current tickets so that we can get new ones.
         */
-       if (con->auth_retry && con->ops->invalidate_authorizer) {
-               dout("calling invalidate_authorizer()\n");
-               con->ops->invalidate_authorizer(con);
+       if (con->auth_retry) {
+               dout("auth_retry %d, invalidating\n", con->auth_retry);
+               if (con->ops->invalidate_authorizer)
+                       con->ops->invalidate_authorizer(con);
+               con->auth_retry = 0;
        }
 
        if (con->ops->fault)
@@ -3050,16 +3076,31 @@ void ceph_msg_revoke(struct ceph_msg *msg)
                ceph_msg_put(msg);
        }
        if (con->out_msg == msg) {
-               dout("%s %p msg %p - was sending\n", __func__, con, msg);
-               con->out_msg = NULL;
-               if (con->out_kvec_is_msg) {
-                       con->out_skip = con->out_kvec_bytes;
-                       con->out_kvec_is_msg = false;
+               BUG_ON(con->out_skip);
+               /* footer */
+               if (con->out_msg_done) {
+                       con->out_skip += con_out_kvec_skip(con);
+               } else {
+                       BUG_ON(!msg->data_length);
+                       if (con->peer_features & CEPH_FEATURE_MSG_AUTH)
+                               con->out_skip += sizeof(msg->footer);
+                       else
+                               con->out_skip += sizeof(msg->old_footer);
                }
+               /* data, middle, front */
+               if (msg->data_length)
+                       con->out_skip += msg->cursor.total_resid;
+               if (msg->middle)
+                       con->out_skip += con_out_kvec_skip(con);
+               con->out_skip += con_out_kvec_skip(con);
+
+               dout("%s %p msg %p - was sending, will write %d skip %d\n",
+                    __func__, con, msg, con->out_kvec_bytes, con->out_skip);
                msg->hdr.seq = 0;
-
+               con->out_msg = NULL;
                ceph_msg_put(msg);
        }
+
        mutex_unlock(&con->mutex);
 }
 
@@ -3361,9 +3402,7 @@ static void ceph_msg_free(struct ceph_msg *m)
 static void ceph_msg_release(struct kref *kref)
 {
        struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
-       LIST_HEAD(data);
-       struct list_head *links;
-       struct list_head *next;
+       struct ceph_msg_data *data, *next;
 
        dout("%s %p\n", __func__, m);
        WARN_ON(!list_empty(&m->list_head));
@@ -3376,12 +3415,8 @@ static void ceph_msg_release(struct kref *kref)
                m->middle = NULL;
        }
 
-       list_splice_init(&m->data, &data);
-       list_for_each_safe(links, next, &data) {
-               struct ceph_msg_data *data;
-
-               data = list_entry(links, struct ceph_msg_data, links);
-               list_del_init(links);
+       list_for_each_entry_safe(data, next, &m->data, links) {
+               list_del_init(&data->links);
                ceph_msg_data_destroy(data);
        }
        m->data_length = 0;
index edda01626a459efbfdaeebd46fd6a0b13a037879..de85dddc3dc08cfeee81435c4475a6d975c4cd96 100644 (file)
@@ -364,10 +364,6 @@ static bool have_debugfs_info(struct ceph_mon_client *monc)
        return monc->client->have_fsid && monc->auth->global_id > 0;
 }
 
-/*
- * The monitor responds with mount ack indicate mount success.  The
- * included client ticket allows the client to talk to MDSs and OSDs.
- */
 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
 {