Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
authorLinus Torvalds <torvalds@linux-foundation.org>
Sun, 24 Jan 2016 20:34:13 +0000 (12:34 -0800)
committerLinus Torvalds <torvalds@linux-foundation.org>
Sun, 24 Jan 2016 20:34:13 +0000 (12:34 -0800)
Pull Ceph updates from Sage Weil:
 "The two main changes are aio support in CephFS, and a series that
  fixes several issues in the authentication key timeout/renewal code.

  On top of that are a variety of cleanups and minor bug fixes"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  libceph: remove outdated comment
  libceph: kill off ceph_x_ticket_handler::validity
  libceph: invalidate AUTH in addition to a service ticket
  libceph: fix authorizer invalidation, take 2
  libceph: clear messenger auth_retry flag if we fault
  libceph: fix ceph_msg_revoke()
  libceph: use list_for_each_entry_safe
  ceph: use i_size_{read,write} to get/set i_size
  ceph: re-send AIO write request when getting -EOLDSNAP error
  ceph: Asynchronous IO support
  ceph: Avoid to propagate the invalid page point
  ceph: fix double page_unlock() in page_mkwrite()
  rbd: delete an unnecessary check before rbd_dev_destroy()
  libceph: use list_next_entry instead of list_entry_next
  ceph: ceph_frag_contains_value can be boolean
  ceph: remove unused functions in ceph_frag.h

1  2 
fs/ceph/cache.c
fs/ceph/file.c
fs/ceph/inode.c

diff --combined fs/ceph/cache.c
index 7680e2626815d133d3b5e9998e529535bd9231f9,16e7f7b6069112fd48e854aa9d1567d5de5b3f34..a351480dbabc95891e4b61f83fb92485f8ea7b18
@@@ -106,7 -106,7 +106,7 @@@ static uint16_t ceph_fscache_inode_get_
  
        memset(&aux, 0, sizeof(aux));
        aux.mtime = inode->i_mtime;
-       aux.size = inode->i_size;
+       aux.size = i_size_read(inode);
  
        memcpy(buffer, &aux, sizeof(aux));
  
@@@ -117,9 -117,7 +117,7 @@@ static void ceph_fscache_inode_get_attr
                                        uint64_t *size)
  {
        const struct ceph_inode_info* ci = cookie_netfs_data;
-       const struct inode* inode = &ci->vfs_inode;
-       *size = inode->i_size;
+       *size = i_size_read(&ci->vfs_inode);
  }
  
  static enum fscache_checkaux ceph_fscache_inode_check_aux(
  
        memset(&aux, 0, sizeof(aux));
        aux.mtime = inode->i_mtime;
-       aux.size = inode->i_size;
+       aux.size = i_size_read(inode);
  
        if (memcmp(data, &aux, sizeof(aux)) != 0)
                return FSCACHE_CHECKAUX_OBSOLETE;
@@@ -197,7 -195,7 +195,7 @@@ void ceph_fscache_register_inode_cookie
                return;
  
        /* Avoid multiple racing open requests */
 -      mutex_lock(&inode->i_mutex);
 +      inode_lock(inode);
  
        if (ci->fscache)
                goto done;
                                             ci, true);
        fscache_check_consistency(ci->fscache);
  done:
 -      mutex_unlock(&inode->i_mutex);
 +      inode_unlock(inode);
  
  }
  
