]> git.kernelconcepts.de Git - karo-tx-linux.git/blob - drivers/block/rbd.c
aoe: "payload" sysfs file exports per-AoE-command data transfer size
[karo-tx-linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

#define U64_MAX ((u64) (~0ULL))

/* Driver name as registered with the block layer and shown in sysfs */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN   32
#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN         1024

/* Reserved "snapshot" name used when mapping the current image head */
#define RBD_SNAP_HEAD_NAME      "-"

#define RBD_IMAGE_ID_LEN_MAX    64
#define RBD_OBJ_PREFIX_LEN_MAX  64

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
/* Upper bound on decimal digits needed for an int (plus slack) */
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

/* Images are mapped writable unless the user requests otherwise */
#define RBD_READ_ONLY_DEFAULT           false
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These four fields never change for a given rbd image */
        char *object_prefix;    /* prefix used to name data objects */
        u64 features;
        __u8 obj_order;         /* log2 of the object (segment) size */
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;         /* image size, in bytes */
        struct ceph_snap_context *snapc;
        char *snap_names;       /* packed NUL-terminated snapshot names */
        u64 *snap_sizes;        /* one image size per snapshot */

        u64 obj_version;
};

/* User-settable options parsed from the mapping request */
struct rbd_options {
        bool    read_only;
};
107
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;   /* shared ownership across devices */
        struct list_head        node;   /* entry in rbd_client_list */
};

/*
 * a request completion status
 */
struct rbd_req_status {
        int done;       /* nonzero once this sub-request has finished */
        int rc;         /* its completion result */
        u64 bytes;      /* bytes transferred */
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
        int                     total;          /* sub-requests expected */
        int                     num_done;       /* sub-requests completed so far */
        struct kref             kref;
        struct rbd_req_status   status[0];      /* one slot per sub-request */
};

/*
 * a single io request
 */
struct rbd_request {
        struct request          *rq;            /* blk layer request */
        struct bio              *bio;           /* cloned bio */
        struct page             **pages;        /* list of used pages */
        u64                     len;
        int                     coll_index;     /* slot in coll->status[] */
        struct rbd_req_coll     *coll;
};
147
/* In-memory record of one snapshot, with its sysfs device node */
struct rbd_snap {
        struct  device          dev;
        const char              *name;
        u64                     size;   /* image size at snapshot time */
        struct list_head        node;   /* entry in rbd_device->snaps */
        u64                     id;
        u64                     features;
};

/* What the block device is currently mapped to: image head or a snapshot */
struct rbd_mapping {
        char                    *snap_name;
        u64                     snap_id;        /* CEPH_NOSNAP for the head */
        u64                     size;
        u64                     features;
        bool                    snap_exists;    /* cleared if snap disappears */
        bool                    read_only;
};
165
/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_options      rbd_opts;
        struct rbd_client       *rbd_client;    /* possibly shared; see kref */

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue lock */

        struct rbd_image_header header;
        char                    *image_id;
        size_t                  image_id_len;
        char                    *image_name;
        size_t                  image_name_len;
        char                    *header_name;   /* name of the header object */
        char                    *pool_name;
        int                     pool_id;

        /* osd "watch" event and its registration request */
        struct ceph_osd_event   *watch_event;
        struct ceph_osd_request *watch_request;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;        /* current head/snap mapping */

        struct list_head        node;           /* entry in rbd_dev_list */

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
};
208
static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);      /* protects rbd_dev_list */

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);   /* protects rbd_client_list */
216
217 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
218 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
219
220 static void rbd_dev_release(struct device *dev);
221 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
222
223 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
224                        size_t count);
225 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
226                           size_t count);
227
/* Bus attribute files; writing "add"/"remove" maps/unmaps rbd devices */
static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

/* The /sys/bus/rbd bus that all rbd devices hang off */
static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
238
/*
 * Intentionally-empty release callback: rbd_root_dev is statically
 * allocated, so there is nothing to free when its last reference drops,
 * but the driver core expects every device to have a release hook.
 */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Parent sysfs device for all rbd devices */
static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
247
#ifdef RBD_DEBUG
/*
 * Check a runtime invariant; on failure, log the failing expression
 * with its location and BUG() out.  Compiles to a no-op when
 * RBD_DEBUG is not defined.
 *
 * Wrapped in do { } while (0) so the macro expands to exactly one
 * statement; the previous bare "if (...) { }" form was unsafe in
 * unbraced if/else bodies (dangling-else hazard).
 */
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
260
/* Pin the rbd device by taking a reference on its embedded struct device */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
        return get_device(&rbd_dev->dev);
}

/* Drop a reference taken with rbd_get_dev() */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
        put_device(&rbd_dev->dev);
}
270
271 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
272
/*
 * Block device open callback.  Rejects writable opens of a read-only
 * mapping with -EROFS, and pins the device while it is open.
 */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        rbd_get_dev(rbd_dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);

        return 0;
}
285
/* Block device release callback: drops the reference taken in rbd_open() */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;

        rbd_put_dev(rbd_dev);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
300
301 /*
302  * Initialize an rbd client instance.
303  * We own *ceph_opts.
304  */
305 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
306 {
307         struct rbd_client *rbdc;
308         int ret = -ENOMEM;
309
310         dout("rbd_client_create\n");
311         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
312         if (!rbdc)
313                 goto out_opt;
314
315         kref_init(&rbdc->kref);
316         INIT_LIST_HEAD(&rbdc->node);
317
318         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
319
320         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
321         if (IS_ERR(rbdc->client))
322                 goto out_mutex;
323         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
324
325         ret = ceph_open_session(rbdc->client);
326         if (ret < 0)
327                 goto out_err;
328
329         spin_lock(&rbd_client_list_lock);
330         list_add_tail(&rbdc->node, &rbd_client_list);
331         spin_unlock(&rbd_client_list_lock);
332
333         mutex_unlock(&ctl_mutex);
334
335         dout("rbd_client_create created %p\n", rbdc);
336         return rbdc;
337
338 out_err:
339         ceph_destroy_client(rbdc->client);
340 out_mutex:
341         mutex_unlock(&ctl_mutex);
342         kfree(rbdc);
343 out_opt:
344         if (ceph_opts)
345                 ceph_destroy_options(ceph_opts);
346         return ERR_PTR(ret);
347 }
348
/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 *
 * Returns NULL when no match exists, or when sharing is disabled
 * via CEPH_OPT_NOSHARE.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        /* take the reference before dropping the list lock */
                        kref_get(&client_node->kref);
                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}
373
/*
 * mount options
 *
 * Tokens below Opt_last_int take an int argument, those between
 * Opt_last_int and Opt_last_string take a string, and those between
 * Opt_last_string and Opt_last_bool are Boolean flags (see
 * parse_rbd_opts_token()).
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};
387
388 static match_table_t rbd_opts_tokens = {
389         /* int args above */
390         /* string args above */
391         {Opt_read_only, "mapping.read_only"},
392         {Opt_read_only, "ro"},          /* Alternate spelling */
393         {Opt_read_write, "read_write"},
394         {Opt_read_write, "rw"},         /* Alternate spelling */
395         /* Boolean args above */
396         {-1, NULL}
397 };
398
/*
 * Parse one comma-separated option token from the options string.
 * @c: the option text; @private: the struct rbd_options to fill in.
 * Returns 0 on success, -EINVAL for an unknown token, or the
 * match_int() error for a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        /* Classify the token by the ranges laid out in the enum above */
        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                /* every token in rbd_opts_tokens must be handled above */
                rbd_assert(false);
                break;
        }
        return 0;
}
439
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * On success rbd_dev->rbd_client holds a referenced client and 0 is
 * returned.  ceph_opts is always consumed: destroyed when an existing
 * client is reused, otherwise handed off to rbd_client_create().
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
                                size_t mon_addr_len, char *options)
{
        struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
        struct ceph_options *ceph_opts;
        struct rbd_client *rbdc;

        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

        /* parse_rbd_opts_token() fills in rbd_opts as a side effect */
        ceph_opts = ceph_parse_options(options, mon_addr,
                                        mon_addr + mon_addr_len,
                                        parse_rbd_opts_token, rbd_opts);
        if (IS_ERR(ceph_opts))
                return PTR_ERR(ceph_opts);

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc) {
                /* using an existing client */
                ceph_destroy_options(ceph_opts);
        } else {
                rbdc = rbd_client_create(ceph_opts);
                if (IS_ERR(rbdc))
                        return PTR_ERR(rbdc);
        }
        rbd_dev->rbd_client = rbdc;

        return 0;
}
472
/*
 * Destroy ceph client
 *
 * kref release callback.  Note: this function takes
 * rbd_client_list_lock itself, so the caller must NOT hold it — the
 * previous "Caller must hold rbd_client_list_lock" note here was
 * stale and, if followed, would describe a self-deadlock.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("rbd_release_client %p\n", rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}
490
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
        kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
        rbd_dev->rbd_client = NULL;     /* guard against use after drop */
}
500
/*
 * Destroy requests collection
 *
 * kref release callback for struct rbd_req_coll.
 */
