summaryrefslogtreecommitdiff
path: root/fs/ceph/mds_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r--fs/ceph/mds_client.c730
1 files changed, 502 insertions, 228 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 60a9a4ae47b..a75ddbf9fe3 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -3,6 +3,7 @@
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/sched.h>
+#include <linux/smp_lock.h>
#include "mds_client.h"
#include "mon_client.h"
@@ -37,10 +38,15 @@
* are no longer valid.
*/
+struct ceph_reconnect_state {
+ struct ceph_pagelist *pagelist;
+ bool flock;
+};
+
static void __wake_requests(struct ceph_mds_client *mdsc,
struct list_head *head);
-const static struct ceph_connection_operations mds_con_ops;
+static const struct ceph_connection_operations mds_con_ops;
/*
@@ -449,7 +455,7 @@ void ceph_mdsc_release_request(struct kref *kref)
kfree(req->r_path1);
kfree(req->r_path2);
put_request_session(req);
- ceph_unreserve_caps(&req->r_caps_reservation);
+ ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
kfree(req);
}
@@ -512,7 +518,8 @@ static void __register_request(struct ceph_mds_client *mdsc,
{
req->r_tid = ++mdsc->last_tid;
if (req->r_num_caps)
- ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
+ ceph_reserve_caps(mdsc, &req->r_caps_reservation,
+ req->r_num_caps);
dout("__register_request %p tid %lld\n", req, req->r_tid);
ceph_mdsc_get_request(req);
__insert_request(mdsc, req);
@@ -665,10 +672,10 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
struct ceph_msg *msg;
struct ceph_mds_session_head *h;
- msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL);
- if (IS_ERR(msg)) {
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
+ if (!msg) {
pr_err("create_session_msg ENOMEM creating msg\n");
- return ERR_PTR(PTR_ERR(msg));
+ return NULL;
}
h = msg->front.iov_base;
h->op = cpu_to_le32(op);
@@ -687,7 +694,6 @@ static int __open_session(struct ceph_mds_client *mdsc,
struct ceph_msg *msg;
int mstate;
int mds = session->s_mds;
- int err = 0;
/* wait for mds to go active? */
mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
@@ -698,17 +704,58 @@ static int __open_session(struct ceph_mds_client *mdsc,
/* send connect message */
msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
- if (IS_ERR(msg)) {
- err = PTR_ERR(msg);
- goto out;
- }
+ if (!msg)
+ return -ENOMEM;
ceph_con_send(&session->s_con, msg);
-
-out:
return 0;
}
/*
+ * open sessions for any export targets for the given mds
+ *
+ * called under mdsc->mutex
+ */
+static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ struct ceph_mds_info *mi;
+ struct ceph_mds_session *ts;
+ int i, mds = session->s_mds;
+ int target;
+
+ if (mds >= mdsc->mdsmap->m_max_mds)
+ return;
+ mi = &mdsc->mdsmap->m_info[mds];
+ dout("open_export_target_sessions for mds%d (%d targets)\n",
+ session->s_mds, mi->num_export_targets);
+
+ for (i = 0; i < mi->num_export_targets; i++) {
+ target = mi->export_targets[i];
+ ts = __ceph_lookup_mds_session(mdsc, target);
+ if (!ts) {
+ ts = register_session(mdsc, target);
+ if (IS_ERR(ts))
+ return;
+ }
+ if (session->s_state == CEPH_MDS_SESSION_NEW ||
+ session->s_state == CEPH_MDS_SESSION_CLOSING)
+ __open_session(mdsc, session);
+ else
+ dout(" mds%d target mds%d %p is %s\n", session->s_mds,
+ i, ts, session_state_name(ts->s_state));
+ ceph_put_mds_session(ts);
+ }
+}
+
+void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ mutex_lock(&mdsc->mutex);
+ __open_export_target_sessions(mdsc, session);
+ mutex_unlock(&mdsc->mutex);
+}
+
+/*
* session caps
*/
@@ -736,9 +783,10 @@ static void cleanup_cap_releases(struct ceph_mds_session *session)
}
/*
- * Helper to safely iterate over all caps associated with a session.
+ * Helper to safely iterate over all caps associated with a session, with
+ * special care taken to handle a racing __ceph_remove_cap().
*
- * caller must hold session s_mutex
+ * Caller must hold session s_mutex.
*/
static int iterate_session_caps(struct ceph_mds_session *session,
int (*cb)(struct inode *, struct ceph_cap *,
@@ -768,7 +816,7 @@ static int iterate_session_caps(struct ceph_mds_session *session,
last_inode = NULL;
}
if (old_cap) {
- ceph_put_cap(old_cap);
+ ceph_put_cap(session->s_mdsc, old_cap);
old_cap = NULL;
}
@@ -797,18 +845,55 @@ out:
if (last_inode)
iput(last_inode);
if (old_cap)
- ceph_put_cap(old_cap);
+ ceph_put_cap(session->s_mdsc, old_cap);
return ret;
}
static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
- void *arg)
+ void *arg)
{
struct ceph_inode_info *ci = ceph_inode(inode);
+ int drop = 0;
+
dout("removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->vfs_inode);
- ceph_remove_cap(cap);
+ spin_lock(&inode->i_lock);
+ __ceph_remove_cap(cap);
+ if (!__ceph_is_any_real_caps(ci)) {
+ struct ceph_mds_client *mdsc =
+ &ceph_sb_to_client(inode->i_sb)->mdsc;
+
+ spin_lock(&mdsc->cap_dirty_lock);
+ if (!list_empty(&ci->i_dirty_item)) {
+ pr_info(" dropping dirty %s state for %p %lld\n",
+ ceph_cap_string(ci->i_dirty_caps),
+ inode, ceph_ino(inode));
+ ci->i_dirty_caps = 0;
+ list_del_init(&ci->i_dirty_item);
+ drop = 1;
+ }
+ if (!list_empty(&ci->i_flushing_item)) {
+ pr_info(" dropping dirty+flushing %s state for %p %lld\n",
+ ceph_cap_string(ci->i_flushing_caps),
+ inode, ceph_ino(inode));
+ ci->i_flushing_caps = 0;
+ list_del_init(&ci->i_flushing_item);
+ mdsc->num_cap_flushing--;
+ drop = 1;
+ }
+ if (drop && ci->i_wrbuffer_ref) {
+ pr_info(" dropping dirty data for %p %lld\n",
+ inode, ceph_ino(inode));
+ ci->i_wrbuffer_ref = 0;
+ ci->i_wrbuffer_ref_head = 0;
+ drop++;
+ }
+ spin_unlock(&mdsc->cap_dirty_lock);
+ }
+ spin_unlock(&inode->i_lock);
+ while (drop--)
+ iput(inode);
return 0;
}
@@ -820,6 +905,7 @@ static void remove_session_caps(struct ceph_mds_session *session)
dout("remove_session_caps on %p\n", session);
iterate_session_caps(session, remove_session_caps_cb, NULL);
BUG_ON(session->s_nr_caps > 0);
+ BUG_ON(!list_empty(&session->s_cap_flushing));
cleanup_cap_releases(session);
}
@@ -834,7 +920,7 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
{
struct ceph_inode_info *ci = ceph_inode(inode);
- wake_up(&ci->i_cap_wq);
+ wake_up_all(&ci->i_cap_wq);
if (arg) {
spin_lock(&inode->i_lock);
ci->i_wanted_max_size = 0;
@@ -882,8 +968,8 @@ static int send_renew_caps(struct ceph_mds_client *mdsc,
ceph_mds_state_name(state));
msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
++session->s_renew_seq);
- if (IS_ERR(msg))
- return PTR_ERR(msg);
+ if (!msg)
+ return -ENOMEM;
ceph_con_send(&session->s_con, msg);
return 0;
}
@@ -930,17 +1016,15 @@ static int request_close_session(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
struct ceph_msg *msg;
- int err = 0;
dout("request_close_session mds%d state %s seq %lld\n",
session->s_mds, session_state_name(session->s_state),
session->s_seq);
msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
- if (IS_ERR(msg))
- err = PTR_ERR(msg);
- else
- ceph_con_send(&session->s_con, msg);
- return err;
+ if (!msg)
+ return -ENOMEM;
+ ceph_con_send(&session->s_con, msg);
+ return 0;
}
/*
@@ -1034,16 +1118,17 @@ static int trim_caps(struct ceph_mds_client *mdsc,
*
* Called under s_mutex.
*/
-static int add_cap_releases(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session,
- int extra)
+int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
{
- struct ceph_msg *msg;
+ struct ceph_msg *msg, *partial = NULL;
struct ceph_mds_cap_release *head;
int err = -ENOMEM;
+ int extra = mdsc->client->mount_args->cap_release_safety;
+ int num;
- if (extra < 0)
- extra = mdsc->client->mount_args->cap_release_safety;
+ dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
+ extra);
spin_lock(&session->s_cap_lock);
@@ -1052,13 +1137,18 @@ static int add_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_msg,
list_head);
head = msg->front.iov_base;
- extra += CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
+ num = le32_to_cpu(head->num);
+ if (num) {
+ dout(" partial %p with (%d/%d)\n", msg, num,
+ (int)CEPH_CAPS_PER_RELEASE);
+ extra += CEPH_CAPS_PER_RELEASE - num;
+ partial = msg;
+ }
}
-
while (session->s_num_cap_releases < session->s_nr_caps + extra) {
spin_unlock(&session->s_cap_lock);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
- 0, 0, NULL);
+ GFP_NOFS);
if (!msg)
goto out_unlocked;
dout("add_cap_releases %p msg %p now %d\n", session, msg,
@@ -1071,19 +1161,14 @@ static int add_cap_releases(struct ceph_mds_client *mdsc,
session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
}
- if (!list_empty(&session->s_cap_releases)) {
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg,
- list_head);
- head = msg->front.iov_base;
- if (head->num) {
- dout(" queueing non-full %p (%d)\n", msg,
- le32_to_cpu(head->num));
- list_move_tail(&msg->list_head,
- &session->s_cap_releases_done);
- session->s_num_cap_releases -=
- CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
- }
+ if (partial) {
+ head = partial->front.iov_base;
+ num = le32_to_cpu(head->num);
+ dout(" queueing partial %p with %d/%d\n", partial, num,
+ (int)CEPH_CAPS_PER_RELEASE);
+ list_move_tail(&partial->list_head,
+ &session->s_cap_releases_done);
+ session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
}
err = 0;
spin_unlock(&session->s_cap_lock);
@@ -1144,16 +1229,14 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
/*
* called under s_mutex
*/
-static void send_cap_releases(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session)
+void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
{
struct ceph_msg *msg;
dout("send_cap_releases mds%d\n", session->s_mds);
- while (1) {
- spin_lock(&session->s_cap_lock);
- if (list_empty(&session->s_cap_releases_done))
- break;
+ spin_lock(&session->s_cap_lock);
+ while (!list_empty(&session->s_cap_releases_done)) {
msg = list_first_entry(&session->s_cap_releases_done,
struct ceph_msg, list_head);
list_del_init(&msg->list_head);
@@ -1161,10 +1244,49 @@ static void send_cap_releases(struct ceph_mds_client *mdsc,
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
ceph_con_send(&session->s_con, msg);
+ spin_lock(&session->s_cap_lock);
}
spin_unlock(&session->s_cap_lock);
}
+static void discard_cap_releases(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ struct ceph_msg *msg;
+ struct ceph_mds_cap_release *head;
+ unsigned num;
+
+ dout("discard_cap_releases mds%d\n", session->s_mds);
+ spin_lock(&session->s_cap_lock);
+
+ /* zero out the in-progress message */
+ msg = list_first_entry(&session->s_cap_releases,
+ struct ceph_msg, list_head);
+ head = msg->front.iov_base;
+ num = le32_to_cpu(head->num);
+ dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
+ head->num = cpu_to_le32(0);
+ session->s_num_cap_releases += num;
+
+ /* requeue completed messages */
+ while (!list_empty(&session->s_cap_releases_done)) {
+ msg = list_first_entry(&session->s_cap_releases_done,
+ struct ceph_msg, list_head);
+ list_del_init(&msg->list_head);
+
+ head = msg->front.iov_base;
+ num = le32_to_cpu(head->num);
+ dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
+ num);
+ session->s_num_cap_releases += num;
+ head->num = cpu_to_le32(0);
+ msg->front.iov_len = sizeof(*head);
+ list_add(&msg->list_head, &session->s_cap_releases);
+ }
+
+ spin_unlock(&session->s_cap_lock);
+}
+
/*
* requests
*/
@@ -1180,6 +1302,8 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
if (!req)
return ERR_PTR(-ENOMEM);
+ mutex_init(&req->r_fill_mutex);
+ req->r_mdsc = mdsc;
req->r_started = jiffies;
req->r_resend_mds = -1;
INIT_LIST_HEAD(&req->r_unsafe_dir_item);
@@ -1250,7 +1374,7 @@ retry:
len += 1 + temp->d_name.len;
temp = temp->d_parent;
if (temp == NULL) {
- pr_err("build_path_dentry corrupt dentry %p\n", dentry);
+ pr_err("build_path corrupt dentry %p\n", dentry);
return ERR_PTR(-EINVAL);
}
}
@@ -1266,7 +1390,7 @@ retry:
struct inode *inode = temp->d_inode;
if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
- dout("build_path_dentry path+%d: %p SNAPDIR\n",
+ dout("build_path path+%d: %p SNAPDIR\n",
pos, temp);
} else if (stop_on_nosnap && inode &&
ceph_snap(inode) == CEPH_NOSNAP) {
@@ -1277,20 +1401,18 @@ retry:
break;
strncpy(path + pos, temp->d_name.name,
temp->d_name.len);
- dout("build_path_dentry path+%d: %p '%.*s'\n",
- pos, temp, temp->d_name.len, path + pos);
}
if (pos)
path[--pos] = '/';
temp = temp->d_parent;
if (temp == NULL) {
- pr_err("build_path_dentry corrupt dentry\n");
+ pr_err("build_path corrupt dentry\n");
kfree(path);
return ERR_PTR(-EINVAL);
}
}
if (pos != 0) {
- pr_err("build_path_dentry did not end path lookup where "
+ pr_err("build_path did not end path lookup where "
"expected, namelen is %d, pos is %d\n", len, pos);
/* presumably this is only possible if racing with a
rename of one of the parent directories (we can not
@@ -1302,7 +1424,7 @@ retry:
*base = ceph_ino(temp->d_inode);
*plen = len;
- dout("build_path_dentry on %p %d built %llx '%.*s'\n",
+ dout("build_path on %p %d built %llx '%.*s'\n",
dentry, atomic_read(&dentry->d_count), *base, len, path);
return path;
}
@@ -1425,9 +1547,11 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
if (req->r_old_dentry_drop)
len += req->r_old_dentry->d_name.len;
- msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL);
- if (IS_ERR(msg))
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
+ if (!msg) {
+ msg = ERR_PTR(-ENOMEM);
goto out_free2;
+ }
msg->hdr.tid = cpu_to_le64(req->r_tid);
@@ -1444,6 +1568,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
ceph_encode_filepath(&p, end, ino1, path1);
ceph_encode_filepath(&p, end, ino2, path2);
+ /* make note of release offset, in case we need to replay */
+ req->r_request_release_offset = p - msg->front.iov_base;
+
/* cap releases */
releases = 0;
if (req->r_inode_drop)
@@ -1491,7 +1618,7 @@ static void complete_request(struct ceph_mds_client *mdsc,
if (req->r_callback)
req->r_callback(mdsc, req);
else
- complete(&req->r_completion);
+ complete_all(&req->r_completion);
}
/*
@@ -1507,18 +1634,53 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
req->r_mds = mds;
req->r_attempts++;
+ if (req->r_inode) {
+ struct ceph_cap *cap =
+ ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
+
+ if (cap)
+ req->r_sent_on_mseq = cap->mseq;
+ else
+ req->r_sent_on_mseq = -1;
+ }
dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
+ if (req->r_got_unsafe) {
+ /*
+ * Replay. Do not regenerate message (and rebuild
+ * paths, etc.); just use the original message.
+ * Rebuilding paths will break for renames because
+ * d_move mangles the src name.
+ */
+ msg = req->r_request;
+ rhead = msg->front.iov_base;
+
+ flags = le32_to_cpu(rhead->flags);
+ flags |= CEPH_MDS_FLAG_REPLAY;
+ rhead->flags = cpu_to_le32(flags);
+
+ if (req->r_target_inode)
+ rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
+
+ rhead->num_retry = req->r_attempts - 1;
+
+ /* remove cap/dentry releases from message */
+ rhead->num_releases = 0;
+ msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
+ msg->front.iov_len = req->r_request_release_offset;
+ return 0;
+ }
+
if (req->r_request) {
ceph_msg_put(req->r_request);
req->r_request = NULL;
}
msg = create_request_message(mdsc, req, mds);
if (IS_ERR(msg)) {
- req->r_reply = ERR_PTR(PTR_ERR(msg));
+ req->r_err = PTR_ERR(msg);
complete_request(mdsc, req);
- return -PTR_ERR(msg);
+ return PTR_ERR(msg);
}
req->r_request = msg;
@@ -1531,13 +1693,9 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
rhead->flags = cpu_to_le32(flags);
rhead->num_fwd = req->r_num_fwd;
rhead->num_retry = req->r_attempts - 1;
+ rhead->ino = 0;
dout(" r_locked_dir = %p\n", req->r_locked_dir);
-
- if (req->r_target_inode && req->r_got_unsafe)
- rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
- else
- rhead->ino = 0;
return 0;
}
@@ -1551,7 +1709,7 @@ static int __do_request(struct ceph_mds_client *mdsc,
int mds = -1;
int err = -EAGAIN;
- if (req->r_reply)
+ if (req->r_err || req->r_got_result)
goto out;
if (req->r_timeout &&
@@ -1608,7 +1766,7 @@ out:
return err;
finish:
- req->r_reply = ERR_PTR(err);
+ req->r_err = err;
complete_request(mdsc, req);
goto out;
}
@@ -1629,10 +1787,9 @@ static void __wake_requests(struct ceph_mds_client *mdsc,
/*
* Wake up threads with requests pending for @mds, so that they can
- * resubmit their requests to a possibly different mds. If @all is set,
- * wake up if their requests has been forwarded to @mds, too.
+ * resubmit their requests to a possibly different mds.
*/
-static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
+static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
struct ceph_mds_request *req;
struct rb_node *p;
@@ -1688,64 +1845,78 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
__register_request(mdsc, req, dir);
__do_request(mdsc, req);
- /* wait */
- if (!req->r_reply) {
- mutex_unlock(&mdsc->mutex);
- if (req->r_timeout) {
- err = (long)wait_for_completion_interruptible_timeout(
- &req->r_completion, req->r_timeout);
- if (err == 0)
- req->r_reply = ERR_PTR(-EIO);
- else if (err < 0)
- req->r_reply = ERR_PTR(err);
- } else {
- err = wait_for_completion_interruptible(
- &req->r_completion);
- if (err)
- req->r_reply = ERR_PTR(err);
- }
- mutex_lock(&mdsc->mutex);
+ if (req->r_err) {
+ err = req->r_err;
+ __unregister_request(mdsc, req);
+ dout("do_request early error %d\n", err);
+ goto out;
}
- if (IS_ERR(req->r_reply)) {
- err = PTR_ERR(req->r_reply);
- req->r_reply = NULL;
+ /* wait */
+ mutex_unlock(&mdsc->mutex);
+ dout("do_request waiting\n");
+ if (req->r_timeout) {
+ err = (long)wait_for_completion_killable_timeout(
+ &req->r_completion, req->r_timeout);
+ if (err == 0)
+ err = -EIO;
+ } else {
+ err = wait_for_completion_killable(&req->r_completion);
+ }
+ dout("do_request waited, got %d\n", err);
+ mutex_lock(&mdsc->mutex);
- if (err == -ERESTARTSYS) {
- /* aborted */
- req->r_aborted = true;
+ /* only abort if we didn't race with a real reply */
+ if (req->r_got_result) {
+ err = le32_to_cpu(req->r_reply_info.head->result);
+ } else if (err < 0) {
+ dout("aborted request %lld with %d\n", req->r_tid, err);
- if (req->r_locked_dir &&
- (req->r_op & CEPH_MDS_OP_WRITE)) {
- struct ceph_inode_info *ci =
- ceph_inode(req->r_locked_dir);
+ /*
+ * ensure we aren't running concurrently with
+ * ceph_fill_trace or ceph_readdir_prepopulate, which
+ * rely on locks (dir mutex) held by our caller.
+ */
+ mutex_lock(&req->r_fill_mutex);
+ req->r_err = err;
+ req->r_aborted = true;
+ mutex_unlock(&req->r_fill_mutex);
- dout("aborted, clearing I_COMPLETE on %p\n",
- req->r_locked_dir);
- spin_lock(&req->r_locked_dir->i_lock);
- ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
- ci->i_release_count++;
- spin_unlock(&req->r_locked_dir->i_lock);
- }
- } else {
- /* clean up this request */
- __unregister_request(mdsc, req);
- if (!list_empty(&req->r_unsafe_item))
- list_del_init(&req->r_unsafe_item);
- complete(&req->r_safe_completion);
- }
- } else if (req->r_err) {
- err = req->r_err;
+ if (req->r_locked_dir &&
+ (req->r_op & CEPH_MDS_OP_WRITE))
+ ceph_invalidate_dir_request(req);
} else {
- err = le32_to_cpu(req->r_reply_info.head->result);
+ err = req->r_err;
}
- mutex_unlock(&mdsc->mutex);
+out:
+ mutex_unlock(&mdsc->mutex);
dout("do_request %p done, result %d\n", req, err);
return err;
}
/*
+ * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS
+ * namespace request.
+ */
+void ceph_invalidate_dir_request(struct ceph_mds_request *req)
+{
+ struct inode *inode = req->r_locked_dir;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+
+ dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode);
+ spin_lock(&inode->i_lock);
+ ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
+ ci->i_release_count++;
+ spin_unlock(&inode->i_lock);
+
+ if (req->r_dentry)
+ ceph_invalidate_dentry_lease(req->r_dentry);
+ if (req->r_old_dentry)
+ ceph_invalidate_dentry_lease(req->r_old_dentry);
+}
+
+/*
* Handle mds reply.
*
* We take the session mutex and parse and process the reply immediately.
@@ -1796,29 +1967,54 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
mutex_unlock(&mdsc->mutex);
goto out;
}
+ if (req->r_got_safe && !head->safe) {
+ pr_warning("got unsafe after safe on %llu from mds%d\n",
+ tid, mds);
+ mutex_unlock(&mdsc->mutex);
+ goto out;
+ }
result = le32_to_cpu(head->result);
/*
- * Tolerate 2 consecutive ESTALEs from the same mds.
- * FIXME: we should be looking at the cap migrate_seq.
+ * Handle an ESTALE
+ * if we're not talking to the authority, send to them
+ * if the authority has changed while we weren't looking,
+ * send to new authority
+ * Otherwise we just have to return an ESTALE
*/
if (result == -ESTALE) {
- req->r_direct_mode = USE_AUTH_MDS;
- req->r_num_stale++;
- if (req->r_num_stale <= 2) {
+ dout("got ESTALE on request %llu", req->r_tid);
+ if (!req->r_inode) {
+ /* do nothing; not an authority problem */
+ } else if (req->r_direct_mode != USE_AUTH_MDS) {
+ dout("not using auth, setting for that now");
+ req->r_direct_mode = USE_AUTH_MDS;
__do_request(mdsc, req);
mutex_unlock(&mdsc->mutex);
goto out;
+ } else {
+ struct ceph_inode_info *ci = ceph_inode(req->r_inode);
+ struct ceph_cap *cap =
+ ceph_get_cap_for_mds(ci, req->r_mds);;
+
+ dout("already using auth");
+ if ((!cap || cap != ci->i_auth_cap) ||
+ (cap->mseq != req->r_sent_on_mseq)) {
+ dout("but cap changed, so resending");
+ __do_request(mdsc, req);
+ mutex_unlock(&mdsc->mutex);
+ goto out;
+ }
}
- } else {
- req->r_num_stale = 0;
+ dout("have to return ESTALE on request %llu", req->r_tid);
}
+
if (head->safe) {
req->r_got_safe = true;
__unregister_request(mdsc, req);
- complete(&req->r_safe_completion);
+ complete_all(&req->r_safe_completion);
if (req->r_got_unsafe) {
/*
@@ -1833,15 +2029,11 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
/* last unsafe request during umount? */
if (mdsc->stopping && !__get_oldest_req(mdsc))
- complete(&mdsc->safe_umount_waiters);
+ complete_all(&mdsc->safe_umount_waiters);
mutex_unlock(&mdsc->mutex);
goto out;
}
- }
-
- BUG_ON(req->r_reply);
-
- if (!head->safe) {
+ } else {
req->r_got_unsafe = true;
list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
}
@@ -1870,23 +2062,32 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
}
/* insert trace into our cache */
+ mutex_lock(&req->r_fill_mutex);
err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
if (err == 0) {
if (result == 0 && rinfo->dir_nr)
ceph_readdir_prepopulate(req, req->r_session);
- ceph_unreserve_caps(&req->r_caps_reservation);
+ ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
}
+ mutex_unlock(&req->r_fill_mutex);
up_read(&mdsc->snap_rwsem);
out_err:
- if (err) {
- req->r_err = err;
+ mutex_lock(&mdsc->mutex);
+ if (!req->r_aborted) {
+ if (err) {
+ req->r_err = err;
+ } else {
+ req->r_reply = msg;
+ ceph_msg_get(msg);
+ req->r_got_result = true;
+ }
} else {
- req->r_reply = msg;
- ceph_msg_get(msg);
+ dout("reply arrived after request %lld was aborted\n", tid);
}
+ mutex_unlock(&mdsc->mutex);
- add_cap_releases(mdsc, req->r_session, -1);
+ ceph_add_cap_releases(mdsc, req->r_session);
mutex_unlock(&session->s_mutex);
/* kick calling process */
@@ -1920,16 +2121,21 @@ static void handle_forward(struct ceph_mds_client *mdsc,
mutex_lock(&mdsc->mutex);
req = __lookup_request(mdsc, tid);
if (!req) {
- dout("forward %llu to mds%d - req dne\n", tid, next_mds);
+ dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
goto out; /* dup reply? */
}
- if (fwd_seq <= req->r_num_fwd) {
- dout("forward %llu to mds%d - old seq %d <= %d\n",
+ if (req->r_aborted) {
+ dout("forward tid %llu aborted, unregistering\n", tid);
+ __unregister_request(mdsc, req);
+ } else if (fwd_seq <= req->r_num_fwd) {
+ dout("forward tid %llu to mds%d - old seq %d <= %d\n",
tid, next_mds, req->r_num_fwd, fwd_seq);
} else {
/* resend. forward race not possible; mds would drop */
- dout("forward %llu to mds%d (we resend)\n", tid, next_mds);
+ dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
+ BUG_ON(req->r_err);
+ BUG_ON(req->r_got_result);
req->r_num_fwd = fwd_seq;
req->r_resend_mds = next_mds;
put_request_session(req);
@@ -1983,6 +2189,8 @@ static void handle_session(struct ceph_mds_session *session,
switch (op) {
case CEPH_SESSION_OPEN:
+ if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
+ pr_info("mds%d reconnect success\n", session->s_mds);
session->s_state = CEPH_MDS_SESSION_OPEN;
renewed_caps(mdsc, session, 0);
wake = 1;
@@ -1996,10 +2204,12 @@ static void handle_session(struct ceph_mds_session *session,
break;
case CEPH_SESSION_CLOSE:
+ if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
+ pr_info("mds%d reconnect denied\n", session->s_mds);
remove_session_caps(session);
wake = 1; /* for good measure */
- complete(&mdsc->session_close_waiters);
- kick_requests(mdsc, mds, 0); /* cur only */
+ complete_all(&mdsc->session_close_waiters);
+ kick_requests(mdsc, mds);
break;
case CEPH_SESSION_STALE:
@@ -2065,9 +2275,14 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
void *arg)
{
- struct ceph_mds_cap_reconnect rec;
+ union {
+ struct ceph_mds_cap_reconnect v2;
+ struct ceph_mds_cap_reconnect_v1 v1;
+ } rec;
+ size_t reclen;
struct ceph_inode_info *ci;
- struct ceph_pagelist *pagelist = arg;
+ struct ceph_reconnect_state *recon_state = arg;
+ struct ceph_pagelist *pagelist = recon_state->pagelist;
char *path;
int pathlen, err;
u64 pathbase;
@@ -2100,17 +2315,44 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
spin_lock(&inode->i_lock);
cap->seq = 0; /* reset cap seq */
cap->issue_seq = 0; /* and issue_seq */
- rec.cap_id = cpu_to_le64(cap->cap_id);
- rec.pathbase = cpu_to_le64(pathbase);
- rec.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
- rec.issued = cpu_to_le32(cap->issued);
- rec.size = cpu_to_le64(inode->i_size);
- ceph_encode_timespec(&rec.mtime, &inode->i_mtime);
- ceph_encode_timespec(&rec.atime, &inode->i_atime);
- rec.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+
+ if (recon_state->flock) {
+ rec.v2.cap_id = cpu_to_le64(cap->cap_id);
+ rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+ rec.v2.issued = cpu_to_le32(cap->issued);
+ rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+ rec.v2.pathbase = cpu_to_le64(pathbase);
+ rec.v2.flock_len = 0;
+ reclen = sizeof(rec.v2);
+ } else {
+ rec.v1.cap_id = cpu_to_le64(cap->cap_id);
+ rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+ rec.v1.issued = cpu_to_le32(cap->issued);
+ rec.v1.size = cpu_to_le64(inode->i_size);
+ ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
+ ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
+ rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+ rec.v1.pathbase = cpu_to_le64(pathbase);
+ reclen = sizeof(rec.v1);
+ }
spin_unlock(&inode->i_lock);
- err = ceph_pagelist_append(pagelist, &rec, sizeof(rec));
+ if (recon_state->flock) {
+ int num_fcntl_locks, num_flock_locks;
+
+ lock_kernel();
+ ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
+ rec.v2.flock_len = (2*sizeof(u32) +
+ (num_fcntl_locks+num_flock_locks) *
+ sizeof(struct ceph_filelock));
+
+ err = ceph_pagelist_append(pagelist, &rec, reclen);
+ if (!err)
+ err = ceph_encode_locks(inode, pagelist,
+ num_fcntl_locks,
+ num_flock_locks);
+ unlock_kernel();
+ }
out:
kfree(path);
@@ -2131,61 +2373,55 @@ out:
*
* called with mdsc->mutex held.
*/
-static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
+static void send_mds_reconnect(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
{
- struct ceph_mds_session *session = NULL;
struct ceph_msg *reply;
struct rb_node *p;
- int err;
+ int mds = session->s_mds;
+ int err = -ENOMEM;
struct ceph_pagelist *pagelist;
+ struct ceph_reconnect_state recon_state;
- pr_info("reconnect to recovering mds%d\n", mds);
+ pr_info("mds%d reconnect start\n", mds);
pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
if (!pagelist)
goto fail_nopagelist;
ceph_pagelist_init(pagelist);
- reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, 0, 0, NULL);
- if (IS_ERR(reply)) {
- err = PTR_ERR(reply);
+ reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
+ if (!reply)
goto fail_nomsg;
- }
-
- /* find session */
- session = __ceph_lookup_mds_session(mdsc, mds);
- mutex_unlock(&mdsc->mutex); /* drop lock for duration */
- if (session) {
- mutex_lock(&session->s_mutex);
+ mutex_lock(&session->s_mutex);
+ session->s_state = CEPH_MDS_SESSION_RECONNECTING;
+ session->s_seq = 0;
- session->s_state = CEPH_MDS_SESSION_RECONNECTING;
- session->s_seq = 0;
+ ceph_con_open(&session->s_con,
+ ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
- ceph_con_open(&session->s_con,
- ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
-
- /* replay unsafe requests */
- replay_unsafe_requests(mdsc, session);
- } else {
- dout("no session for mds%d, will send short reconnect\n",
- mds);
- }
+ /* replay unsafe requests */
+ replay_unsafe_requests(mdsc, session);
down_read(&mdsc->snap_rwsem);
- if (!session)
- goto send;
dout("session %p state %s\n", session,
session_state_name(session->s_state));
+ /* drop old cap expires; we're about to reestablish that state */
+ discard_cap_releases(mdsc, session);
+
/* traverse this session's caps */
err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
if (err)
goto fail;
- err = iterate_session_caps(session, encode_caps_cb, pagelist);
+
+ recon_state.pagelist = pagelist;
+ recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
+ err = iterate_session_caps(session, encode_caps_cb, &recon_state);
if (err < 0)
- goto out;
+ goto fail;
/*
* snaprealms. we provide mds with the ino, seq (version), and
@@ -2207,34 +2443,32 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
goto fail;
}
-send:
reply->pagelist = pagelist;
+ if (recon_state.flock)
+ reply->hdr.version = cpu_to_le16(2);
reply->hdr.data_len = cpu_to_le32(pagelist->length);
reply->nr_pages = calc_pages_for(0, pagelist->length);
ceph_con_send(&session->s_con, reply);
- if (session) {
- session->s_state = CEPH_MDS_SESSION_OPEN;
- __wake_requests(mdsc, &session->s_waiting);
- }
+ mutex_unlock(&session->s_mutex);
-out:
- up_read(&mdsc->snap_rwsem);
- if (session) {
- mutex_unlock(&session->s_mutex);
- ceph_put_mds_session(session);
- }
mutex_lock(&mdsc->mutex);
+ __wake_requests(mdsc, &session->s_waiting);
+ mutex_unlock(&mdsc->mutex);
+
+ up_read(&mdsc->snap_rwsem);
return;
fail:
ceph_msg_put(reply);
+ up_read(&mdsc->snap_rwsem);
+ mutex_unlock(&session->s_mutex);
fail_nomsg:
ceph_pagelist_release(pagelist);
kfree(pagelist);
fail_nopagelist:
- pr_err("ENOMEM preparing reconnect for mds%d\n", mds);
- goto out;
+ pr_err("error %d preparing reconnect for mds%d\n", err, mds);
+ return;
}
@@ -2262,9 +2496,11 @@ static void check_new_map(struct ceph_mds_client *mdsc,
oldstate = ceph_mdsmap_get_state(oldmap, i);
newstate = ceph_mdsmap_get_state(newmap, i);
- dout("check_new_map mds%d state %s -> %s (session %s)\n",
+ dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
i, ceph_mds_state_name(oldstate),
+ ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
ceph_mds_state_name(newstate),
+ ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
session_state_name(s->s_state));
if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
@@ -2286,7 +2522,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
}
/* kick any requests waiting on the recovering mds */
- kick_requests(mdsc, i, 1);
+ kick_requests(mdsc, i);
} else if (oldstate == newstate) {
continue; /* nothing new with this mds */
}
@@ -2295,26 +2531,40 @@ static void check_new_map(struct ceph_mds_client *mdsc,
* send reconnect?
*/
if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
- newstate >= CEPH_MDS_STATE_RECONNECT)
- send_mds_reconnect(mdsc, i);
+ newstate >= CEPH_MDS_STATE_RECONNECT) {
+ mutex_unlock(&mdsc->mutex);
+ send_mds_reconnect(mdsc, s);
+ mutex_lock(&mdsc->mutex);
+ }
/*
- * kick requests on any mds that has gone active.
- *
- * kick requests on cur or forwarder: we may have sent
- * the request to mds1, mds1 told us it forwarded it
- * to mds2, but then we learn mds1 failed and can't be
- * sure it successfully forwarded our request before
- * it died.
+ * kick request on any mds that has gone active.
*/
if (oldstate < CEPH_MDS_STATE_ACTIVE &&
newstate >= CEPH_MDS_STATE_ACTIVE) {
- pr_info("mds%d reconnect completed\n", s->s_mds);
- kick_requests(mdsc, i, 1);
+ if (oldstate != CEPH_MDS_STATE_CREATING &&
+ oldstate != CEPH_MDS_STATE_STARTING)
+ pr_info("mds%d recovery completed\n", s->s_mds);
+ kick_requests(mdsc, i);
ceph_kick_flushing_caps(mdsc, s);
wake_up_session_caps(s, 1);
}
}
+
+ for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
+ s = mdsc->sessions[i];
+ if (!s)
+ continue;
+ if (!ceph_mdsmap_is_laggy(newmap, i))
+ continue;
+ if (s->s_state == CEPH_MDS_SESSION_OPEN ||
+ s->s_state == CEPH_MDS_SESSION_HUNG ||
+ s->s_state == CEPH_MDS_SESSION_CLOSING) {
+ dout(" connecting to export targets of laggy mds%d\n",
+ i);
+ __open_export_target_sessions(mdsc, s);
+ }
+ }
}
@@ -2345,6 +2595,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
struct ceph_dentry_info *di;
int mds = session->s_mds;
struct ceph_mds_lease *h = msg->front.iov_base;
+ u32 seq;
struct ceph_vino vino;
int mask;
struct qstr dname;
@@ -2358,6 +2609,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
vino.ino = le64_to_cpu(h->ino);
vino.snap = CEPH_NOSNAP;
mask = le16_to_cpu(h->mask);
+ seq = le32_to_cpu(h->seq);
dname.name = (void *)h + sizeof(*h) + sizeof(u32);
dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
if (dname.len != get_unaligned_le32(h+1))
@@ -2368,8 +2620,9 @@ static void handle_lease(struct ceph_mds_client *mdsc,
/* lookup inode */
inode = ceph_find_inode(sb, vino);
- dout("handle_lease '%s', mask %d, ino %llx %p\n",
- ceph_lease_op_name(h->action), mask, vino.ino, inode);
+ dout("handle_lease %s, mask %d, ino %llx %p %.*s\n",
+ ceph_lease_op_name(h->action), mask, vino.ino, inode,
+ dname.len, dname.name);
if (inode == NULL) {
dout("handle_lease no inode %llx\n", vino.ino);
goto release;
@@ -2394,7 +2647,8 @@ static void handle_lease(struct ceph_mds_client *mdsc,
switch (h->action) {
case CEPH_MDS_LEASE_REVOKE:
if (di && di->lease_session == session) {
- h->seq = cpu_to_le32(di->lease_seq);
+ if (ceph_seq_cmp(di->lease_seq, seq) > 0)
+ h->seq = cpu_to_le32(di->lease_seq);
__ceph_mdsc_drop_dentry_lease(dentry);
}
release = 1;
@@ -2408,7 +2662,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
unsigned long duration =
le32_to_cpu(h->duration_ms) * HZ / 1000;
- di->lease_seq = le32_to_cpu(h->seq);
+ di->lease_seq = seq;
dentry->d_time = di->lease_renew_from + duration;
di->lease_renew_after = di->lease_renew_from +
(duration >> 1);
@@ -2453,12 +2707,12 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
dnamelen = dentry->d_name.len;
len += dnamelen;
- msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL);
- if (IS_ERR(msg))
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
+ if (!msg)
return;
lease = msg->front.iov_base;
lease->action = action;
- lease->mask = cpu_to_le16(CEPH_LOCK_DN);
+ lease->mask = cpu_to_le16(1);
lease->ino = cpu_to_le64(ceph_vino(inode).ino);
lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
lease->seq = cpu_to_le32(seq);
@@ -2488,7 +2742,7 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
BUG_ON(inode == NULL);
BUG_ON(dentry == NULL);
- BUG_ON(mask != CEPH_LOCK_DN);
+ BUG_ON(mask == 0);
/* is dentry lease valid? */
spin_lock(&dentry->d_lock);
@@ -2598,8 +2852,10 @@ static void delayed_work(struct work_struct *work)
send_renew_caps(mdsc, s);
else
ceph_con_keepalive(&s->s_con);
- add_cap_releases(mdsc, s, -1);
- send_cap_releases(mdsc, s);
+ ceph_add_cap_releases(mdsc, s);
+ if (s->s_state == CEPH_MDS_SESSION_OPEN ||
+ s->s_state == CEPH_MDS_SESSION_HUNG)
+ ceph_send_cap_releases(mdsc, s);
mutex_unlock(&s->s_mutex);
ceph_put_mds_session(s);
@@ -2616,6 +2872,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
mdsc->client = client;
mutex_init(&mdsc->mutex);
mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
+ if (mdsc->mdsmap == NULL)
+ return -ENOMEM;
+
init_completion(&mdsc->safe_umount_waiters);
init_completion(&mdsc->session_close_waiters);
INIT_LIST_HEAD(&mdsc->waiting_for_map);
@@ -2641,6 +2900,10 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
init_waitqueue_head(&mdsc->cap_flushing_wq);
spin_lock_init(&mdsc->dentry_lru_lock);
INIT_LIST_HEAD(&mdsc->dentry_lru);
+
+ ceph_caps_init(mdsc);
+ ceph_adjust_min_caps(mdsc, client->min_caps);
+
return 0;
}
@@ -2685,6 +2948,12 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
drop_leases(mdsc);
ceph_flush_dirty_caps(mdsc);
wait_requests(mdsc);
+
+ /*
+ * wait for reply handlers to drop their request refs and
+ * their inode/dcache refs
+ */
+ ceph_msgr_flush();
}
/*
@@ -2736,6 +3005,9 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
u64 want_tid, want_flush;
+ if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN)
+ return;
+
dout("sync\n");
mutex_lock(&mdsc->mutex);
want_tid = mdsc->last_tid;
@@ -2827,6 +3099,7 @@ void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
if (mdsc->mdsmap)
ceph_mdsmap_destroy(mdsc->mdsmap);
kfree(mdsc->sessions);
+ ceph_caps_finalize(mdsc);
}
@@ -2918,9 +3191,10 @@ static void con_put(struct ceph_connection *con)
static void peer_reset(struct ceph_connection *con)
{
struct ceph_mds_session *s = con->private;
+ struct ceph_mds_client *mdsc = s->s_mdsc;
- pr_err("mds%d gave us the boot. IMPLEMENT RECONNECT.\n",
- s->s_mds);
+ pr_warning("mds%d closed our session\n", s->s_mds);
+ send_mds_reconnect(mdsc, s);
}
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
@@ -3027,7 +3301,7 @@ static int invalidate_authorizer(struct ceph_connection *con)
return ceph_monc_validate_auth(&mdsc->client->monc);
}
-const static struct ceph_connection_operations mds_con_ops = {
+static const struct ceph_connection_operations mds_con_ops = {
.get = con_get,
.put = con_put,
.dispatch = dispatch,