aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Documentation/ABI/testing/sysfs-bus-rbd4
-rw-r--r--drivers/block/rbd.c1389
-rw-r--r--drivers/block/rbd_types.h2
-rw-r--r--fs/ceph/addr.c60
-rw-r--r--fs/ceph/caps.c18
-rw-r--r--fs/ceph/file.c73
-rw-r--r--fs/ceph/inode.c15
-rw-r--r--fs/ceph/mds_client.c11
-rw-r--r--fs/ceph/super.c4
-rw-r--r--include/linux/backing-dev.h1
-rw-r--r--include/linux/ceph/libceph.h2
-rw-r--r--include/linux/ceph/osdmap.h1
-rw-r--r--include/linux/ceph/rados.h2
-rw-r--r--net/ceph/ceph_common.c3
-rw-r--r--net/ceph/messenger.c107
-rw-r--r--net/ceph/osd_client.c59
-rw-r--r--net/ceph/osdmap.c47
17 files changed, 1192 insertions, 606 deletions
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 1cf2adf46b1..cd9213ccf3d 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -70,6 +70,10 @@ snap_*
A directory per each snapshot
+parent
+
+ Information identifying the pool, image, and snapshot id for
+ the parent image in a layered rbd image (format 2 only).
Entries under /sys/bus/rbd/devices/<dev-id>/snap_<snap-name>
-------------------------------------------------------------
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index bb3d9be3b1b..89576a0b3f2 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -61,15 +61,29 @@
#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
-#define RBD_MAX_SNAP_NAME_LEN 32
+#define RBD_SNAP_DEV_NAME_PREFIX "snap_"
+#define RBD_MAX_SNAP_NAME_LEN \
+ (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
+
#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN 1024
#define RBD_SNAP_HEAD_NAME "-"
+/* This allows a single page to hold an image name sent by OSD */
+#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX 64
+
#define RBD_OBJ_PREFIX_LEN_MAX 64
+/* Feature bits */
+
+#define RBD_FEATURE_LAYERING 1
+
+/* Features supported by this (client software) implementation. */
+
+#define RBD_FEATURES_ALL (0)
+
/*
* An RBD device name will be "rbd#", where the "rbd" comes from
* RBD_DRV_NAME above, and # is a unique integer identifier.
@@ -101,6 +115,27 @@ struct rbd_image_header {
u64 obj_version;
};
+/*
+ * An rbd image specification.
+ *
+ * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
+ * identify an image.
+ */
+struct rbd_spec {
+ u64 pool_id;
+ char *pool_name;
+
+ char *image_id;
+ size_t image_id_len;
+ char *image_name;
+ size_t image_name_len;
+
+ u64 snap_id;
+ char *snap_name;
+
+ struct kref kref;
+};
+
struct rbd_options {
bool read_only;
};
@@ -155,11 +190,8 @@ struct rbd_snap {
};
struct rbd_mapping {
- char *snap_name;
- u64 snap_id;
u64 size;
u64 features;
- bool snap_exists;
bool read_only;
};
@@ -173,7 +205,6 @@ struct rbd_device {
struct gendisk *disk; /* blkdev's gendisk and rq */
u32 image_format; /* Either 1 or 2 */
- struct rbd_options rbd_opts;
struct rbd_client *rbd_client;
char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
@@ -181,17 +212,17 @@ struct rbd_device {
spinlock_t lock; /* queue lock */
struct rbd_image_header header;
- char *image_id;
- size_t image_id_len;
- char *image_name;
- size_t image_name_len;
+ bool exists;
+ struct rbd_spec *spec;
+
char *header_name;
- char *pool_name;
- int pool_id;
struct ceph_osd_event *watch_event;
struct ceph_osd_request *watch_request;
+ struct rbd_spec *parent_spec;
+ u64 parent_overlap;
+
/* protects updating the header */
struct rw_semaphore header_rwsem;
@@ -204,6 +235,7 @@ struct rbd_device {
/* sysfs related */
struct device dev;
+ unsigned long open_count;
};
static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
@@ -218,7 +250,7 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
-static void __rbd_remove_snap_dev(struct rbd_snap *snap);
+static void rbd_remove_snap_dev(struct rbd_snap *snap);
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
size_t count);
@@ -258,17 +290,8 @@ static struct device rbd_root_dev = {
# define rbd_assert(expr) ((void) 0)
#endif /* !RBD_DEBUG */
-static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
-{
- return get_device(&rbd_dev->dev);
-}
-
-static void rbd_put_dev(struct rbd_device *rbd_dev)
-{
- put_device(&rbd_dev->dev);
-}
-
-static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
@@ -277,8 +300,11 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
return -EROFS;
- rbd_get_dev(rbd_dev);
+ mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+ (void) get_device(&rbd_dev->dev);
set_device_ro(bdev, rbd_dev->mapping.read_only);
+ rbd_dev->open_count++;
+ mutex_unlock(&ctl_mutex);
return 0;
}
@@ -287,7 +313,11 @@ static int rbd_release(struct gendisk *disk, fmode_t mode)
{
struct rbd_device *rbd_dev = disk->private_data;
- rbd_put_dev(rbd_dev);
+ mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+ rbd_assert(rbd_dev->open_count > 0);
+ rbd_dev->open_count--;
+ put_device(&rbd_dev->dev);
+ mutex_unlock(&ctl_mutex);
return 0;
}
@@ -388,7 +418,7 @@ enum {
static match_table_t rbd_opts_tokens = {
/* int args above */
/* string args above */
- {Opt_read_only, "mapping.read_only"},
+ {Opt_read_only, "read_only"},
{Opt_read_only, "ro"}, /* Alternate spelling */
{Opt_read_write, "read_write"},
{Opt_read_write, "rw"}, /* Alternate spelling */
@@ -441,33 +471,17 @@ static int parse_rbd_opts_token(char *c, void *private)
* Get a ceph client with specific addr and configuration, if one does
* not exist create it.
*/
-static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
- size_t mon_addr_len, char *options)
+static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
- struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
- struct ceph_options *ceph_opts;
struct rbd_client *rbdc;
- rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
-
- ceph_opts = ceph_parse_options(options, mon_addr,
- mon_addr + mon_addr_len,
- parse_rbd_opts_token, rbd_opts);
- if (IS_ERR(ceph_opts))
- return PTR_ERR(ceph_opts);
-
rbdc = rbd_client_find(ceph_opts);
- if (rbdc) {
- /* using an existing client */
+ if (rbdc) /* using an existing client */
ceph_destroy_options(ceph_opts);
- } else {
+ else
rbdc = rbd_client_create(ceph_opts);
- if (IS_ERR(rbdc))
- return PTR_ERR(rbdc);
- }
- rbd_dev->rbd_client = rbdc;
- return 0;
+ return rbdc;
}
/*
@@ -492,10 +506,10 @@ static void rbd_client_release(struct kref *kref)
* Drop reference to ceph client node. If it's not referenced anymore, release
* it.
*/
-static void rbd_put_client(struct rbd_device *rbd_dev)
+static void rbd_put_client(struct rbd_client *rbdc)
{
- kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
- rbd_dev->rbd_client = NULL;
+ if (rbdc)
+ kref_put(&rbdc->kref, rbd_client_release);
}
/*
@@ -524,6 +538,16 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
return false;
+ /* The bio layer requires at least sector-sized I/O */
+
+ if (ondisk->options.order < SECTOR_SHIFT)
+ return false;
+
+ /* If we use u64 in a few spots we may be able to loosen this */
+
+ if (ondisk->options.order > 8 * sizeof (int) - 1)
+ return false;
+
/*
* The size of a snapshot header has to fit in a size_t, and
* that limits the number of snapshots.
@@ -635,6 +659,20 @@ out_err:
return -ENOMEM;
}
+static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
+{
+ struct rbd_snap *snap;
+
+ if (snap_id == CEPH_NOSNAP)
+ return RBD_SNAP_HEAD_NAME;
+
+ list_for_each_entry(snap, &rbd_dev->snaps, node)
+ if (snap_id == snap->id)
+ return snap->name;
+
+ return NULL;
+}
+
static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
@@ -642,7 +680,7 @@ static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
list_for_each_entry(snap, &rbd_dev->snaps, node) {
if (!strcmp(snap_name, snap->name)) {
- rbd_dev->mapping.snap_id = snap->id;
+ rbd_dev->spec->snap_id = snap->id;
rbd_dev->mapping.size = snap->size;
rbd_dev->mapping.features = snap->features;
@@ -653,26 +691,23 @@ static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
return -ENOENT;
}
-static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
+static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
int ret;
- if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
+ if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
sizeof (RBD_SNAP_HEAD_NAME))) {
- rbd_dev->mapping.snap_id = CEPH_NOSNAP;
+ rbd_dev->spec->snap_id = CEPH_NOSNAP;
rbd_dev->mapping.size = rbd_dev->header.image_size;
rbd_dev->mapping.features = rbd_dev->header.features;
- rbd_dev->mapping.snap_exists = false;
- rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
ret = 0;
} else {
- ret = snap_by_name(rbd_dev, snap_name);
+ ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
if (ret < 0)
goto done;
- rbd_dev->mapping.snap_exists = true;
rbd_dev->mapping.read_only = true;
}
- rbd_dev->mapping.snap_name = snap_name;
+ rbd_dev->exists = true;
done:
return ret;
}
@@ -695,13 +730,13 @@ static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
u64 segment;
int ret;
- name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
+ name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
if (!name)
return NULL;
segment = offset >> rbd_dev->header.obj_order;
- ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
+ ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
rbd_dev->header.object_prefix, segment);
- if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
+ if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
pr_err("error formatting segment name for #%llu (%d)\n",
segment, ret);
kfree(name);
@@ -800,77 +835,144 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
}
/*
- * bio_chain_clone - clone a chain of bios up to a certain length.
- * might return a bio_pair that will need to be released.
+ * Clone a portion of a bio, starting at the given byte offset
+ * and continuing for the number of bytes indicated.
*/
-static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
- struct bio_pair **bp,
- int len, gfp_t gfpmask)
-{
- struct bio *old_chain = *old;
- struct bio *new_chain = NULL;
- struct bio *tail;
- int total = 0;
-
- if (*bp) {
- bio_pair_release(*bp);
- *bp = NULL;
- }
+static struct bio *bio_clone_range(struct bio *bio_src,
+ unsigned int offset,
+ unsigned int len,
+ gfp_t gfpmask)
+{
+ struct bio_vec *bv;
+ unsigned int resid;
+ unsigned short idx;
+ unsigned int voff;
+ unsigned short end_idx;
+ unsigned short vcnt;
+ struct bio *bio;
- while (old_chain && (total < len)) {
- struct bio *tmp;
+ /* Handle the easy case for the caller */
- tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
- if (!tmp)
- goto err_out;
- gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
+ if (!offset && len == bio_src->bi_size)
+ return bio_clone(bio_src, gfpmask);
- if (total + old_chain->bi_size > len) {
- struct bio_pair *bp;
+ if (WARN_ON_ONCE(!len))
+ return NULL;
+ if (WARN_ON_ONCE(len > bio_src->bi_size))
+ return NULL;
+ if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
+ return NULL;
- /*
- * this split can only happen with a single paged bio,
- * split_bio will BUG_ON if this is not the case
- */
- dout("bio_chain_clone split! total=%d remaining=%d"
- "bi_size=%u\n",
- total, len - total, old_chain->bi_size);
+ /* Find first affected segment... */
- /* split the bio. We'll release it either in the next
- call, or it will have to be released outside */
- bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
- if (!bp)
- goto err_out;
+ resid = offset;
+ __bio_for_each_segment(bv, bio_src, idx, 0) {
+ if (resid < bv->bv_len)
+ break;
+ resid -= bv->bv_len;
+ }
+ voff = resid;
- __bio_clone(tmp, &bp->bio1);
+ /* ...and the last affected segment */
- *next = &bp->bio2;
- } else {
- __bio_clone(tmp, old_chain);
- *next = old_chain->bi_next;
- }
+ resid += len;
+ __bio_for_each_segment(bv, bio_src, end_idx, idx) {
+ if (resid <= bv->bv_len)
+ break;
+ resid -= bv->bv_len;
+ }
+ vcnt = end_idx - idx + 1;
+
+ /* Build the clone */
- tmp->bi_bdev = NULL;
- tmp->bi_next = NULL;
- if (new_chain)
- tail->bi_next = tmp;
- else
- new_chain = tmp;
- tail = tmp;
- old_chain = old_chain->bi_next;
+ bio = bio_alloc(gfpmask, (unsigned int) vcnt);
+ if (!bio)
+ return NULL; /* ENOMEM */
- total += tmp->bi_size;
+ bio->bi_bdev = bio_src->bi_bdev;
+ bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
+ bio->bi_rw = bio_src->bi_rw;
+ bio->bi_flags |= 1 << BIO_CLONED;
+
+ /*
+ * Copy over our part of the bio_vec, then update the first
+ * and last (or only) entries.
+ */
+ memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
+ vcnt * sizeof (struct bio_vec));
+ bio->bi_io_vec[0].bv_offset += voff;
+ if (vcnt > 1) {
+ bio->bi_io_vec[0].bv_len -= voff;
+ bio->bi_io_vec[vcnt - 1].bv_len = resid;
+ } else {
+ bio->bi_io_vec[0].bv_len = len;
}
- rbd_assert(total == len);
+ bio->bi_vcnt = vcnt;
+ bio->bi_size = len;
+ bio->bi_idx = 0;
+
+ return bio;
+}
+
+/*
+ * Clone a portion of a bio chain, starting at the given byte offset
+ * into the first bio in the source chain and continuing for the
+ * number of bytes indicated. The result is another bio chain of
+ * exactly the given length, or a null pointer on error.
+ *
+ * The bio_src and offset parameters are both in-out. On entry they
+ * refer to the first source bio and the offset into that bio where
+ * the start of data to be cloned is located.
+ *
+ * On return, bio_src is updated to refer to the bio in the source
+ * chain that contains first un-cloned byte, and *offset will
+ * contain the offset of that byte within that bio.
+ */
+static struct bio *bio_chain_clone_range(struct bio **bio_src,
+ unsigned int *offset,
+ unsigned int len,
+ gfp_t gfpmask)
+{
+ struct bio *bi = *bio_src;
+ unsigned int off = *offset;
+ struct bio *chain = NULL;
+ struct bio **end;
+
+ /* Build up a chain of clone bios up to the limit */
+
+ if (!bi || off >= bi->bi_size || !len)
+ return NULL; /* Nothing to clone */
- *old = old_chain;
+ end = &chain;
+ while (len) {
+ unsigned int bi_size;
+ struct bio *bio;
+
+ if (!bi)
+ goto out_err; /* EINVAL; ran out of bio's */
+ bi_size = min_t(unsigned int, bi->bi_size - off, len);
+ bio = bio_clone_range(bi, off, bi_size, gfpmask);
+ if (!bio)
+ goto out_err; /* ENOMEM */
+
+ *end = bio;
+ end = &bio->bi_next;
+
+ off += bi_size;
+ if (off == bi->bi_size) {
+ bi = bi->bi_next;
+ off = 0;
+ }
+ len -= bi_size;
+ }
+ *bio_src = bi;
+ *offset = off;
- return new_chain;
+ return chain;
+out_err:
+ bio_chain_put(chain);
-err_out:
- dout("bio_chain_clone with err\n");
- bio_chain_put(new_chain);
return NULL;
}
@@ -988,8 +1090,9 @@ static int rbd_do_request(struct request *rq,
req_data->coll_index = coll_index;
}
- dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
- (unsigned long long) ofs, (unsigned long long) len);
+ dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
+ object_name, (unsigned long long) ofs,
+ (unsigned long long) len, coll, coll_index);
osdc = &rbd_dev->rbd_client->client->osdc;
req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
@@ -1019,7 +1122,7 @@ static int rbd_do_request(struct request *rq,
layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
layout->fl_stripe_count = cpu_to_le32(1);
layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
- layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
+ layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
req, ops);
rbd_assert(ret == 0);
@@ -1154,8 +1257,6 @@ done:
static int rbd_do_op(struct request *rq,
struct rbd_device *rbd_dev,
struct ceph_snap_context *snapc,
- u64 snapid,
- int opcode, int flags,
u64 ofs, u64 len,
struct bio *bio,
struct rbd_req_coll *coll,
@@ -1167,6 +1268,9 @@ static int rbd_do_op(struct request *rq,
int ret;
struct ceph_osd_req_op *ops;
u32 payload_len;
+ int opcode;
+ int flags;
+ u64 snapid;
seg_name = rbd_segment_name(rbd_dev, ofs);
if (!seg_name)
@@ -1174,7 +1278,18 @@ static int rbd_do_op(struct request *rq,
seg_len = rbd_segment_length(rbd_dev, ofs, len);
seg_ofs = rbd_segment_offset(rbd_dev, ofs);
- payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
+ if (rq_data_dir(rq) == WRITE) {
+ opcode = CEPH_OSD_OP_WRITE;
+ flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
+ snapid = CEPH_NOSNAP;
+ payload_len = seg_len;
+ } else {
+ opcode = CEPH_OSD_OP_READ;
+ flags = CEPH_OSD_FLAG_READ;
+ snapc = NULL;
+ snapid = rbd_dev->spec->snap_id;
+ payload_len = 0;
+ }
ret = -ENOMEM;
ops = rbd_create_rw_ops(1, opcode, payload_len);
@@ -1202,41 +1317,6 @@ done:
}
/*
- * Request async osd write
- */
-static int rbd_req_write(struct request *rq,
- struct rbd_device *rbd_dev,
- struct ceph_snap_context *snapc,
- u64 ofs, u64 len,
- struct bio *bio,
- struct rbd_req_coll *coll,
- int coll_index)
-{
- return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
- CEPH_OSD_OP_WRITE,
- CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
- ofs, len, bio, coll, coll_index);
-}
-
-/*
- * Request async osd read
- */
-static int rbd_req_read(struct request *rq,
- struct rbd_device *rbd_dev,
- u64 snapid,
- u64 ofs, u64 len,
- struct bio *bio,
- struct rbd_req_coll *coll,
- int coll_index)
-{
- return rbd_do_op(rq, rbd_dev, NULL,
- snapid,
- CEPH_OSD_OP_READ,
- CEPH_OSD_FLAG_READ,
- ofs, len, bio, coll, coll_index);
-}
-
-/*
* Request sync osd read
*/
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
@@ -1304,7 +1384,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
rbd_dev->header_name, (unsigned long long) notify_id,
(unsigned int) opcode);
- rc = rbd_refresh_header(rbd_dev, &hver);
+ rc = rbd_dev_refresh(rbd_dev, &hver);
if (rc)
pr_warning(RBD_DRV_NAME "%d got notification but failed to "
" update snaps: %d\n", rbd_dev->major, rc);
@@ -1460,18 +1540,16 @@ static void rbd_rq_fn(struct request_queue *q)
{
struct rbd_device *rbd_dev = q->queuedata;
struct request *rq;
- struct bio_pair *bp = NULL;
while ((rq = blk_fetch_request(q))) {
struct bio *bio;
- struct bio *rq_bio, *next_bio = NULL;
bool do_write;
unsigned int size;
- u64 op_size = 0;
u64 ofs;
int num_segs, cur_seg = 0;
struct rbd_req_coll *coll;
struct ceph_snap_context *snapc;
+ unsigned int bio_offset;
dout("fetched request\n");
@@ -1483,10 +1561,6 @@ static void rbd_rq_fn(struct request_queue *q)
/* deduce our operation (read, write) */
do_write = (rq_data_dir(rq) == WRITE);
-
- size = blk_rq_bytes(rq);
- ofs = blk_rq_pos(rq) * SECTOR_SIZE;
- rq_bio = rq->bio;
if (do_write && rbd_dev->mapping.read_only) {
__blk_end_request_all(rq, -EROFS);
continue;
@@ -1496,8 +1570,8 @@ static void rbd_rq_fn(struct request_queue *q)
down_read(&rbd_dev->header_rwsem);
- if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
- !rbd_dev->mapping.snap_exists) {
+ if (!rbd_dev->exists) {
+ rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
up_read(&rbd_dev->header_rwsem);
dout("request for non-existent snapshot");
spin_lock_irq(q->queue_lock);
@@ -1509,6 +1583,10 @@ static void rbd_rq_fn(struct request_queue *q)
up_read(&rbd_dev->header_rwsem);
+ size = blk_rq_bytes(rq);
+ ofs = blk_rq_pos(rq) * SECTOR_SIZE;
+ bio = rq->bio;
+
dout("%s 0x%x bytes at 0x%llx\n",
do_write ? "write" : "read",
size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
@@ -1528,45 +1606,37 @@ static void rbd_rq_fn(struct request_queue *q)
continue;
}
+ bio_offset = 0;
do {
- /* a bio clone to be passed down to OSD req */
+ u64 limit = rbd_segment_length(rbd_dev, ofs, size);
+ unsigned int chain_size;
+ struct bio *bio_chain;
+
+ BUG_ON(limit > (u64) UINT_MAX);
+ chain_size = (unsigned int) limit;
dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
- op_size = rbd_segment_length(rbd_dev, ofs, size);
+
kref_get(&coll->kref);
- bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
- op_size, GFP_ATOMIC);
- if (!bio) {
- rbd_coll_end_req_index(rq, coll, cur_seg,
- -ENOMEM, op_size);
- goto next_seg;
- }
+ /* Pass a cloned bio chain via an osd request */
- /* init OSD command: write or read */
- if (do_write)
- rbd_req_write(rq, rbd_dev,
- snapc,
- ofs,
- op_size, bio,
- coll, cur_seg);
+ bio_chain = bio_chain_clone_range(&bio,
+ &bio_offset, chain_size,
+ GFP_ATOMIC);
+ if (bio_chain)
+ (void) rbd_do_op(rq, rbd_dev, snapc,
+ ofs, chain_size,
+ bio_chain, coll, cur_seg);
else
- rbd_req_read(rq, rbd_dev,
- rbd_dev->mapping.snap_id,
- ofs,
- op_size, bio,
- coll, cur_seg);
-
-next_seg:
- size -= op_size;
- ofs += op_size;
+ rbd_coll_end_req_index(rq, coll, cur_seg,
+ -ENOMEM, chain_size);
+ size -= chain_size;
+ ofs += chain_size;
cur_seg++;
- rq_bio = next_bio;
} while (size > 0);
kref_put(&coll->kref, rbd_coll_release);
- if (bp)
- bio_pair_release(bp);
spin_lock_irq(q->queue_lock);
ceph_put_snap_context(snapc);
@@ -1576,28 +1646,47 @@ next_seg:
/*
* a queue callback. Makes sure that we don't create a bio that spans across
* multiple osd objects. One exception would be with a single page bios,
- * which we handle later at bio_chain_clone
+ * which we handle later at bio_chain_clone_range()
*/
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
struct bio_vec *bvec)
{
struct rbd_device *rbd_dev = q->queuedata;
- unsigned int chunk_sectors;
- sector_t sector;
- unsigned int bio_sectors;
- int max;
+ sector_t sector_offset;
+ sector_t sectors_per_obj;
+ sector_t obj_sector_offset;
+ int ret;
- chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
- sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
- bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
+ /*
+ * Find how far into its rbd object the partition-relative
+ * bio start sector is to offset relative to the enclosing
+ * device.
+ */
+ sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
+ sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
+ obj_sector_offset = sector_offset & (sectors_per_obj - 1);
+
+ /*
+ * Compute the number of bytes from that offset to the end
+ * of the object. Account for what's already used by the bio.
+ */
+ ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
+ if (ret > bmd->bi_size)
+ ret -= bmd->bi_size;
+ else
+ ret = 0;
- max = (chunk_sectors - ((sector & (chunk_sectors - 1))
- + bio_sectors)) << SECTOR_SHIFT;
- if (max < 0)
- max = 0; /* bio_add cannot handle a negative return */
- if (max <= bvec->bv_len && bio_sectors == 0)
- return bvec->bv_len;
- return max;
+ /*
+ * Don't send back more than was asked for. And if the bio
+ * was empty, let the whole thing through because: "Note
+ * that a block device *must* allow a single page to be
+ * added to an empty bio."
+ */
+ rbd_assert(bvec->bv_len <= PAGE_SIZE);
+ if (ret > (int) bvec->bv_len || !bmd->bi_size)
+ ret = (int) bvec->bv_len;
+
+ return ret;
}
static void rbd_free_disk(struct rbd_device *rbd_dev)
@@ -1663,13 +1752,13 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
ret = -ENXIO;
pr_warning("short header read for image %s"
" (want %zd got %d)\n",
- rbd_dev->image_name, size, ret);
+ rbd_dev->spec->image_name, size, ret);
goto out_err;
}
if (!rbd_dev_ondisk_valid(ondisk)) {
ret = -ENXIO;
pr_warning("invalid header for image %s\n",
- rbd_dev->image_name);
+ rbd_dev->spec->image_name);
goto out_err;
}
@@ -1707,19 +1796,32 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
return ret;
}
-static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
+static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
struct rbd_snap *snap;
struct rbd_snap *next;
list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
- __rbd_remove_snap_dev(snap);
+ rbd_remove_snap_dev(snap);
+}
+
+static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
+{
+ sector_t size;
+
+ if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+ return;
+
+ size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
+ dout("setting size to %llu sectors", (unsigned long long) size);
+ rbd_dev->mapping.size = (u64) size;
+ set_capacity(rbd_dev->disk, size);
}
/*
* only read the first part of the ondisk header, without the snaps info
*/
-static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
int ret;
struct rbd_image_header h;
@@ -1730,17 +1832,9 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
down_write(&rbd_dev->header_rwsem);
- /* resized? */
- if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
- sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
-
- if (size != (sector_t) rbd_dev->mapping.size) {
- dout("setting size to %llu sectors",
- (unsigned long long) size);
- rbd_dev->mapping.size = (u64) size;
- set_capacity(rbd_dev->disk, size);
- }
- }
+ /* Update image size, and check for resize of mapped image */
+ rbd_dev->header.image_size = h.image_size;
+ rbd_update_mapping_size(rbd_dev);
/* rbd_dev->header.object_prefix shouldn't change */
kfree(rbd_dev->header.snap_sizes);
@@ -1768,12 +1862,16 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
return ret;
}
-static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
int ret;
+ rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
- ret = __rbd_refresh_header(rbd_dev, hver);
+ if (rbd_dev->image_format == 1)
+ ret = rbd_dev_v1_refresh(rbd_dev, hver);
+ else
+ ret = rbd_dev_v2_refresh(rbd_dev, hver);
mutex_unlock(&ctl_mutex);
return ret;
@@ -1885,7 +1983,7 @@ static ssize_t rbd_pool_show(struct device *dev,
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
- return sprintf(buf, "%s\n", rbd_dev->pool_name);
+ return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}
static ssize_t rbd_pool_id_show(struct device *dev,
@@ -1893,7 +1991,8 @@ static ssize_t rbd_pool_id_show(struct device *dev,
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
- return sprintf(buf, "%d\n", rbd_dev->pool_id);
+ return sprintf(buf, "%llu\n",
+ (unsigned long long) rbd_dev->spec->pool_id);
}
static ssize_t rbd_name_show(struct device *dev,
@@ -1901,7 +2000,10 @@ static ssize_t rbd_name_show(struct device *dev,
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
- return sprintf(buf, "%s\n", rbd_dev->image_name);
+ if (rbd_dev->spec->image_name)
+ return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
+
+ return sprintf(buf, "(unknown)\n");
}
static ssize_t rbd_image_id_show(struct device *dev,
@@ -1909,7 +2011,7 @@ static ssize_t rbd_image_id_show(struct device *dev,
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
- return sprintf(buf, "%s\n", rbd_dev->image_id);
+ return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}
/*
@@ -1922,7 +2024,50 @@ static ssize_t rbd_snap_show(struct device *dev,
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
- return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
+ return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
+}
+
+/*
+ * For an rbd v2 image, shows the pool id, image id, and snapshot id
+ * for the parent image. If there is no parent, simply shows
+ * "(no parent image)".
+ */
+static ssize_t rbd_parent_show(struct device *dev,
+ struct device_attribute *attr,
+ char *buf)
+{
+ struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+ struct rbd_spec *spec = rbd_dev->parent_spec;
+ int count;
+ char *bufp = buf;
+
+ if (!spec)
+ return sprintf(buf, "(no parent image)\n");
+
+ count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
+ (unsigned long long) spec->pool_id, spec->pool_name);
+ if (count < 0)
+ return count;
+ bufp += count;
+
+ count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
+ spec->image_name ? spec->image_name : "(unknown)");
+ if (count < 0)
+ return count;
+ bufp += count;
+
+ count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
+ (unsigned long long) spec->snap_id, spec->snap_name);
+ if (count < 0)
+ return count;
+ bufp += count;
+
+ count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
+ if (count < 0)
+ return count;
+ bufp += count;
+
+ return (ssize_t) (bufp - buf);
}
static ssize_t rbd_image_refresh(struct device *dev,
@@ -1933,7 +2078,7 @@ static ssize_t rbd_image_refresh(struct device *dev,
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
int ret;
- ret = rbd_refresh_header(rbd_dev, NULL);
+ ret = rbd_dev_refresh(rbd_dev, NULL);
return ret < 0 ? ret : size;
}
@@ -1948,6 +2093,7 @@ static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
+static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
static struct attribute *rbd_attrs[] = {
&dev_attr_size.attr,
@@ -1959,6 +2105,7 @@ static struct attribute *rbd_attrs[] = {
&dev_attr_name.attr,
&dev_attr_image_id.attr,
&dev_attr_current_snap.attr,
+ &dev_attr_parent.attr,
&dev_attr_refresh.attr,
NULL
};
@@ -2047,6 +2194,74 @@ static struct device_type rbd_snap_device_type = {
.release = rbd_snap_dev_release,
};
+static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
+{
+ kref_get(&spec->kref);
+
+ return spec;
+}
+
+static void rbd_spec_free(struct kref *kref);
+static void rbd_spec_put(struct rbd_spec *spec)
+{
+ if (spec)
+ kref_put(&spec->kref, rbd_spec_free);
+}
+
+static struct rbd_spec *rbd_spec_alloc(void)
+{
+ struct rbd_spec *spec;
+
+ spec = kzalloc(sizeof (*spec), GFP_KERNEL);
+ if (!spec)
+ return NULL;
+ kref_init(&spec->kref);
+
+ rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
+
+ return spec;
+}
+
+static void rbd_spec_free(struct kref *kref)
+{
+ struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
+
+ kfree(spec->pool_name);
+ kfree(spec->image_id);
+ kfree(spec->image_name);
+ kfree(spec->snap_name);
+ kfree(spec);
+}
+
+struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
+ struct rbd_spec *spec)
+{
+ struct rbd_device *rbd_dev;
+
+ rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
+ if (!rbd_dev)
+ return NULL;
+
+ spin_lock_init(&rbd_dev->lock);
+ INIT_LIST_HEAD(&rbd_dev->node);
+ INIT_LIST_HEAD(&rbd_dev->snaps);
+ init_rwsem(&rbd_dev->header_rwsem);
+
+ rbd_dev->spec = spec;
+ rbd_dev->rbd_client = rbdc;
+
+ return rbd_dev;
+}
+
+static void rbd_dev_destroy(struct rbd_device *rbd_dev)
+{
+ rbd_spec_put(rbd_dev->parent_spec);
+ kfree(rbd_dev->header_name);
+ rbd_put_client(rbd_dev->rbd_client);
+ rbd_spec_put(rbd_dev->spec);
+ kfree(rbd_dev);
+}
+
static bool rbd_snap_registered(struct rbd_snap *snap)
{
bool ret = snap->dev.type == &rbd_snap_device_type;
@@ -2057,7 +2272,7 @@ static bool rbd_snap_registered(struct rbd_snap *snap)
return ret;
}
-static void __rbd_remove_snap_dev(struct rbd_snap *snap)
+static void rbd_remove_snap_dev(struct rbd_snap *snap)
{
list_del(&snap->node);
if (device_is_registered(&snap->dev))
@@ -2073,7 +2288,7 @@ static int rbd_register_snap_dev(struct rbd_snap *snap,
dev->type = &rbd_snap_device_type;
dev->parent = parent;
dev->release = rbd_snap_dev_release;
- dev_set_name(dev, "snap_%s", snap->name);
+ dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
dout("%s: registering device for snapshot %s\n", __func__, snap->name);
ret = device_register(dev);
@@ -2189,6 +2404,7 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
if (ret < 0)
goto out;
+ ret = 0; /* rbd_req_sync_exec() can return positive */
p = reply_buf;
rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
@@ -2216,6 +2432,7 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
__le64 features;
__le64 incompat;
} features_buf = { 0 };
+ u64 incompat;
int ret;
ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
@@ -2226,6 +2443,11 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
if (ret < 0)
return ret;
+
+ incompat = le64_to_cpu(features_buf.incompat);
+ if (incompat & ~RBD_FEATURES_ALL)
+ return -ENXIO;
+
*snap_features = le64_to_cpu(features_buf.features);
dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
@@ -2242,6 +2464,183 @@ static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
&rbd_dev->header.features);
}
+static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
+{
+ struct rbd_spec *parent_spec;
+ size_t size;
+ void *reply_buf = NULL;
+ __le64 snapid;
+ void *p;
+ void *end;
+ char *image_id;
+ u64 overlap;
+ size_t len = 0;
+ int ret;
+
+ parent_spec = rbd_spec_alloc();
+ if (!parent_spec)
+ return -ENOMEM;
+
+ size = sizeof (__le64) + /* pool_id */
+ sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
+ sizeof (__le64) + /* snap_id */
+ sizeof (__le64); /* overlap */
+ reply_buf = kmalloc(size, GFP_KERNEL);
+ if (!reply_buf) {
+ ret = -ENOMEM;
+ goto out_err;
+ }
+
+ snapid = cpu_to_le64(CEPH_NOSNAP);
+ ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ "rbd", "get_parent",
+ (char *) &snapid, sizeof (snapid),
+ (char *) reply_buf, size,
+ CEPH_OSD_FLAG_READ, NULL);
+ dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ if (ret < 0)
+ goto out_err;
+
+ ret = -ERANGE;
+ p = reply_buf;
+ end = (char *) reply_buf + size;
+ ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
+ if (parent_spec->pool_id == CEPH_NOPOOL)
+ goto out; /* No parent? No problem. */
+
+ image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
+ if (IS_ERR(image_id)) {
+ ret = PTR_ERR(image_id);
+ goto out_err;
+ }
+ parent_spec->image_id = image_id;
+ parent_spec->image_id_len = len;
+ ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
+ ceph_decode_64_safe(&p, end, overlap, out_err);
+
+ rbd_dev->parent_overlap = overlap;
+ rbd_dev->parent_spec = parent_spec;
+ parent_spec = NULL; /* rbd_dev now owns this */
+out:
+ ret = 0;
+out_err:
+ kfree(reply_buf);
+ rbd_spec_put(parent_spec);
+
+ return ret;
+}
+
+static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
+{
+ size_t image_id_size;
+ char *image_id;
+ void *p;
+ void *end;
+ size_t size;
+ void *reply_buf = NULL;
+ size_t len = 0;
+ char *image_name = NULL;
+ int ret;
+
+ rbd_assert(!rbd_dev->spec->image_name);
+
+ image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len;
+ image_id = kmalloc(image_id_size, GFP_KERNEL);
+ if (!image_id)
+ return NULL;
+
+ p = image_id;
+ end = (char *) image_id + image_id_size;
+ ceph_encode_string(&p, end, rbd_dev->spec->image_id,
+ (u32) rbd_dev->spec->image_id_len);
+
+ size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
+ reply_buf = kmalloc(size, GFP_KERNEL);
+ if (!reply_buf)
+ goto out;
+
+ ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
+ "rbd", "dir_get_name",
+ image_id, image_id_size,
+ (char *) reply_buf, size,
+ CEPH_OSD_FLAG_READ, NULL);
+ if (ret < 0)
+ goto out;
+ p = reply_buf;
+ end = (char *) reply_buf + size;
+ image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
+ if (IS_ERR(image_name))
+ image_name = NULL;
+ else
+ dout("%s: name is %s len is %zd\n", __func__, image_name, len);
+out:
+ kfree(reply_buf);
+ kfree(image_id);
+
+ return image_name;
+}
+
+/*
+ * When a parent image gets probed, we only have the pool, image,
+ * and snapshot ids but not the names of any of them. This call
+ * is made later to fill in those names. It has to be done after
+ * rbd_dev_snaps_update() has completed because some of the
+ * information (in particular, snapshot name) is not available
+ * until then.
+ */
+static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
+{
+ struct ceph_osd_client *osdc;
+ const char *name;
+ void *reply_buf = NULL;
+ int ret;
+
+ if (rbd_dev->spec->pool_name)
+ return 0; /* Already have the names */
+
+ /* Look up the pool name */
+
+ osdc = &rbd_dev->rbd_client->client->osdc;
+ name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
+ if (!name)
+ return -EIO; /* pool id too large (>= 2^31) */
+
+ rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
+ if (!rbd_dev->spec->pool_name)
+ return -ENOMEM;
+
+ /* Fetch the image name; tolerate failure here */
+
+ name = rbd_dev_image_name(rbd_dev);
+ if (name) {
+ rbd_dev->spec->image_name_len = strlen(name);
+ rbd_dev->spec->image_name = (char *) name;
+ } else {
+ pr_warning(RBD_DRV_NAME "%d "
+ "unable to get image name for image id %s\n",
+ rbd_dev->major, rbd_dev->spec->image_id);
+ }
+
+ /* Look up the snapshot name. */
+
+ name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
+ if (!name) {
+ ret = -EIO;
+ goto out_err;
+ }
+ rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
+ if(!rbd_dev->spec->snap_name)
+ goto out_err;
+
+ return 0;
+out_err:
+ kfree(reply_buf);
+ kfree(rbd_dev->spec->pool_name);
+ rbd_dev->spec->pool_name = NULL;
+
+ return ret;
+}
+
static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
{
size_t size;
@@ -2328,7 +2727,6 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
int ret;
void *p;
void *end;
- size_t snap_name_len;
char *snap_name;
size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
@@ -2348,9 +2746,7 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
p = reply_buf;
end = (char *) reply_buf + size;
- snap_name_len = 0;
- snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
- GFP_KERNEL);
+ snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
if (IS_ERR(snap_name)) {
ret = PTR_ERR(snap_name);
goto out;
@@ -2397,6 +2793,41 @@ static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
return ERR_PTR(-EINVAL);
}
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
+{
+ int ret;
+ __u8 obj_order;
+
+ down_write(&rbd_dev->header_rwsem);
+
+ /* Grab old order first, to see if it changes */
+
+ obj_order = rbd_dev->header.obj_order,
+ ret = rbd_dev_v2_image_size(rbd_dev);
+ if (ret)
+ goto out;
+ if (rbd_dev->header.obj_order != obj_order) {
+ ret = -EIO;
+ goto out;
+ }
+ rbd_update_mapping_size(rbd_dev);
+
+ ret = rbd_dev_v2_snap_context(rbd_dev, hver);
+ dout("rbd_dev_v2_snap_context returned %d\n", ret);
+ if (ret)
+ goto out;
+ ret = rbd_dev_snaps_update(rbd_dev);
+ dout("rbd_dev_snaps_update returned %d\n", ret);
+ if (ret)
+ goto out;
+ ret = rbd_dev_snaps_register(rbd_dev);
+ dout("rbd_dev_snaps_register returned %d\n", ret);
+out:
+ up_write(&rbd_dev->header_rwsem);
+
+ return ret;
+}
+
/*
* Scan the rbd device's current snapshot list and compare it to the
* newly-received snapshot context. Remove any existing snapshots
@@ -2436,12 +2867,12 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
/* Existing snapshot not in the new snap context */
- if (rbd_dev->mapping.snap_id == snap->id)
- rbd_dev->mapping.snap_exists = false;
- __rbd_remove_snap_dev(snap);
+ if (rbd_dev->spec->snap_id == snap->id)
+ rbd_dev->exists = false;
+ rbd_remove_snap_dev(snap);
dout("%ssnap id %llu has been removed\n",
- rbd_dev->mapping.snap_id == snap->id ?
- "mapped " : "",
+ rbd_dev->spec->snap_id == snap->id ?
+ "mapped " : "",
(unsigned long long) snap->id);
/* Done with this list entry; advance */
@@ -2559,7 +2990,7 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
do {
ret = rbd_req_sync_watch(rbd_dev);
if (ret == -ERANGE) {
- rc = rbd_refresh_header(rbd_dev, NULL);
+ rc = rbd_dev_refresh(rbd_dev, NULL);
if (rc < 0)
return rc;
}
@@ -2621,8 +3052,8 @@ static void rbd_dev_id_put(struct rbd_device *rbd_dev)
struct rbd_device *rbd_dev;
rbd_dev = list_entry(tmp, struct rbd_device, node);
- if (rbd_id > max_id)
- max_id = rbd_id;
+ if (rbd_dev->dev_id > max_id)
+ max_id = rbd_dev->dev_id;
}
spin_unlock(&rbd_dev_list_lock);
@@ -2722,73 +3153,140 @@ static inline char *dup_token(const char **buf, size_t *lenp)
}
/*
- * This fills in the pool_name, image_name, image_name_len, rbd_dev,
- * rbd_md_name, and name fields of the given rbd_dev, based on the
- * list of monitor addresses and other options provided via
- * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated
- * copy of the snapshot name to map if successful, or a
- * pointer-coded error otherwise.
+ * Parse the options provided for an "rbd add" (i.e., rbd image
+ * mapping) request. These arrive via a write to /sys/bus/rbd/add,
+ * and the data written is passed here via a NUL-terminated buffer.
+ * Returns 0 if successful or an error code otherwise.
+ *
+ * The information extracted from these options is recorded in
+ * the other parameters which return dynamically-allocated
+ * structures:
+ * ceph_opts
+ * The address of a pointer that will refer to a ceph options
+ * structure. Caller must release the returned pointer using
+ * ceph_destroy_options() when it is no longer needed.
+ * rbd_opts
+ * Address of an rbd options pointer. Fully initialized by
+ * this function; caller must release with kfree().
+ * spec
+ * Address of an rbd image specification pointer. Fully
+ * initialized by this function based on parsed options.
+ * Caller must release with rbd_spec_put().
*
- * Note: rbd_dev is assumed to have been initially zero-filled.
+ * The options passed take this form:
+ * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
+ * where:
+ * <mon_addrs>
+ * A comma-separated list of one or more monitor addresses.
+ * A monitor address is an ip address, optionally followed
+ * by a port number (separated by a colon).
+ * I.e.: ip1[:port1][,ip2[:port2]...]
+ * <options>
+ * A comma-separated list of ceph and/or rbd options.
+ * <pool_name>
+ * The name of the rados pool containing the rbd image.
+ * <image_name>
+ * The name of the image in that pool to map.
+ * <snap_id>
+ * An optional snapshot id. If provided, the mapping will
+ * present data from the image at the time that snapshot was
+ * created. The image head is used if no snapshot id is
+ * provided. Snapshot mappings are always read-only.
*/
-static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
- const char *buf,
- const char **mon_addrs,
- size_t *mon_addrs_size,
- char *options,
- size_t options_size)
+static int rbd_add_parse_args(const char *buf,
+ struct ceph_options **ceph_opts,
+ struct rbd_options **opts,
+ struct rbd_spec **rbd_spec)
{
size_t len;
- char *err_ptr = ERR_PTR(-EINVAL);
- char *snap_name;
+ char *options;
+ const char *mon_addrs;
+ size_t mon_addrs_size;
+ struct rbd_spec *spec = NULL;
+ struct rbd_options *rbd_opts = NULL;
+ struct ceph_options *copts;
+ int ret;
/* The first four tokens are required */
len = next_token(&buf);
if (!len)
- return err_ptr;
- *mon_addrs_size = len + 1;
- *mon_addrs = buf;
-
+ return -EINVAL; /* Missing monitor address(es) */
+ mon_addrs = buf;
+ mon_addrs_size = len + 1;
buf += len;
- len = copy_token(&buf, options, options_size);
- if (!len || len >= options_size)
- return err_ptr;
+ ret = -EINVAL;
+ options = dup_token(&buf, NULL);
+ if (!options)
+ return -ENOMEM;
+ if (!*options)
+ goto out_err; /* Missing options */
- err_ptr = ERR_PTR(-ENOMEM);
- rbd_dev->pool_name = dup_token(&buf, NULL);
- if (!rbd_dev->pool_name)
- goto out_err;
+ spec = rbd_spec_alloc();
+ if (!spec)
+ goto out_mem;
- rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
- if (!rbd_dev->image_name)
- goto out_err;
+ spec->pool_name = dup_token(&buf, NULL);
+ if (!spec->pool_name)
+ goto out_mem;
+ if (!*spec->pool_name)
+ goto out_err; /* Missing pool name */
- /* Snapshot name is optional */
+ spec->image_name = dup_token(&buf, &spec->image_name_len);
+ if (!spec->image_name)
+ goto out_mem;
+ if (!*spec->image_name)
+ goto out_err; /* Missing image name */
+
+ /*
+ * Snapshot name is optional; default is to use "-"
+ * (indicating the head/no snapshot).
+ */
len = next_token(&buf);
if (!len) {
buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
- }
- snap_name = kmalloc(len + 1, GFP_KERNEL);
- if (!snap_name)
+ } else if (len > RBD_MAX_SNAP_NAME_LEN) {
+ ret = -ENAMETOOLONG;
goto out_err;
- memcpy(snap_name, buf, len);
- *(snap_name + len) = '\0';
+ }
+ spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
+ if (!spec->snap_name)
+ goto out_mem;
+ memcpy(spec->snap_name, buf, len);
+ *(spec->snap_name + len) = '\0';
-dout(" SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
+ /* Initialize all rbd options to the defaults */
- return snap_name;
+ rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
+ if (!rbd_opts)
+ goto out_mem;
+
+ rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
+
+ copts = ceph_parse_options(options, mon_addrs,
+ mon_addrs + mon_addrs_size - 1,
+ parse_rbd_opts_token, rbd_opts);
+ if (IS_ERR(copts)) {
+ ret = PTR_ERR(copts);
+ goto out_err;
+ }
+ kfree(options);
+ *ceph_opts = copts;
+ *opts = rbd_opts;
+ *rbd_spec = spec;
+
+ return 0;
+out_mem:
+ ret = -ENOMEM;
out_err:
- kfree(rbd_dev->image_name);
- rbd_dev->image_name = NULL;
- rbd_dev->image_name_len = 0;
- kfree(rbd_dev->pool_name);
- rbd_dev->pool_name = NULL;
+ kfree(rbd_opts);
+ rbd_spec_put(spec);
+ kfree(options);
- return err_ptr;
+ return ret;
}
/*
@@ -2814,14 +3312,22 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
void *p;
/*
+ * When probing a parent image, the image id is already
+ * known (and the image name likely is not). There's no
+ * need to fetch the image id again in this case.
+ */
+ if (rbd_dev->spec->image_id)
+ return 0;
+
+ /*
* First, see if the format 2 image id file exists, and if
* so, get the image's persistent id from it.
*/
- size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
+ size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
object_name = kmalloc(size, GFP_NOIO);
if (!object_name)
return -ENOMEM;
- sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
+ sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
dout("rbd id object name is %s\n", object_name);
/* Response will be an encoded string, which includes a length */
@@ -2841,17 +3347,18 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
if (ret < 0)
goto out;
+ ret = 0; /* rbd_req_sync_exec() can return positive */
p = response;
- rbd_dev->image_id = ceph_extract_encoded_string(&p,
+ rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
p + RBD_IMAGE_ID_LEN_MAX,
- &rbd_dev->image_id_len,
+ &rbd_dev->spec->image_id_len,
GFP_NOIO);
- if (IS_ERR(rbd_dev->image_id)) {
- ret = PTR_ERR(rbd_dev->image_id);
- rbd_dev->image_id = NULL;
+ if (IS_ERR(rbd_dev->spec->image_id)) {
+ ret = PTR_ERR(rbd_dev->spec->image_id);
+ rbd_dev->spec->image_id = NULL;
} else {
- dout("image_id is %s\n", rbd_dev->image_id);
+ dout("image_id is %s\n", rbd_dev->spec->image_id);
}
out:
kfree(response);
@@ -2867,26 +3374,33 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
/* Version 1 images have no id; empty string is used */
- rbd_dev->image_id = kstrdup("", GFP_KERNEL);
- if (!rbd_dev->image_id)
+ rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
+ if (!rbd_dev->spec->image_id)
return -ENOMEM;
- rbd_dev->image_id_len = 0;
+ rbd_dev->spec->image_id_len = 0;
/* Record the header object name for this rbd image. */
- size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
+ size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
if (!rbd_dev->header_name) {
ret = -ENOMEM;
goto out_err;
}
- sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
+ sprintf(rbd_dev->header_name, "%s%s",
+ rbd_dev->spec->image_name, RBD_SUFFIX);
/* Populate rbd image metadata */
ret = rbd_read_header(rbd_dev, &rbd_dev->header);
if (ret < 0)
goto out_err;
+
+ /* Version 1 images have no parent (no layering) */
+
+ rbd_dev->parent_spec = NULL;
+ rbd_dev->parent_overlap = 0;
+
rbd_dev->image_format = 1;
dout("discovered version 1 image, header name is %s\n",
@@ -2897,8 +3411,8 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
out_err:
kfree(rbd_dev->header_name);
rbd_dev->header_name = NULL;
- kfree(rbd_dev->image_id);
- rbd_dev->image_id = NULL;
+ kfree(rbd_dev->spec->image_id);
+ rbd_dev->spec->image_id = NULL;
return ret;
}
@@ -2913,12 +3427,12 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
* Image id was filled in by the caller. Record the header
* object name for this rbd image.
*/
- size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
+ size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
if (!rbd_dev->header_name)
return -ENOMEM;
sprintf(rbd_dev->header_name, "%s%s",
- RBD_HEADER_PREFIX, rbd_dev->image_id);
+ RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
/* Get the size and object order for the image */
@@ -2932,12 +3446,20 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
if (ret < 0)
goto out_err;
- /* Get the features for the image */
+ /* Get the and check features for the image */
ret = rbd_dev_v2_features(rbd_dev);
if (ret < 0)
goto out_err;
+ /* If the image supports layering, get the parent info */
+
+ if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
+ ret = rbd_dev_v2_parent_info(rbd_dev);
+ if (ret < 0)
+ goto out_err;
+ }
+
/* crypto and compression type aren't (yet) supported for v2 images */
rbd_dev->header.crypt_type = 0;
@@ -2955,8 +3477,11 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
dout("discovered version 2 image, header name is %s\n",
rbd_dev->header_name);
- return -ENOTSUPP;
+ return 0;
out_err:
+ rbd_dev->parent_overlap = 0;
+ rbd_spec_put(rbd_dev->parent_spec);
+ rbd_dev->parent_spec = NULL;
kfree(rbd_dev->header_name);
rbd_dev->header_name = NULL;
kfree(rbd_dev->header.object_prefix);
@@ -2965,91 +3490,22 @@ out_err:
return ret;
}
-/*
- * Probe for the existence of the header object for the given rbd
- * device. For format 2 images this includes determining the image
- * id.
- */
-static int rbd_dev_probe(struct rbd_device *rbd_dev)
+static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
int ret;
- /*
- * Get the id from the image id object. If it's not a
- * format 2 image, we'll get ENOENT back, and we'll assume
- * it's a format 1 image.
- */
- ret = rbd_dev_image_id(rbd_dev);
- if (ret)
- ret = rbd_dev_v1_probe(rbd_dev);
- else
- ret = rbd_dev_v2_probe(rbd_dev);
+ /* no need to lock here, as rbd_dev is not registered yet */
+ ret = rbd_dev_snaps_update(rbd_dev);
if (ret)
- dout("probe failed, returning %d\n", ret);
-
- return ret;
-}
-
-static ssize_t rbd_add(struct bus_type *bus,
- const char *buf,
- size_t count)
-{
- char *options;
- struct rbd_device *rbd_dev = NULL;
- const char *mon_addrs = NULL;
- size_t mon_addrs_size = 0;
- struct ceph_osd_client *osdc;
- int rc = -ENOMEM;
- char *snap_name;
-
- if (!try_module_get(THIS_MODULE))
- return -ENODEV;
-
- options = kmalloc(count, GFP_KERNEL);
- if (!options)
- goto err_out_mem;
- rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
- if (!rbd_dev)
- goto err_out_mem;
-
- /* static rbd_device initialization */
- spin_lock_init(&rbd_dev->lock);
- INIT_LIST_HEAD(&rbd_dev->node);
- INIT_LIST_HEAD(&rbd_dev->snaps);
- init_rwsem(&rbd_dev->header_rwsem);
-
- /* parse add command */
- snap_name = rbd_add_parse_args(rbd_dev, buf,
- &mon_addrs, &mon_addrs_size, options, count);
- if (IS_ERR(snap_name)) {
- rc = PTR_ERR(snap_name);
- goto err_out_mem;
- }
-
- rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
- if (rc < 0)
- goto err_out_args;
-
- /* pick the pool */
- osdc = &rbd_dev->rbd_client->client->osdc;
- rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
- if (rc < 0)
- goto err_out_client;
- rbd_dev->pool_id = rc;
-
- rc = rbd_dev_probe(rbd_dev);
- if (rc < 0)
- goto err_out_client;
- rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+ return ret;
- /* no need to lock here, as rbd_dev is not registered yet */
- rc = rbd_dev_snaps_update(rbd_dev);
- if (rc)
- goto err_out_header;
+ ret = rbd_dev_probe_update_spec(rbd_dev);
+ if (ret)
+ goto err_out_snaps;
- rc = rbd_dev_set_mapping(rbd_dev, snap_name);
- if (rc)
- goto err_out_header;
+ ret = rbd_dev_set_mapping(rbd_dev);
+ if (ret)
+ goto err_out_snaps;
/* generate unique id: find highest unique id, add one */
rbd_dev_id_get(rbd_dev);
@@ -3061,34 +3517,33 @@ static ssize_t rbd_add(struct bus_type *bus,
/* Get our block major device number. */
- rc = register_blkdev(0, rbd_dev->name);
- if (rc < 0)
+ ret = register_blkdev(0, rbd_dev->name);
+ if (ret < 0)
goto err_out_id;
- rbd_dev->major = rc;
+ rbd_dev->major = ret;
/* Set up the blkdev mapping. */
- rc = rbd_init_disk(rbd_dev);
- if (rc)
+ ret = rbd_init_disk(rbd_dev);
+ if (ret)
goto err_out_blkdev;
- rc = rbd_bus_add_dev(rbd_dev);
- if (rc)
+ ret = rbd_bus_add_dev(rbd_dev);
+ if (ret)
goto err_out_disk;
/*
* At this point cleanup in the event of an error is the job
* of the sysfs code (initiated by rbd_bus_del_dev()).
*/
-
down_write(&rbd_dev->header_rwsem);
- rc = rbd_dev_snaps_register(rbd_dev);
+ ret = rbd_dev_snaps_register(rbd_dev);
up_write(&rbd_dev->header_rwsem);
- if (rc)
+ if (ret)
goto err_out_bus;
- rc = rbd_init_watch_dev(rbd_dev);
- if (rc)
+ ret = rbd_init_watch_dev(rbd_dev);
+ if (ret)
goto err_out_bus;
/* Everything's ready. Announce the disk to the world. */
@@ -3098,37 +3553,119 @@ static ssize_t rbd_add(struct bus_type *bus,
pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
(unsigned long long) rbd_dev->mapping.size);
- return count;
-
+ return ret;
err_out_bus:
/* this will also clean up rest of rbd_dev stuff */
rbd_bus_del_dev(rbd_dev);
- kfree(options);
- return rc;
+ return ret;
err_out_disk:
rbd_free_disk(rbd_dev);
err_out_blkdev:
unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
rbd_dev_id_put(rbd_dev);
-err_out_header:
- rbd_header_free(&rbd_dev->header);
+err_out_snaps:
+ rbd_remove_all_snaps(rbd_dev);
+
+ return ret;
+}
+
+/*
+ * Probe for the existence of the header object for the given rbd
+ * device. For format 2 images this includes determining the image
+ * id.
+ */
+static int rbd_dev_probe(struct rbd_device *rbd_dev)
+{
+ int ret;
+
+ /*
+ * Get the id from the image id object. If it's not a
+ * format 2 image, we'll get ENOENT back, and we'll assume
+ * it's a format 1 image.
+ */
+ ret = rbd_dev_image_id(rbd_dev);
+ if (ret)
+ ret = rbd_dev_v1_probe(rbd_dev);
+ else
+ ret = rbd_dev_v2_probe(rbd_dev);
+ if (ret) {
+ dout("probe failed, returning %d\n", ret);
+
+ return ret;
+ }
+
+ ret = rbd_dev_probe_finish(rbd_dev);
+ if (ret)
+ rbd_header_free(&rbd_dev->header);
+
+ return ret;
+}
+
+static ssize_t rbd_add(struct bus_type *bus,
+ const char *buf,
+ size_t count)
+{
+ struct rbd_device *rbd_dev = NULL;
+ struct ceph_options *ceph_opts = NULL;
+ struct rbd_options *rbd_opts = NULL;
+ struct rbd_spec *spec = NULL;
+ struct rbd_client *rbdc;
+ struct ceph_osd_client *osdc;
+ int rc = -ENOMEM;
+
+ if (!try_module_get(THIS_MODULE))
+ return -ENODEV;
+
+ /* parse add command */
+ rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
+ if (rc < 0)
+ goto err_out_module;
+
+ rbdc = rbd_get_client(ceph_opts);
+ if (IS_ERR(rbdc)) {
+ rc = PTR_ERR(rbdc);
+ goto err_out_args;
+ }
+ ceph_opts = NULL; /* rbd_dev client now owns this */
+
+ /* pick the pool */
+ osdc = &rbdc->client->osdc;
+ rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
+ if (rc < 0)
+ goto err_out_client;
+ spec->pool_id = (u64) rc;
+
+ rbd_dev = rbd_dev_create(rbdc, spec);
+ if (!rbd_dev)
+ goto err_out_client;
+ rbdc = NULL; /* rbd_dev now owns this */
+ spec = NULL; /* rbd_dev now owns this */
+
+ rbd_dev->mapping.read_only = rbd_opts->read_only;
+ kfree(rbd_opts);
+ rbd_opts = NULL; /* done with this */
+
+ rc = rbd_dev_probe(rbd_dev);
+ if (rc < 0)
+ goto err_out_rbd_dev;
+
+ return count;
+err_out_rbd_dev:
+ rbd_dev_destroy(rbd_dev);
err_out_client:
- kfree(rbd_dev->header_name);
- rbd_put_client(rbd_dev);
- kfree(rbd_dev->image_id);
+ rbd_put_client(rbdc);
err_out_args:
- kfree(rbd_dev->mapping.snap_name);
- kfree(rbd_dev->image_name);
- kfree(rbd_dev->pool_name);
-err_out_mem:
- kfree(rbd_dev);
- kfree(options);
+ if (ceph_opts)
+ ceph_destroy_options(ceph_opts);
+ kfree(rbd_opts);
+ rbd_spec_put(spec);
+err_out_module:
+ module_put(THIS_MODULE);
dout("Error adding device %s\n", buf);
- module_put(THIS_MODULE);
return (ssize_t) rc;
}
@@ -3163,7 +3700,6 @@ static void rbd_dev_release(struct device *dev)
if (rbd_dev->watch_event)
rbd_req_sync_unwatch(rbd_dev);
- rbd_put_client(rbd_dev);
/* clean up and free blkdev */
rbd_free_disk(rbd_dev);
@@ -3173,13 +3709,9 @@ static void rbd_dev_release(struct device *dev)
rbd_header_free(&rbd_dev->header);
/* done with the id, and with the rbd_dev */
- kfree(rbd_dev->mapping.snap_name);
- kfree(rbd_dev->image_id);
- kfree(rbd_dev->header_name);
- kfree(rbd_dev->pool_name);
- kfree(rbd_dev->image_name);
rbd_dev_id_put(rbd_dev);
- kfree(rbd_dev);
+ rbd_assert(rbd_dev->rbd_client != NULL);
+ rbd_dev_destroy(rbd_dev);
/* release module ref */
module_put(THIS_MODULE);
@@ -3211,7 +3743,12 @@ static ssize_t rbd_remove(struct bus_type *bus,
goto done;
}
- __rbd_remove_all_snaps(rbd_dev);
+ if (rbd_dev->open_count) {
+ ret = -EBUSY;
+ goto done;
+ }
+
+ rbd_remove_all_snaps(rbd_dev);
rbd_bus_del_dev(rbd_dev);
done:
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index cbe77fa105b..49d77cbcf8b 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -46,8 +46,6 @@
#define RBD_MIN_OBJ_ORDER 16
#define RBD_MAX_OBJ_ORDER 30
-#define RBD_MAX_SEG_NAME_LEN 128
-
#define RBD_COMP_NONE 0
#define RBD_CRYPT_NONE 0
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 6690269f5dd..064d1a68d2c 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -267,6 +267,14 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
kfree(req->r_pages);
}
+static void ceph_unlock_page_vector(struct page **pages, int num_pages)
+{
+ int i;
+
+ for (i = 0; i < num_pages; i++)
+ unlock_page(pages[i]);
+}
+
/*
* start an async read(ahead) operation. return nr_pages we submitted
* a read for on success, or negative error code.
@@ -347,6 +355,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
return nr_pages;
out_pages:
+ ceph_unlock_page_vector(pages, nr_pages);
ceph_release_page_vector(pages, nr_pages);
out:
ceph_osdc_put_request(req);
@@ -1078,23 +1087,51 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
struct page **pagep, void **fsdata)
{
struct inode *inode = file->f_dentry->d_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_file_info *fi = file->private_data;
struct page *page;
pgoff_t index = pos >> PAGE_CACHE_SHIFT;
- int r;
+ int r, want, got = 0;
+
+ if (fi->fmode & CEPH_FILE_MODE_LAZY)
+ want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
+ else
+ want = CEPH_CAP_FILE_BUFFER;
+
+ dout("write_begin %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
+ inode, ceph_vinop(inode), pos, len, inode->i_size);
+ r = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos+len);
+ if (r < 0)
+ return r;
+ dout("write_begin %p %llx.%llx %llu~%u got cap refs on %s\n",
+ inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
+ if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) {
+ ceph_put_cap_refs(ci, got);
+ return -EAGAIN;
+ }
do {
/* get a page */
page = grab_cache_page_write_begin(mapping, index, 0);
- if (!page)
- return -ENOMEM;
- *pagep = page;
+ if (!page) {
+ r = -ENOMEM;
+ break;
+ }
dout("write_begin file %p inode %p page %p %d~%d\n", file,
inode, page, (int)pos, (int)len);
r = ceph_update_writeable_page(file, pos, len, page);
+ if (r)
+ page_cache_release(page);
} while (r == -EAGAIN);
+ if (r) {
+ ceph_put_cap_refs(ci, got);
+ } else {
+ *pagep = page;
+ *(int *)fsdata = got;
+ }
return r;
}
@@ -1108,10 +1145,12 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
struct page *page, void *fsdata)
{
struct inode *inode = file->f_dentry->d_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_mds_client *mdsc = fsc->mdsc;
unsigned from = pos & (PAGE_CACHE_SIZE - 1);
int check_cap = 0;
+ int got = (unsigned long)fsdata;
dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
inode, page, (int)pos, (int)copied, (int)len);
@@ -1134,6 +1173,19 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
up_read(&mdsc->snap_rwsem);
page_cache_release(page);
+ if (copied > 0) {
+ int dirty;
+ spin_lock(&ci->i_ceph_lock);
+ dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+ spin_unlock(&ci->i_ceph_lock);
+ if (dirty)
+ __mark_inode_dirty(inode, dirty);
+ }
+
+ dout("write_end %p %llx.%llx %llu~%u dropping cap refs on %s\n",
+ inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
+ ceph_put_cap_refs(ci, got);
+
if (check_cap)
ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 3251e9cc640..a1d9bb30c1b 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -236,8 +236,10 @@ static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
if (!ctx) {
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
if (cap) {
+ spin_lock(&mdsc->caps_list_lock);
mdsc->caps_use_count++;
mdsc->caps_total_count++;
+ spin_unlock(&mdsc->caps_list_lock);
}
return cap;
}
@@ -1349,11 +1351,15 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
if (!ci->i_head_snapc)
ci->i_head_snapc = ceph_get_snap_context(
ci->i_snap_realm->cached_context);
- dout(" inode %p now dirty snapc %p\n", &ci->vfs_inode,
- ci->i_head_snapc);
+ dout(" inode %p now dirty snapc %p auth cap %p\n",
+ &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
BUG_ON(!list_empty(&ci->i_dirty_item));
spin_lock(&mdsc->cap_dirty_lock);
- list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
+ if (ci->i_auth_cap)
+ list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
+ else
+ list_add(&ci->i_dirty_item,
+ &mdsc->cap_dirty_migrating);
spin_unlock(&mdsc->cap_dirty_lock);
if (ci->i_flushing_caps == 0) {
ihold(inode);
@@ -2388,7 +2394,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
&atime);
/* max size increase? */
- if (max_size != ci->i_max_size) {
+ if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
ci->i_max_size = max_size;
if (max_size >= ci->i_wanted_max_size) {
@@ -2745,6 +2751,7 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
/* make sure we re-request max_size, if necessary */
spin_lock(&ci->i_ceph_lock);
+ ci->i_wanted_max_size = 0; /* reset */
ci->i_requested_max_size = 0;
spin_unlock(&ci->i_ceph_lock);
}
@@ -2840,8 +2847,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
case CEPH_CAP_OP_IMPORT:
handle_cap_import(mdsc, inode, h, session,
snaptrace, snaptrace_len);
- ceph_check_caps(ceph_inode(inode), 0, session);
- goto done_unlocked;
}
/* the rest require a cap */
@@ -2858,6 +2863,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
switch (op) {
case CEPH_CAP_OP_REVOKE:
case CEPH_CAP_OP_GRANT:
+ case CEPH_CAP_OP_IMPORT:
handle_cap_grant(inode, h, session, cap, msg->middle);
goto done_unlocked;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index d4dfdcf76d7..e51558fca3a 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -712,63 +712,53 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
struct ceph_osd_client *osdc =
&ceph_sb_to_client(inode->i_sb)->client->osdc;
loff_t endoff = pos + iov->iov_len;
- int want, got = 0;
- int ret, err;
+ int got = 0;
+ int ret, err, written;
if (ceph_snap(inode) != CEPH_NOSNAP)
return -EROFS;
retry_snap:
+ written = 0;
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
return -ENOSPC;
__ceph_do_pending_vmtruncate(inode);
- dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
- inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
- inode->i_size);
- if (fi->fmode & CEPH_FILE_MODE_LAZY)
- want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
- else
- want = CEPH_CAP_FILE_BUFFER;
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
- if (ret < 0)
- goto out_put;
-
- dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n",
- inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
- ceph_cap_string(got));
-
- if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
- (iocb->ki_filp->f_flags & O_DIRECT) ||
- (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
- (fi->flags & CEPH_F_SYNC)) {
- ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
- &iocb->ki_pos);
- } else {
- /*
- * buffered write; drop Fw early to avoid slow
- * revocation if we get stuck on balance_dirty_pages
- */
- int dirty;
-
- spin_lock(&ci->i_ceph_lock);
- dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
- spin_unlock(&ci->i_ceph_lock);
- ceph_put_cap_refs(ci, got);
+ /*
+ * try to do a buffered write. if we don't have sufficient
+ * caps, we'll get -EAGAIN from generic_file_aio_write, or a
+ * short write if we only get caps for some pages.
+ */
+ if (!(iocb->ki_filp->f_flags & O_DIRECT) &&
+ !(inode->i_sb->s_flags & MS_SYNCHRONOUS) &&
+ !(fi->flags & CEPH_F_SYNC)) {
ret = generic_file_aio_write(iocb, iov, nr_segs, pos);
+ if (ret >= 0)
+ written = ret;
+
if ((ret >= 0 || ret == -EIOCBQUEUED) &&
((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host)
|| ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) {
- err = vfs_fsync_range(file, pos, pos + ret - 1, 1);
+ err = vfs_fsync_range(file, pos, pos + written - 1, 1);
if (err < 0)
ret = err;
}
+ if ((ret < 0 && ret != -EAGAIN) || pos + written >= endoff)
+ goto out;
+ }
- if (dirty)
- __mark_inode_dirty(inode, dirty);
+ dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
+ inode, ceph_vinop(inode), pos + written,
+ (unsigned)iov->iov_len - written, inode->i_size);
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, 0, &got, endoff);
+ if (ret < 0)
goto out;
- }
+ dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n",
+ inode, ceph_vinop(inode), pos + written,
+ (unsigned)iov->iov_len - written, ceph_cap_string(got));
+ ret = ceph_sync_write(file, iov->iov_base + written,
+ iov->iov_len - written, &iocb->ki_pos);
if (ret >= 0) {
int dirty;
spin_lock(&ci->i_ceph_lock);
@@ -777,13 +767,10 @@ retry_snap:
if (dirty)
__mark_inode_dirty(inode, dirty);
}
-
-out_put:
dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n",
- inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
- ceph_cap_string(got));
+ inode, ceph_vinop(inode), pos + written,
+ (unsigned)iov->iov_len - written, ceph_cap_string(got));
ceph_put_cap_refs(ci, got);
-
out:
if (ret == -EOLDSNAPC) {
dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n",
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index ba95eea201b..2971eaa65cd 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1466,7 +1466,7 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
u64 to;
- int wrbuffer_refs, wake = 0;
+ int wrbuffer_refs, finish = 0;
retry:
spin_lock(&ci->i_ceph_lock);
@@ -1498,15 +1498,18 @@ retry:
truncate_inode_pages(inode->i_mapping, to);
spin_lock(&ci->i_ceph_lock);
- ci->i_truncate_pending--;
- if (ci->i_truncate_pending == 0)
- wake = 1;
+ if (to == ci->i_truncate_size) {
+ ci->i_truncate_pending = 0;
+ finish = 1;
+ }
spin_unlock(&ci->i_ceph_lock);
+ if (!finish)
+ goto retry;
if (wrbuffer_refs == 0)
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
- if (wake)
- wake_up_all(&ci->i_cap_wq);
+
+ wake_up_all(&ci->i_cap_wq);
}
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 1bcf712655d..9165eb8309e 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1590,7 +1590,7 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
} else if (rpath || rino) {
*ino = rino;
*ppath = rpath;
- *pathlen = strlen(rpath);
+ *pathlen = rpath ? strlen(rpath) : 0;
dout(" path %.*s\n", *pathlen, rpath);
}
@@ -1876,9 +1876,14 @@ finish:
static void __wake_requests(struct ceph_mds_client *mdsc,
struct list_head *head)
{
- struct ceph_mds_request *req, *nreq;
+ struct ceph_mds_request *req;
+ LIST_HEAD(tmp_list);
+
+ list_splice_init(head, &tmp_list);
- list_for_each_entry_safe(req, nreq, head, r_wait) {
+ while (!list_empty(&tmp_list)) {
+ req = list_entry(tmp_list.next,
+ struct ceph_mds_request, r_wait);
list_del_init(&req->r_wait);
__do_request(mdsc, req);
}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 2eb43f21132..e86aa994812 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -403,8 +403,6 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
seq_printf(m, ",mount_timeout=%d", opt->mount_timeout);
if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT)
seq_printf(m, ",osd_idle_ttl=%d", opt->osd_idle_ttl);
- if (opt->osd_timeout != CEPH_OSD_TIMEOUT_DEFAULT)
- seq_printf(m, ",osdtimeout=%d", opt->osd_timeout);
if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT)
seq_printf(m, ",osdkeepalivetimeout=%d",
opt->osd_keepalive_timeout);
@@ -849,7 +847,7 @@ static int ceph_register_bdi(struct super_block *sb,
fsc->backing_dev_info.ra_pages =
default_backing_dev_info.ra_pages;
- err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%d",
+ err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%ld",
atomic_long_inc_return(&bdi_seq));
if (!err)
sb->s_bdi = &fsc->backing_dev_info;
diff --git a/include/linux/backing-dev.h b/include/linux/backing-dev.h
index 2a9a9abc912..12731a19ef0 100644
--- a/include/linux/backing-dev.h
+++ b/include/linux/backing-dev.h
@@ -114,6 +114,7 @@ struct backing_dev_info {
int bdi_init(struct backing_dev_info *bdi);
void bdi_destroy(struct backing_dev_info *bdi);
+__printf(3, 4)
int bdi_register(struct backing_dev_info *bdi, struct device *parent,
const char *fmt, ...);
int bdi_register_dev(struct backing_dev_info *bdi, dev_t dev);
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 6470792b13d..084d3c622b1 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -43,7 +43,6 @@ struct ceph_options {
struct ceph_entity_addr my_addr;
int mount_timeout;
int osd_idle_ttl;
- int osd_timeout;
int osd_keepalive_timeout;
/*
@@ -63,7 +62,6 @@ struct ceph_options {
* defaults
*/
#define CEPH_MOUNT_TIMEOUT_DEFAULT 60
-#define CEPH_OSD_TIMEOUT_DEFAULT 60 /* seconds */
#define CEPH_OSD_KEEPALIVE_DEFAULT 5
#define CEPH_OSD_IDLE_TTL_DEFAULT 60
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index e37acbe989a..10a417f9f76 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -123,6 +123,7 @@ extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
struct ceph_pg pgid);
+extern const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id);
extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name);
#endif
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index de91fbdf127..2c04afeead1 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -87,6 +87,8 @@ struct ceph_pg {
*
* lpgp_num -- as above.
*/
+#define CEPH_NOPOOL ((__u64) (-1)) /* pool id not defined */
+
#define CEPH_PG_TYPE_REP 1
#define CEPH_PG_TYPE_RAID4 2
#define CEPH_PG_POOL_VERSION 2
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index a8020293f34..ee71ea26777 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -305,7 +305,6 @@ ceph_parse_options(char *options, const char *dev_name,
/* start with defaults */
opt->flags = CEPH_OPT_DEFAULT;
- opt->osd_timeout = CEPH_OSD_TIMEOUT_DEFAULT;
opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */
@@ -391,7 +390,7 @@ ceph_parse_options(char *options, const char *dev_name,
/* misc */
case Opt_osdtimeout:
- opt->osd_timeout = intval;
+ pr_warning("ignoring deprecated osdtimeout option\n");
break;
case Opt_osdkeepalivetimeout:
opt->osd_keepalive_timeout = intval;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3ef1759403b..4d111fd2b49 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -2244,22 +2244,62 @@ bad_tag:
/*
- * Atomically queue work on a connection. Bump @con reference to
- * avoid races with connection teardown.
+ * Atomically queue work on a connection after the specified delay.
+ * Bump @con reference to avoid races with connection teardown.
+ * Returns 0 if work was queued, or an error code otherwise.
*/
-static void queue_con(struct ceph_connection *con)
+static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
{
if (!con->ops->get(con)) {
- dout("queue_con %p ref count 0\n", con);
- return;
+ dout("%s %p ref count 0\n", __func__, con);
+
+ return -ENOENT;
}
- if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
- dout("queue_con %p - already queued\n", con);
+ if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
+ dout("%s %p - already queued\n", __func__, con);
con->ops->put(con);
- } else {
- dout("queue_con %p\n", con);
+
+ return -EBUSY;
}
+
+ dout("%s %p %lu\n", __func__, con, delay);
+
+ return 0;
+}
+
+static void queue_con(struct ceph_connection *con)
+{
+ (void) queue_con_delay(con, 0);
+}
+
+static bool con_sock_closed(struct ceph_connection *con)
+{
+ if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags))
+ return false;
+
+#define CASE(x) \
+ case CON_STATE_ ## x: \
+ con->error_msg = "socket closed (con state " #x ")"; \
+ break;
+
+ switch (con->state) {
+ CASE(CLOSED);
+ CASE(PREOPEN);
+ CASE(CONNECTING);
+ CASE(NEGOTIATING);
+ CASE(OPEN);
+ CASE(STANDBY);
+ default:
+ pr_warning("%s con %p unrecognized state %lu\n",
+ __func__, con, con->state);
+ con->error_msg = "unrecognized con state";
+ BUG();
+ break;
+ }
+#undef CASE
+
+ return true;
}
/*
@@ -2273,35 +2313,16 @@ static void con_work(struct work_struct *work)
mutex_lock(&con->mutex);
restart:
- if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) {
- switch (con->state) {
- case CON_STATE_CONNECTING:
- con->error_msg = "connection failed";
- break;
- case CON_STATE_NEGOTIATING:
- con->error_msg = "negotiation failed";
- break;
- case CON_STATE_OPEN:
- con->error_msg = "socket closed";
- break;
- default:
- dout("unrecognized con state %d\n", (int)con->state);
- con->error_msg = "unrecognized con state";
- BUG();
- }
+ if (con_sock_closed(con))
goto fault;
- }
if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
dout("con_work %p backing off\n", con);
- if (queue_delayed_work(ceph_msgr_wq, &con->work,
- round_jiffies_relative(con->delay))) {
- dout("con_work %p backoff %lu\n", con, con->delay);
- mutex_unlock(&con->mutex);
- return;
- } else {
+ ret = queue_con_delay(con, round_jiffies_relative(con->delay));
+ if (ret) {
dout("con_work %p FAILED to back off %lu\n", con,
con->delay);
+ BUG_ON(ret == -ENOENT);
set_bit(CON_FLAG_BACKOFF, &con->flags);
}
goto done;
@@ -2356,7 +2377,7 @@ fault:
static void ceph_fault(struct ceph_connection *con)
__releases(con->mutex)
{
- pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
+ pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
dout("fault %p state %lu to peer %s\n",
con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
@@ -2398,24 +2419,8 @@ static void ceph_fault(struct ceph_connection *con)
con->delay = BASE_DELAY_INTERVAL;
else if (con->delay < MAX_DELAY_INTERVAL)
con->delay *= 2;
- con->ops->get(con);
- if (queue_delayed_work(ceph_msgr_wq, &con->work,
- round_jiffies_relative(con->delay))) {
- dout("fault queued %p delay %lu\n", con, con->delay);
- } else {
- con->ops->put(con);
- dout("fault failed to queue %p delay %lu, backoff\n",
- con, con->delay);
- /*
- * In many cases we see a socket state change
- * while con_work is running and end up
- * queuing (non-delayed) work, such that we
- * can't backoff with a delay. Set a flag so
- * that when con_work restarts we schedule the
- * delay then.
- */
- set_bit(CON_FLAG_BACKOFF, &con->flags);
- }
+ set_bit(CON_FLAG_BACKOFF, &con->flags);
+ queue_con(con);
}
out_unlock:
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index c1d756cc744..780caf6b049 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -221,6 +221,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
kref_init(&req->r_kref);
init_completion(&req->r_completion);
init_completion(&req->r_safe_completion);
+ RB_CLEAR_NODE(&req->r_node);
INIT_LIST_HEAD(&req->r_unsafe_item);
INIT_LIST_HEAD(&req->r_linger_item);
INIT_LIST_HEAD(&req->r_linger_osd);
@@ -580,7 +581,7 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
dout("__kick_osd_requests osd%d\n", osd->o_osd);
err = __reset_osd(osdc, osd);
- if (err == -EAGAIN)
+ if (err)
return;
list_for_each_entry(req, &osd->o_requests, r_osd_item) {
@@ -607,14 +608,6 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
}
}
-static void kick_osd_requests(struct ceph_osd_client *osdc,
- struct ceph_osd *kickosd)
-{
- mutex_lock(&osdc->request_mutex);
- __kick_osd_requests(osdc, kickosd);
- mutex_unlock(&osdc->request_mutex);
-}
-
/*
* If the osd connection drops, we need to resubmit all requests.
*/
@@ -628,7 +621,9 @@ static void osd_reset(struct ceph_connection *con)
dout("osd_reset osd%d\n", osd->o_osd);
osdc = osd->o_osdc;
down_read(&osdc->map_sem);
- kick_osd_requests(osdc, osd);
+ mutex_lock(&osdc->request_mutex);
+ __kick_osd_requests(osdc, osd);
+ mutex_unlock(&osdc->request_mutex);
send_queued(osdc);
up_read(&osdc->map_sem);
}
@@ -647,6 +642,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
atomic_set(&osd->o_ref, 1);
osd->o_osdc = osdc;
osd->o_osd = onum;
+ RB_CLEAR_NODE(&osd->o_node);
INIT_LIST_HEAD(&osd->o_requests);
INIT_LIST_HEAD(&osd->o_linger_requests);
INIT_LIST_HEAD(&osd->o_osd_lru);
@@ -750,6 +746,7 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
if (list_empty(&osd->o_requests) &&
list_empty(&osd->o_linger_requests)) {
__remove_osd(osdc, osd);
+ ret = -ENODEV;
} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
&osd->o_con.peer_addr,
sizeof(osd->o_con.peer_addr)) == 0 &&
@@ -876,9 +873,9 @@ static void __unregister_request(struct ceph_osd_client *osdc,
req->r_osd = NULL;
}
+ list_del_init(&req->r_req_lru_item);
ceph_osdc_put_request(req);
- list_del_init(&req->r_req_lru_item);
if (osdc->num_requests == 0) {
dout(" no requests, canceling timeout\n");
__cancel_osd_timeout(osdc);
@@ -910,8 +907,8 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
dout("__unregister_linger_request %p\n", req);
+ list_del_init(&req->r_linger_item);
if (req->r_osd) {
- list_del_init(&req->r_linger_item);
list_del_init(&req->r_linger_osd);
if (list_empty(&req->r_osd->o_requests) &&
@@ -1090,12 +1087,10 @@ static void handle_timeout(struct work_struct *work)
{
struct ceph_osd_client *osdc =
container_of(work, struct ceph_osd_client, timeout_work.work);
- struct ceph_osd_request *req, *last_req = NULL;
+ struct ceph_osd_request *req;
struct ceph_osd *osd;
- unsigned long timeout = osdc->client->options->osd_timeout * HZ;
unsigned long keepalive =
osdc->client->options->osd_keepalive_timeout * HZ;
- unsigned long last_stamp = 0;
struct list_head slow_osds;
dout("timeout\n");
down_read(&osdc->map_sem);
@@ -1105,37 +1100,6 @@ static void handle_timeout(struct work_struct *work)
mutex_lock(&osdc->request_mutex);
/*
- * reset osds that appear to be _really_ unresponsive. this
- * is a failsafe measure.. we really shouldn't be getting to
- * this point if the system is working properly. the monitors
- * should mark the osd as failed and we should find out about
- * it from an updated osd map.
- */
- while (timeout && !list_empty(&osdc->req_lru)) {
- req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
- r_req_lru_item);
-
- /* hasn't been long enough since we sent it? */
- if (time_before(jiffies, req->r_stamp + timeout))
- break;
-
- /* hasn't been long enough since it was acked? */
- if (req->r_request->ack_stamp == 0 ||
- time_before(jiffies, req->r_request->ack_stamp + timeout))
- break;
-
- BUG_ON(req == last_req && req->r_stamp == last_stamp);
- last_req = req;
- last_stamp = req->r_stamp;
-
- osd = req->r_osd;
- BUG_ON(!osd);
- pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
- req->r_tid, osd->o_osd);
- __kick_osd_requests(osdc, osd);
- }
-
- /*
* ping osds that are a bit slow. this ensures that if there
* is a break in the TCP connection we will notice, and reopen
* a connection with that osd (from the fault callback).
@@ -1364,8 +1328,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1);
- __unregister_linger_request(osdc, req);
__register_request(osdc, req);
+ __unregister_linger_request(osdc, req);
}
mutex_unlock(&osdc->request_mutex);
@@ -1599,6 +1563,7 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
event->data = data;
event->osdc = osdc;
INIT_LIST_HEAD(&event->osd_node);
+ RB_CLEAR_NODE(&event->node);
kref_init(&event->kref); /* one ref for us */
kref_get(&event->kref); /* one ref for the caller */
init_completion(&event->completion);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 5433fb0eb3c..de73214b5d2 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -469,6 +469,22 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id)
return NULL;
}
+const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
+{
+ struct ceph_pg_pool_info *pi;
+
+ if (id == CEPH_NOPOOL)
+ return NULL;
+
+ if (WARN_ON_ONCE(id > (u64) INT_MAX))
+ return NULL;
+
+ pi = __lookup_pg_pool(&map->pg_pools, (int) id);
+
+ return pi ? pi->name : NULL;
+}
+EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
+
int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
{
struct rb_node *rbp;
@@ -645,10 +661,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
ceph_decode_32_safe(p, end, max, bad);
while (max--) {
ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad);
+ err = -ENOMEM;
pi = kzalloc(sizeof(*pi), GFP_NOFS);
if (!pi)
goto bad;
pi->id = ceph_decode_32(p);
+ err = -EINVAL;
ev = ceph_decode_8(p); /* encoding version */
if (ev > CEPH_PG_POOL_VERSION) {
pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
@@ -664,8 +682,13 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
__insert_pg_pool(&map->pg_pools, pi);
}
- if (version >= 5 && __decode_pool_names(p, end, map) < 0)
- goto bad;
+ if (version >= 5) {
+ err = __decode_pool_names(p, end, map);
+ if (err < 0) {
+ dout("fail to decode pool names");
+ goto bad;
+ }
+ }
ceph_decode_32_safe(p, end, map->pool_max, bad);
@@ -745,7 +768,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
return map;
bad:
- dout("osdmap_decode fail\n");
+ dout("osdmap_decode fail err %d\n", err);
ceph_osdmap_destroy(map);
return ERR_PTR(err);
}
@@ -839,6 +862,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
if (ev > CEPH_PG_POOL_VERSION) {
pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
ev, CEPH_PG_POOL_VERSION);
+ err = -EINVAL;
goto bad;
}
pi = __lookup_pg_pool(&map->pg_pools, pool);
@@ -855,8 +879,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
if (err < 0)
goto bad;
}
- if (version >= 5 && __decode_pool_names(p, end, map) < 0)
- goto bad;
+ if (version >= 5) {
+ err = __decode_pool_names(p, end, map);
+ if (err < 0)
+ goto bad;
+ }
/* old_pool */
ceph_decode_32_safe(p, end, len, bad);
@@ -932,15 +959,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
(void) __remove_pg_mapping(&map->pg_temp, pgid);
/* insert */
- if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) {
- err = -EINVAL;
+ err = -EINVAL;
+ if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
goto bad;
- }
+ err = -ENOMEM;
pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
- if (!pg) {
- err = -ENOMEM;
+ if (!pg)
goto bad;
- }
pg->pgid = pgid;
pg->len = pglen;
for (j = 0; j < pglen; j++)