diff --combined fs/ceph/file.c
index 10c5ae79696ee860a1cebb6f99dec2d2bde4717a,d37efdd8533d58c8b105b4280a6c2c6f72392ec5..86a9c383955e56037eb38419b1e4617317d32237
@@@ -397,8 -397,9 +397,9 @@@ int ceph_release(struct inode *inode, s
  }
  
  enum {
-       CHECK_EOF = 1,
-       READ_INLINE = 2,
+       HAVE_RETRIED = 1,
+       CHECK_EOF =    2,
+       READ_INLINE =  3,
  };
  
  /*
  static int striped_read(struct inode *inode,
                        u64 off, u64 len,
                        struct page **pages, int num_pages,
-                       int *checkeof, bool o_direct,
-                       unsigned long buf_align)
+                       int *checkeof)
  {
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 pos, this_len, left;
-       int io_align, page_align;
-       int pages_left;
-       int read;
+       loff_t i_size;
+       int page_align, pages_left;
+       int read, ret;
        struct page **page_pos;
-       int ret;
        bool hit_stripe, was_short;
  
        /*
        page_pos = pages;
        pages_left = num_pages;
        read = 0;
-       io_align = off & ~PAGE_MASK;
  
  more:
-       if (o_direct)
-               page_align = (pos - io_align + buf_align) & ~PAGE_MASK;
-       else
-               page_align = pos & ~PAGE_MASK;
+       page_align = pos & ~PAGE_MASK;
        this_len = left;
        ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
                                  &ci->i_layout, pos, &this_len,
        dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
             ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
  
+       i_size = i_size_read(inode);
        if (ret >= 0) {
                int didpages;
-               if (was_short && (pos + ret < inode->i_size)) {
-                       int zlen = min(this_len - ret,
-                                      inode->i_size - pos - ret);
-                       int zoff = (o_direct ? buf_align : io_align) +
-                                   read + ret;
+               if (was_short && (pos + ret < i_size)) {
+                       int zlen = min(this_len - ret, i_size - pos - ret);
+                       int zoff = (off & ~PAGE_MASK) + read + ret;
                        dout(" zero gap %llu to %llu\n",
                                pos + ret, pos + ret + zlen);
                        ceph_zero_page_vector_range(zoff, zlen, pages);
                pages_left -= didpages;
  
                /* hit stripe and need continue*/
-               if (left && hit_stripe && pos < inode->i_size)
+               if (left && hit_stripe && pos < i_size)
                        goto more;
        }
  
        if (read > 0) {
                ret = read;
                /* did we bounce off eof? */
-               if (pos + left > inode->i_size)
+               if (pos + left > i_size)
                        *checkeof = CHECK_EOF;
        }
  
