summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2018-06-26 10:24:58 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2018-06-26 10:24:58 +0200
commitda4b74e593f1b1262e83824cc499bd855f1b4e3e (patch)
tree95b56aa073a85022c4218375e1c788894f01c2b6
parent121c3bc96a273790414ae114082053cb649fc49a (diff)
downloadsink-da4b74e593f1b1262e83824cc499bd855f1b4e3e.tar.gz
sink-da4b74e593f1b1262e83824cc499bd855f1b4e3e.zip
Avoid overwriting local changes with remote modifications.
The case we ran into is the following: * Fetching the full payload and marking all messages of a thread as read happens simultaneously. * The local modification to mark as read gets immediately overwritten when the full payload arrives. * Eventually the modification gets replayed to the server though (and the reversal isn't because coming from the source), so on next sync the situation fixes itself. To be able to improve this we try to protect local modifications in that properties that have been modified since baseRevision (which currently isn't, but should be equal to the last to the server replayed revision) are not overwritten. This conflict resolution strategy thus always prefers local modifications. baseRevision is currently set to the current maximum revision of the store at the time when the resource creates the modification.
-rw-r--r--common/entitybuffer.cpp6
-rw-r--r--common/entitybuffer.h6
-rw-r--r--common/pipeline.cpp14
-rw-r--r--common/storage/entitystore.cpp32
-rw-r--r--common/storage/entitystore.h4
-rw-r--r--tests/pipelinetest.cpp65
6 files changed, 106 insertions, 21 deletions
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp
index 32583cc..fa33dcc 100644
--- a/common/entitybuffer.cpp
+++ b/common/entitybuffer.cpp
@@ -32,7 +32,7 @@ const Sink::Entity &EntityBuffer::entity() const
32 return *mEntity; 32 return *mEntity;
33} 33}
34 34
35const uint8_t *EntityBuffer::resourceBuffer() 35const uint8_t *EntityBuffer::resourceBuffer() const
36{ 36{
37 if (!mEntity) { 37 if (!mEntity) {
38 qDebug() << "no buffer"; 38 qDebug() << "no buffer";
@@ -41,7 +41,7 @@ const uint8_t *EntityBuffer::resourceBuffer()
41 return mEntity->resource()->Data(); 41 return mEntity->resource()->Data();
42} 42}
43 43
44const uint8_t *EntityBuffer::metadataBuffer() 44const uint8_t *EntityBuffer::metadataBuffer() const
45{ 45{
46 if (!mEntity) { 46 if (!mEntity) {
47 return nullptr; 47 return nullptr;
@@ -49,7 +49,7 @@ const uint8_t *EntityBuffer::metadataBuffer()
49 return mEntity->metadata()->Data(); 49 return mEntity->metadata()->Data();
50} 50}
51 51
52const uint8_t *EntityBuffer::localBuffer() 52const uint8_t *EntityBuffer::localBuffer() const
53{ 53{
54 if (!mEntity) { 54 if (!mEntity) {
55 return nullptr; 55 return nullptr;
diff --git a/common/entitybuffer.h b/common/entitybuffer.h
index d73a138..31e6aed 100644
--- a/common/entitybuffer.h
+++ b/common/entitybuffer.h
@@ -25,9 +25,9 @@ public:
25 * Note that @param data will need to remain valid and the data is not copied. 25 * Note that @param data will need to remain valid and the data is not copied.
26 */ 26 */
27 EntityBuffer(const QByteArray &data); 27 EntityBuffer(const QByteArray &data);
28 const uint8_t *resourceBuffer(); 28 const uint8_t *resourceBuffer() const;
29 const uint8_t *metadataBuffer(); 29 const uint8_t *metadataBuffer() const;
30 const uint8_t *localBuffer(); 30 const uint8_t *localBuffer() const;
31 const Entity &entity() const; 31 const Entity &entity() const;
32 bool isValid() const; 32 bool isValid() const;
33 33
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 4afe9f3..cbdc91d 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -258,7 +258,19 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
258 return KAsync::error<qint64>(0); 258 return KAsync::error<qint64>(0);
259 } 259 }
260 260
261 auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions); 261 //We avoid overwriting local changes that haven't been played back yet with remote modifications
262 QSet<QByteArray> excludeProperties;
263 if (!replayToSource) { //We assume this means the change is coming from the source already
264 d->entityStore.readRevisions(bufferType, diff.identifier(), baseRevision, [&] (const QByteArray &uid, qint64 revision, const Sink::EntityBuffer &entity) {
265 if (entity.metadataBuffer()) {
266 if (auto metadata = GetMetadata(entity.metadataBuffer())) {
267 excludeProperties += BufferUtils::fromVector(*metadata->modifiedProperties()).toSet();
268 }
269 }
270 });
271 }
272
273 auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions, excludeProperties);
262 274
263 bool isMove = false; 275 bool isMove = false;
264 if (modifyEntity->targetResource()) { 276 if (modifyEntity->targetResource()) {
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index c5b5ffc..dd6bbf0 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -247,22 +247,26 @@ bool EntityStore::add(const QByteArray &type, ApplicationDomainType entity, bool
247 return true; 247 return true;
248} 248}
249 249
250ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomainType &current, const ApplicationDomainType &diff, const QByteArrayList &deletions) const 250ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomainType &current, const ApplicationDomainType &diff, const QByteArrayList &deletions, const QSet<QByteArray> &excludeProperties) const
251{ 251{
252 SinkTraceCtx(d->logCtx) << "Applying diff: " << current.availableProperties() << "Deletions: " << deletions << "Changeset: " << diff.changedProperties(); 252 SinkTraceCtx(d->logCtx) << "Applying diff: " << current.availableProperties() << "Deletions: " << deletions << "Changeset: " << diff.changedProperties() << "Excluded: " << excludeProperties;
253 auto newEntity = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(current, current.availableProperties()); 253 auto newEntity = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(current, current.availableProperties());
254 254
255 // Apply diff 255 // Apply diff
256 for (const auto &property : diff.changedProperties()) { 256 for (const auto &property : diff.changedProperties()) {
257 const auto value = diff.getProperty(property); 257 if (!excludeProperties.contains(property)) {
258 if (value.isValid()) { 258 const auto value = diff.getProperty(property);
259 newEntity.setProperty(property, value); 259 if (value.isValid()) {
260 newEntity.setProperty(property, value);
261 }
260 } 262 }
261 } 263 }
262 264
263 // Remove deletions 265 // Remove deletions
264 for (const auto &property : deletions) { 266 for (const auto &property : deletions) {
265 newEntity.setProperty(property, QVariant()); 267 if (!excludeProperties.contains(property)) {
268 newEntity.setProperty(property, QVariant());
269 }
266 } 270 }
267 return newEntity; 271 return newEntity;
268} 272}
@@ -639,6 +643,22 @@ bool EntityStore::exists(const QByteArray &type, const QByteArray &uid)
639 return true; 643 return true;
640} 644}
641 645
646void EntityStore::readRevisions(const QByteArray &type, const QByteArray &uid, qint64 startingRevision, const std::function<void(const QByteArray &uid, qint64 revision, const EntityBuffer &entity)> callback)
647{
648 Q_ASSERT(d);
649 Q_ASSERT(!uid.isEmpty());
650 DataStore::mainDatabase(d->transaction, type)
651 .scan(uid,
652 [&](const QByteArray &key, const QByteArray &value) -> bool {
653 const auto revision = DataStore::revisionFromKey(key);
654 if (revision >= startingRevision) {
655 callback(DataStore::uidFromKey(key), revision, Sink::EntityBuffer(value.data(), value.size()));
656 }
657 return true;
658 },
659 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true);
660
661}
642 662
643qint64 EntityStore::maxRevision() 663qint64 EntityStore::maxRevision()
644{ 664{
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h
index ffa70b9..69de76c 100644
--- a/common/storage/entitystore.h
+++ b/common/storage/entitystore.h
@@ -49,7 +49,7 @@ public:
49 bool modify(const QByteArray &type, const ApplicationDomainType &current, ApplicationDomainType newEntity, bool replayToSource); 49 bool modify(const QByteArray &type, const ApplicationDomainType &current, ApplicationDomainType newEntity, bool replayToSource);
50 bool remove(const QByteArray &type, const ApplicationDomainType &current, bool replayToSource); 50 bool remove(const QByteArray &type, const ApplicationDomainType &current, bool replayToSource);
51 bool cleanupRevisions(qint64 revision); 51 bool cleanupRevisions(qint64 revision);
52 ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomainType &current, const ApplicationDomainType &diff, const QByteArrayList &deletions) const; 52 ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomainType &current, const ApplicationDomainType &diff, const QByteArrayList &deletions, const QSet<QByteArray> &excludeProperties = {}) const;
53 53
54 void startTransaction(Sink::Storage::DataStore::AccessMode); 54 void startTransaction(Sink::Storage::DataStore::AccessMode);
55 void commitTransaction(); 55 void commitTransaction();
@@ -122,6 +122,8 @@ public:
122 ///Db contains entity and entity is not yet removed 122 ///Db contains entity and entity is not yet removed
123 bool exists(const QByteArray &type, const QByteArray &uid); 123 bool exists(const QByteArray &type, const QByteArray &uid);
124 124
125 void readRevisions(const QByteArray &type, const QByteArray &uid, qint64 baseRevision, const std::function<void(const QByteArray &uid, qint64 revision, const EntityBuffer &entity)> callback);
126
125 qint64 maxRevision(); 127 qint64 maxRevision();
126 128
127 Sink::Log::Context logContext() const; 129 Sink::Log::Context logContext() const;
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp
index 0268ec5..45e2fbb 100644
--- a/tests/pipelinetest.cpp
+++ b/tests/pipelinetest.cpp
@@ -101,23 +101,25 @@ QByteArray createEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb)
101 return command; 101 return command;
102} 102}
103 103
104QByteArray modifyEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb, const QByteArray &uid, qint64 revision) 104QByteArray modifyEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb, const QByteArray &uid, qint64 revision, QStringList modifiedProperties = {"summary"}, bool replayToSource = true)
105{ 105{
106 flatbuffers::FlatBufferBuilder fbb; 106 flatbuffers::FlatBufferBuilder fbb;
107 auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Event>().toStdString().data()); 107 auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Event>().toStdString().data());
108 auto id = fbb.CreateString(std::string(uid.constData(), uid.size())); 108 auto id = fbb.CreateString(std::string(uid.constData(), uid.size()));
109 auto summaryProperty = fbb.CreateString("summary"); 109 std::vector<flatbuffers::Offset<flatbuffers::String>> modifiedVector;
110 std::vector<flatbuffers::Offset<flatbuffers::String>> modified; 110 for (const auto &modified : modifiedProperties) {
111 modified.push_back(summaryProperty); 111 modifiedVector.push_back(fbb.CreateString(modified.toStdString()));
112 }
112 auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize()); 113 auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize());
113 auto modifiedProperties = fbb.CreateVector(modified); 114 auto modifiedPropertiesVector = fbb.CreateVector(modifiedVector);
114 // auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size());
115 Sink::Commands::ModifyEntityBuilder builder(fbb); 115 Sink::Commands::ModifyEntityBuilder builder(fbb);
116 builder.add_domainType(type); 116 builder.add_domainType(type);
117 builder.add_delta(delta); 117 builder.add_delta(delta);
118 builder.add_revision(revision); 118 builder.add_revision(revision);
119 builder.add_entityId(id); 119 builder.add_entityId(id);
120 builder.add_modifiedProperties(modifiedProperties); 120 builder.add_modifiedProperties(modifiedPropertiesVector);
121 builder.add_replayToSource(replayToSource);
122
121 auto location = builder.Finish(); 123 auto location = builder.Finish();
122 Sink::Commands::FinishModifyEntityBuffer(fbb, location); 124 Sink::Commands::FinishModifyEntityBuffer(fbb, location);
123 125
@@ -401,6 +403,55 @@ private slots:
401 QCOMPARE(testProcessor->deletedSummaries.at(0), QByteArray("summary2")); 403 QCOMPARE(testProcessor->deletedSummaries.at(0), QByteArray("summary2"));
402 } 404 }
403 } 405 }
406
407 void testModifyWithConflict()
408 {
409 flatbuffers::FlatBufferBuilder entityFbb;
410 auto command = createEntityCommand(createEvent(entityFbb, "summary", "description"));
411
412 Sink::Pipeline pipeline(getContext(), {"test"});
413
414 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
415
416 // Create the initial revision
417 pipeline.startTransaction();
418 pipeline.newEntity(command.constData(), command.size());
419 pipeline.commit();
420
421 // Get uid of written entity
422 auto keys = getKeys(instanceIdentifier(), "event.main");
423 QCOMPARE(keys.size(), 1);
424 const auto key = keys.first();
425 const auto uid = Sink::Storage::DataStore::uidFromKey(key);
426
427 //Simulate local modification
428 {
429 entityFbb.Clear();
430 auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summaryLocal"), uid, 1, {"summary"}, true);
431 pipeline.startTransaction();
432 pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size());
433 pipeline.commit();
434 }
435
436
437 //Simulate remote modification
438 //We assume the remote modification is not overly smart and always marks all properties as changed.
439 {
440 entityFbb.Clear();
441 auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summaryRemote", "descriptionRemote"), uid, 2, {"summary", "description"}, false);
442 pipeline.startTransaction();
443 pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size());
444 pipeline.commit();
445 }
446
447 // Ensure we've got the new revision with the modification
448 auto buffer = getEntity(instanceIdentifier(), "event.main", Sink::Storage::DataStore::assembleKey(uid, 3));
449 QVERIFY(!buffer.isEmpty());
450 Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size());
451 auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity());
452 QVERIFY2(adaptor->getProperty("summary").toString() == QString("summaryLocal"), "The local modification was reverted.");
453 QVERIFY2(adaptor->getProperty("description").toString() == QString("descriptionRemote"), "The remote modification was not applied.");
454 }
404}; 455};
405 456
406QTEST_MAIN(PipelineTest) 457QTEST_MAIN(PipelineTest)