@@ -37,7 +37,12 @@ using std::string;
37
37
38
38
ABSL_FLAG (uint16_t , p, 6379 , " Server port" );
39
39
ABSL_FLAG (uint32_t , c, 20 , " Number of connections per thread" );
40
- ABSL_FLAG (uint32_t , qps, 20 , " QPS schedule at which the generator sends requests to the server" );
40
+ ABSL_FLAG (int32_t , qps, 20 ,
41
+ " QPS schedule at which the generator sends requests to the server "
42
+ " per single connection. 0 means - coordinated omission, and positive value will throttle "
43
+ " the actual qps if server is slower than the target qps. "
44
+ " negative value means - hard target, without throttling." );
45
+
41
46
ABSL_FLAG (uint32_t , n, 1000 , " Number of requests to send per connection" );
42
47
ABSL_FLAG (uint32_t , test_time, 0 , " Testing time in seconds" );
43
48
ABSL_FLAG (uint32_t , d, 16 , " Value size in bytes " );
@@ -604,24 +609,56 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep) {
604
609
605
610
void Driver::Run (uint64_t * cycle_ns, CommandGenerator* cmd_gen) {
606
611
start_ns_ = absl::GetCurrentTimeNanos ();
607
- unsigned pipeline = GetFlag (FLAGS_pipeline);
612
+ uint32_t pipeline = std::max<uint32_t >(GetFlag (FLAGS_pipeline), 1u );
613
+ bool should_throttle = GetFlag (FLAGS_qps) > 0 ;
608
614
609
615
stats_.num_clients ++;
610
616
int64_t time_limit_ns =
611
617
time_limit_ > 0 ? int64_t (time_limit_) * 1'000'000'000 + start_ns_ : INT64_MAX;
612
-
618
+ int64_t now = start_ns_;
613
619
SlotRange slot_range{0 , kNumSlots - 1 };
620
+ CHECK_GT (num_reqs_, 0u );
614
621
615
- for (unsigned i = 0 ; i < num_reqs_; ++i) {
616
- int64_t now = absl::GetCurrentTimeNanos ();
622
+ uint32_t num_batches = ((num_reqs_ - 1 ) / pipeline) + 1 ;
617
623
618
- if (now > time_limit_ns) {
619
- break ;
624
+ for (unsigned i = 0 ; i < num_batches && now < time_limit_ns; ++i) {
625
+ if (i == num_batches - 1 ) { // last batch
626
+ pipeline = num_reqs_ - i * pipeline;
627
+ }
628
+
629
+ for (unsigned j = 0 ; j < pipeline; ++j) {
630
+ // TODO: this skews the distribution if slot ranges are uneven.
631
+ // Ideally we would like to pick randomly a single slot from all the ranges we have
632
+ // and pass it to cmd_gen->Next below.
633
+ if (!shard_slots_.Empty ()) {
634
+ slot_range = shard_slots_.NextSlotRange (ep_, i);
635
+ }
636
+
637
+ string cmd = cmd_gen->Next (slot_range);
638
+
639
+ Req req;
640
+ req.start = absl::GetCurrentTimeNanos ();
641
+ req.might_hit = cmd_gen->might_hit ();
642
+
643
+ reqs_.push (req);
644
+
645
+ error_code ec = socket_->Write (io::Buffer (cmd));
646
+ if (ec && FiberSocketBase::IsConnClosed (ec)) {
647
+ // TODO: report failure
648
+ VLOG (1 ) << " Connection closed" ;
649
+ break ;
650
+ }
651
+ CHECK (!ec) << ec.message ();
652
+ if (cmd_gen->noreply ()) {
653
+ PopRequest ();
654
+ }
620
655
}
656
+
657
+ now = absl::GetCurrentTimeNanos ();
621
658
if (cycle_ns) {
622
659
int64_t target_ts = start_ns_ + i * (*cycle_ns);
623
660
int64_t sleep_ns = target_ts - now;
624
- if (reqs_.size () > 10 && sleep_ns <= 0 ) {
661
+ if (reqs_.size () > pipeline * 2 && should_throttle && sleep_ns <= 0 ) {
625
662
sleep_ns = 10'000 ;
626
663
}
627
664
@@ -630,7 +667,7 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
630
667
// There is no point in sending more requests if they are piled up in the server.
631
668
do {
632
669
ThisFiber::SleepFor (chrono::nanoseconds (sleep_ns));
633
- } while (reqs_.size () > 10 );
670
+ } while (should_throttle && reqs_.size () > pipeline * 2 );
634
671
} else if (i % 256 == 255 ) {
635
672
ThisFiber::Yield ();
636
673
VLOG (5 ) << " Behind QPS schedule" ;
@@ -639,33 +676,7 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
639
676
// Coordinated omission.
640
677
641
678
fb2::NoOpLock lk;
642
- cnd_.wait (lk, [this , pipeline] { return reqs_.size () < pipeline; });
643
- }
644
-
645
- // TODO: this skews the distribution if slot ranges are uneven.
646
- // Ideally we would like to pick randomly a single slot from all the ranges we have
647
- // and pass it to cmd_gen->Next below.
648
- if (!shard_slots_.Empty ()) {
649
- slot_range = shard_slots_.NextSlotRange (ep_, i);
650
- }
651
-
652
- string cmd = cmd_gen->Next (slot_range);
653
-
654
- Req req;
655
- req.start = absl::GetCurrentTimeNanos ();
656
- req.might_hit = cmd_gen->might_hit ();
657
-
658
- reqs_.push (req);
659
-
660
- error_code ec = socket_->Write (io::Buffer (cmd));
661
- if (ec && FiberSocketBase::IsConnClosed (ec)) {
662
- // TODO: report failure
663
- VLOG (1 ) << " Connection closed" ;
664
- break ;
665
- }
666
- CHECK (!ec) << ec.message ();
667
- if (cmd_gen->noreply ()) {
668
- PopRequest ();
679
+ cnd_.wait (lk, [this ] { return reqs_.empty (); });
669
680
}
670
681
}
671
682
@@ -908,12 +919,15 @@ void WatchFiber(size_t num_shards, atomic_bool* finish_signal, ProactorPool* pp)
908
919
num_shards = max<size_t >(num_shards, 1u );
909
920
uint64_t resp_goal = GetFlag (FLAGS_c) * pp->size () * GetFlag (FLAGS_n) * num_shards;
910
921
uint32_t time_limit = GetFlag (FLAGS_test_time);
922
+ bool should_throttle = GetFlag (FLAGS_qps) > 0 ;
911
923
912
924
while (*finish_signal == false ) {
913
925
// we sleep with resolution of 1s but print with lower frequency to be more responsive
914
926
// when benchmark finishes.
915
927
ThisFiber::SleepFor (1s);
916
- pp->AwaitBrief ([](auto , auto *) { client->AdjustCycle (); });
928
+ if (should_throttle) {
929
+ pp->AwaitBrief ([](auto , auto *) { client->AdjustCycle (); });
930
+ }
917
931
918
932
int64_t now = absl::GetCurrentTimeNanos ();
919
933
if (now - last_print < 5000'000'000LL ) // 5s
@@ -1084,9 +1098,9 @@ int main(int argc, char* argv[]) {
1084
1098
if (protocol == RESP) {
1085
1099
shards = proactor->Await ([&] { return FetchClusterInfo (ep, proactor); });
1086
1100
}
1087
- LOG (INFO) << " Connecting threads to "
1088
- << (shards.empty () ? string (" single node " )
1089
- : absl::StrCat (shards.size (), " shard cluster" ));
1101
+ CONSOLE_INFO << " Connecting to "
1102
+ << (shards.empty () ? string (" single node " )
1103
+ : absl::StrCat (shards.size (), " shard cluster" ));
1090
1104
1091
1105
if (!shards.empty () && !GetFlag (FLAGS_command).empty () && GetFlag (FLAGS_cluster_skip_tags)) {
1092
1106
// For custom commands we may need to use the same hashtag for multiple keys.
@@ -1112,9 +1126,11 @@ int main(int argc, char* argv[]) {
1112
1126
CHECK_LE (key_minimum, key_maximum);
1113
1127
1114
1128
uint32_t thread_key_step = 0 ;
1115
- const uint32_t qps = GetFlag (FLAGS_qps);
1129
+ uint32_t qps = abs (GetFlag (FLAGS_qps));
1130
+ bool throttle = GetFlag (FLAGS_qps) > 0 ;
1116
1131
const int64_t interval = qps ? 1'000'000'000LL / qps : 0 ;
1117
1132
uint64_t num_reqs = GetFlag (FLAGS_n);
1133
+
1118
1134
uint64_t total_conn_num = GetFlag (FLAGS_c) * pp->size ();
1119
1135
uint64_t total_requests = num_reqs * total_conn_num;
1120
1136
uint32_t time_limit = GetFlag (FLAGS_test_time);
@@ -1130,11 +1146,12 @@ int main(int argc, char* argv[]) {
1130
1146
1131
1147
if (!time_limit) {
1132
1148
CONSOLE_INFO << " Running " << pp->size () << " threads, sending " << num_reqs
1133
- << " requests per each connection, or " << total_requests << " requests overall" ;
1149
+ << " requests per each connection, or " << total_requests << " requests overall "
1150
+ << (throttle ? " with" : " without" ) << " throttling" ;
1134
1151
}
1135
1152
if (interval) {
1136
- CONSOLE_INFO << " At a rate of " << GetFlag (FLAGS_qps)
1137
- << " rps per connection, i.e. request every " << interval / 1000 << " us" ;
1153
+ CONSOLE_INFO << " At a rate of " << qps << " rps per connection, i.e. request every "
1154
+ << interval / 1000 << " us" ;
1138
1155
CONSOLE_INFO << " Overall scheduled RPS: " << qps * total_conn_num;
1139
1156
} else {
1140
1157
CONSOLE_INFO << " Coordinated omission mode - the rate is determined by the server" ;
0 commit comments