rbd: get rid of some version parameters
drivers/block/rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
57
58 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
59
60 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN   \
62                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
63
64 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
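
/*
 * Rough arithmetic behind the 510 limit (a sketch; the exact bound
 * depends on struct ceph_snap_context's layout): 510 snapshot ids at
 * 8 bytes each is 4080 bytes, leaving 16 bytes of a 4 KB page for the
 * snapshot context header itself.
 */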
65
66 #define RBD_SNAP_HEAD_NAME      "-"
67
68 /* This allows a single page to hold an image name sent by the OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX    64
71
72 #define RBD_OBJ_PREFIX_LEN_MAX  64
73
74 /* Feature bits */
75
76 #define RBD_FEATURE_LAYERING    (1<<0)
77 #define RBD_FEATURE_STRIPINGV2  (1<<1)
78 #define RBD_FEATURES_ALL \
79             (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
80
81 /* Features supported by this (client software) implementation. */
82
83 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
84
85 /*
86  * An RBD device name will be "rbd#", where the "rbd" comes from
87  * RBD_DRV_NAME above, and # is a unique integer identifier.
88  * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
89  * enough to hold all possible device names.
90  */
91 #define DEV_NAME_LEN            32
92 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
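
/*
 * Why (5 * sizeof (int)) / 2 + 1 is enough (informal): each byte of
 * an int contributes at most log10(256), just over 2.4 decimal
 * digits, and 5/2 = 2.5 over-approximates that; the +1 leaves room
 * for a minus sign.
 */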
93
94 /*
95  * block device image metadata (in-memory version)
96  */
97 struct rbd_image_header {
98         /* These five fields never change for a given rbd image */
99         char *object_prefix;
100         u64 features;
101         __u8 obj_order;
102         __u8 crypt_type;
103         __u8 comp_type;
104
105         /* The remaining fields need to be updated occasionally */
106         u64 image_size;
107         struct ceph_snap_context *snapc;
108         char *snap_names;
109         u64 *snap_sizes;
110
111         u64 stripe_unit;
112         u64 stripe_count;
113 };
114
115 /*
116  * An rbd image specification.
117  *
118  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
119  * identify an image.  Each rbd_dev structure includes a pointer to
120  * an rbd_spec structure that encapsulates this identity.
121  *
122  * Each of the ids in an rbd_spec has an associated name.  For a
123  * user-mapped image, the names are supplied and the ids associated
124  * with them are looked up.  For a layered image, a parent image is
125  * defined by the tuple, and the names are looked up.
126  *
127  * An rbd_dev structure contains a parent_spec pointer which is
128  * non-null if the image it represents is a child in a layered
129  * image.  This pointer will refer to the rbd_spec structure used
130  * by the parent rbd_dev for its own identity (i.e., the structure
131  * is shared between the parent and child).
132  *
133  * Since these structures are populated once, during the discovery
134  * phase of image construction, they are effectively immutable so
135  * we make no effort to synchronize access to them.
136  *
137  * Note that code herein does not assume the image name is known (it
138  * could be a null pointer).
139  */
140 struct rbd_spec {
141         u64             pool_id;
142         const char      *pool_name;
143
144         const char      *image_id;
145         const char      *image_name;
146
147         u64             snap_id;
148         const char      *snap_name;
149
150         struct kref     kref;
151 };
152
153 /*
154  * An instance of the client.  Multiple devices may share an rbd client.
155  */
156 struct rbd_client {
157         struct ceph_client      *client;
158         struct kref             kref;
159         struct list_head        node;
160 };
161
162 struct rbd_img_request;
163 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
164
165 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
166
167 struct rbd_obj_request;
168 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
169
170 enum obj_request_type {
171         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
172 };
173
174 enum obj_req_flags {
175         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
176         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
177         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
178         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
179 };
180
181 struct rbd_obj_request {
182         const char              *object_name;
183         u64                     offset;         /* object start byte */
184         u64                     length;         /* bytes from offset */
185         unsigned long           flags;
186
187         /*
188          * An object request associated with an image will have its
189          * img_data flag set; a standalone object request will not.
190          *
191          * A standalone object request will have which == BAD_WHICH
192          * and a null obj_request pointer.
193          *
194          * An object request initiated in support of a layered image
195          * object (to check for its existence before a write) will
196          * have which == BAD_WHICH and a non-null obj_request pointer.
197          *
198          * Finally, an object request for rbd image data will have
199          * which != BAD_WHICH, and will have a non-null img_request
200          * pointer.  The value of which will be in the range
201          * 0..(img_request->obj_request_count-1).
202          */
203         union {
204                 struct rbd_obj_request  *obj_request;   /* STAT op */
205                 struct {
206                         struct rbd_img_request  *img_request;
207                         u64                     img_offset;
208                         /* links for img_request->obj_requests list */
209                         struct list_head        links;
210                 };
211         };
212         u32                     which;          /* position in image request list */
213
214         enum obj_request_type   type;
215         union {
216                 struct bio      *bio_list;
217                 struct {
218                         struct page     **pages;
219                         u32             page_count;
220                 };
221         };
222         struct page             **copyup_pages;
223
224         struct ceph_osd_request *osd_req;
225
226         u64                     xferred;        /* bytes transferred */
227         u64                     version;
228         int                     result;
229
230         rbd_obj_callback_t      callback;
231         struct completion       completion;
232
233         struct kref             kref;
234 };
235
236 enum img_req_flags {
237         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
238         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
239         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
240 };
241
242 struct rbd_img_request {
243         struct rbd_device       *rbd_dev;
244         u64                     offset; /* starting image byte offset */
245         u64                     length; /* byte count from offset */
246         unsigned long           flags;
247         union {
248                 u64                     snap_id;        /* for reads */
249                 struct ceph_snap_context *snapc;        /* for writes */
250         };
251         union {
252                 struct request          *rq;            /* block request */
253                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
254         };
255         struct page             **copyup_pages;
256         spinlock_t              completion_lock;/* protects next_completion */
257         u32                     next_completion;
258         rbd_img_callback_t      callback;
259         u64                     xferred;/* aggregate bytes transferred */
260         int                     result; /* first nonzero obj_request result */
261
262         u32                     obj_request_count;
263         struct list_head        obj_requests;   /* rbd_obj_request structs */
264
265         struct kref             kref;
266 };
267
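/*
 * A minimal sketch (not part of the original file; the helper name is
 * hypothetical) restating the which/BAD_WHICH invariants documented
 * in struct rbd_obj_request above.
 */
static bool rbd_obj_request_sane(struct rbd_obj_request *obj_request)
{
        /* Standalone and existence-check requests have no position */
        if (!test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags))
                return obj_request->which == BAD_WHICH;

        /* Image data requests occupy a slot in their image request */
        return obj_request->which != BAD_WHICH &&
               obj_request->img_request != NULL &&
               obj_request->which < obj_request->img_request->obj_request_count;
}
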
268 #define for_each_obj_request(ireq, oreq) \
269         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
270 #define for_each_obj_request_from(ireq, oreq) \
271         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
272 #define for_each_obj_request_safe(ireq, oreq, n) \
273         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
274
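/*
 * Usage sketch for the macros above (hypothetical helper).  This
 * mirrors how rbd_img_request_complete(), later in this file, sums
 * the per-object transfer counts.
 */
static u64 rbd_img_request_bytes(struct rbd_img_request *img_request)
{
        struct rbd_obj_request *obj_request;
        u64 xferred = 0;

        for_each_obj_request(img_request, obj_request)
                xferred += obj_request->xferred;

        return xferred;
}
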
275 struct rbd_snap {
276         const char              *name;
277         u64                     size;
278         struct list_head        node;
279         u64                     id;
280         u64                     features;
281 };
282
283 struct rbd_mapping {
284         u64                     size;
285         u64                     features;
286         bool                    read_only;
287 };
288
289 /*
290  * a single device
291  */
292 struct rbd_device {
293         int                     dev_id;         /* blkdev unique id */
294
295         int                     major;          /* blkdev assigned major */
296         struct gendisk          *disk;          /* blkdev's gendisk and rq */
297
298         u32                     image_format;   /* Either 1 or 2 */
299         struct rbd_client       *rbd_client;
300
301         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
302
303         spinlock_t              lock;           /* queue, flags, open_count */
304
305         struct rbd_image_header header;
306         unsigned long           flags;          /* possibly lock protected */
307         struct rbd_spec         *spec;
308
309         char                    *header_name;
310
311         struct ceph_file_layout layout;
312
313         struct ceph_osd_event   *watch_event;
314         struct rbd_obj_request  *watch_request;
315
316         struct rbd_spec         *parent_spec;
317         u64                     parent_overlap;
318         struct rbd_device       *parent;
319
320         /* protects updating the header */
321         struct rw_semaphore     header_rwsem;
322
323         struct rbd_mapping      mapping;
324
325         struct list_head        node;
326
327         /* list of snapshots */
328         struct list_head        snaps;
329
330         /* sysfs related */
331         struct device           dev;
332         unsigned long           open_count;     /* protected by lock */
333 };
334
335 /*
336  * Flag bits for rbd_dev->flags.  If atomicity is required,
337  * rbd_dev->lock is used to protect access.
338  *
339  * Currently, only the "removing" flag (which is coupled with the
340  * "open_count" field) requires atomic access.
341  */
342 enum rbd_dev_flags {
343         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
344         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
345 };
346
347 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
348
349 static LIST_HEAD(rbd_dev_list);    /* devices */
350 static DEFINE_SPINLOCK(rbd_dev_list_lock);
351
352 static LIST_HEAD(rbd_client_list);              /* clients */
353 static DEFINE_SPINLOCK(rbd_client_list_lock);
354
355 static int rbd_img_request_submit(struct rbd_img_request *img_request);
356
357 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
358
359 static void rbd_dev_device_release(struct device *dev);
360 static void rbd_snap_destroy(struct rbd_snap *snap);
361
362 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
363                        size_t count);
364 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
365                           size_t count);
366 static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
367
368 static struct bus_attribute rbd_bus_attrs[] = {
369         __ATTR(add, S_IWUSR, NULL, rbd_add),
370         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
371         __ATTR_NULL
372 };
373
374 static struct bus_type rbd_bus_type = {
375         .name           = "rbd",
376         .bus_attrs      = rbd_bus_attrs,
377 };
378
379 static void rbd_root_dev_release(struct device *dev)
380 {
381 }
382
383 static struct device rbd_root_dev = {
384         .init_name =    "rbd",
385         .release =      rbd_root_dev_release,
386 };
387
388 static __printf(2, 3)
389 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
390 {
391         struct va_format vaf;
392         va_list args;
393
394         va_start(args, fmt);
395         vaf.fmt = fmt;
396         vaf.va = &args;
397
398         if (!rbd_dev)
399                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
400         else if (rbd_dev->disk)
401                 printk(KERN_WARNING "%s: %s: %pV\n",
402                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
403         else if (rbd_dev->spec && rbd_dev->spec->image_name)
404                 printk(KERN_WARNING "%s: image %s: %pV\n",
405                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
406         else if (rbd_dev->spec && rbd_dev->spec->image_id)
407                 printk(KERN_WARNING "%s: id %s: %pV\n",
408                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
409         else    /* punt */
410                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
411                         RBD_DRV_NAME, rbd_dev, &vaf);
412         va_end(args);
413 }
414
415 #ifdef RBD_DEBUG
416 #define rbd_assert(expr)                                                \
417                 if (unlikely(!(expr))) {                                \
418                         printk(KERN_ERR "\nAssertion failure in %s() "  \
419                                                 "at line %d:\n\n"       \
420                                         "\trbd_assert(%s);\n\n",        \
421                                         __func__, __LINE__, #expr);     \
422                         BUG();                                          \
423                 }
424 #else /* !RBD_DEBUG */
425 #  define rbd_assert(expr)      ((void) 0)
426 #endif /* !RBD_DEBUG */
427
428 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
429 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
430 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
431
432 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
433 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
434
435 static int rbd_open(struct block_device *bdev, fmode_t mode)
436 {
437         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
438         bool removing = false;
439
440         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
441                 return -EROFS;
442
443         spin_lock_irq(&rbd_dev->lock);
444         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
445                 removing = true;
446         else
447                 rbd_dev->open_count++;
448         spin_unlock_irq(&rbd_dev->lock);
449         if (removing)
450                 return -ENOENT;
451
452         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
453         (void) get_device(&rbd_dev->dev);
454         set_device_ro(bdev, rbd_dev->mapping.read_only);
455         mutex_unlock(&ctl_mutex);
456
457         return 0;
458 }
459
460 static int rbd_release(struct gendisk *disk, fmode_t mode)
461 {
462         struct rbd_device *rbd_dev = disk->private_data;
463         unsigned long open_count_before;
464
465         spin_lock_irq(&rbd_dev->lock);
466         open_count_before = rbd_dev->open_count--;
467         spin_unlock_irq(&rbd_dev->lock);
468         rbd_assert(open_count_before > 0);
469
470         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
471         put_device(&rbd_dev->dev);
472         mutex_unlock(&ctl_mutex);
473
474         return 0;
475 }
476
477 static const struct block_device_operations rbd_bd_ops = {
478         .owner                  = THIS_MODULE,
479         .open                   = rbd_open,
480         .release                = rbd_release,
481 };
482
483 /*
484  * Initialize an rbd client instance.
485  * We own *ceph_opts.
486  */
487 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
488 {
489         struct rbd_client *rbdc;
490         int ret = -ENOMEM;
491
492         dout("%s:\n", __func__);
493         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
494         if (!rbdc)
495                 goto out_opt;
496
497         kref_init(&rbdc->kref);
498         INIT_LIST_HEAD(&rbdc->node);
499
500         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
501
502         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
503         if (IS_ERR(rbdc->client))
504                 goto out_mutex;
505         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
506
507         ret = ceph_open_session(rbdc->client);
508         if (ret < 0)
509                 goto out_err;
510
511         spin_lock(&rbd_client_list_lock);
512         list_add_tail(&rbdc->node, &rbd_client_list);
513         spin_unlock(&rbd_client_list_lock);
514
515         mutex_unlock(&ctl_mutex);
516         dout("%s: rbdc %p\n", __func__, rbdc);
517
518         return rbdc;
519
520 out_err:
521         ceph_destroy_client(rbdc->client);
522 out_mutex:
523         mutex_unlock(&ctl_mutex);
524         kfree(rbdc);
525 out_opt:
526         if (ceph_opts)
527                 ceph_destroy_options(ceph_opts);
528         dout("%s: error %d\n", __func__, ret);
529
530         return ERR_PTR(ret);
531 }
532
533 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
534 {
535         kref_get(&rbdc->kref);
536
537         return rbdc;
538 }
539
540 /*
541  * Find a ceph client with a specific address and configuration.  If
542  * found, bump its reference count.
543  */
544 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
545 {
546         struct rbd_client *client_node;
547         bool found = false;
548
549         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
550                 return NULL;
551
552         spin_lock(&rbd_client_list_lock);
553         list_for_each_entry(client_node, &rbd_client_list, node) {
554                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
555                         __rbd_get_client(client_node);
556
557                         found = true;
558                         break;
559                 }
560         }
561         spin_unlock(&rbd_client_list_lock);
562
563         return found ? client_node : NULL;
564 }
565
566 /*
567  * mount options
568  */
569 enum {
570         Opt_last_int,
571         /* int args above */
572         Opt_last_string,
573         /* string args above */
574         Opt_read_only,
575         Opt_read_write,
576         /* Boolean args above */
577         Opt_last_bool,
578 };
579
580 static match_table_t rbd_opts_tokens = {
581         /* int args above */
582         /* string args above */
583         {Opt_read_only, "read_only"},
584         {Opt_read_only, "ro"},          /* Alternate spelling */
585         {Opt_read_write, "read_write"},
586         {Opt_read_write, "rw"},         /* Alternate spelling */
587         /* Boolean args above */
588         {-1, NULL}
589 };
590
591 struct rbd_options {
592         bool    read_only;
593 };
594
595 #define RBD_READ_ONLY_DEFAULT   false
596
597 static int parse_rbd_opts_token(char *c, void *private)
598 {
599         struct rbd_options *rbd_opts = private;
600         substring_t argstr[MAX_OPT_ARGS];
601         int token, intval, ret;
602
603         token = match_token(c, rbd_opts_tokens, argstr);
604         if (token < 0)
605                 return -EINVAL;
606
607         if (token < Opt_last_int) {
608                 ret = match_int(&argstr[0], &intval);
609                 if (ret < 0) {
610                         pr_err("bad mount option arg (not int) "
611                                "at '%s'\n", c);
612                         return ret;
613                 }
614                 dout("got int token %d val %d\n", token, intval);
615         } else if (token > Opt_last_int && token < Opt_last_string) {
616                 dout("got string token %d val %s\n", token,
617                      argstr[0].from);
618         } else if (token > Opt_last_string && token < Opt_last_bool) {
619                 dout("got Boolean token %d\n", token);
620         } else {
621                 dout("got token %d\n", token);
622         }
623
624         switch (token) {
625         case Opt_read_only:
626                 rbd_opts->read_only = true;
627                 break;
628         case Opt_read_write:
629                 rbd_opts->read_only = false;
630                 break;
631         default:
632                 rbd_assert(false);
633                 break;
634         }
635         return 0;
636 }
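
/*
 * parse_rbd_opts_token() is not called directly; it is passed to
 * ceph_parse_options() as the per-token callback when a mapping's
 * option string is parsed.  Roughly (a sketch of how the rbd_add()
 * path uses it):
 *
 *      ceph_opts = ceph_parse_options(options, mon_addrs,
 *                                      mon_addrs + mon_addrs_size - 1,
 *                                      parse_rbd_opts_token, rbd_opts);
 */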
637
638 /*
639  * Get a ceph client with a specific address and configuration; if one
640  * does not exist, create it.
641  */
642 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
643 {
644         struct rbd_client *rbdc;
645
646         rbdc = rbd_client_find(ceph_opts);
647         if (rbdc)       /* using an existing client */
648                 ceph_destroy_options(ceph_opts);
649         else
650                 rbdc = rbd_client_create(ceph_opts);
651
652         return rbdc;
653 }
654
655 /*
656  * Destroy ceph client
657  *
658  * Caller must hold rbd_client_list_lock.
659  */
660 static void rbd_client_release(struct kref *kref)
661 {
662         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
663
664         dout("%s: rbdc %p\n", __func__, rbdc);
665         spin_lock(&rbd_client_list_lock);
666         list_del(&rbdc->node);
667         spin_unlock(&rbd_client_list_lock);
668
669         ceph_destroy_client(rbdc->client);
670         kfree(rbdc);
671 }
672
673 /*
674  * Drop a reference to a ceph client node, releasing it if that was
675  * the last reference.
676  */
677 static void rbd_put_client(struct rbd_client *rbdc)
678 {
679         if (rbdc)
680                 kref_put(&rbdc->kref, rbd_client_release);
681 }
682
683 static bool rbd_image_format_valid(u32 image_format)
684 {
685         return image_format == 1 || image_format == 2;
686 }
687
688 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
689 {
690         size_t size;
691         u32 snap_count;
692
693         /* The header has to start with the magic rbd header text */
694         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
695                 return false;
696
697         /* The bio layer requires at least sector-sized I/O */
698
699         if (ondisk->options.order < SECTOR_SHIFT)
700                 return false;
701
702         /* If we use u64 in a few spots we may be able to loosen this */
703
704         if (ondisk->options.order > 8 * sizeof (int) - 1)
705                 return false;
706
707         /*
708          * The size of a snapshot header has to fit in a size_t, and
709          * that limits the number of snapshots.
710          */
711         snap_count = le32_to_cpu(ondisk->snap_count);
712         size = SIZE_MAX - sizeof (struct ceph_snap_context);
713         if (snap_count > size / sizeof (__le64))
714                 return false;
715
716         /*
717  * Not only that, but the size of the entire snapshot
718          * header must also be representable in a size_t.
719          */
720         size -= snap_count * sizeof (__le64);
721         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
722                 return false;
723
724         return true;
725 }
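
/*
 * Concrete numbers for the checks above (illustrative): with 4-byte
 * ints the order check rejects anything over 31, capping objects at
 * 2^31 bytes; with a 32-bit size_t the snap_count check allows up to
 * about (SIZE_MAX - sizeof (struct ceph_snap_context)) / 8, roughly
 * half a billion snapshots, far beyond RBD_MAX_SNAP_COUNT.
 */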
726
727 /*
728  * Create a new header structure, translate header format from the on-disk
729  * header.
730  */
731 static int rbd_header_from_disk(struct rbd_image_header *header,
732                                  struct rbd_image_header_ondisk *ondisk)
733 {
734         u32 snap_count;
735         size_t len;
736         size_t size;
737         u32 i;
738
739         memset(header, 0, sizeof (*header));
740
741         snap_count = le32_to_cpu(ondisk->snap_count);
742
743         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
744         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
745         if (!header->object_prefix)
746                 return -ENOMEM;
747         memcpy(header->object_prefix, ondisk->object_prefix, len);
748         header->object_prefix[len] = '\0';
749
750         if (snap_count) {
751                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
752
753                 /* Save a copy of the snapshot names */
754
755                 if (snap_names_len > (u64) SIZE_MAX)
756                         return -EIO;
757                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
758                 if (!header->snap_names)
759                         goto out_err;
760                 /*
761                  * Note that rbd_dev_v1_header_read() guarantees
762                  * the ondisk buffer we're working with has
763                  * snap_names_len bytes beyond the end of the
764                  * snapshot id array, so this memcpy() is safe.
765                  */
766                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
767                         snap_names_len);
768
769                 /* Record each snapshot's size */
770
771                 size = snap_count * sizeof (*header->snap_sizes);
772                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
773                 if (!header->snap_sizes)
774                         goto out_err;
775                 for (i = 0; i < snap_count; i++)
776                         header->snap_sizes[i] =
777                                 le64_to_cpu(ondisk->snaps[i].image_size);
778         } else {
779                 header->snap_names = NULL;
780                 header->snap_sizes = NULL;
781         }
782
783         header->features = 0;   /* No feature support in v1 images */
784         header->obj_order = ondisk->options.order;
785         header->crypt_type = ondisk->options.crypt_type;
786         header->comp_type = ondisk->options.comp_type;
787
788         /* Allocate and fill in the snapshot context */
789
790         header->image_size = le64_to_cpu(ondisk->image_size);
791
792         header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
793         if (!header->snapc)
794                 goto out_err;
795         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
796         for (i = 0; i < snap_count; i++)
797                 header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
798
799         return 0;
800
801 out_err:
802         kfree(header->snap_sizes);
803         header->snap_sizes = NULL;
804         kfree(header->snap_names);
805         header->snap_names = NULL;
806         kfree(header->object_prefix);
807         header->object_prefix = NULL;
808
809         return -ENOMEM;
810 }
811
812 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
813 {
814         struct rbd_snap *snap;
815
816         if (snap_id == CEPH_NOSNAP)
817                 return RBD_SNAP_HEAD_NAME;
818
819         list_for_each_entry(snap, &rbd_dev->snaps, node)
820                 if (snap_id == snap->id)
821                         return snap->name;
822
823         return NULL;
824 }
825
826 static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
827                                         const char *snap_name)
828 {
829         struct rbd_snap *snap;
830
831         list_for_each_entry(snap, &rbd_dev->snaps, node)
832                 if (!strcmp(snap_name, snap->name))
833                         return snap;
834
835         return NULL;
836 }
837
838 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
839 {
840         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
841                     sizeof (RBD_SNAP_HEAD_NAME))) {
842                 rbd_dev->mapping.size = rbd_dev->header.image_size;
843                 rbd_dev->mapping.features = rbd_dev->header.features;
844         } else {
845                 struct rbd_snap *snap;
846
847                 snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
848                 if (!snap)
849                         return -ENOENT;
850                 rbd_dev->mapping.size = snap->size;
851                 rbd_dev->mapping.features = snap->features;
852                 rbd_dev->mapping.read_only = true;
853         }
854
855         return 0;
856 }
857
858 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
859 {
860         rbd_dev->mapping.size = 0;
861         rbd_dev->mapping.features = 0;
862         rbd_dev->mapping.read_only = true;
863 }
864
872 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
873 {
874         char *name;
875         u64 segment;
876         int ret;
877
878         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
879         if (!name)
880                 return NULL;
881         segment = offset >> rbd_dev->header.obj_order;
882         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
883                         rbd_dev->header.object_prefix, segment);
884         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
885                 pr_err("error formatting segment name for #%llu (%d)\n",
886                         segment, ret);
887                 kfree(name);
888                 name = NULL;
889         }
890
891         return name;
892 }
893
894 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
895 {
896         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
897
898         return offset & (segment_size - 1);
899 }
900
901 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
902                                 u64 offset, u64 length)
903 {
904         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
905
906         offset &= segment_size - 1;
907
908         rbd_assert(length <= U64_MAX - offset);
909         if (offset + length > segment_size)
910                 length = segment_size - offset;
911
912         return length;
913 }
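
/*
 * Worked example for the segment helpers above (assuming the common
 * obj_order of 22, i.e. 4 MB objects): an image I/O at offset
 * 0x3ff000 with length 0x2000 lands in segment 0 at offset 0x3ff000,
 * and rbd_segment_length() clips it to 0x1000 bytes; the remaining
 * 0x1000 bytes belong to segment 1 at offset 0.
 */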
914
915 /*
916  * returns the size of an object in the image
917  */
918 static u64 rbd_obj_bytes(struct rbd_image_header *header)
919 {
920         return 1 << header->obj_order;
921 }
922
923 /*
924  * bio helpers
925  */
926
927 static void bio_chain_put(struct bio *chain)
928 {
929         struct bio *tmp;
930
931         while (chain) {
932                 tmp = chain;
933                 chain = chain->bi_next;
934                 bio_put(tmp);
935         }
936 }
937
938 /*
939  * zeros a bio chain, starting at a specific offset
940  */
941 static void zero_bio_chain(struct bio *chain, int start_ofs)
942 {
943         struct bio_vec *bv;
944         unsigned long flags;
945         void *buf;
946         int i;
947         int pos = 0;
948
949         while (chain) {
950                 bio_for_each_segment(bv, chain, i) {
951                         if (pos + bv->bv_len > start_ofs) {
952                                 int remainder = max(start_ofs - pos, 0);
953                                 buf = bvec_kmap_irq(bv, &flags);
954                                 memset(buf + remainder, 0,
955                                        bv->bv_len - remainder);
956                                 bvec_kunmap_irq(buf, &flags);
957                         }
958                         pos += bv->bv_len;
959                 }
960
961                 chain = chain->bi_next;
962         }
963 }
964
965 /*
966  * Similar to zero_bio_chain(), this zeros data defined by a page array,
967  * starting at the given byte offset from the start of the array and
968  * continuing up to the given end offset.  The pages array is
969  * assumed to be big enough to hold all bytes up to the end.
970  */
971 static void zero_pages(struct page **pages, u64 offset, u64 end)
972 {
973         struct page **page = &pages[offset >> PAGE_SHIFT];
974
975         rbd_assert(end > offset);
976         rbd_assert(end - offset <= (u64)SIZE_MAX);
977         while (offset < end) {
978                 size_t page_offset;
979                 size_t length;
980                 unsigned long flags;
981                 void *kaddr;
982
983                 page_offset = (size_t)(offset & ~PAGE_MASK);
984                 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
985                 local_irq_save(flags);
986                 kaddr = kmap_atomic(*page);
987                 memset(kaddr + page_offset, 0, length);
988                 kunmap_atomic(kaddr);
989                 local_irq_restore(flags);
990
991                 offset += length;
992                 page++;
993         }
994 }
995
996 /*
997  * Clone a portion of a bio, starting at the given byte offset
998  * and continuing for the number of bytes indicated.
999  */
1000 static struct bio *bio_clone_range(struct bio *bio_src,
1001                                         unsigned int offset,
1002                                         unsigned int len,
1003                                         gfp_t gfpmask)
1004 {
1005         struct bio_vec *bv;
1006         unsigned int resid;
1007         unsigned short idx;
1008         unsigned int voff;
1009         unsigned short end_idx;
1010         unsigned short vcnt;
1011         struct bio *bio;
1012
1013         /* Handle the easy case for the caller */
1014
1015         if (!offset && len == bio_src->bi_size)
1016                 return bio_clone(bio_src, gfpmask);
1017
1018         if (WARN_ON_ONCE(!len))
1019                 return NULL;
1020         if (WARN_ON_ONCE(len > bio_src->bi_size))
1021                 return NULL;
1022         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1023                 return NULL;
1024
1025         /* Find first affected segment... */
1026
1027         resid = offset;
1028         __bio_for_each_segment(bv, bio_src, idx, 0) {
1029                 if (resid < bv->bv_len)
1030                         break;
1031                 resid -= bv->bv_len;
1032         }
1033         voff = resid;
1034
1035         /* ...and the last affected segment */
1036
1037         resid += len;
1038         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1039                 if (resid <= bv->bv_len)
1040                         break;
1041                 resid -= bv->bv_len;
1042         }
1043         vcnt = end_idx - idx + 1;
1044
1045         /* Build the clone */
1046
1047         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1048         if (!bio)
1049                 return NULL;    /* ENOMEM */
1050
1051         bio->bi_bdev = bio_src->bi_bdev;
1052         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1053         bio->bi_rw = bio_src->bi_rw;
1054         bio->bi_flags |= 1 << BIO_CLONED;
1055
1056         /*
1057          * Copy over our part of the bio_vec, then update the first
1058          * and last (or only) entries.
1059          */
1060         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1061                         vcnt * sizeof (struct bio_vec));
1062         bio->bi_io_vec[0].bv_offset += voff;
1063         if (vcnt > 1) {
1064                 bio->bi_io_vec[0].bv_len -= voff;
1065                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1066         } else {
1067                 bio->bi_io_vec[0].bv_len = len;
1068         }
1069
1070         bio->bi_vcnt = vcnt;
1071         bio->bi_size = len;
1072         bio->bi_idx = 0;
1073
1074         return bio;
1075 }
1076
1077 /*
1078  * Clone a portion of a bio chain, starting at the given byte offset
1079  * into the first bio in the source chain and continuing for the
1080  * number of bytes indicated.  The result is another bio chain of
1081  * exactly the given length, or a null pointer on error.
1082  *
1083  * The bio_src and offset parameters are both in-out.  On entry they
1084  * refer to the first source bio and the offset into that bio where
1085  * the start of data to be cloned is located.
1086  *
1087  * On return, bio_src is updated to refer to the bio in the source
1088  * chain that contains the first un-cloned byte, and *offset will
1089  * contain the offset of that byte within that bio.
1090  */
1091 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1092                                         unsigned int *offset,
1093                                         unsigned int len,
1094                                         gfp_t gfpmask)
1095 {
1096         struct bio *bi = *bio_src;
1097         unsigned int off = *offset;
1098         struct bio *chain = NULL;
1099         struct bio **end;
1100
1101         /* Build up a chain of clone bios up to the limit */
1102
1103         if (!bi || off >= bi->bi_size || !len)
1104                 return NULL;            /* Nothing to clone */
1105
1106         end = &chain;
1107         while (len) {
1108                 unsigned int bi_size;
1109                 struct bio *bio;
1110
1111                 if (!bi) {
1112                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1113                         goto out_err;   /* EINVAL; ran out of bio's */
1114                 }
1115                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1116                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1117                 if (!bio)
1118                         goto out_err;   /* ENOMEM */
1119
1120                 *end = bio;
1121                 end = &bio->bi_next;
1122
1123                 off += bi_size;
1124                 if (off == bi->bi_size) {
1125                         bi = bi->bi_next;
1126                         off = 0;
1127                 }
1128                 len -= bi_size;
1129         }
1130         *bio_src = bi;
1131         *offset = off;
1132
1133         return chain;
1134 out_err:
1135         bio_chain_put(chain);
1136
1137         return NULL;
1138 }
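
/*
 * Illustrative only (hypothetical helper, simplified from the way
 * image object requests are built): walk a request's bio chain,
 * carving off one segment-sized clone at a time.
 */
static int rbd_clone_segments(struct rbd_device *rbd_dev, struct bio *bio,
                                u64 img_offset, u64 resid)
{
        unsigned int offset = 0;

        while (resid) {
                u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
                struct bio *clone;

                clone = bio_chain_clone_range(&bio, &offset,
                                                (unsigned int)length,
                                                GFP_ATOMIC);
                if (!clone)
                        return -ENOMEM;

                /* A real caller would attach the clone to an object request */
                bio_chain_put(clone);

                img_offset += length;
                resid -= length;
        }

        return 0;
}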
1139
1140 /*
1141  * The default/initial value for all object request flags is 0.  For
1142  * each flag, once its value is set to 1 it is never reset to 0
1143  * again.
1144  */
1145 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1146 {
1147         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1148                 struct rbd_device *rbd_dev;
1149
1150                 rbd_dev = obj_request->img_request->rbd_dev;
1151                 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1152                         obj_request);
1153         }
1154 }
1155
1156 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1157 {
1158         smp_mb();
1159         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1160 }
1161
1162 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1163 {
1164         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1165                 struct rbd_device *rbd_dev = NULL;
1166
1167                 if (obj_request_img_data_test(obj_request))
1168                         rbd_dev = obj_request->img_request->rbd_dev;
1169                 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1170                         obj_request);
1171         }
1172 }
1173
1174 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1175 {
1176         smp_mb();
1177         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1178 }
1179
1180 /*
1181  * This sets the KNOWN flag after (possibly) setting the EXISTS
1182  * flag.  The latter is set based on the "exists" value provided.
1183  *
1184  * Note that for our purposes once an object exists it never goes
1185  * away again.  It's possible that the responses from two existence
1186  * checks are separated by the creation of the target object, so the
1187  * first ("doesn't exist") response arrives *after* the second ("does
1188  * exist").  In that case we ignore the stale, later response.
1189  */
1190 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1191                                 bool exists)
1192 {
1193         if (exists)
1194                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1195         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1196         smp_mb();
1197 }
1198
1199 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1200 {
1201         smp_mb();
1202         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1203 }
1204
1205 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1206 {
1207         smp_mb();
1208         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1209 }
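
/*
 * A sketch (hypothetical helper) of how the KNOWN/EXISTS pair above
 * is consumed: an unknown object needs a STAT first; a known,
 * existing object can be written directly; a known, absent object
 * needs parent copyup handling for layered images.
 */
static int rbd_obj_existence_action(struct rbd_obj_request *obj_request)
{
        if (!obj_request_known_test(obj_request))
                return 0;       /* issue a STAT to learn existence */
        if (obj_request_exists_test(obj_request))
                return 1;       /* safe to write the object directly */
        return 2;               /* absent: copy up from the parent first */
}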
1210
1211 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1212 {
1213         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1214                 atomic_read(&obj_request->kref.refcount));
1215         kref_get(&obj_request->kref);
1216 }
1217
1218 static void rbd_obj_request_destroy(struct kref *kref);
1219 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1220 {
1221         rbd_assert(obj_request != NULL);
1222         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1223                 atomic_read(&obj_request->kref.refcount));
1224         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1225 }
1226
1227 static void rbd_img_request_get(struct rbd_img_request *img_request)
1228 {
1229         dout("%s: img %p (was %d)\n", __func__, img_request,
1230                 atomic_read(&img_request->kref.refcount));
1231         kref_get(&img_request->kref);
1232 }
1233
1234 static void rbd_img_request_destroy(struct kref *kref);
1235 static void rbd_img_request_put(struct rbd_img_request *img_request)
1236 {
1237         rbd_assert(img_request != NULL);
1238         dout("%s: img %p (was %d)\n", __func__, img_request,
1239                 atomic_read(&img_request->kref.refcount));
1240         kref_put(&img_request->kref, rbd_img_request_destroy);
1241 }
1242
1243 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1244                                         struct rbd_obj_request *obj_request)
1245 {
1246         rbd_assert(obj_request->img_request == NULL);
1247
1248         /* Image request now owns object's original reference */
1249         obj_request->img_request = img_request;
1250         obj_request->which = img_request->obj_request_count;
1251         rbd_assert(!obj_request_img_data_test(obj_request));
1252         obj_request_img_data_set(obj_request);
1253         rbd_assert(obj_request->which != BAD_WHICH);
1254         img_request->obj_request_count++;
1255         list_add_tail(&obj_request->links, &img_request->obj_requests);
1256         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1257                 obj_request->which);
1258 }
1259
1260 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1261                                         struct rbd_obj_request *obj_request)
1262 {
1263         rbd_assert(obj_request->which != BAD_WHICH);
1264
1265         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1266                 obj_request->which);
1267         list_del(&obj_request->links);
1268         rbd_assert(img_request->obj_request_count > 0);
1269         img_request->obj_request_count--;
1270         rbd_assert(obj_request->which == img_request->obj_request_count);
1271         obj_request->which = BAD_WHICH;
1272         rbd_assert(obj_request_img_data_test(obj_request));
1273         rbd_assert(obj_request->img_request == img_request);
1274         obj_request->img_request = NULL;
1275         obj_request->callback = NULL;
1276         rbd_obj_request_put(obj_request);
1277 }
1278
1279 static bool obj_request_type_valid(enum obj_request_type type)
1280 {
1281         switch (type) {
1282         case OBJ_REQUEST_NODATA:
1283         case OBJ_REQUEST_BIO:
1284         case OBJ_REQUEST_PAGES:
1285                 return true;
1286         default:
1287                 return false;
1288         }
1289 }
1290
1291 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1292                                 struct rbd_obj_request *obj_request)
1293 {
1294         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1295
1296         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1297 }
1298
1299 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1300 {
1302         dout("%s: img %p\n", __func__, img_request);
1303
1304         /*
1305          * If no error occurred, compute the aggregate transfer
1306          * count for the image request.  We could instead use
1307          * atomic64_cmpxchg() to update it as each object request
1308          * completes; it's not clear offhand which way is better.
1309          */
1310         if (!img_request->result) {
1311                 struct rbd_obj_request *obj_request;
1312                 u64 xferred = 0;
1313
1314                 for_each_obj_request(img_request, obj_request)
1315                         xferred += obj_request->xferred;
1316                 img_request->xferred = xferred;
1317         }
1318
1319         if (img_request->callback)
1320                 img_request->callback(img_request);
1321         else
1322                 rbd_img_request_put(img_request);
1323 }
1324
1325 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1326
1327 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1328 {
1329         dout("%s: obj %p\n", __func__, obj_request);
1330
1331         return wait_for_completion_interruptible(&obj_request->completion);
1332 }
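
/*
 * Sketch of the synchronous request pattern built from the helpers
 * above (hypothetical wrapper; the synchronous helpers elsewhere in
 * this file roughly follow this shape): submit, wait, then check both
 * the wait status and the request result.
 */
static int rbd_obj_request_sync(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        int ret;

        ret = rbd_obj_request_submit(osdc, obj_request);
        if (ret)
                return ret;
        ret = rbd_obj_request_wait(obj_request);
        if (ret)
                return ret;

        return obj_request->result;
}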
1333
1334 /*
1335  * The default/initial value for all image request flags is 0.  Each
1336  * is conditionally set to 1 at image request initialization time
1337  * and currently never changes thereafter.
1338  */
1339 static void img_request_write_set(struct rbd_img_request *img_request)
1340 {
1341         set_bit(IMG_REQ_WRITE, &img_request->flags);
1342         smp_mb();
1343 }
1344
1345 static bool img_request_write_test(struct rbd_img_request *img_request)
1346 {
1347         smp_mb();
1348         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1349 }
1350
1351 static void img_request_child_set(struct rbd_img_request *img_request)
1352 {
1353         set_bit(IMG_REQ_CHILD, &img_request->flags);
1354         smp_mb();
1355 }
1356
1357 static bool img_request_child_test(struct rbd_img_request *img_request)
1358 {
1359         smp_mb();
1360         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1361 }
1362
1363 static void img_request_layered_set(struct rbd_img_request *img_request)
1364 {
1365         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1366         smp_mb();
1367 }
1368
1369 static bool img_request_layered_test(struct rbd_img_request *img_request)
1370 {
1371         smp_mb();
1372         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1373 }
1374
1375 static void
1376 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1377 {
1378         u64 xferred = obj_request->xferred;
1379         u64 length = obj_request->length;
1380
1381         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1382                 obj_request, obj_request->img_request, obj_request->result,
1383                 xferred, length);
1384         /*
1385          * ENOENT means a hole in the image.  We zero-fill the
1386          * entire length of the request.  A short read also implies
1387          * zero-fill to the end of the request.  Either way we
1388          * update the xferred count to indicate the whole request
1389          * was satisfied.
1390          */
1391         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1392         if (obj_request->result == -ENOENT) {
1393                 if (obj_request->type == OBJ_REQUEST_BIO)
1394                         zero_bio_chain(obj_request->bio_list, 0);
1395                 else
1396                         zero_pages(obj_request->pages, 0, length);
1397                 obj_request->result = 0;
1398                 obj_request->xferred = length;
1399         } else if (xferred < length && !obj_request->result) {
1400                 if (obj_request->type == OBJ_REQUEST_BIO)
1401                         zero_bio_chain(obj_request->bio_list, xferred);
1402                 else
1403                         zero_pages(obj_request->pages, xferred, length);
1404                 obj_request->xferred = length;
1405         }
1406         obj_request_done_set(obj_request);
1407 }
1408
1409 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1410 {
1411         dout("%s: obj %p cb %p\n", __func__, obj_request,
1412                 obj_request->callback);
1413         if (obj_request->callback)
1414                 obj_request->callback(obj_request);
1415         else
1416                 complete_all(&obj_request->completion);
1417 }
1418
1419 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1420 {
1421         dout("%s: obj %p\n", __func__, obj_request);
1422         obj_request_done_set(obj_request);
1423 }
1424
1425 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1426 {
1427         struct rbd_img_request *img_request = NULL;
1428         struct rbd_device *rbd_dev = NULL;
1429         bool layered = false;
1430
1431         if (obj_request_img_data_test(obj_request)) {
1432                 img_request = obj_request->img_request;
1433                 layered = img_request && img_request_layered_test(img_request);
1434                 rbd_dev = img_request->rbd_dev;
1435         }
1436
1437         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1438                 obj_request, img_request, obj_request->result,
1439                 obj_request->xferred, obj_request->length);
1440         if (layered && obj_request->result == -ENOENT &&
1441                         obj_request->img_offset < rbd_dev->parent_overlap)
1442                 rbd_img_parent_read(obj_request);
1443         else if (img_request)
1444                 rbd_img_obj_request_read_callback(obj_request);
1445         else
1446                 obj_request_done_set(obj_request);
1447 }
1448
1449 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1450 {
1451         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1452                 obj_request->result, obj_request->length);
1453         /*
1454          * There is no such thing as a successful short write.  Set
1455          * the transfer count to our originally-requested length.
1456          */
1457         obj_request->xferred = obj_request->length;
1458         obj_request_done_set(obj_request);
1459 }
1460
1461 /*
1462  * For a simple stat call there's nothing to do.  We'll do more if
1463  * this is part of a write sequence for a layered image.
1464  */
1465 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1466 {
1467         dout("%s: obj %p\n", __func__, obj_request);
1468         obj_request_done_set(obj_request);
1469 }
1470
1471 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1472                                 struct ceph_msg *msg)
1473 {
1474         struct rbd_obj_request *obj_request = osd_req->r_priv;
1475         u16 opcode;
1476
1477         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1478         rbd_assert(osd_req == obj_request->osd_req);
1479         if (obj_request_img_data_test(obj_request)) {
1480                 rbd_assert(obj_request->img_request);
1481                 rbd_assert(obj_request->which != BAD_WHICH);
1482         } else {
1483                 rbd_assert(obj_request->which == BAD_WHICH);
1484         }
1485
1486         if (osd_req->r_result < 0)
1487                 obj_request->result = osd_req->r_result;
1488         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1489
1490         BUG_ON(osd_req->r_num_ops > 2);
1491
1492         /*
1493          * We support a 64-bit length, but ultimately it has to be
1494          * passed to blk_end_request(), which takes an unsigned int.
1495          */
1496         obj_request->xferred = osd_req->r_reply_op_len[0];
1497         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1498         opcode = osd_req->r_ops[0].op;
1499         switch (opcode) {
1500         case CEPH_OSD_OP_READ:
1501                 rbd_osd_read_callback(obj_request);
1502                 break;
1503         case CEPH_OSD_OP_WRITE:
1504                 rbd_osd_write_callback(obj_request);
1505                 break;
1506         case CEPH_OSD_OP_STAT:
1507                 rbd_osd_stat_callback(obj_request);
1508                 break;
1509         case CEPH_OSD_OP_CALL:
1510         case CEPH_OSD_OP_NOTIFY_ACK:
1511         case CEPH_OSD_OP_WATCH:
1512                 rbd_osd_trivial_callback(obj_request);
1513                 break;
1514         default:
1515                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1516                         obj_request->object_name, (unsigned short) opcode);
1517                 break;
1518         }
1519
1520         if (obj_request_done_test(obj_request))
1521                 rbd_obj_request_complete(obj_request);
1522 }
1523
1524 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1525 {
1526         struct rbd_img_request *img_request = obj_request->img_request;
1527         struct ceph_osd_request *osd_req = obj_request->osd_req;
1528         u64 snap_id;
1529
1530         rbd_assert(osd_req != NULL);
1531
1532         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1533         ceph_osdc_build_request(osd_req, obj_request->offset,
1534                         NULL, snap_id, NULL);
1535 }
1536
1537 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1538 {
1539         struct rbd_img_request *img_request = obj_request->img_request;
1540         struct ceph_osd_request *osd_req = obj_request->osd_req;
1541         struct ceph_snap_context *snapc;
1542         struct timespec mtime = CURRENT_TIME;
1543
1544         rbd_assert(osd_req != NULL);
1545
1546         snapc = img_request ? img_request->snapc : NULL;
1547         ceph_osdc_build_request(osd_req, obj_request->offset,
1548                         snapc, CEPH_NOSNAP, &mtime);
1549 }
1550
1551 static struct ceph_osd_request *rbd_osd_req_create(
1552                                         struct rbd_device *rbd_dev,
1553                                         bool write_request,
1554                                         struct rbd_obj_request *obj_request)
1555 {
1556         struct ceph_snap_context *snapc = NULL;
1557         struct ceph_osd_client *osdc;
1558         struct ceph_osd_request *osd_req;
1559
1560         if (obj_request_img_data_test(obj_request)) {
1561                 struct rbd_img_request *img_request = obj_request->img_request;
1562
1563                 rbd_assert(write_request ==
1564                                 img_request_write_test(img_request));
1565                 if (write_request)
1566                         snapc = img_request->snapc;
1567         }
1568
1569         /* Allocate and initialize the request, for the single op */
1570
1571         osdc = &rbd_dev->rbd_client->client->osdc;
1572         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1573         if (!osd_req)
1574                 return NULL;    /* ENOMEM */
1575
1576         if (write_request)
1577                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1578         else
1579                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1580
1581         osd_req->r_callback = rbd_osd_req_callback;
1582         osd_req->r_priv = obj_request;
1583
1584         osd_req->r_oid_len = strlen(obj_request->object_name);
1585         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1586         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1587
1588         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1589
1590         return osd_req;
1591 }
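
/*
 * Illustrative sketch (not part of the driver build): how a caller
 * pairs rbd_osd_req_create() with op setup and submission for a
 * single-op read.  This mirrors what rbd_img_request_fill() and
 * rbd_obj_read_sync() below actually do; error unwinding is trimmed.
 */
#if 0
static int example_submit_read(struct rbd_device *rbd_dev,
                                struct rbd_obj_request *obj_request)
{
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct ceph_osd_request *osd_req;

        osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
        if (!osd_req)
                return -ENOMEM;
        obj_request->osd_req = osd_req;

        /* One extent op covering the object range, then encode it */
        osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
                                obj_request->offset, obj_request->length,
                                0, 0);
        rbd_osd_req_format_read(obj_request);

        return rbd_obj_request_submit(osdc, obj_request);
}
#endif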
1592
1593 /*
1594  * Create a copyup osd request based on the information in the
1595  * object request supplied.  A copyup request has two osd ops,
1596  * a copyup method call, and a "normal" write request.
1597  */
1598 static struct ceph_osd_request *
1599 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1600 {
1601         struct rbd_img_request *img_request;
1602         struct ceph_snap_context *snapc;
1603         struct rbd_device *rbd_dev;
1604         struct ceph_osd_client *osdc;
1605         struct ceph_osd_request *osd_req;
1606
1607         rbd_assert(obj_request_img_data_test(obj_request));
1608         img_request = obj_request->img_request;
1609         rbd_assert(img_request);
1610         rbd_assert(img_request_write_test(img_request));
1611
1612         /* Allocate and initialize the request, for the two ops */
1613
1614         snapc = img_request->snapc;
1615         rbd_dev = img_request->rbd_dev;
1616         osdc = &rbd_dev->rbd_client->client->osdc;
1617         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1618         if (!osd_req)
1619                 return NULL;    /* ENOMEM */
1620
1621         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1622         osd_req->r_callback = rbd_osd_req_callback;
1623         osd_req->r_priv = obj_request;
1624
1625         osd_req->r_oid_len = strlen(obj_request->object_name);
1626         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1627         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1628
1629         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1630
1631         return osd_req;
1632 }
1633
1634
1635 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1636 {
1637         ceph_osdc_put_request(osd_req);
1638 }
1639
1640 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1641
1642 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1643                                                 u64 offset, u64 length,
1644                                                 enum obj_request_type type)
1645 {
1646         struct rbd_obj_request *obj_request;
1647         size_t size;
1648         char *name;
1649
1650         rbd_assert(obj_request_type_valid(type));
1651
1652         size = strlen(object_name) + 1;
1653         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1654         if (!obj_request)
1655                 return NULL;
1656
1657         name = (char *)(obj_request + 1);
1658         obj_request->object_name = memcpy(name, object_name, size);
1659         obj_request->offset = offset;
1660         obj_request->length = length;
1661         obj_request->flags = 0;
1662         obj_request->which = BAD_WHICH;
1663         obj_request->type = type;
1664         INIT_LIST_HEAD(&obj_request->links);
1665         init_completion(&obj_request->completion);
1666         kref_init(&obj_request->kref);
1667
1668         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1669                 offset, length, (int)type, obj_request);
1670
1671         return obj_request;
1672 }
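
/*
 * Layout of the single allocation made above: the object name is
 * copied into the same buffer, immediately after the structure, so
 * the one kfree() in rbd_obj_request_destroy() releases both.
 *
 *   +------------------------+------------------------+
 *   | struct rbd_obj_request | object name + NUL      |
 *   +------------------------+------------------------+
 *   ^ obj_request            ^ obj_request->object_name
 */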
1673
1674 static void rbd_obj_request_destroy(struct kref *kref)
1675 {
1676         struct rbd_obj_request *obj_request;
1677
1678         obj_request = container_of(kref, struct rbd_obj_request, kref);
1679
1680         dout("%s: obj %p\n", __func__, obj_request);
1681
1682         rbd_assert(obj_request->img_request == NULL);
1683         rbd_assert(obj_request->which == BAD_WHICH);
1684
1685         if (obj_request->osd_req)
1686                 rbd_osd_req_destroy(obj_request->osd_req);
1687
1688         rbd_assert(obj_request_type_valid(obj_request->type));
1689         switch (obj_request->type) {
1690         case OBJ_REQUEST_NODATA:
1691                 break;          /* Nothing to do */
1692         case OBJ_REQUEST_BIO:
1693                 if (obj_request->bio_list)
1694                         bio_chain_put(obj_request->bio_list);
1695                 break;
1696         case OBJ_REQUEST_PAGES:
1697                 if (obj_request->pages)
1698                         ceph_release_page_vector(obj_request->pages,
1699                                                 obj_request->page_count);
1700                 break;
1701         }
1702
1703         kfree(obj_request);
1704 }
1705
1706 /*
1707  * Caller is responsible for filling in the list of object requests
1708  * that comprises the image request, and the Linux request pointer
1709  * (if there is one).
1710  */
1711 static struct rbd_img_request *rbd_img_request_create(
1712                                         struct rbd_device *rbd_dev,
1713                                         u64 offset, u64 length,
1714                                         bool write_request,
1715                                         bool child_request)
1716 {
1717         struct rbd_img_request *img_request;
1718
1719         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1720         if (!img_request)
1721                 return NULL;
1722
1723         if (write_request) {
1724                 down_read(&rbd_dev->header_rwsem);
1725                 ceph_get_snap_context(rbd_dev->header.snapc);
1726                 up_read(&rbd_dev->header_rwsem);
1727         }
1728
1729         img_request->rq = NULL;
1730         img_request->rbd_dev = rbd_dev;
1731         img_request->offset = offset;
1732         img_request->length = length;
1733         img_request->flags = 0;
1734         if (write_request) {
1735                 img_request_write_set(img_request);
1736                 img_request->snapc = rbd_dev->header.snapc;
1737         } else {
1738                 img_request->snap_id = rbd_dev->spec->snap_id;
1739         }
1740         if (child_request)
1741                 img_request_child_set(img_request);
1742         if (rbd_dev->parent_spec)
1743                 img_request_layered_set(img_request);
1744         spin_lock_init(&img_request->completion_lock);
1745         img_request->next_completion = 0;
1746         img_request->callback = NULL;
1747         img_request->result = 0;
1748         img_request->obj_request_count = 0;
1749         INIT_LIST_HEAD(&img_request->obj_requests);
1750         kref_init(&img_request->kref);
1751
1752         rbd_img_request_get(img_request);       /* Avoid a warning */
1753         rbd_img_request_put(img_request);       /* TEMPORARY */
1754
1755         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1756                 write_request ? "write" : "read", offset, length,
1757                 img_request);
1758
1759         return img_request;
1760 }
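
/*
 * Typical call sequence for an image request (a sketch; this is in
 * essence what rbd_request_fn() below does for each block request):
 */
#if 0
        img_request = rbd_img_request_create(rbd_dev, offset, length,
                                                write_request, false);
        if (!img_request)
                return -ENOMEM;
        img_request->rq = rq;           /* the Linux block request */

        ret = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
        if (!ret)
                ret = rbd_img_request_submit(img_request);
        if (ret)
                rbd_img_request_put(img_request);
#endif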
1761
1762 static void rbd_img_request_destroy(struct kref *kref)
1763 {
1764         struct rbd_img_request *img_request;
1765         struct rbd_obj_request *obj_request;
1766         struct rbd_obj_request *next_obj_request;
1767
1768         img_request = container_of(kref, struct rbd_img_request, kref);
1769
1770         dout("%s: img %p\n", __func__, img_request);
1771
1772         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1773                 rbd_img_obj_request_del(img_request, obj_request);
1774         rbd_assert(img_request->obj_request_count == 0);
1775
1776         if (img_request_write_test(img_request))
1777                 ceph_put_snap_context(img_request->snapc);
1778
1779         if (img_request_child_test(img_request))
1780                 rbd_obj_request_put(img_request->obj_request);
1781
1782         kfree(img_request);
1783 }
1784
1785 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1786 {
1787         struct rbd_img_request *img_request;
1788         unsigned int xferred;
1789         int result;
1790         bool more;
1791
1792         rbd_assert(obj_request_img_data_test(obj_request));
1793         img_request = obj_request->img_request;
1794
1795         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1796         xferred = (unsigned int)obj_request->xferred;
1797         result = obj_request->result;
1798         if (result) {
1799                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1800
1801                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1802                         img_request_write_test(img_request) ? "write" : "read",
1803                         obj_request->length, obj_request->img_offset,
1804                         obj_request->offset);
1805                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1806                         result, xferred);
1807                 if (!img_request->result)
1808                         img_request->result = result;
1809         }
1810
1811         /* Image object requests don't own their page array */
1812
1813         if (obj_request->type == OBJ_REQUEST_PAGES) {
1814                 obj_request->pages = NULL;
1815                 obj_request->page_count = 0;
1816         }
1817
1818         if (img_request_child_test(img_request)) {
1819                 rbd_assert(img_request->obj_request != NULL);
1820                 more = obj_request->which < img_request->obj_request_count - 1;
1821         } else {
1822                 rbd_assert(img_request->rq != NULL);
1823                 more = blk_end_request(img_request->rq, result, xferred);
1824         }
1825
1826         return more;
1827 }
1828
1829 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1830 {
1831         struct rbd_img_request *img_request;
1832         u32 which = obj_request->which;
1833         bool more = true;
1834
1835         rbd_assert(obj_request_img_data_test(obj_request));
1836         img_request = obj_request->img_request;
1837
1838         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1839         rbd_assert(img_request != NULL);
1840         rbd_assert(img_request->obj_request_count > 0);
1841         rbd_assert(which != BAD_WHICH);
1842         rbd_assert(which < img_request->obj_request_count);
1843         rbd_assert(which >= img_request->next_completion);
1844
1845         spin_lock_irq(&img_request->completion_lock);
1846         if (which != img_request->next_completion)
1847                 goto out;
1848
1849         for_each_obj_request_from(img_request, obj_request) {
1850                 rbd_assert(more);
1851                 rbd_assert(which < img_request->obj_request_count);
1852
1853                 if (!obj_request_done_test(obj_request))
1854                         break;
1855                 more = rbd_img_obj_end_request(obj_request);
1856                 which++;
1857         }
1858
1859         rbd_assert(more ^ (which == img_request->obj_request_count));
1860         img_request->next_completion = which;
1861 out:
1862         spin_unlock_irq(&img_request->completion_lock);
1863
1864         if (!more)
1865                 rbd_img_request_complete(img_request);
1866 }
1867
1868 /*
1869  * Split up an image request into one or more object requests, each
1870  * to a different object.  The "type" parameter indicates whether
1871  * "data_desc" is the pointer to the head of a list of bio
1872  * structures, or the base of a page array.  In either case this
1873  * function assumes data_desc describes memory sufficient to hold
1874  * all data described by the image request.
1875  */
1876 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1877                                         enum obj_request_type type,
1878                                         void *data_desc)
1879 {
1880         struct rbd_device *rbd_dev = img_request->rbd_dev;
1881         struct rbd_obj_request *obj_request = NULL;
1882         struct rbd_obj_request *next_obj_request;
1883         bool write_request = img_request_write_test(img_request);
1884         struct bio *bio_list;
1885         unsigned int bio_offset = 0;
1886         struct page **pages;
1887         u64 img_offset;
1888         u64 resid;
1889         u16 opcode;
1890
1891         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1892                 (int)type, data_desc);
1893
1894         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1895         img_offset = img_request->offset;
1896         resid = img_request->length;
1897         rbd_assert(resid > 0);
1898
1899         if (type == OBJ_REQUEST_BIO) {
1900                 bio_list = data_desc;
1901                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1902         } else {
1903                 rbd_assert(type == OBJ_REQUEST_PAGES);
1904                 pages = data_desc;
1905         }
1906
1907         while (resid) {
1908                 struct ceph_osd_request *osd_req;
1909                 const char *object_name;
1910                 u64 offset;
1911                 u64 length;
1912
1913                 object_name = rbd_segment_name(rbd_dev, img_offset);
1914                 if (!object_name)
1915                         goto out_unwind;
1916                 offset = rbd_segment_offset(rbd_dev, img_offset);
1917                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1918                 obj_request = rbd_obj_request_create(object_name,
1919                                                 offset, length, type);
1920                 kfree(object_name);     /* object request has its own copy */
1921                 if (!obj_request)
1922                         goto out_unwind;
1923
1924                 if (type == OBJ_REQUEST_BIO) {
1925                         unsigned int clone_size;
1926
1927                         rbd_assert(length <= (u64)UINT_MAX);
1928                         clone_size = (unsigned int)length;
1929                         obj_request->bio_list =
1930                                         bio_chain_clone_range(&bio_list,
1931                                                                 &bio_offset,
1932                                                                 clone_size,
1933                                                                 GFP_ATOMIC);
1934                         if (!obj_request->bio_list)
1935                                 goto out_partial;
1936                 } else {
1937                         unsigned int page_count;
1938
1939                         obj_request->pages = pages;
1940                         page_count = (u32)calc_pages_for(offset, length);
1941                         obj_request->page_count = page_count;
1942                         if ((offset + length) & ~PAGE_MASK)
1943                                 page_count--;   /* more on last page */
1944                         pages += page_count;
1945                 }
1946
1947                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1948                                                 obj_request);
1949                 if (!osd_req)
1950                         goto out_partial;
1951                 obj_request->osd_req = osd_req;
1952                 obj_request->callback = rbd_img_obj_callback;
1953
1954                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1955                                                 0, 0);
1956                 if (type == OBJ_REQUEST_BIO)
1957                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1958                                         obj_request->bio_list, length);
1959                 else
1960                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1961                                         obj_request->pages, length,
1962                                         offset & ~PAGE_MASK, false, false);
1963
1964                 if (write_request)
1965                         rbd_osd_req_format_write(obj_request);
1966                 else
1967                         rbd_osd_req_format_read(obj_request);
1968
1969                 obj_request->img_offset = img_offset;
1970                 rbd_img_obj_request_add(img_request, obj_request);
1971
1972                 img_offset += length;
1973                 resid -= length;
1974         }
1975
1976         return 0;
1977
1978 out_partial:
1979         rbd_obj_request_put(obj_request);
1980 out_unwind:
1981         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1982                 rbd_obj_request_put(obj_request);
1983
1984         return -ENOMEM;
1985 }
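
/*
 * Worked example of the split performed above, assuming the default
 * 4 MiB objects (obj_order 22): an 8 MiB request at image offset
 * 3 MiB becomes three object requests:
 *
 *   segment 0:  object offset 3 MiB, length 1 MiB  (up to the boundary)
 *   segment 1:  object offset 0,     length 4 MiB  (one full object)
 *   segment 2:  object offset 0,     length 3 MiB  (the remainder)
 */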
1986
1987 static void
1988 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
1989 {
1990         struct rbd_img_request *img_request;
1991         struct rbd_device *rbd_dev;
1992         u64 length;
1993         u32 page_count;
1994
1995         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
1996         rbd_assert(obj_request_img_data_test(obj_request));
1997         img_request = obj_request->img_request;
1998         rbd_assert(img_request);
1999
2000         rbd_dev = img_request->rbd_dev;
2001         rbd_assert(rbd_dev);
2002         length = (u64)1 << rbd_dev->header.obj_order;
2003         page_count = (u32)calc_pages_for(0, length);
2004
2005         rbd_assert(obj_request->copyup_pages);
2006         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2007         obj_request->copyup_pages = NULL;
2008
2009         /*
2010          * We want the transfer count to reflect the size of the
2011          * original write request.  There is no such thing as a
2012          * successful short write, so if the request was successful
2013          * we can just set it to the originally-requested length.
2014          */
2015         if (!obj_request->result)
2016                 obj_request->xferred = obj_request->length;
2017
2018         /* Finish up with the normal image object callback */
2019
2020         rbd_img_obj_callback(obj_request);
2021 }
2022
2023 static void
2024 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2025 {
2026         struct rbd_obj_request *orig_request;
2027         struct ceph_osd_request *osd_req;
2028         struct ceph_osd_client *osdc;
2029         struct rbd_device *rbd_dev;
2030         struct page **pages;
2031         int result;
2032         u64 obj_size;
2033         u64 xferred;
2034
2035         rbd_assert(img_request_child_test(img_request));
2036
2037         /* First get what we need from the image request */
2038
2039         pages = img_request->copyup_pages;
2040         rbd_assert(pages != NULL);
2041         img_request->copyup_pages = NULL;
2042
2043         orig_request = img_request->obj_request;
2044         rbd_assert(orig_request != NULL);
2045         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2046         result = img_request->result;
2047         obj_size = img_request->length;
2048         xferred = img_request->xferred;
2049
2050         rbd_dev = img_request->rbd_dev;
2051         rbd_assert(rbd_dev);
2052         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2053
2054         rbd_img_request_put(img_request);
2055
2056         if (result)
2057                 goto out_err;
2058
2059         /* Allocate the new copyup osd request for the original request */
2060
2061         result = -ENOMEM;
2062         rbd_assert(!orig_request->osd_req);
2063         osd_req = rbd_osd_req_create_copyup(orig_request);
2064         if (!osd_req)
2065                 goto out_err;
2066         orig_request->osd_req = osd_req;
2067         orig_request->copyup_pages = pages;
2068
2069         /* Initialize the copyup op */
2070
2071         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2072         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2073                                                 false, false);
2074
2075         /* Then the original write request op */
2076
2077         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2078                                         orig_request->offset,
2079                                         orig_request->length, 0, 0);
2080         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2081                                         orig_request->length);
2082
2083         rbd_osd_req_format_write(orig_request);
2084
2085         /* All set, send it off. */
2086
2087         orig_request->callback = rbd_img_obj_copyup_callback;
2088         osdc = &rbd_dev->rbd_client->client->osdc;
2089         result = rbd_obj_request_submit(osdc, orig_request);
2090         if (!result)
2091                 return;
2092 out_err:
2093         /* Record the error code and complete the request */
2094
2095         orig_request->result = result;
2096         orig_request->xferred = 0;
2097         obj_request_done_set(orig_request);
2098         rbd_obj_request_complete(orig_request);
2099 }
2100
2101 /*
2102  * Read from the parent image the range of data that covers the
2103  * entire target of the given object request.  This is used for
2104  * satisfying a layered image write request when the target of an
2105  * object request from the image request does not exist.
2106  *
2107  * A page array big enough to hold the returned data is allocated
2108  * and supplied to rbd_img_request_fill() as the "data descriptor."
2109  * When the read completes, this page array will be transferred to
2110  * the original object request for the copyup operation.
2111  *
2112  * If an error occurs, record it as the result of the original
2113  * object request and mark it done so it gets completed.
2114  */
2115 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2116 {
2117         struct rbd_img_request *img_request = NULL;
2118         struct rbd_img_request *parent_request = NULL;
2119         struct rbd_device *rbd_dev;
2120         u64 img_offset;
2121         u64 length;
2122         struct page **pages = NULL;
2123         u32 page_count;
2124         int result;
2125
2126         rbd_assert(obj_request_img_data_test(obj_request));
2127         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2128
2129         img_request = obj_request->img_request;
2130         rbd_assert(img_request != NULL);
2131         rbd_dev = img_request->rbd_dev;
2132         rbd_assert(rbd_dev->parent != NULL);
2133
2134         /*
2135          * First things first.  The original osd request is of no
2136          * use to us any more; we'll need a new one that can hold
2137          * the two ops in a copyup request.  We'll get that later,
2138          * but for now we can release the old one.
2139          */
2140         rbd_osd_req_destroy(obj_request->osd_req);
2141         obj_request->osd_req = NULL;
2142
2143         /*
2144          * Determine the byte range covered by the object in the
2145          * child image to which the original request was to be sent.
2146          */
2147         img_offset = obj_request->img_offset - obj_request->offset;
2148         length = (u64)1 << rbd_dev->header.obj_order;
2149
2150         /*
2151          * There is no defined parent data beyond the parent
2152          * overlap, so limit what we read at that boundary if
2153          * necessary.
2154          */
2155         if (img_offset + length > rbd_dev->parent_overlap) {
2156                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2157                 length = rbd_dev->parent_overlap - img_offset;
2158         }
2159
2160         /*
2161          * Allocate a page array big enough to receive the data read
2162          * from the parent.
2163          */
2164         page_count = (u32)calc_pages_for(0, length);
2165         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2166         if (IS_ERR(pages)) {
2167                 result = PTR_ERR(pages);
2168                 pages = NULL;
2169                 goto out_err;
2170         }
2171
2172         result = -ENOMEM;
2173         parent_request = rbd_img_request_create(rbd_dev->parent,
2174                                                 img_offset, length,
2175                                                 false, true);
2176         if (!parent_request)
2177                 goto out_err;
2178         rbd_obj_request_get(obj_request);
2179         parent_request->obj_request = obj_request;
2180
2181         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2182         if (result)
2183                 goto out_err;
2184         parent_request->copyup_pages = pages;
2185
2186         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2187         result = rbd_img_request_submit(parent_request);
2188         if (!result)
2189                 return 0;
2190
2191         parent_request->copyup_pages = NULL;
2192         parent_request->obj_request = NULL;
2193         rbd_obj_request_put(obj_request);
2194 out_err:
2195         if (pages)
2196                 ceph_release_page_vector(pages, page_count);
2197         if (parent_request)
2198                 rbd_img_request_put(parent_request);
2199         obj_request->result = result;
2200         obj_request->xferred = 0;
2201         obj_request_done_set(obj_request);
2202
2203         return result;
2204 }
2205
2206 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2207 {
2208         struct rbd_obj_request *orig_request;
2209         int result;
2210
2211         rbd_assert(!obj_request_img_data_test(obj_request));
2212
2213         /*
2214          * All we need from the object request is the original
2215          * request and the result of the STAT op.  Grab those, then
2216          * we're done with the request.
2217          */
2218         orig_request = obj_request->obj_request;
2219         obj_request->obj_request = NULL;
2220         rbd_assert(orig_request);
2221         rbd_assert(orig_request->img_request);
2222
2223         result = obj_request->result;
2224         obj_request->result = 0;
2225
2226         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2227                 obj_request, orig_request, result,
2228                 obj_request->xferred, obj_request->length);
2229         rbd_obj_request_put(obj_request);
2230
2231         rbd_assert(orig_request);
2232         rbd_assert(orig_request->img_request);
2233
2234         /*
2235          * Our only purpose here is to determine whether the object
2236          * exists, and we don't want to treat the non-existence as
2237          * an error.  If something else comes back, transfer the
2238          * error to the original request and complete it now.
2239          */
2240         if (!result) {
2241                 obj_request_existence_set(orig_request, true);
2242         } else if (result == -ENOENT) {
2243                 obj_request_existence_set(orig_request, false);
2244         } else if (result) {
2245                 orig_request->result = result;
2246                 goto out;
2247         }
2248
2249         /*
2250          * Resubmit the original request now that we have recorded
2251          * whether the target object exists.
2252          */
2253         orig_request->result = rbd_img_obj_request_submit(orig_request);
2254 out:
2255         if (orig_request->result)
2256                 rbd_obj_request_complete(orig_request);
2257         rbd_obj_request_put(orig_request);
2258 }
2259
2260 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2261 {
2262         struct rbd_obj_request *stat_request;
2263         struct rbd_device *rbd_dev;
2264         struct ceph_osd_client *osdc;
2265         struct page **pages = NULL;
2266         u32 page_count;
2267         size_t size;
2268         int ret;
2269
2270         /*
2271          * The response data for a STAT call consists of:
2272          *     le64 length;
2273          *     struct {
2274          *         le32 tv_sec;
2275          *         le32 tv_nsec;
2276          *     } mtime;
2277          */
2278         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2279         page_count = (u32)calc_pages_for(0, size);
2280         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2281         if (IS_ERR(pages))
2282                 return PTR_ERR(pages);
2283
2284         ret = -ENOMEM;
2285         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2286                                                         OBJ_REQUEST_PAGES);
2287         if (!stat_request)
2288                 goto out;
2289
2290         rbd_obj_request_get(obj_request);
2291         stat_request->obj_request = obj_request;
2292         stat_request->pages = pages;
2293         stat_request->page_count = page_count;
2294
2295         rbd_assert(obj_request->img_request);
2296         rbd_dev = obj_request->img_request->rbd_dev;
2297         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2298                                                 stat_request);
2299         if (!stat_request->osd_req)
2300                 goto out;
2301         stat_request->callback = rbd_img_obj_exists_callback;
2302
2303         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2304         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2305                                         false, false);
2306         rbd_osd_req_format_read(stat_request);
2307
2308         osdc = &rbd_dev->rbd_client->client->osdc;
2309         ret = rbd_obj_request_submit(osdc, stat_request);
2310 out:
2311         if (ret)
2312                 rbd_obj_request_put(obj_request);
2313
2314         return ret;
2315 }
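
/*
 * The STAT reply laid out above, expressed as a struct (illustrative
 * only; the driver never decodes these fields, it just needs a page
 * vector big enough to receive them):
 */
#if 0
struct rbd_stat_reply {
        __le64  length;
        struct {
                __le32  tv_sec;
                __le32  tv_nsec;
        } mtime;
} __attribute__ ((packed));
#endif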
2316
2317 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2318 {
2319         struct rbd_img_request *img_request;
2320         struct rbd_device *rbd_dev;
2321         bool known;
2322
2323         rbd_assert(obj_request_img_data_test(obj_request));
2324
2325         img_request = obj_request->img_request;
2326         rbd_assert(img_request);
2327         rbd_dev = img_request->rbd_dev;
2328
2329         /*
2330          * Only writes to layered images need special handling.
2331          * Reads and non-layered writes are simple object requests.
2332          * Layered writes that start beyond the end of the overlap
2333          * with the parent have no parent data, so they too are
2334          * simple object requests.  Finally, if the target object is
2335          * known to already exist, its parent data has already been
2336          * copied, so a write to the object can also be handled as a
2337          * simple object request.
2338          */
2339         if (!img_request_write_test(img_request) ||
2340                 !img_request_layered_test(img_request) ||
2341                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2342                 ((known = obj_request_known_test(obj_request)) &&
2343                         obj_request_exists_test(obj_request))) {
2344
2345                 struct rbd_device *rbd_dev;
2346                 struct ceph_osd_client *osdc;
2347
2348                 rbd_dev = obj_request->img_request->rbd_dev;
2349                 osdc = &rbd_dev->rbd_client->client->osdc;
2350
2351                 return rbd_obj_request_submit(osdc, obj_request);
2352         }
2353
2354         /*
2355          * It's a layered write.  The target object might exist but
2356          * we may not know that yet.  If we know it doesn't exist,
2357          * start by reading the data for the full target object from
2358          * the parent so we can use it for a copyup to the target.
2359          */
2360         if (known)
2361                 return rbd_img_obj_parent_read_full(obj_request);
2362
2363         /* We don't know whether the target exists.  Go find out. */
2364
2365         return rbd_img_obj_exists_submit(obj_request);
2366 }
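
/*
 * Summary of the layered-write path chosen above when the target
 * object's existence is not yet known:
 *
 *   rbd_img_obj_request_submit()
 *     -> rbd_img_obj_exists_submit()             STAT the target object
 *        -> rbd_img_obj_exists_callback()        -ENOENT: doesn't exist
 *           -> rbd_img_obj_request_submit()      resubmit, now "known"
 *              -> rbd_img_obj_parent_read_full()   read range from parent
 *                 -> rbd_img_obj_parent_read_full_callback()
 *                    -> copyup request ("rbd" class call + the write)
 *                       -> rbd_img_obj_copyup_callback()
 */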
2367
2368 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2369 {
2370         struct rbd_obj_request *obj_request;
2371         struct rbd_obj_request *next_obj_request;
2372
2373         dout("%s: img %p\n", __func__, img_request);
2374         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2375                 int ret;
2376
2377                 ret = rbd_img_obj_request_submit(obj_request);
2378                 if (ret)
2379                         return ret;
2380         }
2381
2382         return 0;
2383 }
2384
2385 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2386 {
2387         struct rbd_obj_request *obj_request;
2388         struct rbd_device *rbd_dev;
2389         u64 obj_end;
2390
2391         rbd_assert(img_request_child_test(img_request));
2392
2393         obj_request = img_request->obj_request;
2394         rbd_assert(obj_request);
2395         rbd_assert(obj_request->img_request);
2396
2397         obj_request->result = img_request->result;
2398         if (obj_request->result)
2399                 goto out;
2400
2401         /*
2402          * We need to zero anything beyond the parent overlap
2403          * boundary.  Since rbd_img_obj_request_read_callback()
2404          * will zero anything beyond the end of a short read, an
2405          * easy way to do this is to pretend the data from the
2406          * parent came up short--ending at the overlap boundary.
2407          */
2408         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2409         obj_end = obj_request->img_offset + obj_request->length;
2410         rbd_dev = obj_request->img_request->rbd_dev;
2411         if (obj_end > rbd_dev->parent_overlap) {
2412                 u64 xferred = 0;
2413
2414                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2415                         xferred = rbd_dev->parent_overlap -
2416                                         obj_request->img_offset;
2417
2418                 obj_request->xferred = min(img_request->xferred, xferred);
2419         } else {
2420                 obj_request->xferred = img_request->xferred;
2421         }
2422 out:
2423         rbd_img_obj_request_read_callback(obj_request);
2424         rbd_obj_request_complete(obj_request);
2425 }
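
/*
 * Example of the capping above, assuming a 10 MiB parent overlap: a
 * child read covering image offsets 9..11 MiB has defined parent data
 * only up to 10 MiB, so xferred is capped at 1 MiB and the read
 * callback zero-fills the rest of the buffer.
 */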
2426
2427 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2428 {
2429         struct rbd_device *rbd_dev;
2430         struct rbd_img_request *img_request;
2431         int result;
2432
2433         rbd_assert(obj_request_img_data_test(obj_request));
2434         rbd_assert(obj_request->img_request != NULL);
2435         rbd_assert(obj_request->result == (s32) -ENOENT);
2436         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2437
2438         rbd_dev = obj_request->img_request->rbd_dev;
2439         rbd_assert(rbd_dev->parent != NULL);
2440         /* rbd_read_finish(obj_request, obj_request->length); */
2441         img_request = rbd_img_request_create(rbd_dev->parent,
2442                                                 obj_request->img_offset,
2443                                                 obj_request->length,
2444                                                 false, true);
2445         result = -ENOMEM;
2446         if (!img_request)
2447                 goto out_err;
2448
2449         rbd_obj_request_get(obj_request);
2450         img_request->obj_request = obj_request;
2451
2452         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2453                                         obj_request->bio_list);
2454         if (result)
2455                 goto out_err;
2456
2457         img_request->callback = rbd_img_parent_read_callback;
2458         result = rbd_img_request_submit(img_request);
2459         if (result)
2460                 goto out_err;
2461
2462         return;
2463 out_err:
2464         if (img_request)
2465                 rbd_img_request_put(img_request);
2466         obj_request->result = result;
2467         obj_request->xferred = 0;
2468         obj_request_done_set(obj_request);
2469 }
2470
2471 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2472                                    u64 ver, u64 notify_id)
2473 {
2474         struct rbd_obj_request *obj_request;
2475         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2476         int ret;
2477
2478         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2479                                                         OBJ_REQUEST_NODATA);
2480         if (!obj_request)
2481                 return -ENOMEM;
2482
2483         ret = -ENOMEM;
2484         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2485         if (!obj_request->osd_req)
2486                 goto out;
2487         obj_request->callback = rbd_obj_request_put;
2488
2489         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2490                                         notify_id, ver, 0);
2491         rbd_osd_req_format_read(obj_request);
2492
2493         ret = rbd_obj_request_submit(osdc, obj_request);
2494 out:
2495         if (ret)
2496                 rbd_obj_request_put(obj_request);
2497
2498         return ret;
2499 }
2500
2501 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2502 {
2503         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2504         u64 hver;
2505
2506         if (!rbd_dev)
2507                 return;
2508
2509         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2510                 rbd_dev->header_name, (unsigned long long) notify_id,
2511                 (unsigned int) opcode);
2512         (void)rbd_dev_refresh(rbd_dev, &hver);
2513
2514         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2515 }
2516
2517 /*
2518  * Request sync osd watch/unwatch.  The value of "start" determines
2519  * whether a watch request is being initiated or torn down.
2520  */
2521 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2522 {
2523         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2524         struct rbd_obj_request *obj_request;
2525         int ret;
2526
2527         rbd_assert(start ^ !!rbd_dev->watch_event);
2528         rbd_assert(start ^ !!rbd_dev->watch_request);
2529
2530         if (start) {
2531                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2532                                                 &rbd_dev->watch_event);
2533                 if (ret < 0)
2534                         return ret;
2535                 rbd_assert(rbd_dev->watch_event != NULL);
2536         }
2537
2538         ret = -ENOMEM;
2539         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2540                                                         OBJ_REQUEST_NODATA);
2541         if (!obj_request)
2542                 goto out_cancel;
2543
2544         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2545         if (!obj_request->osd_req)
2546                 goto out_cancel;
2547
2548         if (start)
2549                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2550         else
2551                 ceph_osdc_unregister_linger_request(osdc,
2552                                         rbd_dev->watch_request->osd_req);
2553
2554         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2555                                 rbd_dev->watch_event->cookie, 0, start);
2556         rbd_osd_req_format_write(obj_request);
2557
2558         ret = rbd_obj_request_submit(osdc, obj_request);
2559         if (ret)
2560                 goto out_cancel;
2561         ret = rbd_obj_request_wait(obj_request);
2562         if (ret)
2563                 goto out_cancel;
2564         ret = obj_request->result;
2565         if (ret)
2566                 goto out_cancel;
2567
2568         /*
2569          * A watch request is set to linger, so the underlying osd
2570          * request won't go away until we unregister it.  We retain
2571          * a pointer to the object request during that time (in
2572          * rbd_dev->watch_request), so we'll keep a reference to
2573          * it.  We'll drop that reference (below) after we've
2574          * unregistered it.
2575          */
2576         if (start) {
2577                 rbd_dev->watch_request = obj_request;
2578
2579                 return 0;
2580         }
2581
2582         /* We have successfully torn down the watch request */
2583
2584         rbd_obj_request_put(rbd_dev->watch_request);
2585         rbd_dev->watch_request = NULL;
2586 out_cancel:
2587         /* Cancel the event if we're tearing down, or on error */
2588         ceph_osdc_cancel_event(rbd_dev->watch_event);
2589         rbd_dev->watch_event = NULL;
2590         if (obj_request)
2591                 rbd_obj_request_put(obj_request);
2592
2593         return ret;
2594 }
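
/*
 * Illustrative usage (a sketch): the watch is established with start
 * set and torn down with it clear, bracketing the mapped lifetime of
 * the device:
 */
#if 0
        ret = rbd_dev_header_watch_sync(rbd_dev, 1);    /* begin watching */
        /* ... mapped; rbd_watch_cb() fires on header-object changes ... */
        ret = rbd_dev_header_watch_sync(rbd_dev, 0);    /* stop watching */
#endif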
2595
2596 /*
2597  * Synchronous osd object method call.  Returns the number of bytes
2598  * returned in the inbound buffer, or a negative error code.
2599  */
2600 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2601                              const char *object_name,
2602                              const char *class_name,
2603                              const char *method_name,
2604                              const void *outbound,
2605                              size_t outbound_size,
2606                              void *inbound,
2607                              size_t inbound_size,
2608                              u64 *version)
2609 {
2610         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2611         struct rbd_obj_request *obj_request;
2612         struct page **pages;
2613         u32 page_count;
2614         int ret;
2615
2616         /*
2617          * Method calls are ultimately read operations.  The result
2618          * should be placed into the inbound buffer provided.  They
2619          * also supply outbound data--parameters for the object
2620          * method.  Currently if this is present it will be a
2621          * snapshot id.
2622          */
2623         page_count = (u32)calc_pages_for(0, inbound_size);
2624         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2625         if (IS_ERR(pages))
2626                 return PTR_ERR(pages);
2627
2628         ret = -ENOMEM;
2629         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2630                                                         OBJ_REQUEST_PAGES);
2631         if (!obj_request)
2632                 goto out;
2633
2634         obj_request->pages = pages;
2635         obj_request->page_count = page_count;
2636
2637         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2638         if (!obj_request->osd_req)
2639                 goto out;
2640
2641         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2642                                         class_name, method_name);
2643         if (outbound_size) {
2644                 struct ceph_pagelist *pagelist;
2645
2646                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2647                 if (!pagelist)
2648                         goto out;
2649
2650                 ceph_pagelist_init(pagelist);
2651                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2652                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2653                                                 pagelist);
2654         }
2655         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2656                                         obj_request->pages, inbound_size,
2657                                         0, false, false);
2658         rbd_osd_req_format_read(obj_request);
2659
2660         ret = rbd_obj_request_submit(osdc, obj_request);
2661         if (ret)
2662                 goto out;
2663         ret = rbd_obj_request_wait(obj_request);
2664         if (ret)
2665                 goto out;
2666
2667         ret = obj_request->result;
2668         if (ret < 0)
2669                 goto out;
2670
2671         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2672         ret = (int)obj_request->xferred;
2673         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2674         if (version)
2675                 *version = obj_request->version;
2676 out:
2677         if (obj_request)
2678                 rbd_obj_request_put(obj_request);
2679         else
2680                 ceph_release_page_vector(pages, page_count);
2681
2682         return ret;
2683 }
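
/*
 * Illustrative caller (a sketch patterned on the v2 image probe):
 * fetch an image's order and size by invoking the "get_size" method
 * of the "rbd" object class.  The reply layout shown here is an
 * assumption made for the example's sake.
 */
#if 0
        __le64 snapid = cpu_to_le64(CEPH_NOSNAP);
        struct {
                u8 order;
                __le64 size;
        } __attribute__ ((packed)) size_buf = { 0 };

        ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_size",
                                &snapid, sizeof (snapid),
                                &size_buf, sizeof (size_buf), NULL);
        if (ret < (int) sizeof (size_buf))
                return ret < 0 ? ret : -ERANGE;
#endif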
2684
2685 static void rbd_request_fn(struct request_queue *q)
2686                 __releases(q->queue_lock) __acquires(q->queue_lock)
2687 {
2688         struct rbd_device *rbd_dev = q->queuedata;
2689         bool read_only = rbd_dev->mapping.read_only;
2690         struct request *rq;
2691         int result;
2692
2693         while ((rq = blk_fetch_request(q))) {
2694                 bool write_request = rq_data_dir(rq) == WRITE;
2695                 struct rbd_img_request *img_request;
2696                 u64 offset;
2697                 u64 length;
2698
2699                 /* Ignore any non-FS requests that filter through. */
2700
2701                 if (rq->cmd_type != REQ_TYPE_FS) {
2702                         dout("%s: non-fs request type %d\n", __func__,
2703                                 (int) rq->cmd_type);
2704                         __blk_end_request_all(rq, 0);
2705                         continue;
2706                 }
2707
2708                 /* Ignore/skip any zero-length requests */
2709
2710                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2711                 length = (u64) blk_rq_bytes(rq);
2712
2713                 if (!length) {
2714                         dout("%s: zero-length request\n", __func__);
2715                         __blk_end_request_all(rq, 0);
2716                         continue;
2717                 }
2718
2719                 spin_unlock_irq(q->queue_lock);
2720
2721                 /* Disallow writes to a read-only device */
2722
2723                 if (write_request) {
2724                         result = -EROFS;
2725                         if (read_only)
2726                                 goto end_request;
2727                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2728                 }
2729
2730                 /*
2731                  * Quit early if the mapped snapshot no longer
2732                  * exists.  It's still possible the snapshot will
2733                  * have disappeared by the time our request arrives
2734                  * at the osd, but there's no sense in sending it if
2735                  * we already know.
2736                  */
2737                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2738                         dout("request for non-existent snapshot");
2739                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2740                         result = -ENXIO;
2741                         goto end_request;
2742                 }
2743
2744                 result = -EINVAL;
2745                 if (offset && length > U64_MAX - offset + 1) {
2746                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2747                                 offset, length);
2748                         goto end_request;       /* Shouldn't happen */
2749                 }
2750
2751                 result = -ENOMEM;
2752                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2753                                                         write_request, false);
2754                 if (!img_request)
2755                         goto end_request;
2756
2757                 img_request->rq = rq;
2758
2759                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2760                                                 rq->bio);
2761                 if (!result)
2762                         result = rbd_img_request_submit(img_request);
2763                 if (result)
2764                         rbd_img_request_put(img_request);
2765 end_request:
2766                 spin_lock_irq(q->queue_lock);
2767                 if (result < 0) {
2768                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2769                                 write_request ? "write" : "read",
2770                                 length, offset, result);
2771
2772                         __blk_end_request_all(rq, result);
2773                 }
2774         }
2775 }
2776
2777 /*
2778  * a queue callback. Makes sure that we don't create a bio that spans across
2779  * multiple osd objects. One exception would be with a single page bios,
2780  * which we handle later at bio_chain_clone_range()
2781  */
2782 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2783                           struct bio_vec *bvec)
2784 {
2785         struct rbd_device *rbd_dev = q->queuedata;
2786         sector_t sector_offset;
2787         sector_t sectors_per_obj;
2788         sector_t obj_sector_offset;
2789         int ret;
2790
2791         /*
2792          * Find how far into its rbd object the bio's starting
2793          * sector falls.  The partition-relative start sector is
2794          * first made relative to the enclosing (whole) device.
2795          */
2796         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2797         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2798         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2799
2800         /*
2801          * Compute the number of bytes from that offset to the end
2802          * of the object.  Account for what's already used by the bio.
2803          */
2804         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2805         if (ret > bmd->bi_size)
2806                 ret -= bmd->bi_size;
2807         else
2808                 ret = 0;
2809
2810         /*
2811          * Don't send back more than was asked for.  And if the bio
2812          * was empty, let the whole thing through because:  "Note
2813          * that a block device *must* allow a single page to be
2814          * added to an empty bio."
2815          */
2816         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2817         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2818                 ret = (int) bvec->bv_len;
2819
2820         return ret;
2821 }
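
/*
 * Worked example, assuming 4 MiB objects (obj_order 22, i.e. 8192
 * sectors per object): for an empty bio starting at device sector
 * 8000, obj_sector_offset is 8000, leaving 192 sectors to the object
 * boundary, so at most 192 << SECTOR_SHIFT = 98304 bytes are allowed.
 */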
2822
2823 static void rbd_free_disk(struct rbd_device *rbd_dev)
2824 {
2825         struct gendisk *disk = rbd_dev->disk;
2826
2827         if (!disk)
2828                 return;
2829
2830         rbd_dev->disk = NULL;
2831         if (disk->flags & GENHD_FL_UP) {
2832                 del_gendisk(disk);
2833                 if (disk->queue)
2834                         blk_cleanup_queue(disk->queue);
2835         }
2836         put_disk(disk);
2837 }
2838
2839 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2840                                 const char *object_name,
2841                                 u64 offset, u64 length, void *buf)
2842
2843 {
2844         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2845         struct rbd_obj_request *obj_request;
2846         struct page **pages = NULL;
2847         u32 page_count;
2848         size_t size;
2849         int ret;
2850
2851         page_count = (u32) calc_pages_for(offset, length);
2852         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2853         if (IS_ERR(pages))
2854                 return PTR_ERR(pages);
2855
2856         ret = -ENOMEM;
2857         obj_request = rbd_obj_request_create(object_name, offset, length,
2858                                                         OBJ_REQUEST_PAGES);
2859         if (!obj_request)
2860                 goto out;
2861
2862         obj_request->pages = pages;
2863         obj_request->page_count = page_count;
2864
2865         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2866         if (!obj_request->osd_req)
2867                 goto out;
2868
2869         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2870                                         offset, length, 0, 0);
2871         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2872                                         obj_request->pages,
2873                                         obj_request->length,
2874                                         obj_request->offset & ~PAGE_MASK,
2875                                         false, false);
2876         rbd_osd_req_format_read(obj_request);
2877
2878         ret = rbd_obj_request_submit(osdc, obj_request);
2879         if (ret)
2880                 goto out;
2881         ret = rbd_obj_request_wait(obj_request);
2882         if (ret)
2883                 goto out;
2884
2885         ret = obj_request->result;
2886         if (ret < 0)
2887                 goto out;
2888
2889         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2890         size = (size_t) obj_request->xferred;
2891         ceph_copy_from_page_vector(pages, buf, 0, size);
2892         rbd_assert(size <= (size_t)INT_MAX);
2893         ret = (int)size;
2894 out:
2895         if (obj_request)
2896                 rbd_obj_request_put(obj_request);
2897         else
2898                 ceph_release_page_vector(pages, page_count);
2899
2900         return ret;
2901 }
2902
2903 /*
2904  * Read the complete header for the given rbd device.
2905  *
2906  * Returns a pointer to a dynamically-allocated buffer containing
2907  * the complete and validated header, including its snapshot ids
2908  * and snapshot names.  The caller is responsible for freeing the
2909  * returned buffer.
2910  *
2911  * Returns a pointer-coded errno if a failure occurs.
2912  */
2913 static struct rbd_image_header_ondisk *
2914 rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
2915 {
2916         struct rbd_image_header_ondisk *ondisk = NULL;
2917         u32 snap_count = 0;
2918         u64 names_size = 0;
2919         u32 want_count;
2920         int ret;
2921
2922         /*
2923          * The complete header will include an array of its 64-bit
2924          * snapshot ids, followed by the names of those snapshots as
2925          * a contiguous block of NUL-terminated strings.  Note that
2926          * the number of snapshots could change by the time we read
2927          * it in, in which case we re-read it.
2928          */
2929         do {
2930                 size_t size;
2931
2932                 kfree(ondisk);
2933
2934                 size = sizeof (*ondisk);
2935                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2936                 size += names_size;
2937                 ondisk = kmalloc(size, GFP_KERNEL);
2938                 if (!ondisk)
2939                         return ERR_PTR(-ENOMEM);
2940
2941                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2942                                        0, size, ondisk);
2943                 if (ret < 0)
2944                         goto out_err;
2945                 if ((size_t)ret < size) {
2946                         ret = -ENXIO;
			rbd_warn(rbd_dev, "short header read (want %zu got %d)",
				size, ret);
2949                         goto out_err;
2950                 }
2951                 if (!rbd_dev_ondisk_valid(ondisk)) {
2952                         ret = -ENXIO;
2953                         rbd_warn(rbd_dev, "invalid header");
2954                         goto out_err;
2955                 }
2956
2957                 names_size = le64_to_cpu(ondisk->snap_names_len);
2958                 want_count = snap_count;
2959                 snap_count = le32_to_cpu(ondisk->snap_count);
2960         } while (snap_count != want_count);
2961
2962         return ondisk;
2963
2964 out_err:
2965         kfree(ondisk);
2966
2967         return ERR_PTR(ret);
2968 }
2969
/*
 * Reload the on-disk header and convert it to in-memory form
 */
2973 static int rbd_read_header(struct rbd_device *rbd_dev,
2974                            struct rbd_image_header *header)
2975 {
2976         struct rbd_image_header_ondisk *ondisk;
2977         int ret;
2978
2979         ondisk = rbd_dev_v1_header_read(rbd_dev);
2980         if (IS_ERR(ondisk))
2981                 return PTR_ERR(ondisk);
2982         ret = rbd_header_from_disk(header, ondisk);
2983         kfree(ondisk);
2984
2985         return ret;
2986 }
2987
2988 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2989 {
2990         struct rbd_snap *snap;
2991         struct rbd_snap *next;
2992
2993         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
2994                 list_del(&snap->node);
2995                 rbd_snap_destroy(snap);
2996         }
2997 }
2998
2999 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3000 {
3001         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3002                 return;
3003
3004         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3005                 sector_t size;
3006
3007                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3008                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
		dout("setting size to %llu sectors\n", (unsigned long long)size);
3010                 set_capacity(rbd_dev->disk, size);
3011         }
3012 }
3013
/*
 * Re-read the v1 header and update the mapping and snapshot list
 * to match it
 */
3017 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3018 {
3019         int ret;
3020         struct rbd_image_header h;
3021
3022         ret = rbd_read_header(rbd_dev, &h);
3023         if (ret < 0)
3024                 return ret;
3025
3026         down_write(&rbd_dev->header_rwsem);
3027
3028         /* Update image size, and check for resize of mapped image */
3029         rbd_dev->header.image_size = h.image_size;
3030         rbd_update_mapping_size(rbd_dev);
3031
3032         /* rbd_dev->header.object_prefix shouldn't change */
3033         kfree(rbd_dev->header.snap_sizes);
3034         kfree(rbd_dev->header.snap_names);
3035         /* osd requests may still refer to snapc */
3036         ceph_put_snap_context(rbd_dev->header.snapc);
3037
3039         rbd_dev->header.snapc = h.snapc;
3040         rbd_dev->header.snap_names = h.snap_names;
3041         rbd_dev->header.snap_sizes = h.snap_sizes;
3042         /* Free the extra copy of the object prefix */
3043         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3044                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3045         kfree(h.object_prefix);
3046
3047         ret = rbd_dev_snaps_update(rbd_dev);
3048
3049         up_write(&rbd_dev->header_rwsem);
3050
3051         return ret;
3052 }
3053
3054 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3055 {
3056         u64 image_size;
3057         int ret;
3058
3059         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3060         image_size = rbd_dev->header.image_size;
3061         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3062         if (rbd_dev->image_format == 1)
3063                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3064         else
3065                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3066         mutex_unlock(&ctl_mutex);
3067         if (ret)
		rbd_warn(rbd_dev, "got notification but failed to "
			   "update snaps: %d\n", ret);
3070         if (image_size != rbd_dev->header.image_size)
3071                 revalidate_disk(rbd_dev->disk);
3072
3073         return ret;
3074 }
3075
3076 static int rbd_init_disk(struct rbd_device *rbd_dev)
3077 {
3078         struct gendisk *disk;
3079         struct request_queue *q;
3080         u64 segment_size;
3081
3082         /* create gendisk info */
3083         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3084         if (!disk)
3085                 return -ENOMEM;
3086
3087         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3088                  rbd_dev->dev_id);
3089         disk->major = rbd_dev->major;
3090         disk->first_minor = 0;
3091         disk->fops = &rbd_bd_ops;
3092         disk->private_data = rbd_dev;
3093
3094         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3095         if (!q)
3096                 goto out_disk;
3097
3098         /* We use the default size, but let's be explicit about it. */
3099         blk_queue_physical_block_size(q, SECTOR_SIZE);
3100
3101         /* set io sizes to object size */
3102         segment_size = rbd_obj_bytes(&rbd_dev->header);
3103         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3104         blk_queue_max_segment_size(q, segment_size);
3105         blk_queue_io_min(q, segment_size);
3106         blk_queue_io_opt(q, segment_size);
3107
3108         blk_queue_merge_bvec(q, rbd_merge_bvec);
3109         disk->queue = q;
3110
3111         q->queuedata = rbd_dev;
3112
3113         rbd_dev->disk = disk;
3114
3115         return 0;
3116 out_disk:
3117         put_disk(disk);
3118
3119         return -ENOMEM;
3120 }
3121
/*
 * sysfs
 */
3125
3126 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3127 {
3128         return container_of(dev, struct rbd_device, dev);
3129 }
3130
3131 static ssize_t rbd_size_show(struct device *dev,
3132                              struct device_attribute *attr, char *buf)
3133 {
3134         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3135
3136         return sprintf(buf, "%llu\n",
3137                 (unsigned long long)rbd_dev->mapping.size);
3138 }
3139
3140 /*
3141  * Note this shows the features for whatever's mapped, which is not
3142  * necessarily the base image.
3143  */
3144 static ssize_t rbd_features_show(struct device *dev,
3145                              struct device_attribute *attr, char *buf)
3146 {
3147         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3148
3149         return sprintf(buf, "0x%016llx\n",
3150                         (unsigned long long)rbd_dev->mapping.features);
3151 }
3152
3153 static ssize_t rbd_major_show(struct device *dev,
3154                               struct device_attribute *attr, char *buf)
3155 {
3156         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3157
3158         if (rbd_dev->major)
3159                 return sprintf(buf, "%d\n", rbd_dev->major);
3160
3161         return sprintf(buf, "(none)\n");
}
3164
3165 static ssize_t rbd_client_id_show(struct device *dev,
3166                                   struct device_attribute *attr, char *buf)
3167 {
3168         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3169
3170         return sprintf(buf, "client%lld\n",
3171                         ceph_client_id(rbd_dev->rbd_client->client));
3172 }
3173
3174 static ssize_t rbd_pool_show(struct device *dev,
3175                              struct device_attribute *attr, char *buf)
3176 {
3177         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3178
3179         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3180 }
3181
3182 static ssize_t rbd_pool_id_show(struct device *dev,
3183                              struct device_attribute *attr, char *buf)
3184 {
3185         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3186
3187         return sprintf(buf, "%llu\n",
3188                         (unsigned long long) rbd_dev->spec->pool_id);
3189 }
3190
3191 static ssize_t rbd_name_show(struct device *dev,
3192                              struct device_attribute *attr, char *buf)
3193 {
3194         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3195
3196         if (rbd_dev->spec->image_name)
3197                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3198
3199         return sprintf(buf, "(unknown)\n");
3200 }
3201
3202 static ssize_t rbd_image_id_show(struct device *dev,
3203                              struct device_attribute *attr, char *buf)
3204 {
3205         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3206
3207         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3208 }
3209
3210 /*
3211  * Shows the name of the currently-mapped snapshot (or
3212  * RBD_SNAP_HEAD_NAME for the base image).
3213  */
3214 static ssize_t rbd_snap_show(struct device *dev,
3215                              struct device_attribute *attr,
3216                              char *buf)
3217 {
3218         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3219
3220         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3221 }
3222
3223 /*
3224  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3225  * for the parent image.  If there is no parent, simply shows
3226  * "(no parent image)".
3227  */
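/*
 * Example output for a mapped clone (all values hypothetical):
 *
 *   pool_id 2
 *   pool_name rbd
 *   image_id 10052ae8944a
 *   image_name parent-image
 *   snap_id 4
 *   snap_name base
 *   overlap 10737418240
 */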
3228 static ssize_t rbd_parent_show(struct device *dev,
3229                              struct device_attribute *attr,
3230                              char *buf)
3231 {
3232         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3233         struct rbd_spec *spec = rbd_dev->parent_spec;
3234         int count;
3235         char *bufp = buf;
3236
3237         if (!spec)
3238                 return sprintf(buf, "(no parent image)\n");
3239
3240         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3241                         (unsigned long long) spec->pool_id, spec->pool_name);
3242         if (count < 0)
3243                 return count;
3244         bufp += count;
3245
3246         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3247                         spec->image_name ? spec->image_name : "(unknown)");
3248         if (count < 0)
3249                 return count;
3250         bufp += count;
3251
3252         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3253                         (unsigned long long) spec->snap_id, spec->snap_name);
3254         if (count < 0)
3255                 return count;
3256         bufp += count;
3257
3258         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3259         if (count < 0)
3260                 return count;
3261         bufp += count;
3262
3263         return (ssize_t) (bufp - buf);
3264 }
3265
3266 static ssize_t rbd_image_refresh(struct device *dev,
3267                                  struct device_attribute *attr,
3268                                  const char *buf,
3269                                  size_t size)
3270 {
3271         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3272         int ret;
3273
3274         ret = rbd_dev_refresh(rbd_dev, NULL);
3275
3276         return ret < 0 ? ret : size;
3277 }
3278
3279 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3280 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3281 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3282 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3283 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3284 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3285 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3286 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3287 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3288 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3289 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3290
3291 static struct attribute *rbd_attrs[] = {
3292         &dev_attr_size.attr,
3293         &dev_attr_features.attr,
3294         &dev_attr_major.attr,
3295         &dev_attr_client_id.attr,
3296         &dev_attr_pool.attr,
3297         &dev_attr_pool_id.attr,
3298         &dev_attr_name.attr,
3299         &dev_attr_image_id.attr,
3300         &dev_attr_current_snap.attr,
3301         &dev_attr_parent.attr,
3302         &dev_attr_refresh.attr,
3303         NULL
3304 };
3305
3306 static struct attribute_group rbd_attr_group = {
3307         .attrs = rbd_attrs,
3308 };
3309
3310 static const struct attribute_group *rbd_attr_groups[] = {
3311         &rbd_attr_group,
3312         NULL
3313 };
3314
3315 static void rbd_sysfs_dev_release(struct device *dev)
3316 {
3317 }
3318
3319 static struct device_type rbd_device_type = {
3320         .name           = "rbd",
3321         .groups         = rbd_attr_groups,
3322         .release        = rbd_sysfs_dev_release,
3323 };
3324
3325 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3326 {
3327         kref_get(&spec->kref);
3328
3329         return spec;
3330 }
3331
3332 static void rbd_spec_free(struct kref *kref);
3333 static void rbd_spec_put(struct rbd_spec *spec)
3334 {
3335         if (spec)
3336                 kref_put(&spec->kref, rbd_spec_free);
3337 }
3338
3339 static struct rbd_spec *rbd_spec_alloc(void)
3340 {
3341         struct rbd_spec *spec;
3342
3343         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3344         if (!spec)
3345                 return NULL;
3346         kref_init(&spec->kref);
3347
3348         return spec;
3349 }
3350
3351 static void rbd_spec_free(struct kref *kref)
3352 {
3353         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3354
3355         kfree(spec->pool_name);
3356         kfree(spec->image_id);
3357         kfree(spec->image_name);
3358         kfree(spec->snap_name);
3359         kfree(spec);
3360 }
3361
3362 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3363                                 struct rbd_spec *spec)
3364 {
3365         struct rbd_device *rbd_dev;
3366
3367         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3368         if (!rbd_dev)
3369                 return NULL;
3370
3371         spin_lock_init(&rbd_dev->lock);
3372         rbd_dev->flags = 0;
3373         INIT_LIST_HEAD(&rbd_dev->node);
3374         INIT_LIST_HEAD(&rbd_dev->snaps);
3375         init_rwsem(&rbd_dev->header_rwsem);
3376
3377         rbd_dev->spec = spec;
3378         rbd_dev->rbd_client = rbdc;
3379
3380         /* Initialize the layout used for all rbd requests */
3381
3382         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3383         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3384         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3385         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3386
3387         return rbd_dev;
3388 }
3389
3390 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3391 {
3392         rbd_put_client(rbd_dev->rbd_client);
3393         rbd_spec_put(rbd_dev->spec);
3394         kfree(rbd_dev);
3395 }
3396
3397 static void rbd_snap_destroy(struct rbd_snap *snap)
3398 {
3399         kfree(snap->name);
3400         kfree(snap);
3401 }
3402
3403 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3404                                                 const char *snap_name,
3405                                                 u64 snap_id, u64 snap_size,
3406                                                 u64 snap_features)
3407 {
3408         struct rbd_snap *snap;
3409
3410         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3411         if (!snap)
3412                 return ERR_PTR(-ENOMEM);
3413
3414         snap->name = snap_name;
3415         snap->id = snap_id;
3416         snap->size = snap_size;
3417         snap->features = snap_features;
3418
3419         return snap;
3420 }
3421
3422 /*
3423  * Returns a dynamically-allocated snapshot name if successful, or a
3424  * pointer-coded error otherwise.
3425  */
3426 static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3427                 u64 *snap_size, u64 *snap_features)
3428 {
3429         const char *snap_name;
3430         int i;
3431
3432         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3433
3434         /* Skip over names until we find the one we are looking for */
3435
3436         snap_name = rbd_dev->header.snap_names;
3437         for (i = 0; i < which; i++)
3438                 snap_name += strlen(snap_name) + 1;
3439
3440         snap_name = kstrdup(snap_name, GFP_KERNEL);
3441         if (!snap_name)
3442                 return ERR_PTR(-ENOMEM);
3443
3444         *snap_size = rbd_dev->header.snap_sizes[which];
3445         *snap_features = 0;     /* No features for v1 */
3446
3447         return snap_name;
3448 }
3449
3450 /*
3451  * Get the size and object order for an image snapshot, or if
3452  * snap_id is CEPH_NOSNAP, gets this information for the base
3453  * image.
3454  */
3455 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3456                                 u8 *order, u64 *snap_size)
3457 {
3458         __le64 snapid = cpu_to_le64(snap_id);
3459         int ret;
3460         struct {
3461                 u8 order;
3462                 __le64 size;
3463         } __attribute__ ((packed)) size_buf = { 0 };
3464
3465         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3466                                 "rbd", "get_size",
3467                                 &snapid, sizeof (snapid),
3468                                 &size_buf, sizeof (size_buf), NULL);
3469         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3470         if (ret < 0)
3471                 return ret;
3472         if (ret < sizeof (size_buf))
3473                 return -ERANGE;
3474
	if (order)
		*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	/* Don't dereference order here; callers may pass NULL for it */
	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long)snap_id, (unsigned int)size_buf.order,
		(unsigned long long)*snap_size);
3482
3483         return 0;
3484 }
3485
3486 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3487 {
3488         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3489                                         &rbd_dev->header.obj_order,
3490                                         &rbd_dev->header.image_size);
3491 }
3492
3493 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3494 {
3495         void *reply_buf;
3496         int ret;
3497         void *p;
3498
3499         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3500         if (!reply_buf)
3501                 return -ENOMEM;
3502
3503         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3504                                 "rbd", "get_object_prefix", NULL, 0,
3505                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3506         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3507         if (ret < 0)
3508                 goto out;
3509
3510         p = reply_buf;
3511         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3512                                                 p + ret, NULL, GFP_NOIO);
3513         ret = 0;
3514
3515         if (IS_ERR(rbd_dev->header.object_prefix)) {
3516                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3517                 rbd_dev->header.object_prefix = NULL;
3518         } else {
3519                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3520         }
3521 out:
3522         kfree(reply_buf);
3523
3524         return ret;
3525 }
3526
3527 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3528                 u64 *snap_features)
3529 {
3530         __le64 snapid = cpu_to_le64(snap_id);
3531         struct {
3532                 __le64 features;
3533                 __le64 incompat;
3534         } __attribute__ ((packed)) features_buf = { 0 };
3535         u64 incompat;
3536         int ret;
3537
3538         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3539                                 "rbd", "get_features",
3540                                 &snapid, sizeof (snapid),
3541                                 &features_buf, sizeof (features_buf), NULL);
3542         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3543         if (ret < 0)
3544                 return ret;
3545         if (ret < sizeof (features_buf))
3546                 return -ERANGE;
3547
3548         incompat = le64_to_cpu(features_buf.incompat);
3549         if (incompat & ~RBD_FEATURES_SUPPORTED)
3550                 return -ENXIO;
3551
3552         *snap_features = le64_to_cpu(features_buf.features);
3553
3554         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3555                 (unsigned long long)snap_id,
3556                 (unsigned long long)*snap_features,
3557                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3558
3559         return 0;
3560 }
3561
3562 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3563 {
3564         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3565                                                 &rbd_dev->header.features);
3566 }
3567
3568 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3569 {
3570         struct rbd_spec *parent_spec;
3571         size_t size;
3572         void *reply_buf = NULL;
3573         __le64 snapid;
3574         void *p;
3575         void *end;
3576         char *image_id;
3577         u64 overlap;
3578         int ret;
3579
3580         parent_spec = rbd_spec_alloc();
3581         if (!parent_spec)
3582                 return -ENOMEM;
3583
3584         size = sizeof (__le64) +                                /* pool_id */
3585                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3586                 sizeof (__le64) +                               /* snap_id */
3587                 sizeof (__le64);                                /* overlap */
3588         reply_buf = kmalloc(size, GFP_KERNEL);
3589         if (!reply_buf) {
3590                 ret = -ENOMEM;
3591                 goto out_err;
3592         }
3593
3594         snapid = cpu_to_le64(CEPH_NOSNAP);
3595         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3596                                 "rbd", "get_parent",
3597                                 &snapid, sizeof (snapid),
3598                                 reply_buf, size, NULL);
3599         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3600         if (ret < 0)
3601                 goto out_err;
3602
3603         p = reply_buf;
3604         end = reply_buf + ret;
3605         ret = -ERANGE;
3606         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3607         if (parent_spec->pool_id == CEPH_NOPOOL)
3608                 goto out;       /* No parent?  No problem. */
3609
3610         /* The ceph file layout needs to fit pool id in 32 bits */
3611
3612         ret = -EIO;
3613         if (parent_spec->pool_id > (u64)U32_MAX) {
3614                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3615                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3616                 goto out_err;
3617         }
3618
3619         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3620         if (IS_ERR(image_id)) {
3621                 ret = PTR_ERR(image_id);
3622                 goto out_err;
3623         }
3624         parent_spec->image_id = image_id;
3625         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3626         ceph_decode_64_safe(&p, end, overlap, out_err);
3627
3628         rbd_dev->parent_overlap = overlap;
3629         rbd_dev->parent_spec = parent_spec;
3630         parent_spec = NULL;     /* rbd_dev now owns this */
3631 out:
3632         ret = 0;
3633 out_err:
3634         kfree(reply_buf);
3635         rbd_spec_put(parent_spec);
3636
3637         return ret;
3638 }
3639
3640 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3641 {
3642         struct {
3643                 __le64 stripe_unit;
3644                 __le64 stripe_count;
3645         } __attribute__ ((packed)) striping_info_buf = { 0 };
3646         size_t size = sizeof (striping_info_buf);
3647         void *p;
3648         u64 obj_size;
3649         u64 stripe_unit;
3650         u64 stripe_count;
3651         int ret;
3652
3653         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3654                                 "rbd", "get_stripe_unit_count", NULL, 0,
3655                                 (char *)&striping_info_buf, size, NULL);
3656         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3657         if (ret < 0)
3658                 return ret;
3659         if (ret < size)
3660                 return -ERANGE;
3661
3662         /*
3663          * We don't actually support the "fancy striping" feature
3664          * (STRIPINGV2) yet, but if the striping sizes are the
3665          * defaults the behavior is the same as before.  So find
3666          * out, and only fail if the image has non-default values.
3667          */
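	/*
	 * For example, with an object order of 22 (4 MiB objects,
	 * hypothetical but typical), only stripe_unit == 4 MiB and
	 * stripe_count == 1 would be accepted here.
	 */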
3668         ret = -EINVAL;
3669         obj_size = (u64)1 << rbd_dev->header.obj_order;
3670         p = &striping_info_buf;
3671         stripe_unit = ceph_decode_64(&p);
3672         if (stripe_unit != obj_size) {
3673                 rbd_warn(rbd_dev, "unsupported stripe unit "
3674                                 "(got %llu want %llu)",
3675                                 stripe_unit, obj_size);
3676                 return -EINVAL;
3677         }
3678         stripe_count = ceph_decode_64(&p);
3679         if (stripe_count != 1) {
3680                 rbd_warn(rbd_dev, "unsupported stripe count "
3681                                 "(got %llu want 1)", stripe_count);
3682                 return -EINVAL;
3683         }
3684         rbd_dev->header.stripe_unit = stripe_unit;
3685         rbd_dev->header.stripe_count = stripe_count;
3686
3687         return 0;
3688 }
3689
3690 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3691 {
3692         size_t image_id_size;
3693         char *image_id;
3694         void *p;
3695         void *end;
3696         size_t size;
3697         void *reply_buf = NULL;
3698         size_t len = 0;
3699         char *image_name = NULL;
3700         int ret;
3701
3702         rbd_assert(!rbd_dev->spec->image_name);
3703
3704         len = strlen(rbd_dev->spec->image_id);
3705         image_id_size = sizeof (__le32) + len;
3706         image_id = kmalloc(image_id_size, GFP_KERNEL);
3707         if (!image_id)
3708                 return NULL;
3709
3710         p = image_id;
3711         end = image_id + image_id_size;
3712         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3713
3714         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3715         reply_buf = kmalloc(size, GFP_KERNEL);
3716         if (!reply_buf)
3717                 goto out;
3718
3719         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3720                                 "rbd", "dir_get_name",
3721                                 image_id, image_id_size,
3722                                 reply_buf, size, NULL);
3723         if (ret < 0)
3724                 goto out;
3725         p = reply_buf;
3726         end = reply_buf + ret;
3727
3728         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3729         if (IS_ERR(image_name))
3730                 image_name = NULL;
3731         else
3732                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3733 out:
3734         kfree(reply_buf);
3735         kfree(image_id);
3736
3737         return image_name;
3738 }
3739
3740 /*
3741  * When an rbd image has a parent image, it is identified by the
3742  * pool, image, and snapshot ids (not names).  This function fills
3743  * in the names for those ids.  (It's OK if we can't figure out the
3744  * name for an image id, but the pool and snapshot ids should always
3745  * exist and have names.)  All names in an rbd spec are dynamically
3746  * allocated.
3747  *
3748  * When an image being mapped (not a parent) is probed, we have the
3749  * pool name and pool id, image name and image id, and the snapshot
3750  * name.  The only thing we're missing is the snapshot id.
3751  *
3752  * The set of snapshots for an image is not known until they have
3753  * been read by rbd_dev_snaps_update(), so we can't completely fill
3754  * in this information until after that has been called.
3755  */
3756 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3757 {
3758         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3759         struct rbd_spec *spec = rbd_dev->spec;
3760         const char *pool_name;
3761         const char *image_name;
3762         const char *snap_name;
3763         int ret;
3764
3765         /*
3766          * An image being mapped will have the pool name (etc.), but
3767          * we need to look up the snapshot id.
3768          */
3769         if (spec->pool_name) {
3770                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3771                         struct rbd_snap *snap;
3772
3773                         snap = snap_by_name(rbd_dev, spec->snap_name);
3774                         if (!snap)
3775                                 return -ENOENT;
3776                         spec->snap_id = snap->id;
3777                 } else {
3778                         spec->snap_id = CEPH_NOSNAP;
3779                 }
3780
3781                 return 0;
3782         }
3783
3784         /* Get the pool name; we have to make our own copy of this */
3785
3786         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3787         if (!pool_name) {
3788                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3789                 return -EIO;
3790         }
3791         pool_name = kstrdup(pool_name, GFP_KERNEL);
3792         if (!pool_name)
3793                 return -ENOMEM;
3794
3795         /* Fetch the image name; tolerate failure here */
3796
3797         image_name = rbd_dev_image_name(rbd_dev);
3798         if (!image_name)
3799                 rbd_warn(rbd_dev, "unable to get image name");
3800
3801         /* Look up the snapshot name, and make a copy */
3802
3803         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3804         if (!snap_name) {
3805                 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3806                 ret = -EIO;
3807                 goto out_err;
3808         }
3809         snap_name = kstrdup(snap_name, GFP_KERNEL);
3810         if (!snap_name) {
3811                 ret = -ENOMEM;
3812                 goto out_err;
3813         }
3814
3815         spec->pool_name = pool_name;
3816         spec->image_name = image_name;
3817         spec->snap_name = snap_name;
3818
3819         return 0;
3820 out_err:
3821         kfree(image_name);
3822         kfree(pool_name);
3823
3824         return ret;
3825 }
3826
3827 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3828 {
3829         size_t size;
3830         int ret;
3831         void *reply_buf;
3832         void *p;
3833         void *end;
3834         u64 seq;
3835         u32 snap_count;
3836         struct ceph_snap_context *snapc;
3837         u32 i;
3838
3839         /*
3840          * We'll need room for the seq value (maximum snapshot id),
3841          * snapshot count, and array of that many snapshot ids.
3842          * For now we have a fixed upper limit on the number we're
3843          * prepared to receive.
3844          */
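	/*
	 * Worked example: with RBD_MAX_SNAP_COUNT of 510 this is
	 * 8 + 4 + 510 * 8 = 4092 bytes, so the largest reply we are
	 * prepared to accept still fits in a single 4KB page.
	 */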
3845         size = sizeof (__le64) + sizeof (__le32) +
3846                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3847         reply_buf = kzalloc(size, GFP_KERNEL);
3848         if (!reply_buf)
3849                 return -ENOMEM;
3850
3851         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3852                                 "rbd", "get_snapcontext", NULL, 0,
3853                                 reply_buf, size, ver);
3854         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3855         if (ret < 0)
3856                 goto out;
3857
3858         p = reply_buf;
3859         end = reply_buf + ret;
3860         ret = -ERANGE;
3861         ceph_decode_64_safe(&p, end, seq, out);
3862         ceph_decode_32_safe(&p, end, snap_count, out);
3863
3864         /*
3865          * Make sure the reported number of snapshot ids wouldn't go
3866          * beyond the end of our buffer.  But before checking that,
3867          * make sure the computed size of the snapshot context we
3868          * allocate is representable in a size_t.
3869          */
3870         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3871                                  / sizeof (u64)) {
3872                 ret = -EINVAL;
3873                 goto out;
3874         }
3875         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3876                 goto out;
3877         ret = 0;
3878
3879         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3880         if (!snapc) {
3881                 ret = -ENOMEM;
3882                 goto out;
3883         }
3884         snapc->seq = seq;
3885         for (i = 0; i < snap_count; i++)
3886                 snapc->snaps[i] = ceph_decode_64(&p);
3887
	/* Release any previously-held snap context before replacing it */
	ceph_put_snap_context(rbd_dev->header.snapc);
	rbd_dev->header.snapc = snapc;
3889
3890         dout("  snap context seq = %llu, snap_count = %u\n",
3891                 (unsigned long long)seq, (unsigned int)snap_count);
3892 out:
3893         kfree(reply_buf);
3894
3895         return ret;
3896 }
3897
3898 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3899 {
3900         size_t size;
3901         void *reply_buf;
3902         __le64 snap_id;
3903         int ret;
3904         void *p;
3905         void *end;
3906         char *snap_name;
3907
3908         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3909         reply_buf = kmalloc(size, GFP_KERNEL);
3910         if (!reply_buf)
3911                 return ERR_PTR(-ENOMEM);
3912
3913         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3914         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3915         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3916                                 "rbd", "get_snapshot_name",
3917                                 &snap_id, sizeof (snap_id),
3918                                 reply_buf, size, NULL);
3919         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3920         if (ret < 0) {
3921                 snap_name = ERR_PTR(ret);
3922                 goto out;
3923         }
3924
3925         p = reply_buf;
3926         end = reply_buf + ret;
3927         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3928         if (IS_ERR(snap_name))
3929                 goto out;
3930
3931         dout("  snap_id 0x%016llx snap_name = %s\n",
3932                 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3933 out:
3934         kfree(reply_buf);
3935
3936         return snap_name;
3937 }
3938
3939 static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3940                 u64 *snap_size, u64 *snap_features)
3941 {
3942         u64 snap_id;
3943         u64 size;
3944         u64 features;
3945         const char *snap_name;
3946         int ret;
3947
3948         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3949         snap_id = rbd_dev->header.snapc->snaps[which];
3950         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3951         if (ret)
3952                 goto out_err;
3953
3954         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3955         if (ret)
3956                 goto out_err;
3957
3958         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3959         if (!IS_ERR(snap_name)) {
3960                 *snap_size = size;
3961                 *snap_features = features;
3962         }
3963
3964         return snap_name;
3965 out_err:
3966         return ERR_PTR(ret);
3967 }
3968
3969 static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3970                 u64 *snap_size, u64 *snap_features)
3971 {
3972         if (rbd_dev->image_format == 1)
3973                 return rbd_dev_v1_snap_info(rbd_dev, which,
3974                                         snap_size, snap_features);
3975         if (rbd_dev->image_format == 2)
3976                 return rbd_dev_v2_snap_info(rbd_dev, which,
3977                                         snap_size, snap_features);
3978         return ERR_PTR(-EINVAL);
3979 }
3980
3981 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3982 {
3983         int ret;
3984
3985         down_write(&rbd_dev->header_rwsem);
3986
3987         ret = rbd_dev_v2_image_size(rbd_dev);
3988         if (ret)
3989                 goto out;
3990         rbd_update_mapping_size(rbd_dev);
3991
3992         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3993         dout("rbd_dev_v2_snap_context returned %d\n", ret);
3994         if (ret)
3995                 goto out;
3996         ret = rbd_dev_snaps_update(rbd_dev);
3997         dout("rbd_dev_snaps_update returned %d\n", ret);
3998         if (ret)
3999                 goto out;
4000 out:
4001         up_write(&rbd_dev->header_rwsem);
4002
4003         return ret;
4004 }
4005
4006 /*
4007  * Scan the rbd device's current snapshot list and compare it to the
4008  * newly-received snapshot context.  Remove any existing snapshots
4009  * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
4011  * And verify there are no changes to snapshots we already know
4012  * about.
4013  *
4014  * Assumes the snapshots in the snapshot context are sorted by
4015  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4016  * are also maintained in that order.)
4017  *
 * Note that any error that occurs while updating the snapshot list
4019  * aborts the update, and the entire list is cleared.  The snapshot
4020  * list becomes inconsistent at that point anyway, so it might as
4021  * well be empty.
4022  */
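/*
 * Illustrative example: if the current list holds snapshot ids
 * [12, 7, 3] and the new snapshot context holds [12, 9, 3], one
 * pass through this loop removes 7, creates a new entry for 9,
 * and leaves 12 and 3 untouched.
 */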
4023 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4024 {
4025         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4026         const u32 snap_count = snapc->num_snaps;
4027         struct list_head *head = &rbd_dev->snaps;
4028         struct list_head *links = head->next;
4029         u32 index = 0;
4030         int ret = 0;
4031
4032         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4033         while (index < snap_count || links != head) {
4034                 u64 snap_id;
4035                 struct rbd_snap *snap;
4036                 const char *snap_name;
4037                 u64 snap_size = 0;
4038                 u64 snap_features = 0;
4039
4040                 snap_id = index < snap_count ? snapc->snaps[index]
4041                                              : CEPH_NOSNAP;
4042                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4043                                      : NULL;
4044                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4045
4046                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4047                         struct list_head *next = links->next;
4048
4049                         /*
4050                          * A previously-existing snapshot is not in
4051                          * the new snap context.
4052                          *
4053                          * If the now-missing snapshot is the one
4054                          * the image represents, clear its existence
4055                          * flag so we can avoid sending any more
4056                          * requests to it.
4057                          */
4058                         if (rbd_dev->spec->snap_id == snap->id)
4059                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4060                         dout("removing %ssnap id %llu\n",
4061                                 rbd_dev->spec->snap_id == snap->id ?
4062                                                         "mapped " : "",
4063                                 (unsigned long long)snap->id);
4064
4065                         list_del(&snap->node);
4066                         rbd_snap_destroy(snap);
4067
4068                         /* Done with this list entry; advance */
4069
4070                         links = next;
4071                         continue;
4072                 }
4073
4074                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4075                                         &snap_size, &snap_features);
4076                 if (IS_ERR(snap_name)) {
4077                         ret = PTR_ERR(snap_name);
4078                         dout("failed to get snap info, error %d\n", ret);
4079                         goto out_err;
4080                 }
4081
		dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4083                         (unsigned long long)snap_id);
4084                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4085                         struct rbd_snap *new_snap;
4086
4087                         /* We haven't seen this snapshot before */
4088
4089                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4090                                         snap_id, snap_size, snap_features);
4091                         if (IS_ERR(new_snap)) {
4092                                 ret = PTR_ERR(new_snap);
4093                                 dout("  failed to add dev, error %d\n", ret);
4094                                 goto out_err;
4095                         }
4096
4097                         /* New goes before existing, or at end of list */
4098
			dout("  added dev%s\n", snap ? "" : " at end");
4100                         if (snap)
4101                                 list_add_tail(&new_snap->node, &snap->node);
4102                         else
4103                                 list_add_tail(&new_snap->node, head);
4104                 } else {
4105                         /* Already have this one */
4106
4107                         dout("  already present\n");
4108
4109                         rbd_assert(snap->size == snap_size);
4110                         rbd_assert(!strcmp(snap->name, snap_name));
4111                         rbd_assert(snap->features == snap_features);
4112
4113                         /* Done with this list entry; advance */
4114
4115                         links = links->next;
4116                 }
4117
4118                 /* Advance to the next entry in the snapshot context */
4119
4120                 index++;
4121         }
4122         dout("%s: done\n", __func__);
4123
4124         return 0;
4125 out_err:
4126         rbd_remove_all_snaps(rbd_dev);
4127
4128         return ret;
4129 }
4130
4131 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4132 {
4133         struct device *dev;
4134         int ret;
4135
4136         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4137
4138         dev = &rbd_dev->dev;
4139         dev->bus = &rbd_bus_type;
4140         dev->type = &rbd_device_type;
4141         dev->parent = &rbd_root_dev;
4142         dev->release = rbd_dev_device_release;
4143         dev_set_name(dev, "%d", rbd_dev->dev_id);
4144         ret = device_register(dev);
4145
4146         mutex_unlock(&ctl_mutex);
4147
4148         return ret;
4149 }
4150
4151 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4152 {
4153         device_unregister(&rbd_dev->dev);
4154 }
4155
4156 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4157
4158 /*
4159  * Get a unique rbd identifier for the given new rbd_dev, and add
4160  * the rbd_dev to the global list.  The minimum rbd id is 1.
4161  */
4162 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4163 {
4164         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4165
4166         spin_lock(&rbd_dev_list_lock);
4167         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4168         spin_unlock(&rbd_dev_list_lock);
4169         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4170                 (unsigned long long) rbd_dev->dev_id);
4171 }
4172
4173 /*
4174  * Remove an rbd_dev from the global list, and record that its
4175  * identifier is no longer in use.
4176  */
4177 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4178 {
4179         struct list_head *tmp;
4180         int rbd_id = rbd_dev->dev_id;
4181         int max_id;
4182
4183         rbd_assert(rbd_id > 0);
4184
4185         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4186                 (unsigned long long) rbd_dev->dev_id);
4187         spin_lock(&rbd_dev_list_lock);
4188         list_del_init(&rbd_dev->node);
4189
4190         /*
4191          * If the id being "put" is not the current maximum, there
4192          * is nothing special we need to do.
4193          */
4194         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4195                 spin_unlock(&rbd_dev_list_lock);
4196                 return;
4197         }
4198
4199         /*
4200          * We need to update the current maximum id.  Search the
4201          * list to find out what it is.  We're more likely to find
4202          * the maximum at the end, so search the list backward.
4203          */
4204         max_id = 0;
4205         list_for_each_prev(tmp, &rbd_dev_list) {
4206                 struct rbd_device *rbd_dev;
4207
4208                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4209                 if (rbd_dev->dev_id > max_id)
4210                         max_id = rbd_dev->dev_id;
4211         }
4212         spin_unlock(&rbd_dev_list_lock);
4213
4214         /*
4215          * The max id could have been updated by rbd_dev_id_get(), in
4216          * which case it now accurately reflects the new maximum.
4217          * Be careful not to overwrite the maximum value in that
4218          * case.
4219          */
4220         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4221         dout("  max dev id has been reset\n");
4222 }
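/*
 * Illustrative example: with devices 1, 2 and 3 mapped, putting
 * id 2 changes nothing (3 is still the maximum), while putting
 * id 3 walks the list and resets the maximum to 2, allowing that
 * id to be handed out again.
 */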
4223
4224 /*
4225  * Skips over white space at *buf, and updates *buf to point to the
4226  * first found non-space character (if any). Returns the length of
4227  * the token (string of non-white space characters) found.  Note
4228  * that *buf must be terminated with '\0'.
4229  */
4230 static inline size_t next_token(const char **buf)
4231 {
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
4236         const char *spaces = " \f\n\r\t\v";
4237
4238         *buf += strspn(*buf, spaces);   /* Find start of token */
4239
4240         return strcspn(*buf, spaces);   /* Return token length */
4241 }
4242
4243 /*
4244  * Finds the next token in *buf, and if the provided token buffer is
4245  * big enough, copies the found token into it.  The result, if
4246  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4247  * must be terminated with '\0' on entry.
4248  *
4249  * Returns the length of the token found (not including the '\0').
4250  * Return value will be 0 if no token is found, and it will be >=
4251  * token_size if the token would not fit.
4252  *
4253  * The *buf pointer will be updated to point beyond the end of the
4254  * found token.  Note that this occurs even if the token buffer is
4255  * too small to hold it.
4256  */
4257 static inline size_t copy_token(const char **buf,
4258                                 char *token,
4259                                 size_t token_size)
4260 {
4261         size_t len;
4262
4263         len = next_token(buf);
4264         if (len < token_size) {
4265                 memcpy(token, *buf, len);
4266                 *(token + len) = '\0';
4267         }
4268         *buf += len;
4269
4270         return len;
4271 }
4272
4273 /*
4274  * Finds the next token in *buf, dynamically allocates a buffer big
4275  * enough to hold a copy of it, and copies the token into the new
4276  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4277  * that a duplicate buffer is created even for a zero-length token.
4278  *
4279  * Returns a pointer to the newly-allocated duplicate, or a null
4280  * pointer if memory for the duplicate was not available.  If
4281  * the lenp argument is a non-null pointer, the length of the token
4282  * (not including the '\0') is returned in *lenp.
4283  *
4284  * If successful, the *buf pointer will be updated to point beyond
4285  * the end of the found token.
4286  *
4287  * Note: uses GFP_KERNEL for allocation.
4288  */
4289 static inline char *dup_token(const char **buf, size_t *lenp)
4290 {
4291         char *dup;
4292         size_t len;
4293
4294         len = next_token(buf);
4295         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4296         if (!dup)
4297                 return NULL;
4298         *(dup + len) = '\0';
4299         *buf += len;
4300
4301         if (lenp)
4302                 *lenp = len;
4303
4304         return dup;
4305 }
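/*
 * Illustrative example of how the token helpers above consume a
 * buffer: with *buf pointing at "  rbd foo", next_token() advances
 * *buf past the leading spaces and returns 3 (the length of "rbd");
 * dup_token() would additionally return the copy "rbd" and leave
 * *buf pointing at the space before "foo".
 */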
4306
4307 /*
4308  * Parse the options provided for an "rbd add" (i.e., rbd image
4309  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4310  * and the data written is passed here via a NUL-terminated buffer.
4311  * Returns 0 if successful or an error code otherwise.
4312  *
4313  * The information extracted from these options is recorded in
4314  * the other parameters which return dynamically-allocated
4315  * structures:
4316  *  ceph_opts
4317  *      The address of a pointer that will refer to a ceph options
4318  *      structure.  Caller must release the returned pointer using
4319  *      ceph_destroy_options() when it is no longer needed.
4320  *  rbd_opts
4321  *      Address of an rbd options pointer.  Fully initialized by
4322  *      this function; caller must release with kfree().
4323  *  spec
4324  *      Address of an rbd image specification pointer.  Fully
4325  *      initialized by this function based on parsed options.
4326  *      Caller must release with rbd_spec_put().
4327  *
4328  * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4330  * where:
4331  *  <mon_addrs>
4332  *      A comma-separated list of one or more monitor addresses.
4333  *      A monitor address is an ip address, optionally followed
4334  *      by a port number (separated by a colon).
4335  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4336  *  <options>
4337  *      A comma-separated list of ceph and/or rbd options.
4338  *  <pool_name>
4339  *      The name of the rados pool containing the rbd image.
4340  *  <image_name>
4341  *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
4347  */
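/*
 * Example (monitor address, key, and names are all hypothetical):
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo mysnap" \
 *         > /sys/bus/rbd/add
 *
 * maps snapshot "mysnap" of image "foo" in pool "rbd"; omitting
 * "mysnap" maps the image head instead.
 */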
4348 static int rbd_add_parse_args(const char *buf,
4349                                 struct ceph_options **ceph_opts,
4350                                 struct rbd_options **opts,
4351                                 struct rbd_spec **rbd_spec)
4352 {
4353         size_t len;
4354         char *options;
4355         const char *mon_addrs;
4356         char *snap_name;
4357         size_t mon_addrs_size;
4358         struct rbd_spec *spec = NULL;
4359         struct rbd_options *rbd_opts = NULL;
4360         struct ceph_options *copts;
4361         int ret;
4362
4363         /* The first four tokens are required */
4364
4365         len = next_token(&buf);
4366         if (!len) {
4367                 rbd_warn(NULL, "no monitor address(es) provided");
4368                 return -EINVAL;
4369         }
4370         mon_addrs = buf;
4371         mon_addrs_size = len + 1;
4372         buf += len;
4373
4374         ret = -EINVAL;
4375         options = dup_token(&buf, NULL);
4376         if (!options)
4377                 return -ENOMEM;
4378         if (!*options) {
4379                 rbd_warn(NULL, "no options provided");
4380                 goto out_err;
4381         }
4382
4383         spec = rbd_spec_alloc();
4384         if (!spec)
4385                 goto out_mem;
4386
4387         spec->pool_name = dup_token(&buf, NULL);
4388         if (!spec->pool_name)
4389                 goto out_mem;
4390         if (!*spec->pool_name) {
4391                 rbd_warn(NULL, "no pool name provided");
4392                 goto out_err;
4393         }
4394
4395         spec->image_name = dup_token(&buf, NULL);
4396         if (!spec->image_name)
4397                 goto out_mem;
4398         if (!*spec->image_name) {
4399                 rbd_warn(NULL, "no image name provided");
4400                 goto out_err;
4401         }
4402
4403         /*
4404          * Snapshot name is optional; default is to use "-"
4405          * (indicating the head/no snapshot).
4406          */
4407         len = next_token(&buf);
4408         if (!len) {
4409                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4410                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4411         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4412                 ret = -ENAMETOOLONG;
4413                 goto out_err;
4414         }
4415         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4416         if (!snap_name)
4417                 goto out_mem;
4418         *(snap_name + len) = '\0';
4419         spec->snap_name = snap_name;
4420
4421         /* Initialize all rbd options to the defaults */
4422
4423         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4424         if (!rbd_opts)
4425                 goto out_mem;
4426
4427         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4428
4429         copts = ceph_parse_options(options, mon_addrs,
4430                                         mon_addrs + mon_addrs_size - 1,
4431                                         parse_rbd_opts_token, rbd_opts);
4432         if (IS_ERR(copts)) {
4433                 ret = PTR_ERR(copts);
4434                 goto out_err;
4435         }
4436         kfree(options);
4437
4438         *ceph_opts = copts;
4439         *opts = rbd_opts;
4440         *rbd_spec = spec;
4441
4442         return 0;
4443 out_mem:
4444         ret = -ENOMEM;
4445 out_err:
4446         kfree(rbd_opts);
4447         rbd_spec_put(spec);
4448         kfree(options);
4449
4450         return ret;
4451 }
4452
4453 /*
4454  * An rbd format 2 image has a unique identifier, distinct from the
4455  * name given to it by the user.  Internally, that identifier is
4456  * what's used to specify the names of objects related to the image.
4457  *
4458  * A special "rbd id" object is used to map an rbd image name to its
4459  * id.  If that object doesn't exist, then there is no v2 rbd image
4460  * with the supplied name.
4461  *
4462  * This function will record the given rbd_dev's image_id field if
4463  * it can be determined, and in that case will return 0.  If any
4464  * errors occur a negative errno will be returned and the rbd_dev's
4465  * image_id field will be unchanged (and should be NULL).
4466  */
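/*
 * For example, the id object for an image named "foo" would be
 * RBD_ID_PREFIX "foo" (i.e. "rbd_id.foo", assuming the prefix
 * defined in rbd_types.h), and its "get_id" class method returns
 * the image id as an encoded string.
 */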
4467 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4468 {
4469         int ret;
4470         size_t size;
4471         char *object_name;
4472         void *response;
4473         char *image_id;
4474
4475         /*
4476          * When probing a parent image, the image id is already
4477          * known (and the image name likely is not).  There's no
4478          * need to fetch the image id again in this case.  We
4479          * do still need to set the image format though.
4480          */
4481         if (rbd_dev->spec->image_id) {
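                     /* a zero-length id denotes a format 1 image (no id) */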
4482                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4483
4484                 return 0;
4485         }
4486
4487         /*
4488          * First, see if the format 2 image id file exists, and if
4489          * so, get the image's persistent id from it.
4490          */
4491         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4492         object_name = kmalloc(size, GFP_NOIO);
4493         if (!object_name)
4494                 return -ENOMEM;
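             /* e.g. "rbd_id.foo" for a format 2 image named "foo" */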
4495         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4496         dout("rbd id object name is %s\n", object_name);
4497
4498         /* Response will be an encoded string, which includes a length */
4499
4500         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4501         response = kzalloc(size, GFP_NOIO);
4502         if (!response) {
4503                 ret = -ENOMEM;
4504                 goto out;
4505         }
4506
4507         /* If it doesn't exist we'll assume it's a format 1 image */
4508
4509         ret = rbd_obj_method_sync(rbd_dev, object_name,
4510                                 "rbd", "get_id", NULL, 0,
4511                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4512         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4513         if (ret == -ENOENT) {
4514                 image_id = kstrdup("", GFP_KERNEL);
4515                 ret = image_id ? 0 : -ENOMEM;
4516                 if (!ret)
4517                         rbd_dev->image_format = 1;
4518         } else if (ret > sizeof (__le32)) {
4519                 void *p = response;
4520
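                     /* decode "<le32 length><bytes>" into a NUL-terminated id */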
4521                 image_id = ceph_extract_encoded_string(&p, p + ret,
4522                                                 NULL, GFP_NOIO);
4523                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4524                 if (!ret)
4525                         rbd_dev->image_format = 2;
4526         } else {
4527                 ret = -EINVAL;
4528         }
4529
4530         if (!ret) {
4531                 rbd_dev->spec->image_id = image_id;
4532                 dout("image_id is %s\n", image_id);
4533         }
4534 out:
4535         kfree(response);
4536         kfree(object_name);
4537
4538         return ret;
4539 }
4540
4541 /* Undo whatever state changes were made by a v1 or v2 image probe */
4542
4543 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4544 {
4545         struct rbd_image_header *header;
4546
4547         rbd_dev_remove_parent(rbd_dev);
4548         rbd_spec_put(rbd_dev->parent_spec);
4549         rbd_dev->parent_spec = NULL;
4550         rbd_dev->parent_overlap = 0;
4551
4552         /* Free dynamic fields from the header, then zero it out */
4553
4554         header = &rbd_dev->header;
4555         ceph_put_snap_context(header->snapc);
4556         kfree(header->snap_sizes);
4557         kfree(header->snap_names);
4558         kfree(header->object_prefix);
4559         memset(header, 0, sizeof (*header));
4560 }
4561
4562 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4563 {
4564         int ret;
4565
4566         /* Populate rbd image metadata */
4567
4568         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4569         if (ret < 0)
4570                 goto out_err;
4571
4572         /* Version 1 images have no parent (no layering) */
4573
4574         rbd_dev->parent_spec = NULL;
4575         rbd_dev->parent_overlap = 0;
4576
4577         dout("discovered version 1 image, header name is %s\n",
4578                 rbd_dev->header_name);
4579
4580         return 0;
4581
4582 out_err:
4583         kfree(rbd_dev->header_name);
4584         rbd_dev->header_name = NULL;
4585         kfree(rbd_dev->spec->image_id);
4586         rbd_dev->spec->image_id = NULL;
4587
4588         return ret;
4589 }
4590
4591 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4592 {
4593         int ret;
4594         u64 ver = 0;
4595
4596         ret = rbd_dev_v2_image_size(rbd_dev);
4597         if (ret)
4598                 goto out_err;
4599
4600         /* Get the object prefix (a.k.a. block_name) for the image */
4601
4602         ret = rbd_dev_v2_object_prefix(rbd_dev);
4603         if (ret)
4604                 goto out_err;
4605
4606         /* Get and check the features for the image */
4607
4608         ret = rbd_dev_v2_features(rbd_dev);
4609         if (ret)
4610                 goto out_err;
4611
4612         /* If the image supports layering, get the parent info */
4613
4614         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4615                 ret = rbd_dev_v2_parent_info(rbd_dev);
4616                 if (ret)
4617                         goto out_err;
4618
4619                 /*
4620                  * Don't print a warning for parent images.  We can
4621                  * tell we're probing a parent because we won't know
4622                  * its pool name yet (just its pool id).
4623                  */
4624                 if (rbd_dev->spec->pool_name)
4625                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4626                                         "is EXPERIMENTAL!");
4627         }
4628
4629         /* If the image supports fancy striping, get its parameters */
4630
4631         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4632                 ret = rbd_dev_v2_striping_info(rbd_dev);
4633                 if (ret < 0)
4634                         goto out_err;
4635         }
4636
4637         /* crypto and compression type aren't (yet) supported for v2 images */
4638
4639         rbd_dev->header.crypt_type = 0;
4640         rbd_dev->header.comp_type = 0;
4641
4642         /* Get the snapshot context (the header version it returns is unused) */
4643
4644         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4645         if (ret)
4646                 goto out_err;
4647
4648         dout("discovered version 2 image, header name is %s\n",
4649                 rbd_dev->header_name);
4650
4651         return 0;
4652 out_err:
4653         rbd_dev->parent_overlap = 0;
4654         rbd_spec_put(rbd_dev->parent_spec);
4655         rbd_dev->parent_spec = NULL;
4656         kfree(rbd_dev->header_name);
4657         rbd_dev->header_name = NULL;
4658         kfree(rbd_dev->header.object_prefix);
4659         rbd_dev->header.object_prefix = NULL;
4660
4661         return ret;
4662 }
4663
4664 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4665 {
4666         struct rbd_device *parent = NULL;
4667         struct rbd_spec *parent_spec;
4668         struct rbd_client *rbdc;
4669         int ret;
4670
4671         if (!rbd_dev->parent_spec)
4672                 return 0;
4673         /*
4674          * We need to pass a reference to the client and the parent
4675          * spec when creating the parent rbd_dev.  Images related by
4676          * parent/child relationships always share both.
4677          */
4678         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4679         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4680
4681         ret = -ENOMEM;
4682         parent = rbd_dev_create(rbdc, parent_spec);
4683         if (!parent)
4684                 goto out_err;
4685
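             /*
              * Probing the parent recurses: if it is itself a clone,
              * its own parent is probed and linked in turn.
              */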
4686         ret = rbd_dev_image_probe(parent);
4687         if (ret < 0)
4688                 goto out_err;
4689         rbd_dev->parent = parent;
4690
4691         return 0;
4692 out_err:
4693         if (parent) {
4694                 /* rbd_dev_destroy() drops the client and parent spec
4695                  * references that were passed to rbd_dev_create() */
4696                 rbd_dev_destroy(parent);
4697         } else {
4698                 rbd_put_client(rbdc);
4699                 rbd_spec_put(parent_spec);
4700         }
4701
4702         return ret;
4703 }
4704
4705 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4706 {
4707         int ret;
4708
4709         ret = rbd_dev_mapping_set(rbd_dev);
4710         if (ret)
4711                 return ret;
4712
4713         /* generate unique id: find highest unique id, add one */
4714         rbd_dev_id_get(rbd_dev);
4715
4716         /* Fill in the device name, now that we have its id. */
4717         BUILD_BUG_ON(DEV_NAME_LEN
4718                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4719         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
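             /* e.g. "rbd0" for the first device mapped */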
4720
4721         /* Get our block major device number. */
4722
4723         ret = register_blkdev(0, rbd_dev->name);
4724         if (ret < 0)
4725                 goto err_out_id;
4726         rbd_dev->major = ret;
4727
4728         /* Set up the blkdev mapping. */
4729
4730         ret = rbd_init_disk(rbd_dev);
4731         if (ret)
4732                 goto err_out_blkdev;
4733
4734         ret = rbd_bus_add_dev(rbd_dev);
4735         if (ret)
4736                 goto err_out_disk;
4737
4738         /* Everything's ready.  Announce the disk to the world. */
4739
4740         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4741         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4742         add_disk(rbd_dev->disk);
4743
4744         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4745                 (unsigned long long) rbd_dev->mapping.size);
4746
4747         return ret;
4748
4749 err_out_disk:
4750         rbd_free_disk(rbd_dev);
4751 err_out_blkdev:
4752         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4753 err_out_id:
4754         rbd_dev_id_put(rbd_dev);
4755         rbd_dev_mapping_clear(rbd_dev);
4756
4757         return ret;
4758 }
4759
4760 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4761 {
4762         struct rbd_spec *spec = rbd_dev->spec;
4763         size_t size;
4764
4765         /* Record the header object name for this rbd image. */
4766
4767         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4768
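             /* Format 1: "<image name>.rbd"; format 2: "rbd_header.<image id>" */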
4769         if (rbd_dev->image_format == 1)
4770                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4771         else
4772                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4773
4774         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4775         if (!rbd_dev->header_name)
4776                 return -ENOMEM;
4777
4778         if (rbd_dev->image_format == 1)
4779                 sprintf(rbd_dev->header_name, "%s%s",
4780                         spec->image_name, RBD_SUFFIX);
4781         else
4782                 sprintf(rbd_dev->header_name, "%s%s",
4783                         RBD_HEADER_PREFIX, spec->image_id);
4784         return 0;
4785 }
4786
4787 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4788 {
4789         int ret;
4790
4791         rbd_remove_all_snaps(rbd_dev);
4792         rbd_dev_unprobe(rbd_dev);
4793         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4794         if (ret)
4795                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)", ret);
4796         kfree(rbd_dev->header_name);
4797         rbd_dev->header_name = NULL;
4798         rbd_dev->image_format = 0;
4799         kfree(rbd_dev->spec->image_id);
4800         rbd_dev->spec->image_id = NULL;
4801
4802         rbd_dev_destroy(rbd_dev);
4803 }
4804
4805 /*
4806  * Probe for the existence of the header object for the given rbd
4807  * device.  For format 2 images this includes determining the image
4808  * id.
4809  */
4810 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4811 {
4812         int ret;
4813         int tmp;
4814
4815         /*
4816          * Get the id from the image id object.  If it's not a
4817          * format 2 image, we'll get ENOENT back, and we'll assume
4818          * it's a format 1 image.
4819          */
4820         ret = rbd_dev_image_id(rbd_dev);
4821         if (ret)
4822                 return ret;
4823         rbd_assert(rbd_dev->spec->image_id);
4824         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4825
4826         ret = rbd_dev_header_name(rbd_dev);
4827         if (ret)
4828                 goto err_out_format;
4829
4830         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4831         if (ret)
4832                 goto out_header_name;
4833
4834         if (rbd_dev->image_format == 1)
4835                 ret = rbd_dev_v1_probe(rbd_dev);
4836         else
4837                 ret = rbd_dev_v2_probe(rbd_dev);
4838         if (ret)
4839                 goto err_out_watch;
4840
4841         ret = rbd_dev_snaps_update(rbd_dev);
4842         if (ret)
4843                 goto err_out_probe;
4844
4845         ret = rbd_dev_spec_update(rbd_dev);
4846         if (ret)
4847                 goto err_out_snaps;
4848
4849         ret = rbd_dev_probe_parent(rbd_dev);
4850         if (!ret)
4851                 return 0;
4852
4853 err_out_snaps:
4854         rbd_remove_all_snaps(rbd_dev);
4855 err_out_probe:
4856         rbd_dev_unprobe(rbd_dev);
4857 err_out_watch:
4858         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4859         if (tmp)
4860                 rbd_warn(rbd_dev, "unable to tear down watch request");
4861 out_header_name:
4862         kfree(rbd_dev->header_name);
4863         rbd_dev->header_name = NULL;
4864 err_out_format:
4865         rbd_dev->image_format = 0;
4866         kfree(rbd_dev->spec->image_id);
4867         rbd_dev->spec->image_id = NULL;
4868
4869         dout("probe failed, returning %d\n", ret);
4870
4871         return ret;
4872 }
4873
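     /*
      * Handle a write to /sys/bus/rbd/add.  The buffer is expected
      * to contain:
      *   "<mon addrs> <options> <pool name> <image name> [<snap name>]"
      * e.g. (illustrative values):
      *   echo "1.2.3.4:6789 name=admin rbd foo -" > /sys/bus/rbd/add
      */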
4874 static ssize_t rbd_add(struct bus_type *bus,
4875                        const char *buf,
4876                        size_t count)
4877 {
4878         struct rbd_device *rbd_dev = NULL;
4879         struct ceph_options *ceph_opts = NULL;
4880         struct rbd_options *rbd_opts = NULL;
4881         struct rbd_spec *spec = NULL;
4882         struct rbd_client *rbdc;
4883         struct ceph_osd_client *osdc;
4884         int rc = -ENOMEM;
4885
4886         if (!try_module_get(THIS_MODULE))
4887                 return -ENODEV;
4888
4889         /* parse add command */
4890         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4891         if (rc < 0)
4892                 goto err_out_module;
4893
4894         rbdc = rbd_get_client(ceph_opts);
4895         if (IS_ERR(rbdc)) {
4896                 rc = PTR_ERR(rbdc);
4897                 goto err_out_args;
4898         }
4899         ceph_opts = NULL;       /* rbd_dev client now owns this */
4900
4901         /* pick the pool */
4902         osdc = &rbdc->client->osdc;
4903         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4904         if (rc < 0)
4905                 goto err_out_client;
4906         spec->pool_id = (u64)rc;
4907
4908         /* The ceph file layout needs to fit pool id in 32 bits */
4909
4910         if (spec->pool_id > (u64)U32_MAX) {
4911                 rbd_warn(NULL, "pool id too large (%llu > %u)",
4912                                 (unsigned long long)spec->pool_id, U32_MAX);
4913                 rc = -EIO;
4914                 goto err_out_client;
4915         }
4916
4917         rbd_dev = rbd_dev_create(rbdc, spec);
4918         if (!rbd_dev)
4919                 goto err_out_client;
4920         rbdc = NULL;            /* rbd_dev now owns this */
4921         spec = NULL;            /* rbd_dev now owns this */
4922
4923         rbd_dev->mapping.read_only = rbd_opts->read_only;
4924         kfree(rbd_opts);
4925         rbd_opts = NULL;        /* done with this */
4926
4927         rc = rbd_dev_image_probe(rbd_dev);
4928         if (rc < 0)
4929                 goto err_out_rbd_dev;
4930
4931         rc = rbd_dev_device_setup(rbd_dev);
4932         if (!rc)
4933                 return count;
4934
4935         rbd_dev_image_release(rbd_dev);
4936 err_out_rbd_dev:
4937         rbd_dev_destroy(rbd_dev);
4938 err_out_client:
4939         rbd_put_client(rbdc);
4940 err_out_args:
4941         if (ceph_opts)
4942                 ceph_destroy_options(ceph_opts);
4943         kfree(rbd_opts);
4944         rbd_spec_put(spec);
4945 err_out_module:
4946         module_put(THIS_MODULE);
4947
4948         dout("Error adding device %s\n", buf);
4949
4950         return (ssize_t)rc;
4951 }
4952
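     /*
      * Look up an rbd_device by id on the global device list, or
      * return NULL.  The list lock is dropped before returning, so
      * the caller must otherwise ensure the device can't go away.
      */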
4953 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4954 {
4955         struct list_head *tmp;
4956         struct rbd_device *rbd_dev;
4957
4958         spin_lock(&rbd_dev_list_lock);
4959         list_for_each(tmp, &rbd_dev_list) {
4960                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4961                 if (rbd_dev->dev_id == dev_id) {
4962                         spin_unlock(&rbd_dev_list_lock);
4963                         return rbd_dev;
4964                 }
4965         }
4966         spin_unlock(&rbd_dev_list_lock);
4967         return NULL;
4968 }
4969
4970 static void rbd_dev_device_release(struct device *dev)
4971 {
4972         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4973
4974         rbd_free_disk(rbd_dev);
4975         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4976         rbd_dev_clear_mapping(rbd_dev);
4977         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4978         rbd_dev->major = 0;
4979         rbd_dev_id_put(rbd_dev);
4980         rbd_dev_mapping_clear(rbd_dev);
4981 }
4982
4983 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4984 {
4985         while (rbd_dev->parent) {
4986                 struct rbd_device *first = rbd_dev;
4987                 struct rbd_device *second = first->parent;
4988                 struct rbd_device *third;
4989
4990                 /*
4991                  * Follow to the parent with no grandparent and
4992                  * remove it.
4993                  */
4994                 while (second && (third = second->parent)) {
4995                         first = second;
4996                         second = third;
4997                 }
4998                 rbd_assert(second);
4999                 rbd_dev_image_release(second);
5000                 first->parent = NULL;
5001                 first->parent_overlap = 0;
5002
5003                 rbd_assert(first->parent_spec);
5004                 rbd_spec_put(first->parent_spec);
5005                 first->parent_spec = NULL;
5006         }
5007 }
5008
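     /*
      * Handle a write to /sys/bus/rbd/remove.  The buffer carries
      * the decimal id of the device to unmap, e.g. "2" for /dev/rbd2.
      */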
5009 static ssize_t rbd_remove(struct bus_type *bus,
5010                           const char *buf,
5011                           size_t count)
5012 {
5013         struct rbd_device *rbd_dev = NULL;
5014         int target_id;
5015         unsigned long ul;
5016         int ret;
5017
5018         ret = kstrtoul(buf, 10, &ul);
5019         if (ret)
5020                 return ret;
5021
5022         /* convert to int; abort if we lost anything in the conversion */
5023         target_id = (int) ul;
5024         if (target_id != ul)
5025                 return -EINVAL;
5026
5027         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5028
5029         rbd_dev = __rbd_get_dev(target_id);
5030         if (!rbd_dev) {
5031                 ret = -ENOENT;
5032                 goto done;
5033         }
5034
5035         spin_lock_irq(&rbd_dev->lock);
5036         if (rbd_dev->open_count)
5037                 ret = -EBUSY;
5038         else
5039                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5040         spin_unlock_irq(&rbd_dev->lock);
5041         if (ret < 0)
5042                 goto done;
5043         ret = count;
5044         rbd_bus_del_dev(rbd_dev);
5045         rbd_dev_image_release(rbd_dev);
5046         module_put(THIS_MODULE);
5047 done:
5048         mutex_unlock(&ctl_mutex);
5049
5050         return ret;
5051 }
5052
5053 /*
5054  * create control files in sysfs
5055  * /sys/bus/rbd/...
5056  */
5057 static int rbd_sysfs_init(void)
5058 {
5059         int ret;
5060
5061         ret = device_register(&rbd_root_dev);
5062         if (ret < 0)
5063                 return ret;
5064
5065         ret = bus_register(&rbd_bus_type);
5066         if (ret < 0)
5067                 device_unregister(&rbd_root_dev);
5068
5069         return ret;
5070 }
5071
5072 static void rbd_sysfs_cleanup(void)
5073 {
5074         bus_unregister(&rbd_bus_type);
5075         device_unregister(&rbd_root_dev);
5076 }
5077
5078 static int __init rbd_init(void)
5079 {
5080         int rc;
5081
5082         if (!libceph_compatible(NULL)) {
5083                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5084
5085                 return -EINVAL;
5086         }
5087         rc = rbd_sysfs_init();
5088         if (rc)
5089                 return rc;
5090         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5091         return 0;
5092 }
5093
5094 static void __exit rbd_exit(void)
5095 {
5096         rbd_sysfs_cleanup();
5097 }
5098
5099 module_init(rbd_init);
5100 module_exit(rbd_exit);
5101
5102 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5103 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5104 MODULE_DESCRIPTION("rados block device");
5105
5106 /* following authorship retained from original osdblk.c */
5107 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5108
5109 MODULE_LICENSE("GPL");