@@@ -521,54 -515,28 +515,28 @@@ static ssize_t ceph_sync_read(struct ki
        if (ret < 0)
                return ret;
  
-       if (iocb->ki_flags & IOCB_DIRECT) {
-               while (iov_iter_count(i)) {
-                       size_t start;
-                       ssize_t n;
-                       n = dio_get_pagev_size(i);
-                       pages = dio_get_pages_alloc(i, n, &start, &num_pages);
-                       if (IS_ERR(pages))
-                               return PTR_ERR(pages);
-                       ret = striped_read(inode, off, n,
-                                          pages, num_pages, checkeof,
-                                          1, start);
-                       ceph_put_page_vector(pages, num_pages, true);
-                       if (ret <= 0)
-                               break;
-                       off += ret;
-                       iov_iter_advance(i, ret);
-                       if (ret < n)
+       num_pages = calc_pages_for(off, len);
+       pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+       ret = striped_read(inode, off, len, pages,
+                               num_pages, checkeof);
+       if (ret > 0) {
+               int l, k = 0;
+               size_t left = ret;
+               while (left) {
+                       size_t page_off = off & ~PAGE_MASK;
+                       size_t copy = min_t(size_t, left,
+                                           PAGE_SIZE - page_off);
+                       l = copy_page_to_iter(pages[k++], page_off, copy, i);
+                       off += l;
+                       left -= l;
+                       if (l < copy)
                                break;
                }
-       } else {
-               num_pages = calc_pages_for(off, len);
-               pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
-               if (IS_ERR(pages))
-                       return PTR_ERR(pages);
-               ret = striped_read(inode, off, len, pages,
-                                       num_pages, checkeof, 0, 0);
-               if (ret > 0) {
-                       int l, k = 0;
-                       size_t left = ret;
-                       while (left) {
-                               size_t page_off = off & ~PAGE_MASK;
-                               size_t copy = min_t(size_t,
-                                                   PAGE_SIZE - page_off, left);
-                               l = copy_page_to_iter(pages[k++], page_off,
-                                                     copy, i);
-                               off += l;
-                               left -= l;
-                               if (l < copy)
-                                       break;
-                       }
-               }
-               ceph_release_page_vector(pages, num_pages);
        }
+       ceph_release_page_vector(pages, num_pages);
  
        if (off > iocb->ki_pos) {
                ret = off - iocb->ki_pos;
        return ret;
  }
  
+ struct ceph_aio_request {     /* state shared by all OSD requests issued for one async direct-I/O iocb */
+       struct kiocb *iocb;     /* iocb finished through ->ki_complete() once the last request is done */
+       size_t total_len;       /* bytes covered by the queued requests; reported as the result when no error was recorded */
+       int write;      /* non-zero for a write, zero for a read */
+       int error;      /* first negative request result, stored with cmpxchg(); 0 if none */
+       struct list_head osd_reqs;      /* requests queued (linked by r_unsafe_item) until they are started */
+       unsigned num_reqs;      /* number of OSD requests queued for this iocb */
+       atomic_t pending_reqs;  /* requests not yet completed; dropping to zero triggers completion */
+       struct timespec mtime;  /* write timestamp, passed again to ceph_osdc_build_request() on a retry */
+       struct ceph_cap_flush *prealloc_cf;     /* swapped in from the caller for writes; handed to __ceph_mark_dirty_caps() and freed at completion */
+ };
+ struct ceph_aio_work {        /* deferred-retry item queued by ceph_aio_complete_req() on -EOLDSNAPC */
+       struct work_struct work;        /* initialized with ceph_aio_retry_work as the handler */
+       struct ceph_osd_request *req;   /* the OSD request that failed and is to be re-sent */
+ };
+ static void ceph_aio_retry_work(struct work_struct *work);
+ static void ceph_aio_complete(struct inode *inode,
+                             struct ceph_aio_request *aio_req)
+ {     /* Drop one pending OSD request; the caller that drops the last one finishes the iocb and frees aio_req. */
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       int ret;
+       if (!atomic_dec_and_test(&aio_req->pending_reqs))       /* other requests still outstanding */
+               return;
+       ret = aio_req->error;   /* first recorded error, if any ... */
+       if (!ret)
+               ret = aio_req->total_len;       /* ... otherwise the byte count */
+       dout("ceph_aio_complete %p rc %d\n", inode, ret);
+       if (ret >= 0 && aio_req->write) {       /* successful write: update size and dirty caps */
+               int dirty;
+               loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len;
+               if (endoff > i_size_read(inode)) {      /* write ended past the current i_size */
+                       if (ceph_inode_set_size(inode, endoff))
+                               ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
+               }
+               spin_lock(&ci->i_ceph_lock);
+               ci->i_inline_version = CEPH_INLINE_NONE;
+               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+                                              &aio_req->prealloc_cf);
+               spin_unlock(&ci->i_ceph_lock);
+               if (dirty)
+                       __mark_inode_dirty(inode, dirty);
+       }
+       ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR :      /* release the FILE_WR or FILE_RD cap reference */
+                                               CEPH_CAP_FILE_RD));
+       aio_req->iocb->ki_complete(aio_req->iocb, ret, 0);      /* report the result to the iocb owner */
+       ceph_free_cap_flush(aio_req->prealloc_cf);
+       kfree(aio_req);
+ }
+ static void ceph_aio_complete_req(struct ceph_osd_request *req,
+                                 struct ceph_msg *msg)
+ {     /* Per-request callback: schedule a retry, or post-process the result and account it in the aio request. */
+       int rc = req->r_result;
+       struct inode *inode = req->r_inode;
+       struct ceph_aio_request *aio_req = req->r_priv;
+       struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);    /* data of op 0 */
+       int num_pages = calc_pages_for((u64)osd_data->alignment,
+                                      osd_data->length);
+       dout("ceph_aio_complete_req %p rc %d bytes %llu\n",
+            inode, rc, osd_data->length);
+       if (rc == -EOLDSNAPC) { /* hand the request to the retry worker */
+               struct ceph_aio_work *aio_work;
+               BUG_ON(!aio_req->write);        /* this result is only expected for writes */
+               aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS);
+               if (aio_work) {
+                       INIT_WORK(&aio_work->work, ceph_aio_retry_work);
+                       aio_work->req = req;
+                       queue_work(ceph_inode_to_client(inode)->wb_wq,
+                                  &aio_work->work);
+                       return; /* req, its pages and the pending count are left to the worker */
+               }
+               rc = -ENOMEM;   /* could not queue the retry: fail this request */
+       } else if (!aio_req->write) {   /* read completion */
+               if (rc == -ENOENT)      /* treated as a read of zero bytes */
+                       rc = 0;
+               if (rc >= 0 && osd_data->length > rc) { /* short read: zero the unread tail */
+                       int zoff = osd_data->alignment + rc;
+                       int zlen = osd_data->length - rc;
+                       /*
+                        * If read is satisfied by single OSD request,
+                        * it can pass EOF. Otherwise read is within
+                        * i_size.
+                        */
+                       if (aio_req->num_reqs == 1) {
+                               loff_t i_size = i_size_read(inode);
+                               loff_t endoff = aio_req->iocb->ki_pos + rc;
+                               if (endoff < i_size)    /* limit zero-fill to the bytes below i_size */
+                                       zlen = min_t(size_t, zlen,
+                                                    i_size - endoff);
+                               aio_req->total_len = rc + zlen; /* NOTE(review): when endoff >= i_size, zlen is not reduced here - confirm intended */
+                       }
+                       if (zlen > 0)
+                               ceph_zero_page_vector_range(zoff, zlen,
+                                                           osd_data->pages);
+               }
+       }
+       ceph_put_page_vector(osd_data->pages, num_pages, false);
+       ceph_osdc_put_request(req);
+       if (rc < 0)
+               cmpxchg(&aio_req->error, 0, rc);        /* keep only the first error */
+       ceph_aio_complete(inode, aio_req);
+       return;
+ }
+ /*
+  * Workqueue handler queued by ceph_aio_complete_req() when an AIO write
+  * failed with -EOLDSNAPC.  Builds a replacement OSD request against the
+  * inode's current snap context (the last pending cap snap if there is
+  * one, otherwise i_head_snapc), reusing the object location, object id
+  * and first op of the original request, and starts it.
+  *
+  * If the replacement cannot be allocated or started, the failure is fed
+  * through ceph_aio_complete_req() so the aio request is still accounted
+  * and completed.  The snap context reference and the work item are
+  * released on every path.
+  */
+ static void ceph_aio_retry_work(struct work_struct *work)
+ {
+       struct ceph_aio_work *aio_work =
+               container_of(work, struct ceph_aio_work, work);
+       struct ceph_osd_request *orig_req = aio_work->req;
+       struct ceph_aio_request *aio_req = orig_req->r_priv;
+       struct inode *inode = orig_req->r_inode;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_snap_context *snapc;
+       struct ceph_osd_request *req;
+       int ret;
+       /* take a reference on the snap context the resend must use */
+       spin_lock(&ci->i_ceph_lock);
+       if (__ceph_have_pending_cap_snap(ci)) {
+               struct ceph_cap_snap *capsnap =
+                       list_last_entry(&ci->i_cap_snaps,
+                                       struct ceph_cap_snap,
+                                       ci_item);
+               snapc = ceph_get_snap_context(capsnap->context);
+       } else {
+               BUG_ON(!ci->i_head_snapc);
+               snapc = ceph_get_snap_context(ci->i_head_snapc);
+       }
+       spin_unlock(&ci->i_ceph_lock);
+       req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
+                       false, GFP_NOFS);
+       if (!req) {
+               /*
+                * ceph_osdc_alloc_request() reports failure with NULL,
+                * not an ERR_PTR.  Fail the original request instead.
+                */
+               ret = -ENOMEM;
+               req = orig_req;
+               goto out;
+       }
+       req->r_flags =  CEPH_OSD_FLAG_ORDERSNAP |
+                       CEPH_OSD_FLAG_ONDISK |
+                       CEPH_OSD_FLAG_WRITE;
+       req->r_base_oloc = orig_req->r_base_oloc;
+       req->r_base_oid = orig_req->r_base_oid;
+       req->r_ops[0] = orig_req->r_ops[0];
+       osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
+       ceph_osdc_build_request(req, req->r_ops[0].extent.offset,
+                               snapc, CEPH_NOSNAP, &aio_req->mtime);
+       /* the replacement now carries the op; drop the failed request */
+       ceph_osdc_put_request(orig_req);
+       req->r_callback = ceph_aio_complete_req;
+       req->r_inode = inode;
+       req->r_priv = aio_req;
+       ret = ceph_osdc_start_request(req->r_osdc, req, false);
+ out:
+       if (ret < 0) {
+               BUG_ON(ret == -EOLDSNAPC);
+               req->r_result = ret;
+               ceph_aio_complete_req(req, NULL);
+       }
+       /* released here so the allocation-failure path does not leak it */
+       ceph_put_snap_context(snapc);
+       kfree(aio_work);
+ }
  /*
   * Write commit request unsafe callback, called to tell us when a
   * request is unsafe (that is, in flight--has been handed to the
@@@ -612,16 -767,10 +767,10 @@@ static void ceph_sync_write_unsafe(stru
  }
  
  
- /*
-  * Synchronous write, straight from __user pointer or user pages.
-  *
-  * If write spans object boundary, just do multiple writes.  (For a
-  * correct atomic write, we should e.g. take write locks on all
-  * objects, rollback on failure, etc.)
-  */
  static ssize_t
- ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
-                      struct ceph_snap_context *snapc)
+ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
+                      struct ceph_snap_context *snapc,
+                      struct ceph_cap_flush **pcf)
  {
        struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
        struct ceph_vino vino;
        struct ceph_osd_request *req;
        struct page **pages;
-       int num_pages;
-       int written = 0;
+       struct ceph_aio_request *aio_req = NULL;
+       int num_pages = 0;
        int flags;
-       int check_caps = 0;
        int ret;
        struct timespec mtime = CURRENT_TIME;
-       size_t count = iov_iter_count(from);
+       size_t count = iov_iter_count(iter);
+       loff_t pos = iocb->ki_pos;
+       bool write = iov_iter_rw(iter) == WRITE;
  
-       if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
+       if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP)
                return -EROFS;
  
-       dout("sync_direct_write on file %p %lld~%u\n", file, pos,
-            (unsigned)count);
+       dout("sync_direct_read_write (%s) on file %p %lld~%u\n",
+            (write ? "write" : "read"), file, pos, (unsigned)count);
  
        ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
        if (ret < 0)
                return ret;
  
-       ret = invalidate_inode_pages2_range(inode->i_mapping,
-                                           pos >> PAGE_CACHE_SHIFT,
-                                           (pos + count) >> PAGE_CACHE_SHIFT);
-       if (ret < 0)
-               dout("invalidate_inode_pages2_range returned %d\n", ret);
+       if (write) {
+               ret = invalidate_inode_pages2_range(inode->i_mapping,
+                                       pos >> PAGE_CACHE_SHIFT,
+                                       (pos + count) >> PAGE_CACHE_SHIFT);
+               if (ret < 0)
+                       dout("invalidate_inode_pages2_range returned %d\n", ret);
  
-       flags = CEPH_OSD_FLAG_ORDERSNAP |
-               CEPH_OSD_FLAG_ONDISK |
-               CEPH_OSD_FLAG_WRITE;
+               flags = CEPH_OSD_FLAG_ORDERSNAP |
+                       CEPH_OSD_FLAG_ONDISK |
+                       CEPH_OSD_FLAG_WRITE;
+       } else {
+               flags = CEPH_OSD_FLAG_READ;
+       }
  
