ceph: convert inline data to normal data before data write
author Yan, Zheng <zyan@redhat.com>
Fri, 14 Nov 2014 14:38:29 +0000 (22:38 +0800)
committer Ilya Dryomov <idryomov@redhat.com>
Wed, 17 Dec 2014 17:09:52 +0000 (20:09 +0300)
Before any data write, convert inline data to normal data and set
i_inline_version to CEPH_INLINE_NONE. The OSD request that saves
inline data to an object contains 3 operations (CMPXATTR, WRITE and
SETXATTR). It compares an xattr named 'inline_version' to prevent
old data from overwriting newer data.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/addr.c
fs/ceph/file.c
fs/ceph/super.h

index 13413d7440d6896fc883a2625bec8f1d53458656..70a3b441261ba26e5c25430e34b3126f99585c6b 100644 (file)
@@ -1313,6 +1313,19 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
        size_t len;
        int want, got, ret;
 
+       if (ci->i_inline_version != CEPH_INLINE_NONE) {
+               struct page *locked_page = NULL;
+               if (off == 0) {
+                       lock_page(page);
+                       locked_page = page;
+               }
+               ret = ceph_uninline_data(vma->vm_file, locked_page);
+               if (locked_page)
+                       unlock_page(locked_page);
+               if (ret < 0)
+                       return VM_FAULT_SIGBUS;
+       }
+
        if (off + PAGE_CACHE_SIZE <= size)
                len = PAGE_CACHE_SIZE;
        else
@@ -1361,11 +1374,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
                        ret = VM_FAULT_SIGBUS;
        }
 out:
-       if (ret != VM_FAULT_LOCKED) {
+       if (ret != VM_FAULT_LOCKED)
                unlock_page(page);
-       } else {
+       if (ret == VM_FAULT_LOCKED ||
+           ci->i_inline_version != CEPH_INLINE_NONE) {
                int dirty;
                spin_lock(&ci->i_ceph_lock);
+               ci->i_inline_version = CEPH_INLINE_NONE;
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
@@ -1422,6 +1437,135 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
        }
 }
 
