summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/linux/ceph/osd_client.h6
-rw-r--r--net/ceph/osd_client.c248
2 files changed, 248 insertions, 6 deletions
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 2ae7cfd82ec9..3e7bf72e4078 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -151,6 +151,7 @@ struct ceph_osd_request_target {
struct ceph_osd_request {
u64 r_tid; /* unique for this client */
struct rb_node r_node;
+ struct rb_node r_mc_node; /* map check */
struct ceph_osd *r_osd;
struct ceph_osd_request_target r_t;
@@ -191,6 +192,7 @@ struct ceph_osd_request {
int r_attempts;
struct ceph_eversion r_replay_version; /* aka reassert_version */
u32 r_last_force_resend;
+ u32 r_map_dne_bound;
struct ceph_osd_req_op r_ops[];
};
@@ -218,6 +220,7 @@ struct ceph_osd_linger_request {
struct ceph_osd_request_target t;
u32 last_force_resend;
+ u32 map_dne_bound;
struct timespec mtime;
@@ -225,6 +228,7 @@ struct ceph_osd_linger_request {
struct mutex lock;
struct rb_node node; /* osd */
struct rb_node osdc_node; /* osdc */
+ struct rb_node mc_node; /* map check */
struct list_head scan_item;
struct completion reg_commit_wait;
@@ -257,6 +261,8 @@ struct ceph_osd_client {
atomic64_t last_tid; /* tid of last request */
u64 last_linger_id;
struct rb_root linger_requests; /* lingering requests */
+ struct rb_root map_checks;
+ struct rb_root linger_map_checks;
atomic_t num_requests;
atomic_t num_homeless;
struct delayed_work timeout_work;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5ac6dce74f07..ece2d10a1208 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -396,6 +396,7 @@ static void target_destroy(struct ceph_osd_request_target *t)
static void request_release_checks(struct ceph_osd_request *req)
{
WARN_ON(!RB_EMPTY_NODE(&req->r_node));
+ WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
WARN_ON(!list_empty(&req->r_unsafe_item));
WARN_ON(req->r_osd);
}
@@ -456,6 +457,7 @@ static void request_init(struct ceph_osd_request *req)
init_completion(&req->r_completion);
init_completion(&req->r_safe_completion);
RB_CLEAR_NODE(&req->r_node);
+ RB_CLEAR_NODE(&req->r_mc_node);
INIT_LIST_HEAD(&req->r_unsafe_item);
target_init(&req->r_t);
@@ -969,6 +971,7 @@ EXPORT_SYMBOL(ceph_osdc_new_request);
* We keep osd requests in an rbtree, sorted by ->r_tid.
*/
DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
+DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
static bool osd_homeless(struct ceph_osd *osd)
{
@@ -1601,10 +1604,13 @@ static void maybe_request_map(struct ceph_osd_client *osdc)
ceph_monc_renew_subs(&osdc->client->monc);
}
+static void send_map_check(struct ceph_osd_request *req);
+
static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
{
struct ceph_osd_client *osdc = req->r_osdc;
struct ceph_osd *osd;
+ enum calc_target_result ct_res;
bool need_send = false;
bool promoted = false;
@@ -1612,7 +1618,10 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
again:
- calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
+ ct_res = calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
+ if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
+ goto promote;
+
osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
if (IS_ERR(osd)) {
WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
@@ -1656,6 +1665,9 @@ again:
send_request(req);
mutex_unlock(&osd->lock);
+ if (ct_res == CALC_TARGET_POOL_DNE)
+ send_map_check(req);
+
if (promoted)
downgrade_write(&osdc->lock);
return;
@@ -1699,6 +1711,7 @@ static void __finish_request(struct ceph_osd_request *req)
verify_osd_locked(osd);
dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
+ WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
unlink_request(osd, req);
atomic_dec(&osdc->num_requests);
@@ -1726,13 +1739,127 @@ static void __complete_request(struct ceph_osd_request *req)
complete_all(&req->r_completion);
}
+/*
+ * Note that this is open-coded in handle_reply(), which has to deal
+ * with ack vs commit, dup acks, etc.
+ */
+static void complete_request(struct ceph_osd_request *req, int err)
+{
+ dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
+
+ req->r_result = err;
+ __finish_request(req);
+ __complete_request(req);
+ complete_all(&req->r_safe_completion);
+ ceph_osdc_put_request(req);
+}
+
+static void cancel_map_check(struct ceph_osd_request *req)
+{
+ struct ceph_osd_client *osdc = req->r_osdc;
+ struct ceph_osd_request *lookup_req;
+
+ verify_osdc_wrlocked(osdc);
+
+ lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
+ if (!lookup_req)
+ return;
+
+ WARN_ON(lookup_req != req);
+ erase_request_mc(&osdc->map_checks, req);
+ ceph_osdc_put_request(req);
+}
+
static void cancel_request(struct ceph_osd_request *req)
{
dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
+ cancel_map_check(req);
finish_request(req);
}
+static void check_pool_dne(struct ceph_osd_request *req)
+{
+ struct ceph_osd_client *osdc = req->r_osdc;
+ struct ceph_osdmap *map = osdc->osdmap;
+
+ verify_osdc_wrlocked(osdc);
+ WARN_ON(!map->epoch);
+
+ if (req->r_attempts) {
+ /*
+ * We sent a request earlier, which means that
+ * previously the pool existed, and now it does not
+ * (i.e., it was deleted).
+ */
+ req->r_map_dne_bound = map->epoch;
+ dout("%s req %p tid %llu pool disappeared\n", __func__, req,
+ req->r_tid);
+ } else {
+ dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
+ req, req->r_tid, req->r_map_dne_bound, map->epoch);
+ }
+
+ if (req->r_map_dne_bound) {
+ if (map->epoch >= req->r_map_dne_bound) {
+ /* we had a new enough map */
+ pr_info_ratelimited("tid %llu pool does not exist\n",
+ req->r_tid);
+ complete_request(req, -ENOENT);
+ }
+ } else {
+ send_map_check(req);
+ }
+}
+
+static void map_check_cb(struct ceph_mon_generic_request *greq)
+{
+ struct ceph_osd_client *osdc = &greq->monc->client->osdc;
+ struct ceph_osd_request *req;
+ u64 tid = greq->private_data;
+
+ WARN_ON(greq->result || !greq->u.newest);
+
+ down_write(&osdc->lock);
+ req = lookup_request_mc(&osdc->map_checks, tid);
+ if (!req) {
+ dout("%s tid %llu dne\n", __func__, tid);
+ goto out_unlock;
+ }
+
+ dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
+ req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
+ if (!req->r_map_dne_bound)
+ req->r_map_dne_bound = greq->u.newest;
+ erase_request_mc(&osdc->map_checks, req);
+ check_pool_dne(req);
+
+ ceph_osdc_put_request(req);
+out_unlock:
+ up_write(&osdc->lock);
+}
+
+static void send_map_check(struct ceph_osd_request *req)
+{
+ struct ceph_osd_client *osdc = req->r_osdc;
+ struct ceph_osd_request *lookup_req;
+ int ret;
+
+ verify_osdc_wrlocked(osdc);
+
+ lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
+ if (lookup_req) {
+ WARN_ON(lookup_req != req);
+ return;
+ }
+
+ ceph_osdc_get_request(req);
+ insert_request_mc(&osdc->map_checks, req);
+ ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
+ map_check_cb, req->r_tid);
+ WARN_ON(ret);
+}
+
/*
* lingering requests, watch/notify v2 infrastructure
*/
@@ -1745,6 +1872,7 @@ static void linger_release(struct kref *kref)
lreq->reg_req, lreq->ping_req);
WARN_ON(!RB_EMPTY_NODE(&lreq->node));
WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
+ WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
WARN_ON(!list_empty(&lreq->scan_item));
WARN_ON(!list_empty(&lreq->pending_lworks));
WARN_ON(lreq->osd);
@@ -1783,6 +1911,7 @@ linger_alloc(struct ceph_osd_client *osdc)
mutex_init(&lreq->lock);
RB_CLEAR_NODE(&lreq->node);
RB_CLEAR_NODE(&lreq->osdc_node);
+ RB_CLEAR_NODE(&lreq->mc_node);
INIT_LIST_HEAD(&lreq->scan_item);
INIT_LIST_HEAD(&lreq->pending_lworks);
init_completion(&lreq->reg_commit_wait);
@@ -1797,6 +1926,7 @@ linger_alloc(struct ceph_osd_client *osdc)
DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
+DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)
/*
* Create linger request <-> OSD session relation.
@@ -2193,6 +2323,23 @@ static void linger_submit(struct ceph_osd_linger_request *lreq)
send_linger(lreq);
}
+static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
+{
+ struct ceph_osd_client *osdc = lreq->osdc;
+ struct ceph_osd_linger_request *lookup_lreq;
+
+ verify_osdc_wrlocked(osdc);
+
+ lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
+ lreq->linger_id);
+ if (!lookup_lreq)
+ return;
+
+ WARN_ON(lookup_lreq != lreq);
+ erase_linger_mc(&osdc->linger_map_checks, lreq);
+ linger_put(lreq);
+}
+
/*
* @lreq has to be both registered and linked.
*/
@@ -2202,6 +2349,7 @@ static void __linger_cancel(struct ceph_osd_linger_request *lreq)
cancel_linger_request(lreq->ping_req);
if (lreq->reg_req->r_osd)
cancel_linger_request(lreq->reg_req);
+ cancel_linger_map_check(lreq);
unlink_linger(lreq->osd, lreq);
linger_unregister(lreq);
}
@@ -2216,6 +2364,89 @@ static void linger_cancel(struct ceph_osd_linger_request *lreq)
up_write(&osdc->lock);
}
+static void send_linger_map_check(struct ceph_osd_linger_request *lreq);
+
+static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
+{
+ struct ceph_osd_client *osdc = lreq->osdc;
+ struct ceph_osdmap *map = osdc->osdmap;
+
+ verify_osdc_wrlocked(osdc);
+ WARN_ON(!map->epoch);
+
+ if (lreq->register_gen) {
+ lreq->map_dne_bound = map->epoch;
+ dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
+ lreq, lreq->linger_id);
+ } else {
+ dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
+ __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
+ map->epoch);
+ }
+
+ if (lreq->map_dne_bound) {
+ if (map->epoch >= lreq->map_dne_bound) {
+ /* we had a new enough map */
+ pr_info("linger_id %llu pool does not exist\n",
+ lreq->linger_id);
+ linger_reg_commit_complete(lreq, -ENOENT);
+ __linger_cancel(lreq);
+ }
+ } else {
+ send_linger_map_check(lreq);
+ }
+}
+
+static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
+{
+ struct ceph_osd_client *osdc = &greq->monc->client->osdc;
+ struct ceph_osd_linger_request *lreq;
+ u64 linger_id = greq->private_data;
+
+ WARN_ON(greq->result || !greq->u.newest);
+
+ down_write(&osdc->lock);
+ lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
+ if (!lreq) {
+ dout("%s linger_id %llu dne\n", __func__, linger_id);
+ goto out_unlock;
+ }
+
+ dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
+ __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
+ greq->u.newest);
+ if (!lreq->map_dne_bound)
+ lreq->map_dne_bound = greq->u.newest;
+ erase_linger_mc(&osdc->linger_map_checks, lreq);
+ check_linger_pool_dne(lreq);
+
+ linger_put(lreq);
+out_unlock:
+ up_write(&osdc->lock);
+}
+
+static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
+{
+ struct ceph_osd_client *osdc = lreq->osdc;
+ struct ceph_osd_linger_request *lookup_lreq;
+ int ret;
+
+ verify_osdc_wrlocked(osdc);
+
+ lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
+ lreq->linger_id);
+ if (lookup_lreq) {
+ WARN_ON(lookup_lreq != lreq);
+ return;
+ }
+
+ linger_get(lreq);
+ insert_linger_mc(&osdc->linger_map_checks, lreq);
+ ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
+ linger_map_check_cb, lreq->linger_id);
+ WARN_ON(ret);
+}
+
static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
{
int ret;
@@ -2677,10 +2908,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
return;
fail_request:
- req->r_result = -EIO;
- __finish_request(req);
- __complete_request(req);
- complete_all(&req->r_safe_completion);
+ complete_request(req, -EIO);
out_unlock_session:
mutex_unlock(&osd->lock);
out_unlock_osdc:
@@ -2764,6 +2992,7 @@ static void scan_requests(struct ceph_osd *osd,
/* fall through */
case CALC_TARGET_NEED_RESEND:
+ cancel_linger_map_check(lreq);
/*
* scan_requests() for the previous epoch(s)
* may have already added it to the list, since
@@ -2773,6 +3002,7 @@ static void scan_requests(struct ceph_osd *osd,
list_add_tail(&lreq->scan_item, need_resend_linger);
break;
case CALC_TARGET_POOL_DNE:
+ check_linger_pool_dne(lreq);
break;
}
}
@@ -2782,7 +3012,7 @@ static void scan_requests(struct ceph_osd *osd,
rb_entry(n, struct ceph_osd_request, r_node);
enum calc_target_result ct_res;
- n = rb_next(n); /* unlink_request() */
+ n = rb_next(n); /* unlink_request(), check_pool_dne() */
dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
ct_res = calc_target(osdc, &req->r_t,
@@ -2799,10 +3029,12 @@ static void scan_requests(struct ceph_osd *osd,
/* fall through */
case CALC_TARGET_NEED_RESEND:
+ cancel_map_check(req);
unlink_request(osd, req);
insert_request(need_resend, req);
break;
case CALC_TARGET_POOL_DNE:
+ check_pool_dne(req);
break;
}
}
@@ -3655,6 +3887,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
osdc->homeless_osd.o_osdc = osdc;
osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
osdc->linger_requests = RB_ROOT;
+ osdc->map_checks = RB_ROOT;
+ osdc->linger_map_checks = RB_ROOT;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
@@ -3720,6 +3954,8 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
WARN_ON(!list_empty(&osdc->osd_lru));
WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
+ WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
+ WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
WARN_ON(atomic_read(&osdc->num_requests));
WARN_ON(atomic_read(&osdc->num_homeless));