static void rbd_coll_release(struct kref *kref)
{
        struct rbd_req_coll *coll =
                container_of(kref, struct rbd_req_coll, kref);

        dout("rbd_coll_release %p\n", coll);
        kfree(coll);
}
512
513 static bool rbd_image_format_valid(u32 image_format)
514 {
515         return image_format == 1 || image_format == 2;
516 }
517
/*
 * Sanity-check a format 1 on-disk header: verify the magic text and
 * make sure the snapshot counts/sizes it advertises can be represented
 * in a size_t before anything is allocated based on them.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire the snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}
546
547 /*
548  * Create a new header structure, translate header format from the on-disk
549  * header.
550  */
551 static int rbd_header_from_disk(struct rbd_image_header *header,
552                                  struct rbd_image_header_ondisk *ondisk)
553 {
554         u32 snap_count;
555         size_t len;
556         size_t size;
557         u32 i;
558
559         memset(header, 0, sizeof (*header));
560
561         snap_count = le32_to_cpu(ondisk->snap_count);
562
563         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
564         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
565         if (!header->object_prefix)
566                 return -ENOMEM;
567         memcpy(header->object_prefix, ondisk->object_prefix, len);
568         header->object_prefix[len] = '\0';
569
570         if (snap_count) {
571                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
572
573                 /* Save a copy of the snapshot names */
574
575                 if (snap_names_len > (u64) SIZE_MAX)
576                         return -EIO;
577                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
578                 if (!header->snap_names)
579                         goto out_err;
580                 /*
581                  * Note that rbd_dev_v1_header_read() guarantees
582                  * the ondisk buffer we're working with has
583                  * snap_names_len bytes beyond the end of the
584                  * snapshot id array, this memcpy() is safe.
585                  */
586                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
587                         snap_names_len);
588
589                 /* Record each snapshot's size */
590
591                 size = snap_count * sizeof (*header->snap_sizes);
592                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
593                 if (!header->snap_sizes)
594                         goto out_err;
595                 for (i = 0; i < snap_count; i++)
596                         header->snap_sizes[i] =
597                                 le64_to_cpu(ondisk->snaps[i].image_size);
598         } else {
599                 WARN_ON(ondisk->snap_names_len);
600                 header->snap_names = NULL;
601                 header->snap_sizes = NULL;
602         }
603
604         header->features = 0;   /* No features support in v1 images */
605         header->obj_order = ondisk->options.order;
606         header->crypt_type = ondisk->options.crypt_type;
607         header->comp_type = ondisk->options.comp_type;
608
609         /* Allocate and fill in the snapshot context */
610
611         header->image_size = le64_to_cpu(ondisk->image_size);
612         size = sizeof (struct ceph_snap_context);
613         size += snap_count * sizeof (header->snapc->snaps[0]);
614         header->snapc = kzalloc(size, GFP_KERNEL);
615         if (!header->snapc)
616                 goto out_err;
617
618         atomic_set(&header->snapc->nref, 1);
619         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
620         header->snapc->num_snaps = snap_count;
621         for (i = 0; i < snap_count; i++)
622                 header->snapc->snaps[i] =
623                         le64_to_cpu(ondisk->snaps[i].id);
624
625         return 0;
626
627 out_err:
628         kfree(header->snap_sizes);
629         header->snap_sizes = NULL;
630         kfree(header->snap_names);
631         header->snap_names = NULL;
632         kfree(header->object_prefix);
633         header->object_prefix = NULL;
634
635         return -ENOMEM;
636 }
637
/*
 * Look up a snapshot by name; if found, record its id, size and
 * features in the device's mapping.  Returns 0 or -ENOENT.
 */
static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{

        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node) {
                if (!strcmp(snap_name, snap->name)) {
                        rbd_dev->mapping.snap_id = snap->id;
                        rbd_dev->mapping.size = snap->size;
                        rbd_dev->mapping.features = snap->features;

                        return 0;
                }
        }

        return -ENOENT;
}
655
/*
 * Configure the device mapping for either the image head (writable
 * unless the read_only option was given) or a named snapshot (always
 * read-only).  Returns 0, or -ENOENT if the snapshot doesn't exist.
 */
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
{
        int ret;

        if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->mapping.snap_id = CEPH_NOSNAP;
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
                rbd_dev->mapping.snap_exists = false;
                rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
                ret = 0;
        } else {
                ret = snap_by_name(rbd_dev, snap_name);
                if (ret < 0)
                        goto done;
                rbd_dev->mapping.snap_exists = true;
                rbd_dev->mapping.read_only = true;
        }
        /* snap_name is recorded only on success */
        rbd_dev->mapping.snap_name = snap_name;
done:
        return ret;
}
679
/* Release everything rbd_header_from_disk() allocated, NULLing each field */
static void rbd_header_free(struct rbd_image_header *header)
{
        kfree(header->object_prefix);
        header->object_prefix = NULL;
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        ceph_put_snap_context(header->snapc);
        header->snapc = NULL;
}
691
692 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
693 {
694         char *name;
695         u64 segment;
696         int ret;
697
698         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
699         if (!name)
700                 return NULL;
701         segment = offset >> rbd_dev->header.obj_order;
702         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
703                         rbd_dev->header.object_prefix, segment);
704         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
705                 pr_err("error formatting segment name for #%llu (%d)\n",
706                         segment, ret);
707                 kfree(name);
708                 name = NULL;
709         }
710
711         return name;
712 }
713
714 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
715 {
716         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
717
718         return offset & (segment_size - 1);
719 }
720
/*
 * Clip @length so that [offset, offset + length) does not cross the
 * boundary of the object (segment) containing @offset; returns the
 * clipped length.
 */
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        /* offset + length below must not wrap around */
        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
734
/*
 * Number of objects (segments) spanned by byte range [ofs, ofs + len).
 * Returns 0 for an empty range, -ERANGE if ofs + len would overflow.
 */
static int rbd_get_num_segments(struct rbd_image_header *header,
                                u64 ofs, u64 len)
{
        u64 start_seg;
        u64 end_seg;

        if (!len)
                return 0;
        if (len - 1 > U64_MAX - ofs)
                return -ERANGE;

        start_seg = ofs >> header->obj_order;
        end_seg = (ofs + len - 1) >> header->obj_order;

        /* NOTE(review): count is truncated to int for enormous ranges */
        return end_seg - start_seg + 1;
}
751
752 /*
753  * returns the size of an object in the image
754  */
755 static u64 rbd_obj_bytes(struct rbd_image_header *header)
756 {
757         return 1 << header->obj_order;
758 }
759
760 /*
761  * bio helpers
762  */
763
764 static void bio_chain_put(struct bio *chain)
765 {
766         struct bio *tmp;
767
768         while (chain) {
769                 tmp = chain;
770                 chain = chain->bi_next;
771                 bio_put(tmp);
772         }
773 }
774
/*
 * zeros a bio chain, starting at specific offset
 *
 * @start_ofs is measured in bytes from the start of the whole chain;
 * data before it is preserved, everything after it is cleared.  Used
 * to zero-fill the tail of a short read.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                /* zero from start_ofs within this vec, or all of it */
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}
801
802 /*
803  * bio_chain_clone - clone a chain of bios up to a certain length.
804  * might return a bio_pair that will need to be released.
805  */
806 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
807                                    struct bio_pair **bp,
808                                    int len, gfp_t gfpmask)
809 {
810         struct bio *old_chain = *old;
811         struct bio *new_chain = NULL;
812         struct bio *tail;
813         int total = 0;
814
815         if (*bp) {
816                 bio_pair_release(*bp);
817                 *bp = NULL;
818         }
819
820         while (old_chain && (total < len)) {
821                 struct bio *tmp;
822
823                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
824                 if (!tmp)
825                         goto err_out;
826                 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
827
828                 if (total + old_chain->bi_size > len) {
829                         struct bio_pair *bp;
830
831                         /*
832                          * this split can only happen with a single paged bio,
833                          * split_bio will BUG_ON if this is not the case
834                          */
835                         dout("bio_chain_clone split! total=%d remaining=%d"
836                              "bi_size=%u\n",
837                              total, len - total, old_chain->bi_size);
838
839                         /* split the bio. We'll release it either in the next
840                            call, or it will have to be released outside */
841                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
842                         if (!bp)
843                                 goto err_out;
844
845                         __bio_clone(tmp, &bp->bio1);
846
847                         *next = &bp->bio2;
848                 } else {
849                         __bio_clone(tmp, old_chain);
850                         *next = old_chain->bi_next;
851                 }
852
853                 tmp->bi_bdev = NULL;
854                 tmp->bi_next = NULL;
855                 if (new_chain)
856                         tail->bi_next = tmp;
857                 else
858                         new_chain = tmp;
859                 tail = tmp;
860                 old_chain = old_chain->bi_next;
861
862                 total += tmp->bi_size;
863         }
864
865         rbd_assert(total == len);
866
867         *old = old_chain;
868
869         return new_chain;
870
871 err_out:
872         dout("bio_chain_clone with err\n");
873         bio_chain_put(new_chain);
874         return NULL;
875 }
876
877 /*
878  * helpers for osd request op vectors.
879  */
880 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
881                                         int opcode, u32 payload_len)
882 {
883         struct ceph_osd_req_op *ops;
884
885         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
886         if (!ops)
887                 return NULL;
888
889         ops[0].op = opcode;
890
891         /*
892          * op extent offset and length will be set later on
893          * in calc_raw_layout()
894          */
895         ops[0].payload_len = payload_len;
896
897         return ops;
898 }
899
/* Free an op vector allocated by rbd_create_rw_ops() */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
904
/*
 * Record completion of one sub-request of a collection, and complete
 * as much of the original block-layer request as is now contiguously
 * finished.  Without a collection the whole request completes at once.
 *
 * Sub-requests may finish out of order; only the run of consecutive
 * finished entries starting at num_done is ended here, under the
 * request queue lock.
 */
