diff options
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r-- | net/ceph/osd_client.c | 84 |
1 files changed, 48 insertions, 36 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 3a246a6cab47..bc0016e3e5ac 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -733,12 +733,14 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, object_size = le32_to_cpu(layout->fl_object_size); object_base = off - objoff; - if (truncate_size <= object_base) { - truncate_size = 0; - } else { - truncate_size -= object_base; - if (truncate_size > object_size) - truncate_size = object_size; + if (!(truncate_seq == 1 && truncate_size == -1ULL)) { + if (truncate_size <= object_base) { + truncate_size = 0; + } else { + truncate_size -= object_base; + if (truncate_size > object_size) + truncate_size = object_size; + } } osd_req_op_extent_init(req, 0, opcode, objoff, objlen, @@ -1174,6 +1176,7 @@ static void __register_linger_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { dout("__register_linger_request %p\n", req); + ceph_osdc_get_request(req); list_add_tail(&req->r_linger_item, &osdc->req_linger); if (req->r_osd) list_add_tail(&req->r_linger_osd, @@ -1196,6 +1199,7 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc, if (list_empty(&req->r_osd_item)) req->r_osd = NULL; } + ceph_osdc_put_request(req); } void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, @@ -1203,9 +1207,8 @@ void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, { mutex_lock(&osdc->request_mutex); if (req->r_linger) { - __unregister_linger_request(osdc, req); req->r_linger = 0; - ceph_osdc_put_request(req); + __unregister_linger_request(osdc, req); } mutex_unlock(&osdc->request_mutex); } @@ -1217,11 +1220,6 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, if (!req->r_linger) { dout("set_request_linger %p\n", req); req->r_linger = 1; - /* - * caller is now responsible for calling - * unregister_linger_request - */ - ceph_osdc_get_request(req); } } EXPORT_SYMBOL(ceph_osdc_set_request_linger); @@ -1339,10 +1337,6 @@ static void __send_request(struct ceph_osd_client *osdc, ceph_msg_get(req->r_request); /* send consumes a ref */ - /* Mark the request unsafe if this is the first timet's being sent. */ - - if (!req->r_sent && req->r_unsafe_callback) - req->r_unsafe_callback(req, true); req->r_sent = req->r_osd->o_incarnation; ceph_con_send(&req->r_osd->o_con, req->r_request); @@ -1433,8 +1427,6 @@ static void handle_osds_timeout(struct work_struct *work) static void complete_request(struct ceph_osd_request *req) { - if (req->r_unsafe_callback) - req->r_unsafe_callback(req, false); complete_all(&req->r_safe_completion); /* fsync waiter */ } @@ -1496,14 +1488,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, req, result); - ceph_decode_need(&p, end, 4, bad); + ceph_decode_need(&p, end, 4, bad_put); numops = ceph_decode_32(&p); if (numops > CEPH_OSD_MAX_OP) goto bad_put; if (numops != req->r_num_ops) goto bad_put; payload_len = 0; - ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad); + ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put); for (i = 0; i < numops; i++) { struct ceph_osd_op *op = p; int len; @@ -1521,11 +1513,13 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, goto bad_put; } - ceph_decode_need(&p, end, 4 + numops * 4, bad); + ceph_decode_need(&p, end, 4 + numops * 4, bad_put); retry_attempt = ceph_decode_32(&p); for (i = 0; i < numops; i++) req->r_reply_op_result[i] = ceph_decode_32(&p); + already_completed = req->r_got_reply; + if (!req->r_got_reply) { req->r_result = result; @@ -1556,19 +1550,23 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, ((flags & CEPH_OSD_FLAG_WRITE) == 0)) __unregister_request(osdc, req); - already_completed = req->r_completed; - req->r_completed = 1; mutex_unlock(&osdc->request_mutex); - if (already_completed) - goto done; - if (req->r_callback) - req->r_callback(req, msg); - else - complete_all(&req->r_completion); + if (!already_completed) { + if (req->r_unsafe_callback && + result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK)) + req->r_unsafe_callback(req, true); + if (req->r_callback) + req->r_callback(req, msg); + else + complete_all(&req->r_completion); + } - if (flags & CEPH_OSD_FLAG_ONDISK) + if (flags & CEPH_OSD_FLAG_ONDISK) { + if (req->r_unsafe_callback && already_completed) + req->r_unsafe_callback(req, false); complete_request(req); + } done: dout("req=%p req->r_linger=%d\n", req, req->r_linger); @@ -1633,8 +1631,10 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) dout("%p tid %llu restart on osd%d\n", req, req->r_tid, req->r_osd ? req->r_osd->o_osd : -1); + ceph_osdc_get_request(req); __unregister_request(osdc, req); __register_linger_request(osdc, req); + ceph_osdc_put_request(req); continue; } @@ -1786,6 +1786,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) nr_maps--; } + if (!osdc->osdmap) + goto bad; done: downgrade_write(&osdc->map_sem); ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); @@ -2123,13 +2125,14 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, __register_request(osdc, req); req->r_sent = 0; req->r_got_reply = 0; - req->r_completed = 0; rc = __map_request(osdc, req, 0); if (rc < 0) { if (nofail) { dout("osdc_start_request failed map, " " will retry %lld\n", req->r_tid); rc = 0; + } else { + __unregister_request(osdc, req); } goto out_unlock; } @@ -2206,6 +2209,17 @@ void ceph_osdc_sync(struct ceph_osd_client *osdc) EXPORT_SYMBOL(ceph_osdc_sync); /* + * Call all pending notify callbacks - for use after a watch is + * unregistered, to make sure no more callbacks for it will be invoked + */ +extern void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) +{ + flush_workqueue(osdc->notify_wq); +} +EXPORT_SYMBOL(ceph_osdc_flush_notifies); + + +/* * init, shutdown */ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) @@ -2254,12 +2268,10 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) if (err < 0) goto out_msgpool; + err = -ENOMEM; osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); - if (IS_ERR(osdc->notify_wq)) { - err = PTR_ERR(osdc->notify_wq); - osdc->notify_wq = NULL; + if (!osdc->notify_wq) goto out_msgpool; - } return 0; out_msgpool: |