-       while (iov_iter_count(from) > 0) {
-               u64 len = dio_get_pagev_size(from);
-               size_t start;
-               ssize_t n;
+       while (iov_iter_count(iter) > 0) {
+               u64 size = dio_get_pagev_size(iter);
+               size_t start = 0;
+               ssize_t len;
  
                vino = ceph_vino(inode);
                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-                                           vino, pos, &len, 0,
-                                           2,/*include a 'startsync' command*/
-                                           CEPH_OSD_OP_WRITE, flags, snapc,
+                                           vino, pos, &size, 0,
+                                           /*include a 'startsync' command*/
+                                           write ? 2 : 1,
+                                           write ? CEPH_OSD_OP_WRITE :
+                                                   CEPH_OSD_OP_READ,
+                                           flags, snapc,
                                            ci->i_truncate_seq,
                                            ci->i_truncate_size,
                                            false);
                        break;
                }
  
-               osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
-               n = len;
-               pages = dio_get_pages_alloc(from, len, &start, &num_pages);
+               len = size;
+               pages = dio_get_pages_alloc(iter, len, &start, &num_pages);
                if (IS_ERR(pages)) {
                        ceph_osdc_put_request(req);
                        ret = PTR_ERR(pages);
                }
  
                /*
-                * throw out any page cache pages in this range. this
-                * may block.
+                * To simplify error handling, allow AIO when IO within i_size
+                * or IO can be satisfied by single OSD request.
                 */