static void rbd_coll_end_req_index(struct request *rq,
                                   struct rbd_req_coll *coll,
                                   int index,
                                   int ret, u64 len)
{
        struct request_queue *q;
        int min, max, i;

        dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
             coll, index, ret, (unsigned long long) len);

        if (!rq)
                return;

        if (!coll) {
                blk_end_request(rq, ret, len);
                return;
        }

        q = rq->q;

        spin_lock_irq(q->queue_lock);
        coll->status[index].done = 1;
        coll->status[index].rc = ret;
        coll->status[index].bytes = len;
        max = min = coll->num_done;
        while (max < coll->total && coll->status[max].done)
                max++;

        for (i = min; i<max; i++) {
                __blk_end_request(rq, coll->status[i].rc,
                                  coll->status[i].bytes);
                coll->num_done++;
                /* each completed sub-request drops one collection ref */
                kref_put(&coll->kref, rbd_coll_release);
        }
        spin_unlock_irq(q->queue_lock);
}
942
/* Complete the sub-request described by @req; see rbd_coll_end_req_index() */
static void rbd_coll_end_req(struct rbd_request *req,
                             int ret, u64 len)
{
        rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
948
949 /*
950  * Send ceph osd request
951  */
/*
 * Send ceph osd request
 *
 * Builds and submits one OSD request for (object_name, ofs, len).
 * Data travels either as a bio chain (block I/O path) or as a page
 * vector (sync/control path).
 *
 * If @rbd_cb is non-NULL the call is asynchronous and the callback
 * owns teardown (it frees req_data and drops the request reference).
 * If @rbd_cb is NULL we wait for the reply, report the reassert
 * version through @ver, and drop the request here.
 *
 * If @linger_req is non-NULL the request is marked to linger (re-sent
 * on osdmap changes) and handed back so the caller can cancel it.
 *
 * On failure the affected span is completed via coll/coll_index (a
 * no-op when @rq is NULL) before the negative errno is returned.
 */
static int rbd_do_request(struct request *rq,
                          struct rbd_device *rbd_dev,
                          struct ceph_snap_context *snapc,
                          u64 snapid,
                          const char *object_name, u64 ofs, u64 len,
                          struct bio *bio,
                          struct page **pages,
                          int num_pages,
                          int flags,
                          struct ceph_osd_req_op *ops,
                          struct rbd_req_coll *coll,
                          int coll_index,
                          void (*rbd_cb)(struct ceph_osd_request *req,
                                         struct ceph_msg *msg),
                          struct ceph_osd_request **linger_req,
                          u64 *ver)
{
        struct ceph_osd_request *req;
        struct ceph_file_layout *layout;
        int ret;
        u64 bno;
        struct timespec mtime = CURRENT_TIME;
        struct rbd_request *req_data;
        struct ceph_osd_request_head *reqhead;
        struct ceph_osd_client *osdc;

        req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
        if (!req_data) {
                if (coll)
                        rbd_coll_end_req_index(rq, coll, coll_index,
                                               -ENOMEM, len);
                return -ENOMEM;
        }

        if (coll) {
                req_data->coll = coll;
                req_data->coll_index = coll_index;
        }

        dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
                (unsigned long long) ofs, (unsigned long long) len);

        osdc = &rbd_dev->rbd_client->client->osdc;
        req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
                                        false, GFP_NOIO, pages, bio);
        if (!req) {
                ret = -ENOMEM;
                goto done_pages;
        }

        req->r_callback = rbd_cb;

        /* stashed so the callback can finish/free everything */
        req_data->rq = rq;
        req_data->bio = bio;
        req_data->pages = pages;
        req_data->len = len;

        req->r_priv = req_data;

        reqhead = req->r_request->front.iov_base;
        /*
         * NOTE(review): the request head always carries CEPH_NOSNAP;
         * the @snapid argument is applied via ceph_calc_raw_layout()
         * below — confirm this is intentional.
         */
        reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

        /*
         * NOTE(review): strncpy() does not NUL-terminate if
         * object_name fills r_oid — callers keep names short; verify.
         */
        strncpy(req->r_oid, object_name, sizeof(req->r_oid));
        req->r_oid_len = strlen(req->r_oid);

        /* one object per stripe unit: stripe/object size = 1 << max order */
        layout = &req->r_file_layout;
        memset(layout, 0, sizeof(*layout));
        layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_stripe_count = cpu_to_le32(1);
        layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
        ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
                                   req, ops);
        rbd_assert(ret == 0);

        ceph_osdc_build_request(req, ofs, &len,
                                ops,
                                snapc,
                                &mtime,
                                req->r_oid, req->r_oid_len);

        if (linger_req) {
                ceph_osdc_set_request_linger(osdc, req);
                *linger_req = req;
        }

        ret = ceph_osdc_start_request(osdc, req, false);
        if (ret < 0)
                goto done_err;

        if (!rbd_cb) {
                /* synchronous: wait here and tear the request down */
                ret = ceph_osdc_wait_request(osdc, req);
                if (ver)
                        *ver = le64_to_cpu(req->r_reassert_version.version);
                dout("reassert_ver=%llu\n",
                        (unsigned long long)
                                le64_to_cpu(req->r_reassert_version.version));
                ceph_osdc_put_request(req);
        }
        return ret;

done_err:
        bio_chain_put(req_data->bio);
        ceph_osdc_put_request(req);
done_pages:
        /* no-op when rq is NULL (sync callers) */
        rbd_coll_end_req(req_data, ret, len);
        kfree(req_data);
        return ret;
}
1061
1062 /*
1063  * Ceph osd op callback
1064  */
/*
 * Ceph osd op callback
 *
 * Completion handler for async object I/O.  Parses the reply,
 * converts "object does not exist" reads into zero-filled data, pads
 * short reads with zeroes, completes the matching collection slot,
 * and releases the per-request resources.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        struct rbd_request *req_data = req->r_priv;
        struct ceph_osd_reply_head *replyhead;
        struct ceph_osd_op *op;
        __s32 rc;
        u64 bytes;
        int read_op;

        /* parse reply */
        replyhead = msg->front.iov_base;
        WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
        op = (void *)(replyhead + 1);
        rc = le32_to_cpu(replyhead->result);
        bytes = le64_to_cpu(op->extent.length);
        read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

        dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
                (unsigned long long) bytes, read_op, (int) rc);

        if (rc == -ENOENT && read_op) {
                /* a not-yet-written object reads back as all zeroes */
                zero_bio_chain(req_data->bio, 0);
                rc = 0;
        } else if (rc == 0 && read_op && bytes < req_data->len) {
                /* short read: zero the tail the OSD didn't return */
                zero_bio_chain(req_data->bio, bytes);
                bytes = req_data->len;
        }

        rbd_coll_end_req(req_data, rc, bytes);

        if (req_data->bio)
                bio_chain_put(req_data->bio);

        ceph_osdc_put_request(req);
        kfree(req_data);
}
1101
/*
 * Completion callback for requests whose reply needs no processing
 * (e.g. notify acks): just drop the request reference.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        ceph_osdc_put_request(req);
}
1106
1107 /*
1108  * Do a synchronous ceph osd operation
1109  */
/*
 * Do a synchronous ceph osd operation
 *
 * Allocates a page vector to carry the data, issues the request with
 * no callback (so rbd_do_request() waits for completion), and for
 * read-flagged ops copies the bytes actually returned into @inbound.
 *
 * Returns the OSD's byte count on success (possibly short of
 * @inbound_size) or a negative errno.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
                           struct ceph_snap_context *snapc,
                           u64 snapid,
                           int flags,
                           struct ceph_osd_req_op *ops,
                           const char *object_name,
                           u64 ofs, u64 inbound_size,
                           char *inbound,
                           struct ceph_osd_request **linger_req,
                           u64 *ver)
{
        int ret;
        struct page **pages;
        int num_pages;

        rbd_assert(ops != NULL);

        num_pages = calc_pages_for(ofs, inbound_size);
        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);

        ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
                          object_name, ofs, inbound_size, NULL,
                          pages, num_pages,
                          flags,
                          ops,
                          NULL, 0,
                          NULL,
                          linger_req, ver);
        if (ret < 0)
                goto done;

        /* ret is the number of bytes the OSD returned */
        if ((flags & CEPH_OSD_FLAG_READ) && inbound)
                ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);

done:
        ceph_release_page_vector(pages, num_pages);
        return ret;
}
1150
1151 /*
1152  * Do an asynchronous ceph osd operation
1153  */
1154 static int rbd_do_op(struct request *rq,
1155                      struct rbd_device *rbd_dev,
1156                      struct ceph_snap_context *snapc,
1157                      u64 snapid,
1158                      int opcode, int flags,
1159                      u64 ofs, u64 len,
1160                      struct bio *bio,
1161                      struct rbd_req_coll *coll,
1162                      int coll_index)
1163 {
1164         char *seg_name;
1165         u64 seg_ofs;
1166         u64 seg_len;
1167         int ret;
1168         struct ceph_osd_req_op *ops;
1169         u32 payload_len;
1170
1171         seg_name = rbd_segment_name(rbd_dev, ofs);
1172         if (!seg_name)
1173                 return -ENOMEM;
1174         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1175         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1176
1177         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1178
1179         ret = -ENOMEM;
1180         ops = rbd_create_rw_ops(1, opcode, payload_len);
1181         if (!ops)
1182                 goto done;
1183
1184         /* we've taken care of segment sizes earlier when we
1185            cloned the bios. We should never have a segment
1186            truncated at this point */
1187         rbd_assert(seg_len == len);
1188
1189         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1190                              seg_name, seg_ofs, seg_len,
1191                              bio,
1192                              NULL, 0,
1193                              flags,
1194                              ops,
1195                              coll, coll_index,
1196                              rbd_req_cb, 0, NULL);
1197
1198         rbd_destroy_ops(ops);
1199 done:
1200         kfree(seg_name);
1201         return ret;
1202 }
1203
1204 /*
1205  * Request async osd write
1206  */
1207 static int rbd_req_write(struct request *rq,
1208                          struct rbd_device *rbd_dev,
1209                          struct ceph_snap_context *snapc,
1210                          u64 ofs, u64 len,
1211                          struct bio *bio,
1212                          struct rbd_req_coll *coll,
1213                          int coll_index)
1214 {
1215         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1216                          CEPH_OSD_OP_WRITE,
1217                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1218                          ofs, len, bio, coll, coll_index);
1219 }
1220
1221 /*
1222  * Request async osd read
1223  */
1224 static int rbd_req_read(struct request *rq,
1225                          struct rbd_device *rbd_dev,
1226                          u64 snapid,
1227                          u64 ofs, u64 len,
1228                          struct bio *bio,
1229                          struct rbd_req_coll *coll,
1230                          int coll_index)
1231 {
1232         return rbd_do_op(rq, rbd_dev, NULL,
1233                          snapid,
1234                          CEPH_OSD_OP_READ,
1235                          CEPH_OSD_FLAG_READ,
1236                          ofs, len, bio, coll, coll_index);
1237 }
1238
1239 /*
1240  * Request sync osd read
1241  */
1242 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1243                           u64 snapid,
1244                           const char *object_name,
1245                           u64 ofs, u64 len,
1246                           char *buf,
1247                           u64 *ver)
1248 {
1249         struct ceph_osd_req_op *ops;
1250         int ret;
1251
1252         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1253         if (!ops)
1254                 return -ENOMEM;
1255
1256         ret = rbd_req_sync_op(rbd_dev, NULL,
1257                                snapid,
1258                                CEPH_OSD_FLAG_READ,
1259                                ops, object_name, ofs, len, buf, NULL, ver);
1260         rbd_destroy_ops(ops);
1261
1262         return ret;
1263 }
1264
1265 /*
1266  * Request sync osd watch
1267  */
1268 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1269                                    u64 ver,
1270                                    u64 notify_id)
1271 {
1272         struct ceph_osd_req_op *ops;
1273         int ret;
1274
1275         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1276         if (!ops)
1277                 return -ENOMEM;
1278
1279         ops[0].watch.ver = cpu_to_le64(ver);
1280         ops[0].watch.cookie = notify_id;
1281         ops[0].watch.flag = 0;
1282
1283         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1284                           rbd_dev->header_name, 0, 0, NULL,
1285                           NULL, 0,
1286                           CEPH_OSD_FLAG_READ,
1287                           ops,
1288                           NULL, 0,
1289                           rbd_simple_req_cb, 0, NULL);
1290
1291         rbd_destroy_ops(ops);
1292         return ret;
1293 }
1294
1295 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1296 {
1297         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1298         u64 hver;
1299         int rc;
1300
1301         if (!rbd_dev)
1302                 return;
1303
1304         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1305                 rbd_dev->header_name, (unsigned long long) notify_id,
1306                 (unsigned int) opcode);
1307         rc = rbd_refresh_header(rbd_dev, &hver);
1308         if (rc)
1309                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1310                            " update snaps: %d\n", rbd_dev->major, rc);
1311
1312         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1313 }
1314
1315 /*
1316  * Request sync osd watch
1317  */
/*
 * Request sync osd watch
 *
 * Registers a watch on the header object so rbd_watch_cb() is called
 * when the image header changes.  The watch request lingers (it is
 * re-sent on osdmap changes) and is kept in rbd_dev->watch_request
 * for later cancellation; the event handle lives in
 * rbd_dev->watch_event.
 *
 * On failure the event is cancelled and both ops and event are
 * cleaned up (in reverse order of acquisition).
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
                                     (void *)rbd_dev, &rbd_dev->watch_event);
        if (ret < 0)
                goto fail;

        ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 1;          /* 1 == register the watch */

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL,
                              &rbd_dev->watch_request, NULL);

        if (ret < 0)
                goto fail_event;

        rbd_destroy_ops(ops);
        return 0;

