summaryrefslogtreecommitdiff
path: root/fs/dlm
diff options
context:
space:
mode:
authorAlexander Aring <aahringo@redhat.com>2021-05-21 15:08:48 -0400
committerDavid Teigland <teigland@redhat.com>2021-05-25 09:22:20 -0500
commit706474fbc5fedd7799b488962aad3541b235165b (patch)
tree0e36ac1d819bf4302f21910939bdb7659ae12136 /fs/dlm
parent5b2f981fde8b0dbf0bfa117bb4322342fcfb7174 (diff)
fs: dlm: don't allow half transmitted messages
This patch will clean a dirty page buffer if a reconnect occurs. If a page buffer was half transmitted we cannot start inside the middle of a dlm message if a node connects again. I observed invalid length receptions errors and was guessing that this behaviour occurs, after this patch I never saw an invalid message length again. This patch might drops more messages for dlm version 3.1 but 3.1 can't deal with half messages as well, for 3.2 it might trigger more re-transmissions but will not leave dlm in a broken state. Signed-off-by: Alexander Aring <aahringo@redhat.com> Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs/dlm')
-rw-r--r--fs/dlm/lowcomms.c95
1 files changed, 60 insertions, 35 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index fe9113bd5ba0..36adccc4f849 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -118,6 +118,7 @@ struct writequeue_entry {
int len;
int end;
int users;
+ bool dirty;
struct connection *con;
struct list_head msgs;
struct kref ref;
@@ -700,6 +701,42 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}
+static void dlm_page_release(struct kref *kref)
+{
+ struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
+ ref);
+
+ __free_page(e->page);
+ kfree(e);
+}
+
+static void dlm_msg_release(struct kref *kref)
+{
+ struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
+
+ kref_put(&msg->entry->ref, dlm_page_release);
+ kfree(msg);
+}
+
+static void free_entry(struct writequeue_entry *e)
+{
+ struct dlm_msg *msg, *tmp;
+
+ list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
+ if (msg->orig_msg) {
+ msg->orig_msg->retransmit = false;
+ kref_put(&msg->orig_msg->ref, dlm_msg_release);
+ }
+
+ list_del(&msg->list);
+ kref_put(&msg->ref, dlm_msg_release);
+ }
+
+ list_del(&e->list);
+ atomic_dec(&e->con->writequeue_cnt);
+ kref_put(&e->ref, dlm_page_release);
+}
+
static void dlm_close_sock(struct socket **sock)
{
if (*sock) {
@@ -714,6 +751,7 @@ static void close_connection(struct connection *con, bool and_other,
bool tx, bool rx)
{
bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
+ struct writequeue_entry *e;
if (tx && !closing && cancel_work_sync(&con->swork)) {
log_print("canceled swork for node %d", con->nodeid);
@@ -732,6 +770,26 @@ static void close_connection(struct connection *con, bool and_other,
close_connection(con->othercon, false, tx, rx);
}
+ /* if we send a writequeue entry only a half way, we drop the
+ * whole entry because reconnection and that we not start of the
+ * middle of a msg which will confuse the other end.
+ *
+ * we can always drop messages because retransmits, but what we
+ * cannot allow is to transmit half messages which may be processed
+ * at the other side.
+ *
+ * our policy is to start on a clean state when disconnects, we don't
+ * know what's send/received on transport layer in this case.
+ */
+ spin_lock(&con->writequeue_lock);
+ if (!list_empty(&con->writequeue)) {
+ e = list_first_entry(&con->writequeue, struct writequeue_entry,
+ list);
+ if (e->dirty)
+ free_entry(e);
+ }
+ spin_unlock(&con->writequeue_lock);
+
con->rx_leftover = 0;
con->retries = 0;
clear_bit(CF_CONNECTED, &con->flags);
@@ -1026,41 +1084,6 @@ accept_err:
return result;
}
-static void dlm_page_release(struct kref *kref)
-{
- struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
- ref);
-
- __free_page(e->page);
- kfree(e);
-}
-
-static void dlm_msg_release(struct kref *kref)
-{
- struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
-
- kref_put(&msg->entry->ref, dlm_page_release);
- kfree(msg);
-}
-
-static void free_entry(struct writequeue_entry *e)
-{
- struct dlm_msg *msg, *tmp;
-
- list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
- if (msg->orig_msg) {
- msg->orig_msg->retransmit = false;
- kref_put(&msg->orig_msg->ref, dlm_msg_release);
- }
- list_del(&msg->list);
- kref_put(&msg->ref, dlm_msg_release);
- }
-
- list_del(&e->list);
- atomic_dec(&e->con->writequeue_cnt);
- kref_put(&e->ref, dlm_page_release);
-}
-
/*
* writequeue_entry_complete - try to delete and free write queue entry
* @e: write queue entry to try to delete
@@ -1072,6 +1095,8 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
e->offset += completed;
e->len -= completed;
+ /* signal that page was half way transmitted */
+ e->dirty = true;
if (e->len == 0 && e->users == 0)
free_entry(e);