Skip to content

Commit 8a3fee2

Browse files
committed
fix: merge branch 'main' into bobik/rename_in_cluster_mode_crash
2 parents ac243af + d5c3752 commit 8a3fee2

File tree

7 files changed

+113
-127
lines changed

7 files changed

+113
-127
lines changed

.github/workflows/ci.yml

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ jobs:
140140
echo "disk space is:"
141141
df -h
142142
143-
- name: C++ Unit Tests
143+
- name: C++ Unit Tests - IoUring
144144
run: |
145145
cd ${GITHUB_WORKSPACE}/build
146146
echo Run ctest -V -L DFLY
@@ -151,7 +151,14 @@ jobs:
151151
# Run allocation tracker test separately without alsologtostderr because it generates a TON of logs.
152152
FLAGS_fiber_safety_margin=4096 timeout 5m ./allocation_tracker_test
153153
154-
echo "Running tests with --force_epoll"
154+
timeout 5m ./dragonfly_test
155+
timeout 5m ./json_family_test --jsonpathv2=false
156+
timeout 5m ./tiered_storage_test --vmodule=db_slice=2 --logtostderr
157+
158+
159+
- name: C++ Unit Tests - Epoll
160+
run: |
161+
cd ${GITHUB_WORKSPACE}/build
155162
156163
# Create a rule that automatically prints stacktrace upon segfault
157164
cat > ./init.gdb <<EOF
@@ -166,18 +173,14 @@ jobs:
166173
167174
FLAGS_fiber_safety_margin=4096 FLAGS_force_epoll=true timeout 5m ./allocation_tracker_test
168175
169-
echo "Finished running tests with --force_epoll"
170-
171-
echo "Running tests with --cluster_mode=emulated"
176+
- name: C++ Unit Tests - IoUring with cluster mode
177+
run: |
172178
FLAGS_fiber_safety_margin=4096 FLAGS_cluster_mode=emulated timeout 20m ctest -V -L DFLY
173179
174-
echo "Running tests with both --cluster_mode=emulated & --lock_on_hashtags"
180+
- name: C++ Unit Tests - IoUring with cluster mode and FLAGS_lock_on_hashtags
181+
run: |
175182
FLAGS_fiber_safety_margin=4096 FLAGS_cluster_mode=emulated FLAGS_lock_on_hashtags=true timeout 20m ctest -V -L DFLY
176183
177-
timeout 5m ./dragonfly_test
178-
timeout 5m ./json_family_test --jsonpathv2=false
179-
timeout 5m ./tiered_storage_test --vmodule=db_slice=2 --logtostderr
180-
181184
- name: Upload unit logs on failure
182185
if: failure()
183186
uses: actions/upload-artifact@v4

src/redis/lua/struct/lua_struct.c

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,12 @@ typedef struct Header {
8282
} Header;
8383

8484