fail_event:
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
fail:
        rbd_destroy_ops(ops);
        return ret;
}
1358
1359 /*
1360  * Request sync osd unwatch
1361  */
1362 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1363 {
1364         struct ceph_osd_req_op *ops;
1365         int ret;
1366
1367         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1368         if (!ops)
1369                 return -ENOMEM;
1370
1371         ops[0].watch.ver = 0;
1372         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1373         ops[0].watch.flag = 0;
1374
1375         ret = rbd_req_sync_op(rbd_dev, NULL,
1376                               CEPH_NOSNAP,
1377                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1378                               ops,
1379                               rbd_dev->header_name,
1380                               0, 0, NULL, NULL, NULL);
1381
1382
1383         rbd_destroy_ops(ops);
1384         ceph_osdc_cancel_event(rbd_dev->watch_event);
1385         rbd_dev->watch_event = NULL;
1386         return ret;
1387 }
1388
1389 /*
1390  * Synchronous osd object method call
1391  */
/*
 * Synchronous osd object method call
 *
 * Invokes @class_name.@method_name on @object_name via a CALL op,
 * sending @outbound as the method's input and copying up to
 * @inbound_size bytes of its output into @inbound.
 *
 * Returns the byte count from the OSD on success or a negative
 * errno; the object version is reported through @ver when non-NULL.
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
                             const char *object_name,
                             const char *class_name,
                             const char *method_name,
                             const char *outbound,
                             size_t outbound_size,
                             char *inbound,
                             size_t inbound_size,
                             int flags,
                             u64 *ver)
{
        struct ceph_osd_req_op *ops;
        int class_name_len = strlen(class_name);
        int method_name_len = strlen(method_name);
        int payload_size;
        int ret;

        /*
         * Any input parameters required by the method we're calling
         * will be sent along with the class and method names as
         * part of the message payload.  That data and its size are
         * supplied via the indata and indata_len fields (named from
         * the perspective of the server side) in the OSD request
         * operation.
         */
        payload_size = class_name_len + method_name_len + outbound_size;
        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
        if (!ops)
                return -ENOMEM;

        /*
         * NOTE(review): lengths are truncated to __u8 — callers are
         * expected to use names under 256 bytes; verify.
         */
        ops[0].cls.class_name = class_name;
        ops[0].cls.class_len = (__u8) class_name_len;
        ops[0].cls.method_name = method_name;
        ops[0].cls.method_len = (__u8) method_name_len;
        ops[0].cls.argc = 0;
        ops[0].cls.indata = outbound;
        ops[0].cls.indata_len = outbound_size;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                               CEPH_NOSNAP,
                               flags, ops,
                               object_name, 0, inbound_size, inbound,
                               NULL, ver);

        rbd_destroy_ops(ops);

        dout("cls_exec returned %d\n", ret);
        return ret;
}
1441
1442 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1443 {
1444         struct rbd_req_coll *coll =
1445                         kzalloc(sizeof(struct rbd_req_coll) +
1446                                 sizeof(struct rbd_req_status) * num_reqs,
1447                                 GFP_ATOMIC);
1448
1449         if (!coll)
1450                 return NULL;
1451         coll->total = num_reqs;
1452         kref_init(&coll->kref);
1453         return coll;
1454 }
1455
1456 /*
1457  * block device queue callback
1458  */
/*
 * block device queue callback
 *
 * Entered with q->queue_lock held (request_fn contract).  For each
 * fetched request the lock is dropped while the bio chain is cut on
 * object boundaries and one OSD sub-request is issued per object,
 * then re-taken before the next fetch.  Completion of the
 * sub-requests is coordinated through an rbd_req_coll so the request
 * is ended in segment order.
 */
static void rbd_rq_fn(struct request_queue *q)
{
        struct rbd_device *rbd_dev = q->queuedata;
        struct request *rq;
        struct bio_pair *bp = NULL;

        while ((rq = blk_fetch_request(q))) {
                struct bio *bio;
                struct bio *rq_bio, *next_bio = NULL;
                bool do_write;
                unsigned int size;
                u64 op_size = 0;
                u64 ofs;
                int num_segs, cur_seg = 0;
                struct rbd_req_coll *coll;
                struct ceph_snap_context *snapc;

                dout("fetched request\n");

                /* filter out block requests we don't understand */
                if ((rq->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);

                size = blk_rq_bytes(rq);
                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
                rq_bio = rq->bio;
                if (do_write && rbd_dev->mapping.read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        continue;
                }

                spin_unlock_irq(q->queue_lock);

                /* hold off header refreshes while we sample the mapping */
                down_read(&rbd_dev->header_rwsem);

                if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
                                !rbd_dev->mapping.snap_exists) {
                        up_read(&rbd_dev->header_rwsem);
                        dout("request for non-existent snapshot");
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENXIO);
                        continue;
                }

                /* pin the snap context for the lifetime of the writes */
                snapc = ceph_get_snap_context(rbd_dev->header.snapc);

                up_read(&rbd_dev->header_rwsem);

                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
                     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
                if (num_segs <= 0) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, num_segs);
                        ceph_put_snap_context(snapc);
                        continue;
                }
                coll = rbd_alloc_coll(num_segs);
                if (!coll) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENOMEM);
                        ceph_put_snap_context(snapc);
                        continue;
                }

                do {
                        /* a bio clone to be passed down to OSD req */
                        dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
                        op_size = rbd_segment_length(rbd_dev, ofs, size);
                        /* each in-flight segment holds a coll reference */
                        kref_get(&coll->kref);
                        bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
                                              op_size, GFP_ATOMIC);
                        if (!bio) {
                                /* fail just this segment; keep going */
                                rbd_coll_end_req_index(rq, coll, cur_seg,
                                                       -ENOMEM, op_size);
                                goto next_seg;
                        }


                        /* init OSD command: write or read */
                        if (do_write)
                                rbd_req_write(rq, rbd_dev,
                                              snapc,
                                              ofs,
                                              op_size, bio,
                                              coll, cur_seg);
                        else
                                rbd_req_read(rq, rbd_dev,
                                             rbd_dev->mapping.snap_id,
                                             ofs,
                                             op_size, bio,
                                             coll, cur_seg);

next_seg:
                        size -= op_size;
                        ofs += op_size;

                        cur_seg++;
                        rq_bio = next_bio;
                } while (size > 0);
                /* drop the allocation's own reference */
                kref_put(&coll->kref, rbd_coll_release);

                if (bp)
                        bio_pair_release(bp);
                spin_lock_irq(q->queue_lock);

                ceph_put_snap_context(snapc);
        }
}
1575
1576 /*
1577  * a queue callback. Makes sure that we don't create a bio that spans across
1578  * multiple osd objects. One exception would be with a single page bios,
1579  * which we handle later at bio_chain_clone
1580  */
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 *
 * Returns the number of BYTES of @bvec that may be added to the bio
 * described by @bmd without crossing an object boundary (0 forbids
 * the merge; an empty bio may always take its first bvec).
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
                          struct bio_vec *bvec)
{
        struct rbd_device *rbd_dev = q->queuedata;
        unsigned int chunk_sectors;
        sector_t sector;
        unsigned int bio_sectors;
        int max;

        /* object size in sectors; objects are power-of-two sized */
        chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
        sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
        bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

        /* sectors left in the current object, converted to bytes */
        max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
                                 + bio_sectors)) << SECTOR_SHIFT;
        if (max < 0)
                max = 0; /* bio_add cannot handle a negative return */
        /* single-page exception: an empty bio may take one bvec anyway */
        if (max <= bvec->bv_len && bio_sectors == 0)
                return bvec->bv_len;
        return max;
}
1602
1603 static void rbd_free_disk(struct rbd_device *rbd_dev)
1604 {
1605         struct gendisk *disk = rbd_dev->disk;
1606
1607         if (!disk)
1608                 return;
1609
1610         if (disk->flags & GENHD_FL_UP)
1611                 del_gendisk(disk);
1612         if (disk->queue)
1613                 blk_cleanup_queue(disk->queue);
1614         put_disk(disk);
1615 }
1616
1617 /*
1618  * Read the complete header for the given rbd device.
1619  *
1620  * Returns a pointer to a dynamically-allocated buffer containing
1621  * the complete and validated header.  Caller can pass the address
1622  * of a variable that will be filled in with the version of the
1623  * header object at the time it was read.
1624  *
1625  * Returns a pointer-coded errno if a failure occurs.
1626  */
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
        struct rbd_image_header_ondisk *ondisk = NULL;
        u32 snap_count = 0;
        u64 names_size = 0;
        u32 want_count;
        int ret;

        /*
         * The complete header will include an array of its 64-bit
         * snapshot ids, followed by the names of those snapshots as
         * a contiguous block of NUL-terminated strings.  Note that
         * the number of snapshots could change by the time we read
         * it in, in which case we re-read it.
         */
        do {
                size_t size;

                /* free the previous (too small / stale) attempt */
                kfree(ondisk);

                size = sizeof (*ondisk);
                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
                size += names_size;
                ondisk = kmalloc(size, GFP_KERNEL);
                if (!ondisk)
                        return ERR_PTR(-ENOMEM);

                ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
                                       rbd_dev->header_name,
                                       0, size,
                                       (char *) ondisk, version);

                if (ret < 0)
                        goto out_err;
                if (WARN_ON((size_t) ret < size)) {
                        ret = -ENXIO;
                        pr_warning("short header read for image %s"
                                        " (want %zd got %d)\n",
                                rbd_dev->image_name, size, ret);
                        goto out_err;
                }
                if (!rbd_dev_ondisk_valid(ondisk)) {
                        ret = -ENXIO;
                        pr_warning("invalid header for image %s\n",
                                rbd_dev->image_name);
                        goto out_err;
                }

                /*
                 * First pass reads only the fixed part (counts were
                 * zero), so loop at least once more with real sizes.
                 */
                names_size = le64_to_cpu(ondisk->snap_names_len);
                want_count = snap_count;
                snap_count = le32_to_cpu(ondisk->snap_count);
        } while (snap_count != want_count);

        return ondisk;