-               truncate_inode_pages_range(inode->i_mapping, pos,
-                                  (pos+n) | (PAGE_CACHE_SIZE-1));
-               osd_req_op_extent_osd_data_pages(req, 0, pages, n, start,
-                                               false, false);
+               if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) &&
+                   (len == count || pos + count <= i_size_read(inode))) {
+                       aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL);
+                       if (aio_req) {
+                               aio_req->iocb = iocb;
+                               aio_req->write = write;
+                               INIT_LIST_HEAD(&aio_req->osd_reqs);
+                               if (write) {
+                                       aio_req->mtime = mtime;
+                                       swap(aio_req->prealloc_cf, *pcf);
+                               }
+                       }
+                       /* ignore error */
+               }
+               if (write) {
+                       /*
+                        * throw out any page cache pages in this range. this
+                        * may block.
+                        */
+                       truncate_inode_pages_range(inode->i_mapping, pos,
+                                       (pos+len) | (PAGE_CACHE_SIZE - 1));
+                       osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
+               }
+               osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,
+                                                false, false);
  
-               /* BUG_ON(vino.snap != CEPH_NOSNAP); */
                ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
  
-               ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+               if (aio_req) {
+                       aio_req->total_len += len;
+                       aio_req->num_reqs++;
+                       atomic_inc(&aio_req->pending_reqs);
+                       req->r_callback = ceph_aio_complete_req;
+                       req->r_inode = inode;
+                       req->r_priv = aio_req;
+                       list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs);
+                       pos += len;
+                       iov_iter_advance(iter, len);
+                       continue;
+               }
+               ret = ceph_osdc_start_request(req->r_osdc, req, false);
                if (!ret)
                        ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
  
+               size = i_size_read(inode);
+               if (!write) {
+                       if (ret == -ENOENT)
+                               ret = 0;
+                       if (ret >= 0 && ret < len && pos + ret < size) {
+                               int zlen = min_t(size_t, len - ret,
+                                                size - pos - ret);
+                               ceph_zero_page_vector_range(start + ret, zlen,
+                                                           pages);
+                               ret += zlen;
+                       }
+                       if (ret >= 0)
+                               len = ret;
+               }
                ceph_put_page_vector(pages, num_pages, false);
  
                ceph_osdc_put_request(req);
-               if (ret)
+               if (ret < 0)
+                       break;
+               pos += len;
+               iov_iter_advance(iter, len);
+               if (!write && pos >= size)
                        break;
-               pos += n;
-               written += n;
-               iov_iter_advance(from, n);
  
-               if (pos > i_size_read(inode)) {
-                       check_caps = ceph_inode_set_size(inode, pos);
-                       if (check_caps)
+               if (write && pos > size) {
+                       if (ceph_inode_set_size(inode, pos))
                                ceph_check_caps(ceph_inode(inode),
                                                CHECK_CAPS_AUTHONLY,
                                                NULL);
                }
        }
  
