aboutsummaryrefslogtreecommitdiff
path: root/drivers/block/rbd.c
diff options
context:
space:
mode:
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r--drivers/block/rbd.c172
1 files changed, 105 insertions, 67 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index aff789d6fcc..c421fa52851 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -937,12 +937,14 @@ static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
u64 snap_id)
{
u32 which;
+ const char *snap_name;
which = rbd_dev_snap_index(rbd_dev, snap_id);
if (which == BAD_SNAP_INDEX)
- return NULL;
+ return ERR_PTR(-ENOENT);
- return _rbd_dev_v1_snap_name(rbd_dev, which);
+ snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
+ return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
@@ -1126,6 +1128,7 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
buf = bvec_kmap_irq(bv, &flags);
memset(buf + remainder, 0,
bv->bv_len - remainder);
+ flush_dcache_page(bv->bv_page);
bvec_kunmap_irq(buf, &flags);
}
pos += bv->bv_len;
@@ -1158,6 +1161,7 @@ static void zero_pages(struct page **pages, u64 offset, u64 end)
local_irq_save(flags);
kaddr = kmap_atomic(*page);
memset(kaddr + page_offset, 0, length);
+ flush_dcache_page(*page);
kunmap_atomic(kaddr);
local_irq_restore(flags);
@@ -1565,11 +1569,12 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
obj_request, obj_request->img_request, obj_request->result,
xferred, length);
/*
- * ENOENT means a hole in the image. We zero-fill the
- * entire length of the request. A short read also implies
- * zero-fill to the end of the request. Either way we
- * update the xferred count to indicate the whole request
- * was satisfied.
+ * ENOENT means a hole in the image. We zero-fill the entire
+ * length of the request. A short read also implies zero-fill
+ * to the end of the request. An error requires the whole
+ * length of the request to be reported finished with an error
+ * to the block layer. In each case we update the xferred
+ * count to indicate the whole request was satisfied.
*/
rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
if (obj_request->result == -ENOENT) {
@@ -1578,14 +1583,13 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
else
zero_pages(obj_request->pages, 0, length);
obj_request->result = 0;
- obj_request->xferred = length;
} else if (xferred < length && !obj_request->result) {
if (obj_request->type == OBJ_REQUEST_BIO)
zero_bio_chain(obj_request->bio_list, xferred);
else
zero_pages(obj_request->pages, xferred, length);
- obj_request->xferred = length;
}
+ obj_request->xferred = length;
obj_request_done_set(obj_request);
}
@@ -2171,9 +2175,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
struct rbd_obj_request *obj_request = NULL;
struct rbd_obj_request *next_obj_request;
bool write_request = img_request_write_test(img_request);
- struct bio *bio_list;
+ struct bio *bio_list = 0;
unsigned int bio_offset = 0;
- struct page **pages;
+ struct page **pages = 0;
u64 img_offset;
u64 resid;
u16 opcode;
@@ -2211,6 +2215,11 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
rbd_segment_name_free(object_name);
if (!obj_request)
goto out_unwind;
+ /*
+ * set obj_request->img_request before creating the
+ * osd_request so that it gets the right snapc
+ */
+ rbd_img_obj_request_add(img_request, obj_request);
if (type == OBJ_REQUEST_BIO) {
unsigned int clone_size;
@@ -2252,11 +2261,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
obj_request->pages, length,
offset & ~PAGE_MASK, false, false);
- /*
- * set obj_request->img_request before formatting
- * the osd_request so that it gets the right snapc
- */
- rbd_img_obj_request_add(img_request, obj_request);
if (write_request)
rbd_osd_req_format_write(obj_request);
else
@@ -2817,7 +2821,7 @@ out_err:
obj_request_done_set(obj_request);
}
-static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
+static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
{
struct rbd_obj_request *obj_request;
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
@@ -2832,16 +2836,17 @@ static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
if (!obj_request->osd_req)
goto out;
- obj_request->callback = rbd_obj_request_put;
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
notify_id, 0, 0);
rbd_osd_req_format_read(obj_request);
ret = rbd_obj_request_submit(osdc, obj_request);
-out:
if (ret)
- rbd_obj_request_put(obj_request);
+ goto out;
+ ret = rbd_obj_request_wait(obj_request);
+out:
+ rbd_obj_request_put(obj_request);
return ret;
}
@@ -2861,7 +2866,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
if (ret)
rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
- rbd_obj_notify_ack(rbd_dev, notify_id);
+ rbd_obj_notify_ack_sync(rbd_dev, notify_id);
}
/*
@@ -3333,6 +3338,31 @@ static void rbd_exists_validate(struct rbd_device *rbd_dev)
clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
}
+static void rbd_dev_update_size(struct rbd_device *rbd_dev)
+{
+ sector_t size;
+ bool removing;
+
+ /*
+ * Don't hold the lock while doing disk operations,
+ * or lock ordering will conflict with the bdev mutex via:
+ * rbd_add() -> blkdev_get() -> rbd_open()
+ */
+ spin_lock_irq(&rbd_dev->lock);
+ removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
+ spin_unlock_irq(&rbd_dev->lock);
+ /*
+ * If the device is being removed, rbd_dev->disk has
+ * been destroyed, so don't try to update its size
+ */
+ if (!removing) {
+ size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
+ dout("setting size to %llu sectors", (unsigned long long)size);
+ set_capacity(rbd_dev->disk, size);
+ revalidate_disk(rbd_dev->disk);
+ }
+}
+
static int rbd_dev_refresh(struct rbd_device *rbd_dev)
{
u64 mapping_size;
@@ -3351,12 +3381,7 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
rbd_exists_validate(rbd_dev);
mutex_unlock(&ctl_mutex);
if (mapping_size != rbd_dev->mapping.size) {
- sector_t size;
-
- size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
- dout("setting size to %llu sectors", (unsigned long long)size);
- set_capacity(rbd_dev->disk, size);
- revalidate_disk(rbd_dev->disk);
+ rbd_dev_update_size(rbd_dev);
}
return ret;
@@ -3710,12 +3735,14 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
if (ret < sizeof (size_buf))
return -ERANGE;
- if (order)
+ if (order) {
*order = size_buf.order;
+ dout(" order %u", (unsigned int)*order);
+ }
*snap_size = le64_to_cpu(size_buf.size);
- dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
- (unsigned long long)snap_id, (unsigned int)*order,
+ dout(" snap_id 0x%016llx snap_size = %llu\n",
+ (unsigned long long)snap_id,
(unsigned long long)*snap_size);
return 0;
@@ -4030,8 +4057,13 @@ static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
snap_id = snapc->snaps[which];
snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
- if (IS_ERR(snap_name))
- break;
+ if (IS_ERR(snap_name)) {
+ /* ignore no-longer existing snapshots */
+ if (PTR_ERR(snap_name) == -ENOENT)
+ continue;
+ else
+ break;
+ }
found = !strcmp(name, snap_name);
kfree(snap_name);
}
@@ -4110,8 +4142,8 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
/* Look up the snapshot name, and make a copy */
snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
- if (!snap_name) {
- ret = -ENOMEM;
+ if (IS_ERR(snap_name)) {
+ ret = PTR_ERR(snap_name);
goto out_err;
}
@@ -5059,23 +5091,6 @@ err_out_module:
return (ssize_t)rc;
}
-static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
-{
- struct list_head *tmp;
- struct rbd_device *rbd_dev;
-
- spin_lock(&rbd_dev_list_lock);
- list_for_each(tmp, &rbd_dev_list) {
- rbd_dev = list_entry(tmp, struct rbd_device, node);
- if (rbd_dev->dev_id == dev_id) {
- spin_unlock(&rbd_dev_list_lock);
- return rbd_dev;
- }
- }
- spin_unlock(&rbd_dev_list_lock);
- return NULL;
-}
-
static void rbd_dev_device_release(struct device *dev)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
@@ -5120,8 +5135,10 @@ static ssize_t rbd_remove(struct bus_type *bus,
size_t count)
{
struct rbd_device *rbd_dev = NULL;
- int target_id;
+ struct list_head *tmp;
+ int dev_id;
unsigned long ul;
+ bool already = false;
int ret;
ret = strict_strtoul(buf, 10, &ul);
@@ -5129,30 +5146,51 @@ static ssize_t rbd_remove(struct bus_type *bus,
return ret;
/* convert to int; abort if we lost anything in the conversion */
- target_id = (int) ul;
- if (target_id != ul)
+ dev_id = (int)ul;
+ if (dev_id != ul)
return -EINVAL;
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
- rbd_dev = __rbd_get_dev(target_id);
- if (!rbd_dev) {
- ret = -ENOENT;
- goto done;
+ ret = -ENOENT;
+ spin_lock(&rbd_dev_list_lock);
+ list_for_each(tmp, &rbd_dev_list) {
+ rbd_dev = list_entry(tmp, struct rbd_device, node);
+ if (rbd_dev->dev_id == dev_id) {
+ ret = 0;
+ break;
+ }
}
-
- spin_lock_irq(&rbd_dev->lock);
- if (rbd_dev->open_count)
- ret = -EBUSY;
- else
- set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
- spin_unlock_irq(&rbd_dev->lock);
- if (ret < 0)
+ if (!ret) {
+ spin_lock_irq(&rbd_dev->lock);
+ if (rbd_dev->open_count)
+ ret = -EBUSY;
+ else
+ already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
+ &rbd_dev->flags);
+ spin_unlock_irq(&rbd_dev->lock);
+ }
+ spin_unlock(&rbd_dev_list_lock);
+ if (ret < 0 || already)
goto done;
- rbd_bus_del_dev(rbd_dev);
+
ret = rbd_dev_header_watch_sync(rbd_dev, false);
if (ret)
rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
+
+ /*
+ * flush remaining watch callbacks - these must be complete
+ * before the osd_client is shutdown
+ */
+ dout("%s: flushing notifies", __func__);
+ ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
+ /*
+ * Don't free anything from rbd_dev->disk until after all
+ * notifies are completely processed. Otherwise
+ * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
+ * in a potential use after free of rbd_dev->disk or rbd_dev.
+ */
+ rbd_bus_del_dev(rbd_dev);
rbd_dev_image_release(rbd_dev);
module_put(THIS_MODULE);
ret = count;