out_err:
        kfree(ondisk);

        return ERR_PTR(ret);
}
1688
1689 /*
1690  * reload the ondisk the header
1691  */
1692 static int rbd_read_header(struct rbd_device *rbd_dev,
1693                            struct rbd_image_header *header)
1694 {
1695         struct rbd_image_header_ondisk *ondisk;
1696         u64 ver = 0;
1697         int ret;
1698
1699         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1700         if (IS_ERR(ondisk))
1701                 return PTR_ERR(ondisk);
1702         ret = rbd_header_from_disk(header, ondisk);
1703         if (ret >= 0)
1704                 header->obj_version = ver;
1705         kfree(ondisk);
1706
1707         return ret;
1708 }
1709
1710 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1711 {
1712         struct rbd_snap *snap;
1713         struct rbd_snap *next;
1714
1715         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1716                 __rbd_remove_snap_dev(snap);
1717 }
1718
1719 /*
1720  * only read the first part of the ondisk header, without the snaps info
1721  */
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the header from the OSD and, under the header rwsem,
 * transplants the freshly-decoded fields into rbd_dev->header.
 * Ownership of h.snapc / h.snap_names / h.snap_sizes moves to
 * rbd_dev->header; the old arrays and snap context are released
 * first.  The block device capacity is updated if the (unsnapshotted)
 * image was resized.  Caller must NOT hold header_rwsem.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
        int ret;
        struct rbd_image_header h;

        ret = rbd_read_header(rbd_dev, &h);
        if (ret < 0)
                return ret;

        down_write(&rbd_dev->header_rwsem);

        /* resized? */
        if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
                sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

                if (size != (sector_t) rbd_dev->mapping.size) {
                        dout("setting size to %llu sectors",
                                (unsigned long long) size);
                        rbd_dev->mapping.size = (u64) size;
                        set_capacity(rbd_dev->disk, size);
                }
        }

        /* rbd_dev->header.object_prefix shouldn't change */
        kfree(rbd_dev->header.snap_sizes);
        kfree(rbd_dev->header.snap_names);
        /* osd requests may still refer to snapc */
        ceph_put_snap_context(rbd_dev->header.snapc);

        if (hver)
                *hver = h.obj_version;
        rbd_dev->header.obj_version = h.obj_version;
        rbd_dev->header.image_size = h.image_size;
        /* ownership of these allocations transfers to rbd_dev->header */
        rbd_dev->header.snapc = h.snapc;
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_sizes = h.snap_sizes;
        /* Free the extra copy of the object prefix */
        WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
        kfree(h.object_prefix);

        ret = rbd_dev_snaps_update(rbd_dev);
        if (!ret)
                ret = rbd_dev_snaps_register(rbd_dev);

        up_write(&rbd_dev->header_rwsem);

        return ret;
}
1770
1771 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1772 {
1773         int ret;
1774
1775         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1776         ret = __rbd_refresh_header(rbd_dev, hver);
1777         mutex_unlock(&ctl_mutex);
1778
1779         return ret;
1780 }
1781
/*
 * Allocate and configure the gendisk and request queue for the
 * device.  I/O limits are aligned to the RADOS object size so a
 * request rarely spans objects; rbd_merge_bvec() enforces the
 * boundary at bio build time.  On success rbd_dev->disk is set and
 * the capacity reflects the current mapping size.  Returns 0 or
 * -ENOMEM.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk;
        struct request_queue *q;
        u64 segment_size;

        /* create gendisk info */
        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
        if (!disk)
                return -ENOMEM;

        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->dev_id);
        disk->major = rbd_dev->major;
        disk->first_minor = 0;
        disk->fops = &rbd_bd_ops;
        disk->private_data = rbd_dev;

        /* init rq */
        q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
        if (!q)
                goto out_disk;

        /* We use the default size, but let's be explicit about it. */
        blk_queue_physical_block_size(q, SECTOR_SIZE);

        /* set io sizes to object size */
        segment_size = rbd_obj_bytes(&rbd_dev->header);
        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
        blk_queue_max_segment_size(q, segment_size);
        blk_queue_io_min(q, segment_size);
        blk_queue_io_opt(q, segment_size);

        blk_queue_merge_bvec(q, rbd_merge_bvec);
        disk->queue = q;

        q->queuedata = rbd_dev;

        rbd_dev->disk = disk;

        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

        return 0;
out_disk:
        put_disk(disk);

        /* only failure mode past alloc_disk() is queue allocation */
        return -ENOMEM;
}
1830
/*
  sysfs
*/

/* An rbd_device embeds its struct device; map back to the container */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
        return container_of(dev, struct rbd_device, dev);
}
1839
1840 static ssize_t rbd_size_show(struct device *dev,
1841                              struct device_attribute *attr, char *buf)
1842 {
1843         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1844         sector_t size;
1845
1846         down_read(&rbd_dev->header_rwsem);
1847         size = get_capacity(rbd_dev->disk);
1848         up_read(&rbd_dev->header_rwsem);
1849
1850         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1851 }
1852
1853 /*
1854  * Note this shows the features for whatever's mapped, which is not
1855  * necessarily the base image.
1856  */
1857 static ssize_t rbd_features_show(struct device *dev,
1858                              struct device_attribute *attr, char *buf)
1859 {
1860         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1861
1862         return sprintf(buf, "0x%016llx\n",
1863                         (unsigned long long) rbd_dev->mapping.features);
1864 }
1865
1866 static ssize_t rbd_major_show(struct device *dev,
1867                               struct device_attribute *attr, char *buf)
1868 {
1869         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1870
1871         return sprintf(buf, "%d\n", rbd_dev->major);
1872 }
1873
1874 static ssize_t rbd_client_id_show(struct device *dev,
1875                                   struct device_attribute *attr, char *buf)
1876 {
1877         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1878
1879         return sprintf(buf, "client%lld\n",
1880                         ceph_client_id(rbd_dev->rbd_client->client));
1881 }
1882
1883 static ssize_t rbd_pool_show(struct device *dev,
1884                              struct device_attribute *attr, char *buf)
1885 {
1886         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1887
1888         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1889 }
1890
1891 static ssize_t rbd_pool_id_show(struct device *dev,
1892                              struct device_attribute *attr, char *buf)
1893 {
1894         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1895
1896         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1897 }
1898
1899 static ssize_t rbd_name_show(struct device *dev,
1900                              struct device_attribute *attr, char *buf)
1901 {
1902         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1903
1904         return sprintf(buf, "%s\n", rbd_dev->image_name);
1905 }
1906
1907 static ssize_t rbd_image_id_show(struct device *dev,
1908                              struct device_attribute *attr, char *buf)
1909 {
1910         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1911
1912         return sprintf(buf, "%s\n", rbd_dev->image_id);
1913 }
1914
1915 /*
1916  * Shows the name of the currently-mapped snapshot (or
1917  * RBD_SNAP_HEAD_NAME for the base image).
1918  */
1919 static ssize_t rbd_snap_show(struct device *dev,
1920                              struct device_attribute *attr,
1921                              char *buf)
1922 {
1923         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1924
1925         return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
1926 }
1927
1928 static ssize_t rbd_image_refresh(struct device *dev,
1929                                  struct device_attribute *attr,
1930                                  const char *buf,
1931                                  size_t size)
1932 {
1933         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1934         int ret;
1935
1936         ret = rbd_refresh_header(rbd_dev, NULL);
1937
1938         return ret < 0 ? ret : size;
1939 }
1940
/* Per-device sysfs attributes; all read-only except "refresh" */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);

static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
        NULL
};

static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};

/*
 * Empty release: the rbd_device's real release callback is
 * rbd_dev_release, installed in rbd_bus_add_dev().
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
1984
1985
1986 /*
1987   sysfs - snapshots
1988 */
1989
1990 static ssize_t rbd_snap_size_show(struct device *dev,
1991                                   struct device_attribute *attr,
1992                                   char *buf)
1993 {
1994         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1995
1996         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1997 }
1998
1999 static ssize_t rbd_snap_id_show(struct device *dev,
2000                                 struct device_attribute *attr,
2001                                 char *buf)
2002 {
2003         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2004
2005         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2006 }
2007
2008 static ssize_t rbd_snap_features_show(struct device *dev,
2009                                 struct device_attribute *attr,
2010                                 char *buf)
2011 {
2012         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2013
2014         return sprintf(buf, "0x%016llx\n",
2015                         (unsigned long long) snap->features);
2016 }
2017
/* Per-snapshot sysfs attributes (all read-only) */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        &dev_attr_snap_features.attr,
        NULL,
};

static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};

/* Release callback: frees the rbd_snap and its duplicated name */
static void rbd_snap_dev_release(struct device *dev)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
        kfree(snap->name);
        kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
2049
/*
 * Report whether a snapshot's sysfs device has been registered.
 * The device type is assigned immediately before device_register()
 * (see rbd_register_snap_dev()), so the two indicators must agree;
 * the assertion (!ret ^ reg) checks that invariant.
 */
