summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2020-10-21 10:34:10 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2020-10-21 10:34:10 -0700
commited7cfefe4443dcc811e84b345a3fb122eeb47661 (patch)
treeab5458ee4134916e32312580448aa151ff95000b /net
parentc4d6fe7311762f2e03b3c27ad38df7c40c80cc93 (diff)
parent28e1581c3b4ea5f98530064a103c6217bedeea73 (diff)
Merge tag 'ceph-for-5.10-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov: - a patch that removes crush_workspace_mutex (myself). CRUSH computations are no longer serialized and can run in parallel. - a couple new filesystem client metrics for "ceph fs top" command (Xiubo Li) - a fix for a very old messenger bug that affected the filesystem, marked for stable (myself) - assorted fixups and cleanups throughout the codebase from Jeff and others. * tag 'ceph-for-5.10-rc1' of git://github.com/ceph/ceph-client: (27 commits) libceph: clear con->out_msg on Policy::stateful_server faults libceph: format ceph_entity_addr nonces as unsigned libceph: fix ENTITY_NAME format suggestion libceph: move a dout in queue_con_delay() ceph: comment cleanups and clarifications ceph: break up send_cap_msg ceph: drop separate mdsc argument from __send_cap ceph: promote to unsigned long long before shifting ceph: don't SetPageError on readpage errors ceph: mark ceph_fmt_xattr() as printf-like for better type checking ceph: fold ceph_update_writeable_page into ceph_write_begin ceph: fold ceph_sync_writepages into writepage_nounlock ceph: fold ceph_sync_readpages into ceph_readpage ceph: don't call ceph_update_writeable_page from page_mkwrite ceph: break out writeback of incompatible snap context to separate function ceph: add a note explaining session reject error string libceph: switch to the new "osd blocklist add" command libceph, rbd, ceph: "blacklist" -> "blocklist" ceph: have ceph_writepages_start call pagevec_lookup_range_tag ceph: use kill_anon_super helper ...
Diffstat (limited to 'net')
-rw-r--r--net/ceph/messenger.c13
-rw-r--r--net/ceph/mon_client.c69
-rw-r--r--net/ceph/osdmap.c166
3 files changed, 213 insertions, 35 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index d4d7a0e52491..af0f1fa24937 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -2016,11 +2016,11 @@ static int process_banner(struct ceph_connection *con)
sizeof(con->peer_addr)) != 0 &&
!(addr_is_blank(&con->actual_peer_addr) &&
con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
- pr_warn("wrong peer, want %s/%d, got %s/%d\n",
+ pr_warn("wrong peer, want %s/%u, got %s/%u\n",
ceph_pr_addr(&con->peer_addr),
- (int)le32_to_cpu(con->peer_addr.nonce),
+ le32_to_cpu(con->peer_addr.nonce),
ceph_pr_addr(&con->actual_peer_addr),
- (int)le32_to_cpu(con->actual_peer_addr.nonce));
+ le32_to_cpu(con->actual_peer_addr.nonce));
con->error_msg = "wrong peer at address";
return -1;
}
@@ -2811,13 +2811,13 @@ static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
return -ENOENT;
}
+ dout("%s %p %lu\n", __func__, con, delay);
if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
dout("%s %p - already queued\n", __func__, con);
con->ops->put(con);
return -EBUSY;
}
- dout("%s %p %lu\n", __func__, con, delay);
return 0;
}
@@ -2998,6 +2998,11 @@ static void con_fault(struct ceph_connection *con)
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
}
+ if (con->out_msg) {
+ BUG_ON(con->out_msg->con != con);
+ ceph_msg_put(con->out_msg);
+ con->out_msg = NULL;
+ }
/* Requeue anything that hasn't been acked */
list_splice_init(&con->out_sent, &con->out_queue);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index d633a0aeaa55..c4cf2529d08b 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -896,8 +896,9 @@ bad:
ceph_msg_dump(msg);
}
-int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
- struct ceph_entity_addr *client_addr)
+static __printf(2, 0)
+int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
+ va_list ap)
{
struct ceph_mon_generic_request *req;
struct ceph_mon_command *h;
@@ -925,29 +926,65 @@ int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid;
h->num_strs = cpu_to_le32(1);
- len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
- \"blacklistop\": \"add\", \
- \"addr\": \"%pISpc/%u\" }",
- &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
+ len = vsprintf(h->str, fmt, ap);
h->str_len = cpu_to_le32(len);
send_generic_request(monc, req);
mutex_unlock(&monc->mutex);
ret = wait_generic_request(req);
- if (!ret)
- /*
- * Make sure we have the osdmap that includes the blacklist
- * entry. This is needed to ensure that the OSDs pick up the
- * new blacklist before processing any future requests from
- * this client.
- */
- ret = ceph_wait_for_latest_osdmap(monc->client, 0);
-
out:
put_generic_request(req);
return ret;
}
-EXPORT_SYMBOL(ceph_monc_blacklist_add);
+
+static __printf(2, 3)
+int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
+{
+ va_list ap;
+ int ret;
+
+ va_start(ap, fmt);
+ ret = do_mon_command_vargs(monc, fmt, ap);
+ va_end(ap);
+ return ret;
+}
+
+int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
+ struct ceph_entity_addr *client_addr)
+{
+ int ret;
+
+ ret = do_mon_command(monc,
+ "{ \"prefix\": \"osd blocklist\", \
+ \"blocklistop\": \"add\", \
+ \"addr\": \"%pISpc/%u\" }",
+ &client_addr->in_addr,
+ le32_to_cpu(client_addr->nonce));
+ if (ret == -EINVAL) {
+ /*
+ * The monitor returns EINVAL on an unrecognized command.
+ * Try the legacy command -- it is exactly the same except
+ * for the name.
+ */
+ ret = do_mon_command(monc,
+ "{ \"prefix\": \"osd blacklist\", \
+ \"blacklistop\": \"add\", \
+ \"addr\": \"%pISpc/%u\" }",
+ &client_addr->in_addr,
+ le32_to_cpu(client_addr->nonce));
+ }
+ if (ret)
+ return ret;
+
+ /*
+ * Make sure we have the osdmap that includes the blocklist
+ * entry. This is needed to ensure that the OSDs pick up the
+ * new blocklist before processing any future requests from
+ * this client.
+ */
+ return ceph_wait_for_latest_osdmap(monc->client, 0);
+}
+EXPORT_SYMBOL(ceph_monc_blocklist_add);
/*
* Resend pending generic requests.
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 96c25f5e064a..fa08c15be0c0 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -965,6 +965,143 @@ bad:
}
/*
+ * CRUSH workspaces
+ *
+ * workspace_manager framework borrowed from fs/btrfs/compression.c.
+ * Two simplifications: there is only one type of workspace and there
+ * is always at least one workspace.
+ */
+static struct crush_work *alloc_workspace(const struct crush_map *c)
+{
+ struct crush_work *work;
+ size_t work_size;
+
+ WARN_ON(!c->working_size);
+ work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
+ dout("%s work_size %zu bytes\n", __func__, work_size);
+
+ work = ceph_kvmalloc(work_size, GFP_NOIO);
+ if (!work)
+ return NULL;
+
+ INIT_LIST_HEAD(&work->item);
+ crush_init_workspace(c, work);
+ return work;
+}
+
+static void free_workspace(struct crush_work *work)
+{
+ WARN_ON(!list_empty(&work->item));
+ kvfree(work);
+}
+
+static void init_workspace_manager(struct workspace_manager *wsm)
+{
+ INIT_LIST_HEAD(&wsm->idle_ws);
+ spin_lock_init(&wsm->ws_lock);
+ atomic_set(&wsm->total_ws, 0);
+ wsm->free_ws = 0;
+ init_waitqueue_head(&wsm->ws_wait);
+}
+
+static void add_initial_workspace(struct workspace_manager *wsm,
+ struct crush_work *work)
+{
+ WARN_ON(!list_empty(&wsm->idle_ws));
+
+ list_add(&work->item, &wsm->idle_ws);
+ atomic_set(&wsm->total_ws, 1);
+ wsm->free_ws = 1;
+}
+
+static void cleanup_workspace_manager(struct workspace_manager *wsm)
+{
+ struct crush_work *work;
+
+ while (!list_empty(&wsm->idle_ws)) {
+ work = list_first_entry(&wsm->idle_ws, struct crush_work,
+ item);
+ list_del_init(&work->item);
+ free_workspace(work);
+ }
+ atomic_set(&wsm->total_ws, 0);
+ wsm->free_ws = 0;
+}
+
+/*
+ * Finds an available workspace or allocates a new one. If it's not
+ * possible to allocate a new one, waits until there is one.
+ */
+static struct crush_work *get_workspace(struct workspace_manager *wsm,
+ const struct crush_map *c)
+{
+ struct crush_work *work;
+ int cpus = num_online_cpus();
+
+again:
+ spin_lock(&wsm->ws_lock);
+ if (!list_empty(&wsm->idle_ws)) {
+ work = list_first_entry(&wsm->idle_ws, struct crush_work,
+ item);
+ list_del_init(&work->item);
+ wsm->free_ws--;
+ spin_unlock(&wsm->ws_lock);
+ return work;
+
+ }
+ if (atomic_read(&wsm->total_ws) > cpus) {
+ DEFINE_WAIT(wait);
+
+ spin_unlock(&wsm->ws_lock);
+ prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
+ if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
+ schedule();
+ finish_wait(&wsm->ws_wait, &wait);
+ goto again;
+ }
+ atomic_inc(&wsm->total_ws);
+ spin_unlock(&wsm->ws_lock);
+
+ work = alloc_workspace(c);
+ if (!work) {
+ atomic_dec(&wsm->total_ws);
+ wake_up(&wsm->ws_wait);
+
+ /*
+ * Do not return the error but go back to waiting. We
+ * have the inital workspace and the CRUSH computation
+ * time is bounded so we will get it eventually.
+ */
+ WARN_ON(atomic_read(&wsm->total_ws) < 1);
+ goto again;
+ }
+ return work;
+}
+
+/*
+ * Puts a workspace back on the list or frees it if we have enough
+ * idle ones sitting around.
+ */
+static void put_workspace(struct workspace_manager *wsm,
+ struct crush_work *work)
+{
+ spin_lock(&wsm->ws_lock);
+ if (wsm->free_ws <= num_online_cpus()) {
+ list_add(&work->item, &wsm->idle_ws);
+ wsm->free_ws++;
+ spin_unlock(&wsm->ws_lock);
+ goto wake;
+ }
+ spin_unlock(&wsm->ws_lock);
+
+ free_workspace(work);
+ atomic_dec(&wsm->total_ws);
+wake:
+ if (wq_has_sleeper(&wsm->ws_wait))
+ wake_up(&wsm->ws_wait);
+}
+
+/*
* osd map
*/
struct ceph_osdmap *ceph_osdmap_alloc(void)
@@ -981,7 +1118,8 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
map->primary_temp = RB_ROOT;
map->pg_upmap = RB_ROOT;
map->pg_upmap_items = RB_ROOT;
- mutex_init(&map->crush_workspace_mutex);
+
+ init_workspace_manager(&map->crush_wsm);
return map;
}
@@ -989,8 +1127,11 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
void ceph_osdmap_destroy(struct ceph_osdmap *map)
{
dout("osdmap_destroy %p\n", map);
+
if (map->crush)
crush_destroy(map->crush);
+ cleanup_workspace_manager(&map->crush_wsm);
+
while (!RB_EMPTY_ROOT(&map->pg_temp)) {
struct ceph_pg_mapping *pg =
rb_entry(rb_first(&map->pg_temp),
@@ -1029,7 +1170,6 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
kvfree(map->osd_weight);
kvfree(map->osd_addr);
kvfree(map->osd_primary_affinity);
- kvfree(map->crush_workspace);
kfree(map);
}
@@ -1104,26 +1244,22 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
{
- void *workspace;
- size_t work_size;
+ struct crush_work *work;
if (IS_ERR(crush))
return PTR_ERR(crush);
- work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
- dout("%s work_size %zu bytes\n", __func__, work_size);
- workspace = ceph_kvmalloc(work_size, GFP_NOIO);
- if (!workspace) {
+ work = alloc_workspace(crush);
+ if (!work) {
crush_destroy(crush);
return -ENOMEM;
}
- crush_init_workspace(crush, workspace);
if (map->crush)
crush_destroy(map->crush);
- kvfree(map->crush_workspace);
+ cleanup_workspace_manager(&map->crush_wsm);
map->crush = crush;
- map->crush_workspace = workspace;
+ add_initial_workspace(&map->crush_wsm, work);
return 0;
}
@@ -2322,6 +2458,7 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
s64 choose_args_index)
{
struct crush_choose_arg_map *arg_map;
+ struct crush_work *work;
int r;
BUG_ON(result_max > CEPH_PG_MAX_SIZE);
@@ -2332,12 +2469,11 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
arg_map = lookup_choose_arg_map(&map->crush->choose_args,
CEPH_DEFAULT_CHOOSE_ARGS);
- mutex_lock(&map->crush_workspace_mutex);
+ work = get_workspace(&map->crush_wsm, map->crush);
r = crush_do_rule(map->crush, ruleno, x, result, result_max,
- weight, weight_max, map->crush_workspace,
+ weight, weight_max, work,
arg_map ? arg_map->args : NULL);
- mutex_unlock(&map->crush_workspace_mutex);
-
+ put_workspace(&map->crush_wsm, work);
return r;
}