
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0, it will not be incremented.
 * If the counter is already at its maximum value, -EINVAL is
 * returned without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
        unsigned int counter;

        counter = (unsigned int)__atomic_add_unless(v, 1, 0);
        if (counter <= (unsigned int)INT_MAX)
                return (int)counter;

        atomic_dec(v);

        return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
        int counter;

        counter = atomic_dec_return(v);
        if (counter >= 0)
                return counter;

        atomic_inc(v);

        return -EINVAL;
}
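
/*
 * Illustrative note (not part of the original file): these two helpers
 * act as a saturating get/put pair.  A hypothetical user looks like:
 *
 *      if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0) {
 *              ... reference acquired ...
 *      }
 *      ...
 *      if (atomic_dec_return_safe(&rbd_dev->parent_ref) < 0)
 *              ... underflow was detected and undone ...
 *
 * Once the counter has dropped to 0 it stays at 0, so a racing user
 * cannot take a new reference on an object already being torn down.
 */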

#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR            256
#define RBD_SINGLE_MAJOR_PART_SHIFT     4

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
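
/*
 * Sanity check on the width formula above (illustrative, not part of
 * the original file): each byte of an int contributes at most
 * log10(256) ~= 2.41 decimal digits, so 5/2 = 2.5 digits per byte is
 * a safe over-estimate.  For a 4-byte int this yields
 * (5 * 4) / 2 + 1 = 11 characters, enough for "-2147483648"
 * (10 digits plus a sign).
 */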

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
        OBJ_OP_WRITE,
        OBJ_OP_READ,
        OBJ_OP_DISCARD,
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn in image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
        IMG_REQ_DISCARD,        /* discard: normal = 0, discard request = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
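
/*
 * Illustrative use of the iterators above (a sketch, not part of the
 * original file), e.g. totalling the bytes transferred for an image
 * request, as rbd_img_request_complete() does further below:
 *
 *      struct rbd_obj_request *obj_request;
 *      u64 xferred = 0;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              xferred += obj_request->xferred;
 */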

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        int                     minor;
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        struct list_head        rq_queue;       /* incoming rq queue */
        spinlock_t              lock;           /* queue, flags, open_count */
        struct work_struct      rq_work;

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        atomic_t                parent_ref;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
                                    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
                                       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);

static int rbd_dev_id_to_minor(int dev_id)
{
        return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
        return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
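
/*
 * Worked example (illustrative, not part of the original file): with
 * RBD_SINGLE_MAJOR_PART_SHIFT == 4, dev_id 3 maps to minor 3 << 4 = 48,
 * and minors 48..63 belong to that device: one whole-disk node plus up
 * to 15 partitions.  Any minor in that range maps back to dev_id 3
 * (e.g. 50 >> 4 == 3).
 */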

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);

static struct attribute *rbd_bus_attrs[] = {
        &bus_attr_add.attr,
        &bus_attr_remove.attr,
        &bus_attr_add_single_major.attr,
        &bus_attr_remove_single_major.attr,
        NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
                                  struct attribute *attr, int index)
{
        if (!single_major &&
            (attr == &bus_attr_add_single_major.attr ||
             attr == &bus_attr_remove_single_major.attr))
                return 0;

        return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
        .attrs = rbd_bus_attrs,
        .is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_groups     = rbd_bus_groups,
};
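
/*
 * Userspace sketch (illustrative; Documentation/ABI/testing/sysfs-bus-rbd
 * is authoritative): images are mapped and unmapped by writing to the
 * bus attributes declared above, roughly:
 *
 *      # echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage -" \
 *      #       > /sys/bus/rbd/add
 *      # echo 0 > /sys/bus/rbd/remove
 */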

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        (void) get_device(&rbd_dev->dev);

        return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
        int ret = 0;
        int val;
        bool ro;
        bool ro_changed = false;

        /* get_user() may sleep, so call it before taking rbd_dev->lock */
        if (get_user(val, (int __user *)(arg)))
                return -EFAULT;

        ro = val ? true : false;
        /* Snapshots do not allow writes */
        if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        /* prevent others from opening this device */
        if (rbd_dev->open_count > 1) {
                ret = -EBUSY;
                goto out;
        }

        if (rbd_dev->mapping.read_only != ro) {
                rbd_dev->mapping.read_only = ro;
                ro_changed = true;
        }

out:
        spin_unlock_irq(&rbd_dev->lock);
        /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
        if (ret == 0 && ro_changed)
                set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

        return ret;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
                        unsigned int cmd, unsigned long arg)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        int ret = 0;

        switch (cmd) {
        case BLKROSET:
                ret = rbd_ioctl_set_ro(rbd_dev, arg);
                break;
        default:
                ret = -ENOTTY;
        }

        return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
                                unsigned int cmd, unsigned long arg)
{
        return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
        .ioctl                  = rbd_ioctl,
#ifdef CONFIG_COMPAT
        .compat_ioctl           = rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_rbdc;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_client;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;
out_client:
        ceph_destroy_client(rbdc->client);
out_rbdc:
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}

static char *obj_op_name(enum obj_operation_type op_type)
{
        switch (op_type) {
        case OBJ_OP_READ:
                return "read";
        case OBJ_OP_WRITE:
                return "write";
        case OBJ_OP_DISCARD:
                return "discard";
        default:
                return "???";
        }
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);
        mutex_unlock(&client_mutex);

        return rbdc;
}

/*
 * Destroy a ceph client.  Takes and drops rbd_client_list_lock to
 * remove the client from the client list.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more, fill in the header */

        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
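
/*
 * Example (illustrative, not part of the original file): for a
 * snapshot array {10, 7, 3}, snapid_compare_reverse() makes bsearch()
 * treat the descending ids as sorted, so a lookup of snap_id 7 lands
 * on index 1, while a lookup of snap_id 5 returns BAD_SNAP_INDEX.
 */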

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;
        const char *snap_name;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return ERR_PTR(-ENOENT);

        snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
        return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;
        char *name_format;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        name_format = "%s.%012llx";
        if (rbd_dev->image_format == 2)
                name_format = "%s.%016llx";
        ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                rbd_segment_name_free(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
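
/*
 * Worked example (illustrative, not part of the original file): with
 * obj_order 22 the segment size is 1 << 22 = 4 MiB.  An image offset
 * of 5 MiB falls in segment 1 (5 MiB >> 22 == 1) at segment offset
 * 5 MiB & (4 MiB - 1) = 1 MiB, and a request for 8 MiB starting there
 * is clamped by rbd_segment_length() to the 3 MiB remaining in that
 * segment; the caller issues further object requests for the rest.
 */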

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec bv;
        struct bvec_iter iter;
        unsigned long flags;
        void *buf;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, iter) {
                        if (pos + bv.bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(&bv, &flags);
                                memset(buf + remainder, 0,
                                       bv.bv_len - remainder);
                                flush_dcache_page(bv.bv_page);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv.bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = offset & ~PAGE_MASK;
                length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                flush_dcache_page(*page);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bio;

        bio = bio_clone(bio_src, gfpmask);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio_advance(bio, offset);
        bio->bi_iter.bi_size = len;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_iter.bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bios */
                }
                bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_iter.bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
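
/*
 * Example (illustrative, not part of the original file): with a source
 * chain whose first bio covers 4 KiB and whose second covers 8 KiB,
 * cloning len = 6 KiB from *offset = 2 KiB yields a two-bio clone
 * chain (2 KiB from the first bio, 4 KiB from the second).  On return
 * *bio_src points at the second source bio and *offset is 4 KiB, the
 * position of the first un-cloned byte.
 */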
1383
1384 /*
1385  * The default/initial value for all object request flags is 0.  For
1386  * each flag, once its value is set to 1 it is never reset to 0
1387  * again.
1388  */
1389 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1390 {
1391         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1392                 struct rbd_device *rbd_dev;
1393
1394                 rbd_dev = obj_request->img_request->rbd_dev;
1395                 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1396                         obj_request);
1397         }
1398 }
1399
1400 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1401 {
1402         smp_mb();
1403         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1404 }
1405
1406 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1407 {
1408         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1409                 struct rbd_device *rbd_dev = NULL;
1410
1411                 if (obj_request_img_data_test(obj_request))
1412                         rbd_dev = obj_request->img_request->rbd_dev;
1413                 rbd_warn(rbd_dev, "obj_request %p already marked done",
1414                         obj_request);
1415         }
1416 }
1417
1418 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1419 {
1420         smp_mb();
1421         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1422 }
1423
1424 /*
1425  * This sets the KNOWN flag after (possibly) setting the EXISTS
1426  * flag.  The latter is set based on the "exists" value provided.
1427  *
1428  * Note that for our purposes once an object exists it never goes
1429  * away again.  It's possible that the response from two existence
1430  * checks are separated by the creation of the target object, and
1431  * the first ("doesn't exist") response arrives *after* the second
1432  * ("does exist").  In that case we ignore the second one.
1433  */
1434 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1435                                 bool exists)
1436 {
1437         if (exists)
1438                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1439         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1440         smp_mb();
1441 }
1442
1443 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1444 {
1445         smp_mb();
1446         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1447 }
1448
1449 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1450 {
1451         smp_mb();
1452         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1453 }
1454
1455 static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1456 {
1457         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1458
1459         return obj_request->img_offset <
1460             round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1461 }
1462
1463 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1464 {
1465         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1466                 atomic_read(&obj_request->kref.refcount));
1467         kref_get(&obj_request->kref);
1468 }
1469
1470 static void rbd_obj_request_destroy(struct kref *kref);
1471 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1472 {
1473         rbd_assert(obj_request != NULL);
1474         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1475                 atomic_read(&obj_request->kref.refcount));
1476         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1477 }
1478
1479 static void rbd_img_request_get(struct rbd_img_request *img_request)
1480 {
1481         dout("%s: img %p (was %d)\n", __func__, img_request,
1482              atomic_read(&img_request->kref.refcount));
1483         kref_get(&img_request->kref);
1484 }
1485
1486 static bool img_request_child_test(struct rbd_img_request *img_request);
1487 static void rbd_parent_request_destroy(struct kref *kref);
1488 static void rbd_img_request_destroy(struct kref *kref);
1489 static void rbd_img_request_put(struct rbd_img_request *img_request)
1490 {
1491         rbd_assert(img_request != NULL);
1492         dout("%s: img %p (was %d)\n", __func__, img_request,
1493                 atomic_read(&img_request->kref.refcount));
1494         if (img_request_child_test(img_request))
1495                 kref_put(&img_request->kref, rbd_parent_request_destroy);
1496         else
1497                 kref_put(&img_request->kref, rbd_img_request_destroy);
1498 }
1499
1500 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1501                                         struct rbd_obj_request *obj_request)
1502 {
1503         rbd_assert(obj_request->img_request == NULL);
1504
1505         /* Image request now owns object's original reference */
1506         obj_request->img_request = img_request;
1507         obj_request->which = img_request->obj_request_count;
1508         rbd_assert(!obj_request_img_data_test(obj_request));
1509         obj_request_img_data_set(obj_request);
1510         rbd_assert(obj_request->which != BAD_WHICH);
1511         img_request->obj_request_count++;
1512         list_add_tail(&obj_request->links, &img_request->obj_requests);
1513         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1514                 obj_request->which);
1515 }
1516
1517 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1518                                         struct rbd_obj_request *obj_request)
1519 {
1520         rbd_assert(obj_request->which != BAD_WHICH);
1521
1522         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1523                 obj_request->which);
1524         list_del(&obj_request->links);
1525         rbd_assert(img_request->obj_request_count > 0);
1526         img_request->obj_request_count--;
1527         rbd_assert(obj_request->which == img_request->obj_request_count);
1528         obj_request->which = BAD_WHICH;
1529         rbd_assert(obj_request_img_data_test(obj_request));
1530         rbd_assert(obj_request->img_request == img_request);
1531         obj_request->img_request = NULL;
1532         obj_request->callback = NULL;
1533         rbd_obj_request_put(obj_request);
1534 }
1535
1536 static bool obj_request_type_valid(enum obj_request_type type)
1537 {
1538         switch (type) {
1539         case OBJ_REQUEST_NODATA:
1540         case OBJ_REQUEST_BIO:
1541         case OBJ_REQUEST_PAGES:
1542                 return true;
1543         default:
1544                 return false;
1545         }
1546 }
1547
1548 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1549                                 struct rbd_obj_request *obj_request)
1550 {
1551         dout("%s %p\n", __func__, obj_request);
1552         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1553 }
1554
1555 static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
1556 {
1557         dout("%s %p\n", __func__, obj_request);
1558         ceph_osdc_cancel_request(obj_request->osd_req);
1559 }
1560
1561 /*
1562  * Wait for an object request to complete.  If interrupted, cancel the
1563  * underlying osd request.
1564  */
1565 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1566 {
1567         int ret;
1568
1569         dout("%s %p\n", __func__, obj_request);
1570
1571         ret = wait_for_completion_interruptible(&obj_request->completion);
1572         if (ret < 0) {
1573                 dout("%s %p interrupted\n", __func__, obj_request);
1574                 rbd_obj_request_end(obj_request);
1575                 return ret;
1576         }
1577
1578         dout("%s %p done\n", __func__, obj_request);
1579         return 0;
1580 }
1581
1582 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1583 {
1584
1585         dout("%s: img %p\n", __func__, img_request);
1586
1587         /*
1588          * If no error occurred, compute the aggregate transfer
1589          * count for the image request.  We could instead use
1590          * atomic64_cmpxchg() to update it as each object request
1591          * completes; it is not clear offhand which way is better.
1592          */
1593         if (!img_request->result) {
1594                 struct rbd_obj_request *obj_request;
1595                 u64 xferred = 0;
1596
1597                 for_each_obj_request(img_request, obj_request)
1598                         xferred += obj_request->xferred;
1599                 img_request->xferred = xferred;
1600         }
1601
1602         if (img_request->callback)
1603                 img_request->callback(img_request);
1604         else
1605                 rbd_img_request_put(img_request);
1606 }
1607
1608 /*
1609  * The default/initial value for all image request flags is 0.  Each
1610  * is conditionally set to 1 at image request initialization time
1611  * and currently never changes thereafter.
1612  */
1613 static void img_request_write_set(struct rbd_img_request *img_request)
1614 {
1615         set_bit(IMG_REQ_WRITE, &img_request->flags);
1616         smp_mb();
1617 }
1618
1619 static bool img_request_write_test(struct rbd_img_request *img_request)
1620 {
1621         smp_mb();
1622         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1623 }
1624
1625 /*
1626  * Set the discard flag when the img_request is a discard request
1627  */
1628 static void img_request_discard_set(struct rbd_img_request *img_request)
1629 {
1630         set_bit(IMG_REQ_DISCARD, &img_request->flags);
1631         smp_mb();
1632 }
1633
1634 static bool img_request_discard_test(struct rbd_img_request *img_request)
1635 {
1636         smp_mb();
1637         return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1638 }
1639
1640 static void img_request_child_set(struct rbd_img_request *img_request)
1641 {
1642         set_bit(IMG_REQ_CHILD, &img_request->flags);
1643         smp_mb();
1644 }
1645
1646 static void img_request_child_clear(struct rbd_img_request *img_request)
1647 {
1648         clear_bit(IMG_REQ_CHILD, &img_request->flags);
1649         smp_mb();
1650 }
1651
1652 static bool img_request_child_test(struct rbd_img_request *img_request)
1653 {
1654         smp_mb();
1655         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1656 }
1657
1658 static void img_request_layered_set(struct rbd_img_request *img_request)
1659 {
1660         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1661         smp_mb();
1662 }
1663
1664 static void img_request_layered_clear(struct rbd_img_request *img_request)
1665 {
1666         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1667         smp_mb();
1668 }
1669
1670 static bool img_request_layered_test(struct rbd_img_request *img_request)
1671 {
1672         smp_mb();
1673         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1674 }
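
/*
 * Illustration (not driver code) of the barrier pairing used by the
 * flag helpers above:
 *
 *      writer                          reader
 *      ------                          ------
 *      set_bit(flag)                   smp_mb()
 *      smp_mb()                        test_bit(flag)
 *
 * The full barrier on each side orders the flag access against the
 * surrounding memory accesses, so a reader that can see the image
 * request also sees any flag that was set before it was published.
 */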
1675
1676 static enum obj_operation_type
1677 rbd_img_request_op_type(struct rbd_img_request *img_request)
1678 {
1679         if (img_request_write_test(img_request))
1680                 return OBJ_OP_WRITE;
1681         else if (img_request_discard_test(img_request))
1682                 return OBJ_OP_DISCARD;
1683         else
1684                 return OBJ_OP_READ;
1685 }
1686
1687 static void
1688 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1689 {
1690         u64 xferred = obj_request->xferred;
1691         u64 length = obj_request->length;
1692
1693         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1694                 obj_request, obj_request->img_request, obj_request->result,
1695                 xferred, length);
1696         /*
1697          * ENOENT means a hole in the image.  We zero-fill the entire
1698          * length of the request.  A short read also implies zero-fill
1699          * to the end of the request.  An error requires the whole
1700          * length of the request to be reported finished with an error
1701          * to the block layer.  In each case we update the xferred
1702          * count to indicate the whole request was satisfied.
1703          */
1704         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1705         if (obj_request->result == -ENOENT) {
1706                 if (obj_request->type == OBJ_REQUEST_BIO)
1707                         zero_bio_chain(obj_request->bio_list, 0);
1708                 else
1709                         zero_pages(obj_request->pages, 0, length);
1710                 obj_request->result = 0;
1711         } else if (xferred < length && !obj_request->result) {
1712                 if (obj_request->type == OBJ_REQUEST_BIO)
1713                         zero_bio_chain(obj_request->bio_list, xferred);
1714                 else
1715                         zero_pages(obj_request->pages, xferred, length);
1716         }
1717         obj_request->xferred = length;
1718         obj_request_done_set(obj_request);
1719 }
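
/*
 * Worked example for the zero-fill logic above (illustrative numbers):
 * a 4096-byte read that returns -ENOENT is zero-filled in full and
 * reported as 4096 bytes transferred; a read that returns only 1024
 * bytes is zero-filled from byte 1024 to 4096 and likewise reported
 * as complete, since the block layer expects the whole request to be
 * accounted for.
 */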
1720
1721 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1722 {
1723         dout("%s: obj %p cb %p\n", __func__, obj_request,
1724                 obj_request->callback);
1725         if (obj_request->callback)
1726                 obj_request->callback(obj_request);
1727         else
1728                 complete_all(&obj_request->completion);
1729 }
1730
1731 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1732 {
1733         dout("%s: obj %p\n", __func__, obj_request);
1734         obj_request_done_set(obj_request);
1735 }
1736
1737 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1738 {
1739         struct rbd_img_request *img_request = NULL;
1740         struct rbd_device *rbd_dev = NULL;
1741         bool layered = false;
1742
1743         if (obj_request_img_data_test(obj_request)) {
1744                 img_request = obj_request->img_request;
1745                 layered = img_request && img_request_layered_test(img_request);
1746                 rbd_dev = img_request->rbd_dev;
1747         }
1748
1749         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1750                 obj_request, img_request, obj_request->result,
1751                 obj_request->xferred, obj_request->length);
1752         if (layered && obj_request->result == -ENOENT &&
1753                         obj_request->img_offset < rbd_dev->parent_overlap)
1754                 rbd_img_parent_read(obj_request);
1755         else if (img_request)
1756                 rbd_img_obj_request_read_callback(obj_request);
1757         else
1758                 obj_request_done_set(obj_request);
1759 }
1760
1761 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1762 {
1763         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1764                 obj_request->result, obj_request->length);
1765         /*
1766          * There is no such thing as a successful short write.  Set the
1767          * transfer count to our originally-requested length.
1768          */
1769         obj_request->xferred = obj_request->length;
1770         obj_request_done_set(obj_request);
1771 }
1772
1773 static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1774 {
1775         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1776                 obj_request->result, obj_request->length);
1777         /*
1778          * There is no such thing as a successful short discard.  Set the
1779          * transfer count to our originally-requested length.
1780          */
1781         obj_request->xferred = obj_request->length;
1782         /* discarding a non-existent object is not a problem */
1783         if (obj_request->result == -ENOENT)
1784                 obj_request->result = 0;
1785         obj_request_done_set(obj_request);
1786 }
1787
1788 /*
1789  * For a simple stat call there's nothing to do.  We'll do more if
1790  * this is part of a write sequence for a layered image.
1791  */
1792 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1793 {
1794         dout("%s: obj %p\n", __func__, obj_request);
1795         obj_request_done_set(obj_request);
1796 }
1797
1798 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1799                                 struct ceph_msg *msg)
1800 {
1801         struct rbd_obj_request *obj_request = osd_req->r_priv;
1802         u16 opcode;
1803
1804         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1805         rbd_assert(osd_req == obj_request->osd_req);
1806         if (obj_request_img_data_test(obj_request)) {
1807                 rbd_assert(obj_request->img_request);
1808                 rbd_assert(obj_request->which != BAD_WHICH);
1809         } else {
1810                 rbd_assert(obj_request->which == BAD_WHICH);
1811         }
1812
1813         if (osd_req->r_result < 0)
1814                 obj_request->result = osd_req->r_result;
1815
1816         rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);
1817
1818         /*
1819          * We support a 64-bit length, but ultimately it has to be
1820          * passed to blk_end_request(), which takes an unsigned int.
1821          */
1822         obj_request->xferred = osd_req->r_reply_op_len[0];
1823         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1824
1825         opcode = osd_req->r_ops[0].op;
1826         switch (opcode) {
1827         case CEPH_OSD_OP_READ:
1828                 rbd_osd_read_callback(obj_request);
1829                 break;
1830         case CEPH_OSD_OP_SETALLOCHINT:
1831                 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE);
1832                 /* fall through */
1833         case CEPH_OSD_OP_WRITE:
1834                 rbd_osd_write_callback(obj_request);
1835                 break;
1836         case CEPH_OSD_OP_STAT:
1837                 rbd_osd_stat_callback(obj_request);
1838                 break;
1839         case CEPH_OSD_OP_DELETE:
1840         case CEPH_OSD_OP_TRUNCATE:
1841         case CEPH_OSD_OP_ZERO:
1842                 rbd_osd_discard_callback(obj_request);
1843                 break;
1844         case CEPH_OSD_OP_CALL:
1845         case CEPH_OSD_OP_NOTIFY_ACK:
1846         case CEPH_OSD_OP_WATCH:
1847                 rbd_osd_trivial_callback(obj_request);
1848                 break;
1849         default:
1850                 rbd_warn(NULL, "%s: unsupported op %hu",
1851                         obj_request->object_name, (unsigned short) opcode);
1852                 break;
1853         }
1854
1855         if (obj_request_done_test(obj_request))
1856                 rbd_obj_request_complete(obj_request);
1857 }
1858
1859 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1860 {
1861         struct rbd_img_request *img_request = obj_request->img_request;
1862         struct ceph_osd_request *osd_req = obj_request->osd_req;
1863         u64 snap_id;
1864
1865         rbd_assert(osd_req != NULL);
1866
1867         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1868         ceph_osdc_build_request(osd_req, obj_request->offset,
1869                         NULL, snap_id, NULL);
1870 }
1871
1872 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1873 {
1874         struct rbd_img_request *img_request = obj_request->img_request;
1875         struct ceph_osd_request *osd_req = obj_request->osd_req;
1876         struct ceph_snap_context *snapc;
1877         struct timespec mtime = CURRENT_TIME;
1878
1879         rbd_assert(osd_req != NULL);
1880
1881         snapc = img_request ? img_request->snapc : NULL;
1882         ceph_osdc_build_request(osd_req, obj_request->offset,
1883                         snapc, CEPH_NOSNAP, &mtime);
1884 }
1885
1886 /*
1887  * Create an osd request.  A read request has one osd op (read).
1888  * A write request has either one (watch) or two (hint+write) osd ops.
1889  * (All rbd data writes are prefixed with an allocation hint op, but
1890  * technically osd watch is a write request, hence this distinction.)
1891  */
1892 static struct ceph_osd_request *rbd_osd_req_create(
1893                                         struct rbd_device *rbd_dev,
1894                                         enum obj_operation_type op_type,
1895                                         unsigned int num_ops,
1896                                         struct rbd_obj_request *obj_request)
1897 {
1898         struct ceph_snap_context *snapc = NULL;
1899         struct ceph_osd_client *osdc;
1900         struct ceph_osd_request *osd_req;
1901
1902         if (obj_request_img_data_test(obj_request) &&
1903                 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1904                 struct rbd_img_request *img_request = obj_request->img_request;
1905                 if (op_type == OBJ_OP_WRITE) {
1906                         rbd_assert(img_request_write_test(img_request));
1907                 } else {
1908                         rbd_assert(img_request_discard_test(img_request));
1909                 }
1910                 snapc = img_request->snapc;
1911         }
1912
1913         rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
1914
1915         /* Allocate and initialize the request, for the num_ops ops */
1916
1917         osdc = &rbd_dev->rbd_client->client->osdc;
1918         osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
1919                                           GFP_ATOMIC);
1920         if (!osd_req)
1921                 return NULL;    /* ENOMEM */
1922
1923         if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
1924                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1925         else
1926                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1927
1928         osd_req->r_callback = rbd_osd_req_callback;
1929         osd_req->r_priv = obj_request;
1930
1931         osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1932         ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1933
1934         return osd_req;
1935 }
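
/*
 * Op counts used with rbd_osd_req_create() in this file, for
 * reference: reads, discards, stat and notify-ack requests pass
 * num_ops == 1; data writes pass num_ops == 2 (allocation hint +
 * write), matching the rbd_assert() above.
 */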
1936
1937 /*
1938  * Create a copyup osd request based on the information in the object
1939  * request supplied.  A copyup request has two or three osd ops: a
1940  * copyup method call, optionally an allocation hint op, and a write,
1941  * truncate or zero op.
1942  */
1943 static struct ceph_osd_request *
1944 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1945 {
1946         struct rbd_img_request *img_request;
1947         struct ceph_snap_context *snapc;
1948         struct rbd_device *rbd_dev;
1949         struct ceph_osd_client *osdc;
1950         struct ceph_osd_request *osd_req;
1951         int num_osd_ops = 3;
1952
1953         rbd_assert(obj_request_img_data_test(obj_request));
1954         img_request = obj_request->img_request;
1955         rbd_assert(img_request);
1956         rbd_assert(img_request_write_test(img_request) ||
1957                         img_request_discard_test(img_request));
1958
1959         if (img_request_discard_test(img_request))
1960                 num_osd_ops = 2;
1961
1962         /* Allocate and initialize the request, for all the ops */
1963
1964         snapc = img_request->snapc;
1965         rbd_dev = img_request->rbd_dev;
1966         osdc = &rbd_dev->rbd_client->client->osdc;
1967         osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
1968                                                 false, GFP_ATOMIC);
1969         if (!osd_req)
1970                 return NULL;    /* ENOMEM */
1971
1972         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1973         osd_req->r_callback = rbd_osd_req_callback;
1974         osd_req->r_priv = obj_request;
1975
1976         osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1977         ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1978
1979         return osd_req;
1980 }
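
/*
 * Resulting copyup op layouts (sketch):
 *
 *      write:    [0] CALL rbd.copyup  [1] SETALLOCHINT  [2] WRITE
 *      discard:  [0] CALL rbd.copyup  [1] TRUNCATE or ZERO
 *
 * Op 0 is initialized by rbd_img_obj_parent_read_full_callback();
 * the remaining op(s) are filled in by rbd_img_obj_request_fill()
 * starting at index 1.
 */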
1981
1982
1983 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1984 {
1985         ceph_osdc_put_request(osd_req);
1986 }
1987
1988 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1989
1990 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1991                                                 u64 offset, u64 length,
1992                                                 enum obj_request_type type)
1993 {
1994         struct rbd_obj_request *obj_request;
1995         size_t size;
1996         char *name;
1997
1998         rbd_assert(obj_request_type_valid(type));
1999
2000         size = strlen(object_name) + 1;
2001         name = kmalloc(size, GFP_KERNEL);
2002         if (!name)
2003                 return NULL;
2004
2005         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
2006         if (!obj_request) {
2007                 kfree(name);
2008                 return NULL;
2009         }
2010
2011         obj_request->object_name = memcpy(name, object_name, size);
2012         obj_request->offset = offset;
2013         obj_request->length = length;
2014         obj_request->flags = 0;
2015         obj_request->which = BAD_WHICH;
2016         obj_request->type = type;
2017         INIT_LIST_HEAD(&obj_request->links);
2018         init_completion(&obj_request->completion);
2019         kref_init(&obj_request->kref);
2020
2021         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
2022                 offset, length, (int)type, obj_request);
2023
2024         return obj_request;
2025 }
2026
2027 static void rbd_obj_request_destroy(struct kref *kref)
2028 {
2029         struct rbd_obj_request *obj_request;
2030
2031         obj_request = container_of(kref, struct rbd_obj_request, kref);
2032
2033         dout("%s: obj %p\n", __func__, obj_request);
2034
2035         rbd_assert(obj_request->img_request == NULL);
2036         rbd_assert(obj_request->which == BAD_WHICH);
2037
2038         if (obj_request->osd_req)
2039                 rbd_osd_req_destroy(obj_request->osd_req);
2040
2041         rbd_assert(obj_request_type_valid(obj_request->type));
2042         switch (obj_request->type) {
2043         case OBJ_REQUEST_NODATA:
2044                 break;          /* Nothing to do */
2045         case OBJ_REQUEST_BIO:
2046                 if (obj_request->bio_list)
2047                         bio_chain_put(obj_request->bio_list);
2048                 break;
2049         case OBJ_REQUEST_PAGES:
2050                 if (obj_request->pages)
2051                         ceph_release_page_vector(obj_request->pages,
2052                                                 obj_request->page_count);
2053                 break;
2054         }
2055
2056         kfree(obj_request->object_name);
2057         obj_request->object_name = NULL;
2058         kmem_cache_free(rbd_obj_request_cache, obj_request);
2059 }
2060
2061 /* It's OK to call this for a device with no parent */
2062
2063 static void rbd_spec_put(struct rbd_spec *spec);
2064 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2065 {
2066         rbd_dev_remove_parent(rbd_dev);
2067         rbd_spec_put(rbd_dev->parent_spec);
2068         rbd_dev->parent_spec = NULL;
2069         rbd_dev->parent_overlap = 0;
2070 }
2071
2072 /*
2073  * Parent image reference counting is used to determine when an
2074  * image's parent fields can be safely torn down--after there are no
2075  * more in-flight requests to the parent image.  When the last
2076  * reference is dropped, cleaning them up is safe.
2077  */
2078 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2079 {
2080         int counter;
2081
2082         if (!rbd_dev->parent_spec)
2083                 return;
2084
2085         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2086         if (counter > 0)
2087                 return;
2088
2089         /* Last reference; clean up parent data structures */
2090
2091         if (!counter)
2092                 rbd_dev_unparent(rbd_dev);
2093         else
2094                 rbd_warn(rbd_dev, "parent reference underflow");
2095 }
2096
2097 /*
2098  * If an image has a non-zero parent overlap, get a reference to its
2099  * parent.
2100  *
2101  * Returns true if the rbd device has a parent with a non-zero
2102  * overlap and a reference for it was successfully taken, or
2103  * false otherwise.
2104  */
2105 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2106 {
2107         int counter = 0;
2108
2109         if (!rbd_dev->parent_spec)
2110                 return false;
2111
2112         down_read(&rbd_dev->header_rwsem);
2113         if (rbd_dev->parent_overlap)
2114                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2115         up_read(&rbd_dev->header_rwsem);
2116
2117         if (counter < 0)
2118                 rbd_warn(rbd_dev, "parent reference overflow");
2119
2120         return counter > 0;
2121 }
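
/*
 * Usage note: rbd_dev_parent_get() pairs with rbd_dev_parent_put().
 * In this file the reference is taken in rbd_img_request_create(),
 * which then marks the request layered, and dropped again in
 * rbd_img_request_destroy() when the layered flag is cleared.
 */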
2122
2123 /*
2124  * Caller is responsible for filling in the list of object requests
2125  * that comprises the image request, and the Linux request pointer
2126  * (if there is one).
2127  */
2128 static struct rbd_img_request *rbd_img_request_create(
2129                                         struct rbd_device *rbd_dev,
2130                                         u64 offset, u64 length,
2131                                         enum obj_operation_type op_type,
2132                                         struct ceph_snap_context *snapc)
2133 {
2134         struct rbd_img_request *img_request;
2135
2136         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2137         if (!img_request)
2138                 return NULL;
2139
2140         img_request->rq = NULL;
2141         img_request->rbd_dev = rbd_dev;
2142         img_request->offset = offset;
2143         img_request->length = length;
2144         img_request->flags = 0;
2145         if (op_type == OBJ_OP_DISCARD) {
2146                 img_request_discard_set(img_request);
2147                 img_request->snapc = snapc;
2148         } else if (op_type == OBJ_OP_WRITE) {
2149                 img_request_write_set(img_request);
2150                 img_request->snapc = snapc;
2151         } else {
2152                 img_request->snap_id = rbd_dev->spec->snap_id;
2153         }
2154         if (rbd_dev_parent_get(rbd_dev))
2155                 img_request_layered_set(img_request);
2156         spin_lock_init(&img_request->completion_lock);
2157         img_request->next_completion = 0;
2158         img_request->callback = NULL;
2159         img_request->result = 0;
2160         img_request->obj_request_count = 0;
2161         INIT_LIST_HEAD(&img_request->obj_requests);
2162         kref_init(&img_request->kref);
2163
2164         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2165                 obj_op_name(op_type), offset, length, img_request);
2166
2167         return img_request;
2168 }
2169
2170 static void rbd_img_request_destroy(struct kref *kref)
2171 {
2172         struct rbd_img_request *img_request;
2173         struct rbd_obj_request *obj_request;
2174         struct rbd_obj_request *next_obj_request;
2175
2176         img_request = container_of(kref, struct rbd_img_request, kref);
2177
2178         dout("%s: img %p\n", __func__, img_request);
2179
2180         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2181                 rbd_img_obj_request_del(img_request, obj_request);
2182         rbd_assert(img_request->obj_request_count == 0);
2183
2184         if (img_request_layered_test(img_request)) {
2185                 img_request_layered_clear(img_request);
2186                 rbd_dev_parent_put(img_request->rbd_dev);
2187         }
2188
2189         if (img_request_write_test(img_request) ||
2190                 img_request_discard_test(img_request))
2191                 ceph_put_snap_context(img_request->snapc);
2192
2193         kmem_cache_free(rbd_img_request_cache, img_request);
2194 }
2195
2196 static struct rbd_img_request *rbd_parent_request_create(
2197                                         struct rbd_obj_request *obj_request,
2198                                         u64 img_offset, u64 length)
2199 {
2200         struct rbd_img_request *parent_request;
2201         struct rbd_device *rbd_dev;
2202
2203         rbd_assert(obj_request->img_request);
2204         rbd_dev = obj_request->img_request->rbd_dev;
2205
2206         parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2207                                                 length, OBJ_OP_READ, NULL);
2208         if (!parent_request)
2209                 return NULL;
2210
2211         img_request_child_set(parent_request);
2212         rbd_obj_request_get(obj_request);
2213         parent_request->obj_request = obj_request;
2214
2215         return parent_request;
2216 }
2217
2218 static void rbd_parent_request_destroy(struct kref *kref)
2219 {
2220         struct rbd_img_request *parent_request;
2221         struct rbd_obj_request *orig_request;
2222
2223         parent_request = container_of(kref, struct rbd_img_request, kref);
2224         orig_request = parent_request->obj_request;
2225
2226         parent_request->obj_request = NULL;
2227         rbd_obj_request_put(orig_request);
2228         img_request_child_clear(parent_request);
2229
2230         rbd_img_request_destroy(kref);
2231 }
2232
2233 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2234 {
2235         struct rbd_img_request *img_request;
2236         unsigned int xferred;
2237         int result;
2238         bool more;
2239
2240         rbd_assert(obj_request_img_data_test(obj_request));
2241         img_request = obj_request->img_request;
2242
2243         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2244         xferred = (unsigned int)obj_request->xferred;
2245         result = obj_request->result;
2246         if (result) {
2247                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2248                 enum obj_operation_type op_type;
2249
2250                 if (img_request_discard_test(img_request))
2251                         op_type = OBJ_OP_DISCARD;
2252                 else if (img_request_write_test(img_request))
2253                         op_type = OBJ_OP_WRITE;
2254                 else
2255                         op_type = OBJ_OP_READ;
2256
2257                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2258                         obj_op_name(op_type), obj_request->length,
2259                         obj_request->img_offset, obj_request->offset);
2260                 rbd_warn(rbd_dev, "  result %d xferred %x",
2261                         result, xferred);
2262                 if (!img_request->result)
2263                         img_request->result = result;
2264         }
2265
2266         /* Image object requests don't own their page array */
2267
2268         if (obj_request->type == OBJ_REQUEST_PAGES) {
2269                 obj_request->pages = NULL;
2270                 obj_request->page_count = 0;
2271         }
2272
2273         if (img_request_child_test(img_request)) {
2274                 rbd_assert(img_request->obj_request != NULL);
2275                 more = obj_request->which < img_request->obj_request_count - 1;
2276         } else {
2277                 rbd_assert(img_request->rq != NULL);
2278                 more = blk_end_request(img_request->rq, result, xferred);
2279         }
2280
2281         return more;
2282 }
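
/*
 * Note: blk_end_request() returns true while the block request still
 * has bytes outstanding; once it returns false the whole request has
 * been completed.  That return value is what lets the caller decide
 * when the image request as a whole is finished.
 */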
2283
2284 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2285 {
2286         struct rbd_img_request *img_request;
2287         u32 which = obj_request->which;
2288         bool more = true;
2289
2290         rbd_assert(obj_request_img_data_test(obj_request));
2291         img_request = obj_request->img_request;
2292
2293         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2294         rbd_assert(img_request != NULL);
2295         rbd_assert(img_request->obj_request_count > 0);
2296         rbd_assert(which != BAD_WHICH);
2297         rbd_assert(which < img_request->obj_request_count);
2298
2299         spin_lock_irq(&img_request->completion_lock);
2300         if (which != img_request->next_completion)
2301                 goto out;
2302
2303         for_each_obj_request_from(img_request, obj_request) {
2304                 rbd_assert(more);
2305                 rbd_assert(which < img_request->obj_request_count);
2306
2307                 if (!obj_request_done_test(obj_request))
2308                         break;
2309                 more = rbd_img_obj_end_request(obj_request);
2310                 which++;
2311         }
2312
2313         rbd_assert(more ^ (which == img_request->obj_request_count));
2314         img_request->next_completion = which;
2315 out:
2316         spin_unlock_irq(&img_request->completion_lock);
2317         rbd_img_request_put(img_request);
2318
2319         if (!more)
2320                 rbd_img_request_complete(img_request);
2321 }
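
/*
 * Completion ordering example for rbd_img_obj_callback() (illustrative):
 * with three object requests, if request 2 completes first its callback
 * finds which != next_completion and returns without ending anything.
 * When request 0 later completes, the loop above ends request 0, then
 * requests 1 and 2 if they are done, so blk_end_request() always sees
 * the image request's data in offset order.
 */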
2322
2323 /*
2324  * Add individual osd ops to the given ceph_osd_request and prepare
2325  * them for submission.  num_ops is the number of osd operations
2326  * already added to the osd request.
2327  */
2328 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2329                                 struct ceph_osd_request *osd_request,
2330                                 enum obj_operation_type op_type,
2331                                 unsigned int num_ops)
2332 {
2333         struct rbd_img_request *img_request = obj_request->img_request;
2334         struct rbd_device *rbd_dev = img_request->rbd_dev;
2335         u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2336         u64 offset = obj_request->offset;
2337         u64 length = obj_request->length;
2338         u64 img_end;
2339         u16 opcode;
2340
2341         if (op_type == OBJ_OP_DISCARD) {
2342                 if (!offset && length == object_size &&
2343                     (!img_request_layered_test(img_request) ||
2344                      !obj_request_overlaps_parent(obj_request))) {
2345                         opcode = CEPH_OSD_OP_DELETE;
2346                 } else if (offset + length == object_size) {
2347                         opcode = CEPH_OSD_OP_TRUNCATE;
2348                 } else {
2349                         down_read(&rbd_dev->header_rwsem);
2350                         img_end = rbd_dev->header.image_size;
2351                         up_read(&rbd_dev->header_rwsem);
2352
2353                         if (obj_request->img_offset + length == img_end)
2354                                 opcode = CEPH_OSD_OP_TRUNCATE;
2355                         else
2356                                 opcode = CEPH_OSD_OP_ZERO;
2357                 }
2358         } else if (op_type == OBJ_OP_WRITE) {
2359                 opcode = CEPH_OSD_OP_WRITE;
2360                 osd_req_op_alloc_hint_init(osd_request, num_ops,
2361                                         object_size, object_size);
2362                 num_ops++;
2363         } else {
2364                 opcode = CEPH_OSD_OP_READ;
2365         }
2366
2367         if (opcode == CEPH_OSD_OP_DELETE)
2368                 osd_req_op_init(osd_request, num_ops, opcode);
2369         else
2370                 osd_req_op_extent_init(osd_request, num_ops, opcode,
2371                                        offset, length, 0, 0);
2372
2373         if (obj_request->type == OBJ_REQUEST_BIO)
2374                 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2375                                         obj_request->bio_list, length);
2376         else if (obj_request->type == OBJ_REQUEST_PAGES)
2377                 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2378                                         obj_request->pages, length,
2379                                         offset & ~PAGE_MASK, false, false);
2380
2381         /* Discards are also writes */
2382         if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2383                 rbd_osd_req_format_write(obj_request);
2384         else
2385                 rbd_osd_req_format_read(obj_request);
2386 }
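
/*
 * Discard opcode selection above, by example (assuming 4 MB objects,
 * the rbd default):
 *
 *      whole object, no parent data beneath it:  DELETE
 *      discard reaching the end of the object:   TRUNCATE
 *      discard reaching the end of the image:    TRUNCATE
 *      anything else:                             ZERO
 */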
2387
2388 /*
2389  * Split up an image request into one or more object requests, each
2390  * to a different object.  The "type" parameter indicates whether
2391  * "data_desc" is the pointer to the head of a list of bio
2392  * structures, or the base of a page array.  In either case this
2393  * function assumes data_desc describes memory sufficient to hold
2394  * all data described by the image request.
2395  */
2396 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2397                                         enum obj_request_type type,
2398                                         void *data_desc)
2399 {
2400         struct rbd_device *rbd_dev = img_request->rbd_dev;
2401         struct rbd_obj_request *obj_request = NULL;
2402         struct rbd_obj_request *next_obj_request;
2403         struct bio *bio_list = NULL;
2404         unsigned int bio_offset = 0;
2405         struct page **pages = NULL;
2406         enum obj_operation_type op_type;
2407         u64 img_offset;
2408         u64 resid;
2409
2410         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2411                 (int)type, data_desc);
2412
2413         img_offset = img_request->offset;
2414         resid = img_request->length;
2415         rbd_assert(resid > 0);
2416         op_type = rbd_img_request_op_type(img_request);
2417
2418         if (type == OBJ_REQUEST_BIO) {
2419                 bio_list = data_desc;
2420                 rbd_assert(img_offset ==
2421                            bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2422         } else if (type == OBJ_REQUEST_PAGES) {
2423                 pages = data_desc;
2424         }
2425
2426         while (resid) {
2427                 struct ceph_osd_request *osd_req;
2428                 const char *object_name;
2429                 u64 offset;
2430                 u64 length;
2431
2432                 object_name = rbd_segment_name(rbd_dev, img_offset);
2433                 if (!object_name)
2434                         goto out_unwind;
2435                 offset = rbd_segment_offset(rbd_dev, img_offset);
2436                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2437                 obj_request = rbd_obj_request_create(object_name,
2438                                                 offset, length, type);
2439                 /* object request has its own copy of the object name */
2440                 rbd_segment_name_free(object_name);
2441                 if (!obj_request)
2442                         goto out_unwind;
2443
2444                 /*
2445                  * set obj_request->img_request before creating the
2446                  * osd_request so that it gets the right snapc
2447                  */
2448                 rbd_img_obj_request_add(img_request, obj_request);
2449
2450                 if (type == OBJ_REQUEST_BIO) {
2451                         unsigned int clone_size;
2452
2453                         rbd_assert(length <= (u64)UINT_MAX);
2454                         clone_size = (unsigned int)length;
2455                         obj_request->bio_list =
2456                                         bio_chain_clone_range(&bio_list,
2457                                                                 &bio_offset,
2458                                                                 clone_size,
2459                                                                 GFP_ATOMIC);
2460                         if (!obj_request->bio_list)
2461                                 goto out_unwind;
2462                 } else if (type == OBJ_REQUEST_PAGES) {
2463                         unsigned int page_count;
2464
2465                         obj_request->pages = pages;
2466                         page_count = (u32)calc_pages_for(offset, length);
2467                         obj_request->page_count = page_count;
2468                         if ((offset + length) & ~PAGE_MASK)
2469                                 page_count--;   /* more on last page */
2470                         pages += page_count;
2471                 }
2472
2473                 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2474                                         (op_type == OBJ_OP_WRITE) ? 2 : 1,
2475                                         obj_request);
2476                 if (!osd_req)
2477                         goto out_unwind;
2478
2479                 obj_request->osd_req = osd_req;
2480                 obj_request->callback = rbd_img_obj_callback;
2481                 obj_request->img_offset = img_offset;
2482
2483                 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2484
2485                 rbd_img_request_get(img_request);
2486
2487                 img_offset += length;
2488                 resid -= length;
2489         }
2490
2491         return 0;
2492
2493 out_unwind:
2494         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2495                 rbd_img_obj_request_del(img_request, obj_request);
2496
2497         return -ENOMEM;
2498 }
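
/*
 * Segmentation example for rbd_img_request_fill() (illustrative,
 * assuming 4 MB objects): an 8 MB image request starting at offset
 * 6 MB produces three object requests -- 2 MB at the tail of one
 * object, a full 4 MB object, and 2 MB at the head of the next --
 * each pointed at its slice of the supplied bio chain or page array.
 */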
2499
2500 static void
2501 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2502 {
2503         struct rbd_img_request *img_request;
2504         struct rbd_device *rbd_dev;
2505         struct page **pages;
2506         u32 page_count;
2507
2508         rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2509                 obj_request->type == OBJ_REQUEST_NODATA);
2510         rbd_assert(obj_request_img_data_test(obj_request));
2511         img_request = obj_request->img_request;
2512         rbd_assert(img_request);
2513
2514         rbd_dev = img_request->rbd_dev;
2515         rbd_assert(rbd_dev);
2516
2517         pages = obj_request->copyup_pages;
2518         rbd_assert(pages != NULL);
2519         obj_request->copyup_pages = NULL;
2520         page_count = obj_request->copyup_page_count;
2521         rbd_assert(page_count);
2522         obj_request->copyup_page_count = 0;
2523         ceph_release_page_vector(pages, page_count);
2524
2525         /*
2526          * We want the transfer count to reflect the size of the
2527          * original write request.  There is no such thing as a
2528          * successful short write, so if the request was successful
2529          * we can just set it to the originally-requested length.
2530          */
2531         if (!obj_request->result)
2532                 obj_request->xferred = obj_request->length;
2533
2534         /* Finish up with the normal image object callback */
2535
2536         rbd_img_obj_callback(obj_request);
2537 }
2538
2539 static void
2540 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2541 {
2542         struct rbd_obj_request *orig_request;
2543         struct ceph_osd_request *osd_req;
2544         struct ceph_osd_client *osdc;
2545         struct rbd_device *rbd_dev;
2546         struct page **pages;
2547         enum obj_operation_type op_type;
2548         u32 page_count;
2549         int img_result;
2550         u64 parent_length;
2551
2552         rbd_assert(img_request_child_test(img_request));
2553
2554         /* First get what we need from the image request */
2555
2556         pages = img_request->copyup_pages;
2557         rbd_assert(pages != NULL);
2558         img_request->copyup_pages = NULL;
2559         page_count = img_request->copyup_page_count;
2560         rbd_assert(page_count);
2561         img_request->copyup_page_count = 0;
2562
2563         orig_request = img_request->obj_request;
2564         rbd_assert(orig_request != NULL);
2565         rbd_assert(obj_request_type_valid(orig_request->type));
2566         img_result = img_request->result;
2567         parent_length = img_request->length;
2568         rbd_assert(parent_length == img_request->xferred);
2569         rbd_img_request_put(img_request);
2570
2571         rbd_assert(orig_request->img_request);
2572         rbd_dev = orig_request->img_request->rbd_dev;
2573         rbd_assert(rbd_dev);
2574
2575         /*
2576          * If the overlap has become 0 (most likely because the
2577          * image has been flattened) we need to free the pages
2578          * and re-submit the original write request.
2579          */
2580         if (!rbd_dev->parent_overlap) {
2581                 struct ceph_osd_client *osdc;
2582
2583                 ceph_release_page_vector(pages, page_count);
2584                 osdc = &rbd_dev->rbd_client->client->osdc;
2585                 img_result = rbd_obj_request_submit(osdc, orig_request);
2586                 if (!img_result)
2587                         return;
2588         }
2589
2590         if (img_result)
2591                 goto out_err;
2592
2593         /*
2594          * The original osd request is of no use to us any more.
2595          * We need a new one that can hold the two or three ops in a copyup
2596          * request.  Allocate the new copyup osd request for the
2597          * original request, and release the old one.
2598          */
2599         img_result = -ENOMEM;
2600         osd_req = rbd_osd_req_create_copyup(orig_request);
2601         if (!osd_req)
2602                 goto out_err;
2603         rbd_osd_req_destroy(orig_request->osd_req);
2604         orig_request->osd_req = osd_req;
2605         orig_request->copyup_pages = pages;
2606         orig_request->copyup_page_count = page_count;
2607
2608         /* Initialize the copyup op */
2609
2610         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2611         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2612                                                 false, false);
2613
2614         /* Add the other op(s) */
2615
2616         op_type = rbd_img_request_op_type(orig_request->img_request);
2617         rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2618
2619         /* All set, send it off. */
2620
2621         orig_request->callback = rbd_img_obj_copyup_callback;
2622         osdc = &rbd_dev->rbd_client->client->osdc;
2623         img_result = rbd_obj_request_submit(osdc, orig_request);
2624         if (!img_result)
2625                 return;
2626 out_err:
2627         /* Record the error code and complete the request */
2628
2629         orig_request->result = img_result;
2630         orig_request->xferred = 0;
2631         obj_request_done_set(orig_request);
2632         rbd_obj_request_complete(orig_request);
2633 }
2634
2635 /*
2636  * Read from the parent image the range of data that covers the
2637  * entire target of the given object request.  This is used for
2638  * satisfying a layered image write request when the target of an
2639  * object request from the image request does not exist.
2640  *
2641  * A page array big enough to hold the returned data is allocated
2642  * and supplied to rbd_img_request_fill() as the "data descriptor."
2643  * When the read completes, this page array will be transferred to
2644  * the original object request for the copyup operation.
2645  *
2646  * If an error occurs, record it as the result of the original
2647  * object request and mark it done so it gets completed.
2648  */
2649 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2650 {
2651         struct rbd_img_request *img_request = NULL;
2652         struct rbd_img_request *parent_request = NULL;
2653         struct rbd_device *rbd_dev;
2654         u64 img_offset;
2655         u64 length;
2656         struct page **pages = NULL;
2657         u32 page_count;
2658         int result;
2659
2660         rbd_assert(obj_request_img_data_test(obj_request));
2661         rbd_assert(obj_request_type_valid(obj_request->type));
2662
2663         img_request = obj_request->img_request;
2664         rbd_assert(img_request != NULL);
2665         rbd_dev = img_request->rbd_dev;
2666         rbd_assert(rbd_dev->parent != NULL);
2667
2668         /*
2669          * Determine the byte range covered by the object in the
2670          * child image to which the original request was to be sent.
2671          */
2672         img_offset = obj_request->img_offset - obj_request->offset;
2673         length = (u64)1 << rbd_dev->header.obj_order;
2674
2675         /*
2676          * There is no defined parent data beyond the parent
2677          * overlap, so limit what we read at that boundary if
2678          * necessary.
2679          */
2680         if (img_offset + length > rbd_dev->parent_overlap) {
2681                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2682                 length = rbd_dev->parent_overlap - img_offset;
2683         }
2684
2685         /*
2686          * Allocate a page array big enough to receive the data read
2687          * from the parent.
2688          */
2689         page_count = (u32)calc_pages_for(0, length);
2690         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2691         if (IS_ERR(pages)) {
2692                 result = PTR_ERR(pages);
2693                 pages = NULL;
2694                 goto out_err;
2695         }
2696
2697         result = -ENOMEM;
2698         parent_request = rbd_parent_request_create(obj_request,
2699                                                 img_offset, length);
2700         if (!parent_request)
2701                 goto out_err;
2702
2703         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2704         if (result)
2705                 goto out_err;
2706         parent_request->copyup_pages = pages;
2707         parent_request->copyup_page_count = page_count;
2708
2709         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2710         result = rbd_img_request_submit(parent_request);
2711         if (!result)
2712                 return 0;
2713
2714         parent_request->copyup_pages = NULL;
2715         parent_request->copyup_page_count = 0;
2716         parent_request->obj_request = NULL;
2717         rbd_obj_request_put(obj_request);
2718 out_err:
2719         if (pages)
2720                 ceph_release_page_vector(pages, page_count);
2721         if (parent_request)
2722                 rbd_img_request_put(parent_request);
2723         obj_request->result = result;
2724         obj_request->xferred = 0;
2725         obj_request_done_set(obj_request);
2726
2727         return result;
2728 }
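
/*
 * Range computation above, by example (illustrative, 4 MB objects):
 * for an object request at img_offset 9 MB that starts 1 MB into its
 * object, the parent read begins at img_offset - offset = 8 MB, the
 * start of the object, and covers the full 4 MB unless the parent
 * overlap ends sooner, in which case it is clipped to the overlap.
 */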
2729
2730 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2731 {
2732         struct rbd_obj_request *orig_request;
2733         struct rbd_device *rbd_dev;
2734         int result;
2735
2736         rbd_assert(!obj_request_img_data_test(obj_request));
2737
2738         /*
2739          * All we need from the object request is the original
2740          * request and the result of the STAT op.  Grab those, then
2741          * we're done with the request.
2742          */
2743         orig_request = obj_request->obj_request;
2744         obj_request->obj_request = NULL;
2745         rbd_obj_request_put(orig_request);
2746         rbd_assert(orig_request);
2747         rbd_assert(orig_request->img_request);
2748
2749         result = obj_request->result;
2750         obj_request->result = 0;
2751
2752         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2753                 obj_request, orig_request, result,
2754                 obj_request->xferred, obj_request->length);
2755         rbd_obj_request_put(obj_request);
2756
2757         /*
2758          * If the overlap has become 0 (most likely because the
2759          * image has been flattened) we need to re-submit the
2760          * original write request.
2761          */
2762         rbd_dev = orig_request->img_request->rbd_dev;
2763         if (!rbd_dev->parent_overlap) {
2764                 struct ceph_osd_client *osdc;
2765
2766                 osdc = &rbd_dev->rbd_client->client->osdc;
2767                 result = rbd_obj_request_submit(osdc, orig_request);
2768                 if (!result)
2769                         return;
2770         }
2771
2772         /*
2773          * Our only purpose here is to determine whether the object
2774          * exists, and we don't want to treat the non-existence as
2775          * an error.  If something else comes back, transfer the
2776          * error to the original request and complete it now.
2777          */
2778         if (!result) {
2779                 obj_request_existence_set(orig_request, true);
2780         } else if (result == -ENOENT) {
2781                 obj_request_existence_set(orig_request, false);
2782         } else if (result) {
2783                 orig_request->result = result;
2784                 goto out;
2785         }
2786
2787         /*
2788          * Resubmit the original request now that we have recorded
2789          * whether the target object exists.
2790          */
2791         orig_request->result = rbd_img_obj_request_submit(orig_request);
2792 out:
2793         if (orig_request->result)
2794                 rbd_obj_request_complete(orig_request);
2795 }
2796
2797 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2798 {
2799         struct rbd_obj_request *stat_request;
2800         struct rbd_device *rbd_dev;
2801         struct ceph_osd_client *osdc;
2802         struct page **pages = NULL;
2803         u32 page_count;
2804         size_t size;
2805         int ret;
2806
2807         /*
2808          * The response data for a STAT call consists of:
2809          *     le64 length;
2810          *     struct {
2811          *         le32 tv_sec;
2812          *         le32 tv_nsec;
2813          *     } mtime;
2814          */
2815         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
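        /* i.e. 16 bytes of reply data, which fits in a single page */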
2816         page_count = (u32)calc_pages_for(0, size);
2817         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2818         if (IS_ERR(pages))
2819                 return PTR_ERR(pages);
2820
2821         ret = -ENOMEM;
2822         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2823                                                         OBJ_REQUEST_PAGES);
2824         if (!stat_request)
2825                 goto out;
2826
2827         rbd_obj_request_get(obj_request);
2828         stat_request->obj_request = obj_request;
2829         stat_request->pages = pages;
2830         stat_request->page_count = page_count;
2831
2832         rbd_assert(obj_request->img_request);
2833         rbd_dev = obj_request->img_request->rbd_dev;
2834         stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2835                                                    stat_request);
2836         if (!stat_request->osd_req)
2837                 goto out;
2838         stat_request->callback = rbd_img_obj_exists_callback;
2839
2840         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2841         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2842                                         false, false);
2843         rbd_osd_req_format_read(stat_request);
2844
2845         osdc = &rbd_dev->rbd_client->client->osdc;
2846         ret = rbd_obj_request_submit(osdc, stat_request);
2847 out:
2848         if (ret)
2849                 rbd_obj_request_put(obj_request);
2850
2851         return ret;
2852 }
2853
2854 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2855 {
2856         struct rbd_img_request *img_request;
2857         struct rbd_device *rbd_dev;
2858
2859         rbd_assert(obj_request_img_data_test(obj_request));
2860
2861         img_request = obj_request->img_request;
2862         rbd_assert(img_request);
2863         rbd_dev = img_request->rbd_dev;
2864
2865         /* Reads */
2866         if (!img_request_write_test(img_request) &&
2867             !img_request_discard_test(img_request))
2868                 return true;
2869
2870         /* Non-layered writes */
2871         if (!img_request_layered_test(img_request))
2872                 return true;
2873
2874         /*
2875          * Layered writes outside of the parent overlap range don't
2876          * share any data with the parent.
2877          */
2878         if (!obj_request_overlaps_parent(obj_request))
2879                 return true;
2880
2881         /*
2882          * Entire-object layered writes - we will overwrite whatever
2883          * parent data there is anyway.
2884          */
2885         if (!obj_request->offset &&
2886             obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2887                 return true;
2888
2889         /*
2890          * If the object is known to already exist, its parent data has
2891          * already been copied.
2892          */
2893         if (obj_request_known_test(obj_request) &&
2894             obj_request_exists_test(obj_request))
2895                 return true;
2896
2897         return false;
2898 }
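
/*
 * Summary: img_obj_request_simple() returns true for reads, for writes
 * and discards to unlayered images, for layered writes beyond the
 * parent overlap or covering a whole object, and for targets already
 * known to exist.  Only the remaining layered-write cases need the
 * exists/copyup machinery in rbd_img_obj_request_submit() below.
 */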
2899
2900 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2901 {
2902         if (img_obj_request_simple(obj_request)) {
2903                 struct rbd_device *rbd_dev;
2904                 struct ceph_osd_client *osdc;
2905
2906                 rbd_dev = obj_request->img_request->rbd_dev;
2907                 osdc = &rbd_dev->rbd_client->client->osdc;
2908
2909                 return rbd_obj_request_submit(osdc, obj_request);
2910         }
2911
2912         /*
2913          * It's a layered write or discard.  The target object might
2914          * exist, but we may not know that yet.  If we know it doesn't exist,
2915          * start by reading the data for the full target object from
2916          * the parent so we can use it for a copyup to the target.
2917          */
2918         if (obj_request_known_test(obj_request))
2919                 return rbd_img_obj_parent_read_full(obj_request);
2920
2921         /* We don't know whether the target exists.  Go find out. */
2922
2923         return rbd_img_obj_exists_submit(obj_request);
2924 }
2925
2926 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2927 {
2928         struct rbd_obj_request *obj_request;
2929         struct rbd_obj_request *next_obj_request;
2930
2931         dout("%s: img %p\n", __func__, img_request);
2932         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2933                 int ret;
2934
2935                 ret = rbd_img_obj_request_submit(obj_request);
2936                 if (ret)
2937                         return ret;
2938         }
2939
2940         return 0;
2941 }
2942
2943 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2944 {
2945         struct rbd_obj_request *obj_request;
2946         struct rbd_device *rbd_dev;
2947         u64 obj_end;
2948         u64 img_xferred;
2949         int img_result;
2950
2951         rbd_assert(img_request_child_test(img_request));
2952
2953         /* First get what we need from the image request and release it */
2954
2955         obj_request = img_request->obj_request;
2956         img_xferred = img_request->xferred;
2957         img_result = img_request->result;
2958         rbd_img_request_put(img_request);
2959
2960         /*
2961          * If the overlap has become 0 (most likely because the
2962          * image has been flattened) we need to re-submit the
2963          * original request.
2964          */
2965         rbd_assert(obj_request);
2966         rbd_assert(obj_request->img_request);
2967         rbd_dev = obj_request->img_request->rbd_dev;
2968         if (!rbd_dev->parent_overlap) {
2969                 struct ceph_osd_client *osdc;
2970
2971                 osdc = &rbd_dev->rbd_client->client->osdc;
2972                 img_result = rbd_obj_request_submit(osdc, obj_request);
2973                 if (!img_result)
2974                         return;
2975         }
2976
2977         obj_request->result = img_result;
2978         if (obj_request->result)
2979                 goto out;
2980
2981         /*
2982          * We need to zero anything beyond the parent overlap
2983          * boundary.  Since rbd_img_obj_request_read_callback()
2984          * will zero anything beyond the end of a short read, an
2985          * easy way to do this is to pretend the data from the
2986          * parent came up short--ending at the overlap boundary.
2987          */
2988         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2989         obj_end = obj_request->img_offset + obj_request->length;
2990         if (obj_end > rbd_dev->parent_overlap) {
2991                 u64 xferred = 0;
2992
2993                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2994                         xferred = rbd_dev->parent_overlap -
2995                                         obj_request->img_offset;
2996
2997                 obj_request->xferred = min(img_xferred, xferred);
2998         } else {
2999                 obj_request->xferred = img_xferred;
3000         }
3001 out:
3002         rbd_img_obj_request_read_callback(obj_request);
3003         rbd_obj_request_complete(obj_request);
3004 }
3005
3006 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
3007 {
3008         struct rbd_img_request *img_request;
3009         int result;
3010
3011         rbd_assert(obj_request_img_data_test(obj_request));
3012         rbd_assert(obj_request->img_request != NULL);
3013         rbd_assert(obj_request->result == (s32) -ENOENT);
3014         rbd_assert(obj_request_type_valid(obj_request->type));
3015
3016         /* rbd_read_finish(obj_request, obj_request->length); */
3017         img_request = rbd_parent_request_create(obj_request,
3018                                                 obj_request->img_offset,
3019                                                 obj_request->length);
3020         result = -ENOMEM;
3021         if (!img_request)
3022                 goto out_err;
3023
3024         if (obj_request->type == OBJ_REQUEST_BIO)
3025                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3026                                                 obj_request->bio_list);
3027         else
3028                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3029                                                 obj_request->pages);
3030         if (result)
3031                 goto out_err;
3032
3033         img_request->callback = rbd_img_parent_read_callback;
3034         result = rbd_img_request_submit(img_request);
3035         if (result)
3036                 goto out_err;
3037
3038         return;
3039 out_err:
3040         if (img_request)
3041                 rbd_img_request_put(img_request);
3042         obj_request->result = result;
3043         obj_request->xferred = 0;
3044         obj_request_done_set(obj_request);
3045 }
3046
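/*
 * Acknowledge a notification received on the header object,
 * synchronously.  Builds a data-less object request carrying a
 * NOTIFY_ACK op, submits it, and waits for completion.
 */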
3047 static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
3048 {
3049         struct rbd_obj_request *obj_request;
3050         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3051         int ret;
3052
3053         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
3054                                                         OBJ_REQUEST_NODATA);
3055         if (!obj_request)
3056                 return -ENOMEM;
3057
3058         ret = -ENOMEM;
3059         obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3060                                                   obj_request);
3061         if (!obj_request->osd_req)
3062                 goto out;
3063
3064         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
3065                                         notify_id, 0, 0);
3066         rbd_osd_req_format_read(obj_request);
3067
3068         ret = rbd_obj_request_submit(osdc, obj_request);
3069         if (ret)
3070                 goto out;
3071         ret = rbd_obj_request_wait(obj_request);
3072 out:
3073         rbd_obj_request_put(obj_request);
3074
3075         return ret;
3076 }
3077
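/*
 * Callback invoked when a notification arrives for our watch on the
 * header object, typically because the image header has changed.
 * Refresh the in-core image state, then ack the notification so the
 * osd knows it has been handled.
 */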
3078 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
3079 {
3080         struct rbd_device *rbd_dev = (struct rbd_device *)data;
3081         int ret;
3082
3083         if (!rbd_dev)
3084                 return;
3085
3086         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
3087                 rbd_dev->header_name, (unsigned long long)notify_id,
3088                 (unsigned int)opcode);
3089
3090         /*
3091          * Until adequate refresh error handling is in place, there is
3092          * not much we can do here, except warn.
3093          *
3094          * See http://tracker.ceph.com/issues/5040
3095          */
3096         ret = rbd_dev_refresh(rbd_dev);
3097         if (ret)
3098                 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3099
3100         ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
3101         if (ret)
3102                 rbd_warn(rbd_dev, "notify_ack ret %d", ret);
3103 }
3104
3105 /*
3106  * Send a (un)watch request and wait for the ack.  On success return
3107  * the request with a reference held; on error return an ERR_PTR.
3108  */
3109 static struct rbd_obj_request *rbd_obj_watch_request_helper(
3110                                                 struct rbd_device *rbd_dev,
3111                                                 bool watch)
3112 {
3113         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3114         struct rbd_obj_request *obj_request;
3115         int ret;
3116
3117         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
3118                                              OBJ_REQUEST_NODATA);
3119         if (!obj_request)
3120                 return ERR_PTR(-ENOMEM);
3121
3122         obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 1,
3123                                                   obj_request);
3124         if (!obj_request->osd_req) {
3125                 ret = -ENOMEM;
3126                 goto out;
3127         }
3128
3129         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
3130                               rbd_dev->watch_event->cookie, 0, watch);
3131         rbd_osd_req_format_write(obj_request);
3132
3133         if (watch)
3134                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
3135
3136         ret = rbd_obj_request_submit(osdc, obj_request);
3137         if (ret)
3138                 goto out;
3139
3140         ret = rbd_obj_request_wait(obj_request);
3141         if (ret)
3142                 goto out;
3143
3144         ret = obj_request->result;
3145         if (ret) {
3146                 if (watch)
3147                         rbd_obj_request_end(obj_request);
3148                 goto out;
3149         }
3150
3151         return obj_request;
3152
3153 out:
3154         rbd_obj_request_put(obj_request);
3155         return ERR_PTR(ret);
3156 }
3157
3158 /*
3159  * Initiate a watch request, synchronously.
3160  */
3161 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
3162 {
3163         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3164         struct rbd_obj_request *obj_request;
3165         int ret;
3166
3167         rbd_assert(!rbd_dev->watch_event);
3168         rbd_assert(!rbd_dev->watch_request);
3169
3170         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
3171                                      &rbd_dev->watch_event);
3172         if (ret < 0)
3173                 return ret;
3174
3175         obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
3176         if (IS_ERR(obj_request)) {
3177                 ceph_osdc_cancel_event(rbd_dev->watch_event);
3178                 rbd_dev->watch_event = NULL;
3179                 return PTR_ERR(obj_request);
3180         }
3181
3182         /*
3183          * A watch request is set to linger, so the underlying osd
3184          * request won't go away until we unregister it.  We retain
3185          * a pointer to the object request during that time (in
3186          * rbd_dev->watch_request), so we'll keep a reference to it.
3187          * We'll drop that reference after we've unregistered it in
3188          * rbd_dev_header_unwatch_sync().
3189          */
3190         rbd_dev->watch_request = obj_request;
3191
3192         return 0;
3193 }
3194
3195 /*
3196  * Tear down a watch request, synchronously.
3197  */
3198 static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
3199 {
3200         struct rbd_obj_request *obj_request;
3201
3202         rbd_assert(rbd_dev->watch_event);
3203         rbd_assert(rbd_dev->watch_request);
3204
3205         rbd_obj_request_end(rbd_dev->watch_request);
3206         rbd_obj_request_put(rbd_dev->watch_request);
3207         rbd_dev->watch_request = NULL;
3208
3209         obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
3210         if (!IS_ERR(obj_request))
3211                 rbd_obj_request_put(obj_request);
3212         else
3213                 rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
3214                          PTR_ERR(obj_request));
3215
3216         ceph_osdc_cancel_event(rbd_dev->watch_event);
3217         rbd_dev->watch_event = NULL;
3218 }
3219
3220 /*
3221  * Synchronous osd object method call.  Returns the number of bytes
3222  * returned in the inbound buffer, or a negative error code.
3223  */
3224 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3225                              const char *object_name,
3226                              const char *class_name,
3227                              const char *method_name,
3228                              const void *outbound,
3229                              size_t outbound_size,
3230                              void *inbound,
3231                              size_t inbound_size)
3232 {
3233         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3234         struct rbd_obj_request *obj_request;
3235         struct page **pages;
3236         u32 page_count;
3237         int ret;
3238
3239         /*
3240          * Method calls are ultimately read operations.  The result
3241          * should be placed into the inbound buffer provided.  They
3242          * may also supply outbound data--parameters for the object
3243          * method.  Currently, if outbound data is present it will
3244          * be a snapshot id.
3245          */
3246         page_count = (u32)calc_pages_for(0, inbound_size);
3247         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3248         if (IS_ERR(pages))
3249                 return PTR_ERR(pages);
3250
3251         ret = -ENOMEM;
3252         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
3253                                                         OBJ_REQUEST_PAGES);
3254         if (!obj_request)
3255                 goto out;
3256
3257         obj_request->pages = pages;
3258         obj_request->page_count = page_count;
3259
3260         obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3261                                                   obj_request);
3262         if (!obj_request->osd_req)
3263                 goto out;
3264
3265         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
3266                                         class_name, method_name);
3267         if (outbound_size) {
3268                 struct ceph_pagelist *pagelist;
3269
3270                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
3271                 if (!pagelist)
3272                         goto out;
3273
3274                 ceph_pagelist_init(pagelist);
3275                 ceph_pagelist_append(pagelist, outbound, outbound_size);
3276                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
3277                                                 pagelist);
3278         }
3279         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3280                                         obj_request->pages, inbound_size,
3281                                         0, false, false);
3282         rbd_osd_req_format_read(obj_request);
3283
3284         ret = rbd_obj_request_submit(osdc, obj_request);
3285         if (ret)
3286                 goto out;
3287         ret = rbd_obj_request_wait(obj_request);
3288         if (ret)
3289                 goto out;
3290
3291         ret = obj_request->result;
3292         if (ret < 0)
3293                 goto out;
3294
3295         rbd_assert(obj_request->xferred < (u64)INT_MAX);
3296         ret = (int)obj_request->xferred;
3297         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
3298 out:
3299         if (obj_request)
3300                 rbd_obj_request_put(obj_request);
3301         else
3302                 ceph_release_page_vector(pages, page_count);
3303
3304         return ret;
3305 }
3306
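/*
 * Handle a single block-layer request: derive the operation type
 * from the request flags, validate the range against the current
 * mapping, then build and submit a corresponding image request.
 * Any failure ends the block request immediately with the error.
 */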
3307 static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
3308 {
3309         struct rbd_img_request *img_request;
3310         struct ceph_snap_context *snapc = NULL;
3311         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
3312         u64 length = blk_rq_bytes(rq);
3313         enum obj_operation_type op_type;
3314         u64 mapping_size;
3315         int result;
3316
3317         if (rq->cmd_flags & REQ_DISCARD)
3318                 op_type = OBJ_OP_DISCARD;
3319         else if (rq->cmd_flags & REQ_WRITE)
3320                 op_type = OBJ_OP_WRITE;
3321         else
3322                 op_type = OBJ_OP_READ;
3323
3324         /* Ignore/skip any zero-length requests */
3325
3326         if (!length) {
3327                 dout("%s: zero-length request\n", __func__);
3328                 result = 0;
3329                 goto err_rq;
3330         }
3331
3332         /* Only reads are allowed to a read-only device */
3333
3334         if (op_type != OBJ_OP_READ) {
3335                 if (rbd_dev->mapping.read_only) {
3336                         result = -EROFS;
3337                         goto err_rq;
3338                 }
3339                 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3340         }
3341
3342         /*
3343          * Quit early if the mapped snapshot no longer exists.  It's
3344          * still possible the snapshot will have disappeared by the
3345          * time our request arrives at the osd, but there's no sense in
3346          * sending it if we already know.
3347          */
3348         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3349                 dout("request for non-existent snapshot");
3350                 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3351                 result = -ENXIO;
3352                 goto err_rq;
3353         }
3354
3355         if (offset && length > U64_MAX - offset + 1) {
3356                 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
3357                          length);
3358                 result = -EINVAL;
3359                 goto err_rq;    /* Shouldn't happen */
3360         }
3361
3362         down_read(&rbd_dev->header_rwsem);
3363         mapping_size = rbd_dev->mapping.size;
3364         if (op_type != OBJ_OP_READ) {
3365                 snapc = rbd_dev->header.snapc;
3366                 ceph_get_snap_context(snapc);
3367         }
3368         up_read(&rbd_dev->header_rwsem);
3369
3370         if (offset + length > mapping_size) {
3371                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
3372                          length, mapping_size);
3373                 result = -EIO;
3374                 goto err_rq;
3375         }
3376
3377         img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
3378                                              snapc);
3379         if (!img_request) {
3380                 result = -ENOMEM;
3381                 goto err_rq;
3382         }
3383         img_request->rq = rq;
3384
3385         if (op_type == OBJ_OP_DISCARD)
3386                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
3387                                               NULL);
3388         else
3389                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3390                                               rq->bio);
3391         if (result)
3392                 goto err_img_request;
3393
3394         result = rbd_img_request_submit(img_request);
3395         if (result)
3396                 goto err_img_request;
3397
3398         return;
3399
3400 err_img_request:
3401         rbd_img_request_put(img_request);
3402 err_rq:
3403         if (result)
3404                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
3405                          obj_op_name(op_type), length, offset, result);
3406         ceph_put_snap_context(snapc);
3407         blk_end_request_all(rq, result);
3408 }
3409
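/*
 * Work function servicing requests queued by rbd_request_fn().  The
 * request function runs in atomic context and cannot sleep, so it
 * only queues requests; here we splice them off the queue under the
 * lock and handle each one in process context, where blocking is OK.
 */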
3410 static void rbd_request_workfn(struct work_struct *work)
3411 {
3412         struct rbd_device *rbd_dev =
3413             container_of(work, struct rbd_device, rq_work);
3414         struct request *rq, *next;
3415         LIST_HEAD(requests);
3416
3417         spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
3418         list_splice_init(&rbd_dev->rq_queue, &requests);
3419         spin_unlock_irq(&rbd_dev->lock);
3420
3421         list_for_each_entry_safe(rq, next, &requests, queuelist) {
3422                 list_del_init(&rq->queuelist);
3423                 rbd_handle_request(rbd_dev, rq);
3424         }
3425 }
3426
3427 /*
3428  * Called with q->queue_lock held and interrupts disabled, possibly on
3429  * the way to schedule().  Do not sleep here!
3430  */
3431 static void rbd_request_fn(struct request_queue *q)
3432 {
3433         struct rbd_device *rbd_dev = q->queuedata;
3434         struct request *rq;
3435         int queued = 0;
3436
3437         rbd_assert(rbd_dev);
3438
3439         while ((rq = blk_fetch_request(q))) {
3440                 /* Ignore any non-FS requests that filter through. */
3441                 if (rq->cmd_type != REQ_TYPE_FS) {
3442                         dout("%s: non-fs request type %d\n", __func__,
3443                                 (int) rq->cmd_type);
3444                         __blk_end_request_all(rq, 0);
3445                         continue;
3446                 }
3447
3448                 list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
3449                 queued++;
3450         }
3451
3452         if (queued)
3453                 queue_work(rbd_wq, &rbd_dev->rq_work);
3454 }
3455
3456 /*
3457  * A queue callback.  Makes sure that we don't create a bio that spans
3458  * multiple osd objects.  One exception is a single-page bio, which we
3459  * handle later in bio_chain_clone_range().
3460  */
3461 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3462                           struct bio_vec *bvec)
3463 {
3464         struct rbd_device *rbd_dev = q->queuedata;
3465         sector_t sector_offset;
3466         sector_t sectors_per_obj;
3467         sector_t obj_sector_offset;
3468         int ret;
3469
3470         /*
3471          * Find how far into its rbd object the bio's start sector
3472          * falls.  The sector is partition-relative, so offset it by
3473          * the partition start to make it relative to the whole device.
3474          */
3475         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3476         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3477         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3478
3479         /*
3480          * Compute the number of bytes from that offset to the end
3481          * of the object.  Account for what's already used by the bio.
3482          */
3483         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3484         if (ret > bmd->bi_size)
3485                 ret -= bmd->bi_size;
3486         else
3487                 ret = 0;
3488
3489         /*
3490          * Don't send back more than was asked for.  And if the bio
3491          * was empty, let the whole thing through because:  "Note
3492          * that a block device *must* allow a single page to be
3493          * added to an empty bio."
3494          */
3495         rbd_assert(bvec->bv_len <= PAGE_SIZE);
3496         if (ret > (int) bvec->bv_len || !bmd->bi_size)
3497                 ret = (int) bvec->bv_len;
3498
3499         return ret;
3500 }
3501
3502 static void rbd_free_disk(struct rbd_device *rbd_dev)
3503 {
3504         struct gendisk *disk = rbd_dev->disk;
3505
3506         if (!disk)
3507                 return;
3508
3509         rbd_dev->disk = NULL;
3510         if (disk->flags & GENHD_FL_UP) {
3511                 del_gendisk(disk);
3512                 if (disk->queue)
3513                         blk_cleanup_queue(disk->queue);
3514         }
3515         put_disk(disk);
3516 }
3517
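/*
 * Synchronously read @length bytes starting at @offset from the
 * named object into @buf.  Returns the number of bytes read, or a
 * negative error code.
 */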
3518 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3519                                 const char *object_name,
3520                                 u64 offset, u64 length, void *buf)
3521
3522 {
3523         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3524         struct rbd_obj_request *obj_request;
3525         struct page **pages = NULL;
3526         u32 page_count;
3527         size_t size;
3528         int ret;
3529
3530         page_count = (u32) calc_pages_for(offset, length);
3531         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3532         if (IS_ERR(pages))
3533                 return PTR_ERR(pages);
3534
3535         ret = -ENOMEM;
3536         obj_request = rbd_obj_request_create(object_name, offset, length,
3537                                                         OBJ_REQUEST_PAGES);
3538         if (!obj_request)
3539                 goto out;
3540
3541         obj_request->pages = pages;
3542         obj_request->page_count = page_count;
3543
3544         obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3545                                                   obj_request);
3546         if (!obj_request->osd_req)
3547                 goto out;
3548
3549         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3550                                         offset, length, 0, 0);
3551         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3552                                         obj_request->pages,
3553                                         obj_request->length,
3554                                         obj_request->offset & ~PAGE_MASK,
3555                                         false, false);
3556         rbd_osd_req_format_read(obj_request);
3557
3558         ret = rbd_obj_request_submit(osdc, obj_request);
3559         if (ret)
3560                 goto out;
3561         ret = rbd_obj_request_wait(obj_request);
3562         if (ret)
3563                 goto out;
3564
3565         ret = obj_request->result;
3566         if (ret < 0)
3567                 goto out;
3568
3569         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3570         size = (size_t) obj_request->xferred;
3571         ceph_copy_from_page_vector(pages, buf, 0, size);
3572         rbd_assert(size <= (size_t)INT_MAX);
3573         ret = (int)size;
3574 out:
3575         if (obj_request)
3576                 rbd_obj_request_put(obj_request);
3577         else
3578                 ceph_release_page_vector(pages, page_count);
3579
3580         return ret;
3581 }
3582
3583 /*
3584  * Read the complete header for the given rbd device.  On successful
3585  * return, the rbd_dev->header field will contain up-to-date
3586  * information about the image.
3587  */
3588 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3589 {
3590         struct rbd_image_header_ondisk *ondisk = NULL;
3591         u32 snap_count = 0;
3592         u64 names_size = 0;
3593         u32 want_count;
3594         int ret;
3595
3596         /*
3597          * The complete header will include an array of its 64-bit
3598          * snapshot ids, followed by the names of those snapshots as
3599          * a contiguous block of NUL-terminated strings.  Note that
3600          * the number of snapshots could change by the time we read
3601          * it in, in which case we re-read it.
3602          */
3603         do {
3604                 size_t size;
3605
3606                 kfree(ondisk);
3607
3608                 size = sizeof (*ondisk);
3609                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3610                 size += names_size;
3611                 ondisk = kmalloc(size, GFP_KERNEL);
3612                 if (!ondisk)
3613                         return -ENOMEM;
3614
3615                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3616                                        0, size, ondisk);
3617                 if (ret < 0)
3618                         goto out;
3619                 if ((size_t)ret < size) {
3620                         ret = -ENXIO;
3621                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3622                                 size, ret);
3623                         goto out;
3624                 }
3625                 if (!rbd_dev_ondisk_valid(ondisk)) {
3626                         ret = -ENXIO;
3627                         rbd_warn(rbd_dev, "invalid header");
3628                         goto out;
3629                 }
3630
3631                 names_size = le64_to_cpu(ondisk->snap_names_len);
3632                 want_count = snap_count;
3633                 snap_count = le32_to_cpu(ondisk->snap_count);
3634         } while (snap_count != want_count);
3635
3636         ret = rbd_header_from_disk(rbd_dev, ondisk);
3637 out:
3638         kfree(ondisk);
3639
3640         return ret;
3641 }
3642
3643 /*
3644  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3645  * has disappeared from the (just updated) snapshot context.
3646  */
3647 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3648 {
3649         u64 snap_id;
3650
3651         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3652                 return;
3653
3654         snap_id = rbd_dev->spec->snap_id;
3655         if (snap_id == CEPH_NOSNAP)
3656                 return;
3657
3658         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3659                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3660 }
3661
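/*
 * Propagate a change in the size of the mapped image to the block
 * layer (gendisk capacity), unless the device is being removed.
 */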
3662 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3663 {
3664         sector_t size;
3665         bool removing;
3666
3667         /*
3668          * Don't hold the lock while doing disk operations,
3669          * or lock ordering will conflict with the bdev mutex via:
3670          * rbd_add() -> blkdev_get() -> rbd_open()
3671          */
3672         spin_lock_irq(&rbd_dev->lock);
3673         removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
3674         spin_unlock_irq(&rbd_dev->lock);
3675         /*
3676          * If the device is being removed, rbd_dev->disk has
3677          * been destroyed, so don't try to update its size
3678          */
3679         if (!removing) {
3680                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3681                 dout("setting size to %llu sectors", (unsigned long long)size);
3682                 set_capacity(rbd_dev->disk, size);
3683                 revalidate_disk(rbd_dev->disk);
3684         }
3685 }
3686
3687 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3688 {
3689         u64 mapping_size;
3690         int ret;
3691
3692         down_write(&rbd_dev->header_rwsem);
3693         mapping_size = rbd_dev->mapping.size;
3694
3695         ret = rbd_dev_header_info(rbd_dev);
3696         if (ret)
3697                 goto out;       /* must not return with header_rwsem held */
3698
3699         /*
3700          * If there is a parent, see if it has disappeared due to the
3701          * mapped image getting flattened.
3702          */
3703         if (rbd_dev->parent) {
3704                 ret = rbd_dev_v2_parent_info(rbd_dev);
3705                 if (ret)
3706                         goto out;
3707         }
3708
3709         if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
3710                 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3711                         rbd_dev->mapping.size = rbd_dev->header.image_size;
3712         } else {
3713                 /* validate mapped snapshot's EXISTS flag */
3714                 rbd_exists_validate(rbd_dev);
3715         }
3716
3717         ret = 0;
3718 out:
3719         up_write(&rbd_dev->header_rwsem);
3720         if (!ret && mapping_size != rbd_dev->mapping.size)
3721                 rbd_dev_update_size(rbd_dev);
3722         return ret;
3723 }
3724
3725 static int rbd_init_disk(struct rbd_device *rbd_dev)
3726 {
3727         struct gendisk *disk;
3728         struct request_queue *q;
3729         u64 segment_size;
3730
3731         /* create gendisk info */
3732         disk = alloc_disk(single_major ?
3733                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3734                           RBD_MINORS_PER_MAJOR);
3735         if (!disk)
3736                 return -ENOMEM;
3737
3738         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3739                  rbd_dev->dev_id);
3740         disk->major = rbd_dev->major;
3741         disk->first_minor = rbd_dev->minor;
3742         if (single_major)
3743                 disk->flags |= GENHD_FL_EXT_DEVT;
3744         disk->fops = &rbd_bd_ops;
3745         disk->private_data = rbd_dev;
3746
3747         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3748         if (!q)
3749                 goto out_disk;
3750
3751         /* We use the default size, but let's be explicit about it. */
3752         blk_queue_physical_block_size(q, SECTOR_SIZE);
3753
3754         /* set io sizes to object size */
3755         segment_size = rbd_obj_bytes(&rbd_dev->header);
3756         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3757         blk_queue_max_segment_size(q, segment_size);
3758         blk_queue_io_min(q, segment_size);
3759         blk_queue_io_opt(q, segment_size);
3760
3761         /* enable the discard support */
3762         queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
3763         q->limits.discard_granularity = segment_size;
3764         q->limits.discard_alignment = segment_size;
3765         q->limits.max_discard_sectors = segment_size / SECTOR_SIZE;
3766         q->limits.discard_zeroes_data = 1;
3767
3768         blk_queue_merge_bvec(q, rbd_merge_bvec);
3769         disk->queue = q;
3770
3771         q->queuedata = rbd_dev;
3772
3773         rbd_dev->disk = disk;
3774
3775         return 0;
3776 out_disk:
3777         put_disk(disk);
3778
3779         return -ENOMEM;
3780 }
3781
3782 /*
3783   sysfs
3784 */
3785
3786 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3787 {
3788         return container_of(dev, struct rbd_device, dev);
3789 }
3790
3791 static ssize_t rbd_size_show(struct device *dev,
3792                              struct device_attribute *attr, char *buf)
3793 {
3794         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3795
3796         return sprintf(buf, "%llu\n",
3797                 (unsigned long long)rbd_dev->mapping.size);
3798 }
3799
3800 /*
3801  * Note this shows the features for whatever's mapped, which is not
3802  * necessarily the base image.
3803  */
3804 static ssize_t rbd_features_show(struct device *dev,
3805                              struct device_attribute *attr, char *buf)
3806 {
3807         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3808
3809         return sprintf(buf, "0x%016llx\n",
3810                         (unsigned long long)rbd_dev->mapping.features);
3811 }
3812
3813 static ssize_t rbd_major_show(struct device *dev,
3814                               struct device_attribute *attr, char *buf)
3815 {
3816         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3817
3818         if (rbd_dev->major)
3819                 return sprintf(buf, "%d\n", rbd_dev->major);
3820
3821         return sprintf(buf, "(none)\n");
3822 }
3823
3824 static ssize_t rbd_minor_show(struct device *dev,
3825                               struct device_attribute *attr, char *buf)
3826 {
3827         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3828
3829         return sprintf(buf, "%d\n", rbd_dev->minor);
3830 }
3831
3832 static ssize_t rbd_client_id_show(struct device *dev,
3833                                   struct device_attribute *attr, char *buf)
3834 {
3835         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3836
3837         return sprintf(buf, "client%lld\n",
3838                         ceph_client_id(rbd_dev->rbd_client->client));
3839 }
3840
3841 static ssize_t rbd_pool_show(struct device *dev,
3842                              struct device_attribute *attr, char *buf)
3843 {
3844         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3845
3846         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3847 }
3848
3849 static ssize_t rbd_pool_id_show(struct device *dev,
3850                              struct device_attribute *attr, char *buf)
3851 {
3852         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3853
3854         return sprintf(buf, "%llu\n",
3855                         (unsigned long long) rbd_dev->spec->pool_id);
3856 }
3857
3858 static ssize_t rbd_name_show(struct device *dev,
3859                              struct device_attribute *attr, char *buf)
3860 {
3861         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3862
3863         if (rbd_dev->spec->image_name)
3864                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3865
3866         return sprintf(buf, "(unknown)\n");
3867 }
3868
3869 static ssize_t rbd_image_id_show(struct device *dev,
3870                              struct device_attribute *attr, char *buf)
3871 {
3872         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3873
3874         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3875 }
3876
3877 /*
3878  * Shows the name of the currently-mapped snapshot (or
3879  * RBD_SNAP_HEAD_NAME for the base image).
3880  */
3881 static ssize_t rbd_snap_show(struct device *dev,
3882                              struct device_attribute *attr,
3883                              char *buf)
3884 {
3885         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3886
3887         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3888 }
3889
3890 /*
3891  * For a v2 image, shows the chain of parent images, separated by empty
3892  * lines.  For v1 images or if there is no parent, shows "(no parent
3893  * image)".
3894  */
3895 static ssize_t rbd_parent_show(struct device *dev,
3896                                struct device_attribute *attr,
3897                                char *buf)
3898 {
3899         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3900         ssize_t count = 0;
3901
3902         if (!rbd_dev->parent)
3903                 return sprintf(buf, "(no parent image)\n");
3904
3905         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
3906                 struct rbd_spec *spec = rbd_dev->parent_spec;
3907
3908                 count += sprintf(&buf[count], "%s"
3909                             "pool_id %llu\npool_name %s\n"
3910                             "image_id %s\nimage_name %s\n"
3911                             "snap_id %llu\nsnap_name %s\n"
3912                             "overlap %llu\n",
3913                             !count ? "" : "\n", /* first? */
3914                             spec->pool_id, spec->pool_name,
3915                             spec->image_id, spec->image_name ?: "(unknown)",
3916                             spec->snap_id, spec->snap_name,
3917                             rbd_dev->parent_overlap);
3918         }
3919
3920         return count;
3921 }
3922
3923 static ssize_t rbd_image_refresh(struct device *dev,
3924                                  struct device_attribute *attr,
3925                                  const char *buf,
3926                                  size_t size)
3927 {
3928         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3929         int ret;
3930
3931         ret = rbd_dev_refresh(rbd_dev);
3932         if (ret)
3933                 return ret;
3934
3935         return size;
3936 }
3937
3938 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3939 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3940 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3941 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
3942 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3943 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3944 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3945 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3946 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3947 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3948 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3949 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
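
/*
 * The attributes above are exposed through sysfs once the device has
 * been registered.  A hypothetical session (assuming the device was
 * assigned id 0; the values shown are illustrative only):
 *
 *      $ cat /sys/bus/rbd/devices/0/size
 *      1073741824
 *      $ cat /sys/bus/rbd/devices/0/pool
 *      rbd
 *      $ echo 1 > /sys/bus/rbd/devices/0/refresh
 */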
3950
3951 static struct attribute *rbd_attrs[] = {
3952         &dev_attr_size.attr,
3953         &dev_attr_features.attr,
3954         &dev_attr_major.attr,
3955         &dev_attr_minor.attr,
3956         &dev_attr_client_id.attr,
3957         &dev_attr_pool.attr,
3958         &dev_attr_pool_id.attr,
3959         &dev_attr_name.attr,
3960         &dev_attr_image_id.attr,
3961         &dev_attr_current_snap.attr,
3962         &dev_attr_parent.attr,
3963         &dev_attr_refresh.attr,
3964         NULL
3965 };
3966
3967 static struct attribute_group rbd_attr_group = {
3968         .attrs = rbd_attrs,
3969 };
3970
3971 static const struct attribute_group *rbd_attr_groups[] = {
3972         &rbd_attr_group,
3973         NULL
3974 };
3975
3976 static void rbd_sysfs_dev_release(struct device *dev)
3977 {
3978 }
3979
3980 static struct device_type rbd_device_type = {
3981         .name           = "rbd",
3982         .groups         = rbd_attr_groups,
3983         .release        = rbd_sysfs_dev_release,
3984 };
3985
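/*
 * An rbd_spec is reference counted.  rbd_spec_get() and
 * rbd_spec_put() take and drop references; the spec, along with its
 * dynamically allocated name strings, is freed when the last
 * reference is dropped.
 */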
3986 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3987 {
3988         kref_get(&spec->kref);
3989
3990         return spec;
3991 }
3992
3993 static void rbd_spec_free(struct kref *kref);
3994 static void rbd_spec_put(struct rbd_spec *spec)
3995 {
3996         if (spec)
3997                 kref_put(&spec->kref, rbd_spec_free);
3998 }
3999
4000 static struct rbd_spec *rbd_spec_alloc(void)
4001 {
4002         struct rbd_spec *spec;
4003
4004         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4005         if (!spec)
4006                 return NULL;
4007
4008         spec->pool_id = CEPH_NOPOOL;
4009         spec->snap_id = CEPH_NOSNAP;
4010         kref_init(&spec->kref);
4011
4012         return spec;
4013 }
4014
4015 static void rbd_spec_free(struct kref *kref)
4016 {
4017         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4018
4019         kfree(spec->pool_name);
4020         kfree(spec->image_id);
4021         kfree(spec->image_name);
4022         kfree(spec->snap_name);
4023         kfree(spec);
4024 }
4025
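/*
 * Allocate and minimally initialize an rbd_device, recording the
 * given client and spec pointers.  rbd_dev_destroy() drops a
 * reference on each of them when the device is torn down.
 */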
4026 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4027                                 struct rbd_spec *spec)
4028 {
4029         struct rbd_device *rbd_dev;
4030
4031         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
4032         if (!rbd_dev)
4033                 return NULL;
4034
4035         spin_lock_init(&rbd_dev->lock);
4036         INIT_LIST_HEAD(&rbd_dev->rq_queue);
4037         INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
4038         rbd_dev->flags = 0;
4039         atomic_set(&rbd_dev->parent_ref, 0);
4040         INIT_LIST_HEAD(&rbd_dev->node);
4041         init_rwsem(&rbd_dev->header_rwsem);
4042
4043         rbd_dev->spec = spec;
4044         rbd_dev->rbd_client = rbdc;
4045
4046         /* Initialize the layout used for all rbd requests */
4047
4048         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
4049         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
4050         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
4051         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
4052
4053         return rbd_dev;
4054 }
4055
4056 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4057 {
4058         rbd_put_client(rbd_dev->rbd_client);
4059         rbd_spec_put(rbd_dev->spec);
4060         kfree(rbd_dev);
4061 }
4062
4063 /*
4064  * Get the size and object order for an image snapshot, or if
4065  * snap_id is CEPH_NOSNAP, gets this information for the base
4066  * image.
4067  */
4068 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4069                                 u8 *order, u64 *snap_size)
4070 {
4071         __le64 snapid = cpu_to_le64(snap_id);
4072         int ret;
4073         struct {
4074                 u8 order;
4075                 __le64 size;
4076         } __attribute__ ((packed)) size_buf = { 0 };
4077
4078         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4079                                 "rbd", "get_size",
4080                                 &snapid, sizeof (snapid),
4081                                 &size_buf, sizeof (size_buf));
4082         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4083         if (ret < 0)
4084                 return ret;
4085         if (ret < sizeof (size_buf))
4086                 return -ERANGE;
4087
4088         if (order) {
4089                 *order = size_buf.order;
4090                 dout("  order %u", (unsigned int)*order);
4091         }
4092         *snap_size = le64_to_cpu(size_buf.size);
4093
4094         dout("  snap_id 0x%016llx snap_size = %llu\n",
4095                 (unsigned long long)snap_id,
4096                 (unsigned long long)*snap_size);
4097
4098         return 0;
4099 }
4100
4101 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4102 {
4103         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4104                                         &rbd_dev->header.obj_order,
4105                                         &rbd_dev->header.image_size);
4106 }
4107
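/*
 * Fetch the object name prefix for a format 2 image using the
 * "get_object_prefix" class method, and record it in the in-core
 * image header.
 */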
4108 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4109 {
4110         void *reply_buf;
4111         int ret;
4112         void *p;
4113
4114         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4115         if (!reply_buf)
4116                 return -ENOMEM;
4117
4118         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4119                                 "rbd", "get_object_prefix", NULL, 0,
4120                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4121         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4122         if (ret < 0)
4123                 goto out;
4124
4125         p = reply_buf;
4126         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4127                                                 p + ret, NULL, GFP_NOIO);
4128         ret = 0;
4129
4130         if (IS_ERR(rbd_dev->header.object_prefix)) {
4131                 ret = PTR_ERR(rbd_dev->header.object_prefix);
4132                 rbd_dev->header.object_prefix = NULL;
4133         } else {
4134                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
4135         }
4136 out:
4137         kfree(reply_buf);
4138
4139         return ret;
4140 }
4141
4142 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4143                 u64 *snap_features)
4144 {
4145         __le64 snapid = cpu_to_le64(snap_id);
4146         struct {
4147                 __le64 features;
4148                 __le64 incompat;
4149         } __attribute__ ((packed)) features_buf = { 0 };
4150         u64 incompat;
4151         int ret;
4152
4153         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4154                                 "rbd", "get_features",
4155                                 &snapid, sizeof (snapid),
4156                                 &features_buf, sizeof (features_buf));
4157         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4158         if (ret < 0)
4159                 return ret;
4160         if (ret < sizeof (features_buf))
4161                 return -ERANGE;
4162
4163         incompat = le64_to_cpu(features_buf.incompat);
4164         if (incompat & ~RBD_FEATURES_SUPPORTED)
4165                 return -ENXIO;
4166
4167         *snap_features = le64_to_cpu(features_buf.features);
4168
4169         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4170                 (unsigned long long)snap_id,
4171                 (unsigned long long)*snap_features,
4172                 (unsigned long long)le64_to_cpu(features_buf.incompat));
4173
4174         return 0;
4175 }
4176
4177 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4178 {
4179         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4180                                                 &rbd_dev->header.features);
4181 }
4182
4183 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4184 {
4185         struct rbd_spec *parent_spec;
4186         size_t size;
4187         void *reply_buf = NULL;
4188         __le64 snapid;
4189         void *p;
4190         void *end;
4191         u64 pool_id;
4192         char *image_id;
4193         u64 snap_id;
4194         u64 overlap;
4195         int ret;
4196
4197         parent_spec = rbd_spec_alloc();
4198         if (!parent_spec)
4199                 return -ENOMEM;
4200
4201         size = sizeof (__le64) +                                /* pool_id */
4202                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
4203                 sizeof (__le64) +                               /* snap_id */
4204                 sizeof (__le64);                                /* overlap */
4205         reply_buf = kmalloc(size, GFP_KERNEL);
4206         if (!reply_buf) {
4207                 ret = -ENOMEM;
4208                 goto out_err;
4209         }
4210
4211         snapid = cpu_to_le64(rbd_dev->spec->snap_id);
4212         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4213                                 "rbd", "get_parent",
4214                                 &snapid, sizeof (snapid),
4215                                 reply_buf, size);
4216         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4217         if (ret < 0)
4218                 goto out_err;
4219
4220         p = reply_buf;
4221         end = reply_buf + ret;
4222         ret = -ERANGE;
4223         ceph_decode_64_safe(&p, end, pool_id, out_err);
4224         if (pool_id == CEPH_NOPOOL) {
4225                 /*
4226                  * Either the parent never existed, or we have a
4227                  * record of it but the image got flattened, so it no
4228                  * longer has a parent.  When the parent of a
4229                  * layered image disappears we immediately set the
4230                  * overlap to 0.  The effect of this is that all new
4231                  * requests will be treated as if the image had no
4232                  * parent.
4233                  */
4234                 if (rbd_dev->parent_overlap) {
4235                         rbd_dev->parent_overlap = 0;
4236                         rbd_dev_parent_put(rbd_dev);
4237                         pr_info("%s: clone image has been flattened\n",
4238                                 rbd_dev->disk->disk_name);
4239                 }
4240
4241                 goto out;       /* No parent?  No problem. */
4242         }
4243
4244         /* The ceph file layout needs to fit pool id in 32 bits */
4245
4246         ret = -EIO;
4247         if (pool_id > (u64)U32_MAX) {
4248                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
4249                         (unsigned long long)pool_id, U32_MAX);
4250                 goto out_err;
4251         }
4252
4253         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4254         if (IS_ERR(image_id)) {
4255                 ret = PTR_ERR(image_id);
4256                 goto out_err;
4257         }
4258         ceph_decode_64_safe(&p, end, snap_id, out_err);
4259         ceph_decode_64_safe(&p, end, overlap, out_err);
4260
4261         /*
4262          * The parent won't change (except when the clone is
4263          * flattened, which is handled above).  So we only need to
4264          * record the parent spec if we have not already done so.
4265          */
4266         if (!rbd_dev->parent_spec) {
4267                 parent_spec->pool_id = pool_id;
4268                 parent_spec->image_id = image_id;
4269                 parent_spec->snap_id = snap_id;
4270                 rbd_dev->parent_spec = parent_spec;
4271                 parent_spec = NULL;     /* rbd_dev now owns this */
4272         } else {
4273                 kfree(image_id);
4274         }
4275
4276         /*
4277          * We always update the parent overlap.  If it's zero we
4278          * treat it specially.
4279          */
4280         rbd_dev->parent_overlap = overlap;
4281         if (!overlap) {
4282
4283                 /* A null parent_spec indicates it's the initial probe */
4284
4285                 if (parent_spec) {
4286                         /*
4287                          * The overlap has become zero, so the clone
4288                          * must have been resized down to 0 at some
4289                          * point.  Treat this the same as a flatten.
4290                          */
4291                         rbd_dev_parent_put(rbd_dev);
4292                         pr_info("%s: clone image now standalone\n",
4293                                 rbd_dev->disk->disk_name);
4294                 } else {
4295                         /*
4296                          * For the initial probe, if we find the
4297                          * overlap is zero we just pretend there was
4298                          * no parent image.
4299                          */
4300                         rbd_warn(rbd_dev, "ignoring parent with overlap 0");
4301                 }
4302         }
4303 out:
4304         ret = 0;
4305 out_err:
4306         kfree(reply_buf);
4307         rbd_spec_put(parent_spec);
4308
4309         return ret;
4310 }
4311
4312 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4313 {
4314         struct {
4315                 __le64 stripe_unit;
4316                 __le64 stripe_count;
4317         } __attribute__ ((packed)) striping_info_buf = { 0 };
4318         size_t size = sizeof (striping_info_buf);
4319         void *p;
4320         u64 obj_size;
4321         u64 stripe_unit;
4322         u64 stripe_count;
4323         int ret;
4324
4325         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4326                                 "rbd", "get_stripe_unit_count", NULL, 0,
4327                                 (char *)&striping_info_buf, size);
4328         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4329         if (ret < 0)
4330                 return ret;
4331         if (ret < size)
4332                 return -ERANGE;
4333
4334         /*
4335          * We don't actually support the "fancy striping" feature
4336          * (STRIPINGV2) yet, but if the striping sizes are the
4337          * defaults the behavior is the same as before.  So find
4338          * out, and only fail if the image has non-default values.
4339          */
4340         ret = -EINVAL;
4341         obj_size = (u64)1 << rbd_dev->header.obj_order;
4342         p = &striping_info_buf;
4343         stripe_unit = ceph_decode_64(&p);
4344         if (stripe_unit != obj_size) {
4345                 rbd_warn(rbd_dev, "unsupported stripe unit "
4346                                 "(got %llu want %llu)",
4347                                 stripe_unit, obj_size);
4348                 return -EINVAL;
4349         }
4350         stripe_count = ceph_decode_64(&p);
4351         if (stripe_count != 1) {
4352                 rbd_warn(rbd_dev, "unsupported stripe count "
4353                                 "(got %llu want 1)", stripe_count);
4354                 return -EINVAL;
4355         }
4356         rbd_dev->header.stripe_unit = stripe_unit;
4357         rbd_dev->header.stripe_count = stripe_count;
4358
4359         return 0;
4360 }
4361
4362 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4363 {
4364         size_t image_id_size;
4365         char *image_id;
4366         void *p;
4367         void *end;
4368         size_t size;
4369         void *reply_buf = NULL;
4370         size_t len = 0;
4371         char *image_name = NULL;
4372         int ret;
4373
4374         rbd_assert(!rbd_dev->spec->image_name);
4375
4376         len = strlen(rbd_dev->spec->image_id);
4377         image_id_size = sizeof (__le32) + len;
4378         image_id = kmalloc(image_id_size, GFP_KERNEL);
4379         if (!image_id)
4380                 return NULL;
4381
4382         p = image_id;
4383         end = image_id + image_id_size;
4384         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4385
4386         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4387         reply_buf = kmalloc(size, GFP_KERNEL);
4388         if (!reply_buf)
4389                 goto out;
4390
4391         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
4392                                 "rbd", "dir_get_name",
4393                                 image_id, image_id_size,
4394                                 reply_buf, size);
4395         if (ret < 0)
4396                 goto out;
4397         p = reply_buf;
4398         end = reply_buf + ret;
4399
4400         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4401         if (IS_ERR(image_name))
4402                 image_name = NULL;
4403         else
4404                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4405 out:
4406         kfree(reply_buf);
4407         kfree(image_id);
4408
4409         return image_name;
4410 }
4411
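/*
 * Look up a snapshot id by name for a format 1 image.  The v1 header
 * stores snapshot names as one packed block of NUL-terminated
 * strings, in the same order as the ids in the snapshot context.
 * Returns CEPH_NOSNAP if no snapshot has the given name.
 */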
4412 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4413 {
4414         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4415         const char *snap_name;
4416         u32 which = 0;
4417
4418         /* Skip over names until we find the one we are looking for */
4419
4420         snap_name = rbd_dev->header.snap_names;
4421         while (which < snapc->num_snaps) {
4422                 if (!strcmp(name, snap_name))
4423                         return snapc->snaps[which];
4424                 snap_name += strlen(snap_name) + 1;
4425                 which++;
4426         }
4427         return CEPH_NOSNAP;
4428 }
4429
4430 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4431 {
4432         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4433         u32 which;
4434         bool found = false;
4435         u64 snap_id;
4436
4437         for (which = 0; !found && which < snapc->num_snaps; which++) {
4438                 const char *snap_name;
4439
4440                 snap_id = snapc->snaps[which];
4441                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4442                 if (IS_ERR(snap_name)) {
4443                         /* ignore no-longer existing snapshots */
4444                         if (PTR_ERR(snap_name) == -ENOENT)
4445                                 continue;
4446                         else
4447                                 break;
4448                 }
4449                 found = !strcmp(name, snap_name);
4450                 kfree(snap_name);
4451         }
4452         return found ? snap_id : CEPH_NOSNAP;
4453 }
4454
4455 /*
4456  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4457  * no snapshot by that name is found, or if an error occurs.
4458  */
4459 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4460 {
4461         if (rbd_dev->image_format == 1)
4462                 return rbd_v1_snap_id_by_name(rbd_dev, name);
4463
4464         return rbd_v2_snap_id_by_name(rbd_dev, name);
4465 }
4466
4467 /*
4468  * An image being mapped will have everything but the snap id.
4469  */
4470 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
4471 {
4472         struct rbd_spec *spec = rbd_dev->spec;
4473
4474         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
4475         rbd_assert(spec->image_id && spec->image_name);
4476         rbd_assert(spec->snap_name);
4477
4478         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4479                 u64 snap_id;
4480
4481                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4482                 if (snap_id == CEPH_NOSNAP)
4483                         return -ENOENT;
4484
4485                 spec->snap_id = snap_id;
4486         } else {
4487                 spec->snap_id = CEPH_NOSNAP;
4488         }
4489
4490         return 0;
4491 }
4492
4493 /*
4494  * A parent image will have all ids but none of the names.
4495  *
4496  * All names in an rbd spec are dynamically allocated.  It's OK if we
4497  * can't figure out the name for an image id.
4498  */
4499 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
4500 {
4501         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4502         struct rbd_spec *spec = rbd_dev->spec;
4503         const char *pool_name;
4504         const char *image_name;
4505         const char *snap_name;
4506         int ret;
4507
4508         rbd_assert(spec->pool_id != CEPH_NOPOOL);
4509         rbd_assert(spec->image_id);
4510         rbd_assert(spec->snap_id != CEPH_NOSNAP);
4511
4512         /* Get the pool name; we have to make our own copy of this */
4513
4514         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4515         if (!pool_name) {
4516                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4517                 return -EIO;
4518         }
4519         pool_name = kstrdup(pool_name, GFP_KERNEL);
4520         if (!pool_name)
4521                 return -ENOMEM;
4522
4523         /* Fetch the image name; tolerate failure here */
4524
4525         image_name = rbd_dev_image_name(rbd_dev);
4526         if (!image_name)
4527                 rbd_warn(rbd_dev, "unable to get image name");
4528
4529         /* Fetch the snapshot name */
4530
4531         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4532         if (IS_ERR(snap_name)) {
4533                 ret = PTR_ERR(snap_name);
4534                 goto out_err;
4535         }
4536
4537         spec->pool_name = pool_name;
4538         spec->image_name = image_name;
4539         spec->snap_name = snap_name;
4540
4541         return 0;
4542
4543 out_err:
4544         kfree(image_name);
4545         kfree(pool_name);
4546         return ret;
4547 }
4548
4549 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4550 {
4551         size_t size;
4552         int ret;
4553         void *reply_buf;
4554         void *p;
4555         void *end;
4556         u64 seq;
4557         u32 snap_count;
4558         struct ceph_snap_context *snapc;
4559         u32 i;
4560
4561         /*
4562          * We'll need room for the seq value (maximum snapshot id),
4563          * snapshot count, and array of that many snapshot ids.
4564          * For now we have a fixed upper limit on the number we're
4565          * prepared to receive.
4566          */
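        /* with RBD_MAX_SNAP_COUNT 510: 8 + 4 + 510 * 8 = 4092 bytes < 4 KiB */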
4567         size = sizeof (__le64) + sizeof (__le32) +
4568                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
4569         reply_buf = kzalloc(size, GFP_KERNEL);
4570         if (!reply_buf)
4571                 return -ENOMEM;
4572
4573         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4574                                 "rbd", "get_snapcontext", NULL, 0,
4575                                 reply_buf, size);
4576         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4577         if (ret < 0)
4578                 goto out;
4579
4580         p = reply_buf;
4581         end = reply_buf + ret;
4582         ret = -ERANGE;
4583         ceph_decode_64_safe(&p, end, seq, out);
4584         ceph_decode_32_safe(&p, end, snap_count, out);
4585
4586         /*
4587          * Make sure the reported number of snapshot ids wouldn't go
4588          * beyond the end of our buffer.  But before checking that,
4589          * make sure the computed size of the snapshot context we
4590          * allocate is representable in a size_t.
4591          */
4592         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4593                                  / sizeof (u64)) {
4594                 ret = -EINVAL;
4595                 goto out;
4596         }
4597         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4598                 goto out;
4599         ret = 0;
4600
4601         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4602         if (!snapc) {
4603                 ret = -ENOMEM;
4604                 goto out;
4605         }
4606         snapc->seq = seq;
4607         for (i = 0; i < snap_count; i++)
4608                 snapc->snaps[i] = ceph_decode_64(&p);
4609
4610         ceph_put_snap_context(rbd_dev->header.snapc);
4611         rbd_dev->header.snapc = snapc;
4612
4613         dout("  snap context seq = %llu, snap_count = %u\n",
4614                 (unsigned long long)seq, (unsigned int)snap_count);
4615 out:
4616         kfree(reply_buf);
4617
4618         return ret;
4619 }
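
/*
 * Illustrative sketch (userspace, not compiled into the driver): the
 * get_snapcontext reply decoded above is a little-endian stream of
 * seq (__le64), snap_count (__le32) and snap_count ids (__le64 each).
 * A hypothetical standalone decode of that layout:
 */
#if 0
#include <stdint.h>
#include <stdio.h>

static uint64_t get_le64(const unsigned char *p)
{
        uint64_t v = 0;
        int i;

        for (i = 7; i >= 0; i--)
                v = (v << 8) | p[i];
        return v;
}

static uint32_t get_le32(const unsigned char *p)
{
        return p[0] | (p[1] << 8) |
               ((uint32_t)p[2] << 16) | ((uint32_t)p[3] << 24);
}

int main(void)
{
        const unsigned char buf[] = {
                5, 0, 0, 0, 0, 0, 0, 0,         /* seq = 5 */
                2, 0, 0, 0,                     /* snap_count = 2 */
                3, 0, 0, 0, 0, 0, 0, 0,         /* snaps[0] = 3 */
                4, 0, 0, 0, 0, 0, 0, 0,         /* snaps[1] = 4 */
        };
        const unsigned char *p = buf;
        uint32_t count, i;

        printf("seq %llu\n", (unsigned long long)get_le64(p));
        p += 8;
        count = get_le32(p);
        p += 4;
        for (i = 0; i < count; i++, p += 8)
                printf("snap id %llu\n", (unsigned long long)get_le64(p));
        return 0;
}
#endif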
4620
4621 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4622                                         u64 snap_id)
4623 {
4624         size_t size;
4625         void *reply_buf;
4626         __le64 snapid;
4627         int ret;
4628         void *p;
4629         void *end;
4630         char *snap_name;
4631
4632         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4633         reply_buf = kmalloc(size, GFP_KERNEL);
4634         if (!reply_buf)
4635                 return ERR_PTR(-ENOMEM);
4636
4637         snapid = cpu_to_le64(snap_id);
4638         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4639                                 "rbd", "get_snapshot_name",
4640                                 &snapid, sizeof (snapid),
4641                                 reply_buf, size);
4642         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4643         if (ret < 0) {
4644                 snap_name = ERR_PTR(ret);
4645                 goto out;
4646         }
4647
4648         p = reply_buf;
4649         end = reply_buf + ret;
4650         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4651         if (IS_ERR(snap_name))
4652                 goto out;
4653
4654         dout("  snap_id 0x%016llx snap_name = %s\n",
4655                 (unsigned long long)snap_id, snap_name);
4656 out:
4657         kfree(reply_buf);
4658
4659         return snap_name;
4660 }
4661
4662 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4663 {
4664         bool first_time = rbd_dev->header.object_prefix == NULL;
4665         int ret;
4666
4667         ret = rbd_dev_v2_image_size(rbd_dev);
4668         if (ret)
4669                 return ret;
4670
4671         if (first_time) {
4672                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4673                 if (ret)
4674                         return ret;
4675         }
4676
4677         ret = rbd_dev_v2_snap_context(rbd_dev);
4678         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4679
4680         return ret;
4681 }
4682
4683 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
4684 {
4685         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4686
4687         if (rbd_dev->image_format == 1)
4688                 return rbd_dev_v1_header_info(rbd_dev);
4689
4690         return rbd_dev_v2_header_info(rbd_dev);
4691 }
4692
4693 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4694 {
4695         struct device *dev;
4696         int ret;
4697
4698         dev = &rbd_dev->dev;
4699         dev->bus = &rbd_bus_type;
4700         dev->type = &rbd_device_type;
4701         dev->parent = &rbd_root_dev;
4702         dev->release = rbd_dev_device_release;
4703         dev_set_name(dev, "%d", rbd_dev->dev_id);
4704         ret = device_register(dev);
4705
4706         return ret;
4707 }
4708
4709 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4710 {
4711         device_unregister(&rbd_dev->dev);
4712 }
4713
4714 /*
4715  * Get a unique rbd identifier for the given new rbd_dev, and add
4716  * the rbd_dev to the global list.
4717  */
4718 static int rbd_dev_id_get(struct rbd_device *rbd_dev)
4719 {
4720         int new_dev_id;
4721
4722         new_dev_id = ida_simple_get(&rbd_dev_id_ida,
4723                                     0, minor_to_rbd_dev_id(1 << MINORBITS),
4724                                     GFP_KERNEL);
4725         if (new_dev_id < 0)
4726                 return new_dev_id;
4727
4728         rbd_dev->dev_id = new_dev_id;
4729
4730         spin_lock(&rbd_dev_list_lock);
4731         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4732         spin_unlock(&rbd_dev_list_lock);
4733
4734         dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
4735
4736         return 0;
4737 }
4738
4739 /*
4740  * Remove an rbd_dev from the global list, and record that its
4741  * identifier is no longer in use.
4742  */
4743 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4744 {
4745         spin_lock(&rbd_dev_list_lock);
4746         list_del_init(&rbd_dev->node);
4747         spin_unlock(&rbd_dev_list_lock);
4748
4749         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4750
4751         dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
4752 }
4753
4754 /*
4755  * Skips over white space at *buf, and updates *buf to point to the
4756  * first found non-space character (if any). Returns the length of
4757  * the token (string of non-white space characters) found.  Note
4758  * that *buf must be terminated with '\0'.
4759  */
4760 static inline size_t next_token(const char **buf)
4761 {
4762         /*
4763          * These are the characters that produce nonzero for
4764          * isspace() in the "C" and "POSIX" locales.
4765         */
4766         const char *spaces = " \f\n\r\t\v";
4767
4768         *buf += strspn(*buf, spaces);   /* Find start of token */
4769
4770         return strcspn(*buf, spaces);   /* Return token length */
4771 }
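
/*
 * Illustrative sketch (userspace, not compiled into the driver): how
 * the strspn()/strcspn() pair above splits an "rbd add" style string
 * into tokens.  The input line is hypothetical.
 */
#if 0
#include <stdio.h>
#include <string.h>

int main(void)
{
        const char spaces[] = " \f\n\r\t\v";
        const char *buf = "  1.2.3.4:6789 name=admin rbd foo";
        size_t len;

        for (;;) {
                buf += strspn(buf, spaces);     /* find start of token */
                len = strcspn(buf, spaces);     /* token length */
                if (!len)
                        break;
                printf("token: %.*s\n", (int)len, buf);
                buf += len;
        }
        return 0;
}
#endif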
4772
4773 /*
4774  * Finds the next token in *buf, and if the provided token buffer is
4775  * big enough, copies the found token into it.  The result, if
4776  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4777  * must be terminated with '\0' on entry.
4778  *
4779  * Returns the length of the token found (not including the '\0').
4780  * Return value will be 0 if no token is found, and it will be >=
4781  * token_size if the token would not fit.
4782  *
4783  * The *buf pointer will be updated to point beyond the end of the
4784  * found token.  Note that this occurs even if the token buffer is
4785  * too small to hold it.
4786  */
4787 static inline size_t copy_token(const char **buf,
4788                                 char *token,
4789                                 size_t token_size)
4790 {
4791         size_t len;
4792
4793         len = next_token(buf);
4794         if (len < token_size) {
4795                 memcpy(token, *buf, len);
4796                 *(token + len) = '\0';
4797         }
4798         *buf += len;
4799
4800         return len;
4801 }
4802
4803 /*
4804  * Finds the next token in *buf, dynamically allocates a buffer big
4805  * enough to hold a copy of it, and copies the token into the new
4806  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4807  * that a duplicate buffer is created even for a zero-length token.
4808  *
4809  * Returns a pointer to the newly-allocated duplicate, or a null
4810  * pointer if memory for the duplicate was not available.  If
4811  * the lenp argument is a non-null pointer, the length of the token
4812  * (not including the '\0') is returned in *lenp.
4813  *
4814  * If successful, the *buf pointer will be updated to point beyond
4815  * the end of the found token.
4816  *
4817  * Note: uses GFP_KERNEL for allocation.
4818  */
4819 static inline char *dup_token(const char **buf, size_t *lenp)
4820 {
4821         char *dup;
4822         size_t len;
4823
4824         len = next_token(buf);
4825         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4826         if (!dup)
4827                 return NULL;
4828         *(dup + len) = '\0';
4829         *buf += len;
4830
4831         if (lenp)
4832                 *lenp = len;
4833
4834         return dup;
4835 }
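
/*
 * Illustrative sketch (userspace, not compiled into the driver): a
 * malloc()-based stand-in for dup_token() above -- kmemdup() becomes
 * malloc() plus memcpy() -- driven over a hypothetical input line.
 */
#if 0
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static char *dup_token_user(const char **buf, size_t *lenp)
{
        const char spaces[] = " \f\n\r\t\v";
        size_t len;
        char *dup;

        *buf += strspn(*buf, spaces);
        len = strcspn(*buf, spaces);
        dup = malloc(len + 1);
        if (!dup)
                return NULL;
        memcpy(dup, *buf, len);
        dup[len] = '\0';
        *buf += len;
        if (lenp)
                *lenp = len;
        return dup;
}

int main(void)
{
        const char *buf = "rbd foo snap1";
        char *tok;

        while ((tok = dup_token_user(&buf, NULL)) != NULL && *tok) {
                printf("token: %s\n", tok);
                free(tok);
        }
        free(tok);      /* the final "" duplicate (or NULL) */
        return 0;
}
#endif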
4836
4837 /*
4838  * Parse the options provided for an "rbd add" (i.e., rbd image
4839  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4840  * and the data written is passed here via a NUL-terminated buffer.
4841  * Returns 0 if successful or an error code otherwise.
4842  *
4843  * The information extracted from these options is recorded in
4844  * the other parameters which return dynamically-allocated
4845  * structures:
4846  *  ceph_opts
4847  *      The address of a pointer that will refer to a ceph options
4848  *      structure.  Caller must release the returned pointer using
4849  *      ceph_destroy_options() when it is no longer needed.
4850  *  rbd_opts
4851  *      Address of an rbd options pointer.  Fully initialized by
4852  *      this function; caller must release with kfree().
4853  *  spec
4854  *      Address of an rbd image specification pointer.  Fully
4855  *      initialized by this function based on parsed options.
4856  *      Caller must release with rbd_spec_put().
4857  *
4858  * The options passed take this form:
4859  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4860  * where:
4861  *  <mon_addrs>
4862  *      A comma-separated list of one or more monitor addresses.
4863  *      A monitor address is an ip address, optionally followed
4864  *      by a port number (separated by a colon).
4865  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4866  *  <options>
4867  *      A comma-separated list of ceph and/or rbd options.
4868  *  <pool_name>
4869  *      The name of the rados pool containing the rbd image.
4870  *  <image_name>
4871  *      The name of the image in that pool to map.
4872  *  <snap_name>
4873  *      An optional snapshot name.  If provided, the mapping will
4874  *      present data from the image at the time that snapshot was
4875  *      created.  The image head is used if no snapshot name is
4876  *      provided.  Snapshot mappings are always read-only.
4877  */
4878 static int rbd_add_parse_args(const char *buf,
4879                                 struct ceph_options **ceph_opts,
4880                                 struct rbd_options **opts,
4881                                 struct rbd_spec **rbd_spec)
4882 {
4883         size_t len;
4884         char *options;
4885         const char *mon_addrs;
4886         char *snap_name;
4887         size_t mon_addrs_size;
4888         struct rbd_spec *spec = NULL;
4889         struct rbd_options *rbd_opts = NULL;
4890         struct ceph_options *copts;
4891         int ret;
4892
4893         /* The first four tokens are required */
4894
4895         len = next_token(&buf);
4896         if (!len) {
4897                 rbd_warn(NULL, "no monitor address(es) provided");
4898                 return -EINVAL;
4899         }
4900         mon_addrs = buf;
4901         mon_addrs_size = len + 1;
4902         buf += len;
4903
4904         ret = -EINVAL;
4905         options = dup_token(&buf, NULL);
4906         if (!options)
4907                 return -ENOMEM;
4908         if (!*options) {
4909                 rbd_warn(NULL, "no options provided");
4910                 goto out_err;
4911         }
4912
4913         spec = rbd_spec_alloc();
4914         if (!spec)
4915                 goto out_mem;
4916
4917         spec->pool_name = dup_token(&buf, NULL);
4918         if (!spec->pool_name)
4919                 goto out_mem;
4920         if (!*spec->pool_name) {
4921                 rbd_warn(NULL, "no pool name provided");
4922                 goto out_err;
4923         }
4924
4925         spec->image_name = dup_token(&buf, NULL);
4926         if (!spec->image_name)
4927                 goto out_mem;
4928         if (!*spec->image_name) {
4929                 rbd_warn(NULL, "no image name provided");
4930                 goto out_err;
4931         }
4932
4933         /*
4934          * Snapshot name is optional; default is to use "-"
4935          * (indicating the head/no snapshot).
4936          */
4937         len = next_token(&buf);
4938         if (!len) {
4939                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4940                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4941         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4942                 ret = -ENAMETOOLONG;
4943                 goto out_err;
4944         }
4945         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4946         if (!snap_name)
4947                 goto out_mem;
4948         *(snap_name + len) = '\0';
4949         spec->snap_name = snap_name;
4950
4951         /* Initialize all rbd options to the defaults */
4952
4953         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4954         if (!rbd_opts)
4955                 goto out_mem;
4956
4957         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4958
4959         copts = ceph_parse_options(options, mon_addrs,
4960                                         mon_addrs + mon_addrs_size - 1,
4961                                         parse_rbd_opts_token, rbd_opts);
4962         if (IS_ERR(copts)) {
4963                 ret = PTR_ERR(copts);
4964                 goto out_err;
4965         }
4966         kfree(options);
4967
4968         *ceph_opts = copts;
4969         *opts = rbd_opts;
4970         *rbd_spec = spec;
4971
4972         return 0;
4973 out_mem:
4974         ret = -ENOMEM;
4975 out_err:
4976         kfree(rbd_opts);
4977         rbd_spec_put(spec);
4978         kfree(options);
4979
4980         return ret;
4981 }
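
/*
 * Illustrative sketch (userspace, not compiled into the driver):
 * mapping an image means writing the string described above to
 * /sys/bus/rbd/add (see Documentation/ABI/testing/sysfs-bus-rbd).
 * The monitor address, key, pool and image below are hypothetical.
 */
#if 0
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(void)
{
        /* <mon_addrs> <options> <pool_name> <image_name>, head mapped */
        const char add[] = "1.2.3.4:6789 name=admin,secret=AQD9... rbd foo";
        int fd = open("/sys/bus/rbd/add", O_WRONLY);

        if (fd < 0 || write(fd, add, strlen(add)) < 0)
                perror("rbd add");
        if (fd >= 0)
                close(fd);
        return 0;
}
#endif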
4982
4983 /*
4984  * Return pool id (>= 0) or a negative error code.
4985  */
4986 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
4987 {
4988         u64 newest_epoch;
4989         unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
4990         int tries = 0;
4991         int ret;
4992
4993 again:
4994         ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
4995         if (ret == -ENOENT && tries++ < 1) {
4996                 ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap",
4997                                                &newest_epoch);
4998                 if (ret < 0)
4999                         return ret;
5000
5001                 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5002                         ceph_monc_request_next_osdmap(&rbdc->client->monc);
5003                         (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5004                                                      newest_epoch, timeout);
5005                         goto again;
5006                 } else {
5007                         /* the osdmap we have is new enough */
5008                         return -ENOENT;
5009                 }
5010         }
5011
5012         return ret;
5013 }
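
/*
 * Illustrative sketch (userspace, not compiled into the driver): the
 * retry-once shape above in miniature.  lookup() and the epoch
 * variables are hypothetical stand-ins for ceph_pg_poolid_by_name()
 * and the monitor/osdmap round trip.
 */
#if 0
#include <stdio.h>

static int map_epoch = 10;              /* the osdmap we have cached */
static const int newest_epoch = 11;     /* what the monitor reports */

static int lookup(void)
{
        /* pretend the pool only appears in epoch 11 of the map */
        return map_epoch >= 11 ? 7 : -1;
}

int main(void)
{
        int tries = 0;
        int ret;

again:
        ret = lookup();
        if (ret < 0 && tries++ < 1 && map_epoch < newest_epoch) {
                map_epoch = newest_epoch;       /* request + wait for map */
                goto again;
        }
        printf("pool id: %d\n", ret);
        return 0;
}
#endif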
5014
5015 /*
5016  * An rbd format 2 image has a unique identifier, distinct from the
5017  * name given to it by the user.  Internally, that identifier is
5018  * what's used to specify the names of objects related to the image.
5019  *
5020  * A special "rbd id" object is used to map an rbd image name to its
5021  * id.  If that object doesn't exist, then there is no v2 rbd image
5022  * with the supplied name.
5023  *
5024  * This function will record the given rbd_dev's image_id field if
5025  * it can be determined, and in that case will return 0.  If any
5026  * errors occur a negative errno will be returned and the rbd_dev's
5027  * image_id field will be unchanged (and should be NULL).
5028  */
5029 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5030 {
5031         int ret;
5032         size_t size;
5033         char *object_name;
5034         void *response;
5035         char *image_id;
5036
5037         /*
5038          * When probing a parent image, the image id is already
5039          * known (and the image name likely is not).  There's no
5040          * need to fetch the image id again in this case.  We
5041          * do still need to set the image format though.
5042          */
5043         if (rbd_dev->spec->image_id) {
5044                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5045
5046                 return 0;
5047         }
5048
5049         /*
5050          * First, see if the format 2 image id file exists, and if
5051          * so, get the image's persistent id from it.
5052          */
5053         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
5054         object_name = kmalloc(size, GFP_NOIO);
5055         if (!object_name)
5056                 return -ENOMEM;
5057         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
5058         dout("rbd id object name is %s\n", object_name);
5059
5060         /* Response will be an encoded string, which includes a length */
5061
5062         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5063         response = kzalloc(size, GFP_NOIO);
5064         if (!response) {
5065                 ret = -ENOMEM;
5066                 goto out;
5067         }
5068
5069         /* If it doesn't exist we'll assume it's a format 1 image */
5070
5071         ret = rbd_obj_method_sync(rbd_dev, object_name,
5072                                 "rbd", "get_id", NULL, 0,
5073                                 response, size);
5074         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5075         if (ret == -ENOENT) {
5076                 image_id = kstrdup("", GFP_KERNEL);
5077                 ret = image_id ? 0 : -ENOMEM;
5078                 if (!ret)
5079                         rbd_dev->image_format = 1;
5080         } else if (ret >= 0) {
5081                 void *p = response;
5082
5083                 image_id = ceph_extract_encoded_string(&p, p + ret,
5084                                                 NULL, GFP_NOIO);
5085                 ret = PTR_ERR_OR_ZERO(image_id);
5086                 if (!ret)
5087                         rbd_dev->image_format = 2;
5088         }
5089
5090         if (!ret) {
5091                 rbd_dev->spec->image_id = image_id;
5092                 dout("image_id is %s\n", image_id);
5093         }
5094 out:
5095         kfree(response);
5096         kfree(object_name);
5097
5098         return ret;
5099 }
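
/*
 * Illustrative sketch (userspace, not compiled into the driver): the
 * get_id reply parsed above is a length-prefixed string -- a __le32
 * byte count followed by that many bytes, no '\0'.  A hypothetical
 * standalone version of what ceph_extract_encoded_string() does:
 */
#if 0
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static char *extract_encoded_string(const unsigned char **p,
                                    const unsigned char *end)
{
        uint32_t len;
        char *s;

        if (end - *p < 4)
                return NULL;
        len = (*p)[0] | ((*p)[1] << 8) |
              ((uint32_t)(*p)[2] << 16) | ((uint32_t)(*p)[3] << 24);
        *p += 4;
        if ((size_t)(end - *p) < len)
                return NULL;
        s = malloc(len + 1);
        if (!s)
                return NULL;
        memcpy(s, *p, len);
        s[len] = '\0';
        *p += len;
        return s;
}

int main(void)
{
        const unsigned char reply[] = { 4, 0, 0, 0, 'a', 'b', 'c', 'd' };
        const unsigned char *p = reply;
        char *id = extract_encoded_string(&p, reply + sizeof(reply));

        printf("image id: %s\n", id ? id : "(decode error)");
        free(id);
        return 0;
}
#endif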
5100
5101 /*
5102  * Undo whatever state changes are made by v1 or v2 header info
5103  * call.
5104  */
5105 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5106 {
5107         struct rbd_image_header *header;
5108
5109         rbd_dev_parent_put(rbd_dev);
5110
5111         /* Free dynamic fields from the header, then zero it out */
5112
5113         header = &rbd_dev->header;
5114         ceph_put_snap_context(header->snapc);
5115         kfree(header->snap_sizes);
5116         kfree(header->snap_names);
5117         kfree(header->object_prefix);
5118         memset(header, 0, sizeof (*header));
5119 }
5120
5121 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5122 {
5123         int ret;
5124
5125         ret = rbd_dev_v2_object_prefix(rbd_dev);
5126         if (ret)
5127                 goto out_err;
5128
5129         /*
5130          * Get and check the features for the image.  Currently the
5131          * features are assumed to never change.
5132          */
5133         ret = rbd_dev_v2_features(rbd_dev);
5134         if (ret)
5135                 goto out_err;
5136
5137         /* If the image supports fancy striping, get its parameters */
5138
5139         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5140                 ret = rbd_dev_v2_striping_info(rbd_dev);
5141                 if (ret < 0)
5142                         goto out_err;
5143         }
5144         /* No support for crypto or compression types in format 2 images */
5145
5146         return 0;
5147 out_err:
5148         rbd_dev->header.features = 0;
5149         kfree(rbd_dev->header.object_prefix);
5150         rbd_dev->header.object_prefix = NULL;
5151
5152         return ret;
5153 }
5154
5155 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
5156 {
5157         struct rbd_device *parent = NULL;
5158         struct rbd_spec *parent_spec;
5159         struct rbd_client *rbdc;
5160         int ret;
5161
5162         if (!rbd_dev->parent_spec)
5163                 return 0;
5164         /*
5165          * We need to pass a reference to the client and the parent
5166          * spec when creating the parent rbd_dev.  Images related by
5167          * parent/child relationships always share both.
5168          */
5169         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
5170         rbdc = __rbd_get_client(rbd_dev->rbd_client);
5171
5172         ret = -ENOMEM;
5173         parent = rbd_dev_create(rbdc, parent_spec);
5174         if (!parent)
5175                 goto out_err;
5176
5177         ret = rbd_dev_image_probe(parent, false);
5178         if (ret < 0)
5179                 goto out_err;
5180         rbd_dev->parent = parent;
5181         atomic_set(&rbd_dev->parent_ref, 1);
5182
5183         return 0;
5184 out_err:
5185         if (parent) {
5186                 rbd_dev_unparent(rbd_dev);
5188                 rbd_dev_destroy(parent);
5189         } else {
5190                 rbd_put_client(rbdc);
5191                 rbd_spec_put(parent_spec);
5192         }
5193
5194         return ret;
5195 }
5196
5197 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5198 {
5199         int ret;
5200
5201         /* Get an id and fill in device name. */
5202
5203         ret = rbd_dev_id_get(rbd_dev);
5204         if (ret)
5205                 return ret;
5206
5207         BUILD_BUG_ON(DEV_NAME_LEN
5208                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
5209         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
5210
5211         /* Record our major and minor device numbers. */
5212
5213         if (!single_major) {
5214                 ret = register_blkdev(0, rbd_dev->name);
5215                 if (ret < 0)
5216                         goto err_out_id;
5217
5218                 rbd_dev->major = ret;
5219                 rbd_dev->minor = 0;
5220         } else {
5221                 rbd_dev->major = rbd_major;
5222                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5223         }
5224
5225         /* Set up the blkdev mapping. */
5226
5227         ret = rbd_init_disk(rbd_dev);
5228         if (ret)
5229                 goto err_out_blkdev;
5230
5231         ret = rbd_dev_mapping_set(rbd_dev);
5232         if (ret)
5233                 goto err_out_disk;
5234
5235         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5236         set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
5237
5238         ret = rbd_bus_add_dev(rbd_dev);
5239         if (ret)
5240                 goto err_out_mapping;
5241
5242         /* Everything's ready.  Announce the disk to the world. */
5243
5244         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5245         add_disk(rbd_dev->disk);
5246
5247         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
5248                 (unsigned long long) rbd_dev->mapping.size);
5249
5250         return ret;
5251
5252 err_out_mapping:
5253         rbd_dev_mapping_clear(rbd_dev);
5254 err_out_disk:
5255         rbd_free_disk(rbd_dev);
5256 err_out_blkdev:
5257         if (!single_major)
5258                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5259 err_out_id:
5260         rbd_dev_id_put(rbd_dev);
5262
5263         return ret;
5264 }
5265
5266 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5267 {
5268         struct rbd_spec *spec = rbd_dev->spec;
5269         size_t size;
5270
5271         /* Record the header object name for this rbd image. */
5272
5273         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5274
5275         if (rbd_dev->image_format == 1)
5276                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
5277         else
5278                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
5279
5280         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
5281         if (!rbd_dev->header_name)
5282                 return -ENOMEM;
5283
5284         if (rbd_dev->image_format == 1)
5285                 sprintf(rbd_dev->header_name, "%s%s",
5286                         spec->image_name, RBD_SUFFIX);
5287         else
5288                 sprintf(rbd_dev->header_name, "%s%s",
5289                         RBD_HEADER_PREFIX, spec->image_id);
5290         return 0;
5291 }
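
/*
 * Illustrative sketch (userspace, not compiled into the driver): the
 * two header object name forms built above, using the RBD_SUFFIX
 * (".rbd") and RBD_HEADER_PREFIX ("rbd_header.") values from
 * rbd_types.h.  The image name and id are hypothetical.
 */
#if 0
#include <stdio.h>

int main(void)
{
        char name[64];

        snprintf(name, sizeof(name), "%s%s", "foo", ".rbd");
        printf("format 1 header object: %s\n", name);   /* foo.rbd */

        snprintf(name, sizeof(name), "%s%s", "rbd_header.", "10052ae8944a");
        printf("format 2 header object: %s\n", name);
        return 0;
}
#endif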
5292
5293 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
5294 {
5295         rbd_dev_unprobe(rbd_dev);
5296         kfree(rbd_dev->header_name);
5297         rbd_dev->header_name = NULL;
5298         rbd_dev->image_format = 0;
5299         kfree(rbd_dev->spec->image_id);
5300         rbd_dev->spec->image_id = NULL;
5301
5302         rbd_dev_destroy(rbd_dev);
5303 }
5304
5305 /*
5306  * Probe for the existence of the header object for the given rbd
5307  * device.  If this image is the one being mapped (i.e., not a
5308  * parent), initiate a watch on its header object before using that
5309  * object to get detailed information about the rbd image.
5310  */
5311 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
5312 {
5313         int ret;
5314
5315         /*
5316          * Get the id from the image id object.  Unless there's an
5317          * error, rbd_dev->spec->image_id will be filled in with
5318          * a dynamically-allocated string, and rbd_dev->image_format
5319          * will be set to either 1 or 2.
5320          */
5321         ret = rbd_dev_image_id(rbd_dev);
5322         if (ret)
5323                 return ret;
5324
5325         ret = rbd_dev_header_name(rbd_dev);
5326         if (ret)
5327                 goto err_out_format;
5328
5329         if (mapping) {
5330                 ret = rbd_dev_header_watch_sync(rbd_dev);
5331                 if (ret)
5332                         goto out_header_name;
5333         }
5334
5335         ret = rbd_dev_header_info(rbd_dev);
5336         if (ret)
5337                 goto err_out_watch;
5338
5339         /*
5340          * If this image is the one being mapped, we have pool name and
5341          * id, image name and id, and snap name - need to fill snap id.
5342          * Otherwise this is a parent image, identified by pool, image
5343          * and snap ids - need to fill in names for those ids.
5344          */
5345         if (mapping)
5346                 ret = rbd_spec_fill_snap_id(rbd_dev);
5347         else
5348                 ret = rbd_spec_fill_names(rbd_dev);
5349         if (ret)
5350                 goto err_out_probe;
5351
5352         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
5353                 ret = rbd_dev_v2_parent_info(rbd_dev);
5354                 if (ret)
5355                         goto err_out_probe;
5356
5357                 /*
5358                  * Need to warn users if this image is the one being
5359                  * mapped and has a parent.
5360                  */
5361                 if (mapping && rbd_dev->parent_spec)
5362                         rbd_warn(rbd_dev,
5363                                  "WARNING: kernel layering is EXPERIMENTAL!");
5364         }
5365
5366         ret = rbd_dev_probe_parent(rbd_dev);
5367         if (ret)
5368                 goto err_out_probe;
5369
5370         dout("discovered format %u image, header name is %s\n",
5371                 rbd_dev->image_format, rbd_dev->header_name);
5372         return 0;
5373
5374 err_out_probe:
5375         rbd_dev_unprobe(rbd_dev);
5376 err_out_watch:
5377         if (mapping)
5378                 rbd_dev_header_unwatch_sync(rbd_dev);
5379 out_header_name:
5380         kfree(rbd_dev->header_name);
5381         rbd_dev->header_name = NULL;
5382 err_out_format:
5383         rbd_dev->image_format = 0;
5384         kfree(rbd_dev->spec->image_id);
5385         rbd_dev->spec->image_id = NULL;
5386         return ret;
5387 }
5388
5389 static ssize_t do_rbd_add(struct bus_type *bus,
5390                           const char *buf,
5391                           size_t count)
5392 {
5393         struct rbd_device *rbd_dev = NULL;
5394         struct ceph_options *ceph_opts = NULL;
5395         struct rbd_options *rbd_opts = NULL;
5396         struct rbd_spec *spec = NULL;
5397         struct rbd_client *rbdc;
5398         bool read_only;
5399         int rc = -ENOMEM;
5400
5401         if (!try_module_get(THIS_MODULE))
5402                 return -ENODEV;
5403
5404         /* parse add command */
5405         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5406         if (rc < 0)
5407                 goto err_out_module;
5408         read_only = rbd_opts->read_only;
5409         kfree(rbd_opts);
5410         rbd_opts = NULL;        /* done with this */
5411
5412         rbdc = rbd_get_client(ceph_opts);
5413         if (IS_ERR(rbdc)) {
5414                 rc = PTR_ERR(rbdc);
5415                 goto err_out_args;
5416         }
5417
5418         /* pick the pool */
5419         rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
5420         if (rc < 0)
5421                 goto err_out_client;
5422         spec->pool_id = (u64)rc;
5423
5424         /* The ceph file layout needs to fit pool id in 32 bits */
5425
5426         if (spec->pool_id > (u64)U32_MAX) {
5427                 rbd_warn(NULL, "pool id too large (%llu > %u)",
5428                                 (unsigned long long)spec->pool_id, U32_MAX);
5429                 rc = -EIO;
5430                 goto err_out_client;
5431         }
5432
5433         rbd_dev = rbd_dev_create(rbdc, spec);
5434         if (!rbd_dev) {
5435                 rc = -ENOMEM;
                goto err_out_client;
        }
5436         rbdc = NULL;            /* rbd_dev now owns this */
5437         spec = NULL;            /* rbd_dev now owns this */
5438
5439         rc = rbd_dev_image_probe(rbd_dev, true);
5440         if (rc < 0)
5441                 goto err_out_rbd_dev;
5442
5443         /* If we are mapping a snapshot it must be marked read-only */
5444
5445         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5446                 read_only = true;
5447         rbd_dev->mapping.read_only = read_only;
5448
5449         rc = rbd_dev_device_setup(rbd_dev);
5450         if (rc) {
5451                 /*
5452                  * rbd_dev_header_unwatch_sync() can't be moved into
5453                  * rbd_dev_image_release() without refactoring, see
5454                  * commit 1f3ef78861ac.
5455                  */
5456                 rbd_dev_header_unwatch_sync(rbd_dev);
5457                 rbd_dev_image_release(rbd_dev);
5458                 goto err_out_module;
5459         }
5460
5461         return count;
5462
5463 err_out_rbd_dev:
5464         rbd_dev_destroy(rbd_dev);
5465 err_out_client:
5466         rbd_put_client(rbdc);
5467 err_out_args:
5468         rbd_spec_put(spec);
5469 err_out_module:
5470         module_put(THIS_MODULE);
5471
5472         dout("Error adding device %s\n", buf);
5473
5474         return (ssize_t)rc;
5475 }
5476
5477 static ssize_t rbd_add(struct bus_type *bus,
5478                        const char *buf,
5479                        size_t count)
5480 {
5481         if (single_major)
5482                 return -EINVAL;
5483
5484         return do_rbd_add(bus, buf, count);
5485 }
5486
5487 static ssize_t rbd_add_single_major(struct bus_type *bus,
5488                                     const char *buf,
5489                                     size_t count)
5490 {
5491         return do_rbd_add(bus, buf, count);
5492 }
5493
5494 static void rbd_dev_device_release(struct device *dev)
5495 {
5496         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5497
5498         rbd_free_disk(rbd_dev);
5499         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5500         rbd_dev_mapping_clear(rbd_dev);
5501         if (!single_major)
5502                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5503         rbd_dev_id_put(rbd_dev);
5505 }
5506
5507 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5508 {
5509         while (rbd_dev->parent) {
5510                 struct rbd_device *first = rbd_dev;
5511                 struct rbd_device *second = first->parent;
5512                 struct rbd_device *third;
5513
5514                 /*
5515                  * Follow to the parent with no grandparent and
5516                  * remove it.
5517                  */
5518                 while (second && (third = second->parent)) {
5519                         first = second;
5520                         second = third;
5521                 }
5522                 rbd_assert(second);
5523                 rbd_dev_image_release(second);
5524                 first->parent = NULL;
5525                 first->parent_overlap = 0;
5526
5527                 rbd_assert(first->parent_spec);
5528                 rbd_spec_put(first->parent_spec);
5529                 first->parent_spec = NULL;
5530         }
5531 }
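
/*
 * Illustrative sketch (userspace, not compiled into the driver): the
 * inner walk above always finds the far end of the parent chain --
 * the ancestor with no parent of its own -- so images are torn down
 * deepest-first.  A minimal version on a hypothetical struct:
 */
#if 0
#include <stdio.h>

struct node {
        const char *name;
        struct node *parent;
};

static struct node *deepest(struct node *n)
{
        struct node *first = n;
        struct node *second = first->parent;
        struct node *third;

        while (second && (third = second->parent)) {
                first = second;
                second = third;
        }
        return second ? second : first;
}

int main(void)
{
        struct node c = { "grandparent", NULL };
        struct node b = { "parent", &c };
        struct node a = { "child", &b };

        printf("remove first: %s\n", deepest(&a)->name);
        return 0;
}
#endif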
5532
5533 static ssize_t do_rbd_remove(struct bus_type *bus,
5534                              const char *buf,
5535                              size_t count)
5536 {
5537         struct rbd_device *rbd_dev = NULL;
5538         struct list_head *tmp;
5539         int dev_id;
5540         unsigned long ul;
5541         bool already = false;
5542         int ret;
5543
5544         ret = kstrtoul(buf, 10, &ul);
5545         if (ret)
5546                 return ret;
5547
5548         /* convert to int; abort if we lost anything in the conversion */
5549         dev_id = (int)ul;
5550         if (dev_id != ul)
5551                 return -EINVAL;
5552
5553         ret = -ENOENT;
5554         spin_lock(&rbd_dev_list_lock);
5555         list_for_each(tmp, &rbd_dev_list) {
5556                 rbd_dev = list_entry(tmp, struct rbd_device, node);
5557                 if (rbd_dev->dev_id == dev_id) {
5558                         ret = 0;
5559                         break;
5560                 }
5561         }
5562         if (!ret) {
5563                 spin_lock_irq(&rbd_dev->lock);
5564                 if (rbd_dev->open_count)
5565                         ret = -EBUSY;
5566                 else
5567                         already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5568                                                         &rbd_dev->flags);
5569                 spin_unlock_irq(&rbd_dev->lock);
5570         }
5571         spin_unlock(&rbd_dev_list_lock);
5572         if (ret < 0 || already)
5573                 return ret;
5574
5575         rbd_dev_header_unwatch_sync(rbd_dev);
5576         /*
5577          * flush remaining watch callbacks - these must be complete
5578          * before the osd_client is shutdown
5579          */
5580         dout("%s: flushing notifies\n", __func__);
5581         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
5582
5583         /*
5584          * Don't free anything from rbd_dev->disk until after all
5585          * notifies are completely processed. Otherwise
5586          * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
5587          * in a potential use after free of rbd_dev->disk or rbd_dev.
5588          */
5589         rbd_bus_del_dev(rbd_dev);
5590         rbd_dev_image_release(rbd_dev);
5591         module_put(THIS_MODULE);
5592
5593         return count;
5594 }
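
/*
 * Illustrative sketch (userspace, not compiled into the driver):
 * unmapping writes the device id -- the N in /dev/rbdN and
 * /sys/bus/rbd/devices/N -- to /sys/bus/rbd/remove.  The id "0"
 * below is hypothetical.
 */
#if 0
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
        int fd = open("/sys/bus/rbd/remove", O_WRONLY);

        if (fd < 0 || write(fd, "0", 1) < 0)
                perror("rbd remove");
        if (fd >= 0)
                close(fd);
        return 0;
}
#endif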
5595
5596 static ssize_t rbd_remove(struct bus_type *bus,
5597                           const char *buf,
5598                           size_t count)
5599 {
5600         if (single_major)
5601                 return -EINVAL;
5602
5603         return do_rbd_remove(bus, buf, count);
5604 }
5605
5606 static ssize_t rbd_remove_single_major(struct bus_type *bus,
5607                                        const char *buf,
5608                                        size_t count)
5609 {
5610         return do_rbd_remove(bus, buf, count);
5611 }
5612
5613 /*
5614  * create control files in sysfs
5615  * /sys/bus/rbd/...
5616  */
5617 static int rbd_sysfs_init(void)
5618 {
5619         int ret;
5620
5621         ret = device_register(&rbd_root_dev);
5622         if (ret < 0)
5623                 return ret;
5624
5625         ret = bus_register(&rbd_bus_type);
5626         if (ret < 0)
5627                 device_unregister(&rbd_root_dev);
5628
5629         return ret;
5630 }
5631
5632 static void rbd_sysfs_cleanup(void)
5633 {
5634         bus_unregister(&rbd_bus_type);
5635         device_unregister(&rbd_root_dev);
5636 }
5637
5638 static int rbd_slab_init(void)
5639 {
5640         rbd_assert(!rbd_img_request_cache);
5641         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5642                                         sizeof (struct rbd_img_request),
5643                                         __alignof__(struct rbd_img_request),
5644                                         0, NULL);
5645         if (!rbd_img_request_cache)
5646                 return -ENOMEM;
5647
5648         rbd_assert(!rbd_obj_request_cache);
5649         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5650                                         sizeof (struct rbd_obj_request),
5651                                         __alignof__(struct rbd_obj_request),
5652                                         0, NULL);
5653         if (!rbd_obj_request_cache)
5654                 goto out_err;
5655
5656         rbd_assert(!rbd_segment_name_cache);
5657         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5658                                         CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
5659         if (rbd_segment_name_cache)
5660                 return 0;
5661 out_err:
5662         if (rbd_obj_request_cache) {
5663                 kmem_cache_destroy(rbd_obj_request_cache);
5664                 rbd_obj_request_cache = NULL;
5665         }
5666
5667         kmem_cache_destroy(rbd_img_request_cache);
5668         rbd_img_request_cache = NULL;
5669
5670         return -ENOMEM;
5671 }
5672
5673 static void rbd_slab_exit(void)
5674 {
5675         rbd_assert(rbd_segment_name_cache);
5676         kmem_cache_destroy(rbd_segment_name_cache);
5677         rbd_segment_name_cache = NULL;
5678
5679         rbd_assert(rbd_obj_request_cache);
5680         kmem_cache_destroy(rbd_obj_request_cache);
5681         rbd_obj_request_cache = NULL;
5682
5683         rbd_assert(rbd_img_request_cache);
5684         kmem_cache_destroy(rbd_img_request_cache);
5685         rbd_img_request_cache = NULL;
5686 }
5687
5688 static int __init rbd_init(void)
5689 {
5690         int rc;
5691
5692         if (!libceph_compatible(NULL)) {
5693                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5694                 return -EINVAL;
5695         }
5696
5697         rc = rbd_slab_init();
5698         if (rc)
5699                 return rc;
5700
5701         /*
5702          * The number of active work items is limited by the number of
5703          * rbd devices, so leave @max_active at default.
5704          */
5705         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
5706         if (!rbd_wq) {
5707                 rc = -ENOMEM;
5708                 goto err_out_slab;
5709         }
5710
5711         if (single_major) {
5712                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
5713                 if (rbd_major < 0) {
5714                         rc = rbd_major;
5715                         goto err_out_wq;
5716                 }
5717         }
5718
5719         rc = rbd_sysfs_init();
5720         if (rc)
5721                 goto err_out_blkdev;
5722
5723         if (single_major)
5724                 pr_info("loaded (major %d)\n", rbd_major);
5725         else
5726                 pr_info("loaded\n");
5727
5728         return 0;
5729
5730 err_out_blkdev:
5731         if (single_major)
5732                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
5733 err_out_wq:
5734         destroy_workqueue(rbd_wq);
5735 err_out_slab:
5736         rbd_slab_exit();
5737         return rc;
5738 }
5739
5740 static void __exit rbd_exit(void)
5741 {
5742         ida_destroy(&rbd_dev_id_ida);
5743         rbd_sysfs_cleanup();
5744         if (single_major)
5745                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
5746         destroy_workqueue(rbd_wq);
5747         rbd_slab_exit();
5748 }
5749
5750 module_init(rbd_init);
5751 module_exit(rbd_exit);
5752
5753 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
5754 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5755 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5756 /* following authorship retained from original osdblk.c */
5757 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5758
5759 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
5760 MODULE_LICENSE("GPL");