static bool rbd_snap_registered(struct rbd_snap *snap)
{
        bool ret = snap->dev.type == &rbd_snap_device_type;
        bool reg = device_is_registered(&snap->dev);

        rbd_assert(!ret ^ reg);

        return ret;
}
2059
/*
 * Unlink a snapshot from its device's list and unregister its sysfs
 * device if one was registered.  NOTE(review): device_unregister()
 * can drop the final reference and free the snap through
 * rbd_snap_dev_release() -- callers must not touch *snap afterward.
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
        list_del(&snap->node);
        if (device_is_registered(&snap->dev))
                device_unregister(&snap->dev);
}
2066
2067 static int rbd_register_snap_dev(struct rbd_snap *snap,
2068                                   struct device *parent)
2069 {
2070         struct device *dev = &snap->dev;
2071         int ret;
2072
2073         dev->type = &rbd_snap_device_type;
2074         dev->parent = parent;
2075         dev->release = rbd_snap_dev_release;
2076         dev_set_name(dev, "snap_%s", snap->name);
2077         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2078
2079         ret = device_register(dev);
2080
2081         return ret;
2082 }
2083
2084 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2085                                                 const char *snap_name,
2086                                                 u64 snap_id, u64 snap_size,
2087                                                 u64 snap_features)
2088 {
2089         struct rbd_snap *snap;
2090         int ret;
2091
2092         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2093         if (!snap)
2094                 return ERR_PTR(-ENOMEM);
2095
2096         ret = -ENOMEM;
2097         snap->name = kstrdup(snap_name, GFP_KERNEL);
2098         if (!snap->name)
2099                 goto err;
2100
2101         snap->id = snap_id;
2102         snap->size = snap_size;
2103         snap->features = snap_features;
2104
2105         return snap;
2106
2107 err:
2108         kfree(snap->name);
2109         kfree(snap);
2110
2111         return ERR_PTR(ret);
2112 }
2113
2114 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2115                 u64 *snap_size, u64 *snap_features)
2116 {
2117         char *snap_name;
2118
2119         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2120
2121         *snap_size = rbd_dev->header.snap_sizes[which];
2122         *snap_features = 0;     /* No features for v1 */
2123
2124         /* Skip over names until we find the one we are looking for */
2125
2126         snap_name = rbd_dev->header.snap_names;
2127         while (which--)
2128                 snap_name += strlen(snap_name) + 1;
2129
2130         return snap_name;
2131 }
2132
/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 *
 * Issues the "get_size" class method against the image header
 * object and decodes the packed { order, size } reply.  Returns 0
 * on success or a negative errno from the OSD request.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size)
{
        __le64 snapid = cpu_to_le64(snap_id);
        int ret;
        struct {
                u8 order;       /* object size order; see rbd_obj_bytes() */
                __le64 size;    /* size in bytes, little-endian on the wire */
        } __attribute__ ((packed)) size_buf = { 0 };

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_size",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &size_buf, sizeof (size_buf),
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;

        *order = size_buf.order;
        *snap_size = le64_to_cpu(size_buf.size);

        dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
                (unsigned long long) snap_id, (unsigned int) *order,
                (unsigned long long) *snap_size);

        return 0;
}
2166
/*
 * Fetch the base image's object order and size into the header
 * (CEPH_NOSNAP selects the base image rather than a snapshot).
 */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
        return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
                                        &rbd_dev->header.obj_order,
                                        &rbd_dev->header.image_size);
}
2173
/*
 * Fetch the object name prefix for a format 2 image via the
 * "get_object_prefix" class method and store the decoded (allocated)
 * string in rbd_dev->header.object_prefix.  Returns 0 on success or
 * a negative errno; on failure object_prefix is left NULL.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
        void *reply_buf;
        int ret;
        void *p;

        reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
        if (!reply_buf)
                return -ENOMEM;

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_object_prefix",
                                NULL, 0,
                                reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;

        /* NOTE(review): GFP_NOIO here vs GFP_KERNEL above -- confirm
         * which allocation context this path actually requires. */
        p = reply_buf;
        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
                                                p + RBD_OBJ_PREFIX_LEN_MAX,
                                                NULL, GFP_NOIO);

        if (IS_ERR(rbd_dev->header.object_prefix)) {
                ret = PTR_ERR(rbd_dev->header.object_prefix);
                rbd_dev->header.object_prefix = NULL;
        } else {
                dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
        }

out:
        kfree(reply_buf);

        return ret;
}
2210
/*
 * Get the feature bits for an image snapshot, or for the base image
 * if snap_id is CEPH_NOSNAP, via the "get_features" class method.
 * NOTE(review): the reply's incompat mask is only logged here, not
 * acted on -- confirm incompatible features are handled elsewhere.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features)
{
        __le64 snapid = cpu_to_le64(snap_id);
        struct {
                __le64 features;        /* features in effect */
                __le64 incompat;        /* subset we must understand */
        } features_buf = { 0 };
        int ret;

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_features",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &features_buf, sizeof (features_buf),
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;
        *snap_features = le64_to_cpu(features_buf.features);

        dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
                (unsigned long long) snap_id,
                (unsigned long long) *snap_features,
                (unsigned long long) le64_to_cpu(features_buf.incompat));

        return 0;
}
2238
/* Fetch the base image's feature bits into the header */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
        return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
                                                &rbd_dev->header.features);
}
2244
2245 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2246 {
2247         size_t size;
2248         int ret;
2249         void *reply_buf;
2250         void *p;
2251         void *end;
2252         u64 seq;
2253         u32 snap_count;
2254         struct ceph_snap_context *snapc;
2255         u32 i;
2256
2257         /*
2258          * We'll need room for the seq value (maximum snapshot id),
2259          * snapshot count, and array of that many snapshot ids.
2260          * For now we have a fixed upper limit on the number we're
2261          * prepared to receive.
2262          */
2263         size = sizeof (__le64) + sizeof (__le32) +
2264                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2265         reply_buf = kzalloc(size, GFP_KERNEL);
2266         if (!reply_buf)
2267                 return -ENOMEM;
2268
2269         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2270                                 "rbd", "get_snapcontext",
2271                                 NULL, 0,
2272                                 reply_buf, size,
2273                                 CEPH_OSD_FLAG_READ, ver);
2274         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2275         if (ret < 0)
2276                 goto out;
2277
2278         ret = -ERANGE;
2279         p = reply_buf;
2280         end = (char *) reply_buf + size;
2281         ceph_decode_64_safe(&p, end, seq, out);
2282         ceph_decode_32_safe(&p, end, snap_count, out);
2283
2284         /*
2285          * Make sure the reported number of snapshot ids wouldn't go
2286          * beyond the end of our buffer.  But before checking that,
2287          * make sure the computed size of the snapshot context we
2288          * allocate is representable in a size_t.
2289          */
2290         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2291                                  / sizeof (u64)) {
2292                 ret = -EINVAL;
2293                 goto out;
2294         }
2295         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2296                 goto out;
2297
2298         size = sizeof (struct ceph_snap_context) +
2299                                 snap_count * sizeof (snapc->snaps[0]);
2300         snapc = kmalloc(size, GFP_KERNEL);
2301         if (!snapc) {
2302                 ret = -ENOMEM;
2303                 goto out;
2304         }
2305
2306         atomic_set(&snapc->nref, 1);
2307         snapc->seq = seq;
2308         snapc->num_snaps = snap_count;
2309         for (i = 0; i < snap_count; i++)
2310                 snapc->snaps[i] = ceph_decode_64(&p);
2311
2312         rbd_dev->header.snapc = snapc;
2313
2314         dout("  snap context seq = %llu, snap_count = %u\n",
2315                 (unsigned long long) seq, (unsigned int) snap_count);
2316
2317 out:
2318         kfree(reply_buf);
2319
2320         return 0;
2321 }
2322
/*
 * Look up the name of the snapshot at index "which" in the current
 * snapshot context via the "get_snapshot_name" class method.
 * Returns an allocated string (the caller is responsible for it) or
 * a pointer-coded negative errno.
 */
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        size_t size;
        void *reply_buf;
        __le64 snap_id;
        int ret;
        void *p;
        void *end;
        size_t snap_name_len;
        char *snap_name;

        /* Room for the encoded length prefix plus the maximum name */
        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
        reply_buf = kmalloc(size, GFP_KERNEL);
        if (!reply_buf)
                return ERR_PTR(-ENOMEM);

        snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapshot_name",
                                (char *) &snap_id, sizeof (snap_id),
                                reply_buf, size,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;

        p = reply_buf;
        end = (char *) reply_buf + size;
        snap_name_len = 0;
        snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
                                GFP_KERNEL);
        if (IS_ERR(snap_name)) {
                ret = PTR_ERR(snap_name);
                goto out;
        } else {
                dout("  snap_id 0x%016llx snap_name = %s\n",
                        (unsigned long long) le64_to_cpu(snap_id), snap_name);
        }
        kfree(reply_buf);

        return snap_name;
