Skip to content

bug(server): fix LSN mismatch in replication #4967

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,6 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
}

auto& db = db_arr_[cntx.db_index];

auto expire_it = db->expire.Find(it->first);

if (IsValid(expire_it)) {
Expand All @@ -1184,6 +1183,9 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
<< ", prime table size: " << db->prime.size() << util::fb2::GetStacktrace();
}

DCHECK(shard_owner()->shard_lock()->Check(IntentLock::Mode::EXCLUSIVE))
<< util::fb2::GetStacktrace();

string scratch;
string_view key = it->first.GetSlice(&scratch);

Expand Down
5 changes: 2 additions & 3 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -665,9 +665,8 @@ void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* exec_st, E
DCHECK(shard);
DCHECK(flow->conn);

flow->streamer.reset(new JournalStreamer(sf_->journal(), exec_st));
bool send_lsn = flow->version >= DflyVersion::VER4;
flow->streamer->Start(flow->conn->socket(), send_lsn);
flow->streamer.reset(new JournalStreamer(sf_->journal(), exec_st, JournalStreamer::SendLsn::YES));
flow->streamer->Start(flow->conn->socket());

// Register cleanup.
flow->cleanup = [flow]() {
Expand Down
4 changes: 2 additions & 2 deletions src/server/journal/journal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ error_code Journal::Close() {
return {};
}

uint32_t Journal::RegisterOnChange(ChangeCallback cb) {
return journal_slice.RegisterOnChange(cb);
// Registers `consumer` with the underlying journal slice and returns the id produced by
// JournalSlice::RegisterOnChange. The consumer is passed as a raw pointer; no ownership
// transfer is visible here.
uint32_t Journal::RegisterOnChange(JournalConsumerInterface* consumer) {
return journal_slice.RegisterOnChange(consumer);
}

void Journal::UnregisterOnChange(uint32_t id) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/journal/journal.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Journal {

//******* The following functions must be called in the context of the owning shard *********//

uint32_t RegisterOnChange(ChangeCallback cb);
uint32_t RegisterOnChange(JournalConsumerInterface* consumer);
void UnregisterOnChange(uint32_t id);
bool HasRegisteredCallbacks() const;

Expand Down
23 changes: 12 additions & 11 deletions src/server/journal/journal_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,29 +156,30 @@ void JournalSlice::CallOnChange(const JournalItem& item) {
// Hence this lock prevents UnregisterOnChange from running in the middle of CallOnChange.
// CallOnChange is atomic iff JournalSlice::SetFlushMode(false) was called beforehand.
std::shared_lock lk(cb_mu_);

const size_t size = change_cb_arr_.size();
auto k_v = change_cb_arr_.begin();
for (size_t i = 0; i < size; ++i) {
k_v->second(item, enable_journal_flush_);
++k_v;
for (auto k_v : journal_consumers_arr_) {
k_v.second->ConsumeJournalChange(item);
}
if (enable_journal_flush_) {
for (auto k_v : journal_consumers_arr_) {
k_v.second->ThrottleIfNeeded();
}
}
}

uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) {
uint32_t JournalSlice::RegisterOnChange(JournalConsumerInterface* consumer) {
// mutex lock isn't needed because appending to the list does not invalidate iterators
uint32_t id = next_cb_id_++;
change_cb_arr_.emplace_back(id, std::move(cb));
journal_consumers_arr_.emplace_back(id, std::move(consumer));
return id;
}

void JournalSlice::UnregisterOnChange(uint32_t id) {
// we need to wait until the callback has finished before removing it
lock_guard lk(cb_mu_);
auto it = find_if(change_cb_arr_.begin(), change_cb_arr_.end(),
auto it = find_if(journal_consumers_arr_.begin(), journal_consumers_arr_.end(),
[id](const auto& e) { return e.first == id; });
CHECK(it != change_cb_arr_.end());
change_cb_arr_.erase(it);
CHECK(it != journal_consumers_arr_.end());
journal_consumers_arr_.erase(it);
}

} // namespace journal
Expand Down
6 changes: 3 additions & 3 deletions src/server/journal/journal_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ class JournalSlice {
// added to the journal.
// The consumer receives each entry through ConsumeJournalChange; its ThrottleIfNeeded
// (to apply backpressure) is called afterwards, only when journal flushing is enabled.
uint32_t RegisterOnChange(ChangeCallback cb);
uint32_t RegisterOnChange(JournalConsumerInterface* consumer);
void UnregisterOnChange(uint32_t);

bool HasRegisteredCallbacks() const {
return !change_cb_arr_.empty();
return !journal_consumers_arr_.empty();
}

/// Returns whether the journal entry with this LSN is available
Expand All @@ -70,7 +70,7 @@ class JournalSlice {
base::IoBuf ring_serialize_buf_;

mutable util::fb2::SharedMutex cb_mu_; // to prevent removing callback during call
std::list<std::pair<uint32_t, ChangeCallback>> change_cb_arr_;
std::list<std::pair<uint32_t, JournalConsumerInterface*>> journal_consumers_arr_;

LSN lsn_ = 1;

Expand Down
60 changes: 28 additions & 32 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ uint32_t migration_buckets_serialization_threshold_cached = 100;

} // namespace

JournalStreamer::JournalStreamer(journal::Journal* journal, ExecutionState* cntx)
: cntx_(cntx), journal_(journal) {
// Stores the execution state, the journal and the LSN-sending mode.
// The streamer is not subscribed to the journal here; that happens in Start().
JournalStreamer::JournalStreamer(journal::Journal* journal, ExecutionState* cntx, SendLsn send_lsn)
: cntx_(cntx), journal_(journal), send_lsn_(send_lsn) {
// cache the flag to avoid accessing it later.
replication_stream_output_limit_cached = absl::GetFlag(FLAGS_replication_stream_output_limit);
}
Expand All @@ -52,35 +52,29 @@ JournalStreamer::~JournalStreamer() {
VLOG(1) << "~JournalStreamer";
}

void JournalStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
CHECK(dest_ == nullptr && dest != nullptr);
dest_ = dest;
journal_cb_id_ =
journal_->RegisterOnChange([this, send_lsn](const JournalItem& item, bool allow_await) {
if (allow_await) {
ThrottleIfNeeded();
// No record to write, just await if data was written so consumer will read the data.
// TODO: shouldnt we trigger async write in noop??
if (item.opcode == Op::NOOP)
return;
}

if (!ShouldWrite(item)) {
return;
}
void JournalStreamer::ConsumeJournalChange(const JournalItem& item) {
if (!ShouldWrite(item)) {
return;
}

Write(item.data);
time_t now = time(nullptr);
DCHECK_GT(item.lsn, last_lsn_writen_);
Write(item.data);
time_t now = time(nullptr);
last_lsn_writen_ = item.lsn;
// TODO: to chain it to the previous Write call.
if (send_lsn_ == SendLsn::YES && now - last_lsn_time_ > 3) {
last_lsn_time_ = now;
io::StringSink sink;
JournalWriter writer(&sink);
writer.Write(Entry{journal::Op::LSN, item.lsn});
Write(std::move(sink).str());
}
}

// TODO: to chain it to the previous Write call.
if (send_lsn && now - last_lsn_time_ > 3) {
last_lsn_time_ = now;
io::StringSink sink;
JournalWriter writer(&sink);
writer.Write(Entry{journal::Op::LSN, item.lsn});
Write(std::move(sink).str());
}
});
// Binds the streamer to `dest` and subscribes it to journal changes.
// `dest` must be non-null and no destination may have been set yet (enforced by CHECK).
void JournalStreamer::Start(util::FiberSocketBase* dest) {
CHECK(dest_ == nullptr && dest != nullptr);
dest_ = dest;
// Register this object as the journal consumer and keep the returned id in journal_cb_id_.
journal_cb_id_ = journal_->RegisterOnChange(this);
}

void JournalStreamer::Cancel() {
Expand Down Expand Up @@ -188,22 +182,24 @@ bool JournalStreamer::IsStalled() const {

RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
ExecutionState* cntx)
: JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)) {
: JournalStreamer(journal, cntx, JournalStreamer::SendLsn::NO),
db_slice_(slice),
my_slots_(std::move(slots)) {
DCHECK(slice != nullptr);
migration_buckets_serialization_threshold_cached =
absl::GetFlag(FLAGS_migration_buckets_serialization_threshold);
db_array_ = slice->databases(); // Inc ref to make sure DB isn't deleted while we use it
}

void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
void RestoreStreamer::Start(util::FiberSocketBase* dest) {
if (!cntx_->IsRunning())
return;

VLOG(1) << "RestoreStreamer start";
auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this);
snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb));

JournalStreamer::Start(dest, send_lsn);
JournalStreamer::Start(dest);
}

void RestoreStreamer::Run() {
Expand Down
15 changes: 10 additions & 5 deletions src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@ namespace dfly {

// Buffered single-shard journal streamer that listens for journal changes with a
// journal listener and writes them to a destination sink in a separate fiber.
class JournalStreamer {
class JournalStreamer : public journal::JournalConsumerInterface {
public:
JournalStreamer(journal::Journal* journal, ExecutionState* cntx);
enum class SendLsn { NO = 0, YES = 1 };
JournalStreamer(journal::Journal* journal, ExecutionState* cntx, SendLsn send_lsn);
virtual ~JournalStreamer();

// Self referential.
JournalStreamer(const JournalStreamer& other) = delete;
JournalStreamer(JournalStreamer&& other) = delete;

// Register journal listener and start writer in fiber.
virtual void Start(util::FiberSocketBase* dest, bool send_lsn);
virtual void Start(util::FiberSocketBase* dest);

void ConsumeJournalChange(const journal::JournalItem& item);

// Must be called on context cancellation for unblocking
// and manual cleanup.
Expand All @@ -48,7 +51,7 @@ class JournalStreamer {
void ThrottleIfNeeded();

virtual bool ShouldWrite(const journal::JournalItem& item) const {
return cntx_->IsRunning();
return cntx_->IsRunning() && item.opcode != journal::Op::NOOP;
}

void WaitForInflightToComplete();
Expand All @@ -68,8 +71,10 @@ class JournalStreamer {

size_t in_flight_bytes_ = 0, total_sent_ = 0;
time_t last_lsn_time_ = 0;
LSN last_lsn_writen_ = 0;
util::fb2::EventCount waker_;
uint32_t journal_cb_id_{0};
SendLsn send_lsn_;
};

// Serializes existing DB as RESTORE commands, and sends updates as regular commands.
Expand All @@ -80,7 +85,7 @@ class RestoreStreamer : public JournalStreamer {
ExecutionState* cntx);
~RestoreStreamer() override;

void Start(util::FiberSocketBase* dest, bool send_lsn = false) override;
void Start(util::FiberSocketBase* dest) override;

void Run();

Expand Down
9 changes: 8 additions & 1 deletion src/server/journal/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,14 @@ struct JournalItem {
std::optional<SlotId> slot;
};

using ChangeCallback = std::function<void(const JournalItem&, bool await)>;
// Interface for objects that subscribe to journal changes through RegisterOnChange.
struct JournalConsumerInterface {
virtual ~JournalConsumerInterface() = default;

// Receives a journal change for serializing.
// JournalSlice::CallOnChange invokes this on every registered consumer for each item.
virtual void ConsumeJournalChange(const JournalItem& item) = 0;
// Waits for writing the serialized data.
// JournalSlice::CallOnChange invokes this after all consumers have received the item,
// and only when journal flushing is enabled.
virtual void ThrottleIfNeeded() = 0;
};

} // namespace journal
} // namespace dfly
21 changes: 7 additions & 14 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@ void SliceSnapshot::Start(bool stream_journal, SnapshotFlush allow_flush) {
if (stream_journal) {
auto* journal = db_slice_->shard_owner()->journal();
DCHECK(journal);
auto journal_cb = [this](const journal::JournalItem& item, bool await) {
OnJournalEntry(item, await);
};
journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
journal_cb_id_ = journal->RegisterOnChange(this);
}

const auto flush_threshold = ServerState::tlocal()->serialization_max_chunk_size;
Expand Down Expand Up @@ -131,6 +128,7 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
journal->UnregisterOnChange(cb_id);
if (!cancel) {
// always succeeds because serializer_ flushes to string.
VLOG(1) << "FinalizeJournalStream lsn: " << journal->GetLsn();
std::ignore = serializer_->SendJournalOffset(journal->GetLsn());
PushSerialized(true);
}
Expand Down Expand Up @@ -227,10 +225,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
if (journal->GetLsn() == lsn) {
std::ignore = serializer_->SendFullSyncCut();

auto journal_cb = [this](const journal::JournalItem& item, bool await) {
OnJournalEntry(item, await);
};
journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
journal_cb_id_ = journal->RegisterOnChange(this);
PushSerialized(true);
} else {
// We stopped but we didn't manage to send the whole stream.
Expand Down Expand Up @@ -415,7 +410,7 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
// transaction.
// allow_flush is controlled by Journal::SetFlushMode
// (usually it's true unless we are in the middle of a critical section that can not preempt).
void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool allow_flush) {
void SliceSnapshot::ConsumeJournalChange(const journal::JournalItem& item) {
{
// We grab the lock in case we are in the middle of serializing a bucket, so it serves as a
// barrier here for atomic serialization.
Expand All @@ -424,12 +419,10 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool allow_
std::ignore = serializer_->WriteJournalEntry(item.data);
}
}
}

if (allow_flush) {
// This is the only place that flushes in streaming mode
// once the iterate buckets fiber finished.
PushSerialized(false);
}
// Journal-consumer throttling hook: delegates to PushSerialized(false).
// NOTE(review): the meaning of the `false` argument is not visible here; confirm it
// against PushSerialized's declaration.
void SliceSnapshot::ThrottleIfNeeded() {
PushSerialized(false);
}

size_t SliceSnapshot::GetBufferCapacity() const {
Expand Down
6 changes: 5 additions & 1 deletion src/server/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ struct Entry;
// and submitting all values to an output sink.
// In journal streaming mode, the snapshot continues submitting changes
// over the sink until explicitly stopped.
class SliceSnapshot {
class SliceSnapshot : public journal::JournalConsumerInterface {
public:
// Represents a target for receiving snapshot data.
struct SnapshotDataConsumerInterface {
Expand Down Expand Up @@ -103,6 +103,10 @@ class SliceSnapshot {

RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;

// Journal listener
void ConsumeJournalChange(const journal::JournalItem& item);
void ThrottleIfNeeded();

private:
// Main snapshotting fiber that iterates over all buckets in the db slice
// and submits them to SerializeBucket.
Expand Down
12 changes: 10 additions & 2 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,19 @@ cv_status Transaction::BatonBarrier::Wait(time_point tp) {

Transaction::Guard::Guard(Transaction* tx) : tx(tx) {
DCHECK(tx->cid_->opt_mask() & CO::GLOBAL_TRANS);
tx->Execute([](auto*, auto*) { return OpStatus::OK; }, false);
auto cb = [&](Transaction* t, EngineShard* shard) {
namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about eviction?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We check the same flag in the eviction flow: FreeMemWithEvictionStep checks expire_allowed_.

return OpStatus::OK;
};
tx->Execute(cb, false);
}

Transaction::Guard::~Guard() {
tx->Conclude();
auto cb = [&](Transaction* t, EngineShard* shard) {
namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(true);
return OpStatus::OK;
};
tx->Execute(cb, true);
tx->Refurbish();
}

Expand Down
Loading
Loading