+int ceph_uninline_data(struct file *filp, struct page *locked_page)
+{ /* write the inode's inline data out through OSD requests */
+       struct inode *inode = file_inode(filp);
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       struct ceph_osd_request *req;
+       struct page *page = NULL; /* data source for the WRITE op */
+       u64 len, inline_version;
+       int err = 0;
+       bool from_pagecache = false; /* page came from find_get_page() */
+
+       spin_lock(&ci->i_ceph_lock); /* sample the version under the lock */
+       inline_version = ci->i_inline_version;
+       spin_unlock(&ci->i_ceph_lock);
+
+       dout("uninline_data %p %llx.%llx inline_version %llu\n",
+            inode, ceph_vinop(inode), inline_version);
+
+       if (inline_version == 1 || /* initial version, no data */
+           inline_version == CEPH_INLINE_NONE)
+               goto out; /* skips both OSD requests, err stays 0 */
+
+       if (locked_page) {
+               page = locked_page; /* use the caller's page as the source */
+               WARN_ON(!PageUptodate(page));
+       } else if (ceph_caps_issued(ci) & /* else try the page cache */
+                  (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
+               page = find_get_page(inode->i_mapping, 0);
+               if (page) {
+                       if (PageUptodate(page)) {
+                               from_pagecache = true;
+                               lock_page(page);
+                       } else {
+                               page_cache_release(page); /* not uptodate */
+                               page = NULL;
+                       }
+               }
+       }
+
+       if (page) {
+               len = i_size_read(inode);
+               if (len > PAGE_CACHE_SIZE)
+                       len = PAGE_CACHE_SIZE; /* clamp to one page */
+       } else {
+               page = __page_cache_alloc(GFP_NOFS); /* freed at out: */
+               if (!page) {
+                       err = -ENOMEM;
+                       goto out;
+               }
+               err = __ceph_do_getattr(inode, page,
+                                       CEPH_STAT_CAP_INLINE_DATA, true);
+               if (err < 0) {
+                       /* no inline data */
+                       if (err == -ENODATA)
+                               err = 0;
+                       goto out;
+               }
+               len = err; /* non-negative result is used as the length */
+       }
+
+       req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+                                   ceph_vino(inode), 0, &len, 0, 1,
+                                   CEPH_OSD_OP_CREATE, /* 1st request */
+                                   CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
+                                   ci->i_snap_realm->cached_context,
+                                   0, 0, false);
+       if (IS_ERR(req)) {
+               err = PTR_ERR(req);
+               goto out;
+       }
+
+       ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
+       err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+       if (!err)
+               err = ceph_osdc_wait_request(&fsc->client->osdc, req);
+       ceph_osdc_put_request(req);
+       if (err < 0)
+               goto out; /* first request failed */
+
+       req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+                                   ceph_vino(inode), 0, &len, 1, 3,
+                                   CEPH_OSD_OP_WRITE, /* 2nd request */
+                                   CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
+                                   ci->i_snap_realm->cached_context,
+                                   ci->i_truncate_seq, ci->i_truncate_size,
+                                   false);
+       if (IS_ERR(req)) {
+               err = PTR_ERR(req);
+               goto out;
+       }
+
+       osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);
+
+       err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR, /* op 0 */
+                                   "inline_version", &inline_version,
+                                   sizeof(inline_version),
+                                   CEPH_OSD_CMPXATTR_OP_GT,
+                                   CEPH_OSD_CMPXATTR_MODE_U64);
+       if (err)
+               goto out_put;
+
+       err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR, /* op 2 */
+                                   "inline_version", &inline_version,
+                                   sizeof(inline_version), 0, 0);
+       if (err)
+               goto out_put;
+
+       ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
+       err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+       if (!err)
+               err = ceph_osdc_wait_request(&fsc->client->osdc, req);
+out_put:
+       ceph_osdc_put_request(req);
+       if (err == -ECANCELED) /* presumably CMPXATTR mismatch; confirm */
+               err = 0; /* reported to the caller as success */
+out:
+       if (page && page != locked_page) { /* caller's page is left alone */
+               if (from_pagecache) {
+                       unlock_page(page); /* locked at lookup above */
+                       page_cache_release(page);
+               } else
+                       __free_pages(page, 0); /* the allocated page */
+       }
+
+       dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
+            inode, ceph_vinop(inode), inline_version, err);
+       return err;
+}
+
 static struct vm_operations_struct ceph_vmops = {
        .fault          = ceph_filemap_fault,
        .page_mkwrite   = ceph_page_mkwrite,
index 5b092bda928464294e332346c0a04e0bcc0fb2c7..9b5901fefbf89db3a99a41c8b61f2ad47dd528f5 100644 (file)
@@ -963,6 +963,12 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
        if (err)
                goto out;
 
+       if (ci->i_inline_version != CEPH_INLINE_NONE) {
+               err = ceph_uninline_data(file, NULL);
+               if (err < 0)
+                       goto out;
+       }
+
 retry_snap:
        if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
                err = -ENOSPC;
@@ -1024,6 +1030,7 @@ retry_snap:
        if (written >= 0) {
                int dirty;
                spin_lock(&ci->i_ceph_lock);
+               ci->i_inline_version = CEPH_INLINE_NONE;
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
@@ -1269,6 +1276,12 @@ static long ceph_fallocate(struct file *file, int mode,
                goto unlock;
        }
 
+       if (ci->i_inline_version != CEPH_INLINE_NONE) {
+               ret = ceph_uninline_data(file, NULL);
+               if (ret < 0)
+                       goto unlock;
+       }
+
        size = i_size_read(inode);
        if (!(mode & FALLOC_FL_KEEP_SIZE))
                endoff = offset + length;
@@ -1295,6 +1308,7 @@ static long ceph_fallocate(struct file *file, int mode,
 
        if (!ret) {
                spin_lock(&ci->i_ceph_lock);
+               ci->i_inline_version = CEPH_INLINE_NONE;
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
index 6d56fae863ca997121ec99d3832b56e124b1c479..8197a3cf750b285e4be9196a89d98b28a19cd065 100644 (file)
@@ -888,7 +888,7 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 extern int ceph_release(struct inode *inode, struct file *filp);
 extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
                                  char *data, size_t len);
-
+int ceph_uninline_data(struct file *filp, struct page *locked_page);
 /* dir.c */
 extern const struct file_operations ceph_dir_fops;
 extern const struct inode_operations ceph_dir_iops;