Skip to content

Commit 3dfc8b2

Browse files
committed
Avoid pushing conflicts to the server
Use `proposeChanges` even if the server isn't in no-conflicts mode; this helps prevent pushing a change that creates a conflict. To make 100% sure we don't create a conflict, set a new `noconflicts` property in the `rev` request. These changes can be disabled by setting a new replicator option `outgoingConflicts` to true. Renamed the existing option `noConflicts` to `noIncomingConflicts` to avoid confusion. NOTE: This requires a small change in Sync Gateway to support the above changes on its end. Fixes #181
1 parent c6dbb13 commit 3dfc8b2

File tree

13 files changed

+200
-32
lines changed

13 files changed

+200
-32
lines changed

C/include/c4Replicator.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ extern "C" {
170170
#define kC4ReplicatorOptionFilter "filter" // Filter name; string
171171
#define kC4ReplicatorOptionFilterParams "filterParams" // Filter params; Dict[string]
172172
#define kC4ReplicatorOptionSkipDeleted "skipDeleted" // Don't push/pull tombstones; bool
173-
#define kC4ReplicatorOptionNoConflicts "noConflicts" // Puller rejects conflicts; bool
173+
#define kC4ReplicatorOptionNoIncomingConflicts "noIncomingConflicts" // Reject incoming conflicts; bool
174+
#define kC4ReplicatorOptionOutgoingConflicts "outgoingConflicts" // Allow creating conflicts on remote; bool
174175
#define kC4ReplicatorCheckpointInterval "checkpointInterval" // How often to checkpoint, in seconds; number
175176
#define kC4ReplicatorOptionRemoteDBUniqueID "remoteDBUniqueID" // Stable ID for remote db with unstable URL; string
176177
#define kC4ReplicatorHeartbeatInterval "heartbeat" // Interval in secs to send a keepalive ping

LiteCore/RevTrees/RevTree.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ namespace litecore {
338338
}
339339

340340
void RevTree::removeBody(const Rev* rev) {
341-
if (rev->flags & Rev::kKeepBody) {
341+
if (rev->body()) {
342342
const_cast<Rev*>(rev)->removeBody();
343343
_changed = true;
344344
}

Replicator/DBWorker.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,8 @@ namespace litecore { namespace repl {
672672
msg["id"_sl] = request.docID;
673673
msg["rev"_sl] = request.revID;
674674
msg["sequence"_sl] = request.sequence;
675+
if (request.noConflicts)
676+
msg["noconflicts"_sl] = true;
675677
if (revisionFlags & kRevDeleted)
676678
msg["deleted"_sl] = "1"_sl;
677679
if (!history.empty())
@@ -805,7 +807,7 @@ namespace litecore { namespace repl {
805807
put.docID = rev->docID;
806808
put.revFlags = rev->flags | kRevKeepBody;
807809
put.existingRevision = true;
808-
put.allowConflict = true;
810+
put.allowConflict = !rev->noConflicts;
809811
put.history = history.data();
810812
put.historyCount = history.size();
811813
put.remoteDBID = _remoteDBID;

Replicator/IncomingRev.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ namespace litecore { namespace repl {
6363
if (_revMessage->property("deleted"_sl))
6464
_rev.flags |= kRevDeleted;
6565
_rev.historyBuf = _revMessage->property("history"_sl);
66+
_rev.noConflicts = _revMessage->boolProperty("noconflicts"_sl)
67+
|| _options.noIncomingConflicts();
6668
slice sequence(_revMessage->property("sequence"_sl));
6769

6870
_peerError = (int)_revMessage->intProperty("error"_sl);

Replicator/Puller.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ namespace litecore { namespace repl {
4040
registerHandler("rev", &Puller::handleRev);
4141
_spareIncomingRevs.reserve(kMaxSpareIncomingRevs);
4242
_skipDeleted = _options.skipDeleted();
43-
if (nonPassive() && options.noConflicts())
44-
warn("noConflicts mode is not compatible with active pull replications!");
43+
if (nonPassive() && options.noIncomingConflicts())
44+
warn("noIncomingConflicts mode is not compatible with active pull replications!");
4545
}
4646

4747

@@ -124,7 +124,7 @@ namespace litecore { namespace repl {
124124
req->respond();
125125
} else if (req->noReply()) {
126126
warn("Got pointless noreply 'changes' message");
127-
} else if (_options.noConflicts() && !proposed) {
127+
} else if (_options.noIncomingConflicts() && !proposed) {
128128
// In conflict-free mode the protocol requires the pusher send "proposeChanges" instead
129129
req->respondWithError({"BLIP"_sl, 409});
130130
} else {

Replicator/Pusher.cc

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,17 @@ namespace litecore { namespace repl {
4040
,_skipDeleted(options.skipDeleted())
4141
{
4242
if (passive()) {
43+
// Passive replicator always sends "changes"
4344
_proposeChanges = false;
4445
_proposeChangesKnown = true;
46+
} else if (_options.properties[kC4ReplicatorOptionOutgoingConflicts].asBool()) {
47+
// Outgoing conflicts allowed: try "changes" 1st, but server may force "proposeChanges"
48+
_proposeChanges = false;
49+
_proposeChangesKnown = false;
50+
} else {
51+
// Default: always send "proposeChanges"
52+
_proposeChanges = true;
53+
_proposeChangesKnown = true;
4554
}
4655
filterByDocIDs(options.docIDs());
4756
registerHandler("subChanges", &Pusher::handleSubChanges);
@@ -69,7 +78,7 @@ namespace litecore { namespace repl {
6978
// Begins active push, starting from the next sequence after sinceSequence
7079
void Pusher::_start(C4SequenceNumber sinceSequence) {
7180
log("Starting %spush from local seq %llu",
72-
(_continuous ? "continuous " : ""), _lastSequence+1);
81+
(_continuous ? "continuous " : ""), sinceSequence+1);
7382
_started = true;
7483
_pendingSequences.clear(sinceSequence);
7584
startSending(sinceSequence);
@@ -255,6 +264,7 @@ namespace litecore { namespace repl {
255264
if (status == 0) {
256265
auto request = _revsToSend.emplace(_revsToSend.end(), change,
257266
maxHistory, legacyAttachments);
267+
request->noConflicts = true;
258268
request->ancestorRevIDs.emplace_back(change.remoteAncestorRevID);
259269
queued = true;
260270
} else if (status != 304) { // 304 means server has my rev already
@@ -308,7 +318,7 @@ namespace litecore { namespace repl {
308318
}
309319

310320

311-
// Subroutine of _gotChanges that sends a "rev" message containing a revision body.
321+
// Tells the DBWorker to send a "rev" message containing a revision body.
312322
void Pusher::sendRevision(const RevRequest &rev)
313323
{
314324
MessageProgressCallback onProgress;

Replicator/Pusher.hh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ namespace litecore { namespace repl {
7272
DocIDSet _docIDs;
7373
bool _continuous;
7474
bool _skipDeleted;
75-
bool _proposeChanges {false};
76-
bool _proposeChangesKnown {false};
75+
bool _proposeChanges;
76+
bool _proposeChangesKnown;
7777

7878
C4SequenceNumber _lastSequence {0}; // Checkpointed last-sequence
7979
bool _gettingChanges {false}; // Waiting for _gotChanges() call?

Replicator/ReplicatorTypes.hh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ namespace litecore { namespace repl {
6161
C4SequenceNumber sequence;
6262
uint64_t bodySize;
6363
C4RevisionFlags flags {0};
64+
bool noConflicts {false};
6465

6566
Rev() { }
6667

@@ -95,7 +96,7 @@ namespace litecore { namespace repl {
9596
};
9697

9798

98-
/** A revision I want from the peer; includes the opaque remote revision ID. */
99+
/** A revision I want from the peer; includes the opaque remote sequence ID. */
99100
struct RequestedRev : public Rev {
100101
alloc_slice remoteSequence;
101102

Replicator/Worker.hh

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ namespace litecore { namespace repl {
7979
fleeceapi::Dict filterParams() const
8080
{return properties[kC4ReplicatorOptionFilterParams].asDict();}
8181
bool skipDeleted() const {return properties[kC4ReplicatorOptionSkipDeleted].asBool();}
82-
bool noConflicts() const {return properties[kC4ReplicatorOptionNoConflicts].asBool();}
82+
bool noIncomingConflicts() const {return properties[kC4ReplicatorOptionNoIncomingConflicts].asBool();}
83+
bool noOutgoingConflicts() const {return properties[kC4ReplicatorOptionNoIncomingConflicts].asBool();}
8384

8485
fleeceapi::Array arrayProperty(const char *name) const {
8586
return properties[name].asArray();
@@ -111,8 +112,8 @@ namespace litecore { namespace repl {
111112
return *this;
112113
}
113114

114-
Options& setNoConflicts() {
115-
return setProperty(C4STR(kC4ReplicatorOptionNoConflicts), true);
115+
Options& setNoIncomingConflicts() {
116+
return setProperty(C4STR(kC4ReplicatorOptionNoIncomingConflicts), true);
116117
}
117118

118119
explicit operator std::string() const;

Replicator/c4Replicator.hh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ struct C4Replicator : public RefCounted, Replicator::Delegate {
6565
new Replicator(otherDB,
6666
loopbackProvider().createWebSocket(addressFrom(db)),
6767
*this,
68-
Replicator::Options(kC4Passive, kC4Passive).setNoConflicts()),
68+
Replicator::Options(kC4Passive, kC4Passive).setNoIncomingConflicts()),
6969
params)
7070
{
7171
loopbackProvider().bind(_replicator->webSocket(), _otherReplicator->webSocket());

Replicator/tests/ReplicatorAPITest.cc

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,3 +331,53 @@ TEST_CASE_METHOD(ReplicatorAPITest, "API Pull Big Attachments", "[.SyncServer]")
331331
CHECK(size == 15198281);
332332
}
333333

334+
335+
TEST_CASE_METHOD(ReplicatorAPITest, "API Push Conflict", "[.SyncServer]") {
336+
importJSONLines(sFixturesDir + "names_100.json");
337+
replicate(kC4OneShot, kC4Disabled);
338+
339+
sendRemoteRequest("PUT", "0000013", "{\"_rev\":\"1-3cb9cfb09f3f0b5142e618553966ab73539b8888\","
340+
"\"serverSideUpdate\":true}"_sl);
341+
342+
createRev("0000013"_sl, "2-f000"_sl, kFleeceBody);
343+
344+
c4::ref<C4Document> doc = c4doc_get(db, C4STR("0000013"), true, nullptr);
345+
REQUIRE(doc);
346+
CHECK(doc->selectedRev.revID == C4STR("2-f000"));
347+
CHECK(doc->selectedRev.body.size > 0);
348+
REQUIRE(c4doc_selectParentRevision(doc));
349+
CHECK(doc->selectedRev.revID == C4STR("1-3cb9cfb09f3f0b5142e618553966ab73539b8888"));
350+
CHECK(doc->selectedRev.body.size > 0);
351+
CHECK((doc->selectedRev.flags & kRevKeepBody) != 0);
352+
353+
C4Log("-------- Pushing Again (conflict) --------");
354+
_expectedDocPushErrors = {"0000013"};
355+
replicate(kC4OneShot, kC4Disabled);
356+
357+
C4Log("-------- Pulling --------");
358+
_expectedDocPushErrors = { };
359+
_expectedDocPullErrors = {"0000013"};
360+
replicate(kC4Disabled, kC4OneShot);
361+
362+
C4Log("-------- Checking Conflict --------");
363+
doc = c4doc_get(db, C4STR("0000013"), true, nullptr);
364+
REQUIRE(doc);
365+
CHECK((doc->flags & kDocConflicted) != 0);
366+
CHECK(doc->selectedRev.revID == C4STR("2-f000"));
367+
CHECK(doc->selectedRev.body.size > 0);
368+
REQUIRE(c4doc_selectParentRevision(doc));
369+
CHECK(doc->selectedRev.revID == C4STR("1-3cb9cfb09f3f0b5142e618553966ab73539b8888"));
370+
#if 0 // FIX: These checks fail due to issue #402; re-enable when fixing that bug
371+
CHECK(doc->selectedRev.body.size > 0);
372+
CHECK((doc->selectedRev.flags & kRevKeepBody) != 0);
373+
#endif
374+
REQUIRE(c4doc_selectCurrentRevision(doc));
375+
REQUIRE(c4doc_selectNextRevision(doc));
376+
CHECK(doc->selectedRev.revID == C4STR("2-883a2dacc15171a466f76b9d2c39669b"));
377+
CHECK((doc->selectedRev.flags & kRevIsConflict) != 0);
378+
CHECK(doc->selectedRev.body.size > 0);
379+
REQUIRE(c4doc_selectParentRevision(doc));
380+
CHECK(doc->selectedRev.revID == C4STR("1-3cb9cfb09f3f0b5142e618553966ab73539b8888"));
381+
}
382+
383+

Replicator/tests/ReplicatorAPITest.hh

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,11 @@ public:
144144

145145

146146
void replicate(C4ReplicatorMode push, C4ReplicatorMode pull, bool expectSuccess =true) {
147+
_callbackStatus = { };
148+
_numCallbacks = 0;
149+
memset(_numCallbacksWithLevel, 0, sizeof(_numCallbacksWithLevel));
150+
_docPushErrors = _docPullErrors = { };
151+
147152
if (push > kC4Passive && _remoteDBName == kScratchDBName && !db2 && !_flushedScratch) {
148153
flushScratchDatabase();
149154
_flushedScratch = true;
@@ -173,28 +178,41 @@ public:
173178
if (expectSuccess) {
174179
CHECK(status.error.code == 0);
175180
CHECK(_numCallbacksWithLevel[kC4Busy] > 0);
176-
//CHECK(_gotHeaders); //FIX: Enable this when civetweb can return HTTP headers
181+
if (!db2)
182+
CHECK(_headers != (FLDict)nullptr);
177183
}
178184
CHECK(_numCallbacksWithLevel[kC4Stopped] > 0);
179185
CHECK(_callbackStatus.level == status.level);
180186
CHECK(_callbackStatus.error.domain == status.error.domain);
181187
CHECK(_callbackStatus.error.code == status.error.code);
182-
CHECK(_docPullErrors.empty());
183-
CHECK(_docPushErrors.empty());
188+
CHECK(_docPullErrors == _expectedDocPullErrors);
189+
CHECK(_docPushErrors == _expectedDocPushErrors);
184190
}
185191

186192

187-
void flushScratchDatabase() {
188-
C4Log("*** Erasing scratch database");
189-
auto r = make_unique<REST::Response>("POST",
190-
(string)(slice)_address.hostname,
191-
(uint16_t)(_address.port + 1), // assume this is the admin port
192-
string("/") + (string)(slice)kScratchDBName + "/_flush");
193+
void sendRemoteRequest(const string &method,
194+
string path,
195+
slice body =nullslice,
196+
bool admin =false)
197+
{
198+
C4Log("*** Server command: %s %s", method.c_str(), path.c_str());
199+
auto r = make_unique<REST::Response>(method,
200+
(string)(slice)_address.hostname,
201+
(uint16_t)(_address.port + !!admin),
202+
string("/") + (string)(slice)kScratchDBName + "/" + path,
203+
map<string,string>{{"Content-Type", "application/json"}},
204+
body);
193205
REQUIRE(r);
194206
INFO("Status: " << (int)r->status() << " " << r->statusMessage());
195-
REQUIRE(r->status() == REST::HTTPStatus::OK);
207+
REQUIRE(r->status() <= REST::HTTPStatus::Created);
208+
}
209+
210+
211+
void flushScratchDatabase() {
212+
sendRemoteRequest("POST", "_flush", nullslice, true);
196213
}
197214

215+
198216
c4::ref<C4Database> db2;
199217
C4Address _address {kDefaultAddress};
200218
C4String _remoteDBName {kScratchDBName};
@@ -207,5 +225,6 @@ public:
207225
AllocedDict _headers;
208226
bool _stopWhenIdle {false};
209227
set<string> _docPushErrors, _docPullErrors;
228+
set<string> _expectedDocPushErrors, _expectedDocPullErrors;
210229
};
211230

0 commit comments

Comments
 (0)