85-
static int getnum (lua_State *L, const char **fmt, int df) {
85+
static int getnum (const char **fmt, int df) {
8686
if (!isdigit(**fmt)) /* no number? */
8787
return df; /* return default value */
8888
else {
8989
int a = 0;
9090
do {
91-
if (a > (INT_MAX / 10) || a * 10 > (INT_MAX - (**fmt - '0')))
92-
luaL_error(L, "integral size overflow");
9391
a = a*10 + *((*fmt)++) - '0';
9492
} while (isdigit(**fmt));
9593
return a;
@@ -110,9 +108,9 @@ static size_t optsize (lua_State *L, char opt, const char **fmt) {
110108
case 'f': return sizeof(float);
111109
case 'd': return sizeof(double);
112110
case 'x': return 1;
113-
case 'c': return getnum(L, fmt, 1);
111+
case 'c': return getnum(fmt, 1);
114112
case 'i': case 'I': {
115-
int sz = getnum(L, fmt, sizeof(int));
113+
int sz = getnum(fmt, sizeof(int));
116114
if (sz > MAXINTSIZE)
117115
luaL_error(L, "integral size %d is larger than limit of %d",
118116
sz, MAXINTSIZE);
@@ -145,7 +143,7 @@ static void controloptions (lua_State *L, int opt, const char **fmt,
145143
case '>': h->endian = BIG; return;
146144
case '<': h->endian = LITTLE; return;
147145
case '!': {
148-
int a = getnum(L, fmt, MAXALIGN);
146+
int a = getnum(fmt, MAXALIGN);
149147
if (!isp2(a))
150148
luaL_error(L, "alignment %d is not a power of 2", a);
151149
h->align = a;

src/server/cluster_support.cc

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ ABSL_FLAG(string, cluster_mode, "",
1717
"Cluster mode supported. Possible values are "
1818
"'emulated', 'yes' or ''");
1919

20+
ABSL_FLAG(bool, experimental_cluster_shard_by_slot, false,
21+
"If true, cluster mode is enabled and sharding is done by slot. "
22+
"Otherwise, sharding is done by hash tag.");
23+
2024
namespace dfly {
2125

2226
void UniqueSlotChecker::Add(std::string_view key) {
@@ -43,16 +47,13 @@ optional<SlotId> UniqueSlotChecker::GetUniqueSlotId() const {
4347
return slot_id_ > kMaxSlotNum ? optional<SlotId>() : slot_id_;
4448
}
4549

46-
namespace {
47-
enum class ClusterMode {
48-
kUninitialized,
49-
kNoCluster,
50-
kEmulatedCluster,
51-
kRealCluster,
52-
};
53-
50+
namespace detail {
5451
ClusterMode cluster_mode = ClusterMode::kUninitialized;
55-
} // namespace
52+
bool cluster_shard_by_slot = false;
53+
54+
} // namespace detail
55+
56+
using namespace detail;
5657

5758
void InitializeCluster() {
5859
string cluster_mode_str = absl::GetFlag(FLAGS_cluster_mode);
@@ -67,25 +68,17 @@ void InitializeCluster() {
6768
LOG(ERROR) << "Invalid value for flag --cluster_mode. Exiting...";
6869
exit(1);
6970
}
70-
}
7171

72-
bool IsClusterEnabled() {
73-
return cluster_mode == ClusterMode::kRealCluster;
74-
}
75-
76-
bool IsClusterEmulated() {
77-
return cluster_mode == ClusterMode::kEmulatedCluster;
72+
if (cluster_mode != ClusterMode::kNoCluster) {
73+
cluster_shard_by_slot = absl::GetFlag(FLAGS_experimental_cluster_shard_by_slot);
74+
}
7875
}
7976

8077
SlotId KeySlot(std::string_view key) {
8178
string_view tag = LockTagOptions::instance().Tag(key);
8279
return crc16(tag.data(), tag.length()) & kMaxSlotNum;
8380
}
8481

85-
bool IsClusterEnabledOrEmulated() {
86-
return IsClusterEnabled() || IsClusterEmulated();
87-
}
88-
8982
bool IsClusterShardedByTag() {
9083
return IsClusterEnabledOrEmulated() || LockTagOptions::instance().enabled;
9184
}

src/server/cluster_support.h

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,20 @@
1010

1111
namespace dfly {
1212

13+
namespace detail {
14+
15+
enum class ClusterMode {
16+
kUninitialized,
17+
kNoCluster,
18+
kEmulatedCluster,
19+
kRealCluster,
20+
};
21+
22+
extern ClusterMode cluster_mode;
23+
extern bool cluster_shard_by_slot;
24+
25+
}; // namespace detail
26+
1327
using SlotId = std::uint16_t;
1428
constexpr SlotId kMaxSlotNum = 0x3FFF;
1529

@@ -42,9 +56,23 @@ class UniqueSlotChecker {
4256
SlotId KeySlot(std::string_view key);
4357

4458
void InitializeCluster();
45-
bool IsClusterEnabled();
46-
bool IsClusterEmulated();
47-
bool IsClusterEnabledOrEmulated();
59+
60+
inline bool IsClusterEnabled() {
61+
return detail::cluster_mode == detail::ClusterMode::kRealCluster;
62+
}
63+
64+
inline bool IsClusterEmulated() {
65+
return detail::cluster_mode == detail::ClusterMode::kEmulatedCluster;
66+
}
67+
68+
inline bool IsClusterEnabledOrEmulated() {
69+
return IsClusterEnabled() || IsClusterEmulated();
70+
}
71+
72+
inline bool IsClusterShardedBySlot() {
73+
return detail::cluster_shard_by_slot;
74+
}
75+
4876
bool IsClusterShardedByTag();
4977

5078
} // namespace dfly

src/server/debugcmd.cc

Lines changed: 39 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ extern "C" {
2828
#include "core/sorted_map.h"
2929
#include "core/string_map.h"
3030
#include "core/string_set.h"
31+
#include "facade/cmd_arg_parser.h"
3132
#include "server/blocking_controller.h"
3233
#include "server/container_utils.h"
3334
#include "server/engine_shard_set.h"
@@ -38,7 +39,6 @@ extern "C" {
3839
#include "server/server_state.h"
3940
#include "server/string_family.h"
4041
#include "server/transaction.h"
41-
4242
using namespace std;
4343

4444
ABSL_DECLARE_FLAG(string, dir);
@@ -712,106 +712,57 @@ void DebugCmd::Migration(CmdArgList args, facade::SinkReplyBuilder* builder) {
712712
return builder->SendError(UnknownSubCmd("MIGRATION", "DEBUG"));
713713
}
714714

715+
enum PopulateFlag { FLAG_RAND, FLAG_TYPE, FLAG_ELEMENTS, FLAG_SLOT, FLAG_EXPIRE, FLAG_UNKNOWN };
716+
715717
// Populate arguments format:
716718
// required: (total count) (key prefix) (val size)
717719
// optional: [RAND | TYPE typename | ELEMENTS element num | SLOTS (key value)+ | EXPIRE start end]
718720
optional<DebugCmd::PopulateOptions> DebugCmd::ParsePopulateArgs(CmdArgList args,
719721
facade::SinkReplyBuilder* builder) {
720-
if (args.size() < 2) {
721-
builder->SendError(UnknownSubCmd("populate", "DEBUG"));
722-
return nullopt;
723-
}
724-
722+
CmdArgParser parser(args.subspan(1));
725723
PopulateOptions options;
726-
if (!absl::SimpleAtoi(ArgS(args, 1), &options.total_count)) {
727-
builder->SendError(kUintErr);
728-
return nullopt;
729-
}
730-
731-
if (args.size() > 2) {
732-
options.prefix = ArgS(args, 2);
733-
}
734724

735-
if (args.size() > 3) {
736-
if (!absl::SimpleAtoi(ArgS(args, 3), &options.val_size)) {
737-
builder->SendError(kUintErr);
738-
return nullopt;
739-
}
740-
}
725+
options.total_count = parser.Next<uint64_t>();
726+
options.prefix = parser.NextOrDefault<string_view>("key");
727+
options.val_size = parser.NextOrDefault<uint32_t>(16);
741728

742-
for (size_t index = 4; args.size() > index; ++index) {
743-
string str = absl::AsciiStrToUpper(ArgS(args, index));
744-
if (str == "RAND") {
745-
options.populate_random_values = true;
746-
} else if (str == "TYPE") {
747-
if (args.size() < index + 2) {
748-
builder->SendError(kSyntaxErr);
749-
return nullopt;
750-
}
751-
++index;
752-
options.type = absl::AsciiStrToUpper(ArgS(args, index));
753-
} else if (str == "ELEMENTS") {
754-
if (args.size() < index + 2) {
755-
builder->SendError(kSyntaxErr);
756-
return nullopt;
757-
}
758-
if (!absl::SimpleAtoi(ArgS(args, ++index), &options.elements)) {
759-
builder->SendError(kSyntaxErr);
760-
return nullopt;
761-
}
762-
} else if (str == "SLOTS") {
763-
if (args.size() < index + 3) {
764-
builder->SendError(kSyntaxErr);
765-
return nullopt;
729+
while (parser.HasNext()) {
730+
PopulateFlag flag = parser.MapNext("RAND", FLAG_RAND, "TYPE", FLAG_TYPE, "ELEMENTS",
731+
FLAG_ELEMENTS, "SLOTS", FLAG_SLOT, "EXPIRE", FLAG_EXPIRE);
732+
switch (flag) {
733+
case FLAG_RAND:
734+
options.populate_random_values = true;
735+
break;
736+
case FLAG_TYPE:
737+
options.type = absl::AsciiStrToUpper(parser.Next<string_view>());
738+
break;
739+
case FLAG_ELEMENTS:
740+
options.elements = parser.Next<uint32_t>();
741+
break;
742+
case FLAG_SLOT: {
743+
auto [start, end] = parser.Next<FInt<0, 16383>, FInt<0, 16383>>();
744+
options.slot_range = cluster::SlotRange{SlotId(start), SlotId(end)};
745+
break;
766746
}
767-
768-
auto parse_slot = [](string_view slot_str) -> OpResult<uint32_t> {
769-
uint32_t slot_id;
770-
if (!absl::SimpleAtoi(slot_str, &slot_id)) {
771-
return facade::OpStatus::INVALID_INT;
772-
}
773-
if (slot_id > kMaxSlotNum) {
774-
return facade::OpStatus::INVALID_VALUE;
747+
case FLAG_EXPIRE: {
748+
auto [min_ttl, max_ttl] = parser.Next<uint32_t, uint32_t>();
749+
if (min_ttl >= max_ttl) {
750+
builder->SendError(kExpiryOutOfRange);
751+
(void)parser.Error();
752+
return nullopt;
775753
}
776-
return slot_id;
777-
};
778-
779-
auto start = parse_slot(ArgS(args, ++index));
780-
if (start.status() != facade::OpStatus::OK) {
781-
builder->SendError(start.status());
782-
return nullopt;
783-
}
784-
auto end = parse_slot(ArgS(args, ++index));
785-
if (end.status() != facade::OpStatus::OK) {
786-
builder->SendError(end.status());
787-
return nullopt;
788-
}
789-
options.slot_range = cluster::SlotRange{.start = static_cast<SlotId>(start.value()),
790-
.end = static_cast<SlotId>(end.value())};
791-
} else if (str == "EXPIRE") {
792-
if (args.size() < index + 3) {
793-
builder->SendError(kSyntaxErr);
794-
return nullopt;
795-
}
796-
uint32_t start, end;
797-
if (!absl::SimpleAtoi(ArgS(args, ++index), &start)) {
798-
builder->SendError(kSyntaxErr);
799-
return nullopt;
800-
}
801-
if (!absl::SimpleAtoi(ArgS(args, ++index), &end)) {
802-
builder->SendError(kSyntaxErr);
803-
return nullopt;
804-
}
805-
if (start >= end) {
806-
builder->SendError(kExpiryOutOfRange);
807-
return nullopt;
754+
options.expire_ttl_range = std::make_pair(min_ttl, max_ttl);
755+
break;
808756
}
809-
options.expire_ttl_range = std::make_pair(start, end);
810-
} else {
811-
builder->SendError(kSyntaxErr);
812-
return nullopt;
757+
default:
758+
LOG(FATAL) << "Unexpected flag in PopulateArgs. Args: " << args;
759+
break;
813760
}
814761
}
762+
if (parser.HasError()) {
763+
builder->SendError(parser.Error()->MakeReply());
764+
return nullopt;
765+
}
815766
return options;
816767
}
817768

src/server/engine_shard.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,17 @@ __thread EngineShard* EngineShard::shard_ = nullptr;
261261
uint64_t TEST_current_time_ms = 0;
262262

263263
ShardId Shard(string_view v, ShardId shard_num) {
264+
// This cluster sharding is not necessary and may degrade keys distribution among shard threads.
265+
// For example, if we have 3 shards, then no single-char keys will be assigned to shard 2 and
266+
// 32 single char keys in range ['_' - '~'] will be assigned to shard 0.
267+
// Yes, SlotId function does not have great distribution properties.
268+
// On the other side, slot based sharding may help with pipeline squashing optimizations,
269+
// because they rely on commands being single-sharded.
270+
// TODO: once we improve our squashing logic, we can remove this.
271+
if (IsClusterShardedBySlot()) {
272+
return KeySlot(v) % shard_num;
273+
}
274+
264275
if (IsClusterShardedByTag()) {
265276
v = LockTagOptions::instance().Tag(v);
266277
}

src/server/hll_family_test.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ TEST_F(HllFamilyTest, MergeOverlapping) {
194194
}
195195

196196
TEST_F(HllFamilyTest, MergeInvalid) {
197+
GTEST_SKIP() << "TBD: MergeInvalid test fails with multi-shard runs, see #5004";
198+
197199
EXPECT_EQ(CheckedInt({"pfadd", "key1", "1", "2", "3"}), 1);
198200
EXPECT_EQ(Run({"set", "key2", "..."}), "OK");
199201
EXPECT_THAT(Run({"pfmerge", "key1", "key2"}), ErrArg(HllFamily::kInvalidHllErr));

0 commit comments

Comments
 (0)