The iput() function is a dangerous one - if the reference counter goes to zero, the function may block for a long time due to:
- inode_wait_for_writeback() waits until writeback on this inode completes
- the filesystem-specific "evict_inode" callback can do similar things; e.g. all netfs-based filesystems will call netfs_wait_for_outstanding_io() which is similar to inode_wait_for_writeback()
Therefore, callers must carefully evaluate the context they're in and check whether invoking iput() is a good idea at all.
Most of the time, this is not a problem because the dcache holds references to all inodes, and the dcache is usually the one to release the last reference. But this assumption is fragile. For example, under (memcg) memory pressure, the dcache shrinker is more likely to release inode references, moving the inode eviction to contexts where that was extremely unlikely to occur.
Our production servers "found" at least two deadlock bugs in the Ceph filesystem that were caused by this iput() behavior:
1. Writeback may lead to iput() calls in Ceph (e.g. from ceph_put_wrbuffer_cap_refs()) which deadlocks in inode_wait_for_writeback(). Waiting for writeback completion from within writeback will obviously never be able to make any progress. This leads to blocked kworkers like this:
INFO: task kworker/u777:6:1270802 blocked for more than 122 seconds.
      Not tainted 6.16.7-i1-es #773
task:kworker/u777:6  state:D stack:0 pid:1270802 tgid:1270802 ppid:2 task_flags:0x4208060 flags:0x00004000
Workqueue: writeback wb_workfn (flush-ceph-3)
Call Trace:
 <TASK>
 __schedule+0x4ea/0x17d0
 schedule+0x1c/0xc0
 inode_wait_for_writeback+0x71/0xb0
 evict+0xcf/0x200
 ceph_put_wrbuffer_cap_refs+0xdd/0x220
 ceph_invalidate_folio+0x97/0xc0
 ceph_writepages_start+0x127b/0x14d0
 do_writepages+0xba/0x150
 __writeback_single_inode+0x34/0x290
 writeback_sb_inodes+0x203/0x470
 __writeback_inodes_wb+0x4c/0xe0
 wb_writeback+0x189/0x2b0
 wb_workfn+0x30b/0x3d0
 process_one_work+0x143/0x2b0
 worker_thread+0x30a/0x450
2. In the Ceph messenger thread (net/ceph/messenger*.c), any iput() call may invoke ceph_evict_inode() which will deadlock in netfs_wait_for_outstanding_io(); since this blocks the messenger thread, completions from the Ceph servers will never be received and handled.
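A rough sketch of that second chain, reconstructed from the description above (the intermediate frames are illustrative, not taken from an actual trace):

/*
 *   ceph_con_workfn()                 messenger thread dispatches a reply
 *     -> handle_cap_flush_ack()       happens to drop the last reference
 *       -> iput(inode)
 *         -> evict()
 *           -> ceph_evict_inode()
 *             -> netfs_wait_for_outstanding_io()
 *                waits for I/O completions that only this (now blocked)
 *                messenger thread could deliver
 */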
It looks like these deadlock bugs have been in the Ceph filesystem code since forever (therefore no "Fixes" tag in this patch). There may be various ways to solve this:
- make iput() asynchronous and defer the actual eviction like fput() (may add overhead)
- make iput() only asynchronous if I_SYNC is set (doesn't solve random things happening inside the "evict_inode" callback)
- add iput_deferred() to make this asynchronous behavior/overhead optional and explicit
- refactor Ceph to avoid iput() calls from within writeback and messenger (if that is even possible)
- add a Ceph-specific workaround
After advice from Mateusz Guzik, I decided to do the last of these. The implementation is simple because it piggybacks on the existing work_struct used by ceph_queue_inode_work(): ceph_inode_work() calls iput() at the end, which means we can donate the last reference to it.
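For context, a minimal sketch of the donation idea, abbreviated from the shape of the existing worker in fs/ceph/inode.c (the i_work_mask handling is elided):

static void ceph_inode_work(struct work_struct *work)
{
	struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
						  i_work);
	struct inode *inode = &ci->netfs.inode;

	/* ... process whatever bits are set in ci->i_work_mask ... */

	iput(inode);	/* consumes one reference, so queueing i_work
			 * without a matching ihold() donates the
			 * caller's last reference to this worker */
}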
This patch adds ceph_iput_async() and converts lots of iput() calls to it - at least those that may come through writeback and the messenger.
Signed-off-by: Max Kellermann <max.kellermann@ionos.com>
Cc: Mateusz Guzik <mjguzik@gmail.com>
Cc: stable@vger.kernel.org
---
v1->v2: using atomic_add_unless() instead of an unconditional decrement, to avoid letting i_count drop to zero, which may cause races (thanks Mateusz Guzik)
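For illustration (not part of the patch): atomic_add_unless(v, -1, 1) performs the decrement only when the counter is not 1 and returns non-zero when it did, so this fast path can never be the one that reaches zero:

	if (atomic_add_unless(&inode->i_count, -1, 1))
		return;		/* other holders remain; we never touch zero */
	/* i_count is still 1 here: the caller owns the last reference
	 * and hands it to the worker instead of dropping it inline */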
Signed-off-by: Max Kellermann <max.kellermann@ionos.com>
---
 fs/ceph/addr.c       |  2 +-
 fs/ceph/caps.c       | 16 ++++++++--------
 fs/ceph/dir.c        |  2 +-
 fs/ceph/inode.c      | 34 ++++++++++++++++++++++++++++++++++
 fs/ceph/mds_client.c | 30 +++++++++++++++---------------
 fs/ceph/quota.c      |  4 ++--
 fs/ceph/snap.c       | 10 +++++-----
 fs/ceph/super.h      |  2 ++
 8 files changed, 68 insertions(+), 32 deletions(-)
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 322ed268f14a..fc497c91530e 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -265,7 +265,7 @@ static void finish_netfs_read(struct ceph_osd_request *req)
 	subreq->error = err;
 	trace_netfs_sreq(subreq, netfs_sreq_trace_io_progress);
 	netfs_read_subreq_terminated(subreq);
-	iput(req->r_inode);
+	ceph_iput_async(req->r_inode);
 	ceph_dec_osd_stopping_blocker(fsc->mdsc);
 }

diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index b1a8ff612c41..af9e3ae9ab7e 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1771,7 +1771,7 @@ void ceph_flush_snaps(struct ceph_inode_info *ci,
 	spin_unlock(&mdsc->snap_flush_lock);

 	if (need_put)
-		iput(inode);
+		ceph_iput_async(inode);
 }

 /*
@@ -3319,7 +3319,7 @@ static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had,
 	if (wake)
 		wake_up_all(&ci->i_cap_wq);
 	while (put-- > 0)
-		iput(inode);
+		ceph_iput_async(inode);
 }

 void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
@@ -3419,7 +3419,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
 	if (complete_capsnap)
 		wake_up_all(&ci->i_cap_wq);
 	while (put-- > 0) {
-		iput(inode);
+		ceph_iput_async(inode);
 	}
 }

@@ -3917,7 +3917,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
 	if (wake_mdsc)
 		wake_up_all(&mdsc->cap_flushing_wq);
 	if (drop)
-		iput(inode);
+		ceph_iput_async(inode);
 }

 void __ceph_remove_capsnap(struct inode *inode, struct ceph_cap_snap *capsnap,
@@ -4008,7 +4008,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
 		wake_up_all(&ci->i_cap_wq);
 		if (wake_mdsc)
 			wake_up_all(&mdsc->cap_flushing_wq);
-		iput(inode);
+		ceph_iput_async(inode);
 	}
 }

@@ -4557,7 +4557,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
done:
 	mutex_unlock(&session->s_mutex);
done_unlocked:
-	iput(inode);
+	ceph_iput_async(inode);
out:
 	ceph_dec_mds_stopping_blocker(mdsc);

@@ -4636,7 +4636,7 @@ unsigned long ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
 		doutc(cl, "on %p %llx.%llx\n", inode, ceph_vinop(inode));
 		ceph_check_caps(ci, 0);
-		iput(inode);
+		ceph_iput_async(inode);
 		spin_lock(&mdsc->cap_delay_lock);
 	}

@@ -4675,7 +4675,7 @@ static void flush_dirty_session_caps(struct ceph_mds_session *s)
 		spin_unlock(&mdsc->cap_dirty_lock);
 		ceph_wait_on_async_create(inode);
 		ceph_check_caps(ci, CHECK_CAPS_FLUSH);
-		iput(inode);
+		ceph_iput_async(inode);
 		spin_lock(&mdsc->cap_dirty_lock);
 	}
 	spin_unlock(&mdsc->cap_dirty_lock);

diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 32973c62c1a2..ec73ed52a227 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -1290,7 +1290,7 @@ static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
 		ceph_mdsc_free_path_info(&path_info);
 	}
out:
-	iput(req->r_old_inode);
+	ceph_iput_async(req->r_old_inode);
 	ceph_mdsc_release_dir_caps(req);
 }

diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index f67025465de0..d7c0ed82bf62 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -2191,6 +2191,40 @@ void ceph_queue_inode_work(struct inode *inode, int work_bit)
 	}
 }

+/**
+ * Queue an asynchronous iput() call in a worker thread. Use this
+ * instead of iput() in contexts where evicting the inode is unsafe.
+ * For example, inode eviction may cause deadlocks in
+ * inode_wait_for_writeback() (when called from within writeback) or
+ * in netfs_wait_for_outstanding_io() (when called from within the
+ * Ceph messenger).
+ */
+void ceph_iput_async(struct inode *inode)
+{
+	if (unlikely(!inode))
+		return;
+
+	if (likely(atomic_add_unless(&inode->i_count, -1, 1)))
+		/* somebody else is holding another reference -
+		 * nothing left to do for us
+		 */
+		return;
+
+	doutc(ceph_inode_to_fs_client(inode)->client, "%p %llx.%llx\n", inode, ceph_vinop(inode));
+
+	/* simply queue a ceph_inode_work() (donating the remaining
+	 * reference) without setting i_work_mask bit; other than
+	 * putting the reference, there is nothing to do
+	 */
+	WARN_ON_ONCE(!queue_work(ceph_inode_to_fs_client(inode)->inode_wq,
+				 &ceph_inode(inode)->i_work));
+
+	/* note: queue_work() cannot fail; it i_work were already
+	 * queued, then it would be holding another reference, but no
+	 * such reference exists
+	 */
+}
+
 static void ceph_do_invalidate_pages(struct inode *inode)
 {
 	struct ceph_client *cl = ceph_inode_to_client(inode);

diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 3bc72b47fe4d..22d75c3be5a8 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1097,14 +1097,14 @@ void ceph_mdsc_release_request(struct kref *kref)
 	ceph_msg_put(req->r_reply);
 	if (req->r_inode) {
 		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
-		iput(req->r_inode);
+		ceph_iput_async(req->r_inode);
 	}
 	if (req->r_parent) {
 		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
-		iput(req->r_parent);
+		ceph_iput_async(req->r_parent);
 	}
-	iput(req->r_target_inode);
-	iput(req->r_new_inode);
+	ceph_iput_async(req->r_target_inode);
+	ceph_iput_async(req->r_new_inode);
 	if (req->r_dentry)
 		dput(req->r_dentry);
 	if (req->r_old_dentry)
@@ -1118,7 +1118,7 @@ void ceph_mdsc_release_request(struct kref *kref)
 		 */
 		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
 				  CEPH_CAP_PIN);
-		iput(req->r_old_dentry_dir);
+		ceph_iput_async(req->r_old_dentry_dir);
 	}
 	kfree(req->r_path1);
 	kfree(req->r_path2);
@@ -1240,7 +1240,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
 	}

 	if (req->r_unsafe_dir) {
-		iput(req->r_unsafe_dir);
+		ceph_iput_async(req->r_unsafe_dir);
 		req->r_unsafe_dir = NULL;
 	}

@@ -1413,7 +1413,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
 		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
 		if (!cap) {
 			spin_unlock(&ci->i_ceph_lock);
-			iput(inode);
+			ceph_iput_async(inode);
 			goto random;
 		}
 		mds = cap->session->s_mds;
@@ -1422,7 +1422,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
 		      cap == ci->i_auth_cap ? "auth " : "", cap);
 		spin_unlock(&ci->i_ceph_lock);
out:
-		iput(inode);
+		ceph_iput_async(inode);
 		return mds;

random:
@@ -1841,7 +1841,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session,
 		spin_unlock(&session->s_cap_lock);

 		if (last_inode) {
-			iput(last_inode);
+			ceph_iput_async(last_inode);
 			last_inode = NULL;
 		}
 		if (old_cap) {
@@ -1874,7 +1874,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session,
 	session->s_cap_iterator = NULL;
 	spin_unlock(&session->s_cap_lock);

-	iput(last_inode);
+	ceph_iput_async(last_inode);
 	if (old_cap)
 		ceph_put_cap(session->s_mdsc, old_cap);

@@ -1904,7 +1904,7 @@ static int remove_session_caps_cb(struct inode *inode, int mds, void *arg)
 	if (invalidate)
 		ceph_queue_invalidate(inode);
 	while (iputs--)
-		iput(inode);
+		ceph_iput_async(inode);
 	return 0;
 }

@@ -1944,7 +1944,7 @@ static void remove_session_caps(struct ceph_mds_session *session)
 			spin_unlock(&session->s_cap_lock);

 			inode = ceph_find_inode(sb, vino);
-			iput(inode);
+			ceph_iput_async(inode);

 			spin_lock(&session->s_cap_lock);
 		}

@@ -2512,7 +2512,7 @@ static void ceph_cap_unlink_work(struct work_struct *work)
 		doutc(cl, "on %p %llx.%llx\n", inode, ceph_vinop(inode));
 		ceph_check_caps(ci, CHECK_CAPS_FLUSH);
-		iput(inode);
+		ceph_iput_async(inode);
 		spin_lock(&mdsc->cap_delay_lock);
 	}
 }

@@ -3933,7 +3933,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 		    !req->r_reply_info.has_create_ino) {
 			/* This should never happen on an async create */
 			WARN_ON_ONCE(req->r_deleg_ino);
-			iput(in);
+			ceph_iput_async(in);
 			in = NULL;
 		}

@@ -5313,7 +5313,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,

out:
 	mutex_unlock(&session->s_mutex);
-	iput(inode);
+	ceph_iput_async(inode);

 	ceph_dec_mds_stopping_blocker(mdsc);
 	return;

diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c
index d90eda19bcc4..bba00f8926e6 100644
--- a/fs/ceph/quota.c
+++ b/fs/ceph/quota.c
@@ -76,7 +76,7 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc,
 		      le64_to_cpu(h->max_files));
 	spin_unlock(&ci->i_ceph_lock);

-	iput(inode);
+	ceph_iput_async(inode);
out:
 	ceph_dec_mds_stopping_blocker(mdsc);
 }
@@ -190,7 +190,7 @@ void ceph_cleanup_quotarealms_inodes(struct ceph_mds_client *mdsc)
 		node = rb_first(&mdsc->quotarealms_inodes);
 		qri = rb_entry(node, struct ceph_quotarealm_inode, node);
 		rb_erase(node, &mdsc->quotarealms_inodes);
-		iput(qri->inode);
+		ceph_iput_async(qri->inode);
 		kfree(qri);
 	}
 	mutex_unlock(&mdsc->quotarealms_inodes_mutex);

diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index c65f2b202b2b..19f097e79b3c 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -735,7 +735,7 @@ static void queue_realm_cap_snaps(struct ceph_mds_client *mdsc,
 		if (!inode)
 			continue;
 		spin_unlock(&realm->inodes_with_caps_lock);
-		iput(lastinode);
+		ceph_iput_async(lastinode);
 		lastinode = inode;

 		/*
@@ -762,7 +762,7 @@ static void queue_realm_cap_snaps(struct ceph_mds_client *mdsc,
 		spin_lock(&realm->inodes_with_caps_lock);
 	}
 	spin_unlock(&realm->inodes_with_caps_lock);
-	iput(lastinode);
+	ceph_iput_async(lastinode);

 	if (capsnap)
 		kmem_cache_free(ceph_cap_snap_cachep, capsnap);
@@ -955,7 +955,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
 		ihold(inode);
 		spin_unlock(&mdsc->snap_flush_lock);
 		ceph_flush_snaps(ci, &session);
-		iput(inode);
+		ceph_iput_async(inode);
 		spin_lock(&mdsc->snap_flush_lock);
 	}
 	spin_unlock(&mdsc->snap_flush_lock);
@@ -1116,12 +1116,12 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
 			ceph_get_snap_realm(mdsc, realm);
 			ceph_change_snap_realm(inode, realm);
 			spin_unlock(&ci->i_ceph_lock);
-			iput(inode);
+			ceph_iput_async(inode);
 			continue;

skip_inode:
 			spin_unlock(&ci->i_ceph_lock);
-			iput(inode);
+			ceph_iput_async(inode);
 		}

 		/* we may have taken some of the old realm's children. */

diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index cf176aab0f82..361a72a67bb8 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -1085,6 +1085,8 @@ static inline void ceph_queue_flush_snaps(struct inode *inode)
 	ceph_queue_inode_work(inode, CEPH_I_WORK_FLUSH_SNAPS);
 }

+void ceph_iput_async(struct inode *inode);
+
 extern int ceph_try_to_choose_auth_mds(struct inode *inode, int mask);
 extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
 			     int mask, bool force);
On Wed, Sep 17, 2025 at 3:59 PM Max Kellermann <max.kellermann@ionos.com> wrote:
+/**
+ * Queue an asynchronous iput() call in a worker thread. Use this
+ * instead of iput() in contexts where evicting the inode is unsafe.
+ * For example, inode eviction may cause deadlocks in
+ * inode_wait_for_writeback() (when called from within writeback) or
+ * in netfs_wait_for_outstanding_io() (when called from within the
+ * Ceph messenger).
+ */
+void ceph_iput_async(struct inode *inode)
+{
+	if (unlikely(!inode))
+		return;
+
+	if (likely(atomic_add_unless(&inode->i_count, -1, 1)))
+		/* somebody else is holding another reference -
+		 * nothing left to do for us
+		 */
+		return;
LGTM, I see the queue thing ends up issuing iput() so it's all good, thanks.
No idea about the other stuff it is doing concerning ceph flags so no comment.
+	doutc(ceph_inode_to_fs_client(inode)->client, "%p %llx.%llx\n", inode, ceph_vinop(inode));
+
+	/* simply queue a ceph_inode_work() (donating the remaining
+	 * reference) without setting i_work_mask bit; other than
+	 * putting the reference, there is nothing to do
+	 */
+	WARN_ON_ONCE(!queue_work(ceph_inode_to_fs_client(inode)->inode_wq,
+				 &ceph_inode(inode)->i_work));
+
+	/* note: queue_work() cannot fail; it i_work were already
+	 * queued, then it would be holding another reference, but no
+	 * such reference exists
+	 */
+}
On Wed, 2025-09-17 at 15:59 +0200, Max Kellermann wrote:
The iput() function is a dangerous one - if the reference counter goes to zero, the function may block for a long time due to:
- inode_wait_for_writeback() waits until writeback on this inode completes
- the filesystem-specific "evict_inode" callback can do similar things; e.g. all netfs-based filesystems will call netfs_wait_for_outstanding_io() which is similar to inode_wait_for_writeback()
Therefore, callers must carefully evaluate the context they're in and check whether invoking iput() is a good idea at all.
Most of the time, this is not a problem because the dcache holds references to all inodes, and the dcache is usually the one to release the last reference. But this assumption is fragile. For example, under (memcg) memory pressure, the dcache shrinker is more likely to release inode references, moving the inode eviction to contexts where that was extremely unlikely to occur.
Our production servers "found" at least two deadlock bugs in the Ceph filesystem that were caused by this iput() behavior:
It sounds like you can easily reproduce the issue. What is the reproduction path of the issue? How can we reproduce the issue to test the patch?
1. Writeback may lead to iput() calls in Ceph (e.g. from ceph_put_wrbuffer_cap_refs()) which deadlocks in inode_wait_for_writeback(). Waiting for writeback completion from within writeback will obviously never be able to make any progress. This leads to blocked kworkers like this:

INFO: task kworker/u777:6:1270802 blocked for more than 122 seconds.
      Not tainted 6.16.7-i1-es #773
task:kworker/u777:6  state:D stack:0 pid:1270802 tgid:1270802 ppid:2 task_flags:0x4208060 flags:0x00004000
Workqueue: writeback wb_workfn (flush-ceph-3)
Call Trace:
 <TASK>
 __schedule+0x4ea/0x17d0
 schedule+0x1c/0xc0
 inode_wait_for_writeback+0x71/0xb0
 evict+0xcf/0x200
 ceph_put_wrbuffer_cap_refs+0xdd/0x220
 ceph_invalidate_folio+0x97/0xc0
 ceph_writepages_start+0x127b/0x14d0
 do_writepages+0xba/0x150
 __writeback_single_inode+0x34/0x290
 writeback_sb_inodes+0x203/0x470
 __writeback_inodes_wb+0x4c/0xe0
 wb_writeback+0x189/0x2b0
 wb_workfn+0x30b/0x3d0
 process_one_work+0x143/0x2b0
 worker_thread+0x30a/0x450
2. In the Ceph messenger thread (net/ceph/messenger*.c), any iput() call may invoke ceph_evict_inode() which will deadlock in netfs_wait_for_outstanding_io(); since this blocks the messenger thread, completions from the Ceph servers will never be received and handled.
It looks like these deadlock bugs have been in the Ceph filesystem code since forever (therefore no "Fixes" tag in this patch). There may be various ways to solve this:
- make iput() asynchronous and defer the actual eviction like fput() (may add overhead)
- make iput() only asynchronous if I_SYNC is set (doesn't solve random things happening inside the "evict_inode" callback)
- add iput_deferred() to make this asynchronous behavior/overhead optional and explicit
- refactor Ceph to avoid iput() calls from within writeback and messenger (if that is even possible)
- add a Ceph-specific workaround
After advice from Mateusz Guzik, I decided to do the last of these. The implementation is simple because it piggybacks on the existing work_struct used by ceph_queue_inode_work(): ceph_inode_work() calls iput() at the end, which means we can donate the last reference to it.
This patch adds ceph_iput_async() and converts lots of iput() calls to it - at least those that may come through writeback and the messenger.
Signed-off-by: Max Kellermann <max.kellermann@ionos.com>
Cc: Mateusz Guzik <mjguzik@gmail.com>
Cc: stable@vger.kernel.org
v1->v2: using atomic_add_unless() instead of an unconditional decrement, to avoid letting i_count drop to zero, which may cause races (thanks Mateusz Guzik)
Signed-off-by: Max Kellermann <max.kellermann@ionos.com>
---
[...]
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index f67025465de0..d7c0ed82bf62 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -2191,6 +2191,40 @@ void ceph_queue_inode_work(struct inode *inode, int work_bit)
 	}
 }

+/**
+ * Queue an asynchronous iput() call in a worker thread. Use this
+ * instead of iput() in contexts where evicting the inode is unsafe.
+ * For example, inode eviction may cause deadlocks in
+ * inode_wait_for_writeback() (when called from within writeback) or
+ * in netfs_wait_for_outstanding_io() (when called from within the
+ * Ceph messenger).
+ */
+void ceph_iput_async(struct inode *inode)
+{
+	if (unlikely(!inode))
+		return;
+
+	if (likely(atomic_add_unless(&inode->i_count, -1, 1)))
+		/* somebody else is holding another reference -
+		 * nothing left to do for us
+		 */
+		return;
+
+	doutc(ceph_inode_to_fs_client(inode)->client, "%p %llx.%llx\n", inode, ceph_vinop(inode));
What about this?
struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
doutc(fsc, "%p %llx.%llx\n", inode, ceph_vinop(inode));
+	/* simply queue a ceph_inode_work() (donating the remaining
+	 * reference) without setting i_work_mask bit; other than
+	 * putting the reference, there is nothing to do
+	 */
+	WARN_ON_ONCE(!queue_work(ceph_inode_to_fs_client(inode)->inode_wq,
+				 &ceph_inode(inode)->i_work));
This function looks like ceph_queue_inode_work() [1]. Can we use ceph_queue_inode_work()?
[1] https://elixir.bootlin.com/linux/v6.17-rc6/source/fs/ceph/inode.c#L2176
+	/* note: queue_work() cannot fail; it i_work were already
Do you imply "if i_work were already"?
Thanks, Slava.
+	 * queued, then it would be holding another reference, but no
+	 * such reference exists
+	 */
+}
[...]
On Wed, Sep 17, 2025 at 7:55 PM Viacheslav Dubeyko <Slava.Dubeyko@ibm.com> wrote:
doutc(ceph_inode_to_fs_client(inode)->client, "%p %llx.%llx\n", inode, ceph_vinop(inode));
What about this?
struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
doutc(fsc, "%p %llx.%llx\n", inode, ceph_vinop(inode));
That means I have to declare this variable at the beginning of the function because the kernel unfortunately still doesn't allow C99 rules (declare variables where they are used). And that means paying the overhead for chasing 3 layers of pointers for all callers, even those 99.99% who return early. Or declare the variable but initialize it later in an extra line. Is that the preferred coding style?
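For reference, the two variants being weighed here look roughly like this:

	/* variant 1: initialize at declaration - every caller pays the
	 * three-pointer chase, including the 99.99% that return early */
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);

	/* variant 2: declare up front, initialize only on the slow
	 * path - costs an extra line */
	struct ceph_fs_client *fsc;
	...
	fsc = ceph_inode_to_fs_client(inode);
	doutc(fsc->client, "%p %llx.%llx\n", inode, ceph_vinop(inode));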
WARN_ON_ONCE(!queue_work(ceph_inode_to_fs_client(inode)->inode_wq,
&ceph_inode(inode)->i_work));
This function looks like ceph_queue_inode_work() [1]. Can we use ceph_queue_inode_work()?
No, we can not, because that function adds an inode reference (instead of donating the existing reference) and there's no way we can safely get rid of it (even if we would accept paying the overhead of two extra atomic operations).
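A short sketch of the difference, abbreviated from the two functions:

	/* ceph_queue_inode_work() takes its own reference: */
	ihold(inode);
	if (!queue_work(fsc->inode_wq, &ci->i_work))
		iput(inode);	/* already queued: the extra reference
				 * is dropped inline - exactly the
				 * iput() that ceph_iput_async() must
				 * not issue */

	/* ceph_iput_async() instead donates the caller's last
	 * reference, so there is never an extra reference that would
	 * have to be put back inline */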
Do you imply "if i_work were already"?
Yes, it's a typo.
On Wed, 2025-09-17 at 21:06 +0200, Max Kellermann wrote:
On Wed, Sep 17, 2025 at 7:55 PM Viacheslav Dubeyko <Slava.Dubeyko@ibm.com> wrote:
doutc(ceph_inode_to_fs_client(inode)->client, "%p %llx.%llx\n", inode, ceph_vinop(inode));
What about this?
struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
doutc(fsc, "%p %llx.%llx\n", inode, ceph_vinop(inode));
That means I have to declare this variable at the beginning of the function because the kernel unfortunately still doesn't allow C99 rules (declare variables where they are used). And that means paying the overhead for chasing 3 layers of pointers for all callers, even those 99.99% who return early. Or declare the variable but initialize it later in an extra line. Is that the preferred coding style?
My worry here is that the statement is too long. Maybe we can make it a two-line statement then? For example:
doutc(ceph_inode_to_fs_client(inode)->client,
      "%p %llx.%llx\n", inode, ceph_vinop(inode));
WARN_ON_ONCE(!queue_work(ceph_inode_to_fs_client(inode)->inode_wq,
&ceph_inode(inode)->i_work));
This function looks like ceph_queue_inode_work() [1]. Can we use ceph_queue_inode_work()?
No, we can not, because that function adds an inode reference (instead of donating the existing reference) and there's no way we can safely get rid of it (even if we would accept paying the overhead of two extra atomic operations).
This function can call iput() too. Should we rework it, then? Also, as a result, we will have two similar functions. And it could be confusing.
Thanks, Slava.
On Wed, Sep 17, 2025 at 9:20 PM Viacheslav Dubeyko <Slava.Dubeyko@ibm.com> wrote:
WARN_ON_ONCE(!queue_work(ceph_inode_to_fs_client(inode)->inode_wq,
&ceph_inode(inode)->i_work));
This function looks like ceph_queue_inode_work() [1]. Can we use ceph_queue_inode_work()?
No, we can not, because that function adds an inode reference (instead of donating the existing reference) and there's no way we can safely get rid of it (even if we would accept paying the overhead of two extra atomic operations).
This function can call iput() too. Should we rework it, then? Also, as a result, we will have two similar functions. And it could be confusing.
No. NOT calling iput() is the whole point of my patch. Did you read the patch description?
On Wed, 2025-09-17 at 21:25 +0200, Max Kellermann wrote:
On Wed, Sep 17, 2025 at 9:20 PM Viacheslav Dubeyko <Slava.Dubeyko@ibm.com> wrote:
WARN_ON_ONCE(!queue_work(ceph_inode_to_fs_client(inode)->inode_wq,
&ceph_inode(inode)->i_work));
This function looks like ceph_queue_inode_work() [1]. Can we use ceph_queue_inode_work()?
No, we can not, because that function adds an inode reference (instead of donating the existing reference) and there's no way we can safely get rid of it (even if we would accept paying the overhead of two extra atomic operations).
This function can call iput() too. Should we rework it, then? Also, as a result, we will have two similar functions. And it could be confusing.
No. NOT calling iput() is the whole point of my patch. Did you read the patch description?
This function can call iput():
void ceph_queue_inode_work(struct inode *inode, int work_bit)
{
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_inode_info *ci = ceph_inode(inode);

	set_bit(work_bit, &ci->i_work_mask);

	ihold(inode);
	if (queue_work(fsc->inode_wq, &ci->i_work)) {
		doutc(cl, "%p %llx.%llx mask=%lx\n", inode,
		      ceph_vinop(inode), ci->i_work_mask);
	} else {
		doutc(cl, "%p %llx.%llx already queued, mask=%lx\n",
		      inode, ceph_vinop(inode), ci->i_work_mask);
		iput(inode);	<-- we can call iput here.
	}
}
I am citing you: "NOT calling iput() is the whole point of my patch". This function can call iput(). And this is my question: Should we rework it, then?
Thanks, Slava.
On Wed, Sep 17, 2025 at 03:59:07PM +0200, Max Kellermann wrote:
After advice from Mateusz Guzik, I decided to do the last of these. The implementation is simple because it piggybacks on the existing work_struct used by ceph_queue_inode_work(): ceph_inode_work() calls iput() at the end, which means we can donate the last reference to it.
This patch adds ceph_iput_async() and converts lots of iput() calls to it - at least those that may come through writeback and the messenger.
What would force those delayed calls through at fs shutdown time?
On Wed, Sep 17, 2025 at 10:20 PM Al Viro <viro@zeniv.linux.org.uk> wrote:
On Wed, Sep 17, 2025 at 03:59:07PM +0200, Max Kellermann wrote:
After advice from Mateusz Guzik, I decided to do the last of these. The implementation is simple because it piggybacks on the existing work_struct used by ceph_queue_inode_work(): ceph_inode_work() calls iput() at the end, which means we can donate the last reference to it.
This patch adds ceph_iput_async() and converts lots of iput() calls to it - at least those that may come through writeback and the messenger.
What would force those delayed calls through at fs shutdown time?
I was wondering the same a few days ago, but found no code that waits for this work to complete during shutdown. But since this was pre-existing code, I thought somebody more clever than I must have thought of this at some point and I just don't understand it. Or maybe Ceph is already bugged and my patch just makes hitting the bug more likely?
On Wed, Sep 17, 2025 at 10:25 PM Max Kellermann <max.kellermann@ionos.com> wrote:
On Wed, Sep 17, 2025 at 10:20 PM Al Viro <viro@zeniv.linux.org.uk> wrote:
On Wed, Sep 17, 2025 at 03:59:07PM +0200, Max Kellermann wrote:
After advice from Mateusz Guzik, I decided to do the last of these. The implementation is simple because it piggybacks on the existing work_struct used by ceph_queue_inode_work(): ceph_inode_work() calls iput() at the end, which means we can donate the last reference to it.
This patch adds ceph_iput_async() and converts lots of iput() calls to it - at least those that may come through writeback and the messenger.
What would force those delayed calls through at fs shutdown time?
I was wondering the same a few days ago, but found no code that waits for this work to complete during shutdown
What about flush_fs_workqueues() in fs/ceph/super.c? Is this what you're looking for, Al?
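For reference, a sketch of that function, paraphrased from memory of fs/ceph/super.c (worth verifying against the actual tree): it is called from ceph_kill_sb() and flushes the per-fs workqueues, which would include the inode_wq that ceph_iput_async() queues to:

static void flush_fs_workqueues(struct ceph_fs_client *fsc)
{
	flush_workqueue(fsc->inode_wq);	/* waits for queued i_work,
					 * including donated iput()s */
	flush_workqueue(fsc->cap_wq);
}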