-       if (ret != -EOLDSNAPC && written > 0) {
+       if (aio_req) {
+               if (aio_req->num_reqs == 0) {
+                       kfree(aio_req);
+                       return ret;
+               }
+               ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR :
+                                             CEPH_CAP_FILE_RD);
+               while (!list_empty(&aio_req->osd_reqs)) {
+                       req = list_first_entry(&aio_req->osd_reqs,
+                                              struct ceph_osd_request,
+                                              r_unsafe_item);
+                       list_del_init(&req->r_unsafe_item);
+                       if (ret >= 0)
+                               ret = ceph_osdc_start_request(req->r_osdc,
+                                                             req, false);
+                       if (ret < 0) {
+                               BUG_ON(ret == -EOLDSNAPC);
+                               req->r_result = ret;
+                               ceph_aio_complete_req(req, NULL);
+                       }
+               }
+               return -EIOCBQUEUED;
+       }
+       if (ret != -EOLDSNAPC && pos > iocb->ki_pos) {
+               ret = pos - iocb->ki_pos;
                iocb->ki_pos = pos;
-               ret = written;
        }
        return ret;
  }
  
  /*
   * Synchronous write, straight from __user pointer or user pages.
   *
@@@ -897,8 -1133,14 +1133,14 @@@ again
                     ceph_cap_string(got));
  
                if (ci->i_inline_version == CEPH_INLINE_NONE) {
-                       /* hmm, this isn't really async... */
-                       ret = ceph_sync_read(iocb, to, &retry_op);
+                       if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
+                               ret = ceph_direct_read_write(iocb, to,
+                                                            NULL, NULL);
+                               if (ret >= 0 && ret < len)
+                                       retry_op = CHECK_EOF;
+                       } else {
+                               ret = ceph_sync_read(iocb, to, &retry_op);
+                       }
                } else {
                        retry_op = READ_INLINE;
                }
                pinned_page = NULL;
        }
        ceph_put_cap_refs(ci, got);
-       if (retry_op && ret >= 0) {
+       if (retry_op > HAVE_RETRIED && ret >= 0) {
                int statret;
                struct page *page = NULL;
                loff_t i_size;
                if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
                    ret < len) {
                        dout("sync_read hit hole, ppos %lld < size %lld"
-                            ", reading more\n", iocb->ki_pos,
-                            inode->i_size);
+                            ", reading more\n", iocb->ki_pos, i_size);
  
                        read += ret;
                        len -= ret;
-                       retry_op = 0;
+                       retry_op = HAVE_RETRIED;
                        goto again;
                }
        }
@@@ -1014,7 -1255,7 +1255,7 @@@ static ssize_t ceph_write_iter(struct k
        if (!prealloc_cf)
                return -ENOMEM;
  
 -      mutex_lock(&inode->i_mutex);
 +      inode_lock(inode);
  
        /* We can write back this queue in page reclaim */
        current->backing_dev_info = inode_to_bdi(inode);
@@@ -1052,7 -1293,7 +1293,7 @@@ retry_snap
        }
  
        dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
