From ccaecfd9bb69d5c58ef817f6bfa1fc243200f82c Mon Sep 17 00:00:00 2001 From: adi_holden Date: Sun, 20 Apr 2025 23:55:04 +0300 Subject: [PATCH 1/7] bug server: fix lsn mismatch in replication Signed-off-by: adi_holden --- src/server/db_slice.cc | 4 +++- src/server/transaction.cc | 6 ++++++ tests/dragonfly/replication_test.py | 1 + 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 85573ea44709..4d70f58b730e 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -1167,7 +1167,6 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato } auto& db = db_arr_[cntx.db_index]; - auto expire_it = db->expire.Find(it->first); if (IsValid(expire_it)) { @@ -1184,6 +1183,9 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato << ", prime table size: " << db->prime.size() << util::fb2::GetStacktrace(); } + DCHECK(shard_owner()->shard_lock()->Check(IntentLock::Mode::EXCLUSIVE)) + << util::fb2::GetStacktrace(); + string scratch; string_view key = it->first.GetSlice(&scratch); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 0bbcc2668393..4e7ec33ef61e 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -150,11 +150,17 @@ cv_status Transaction::BatonBarrier::Wait(time_point tp) { Transaction::Guard::Guard(Transaction* tx) : tx(tx) { DCHECK(tx->cid_->opt_mask() & CO::GLOBAL_TRANS); tx->Execute([](auto*, auto*) { return OpStatus::OK; }, false); + shard_set->RunBriefInParallel([](EngineShard* shard) { + namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(false); + }); } Transaction::Guard::~Guard() { tx->Conclude(); tx->Refurbish(); + shard_set->RunBriefInParallel([](EngineShard* shard) { + namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(true); + }); } void Transaction::Init(unsigned num_shards) { diff --git a/tests/dragonfly/replication_test.py 
b/tests/dragonfly/replication_test.py index 2a4261c86fbf..16e6e81968a9 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2982,6 +2982,7 @@ async def test_bug_in_json_memory_tracking(df_factory: DflyInstanceFactory): seeder = SeederV2(key_target=50_000) fill_task = asyncio.create_task(seeder.run(master.client())) + await asyncio.sleep(0.2) for replica in c_replicas: await replica.execute_command(f"REPLICAOF LOCALHOST {master.port}") From 50b025444573dee3bbb0dbdc5f7be6c42826a1d8 Mon Sep 17 00:00:00 2001 From: adi_holden Date: Mon, 21 Apr 2025 09:38:32 +0300 Subject: [PATCH 2/7] bug server: fix lsn mismatch in replication Signed-off-by: adi_holden --- src/server/snapshot.cc | 1 + src/server/transaction.cc | 16 +++++++++------- tests/dragonfly/replication_test.py | 6 +++++- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 7b9c53602110..5cd9d4fe9898 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -131,6 +131,7 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) { journal->UnregisterOnChange(cb_id); if (!cancel) { // always succeeds because serializer_ flushes to string. 
+ VLOG(1) << "FinalizeJournalStream lsn: " << journal->GetLsn(); std::ignore = serializer_->SendJournalOffset(journal->GetLsn()); PushSerialized(true); } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 4e7ec33ef61e..578d9426d2c7 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -149,18 +149,20 @@ cv_status Transaction::BatonBarrier::Wait(time_point tp) { Transaction::Guard::Guard(Transaction* tx) : tx(tx) { DCHECK(tx->cid_->opt_mask() & CO::GLOBAL_TRANS); - tx->Execute([](auto*, auto*) { return OpStatus::OK; }, false); - shard_set->RunBriefInParallel([](EngineShard* shard) { + auto cb = [&](Transaction* t, EngineShard* shard) { namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(false); - }); + return OpStatus::OK; + }; + tx->Execute(cb, false); } Transaction::Guard::~Guard() { - tx->Conclude(); - tx->Refurbish(); - shard_set->RunBriefInParallel([](EngineShard* shard) { + auto cb = [&](Transaction* t, EngineShard* shard) { namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(true); - }); + return OpStatus::OK; + }; + tx->Execute(cb, true); + tx->Refurbish(); } void Transaction::Init(unsigned num_shards) { diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 16e6e81968a9..8ff9919580f1 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2964,7 +2964,11 @@ async def test_bug_in_json_memory_tracking(df_factory: DflyInstanceFactory): """ This test reproduces a bug in the JSON memory tracking. 
""" - master = df_factory.create(proactor_threads=2, serialization_max_chunk_size=1) + master = df_factory.create( + proactor_threads=2, + serialization_max_chunk_size=1, + vmodule="replica=2,dflycmd=2,snapshot=1,rdb_save=1,rdb_load=1,journal_slice=2", + ) replicas = [df_factory.create(proactor_threads=2) for i in range(2)] # Start instances and connect clients From c995ffc35a9db9456af6aad8395d0dd6e8280e30 Mon Sep 17 00:00:00 2001 From: adi_holden Date: Mon, 21 Apr 2025 15:03:54 +0300 Subject: [PATCH 3/7] bug server: fix lns mismatch in replication Signed-off-by: adi_holden --- tests/dragonfly/replication_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 8ff9919580f1..6ce62fdf557f 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2959,7 +2959,6 @@ async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFa await fill_task -@pytest.mark.skip("temporarily skipped") async def test_bug_in_json_memory_tracking(df_factory: DflyInstanceFactory): """ This test reproduces a bug in the JSON memory tracking. From 7402730f7171d09ac41d24dedc1703c2e8a79c92 Mon Sep 17 00:00:00 2001 From: adi_holden Date: Tue, 22 Apr 2025 14:14:58 +0300 Subject: [PATCH 4/7] bug server: fix lns mismatch in replication Signed-off-by: adi_holden --- src/server/journal/streamer.cc | 3 ++- src/server/journal/streamer.h | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index c976ac52a4f3..6a38dc66587c 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -69,9 +69,10 @@ void JournalStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) { return; } + DCHECK_GT(item.lsn, last_lsn_writen_); Write(item.data); time_t now = time(nullptr); - + last_lsn_writen_ = item.lsn; // TODO: to chain it to the previous Write call. 
if (send_lsn && now - last_lsn_time_ > 3) { last_lsn_time_ = now; diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index d1d0bade9424..95134abf2c4e 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -68,6 +68,7 @@ class JournalStreamer { size_t in_flight_bytes_ = 0, total_sent_ = 0; time_t last_lsn_time_ = 0; + LSN last_lsn_writen_ = 0; util::fb2::EventCount waker_; uint32_t journal_cb_id_{0}; }; From 2e12d1f3d52c49ca65ed79bd2f9c8c3232403453 Mon Sep 17 00:00:00 2001 From: adi_holden Date: Wed, 23 Apr 2025 14:24:28 +0300 Subject: [PATCH 5/7] bug server: fix lsn mismatch in replication Signed-off-by: adi_holden --- src/server/journal/streamer.cc | 12 ++++-------- src/server/journal/streamer.h | 2 +- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 6a38dc66587c..6ceb8d817ac7 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -57,14 +57,6 @@ void JournalStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) { dest_ = dest; journal_cb_id_ = journal_->RegisterOnChange([this, send_lsn](const JournalItem& item, bool allow_await) { - if (allow_await) { - ThrottleIfNeeded(); - // No record to write, just await if data was written so consumer will read the data. - // TODO: shouldnt we trigger async write in noop?? 
- if (item.opcode == Op::NOOP) - return; - } - if (!ShouldWrite(item)) { return; } @@ -81,6 +73,10 @@ void JournalStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) { writer.Write(Entry{journal::Op::LSN, item.lsn}); Write(std::move(sink).str()); } + + if (allow_await) { + ThrottleIfNeeded(); + } }); } diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 95134abf2c4e..ddc28e77e8b6 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -48,7 +48,7 @@ class JournalStreamer { void ThrottleIfNeeded(); virtual bool ShouldWrite(const journal::JournalItem& item) const { - return cntx_->IsRunning(); + return cntx_->IsRunning() && item.opcode != journal::Op::NOOP; } void WaitForInflightToComplete(); From 24df41c31c8acce581a6d2124455d53acd78e851 Mon Sep 17 00:00:00 2001 From: adi_holden Date: Mon, 5 May 2025 10:19:43 +0300 Subject: [PATCH 6/7] use JournalConsumerInterface Signed-off-by: adi_holden --- src/server/dflycmd.cc | 5 +-- src/server/journal/journal.cc | 4 +- src/server/journal/journal.h | 2 +- src/server/journal/journal_slice.cc | 23 ++++++++---- src/server/journal/journal_slice.h | 6 +-- src/server/journal/streamer.cc | 57 ++++++++++++++--------------- src/server/journal/streamer.h | 12 ++++-- src/server/journal/types.h | 7 +++- src/server/snapshot.cc | 20 +++------- src/server/snapshot.h | 6 ++- 10 files changed, 76 insertions(+), 66 deletions(-) diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 139b7dd966ba..4514e7ac84d2 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -617,9 +617,8 @@ void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* exec_st, E DCHECK(shard); DCHECK(flow->conn); - flow->streamer.reset(new JournalStreamer(sf_->journal(), exec_st)); - bool send_lsn = flow->version >= DflyVersion::VER4; - flow->streamer->Start(flow->conn->socket(), send_lsn); + flow->streamer.reset(new JournalStreamer(sf_->journal(), exec_st, 
JournalStreamer::SendLsn::YES)); + flow->streamer->Start(flow->conn->socket()); // Register cleanup. flow->cleanup = [flow]() { diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 945931bc50e7..04d2098be5e2 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -58,8 +58,8 @@ error_code Journal::Close() { return {}; } -uint32_t Journal::RegisterOnChange(ChangeCallback cb) { - return journal_slice.RegisterOnChange(cb); +uint32_t Journal::RegisterOnChange(JournalConsumerInterface* consumer) { + return journal_slice.RegisterOnChange(consumer); } void Journal::UnregisterOnChange(uint32_t id) { diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index 0b74c6417c42..7b277eaa32df 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -25,7 +25,7 @@ class Journal { //******* The following functions must be called in the context of the owning shard *********// - uint32_t RegisterOnChange(ChangeCallback cb); + uint32_t RegisterOnChange(JournalConsumerInterface* consumer); void UnregisterOnChange(uint32_t id); bool HasRegisteredCallbacks() const; diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index 82670c4cded9..17870c48cce3 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -157,28 +157,35 @@ void JournalSlice::CallOnChange(const JournalItem& item) { // CallOnChange is atomic iff JournalSlice::SetFlushMode(false) is called before. 
std::shared_lock lk(cb_mu_); - const size_t size = change_cb_arr_.size(); - auto k_v = change_cb_arr_.begin(); + const size_t size = journal_consumers_arr_.size(); + auto k_v = journal_consumers_arr_.begin(); for (size_t i = 0; i < size; ++i) { - k_v->second(item, enable_journal_flush_); + k_v->second->ConsumeJournalChange(item); ++k_v; } + k_v = journal_consumers_arr_.begin(); + if (enable_journal_flush_) { + for (size_t i = 0; i < size; ++i) { + k_v->second->ThrottleIfNeeded(); + ++k_v; + } + } } -uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) { +uint32_t JournalSlice::RegisterOnChange(JournalConsumerInterface* consumer) { // mutex lock isn't needed due to iterators are not invalidated uint32_t id = next_cb_id_++; - change_cb_arr_.emplace_back(id, std::move(cb)); + journal_consumers_arr_.emplace_back(id, std::move(consumer)); return id; } void JournalSlice::UnregisterOnChange(uint32_t id) { // we need to wait until callback is finished before remove it lock_guard lk(cb_mu_); - auto it = find_if(change_cb_arr_.begin(), change_cb_arr_.end(), + auto it = find_if(journal_consumers_arr_.begin(), journal_consumers_arr_.end(), [id](const auto& e) { return e.first == id; }); - CHECK(it != change_cb_arr_.end()); - change_cb_arr_.erase(it); + CHECK(it != journal_consumers_arr_.end()); + journal_consumers_arr_.erase(it); } } // namespace journal diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index da0b18ea7918..c8a9553ea723 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -43,11 +43,11 @@ class JournalSlice { // added to the journal. // The callback receives the entry and a boolean that indicates whether // awaiting (to apply backpressure) is allowed. 
- uint32_t RegisterOnChange(ChangeCallback cb); + uint32_t RegisterOnChange(JournalConsumerInterface* consumer); void UnregisterOnChange(uint32_t); bool HasRegisteredCallbacks() const { - return !change_cb_arr_.empty(); + return !journal_consumers_arr_.empty(); } /// Returns whether the journal entry with this LSN is available @@ -70,7 +70,7 @@ class JournalSlice { base::IoBuf ring_serialize_buf_; mutable util::fb2::SharedMutex cb_mu_; // to prevent removing callback during call - std::list> change_cb_arr_; + std::list> journal_consumers_arr_; LSN lsn_ = 1; diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 6ceb8d817ac7..46bf8dee9c12 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -39,8 +39,8 @@ uint32_t migration_buckets_serialization_threshold_cached = 100; } // namespace -JournalStreamer::JournalStreamer(journal::Journal* journal, ExecutionState* cntx) - : cntx_(cntx), journal_(journal) { +JournalStreamer::JournalStreamer(journal::Journal* journal, ExecutionState* cntx, SendLsn send_lsn) + : cntx_(cntx), journal_(journal), send_lsn_(send_lsn) { // cache the flag to avoid accessing it later. replication_stream_output_limit_cached = absl::GetFlag(FLAGS_replication_stream_output_limit); } @@ -52,32 +52,29 @@ JournalStreamer::~JournalStreamer() { VLOG(1) << "~JournalStreamer"; } -void JournalStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) { - CHECK(dest_ == nullptr && dest != nullptr); - dest_ = dest; - journal_cb_id_ = - journal_->RegisterOnChange([this, send_lsn](const JournalItem& item, bool allow_await) { - if (!ShouldWrite(item)) { - return; - } +void JournalStreamer::ConsumeJournalChange(const JournalItem& item) { + if (!ShouldWrite(item)) { + return; + } - DCHECK_GT(item.lsn, last_lsn_writen_); - Write(item.data); - time_t now = time(nullptr); - last_lsn_writen_ = item.lsn; - // TODO: to chain it to the previous Write call. 
- if (send_lsn && now - last_lsn_time_ > 3) { - last_lsn_time_ = now; - io::StringSink sink; - JournalWriter writer(&sink); - writer.Write(Entry{journal::Op::LSN, item.lsn}); - Write(std::move(sink).str()); - } + DCHECK_GT(item.lsn, last_lsn_writen_); + Write(item.data); + time_t now = time(nullptr); + last_lsn_writen_ = item.lsn; + // TODO: to chain it to the previous Write call. + if (send_lsn_ == SendLsn::YES && now - last_lsn_time_ > 3) { + last_lsn_time_ = now; + io::StringSink sink; + JournalWriter writer(&sink); + writer.Write(Entry{journal::Op::LSN, item.lsn}); + Write(std::move(sink).str()); + } +} - if (allow_await) { - ThrottleIfNeeded(); - } - }); +void JournalStreamer::Start(util::FiberSocketBase* dest) { + CHECK(dest_ == nullptr && dest != nullptr); + dest_ = dest; + journal_cb_id_ = journal_->RegisterOnChange(this); } void JournalStreamer::Cancel() { @@ -185,14 +182,16 @@ bool JournalStreamer::IsStalled() const { RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal, ExecutionState* cntx) - : JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)) { + : JournalStreamer(journal, cntx, JournalStreamer::SendLsn::NO), + db_slice_(slice), + my_slots_(std::move(slots)) { DCHECK(slice != nullptr); migration_buckets_serialization_threshold_cached = absl::GetFlag(FLAGS_migration_buckets_serialization_threshold); db_array_ = slice->databases(); // Inc ref to make sure DB isn't deleted while we use it } -void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) { +void RestoreStreamer::Start(util::FiberSocketBase* dest) { if (!cntx_->IsRunning()) return; @@ -200,7 +199,7 @@ void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) { auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this); snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb)); - JournalStreamer::Start(dest, send_lsn); + JournalStreamer::Start(dest); } void RestoreStreamer::Run() { 
diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index ddc28e77e8b6..16ba86e99d6e 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -18,9 +18,10 @@ namespace dfly { // Buffered single-shard journal streamer that listens for journal changes with a // journal listener and writes them to a destination sink in a separate fiber. -class JournalStreamer { +class JournalStreamer : public journal::JournalConsumerInterface { public: - JournalStreamer(journal::Journal* journal, ExecutionState* cntx); + enum class SendLsn { NO = 0, YES = 1 }; + JournalStreamer(journal::Journal* journal, ExecutionState* cntx, SendLsn send_lsn); virtual ~JournalStreamer(); // Self referential. @@ -28,7 +29,9 @@ class JournalStreamer { JournalStreamer(JournalStreamer&& other) = delete; // Register journal listener and start writer in fiber. - virtual void Start(util::FiberSocketBase* dest, bool send_lsn); + virtual void Start(util::FiberSocketBase* dest); + + void ConsumeJournalChange(const journal::JournalItem& item); // Must be called on context cancellation for unblocking // and manual cleanup. @@ -71,6 +74,7 @@ class JournalStreamer { LSN last_lsn_writen_ = 0; util::fb2::EventCount waker_; uint32_t journal_cb_id_{0}; + SendLsn send_lsn_; }; // Serializes existing DB as RESTORE commands, and sends updates as regular commands. 
@@ -81,7 +85,7 @@ class RestoreStreamer : public JournalStreamer { ExecutionState* cntx); ~RestoreStreamer() override; - void Start(util::FiberSocketBase* dest, bool send_lsn = false) override; + void Start(util::FiberSocketBase* dest) override; void Run(); diff --git a/src/server/journal/types.h b/src/server/journal/types.h index bb3904dbcefd..cd37d691e14c 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -87,7 +87,12 @@ struct JournalItem { std::optional slot; }; -using ChangeCallback = std::function; +struct JournalConsumerInterface { + virtual ~JournalConsumerInterface() = default; + + virtual void ConsumeJournalChange(const JournalItem& item) = 0; + virtual void ThrottleIfNeeded() = 0; +}; } // namespace journal } // namespace dfly diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 5cd9d4fe9898..9eaae4df2684 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -75,10 +75,7 @@ void SliceSnapshot::Start(bool stream_journal, SnapshotFlush allow_flush) { if (stream_journal) { auto* journal = db_slice_->shard_owner()->journal(); DCHECK(journal); - auto journal_cb = [this](const journal::JournalItem& item, bool await) { - OnJournalEntry(item, await); - }; - journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb)); + journal_cb_id_ = journal->RegisterOnChange(this); } const auto flush_threshold = ServerState::tlocal()->serialization_max_chunk_size; @@ -228,10 +225,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) { if (journal->GetLsn() == lsn) { std::ignore = serializer_->SendFullSyncCut(); - auto journal_cb = [this](const journal::JournalItem& item, bool await) { - OnJournalEntry(item, await); - }; - journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb)); + journal_cb_id_ = journal->RegisterOnChange(this); PushSerialized(true); } else { // We stopped but we didn't manage to send the whole stream. 
@@ -416,7 +410,7 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) // transaction. // allow_flush is controlled by Journal::SetFlushMode // (usually it's true unless we are in the middle of a critical section that can not preempt). -void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool allow_flush) { +void SliceSnapshot::ConsumeJournalChange(const journal::JournalItem& item) { { // We grab the lock in case we are in the middle of serializing a bucket, so it serves as a // barrier here for atomic serialization. @@ -425,12 +419,10 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool allow_ std::ignore = serializer_->WriteJournalEntry(item.data); } } +} - if (allow_flush) { - // This is the only place that flushes in streaming mode - // once the iterate buckets fiber finished. - PushSerialized(false); - } +void SliceSnapshot::ThrottleIfNeeded() { + PushSerialized(false); } size_t SliceSnapshot::GetBufferCapacity() const { diff --git a/src/server/snapshot.h b/src/server/snapshot.h index f027b5e48130..c49ae9f5f7ed 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -47,7 +47,7 @@ struct Entry; // and submitting all values to an output sink. // In journal streaming mode, the snapshot continues submitting changes // over the sink until explicitly stopped. -class SliceSnapshot { +class SliceSnapshot : public journal::JournalConsumerInterface { public: // Represents a target for receiving snapshot data. struct SnapshotDataConsumerInterface { @@ -103,6 +103,10 @@ class SliceSnapshot { RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const; + // Journal listener + void ConsumeJournalChange(const journal::JournalItem& item); + void ThrottleIfNeeded(); + private: // Main snapshotting fiber that iterates over all buckets in the db slice // and submits them to SerializeBucket. 
From 7a5b94944b07ef541a67778f7dbc4b7c27294d6f Mon Sep 17 00:00:00 2001 From: adi_holden Date: Mon, 5 May 2025 12:00:07 +0300 Subject: [PATCH 7/7] use JournalConsumerInterface Signed-off-by: adi_holden --- src/server/journal/journal_slice.cc | 14 ++++---------- src/server/journal/types.h | 2 ++ 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index 17870c48cce3..72f865268be2 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -156,18 +156,12 @@ void JournalSlice::CallOnChange(const JournalItem& item) { // Hence this lock prevents the UnregisterOnChange to start running in the middle of CallOnChange. // CallOnChange is atomic iff JournalSlice::SetFlushMode(false) is called before. std::shared_lock lk(cb_mu_); - - const size_t size = journal_consumers_arr_.size(); - auto k_v = journal_consumers_arr_.begin(); - for (size_t i = 0; i < size; ++i) { - k_v->second->ConsumeJournalChange(item); - ++k_v; + for (auto k_v : journal_consumers_arr_) { + k_v.second->ConsumeJournalChange(item); } - k_v = journal_consumers_arr_.begin(); if (enable_journal_flush_) { - for (size_t i = 0; i < size; ++i) { - k_v->second->ThrottleIfNeeded(); - ++k_v; + for (auto k_v : journal_consumers_arr_) { + k_v.second->ThrottleIfNeeded(); } } } diff --git a/src/server/journal/types.h b/src/server/journal/types.h index cd37d691e14c..24183623a5e3 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -90,7 +90,9 @@ struct JournalItem { struct JournalConsumerInterface { virtual ~JournalConsumerInterface() = default; + // Receives a journal change for serializing virtual void ConsumeJournalChange(const JournalItem& item) = 0; + // Waits for writing the serialized data virtual void ThrottleIfNeeded() = 0; };