out:
        kfree(reply_buf);

        return ERR_PTR(ret);
}
2369
2370 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2371                 u64 *snap_size, u64 *snap_features)
2372 {
2373         __le64 snap_id;
2374         u8 order;
2375         int ret;
2376
2377         snap_id = rbd_dev->header.snapc->snaps[which];
2378         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2379         if (ret)
2380                 return ERR_PTR(ret);
2381         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2382         if (ret)
2383                 return ERR_PTR(ret);
2384
2385         return rbd_dev_v2_snap_name(rbd_dev, which);
2386 }
2387
2388 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2389                 u64 *snap_size, u64 *snap_features)
2390 {
2391         if (rbd_dev->image_format == 1)
2392                 return rbd_dev_v1_snap_info(rbd_dev, which,
2393                                         snap_size, snap_features);
2394         if (rbd_dev->image_format == 2)
2395                 return rbd_dev_v2_snap_info(rbd_dev, which,
2396                                         snap_size, snap_features);
2397         return ERR_PTR(-EINVAL);
2398 }
2399
2400 /*
2401  * Scan the rbd device's current snapshot list and compare it to the
2402  * newly-received snapshot context.  Remove any existing snapshots
2403  * not present in the new snapshot context.  Add a new snapshot for
2404  * any snaphots in the snapshot context not in the current list.
2405  * And verify there are no changes to snapshots we already know
2406  * about.
2407  *
2408  * Assumes the snapshots in the snapshot context are sorted by
2409  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2410  * are also maintained in that order.)
2411  */
2412 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2413 {
2414         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2415         const u32 snap_count = snapc->num_snaps;
2416         struct list_head *head = &rbd_dev->snaps;
2417         struct list_head *links = head->next;
2418         u32 index = 0;
2419
2420         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2421         while (index < snap_count || links != head) {
2422                 u64 snap_id;
2423                 struct rbd_snap *snap;
2424                 char *snap_name;
2425                 u64 snap_size = 0;
2426                 u64 snap_features = 0;
2427
2428                 snap_id = index < snap_count ? snapc->snaps[index]
2429                                              : CEPH_NOSNAP;
2430                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2431                                      : NULL;
2432                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2433
2434                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2435                         struct list_head *next = links->next;
2436
2437                         /* Existing snapshot not in the new snap context */
2438
2439                         if (rbd_dev->mapping.snap_id == snap->id)
2440                                 rbd_dev->mapping.snap_exists = false;
2441                         __rbd_remove_snap_dev(snap);
2442                         dout("%ssnap id %llu has been removed\n",
2443                                 rbd_dev->mapping.snap_id == snap->id ?
2444                                                                 "mapped " : "",
2445                                 (unsigned long long) snap->id);
2446
2447                         /* Done with this list entry; advance */
2448
2449                         links = next;
2450                         continue;
2451                 }
2452
2453                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2454                                         &snap_size, &snap_features);
2455                 if (IS_ERR(snap_name))
2456                         return PTR_ERR(snap_name);
2457
2458                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2459                         (unsigned long long) snap_id);
2460                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2461                         struct rbd_snap *new_snap;
2462
2463                         /* We haven't seen this snapshot before */
2464
2465                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2466                                         snap_id, snap_size, snap_features);
2467                         if (IS_ERR(new_snap)) {
2468                                 int err = PTR_ERR(new_snap);
2469
2470                                 dout("  failed to add dev, error %d\n", err);
2471
2472                                 return err;
2473                         }
2474
2475                         /* New goes before existing, or at end of list */
2476
2477                         dout("  added dev%s\n", snap ? "" : " at end\n");
2478                         if (snap)
2479                                 list_add_tail(&new_snap->node, &snap->node);
2480                         else
2481                                 list_add_tail(&new_snap->node, head);
2482                 } else {
2483                         /* Already have this one */
2484
2485                         dout("  already present\n");
2486
2487                         rbd_assert(snap->size == snap_size);
2488                         rbd_assert(!strcmp(snap->name, snap_name));
2489                         rbd_assert(snap->features == snap_features);
2490
2491                         /* Done with this list entry; advance */
2492
2493                         links = links->next;
2494                 }
2495
2496                 /* Advance to the next entry in the snapshot context */
2497
2498                 index++;
2499         }
2500         dout("%s: done\n", __func__);
2501
2502         return 0;
2503 }
2504
2505 /*
2506  * Scan the list of snapshots and register the devices for any that
2507  * have not already been registered.
2508  */
2509 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2510 {
2511         struct rbd_snap *snap;
2512         int ret = 0;
2513
2514         dout("%s called\n", __func__);
2515         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2516                 return -EIO;
2517
2518         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2519                 if (!rbd_snap_registered(snap)) {
2520                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2521                         if (ret < 0)
2522                                 break;
2523                 }
2524         }
2525         dout("%s: returning %d\n", __func__, ret);
2526
2527         return ret;
2528 }
2529
/*
 * Register the rbd device in the driver model under the rbd bus,
 * named after its device id.  rbd_dev_release is installed as the
 * release callback, invoked when the last reference is dropped.
 * The nested lock class is a lockdep annotation for callers already
 * holding an outer ctl_mutex level.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
        struct device *dev;
        int ret;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        dev = &rbd_dev->dev;
        dev->bus = &rbd_bus_type;
        dev->type = &rbd_device_type;
        dev->parent = &rbd_root_dev;
        dev->release = rbd_dev_release;
        dev_set_name(dev, "%d", rbd_dev->dev_id);
        ret = device_register(dev);

        mutex_unlock(&ctl_mutex);

        return ret;
}
2549
/* Remove the rbd device from the driver model (and its sysfs tree) */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
        device_unregister(&rbd_dev->dev);
}
2554
2555 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2556 {
2557         int ret, rc;
2558
2559         do {
2560                 ret = rbd_req_sync_watch(rbd_dev);
2561                 if (ret == -ERANGE) {
2562                         rc = rbd_refresh_header(rbd_dev, NULL);
2563                         if (rc < 0)
2564                                 return rc;
2565                 }
2566         } while (ret == -ERANGE);
2567
2568         return ret;
2569 }
2570
/* Highest device id handed out so far; ids start at 1 */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
        rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

        spin_lock(&rbd_dev_list_lock);
        list_add_tail(&rbd_dev->node, &rbd_dev_list);
        spin_unlock(&rbd_dev_list_lock);
        dout("rbd_dev %p given dev id %llu\n", rbd_dev,
                (unsigned long long) rbd_dev->dev_id);
}
2587
2588 /*
2589  * Remove an rbd_dev from the global list, and record that its
2590  * identifier is no longer in use.
2591  */
2592 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2593 {
2594         struct list_head *tmp;
2595         int rbd_id = rbd_dev->dev_id;
2596         int max_id;
2597
2598         rbd_assert(rbd_id > 0);
2599
2600         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2601                 (unsigned long long) rbd_dev->dev_id);
2602         spin_lock(&rbd_dev_list_lock);
2603         list_del_init(&rbd_dev->node);
2604
2605         /*
2606          * If the id being "put" is not the current maximum, there
2607          * is nothing special we need to do.
2608          */
2609         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2610                 spin_unlock(&rbd_dev_list_lock);
2611                 return;
2612         }
2613
2614         /*
2615          * We need to update the current maximum id.  Search the
2616          * list to find out what it is.  We're more likely to find
2617          * the maximum at the end, so search the list backward.
2618          */
2619         max_id = 0;
2620         list_for_each_prev(tmp, &rbd_dev_list) {
2621                 struct rbd_device *rbd_dev;
2622
2623                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2624                 if (rbd_id > max_id)
2625                         max_id = rbd_id;
2626         }
2627         spin_unlock(&rbd_dev_list_lock);
2628
2629         /*
2630          * The max id could have been updated by rbd_dev_id_get(), in
2631          * which case it now accurately reflects the new maximum.
2632          * Be careful not to overwrite the maximum value in that
2633          * case.
2634          */
2635         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2636         dout("  max dev id has been reset\n");
2637 }
2638
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any). Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
        /*
        * These are the characters that produce nonzero for
        * isspace() in the "C" and "POSIX" locales.
        */
        const char *delims = " \f\n\r\t\v";
        size_t token_len;

        *buf += strspn(*buf, delims);           /* skip leading spaces */
        token_len = strcspn(*buf, delims);      /* measure the token */

        return token_len;
}
2657
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        /* Same white-space set as next_token() */
        const char *spaces = " \f\n\r\t\v";
        size_t len;

        *buf += strspn(*buf, spaces);
        len = strcspn(*buf, spaces);

        /* Copy only when the token plus its NUL fits */
        if (len < token_size) {
                memcpy(token, *buf, len);
                token[len] = '\0';
        }
        *buf += len;

        return len;
}
2687
2688 /*
2689  * Finds the next token in *buf, dynamically allocates a buffer big
2690  * enough to hold a copy of it, and copies the token into the new
2691  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2692  * that a duplicate buffer is created even for a zero-length token.
2693  *
2694  * Returns a pointer to the newly-allocated duplicate, or a null
2695  * pointer if memory for the duplicate was not available.  If
2696  * the lenp argument is a non-null pointer, the length of the token
2697  * (not including the '\0') is returned in *lenp.
2698  *
2699  * If successful, the *buf pointer will be updated to point beyond
2700  * the end of the found token.
2701  *
2702  * Note: uses GFP_KERNEL for allocation.
2703  */
2704 static inline char *dup_token(const char **buf, size_t *lenp)
2705 {
2706         char *dup;
2707         size_t len;
2708
2709         len = next_token(buf);
2710         dup = kmalloc(len + 1, GFP_KERNEL);
2711         if (!dup)
2712                 return NULL;
2713
2714         memcpy(dup, *buf, len);
2715         *(dup + len) = '\0';
2716         *buf += len;
2717
2718         if (lenp)
2719                 *lenp = len;
2720
2721         return dup;
2722 }
2723
/*
 * This fills in the pool_name, image_name, image_name_len, rbd_dev,
 * rbd_md_name, and name fields of the given rbd_dev, based on the
 * list of monitor addresses and other options provided via
 * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
 * copy of the snapshot name to map if successful, or a
 * pointer-coded error otherwise.
 *
 * Expected input layout (white-space separated tokens):
 *	<mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 *
 * Ownership on success: pool_name and image_name are heap copies
 * hung off rbd_dev; the returned snap_name is a heap copy owned by
 * the caller; *mon_addrs points INTO the caller's buf (not copied)
 * and *mon_addrs_size is its length plus one for a '\0'.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 */
static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
				const char *buf,
				const char **mon_addrs,
				size_t *mon_addrs_size,
				char *options,
				size_t options_size)
{
	size_t len;
	char *err_ptr = ERR_PTR(-EINVAL);	/* missing/bad token => -EINVAL */
	char *snap_name;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return err_ptr;
	/* Monitor list is referenced in place, not duplicated */
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;

	buf += len;

	/* copy_token() reports len >= options_size if the token didn't fit */
	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return err_ptr;

	/* From here on, failures are allocation failures */
	err_ptr = ERR_PTR(-ENOMEM);
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Snapshot name is optional */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	}
	snap_name = kmalloc(len + 1, GFP_KERNEL);
	if (!snap_name)
		goto out_err;
	memcpy(snap_name, buf, len);
	*(snap_name + len) = '\0';

dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);

	return snap_name;