-            inode, ceph_vinop(inode), pos, count, inode->i_size);
+            inode, ceph_vinop(inode), pos, count, i_size_read(inode));
        if (fi->fmode & CEPH_FILE_MODE_LAZY)
                want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
        else
            (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
                struct ceph_snap_context *snapc;
                struct iov_iter data;
 -              mutex_unlock(&inode->i_mutex);
 +              inode_unlock(inode);
  
                spin_lock(&ci->i_ceph_lock);
                if (__ceph_have_pending_cap_snap(ci)) {
                /* we might need to revert back to that point */
                data = *from;
                if (iocb->ki_flags & IOCB_DIRECT)
-                       written = ceph_sync_direct_write(iocb, &data, pos,
-                                                        snapc);
+                       written = ceph_direct_read_write(iocb, &data, snapc,
+                                                        &prealloc_cf);
                else
                        written = ceph_sync_write(iocb, &data, pos, snapc);
                if (written == -EOLDSNAPC) {
                                "got EOLDSNAPC, retrying\n",
                                inode, ceph_vinop(inode),
                                pos, (unsigned)count);
 -                      mutex_lock(&inode->i_mutex);
 +                      inode_lock(inode);
                        goto retry_snap;
                }
                if (written > 0)
                        iov_iter_advance(from, written);
                ceph_put_snap_context(snapc);
        } else {
-               loff_t old_size = inode->i_size;
+               loff_t old_size = i_size_read(inode);
                /*
                 * No need to acquire the i_truncate_mutex. Because
                 * the MDS revokes Fwb caps before sending truncate
                written = generic_perform_write(file, from, pos);
                if (likely(written >= 0))
                        iocb->ki_pos = pos + written;
-               if (inode->i_size > old_size)
+               if (i_size_read(inode) > old_size)
                        ceph_fscache_update_objectsize(inode);
 -              mutex_unlock(&inode->i_mutex);
 +              inode_unlock(inode);
        }
  
        if (written >= 0) {
        goto out_unlocked;
  
  out:
 -      mutex_unlock(&inode->i_mutex);
 +      inode_unlock(inode);
  out_unlocked:
        ceph_free_cap_flush(prealloc_cf);
        current->backing_dev_info = NULL;
  static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
  {
        struct inode *inode = file->f_mapping->host;
+       loff_t i_size;
        int ret;
  
 -      mutex_lock(&inode->i_mutex);
 +      inode_lock(inode);
  
        if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
                ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
                }
        }
  
+       i_size = i_size_read(inode);
        switch (whence) {
        case SEEK_END:
-               offset += inode->i_size;
+               offset += i_size;
                break;
        case SEEK_CUR:
                /*
                offset += file->f_pos;
                break;
        case SEEK_DATA:
-               if (offset >= inode->i_size) {
+               if (offset >= i_size) {
                        ret = -ENXIO;
                        goto out;
                }
                break;
        case SEEK_HOLE:
-               if (offset >= inode->i_size) {
+               if (offset >= i_size) {
                        ret = -ENXIO;
                        goto out;
                }
-               offset = inode->i_size;
+               offset = i_size;
                break;
        }
  
        offset = vfs_setpos(file, offset, inode->i_sb->s_maxbytes);
  
  out:
 -      mutex_unlock(&inode->i_mutex);
 +      inode_unlock(inode);
        return offset;
  }
  
@@@ -1363,7 -1606,7 +1606,7 @@@ static long ceph_fallocate(struct file 
        if (!prealloc_cf)
                return -ENOMEM;
  
 -      mutex_lock(&inode->i_mutex);
 +      inode_lock(inode);
  
        if (ceph_snap(inode) != CEPH_NOSNAP) {
                ret = -EROFS;
  
        ceph_put_cap_refs(ci, got);
  unlock:
 -      mutex_unlock(&inode->i_mutex);
 +      inode_unlock(inode);
        ceph_free_cap_flush(prealloc_cf);
        return ret;
  }
diff --combined fs/ceph/inode.c
index da55eb8bcffab89755baf5229b92ededf49dd484,eb7cb9176b0c1754ef4f5f1a23f02bdc563574b8..fb4ba2e4e2a5fa5c5d62afa3b94f758c7906687b
@@@ -548,7 -548,7 +548,7 @@@ int ceph_fill_file_size(struct inode *i
        if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
            (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
                dout("size %lld -> %llu\n", inode->i_size, size);
-               inode->i_size = size;
+               i_size_write(inode, size);
                inode->i_blocks = (size + (1<<9) - 1) >> 9;
                ci->i_reported_size = size;
                if (truncate_seq != ci->i_truncate_seq) {
@@@ -808,7 -808,7 +808,7 @@@ static int fill_inode(struct inode *ino
                        spin_unlock(&ci->i_ceph_lock);
  
                        err = -EINVAL;
-                       if (WARN_ON(symlen != inode->i_size))
+                       if (WARN_ON(symlen != i_size_read(inode)))
                                goto out;
  
                        err = -ENOMEM;
@@@ -1549,7 -1549,7 +1549,7 @@@ int ceph_inode_set_size(struct inode *i
  
        spin_lock(&ci->i_ceph_lock);
        dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
-       inode->i_size = size;
+       i_size_write(inode, size);
        inode->i_blocks = (size + (1 << 9) - 1) >> 9;
  
        /* tell the MDS if we are approaching max_size */
@@@ -1756,7 -1756,7 +1756,7 @@@ retry
   */
  static const struct inode_operations ceph_symlink_iops = {
        .readlink = generic_readlink,
 -      .follow_link = simple_follow_link,
 +      .get_link = simple_get_link,
        .setattr = ceph_setattr,
        .getattr = ceph_getattr,
        .setxattr = ceph_setxattr,
@@@ -1911,7 -1911,7 +1911,7 @@@ int ceph_setattr(struct dentry *dentry
                     inode->i_size, attr->ia_size);
                if ((issued & CEPH_CAP_FILE_EXCL) &&
                    attr->ia_size > inode->i_size) {
-                       inode->i_size = attr->ia_size;
+                       i_size_write(inode, attr->ia_size);
                        inode->i_blocks =
                                (attr->ia_size + (1 << 9) - 1) >> 9;
                        inode->i_ctime = attr->ia_ctime;