This patch reverts commit 2c3fa6ae4d52 ("dlm: check required context while close"). The function dlm_midcomms_close(), which later calls dlm_lowcomms_close(), is invoked when the cluster manager reports that a node got fenced, which tells the midcomms/lowcomms layer to disconnect the node from the cluster communication. The node can rejoin the cluster later. The reverted patch ensured that no new messages could be triggered while we are in the close() function context, by checking whether the lockspaces had been stopped. However, a check is missing: we only need to look at the specific lockspaces the fenced node is a member of. Implementing that is currently complicated, because there is no easy way to check whether a node is part of a specific lockspace without stopping recovery. For now we simply revert this commit, as it was only a sanity check for catching lockspaces that were not stopped before close() is called.
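For illustration only, a per-lockspace variant of the reverted check might look like the hypothetical sketch below (not part of this series); the membership test is exactly the part that cannot be done safely today without stopping recovery first, which is why the check is reverted rather than reworked:

  /* hypothetical sketch: warn only for lockspaces the fenced
   * node is actually a member of
   */
  static void dlm_stop_lockspaces_check(int nodeid)
  {
          struct dlm_ls *ls;

          spin_lock(&lslist_lock);
          list_for_each_entry(ls, &lslist, ls_list) {
                  /* there is currently no safe way to do this
                   * membership check without stopping recovery
                   */
                  if (!dlm_is_member(ls, nodeid))
                          continue;

                  WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
                          !dlm_locking_stopped(ls));
          }
          spin_unlock(&lslist_lock);
  }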
Cc: stable@vger.kernel.org
Fixes: 2c3fa6ae4d52 ("dlm: check required context while close")
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lockspace.c | 12 ------------
 fs/dlm/lockspace.h |  1 -
 fs/dlm/midcomms.c  |  3 ---
 3 files changed, 16 deletions(-)
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 67261b7b1f0e..0455dddb0797 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -935,15 +935,3 @@ void dlm_stop_lockspaces(void)
 		log_print("dlm user daemon left %d lockspaces", count);
 }
 
-void dlm_stop_lockspaces_check(void)
-{
-	struct dlm_ls *ls;
-
-	spin_lock(&lslist_lock);
-	list_for_each_entry(ls, &lslist, ls_list) {
-		if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
-			    !dlm_locking_stopped(ls)))
-			break;
-	}
-	spin_unlock(&lslist_lock);
-}
diff --git a/fs/dlm/lockspace.h b/fs/dlm/lockspace.h
index 03f4a4a3a871..47ebd4411926 100644
--- a/fs/dlm/lockspace.h
+++ b/fs/dlm/lockspace.h
@@ -27,7 +27,6 @@ struct dlm_ls *dlm_find_lockspace_local(void *id);
 struct dlm_ls *dlm_find_lockspace_device(int minor);
 void dlm_put_lockspace(struct dlm_ls *ls);
 void dlm_stop_lockspaces(void);
-void dlm_stop_lockspaces_check(void);
 int dlm_new_user_lockspace(const char *name, const char *cluster,
			    uint32_t flags, int lvblen,
			    const struct dlm_lockspace_ops *ops,
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index c02c43e4980a..3df916a568ba 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -136,7 +136,6 @@
 #include <net/tcp.h>
 
 #include "dlm_internal.h"
-#include "lockspace.h"
 #include "lowcomms.h"
 #include "config.h"
 #include "memory.h"
@@ -1491,8 +1490,6 @@ int dlm_midcomms_close(int nodeid)
 	if (nodeid == dlm_our_nodeid())
 		return 0;
 
-	dlm_stop_lockspaces_check();
-
 	idx = srcu_read_lock(&nodes_srcu);
 	/* Abort pending close/remove operation */
 	node = nodeid2node(nodeid, 0);
This patch clears the DLM_IFL_CB_PENDING_BIT flag, which is set when callback work is queued, in the case where there was no callback to dequeue. That case is a bug and should never happen, which is why there is a WARN_ON_ONCE(). However, if it does happen, we are now prepared to recover from it.
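For context, a rough sketch of the pending-bit contract between the enqueue side and the worker (simplified and paraphrased from fs/dlm/ast.c, not the exact kernel code; the workqueue and work item names are assumptions for illustration): work is only queued on the 0 -> 1 transition of the bit, so a worker that bails out on an empty queue without clearing the bit would leave the lkb unqueueable forever:

  /* enqueue side, simplified: only queue work when the pending
   * bit transitions from 0 to 1
   */
  spin_lock(&lkb->lkb_cb_lock);
  if (!test_and_set_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags))
          queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
  list_add_tail(&cb->list, &lkb->lkb_callbacks);
  spin_unlock(&lkb->lkb_cb_lock);

This is why the fix below clears the bit under the same lkb_cb_lock before bailing out.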
Cc: stable@vger.kernel.org
Fixes: 61bed0baa4db ("fs: dlm: use a non-static queue for callbacks")
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/ast.c | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 700ff2e0515a..ff0ef4653535 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -181,10 +181,12 @@ void dlm_callback_work(struct work_struct *work)
 
 	spin_lock(&lkb->lkb_cb_lock);
 	rv = dlm_dequeue_lkb_callback(lkb, &cb);
-	spin_unlock(&lkb->lkb_cb_lock);
-
-	if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY))
+	if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY)) {
+		clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
+		spin_unlock(&lkb->lkb_cb_lock);
 		goto out;
+	}
+	spin_unlock(&lkb->lkb_cb_lock);
 
 	for (;;) {
 		castfn = lkb->lkb_astfn;
This patch sets the process_dlm_messages_pending boolean to false when there was no message to process. This case should not happen, but if it does, we are now prepared to recover by resetting the pending boolean to false.
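The boolean implements a simple handshake with the receive path. Roughly (a simplified sketch of the producer side in fs/dlm/lowcomms.c; the workqueue and work item names are paraphrased assumptions), the worker is only scheduled while pending is false, so returning without resetting it would stall the processqueue forever:

  /* producer side, simplified: schedule the worker only when no
   * processing is already pending
   */
  spin_lock(&processqueue_lock);
  list_add_tail(&pentry->list, &processqueue);
  if (!process_dlm_messages_pending) {
          process_dlm_messages_pending = true;
          queue_work(process_workqueue, &process_work);
  }
  spin_unlock(&processqueue_lock);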
Cc: stable@vger.kernel.org
Fixes: dbb751ffab0b ("fs: dlm: parallelize lowcomms socket handling")
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 1 +
 1 file changed, 1 insertion(+)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 3d3802c47b8b..5aad4d4842eb 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -898,6 +898,7 @@ static void process_dlm_messages(struct work_struct *work)
 	pentry = list_first_entry_or_null(&processqueue,
					  struct processqueue_entry, list);
 	if (WARN_ON_ONCE(!pentry)) {
+		process_dlm_messages_pending = false;
 		spin_unlock(&processqueue_lock);
 		return;
 	}
The dlm midcomms, debugfs and lockspace modules use kmem caches. Ensure that the kmem caches are deallocated only after those modules have exited.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/main.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/fs/dlm/main.c b/fs/dlm/main.c
index 93755f83a30d..6ca28299c9db 100644
--- a/fs/dlm/main.c
+++ b/fs/dlm/main.c
@@ -73,10 +73,10 @@ static void __exit exit_dlm(void)
 	dlm_plock_exit();
 	dlm_user_exit();
 	dlm_config_exit();
-	dlm_memory_exit();
 	dlm_lockspace_exit();
 	dlm_midcomms_exit();
 	dlm_unregister_debugfs();
+	dlm_memory_exit();
 }
 
 module_init(init_dlm);
This patch removes a redundant check of con->othercon inside a branch that has already tested it.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 5aad4d4842eb..b28505b8b23b 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -1497,8 +1497,7 @@ int dlm_lowcomms_close(int nodeid)
 	call_srcu(&connections_srcu, &con->rcu, connection_release);
 	if (con->othercon) {
 		clean_one_writequeue(con->othercon);
-		if (con->othercon)
-			call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
+		call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
 	}
 	srcu_read_unlock(&connections_srcu, idx);
It should make no difference whether the CF_IO_STOP flag is set before or after restore_callbacks(): restore_callbacks() makes sure that no callback is executed anymore, even if the bit was not yet set at that point. Setting the flag once, before taking the socket lock, lets us drop the duplicated code in both branches.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index b28505b8b23b..5a7586633cbe 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -735,19 +735,15 @@ static void stop_connection_io(struct connection *con)
 	if (con->othercon)
 		stop_connection_io(con->othercon);
 
+	spin_lock_bh(&con->writequeue_lock);
+	set_bit(CF_IO_STOP, &con->flags);
+	spin_unlock_bh(&con->writequeue_lock);
+
 	down_write(&con->sock_lock);
 	if (con->sock) {
 		lock_sock(con->sock->sk);
 		restore_callbacks(con->sock->sk);
-
-		spin_lock_bh(&con->writequeue_lock);
-		set_bit(CF_IO_STOP, &con->flags);
-		spin_unlock_bh(&con->writequeue_lock);
 		release_sock(con->sock->sk);
-	} else {
-		spin_lock_bh(&con->writequeue_lock);
-		set_bit(CF_IO_STOP, &con->flags);
-		spin_unlock_bh(&con->writequeue_lock);
 	}
 	up_write(&con->sock_lock);
This patch moves the dlm_purge_lkb_callbacks() function from the ast to the user dlm module, as it is only used by the dlm user implementation. I was given a hint to hold specific callback-handling locks in dlm_purge_lkb_callbacks(), but it was a false positive. The confusion arises because the ast implementation uses a different locking scheme than user locks do, since DLM handles kernel and user dlm locks differently. To avoid this confusion, we move the function into the dlm user implementation.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/ast.c  | 17 -----------------
 fs/dlm/ast.h  |  1 -
 fs/dlm/user.c | 18 ++++++++++++++++++
 fs/dlm/user.h |  1 +
 4 files changed, 19 insertions(+), 18 deletions(-)
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index ff0ef4653535..1f2f70a1b824 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -36,23 +36,6 @@ void dlm_callback_set_last_ptr(struct dlm_callback **from,
 	*from = to;
 }
 
-void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
-{
-	struct dlm_callback *cb, *safe;
-
-	list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) {
-		list_del(&cb->list);
-		kref_put(&cb->ref, dlm_release_callback);
-	}
-
-	clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
-
-	/* invalidate */
-	dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
-	dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
-	lkb->lkb_last_bast_mode = -1;
-}
-
 int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
			     int status, uint32_t sbflags)
 {
diff --git a/fs/dlm/ast.h b/fs/dlm/ast.h
index 880b11882495..ce007892dc2d 100644
--- a/fs/dlm/ast.h
+++ b/fs/dlm/ast.h
@@ -26,7 +26,6 @@ void dlm_callback_set_last_ptr(struct dlm_callback **from,
				struct dlm_callback *to);
 
 void dlm_release_callback(struct kref *ref);
-void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb);
 void dlm_callback_work(struct work_struct *work);
 int dlm_callback_start(struct dlm_ls *ls);
 void dlm_callback_stop(struct dlm_ls *ls);
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index d9c09fc0aba1..695e691b38b3 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -145,6 +145,24 @@ static void compat_output(struct dlm_lock_result *res,
 }
 #endif
 
+/* should be called under the proc->asts_spin lock */
+void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
+{
+	struct dlm_callback *cb, *safe;
+
+	list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) {
+		list_del(&cb->list);
+		kref_put(&cb->ref, dlm_release_callback);
+	}
+
+	clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
+
+	/* invalidate */
+	dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
+	dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
+	lkb->lkb_last_bast_mode = -1;
+}
+
 /* Figure out if this lock is at the end of its life and no longer
    available for the application to use.  The lkb still exists until
    the final ast is read.  A lock becomes EOL in three situations:
diff --git a/fs/dlm/user.h b/fs/dlm/user.h
index 33059452d79e..2caf8e6e24d5 100644
--- a/fs/dlm/user.h
+++ b/fs/dlm/user.h
@@ -6,6 +6,7 @@
 #ifndef __USER_DOT_H__
 #define __USER_DOT_H__
 
+void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb);
 void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
		      int status, uint32_t sbflags);
 int dlm_user_init(void);
This patch warns about messages received from nodes that have already left the lockspace, as signaled by the cluster manager. Before commit 489d8e559c65 ("fs: dlm: add reliable connection if reconnect") there was a synchronization issue between the socket lifetime and the cluster event of leaving a lockspace: other nodes did not stop sending messages while the cluster manager still had a pending leave-lockspace message. The reliable session layer for dlm uses sequence numbers to ensure dlm messages are never dropped. If this were not correctly synchronized we would have a problem; this patch takes the existing filter case and turns it into a WARN_ON_ONCE(), so that such an issue shows up in the kernel log, because it should never happen now.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lock.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index debf8a55ad7d..ede903246fa4 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -4616,7 +4616,7 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
 {
 	int error = 0, noent = 0;
 
-	if (!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid))) {
+	if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
 		log_limit(ls, "receive %d from non-member %d %x %x %d",
			  le32_to_cpu(ms->m_type),
			  le32_to_cpu(ms->m_header.h_nodeid),
@@ -4754,7 +4754,7 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
 	/* If we were a member of this lockspace, left, and rejoined,
	   other nodes may still be sending us messages from the
	   lockspace generation before we left. */
-	if (!ls->ls_generation) {
+	if (WARN_ON_ONCE(!ls->ls_generation)) {
 		log_limit(ls, "receive %d from %d ignore old gen",
			  le32_to_cpu(ms->m_type), nodeid);
 		return;
It makes no sense to call midcomms/lowcomms functionality for the local node, as socket functionality is only required for remote nodes. This patch filters those calls in the upper layer of lockspace membership handling instead of in the midcomms/lowcomms layer, which should never need to be aware of the local nodeid.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/config.c   |  3 ++-
 fs/dlm/lowcomms.c |  3 ---
 fs/dlm/member.c   | 37 ++++++++++++++++++++++++++++++-------
 fs/dlm/midcomms.c |  9 ---------
 4 files changed, 32 insertions(+), 20 deletions(-)
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index 2beceff024e3..4246cd425671 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -532,7 +532,8 @@ static void drop_comm(struct config_group *g, struct config_item *i)
 	struct dlm_comm *cm = config_item_to_comm(i);
 	if (local_comm == cm)
 		local_comm = NULL;
-	dlm_midcomms_close(cm->nodeid);
+	if (!cm->local)
+		dlm_midcomms_close(cm->nodeid);
 	while (cm->addr_count--)
 		kfree(cm->addr[cm->addr_count]);
 	config_item_put(i);
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 5a7586633cbe..345a316ae54c 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -546,9 +546,6 @@ int dlm_lowcomms_connect_node(int nodeid)
 	struct connection *con;
 	int idx;
 
-	if (nodeid == dlm_our_nodeid())
-		return 0;
-
 	idx = srcu_read_lock(&connections_srcu);
 	con = nodeid2con(nodeid, 0);
 	if (WARN_ON_ONCE(!con)) {
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index 923c01a8a0aa..77d202e4a02a 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -307,6 +307,21 @@ static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
 	}
 }
 
+static int add_remote_member(int nodeid)
+{
+	int error;
+
+	if (nodeid == dlm_our_nodeid())
+		return 0;
+
+	error = dlm_lowcomms_connect_node(nodeid);
+	if (error < 0)
+		return error;
+
+	dlm_midcomms_add_member(nodeid);
+	return 0;
+}
+
 static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
 {
 	struct dlm_member *memb;
@@ -316,16 +331,16 @@ static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
 	if (!memb)
 		return -ENOMEM;
 
-	error = dlm_lowcomms_connect_node(node->nodeid);
+	memb->nodeid = node->nodeid;
+	memb->weight = node->weight;
+	memb->comm_seq = node->comm_seq;
+
+	error = add_remote_member(node->nodeid);
 	if (error < 0) {
 		kfree(memb);
 		return error;
 	}
 
-	memb->nodeid = node->nodeid;
-	memb->weight = node->weight;
-	memb->comm_seq = node->comm_seq;
-	dlm_midcomms_add_member(node->nodeid);
 	add_ordered_member(ls, memb);
 	ls->ls_num_nodes++;
 	return 0;
@@ -370,11 +385,19 @@ static void clear_memb_list(struct list_head *head,
 	}
 }
 
-static void clear_members_cb(int nodeid)
+static void remove_remote_member(int nodeid)
 {
+	if (nodeid == dlm_our_nodeid())
+		return;
+
 	dlm_midcomms_remove_member(nodeid);
 }
 
+static void clear_members_cb(int nodeid)
+{
+	remove_remote_member(nodeid);
+}
+
 void dlm_clear_members(struct dlm_ls *ls)
 {
 	clear_memb_list(&ls->ls_nodes, clear_members_cb);
@@ -562,7 +585,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
 		neg++;
 		list_move(&memb->list, &ls->ls_nodes_gone);
-		dlm_midcomms_remove_member(memb->nodeid);
+		remove_remote_member(memb->nodeid);
 		ls->ls_num_nodes--;
 		dlm_lsop_recover_slot(ls, memb);
 	}
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 3df916a568ba..9c66cb853d17 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -1280,9 +1280,6 @@ void dlm_midcomms_add_member(int nodeid)
 	struct midcomms_node *node;
 	int idx;
 
-	if (nodeid == dlm_our_nodeid())
-		return;
-
 	idx = srcu_read_lock(&nodes_srcu);
 	node = nodeid2node(nodeid, GFP_NOFS);
 	if (!node) {
@@ -1328,9 +1325,6 @@ void dlm_midcomms_remove_member(int nodeid)
 	struct midcomms_node *node;
 	int idx;
 
-	if (nodeid == dlm_our_nodeid())
-		return;
-
 	idx = srcu_read_lock(&nodes_srcu);
 	node = nodeid2node(nodeid, 0);
 	if (!node) {
@@ -1487,9 +1481,6 @@ int dlm_midcomms_close(int nodeid)
 	struct midcomms_node *node;
 	int idx, ret;
 
-	if (nodeid == dlm_our_nodeid())
-		return 0;
-
 	idx = srcu_read_lock(&nodes_srcu);
 	/* Abort pending close/remove operation */
 	node = nodeid2node(nodeid, 0);
Currently lkb_wait_count is protected by the rsb lock, so it should be fine to handle it as a non-atomic value. However, as part of the overall effort to reduce locking, this patch converts it to an atomic_t value.
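As an aside for reading the diff below: atomic_inc_return() returns the value after the increment, while atomic_fetch_inc() returns the value before it; that is why the former feeds the debug log and the latter feeds the zero-assertion. A minimal illustration (demo function only):

  #include <linux/atomic.h>

  static void wait_count_inc_demo(void)
  {
          atomic_t v = ATOMIC_INIT(0);
          int new, old;

          new = atomic_inc_return(&v); /* new == 1, the post-increment value */
          old = atomic_fetch_inc(&v);  /* old == 1, the pre-increment value; v is now 2 */
  }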
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/dlm_internal.h |  2 +-
 fs/dlm/lock.c         | 32 ++++++++++++++------------------
 2 files changed, 15 insertions(+), 19 deletions(-)
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 986a9d7b1f33..c8156770205e 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -246,7 +246,7 @@ struct dlm_lkb {
 	int8_t lkb_highbast;	/* highest mode bast sent for */
 
 	int8_t lkb_wait_type;	/* type of reply waiting for */
-	int8_t lkb_wait_count;
+	atomic_t lkb_wait_count;
 	int lkb_wait_nodeid;	/* for debugging */
 
 	struct list_head lkb_statequeue;	/* rsb g/c/w list */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index ede903246fa4..f511a9d7d416 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1407,6 +1407,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 {
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 	int error = 0;
+	int wc;
 
 	mutex_lock(&ls->ls_waiters_mutex);
 
@@ -1428,20 +1429,17 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 			error = -EBUSY;
 			goto out;
 		}
-		lkb->lkb_wait_count++;
+		wc = atomic_inc_return(&lkb->lkb_wait_count);
 		hold_lkb(lkb);
 
 		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
-			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
-			  lkb->lkb_wait_count, dlm_iflags_val(lkb));
+			  lkb->lkb_id, lkb->lkb_wait_type, mstype, wc,
+			  dlm_iflags_val(lkb));
 		goto out;
 	}
 
-	DLM_ASSERT(!lkb->lkb_wait_count,
-		   dlm_print_lkb(lkb);
-		   printk("wait_count %d\n", lkb->lkb_wait_count););
-
-	lkb->lkb_wait_count++;
+	wc = atomic_fetch_inc(&lkb->lkb_wait_count);
+	DLM_ASSERT(!wc, dlm_print_lkb(lkb); printk("wait_count %d\n", wc););
 	lkb->lkb_wait_type = mstype;
 	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
 	hold_lkb(lkb);
@@ -1504,7 +1502,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
 		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
 		lkb->lkb_wait_type = 0;
-		lkb->lkb_wait_count--;
+		atomic_dec(&lkb->lkb_wait_count);
 		unhold_lkb(lkb);
 		goto out_del;
 	}
@@ -1531,16 +1529,15 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
 	if (overlap_done && lkb->lkb_wait_type) {
 		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
-		lkb->lkb_wait_count--;
+		atomic_dec(&lkb->lkb_wait_count);
 		unhold_lkb(lkb);
 		lkb->lkb_wait_type = 0;
 	}
 
-	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
+	DLM_ASSERT(atomic_read(&lkb->lkb_wait_count), dlm_print_lkb(lkb););
 
 	clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
-	lkb->lkb_wait_count--;
-	if (!lkb->lkb_wait_count)
+	if (atomic_dec_and_test(&lkb->lkb_wait_count))
 		list_del_init(&lkb->lkb_wait_reply);
 	unhold_lkb(lkb);
 	return 0;
@@ -2669,7 +2666,7 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 			goto out;
 
 		/* lock not allowed if there's any op in progress */
-		if (lkb->lkb_wait_type || lkb->lkb_wait_count)
+		if (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count))
 			goto out;
 
 		if (is_overlap(lkb))
@@ -2731,7 +2728,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
 
 	/* normal unlock not allowed if there's any op in progress */
 	if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
-	    (lkb->lkb_wait_type || lkb->lkb_wait_count))
+	    (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count)))
 		goto out;
 
 	/* an lkb may be waiting for an rsb lookup to complete where the
@@ -5066,10 +5063,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 		/* drop all wait_count references we still
		 * hold a reference for this iteration.
		 */
-		while (lkb->lkb_wait_count) {
-			lkb->lkb_wait_count--;
+		while (!atomic_dec_and_test(&lkb->lkb_wait_count))
 			unhold_lkb(lkb);
-		}
+
 		mutex_lock(&ls->ls_waiters_mutex);
 		list_del_init(&lkb->lkb_wait_reply);
 		mutex_unlock(&ls->ls_waiters_mutex);
Currently seq_next is only read on the receive side, which processes messages in an ordered way, and seq_send is protected by locks. To be able to read the seq_next value on the send side as well, we convert both to atomic_t values. The atomic_cmpxchg() is probably not necessary; however, the increment depends on an if conditional, and the two should be handled as one atomic operation.
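The receive-side change below is an instance of a generic conditional-increment pattern: advance the counter by one only if it still holds the expected value, as a single atomic step. A minimal sketch of the pattern (illustrative only, not the patch code itself):

  #include <linux/atomic.h>

  /* returns true when *v was advanced from @expected to @expected + 1 */
  static bool advance_if_expected(atomic_t *v, int expected)
  {
          int oval;

          do {
                  oval = atomic_read(v);
                  if (oval != expected)
                          return false;
                  /* retry if another context changed *v in between */
          } while (atomic_cmpxchg(v, oval, oval + 1) != oval);

          return true;
  }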
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/midcomms.c | 40 +++++++++++++++++++++++++---------------
 1 file changed, 25 insertions(+), 15 deletions(-)
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 9c66cb853d17..70286737eb0a 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -152,8 +152,8 @@ struct midcomms_node {
 	int nodeid;
 	uint32_t version;
-	uint32_t seq_send;
-	uint32_t seq_next;
+	atomic_t seq_send;
+	atomic_t seq_next;
 	/* These queues are unbound because we cannot drop any message in dlm.
	 * We could send a fence signal for a specific node to the cluster
	 * manager if queues hits some maximum value, however this handling
@@ -317,8 +317,8 @@ static void midcomms_node_reset(struct midcomms_node *node)
 {
 	pr_debug("reset node %d\n", node->nodeid);
 
-	node->seq_next = DLM_SEQ_INIT;
-	node->seq_send = DLM_SEQ_INIT;
+	atomic_set(&node->seq_next, DLM_SEQ_INIT);
+	atomic_set(&node->seq_send, DLM_SEQ_INIT);
 	node->version = DLM_VERSION_NOT_SET;
 	node->flags = 0;
 
@@ -492,9 +492,19 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
					struct midcomms_node *node,
					uint32_t seq)
 {
-	if (seq == node->seq_next) {
-		node->seq_next++;
+	bool is_expected_seq;
+	uint32_t oval, nval;
 
+	do {
+		oval = atomic_read(&node->seq_next);
+		is_expected_seq = (oval == seq);
+		if (!is_expected_seq)
+			break;
+
+		nval = oval + 1;
+	} while (atomic_cmpxchg(&node->seq_next, oval, nval) != oval);
+
+	if (is_expected_seq) {
 		switch (p->header.h_cmd) {
 		case DLM_FIN:
 			spin_lock(&node->state_lock);
@@ -503,7 +513,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
 			switch (node->state) {
 			case DLM_ESTABLISHED:
-				dlm_send_ack(node->nodeid, node->seq_next);
+				dlm_send_ack(node->nodeid, nval);
 
 				/* passive shutdown DLM_LAST_ACK case 1
				 * additional we check if the node is used by
@@ -522,14 +532,14 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
 				}
 				break;
 			case DLM_FIN_WAIT1:
-				dlm_send_ack(node->nodeid, node->seq_next);
+				dlm_send_ack(node->nodeid, nval);
 				node->state = DLM_CLOSING;
 				set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
 				pr_debug("switch node %d to state %s\n",
					 node->nodeid, dlm_state_str(node->state));
 				break;
 			case DLM_FIN_WAIT2:
-				dlm_send_ack(node->nodeid, node->seq_next);
+				dlm_send_ack(node->nodeid, nval);
 				midcomms_node_reset(node);
 				pr_debug("switch node %d to state %s\n",
					 node->nodeid, dlm_state_str(node->state));
@@ -557,11 +567,11 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
 		/* retry to ack message which we already have by sending back
		 * current node->seq_next number as ack.
		 */
-		if (seq < node->seq_next)
-			dlm_send_ack(node->nodeid, node->seq_next);
+		if (seq < oval)
+			dlm_send_ack(node->nodeid, oval);
 
 		log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d",
-				      seq, node->seq_next, node->nodeid);
+				      seq, oval, node->nodeid);
 	}
 }
 
@@ -992,7 +1002,7 @@ void dlm_midcomms_receive_done(int nodeid)
 	switch (node->state) {
 	case DLM_ESTABLISHED:
 		spin_unlock(&node->state_lock);
-		dlm_send_ack(node->nodeid, node->seq_next);
+		dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
 		break;
 	default:
 		spin_unlock(&node->state_lock);
@@ -1058,7 +1068,7 @@ static void midcomms_new_msg_cb(void *data)
 	list_add_tail_rcu(&mh->list, &mh->node->send_queue);
 	spin_unlock_bh(&mh->node->send_queue_lock);
 
-	mh->seq = mh->node->seq_send++;
+	mh->seq = atomic_fetch_inc(&mh->node->seq_send);
 }
 
 static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
@@ -1530,7 +1540,7 @@ static void midcomms_new_rawmsg_cb(void *data)
 	switch (h->h_cmd) {
 	case DLM_OPTS:
 		if (!h->u.h_seq)
-			h->u.h_seq = cpu_to_le32(rd->node->seq_send++);
+			h->u.h_seq = cpu_to_le32(atomic_fetch_inc(&rd->node->seq_send));
 		break;
 	default:
 		break;
This patch changes when we send an ack back to tell the other side it can free messages that have arrived on the receiver node. Because of random reconnects, e.g. TCP resets, this is also handled at the application layer so that DLM does not run into a deadlock state.
The current handling has the following problems:
1. We end up in situations where we send out only a 16-byte ack-back message and no other messages, even though DLM has logic to combine as many messages as it can into one send() socket call. This behaviour can be observed with "trace-cmd start -e dlm_recv" by watching the ret field being 16 bytes.
2. When the processing of DLM messages never ends because we keep receiving a lot of messages, we never send an ack back, since that only happens when the processing loop ends.
This patch introduces a likely and an unlikely threshold case. The likely case sends an ack back on the transmit path once a threshold of processed upper-layer-protocol messages is reached. This solves issue 1 because the ack is sent together with another normal DLM message, and it solves issue 2 because it is not part of the processing loop.
The unlikely case has a bigger threshold and is triggered when we only receive messages and do not send any messages back. This case prevents the sending node from keeping a lot of messages around for a long time, as we still occasionally send ack backs telling the sender to finally release messages.
The atomic_cmpxchg() is there to make the ack send and the reset of the upper-layer-protocol delivery counter one atomic step.
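With the values in the diff below, the likely case piggybacks an ack on the transmit path once more than DLM_SEND_ACK_BACK_MSG_THRESHOLD (32) upper-layer messages have been delivered, while the receive-only fallback fires after more than DLM_RECV_ACK_BACK_MSG_THRESHOLD (32 * 8 = 256) deliveries.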
Signed-off-by: Alexander Aring <aahringo@redhat.com>
---
 fs/dlm/lowcomms.c | 30 -------------------
 fs/dlm/midcomms.c | 76 +++++++++++++++++++----------------------------
 2 files changed, 31 insertions(+), 75 deletions(-)
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 345a316ae54c..68092f953830 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -860,30 +860,8 @@ struct dlm_processed_nodes {
 	struct list_head list;
 };
 
-static void add_processed_node(int nodeid, struct list_head *processed_nodes)
-{
-	struct dlm_processed_nodes *n;
-
-	list_for_each_entry(n, processed_nodes, list) {
-		/* we already remembered this node */
-		if (n->nodeid == nodeid)
-			return;
-	}
-
-	/* if it's fails in worst case we simple don't send an ack back.
-	 * We try it next time.
-	 */
-	n = kmalloc(sizeof(*n), GFP_NOFS);
-	if (!n)
-		return;
-
-	n->nodeid = nodeid;
-	list_add(&n->list, processed_nodes);
-}
-
 static void process_dlm_messages(struct work_struct *work)
 {
-	struct dlm_processed_nodes *n, *n_tmp;
 	struct processqueue_entry *pentry;
 	LIST_HEAD(processed_nodes);
 
@@ -902,7 +880,6 @@ static void process_dlm_messages(struct work_struct *work)
 	for (;;) {
 		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
					    pentry->buflen);
-		add_processed_node(pentry->nodeid, &processed_nodes);
 		free_processqueue_entry(pentry);
 
 		spin_lock(&processqueue_lock);
@@ -917,13 +894,6 @@ static void process_dlm_messages(struct work_struct *work)
 		list_del(&pentry->list);
 		spin_unlock(&processqueue_lock);
 	}
-
-	/* send ack back after we processed couple of messages */
-	list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) {
-		list_del(&n->list);
-		dlm_midcomms_receive_done(n->nodeid);
-		kfree(n);
-	}
 }
 
 /* Data received from remote end */
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 70286737eb0a..e1a0df67b566 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -148,6 +148,8 @@
 /* 5 seconds wait to sync ending of dlm */
 #define DLM_SHUTDOWN_TIMEOUT msecs_to_jiffies(5000)
 #define DLM_VERSION_NOT_SET 0
+#define DLM_SEND_ACK_BACK_MSG_THRESHOLD 32
+#define DLM_RECV_ACK_BACK_MSG_THRESHOLD (DLM_SEND_ACK_BACK_MSG_THRESHOLD * 8)
 
 struct midcomms_node {
 	int nodeid;
@@ -165,7 +167,7 @@ struct midcomms_node {
 #define DLM_NODE_FLAG_CLOSE	1
 #define DLM_NODE_FLAG_STOP_TX	2
 #define DLM_NODE_FLAG_STOP_RX	3
-#define DLM_NODE_ULP_DELIVERED	4
+	atomic_t ulp_delivered;
 	unsigned long flags;
 	wait_queue_head_t shutdown_wait;
 
@@ -319,6 +321,7 @@ static void midcomms_node_reset(struct midcomms_node *node)
 
 	atomic_set(&node->seq_next, DLM_SEQ_INIT);
 	atomic_set(&node->seq_send, DLM_SEQ_INIT);
+	atomic_set(&node->ulp_delivered, 0);
 	node->version = DLM_VERSION_NOT_SET;
 	node->flags = 0;
 
@@ -393,6 +396,28 @@ static int dlm_send_ack(int nodeid, uint32_t seq)
 	return 0;
 }
 
+static void dlm_send_ack_threshold(struct midcomms_node *node,
+				   uint32_t threshold)
+{
+	uint32_t oval, nval;
+	bool send_ack;
+
+	/* let only one caller that hits the threshold send the ack back */
+	do {
+		oval = atomic_read(&node->ulp_delivered);
+		send_ack = (oval > threshold);
+		/* abort if threshold is not reached */
+		if (!send_ack)
+			break;
+
+		nval = 0;
+		/* try to reset ulp_delivered counter */
+	} while (atomic_cmpxchg(&node->ulp_delivered, oval, nval) != oval);
+
+	if (send_ack)
+		dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
+}
+
 static int dlm_send_fin(struct midcomms_node *node,
			 void (*ack_rcv)(struct midcomms_node *node))
 {
@@ -560,7 +585,9 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
 			WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
 			dlm_receive_buffer_3_2_trace(seq, p);
 			dlm_receive_buffer(p, node->nodeid);
-			set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
+			atomic_inc(&node->ulp_delivered);
+			/* unlikely case to send ack back when we don't transmit */
+			dlm_send_ack_threshold(node, DLM_RECV_ACK_BACK_MSG_THRESHOLD);
 			break;
 		}
 	} else {
@@ -969,49 +996,6 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
 	return ret;
 }
 
-void dlm_midcomms_receive_done(int nodeid)
-{
-	struct midcomms_node *node;
-	int idx;
-
-	idx = srcu_read_lock(&nodes_srcu);
-	node = nodeid2node(nodeid, 0);
-	if (!node) {
-		srcu_read_unlock(&nodes_srcu, idx);
-		return;
-	}
-
-	/* old protocol, we do nothing */
-	switch (node->version) {
-	case DLM_VERSION_3_2:
-		break;
-	default:
-		srcu_read_unlock(&nodes_srcu, idx);
-		return;
-	}
-
-	/* do nothing if we didn't delivered stateful to ulp */
-	if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
-				&node->flags)) {
-		srcu_read_unlock(&nodes_srcu, idx);
-		return;
-	}
-
-	spin_lock(&node->state_lock);
-	/* we only ack if state is ESTABLISHED */
-	switch (node->state) {
-	case DLM_ESTABLISHED:
-		spin_unlock(&node->state_lock);
-		dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
-		break;
-	default:
-		spin_unlock(&node->state_lock);
-		/* do nothing FIN has it's own ack send */
-		break;
-	}
-	srcu_read_unlock(&nodes_srcu, idx);
-}
-
 void dlm_midcomms_unack_msg_resend(int nodeid)
 {
 	struct midcomms_node *node;
@@ -1142,6 +1126,8 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
 		goto err;
 	}
 
+		/* send ack back if necessary */
+		dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
 		break;
 	default:
 		dlm_free_mhandle(mh);