out_err:
	/* Undo the dup_token() allocations; rbd_dev fields back to NULL */
	kfree(rbd_dev->image_name);
	rbd_dev->image_name = NULL;
	rbd_dev->image_name_len = 0;
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return err_ptr;
}
2793
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	void *p;

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	/* The id object is named "<RBD_ID_PREFIX><image_name>" */
	size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
	object_name = kmalloc(size, GFP_NOIO);	/* GFP_NOIO: avoid I/O recursion */
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* Invoke the "get_id" method of the "rbd" class on the id object */
	ret = rbd_req_sync_exec(rbd_dev, object_name,
				"rbd", "get_id",
				NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	/* Decode the length-prefixed string into a fresh allocation */
	p = response;
	rbd_dev->image_id = ceph_extract_encoded_string(&p,
						p + RBD_IMAGE_ID_LEN_MAX,
						&rbd_dev->image_id_len,
						GFP_NOIO);
	if (IS_ERR(rbd_dev->image_id)) {
		/* Keep the field NULL on error, per the contract above */
		ret = PTR_ERR(rbd_dev->image_id);
		rbd_dev->image_id = NULL;
	} else {
		dout("image_id is %s\n", rbd_dev->image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}
2862
2863 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2864 {
2865         int ret;
2866         size_t size;
2867
2868         /* Version 1 images have no id; empty string is used */
2869
2870         rbd_dev->image_id = kstrdup("", GFP_KERNEL);
2871         if (!rbd_dev->image_id)
2872                 return -ENOMEM;
2873         rbd_dev->image_id_len = 0;
2874
2875         /* Record the header object name for this rbd image. */
2876
2877         size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
2878         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2879         if (!rbd_dev->header_name) {
2880                 ret = -ENOMEM;
2881                 goto out_err;
2882         }
2883         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2884
2885         /* Populate rbd image metadata */
2886
2887         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
2888         if (ret < 0)
2889                 goto out_err;
2890         rbd_dev->image_format = 1;
2891
2892         dout("discovered version 1 image, header name is %s\n",
2893                 rbd_dev->header_name);
2894
2895         return 0;
2896
2897 out_err:
2898         kfree(rbd_dev->header_name);
2899         rbd_dev->header_name = NULL;
2900         kfree(rbd_dev->image_id);
2901         rbd_dev->image_id = NULL;
2902
2903         return ret;
2904 }
2905
/*
 * Probe the device as a format 2 image.  The image id has already
 * been recorded by rbd_dev_image_id(); this records the header
 * object name and fetches the image's size, object prefix, feature
 * bits and snapshot context from the OSDs.
 *
 * NOTE(review): the success path returns -ENOTSUPP after the probe
 * completes — it looks like format 2 images are recognized but
 * deliberately not yet mapped at this stage of development.
 * Confirm that intent before treating it as a bug.
 */
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	u64 ver = 0;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	/* Header object is named "<RBD_HEADER_PREFIX><image_id>" */
	size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
			RBD_HEADER_PREFIX, rbd_dev->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return -ENOTSUPP;
out_err:
	/* Release what the partial probe allocated */
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}
2967
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Try to read the image id object first.  Any failure
	 * (typically ENOENT — no such object) means this is not a
	 * format 2 image, so fall back to probing it as format 1.
	 */
	if (rbd_dev_image_id(rbd_dev))
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret)
		dout("probe failed, returning %d\n", ret);

	return ret;
}
2992
2993 static ssize_t rbd_add(struct bus_type *bus,
2994                        const char *buf,
2995                        size_t count)
2996 {
2997         char *options;
2998         struct rbd_device *rbd_dev = NULL;
2999         const char *mon_addrs = NULL;
3000         size_t mon_addrs_size = 0;
3001         struct ceph_osd_client *osdc;
3002         int rc = -ENOMEM;
3003         char *snap_name;
3004
3005         if (!try_module_get(THIS_MODULE))
3006                 return -ENODEV;
3007
3008         options = kmalloc(count, GFP_KERNEL);
3009         if (!options)
3010                 goto err_out_mem;
3011         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3012         if (!rbd_dev)
3013                 goto err_out_mem;
3014
3015         /* static rbd_device initialization */
3016         spin_lock_init(&rbd_dev->lock);
3017         INIT_LIST_HEAD(&rbd_dev->node);
3018         INIT_LIST_HEAD(&rbd_dev->snaps);
3019         init_rwsem(&rbd_dev->header_rwsem);
3020
3021         /* parse add command */
3022         snap_name = rbd_add_parse_args(rbd_dev, buf,
3023                                 &mon_addrs, &mon_addrs_size, options, count);
3024         if (IS_ERR(snap_name)) {
3025                 rc = PTR_ERR(snap_name);
3026                 goto err_out_mem;
3027         }
3028
3029         rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
3030         if (rc < 0)
3031                 goto err_out_args;
3032
3033         /* pick the pool */
3034         osdc = &rbd_dev->rbd_client->client->osdc;
3035         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3036         if (rc < 0)
3037                 goto err_out_client;
3038         rbd_dev->pool_id = rc;
3039
3040         rc = rbd_dev_probe(rbd_dev);
3041         if (rc < 0)
3042                 goto err_out_client;
3043         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3044
3045         /* no need to lock here, as rbd_dev is not registered yet */
3046         rc = rbd_dev_snaps_update(rbd_dev);
3047         if (rc)
3048                 goto err_out_header;
3049
3050         rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3051         if (rc)
3052                 goto err_out_header;
3053
3054         /* generate unique id: find highest unique id, add one */
3055         rbd_dev_id_get(rbd_dev);
3056
3057         /* Fill in the device name, now that we have its id. */
3058         BUILD_BUG_ON(DEV_NAME_LEN
3059                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3060         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3061
3062         /* Get our block major device number. */
3063
3064         rc = register_blkdev(0, rbd_dev->name);
3065         if (rc < 0)
3066                 goto err_out_id;
3067         rbd_dev->major = rc;
3068
3069         /* Set up the blkdev mapping. */
3070
3071         rc = rbd_init_disk(rbd_dev);
3072         if (rc)
3073                 goto err_out_blkdev;
3074
3075         rc = rbd_bus_add_dev(rbd_dev);
3076         if (rc)
3077                 goto err_out_disk;
3078
3079         /*
3080          * At this point cleanup in the event of an error is the job
3081          * of the sysfs code (initiated by rbd_bus_del_dev()).
3082          */
3083
3084         down_write(&rbd_dev->header_rwsem);
3085         rc = rbd_dev_snaps_register(rbd_dev);
3086         up_write(&rbd_dev->header_rwsem);
3087         if (rc)
3088                 goto err_out_bus;
3089
3090         rc = rbd_init_watch_dev(rbd_dev);
3091         if (rc)
3092                 goto err_out_bus;
3093
3094         /* Everything's ready.  Announce the disk to the world. */
3095
3096         add_disk(rbd_dev->disk);
3097
3098         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3099                 (unsigned long long) rbd_dev->mapping.size);
3100
3101         return count;
3102
3103 err_out_bus:
3104         /* this will also clean up rest of rbd_dev stuff */
3105
3106         rbd_bus_del_dev(rbd_dev);
3107         kfree(options);
3108         return rc;
3109
3110 err_out_disk:
3111         rbd_free_disk(rbd_dev);
3112 err_out_blkdev:
3113         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3114 err_out_id:
3115         rbd_dev_id_put(rbd_dev);
3116 err_out_header:
3117         rbd_header_free(&rbd_dev->header);
3118 err_out_client:
3119         kfree(rbd_dev->header_name);
3120         rbd_put_client(rbd_dev);
3121         kfree(rbd_dev->image_id);
3122 err_out_args:
3123         kfree(rbd_dev->mapping.snap_name);
3124         kfree(rbd_dev->image_name);
3125         kfree(rbd_dev->pool_name);
3126 err_out_mem:
3127         kfree(rbd_dev);
3128         kfree(options);
3129
3130         dout("Error adding device %s\n", buf);
3131         module_put(THIS_MODULE);
3132
3133         return (ssize_t) rc;
3134 }
3135
3136 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3137 {
3138         struct list_head *tmp;
3139         struct rbd_device *rbd_dev;
3140
3141         spin_lock(&rbd_dev_list_lock);
3142         list_for_each(tmp, &rbd_dev_list) {
3143                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3144                 if (rbd_dev->dev_id == dev_id) {
3145                         spin_unlock(&rbd_dev_list_lock);
3146                         return rbd_dev;
3147                 }
3148         }
3149         spin_unlock(&rbd_dev_list_lock);
3150         return NULL;
3151 }
3152
/*
 * Device-core release callback for an rbd device; runs when the
 * last reference to the embedded struct device is dropped (teardown
 * is initiated by rbd_bus_del_dev()).  Undoes what rbd_add() set
 * up, in roughly reverse order — the ordering matters: the watch
 * is stopped before the client is released, and the disk is torn
 * down before its backing metadata is freed.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Stop watching for header-object updates first */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->mapping.snap_name);
	kfree(rbd_dev->image_id);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_dev_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref — balances try_module_get() in rbd_add() */
	module_put(THIS_MODULE);
}
3187
3188 static ssize_t rbd_remove(struct bus_type *bus,
3189                           const char *buf,
3190                           size_t count)
3191 {
3192         struct rbd_device *rbd_dev = NULL;
3193         int target_id, rc;
3194         unsigned long ul;
3195         int ret = count;
3196
3197         rc = strict_strtoul(buf, 10, &ul);
3198         if (rc)
3199                 return rc;
3200
3201         /* convert to int; abort if we lost anything in the conversion */
3202         target_id = (int) ul;
3203         if (target_id != ul)
3204                 return -EINVAL;
3205
3206         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3207
3208         rbd_dev = __rbd_get_dev(target_id);
3209         if (!rbd_dev) {
3210                 ret = -ENOENT;
3211                 goto done;
3212         }
3213
3214         __rbd_remove_all_snaps(rbd_dev);
3215         rbd_bus_del_dev(rbd_dev);
3216
3217 done:
3218         mutex_unlock(&ctl_mutex);
3219
3220         return ret;
3221 }
3222
3223 /*
3224  * create control files in sysfs
3225  * /sys/bus/rbd/...
3226  */
3227 static int rbd_sysfs_init(void)
3228 {
3229         int ret;
3230
3231         ret = device_register(&rbd_root_dev);
3232         if (ret < 0)
3233                 return ret;
3234
3235         ret = bus_register(&rbd_bus_type);
3236         if (ret < 0)
3237                 device_unregister(&rbd_root_dev);
3238
3239         return ret;
3240 }
3241
/* Remove the sysfs control files, in reverse order of rbd_sysfs_init() */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
3247
3248 int __init rbd_init(void)
3249 {
3250         int rc;
3251
3252         rc = rbd_sysfs_init();
3253         if (rc)
3254                 return rc;
3255         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3256         return 0;
3257 }
3258
/*
 * Module exit: remove the sysfs control files.  Mapped devices hold
 * a module reference (taken in rbd_add(), dropped in
 * rbd_dev_release()), so the module cannot be unloaded while any
 * device is still mapped.
 */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
3263
3264 module_init(rbd_init);
3265 module_exit(rbd_exit);
3266
3267 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3268 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3269 MODULE_DESCRIPTION("rados block device");
3270
3271 /* following authorship retained from original osdblk.c */
3272 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3273
3274 MODULE_LICENSE("GPL");