diff options
Diffstat (limited to 'fs/ceph')
51 files changed, 2456 insertions, 1354 deletions
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig index 04b8280582a..bc87b9c1d27 100644 --- a/fs/ceph/Kconfig +++ b/fs/ceph/Kconfig @@ -2,7 +2,7 @@ config CEPH_FS tristate "Ceph distributed file system (EXPERIMENTAL)" depends on INET && EXPERIMENTAL select LIBCRC32C - select CONFIG_CRYPTO_AES + select CRYPTO_AES help Choose Y or M here to include support for mounting the experimental Ceph distributed file system. Ceph is an extremely diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile index 6a660e610be..278e1172600 100644 --- a/fs/ceph/Makefile +++ b/fs/ceph/Makefile @@ -6,7 +6,7 @@ ifneq ($(KERNELRELEASE),) obj-$(CONFIG_CEPH_FS) += ceph.o -ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \ +ceph-objs := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \ export.o caps.o snap.o xattr.o \ messenger.o msgpool.o buffer.o pagelist.o \ mds_client.o mdsmap.o \ diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 4b42c2bb603..5598a0d0229 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -274,7 +274,6 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc; int rc = 0; struct page **pages; - struct pagevec pvec; loff_t offset; u64 len; @@ -297,8 +296,6 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, if (rc < 0) goto out; - /* set uptodate and add to lru in pagevec-sized chunks */ - pagevec_init(&pvec, 0); for (; !list_empty(page_list) && len > 0; rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) { struct page *page = @@ -312,7 +309,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, zero_user_segment(page, s, PAGE_CACHE_SIZE); } - if (add_to_page_cache(page, mapping, page->index, GFP_NOFS)) { + if (add_to_page_cache_lru(page, mapping, page->index, + GFP_NOFS)) { page_cache_release(page); dout("readpages %p add_to_page_cache failed %p\n", inode, page); @@ -323,10 +321,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, flush_dcache_page(page); SetPageUptodate(page); unlock_page(page); - if (pagevec_add(&pvec, page) == 0) - pagevec_lru_add_file(&pvec); /* add to lru */ + page_cache_release(page); } - pagevec_lru_add_file(&pvec); rc = 0; out: @@ -504,7 +500,6 @@ static void writepages_finish(struct ceph_osd_request *req, int i; struct ceph_snap_context *snapc = req->r_snapc; struct address_space *mapping = inode->i_mapping; - struct writeback_control *wbc = req->r_wbc; __s32 rc = -EIO; u64 bytes = 0; struct ceph_client *client = ceph_inode_to_client(inode); @@ -546,10 +541,6 @@ static void writepages_finish(struct ceph_osd_request *req, clear_bdi_congested(&client->backing_dev_info, BLK_RW_ASYNC); - if (i >= wrote) { - dout("inode %p skipping page %p\n", inode, page); - wbc->pages_skipped++; - } ceph_put_snap_context((void *)page->private); page->private = 0; ClearPagePrivate(page); @@ -562,7 +553,7 @@ static void writepages_finish(struct ceph_osd_request *req, * page truncation thread, possibly losing some data that * raced its way in */ - if ((issued & CEPH_CAP_FILE_CACHE) == 0) + if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) generic_error_remove_page(inode->i_mapping, page); unlock_page(page); @@ -573,7 +564,7 @@ static void writepages_finish(struct ceph_osd_request *req, ceph_release_pages(req->r_pages, req->r_num_pages); if (req->r_pages_from_pool) mempool_free(req->r_pages, - ceph_client(inode->i_sb)->wb_pagevec_pool); + ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool); else kfree(req->r_pages); ceph_osdc_put_request(req); @@ -799,7 +790,6 @@ get_more_pages: alloc_page_vec(client, req); req->r_callback = writepages_finish; req->r_inode = inode; - req->r_wbc = wbc; } /* note position of first page in pvec */ @@ -808,9 +798,12 @@ get_more_pages: dout("%p will write page %p idx %lu\n", inode, page, page->index); - writeback_stat = atomic_long_inc_return(&client->writeback_count); - if (writeback_stat > CONGESTION_ON_THRESH(client->mount_args->congestion_kb)) { - set_bdi_congested(&client->backing_dev_info, BLK_RW_ASYNC); + writeback_stat = + atomic_long_inc_return(&client->writeback_count); + if (writeback_stat > CONGESTION_ON_THRESH( + client->mount_args->congestion_kb)) { + set_bdi_congested(&client->backing_dev_info, + BLK_RW_ASYNC); } set_page_writeback(page); @@ -1047,7 +1040,7 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping, *pagep = page; dout("write_begin file %p inode %p page %p %d~%d\n", file, - inode, page, (int)pos, (int)len); + inode, page, (int)pos, (int)len); r = ceph_update_writeable_page(file, pos, len, page); } while (r == -EAGAIN); diff --git a/fs/ceph/armor.c b/fs/ceph/armor.c index 67b2c030924..eb2a666b0be 100644 --- a/fs/ceph/armor.c +++ b/fs/ceph/armor.c @@ -1,11 +1,15 @@ #include <linux/errno.h> +int ceph_armor(char *dst, const char *src, const char *end); +int ceph_unarmor(char *dst, const char *src, const char *end); + /* * base64 encode/decode. */ -const char *pem_key = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; +static const char *pem_key = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; static int encode_bits(int c) { diff --git a/fs/ceph/auth.c b/fs/ceph/auth.c index 818afe72e6c..6d2e3060062 100644 --- a/fs/ceph/auth.c +++ b/fs/ceph/auth.c @@ -1,7 +1,6 @@ #include "ceph_debug.h" #include <linux/module.h> -#include <linux/slab.h> #include <linux/err.h> #include <linux/slab.h> @@ -21,7 +20,7 @@ static u32 supported_protocols[] = { CEPH_AUTH_CEPHX }; -int ceph_auth_init_protocol(struct ceph_auth_client *ac, int protocol) +static int ceph_auth_init_protocol(struct ceph_auth_client *ac, int protocol) { switch (protocol) { case CEPH_AUTH_NONE: @@ -134,8 +133,8 @@ bad: return -ERANGE; } -int ceph_build_auth_request(struct ceph_auth_client *ac, - void *msg_buf, size_t msg_len) +static int ceph_build_auth_request(struct ceph_auth_client *ac, + void *msg_buf, size_t msg_len) { struct ceph_mon_request_header *monhdr = msg_buf; void *p = monhdr + 1; @@ -150,7 +149,8 @@ int ceph_build_auth_request(struct ceph_auth_client *ac, ret = ac->ops->build_request(ac, p + sizeof(u32), end); if (ret < 0) { - pr_err("error %d building request\n", ret); + pr_err("error %d building auth method %s request\n", ret, + ac->ops->name); return ret; } dout(" built request %d bytes\n", ret); @@ -229,7 +229,7 @@ int ceph_handle_auth_reply(struct ceph_auth_client *ac, if (ret == -EAGAIN) { return ceph_build_auth_request(ac, reply_buf, reply_len); } else if (ret) { - pr_err("authentication error %d\n", ret); + pr_err("auth method '%s' error %d\n", ac->ops->name, ret); return ret; } return 0; @@ -246,7 +246,7 @@ int ceph_build_auth(struct ceph_auth_client *ac, if (!ac->protocol) return ceph_auth_build_hello(ac, msg_buf, msg_len); BUG_ON(!ac->ops); - if (!ac->ops->is_authenticated(ac)) + if (ac->ops->should_authenticate(ac)) return ceph_build_auth_request(ac, msg_buf, msg_len); return 0; } diff --git a/fs/ceph/auth.h b/fs/ceph/auth.h index ca4f57cfb26..d38a2fb4a13 100644 --- a/fs/ceph/auth.h +++ b/fs/ceph/auth.h @@ -15,6 +15,8 @@ struct ceph_auth_client; struct ceph_authorizer; struct ceph_auth_client_ops { + const char *name; + /* * true if we are authenticated and can connect to * services. @@ -22,6 +24,12 @@ struct ceph_auth_client_ops { int (*is_authenticated)(struct ceph_auth_client *ac); /* + * true if we should (re)authenticate, e.g., when our tickets + * are getting old and crusty. + */ + int (*should_authenticate)(struct ceph_auth_client *ac); + + /* * build requests and process replies during monitor * handshake. if handle_reply returns -EAGAIN, we build * another request. diff --git a/fs/ceph/auth_none.c b/fs/ceph/auth_none.c index 8cd9e3af07f..ad1dc21286c 100644 --- a/fs/ceph/auth_none.c +++ b/fs/ceph/auth_none.c @@ -31,6 +31,13 @@ static int is_authenticated(struct ceph_auth_client *ac) return !xi->starting; } +static int should_authenticate(struct ceph_auth_client *ac) +{ + struct ceph_auth_none_info *xi = ac->private; + + return xi->starting; +} + /* * the generic auth code decode the global_id, and we carry no actual * authenticate state, so nothing happens here. @@ -94,9 +101,11 @@ static void ceph_auth_none_destroy_authorizer(struct ceph_auth_client *ac, } static const struct ceph_auth_client_ops ceph_auth_none_ops = { + .name = "none", .reset = reset, .destroy = destroy, .is_authenticated = is_authenticated, + .should_authenticate = should_authenticate, .handle_reply = handle_reply, .create_authorizer = ceph_auth_none_create_authorizer, .destroy_authorizer = ceph_auth_none_destroy_authorizer, diff --git a/fs/ceph/auth_x.c b/fs/ceph/auth_x.c index fee5a08da88..582e0b2caf8 100644 --- a/fs/ceph/auth_x.c +++ b/fs/ceph/auth_x.c @@ -27,6 +27,17 @@ static int ceph_x_is_authenticated(struct ceph_auth_client *ac) return (ac->want_keys & xi->have_keys) == ac->want_keys; } +static int ceph_x_should_authenticate(struct ceph_auth_client *ac) +{ + struct ceph_x_info *xi = ac->private; + int need; + + ceph_x_validate_tickets(ac, &need); + dout("ceph_x_should_authenticate want=%d need=%d have=%d\n", + ac->want_keys, need, xi->have_keys); + return need != 0; +} + static int ceph_x_encrypt_buflen(int ilen) { return sizeof(struct ceph_x_encrypt_header) + ilen + 16 + @@ -76,8 +87,8 @@ static int ceph_x_decrypt(struct ceph_crypto_key *secret, /* * get existing (or insert new) ticket handler */ -struct ceph_x_ticket_handler *get_ticket_handler(struct ceph_auth_client *ac, - int service) +static struct ceph_x_ticket_handler * +get_ticket_handler(struct ceph_auth_client *ac, int service) { struct ceph_x_ticket_handler *th; struct ceph_x_info *xi = ac->private; @@ -127,7 +138,7 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac, int ret; char *dbuf; char *ticket_buf; - u8 struct_v; + u8 reply_struct_v; dbuf = kmalloc(TEMP_TICKET_BUF_LEN, GFP_NOFS); if (!dbuf) @@ -139,14 +150,14 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac, goto out_dbuf; ceph_decode_need(&p, end, 1 + sizeof(u32), bad); - struct_v = ceph_decode_8(&p); - if (struct_v != 1) + reply_struct_v = ceph_decode_8(&p); + if (reply_struct_v != 1) goto bad; num = ceph_decode_32(&p); dout("%d tickets\n", num); while (num--) { int type; - u8 struct_v; + u8 tkt_struct_v, blob_struct_v; struct ceph_x_ticket_handler *th; void *dp, *dend; int dlen; @@ -165,8 +176,8 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac, type = ceph_decode_32(&p); dout(" ticket type %d %s\n", type, ceph_entity_type_name(type)); - struct_v = ceph_decode_8(&p); - if (struct_v != 1) + tkt_struct_v = ceph_decode_8(&p); + if (tkt_struct_v != 1) goto bad; th = get_ticket_handler(ac, type); @@ -186,8 +197,8 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac, dend = dbuf + dlen; dp = dbuf; - struct_v = ceph_decode_8(&dp); - if (struct_v != 1) + tkt_struct_v = ceph_decode_8(&dp); + if (tkt_struct_v != 1) goto bad; memcpy(&old_key, &th->session_key, sizeof(old_key)); @@ -224,7 +235,7 @@ static int ceph_x_proc_ticket_reply(struct ceph_auth_client *ac, tpend = tp + dlen; dout(" ticket blob is %d bytes\n", dlen); ceph_decode_need(&tp, tpend, 1 + sizeof(u64), bad); - struct_v = ceph_decode_8(&tp); + blob_struct_v = ceph_decode_8(&tp); new_secret_id = ceph_decode_64(&tp); ret = ceph_decode_buffer(&new_ticket_blob, &tp, tpend); if (ret) @@ -418,7 +429,7 @@ static int ceph_x_build_request(struct ceph_auth_client *ac, auth->struct_v = 1; auth->key = 0; for (u = (u64 *)tmp_enc; u + 1 <= (u64 *)(tmp_enc + ret); u++) - auth->key ^= *u; + auth->key ^= *(__le64 *)u; dout(" server_challenge %llx client_challenge %llx key %llx\n", xi->server_challenge, le64_to_cpu(auth->client_challenge), le64_to_cpu(auth->key)); @@ -482,7 +493,7 @@ static int ceph_x_handle_reply(struct ceph_auth_client *ac, int result, return -EAGAIN; } - op = le32_to_cpu(head->op); + op = le16_to_cpu(head->op); result = le32_to_cpu(head->result); dout("handle_reply op %d result %d\n", op, result); switch (op) { @@ -602,6 +613,9 @@ static void ceph_x_destroy(struct ceph_auth_client *ac) remove_ticket_handler(ac, th); } + if (xi->auth_authorizer.buf) + ceph_buffer_put(xi->auth_authorizer.buf); + kfree(ac->private); ac->private = NULL; } @@ -618,7 +632,9 @@ static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac, static const struct ceph_auth_client_ops ceph_x_ops = { + .name = "x", .is_authenticated = ceph_x_is_authenticated, + .should_authenticate = ceph_x_should_authenticate, .build_request = ceph_x_build_request, .handle_reply = ceph_x_handle_reply, .create_authorizer = ceph_x_create_authorizer, diff --git a/fs/ceph/buffer.c b/fs/ceph/buffer.c index c67535d70aa..cd39f17021d 100644 --- a/fs/ceph/buffer.c +++ b/fs/ceph/buffer.c @@ -47,22 +47,6 @@ void ceph_buffer_release(struct kref *kref) kfree(b); } -int ceph_buffer_alloc(struct ceph_buffer *b, int len, gfp_t gfp) -{ - b->vec.iov_base = kmalloc(len, gfp | __GFP_NOWARN); - if (b->vec.iov_base) { - b->is_vmalloc = false; - } else { - b->vec.iov_base = __vmalloc(len, gfp, PAGE_KERNEL); - b->is_vmalloc = true; - } - if (!b->vec.iov_base) - return -ENOMEM; - b->alloc_len = len; - b->vec.iov_len = len; - return 0; -} - int ceph_decode_buffer(struct ceph_buffer **b, void **p, void *end) { size_t len; diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 0c168180686..7bf182b0397 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -113,58 +113,41 @@ const char *ceph_cap_string(int caps) return cap_str[i]; } -/* - * Cap reservations - * - * Maintain a global pool of preallocated struct ceph_caps, referenced - * by struct ceph_caps_reservations. This ensures that we preallocate - * memory needed to successfully process an MDS response. (If an MDS - * sends us cap information and we fail to process it, we will have - * problems due to the client and MDS being out of sync.) - * - * Reservations are 'owned' by a ceph_cap_reservation context. - */ -static spinlock_t caps_list_lock; -static struct list_head caps_list; /* unused (reserved or unreserved) */ -static int caps_total_count; /* total caps allocated */ -static int caps_use_count; /* in use */ -static int caps_reserve_count; /* unused, reserved */ -static int caps_avail_count; /* unused, unreserved */ -static int caps_min_count; /* keep at least this many (unreserved) */ - -void __init ceph_caps_init(void) +void ceph_caps_init(struct ceph_mds_client *mdsc) { - INIT_LIST_HEAD(&caps_list); - spin_lock_init(&caps_list_lock); + INIT_LIST_HEAD(&mdsc->caps_list); + spin_lock_init(&mdsc->caps_list_lock); } -void ceph_caps_finalize(void) +void ceph_caps_finalize(struct ceph_mds_client *mdsc) { struct ceph_cap *cap; - spin_lock(&caps_list_lock); - while (!list_empty(&caps_list)) { - cap = list_first_entry(&caps_list, struct ceph_cap, caps_item); + spin_lock(&mdsc->caps_list_lock); + while (!list_empty(&mdsc->caps_list)) { + cap = list_first_entry(&mdsc->caps_list, + struct ceph_cap, caps_item); list_del(&cap->caps_item); kmem_cache_free(ceph_cap_cachep, cap); } - caps_total_count = 0; - caps_avail_count = 0; - caps_use_count = 0; - caps_reserve_count = 0; - caps_min_count = 0; - spin_unlock(&caps_list_lock); + mdsc->caps_total_count = 0; + mdsc->caps_avail_count = 0; + mdsc->caps_use_count = 0; + mdsc->caps_reserve_count = 0; + mdsc->caps_min_count = 0; + spin_unlock(&mdsc->caps_list_lock); } -void ceph_adjust_min_caps(int delta) +void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta) { - spin_lock(&caps_list_lock); - caps_min_count += delta; - BUG_ON(caps_min_count < 0); - spin_unlock(&caps_list_lock); + spin_lock(&mdsc->caps_list_lock); + mdsc->caps_min_count += delta; + BUG_ON(mdsc->caps_min_count < 0); + spin_unlock(&mdsc->caps_list_lock); } -int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need) +int ceph_reserve_caps(struct ceph_mds_client *mdsc, + struct ceph_cap_reservation *ctx, int need) { int i; struct ceph_cap *cap; @@ -176,16 +159,17 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need) dout("reserve caps ctx=%p need=%d\n", ctx, need); /* first reserve any caps that are already allocated */ - spin_lock(&caps_list_lock); - if (caps_avail_count >= need) + spin_lock(&mdsc->caps_list_lock); + if (mdsc->caps_avail_count >= need) have = need; else - have = caps_avail_count; - caps_avail_count -= have; - caps_reserve_count += have; - BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + - caps_avail_count); - spin_unlock(&caps_list_lock); + have = mdsc->caps_avail_count; + mdsc->caps_avail_count -= have; + mdsc->caps_reserve_count += have; + BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + + mdsc->caps_reserve_count + + mdsc->caps_avail_count); + spin_unlock(&mdsc->caps_list_lock); for (i = have; i < need; i++) { cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); @@ -198,19 +182,20 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need) } BUG_ON(have + alloc != need); - spin_lock(&caps_list_lock); - caps_total_count += alloc; - caps_reserve_count += alloc; - list_splice(&newcaps, &caps_list); + spin_lock(&mdsc->caps_list_lock); + mdsc->caps_total_count += alloc; + mdsc->caps_reserve_count += alloc; + list_splice(&newcaps, &mdsc->caps_list); - BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + - caps_avail_count); - spin_unlock(&caps_list_lock); + BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + + mdsc->caps_reserve_count + + mdsc->caps_avail_count); + spin_unlock(&mdsc->caps_list_lock); ctx->count = need; dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n", - ctx, caps_total_count, caps_use_count, caps_reserve_count, - caps_avail_count); + ctx, mdsc->caps_total_count, mdsc->caps_use_count, + mdsc->caps_reserve_count, mdsc->caps_avail_count); return 0; out_alloc_count: @@ -220,92 +205,104 @@ out_alloc_count: return ret; } -int ceph_unreserve_caps(struct ceph_cap_reservation *ctx) +int ceph_unreserve_caps(struct ceph_mds_client *mdsc, + struct ceph_cap_reservation *ctx) { dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count); if (ctx->count) { - spin_lock(&caps_list_lock); - BUG_ON(caps_reserve_count < ctx->count); - caps_reserve_count -= ctx->count; - caps_avail_count += ctx->count; + spin_lock(&mdsc->caps_list_lock); + BUG_ON(mdsc->caps_reserve_count < ctx->count); + mdsc->caps_reserve_count -= ctx->count; + mdsc->caps_avail_count += ctx->count; ctx->count = 0; dout("unreserve caps %d = %d used + %d resv + %d avail\n", - caps_total_count, caps_use_count, caps_reserve_count, - caps_avail_count); - BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + - caps_avail_count); - spin_unlock(&caps_list_lock); + mdsc->caps_total_count, mdsc->caps_use_count, + mdsc->caps_reserve_count, mdsc->caps_avail_count); + BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + + mdsc->caps_reserve_count + + mdsc->caps_avail_count); + spin_unlock(&mdsc->caps_list_lock); } return 0; } -static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx) +static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc, + struct ceph_cap_reservation *ctx) { struct ceph_cap *cap = NULL; /* temporary, until we do something about cap import/export */ - if (!ctx) - return kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); + if (!ctx) { + cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); + if (cap) { + mdsc->caps_use_count++; + mdsc->caps_total_count++; + } + return cap; + } - spin_lock(&caps_list_lock); + spin_lock(&mdsc->caps_list_lock); dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n", - ctx, ctx->count, caps_total_count, caps_use_count, - caps_reserve_count, caps_avail_count); + ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count, + mdsc->caps_reserve_count, mdsc->caps_avail_count); BUG_ON(!ctx->count); - BUG_ON(ctx->count > caps_reserve_count); - BUG_ON(list_empty(&caps_list)); + BUG_ON(ctx->count > mdsc->caps_reserve_count); + BUG_ON(list_empty(&mdsc->caps_list)); ctx->count--; - caps_reserve_count--; - caps_use_count++; + mdsc->caps_reserve_count--; + mdsc->caps_use_count++; - cap = list_first_entry(&caps_list, struct ceph_cap, caps_item); + cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item); list_del(&cap->caps_item); - BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + - caps_avail_count); - spin_unlock(&caps_list_lock); + BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + + mdsc->caps_reserve_count + mdsc->caps_avail_count); + spin_unlock(&mdsc->caps_list_lock); return cap; } -void ceph_put_cap(struct ceph_cap *cap) +void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap) { - spin_lock(&caps_list_lock); + spin_lock(&mdsc->caps_list_lock); dout("put_cap %p %d = %d used + %d resv + %d avail\n", - cap, caps_total_count, caps_use_count, - caps_reserve_count, caps_avail_count); - caps_use_count--; + cap, mdsc->caps_total_count, mdsc->caps_use_count, + mdsc->caps_reserve_count, mdsc->caps_avail_count); + mdsc->caps_use_count--; /* * Keep some preallocated caps around (ceph_min_count), to * avoid lots of free/alloc churn. */ - if (caps_avail_count >= caps_reserve_count + caps_min_count) { - caps_total_count--; + if (mdsc->caps_avail_count >= mdsc->caps_reserve_count + + mdsc->caps_min_count) { + mdsc->caps_total_count--; kmem_cache_free(ceph_cap_cachep, cap); } else { - caps_avail_count++; - list_add(&cap->caps_item, &caps_list); + mdsc->caps_avail_count++; + list_add(&cap->caps_item, &mdsc->caps_list); } - BUG_ON(caps_total_count != caps_use_count + caps_reserve_count + - caps_avail_count); - spin_unlock(&caps_list_lock); + BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + + mdsc->caps_reserve_count + mdsc->caps_avail_count); + spin_unlock(&mdsc->caps_list_lock); } void ceph_reservation_status(struct ceph_client *client, int *total, int *avail, int *used, int *reserved, int *min) { + struct ceph_mds_client *mdsc = &client->mdsc; + if (total) - *total = caps_total_count; + *total = mdsc->caps_total_count; if (avail) - *avail = caps_avail_count; + *avail = mdsc->caps_avail_count; if (used) - *used = caps_use_count; + *used = mdsc->caps_use_count; if (reserved) - *reserved = caps_reserve_count; + *reserved = mdsc->caps_reserve_count; if (min) - *min = caps_min_count; + *min = mdsc->caps_min_count; } /* @@ -330,22 +327,29 @@ static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds) return NULL; } +struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds) +{ + struct ceph_cap *cap; + + spin_lock(&ci->vfs_inode.i_lock); + cap = __get_cap_for_mds(ci, mds); + spin_unlock(&ci->vfs_inode.i_lock); + return cap; +} + /* - * Return id of any MDS with a cap, preferably FILE_WR|WRBUFFER|EXCL, else - * -1. + * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1. */ -static int __ceph_get_cap_mds(struct ceph_inode_info *ci, u32 *mseq) +static int __ceph_get_cap_mds(struct ceph_inode_info *ci) { struct ceph_cap *cap; int mds = -1; struct rb_node *p; - /* prefer mds with WR|WRBUFFER|EXCL caps */ + /* prefer mds with WR|BUFFER|EXCL caps */ for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { cap = rb_entry(p, struct ceph_cap, ci_node); mds = cap->mds; - if (mseq) - *mseq = cap->mseq; if (cap->issued & (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_EXCL)) @@ -358,7 +362,7 @@ int ceph_get_cap_mds(struct inode *inode) { int mds; spin_lock(&inode->i_lock); - mds = __ceph_get_cap_mds(ceph_inode(inode), NULL); + mds = __ceph_get_cap_mds(ceph_inode(inode)); spin_unlock(&inode->i_lock); return mds; } @@ -477,8 +481,8 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap, * Each time we receive FILE_CACHE anew, we increment * i_rdcache_gen. */ - if ((issued & CEPH_CAP_FILE_CACHE) && - (had & CEPH_CAP_FILE_CACHE) == 0) + if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && + (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) ci->i_rdcache_gen++; /* @@ -537,7 +541,7 @@ retry: new_cap = NULL; } else { spin_unlock(&inode->i_lock); - new_cap = get_cap(caps_reservation); + new_cap = get_cap(mdsc, caps_reservation); if (new_cap == NULL) return -ENOMEM; goto retry; @@ -582,6 +586,7 @@ retry: } else { pr_err("ceph_add_cap: couldn't find snap realm %llx\n", realmino); + WARN_ON(!realm); } } @@ -621,7 +626,7 @@ retry: if (fmode >= 0) __ceph_get_fmode(ci, fmode); spin_unlock(&inode->i_lock); - wake_up(&ci->i_cap_wq); + wake_up_all(&ci->i_cap_wq); return 0; } @@ -825,7 +830,7 @@ int __ceph_caps_file_wanted(struct ceph_inode_info *ci) { int want = 0; int mode; - for (mode = 0; mode < 4; mode++) + for (mode = 0; mode < CEPH_FILE_MODE_NUM; mode++) if (ci->i_nr_by_mode[mode]) want |= ceph_caps_for_mode(mode); return want; @@ -858,6 +863,8 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci) } /* + * Remove a cap. Take steps to deal with a racing iterate_session_caps. + * * caller should hold i_lock. * caller will not hold session s_mutex if called from destroy_inode. */ @@ -865,16 +872,12 @@ void __ceph_remove_cap(struct ceph_cap *cap) { struct ceph_mds_session *session = cap->session; struct ceph_inode_info *ci = cap->ci; - struct ceph_mds_client *mdsc = &ceph_client(ci->vfs_inode.i_sb)->mdsc; + struct ceph_mds_client *mdsc = + &ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; + int removed = 0; dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode); - /* remove from inode list */ - rb_erase(&cap->ci_node, &ci->i_caps); - cap->ci = NULL; - if (ci->i_auth_cap == cap) - ci->i_auth_cap = NULL; - /* remove from session list */ spin_lock(&session->s_cap_lock); if (session->s_cap_iterator == cap) { @@ -885,11 +888,19 @@ void __ceph_remove_cap(struct ceph_cap *cap) list_del_init(&cap->session_caps); session->s_nr_caps--; cap->session = NULL; + removed = 1; } + /* protect backpointer with s_cap_lock: see iterate_session_caps */ + cap->ci = NULL; spin_unlock(&session->s_cap_lock); - if (cap->session == NULL) - ceph_put_cap(cap); + /* remove from inode list */ + rb_erase(&cap->ci_node, &ci->i_caps); + if (ci->i_auth_cap == cap) + ci->i_auth_cap = NULL; + + if (removed) + ceph_put_cap(mdsc, cap); if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) { struct ceph_snap_realm *realm = ci->i_snap_realm; @@ -932,9 +943,9 @@ static int send_cap_msg(struct ceph_mds_session *session, seq, issue_seq, mseq, follows, size, max_size, xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); - msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), 0, 0, NULL); - if (IS_ERR(msg)) - return PTR_ERR(msg); + msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS); + if (!msg) + return -ENOMEM; msg->hdr.tid = cpu_to_le64(flush_tid); @@ -975,6 +986,46 @@ static int send_cap_msg(struct ceph_mds_session *session, return 0; } +static void __queue_cap_release(struct ceph_mds_session *session, + u64 ino, u64 cap_id, u32 migrate_seq, + u32 issue_seq) +{ + struct ceph_msg *msg; + struct ceph_mds_cap_release *head; + struct ceph_mds_cap_item *item; + + spin_lock(&session->s_cap_lock); + BUG_ON(!session->s_num_cap_releases); + msg = list_first_entry(&session->s_cap_releases, + struct ceph_msg, list_head); + + dout(" adding %llx release to mds%d msg %p (%d left)\n", + ino, session->s_mds, msg, session->s_num_cap_releases); + + BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE); + head = msg->front.iov_base; + head->num = cpu_to_le32(le32_to_cpu(head->num) + 1); + item = msg->front.iov_base + msg->front.iov_len; + item->ino = cpu_to_le64(ino); + item->cap_id = cpu_to_le64(cap_id); + item->migrate_seq = cpu_to_le32(migrate_seq); + item->seq = cpu_to_le32(issue_seq); + + session->s_num_cap_releases--; + + msg->front.iov_len += sizeof(*item); + if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { + dout(" release msg %p full\n", msg); + list_move_tail(&msg->list_head, &session->s_cap_releases_done); + } else { + dout(" release msg %p at %d/%d (%d)\n", msg, + (int)le32_to_cpu(head->num), + (int)CEPH_CAPS_PER_RELEASE, + (int)msg->front.iov_len); + } + spin_unlock(&session->s_cap_lock); +} + /* * Queue cap releases when an inode is dropped from our cache. Since * inode is about to be destroyed, there is no need for i_lock. @@ -988,41 +1039,9 @@ void ceph_queue_caps_release(struct inode *inode) while (p) { struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node); struct ceph_mds_session *session = cap->session; - struct ceph_msg *msg; - struct ceph_mds_cap_release *head; - struct ceph_mds_cap_item *item; - spin_lock(&session->s_cap_lock); - BUG_ON(!session->s_num_cap_releases); - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); - - dout(" adding %p release to mds%d msg %p (%d left)\n", - inode, session->s_mds, msg, session->s_num_cap_releases); - - BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE); - head = msg->front.iov_base; - head->num = cpu_to_le32(le32_to_cpu(head->num) + 1); - item = msg->front.iov_base + msg->front.iov_len; - item->ino = cpu_to_le64(ceph_ino(inode)); - item->cap_id = cpu_to_le64(cap->cap_id); - item->migrate_seq = cpu_to_le32(cap->mseq); - item->seq = cpu_to_le32(cap->issue_seq); - - session->s_num_cap_releases--; - - msg->front.iov_len += sizeof(*item); - if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { - dout(" release msg %p full\n", msg); - list_move_tail(&msg->list_head, - &session->s_cap_releases_done); - } else { - dout(" release msg %p at %d/%d (%d)\n", msg, - (int)le32_to_cpu(head->num), - (int)CEPH_CAPS_PER_RELEASE, - (int)msg->front.iov_len); - } - spin_unlock(&session->s_cap_lock); + __queue_cap_release(session, ceph_ino(inode), cap->cap_id, + cap->mseq, cap->issue_seq); p = rb_next(p); __ceph_remove_cap(cap); } @@ -1161,7 +1180,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, } if (wake) - wake_up(&ci->i_cap_wq); + wake_up_all(&ci->i_cap_wq); return delayed; } @@ -1177,6 +1196,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, */ void __ceph_flush_snaps(struct ceph_inode_info *ci, struct ceph_mds_session **psession) + __releases(ci->vfs_inode->i_lock) + __acquires(ci->vfs_inode->i_lock) { struct inode *inode = &ci->vfs_inode; int mds; @@ -1212,7 +1233,13 @@ retry: BUG_ON(capsnap->dirty == 0); /* pick mds, take s_mutex */ - mds = __ceph_get_cap_mds(ci, &mseq); + if (ci->i_auth_cap == NULL) { + dout("no auth cap (migrating?), doing nothing\n"); + goto out; + } + mds = ci->i_auth_cap->session->s_mds; + mseq = ci->i_auth_cap->mseq; + if (session && session->s_mds != mds) { dout("oops, wrong session %p mutex\n", session); mutex_unlock(&session->s_mutex); @@ -1231,8 +1258,8 @@ retry: } /* * if session == NULL, we raced against a cap - * deletion. retry, and we'll get a better - * @mds value next time. + * deletion or migration. retry, and we'll + * get a better @mds value next time. */ spin_lock(&inode->i_lock); goto retry; @@ -1270,6 +1297,7 @@ retry: list_del_init(&ci->i_snap_flush_item); spin_unlock(&mdsc->snap_flush_lock); +out: if (psession) *psession = session; else if (session) { @@ -1293,7 +1321,8 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci) */ void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) { - struct ceph_mds_client *mdsc = &ceph_client(ci->vfs_inode.i_sb)->mdsc; + struct ceph_mds_client *mdsc = + &ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; struct inode *inode = &ci->vfs_inode; int was = ci->i_dirty_caps; int dirty = 0; @@ -1331,7 +1360,7 @@ void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) static int __mark_caps_flushing(struct inode *inode, struct ceph_mds_session *session) { - struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc; + struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); int flushing; @@ -1414,7 +1443,6 @@ static int try_nonblocking_invalidate(struct inode *inode) */ void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_mds_session *session) - __releases(session->s_mutex) { struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode); struct ceph_mds_client *mdsc = &client->mdsc; @@ -1489,11 +1517,13 @@ retry_locked: ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ ci->i_rdcache_gen && /* may have cached pages */ (file_wanted == 0 || /* no open files */ - (revoking & CEPH_CAP_FILE_CACHE)) && /* or revoking cache */ + (revoking & (CEPH_CAP_FILE_CACHE| + CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */ !tried_invalidate) { dout("check_caps trying to invalidate on %p\n", inode); if (try_nonblocking_invalidate(inode) < 0) { - if (revoking & CEPH_CAP_FILE_CACHE) { + if (revoking & (CEPH_CAP_FILE_CACHE| + CEPH_CAP_FILE_LAZYIO)) { dout("check_caps queuing invalidate\n"); queue_invalidate = 1; ci->i_rdcache_revoking = ci->i_rdcache_gen; @@ -1658,7 +1688,7 @@ ack: static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session, unsigned *flush_tid) { - struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc; + struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); int unlock_session = session ? 0 : 1; int flushing = 0; @@ -1711,10 +1741,9 @@ out_unlocked: static int caps_are_flushed(struct inode *inode, unsigned tid) { struct ceph_inode_info *ci = ceph_inode(inode); - int dirty, i, ret = 1; + int i, ret = 1; spin_lock(&inode->i_lock); - dirty = __ceph_caps_dirty(ci); for (i = 0; i < CEPH_CAP_BITS; i++) if ((ci->i_flushing_caps & (1 << i)) && ci->i_cap_flush_tid[i] <= tid) { @@ -1770,9 +1799,9 @@ out: spin_unlock(&ci->i_unsafe_lock); } -int ceph_fsync(struct file *file, struct dentry *dentry, int datasync) +int ceph_fsync(struct file *file, int datasync) { - struct inode *inode = dentry->d_inode; + struct inode *inode = file->f_mapping->host; struct ceph_inode_info *ci = ceph_inode(inode); unsigned flush_tid; int ret; @@ -1824,7 +1853,8 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) err = wait_event_interruptible(ci->i_cap_wq, caps_are_flushed(inode, flush_tid)); } else { - struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc; + struct ceph_mds_client *mdsc = + &ceph_sb_to_client(inode->i_sb)->mdsc; spin_lock(&inode->i_lock); if (__ceph_caps_dirty(ci)) @@ -2132,7 +2162,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) else if (flushsnaps) ceph_flush_snaps(ci); if (wake) - wake_up(&ci->i_cap_wq); + wake_up_all(&ci->i_cap_wq); if (put) iput(inode); } @@ -2208,7 +2238,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, iput(inode); } else if (complete_capsnap) { ceph_flush_snaps(ci); - wake_up(&ci->i_cap_wq); + wake_up_all(&ci->i_cap_wq); } if (drop_capsnap) iput(inode); @@ -2229,8 +2259,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, struct ceph_mds_session *session, struct ceph_cap *cap, struct ceph_buffer *xattr_buf) - __releases(inode->i_lock) - __releases(session->s_mutex) + __releases(inode->i_lock) { struct ceph_inode_info *ci = ceph_inode(inode); int mds = session->s_mds; @@ -2257,6 +2286,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, * will invalidate _after_ writeback.) */ if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && + (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && !ci->i_wrbuffer_ref) { if (try_nonblocking_invalidate(inode) == 0) { revoked_rdcache = 1; @@ -2348,15 +2378,22 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, /* revocation, grant, or no-op? */ if (cap->issued & ~newcaps) { - dout("revocation: %s -> %s\n", ceph_cap_string(cap->issued), - ceph_cap_string(newcaps)); - if ((used & ~newcaps) & CEPH_CAP_FILE_BUFFER) - writeback = 1; /* will delay ack */ - else if (dirty & ~newcaps) - check_caps = 1; /* initiate writeback in check_caps */ - else if (((used & ~newcaps) & CEPH_CAP_FILE_CACHE) == 0 || - revoked_rdcache) - check_caps = 2; /* send revoke ack in check_caps */ + int revoking = cap->issued & ~newcaps; + + dout("revocation: %s -> %s (revoking %s)\n", + ceph_cap_string(cap->issued), + ceph_cap_string(newcaps), + ceph_cap_string(revoking)); + if (revoking & used & CEPH_CAP_FILE_BUFFER) + writeback = 1; /* initiate writeback; will delay ack */ + else if (revoking == CEPH_CAP_FILE_CACHE && + (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && + queue_invalidate) + ; /* do nothing yet, invalidation will be queued */ + else if (cap == ci->i_auth_cap) + check_caps = 1; /* check auth cap only */ + else + check_caps = 2; /* check all caps */ cap->issued = newcaps; cap->implemented |= newcaps; } else if (cap->issued == newcaps) { @@ -2384,7 +2421,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, if (queue_invalidate) ceph_queue_invalidate(inode); if (wake) - wake_up(&ci->i_cap_wq); + wake_up_all(&ci->i_cap_wq); if (check_caps == 1) ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY, @@ -2406,7 +2443,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, __releases(inode->i_lock) { struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc; + struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc; unsigned seq = le32_to_cpu(m->seq); int dirty = le32_to_cpu(m->dirty); int cleaned = 0; @@ -2439,7 +2476,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, struct ceph_inode_info, i_flushing_item)->vfs_inode); mdsc->num_cap_flushing--; - wake_up(&mdsc->cap_flushing_wq); + wake_up_all(&mdsc->cap_flushing_wq); dout(" inode %p now !flushing\n", inode); if (ci->i_dirty_caps == 0) { @@ -2451,7 +2488,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, } } spin_unlock(&mdsc->cap_dirty_lock); - wake_up(&ci->i_cap_wq); + wake_up_all(&ci->i_cap_wq); out: spin_unlock(&inode->i_lock); @@ -2547,7 +2584,8 @@ static void handle_cap_trunc(struct inode *inode, * caller holds s_mutex */ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, - struct ceph_mds_session *session) + struct ceph_mds_session *session, + int *open_target_sessions) { struct ceph_inode_info *ci = ceph_inode(inode); int mds = session->s_mds; @@ -2579,6 +2617,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, ci->i_cap_exporting_mds = mds; ci->i_cap_exporting_mseq = mseq; ci->i_cap_exporting_issued = cap->issued; + + /* + * make sure we have open sessions with all possible + * export targets, so that we get the matching IMPORT + */ + *open_target_sessions = 1; } __ceph_remove_cap(cap); } @@ -2648,12 +2692,16 @@ void ceph_handle_caps(struct ceph_mds_session *session, struct ceph_mds_caps *h; int mds = session->s_mds; int op; - u32 seq; + u32 seq, mseq; struct ceph_vino vino; u64 cap_id; u64 size, max_size; u64 tid; void *snaptrace; + size_t snaptrace_len; + void *flock; + u32 flock_len; + int open_target_sessions = 0; dout("handle_caps from mds%d\n", mds); @@ -2662,15 +2710,30 @@ void ceph_handle_caps(struct ceph_mds_session *session, if (msg->front.iov_len < sizeof(*h)) goto bad; h = msg->front.iov_base; - snaptrace = h + 1; op = le32_to_cpu(h->op); vino.ino = le64_to_cpu(h->ino); vino.snap = CEPH_NOSNAP; cap_id = le64_to_cpu(h->cap_id); seq = le32_to_cpu(h->seq); + mseq = le32_to_cpu(h->migrate_seq); size = le64_to_cpu(h->size); max_size = le64_to_cpu(h->max_size); + snaptrace = h + 1; + snaptrace_len = le32_to_cpu(h->snap_trace_len); + + if (le16_to_cpu(msg->hdr.version) >= 2) { + void *p, *end; + + p = snaptrace + snaptrace_len; + end = msg->front.iov_base + msg->front.iov_len; + ceph_decode_32_safe(&p, end, flock_len, bad); + flock = p; + } else { + flock = NULL; + flock_len = 0; + } + mutex_lock(&session->s_mutex); session->s_seq++; dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, @@ -2682,6 +2745,18 @@ void ceph_handle_caps(struct ceph_mds_session *session, vino.snap, inode); if (!inode) { dout(" i don't have ino %llx\n", vino.ino); + + if (op == CEPH_CAP_OP_IMPORT) + __queue_cap_release(session, vino.ino, cap_id, + mseq, seq); + + /* + * send any full release message to try to move things + * along for the mds (who clearly thinks we still have this + * cap). + */ + ceph_add_cap_releases(mdsc, session); + ceph_send_cap_releases(mdsc, session); goto done; } @@ -2692,12 +2767,12 @@ void ceph_handle_caps(struct ceph_mds_session *session, goto done; case CEPH_CAP_OP_EXPORT: - handle_cap_export(inode, h, session); + handle_cap_export(inode, h, session, &open_target_sessions); goto done; case CEPH_CAP_OP_IMPORT: handle_cap_import(mdsc, inode, h, session, - snaptrace, le32_to_cpu(h->snap_trace_len)); + snaptrace, snaptrace_len); ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY, session); goto done_unlocked; @@ -2707,7 +2782,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, spin_lock(&inode->i_lock); cap = __get_cap_for_mds(ceph_inode(inode), mds); if (!cap) { - dout("no cap on %p ino %llx.%llx from mds%d, releasing\n", + dout(" no cap on %p ino %llx.%llx from mds%d\n", inode, ceph_ino(inode), ceph_snap(inode), mds); spin_unlock(&inode->i_lock); goto done; @@ -2739,6 +2814,8 @@ done: done_unlocked: if (inode) iput(inode); + if (open_target_sessions) + ceph_mdsc_open_export_target_sessions(mdsc, session); return; bad: @@ -2858,18 +2935,19 @@ int ceph_encode_inode_release(void **p, struct inode *inode, struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap *cap; struct ceph_mds_request_release *rel = *p; + int used, dirty; int ret = 0; - int used = 0; spin_lock(&inode->i_lock); used = __ceph_caps_used(ci); + dirty = __ceph_caps_dirty(ci); - dout("encode_inode_release %p mds%d used %s drop %s unless %s\n", inode, - mds, ceph_cap_string(used), ceph_cap_string(drop), + dout("encode_inode_release %p mds%d used|dirty %s drop %s unless %s\n", + inode, mds, ceph_cap_string(used|dirty), ceph_cap_string(drop), ceph_cap_string(unless)); - /* only drop unused caps */ - drop &= ~used; + /* only drop unused, clean caps */ + drop &= ~(used | dirty); cap = __get_cap_for_mds(ci, mds); if (cap && __cap_is_valid(cap)) { @@ -2949,6 +3027,7 @@ int ceph_encode_dentry_release(void **p, struct dentry *dentry, memcpy(*p, dentry->d_name.name, dentry->d_name.len); *p += dentry->d_name.len; rel->dname_seq = cpu_to_le32(di->lease_seq); + __ceph_mdsc_drop_dentry_lease(dentry); } spin_unlock(&dentry->d_lock); return ret; diff --git a/fs/ceph/ceph_frag.h b/fs/ceph/ceph_frag.h index 793f50cb7c2..5babb8e9535 100644 --- a/fs/ceph/ceph_frag.h +++ b/fs/ceph/ceph_frag.h @@ -1,5 +1,5 @@ -#ifndef _FS_CEPH_FRAG_H -#define _FS_CEPH_FRAG_H +#ifndef FS_CEPH_FRAG_H +#define FS_CEPH_FRAG_H /* * "Frags" are a way to describe a subset of a 32-bit number space, diff --git a/fs/ceph/ceph_fs.c b/fs/ceph/ceph_fs.c index 79d76bc4303..3ac6cc7c115 100644 --- a/fs/ceph/ceph_fs.c +++ b/fs/ceph/ceph_fs.c @@ -29,46 +29,44 @@ int ceph_file_layout_is_valid(const struct ceph_file_layout *layout) int ceph_flags_to_mode(int flags) { + int mode; + #ifdef O_DIRECTORY /* fixme */ if ((flags & O_DIRECTORY) == O_DIRECTORY) return CEPH_FILE_MODE_PIN; #endif + if ((flags & O_APPEND) == O_APPEND) + flags |= O_WRONLY; + + if ((flags & O_ACCMODE) == O_RDWR) + mode = CEPH_FILE_MODE_RDWR; + else if ((flags & O_ACCMODE) == O_WRONLY) + mode = CEPH_FILE_MODE_WR; + else + mode = CEPH_FILE_MODE_RD; + #ifdef O_LAZY if (flags & O_LAZY) - return CEPH_FILE_MODE_LAZY; + mode |= CEPH_FILE_MODE_LAZY; #endif - if ((flags & O_APPEND) == O_APPEND) - flags |= O_WRONLY; - flags &= O_ACCMODE; - if ((flags & O_RDWR) == O_RDWR) - return CEPH_FILE_MODE_RDWR; - if ((flags & O_WRONLY) == O_WRONLY) - return CEPH_FILE_MODE_WR; - return CEPH_FILE_MODE_RD; + return mode; } int ceph_caps_for_mode(int mode) { - switch (mode) { - case CEPH_FILE_MODE_PIN: - return CEPH_CAP_PIN; - case CEPH_FILE_MODE_RD: - return CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED | + int caps = CEPH_CAP_PIN; + + if (mode & CEPH_FILE_MODE_RD) + caps |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE; - case CEPH_FILE_MODE_RDWR: - return CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED | - CEPH_CAP_FILE_EXCL | - CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE | - CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER | - CEPH_CAP_AUTH_SHARED | CEPH_CAP_AUTH_EXCL | - CEPH_CAP_XATTR_SHARED | CEPH_CAP_XATTR_EXCL; - case CEPH_FILE_MODE_WR: - return CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED | - CEPH_CAP_FILE_EXCL | + if (mode & CEPH_FILE_MODE_WR) + caps |= CEPH_CAP_FILE_EXCL | CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER | CEPH_CAP_AUTH_SHARED | CEPH_CAP_AUTH_EXCL | CEPH_CAP_XATTR_SHARED | CEPH_CAP_XATTR_EXCL; - } - return 0; + if (mode & CEPH_FILE_MODE_LAZY) + caps |= CEPH_CAP_FILE_LAZYIO; + + return caps; } diff --git a/fs/ceph/ceph_fs.h b/fs/ceph/ceph_fs.h index 0c2241ef365..d5619ac8671 100644 --- a/fs/ceph/ceph_fs.h +++ b/fs/ceph/ceph_fs.h @@ -9,34 +9,20 @@ * LGPL2 */ -#ifndef _FS_CEPH_CEPH_FS_H -#define _FS_CEPH_CEPH_FS_H +#ifndef CEPH_FS_H +#define CEPH_FS_H #include "msgr.h" #include "rados.h" /* - * Ceph release version - */ -#define CEPH_VERSION_MAJOR 0 -#define CEPH_VERSION_MINOR 19 -#define CEPH_VERSION_PATCH 0 - -#define _CEPH_STRINGIFY(x) #x -#define CEPH_STRINGIFY(x) _CEPH_STRINGIFY(x) -#define CEPH_MAKE_VERSION(x, y, z) CEPH_STRINGIFY(x) "." CEPH_STRINGIFY(y) \ - "." CEPH_STRINGIFY(z) -#define CEPH_VERSION CEPH_MAKE_VERSION(CEPH_VERSION_MAJOR, \ - CEPH_VERSION_MINOR, CEPH_VERSION_PATCH) - -/* * subprotocol versions. when specific messages types or high-level * protocols change, bump the affected components. we keep rev * internal cluster protocols separately from the public, * client-facing protocol. */ #define CEPH_OSD_PROTOCOL 8 /* cluster internal */ -#define CEPH_MDS_PROTOCOL 9 /* cluster internal */ +#define CEPH_MDS_PROTOCOL 12 /* cluster internal */ #define CEPH_MON_PROTOCOL 5 /* cluster internal */ #define CEPH_OSDC_PROTOCOL 24 /* server/client */ #define CEPH_MDSC_PROTOCOL 32 /* server/client */ @@ -53,8 +39,10 @@ /* * feature bits */ -#define CEPH_FEATURE_SUPPORTED 0 -#define CEPH_FEATURE_REQUIRED 0 +#define CEPH_FEATURE_UID (1<<0) +#define CEPH_FEATURE_NOSRCADDR (1<<1) +#define CEPH_FEATURE_MONCLOCKCHECK (1<<2) +#define CEPH_FEATURE_FLOCK (1<<3) /* @@ -86,11 +74,15 @@ int ceph_file_layout_is_valid(const struct ceph_file_layout *layout); #define CEPH_CRYPTO_NONE 0x0 #define CEPH_CRYPTO_AES 0x1 +#define CEPH_AES_IV "cephsageyudagreg" + /* security/authentication protocols */ #define CEPH_AUTH_UNKNOWN 0x0 #define CEPH_AUTH_NONE 0x1 #define CEPH_AUTH_CEPHX 0x2 +#define CEPH_AUTH_UID_DEFAULT ((__u64) -1) + /********************************************* * message layer @@ -128,11 +120,27 @@ int ceph_file_layout_is_valid(const struct ceph_file_layout *layout); #define CEPH_MSG_CLIENT_SNAP 0x312 #define CEPH_MSG_CLIENT_CAPRELEASE 0x313 +/* pool ops */ +#define CEPH_MSG_POOLOP_REPLY 48 +#define CEPH_MSG_POOLOP 49 + + /* osd */ #define CEPH_MSG_OSD_MAP 41 #define CEPH_MSG_OSD_OP 42 #define CEPH_MSG_OSD_OPREPLY 43 +/* pool operations */ +enum { + POOL_OP_CREATE = 0x01, + POOL_OP_DELETE = 0x02, + POOL_OP_AUID_CHANGE = 0x03, + POOL_OP_CREATE_SNAP = 0x11, + POOL_OP_DELETE_SNAP = 0x12, + POOL_OP_CREATE_UNMANAGED_SNAP = 0x21, + POOL_OP_DELETE_UNMANAGED_SNAP = 0x22, +}; + struct ceph_mon_request_header { __le64 have_version; __le16 session_mon; @@ -155,6 +163,31 @@ struct ceph_mon_statfs_reply { struct ceph_statfs st; } __attribute__ ((packed)); +const char *ceph_pool_op_name(int op); + +struct ceph_mon_poolop { + struct ceph_mon_request_header monhdr; + struct ceph_fsid fsid; + __le32 pool; + __le32 op; + __le64 auid; + __le64 snapid; + __le32 name_len; +} __attribute__ ((packed)); + +struct ceph_mon_poolop_reply { + struct ceph_mon_request_header monhdr; + struct ceph_fsid fsid; + __le32 reply_code; + __le32 epoch; + char has_data; + char data[0]; +} __attribute__ ((packed)); + +struct ceph_mon_unmanaged_snap { + __le64 snapid; +} __attribute__ ((packed)); + struct ceph_osd_getmap { struct ceph_mon_request_header monhdr; struct ceph_fsid fsid; @@ -212,16 +245,18 @@ extern const char *ceph_mds_state_name(int s); * - they also define the lock ordering by the MDS * - a few of these are internal to the mds */ -#define CEPH_LOCK_DN 1 -#define CEPH_LOCK_ISNAP 2 -#define CEPH_LOCK_IVERSION 4 /* mds internal */ -#define CEPH_LOCK_IFILE 8 /* mds internal */ -#define CEPH_LOCK_IAUTH 32 -#define CEPH_LOCK_ILINK 64 -#define CEPH_LOCK_IDFT 128 /* dir frag tree */ -#define CEPH_LOCK_INEST 256 /* mds internal */ -#define CEPH_LOCK_IXATTR 512 -#define CEPH_LOCK_INO 2048 /* immutable inode bits; not a lock */ +#define CEPH_LOCK_DVERSION 1 +#define CEPH_LOCK_DN 2 +#define CEPH_LOCK_ISNAP 16 +#define CEPH_LOCK_IVERSION 32 /* mds internal */ +#define CEPH_LOCK_IFILE 64 +#define CEPH_LOCK_IAUTH 128 +#define CEPH_LOCK_ILINK 256 +#define CEPH_LOCK_IDFT 512 /* dir frag tree */ +#define CEPH_LOCK_INEST 1024 /* mds internal */ +#define CEPH_LOCK_IXATTR 2048 +#define CEPH_LOCK_IFLOCK 4096 /* advisory file locks */ +#define CEPH_LOCK_INO 8192 /* immutable inode bits; not a lock */ /* client_session ops */ enum { @@ -262,6 +297,8 @@ enum { CEPH_MDS_OP_RMXATTR = 0x01106, CEPH_MDS_OP_SETLAYOUT = 0x01107, CEPH_MDS_OP_SETATTR = 0x01108, + CEPH_MDS_OP_SETFILELOCK= 0x01109, + CEPH_MDS_OP_GETFILELOCK= 0x00110, CEPH_MDS_OP_MKNOD = 0x01201, CEPH_MDS_OP_LINK = 0x01202, @@ -308,6 +345,7 @@ union ceph_mds_request_args { struct { __le32 frag; /* which dir fragment */ __le32 max_entries; /* how many dentries to grab */ + __le32 max_bytes; } __attribute__ ((packed)) readdir; struct { __le32 mode; @@ -331,6 +369,15 @@ union ceph_mds_request_args { struct { struct ceph_file_layout layout; } __attribute__ ((packed)) setlayout; + struct { + __u8 rule; /* currently fcntl or flock */ + __u8 type; /* shared, exclusive, remove*/ + __le64 pid; /* process id requesting the lock */ + __le64 pid_namespace; + __le64 start; /* initial location to lock */ + __le64 length; /* num bytes to lock from start */ + __u8 wait; /* will caller wait for lock to become available? */ + } __attribute__ ((packed)) filelock_change; } __attribute__ ((packed)); #define CEPH_MDS_FLAG_REPLAY 1 /* this is a replayed op */ @@ -425,6 +472,23 @@ struct ceph_mds_reply_dirfrag { __le32 dist[]; } __attribute__ ((packed)); +#define CEPH_LOCK_FCNTL 1 +#define CEPH_LOCK_FLOCK 2 + +#define CEPH_LOCK_SHARED 1 +#define CEPH_LOCK_EXCL 2 +#define CEPH_LOCK_UNLOCK 4 + +struct ceph_filelock { + __le64 start;/* file offset to start lock at */ + __le64 length; /* num bytes to lock; 0 for all following start */ + __le64 client; /* which client holds the lock */ + __le64 pid; /* process id holding the lock on the client */ + __le64 pid_namespace; + __u8 type; /* shared lock, exclusive lock, or unlock */ +} __attribute__ ((packed)); + + /* file access modes */ #define CEPH_FILE_MODE_PIN 0 #define CEPH_FILE_MODE_RD 1 @@ -453,9 +517,10 @@ int ceph_flags_to_mode(int flags); #define CEPH_CAP_SAUTH 2 #define CEPH_CAP_SLINK 4 #define CEPH_CAP_SXATTR 6 -#define CEPH_CAP_SFILE 8 /* goes at the end (uses >2 cap bits) */ +#define CEPH_CAP_SFILE 8 +#define CEPH_CAP_SFLOCK 20 -#define CEPH_CAP_BITS 16 +#define CEPH_CAP_BITS 22 /* composed values */ #define CEPH_CAP_AUTH_SHARED (CEPH_CAP_GSHARED << CEPH_CAP_SAUTH) @@ -473,6 +538,9 @@ int ceph_flags_to_mode(int flags); #define CEPH_CAP_FILE_BUFFER (CEPH_CAP_GBUFFER << CEPH_CAP_SFILE) #define CEPH_CAP_FILE_WREXTEND (CEPH_CAP_GWREXTEND << CEPH_CAP_SFILE) #define CEPH_CAP_FILE_LAZYIO (CEPH_CAP_GLAZYIO << CEPH_CAP_SFILE) +#define CEPH_CAP_FLOCK_SHARED (CEPH_CAP_GSHARED << CEPH_CAP_SFLOCK) +#define CEPH_CAP_FLOCK_EXCL (CEPH_CAP_GEXCL << CEPH_CAP_SFLOCK) + /* cap masks (for getattr) */ #define CEPH_STAT_CAP_INODE CEPH_CAP_PIN @@ -508,7 +576,8 @@ int ceph_flags_to_mode(int flags); CEPH_CAP_FILE_EXCL) #define CEPH_CAP_ANY_WR (CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_FILE_WR) #define CEPH_CAP_ANY (CEPH_CAP_ANY_RD | CEPH_CAP_ANY_EXCL | \ - CEPH_CAP_ANY_FILE_WR | CEPH_CAP_PIN) + CEPH_CAP_ANY_FILE_WR | CEPH_CAP_FILE_LAZYIO | \ + CEPH_CAP_PIN) #define CEPH_CAP_LOCKS (CEPH_LOCK_IFILE | CEPH_LOCK_IAUTH | CEPH_LOCK_ILINK | \ CEPH_LOCK_IXATTR) @@ -598,12 +667,21 @@ struct ceph_mds_cap_reconnect { __le64 cap_id; __le32 wanted; __le32 issued; + __le64 snaprealm; + __le64 pathbase; /* base ino for our path to this ino */ + __le32 flock_len; /* size of flock state blob, if any */ +} __attribute__ ((packed)); +/* followed by flock blob */ + +struct ceph_mds_cap_reconnect_v1 { + __le64 cap_id; + __le32 wanted; + __le32 issued; __le64 size; struct ceph_timespec mtime, atime; __le64 snaprealm; __le64 pathbase; /* base ino for our path to this ino */ } __attribute__ ((packed)); -/* followed by encoded string */ struct ceph_mds_snaprealm_reconnect { __le64 ino; /* snap realm base */ diff --git a/fs/ceph/ceph_hash.h b/fs/ceph/ceph_hash.h index 5ac470c433c..d099c3f9023 100644 --- a/fs/ceph/ceph_hash.h +++ b/fs/ceph/ceph_hash.h @@ -1,5 +1,5 @@ -#ifndef _FS_CEPH_HASH_H -#define _FS_CEPH_HASH_H +#ifndef FS_CEPH_HASH_H +#define FS_CEPH_HASH_H #define CEPH_STR_HASH_LINUX 0x1 /* linux dcache hash */ #define CEPH_STR_HASH_RJENKINS 0x2 /* robert jenkins' */ diff --git a/fs/ceph/ceph_strings.c b/fs/ceph/ceph_strings.c index 8e4be6a80c6..c6179d3a26a 100644 --- a/fs/ceph/ceph_strings.c +++ b/fs/ceph/ceph_strings.c @@ -10,7 +10,6 @@ const char *ceph_entity_type_name(int type) case CEPH_ENTITY_TYPE_OSD: return "osd"; case CEPH_ENTITY_TYPE_MON: return "mon"; case CEPH_ENTITY_TYPE_CLIENT: return "client"; - case CEPH_ENTITY_TYPE_ADMIN: return "admin"; case CEPH_ENTITY_TYPE_AUTH: return "auth"; default: return "unknown"; } @@ -29,6 +28,7 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_TRUNCATE: return "truncate"; case CEPH_OSD_OP_ZERO: return "zero"; case CEPH_OSD_OP_WRITEFULL: return "writefull"; + case CEPH_OSD_OP_ROLLBACK: return "rollback"; case CEPH_OSD_OP_APPEND: return "append"; case CEPH_OSD_OP_STARTSYNC: return "startsync"; @@ -45,6 +45,7 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_SETXATTRS: return "setxattrs"; case CEPH_OSD_OP_RESETXATTRS: return "resetxattrs"; case CEPH_OSD_OP_RMXATTR: return "rmxattr"; + case CEPH_OSD_OP_CMPXATTR: return "cmpxattr"; case CEPH_OSD_OP_PULL: return "pull"; case CEPH_OSD_OP_PUSH: return "push"; @@ -129,6 +130,8 @@ const char *ceph_mds_op_name(int op) case CEPH_MDS_OP_LSSNAP: return "lssnap"; case CEPH_MDS_OP_MKSNAP: return "mksnap"; case CEPH_MDS_OP_RMSNAP: return "rmsnap"; + case CEPH_MDS_OP_SETFILELOCK: return "setfilelock"; + case CEPH_MDS_OP_GETFILELOCK: return "getfilelock"; } return "???"; } @@ -174,3 +177,17 @@ const char *ceph_snap_op_name(int o) } return "???"; } + +const char *ceph_pool_op_name(int op) +{ + switch (op) { + case POOL_OP_CREATE: return "create"; + case POOL_OP_DELETE: return "delete"; + case POOL_OP_AUID_CHANGE: return "auid change"; + case POOL_OP_CREATE_SNAP: return "create snap"; + case POOL_OP_DELETE_SNAP: return "delete snap"; + case POOL_OP_CREATE_UNMANAGED_SNAP: return "create unmanaged snap"; + case POOL_OP_DELETE_UNMANAGED_SNAP: return "delete unmanaged snap"; + } + return "???"; +} diff --git a/fs/ceph/crush/crush.h b/fs/ceph/crush/crush.h index dcd7e752370..97e435b191f 100644 --- a/fs/ceph/crush/crush.h +++ b/fs/ceph/crush/crush.h @@ -1,5 +1,5 @@ -#ifndef _CRUSH_CRUSH_H -#define _CRUSH_CRUSH_H +#ifndef CEPH_CRUSH_CRUSH_H +#define CEPH_CRUSH_CRUSH_H #include <linux/types.h> diff --git a/fs/ceph/crush/hash.h b/fs/ceph/crush/hash.h index ff48e110e4b..91e884230d5 100644 --- a/fs/ceph/crush/hash.h +++ b/fs/ceph/crush/hash.h @@ -1,5 +1,5 @@ -#ifndef _CRUSH_HASH_H -#define _CRUSH_HASH_H +#ifndef CEPH_CRUSH_HASH_H +#define CEPH_CRUSH_HASH_H #define CRUSH_HASH_RJENKINS1 0 diff --git a/fs/ceph/crush/mapper.c b/fs/ceph/crush/mapper.c index 9ba54efb654..a4eec133258 100644 --- a/fs/ceph/crush/mapper.c +++ b/fs/ceph/crush/mapper.c @@ -238,7 +238,7 @@ static int bucket_straw_choose(struct crush_bucket_straw *bucket, static int crush_bucket_choose(struct crush_bucket *in, int x, int r) { - dprintk("choose %d x=%d r=%d\n", in->id, x, r); + dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r); switch (in->alg) { case CRUSH_BUCKET_UNIFORM: return bucket_uniform_choose((struct crush_bucket_uniform *)in, @@ -264,7 +264,7 @@ static int crush_bucket_choose(struct crush_bucket *in, int x, int r) */ static int is_out(struct crush_map *map, __u32 *weight, int item, int x) { - if (weight[item] >= 0x1000) + if (weight[item] >= 0x10000) return 0; if (weight[item] == 0) return 1; @@ -305,7 +305,9 @@ static int crush_choose(struct crush_map *map, int itemtype; int collide, reject; const int orig_tries = 5; /* attempts before we fall back to search */ - dprintk("choose bucket %d x %d outpos %d\n", bucket->id, x, outpos); + + dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", + bucket->id, x, outpos, numrep); for (rep = outpos; rep < numrep; rep++) { /* keep trying until we get a non-out, non-colliding item */ @@ -366,6 +368,7 @@ static int crush_choose(struct crush_map *map, BUG_ON(item >= 0 || (-1-item) >= map->max_buckets); in = map->buckets[-1-item]; + retry_bucket = 1; continue; } @@ -377,15 +380,25 @@ static int crush_choose(struct crush_map *map, } } - if (recurse_to_leaf && - item < 0 && - crush_choose(map, map->buckets[-1-item], - weight, - x, outpos+1, 0, - out2, outpos, - firstn, 0, NULL) <= outpos) { - reject = 1; - } else { + reject = 0; + if (recurse_to_leaf) { + if (item < 0) { + if (crush_choose(map, + map->buckets[-1-item], + weight, + x, outpos+1, 0, + out2, outpos, + firstn, 0, + NULL) <= outpos) + /* didn't get leaf */ + reject = 1; + } else { + /* we already have a leaf! */ + out2[outpos] = item; + } + } + + if (!reject) { /* out? */ if (itemtype == 0) reject = is_out(map, weight, @@ -424,12 +437,12 @@ reject: continue; } - dprintk("choose got %d\n", item); + dprintk("CHOOSE got %d\n", item); out[outpos] = item; outpos++; } - dprintk("choose returns %d\n", outpos); + dprintk("CHOOSE returns %d\n", outpos); return outpos; } diff --git a/fs/ceph/crush/mapper.h b/fs/ceph/crush/mapper.h index 98e90046fd9..c46b99c18bb 100644 --- a/fs/ceph/crush/mapper.h +++ b/fs/ceph/crush/mapper.h @@ -1,5 +1,5 @@ -#ifndef _CRUSH_MAPPER_H -#define _CRUSH_MAPPER_H +#ifndef CEPH_CRUSH_MAPPER_H +#define CEPH_CRUSH_MAPPER_H /* * CRUSH functions for find rules and then mapping an input to an diff --git a/fs/ceph/crypto.c b/fs/ceph/crypto.c index f704b3b6242..a3e627f6329 100644 --- a/fs/ceph/crypto.c +++ b/fs/ceph/crypto.c @@ -75,10 +75,11 @@ static struct crypto_blkcipher *ceph_crypto_alloc_cipher(void) return crypto_alloc_blkcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC); } -const u8 *aes_iv = "cephsageyudagreg"; +static const u8 *aes_iv = (u8 *)CEPH_AES_IV; -int ceph_aes_encrypt(const void *key, int key_len, void *dst, size_t *dst_len, - const void *src, size_t src_len) +static int ceph_aes_encrypt(const void *key, int key_len, + void *dst, size_t *dst_len, + const void *src, size_t src_len) { struct scatterlist sg_in[2], sg_out[1]; struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher(); @@ -126,9 +127,10 @@ int ceph_aes_encrypt(const void *key, int key_len, void *dst, size_t *dst_len, return 0; } -int ceph_aes_encrypt2(const void *key, int key_len, void *dst, size_t *dst_len, - const void *src1, size_t src1_len, - const void *src2, size_t src2_len) +static int ceph_aes_encrypt2(const void *key, int key_len, void *dst, + size_t *dst_len, + const void *src1, size_t src1_len, + const void *src2, size_t src2_len) { struct scatterlist sg_in[3], sg_out[1]; struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher(); @@ -179,8 +181,9 @@ int ceph_aes_encrypt2(const void *key, int key_len, void *dst, size_t *dst_len, return 0; } -int ceph_aes_decrypt(const void *key, int key_len, void *dst, size_t *dst_len, - const void *src, size_t src_len) +static int ceph_aes_decrypt(const void *key, int key_len, + void *dst, size_t *dst_len, + const void *src, size_t src_len) { struct scatterlist sg_in[1], sg_out[2]; struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher(); @@ -238,10 +241,10 @@ int ceph_aes_decrypt(const void *key, int key_len, void *dst, size_t *dst_len, return 0; } -int ceph_aes_decrypt2(const void *key, int key_len, - void *dst1, size_t *dst1_len, - void *dst2, size_t *dst2_len, - const void *src, size_t src_len) +static int ceph_aes_decrypt2(const void *key, int key_len, + void *dst1, size_t *dst1_len, + void *dst2, size_t *dst2_len, + const void *src, size_t src_len) { struct scatterlist sg_in[1], sg_out[3]; struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher(); diff --git a/fs/ceph/crypto.h b/fs/ceph/crypto.h index 40b502e6bd8..bdf38607323 100644 --- a/fs/ceph/crypto.h +++ b/fs/ceph/crypto.h @@ -42,7 +42,7 @@ extern int ceph_encrypt2(struct ceph_crypto_key *secret, const void *src2, size_t src2_len); /* armor.c */ -extern int ceph_armor(char *dst, const void *src, const void *end); -extern int ceph_unarmor(void *dst, const char *src, const char *end); +extern int ceph_armor(char *dst, const char *src, const char *end); +extern int ceph_unarmor(char *dst, const char *src, const char *end); #endif diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index f7048da92ac..360c4f22718 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -113,7 +113,7 @@ static int osdmap_show(struct seq_file *s, void *p) static int monc_show(struct seq_file *s, void *p) { struct ceph_client *client = s->private; - struct ceph_mon_statfs_request *req; + struct ceph_mon_generic_request *req; struct ceph_mon_client *monc = &client->monc; struct rb_node *rp; @@ -126,9 +126,14 @@ static int monc_show(struct seq_file *s, void *p) if (monc->want_next_osdmap) seq_printf(s, "want next osdmap\n"); - for (rp = rb_first(&monc->statfs_request_tree); rp; rp = rb_next(rp)) { - req = rb_entry(rp, struct ceph_mon_statfs_request, node); - seq_printf(s, "%lld statfs\n", req->tid); + for (rp = rb_first(&monc->generic_request_tree); rp; rp = rb_next(rp)) { + __u16 op; + req = rb_entry(rp, struct ceph_mon_generic_request, node); + op = le16_to_cpu(req->request->hdr.type); + if (op == CEPH_MSG_STATFS) + seq_printf(s, "%lld statfs\n", req->tid); + else + seq_printf(s, "%lld unknown\n", req->tid); } mutex_unlock(&monc->mutex); @@ -256,7 +261,7 @@ static int osdc_show(struct seq_file *s, void *pp) static int caps_show(struct seq_file *s, void *p) { - struct ceph_client *client = p; + struct ceph_client *client = s->private; int total, avail, used, reserved, min; ceph_reservation_status(client, &total, &avail, &used, &reserved, &min); @@ -286,7 +291,7 @@ static int dentry_lru_show(struct seq_file *s, void *ptr) return 0; } -#define DEFINE_SHOW_FUNC(name) \ +#define DEFINE_SHOW_FUNC(name) \ static int name##_open(struct inode *inode, struct file *file) \ { \ struct seq_file *sf; \ @@ -356,8 +361,8 @@ int ceph_debugfs_client_init(struct ceph_client *client) int ret = 0; char name[80]; - snprintf(name, sizeof(name), FSID_FORMAT ".client%lld", - PR_FSID(&client->fsid), client->monc.auth->global_id); + snprintf(name, sizeof(name), "%pU.client%lld", &client->fsid, + client->monc.auth->global_id); client->debugfs_dir = debugfs_create_dir(name, ceph_debugfs_dir); if (!client->debugfs_dir) @@ -427,11 +432,12 @@ int ceph_debugfs_client_init(struct ceph_client *client) if (!client->debugfs_caps) goto out; - client->debugfs_congestion_kb = debugfs_create_file("writeback_congestion_kb", - 0600, - client->debugfs_dir, - client, - &congestion_kb_fops); + client->debugfs_congestion_kb = + debugfs_create_file("writeback_congestion_kb", + 0600, + client->debugfs_dir, + client, + &congestion_kb_fops); if (!client->debugfs_congestion_kb) goto out; @@ -461,7 +467,7 @@ void ceph_debugfs_client_cleanup(struct ceph_client *client) debugfs_remove(client->debugfs_dir); } -#else // CONFIG_DEBUG_FS +#else /* CONFIG_DEBUG_FS */ int __init ceph_debugfs_init(void) { @@ -481,4 +487,4 @@ void ceph_debugfs_client_cleanup(struct ceph_client *client) { } -#endif // CONFIG_DEBUG_FS +#endif /* CONFIG_DEBUG_FS */ diff --git a/fs/ceph/decode.h b/fs/ceph/decode.h index 65b3e022eaf..3d25415afe6 100644 --- a/fs/ceph/decode.h +++ b/fs/ceph/decode.h @@ -99,11 +99,13 @@ static inline void ceph_encode_timespec(struct ceph_timespec *tv, */ static inline void ceph_encode_addr(struct ceph_entity_addr *a) { - a->in_addr.ss_family = htons(a->in_addr.ss_family); + __be16 ss_family = htons(a->in_addr.ss_family); + a->in_addr.ss_family = *(__u16 *)&ss_family; } static inline void ceph_decode_addr(struct ceph_entity_addr *a) { - a->in_addr.ss_family = ntohs(a->in_addr.ss_family); + __be16 ss_family = *(__be16 *)&a->in_addr.ss_family; + a->in_addr.ss_family = ntohs(ss_family); WARN_ON(a->in_addr.ss_family == 512); } diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 650d2db5ed2..67bbb41d552 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -27,7 +27,7 @@ const struct inode_operations ceph_dir_iops; const struct file_operations ceph_dir_fops; -struct dentry_operations ceph_dentry_ops; +const struct dentry_operations ceph_dentry_ops; /* * Initialize ceph dentry state. @@ -51,8 +51,11 @@ int ceph_init_dentry(struct dentry *dentry) return -ENOMEM; /* oh well */ spin_lock(&dentry->d_lock); - if (dentry->d_fsdata) /* lost a race */ + if (dentry->d_fsdata) { + /* lost a race */ + kmem_cache_free(ceph_dentry_cachep, di); goto out_unlock; + } di->dentry = dentry; di->lease_session = NULL; dentry->d_fsdata = di; @@ -91,6 +94,8 @@ static unsigned fpos_off(loff_t p) */ static int __dcache_readdir(struct file *filp, void *dirent, filldir_t filldir) + __releases(inode->i_lock) + __acquires(inode->i_lock) { struct inode *inode = filp->f_dentry->d_inode; struct ceph_file_info *fi = filp->private_data; @@ -125,7 +130,8 @@ more: dentry = list_entry(p, struct dentry, d_u.d_child); di = ceph_dentry(dentry); while (1) { - dout(" p %p/%p d_subdirs %p/%p\n", p->prev, p->next, + dout(" p %p/%p %s d_subdirs %p/%p\n", p->prev, p->next, + d_unhashed(dentry) ? "!hashed" : "hashed", parent->d_subdirs.prev, parent->d_subdirs.next); if (p == &parent->d_subdirs) { fi->at_end = 1; @@ -229,6 +235,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir) u32 ftype; struct ceph_mds_reply_info_parsed *rinfo; const int max_entries = client->mount_args->max_readdir; + const int max_bytes = client->mount_args->max_readdir_bytes; dout("readdir %p filp %p frag %u off %u\n", inode, filp, frag, off); if (fi->at_end) @@ -261,6 +268,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir) spin_lock(&inode->i_lock); if ((filp->f_pos == 2 || fi->dentry) && !ceph_test_opt(client, NOASYNCREADDIR) && + ceph_snap(inode) != CEPH_SNAPDIR && (ci->i_ceph_flags & CEPH_I_COMPLETE) && __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { err = __dcache_readdir(filp, dirent, filldir); @@ -312,6 +320,7 @@ more: req->r_readdir_offset = fi->next_offset; req->r_args.readdir.frag = cpu_to_le32(frag); req->r_args.readdir.max_entries = cpu_to_le32(max_entries); + req->r_args.readdir.max_bytes = cpu_to_le32(max_bytes); req->r_num_caps = max_entries + 1; err = ceph_mdsc_do_request(mdsc, NULL, req); if (err < 0) { @@ -335,7 +344,7 @@ more: if (req->r_reply_info.dir_end) { kfree(fi->last_name); fi->last_name = NULL; - fi->next_offset = 0; + fi->next_offset = 2; } else { rinfo = &req->r_reply_info; err = note_last_dentry(fi, @@ -478,7 +487,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin) struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, struct dentry *dentry, int err) { - struct ceph_client *client = ceph_client(dentry->d_sb); + struct ceph_client *client = ceph_sb_to_client(dentry->d_sb); struct inode *parent = dentry->d_parent->d_inode; /* .snap dir? */ @@ -568,7 +577,6 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, !is_root_ceph_dentry(dir, dentry) && (ci->i_ceph_flags & CEPH_I_COMPLETE) && (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) { - di->offset = ci->i_max_offset++; spin_unlock(&dir->i_lock); dout(" dir %p complete, -ENOENT\n", dir); d_add(dentry, NULL); @@ -582,7 +590,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP; req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS); if (IS_ERR(req)) - return ERR_PTR(PTR_ERR(req)); + return ERR_CAST(req); req->r_dentry = dget(dentry); req->r_num_caps = 2; /* we only need inode linkage */ @@ -888,13 +896,22 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, /* ensure target dentry is invalidated, despite rehashing bug in vfs_rename_dir */ - new_dentry->d_time = jiffies; - ceph_dentry(new_dentry)->lease_shared_gen = 0; + ceph_invalidate_dentry_lease(new_dentry); } ceph_mdsc_put_request(req); return err; } +/* + * Ensure a dentry lease will no longer revalidate. + */ +void ceph_invalidate_dentry_lease(struct dentry *dentry) +{ + spin_lock(&dentry->d_lock); + dentry->d_time = jiffies; + ceph_dentry(dentry)->lease_shared_gen = 0; + spin_unlock(&dentry->d_lock); +} /* * Check if dentry lease is valid. If not, delete the lease. Try to @@ -972,8 +989,9 @@ static int ceph_d_revalidate(struct dentry *dentry, struct nameidata *nd) { struct inode *dir = dentry->d_parent->d_inode; - dout("d_revalidate %p '%.*s' inode %p\n", dentry, - dentry->d_name.len, dentry->d_name.name, dentry->d_inode); + dout("d_revalidate %p '%.*s' inode %p offset %lld\n", dentry, + dentry->d_name.len, dentry->d_name.name, dentry->d_inode, + ceph_dentry(dentry)->offset); /* always trust cached snapped dentries, snapdir dentry */ if (ceph_snap(dir) != CEPH_NOSNAP) { @@ -998,18 +1016,22 @@ out_touch: /* * When a dentry is released, clear the dir I_COMPLETE if it was part - * of the current dir gen. + * of the current dir gen or if this is in the snapshot namespace. */ static void ceph_dentry_release(struct dentry *dentry) { struct ceph_dentry_info *di = ceph_dentry(dentry); struct inode *parent_inode = dentry->d_parent->d_inode; + u64 snapid = ceph_snap(parent_inode); - if (parent_inode) { + dout("dentry_release %p parent %p\n", dentry, parent_inode); + + if (parent_inode && snapid != CEPH_SNAPDIR) { struct ceph_inode_info *ci = ceph_inode(parent_inode); spin_lock(&parent_inode->i_lock); - if (ci->i_shared_gen == di->lease_shared_gen) { + if (ci->i_shared_gen == di->lease_shared_gen || + snapid <= CEPH_MAXSNAP) { dout(" clearing %p complete (d_release)\n", parent_inode); ci->i_ceph_flags &= ~CEPH_I_COMPLETE; @@ -1050,7 +1072,7 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size, struct ceph_inode_info *ci = ceph_inode(inode); int left; - if (!ceph_test_opt(ceph_client(inode->i_sb), DIRSTAT)) + if (!ceph_test_opt(ceph_sb_to_client(inode->i_sb), DIRSTAT)) return -EISDIR; if (!cf->dir_info) { @@ -1092,10 +1114,9 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size, * an fsync() on a dir will wait for any uncommitted directory * operations to commit. */ -static int ceph_dir_fsync(struct file *file, struct dentry *dentry, - int datasync) +static int ceph_dir_fsync(struct file *file, int datasync) { - struct inode *inode = dentry->d_inode; + struct inode *inode = file->f_path.dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); struct list_head *head = &ci->i_unsafe_dirops; struct ceph_mds_request *req; @@ -1152,7 +1173,7 @@ void ceph_dentry_lru_add(struct dentry *dn) dout("dentry_lru_add %p %p '%.*s'\n", di, dn, dn->d_name.len, dn->d_name.name); if (di) { - mdsc = &ceph_client(dn->d_sb)->mdsc; + mdsc = &ceph_sb_to_client(dn->d_sb)->mdsc; spin_lock(&mdsc->dentry_lru_lock); list_add_tail(&di->lru, &mdsc->dentry_lru); mdsc->num_dentry++; @@ -1165,10 +1186,10 @@ void ceph_dentry_lru_touch(struct dentry *dn) struct ceph_dentry_info *di = ceph_dentry(dn); struct ceph_mds_client *mdsc; - dout("dentry_lru_touch %p %p '%.*s'\n", di, dn, - dn->d_name.len, dn->d_name.name); + dout("dentry_lru_touch %p %p '%.*s' (offset %lld)\n", di, dn, + dn->d_name.len, dn->d_name.name, di->offset); if (di) { - mdsc = &ceph_client(dn->d_sb)->mdsc; + mdsc = &ceph_sb_to_client(dn->d_sb)->mdsc; spin_lock(&mdsc->dentry_lru_lock); list_move_tail(&di->lru, &mdsc->dentry_lru); spin_unlock(&mdsc->dentry_lru_lock); @@ -1183,7 +1204,7 @@ void ceph_dentry_lru_del(struct dentry *dn) dout("dentry_lru_del %p %p '%.*s'\n", di, dn, dn->d_name.len, dn->d_name.name); if (di) { - mdsc = &ceph_client(dn->d_sb)->mdsc; + mdsc = &ceph_sb_to_client(dn->d_sb)->mdsc; spin_lock(&mdsc->dentry_lru_lock); list_del_init(&di->lru); mdsc->num_dentry--; @@ -1220,14 +1241,16 @@ const struct inode_operations ceph_dir_iops = { .create = ceph_create, }; -struct dentry_operations ceph_dentry_ops = { +const struct dentry_operations ceph_dentry_ops = { .d_revalidate = ceph_d_revalidate, .d_release = ceph_dentry_release, }; -struct dentry_operations ceph_snapdir_dentry_ops = { +const struct dentry_operations ceph_snapdir_dentry_ops = { .d_revalidate = ceph_snapdir_d_revalidate, + .d_release = ceph_dentry_release, }; -struct dentry_operations ceph_snap_dentry_ops = { +const struct dentry_operations ceph_snap_dentry_ops = { + .d_release = ceph_dentry_release, }; diff --git a/fs/ceph/export.c b/fs/ceph/export.c index 9d67572fb32..4480cb1c63e 100644 --- a/fs/ceph/export.c +++ b/fs/ceph/export.c @@ -93,11 +93,11 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, return ERR_PTR(-ESTALE); dentry = d_obtain_alias(inode); - if (!dentry) { + if (IS_ERR(dentry)) { pr_err("fh_to_dentry %llx -- inode %p but ENOMEM\n", fh->ino, inode); iput(inode); - return ERR_PTR(-ENOMEM); + return dentry; } err = ceph_init_dentry(dentry); @@ -115,7 +115,7 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, static struct dentry *__cfh_to_dentry(struct super_block *sb, struct ceph_nfs_confh *cfh) { - struct ceph_mds_client *mdsc = &ceph_client(sb)->mdsc; + struct ceph_mds_client *mdsc = &ceph_sb_to_client(sb)->mdsc; struct inode *inode; struct dentry *dentry; struct ceph_vino vino; @@ -133,7 +133,7 @@ static struct dentry *__cfh_to_dentry(struct super_block *sb, req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPHASH, USE_ANY_MDS); if (IS_ERR(req)) - return ERR_PTR(PTR_ERR(req)); + return ERR_CAST(req); req->r_ino1 = vino; req->r_ino2.ino = cfh->parent_ino; @@ -149,11 +149,11 @@ static struct dentry *__cfh_to_dentry(struct super_block *sb, } dentry = d_obtain_alias(inode); - if (!dentry) { + if (IS_ERR(dentry)) { pr_err("cfh_to_dentry %llx -- inode %p but ENOMEM\n", cfh->ino, inode); iput(inode); - return ERR_PTR(-ENOMEM); + return dentry; } err = ceph_init_dentry(dentry); if (err < 0) { @@ -202,11 +202,11 @@ static struct dentry *ceph_fh_to_parent(struct super_block *sb, return ERR_PTR(-ESTALE); dentry = d_obtain_alias(inode); - if (!dentry) { + if (IS_ERR(dentry)) { pr_err("fh_to_parent %llx -- inode %p but ENOMEM\n", cfh->ino, inode); iput(inode); - return ERR_PTR(-ENOMEM); + return dentry; } err = ceph_init_dentry(dentry); if (err < 0) { diff --git a/fs/ceph/file.c b/fs/ceph/file.c index ed6f19721d6..8c044a4f045 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -230,7 +230,7 @@ struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry, /* do the open */ req = prepare_open_request(dir->i_sb, flags, mode); if (IS_ERR(req)) - return ERR_PTR(PTR_ERR(req)); + return ERR_CAST(req); req->r_dentry = dget(dentry); req->r_num_caps = 2; if (flags & O_CREAT) { @@ -265,7 +265,7 @@ int ceph_release(struct inode *inode, struct file *file) kmem_cache_free(ceph_file_cachep, cf); /* wake up anyone waiting for caps on this inode */ - wake_up(&ci->i_cap_wq); + wake_up_all(&ci->i_cap_wq); return 0; } @@ -317,16 +317,16 @@ void ceph_release_page_vector(struct page **pages, int num_pages) /* * allocate a vector new pages */ -static struct page **alloc_page_vector(int num_pages) +static struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags) { struct page **pages; int i; - pages = kmalloc(sizeof(*pages) * num_pages, GFP_NOFS); + pages = kmalloc(sizeof(*pages) * num_pages, flags); if (!pages) return ERR_PTR(-ENOMEM); for (i = 0; i < num_pages; i++) { - pages[i] = alloc_page(GFP_NOFS); + pages[i] = __page_cache_alloc(flags); if (pages[i] == NULL) { ceph_release_page_vector(pages, i); return ERR_PTR(-ENOMEM); @@ -540,7 +540,7 @@ static ssize_t ceph_sync_read(struct file *file, char __user *data, * in sequence. */ } else { - pages = alloc_page_vector(num_pages); + pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); } if (IS_ERR(pages)) return PTR_ERR(pages); @@ -649,8 +649,8 @@ more: do_sync, ci->i_truncate_seq, ci->i_truncate_size, &mtime, false, 2); - if (IS_ERR(req)) - return PTR_ERR(req); + if (!req) + return -ENOMEM; num_pages = calc_pages_for(pos, len); @@ -665,10 +665,10 @@ more: * throw out any page cache pages in this range. this * may block. */ - truncate_inode_pages_range(inode->i_mapping, pos, + truncate_inode_pages_range(inode->i_mapping, pos, (pos+len) | (PAGE_CACHE_SIZE-1)); } else { - pages = alloc_page_vector(num_pages); + pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); if (IS_ERR(pages)) { ret = PTR_ERR(pages); goto out; @@ -740,28 +740,32 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov, unsigned long nr_segs, loff_t pos) { struct file *filp = iocb->ki_filp; + struct ceph_file_info *fi = filp->private_data; loff_t *ppos = &iocb->ki_pos; size_t len = iov->iov_len; struct inode *inode = filp->f_dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); - void *base = iov->iov_base; + void __user *base = iov->iov_base; ssize_t ret; - int got = 0; + int want, got = 0; int checkeof = 0, read = 0; dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", inode, ceph_vinop(inode), pos, (unsigned)len, inode); again: __ceph_do_pending_vmtruncate(inode); - ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_CACHE, - &got, -1); + if (fi->fmode & CEPH_FILE_MODE_LAZY) + want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; + else + want = CEPH_CAP_FILE_CACHE; + ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); if (ret < 0) goto out; dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", inode, ceph_vinop(inode), pos, (unsigned)len, ceph_cap_string(got)); - if ((got & CEPH_CAP_FILE_CACHE) == 0 || + if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_filp->f_flags & O_DIRECT) || (inode->i_sb->s_flags & MS_SYNCHRONOUS)) /* hmm, this isn't really async... */ @@ -807,11 +811,12 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov, unsigned long nr_segs, loff_t pos) { struct file *file = iocb->ki_filp; + struct ceph_file_info *fi = file->private_data; struct inode *inode = file->f_dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_osd_client *osdc = &ceph_client(inode->i_sb)->osdc; + struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->osdc; loff_t endoff = pos + iov->iov_len; - int got = 0; + int want, got = 0; int ret, err; if (ceph_snap(inode) != CEPH_NOSNAP) @@ -824,8 +829,11 @@ retry_snap: dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n", inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, inode->i_size); - ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, - &got, endoff); + if (fi->fmode & CEPH_FILE_MODE_LAZY) + want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; + else + want = CEPH_CAP_FILE_BUFFER; + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff); if (ret < 0) goto out; @@ -833,7 +841,7 @@ retry_snap: inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len, ceph_cap_string(got)); - if ((got & CEPH_CAP_FILE_BUFFER) == 0 || + if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_filp->f_flags & O_DIRECT) || (inode->i_sb->s_flags & MS_SYNCHRONOUS)) { ret = ceph_sync_write(file, iov->iov_base, iov->iov_len, @@ -844,8 +852,7 @@ retry_snap: if ((ret >= 0 || ret == -EIOCBQUEUED) && ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host) || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) { - err = vfs_fsync_range(file, file->f_path.dentry, - pos, pos + ret - 1, 1); + err = vfs_fsync_range(file, pos, pos + ret - 1, 1); if (err < 0) ret = err; } @@ -931,6 +938,8 @@ const struct file_operations ceph_file_fops = { .aio_write = ceph_aio_write, .mmap = ceph_mmap, .fsync = ceph_fsync, + .lock = ceph_lock, + .flock = ceph_flock, .splice_read = generic_file_splice_read, .splice_write = generic_file_splice_write, .unlocked_ioctl = ceph_ioctl, diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 261f3e6c0bc..5d893d31e39 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -69,7 +69,7 @@ struct inode *ceph_get_snapdir(struct inode *parent) BUG_ON(!S_ISDIR(parent->i_mode)); if (IS_ERR(inode)) - return ERR_PTR(PTR_ERR(inode)); + return inode; inode->i_mode = parent->i_mode; inode->i_uid = parent->i_uid; inode->i_gid = parent->i_gid; @@ -384,7 +384,7 @@ void ceph_destroy_inode(struct inode *inode) */ if (ci->i_snap_realm) { struct ceph_mds_client *mdsc = - &ceph_client(ci->vfs_inode.i_sb)->mdsc; + &ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; struct ceph_snap_realm *realm = ci->i_snap_realm; dout(" dropping residual ref to snap realm %p\n", realm); @@ -442,8 +442,9 @@ int ceph_fill_file_size(struct inode *inode, int issued, * the file is either opened or mmaped */ if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD| - CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER| - CEPH_CAP_FILE_EXCL)) || + CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER| + CEPH_CAP_FILE_EXCL| + CEPH_CAP_FILE_LAZYIO)) || mapping_mapped(inode->i_mapping) || __ceph_caps_file_wanted(ci)) { ci->i_truncate_pending++; @@ -619,11 +620,12 @@ static int fill_inode(struct inode *inode, memcpy(ci->i_xattrs.blob->vec.iov_base, iinfo->xattr_data, iinfo->xattr_len); ci->i_xattrs.version = le64_to_cpu(info->xattr_version); + xattr_blob = NULL; } inode->i_mapping->a_ops = &ceph_aops; inode->i_mapping->backing_dev_info = - &ceph_client(inode->i_sb)->backing_dev_info; + &ceph_sb_to_client(inode->i_sb)->backing_dev_info; switch (inode->i_mode & S_IFMT) { case S_IFIFO: @@ -674,14 +676,15 @@ static int fill_inode(struct inode *inode, /* set dir completion flag? */ if (ci->i_files == 0 && ci->i_subdirs == 0 && ceph_snap(inode) == CEPH_NOSNAP && - (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED)) { + (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) && + (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) { dout(" marking %p complete (empty)\n", inode); ci->i_ceph_flags |= CEPH_I_COMPLETE; ci->i_max_offset = 2; } /* it may be better to set st_size in getattr instead? */ - if (ceph_test_opt(ceph_client(inode->i_sb), RBYTES)) + if (ceph_test_opt(ceph_sb_to_client(inode->i_sb), RBYTES)) inode->i_size = ci->i_rbytes; break; default: @@ -733,6 +736,10 @@ no_change: __ceph_get_fmode(ci, cap_fmode); spin_unlock(&inode->i_lock); } + } else if (cap_fmode >= 0) { + pr_warning("mds issued no caps on %llx.%llx\n", + ceph_vinop(inode)); + __ceph_get_fmode(ci, cap_fmode); } /* update delegation info? */ @@ -798,6 +805,37 @@ out_unlock: } /* + * Set dentry's directory position based on the current dir's max, and + * order it in d_subdirs, so that dcache_readdir behaves. + */ +static void ceph_set_dentry_offset(struct dentry *dn) +{ + struct dentry *dir = dn->d_parent; + struct inode *inode = dn->d_parent->d_inode; + struct ceph_dentry_info *di; + + BUG_ON(!inode); + + di = ceph_dentry(dn); + + spin_lock(&inode->i_lock); + if ((ceph_inode(inode)->i_ceph_flags & CEPH_I_COMPLETE) == 0) { + spin_unlock(&inode->i_lock); + return; + } + di->offset = ceph_inode(inode)->i_max_offset++; + spin_unlock(&inode->i_lock); + + spin_lock(&dcache_lock); + spin_lock(&dn->d_lock); + list_move(&dn->d_u.d_child, &dir->d_subdirs); + dout("set_dentry_offset %p %lld (%p %p)\n", dn, di->offset, + dn->d_u.d_child.prev, dn->d_u.d_child.next); + spin_unlock(&dn->d_lock); + spin_unlock(&dcache_lock); +} + +/* * splice a dentry to an inode. * caller must hold directory i_mutex for this to be safe. * @@ -810,13 +848,15 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in, { struct dentry *realdn; + BUG_ON(dn->d_inode); + /* dn must be unhashed */ if (!d_unhashed(dn)) d_drop(dn); realdn = d_materialise_unique(dn, in); if (IS_ERR(realdn)) { - pr_err("splice_dentry error %p inode %p ino %llx.%llx\n", - dn, in, ceph_vinop(in)); + pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n", + PTR_ERR(realdn), dn, in, ceph_vinop(in)); if (prehash) *prehash = false; /* don't rehash on error */ dn = realdn; /* note realdn contains the error */ @@ -831,44 +871,17 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in, dn = realdn; } else { BUG_ON(!ceph_dentry(dn)); - dout("dn %p attached to %p ino %llx.%llx\n", dn, dn->d_inode, ceph_vinop(dn->d_inode)); } if ((!prehash || *prehash) && d_unhashed(dn)) d_rehash(dn); + ceph_set_dentry_offset(dn); out: return dn; } /* - * Set dentry's directory position based on the current dir's max, and - * order it in d_subdirs, so that dcache_readdir behaves. - */ -static void ceph_set_dentry_offset(struct dentry *dn) -{ - struct dentry *dir = dn->d_parent; - struct inode *inode = dn->d_parent->d_inode; - struct ceph_dentry_info *di; - - BUG_ON(!inode); - - di = ceph_dentry(dn); - - spin_lock(&inode->i_lock); - di->offset = ceph_inode(inode)->i_max_offset++; - spin_unlock(&inode->i_lock); - - spin_lock(&dcache_lock); - spin_lock(&dn->d_lock); - list_move_tail(&dir->d_subdirs, &dn->d_u.d_child); - dout("set_dentry_offset %p %lld (%p %p)\n", dn, di->offset, - dn->d_u.d_child.prev, dn->d_u.d_child.next); - spin_unlock(&dn->d_lock); - spin_unlock(&dcache_lock); -} - -/* * Incorporate results into the local cache. This is either just * one inode, or a directory, dentry, and possibly linked-to inode (e.g., * after a lookup). @@ -929,14 +942,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, if (!rinfo->head->is_target && !rinfo->head->is_dentry) { dout("fill_trace reply is empty!\n"); - if (rinfo->head->result == 0 && req->r_locked_dir) { - struct ceph_inode_info *ci = - ceph_inode(req->r_locked_dir); - dout(" clearing %p complete (empty trace)\n", - req->r_locked_dir); - ci->i_ceph_flags &= ~CEPH_I_COMPLETE; - ci->i_release_count++; - } + if (rinfo->head->result == 0 && req->r_locked_dir) + ceph_invalidate_dir_request(req); return 0; } @@ -1007,13 +1014,18 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, req->r_old_dentry->d_name.len, req->r_old_dentry->d_name.name, dn, dn->d_name.len, dn->d_name.name); + /* ensure target dentry is invalidated, despite rehashing bug in vfs_rename_dir */ - dn->d_time = jiffies; - ceph_dentry(dn)->lease_shared_gen = 0; + ceph_invalidate_dentry_lease(dn); + /* take overwritten dentry's readdir offset */ + dout("dn %p gets %p offset %lld (old offset %lld)\n", + req->r_old_dentry, dn, ceph_dentry(dn)->offset, + ceph_dentry(req->r_old_dentry)->offset); ceph_dentry(req->r_old_dentry)->offset = ceph_dentry(dn)->offset; + dn = req->r_old_dentry; /* use old_dentry */ in = dn->d_inode; } @@ -1055,7 +1067,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, goto done; } req->r_dentry = dn; /* may have spliced */ - ceph_set_dentry_offset(dn); igrab(in); } else if (ceph_ino(in) == vino.ino && ceph_snap(in) == vino.snap) { @@ -1098,7 +1109,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, err = PTR_ERR(dn); goto done; } - ceph_set_dentry_offset(dn); req->r_dentry = dn; /* may have spliced */ igrab(in); rinfo->head->is_dentry = 1; /* fool notrace handlers */ @@ -1190,8 +1200,10 @@ retry_lookup: goto out; } err = ceph_init_dentry(dn); - if (err < 0) + if (err < 0) { + dput(dn); goto out; + } } else if (dn->d_inode && (ceph_ino(dn->d_inode) != vino.ino || ceph_snap(dn->d_inode) != vino.snap)) { @@ -1225,18 +1237,23 @@ retry_lookup: goto out; } dn = splice_dentry(dn, in, NULL); + if (IS_ERR(dn)) + dn = NULL; } if (fill_inode(in, &rinfo->dir_in[i], NULL, session, req->r_request_started, -1, &req->r_caps_reservation) < 0) { pr_err("fill_inode badness on %p\n", in); - dput(dn); - continue; + goto next_item; } - update_dentry_lease(dn, rinfo->dir_dlease[i], - req->r_session, req->r_request_started); - dput(dn); + if (dn) + update_dentry_lease(dn, rinfo->dir_dlease[i], + req->r_session, + req->r_request_started); +next_item: + if (dn) + dput(dn); } req->r_did_prepopulate = true; @@ -1425,7 +1442,7 @@ void ceph_queue_vmtruncate(struct inode *inode) { struct ceph_inode_info *ci = ceph_inode(inode); - if (queue_work(ceph_client(inode->i_sb)->trunc_wq, + if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq, &ci->i_vmtruncate_work)) { dout("ceph_queue_vmtruncate %p\n", inode); igrab(inode); @@ -1485,7 +1502,7 @@ retry: if (wrbuffer_refs == 0) ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); if (wake) - wake_up(&ci->i_cap_wq); + wake_up_all(&ci->i_cap_wq); } @@ -1514,7 +1531,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) struct inode *parent_inode = dentry->d_parent->d_inode; const unsigned int ia_valid = attr->ia_valid; struct ceph_mds_request *req; - struct ceph_mds_client *mdsc = &ceph_client(dentry->d_sb)->mdsc; + struct ceph_mds_client *mdsc = &ceph_sb_to_client(dentry->d_sb)->mdsc; int issued; int release = 0, dirtied = 0; int mask = 0; diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 8a5bcae6284..76e307d2aba 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -98,7 +98,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) struct ceph_ioctl_dataloc dl; struct inode *inode = file->f_dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_osd_client *osdc = &ceph_client(inode->i_sb)->osdc; + struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->osdc; u64 len = 1, olen; u64 tmp; struct ceph_object_layout ol; @@ -143,6 +143,27 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) return 0; } +static long ceph_ioctl_lazyio(struct file *file) +{ + struct ceph_file_info *fi = file->private_data; + struct inode *inode = file->f_dentry->d_inode; + struct ceph_inode_info *ci = ceph_inode(inode); + + if ((fi->fmode & CEPH_FILE_MODE_LAZY) == 0) { + spin_lock(&inode->i_lock); + ci->i_nr_by_mode[fi->fmode]--; + fi->fmode |= CEPH_FILE_MODE_LAZY; + ci->i_nr_by_mode[fi->fmode]++; + spin_unlock(&inode->i_lock); + dout("ioctl_layzio: file %p marked lazy\n", file); + + ceph_check_caps(ci, 0, NULL); + } else { + dout("ioctl_layzio: file %p already lazy\n", file); + } + return 0; +} + long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg) { dout("ioctl file %p cmd %u arg %lu\n", file, cmd, arg); @@ -155,6 +176,9 @@ long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg) case CEPH_IOC_GET_DATALOC: return ceph_ioctl_get_dataloc(file, (void __user *)arg); + + case CEPH_IOC_LAZYIO: + return ceph_ioctl_lazyio(file); } return -ENOTTY; } diff --git a/fs/ceph/ioctl.h b/fs/ceph/ioctl.h index 25e4f1a9d05..88451a3b685 100644 --- a/fs/ceph/ioctl.h +++ b/fs/ceph/ioctl.h @@ -37,4 +37,6 @@ struct ceph_ioctl_dataloc { #define CEPH_IOC_GET_DATALOC _IOWR(CEPH_IOCTL_MAGIC, 3, \ struct ceph_ioctl_dataloc) +#define CEPH_IOC_LAZYIO _IO(CEPH_IOCTL_MAGIC, 4) + #endif diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c new file mode 100644 index 00000000000..ae85af06454 --- /dev/null +++ b/fs/ceph/locks.c @@ -0,0 +1,256 @@ +#include "ceph_debug.h" + +#include <linux/file.h> +#include <linux/namei.h> + +#include "super.h" +#include "mds_client.h" +#include "pagelist.h" + +/** + * Implement fcntl and flock locking functions. + */ +static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file, + u64 pid, u64 pid_ns, + int cmd, u64 start, u64 length, u8 wait) +{ + struct inode *inode = file->f_dentry->d_inode; + struct ceph_mds_client *mdsc = + &ceph_sb_to_client(inode->i_sb)->mdsc; + struct ceph_mds_request *req; + int err; + + req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS); + if (IS_ERR(req)) + return PTR_ERR(req); + req->r_inode = igrab(inode); + + dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, " + "length: %llu, wait: %d, type`: %d", (int)lock_type, + (int)operation, pid, start, length, wait, cmd); + + req->r_args.filelock_change.rule = lock_type; + req->r_args.filelock_change.type = cmd; + req->r_args.filelock_change.pid = cpu_to_le64(pid); + /* This should be adjusted, but I'm not sure if + namespaces actually get id numbers*/ + req->r_args.filelock_change.pid_namespace = + cpu_to_le64((u64)pid_ns); + req->r_args.filelock_change.start = cpu_to_le64(start); + req->r_args.filelock_change.length = cpu_to_le64(length); + req->r_args.filelock_change.wait = wait; + + err = ceph_mdsc_do_request(mdsc, inode, req); + ceph_mdsc_put_request(req); + dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, " + "length: %llu, wait: %d, type`: %d err code %d", (int)lock_type, + (int)operation, pid, start, length, wait, cmd, err); + return err; +} + +/** + * Attempt to set an fcntl lock. + * For now, this just goes away to the server. Later it may be more awesome. + */ +int ceph_lock(struct file *file, int cmd, struct file_lock *fl) +{ + u64 length; + u8 lock_cmd; + int err; + u8 wait = 0; + u16 op = CEPH_MDS_OP_SETFILELOCK; + + fl->fl_nspid = get_pid(task_tgid(current)); + dout("ceph_lock, fl_pid:%d", fl->fl_pid); + + /* set wait bit as appropriate, then make command as Ceph expects it*/ + if (F_SETLKW == cmd) + wait = 1; + if (F_GETLK == cmd) + op = CEPH_MDS_OP_GETFILELOCK; + + if (F_RDLCK == fl->fl_type) + lock_cmd = CEPH_LOCK_SHARED; + else if (F_WRLCK == fl->fl_type) + lock_cmd = CEPH_LOCK_EXCL; + else + lock_cmd = CEPH_LOCK_UNLOCK; + + if (LLONG_MAX == fl->fl_end) + length = 0; + else + length = fl->fl_end - fl->fl_start + 1; + + err = ceph_lock_message(CEPH_LOCK_FCNTL, op, file, + (u64)fl->fl_pid, (u64)fl->fl_nspid, + lock_cmd, fl->fl_start, + length, wait); + if (!err) { + dout("mds locked, locking locally"); + err = posix_lock_file(file, fl, NULL); + if (err && (CEPH_MDS_OP_SETFILELOCK == op)) { + /* undo! This should only happen if the kernel detects + * local deadlock. */ + ceph_lock_message(CEPH_LOCK_FCNTL, op, file, + (u64)fl->fl_pid, (u64)fl->fl_nspid, + CEPH_LOCK_UNLOCK, fl->fl_start, + length, 0); + dout("got %d on posix_lock_file, undid lock", err); + } + } else { + dout("mds returned error code %d", err); + } + return err; +} + +int ceph_flock(struct file *file, int cmd, struct file_lock *fl) +{ + u64 length; + u8 lock_cmd; + int err; + u8 wait = 1; + + fl->fl_nspid = get_pid(task_tgid(current)); + dout("ceph_flock, fl_pid:%d", fl->fl_pid); + + /* set wait bit, then clear it out of cmd*/ + if (cmd & LOCK_NB) + wait = 0; + cmd = cmd & (LOCK_SH | LOCK_EX | LOCK_UN); + /* set command sequence that Ceph wants to see: + shared lock, exclusive lock, or unlock */ + if (LOCK_SH == cmd) + lock_cmd = CEPH_LOCK_SHARED; + else if (LOCK_EX == cmd) + lock_cmd = CEPH_LOCK_EXCL; + else + lock_cmd = CEPH_LOCK_UNLOCK; + /* mds requires start and length rather than start and end */ + if (LLONG_MAX == fl->fl_end) + length = 0; + else + length = fl->fl_end - fl->fl_start + 1; + + err = ceph_lock_message(CEPH_LOCK_FLOCK, CEPH_MDS_OP_SETFILELOCK, + file, (u64)fl->fl_pid, (u64)fl->fl_nspid, + lock_cmd, fl->fl_start, + length, wait); + if (!err) { + err = flock_lock_file_wait(file, fl); + if (err) { + ceph_lock_message(CEPH_LOCK_FLOCK, + CEPH_MDS_OP_SETFILELOCK, + file, (u64)fl->fl_pid, + (u64)fl->fl_nspid, + CEPH_LOCK_UNLOCK, fl->fl_start, + length, 0); + dout("got %d on flock_lock_file_wait, undid lock", err); + } + } else { + dout("mds error code %d", err); + } + return err; +} + +/** + * Must be called with BKL already held. Fills in the passed + * counter variables, so you can prepare pagelist metadata before calling + * ceph_encode_locks. + */ +void ceph_count_locks(struct inode *inode, int *fcntl_count, int *flock_count) +{ + struct file_lock *lock; + + *fcntl_count = 0; + *flock_count = 0; + + for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) { + if (lock->fl_flags & FL_POSIX) + ++(*fcntl_count); + else if (lock->fl_flags & FL_FLOCK) + ++(*flock_count); + } + dout("counted %d flock locks and %d fcntl locks", + *flock_count, *fcntl_count); +} + +/** + * Encode the flock and fcntl locks for the given inode into the pagelist. + * Format is: #fcntl locks, sequential fcntl locks, #flock locks, + * sequential flock locks. + * Must be called with BLK already held, and the lock numbers should have + * been gathered under the same lock holding window. + */ +int ceph_encode_locks(struct inode *inode, struct ceph_pagelist *pagelist, + int num_fcntl_locks, int num_flock_locks) +{ + struct file_lock *lock; + struct ceph_filelock cephlock; + int err = 0; + + dout("encoding %d flock and %d fcntl locks", num_flock_locks, + num_fcntl_locks); + err = ceph_pagelist_append(pagelist, &num_fcntl_locks, sizeof(u32)); + if (err) + goto fail; + for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) { + if (lock->fl_flags & FL_POSIX) { + err = lock_to_ceph_filelock(lock, &cephlock); + if (err) + goto fail; + err = ceph_pagelist_append(pagelist, &cephlock, + sizeof(struct ceph_filelock)); + } + if (err) + goto fail; + } + + err = ceph_pagelist_append(pagelist, &num_flock_locks, sizeof(u32)); + if (err) + goto fail; + for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) { + if (lock->fl_flags & FL_FLOCK) { + err = lock_to_ceph_filelock(lock, &cephlock); + if (err) + goto fail; + err = ceph_pagelist_append(pagelist, &cephlock, + sizeof(struct ceph_filelock)); + } + if (err) + goto fail; + } +fail: + return err; +} + +/* + * Given a pointer to a lock, convert it to a ceph filelock + */ +int lock_to_ceph_filelock(struct file_lock *lock, + struct ceph_filelock *cephlock) +{ + int err = 0; + + cephlock->start = cpu_to_le64(lock->fl_start); + cephlock->length = cpu_to_le64(lock->fl_end - lock->fl_start + 1); + cephlock->client = cpu_to_le64(0); + cephlock->pid = cpu_to_le64(lock->fl_pid); + cephlock->pid_namespace = cpu_to_le64((u64)lock->fl_nspid); + + switch (lock->fl_type) { + case F_RDLCK: + cephlock->type = CEPH_LOCK_SHARED; + break; + case F_WRLCK: + cephlock->type = CEPH_LOCK_EXCL; + break; + case F_UNLCK: + cephlock->type = CEPH_LOCK_UNLOCK; + break; + default: + dout("Have unknown lock type %d", lock->fl_type); + err = -EINVAL; + } + + return err; +} diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 60a9a4ae47b..a75ddbf9fe3 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -3,6 +3,7 @@ #include <linux/wait.h> #include <linux/slab.h> #include <linux/sched.h> +#include <linux/smp_lock.h> #include "mds_client.h" #include "mon_client.h" @@ -37,10 +38,15 @@ * are no longer valid. */ +struct ceph_reconnect_state { + struct ceph_pagelist *pagelist; + bool flock; +}; + static void __wake_requests(struct ceph_mds_client *mdsc, struct list_head *head); -const static struct ceph_connection_operations mds_con_ops; +static const struct ceph_connection_operations mds_con_ops; /* @@ -449,7 +455,7 @@ void ceph_mdsc_release_request(struct kref *kref) kfree(req->r_path1); kfree(req->r_path2); put_request_session(req); - ceph_unreserve_caps(&req->r_caps_reservation); + ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); kfree(req); } @@ -512,7 +518,8 @@ static void __register_request(struct ceph_mds_client *mdsc, { req->r_tid = ++mdsc->last_tid; if (req->r_num_caps) - ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps); + ceph_reserve_caps(mdsc, &req->r_caps_reservation, + req->r_num_caps); dout("__register_request %p tid %lld\n", req, req->r_tid); ceph_mdsc_get_request(req); __insert_request(mdsc, req); @@ -665,10 +672,10 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq) struct ceph_msg *msg; struct ceph_mds_session_head *h; - msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL); - if (IS_ERR(msg)) { + msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS); + if (!msg) { pr_err("create_session_msg ENOMEM creating msg\n"); - return ERR_PTR(PTR_ERR(msg)); + return NULL; } h = msg->front.iov_base; h->op = cpu_to_le32(op); @@ -687,7 +694,6 @@ static int __open_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg; int mstate; int mds = session->s_mds; - int err = 0; /* wait for mds to go active? */ mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); @@ -698,17 +704,58 @@ static int __open_session(struct ceph_mds_client *mdsc, /* send connect message */ msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq); - if (IS_ERR(msg)) { - err = PTR_ERR(msg); - goto out; - } + if (!msg) + return -ENOMEM; ceph_con_send(&session->s_con, msg); - -out: return 0; } /* + * open sessions for any export targets for the given mds + * + * called under mdsc->mutex + */ +static void __open_export_target_sessions(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) +{ + struct ceph_mds_info *mi; + struct ceph_mds_session *ts; + int i, mds = session->s_mds; + int target; + + if (mds >= mdsc->mdsmap->m_max_mds) + return; + mi = &mdsc->mdsmap->m_info[mds]; + dout("open_export_target_sessions for mds%d (%d targets)\n", + session->s_mds, mi->num_export_targets); + + for (i = 0; i < mi->num_export_targets; i++) { + target = mi->export_targets[i]; + ts = __ceph_lookup_mds_session(mdsc, target); + if (!ts) { + ts = register_session(mdsc, target); + if (IS_ERR(ts)) + return; + } + if (session->s_state == CEPH_MDS_SESSION_NEW || + session->s_state == CEPH_MDS_SESSION_CLOSING) + __open_session(mdsc, session); + else + dout(" mds%d target mds%d %p is %s\n", session->s_mds, + i, ts, session_state_name(ts->s_state)); + ceph_put_mds_session(ts); + } +} + +void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) +{ + mutex_lock(&mdsc->mutex); + __open_export_target_sessions(mdsc, session); + mutex_unlock(&mdsc->mutex); +} + +/* * session caps */ @@ -736,9 +783,10 @@ static void cleanup_cap_releases(struct ceph_mds_session *session) } /* - * Helper to safely iterate over all caps associated with a session. + * Helper to safely iterate over all caps associated with a session, with + * special care taken to handle a racing __ceph_remove_cap(). * - * caller must hold session s_mutex + * Caller must hold session s_mutex. */ static int iterate_session_caps(struct ceph_mds_session *session, int (*cb)(struct inode *, struct ceph_cap *, @@ -768,7 +816,7 @@ static int iterate_session_caps(struct ceph_mds_session *session, last_inode = NULL; } if (old_cap) { - ceph_put_cap(old_cap); + ceph_put_cap(session->s_mdsc, old_cap); old_cap = NULL; } @@ -797,18 +845,55 @@ out: if (last_inode) iput(last_inode); if (old_cap) - ceph_put_cap(old_cap); + ceph_put_cap(session->s_mdsc, old_cap); return ret; } static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, - void *arg) + void *arg) { struct ceph_inode_info *ci = ceph_inode(inode); + int drop = 0; + dout("removing cap %p, ci is %p, inode is %p\n", cap, ci, &ci->vfs_inode); - ceph_remove_cap(cap); + spin_lock(&inode->i_lock); + __ceph_remove_cap(cap); + if (!__ceph_is_any_real_caps(ci)) { + struct ceph_mds_client *mdsc = + &ceph_sb_to_client(inode->i_sb)->mdsc; + + spin_lock(&mdsc->cap_dirty_lock); + if (!list_empty(&ci->i_dirty_item)) { + pr_info(" dropping dirty %s state for %p %lld\n", + ceph_cap_string(ci->i_dirty_caps), + inode, ceph_ino(inode)); + ci->i_dirty_caps = 0; + list_del_init(&ci->i_dirty_item); + drop = 1; + } + if (!list_empty(&ci->i_flushing_item)) { + pr_info(" dropping dirty+flushing %s state for %p %lld\n", + ceph_cap_string(ci->i_flushing_caps), + inode, ceph_ino(inode)); + ci->i_flushing_caps = 0; + list_del_init(&ci->i_flushing_item); + mdsc->num_cap_flushing--; + drop = 1; + } + if (drop && ci->i_wrbuffer_ref) { + pr_info(" dropping dirty data for %p %lld\n", + inode, ceph_ino(inode)); + ci->i_wrbuffer_ref = 0; + ci->i_wrbuffer_ref_head = 0; + drop++; + } + spin_unlock(&mdsc->cap_dirty_lock); + } + spin_unlock(&inode->i_lock); + while (drop--) + iput(inode); return 0; } @@ -820,6 +905,7 @@ static void remove_session_caps(struct ceph_mds_session *session) dout("remove_session_caps on %p\n", session); iterate_session_caps(session, remove_session_caps_cb, NULL); BUG_ON(session->s_nr_caps > 0); + BUG_ON(!list_empty(&session->s_cap_flushing)); cleanup_cap_releases(session); } @@ -834,7 +920,7 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, { struct ceph_inode_info *ci = ceph_inode(inode); - wake_up(&ci->i_cap_wq); + wake_up_all(&ci->i_cap_wq); if (arg) { spin_lock(&inode->i_lock); ci->i_wanted_max_size = 0; @@ -882,8 +968,8 @@ static int send_renew_caps(struct ceph_mds_client *mdsc, ceph_mds_state_name(state)); msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, ++session->s_renew_seq); - if (IS_ERR(msg)) - return PTR_ERR(msg); + if (!msg) + return -ENOMEM; ceph_con_send(&session->s_con, msg); return 0; } @@ -930,17 +1016,15 @@ static int request_close_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { struct ceph_msg *msg; - int err = 0; dout("request_close_session mds%d state %s seq %lld\n", session->s_mds, session_state_name(session->s_state), session->s_seq); msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); - if (IS_ERR(msg)) - err = PTR_ERR(msg); - else - ceph_con_send(&session->s_con, msg); - return err; + if (!msg) + return -ENOMEM; + ceph_con_send(&session->s_con, msg); + return 0; } /* @@ -1034,16 +1118,17 @@ static int trim_caps(struct ceph_mds_client *mdsc, * * Called under s_mutex. */ -static int add_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session, - int extra) +int ceph_add_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) { - struct ceph_msg *msg; + struct ceph_msg *msg, *partial = NULL; struct ceph_mds_cap_release *head; int err = -ENOMEM; + int extra = mdsc->client->mount_args->cap_release_safety; + int num; - if (extra < 0) - extra = mdsc->client->mount_args->cap_release_safety; + dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds, + extra); spin_lock(&session->s_cap_lock); @@ -1052,13 +1137,18 @@ static int add_cap_releases(struct ceph_mds_client *mdsc, struct ceph_msg, list_head); head = msg->front.iov_base; - extra += CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num); + num = le32_to_cpu(head->num); + if (num) { + dout(" partial %p with (%d/%d)\n", msg, num, + (int)CEPH_CAPS_PER_RELEASE); + extra += CEPH_CAPS_PER_RELEASE - num; + partial = msg; + } } - while (session->s_num_cap_releases < session->s_nr_caps + extra) { spin_unlock(&session->s_cap_lock); msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, - 0, 0, NULL); + GFP_NOFS); if (!msg) goto out_unlocked; dout("add_cap_releases %p msg %p now %d\n", session, msg, @@ -1071,19 +1161,14 @@ static int add_cap_releases(struct ceph_mds_client *mdsc, session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE; } - if (!list_empty(&session->s_cap_releases)) { - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, - list_head); - head = msg->front.iov_base; - if (head->num) { - dout(" queueing non-full %p (%d)\n", msg, - le32_to_cpu(head->num)); - list_move_tail(&msg->list_head, - &session->s_cap_releases_done); - session->s_num_cap_releases -= - CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num); - } + if (partial) { + head = partial->front.iov_base; + num = le32_to_cpu(head->num); + dout(" queueing partial %p with %d/%d\n", partial, num, + (int)CEPH_CAPS_PER_RELEASE); + list_move_tail(&partial->list_head, + &session->s_cap_releases_done); + session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num; } err = 0; spin_unlock(&session->s_cap_lock); @@ -1144,16 +1229,14 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) /* * called under s_mutex */ -static void send_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) +void ceph_send_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) { struct ceph_msg *msg; dout("send_cap_releases mds%d\n", session->s_mds); - while (1) { - spin_lock(&session->s_cap_lock); - if (list_empty(&session->s_cap_releases_done)) - break; + spin_lock(&session->s_cap_lock); + while (!list_empty(&session->s_cap_releases_done)) { msg = list_first_entry(&session->s_cap_releases_done, struct ceph_msg, list_head); list_del_init(&msg->list_head); @@ -1161,10 +1244,49 @@ static void send_cap_releases(struct ceph_mds_client *mdsc, msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); dout("send_cap_releases mds%d %p\n", session->s_mds, msg); ceph_con_send(&session->s_con, msg); + spin_lock(&session->s_cap_lock); } spin_unlock(&session->s_cap_lock); } +static void discard_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) +{ + struct ceph_msg *msg; + struct ceph_mds_cap_release *head; + unsigned num; + + dout("discard_cap_releases mds%d\n", session->s_mds); + spin_lock(&session->s_cap_lock); + + /* zero out the in-progress message */ + msg = list_first_entry(&session->s_cap_releases, + struct ceph_msg, list_head); + head = msg->front.iov_base; + num = le32_to_cpu(head->num); + dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num); + head->num = cpu_to_le32(0); + session->s_num_cap_releases += num; + + /* requeue completed messages */ + while (!list_empty(&session->s_cap_releases_done)) { + msg = list_first_entry(&session->s_cap_releases_done, + struct ceph_msg, list_head); + list_del_init(&msg->list_head); + + head = msg->front.iov_base; + num = le32_to_cpu(head->num); + dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, + num); + session->s_num_cap_releases += num; + head->num = cpu_to_le32(0); + msg->front.iov_len = sizeof(*head); + list_add(&msg->list_head, &session->s_cap_releases); + } + + spin_unlock(&session->s_cap_lock); +} + /* * requests */ @@ -1180,6 +1302,8 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) if (!req) return ERR_PTR(-ENOMEM); + mutex_init(&req->r_fill_mutex); + req->r_mdsc = mdsc; req->r_started = jiffies; req->r_resend_mds = -1; INIT_LIST_HEAD(&req->r_unsafe_dir_item); @@ -1250,7 +1374,7 @@ retry: len += 1 + temp->d_name.len; temp = temp->d_parent; if (temp == NULL) { - pr_err("build_path_dentry corrupt dentry %p\n", dentry); + pr_err("build_path corrupt dentry %p\n", dentry); return ERR_PTR(-EINVAL); } } @@ -1266,7 +1390,7 @@ retry: struct inode *inode = temp->d_inode; if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { - dout("build_path_dentry path+%d: %p SNAPDIR\n", + dout("build_path path+%d: %p SNAPDIR\n", pos, temp); } else if (stop_on_nosnap && inode && ceph_snap(inode) == CEPH_NOSNAP) { @@ -1277,20 +1401,18 @@ retry: break; strncpy(path + pos, temp->d_name.name, temp->d_name.len); - dout("build_path_dentry path+%d: %p '%.*s'\n", - pos, temp, temp->d_name.len, path + pos); } if (pos) path[--pos] = '/'; temp = temp->d_parent; if (temp == NULL) { - pr_err("build_path_dentry corrupt dentry\n"); + pr_err("build_path corrupt dentry\n"); kfree(path); return ERR_PTR(-EINVAL); } } if (pos != 0) { - pr_err("build_path_dentry did not end path lookup where " + pr_err("build_path did not end path lookup where " "expected, namelen is %d, pos is %d\n", len, pos); /* presumably this is only possible if racing with a rename of one of the parent directories (we can not @@ -1302,7 +1424,7 @@ retry: *base = ceph_ino(temp->d_inode); *plen = len; - dout("build_path_dentry on %p %d built %llx '%.*s'\n", + dout("build_path on %p %d built %llx '%.*s'\n", dentry, atomic_read(&dentry->d_count), *base, len, path); return path; } @@ -1425,9 +1547,11 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, if (req->r_old_dentry_drop) len += req->r_old_dentry->d_name.len; - msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL); - if (IS_ERR(msg)) + msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS); + if (!msg) { + msg = ERR_PTR(-ENOMEM); goto out_free2; + } msg->hdr.tid = cpu_to_le64(req->r_tid); @@ -1444,6 +1568,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, ceph_encode_filepath(&p, end, ino1, path1); ceph_encode_filepath(&p, end, ino2, path2); + /* make note of release offset, in case we need to replay */ + req->r_request_release_offset = p - msg->front.iov_base; + /* cap releases */ releases = 0; if (req->r_inode_drop) @@ -1491,7 +1618,7 @@ static void complete_request(struct ceph_mds_client *mdsc, if (req->r_callback) req->r_callback(mdsc, req); else - complete(&req->r_completion); + complete_all(&req->r_completion); } /* @@ -1507,18 +1634,53 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc, req->r_mds = mds; req->r_attempts++; + if (req->r_inode) { + struct ceph_cap *cap = + ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); + + if (cap) + req->r_sent_on_mseq = cap->mseq; + else + req->r_sent_on_mseq = -1; + } dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); + if (req->r_got_unsafe) { + /* + * Replay. Do not regenerate message (and rebuild + * paths, etc.); just use the original message. + * Rebuilding paths will break for renames because + * d_move mangles the src name. + */ + msg = req->r_request; + rhead = msg->front.iov_base; + + flags = le32_to_cpu(rhead->flags); + flags |= CEPH_MDS_FLAG_REPLAY; + rhead->flags = cpu_to_le32(flags); + + if (req->r_target_inode) + rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); + + rhead->num_retry = req->r_attempts - 1; + + /* remove cap/dentry releases from message */ + rhead->num_releases = 0; + msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset); + msg->front.iov_len = req->r_request_release_offset; + return 0; + } + if (req->r_request) { ceph_msg_put(req->r_request); req->r_request = NULL; } msg = create_request_message(mdsc, req, mds); if (IS_ERR(msg)) { - req->r_reply = ERR_PTR(PTR_ERR(msg)); + req->r_err = PTR_ERR(msg); complete_request(mdsc, req); - return -PTR_ERR(msg); + return PTR_ERR(msg); } req->r_request = msg; @@ -1531,13 +1693,9 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc, rhead->flags = cpu_to_le32(flags); rhead->num_fwd = req->r_num_fwd; rhead->num_retry = req->r_attempts - 1; + rhead->ino = 0; dout(" r_locked_dir = %p\n", req->r_locked_dir); - - if (req->r_target_inode && req->r_got_unsafe) - rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); - else - rhead->ino = 0; return 0; } @@ -1551,7 +1709,7 @@ static int __do_request(struct ceph_mds_client *mdsc, int mds = -1; int err = -EAGAIN; - if (req->r_reply) + if (req->r_err || req->r_got_result) goto out; if (req->r_timeout && @@ -1608,7 +1766,7 @@ out: return err; finish: - req->r_reply = ERR_PTR(err); + req->r_err = err; complete_request(mdsc, req); goto out; } @@ -1629,10 +1787,9 @@ static void __wake_requests(struct ceph_mds_client *mdsc, /* * Wake up threads with requests pending for @mds, so that they can - * resubmit their requests to a possibly different mds. If @all is set, - * wake up if their requests has been forwarded to @mds, too. + * resubmit their requests to a possibly different mds. */ -static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all) +static void kick_requests(struct ceph_mds_client *mdsc, int mds) { struct ceph_mds_request *req; struct rb_node *p; @@ -1688,64 +1845,78 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, __register_request(mdsc, req, dir); __do_request(mdsc, req); - /* wait */ - if (!req->r_reply) { - mutex_unlock(&mdsc->mutex); - if (req->r_timeout) { - err = (long)wait_for_completion_interruptible_timeout( - &req->r_completion, req->r_timeout); - if (err == 0) - req->r_reply = ERR_PTR(-EIO); - else if (err < 0) - req->r_reply = ERR_PTR(err); - } else { - err = wait_for_completion_interruptible( - &req->r_completion); - if (err) - req->r_reply = ERR_PTR(err); - } - mutex_lock(&mdsc->mutex); + if (req->r_err) { + err = req->r_err; + __unregister_request(mdsc, req); + dout("do_request early error %d\n", err); + goto out; } - if (IS_ERR(req->r_reply)) { - err = PTR_ERR(req->r_reply); - req->r_reply = NULL; + /* wait */ + mutex_unlock(&mdsc->mutex); + dout("do_request waiting\n"); + if (req->r_timeout) { + err = (long)wait_for_completion_killable_timeout( + &req->r_completion, req->r_timeout); + if (err == 0) + err = -EIO; + } else { + err = wait_for_completion_killable(&req->r_completion); + } + dout("do_request waited, got %d\n", err); + mutex_lock(&mdsc->mutex); - if (err == -ERESTARTSYS) { - /* aborted */ - req->r_aborted = true; + /* only abort if we didn't race with a real reply */ + if (req->r_got_result) { + err = le32_to_cpu(req->r_reply_info.head->result); + } else if (err < 0) { + dout("aborted request %lld with %d\n", req->r_tid, err); - if (req->r_locked_dir && - (req->r_op & CEPH_MDS_OP_WRITE)) { - struct ceph_inode_info *ci = - ceph_inode(req->r_locked_dir); + /* + * ensure we aren't running concurrently with + * ceph_fill_trace or ceph_readdir_prepopulate, which + * rely on locks (dir mutex) held by our caller. + */ + mutex_lock(&req->r_fill_mutex); + req->r_err = err; + req->r_aborted = true; + mutex_unlock(&req->r_fill_mutex); - dout("aborted, clearing I_COMPLETE on %p\n", - req->r_locked_dir); - spin_lock(&req->r_locked_dir->i_lock); - ci->i_ceph_flags &= ~CEPH_I_COMPLETE; - ci->i_release_count++; - spin_unlock(&req->r_locked_dir->i_lock); - } - } else { - /* clean up this request */ - __unregister_request(mdsc, req); - if (!list_empty(&req->r_unsafe_item)) - list_del_init(&req->r_unsafe_item); - complete(&req->r_safe_completion); - } - } else if (req->r_err) { - err = req->r_err; + if (req->r_locked_dir && + (req->r_op & CEPH_MDS_OP_WRITE)) + ceph_invalidate_dir_request(req); } else { - err = le32_to_cpu(req->r_reply_info.head->result); + err = req->r_err; } - mutex_unlock(&mdsc->mutex); +out: + mutex_unlock(&mdsc->mutex); dout("do_request %p done, result %d\n", req, err); return err; } /* + * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS + * namespace request. + */ +void ceph_invalidate_dir_request(struct ceph_mds_request *req) +{ + struct inode *inode = req->r_locked_dir; + struct ceph_inode_info *ci = ceph_inode(inode); + + dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode); + spin_lock(&inode->i_lock); + ci->i_ceph_flags &= ~CEPH_I_COMPLETE; + ci->i_release_count++; + spin_unlock(&inode->i_lock); + + if (req->r_dentry) + ceph_invalidate_dentry_lease(req->r_dentry); + if (req->r_old_dentry) + ceph_invalidate_dentry_lease(req->r_old_dentry); +} + +/* * Handle mds reply. * * We take the session mutex and parse and process the reply immediately. @@ -1796,29 +1967,54 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) mutex_unlock(&mdsc->mutex); goto out; } + if (req->r_got_safe && !head->safe) { + pr_warning("got unsafe after safe on %llu from mds%d\n", + tid, mds); + mutex_unlock(&mdsc->mutex); + goto out; + } result = le32_to_cpu(head->result); /* - * Tolerate 2 consecutive ESTALEs from the same mds. - * FIXME: we should be looking at the cap migrate_seq. + * Handle an ESTALE + * if we're not talking to the authority, send to them + * if the authority has changed while we weren't looking, + * send to new authority + * Otherwise we just have to return an ESTALE */ if (result == -ESTALE) { - req->r_direct_mode = USE_AUTH_MDS; - req->r_num_stale++; - if (req->r_num_stale <= 2) { + dout("got ESTALE on request %llu", req->r_tid); + if (!req->r_inode) { + /* do nothing; not an authority problem */ + } else if (req->r_direct_mode != USE_AUTH_MDS) { + dout("not using auth, setting for that now"); + req->r_direct_mode = USE_AUTH_MDS; __do_request(mdsc, req); mutex_unlock(&mdsc->mutex); goto out; + } else { + struct ceph_inode_info *ci = ceph_inode(req->r_inode); + struct ceph_cap *cap = + ceph_get_cap_for_mds(ci, req->r_mds);; + + dout("already using auth"); + if ((!cap || cap != ci->i_auth_cap) || + (cap->mseq != req->r_sent_on_mseq)) { + dout("but cap changed, so resending"); + __do_request(mdsc, req); + mutex_unlock(&mdsc->mutex); + goto out; + } } - } else { - req->r_num_stale = 0; + dout("have to return ESTALE on request %llu", req->r_tid); } + if (head->safe) { req->r_got_safe = true; __unregister_request(mdsc, req); - complete(&req->r_safe_completion); + complete_all(&req->r_safe_completion); if (req->r_got_unsafe) { /* @@ -1833,15 +2029,11 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) /* last unsafe request during umount? */ if (mdsc->stopping && !__get_oldest_req(mdsc)) - complete(&mdsc->safe_umount_waiters); + complete_all(&mdsc->safe_umount_waiters); mutex_unlock(&mdsc->mutex); goto out; } - } - - BUG_ON(req->r_reply); - - if (!head->safe) { + } else { req->r_got_unsafe = true; list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); } @@ -1870,23 +2062,32 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) } /* insert trace into our cache */ + mutex_lock(&req->r_fill_mutex); err = ceph_fill_trace(mdsc->client->sb, req, req->r_session); if (err == 0) { if (result == 0 && rinfo->dir_nr) ceph_readdir_prepopulate(req, req->r_session); - ceph_unreserve_caps(&req->r_caps_reservation); + ceph_unreserve_caps(mdsc, &req->r_caps_reservation); } + mutex_unlock(&req->r_fill_mutex); up_read(&mdsc->snap_rwsem); out_err: - if (err) { - req->r_err = err; + mutex_lock(&mdsc->mutex); + if (!req->r_aborted) { + if (err) { + req->r_err = err; + } else { + req->r_reply = msg; + ceph_msg_get(msg); + req->r_got_result = true; + } } else { - req->r_reply = msg; - ceph_msg_get(msg); + dout("reply arrived after request %lld was aborted\n", tid); } + mutex_unlock(&mdsc->mutex); - add_cap_releases(mdsc, req->r_session, -1); + ceph_add_cap_releases(mdsc, req->r_session); mutex_unlock(&session->s_mutex); /* kick calling process */ @@ -1920,16 +2121,21 @@ static void handle_forward(struct ceph_mds_client *mdsc, mutex_lock(&mdsc->mutex); req = __lookup_request(mdsc, tid); if (!req) { - dout("forward %llu to mds%d - req dne\n", tid, next_mds); + dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); goto out; /* dup reply? */ } - if (fwd_seq <= req->r_num_fwd) { - dout("forward %llu to mds%d - old seq %d <= %d\n", + if (req->r_aborted) { + dout("forward tid %llu aborted, unregistering\n", tid); + __unregister_request(mdsc, req); + } else if (fwd_seq <= req->r_num_fwd) { + dout("forward tid %llu to mds%d - old seq %d <= %d\n", tid, next_mds, req->r_num_fwd, fwd_seq); } else { /* resend. forward race not possible; mds would drop */ - dout("forward %llu to mds%d (we resend)\n", tid, next_mds); + dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); + BUG_ON(req->r_err); + BUG_ON(req->r_got_result); req->r_num_fwd = fwd_seq; req->r_resend_mds = next_mds; put_request_session(req); @@ -1983,6 +2189,8 @@ static void handle_session(struct ceph_mds_session *session, switch (op) { case CEPH_SESSION_OPEN: + if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) + pr_info("mds%d reconnect success\n", session->s_mds); session->s_state = CEPH_MDS_SESSION_OPEN; renewed_caps(mdsc, session, 0); wake = 1; @@ -1996,10 +2204,12 @@ static void handle_session(struct ceph_mds_session *session, break; case CEPH_SESSION_CLOSE: + if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) + pr_info("mds%d reconnect denied\n", session->s_mds); remove_session_caps(session); wake = 1; /* for good measure */ - complete(&mdsc->session_close_waiters); - kick_requests(mdsc, mds, 0); /* cur only */ + complete_all(&mdsc->session_close_waiters); + kick_requests(mdsc, mds); break; case CEPH_SESSION_STALE: @@ -2065,9 +2275,14 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc, static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) { - struct ceph_mds_cap_reconnect rec; + union { + struct ceph_mds_cap_reconnect v2; + struct ceph_mds_cap_reconnect_v1 v1; + } rec; + size_t reclen; struct ceph_inode_info *ci; - struct ceph_pagelist *pagelist = arg; + struct ceph_reconnect_state *recon_state = arg; + struct ceph_pagelist *pagelist = recon_state->pagelist; char *path; int pathlen, err; u64 pathbase; @@ -2100,17 +2315,44 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, spin_lock(&inode->i_lock); cap->seq = 0; /* reset cap seq */ cap->issue_seq = 0; /* and issue_seq */ - rec.cap_id = cpu_to_le64(cap->cap_id); - rec.pathbase = cpu_to_le64(pathbase); - rec.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); - rec.issued = cpu_to_le32(cap->issued); - rec.size = cpu_to_le64(inode->i_size); - ceph_encode_timespec(&rec.mtime, &inode->i_mtime); - ceph_encode_timespec(&rec.atime, &inode->i_atime); - rec.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); + + if (recon_state->flock) { + rec.v2.cap_id = cpu_to_le64(cap->cap_id); + rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); + rec.v2.issued = cpu_to_le32(cap->issued); + rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); + rec.v2.pathbase = cpu_to_le64(pathbase); + rec.v2.flock_len = 0; + reclen = sizeof(rec.v2); + } else { + rec.v1.cap_id = cpu_to_le64(cap->cap_id); + rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); + rec.v1.issued = cpu_to_le32(cap->issued); + rec.v1.size = cpu_to_le64(inode->i_size); + ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime); + ceph_encode_timespec(&rec.v1.atime, &inode->i_atime); + rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); + rec.v1.pathbase = cpu_to_le64(pathbase); + reclen = sizeof(rec.v1); + } spin_unlock(&inode->i_lock); - err = ceph_pagelist_append(pagelist, &rec, sizeof(rec)); + if (recon_state->flock) { + int num_fcntl_locks, num_flock_locks; + + lock_kernel(); + ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); + rec.v2.flock_len = (2*sizeof(u32) + + (num_fcntl_locks+num_flock_locks) * + sizeof(struct ceph_filelock)); + + err = ceph_pagelist_append(pagelist, &rec, reclen); + if (!err) + err = ceph_encode_locks(inode, pagelist, + num_fcntl_locks, + num_flock_locks); + unlock_kernel(); + } out: kfree(path); @@ -2131,61 +2373,55 @@ out: * * called with mdsc->mutex held. */ -static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) +static void send_mds_reconnect(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) { - struct ceph_mds_session *session = NULL; struct ceph_msg *reply; struct rb_node *p; - int err; + int mds = session->s_mds; + int err = -ENOMEM; struct ceph_pagelist *pagelist; + struct ceph_reconnect_state recon_state; - pr_info("reconnect to recovering mds%d\n", mds); + pr_info("mds%d reconnect start\n", mds); pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); if (!pagelist) goto fail_nopagelist; ceph_pagelist_init(pagelist); - reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, 0, 0, NULL); - if (IS_ERR(reply)) { - err = PTR_ERR(reply); + reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS); + if (!reply) goto fail_nomsg; - } - - /* find session */ - session = __ceph_lookup_mds_session(mdsc, mds); - mutex_unlock(&mdsc->mutex); /* drop lock for duration */ - if (session) { - mutex_lock(&session->s_mutex); + mutex_lock(&session->s_mutex); + session->s_state = CEPH_MDS_SESSION_RECONNECTING; + session->s_seq = 0; - session->s_state = CEPH_MDS_SESSION_RECONNECTING; - session->s_seq = 0; + ceph_con_open(&session->s_con, + ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); - ceph_con_open(&session->s_con, - ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); - - /* replay unsafe requests */ - replay_unsafe_requests(mdsc, session); - } else { - dout("no session for mds%d, will send short reconnect\n", - mds); - } + /* replay unsafe requests */ + replay_unsafe_requests(mdsc, session); down_read(&mdsc->snap_rwsem); - if (!session) - goto send; dout("session %p state %s\n", session, session_state_name(session->s_state)); + /* drop old cap expires; we're about to reestablish that state */ + discard_cap_releases(mdsc, session); + /* traverse this session's caps */ err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps); if (err) goto fail; - err = iterate_session_caps(session, encode_caps_cb, pagelist); + + recon_state.pagelist = pagelist; + recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK; + err = iterate_session_caps(session, encode_caps_cb, &recon_state); if (err < 0) - goto out; + goto fail; /* * snaprealms. we provide mds with the ino, seq (version), and @@ -2207,34 +2443,32 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) goto fail; } -send: reply->pagelist = pagelist; + if (recon_state.flock) + reply->hdr.version = cpu_to_le16(2); reply->hdr.data_len = cpu_to_le32(pagelist->length); reply->nr_pages = calc_pages_for(0, pagelist->length); ceph_con_send(&session->s_con, reply); - if (session) { - session->s_state = CEPH_MDS_SESSION_OPEN; - __wake_requests(mdsc, &session->s_waiting); - } + mutex_unlock(&session->s_mutex); -out: - up_read(&mdsc->snap_rwsem); - if (session) { - mutex_unlock(&session->s_mutex); - ceph_put_mds_session(session); - } mutex_lock(&mdsc->mutex); + __wake_requests(mdsc, &session->s_waiting); + mutex_unlock(&mdsc->mutex); + + up_read(&mdsc->snap_rwsem); return; fail: ceph_msg_put(reply); + up_read(&mdsc->snap_rwsem); + mutex_unlock(&session->s_mutex); fail_nomsg: ceph_pagelist_release(pagelist); kfree(pagelist); fail_nopagelist: - pr_err("ENOMEM preparing reconnect for mds%d\n", mds); - goto out; + pr_err("error %d preparing reconnect for mds%d\n", err, mds); + return; } @@ -2262,9 +2496,11 @@ static void check_new_map(struct ceph_mds_client *mdsc, oldstate = ceph_mdsmap_get_state(oldmap, i); newstate = ceph_mdsmap_get_state(newmap, i); - dout("check_new_map mds%d state %s -> %s (session %s)\n", + dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", i, ceph_mds_state_name(oldstate), + ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", ceph_mds_state_name(newstate), + ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", session_state_name(s->s_state)); if (memcmp(ceph_mdsmap_get_addr(oldmap, i), @@ -2286,7 +2522,7 @@ static void check_new_map(struct ceph_mds_client *mdsc, } /* kick any requests waiting on the recovering mds */ - kick_requests(mdsc, i, 1); + kick_requests(mdsc, i); } else if (oldstate == newstate) { continue; /* nothing new with this mds */ } @@ -2295,26 +2531,40 @@ static void check_new_map(struct ceph_mds_client *mdsc, * send reconnect? */ if (s->s_state == CEPH_MDS_SESSION_RESTARTING && - newstate >= CEPH_MDS_STATE_RECONNECT) - send_mds_reconnect(mdsc, i); + newstate >= CEPH_MDS_STATE_RECONNECT) { + mutex_unlock(&mdsc->mutex); + send_mds_reconnect(mdsc, s); + mutex_lock(&mdsc->mutex); + } /* - * kick requests on any mds that has gone active. - * - * kick requests on cur or forwarder: we may have sent - * the request to mds1, mds1 told us it forwarded it - * to mds2, but then we learn mds1 failed and can't be - * sure it successfully forwarded our request before - * it died. + * kick request on any mds that has gone active. */ if (oldstate < CEPH_MDS_STATE_ACTIVE && newstate >= CEPH_MDS_STATE_ACTIVE) { - pr_info("mds%d reconnect completed\n", s->s_mds); - kick_requests(mdsc, i, 1); + if (oldstate != CEPH_MDS_STATE_CREATING && + oldstate != CEPH_MDS_STATE_STARTING) + pr_info("mds%d recovery completed\n", s->s_mds); + kick_requests(mdsc, i); ceph_kick_flushing_caps(mdsc, s); wake_up_session_caps(s, 1); } } + + for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) { + s = mdsc->sessions[i]; + if (!s) + continue; + if (!ceph_mdsmap_is_laggy(newmap, i)) + continue; + if (s->s_state == CEPH_MDS_SESSION_OPEN || + s->s_state == CEPH_MDS_SESSION_HUNG || + s->s_state == CEPH_MDS_SESSION_CLOSING) { + dout(" connecting to export targets of laggy mds%d\n", + i); + __open_export_target_sessions(mdsc, s); + } + } } @@ -2345,6 +2595,7 @@ static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_dentry_info *di; int mds = session->s_mds; struct ceph_mds_lease *h = msg->front.iov_base; + u32 seq; struct ceph_vino vino; int mask; struct qstr dname; @@ -2358,6 +2609,7 @@ static void handle_lease(struct ceph_mds_client *mdsc, vino.ino = le64_to_cpu(h->ino); vino.snap = CEPH_NOSNAP; mask = le16_to_cpu(h->mask); + seq = le32_to_cpu(h->seq); dname.name = (void *)h + sizeof(*h) + sizeof(u32); dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32); if (dname.len != get_unaligned_le32(h+1)) @@ -2368,8 +2620,9 @@ static void handle_lease(struct ceph_mds_client *mdsc, /* lookup inode */ inode = ceph_find_inode(sb, vino); - dout("handle_lease '%s', mask %d, ino %llx %p\n", - ceph_lease_op_name(h->action), mask, vino.ino, inode); + dout("handle_lease %s, mask %d, ino %llx %p %.*s\n", + ceph_lease_op_name(h->action), mask, vino.ino, inode, + dname.len, dname.name); if (inode == NULL) { dout("handle_lease no inode %llx\n", vino.ino); goto release; @@ -2394,7 +2647,8 @@ static void handle_lease(struct ceph_mds_client *mdsc, switch (h->action) { case CEPH_MDS_LEASE_REVOKE: if (di && di->lease_session == session) { - h->seq = cpu_to_le32(di->lease_seq); + if (ceph_seq_cmp(di->lease_seq, seq) > 0) + h->seq = cpu_to_le32(di->lease_seq); __ceph_mdsc_drop_dentry_lease(dentry); } release = 1; @@ -2408,7 +2662,7 @@ static void handle_lease(struct ceph_mds_client *mdsc, unsigned long duration = le32_to_cpu(h->duration_ms) * HZ / 1000; - di->lease_seq = le32_to_cpu(h->seq); + di->lease_seq = seq; dentry->d_time = di->lease_renew_from + duration; di->lease_renew_after = di->lease_renew_from + (duration >> 1); @@ -2453,12 +2707,12 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, dnamelen = dentry->d_name.len; len += dnamelen; - msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL); - if (IS_ERR(msg)) + msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS); + if (!msg) return; lease = msg->front.iov_base; lease->action = action; - lease->mask = cpu_to_le16(CEPH_LOCK_DN); + lease->mask = cpu_to_le16(1); lease->ino = cpu_to_le64(ceph_vino(inode).ino); lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap); lease->seq = cpu_to_le32(seq); @@ -2488,7 +2742,7 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, BUG_ON(inode == NULL); BUG_ON(dentry == NULL); - BUG_ON(mask != CEPH_LOCK_DN); + BUG_ON(mask == 0); /* is dentry lease valid? */ spin_lock(&dentry->d_lock); @@ -2598,8 +2852,10 @@ static void delayed_work(struct work_struct *work) send_renew_caps(mdsc, s); else ceph_con_keepalive(&s->s_con); - add_cap_releases(mdsc, s, -1); - send_cap_releases(mdsc, s); + ceph_add_cap_releases(mdsc, s); + if (s->s_state == CEPH_MDS_SESSION_OPEN || + s->s_state == CEPH_MDS_SESSION_HUNG) + ceph_send_cap_releases(mdsc, s); mutex_unlock(&s->s_mutex); ceph_put_mds_session(s); @@ -2616,6 +2872,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client) mdsc->client = client; mutex_init(&mdsc->mutex); mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); + if (mdsc->mdsmap == NULL) + return -ENOMEM; + init_completion(&mdsc->safe_umount_waiters); init_completion(&mdsc->session_close_waiters); INIT_LIST_HEAD(&mdsc->waiting_for_map); @@ -2641,6 +2900,10 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client) init_waitqueue_head(&mdsc->cap_flushing_wq); spin_lock_init(&mdsc->dentry_lru_lock); INIT_LIST_HEAD(&mdsc->dentry_lru); + + ceph_caps_init(mdsc); + ceph_adjust_min_caps(mdsc, client->min_caps); + return 0; } @@ -2685,6 +2948,12 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) drop_leases(mdsc); ceph_flush_dirty_caps(mdsc); wait_requests(mdsc); + + /* + * wait for reply handlers to drop their request refs and + * their inode/dcache refs + */ + ceph_msgr_flush(); } /* @@ -2736,6 +3005,9 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) { u64 want_tid, want_flush; + if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN) + return; + dout("sync\n"); mutex_lock(&mdsc->mutex); want_tid = mdsc->last_tid; @@ -2827,6 +3099,7 @@ void ceph_mdsc_stop(struct ceph_mds_client *mdsc) if (mdsc->mdsmap) ceph_mdsmap_destroy(mdsc->mdsmap); kfree(mdsc->sessions); + ceph_caps_finalize(mdsc); } @@ -2918,9 +3191,10 @@ static void con_put(struct ceph_connection *con) static void peer_reset(struct ceph_connection *con) { struct ceph_mds_session *s = con->private; + struct ceph_mds_client *mdsc = s->s_mdsc; - pr_err("mds%d gave us the boot. IMPLEMENT RECONNECT.\n", - s->s_mds); + pr_warning("mds%d closed our session\n", s->s_mds); + send_mds_reconnect(mdsc, s); } static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) @@ -3027,7 +3301,7 @@ static int invalidate_authorizer(struct ceph_connection *con) return ceph_monc_validate_auth(&mdsc->client->monc); } -const static struct ceph_connection_operations mds_con_ops = { +static const struct ceph_connection_operations mds_con_ops = { .get = con_get, .put = con_put, .dispatch = dispatch, diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 961cc6f6587..ab7e89f5e34 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -151,6 +151,7 @@ typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc, struct ceph_mds_request { u64 r_tid; /* transaction id */ struct rb_node r_node; + struct ceph_mds_client *r_mdsc; int r_op; /* mds op code */ int r_mds; @@ -165,6 +166,8 @@ struct ceph_mds_request { struct inode *r_locked_dir; /* dir (if any) i_mutex locked by vfs */ struct inode *r_target_inode; /* resulting inode */ + struct mutex r_fill_mutex; + union ceph_mds_request_args r_args; int r_fmode; /* file mode, if expecting cap */ @@ -186,6 +189,7 @@ struct ceph_mds_request { int r_old_inode_drop, r_old_inode_unless; struct ceph_msg *r_request; /* original request */ + int r_request_release_offset; struct ceph_msg *r_reply; struct ceph_mds_reply_info_parsed r_reply_info; int r_err; @@ -204,8 +208,8 @@ struct ceph_mds_request { int r_attempts; /* resend attempts */ int r_num_fwd; /* number of forward attempts */ - int r_num_stale; int r_resend_mds; /* mds to resend to next, if any*/ + u32 r_sent_on_mseq; /* cap mseq request was sent at*/ struct kref r_kref; struct list_head r_wait; @@ -213,7 +217,7 @@ struct ceph_mds_request { struct completion r_safe_completion; ceph_mds_request_callback_t r_callback; struct list_head r_unsafe_item; /* per-session unsafe list item */ - bool r_got_unsafe, r_got_safe; + bool r_got_unsafe, r_got_safe, r_got_result; bool r_did_prepopulate; u32 r_readdir_offset; @@ -264,6 +268,27 @@ struct ceph_mds_client { spinlock_t cap_dirty_lock; /* protects above items */ wait_queue_head_t cap_flushing_wq; + /* + * Cap reservations + * + * Maintain a global pool of preallocated struct ceph_caps, referenced + * by struct ceph_caps_reservations. This ensures that we preallocate + * memory needed to successfully process an MDS response. (If an MDS + * sends us cap information and we fail to process it, we will have + * problems due to the client and MDS being out of sync.) + * + * Reservations are 'owned' by a ceph_cap_reservation context. + */ + spinlock_t caps_list_lock; + struct list_head caps_list; /* unused (reserved or + unreserved) */ + int caps_total_count; /* total caps allocated */ + int caps_use_count; /* in use */ + int caps_reserve_count; /* unused, reserved */ + int caps_avail_count; /* unused, unreserved */ + int caps_min_count; /* keep at least this many + (unreserved) */ + #ifdef CONFIG_DEBUG_FS struct dentry *debugfs_file; #endif @@ -301,6 +326,8 @@ extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, struct dentry *dn, int mask); +extern void ceph_invalidate_dir_request(struct ceph_mds_request *req); + extern struct ceph_mds_request * ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode); extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, @@ -318,6 +345,11 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req) kref_put(&req->r_kref, ceph_mdsc_release_request); } +extern int ceph_add_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session); +extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session); + extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc); extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, @@ -332,4 +364,7 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg); +extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session); + #endif diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c index c4c498e6dfe..040be6d1150 100644 --- a/fs/ceph/mdsmap.c +++ b/fs/ceph/mdsmap.c @@ -85,6 +85,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) struct ceph_entity_addr addr; u32 num_export_targets; void *pexport_targets = NULL; + struct ceph_timespec laggy_since; ceph_decode_need(p, end, sizeof(u64)*2 + 1 + sizeof(u32), bad); global_id = ceph_decode_64(p); @@ -103,7 +104,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) state_seq = ceph_decode_64(p); ceph_decode_copy(p, &addr, sizeof(addr)); ceph_decode_addr(&addr); - *p += sizeof(struct ceph_timespec); + ceph_decode_copy(p, &laggy_since, sizeof(laggy_since)); *p += sizeof(u32); ceph_decode_32_safe(p, end, namelen, bad); *p += namelen; @@ -122,6 +123,9 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) m->m_info[mds].global_id = global_id; m->m_info[mds].state = state; m->m_info[mds].addr = addr; + m->m_info[mds].laggy = + (laggy_since.tv_sec != 0 || + laggy_since.tv_nsec != 0); m->m_info[mds].num_export_targets = num_export_targets; if (num_export_targets) { m->m_info[mds].export_targets = diff --git a/fs/ceph/mdsmap.h b/fs/ceph/mdsmap.h index eacc131aa5c..4c5cb0880bb 100644 --- a/fs/ceph/mdsmap.h +++ b/fs/ceph/mdsmap.h @@ -13,6 +13,7 @@ struct ceph_mds_info { struct ceph_entity_addr addr; s32 state; int num_export_targets; + bool laggy; u32 *export_targets; }; @@ -47,6 +48,13 @@ static inline int ceph_mdsmap_get_state(struct ceph_mdsmap *m, int w) return m->m_info[w].state; } +static inline bool ceph_mdsmap_is_laggy(struct ceph_mdsmap *m, int w) +{ + if (w >= 0 && w < m->m_max_mds) + return m->m_info[w].laggy; + return false; +} + extern int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m); extern struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end); extern void ceph_mdsmap_destroy(struct ceph_mdsmap *m); diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index 509f57d9ccb..2502d76fcec 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c @@ -39,23 +39,12 @@ static void queue_con(struct ceph_connection *con); static void con_work(struct work_struct *); static void ceph_fault(struct ceph_connection *con); -const char *ceph_name_type_str(int t) -{ - switch (t) { - case CEPH_ENTITY_TYPE_MON: return "mon"; - case CEPH_ENTITY_TYPE_MDS: return "mds"; - case CEPH_ENTITY_TYPE_OSD: return "osd"; - case CEPH_ENTITY_TYPE_CLIENT: return "client"; - case CEPH_ENTITY_TYPE_ADMIN: return "admin"; - default: return "???"; - } -} - /* * nicely render a sockaddr as a string. */ #define MAX_ADDR_STR 20 -static char addr_str[MAX_ADDR_STR][40]; +#define MAX_ADDR_STR_LEN 60 +static char addr_str[MAX_ADDR_STR][MAX_ADDR_STR_LEN]; static DEFINE_SPINLOCK(addr_str_lock); static int last_addr_str; @@ -64,7 +53,6 @@ const char *pr_addr(const struct sockaddr_storage *ss) int i; char *s; struct sockaddr_in *in4 = (void *)ss; - unsigned char *quad = (void *)&in4->sin_addr.s_addr; struct sockaddr_in6 *in6 = (void *)ss; spin_lock(&addr_str_lock); @@ -76,25 +64,13 @@ const char *pr_addr(const struct sockaddr_storage *ss) switch (ss->ss_family) { case AF_INET: - sprintf(s, "%u.%u.%u.%u:%u", - (unsigned int)quad[0], - (unsigned int)quad[1], - (unsigned int)quad[2], - (unsigned int)quad[3], - (unsigned int)ntohs(in4->sin_port)); + snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%u", &in4->sin_addr, + (unsigned int)ntohs(in4->sin_port)); break; case AF_INET6: - sprintf(s, "%04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%u", - in6->sin6_addr.s6_addr16[0], - in6->sin6_addr.s6_addr16[1], - in6->sin6_addr.s6_addr16[2], - in6->sin6_addr.s6_addr16[3], - in6->sin6_addr.s6_addr16[4], - in6->sin6_addr.s6_addr16[5], - in6->sin6_addr.s6_addr16[6], - in6->sin6_addr.s6_addr16[7], - (unsigned int)ntohs(in6->sin6_port)); + snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%u", &in6->sin6_addr, + (unsigned int)ntohs(in6->sin6_port)); break; default: @@ -132,6 +108,12 @@ void ceph_msgr_exit(void) destroy_workqueue(ceph_msgr_wq); } +void ceph_msgr_flush(void) +{ + flush_workqueue(ceph_msgr_wq); +} + + /* * socket callback functions */ @@ -221,12 +203,13 @@ static void set_sock_callbacks(struct socket *sock, */ static struct socket *ceph_tcp_connect(struct ceph_connection *con) { - struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.in_addr; + struct sockaddr_storage *paddr = &con->peer_addr.in_addr; struct socket *sock; int ret; BUG_ON(con->sock); - ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); + ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM, + IPPROTO_TCP, &sock); if (ret) return ERR_PTR(ret); con->sock = sock; @@ -240,7 +223,8 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con) dout("connect %s\n", pr_addr(&con->peer_addr.in_addr)); - ret = sock->ops->connect(sock, paddr, sizeof(*paddr), O_NONBLOCK); + ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr), + O_NONBLOCK); if (ret == -EINPROGRESS) { dout("connect %s EINPROGRESS sk_state = %u\n", pr_addr(&con->peer_addr.in_addr), @@ -340,6 +324,7 @@ static void reset_connection(struct ceph_connection *con) ceph_msg_put(con->out_msg); con->out_msg = NULL; } + con->out_keepalive_pending = false; con->in_seq = 0; con->in_seq_acked = 0; } @@ -357,6 +342,7 @@ void ceph_con_close(struct ceph_connection *con) clear_bit(WRITE_PENDING, &con->state); mutex_lock(&con->mutex); reset_connection(con); + con->peer_global_seq = 0; cancel_delayed_work(&con->work); mutex_unlock(&con->mutex); queue_con(con); @@ -492,7 +478,14 @@ static void prepare_write_message(struct ceph_connection *con) list_move_tail(&m->list_head, &con->out_sent); } - m->hdr.seq = cpu_to_le64(++con->out_seq); + /* + * only assign outgoing seq # if we haven't sent this message + * yet. if it is requeued, resend with it's original seq. + */ + if (m->needs_out_seq) { + m->hdr.seq = cpu_to_le64(++con->out_seq); + m->needs_out_seq = false; + } dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", m, con->out_seq, le16_to_cpu(m->hdr.type), @@ -654,7 +647,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr, dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, con->connect_seq, global_seq, proto); - con->out_connect.features = CEPH_FEATURE_SUPPORTED; + con->out_connect.features = cpu_to_le64(CEPH_FEATURE_SUPPORTED); con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); con->out_connect.global_seq = cpu_to_le32(global_seq); @@ -1006,19 +999,32 @@ int ceph_parse_ips(const char *c, const char *end, struct sockaddr_in *in4 = (void *)ss; struct sockaddr_in6 *in6 = (void *)ss; int port; + char delim = ','; + + if (*p == '[') { + delim = ']'; + p++; + } memset(ss, 0, sizeof(*ss)); if (in4_pton(p, end - p, (u8 *)&in4->sin_addr.s_addr, - ',', &ipend)) { + delim, &ipend)) ss->ss_family = AF_INET; - } else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr, - ',', &ipend)) { + else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr, + delim, &ipend)) ss->ss_family = AF_INET6; - } else { + else goto bad; - } p = ipend; + if (delim == ']') { + if (*p != ']') { + dout("missing matching ']'\n"); + goto bad; + } + p++; + } + /* port? */ if (p < end && *p == ':') { port = 0; @@ -1052,7 +1058,7 @@ int ceph_parse_ips(const char *c, const char *end, return 0; bad: - pr_err("parse_ips bad ip '%s'\n", c); + pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c); return -EINVAL; } @@ -1075,11 +1081,11 @@ static int process_banner(struct ceph_connection *con) sizeof(con->peer_addr)) != 0 && !(addr_is_blank(&con->actual_peer_addr.in_addr) && con->actual_peer_addr.nonce == con->peer_addr.nonce)) { - pr_warning("wrong peer, want %s/%lld, got %s/%lld\n", + pr_warning("wrong peer, want %s/%d, got %s/%d\n", pr_addr(&con->peer_addr.in_addr), - le64_to_cpu(con->peer_addr.nonce), + (int)le32_to_cpu(con->peer_addr.nonce), pr_addr(&con->actual_peer_addr.in_addr), - le64_to_cpu(con->actual_peer_addr.nonce)); + (int)le32_to_cpu(con->actual_peer_addr.nonce)); con->error_msg = "wrong peer at address"; return -1; } @@ -1226,6 +1232,7 @@ static int process_connect(struct ceph_connection *con) clear_bit(CONNECTING, &con->state); con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); con->connect_seq++; + con->peer_features = server_feat; dout("process_connect got READY gseq %d cseq %d (%d)\n", con->peer_global_seq, le32_to_cpu(con->in_reply.connect_seq), @@ -1295,8 +1302,8 @@ static void process_ack(struct ceph_connection *con) static int read_partial_message_section(struct ceph_connection *con, - struct kvec *section, unsigned int sec_len, - u32 *crc) + struct kvec *section, + unsigned int sec_len, u32 *crc) { int left; int ret; @@ -1392,22 +1399,22 @@ static int read_partial_message(struct ceph_connection *con) if (!con->in_msg) { dout("got hdr type %d front %d data %d\n", con->in_hdr.type, con->in_hdr.front_len, con->in_hdr.data_len); + skip = 0; con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); if (skip) { /* skip this message */ - dout("alloc_msg returned NULL, skipping message\n"); + dout("alloc_msg said skip message\n"); + BUG_ON(con->in_msg); con->in_base_pos = -front_len - middle_len - data_len - sizeof(m->footer); con->in_tag = CEPH_MSGR_TAG_READY; con->in_seq++; return 0; } - if (IS_ERR(con->in_msg)) { - ret = PTR_ERR(con->in_msg); - con->in_msg = NULL; + if (!con->in_msg) { con->error_msg = "error allocating memory for incoming message"; - return ret; + return -ENOMEM; } m = con->in_msg; m->front.iov_len = 0; /* haven't read it yet */ @@ -1427,7 +1434,8 @@ static int read_partial_message(struct ceph_connection *con) /* middle */ if (m->middle) { - ret = read_partial_message_section(con, &m->middle->vec, middle_len, + ret = read_partial_message_section(con, &m->middle->vec, + middle_len, &con->in_middle_crc); if (ret <= 0) return ret; @@ -1507,14 +1515,14 @@ static void process_message(struct ceph_connection *con) /* if first message, set peer_name */ if (con->peer_name.type == 0) - con->peer_name = msg->hdr.src.name; + con->peer_name = msg->hdr.src; con->in_seq++; mutex_unlock(&con->mutex); dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", msg, le64_to_cpu(msg->hdr.seq), - ENTITY_NAME(msg->hdr.src.name), + ENTITY_NAME(msg->hdr.src), le16_to_cpu(msg->hdr.type), ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), le32_to_cpu(msg->hdr.front_len), @@ -1539,7 +1547,6 @@ static int try_write(struct ceph_connection *con) dout("try_write start %p state %lu nref %d\n", con, con->state, atomic_read(&con->nref)); - mutex_lock(&con->mutex); more: dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); @@ -1632,7 +1639,6 @@ do_next: done: ret = 0; out: - mutex_unlock(&con->mutex); dout("try_write done on %p\n", con); return ret; } @@ -1644,7 +1650,6 @@ out: */ static int try_read(struct ceph_connection *con) { - struct ceph_messenger *msgr; int ret = -1; if (!con->sock) @@ -1654,9 +1659,6 @@ static int try_read(struct ceph_connection *con) return 0; dout("try_read start on %p\n", con); - msgr = con->msgr; - - mutex_lock(&con->mutex); more: dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, @@ -1751,7 +1753,6 @@ more: done: ret = 0; out: - mutex_unlock(&con->mutex); dout("try_read done on %p\n", con); return ret; @@ -1823,6 +1824,8 @@ more: dout("con_work %p start, clearing QUEUED\n", con); clear_bit(QUEUED, &con->state); + mutex_lock(&con->mutex); + if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ dout("con_work CLOSED\n"); con_close_socket(con); @@ -1837,11 +1840,16 @@ more: if (test_and_clear_bit(SOCK_CLOSED, &con->state) || try_read(con) < 0 || try_write(con) < 0) { + mutex_unlock(&con->mutex); backoff = 1; ceph_fault(con); /* error/fault path */ + goto done_unlocked; } done: + mutex_unlock(&con->mutex); + +done_unlocked: clear_bit(BUSY, &con->state); dout("con->state=%lu\n", con->state); if (test_bit(QUEUED, &con->state)) { @@ -1913,7 +1921,7 @@ out: /* * in case we faulted due to authentication, invalidate our * current tickets so that we can get new ones. - */ + */ if (con->auth_retry && con->ops->invalidate_authorizer) { dout("calling invalidate_authorizer()\n"); con->ops->invalidate_authorizer(con); @@ -1940,7 +1948,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr) /* the zero page is needed if a request is "canceled" while the message * is being written over the socket */ - msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO); + msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO); if (!msgr->zero_page) { kfree(msgr); return ERR_PTR(-ENOMEM); @@ -1980,12 +1988,12 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) } /* set src+dst */ - msg->hdr.src.name = con->msgr->inst.name; - msg->hdr.src.addr = con->msgr->my_enc_addr; - msg->hdr.orig_src = msg->hdr.src; + msg->hdr.src = con->msgr->inst.name; BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); + msg->needs_out_seq = true; + /* queue */ mutex_lock(&con->mutex); BUG_ON(!list_empty(&msg->list_head)); @@ -2011,20 +2019,20 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) { mutex_lock(&con->mutex); if (!list_empty(&msg->list_head)) { - dout("con_revoke %p msg %p\n", con, msg); + dout("con_revoke %p msg %p - was on queue\n", con, msg); list_del_init(&msg->list_head); ceph_msg_put(msg); msg->hdr.seq = 0; - if (con->out_msg == msg) { - ceph_msg_put(con->out_msg); - con->out_msg = NULL; - } + } + if (con->out_msg == msg) { + dout("con_revoke %p msg %p - was sending\n", con, msg); + con->out_msg = NULL; if (con->out_kvec_is_msg) { con->out_skip = con->out_kvec_bytes; con->out_kvec_is_msg = false; } - } else { - dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg); + ceph_msg_put(msg); + msg->hdr.seq = 0; } mutex_unlock(&con->mutex); } @@ -2074,26 +2082,29 @@ void ceph_con_keepalive(struct ceph_connection *con) * construct a new message with given type, size * the new msg has a ref count of 1. */ -struct ceph_msg *ceph_msg_new(int type, int front_len, - int page_len, int page_off, struct page **pages) +struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) { struct ceph_msg *m; - m = kmalloc(sizeof(*m), GFP_NOFS); + m = kmalloc(sizeof(*m), flags); if (m == NULL) goto out; kref_init(&m->kref); INIT_LIST_HEAD(&m->list_head); + m->hdr.tid = 0; m->hdr.type = cpu_to_le16(type); + m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); + m->hdr.version = 0; m->hdr.front_len = cpu_to_le32(front_len); m->hdr.middle_len = 0; - m->hdr.data_len = cpu_to_le32(page_len); - m->hdr.data_off = cpu_to_le16(page_off); - m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); + m->hdr.data_len = 0; + m->hdr.data_off = 0; + m->hdr.reserved = 0; m->footer.front_crc = 0; m->footer.middle_crc = 0; m->footer.data_crc = 0; + m->footer.flags = 0; m->front_max = front_len; m->front_is_vmalloc = false; m->more_to_follow = false; @@ -2102,11 +2113,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, /* front */ if (front_len) { if (front_len > PAGE_CACHE_SIZE) { - m->front.iov_base = __vmalloc(front_len, GFP_NOFS, + m->front.iov_base = __vmalloc(front_len, flags, PAGE_KERNEL); m->front_is_vmalloc = true; } else { - m->front.iov_base = kmalloc(front_len, GFP_NOFS); + m->front.iov_base = kmalloc(front_len, flags); } if (m->front.iov_base == NULL) { pr_err("msg_new can't allocate %d bytes\n", @@ -2122,19 +2133,18 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, m->middle = NULL; /* data */ - m->nr_pages = calc_pages_for(page_off, page_len); - m->pages = pages; + m->nr_pages = 0; + m->pages = NULL; m->pagelist = NULL; - dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len, - m->nr_pages); + dout("ceph_msg_new %p front %d\n", m, front_len); return m; out2: ceph_msg_put(m); out: - pr_err("msg_new can't create type %d len %d\n", type, front_len); - return ERR_PTR(-ENOMEM); + pr_err("msg_new can't create type %d front %d\n", type, front_len); + return NULL; } /* @@ -2177,29 +2187,25 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, mutex_unlock(&con->mutex); msg = con->ops->alloc_msg(con, hdr, skip); mutex_lock(&con->mutex); - if (IS_ERR(msg)) - return msg; - - if (*skip) + if (!msg || *skip) return NULL; } if (!msg) { *skip = 0; - msg = ceph_msg_new(type, front_len, 0, 0, NULL); + msg = ceph_msg_new(type, front_len, GFP_NOFS); if (!msg) { pr_err("unable to allocate msg type %d len %d\n", type, front_len); - return ERR_PTR(-ENOMEM); + return NULL; } } memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); - if (middle_len) { + if (middle_len && !msg->middle) { ret = ceph_alloc_middle(con, msg); - if (ret < 0) { ceph_msg_put(msg); - return msg; + return NULL; } } diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h index a343dae73cd..76fbc957bc1 100644 --- a/fs/ceph/messenger.h +++ b/fs/ceph/messenger.h @@ -49,10 +49,8 @@ struct ceph_connection_operations { int *skip); }; -extern const char *ceph_name_type_str(int t); - /* use format string %s%d */ -#define ENTITY_NAME(n) ceph_name_type_str((n).type), le64_to_cpu((n).num) +#define ENTITY_NAME(n) ceph_entity_type_name((n).type), le64_to_cpu((n).num) struct ceph_messenger { struct ceph_entity_inst inst; /* my name+address */ @@ -86,6 +84,7 @@ struct ceph_msg { struct kref kref; bool front_is_vmalloc; bool more_to_follow; + bool needs_out_seq; int front_max; struct ceph_msgpool *pool; @@ -143,6 +142,7 @@ struct ceph_connection { struct ceph_entity_addr peer_addr; /* peer address */ struct ceph_entity_name peer_name; /* peer name */ struct ceph_entity_addr peer_addr_for_me; + unsigned peer_features; u32 connect_seq; /* identify the most recent connection attempt for this connection, client */ u32 peer_global_seq; /* peer's global seq for this connection */ @@ -157,7 +157,6 @@ struct ceph_connection { struct list_head out_queue; struct list_head out_sent; /* sending or sent but unacked */ u64 out_seq; /* last message queued for send */ - u64 out_seq_sent; /* last message sent */ bool out_keepalive_pending; u64 in_seq, in_seq_acked; /* last message received, acked */ @@ -214,6 +213,7 @@ extern int ceph_parse_ips(const char *c, const char *end, extern int ceph_msgr_init(void); extern void ceph_msgr_exit(void); +extern void ceph_msgr_flush(void); extern struct ceph_messenger *ceph_messenger_create( struct ceph_entity_addr *myaddr); @@ -233,9 +233,7 @@ extern void ceph_con_keepalive(struct ceph_connection *con); extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); extern void ceph_con_put(struct ceph_connection *con); -extern struct ceph_msg *ceph_msg_new(int type, int front_len, - int page_len, int page_off, - struct page **pages); +extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags); extern void ceph_msg_kfree(struct ceph_msg *m); diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c index 8fdc011ca95..b2a5a3e4a67 100644 --- a/fs/ceph/mon_client.c +++ b/fs/ceph/mon_client.c @@ -28,7 +28,7 @@ * resend any outstanding requests. */ -const static struct ceph_connection_operations mon_con_ops; +static const struct ceph_connection_operations mon_con_ops; static int __validate_auth(struct ceph_mon_client *monc); @@ -104,6 +104,7 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) monc->pending_auth = 1; monc->m_auth->front.iov_len = len; monc->m_auth->hdr.front_len = cpu_to_le32(len); + ceph_con_revoke(monc->con, monc->m_auth); ceph_msg_get(monc->m_auth); /* keep our ref */ ceph_con_send(monc->con, monc->m_auth); } @@ -187,16 +188,12 @@ static void __send_subscribe(struct ceph_mon_client *monc) monc->want_next_osdmap); if ((__sub_expired(monc) && !monc->sub_sent) || monc->want_next_osdmap == 1) { - struct ceph_msg *msg; + struct ceph_msg *msg = monc->m_subscribe; struct ceph_mon_subscribe_item *i; void *p, *end; - msg = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, 0, 0, NULL); - if (!msg) - return; - p = msg->front.iov_base; - end = p + msg->front.iov_len; + end = p + msg->front_max; dout("__send_subscribe to 'mdsmap' %u+\n", (unsigned)monc->have_mdsmap); @@ -226,7 +223,8 @@ static void __send_subscribe(struct ceph_mon_client *monc) msg->front.iov_len = p - msg->front.iov_base; msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); - ceph_con_send(monc->con, msg); + ceph_con_revoke(monc->con, msg); + ceph_con_send(monc->con, ceph_msg_get(msg)); monc->sub_sent = jiffies | 1; /* never 0 */ } @@ -347,20 +345,20 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc, out: mutex_unlock(&monc->mutex); - wake_up(&client->auth_wq); + wake_up_all(&client->auth_wq); } /* - * statfs + * generic requests (e.g., statfs, poolop) */ -static struct ceph_mon_statfs_request *__lookup_statfs( +static struct ceph_mon_generic_request *__lookup_generic_req( struct ceph_mon_client *monc, u64 tid) { - struct ceph_mon_statfs_request *req; - struct rb_node *n = monc->statfs_request_tree.rb_node; + struct ceph_mon_generic_request *req; + struct rb_node *n = monc->generic_request_tree.rb_node; while (n) { - req = rb_entry(n, struct ceph_mon_statfs_request, node); + req = rb_entry(n, struct ceph_mon_generic_request, node); if (tid < req->tid) n = n->rb_left; else if (tid > req->tid) @@ -371,16 +369,16 @@ static struct ceph_mon_statfs_request *__lookup_statfs( return NULL; } -static void __insert_statfs(struct ceph_mon_client *monc, - struct ceph_mon_statfs_request *new) +static void __insert_generic_request(struct ceph_mon_client *monc, + struct ceph_mon_generic_request *new) { - struct rb_node **p = &monc->statfs_request_tree.rb_node; + struct rb_node **p = &monc->generic_request_tree.rb_node; struct rb_node *parent = NULL; - struct ceph_mon_statfs_request *req = NULL; + struct ceph_mon_generic_request *req = NULL; while (*p) { parent = *p; - req = rb_entry(parent, struct ceph_mon_statfs_request, node); + req = rb_entry(parent, struct ceph_mon_generic_request, node); if (new->tid < req->tid) p = &(*p)->rb_left; else if (new->tid > req->tid) @@ -390,113 +388,290 @@ static void __insert_statfs(struct ceph_mon_client *monc, } rb_link_node(&new->node, parent, p); - rb_insert_color(&new->node, &monc->statfs_request_tree); + rb_insert_color(&new->node, &monc->generic_request_tree); +} + +static void release_generic_request(struct kref *kref) +{ + struct ceph_mon_generic_request *req = + container_of(kref, struct ceph_mon_generic_request, kref); + + if (req->reply) + ceph_msg_put(req->reply); + if (req->request) + ceph_msg_put(req->request); + + kfree(req); +} + +static void put_generic_request(struct ceph_mon_generic_request *req) +{ + kref_put(&req->kref, release_generic_request); +} + +static void get_generic_request(struct ceph_mon_generic_request *req) +{ + kref_get(&req->kref); +} + +static struct ceph_msg *get_generic_reply(struct ceph_connection *con, + struct ceph_msg_header *hdr, + int *skip) +{ + struct ceph_mon_client *monc = con->private; + struct ceph_mon_generic_request *req; + u64 tid = le64_to_cpu(hdr->tid); + struct ceph_msg *m; + + mutex_lock(&monc->mutex); + req = __lookup_generic_req(monc, tid); + if (!req) { + dout("get_generic_reply %lld dne\n", tid); + *skip = 1; + m = NULL; + } else { + dout("get_generic_reply %lld got %p\n", tid, req->reply); + m = ceph_msg_get(req->reply); + /* + * we don't need to track the connection reading into + * this reply because we only have one open connection + * at a time, ever. + */ + } + mutex_unlock(&monc->mutex); + return m; } +static int do_generic_request(struct ceph_mon_client *monc, + struct ceph_mon_generic_request *req) +{ + int err; + + /* register request */ + mutex_lock(&monc->mutex); + req->tid = ++monc->last_tid; + req->request->hdr.tid = cpu_to_le64(req->tid); + __insert_generic_request(monc, req); + monc->num_generic_requests++; + ceph_con_send(monc->con, ceph_msg_get(req->request)); + mutex_unlock(&monc->mutex); + + err = wait_for_completion_interruptible(&req->completion); + + mutex_lock(&monc->mutex); + rb_erase(&req->node, &monc->generic_request_tree); + monc->num_generic_requests--; + mutex_unlock(&monc->mutex); + + if (!err) + err = req->result; + return err; +} + +/* + * statfs + */ static void handle_statfs_reply(struct ceph_mon_client *monc, struct ceph_msg *msg) { - struct ceph_mon_statfs_request *req; + struct ceph_mon_generic_request *req; struct ceph_mon_statfs_reply *reply = msg->front.iov_base; - u64 tid; + u64 tid = le64_to_cpu(msg->hdr.tid); if (msg->front.iov_len != sizeof(*reply)) goto bad; - tid = le64_to_cpu(msg->hdr.tid); dout("handle_statfs_reply %p tid %llu\n", msg, tid); mutex_lock(&monc->mutex); - req = __lookup_statfs(monc, tid); + req = __lookup_generic_req(monc, tid); if (req) { - *req->buf = reply->st; + *(struct ceph_statfs *)req->buf = reply->st; req->result = 0; + get_generic_request(req); } mutex_unlock(&monc->mutex); - if (req) - complete(&req->completion); + if (req) { + complete_all(&req->completion); + put_generic_request(req); + } return; bad: - pr_err("corrupt statfs reply, no tid\n"); + pr_err("corrupt generic reply, tid %llu\n", tid); ceph_msg_dump(msg); } /* - * (re)send a statfs request + * Do a synchronous statfs(). */ -static int send_statfs(struct ceph_mon_client *monc, - struct ceph_mon_statfs_request *req) +int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) { - struct ceph_msg *msg; + struct ceph_mon_generic_request *req; struct ceph_mon_statfs *h; + int err; + + req = kzalloc(sizeof(*req), GFP_NOFS); + if (!req) + return -ENOMEM; + + kref_init(&req->kref); + req->buf = buf; + req->buf_len = sizeof(*buf); + init_completion(&req->completion); + + err = -ENOMEM; + req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS); + if (!req->request) + goto out; + req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS); + if (!req->reply) + goto out; - dout("send_statfs tid %llu\n", req->tid); - msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL); - if (IS_ERR(msg)) - return PTR_ERR(msg); - req->request = msg; - msg->hdr.tid = cpu_to_le64(req->tid); - h = msg->front.iov_base; + /* fill out request */ + h = req->request->front.iov_base; h->monhdr.have_version = 0; h->monhdr.session_mon = cpu_to_le16(-1); h->monhdr.session_mon_tid = 0; h->fsid = monc->monmap->fsid; - ceph_con_send(monc->con, msg); - return 0; + + err = do_generic_request(monc, req); + +out: + kref_put(&req->kref, release_generic_request); + return err; } /* - * Do a synchronous statfs(). + * pool ops */ -int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) +static int get_poolop_reply_buf(const char *src, size_t src_len, + char *dst, size_t dst_len) { - struct ceph_mon_statfs_request req; - int err; + u32 buf_len; - req.buf = buf; - init_completion(&req.completion); + if (src_len != sizeof(u32) + dst_len) + return -EINVAL; - /* allocate memory for reply */ - err = ceph_msgpool_resv(&monc->msgpool_statfs_reply, 1); - if (err) - return err; + buf_len = le32_to_cpu(*(u32 *)src); + if (buf_len != dst_len) + return -EINVAL; - /* register request */ - mutex_lock(&monc->mutex); - req.tid = ++monc->last_tid; - req.last_attempt = jiffies; - req.delay = BASE_DELAY_INTERVAL; - __insert_statfs(monc, &req); - monc->num_statfs_requests++; - mutex_unlock(&monc->mutex); + memcpy(dst, src + sizeof(u32), dst_len); + return 0; +} - /* send request and wait */ - err = send_statfs(monc, &req); - if (!err) - err = wait_for_completion_interruptible(&req.completion); +static void handle_poolop_reply(struct ceph_mon_client *monc, + struct ceph_msg *msg) +{ + struct ceph_mon_generic_request *req; + struct ceph_mon_poolop_reply *reply = msg->front.iov_base; + u64 tid = le64_to_cpu(msg->hdr.tid); + + if (msg->front.iov_len < sizeof(*reply)) + goto bad; + dout("handle_poolop_reply %p tid %llu\n", msg, tid); mutex_lock(&monc->mutex); - rb_erase(&req.node, &monc->statfs_request_tree); - monc->num_statfs_requests--; - ceph_msgpool_resv(&monc->msgpool_statfs_reply, -1); + req = __lookup_generic_req(monc, tid); + if (req) { + if (req->buf_len && + get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply), + msg->front.iov_len - sizeof(*reply), + req->buf, req->buf_len) < 0) { + mutex_unlock(&monc->mutex); + goto bad; + } + req->result = le32_to_cpu(reply->reply_code); + get_generic_request(req); + } mutex_unlock(&monc->mutex); + if (req) { + complete(&req->completion); + put_generic_request(req); + } + return; - if (!err) - err = req.result; +bad: + pr_err("corrupt generic reply, tid %llu\n", tid); + ceph_msg_dump(msg); +} + +/* + * Do a synchronous pool op. + */ +int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op, + u32 pool, u64 snapid, + char *buf, int len) +{ + struct ceph_mon_generic_request *req; + struct ceph_mon_poolop *h; + int err; + + req = kzalloc(sizeof(*req), GFP_NOFS); + if (!req) + return -ENOMEM; + + kref_init(&req->kref); + req->buf = buf; + req->buf_len = len; + init_completion(&req->completion); + + err = -ENOMEM; + req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS); + if (!req->request) + goto out; + req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS); + if (!req->reply) + goto out; + + /* fill out request */ + req->request->hdr.version = cpu_to_le16(2); + h = req->request->front.iov_base; + h->monhdr.have_version = 0; + h->monhdr.session_mon = cpu_to_le16(-1); + h->monhdr.session_mon_tid = 0; + h->fsid = monc->monmap->fsid; + h->pool = cpu_to_le32(pool); + h->op = cpu_to_le32(op); + h->auid = 0; + h->snapid = cpu_to_le64(snapid); + h->name_len = 0; + + err = do_generic_request(monc, req); + +out: + kref_put(&req->kref, release_generic_request); return err; } +int ceph_monc_create_snapid(struct ceph_mon_client *monc, + u32 pool, u64 *snapid) +{ + return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, + pool, 0, (char *)snapid, sizeof(*snapid)); + +} + +int ceph_monc_delete_snapid(struct ceph_mon_client *monc, + u32 pool, u64 snapid) +{ + return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, + pool, snapid, 0, 0); + +} + /* - * Resend pending statfs requests. + * Resend pending generic requests. */ -static void __resend_statfs(struct ceph_mon_client *monc) +static void __resend_generic_request(struct ceph_mon_client *monc) { - struct ceph_mon_statfs_request *req; + struct ceph_mon_generic_request *req; struct rb_node *p; - for (p = rb_first(&monc->statfs_request_tree); p; p = rb_next(p)) { - req = rb_entry(p, struct ceph_mon_statfs_request, node); - send_statfs(monc, req); + for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { + req = rb_entry(p, struct ceph_mon_generic_request, node); + ceph_con_revoke(monc->con, req->request); + ceph_con_send(monc->con, ceph_msg_get(req->request)); } } @@ -586,26 +761,26 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS; - /* msg pools */ - err = ceph_msgpool_init(&monc->msgpool_subscribe_ack, - sizeof(struct ceph_mon_subscribe_ack), 1, false); - if (err < 0) + /* msgs */ + err = -ENOMEM; + monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK, + sizeof(struct ceph_mon_subscribe_ack), + GFP_NOFS); + if (!monc->m_subscribe_ack) goto out_monmap; - err = ceph_msgpool_init(&monc->msgpool_statfs_reply, - sizeof(struct ceph_mon_statfs_reply), 0, false); - if (err < 0) - goto out_pool1; - err = ceph_msgpool_init(&monc->msgpool_auth_reply, 4096, 1, false); - if (err < 0) - goto out_pool2; - - monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, 0, 0, NULL); + + monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS); + if (!monc->m_subscribe) + goto out_subscribe_ack; + + monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS); + if (!monc->m_auth_reply) + goto out_subscribe; + + monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS); monc->pending_auth = 0; - if (IS_ERR(monc->m_auth)) { - err = PTR_ERR(monc->m_auth); - monc->m_auth = NULL; - goto out_pool3; - } + if (!monc->m_auth) + goto out_auth_reply; monc->cur_mon = -1; monc->hunting = true; @@ -613,8 +788,8 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) monc->sub_sent = 0; INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); - monc->statfs_request_tree = RB_ROOT; - monc->num_statfs_requests = 0; + monc->generic_request_tree = RB_ROOT; + monc->num_generic_requests = 0; monc->last_tid = 0; monc->have_mdsmap = 0; @@ -622,12 +797,12 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) monc->want_next_osdmap = 1; return 0; -out_pool3: - ceph_msgpool_destroy(&monc->msgpool_auth_reply); -out_pool2: - ceph_msgpool_destroy(&monc->msgpool_subscribe_ack); -out_pool1: - ceph_msgpool_destroy(&monc->msgpool_statfs_reply); +out_auth_reply: + ceph_msg_put(monc->m_auth_reply); +out_subscribe: + ceph_msg_put(monc->m_subscribe); +out_subscribe_ack: + ceph_msg_put(monc->m_subscribe_ack); out_monmap: kfree(monc->monmap); out: @@ -651,9 +826,9 @@ void ceph_monc_stop(struct ceph_mon_client *monc) ceph_auth_destroy(monc->auth); ceph_msg_put(monc->m_auth); - ceph_msgpool_destroy(&monc->msgpool_subscribe_ack); - ceph_msgpool_destroy(&monc->msgpool_statfs_reply); - ceph_msgpool_destroy(&monc->msgpool_auth_reply); + ceph_msg_put(monc->m_auth_reply); + ceph_msg_put(monc->m_subscribe); + ceph_msg_put(monc->m_subscribe_ack); kfree(monc->monmap); } @@ -662,8 +837,11 @@ static void handle_auth_reply(struct ceph_mon_client *monc, struct ceph_msg *msg) { int ret; + int was_auth = 0; mutex_lock(&monc->mutex); + if (monc->auth->ops) + was_auth = monc->auth->ops->is_authenticated(monc->auth); monc->pending_auth = 0; ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, msg->front.iov_len, @@ -671,17 +849,18 @@ static void handle_auth_reply(struct ceph_mon_client *monc, monc->m_auth->front_max); if (ret < 0) { monc->client->auth_err = ret; - wake_up(&monc->client->auth_wq); + wake_up_all(&monc->client->auth_wq); } else if (ret > 0) { __send_prepared_auth_request(monc, ret); - } else if (monc->auth->ops->is_authenticated(monc->auth)) { + } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) { dout("authenticated, starting session\n"); monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT; - monc->client->msgr->inst.name.num = monc->auth->global_id; + monc->client->msgr->inst.name.num = + cpu_to_le64(monc->auth->global_id); __send_subscribe(monc); - __resend_statfs(monc); + __resend_generic_request(monc); } mutex_unlock(&monc->mutex); } @@ -735,6 +914,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) handle_statfs_reply(monc, msg); break; + case CEPH_MSG_POOLOP_REPLY: + handle_poolop_reply(monc, msg); + break; + case CEPH_MSG_MON_MAP: ceph_monc_handle_map(monc, msg); break; @@ -770,18 +953,18 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, switch (type) { case CEPH_MSG_MON_SUBSCRIBE_ACK: - m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len); + m = ceph_msg_get(monc->m_subscribe_ack); break; + case CEPH_MSG_POOLOP_REPLY: case CEPH_MSG_STATFS_REPLY: - m = ceph_msgpool_get(&monc->msgpool_statfs_reply, front_len); - break; + return get_generic_reply(con, hdr, skip); case CEPH_MSG_AUTH_REPLY: - m = ceph_msgpool_get(&monc->msgpool_auth_reply, front_len); + m = ceph_msg_get(monc->m_auth_reply); break; case CEPH_MSG_MON_MAP: case CEPH_MSG_MDS_MAP: case CEPH_MSG_OSD_MAP: - m = ceph_msg_new(type, front_len, 0, 0, NULL); + m = ceph_msg_new(type, front_len, GFP_NOFS); break; } @@ -826,7 +1009,7 @@ out: mutex_unlock(&monc->mutex); } -const static struct ceph_connection_operations mon_con_ops = { +static const struct ceph_connection_operations mon_con_ops = { .get = ceph_con_get, .put = ceph_con_put, .dispatch = dispatch, diff --git a/fs/ceph/mon_client.h b/fs/ceph/mon_client.h index b958ad5afa0..8e396f2c096 100644 --- a/fs/ceph/mon_client.h +++ b/fs/ceph/mon_client.h @@ -2,10 +2,10 @@ #define _FS_CEPH_MON_CLIENT_H #include <linux/completion.h> +#include <linux/kref.h> #include <linux/rbtree.h> #include "messenger.h" -#include "msgpool.h" struct ceph_client; struct ceph_mount_args; @@ -22,7 +22,7 @@ struct ceph_monmap { }; struct ceph_mon_client; -struct ceph_mon_statfs_request; +struct ceph_mon_generic_request; /* @@ -40,17 +40,20 @@ struct ceph_mon_request { }; /* - * statfs() is done a bit differently because we need to get data back + * ceph_mon_generic_request is being used for the statfs and poolop requests + * which are bening done a bit differently because we need to get data back * to the caller */ -struct ceph_mon_statfs_request { +struct ceph_mon_generic_request { + struct kref kref; u64 tid; struct rb_node node; int result; - struct ceph_statfs *buf; + void *buf; + int buf_len; struct completion completion; - unsigned long last_attempt, delay; /* jiffies */ struct ceph_msg *request; /* original request */ + struct ceph_msg *reply; /* and reply */ }; struct ceph_mon_client { @@ -61,7 +64,7 @@ struct ceph_mon_client { struct delayed_work delayed_work; struct ceph_auth_client *auth; - struct ceph_msg *m_auth; + struct ceph_msg *m_auth, *m_auth_reply, *m_subscribe, *m_subscribe_ack; int pending_auth; bool hunting; @@ -70,14 +73,9 @@ struct ceph_mon_client { struct ceph_connection *con; bool have_fsid; - /* msg pools */ - struct ceph_msgpool msgpool_subscribe_ack; - struct ceph_msgpool msgpool_statfs_reply; - struct ceph_msgpool msgpool_auth_reply; - - /* pending statfs requests */ - struct rb_root statfs_request_tree; - int num_statfs_requests; + /* pending generic requests */ + struct rb_root generic_request_tree; + int num_generic_requests; u64 last_tid; /* mds/osd map */ @@ -114,6 +112,10 @@ extern int ceph_monc_open_session(struct ceph_mon_client *monc); extern int ceph_monc_validate_auth(struct ceph_mon_client *monc); +extern int ceph_monc_create_snapid(struct ceph_mon_client *monc, + u32 pool, u64 *snapid); +extern int ceph_monc_delete_snapid(struct ceph_mon_client *monc, + u32 pool, u64 snapid); #endif diff --git a/fs/ceph/msgpool.c b/fs/ceph/msgpool.c index ca3b44a89f2..dd65a643813 100644 --- a/fs/ceph/msgpool.c +++ b/fs/ceph/msgpool.c @@ -7,180 +7,58 @@ #include "msgpool.h" -/* - * We use msg pools to preallocate memory for messages we expect to - * receive over the wire, to avoid getting ourselves into OOM - * conditions at unexpected times. We take use a few different - * strategies: - * - * - for request/response type interactions, we preallocate the - * memory needed for the response when we generate the request. - * - * - for messages we can receive at any time from the MDS, we preallocate - * a pool of messages we can re-use. - * - * - for writeback, we preallocate some number of messages to use for - * requests and their replies, so that we always make forward - * progress. - * - * The msgpool behaves like a mempool_t, but keeps preallocated - * ceph_msgs strung together on a list_head instead of using a pointer - * vector. This avoids vector reallocation when we adjust the number - * of preallocated items (which happens frequently). - */ +static void *alloc_fn(gfp_t gfp_mask, void *arg) +{ + struct ceph_msgpool *pool = arg; + void *p; + p = ceph_msg_new(0, pool->front_len, gfp_mask); + if (!p) + pr_err("msgpool %s alloc failed\n", pool->name); + return p; +} -/* - * Allocate or release as necessary to meet our target pool size. - */ -static int __fill_msgpool(struct ceph_msgpool *pool) +static void free_fn(void *element, void *arg) { - struct ceph_msg *msg; - - while (pool->num < pool->min) { - dout("fill_msgpool %p %d/%d allocating\n", pool, pool->num, - pool->min); - spin_unlock(&pool->lock); - msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL); - spin_lock(&pool->lock); - if (IS_ERR(msg)) - return PTR_ERR(msg); - msg->pool = pool; - list_add(&msg->list_head, &pool->msgs); - pool->num++; - } - while (pool->num > pool->min) { - msg = list_first_entry(&pool->msgs, struct ceph_msg, list_head); - dout("fill_msgpool %p %d/%d releasing %p\n", pool, pool->num, - pool->min, msg); - list_del_init(&msg->list_head); - pool->num--; - ceph_msg_kfree(msg); - } - return 0; + ceph_msg_put(element); } int ceph_msgpool_init(struct ceph_msgpool *pool, - int front_len, int min, bool blocking) + int front_len, int size, bool blocking, const char *name) { - int ret; - - dout("msgpool_init %p front_len %d min %d\n", pool, front_len, min); - spin_lock_init(&pool->lock); pool->front_len = front_len; - INIT_LIST_HEAD(&pool->msgs); - pool->num = 0; - pool->min = min; - pool->blocking = blocking; - init_waitqueue_head(&pool->wait); - - spin_lock(&pool->lock); - ret = __fill_msgpool(pool); - spin_unlock(&pool->lock); - return ret; + pool->pool = mempool_create(size, alloc_fn, free_fn, pool); + if (!pool->pool) + return -ENOMEM; + pool->name = name; + return 0; } void ceph_msgpool_destroy(struct ceph_msgpool *pool) { - dout("msgpool_destroy %p\n", pool); - spin_lock(&pool->lock); - pool->min = 0; - __fill_msgpool(pool); - spin_unlock(&pool->lock); + mempool_destroy(pool->pool); } -int ceph_msgpool_resv(struct ceph_msgpool *pool, int delta) +struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, + int front_len) { - int ret; - - spin_lock(&pool->lock); - dout("msgpool_resv %p delta %d\n", pool, delta); - pool->min += delta; - ret = __fill_msgpool(pool); - spin_unlock(&pool->lock); - return ret; -} - -struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len) -{ - wait_queue_t wait; - struct ceph_msg *msg; - - if (front_len && front_len > pool->front_len) { - pr_err("msgpool_get pool %p need front %d, pool size is %d\n", - pool, front_len, pool->front_len); + if (front_len > pool->front_len) { + pr_err("msgpool_get pool %s need front %d, pool size is %d\n", + pool->name, front_len, pool->front_len); WARN_ON(1); /* try to alloc a fresh message */ - msg = ceph_msg_new(0, front_len, 0, 0, NULL); - if (!IS_ERR(msg)) - return msg; - } - - if (!front_len) - front_len = pool->front_len; - - if (pool->blocking) { - /* mempool_t behavior; first try to alloc */ - msg = ceph_msg_new(0, front_len, 0, 0, NULL); - if (!IS_ERR(msg)) - return msg; + return ceph_msg_new(0, front_len, GFP_NOFS); } - while (1) { - spin_lock(&pool->lock); - if (likely(pool->num)) { - msg = list_entry(pool->msgs.next, struct ceph_msg, - list_head); - list_del_init(&msg->list_head); - pool->num--; - dout("msgpool_get %p got %p, now %d/%d\n", pool, msg, - pool->num, pool->min); - spin_unlock(&pool->lock); - return msg; - } - pr_err("msgpool_get %p now %d/%d, %s\n", pool, pool->num, - pool->min, pool->blocking ? "waiting" : "may fail"); - spin_unlock(&pool->lock); - - if (!pool->blocking) { - WARN_ON(1); - - /* maybe we can allocate it now? */ - msg = ceph_msg_new(0, front_len, 0, 0, NULL); - if (!IS_ERR(msg)) - return msg; - - pr_err("msgpool_get %p empty + alloc failed\n", pool); - return ERR_PTR(-ENOMEM); - } - - init_wait(&wait); - prepare_to_wait(&pool->wait, &wait, TASK_UNINTERRUPTIBLE); - schedule(); - finish_wait(&pool->wait, &wait); - } + return mempool_alloc(pool->pool, GFP_NOFS); } void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg) { - spin_lock(&pool->lock); - if (pool->num < pool->min) { - /* reset msg front_len; user may have changed it */ - msg->front.iov_len = pool->front_len; - msg->hdr.front_len = cpu_to_le32(pool->front_len); + /* reset msg front_len; user may have changed it */ + msg->front.iov_len = pool->front_len; + msg->hdr.front_len = cpu_to_le32(pool->front_len); - kref_set(&msg->kref, 1); /* retake a single ref */ - list_add(&msg->list_head, &pool->msgs); - pool->num++; - dout("msgpool_put %p reclaim %p, now %d/%d\n", pool, msg, - pool->num, pool->min); - spin_unlock(&pool->lock); - wake_up(&pool->wait); - } else { - dout("msgpool_put %p drop %p, at %d/%d\n", pool, msg, - pool->num, pool->min); - spin_unlock(&pool->lock); - ceph_msg_kfree(msg); - } + kref_init(&msg->kref); /* retake single ref */ } diff --git a/fs/ceph/msgpool.h b/fs/ceph/msgpool.h index bc834bfcd72..a362605f936 100644 --- a/fs/ceph/msgpool.h +++ b/fs/ceph/msgpool.h @@ -1,6 +1,7 @@ #ifndef _FS_CEPH_MSGPOOL #define _FS_CEPH_MSGPOOL +#include <linux/mempool.h> #include "messenger.h" /* @@ -8,18 +9,15 @@ * avoid unexpected OOM conditions. */ struct ceph_msgpool { - spinlock_t lock; + const char *name; + mempool_t *pool; int front_len; /* preallocated payload size */ - struct list_head msgs; /* msgs in the pool; each has 1 ref */ - int num, min; /* cur, min # msgs in the pool */ - bool blocking; - wait_queue_head_t wait; }; extern int ceph_msgpool_init(struct ceph_msgpool *pool, - int front_len, int size, bool blocking); + int front_len, int size, bool blocking, + const char *name); extern void ceph_msgpool_destroy(struct ceph_msgpool *pool); -extern int ceph_msgpool_resv(struct ceph_msgpool *, int delta); extern struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *, int front_len); extern void ceph_msgpool_put(struct ceph_msgpool *, struct ceph_msg *); diff --git a/fs/ceph/msgr.h b/fs/ceph/msgr.h index 8aaab414f3f..680d3d648ca 100644 --- a/fs/ceph/msgr.h +++ b/fs/ceph/msgr.h @@ -1,5 +1,5 @@ -#ifndef __MSGR_H -#define __MSGR_H +#ifndef CEPH_MSGR_H +#define CEPH_MSGR_H /* * Data types for message passing layer used by Ceph. @@ -50,7 +50,6 @@ struct ceph_entity_name { #define CEPH_ENTITY_TYPE_MDS 0x02 #define CEPH_ENTITY_TYPE_OSD 0x04 #define CEPH_ENTITY_TYPE_CLIENT 0x08 -#define CEPH_ENTITY_TYPE_ADMIN 0x10 #define CEPH_ENTITY_TYPE_AUTH 0x20 #define CEPH_ENTITY_TYPE_ANY 0xFF @@ -120,7 +119,7 @@ struct ceph_msg_connect_reply { /* * message header */ -struct ceph_msg_header { +struct ceph_msg_header_old { __le64 seq; /* message seq# for this session */ __le64 tid; /* transaction id */ __le16 type; /* message type */ @@ -138,6 +137,24 @@ struct ceph_msg_header { __le32 crc; /* header crc32c */ } __attribute__ ((packed)); +struct ceph_msg_header { + __le64 seq; /* message seq# for this session */ + __le64 tid; /* transaction id */ + __le16 type; /* message type */ + __le16 priority; /* priority. higher value == higher priority */ + __le16 version; /* version of message encoding */ + + __le32 front_len; /* bytes in main payload */ + __le32 middle_len;/* bytes in middle payload */ + __le32 data_len; /* bytes of data payload */ + __le16 data_off; /* sender: include full offset; + receiver: mask against ~PAGE_MASK */ + + struct ceph_entity_name src; + __le32 reserved; + __le32 crc; /* header crc32c */ +} __attribute__ ((packed)); + #define CEPH_MSG_PRIO_LOW 64 #define CEPH_MSG_PRIO_DEFAULT 127 #define CEPH_MSG_PRIO_HIGH 196 diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index c7b4dedaace..bed6391e52c 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c @@ -16,7 +16,7 @@ #define OSD_OP_FRONT_LEN 4096 #define OSD_OPREPLY_FRONT_LEN 512 -const static struct ceph_connection_operations osd_con_ops; +static const struct ceph_connection_operations osd_con_ops; static int __kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *kickosd); @@ -147,7 +147,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, req = kzalloc(sizeof(*req), GFP_NOFS); } if (req == NULL) - return ERR_PTR(-ENOMEM); + return NULL; req->r_osdc = osdc; req->r_mempool = use_mempool; @@ -164,10 +164,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); else msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, - OSD_OPREPLY_FRONT_LEN, 0, 0, NULL); - if (IS_ERR(msg)) { + OSD_OPREPLY_FRONT_LEN, GFP_NOFS); + if (!msg) { ceph_osdc_put_request(req); - return ERR_PTR(PTR_ERR(msg)); + return NULL; } req->r_reply = msg; @@ -178,10 +178,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op, 0); else - msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL); - if (IS_ERR(msg)) { + msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, GFP_NOFS); + if (!msg) { ceph_osdc_put_request(req); - return ERR_PTR(PTR_ERR(msg)); + return NULL; } msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP); memset(msg->front.iov_base, 0, msg->front.iov_len); @@ -361,8 +361,13 @@ static void put_osd(struct ceph_osd *osd) { dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), atomic_read(&osd->o_ref) - 1); - if (atomic_dec_and_test(&osd->o_ref)) + if (atomic_dec_and_test(&osd->o_ref)) { + struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; + + if (osd->o_authorizer) + ac->ops->destroy_authorizer(ac, osd->o_authorizer); kfree(osd); + } } /* @@ -565,7 +570,8 @@ static int __map_osds(struct ceph_osd_client *osdc, { struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; struct ceph_pg pgid; - int o = -1; + int acting[CEPH_PG_MAX_SIZE]; + int o = -1, num = 0; int err; dout("map_osds %p tid %lld\n", req, req->r_tid); @@ -576,10 +582,16 @@ static int __map_osds(struct ceph_osd_client *osdc, pgid = reqhead->layout.ol_pgid; req->r_pgid = pgid; - o = ceph_calc_pg_primary(osdc->osdmap, pgid); + err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); + if (err > 0) { + o = acting[0]; + num = err; + } if ((req->r_osd && req->r_osd->o_osd == o && - req->r_sent >= req->r_osd->o_incarnation) || + req->r_sent >= req->r_osd->o_incarnation && + req->r_num_pg_osds == num && + memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || (req->r_osd == NULL && o == -1)) return 0; /* no change */ @@ -587,6 +599,10 @@ static int __map_osds(struct ceph_osd_client *osdc, req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, req->r_osd ? req->r_osd->o_osd : -1); + /* record full pg acting set */ + memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num); + req->r_num_pg_osds = num; + if (req->r_osd) { __cancel_request(req); list_del_init(&req->r_osd_item); @@ -612,7 +628,7 @@ static int __map_osds(struct ceph_osd_client *osdc, __remove_osd_from_lru(req->r_osd); list_add(&req->r_osd_item, &req->r_osd->o_requests); } - err = 1; /* osd changed */ + err = 1; /* osd or pg changed */ out: return err; @@ -704,7 +720,7 @@ static void handle_timeout(struct work_struct *work) * should mark the osd as failed and we should find out about * it from an updated osd map. */ - while (!list_empty(&osdc->req_lru)) { + while (timeout && !list_empty(&osdc->req_lru)) { req = list_entry(osdc->req_lru.next, struct ceph_osd_request, r_req_lru_item); @@ -779,16 +795,18 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, struct ceph_osd_request *req; u64 tid; int numops, object_len, flags; + s32 result; tid = le64_to_cpu(msg->hdr.tid); if (msg->front.iov_len < sizeof(*rhead)) goto bad; numops = le32_to_cpu(rhead->num_ops); object_len = le32_to_cpu(rhead->object_len); + result = le32_to_cpu(rhead->result); if (msg->front.iov_len != sizeof(*rhead) + object_len + numops * sizeof(struct ceph_osd_op)) goto bad; - dout("handle_reply %p tid %llu\n", msg, tid); + dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); /* lookup */ mutex_lock(&osdc->request_mutex); @@ -834,7 +852,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, dout("handle_reply tid %llu flags %d\n", tid, flags); /* either this is a read, or we got the safe response */ - if ((flags & CEPH_OSD_FLAG_ONDISK) || + if (result < 0 || + (flags & CEPH_OSD_FLAG_ONDISK) || ((flags & CEPH_OSD_FLAG_WRITE) == 0)) __unregister_request(osdc, req); @@ -843,12 +862,12 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, if (req->r_callback) req->r_callback(req, msg); else - complete(&req->r_completion); + complete_all(&req->r_completion); if (flags & CEPH_OSD_FLAG_ONDISK) { if (req->r_safe_callback) req->r_safe_callback(req, msg); - complete(&req->r_safe_completion); /* fsync waiter */ + complete_all(&req->r_safe_completion); /* fsync waiter */ } done: @@ -1064,6 +1083,7 @@ done: if (newmap) kick_requests(osdc, NULL); up_read(&osdc->map_sem); + wake_up_all(&osdc->client->auth_wq); return; bad: @@ -1073,45 +1093,6 @@ bad: return; } - -/* - * A read request prepares specific pages that data is to be read into. - * When a message is being read off the wire, we call prepare_pages to - * find those pages. - * 0 = success, -1 failure. - */ -static int __prepare_pages(struct ceph_connection *con, - struct ceph_msg_header *hdr, - struct ceph_osd_request *req, - u64 tid, - struct ceph_msg *m) -{ - struct ceph_osd *osd = con->private; - struct ceph_osd_client *osdc; - int ret = -1; - int data_len = le32_to_cpu(hdr->data_len); - unsigned data_off = le16_to_cpu(hdr->data_off); - - int want = calc_pages_for(data_off & ~PAGE_MASK, data_len); - - if (!osd) - return -1; - - osdc = osd->o_osdc; - - dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m, - tid, req->r_num_pages, want); - if (unlikely(req->r_num_pages < want)) - goto out; - m->pages = req->r_pages; - m->nr_pages = req->r_num_pages; - ret = 0; /* success */ -out: - BUG_ON(ret < 0 || m->nr_pages < want); - - return ret; -} - /* * Register request, send initial attempt. */ @@ -1238,11 +1219,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) if (!osdc->req_mempool) goto out; - err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true); + err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true, + "osd_op"); if (err < 0) goto out_mempool; err = ceph_msgpool_init(&osdc->msgpool_op_reply, - OSD_OPREPLY_FRONT_LEN, 10, true); + OSD_OPREPLY_FRONT_LEN, 10, true, + "osd_op_reply"); if (err < 0) goto out_msgpool; return 0; @@ -1288,13 +1271,11 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, 0, truncate_seq, truncate_size, NULL, false, 1); - if (IS_ERR(req)) - return PTR_ERR(req); + if (!req) + return -ENOMEM; /* it may be a short read due to an object boundary */ req->r_pages = pages; - num_pages = calc_pages_for(off, *plen); - req->r_num_pages = num_pages; dout("readpages final extent is %llu~%llu (%d pages)\n", off, *plen, req->r_num_pages); @@ -1331,12 +1312,11 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, snapc, do_sync, truncate_seq, truncate_size, mtime, nofail, 1); - if (IS_ERR(req)) - return PTR_ERR(req); + if (!req) + return -ENOMEM; /* it may be a short write due to an object boundary */ req->r_pages = pages; - req->r_num_pages = calc_pages_for(off, len); dout("writepages %llu~%llu (%d pages)\n", off, len, req->r_num_pages); @@ -1361,7 +1341,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) int type = le16_to_cpu(msg->hdr.type); if (!osd) - return; + goto out; osdc = osd->o_osdc; switch (type) { @@ -1376,11 +1356,13 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) pr_err("received unknown message type %d %s\n", type, ceph_msg_type_name(type)); } +out: ceph_msg_put(msg); } /* - * lookup and return message for incoming reply + * lookup and return message for incoming reply. set up reply message + * pages. */ static struct ceph_msg *get_reply(struct ceph_connection *con, struct ceph_msg_header *hdr, @@ -1393,7 +1375,6 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, int front = le32_to_cpu(hdr->front_len); int data_len = le32_to_cpu(hdr->data_len); u64 tid; - int err; tid = le64_to_cpu(hdr->tid); mutex_lock(&osdc->request_mutex); @@ -1411,13 +1392,14 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, req->r_reply, req->r_con_filling_msg); ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); ceph_con_put(req->r_con_filling_msg); + req->r_con_filling_msg = NULL; } if (front > req->r_reply->front.iov_len) { pr_warning("get_reply front %d > preallocated %d\n", front, (int)req->r_reply->front.iov_len); - m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL); - if (IS_ERR(m)) + m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS); + if (!m) goto out; ceph_msg_put(req->r_reply); req->r_reply = m; @@ -1425,12 +1407,19 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, m = ceph_msg_get(req->r_reply); if (data_len > 0) { - err = __prepare_pages(con, hdr, req, tid, m); - if (err < 0) { + unsigned data_off = le16_to_cpu(hdr->data_off); + int want = calc_pages_for(data_off & ~PAGE_MASK, data_len); + + if (unlikely(req->r_num_pages < want)) { + pr_warning("tid %lld reply %d > expected %d pages\n", + tid, want, m->nr_pages); *skip = 1; ceph_msg_put(m); - m = ERR_PTR(err); + m = NULL; + goto out; } + m->pages = req->r_pages; + m->nr_pages = req->r_num_pages; } *skip = 0; req->r_con_filling_msg = ceph_con_get(con); @@ -1452,7 +1441,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, switch (type) { case CEPH_MSG_OSD_MAP: - return ceph_msg_new(type, front, 0, 0, NULL); + return ceph_msg_new(type, front, GFP_NOFS); case CEPH_MSG_OSD_OPREPLY: return get_reply(con, hdr, skip); default: @@ -1484,8 +1473,8 @@ static void put_osd_con(struct ceph_connection *con) * authentication */ static int get_authorizer(struct ceph_connection *con, - void **buf, int *len, int *proto, - void **reply_buf, int *reply_len, int force_new) + void **buf, int *len, int *proto, + void **reply_buf, int *reply_len, int force_new) { struct ceph_osd *o = con->private; struct ceph_osd_client *osdc = o->o_osdc; @@ -1505,7 +1494,7 @@ static int get_authorizer(struct ceph_connection *con, &o->o_authorizer_reply_buf, &o->o_authorizer_reply_buf_len); if (ret) - return ret; + return ret; } *proto = ac->protocol; @@ -1538,7 +1527,7 @@ static int invalidate_authorizer(struct ceph_connection *con) return ceph_monc_validate_auth(&osdc->client->monc); } -const static struct ceph_connection_operations osd_con_ops = { +static const struct ceph_connection_operations osd_con_ops = { .get = get_osd_con, .put = put_osd_con, .dispatch = dispatch, diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h index b0759911e7c..ce776989ef6 100644 --- a/fs/ceph/osd_client.h +++ b/fs/ceph/osd_client.h @@ -48,6 +48,8 @@ struct ceph_osd_request { struct list_head r_osd_item; struct ceph_osd *r_osd; struct ceph_pg r_pgid; + int r_pg_osds[CEPH_PG_MAX_SIZE]; + int r_num_pg_osds; struct ceph_connection *r_con_filling_msg; @@ -66,7 +68,6 @@ struct ceph_osd_request { struct list_head r_unsafe_item; struct inode *r_inode; /* for use by callbacks */ - struct writeback_control *r_wbc; /* ditto */ char r_oid[40]; /* object name */ int r_oid_len; diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c index 2e2c15eed82..e31f118f139 100644 --- a/fs/ceph/osdmap.c +++ b/fs/ceph/osdmap.c @@ -424,12 +424,30 @@ static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi) kfree(pi); } -void __decode_pool(void **p, struct ceph_pg_pool_info *pi) +static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) { + unsigned n, m; + ceph_decode_copy(p, &pi->v, sizeof(pi->v)); calc_pg_masks(pi); - *p += le32_to_cpu(pi->v.num_snaps) * sizeof(u64); + + /* num_snaps * snap_info_t */ + n = le32_to_cpu(pi->v.num_snaps); + while (n--) { + ceph_decode_need(p, end, sizeof(u64) + 1 + sizeof(u64) + + sizeof(struct ceph_timespec), bad); + *p += sizeof(u64) + /* key */ + 1 + sizeof(u64) + /* u8, snapid */ + sizeof(struct ceph_timespec); + m = ceph_decode_32(p); /* snap name */ + *p += m; + } + *p += le32_to_cpu(pi->v.num_removed_snap_intervals) * sizeof(u64) * 2; + return 0; + +bad: + return -EINVAL; } static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map) @@ -568,9 +586,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) if (ev > CEPH_PG_POOL_VERSION) { pr_warning("got unknown v %d > %d of ceph_pg_pool\n", ev, CEPH_PG_POOL_VERSION); + kfree(pi); goto bad; } - __decode_pool(p, pi); + err = __decode_pool(p, end, pi); + if (err < 0) + goto bad; __insert_pg_pool(&map->pg_pools, pi); } @@ -706,7 +727,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, len, *p, end); newcrush = crush_decode(*p, min(*p+len, end)); if (IS_ERR(newcrush)) - return ERR_PTR(PTR_ERR(newcrush)); + return ERR_CAST(newcrush); + *p += len; } /* new flags? */ @@ -758,7 +780,9 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, pi->id = pool; __insert_pg_pool(&map->pg_pools, pi); } - __decode_pool(p, pi); + err = __decode_pool(p, end, pi); + if (err < 0) + goto bad; } if (version >= 5 && __decode_pool_names(p, end, map) < 0) goto bad; @@ -829,12 +853,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, /* remove any? */ while (rbp && pgid_cmp(rb_entry(rbp, struct ceph_pg_mapping, node)->pgid, pgid) <= 0) { - struct rb_node *cur = rbp; + struct ceph_pg_mapping *cur = + rb_entry(rbp, struct ceph_pg_mapping, node); + rbp = rb_next(rbp); - dout(" removed pg_temp %llx\n", - *(u64 *)&rb_entry(cur, struct ceph_pg_mapping, - node)->pgid); - rb_erase(cur, &map->pg_temp); + dout(" removed pg_temp %llx\n", *(u64 *)&cur->pgid); + rb_erase(&cur->node, &map->pg_temp); + kfree(cur); } if (pglen) { @@ -850,19 +875,22 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, for (j = 0; j < pglen; j++) pg->osds[j] = ceph_decode_32(p); err = __insert_pg_mapping(pg, &map->pg_temp); - if (err) + if (err) { + kfree(pg); goto bad; + } dout(" added pg_temp %llx len %d\n", *(u64 *)&pgid, pglen); } } while (rbp) { - struct rb_node *cur = rbp; + struct ceph_pg_mapping *cur = + rb_entry(rbp, struct ceph_pg_mapping, node); + rbp = rb_next(rbp); - dout(" removed pg_temp %llx\n", - *(u64 *)&rb_entry(cur, struct ceph_pg_mapping, - node)->pgid); - rb_erase(cur, &map->pg_temp); + dout(" removed pg_temp %llx\n", *(u64 *)&cur->pgid); + rb_erase(&cur->node, &map->pg_temp); + kfree(cur); } /* ignore the rest */ @@ -1020,8 +1048,9 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, ruleno = crush_find_rule(osdmap->crush, pool->v.crush_ruleset, pool->v.type, pool->v.size); if (ruleno < 0) { - pr_err("no crush rule pool %d type %d size %d\n", - poolid, pool->v.type, pool->v.size); + pr_err("no crush rule pool %d ruleset %d type %d size %d\n", + poolid, pool->v.crush_ruleset, pool->v.type, + pool->v.size); return NULL; } @@ -1041,12 +1070,33 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, } /* + * Return acting set for given pgid. + */ +int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, + int *acting) +{ + int rawosds[CEPH_PG_MAX_SIZE], *osds; + int i, o, num = CEPH_PG_MAX_SIZE; + + osds = calc_pg_raw(osdmap, pgid, rawosds, &num); + if (!osds) + return -1; + + /* primary is first up osd */ + o = 0; + for (i = 0; i < num; i++) + if (ceph_osd_is_up(osdmap, osds[i])) + acting[o++] = osds[i]; + return o; +} + +/* * Return primary osd for given pgid, or -1 if none. */ int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid) { - int rawosds[10], *osds; - int i, num = ARRAY_SIZE(rawosds); + int rawosds[CEPH_PG_MAX_SIZE], *osds; + int i, num = CEPH_PG_MAX_SIZE; osds = calc_pg_raw(osdmap, pgid, rawosds, &num); if (!osds) @@ -1054,9 +1104,7 @@ int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid) /* primary is first up osd */ for (i = 0; i < num; i++) - if (ceph_osd_is_up(osdmap, osds[i])) { + if (ceph_osd_is_up(osdmap, osds[i])) return osds[i]; - break; - } return -1; } diff --git a/fs/ceph/osdmap.h b/fs/ceph/osdmap.h index 8bc9f1e4f56..970b547e510 100644 --- a/fs/ceph/osdmap.h +++ b/fs/ceph/osdmap.h @@ -120,6 +120,8 @@ extern int ceph_calc_object_layout(struct ceph_object_layout *ol, const char *oid, struct ceph_file_layout *fl, struct ceph_osdmap *osdmap); +extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid, + int *acting); extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid); diff --git a/fs/ceph/pagelist.c b/fs/ceph/pagelist.c index 5f8dbf7c745..b6859f47d36 100644 --- a/fs/ceph/pagelist.c +++ b/fs/ceph/pagelist.c @@ -20,7 +20,7 @@ int ceph_pagelist_release(struct ceph_pagelist *pl) static int ceph_pagelist_addpage(struct ceph_pagelist *pl) { - struct page *page = alloc_page(GFP_NOFS); + struct page *page = __page_cache_alloc(GFP_NOFS); if (!page) return -ENOMEM; pl->room += PAGE_SIZE; diff --git a/fs/ceph/rados.h b/fs/ceph/rados.h index a1fc1d017b5..6d5247f2e81 100644 --- a/fs/ceph/rados.h +++ b/fs/ceph/rados.h @@ -1,5 +1,5 @@ -#ifndef __RADOS_H -#define __RADOS_H +#ifndef CEPH_RADOS_H +#define CEPH_RADOS_H /* * Data types for the Ceph distributed object storage layer RADOS @@ -58,6 +58,7 @@ struct ceph_timespec { #define CEPH_PG_LAYOUT_LINEAR 2 #define CEPH_PG_LAYOUT_HYBRID 3 +#define CEPH_PG_MAX_SIZE 16 /* max # osds in a single pg */ /* * placement group. @@ -100,8 +101,8 @@ struct ceph_pg_pool { __le64 snap_seq; /* seq for per-pool snapshot */ __le32 snap_epoch; /* epoch of last snap */ __le32 num_snaps; - __le32 num_removed_snap_intervals; - __le64 uid; + __le32 num_removed_snap_intervals; /* if non-empty, NO per-pool snaps */ + __le64 auid; /* who owns the pg */ } __attribute__ ((packed)); /* @@ -202,11 +203,13 @@ enum { CEPH_OSD_OP_TMAPGET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 12, CEPH_OSD_OP_CREATE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 13, + CEPH_OSD_OP_ROLLBACK= CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 14, /** attrs **/ /* read */ CEPH_OSD_OP_GETXATTR = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1, CEPH_OSD_OP_GETXATTRS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 2, + CEPH_OSD_OP_CMPXATTR = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 3, /* write */ CEPH_OSD_OP_SETXATTR = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 1, @@ -270,6 +273,10 @@ static inline int ceph_osd_op_mode_modify(int op) return (op & CEPH_OSD_OP_MODE) == CEPH_OSD_OP_MODE_WR; } +/* + * note that the following tmap stuff is also defined in the ceph librados.h + * any modification here needs to be updated there + */ #define CEPH_OSD_TMAP_HDR 'h' #define CEPH_OSD_TMAP_SET 's' #define CEPH_OSD_TMAP_RM 'r' @@ -295,6 +302,7 @@ enum { CEPH_OSD_FLAG_PARALLELEXEC = 512, /* execute op in parallel */ CEPH_OSD_FLAG_PGOP = 1024, /* pg op, no object */ CEPH_OSD_FLAG_EXEC = 2048, /* op may exec */ + CEPH_OSD_FLAG_EXEC_PUBLIC = 4096, /* op may exec (public) */ }; enum { @@ -304,6 +312,22 @@ enum { #define EOLDSNAPC ERESTART /* ORDERSNAP flag set; writer has old snapc*/ #define EBLACKLISTED ESHUTDOWN /* blacklisted */ +/* xattr comparison */ +enum { + CEPH_OSD_CMPXATTR_OP_NOP = 0, + CEPH_OSD_CMPXATTR_OP_EQ = 1, + CEPH_OSD_CMPXATTR_OP_NE = 2, + CEPH_OSD_CMPXATTR_OP_GT = 3, + CEPH_OSD_CMPXATTR_OP_GTE = 4, + CEPH_OSD_CMPXATTR_OP_LT = 5, + CEPH_OSD_CMPXATTR_OP_LTE = 6 +}; + +enum { + CEPH_OSD_CMPXATTR_MODE_STRING = 1, + CEPH_OSD_CMPXATTR_MODE_U64 = 2 +}; + /* * an individual object operation. each may be accompanied by some data * payload @@ -320,6 +344,8 @@ struct ceph_osd_op { struct { __le32 name_len; __le32 value_len; + __u8 cmp_op; /* CEPH_OSD_CMPXATTR_OP_* */ + __u8 cmp_mode; /* CEPH_OSD_CMPXATTR_MODE_* */ } __attribute__ ((packed)) xattr; struct { __u8 class_len; @@ -330,6 +356,9 @@ struct ceph_osd_op { struct { __le64 cookie, count; } __attribute__ ((packed)) pgls; + struct { + __le64 snapid; + } __attribute__ ((packed)) snap; }; __le32 payload_len; } __attribute__ ((packed)); diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index d5114db7045..c0b26b6badb 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -512,7 +512,7 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci, struct ceph_cap_snap *capsnap) { struct inode *inode = &ci->vfs_inode; - struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc; + struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc; BUG_ON(capsnap->writing); capsnap->size = inode->i_size; diff --git a/fs/ceph/super.c b/fs/ceph/super.c index f888cf487b7..9922628532b 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -2,20 +2,18 @@ #include "ceph_debug.h" #include <linux/backing-dev.h> +#include <linux/ctype.h> #include <linux/fs.h> #include <linux/inet.h> #include <linux/in6.h> #include <linux/module.h> #include <linux/mount.h> #include <linux/parser.h> -#include <linux/rwsem.h> #include <linux/sched.h> #include <linux/seq_file.h> #include <linux/slab.h> #include <linux/statfs.h> #include <linux/string.h> -#include <linux/version.h> -#include <linux/vmalloc.h> #include "decode.h" #include "super.h" @@ -47,10 +45,20 @@ const char *ceph_file_part(const char *s, int len) */ static void ceph_put_super(struct super_block *s) { - struct ceph_client *cl = ceph_client(s); + struct ceph_client *client = ceph_sb_to_client(s); dout("put_super\n"); - ceph_mdsc_close_sessions(&cl->mdsc); + ceph_mdsc_close_sessions(&client->mdsc); + + /* + * ensure we release the bdi before put_anon_super releases + * the device name. + */ + if (s->s_bdi == &client->backing_dev_info) { + bdi_unregister(&client->backing_dev_info); + s->s_bdi = NULL; + } + return; } @@ -82,7 +90,7 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf) buf->f_files = le64_to_cpu(st.num_objects); buf->f_ffree = -1; - buf->f_namelen = PATH_MAX; + buf->f_namelen = NAME_MAX; buf->f_frsize = PAGE_CACHE_SIZE; /* leave fsid little-endian, regardless of host endianness */ @@ -94,15 +102,52 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf) } -static int ceph_syncfs(struct super_block *sb, int wait) +static int ceph_sync_fs(struct super_block *sb, int wait) { - dout("sync_fs %d\n", wait); - ceph_osdc_sync(&ceph_client(sb)->osdc); - ceph_mdsc_sync(&ceph_client(sb)->mdsc); - dout("sync_fs %d done\n", wait); + struct ceph_client *client = ceph_sb_to_client(sb); + + if (!wait) { + dout("sync_fs (non-blocking)\n"); + ceph_flush_dirty_caps(&client->mdsc); + dout("sync_fs (non-blocking) done\n"); + return 0; + } + + dout("sync_fs (blocking)\n"); + ceph_osdc_sync(&ceph_sb_to_client(sb)->osdc); + ceph_mdsc_sync(&ceph_sb_to_client(sb)->mdsc); + dout("sync_fs (blocking) done\n"); return 0; } +static int default_congestion_kb(void) +{ + int congestion_kb; + + /* + * Copied from NFS + * + * congestion size, scale with available memory. + * + * 64MB: 8192k + * 128MB: 11585k + * 256MB: 16384k + * 512MB: 23170k + * 1GB: 32768k + * 2GB: 46340k + * 4GB: 65536k + * 8GB: 92681k + * 16GB: 131072k + * + * This allows larger machines to have larger/more transfers. + * Limit the default to 256M + */ + congestion_kb = (16*int_sqrt(totalram_pages)) << (PAGE_SHIFT-10); + if (congestion_kb > 256*1024) + congestion_kb = 256*1024; + + return congestion_kb; +} /** * ceph_show_options - Show mount options in /proc/mounts @@ -115,9 +160,7 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt) struct ceph_mount_args *args = client->mount_args; if (args->flags & CEPH_OPT_FSID) - seq_printf(m, ",fsidmajor=%llu,fsidminor%llu", - le64_to_cpu(*(__le64 *)&args->fsid.fsid[0]), - le64_to_cpu(*(__le64 *)&args->fsid.fsid[8])); + seq_printf(m, ",fsid=%pU", &args->fsid); if (args->flags & CEPH_OPT_NOSHARE) seq_puts(m, ",noshare"); if (args->flags & CEPH_OPT_DIRSTAT) @@ -128,6 +171,35 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt) seq_puts(m, ",nocrc"); if (args->flags & CEPH_OPT_NOASYNCREADDIR) seq_puts(m, ",noasyncreaddir"); + + if (args->mount_timeout != CEPH_MOUNT_TIMEOUT_DEFAULT) + seq_printf(m, ",mount_timeout=%d", args->mount_timeout); + if (args->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT) + seq_printf(m, ",osd_idle_ttl=%d", args->osd_idle_ttl); + if (args->osd_timeout != CEPH_OSD_TIMEOUT_DEFAULT) + seq_printf(m, ",osdtimeout=%d", args->osd_timeout); + if (args->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT) + seq_printf(m, ",osdkeepalivetimeout=%d", + args->osd_keepalive_timeout); + if (args->wsize) + seq_printf(m, ",wsize=%d", args->wsize); + if (args->rsize != CEPH_MOUNT_RSIZE_DEFAULT) + seq_printf(m, ",rsize=%d", args->rsize); + if (args->congestion_kb != default_congestion_kb()) + seq_printf(m, ",write_congestion_kb=%d", args->congestion_kb); + if (args->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT) + seq_printf(m, ",caps_wanted_delay_min=%d", + args->caps_wanted_delay_min); + if (args->caps_wanted_delay_max != CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT) + seq_printf(m, ",caps_wanted_delay_max=%d", + args->caps_wanted_delay_max); + if (args->cap_release_safety != CEPH_CAP_RELEASE_SAFETY_DEFAULT) + seq_printf(m, ",cap_release_safety=%d", + args->cap_release_safety); + if (args->max_readdir != CEPH_MAX_READDIR_DEFAULT) + seq_printf(m, ",readdir_max_entries=%d", args->max_readdir); + if (args->max_readdir_bytes != CEPH_MAX_READDIR_BYTES_DEFAULT) + seq_printf(m, ",readdir_max_bytes=%d", args->max_readdir_bytes); if (strcmp(args->snapdir_name, CEPH_SNAPDIRNAME_DEFAULT)) seq_printf(m, ",snapdirname=%s", args->snapdir_name); if (args->name) @@ -151,35 +223,6 @@ static void ceph_inode_init_once(void *foo) inode_init_once(&ci->vfs_inode); } -static int default_congestion_kb(void) -{ - int congestion_kb; - - /* - * Copied from NFS - * - * congestion size, scale with available memory. - * - * 64MB: 8192k - * 128MB: 11585k - * 256MB: 16384k - * 512MB: 23170k - * 1GB: 32768k - * 2GB: 46340k - * 4GB: 65536k - * 8GB: 92681k - * 16GB: 131072k - * - * This allows larger machines to have larger/more transfers. - * Limit the default to 256M - */ - congestion_kb = (16*int_sqrt(totalram_pages)) << (PAGE_SHIFT-10); - if (congestion_kb > 256*1024) - congestion_kb = 256*1024; - - return congestion_kb; -} - static int __init init_caches(void) { ceph_inode_cachep = kmem_cache_create("ceph_inode_info", @@ -244,7 +287,7 @@ static const struct super_operations ceph_super_ops = { .alloc_inode = ceph_alloc_inode, .destroy_inode = ceph_destroy_inode, .write_inode = ceph_write_inode, - .sync_fs = ceph_syncfs, + .sync_fs = ceph_sync_fs, .put_super = ceph_put_super, .show_options = ceph_show_options, .statfs = ceph_statfs, @@ -287,9 +330,6 @@ const char *ceph_msg_type_name(int type) * mount options */ enum { - Opt_fsidmajor, - Opt_fsidminor, - Opt_monport, Opt_wsize, Opt_rsize, Opt_osdtimeout, @@ -298,10 +338,13 @@ enum { Opt_osd_idle_ttl, Opt_caps_wanted_delay_min, Opt_caps_wanted_delay_max, + Opt_cap_release_safety, Opt_readdir_max_entries, + Opt_readdir_max_bytes, Opt_congestion_kb, Opt_last_int, /* int args above */ + Opt_fsid, Opt_snapdirname, Opt_name, Opt_secret, @@ -318,9 +361,6 @@ enum { }; static match_table_t arg_tokens = { - {Opt_fsidmajor, "fsidmajor=%ld"}, - {Opt_fsidminor, "fsidminor=%ld"}, - {Opt_monport, "monport=%d"}, {Opt_wsize, "wsize=%d"}, {Opt_rsize, "rsize=%d"}, {Opt_osdtimeout, "osdtimeout=%d"}, @@ -329,9 +369,12 @@ static match_table_t arg_tokens = { {Opt_osd_idle_ttl, "osd_idle_ttl=%d"}, {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"}, {Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"}, + {Opt_cap_release_safety, "cap_release_safety=%d"}, {Opt_readdir_max_entries, "readdir_max_entries=%d"}, + {Opt_readdir_max_bytes, "readdir_max_bytes=%d"}, {Opt_congestion_kb, "write_congestion_kb=%d"}, /* int args above */ + {Opt_fsid, "fsid=%s"}, {Opt_snapdirname, "snapdirname=%s"}, {Opt_name, "name=%s"}, {Opt_secret, "secret=%s"}, @@ -347,6 +390,36 @@ static match_table_t arg_tokens = { {-1, NULL} }; +static int parse_fsid(const char *str, struct ceph_fsid *fsid) +{ + int i = 0; + char tmp[3]; + int err = -EINVAL; + int d; + + dout("parse_fsid '%s'\n", str); + tmp[2] = 0; + while (*str && i < 16) { + if (ispunct(*str)) { + str++; + continue; + } + if (!isxdigit(str[0]) || !isxdigit(str[1])) + break; + tmp[0] = str[0]; + tmp[1] = str[1]; + if (sscanf(tmp, "%x", &d) < 1) + break; + fsid->fsid[i] = d & 0xff; + i++; + str += 2; + } + + if (i == 16) + err = 0; + dout("parse_fsid ret %d got fsid %pU", err, fsid); + return err; +} static struct ceph_mount_args *parse_mount_args(int flags, char *options, const char *dev_name, @@ -378,8 +451,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options, args->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT; args->rsize = CEPH_MOUNT_RSIZE_DEFAULT; args->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL); - args->cap_release_safety = CEPH_CAPS_PER_RELEASE * 4; - args->max_readdir = 1024; + args->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT; + args->max_readdir = CEPH_MAX_READDIR_DEFAULT; + args->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT; args->congestion_kb = default_congestion_kb(); /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */ @@ -429,12 +503,6 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options, dout("got token %d\n", token); } switch (token) { - case Opt_fsidmajor: - *(__le64 *)&args->fsid.fsid[0] = cpu_to_le64(intval); - break; - case Opt_fsidminor: - *(__le64 *)&args->fsid.fsid[8] = cpu_to_le64(intval); - break; case Opt_ip: err = ceph_parse_ips(argstr[0].from, argstr[0].to, @@ -445,6 +513,11 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options, args->flags |= CEPH_OPT_MYIP; break; + case Opt_fsid: + err = parse_fsid(argstr[0].from, &args->fsid); + if (err == 0) + args->flags |= CEPH_OPT_FSID; + break; case Opt_snapdirname: kfree(args->snapdir_name); args->snapdir_name = kstrndup(argstr[0].from, @@ -475,6 +548,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options, case Opt_osdkeepalivetimeout: args->osd_keepalive_timeout = intval; break; + case Opt_osd_idle_ttl: + args->osd_idle_ttl = intval; + break; case Opt_mount_timeout: args->mount_timeout = intval; break; @@ -487,6 +563,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options, case Opt_readdir_max_entries: args->max_readdir = intval; break; + case Opt_readdir_max_bytes: + args->max_readdir_bytes = intval; + break; case Opt_congestion_kb: args->congestion_kb = intval; break; @@ -587,7 +666,6 @@ static struct ceph_client *ceph_create_client(struct ceph_mount_args *args) /* caps */ client->min_caps = args->max_readdir; - ceph_adjust_min_caps(client->min_caps); /* subsystems */ err = ceph_monc_init(&client->monc, client); @@ -626,16 +704,24 @@ static void ceph_destroy_client(struct ceph_client *client) /* unmount */ ceph_mdsc_stop(&client->mdsc); - ceph_monc_stop(&client->monc); ceph_osdc_stop(&client->osdc); - ceph_adjust_min_caps(-client->min_caps); + /* + * make sure mds and osd connections close out before destroying + * the auth module, which is needed to free those connections' + * ceph_authorizers. + */ + ceph_msgr_flush(); + + ceph_monc_stop(&client->monc); ceph_debugfs_client_cleanup(client); destroy_workqueue(client->wb_wq); destroy_workqueue(client->pg_inv_wq); destroy_workqueue(client->trunc_wq); + bdi_destroy(&client->backing_dev_info); + if (client->msgr) ceph_messenger_destroy(client->msgr); mempool_destroy(client->wb_pagevec_pool); @@ -653,13 +739,13 @@ int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid) { if (client->have_fsid) { if (ceph_fsid_compare(&client->fsid, fsid)) { - pr_err("bad fsid, had " FSID_FORMAT " got " FSID_FORMAT, - PR_FSID(&client->fsid), PR_FSID(fsid)); + pr_err("bad fsid, had %pU got %pU", + &client->fsid, fsid); return -1; } } else { - pr_info("client%lld fsid " FSID_FORMAT "\n", - client->monc.auth->global_id, PR_FSID(fsid)); + pr_info("client%lld fsid %pU\n", client->monc.auth->global_id, + fsid); memcpy(&client->fsid, fsid, sizeof(*fsid)); ceph_debugfs_client_init(client); client->have_fsid = true; @@ -670,9 +756,10 @@ int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid) /* * true if we have the mon map (and have thus joined the cluster) */ -static int have_mon_map(struct ceph_client *client) +static int have_mon_and_osd_map(struct ceph_client *client) { - return client->monc.monmap && client->monc.monmap->epoch; + return client->monc.monmap && client->monc.monmap->epoch && + client->osdc.osdmap && client->osdc.osdmap->epoch; } /* @@ -692,7 +779,7 @@ static struct dentry *open_root_dentry(struct ceph_client *client, dout("open_root_inode opening '%s'\n", path); req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS); if (IS_ERR(req)) - return ERR_PTR(PTR_ERR(req)); + return ERR_CAST(req); req->r_path1 = kstrdup(path, GFP_NOFS); req->r_ino1.ino = CEPH_INO_ROOT; req->r_ino1.snap = CEPH_NOSNAP; @@ -750,7 +837,7 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt, if (err < 0) goto out; - while (!have_mon_map(client)) { + while (!have_mon_and_osd_map(client)) { err = -EIO; if (timeout && time_after_eq(jiffies, started + timeout)) goto out; @@ -758,8 +845,8 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt, /* wait */ dout("mount waiting for mon_map\n"); err = wait_event_interruptible_timeout(client->auth_wq, - have_mon_map(client) || (client->auth_err < 0), - timeout); + have_mon_and_osd_map(client) || (client->auth_err < 0), + timeout); if (err == -EINTR || err == -ERESTARTSYS) goto out; if (client->auth_err < 0) { @@ -872,18 +959,21 @@ static int ceph_compare_super(struct super_block *sb, void *data) /* * construct our own bdi so we can control readahead, etc. */ +static atomic_long_t bdi_seq = ATOMIC_LONG_INIT(0); + static int ceph_register_bdi(struct super_block *sb, struct ceph_client *client) { int err; - sb->s_bdi = &client->backing_dev_info; - /* set ra_pages based on rsize mount option? */ if (client->mount_args->rsize >= PAGE_CACHE_SIZE) client->backing_dev_info.ra_pages = (client->mount_args->rsize + PAGE_CACHE_SIZE - 1) >> PAGE_SHIFT; - err = bdi_register_dev(&client->backing_dev_info, sb->s_dev); + err = bdi_register(&client->backing_dev_info, NULL, "ceph-%d", + atomic_long_inc_return(&bdi_seq)); + if (!err) + sb->s_bdi = &client->backing_dev_info; return err; } @@ -920,9 +1010,9 @@ static int ceph_get_sb(struct file_system_type *fs_type, goto out; } - if (ceph_client(sb) != client) { + if (ceph_sb_to_client(sb) != client) { ceph_destroy_client(client); - client = ceph_client(sb); + client = ceph_sb_to_client(sb); dout("get_sb got existing client %p\n", client); } else { dout("get_sb using new client %p\n", client); @@ -940,8 +1030,7 @@ static int ceph_get_sb(struct file_system_type *fs_type, out_splat: ceph_mdsc_close_sessions(&client->mdsc); - up_write(&sb->s_umount); - deactivate_super(sb); + deactivate_locked_super(sb); goto out_final; out: @@ -957,9 +1046,6 @@ static void ceph_kill_sb(struct super_block *s) dout("kill_sb %p\n", s); ceph_mdsc_pre_umount(&client->mdsc); kill_anon_super(s); /* will call put_super after sb is r/o */ - if (s->s_bdi == &client->backing_dev_info) - bdi_unregister(&client->backing_dev_info); - bdi_destroy(&client->backing_dev_info); ceph_destroy_client(client); } @@ -990,8 +1076,6 @@ static int __init init_ceph(void) if (ret) goto out_msgr; - ceph_caps_init(); - ret = register_filesystem(&ceph_fs_type); if (ret) goto out_icache; @@ -1016,7 +1100,6 @@ static void __exit exit_ceph(void) { dout("exit_ceph\n"); unregister_filesystem(&ceph_fs_type); - ceph_caps_finalize(); destroy_caches(); ceph_msgr_exit(); ceph_debugfs_cleanup(); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 13513b80d87..2482d696f0d 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -10,7 +10,6 @@ #include <linux/fs.h> #include <linux/mempool.h> #include <linux/pagemap.h> -#include <linux/slab.h> #include <linux/wait.h> #include <linux/writeback.h> #include <linux/slab.h> @@ -32,6 +31,12 @@ #define CEPH_BLOCK (1 << CEPH_BLOCK_SHIFT) /* + * Supported features + */ +#define CEPH_FEATURE_SUPPORTED CEPH_FEATURE_NOSRCADDR | CEPH_FEATURE_FLOCK +#define CEPH_FEATURE_REQUIRED CEPH_FEATURE_NOSRCADDR + +/* * mount options */ #define CEPH_OPT_FSID (1<<0) @@ -52,24 +57,25 @@ struct ceph_mount_args { int sb_flags; + int flags; + struct ceph_fsid fsid; + struct ceph_entity_addr my_addr; int num_mon; struct ceph_entity_addr *mon_addr; - int flags; int mount_timeout; int osd_idle_ttl; - int caps_wanted_delay_min, caps_wanted_delay_max; - struct ceph_fsid fsid; - struct ceph_entity_addr my_addr; - int wsize; - int rsize; /* max readahead */ - int max_readdir; /* max readdir size */ - int congestion_kb; /* max readdir size */ int osd_timeout; int osd_keepalive_timeout; + int wsize; + int rsize; /* max readahead */ + int congestion_kb; /* max writeback in flight */ + int caps_wanted_delay_min, caps_wanted_delay_max; + int cap_release_safety; + int max_readdir; /* max readdir result (entires) */ + int max_readdir_bytes; /* max readdir result (bytes) */ char *snapdir_name; /* default ".snap" */ char *name; char *secret; - int cap_release_safety; }; /* @@ -80,13 +86,14 @@ struct ceph_mount_args { #define CEPH_OSD_KEEPALIVE_DEFAULT 5 #define CEPH_OSD_IDLE_TTL_DEFAULT 60 #define CEPH_MOUNT_RSIZE_DEFAULT (512*1024) /* readahead */ +#define CEPH_MAX_READDIR_DEFAULT 1024 +#define CEPH_MAX_READDIR_BYTES_DEFAULT (512*1024) #define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024) #define CEPH_MSG_MAX_DATA_LEN (16*1024*1024) #define CEPH_SNAPDIRNAME_DEFAULT ".snap" #define CEPH_AUTH_NAME_DEFAULT "guest" - /* * Delay telling the MDS we no longer want caps, in case we reopen * the file. Delay a minimum amount of time, even if we send a cap @@ -96,6 +103,7 @@ struct ceph_mount_args { #define CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT 5 /* cap release delay */ #define CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT 60 /* cap release delay */ +#define CEPH_CAP_RELEASE_SAFETY_DEFAULT (CEPH_CAPS_PER_RELEASE * 4) /* mount state */ enum { @@ -160,12 +168,6 @@ struct ceph_client { #endif }; -static inline struct ceph_client *ceph_client(struct super_block *sb) -{ - return sb->s_fs_info; -} - - /* * File i/o capability. This tracks shared state with the metadata * server that allows us to cache or writeback attributes or to read @@ -564,11 +566,13 @@ static inline int __ceph_caps_wanted(struct ceph_inode_info *ci) /* what the mds thinks we want */ extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci); -extern void ceph_caps_init(void); -extern void ceph_caps_finalize(void); -extern void ceph_adjust_min_caps(int delta); -extern int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need); -extern int ceph_unreserve_caps(struct ceph_cap_reservation *ctx); +extern void ceph_caps_init(struct ceph_mds_client *mdsc); +extern void ceph_caps_finalize(struct ceph_mds_client *mdsc); +extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta); +extern int ceph_reserve_caps(struct ceph_mds_client *mdsc, + struct ceph_cap_reservation *ctx, int need); +extern int ceph_unreserve_caps(struct ceph_mds_client *mdsc, + struct ceph_cap_reservation *ctx); extern void ceph_reservation_status(struct ceph_client *client, int *total, int *avail, int *used, int *reserved, int *min); @@ -742,13 +746,6 @@ extern struct kmem_cache *ceph_file_cachep; extern const char *ceph_msg_type_name(int type); extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid); -#define FSID_FORMAT "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-" \ - "%02x%02x%02x%02x%02x%02x" -#define PR_FSID(f) (f)->fsid[0], (f)->fsid[1], (f)->fsid[2], (f)->fsid[3], \ - (f)->fsid[4], (f)->fsid[5], (f)->fsid[6], (f)->fsid[7], \ - (f)->fsid[8], (f)->fsid[9], (f)->fsid[10], (f)->fsid[11], \ - (f)->fsid[12], (f)->fsid[13], (f)->fsid[14], (f)->fsid[15] - /* inode.c */ extern const struct inode_operations ceph_file_iops; @@ -810,13 +807,16 @@ static inline void ceph_remove_cap(struct ceph_cap *cap) __ceph_remove_cap(cap); spin_unlock(&inode->i_lock); } -extern void ceph_put_cap(struct ceph_cap *cap); +extern void ceph_put_cap(struct ceph_mds_client *mdsc, + struct ceph_cap *cap); extern void ceph_queue_caps_release(struct inode *inode); extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc); -extern int ceph_fsync(struct file *file, struct dentry *dentry, int datasync); +extern int ceph_fsync(struct file *file, int datasync); extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session); +extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, + int mds); extern int ceph_get_cap_mds(struct inode *inode); extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps); extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had); @@ -861,7 +861,7 @@ extern void ceph_release_page_vector(struct page **pages, int num_pages); /* dir.c */ extern const struct file_operations ceph_dir_fops; extern const struct inode_operations ceph_dir_iops; -extern struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops, +extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops, ceph_snapdir_dentry_ops; extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry); @@ -871,6 +871,7 @@ extern struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, extern void ceph_dentry_lru_add(struct dentry *dn); extern void ceph_dentry_lru_touch(struct dentry *dn); extern void ceph_dentry_lru_del(struct dentry *dn); +extern void ceph_invalidate_dentry_lease(struct dentry *dentry); /* * our d_ops vary depending on whether the inode is live, @@ -891,6 +892,14 @@ extern void ceph_debugfs_cleanup(void); extern int ceph_debugfs_client_init(struct ceph_client *client); extern void ceph_debugfs_client_cleanup(struct ceph_client *client); +/* locks.c */ +extern int ceph_lock(struct file *file, int cmd, struct file_lock *fl); +extern int ceph_flock(struct file *file, int cmd, struct file_lock *fl); +extern void ceph_count_locks(struct inode *inode, int *p_num, int *f_num); +extern int ceph_encode_locks(struct inode *i, struct ceph_pagelist *p, + int p_locks, int f_locks); +extern int lock_to_ceph_filelock(struct file_lock *fl, struct ceph_filelock *c); + static inline struct inode *get_dentry_parent_inode(struct dentry *dentry) { if (dentry && dentry->d_parent) diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 2845422907f..097a2654c00 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -7,7 +7,8 @@ static bool ceph_is_valid_xattr(const char *name) { - return !strncmp(name, XATTR_SECURITY_PREFIX, + return !strncmp(name, "ceph.", 5) || + !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN) || !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) || !strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN); @@ -76,14 +77,14 @@ static size_t ceph_vxattrcb_rctime(struct ceph_inode_info *ci, char *val, } static struct ceph_vxattr_cb ceph_dir_vxattrs[] = { - { true, "user.ceph.dir.entries", ceph_vxattrcb_entries}, - { true, "user.ceph.dir.files", ceph_vxattrcb_files}, - { true, "user.ceph.dir.subdirs", ceph_vxattrcb_subdirs}, - { true, "user.ceph.dir.rentries", ceph_vxattrcb_rentries}, - { true, "user.ceph.dir.rfiles", ceph_vxattrcb_rfiles}, - { true, "user.ceph.dir.rsubdirs", ceph_vxattrcb_rsubdirs}, - { true, "user.ceph.dir.rbytes", ceph_vxattrcb_rbytes}, - { true, "user.ceph.dir.rctime", ceph_vxattrcb_rctime}, + { true, "ceph.dir.entries", ceph_vxattrcb_entries}, + { true, "ceph.dir.files", ceph_vxattrcb_files}, + { true, "ceph.dir.subdirs", ceph_vxattrcb_subdirs}, + { true, "ceph.dir.rentries", ceph_vxattrcb_rentries}, + { true, "ceph.dir.rfiles", ceph_vxattrcb_rfiles}, + { true, "ceph.dir.rsubdirs", ceph_vxattrcb_rsubdirs}, + { true, "ceph.dir.rbytes", ceph_vxattrcb_rbytes}, + { true, "ceph.dir.rctime", ceph_vxattrcb_rctime}, { true, NULL, NULL } }; @@ -107,7 +108,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, } static struct ceph_vxattr_cb ceph_file_vxattrs[] = { - { true, "user.ceph.layout", ceph_vxattrcb_layout}, + { true, "ceph.layout", ceph_vxattrcb_layout}, { NULL, NULL } }; @@ -186,12 +187,6 @@ static int __set_xattr(struct ceph_inode_info *ci, ci->i_xattrs.names_size -= xattr->name_len; ci->i_xattrs.vals_size -= xattr->val_len; } - if (!xattr) { - pr_err("__set_xattr ENOMEM on %p %llx.%llx xattr %s=%s\n", - &ci->vfs_inode, ceph_vinop(&ci->vfs_inode), name, - xattr->val); - return -ENOMEM; - } ci->i_xattrs.names_size += name_len; ci->i_xattrs.vals_size += val_len; if (val) @@ -342,6 +337,8 @@ void __ceph_destroy_xattrs(struct ceph_inode_info *ci) } static int __build_xattrs(struct inode *inode) + __releases(inode->i_lock) + __acquires(inode->i_lock) { u32 namelen; u32 numattr = 0; @@ -574,7 +571,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size) ci->i_xattrs.version, ci->i_xattrs.index_version); if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) && - (ci->i_xattrs.index_version > ci->i_xattrs.version)) { + (ci->i_xattrs.index_version >= ci->i_xattrs.version)) { goto list_xattr; } else { spin_unlock(&inode->i_lock); @@ -622,7 +619,7 @@ out: static int ceph_sync_setxattr(struct dentry *dentry, const char *name, const char *value, size_t size, int flags) { - struct ceph_client *client = ceph_client(dentry->d_sb); + struct ceph_client *client = ceph_sb_to_client(dentry->d_sb); struct inode *inode = dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); struct inode *parent_inode = dentry->d_parent->d_inode; @@ -641,7 +638,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name, return -ENOMEM; err = -ENOMEM; for (i = 0; i < nr_pages; i++) { - pages[i] = alloc_page(GFP_NOFS); + pages[i] = __page_cache_alloc(GFP_NOFS); if (!pages[i]) { nr_pages = i; goto out; @@ -779,7 +776,7 @@ out: static int ceph_send_removexattr(struct dentry *dentry, const char *name) { - struct ceph_client *client = ceph_client(dentry->d_sb); + struct ceph_client *client = ceph_sb_to_client(dentry->d_sb); struct ceph_mds_client *mdsc = &client->mdsc; struct inode *inode = dentry->d_inode; struct inode *parent_inode = dentry->d_parent->d_inode; |
