diff options
-rw-r--r-- | include/linux/ceph/osd_client.h | 6 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 248 |
2 files changed, 248 insertions, 6 deletions
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 2ae7cfd82ec9..3e7bf72e4078 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -151,6 +151,7 @@ struct ceph_osd_request_target { struct ceph_osd_request { u64 r_tid; /* unique for this client */ struct rb_node r_node; + struct rb_node r_mc_node; /* map check */ struct ceph_osd *r_osd; struct ceph_osd_request_target r_t; @@ -191,6 +192,7 @@ struct ceph_osd_request { int r_attempts; struct ceph_eversion r_replay_version; /* aka reassert_version */ u32 r_last_force_resend; + u32 r_map_dne_bound; struct ceph_osd_req_op r_ops[]; }; @@ -218,6 +220,7 @@ struct ceph_osd_linger_request { struct ceph_osd_request_target t; u32 last_force_resend; + u32 map_dne_bound; struct timespec mtime; @@ -225,6 +228,7 @@ struct ceph_osd_linger_request { struct mutex lock; struct rb_node node; /* osd */ struct rb_node osdc_node; /* osdc */ + struct rb_node mc_node; /* map check */ struct list_head scan_item; struct completion reg_commit_wait; @@ -257,6 +261,8 @@ struct ceph_osd_client { atomic64_t last_tid; /* tid of last request */ u64 last_linger_id; struct rb_root linger_requests; /* lingering requests */ + struct rb_root map_checks; + struct rb_root linger_map_checks; atomic_t num_requests; atomic_t num_homeless; struct delayed_work timeout_work; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 5ac6dce74f07..ece2d10a1208 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -396,6 +396,7 @@ static void target_destroy(struct ceph_osd_request_target *t) static void request_release_checks(struct ceph_osd_request *req) { WARN_ON(!RB_EMPTY_NODE(&req->r_node)); + WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node)); WARN_ON(!list_empty(&req->r_unsafe_item)); WARN_ON(req->r_osd); } @@ -456,6 +457,7 @@ static void request_init(struct ceph_osd_request *req) init_completion(&req->r_completion); init_completion(&req->r_safe_completion); RB_CLEAR_NODE(&req->r_node); + RB_CLEAR_NODE(&req->r_mc_node); INIT_LIST_HEAD(&req->r_unsafe_item); target_init(&req->r_t); @@ -969,6 +971,7 @@ EXPORT_SYMBOL(ceph_osdc_new_request); * We keep osd requests in an rbtree, sorted by ->r_tid. */ DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node) +DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node) static bool osd_homeless(struct ceph_osd *osd) { @@ -1601,10 +1604,13 @@ static void maybe_request_map(struct ceph_osd_client *osdc) ceph_monc_renew_subs(&osdc->client->monc); } +static void send_map_check(struct ceph_osd_request *req); + static void __submit_request(struct ceph_osd_request *req, bool wrlocked) { struct ceph_osd_client *osdc = req->r_osdc; struct ceph_osd *osd; + enum calc_target_result ct_res; bool need_send = false; bool promoted = false; @@ -1612,7 +1618,10 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked) dout("%s req %p wrlocked %d\n", __func__, req, wrlocked); again: - calc_target(osdc, &req->r_t, &req->r_last_force_resend, false); + ct_res = calc_target(osdc, &req->r_t, &req->r_last_force_resend, false); + if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked) + goto promote; + osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked); if (IS_ERR(osd)) { WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked); @@ -1656,6 +1665,9 @@ again: send_request(req); mutex_unlock(&osd->lock); + if (ct_res == CALC_TARGET_POOL_DNE) + send_map_check(req); + if (promoted) downgrade_write(&osdc->lock); return; @@ -1699,6 +1711,7 @@ static void __finish_request(struct ceph_osd_request *req) verify_osd_locked(osd); dout("%s req %p tid %llu\n", __func__, req, req->r_tid); + WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid)); unlink_request(osd, req); atomic_dec(&osdc->num_requests); @@ -1726,13 +1739,127 @@ static void __complete_request(struct ceph_osd_request *req) complete_all(&req->r_completion); } +/* + * Note that this is open-coded in handle_reply(), which has to deal + * with ack vs commit, dup acks, etc. + */ +static void complete_request(struct ceph_osd_request *req, int err) +{ + dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err); + + req->r_result = err; + __finish_request(req); + __complete_request(req); + complete_all(&req->r_safe_completion); + ceph_osdc_put_request(req); +} + +static void cancel_map_check(struct ceph_osd_request *req) +{ + struct ceph_osd_client *osdc = req->r_osdc; + struct ceph_osd_request *lookup_req; + + verify_osdc_wrlocked(osdc); + + lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid); + if (!lookup_req) + return; + + WARN_ON(lookup_req != req); + erase_request_mc(&osdc->map_checks, req); + ceph_osdc_put_request(req); +} + static void cancel_request(struct ceph_osd_request *req) { dout("%s req %p tid %llu\n", __func__, req, req->r_tid); + cancel_map_check(req); finish_request(req); } +static void check_pool_dne(struct ceph_osd_request *req) +{ + struct ceph_osd_client *osdc = req->r_osdc; + struct ceph_osdmap *map = osdc->osdmap; + + verify_osdc_wrlocked(osdc); + WARN_ON(!map->epoch); + + if (req->r_attempts) { + /* + * We sent a request earlier, which means that + * previously the pool existed, and now it does not + * (i.e., it was deleted). + */ + req->r_map_dne_bound = map->epoch; + dout("%s req %p tid %llu pool disappeared\n", __func__, req, + req->r_tid); + } else { + dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__, + req, req->r_tid, req->r_map_dne_bound, map->epoch); + } + + if (req->r_map_dne_bound) { + if (map->epoch >= req->r_map_dne_bound) { + /* we had a new enough map */ + pr_info_ratelimited("tid %llu pool does not exist\n", + req->r_tid); + complete_request(req, -ENOENT); + } + } else { + send_map_check(req); + } +} + +static void map_check_cb(struct ceph_mon_generic_request *greq) +{ + struct ceph_osd_client *osdc = &greq->monc->client->osdc; + struct ceph_osd_request *req; + u64 tid = greq->private_data; + + WARN_ON(greq->result || !greq->u.newest); + + down_write(&osdc->lock); + req = lookup_request_mc(&osdc->map_checks, tid); + if (!req) { + dout("%s tid %llu dne\n", __func__, tid); + goto out_unlock; + } + + dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__, + req, req->r_tid, req->r_map_dne_bound, greq->u.newest); + if (!req->r_map_dne_bound) + req->r_map_dne_bound = greq->u.newest; + erase_request_mc(&osdc->map_checks, req); + check_pool_dne(req); + + ceph_osdc_put_request(req); +out_unlock: + up_write(&osdc->lock); +} + +static void send_map_check(struct ceph_osd_request *req) +{ + struct ceph_osd_client *osdc = req->r_osdc; + struct ceph_osd_request *lookup_req; + int ret; + + verify_osdc_wrlocked(osdc); + + lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid); + if (lookup_req) { + WARN_ON(lookup_req != req); + return; + } + + ceph_osdc_get_request(req); + insert_request_mc(&osdc->map_checks, req); + ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap", + map_check_cb, req->r_tid); + WARN_ON(ret); +} + /* * lingering requests, watch/notify v2 infrastructure */ @@ -1745,6 +1872,7 @@ static void linger_release(struct kref *kref) lreq->reg_req, lreq->ping_req); WARN_ON(!RB_EMPTY_NODE(&lreq->node)); WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node)); + WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node)); WARN_ON(!list_empty(&lreq->scan_item)); WARN_ON(!list_empty(&lreq->pending_lworks)); WARN_ON(lreq->osd); @@ -1783,6 +1911,7 @@ linger_alloc(struct ceph_osd_client *osdc) mutex_init(&lreq->lock); RB_CLEAR_NODE(&lreq->node); RB_CLEAR_NODE(&lreq->osdc_node); + RB_CLEAR_NODE(&lreq->mc_node); INIT_LIST_HEAD(&lreq->scan_item); INIT_LIST_HEAD(&lreq->pending_lworks); init_completion(&lreq->reg_commit_wait); @@ -1797,6 +1926,7 @@ linger_alloc(struct ceph_osd_client *osdc) DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node) DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node) +DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node) /* * Create linger request <-> OSD session relation. @@ -2193,6 +2323,23 @@ static void linger_submit(struct ceph_osd_linger_request *lreq) send_linger(lreq); } +static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + struct ceph_osd_linger_request *lookup_lreq; + + verify_osdc_wrlocked(osdc); + + lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks, + lreq->linger_id); + if (!lookup_lreq) + return; + + WARN_ON(lookup_lreq != lreq); + erase_linger_mc(&osdc->linger_map_checks, lreq); + linger_put(lreq); +} + /* * @lreq has to be both registered and linked. */ @@ -2202,6 +2349,7 @@ static void __linger_cancel(struct ceph_osd_linger_request *lreq) cancel_linger_request(lreq->ping_req); if (lreq->reg_req->r_osd) cancel_linger_request(lreq->reg_req); + cancel_linger_map_check(lreq); unlink_linger(lreq->osd, lreq); linger_unregister(lreq); } @@ -2216,6 +2364,89 @@ static void linger_cancel(struct ceph_osd_linger_request *lreq) up_write(&osdc->lock); } +static void send_linger_map_check(struct ceph_osd_linger_request *lreq); + +static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + struct ceph_osdmap *map = osdc->osdmap; + + verify_osdc_wrlocked(osdc); + WARN_ON(!map->epoch); + + if (lreq->register_gen) { + lreq->map_dne_bound = map->epoch; + dout("%s lreq %p linger_id %llu pool disappeared\n", __func__, + lreq, lreq->linger_id); + } else { + dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n", + __func__, lreq, lreq->linger_id, lreq->map_dne_bound, + map->epoch); + } + + if (lreq->map_dne_bound) { + if (map->epoch >= lreq->map_dne_bound) { + /* we had a new enough map */ + pr_info("linger_id %llu pool does not exist\n", + lreq->linger_id); + linger_reg_commit_complete(lreq, -ENOENT); + __linger_cancel(lreq); + } + } else { + send_linger_map_check(lreq); + } +} + +static void linger_map_check_cb(struct ceph_mon_generic_request *greq) +{ + struct ceph_osd_client *osdc = &greq->monc->client->osdc; + struct ceph_osd_linger_request *lreq; + u64 linger_id = greq->private_data; + + WARN_ON(greq->result || !greq->u.newest); + + down_write(&osdc->lock); + lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id); + if (!lreq) { + dout("%s linger_id %llu dne\n", __func__, linger_id); + goto out_unlock; + } + + dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n", + __func__, lreq, lreq->linger_id, lreq->map_dne_bound, + greq->u.newest); + if (!lreq->map_dne_bound) + lreq->map_dne_bound = greq->u.newest; + erase_linger_mc(&osdc->linger_map_checks, lreq); + check_linger_pool_dne(lreq); + + linger_put(lreq); +out_unlock: + up_write(&osdc->lock); +} + +static void send_linger_map_check(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + struct ceph_osd_linger_request *lookup_lreq; + int ret; + + verify_osdc_wrlocked(osdc); + + lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks, + lreq->linger_id); + if (lookup_lreq) { + WARN_ON(lookup_lreq != lreq); + return; + } + + linger_get(lreq); + insert_linger_mc(&osdc->linger_map_checks, lreq); + ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap", + linger_map_check_cb, lreq->linger_id); + WARN_ON(ret); +} + static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq) { int ret; @@ -2677,10 +2908,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) return; fail_request: - req->r_result = -EIO; - __finish_request(req); - __complete_request(req); - complete_all(&req->r_safe_completion); + complete_request(req, -EIO); out_unlock_session: mutex_unlock(&osd->lock); out_unlock_osdc: @@ -2764,6 +2992,7 @@ static void scan_requests(struct ceph_osd *osd, /* fall through */ case CALC_TARGET_NEED_RESEND: + cancel_linger_map_check(lreq); /* * scan_requests() for the previous epoch(s) * may have already added it to the list, since @@ -2773,6 +3002,7 @@ static void scan_requests(struct ceph_osd *osd, list_add_tail(&lreq->scan_item, need_resend_linger); break; case CALC_TARGET_POOL_DNE: + check_linger_pool_dne(lreq); break; } } @@ -2782,7 +3012,7 @@ static void scan_requests(struct ceph_osd *osd, rb_entry(n, struct ceph_osd_request, r_node); enum calc_target_result ct_res; - n = rb_next(n); /* unlink_request() */ + n = rb_next(n); /* unlink_request(), check_pool_dne() */ dout("%s req %p tid %llu\n", __func__, req, req->r_tid); ct_res = calc_target(osdc, &req->r_t, @@ -2799,10 +3029,12 @@ static void scan_requests(struct ceph_osd *osd, /* fall through */ case CALC_TARGET_NEED_RESEND: + cancel_map_check(req); unlink_request(osd, req); insert_request(need_resend, req); break; case CALC_TARGET_POOL_DNE: + check_pool_dne(req); break; } } @@ -3655,6 +3887,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) osdc->homeless_osd.o_osdc = osdc; osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD; osdc->linger_requests = RB_ROOT; + osdc->map_checks = RB_ROOT; + osdc->linger_map_checks = RB_ROOT; INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); @@ -3720,6 +3954,8 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) WARN_ON(!list_empty(&osdc->osd_lru)); WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests)); + WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks)); + WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks)); WARN_ON(atomic_read(&osdc->num_requests)); WARN_ON(atomic_read(&osdc->num_homeless)); |