diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2018-06-26 10:24:58 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2018-06-26 10:24:58 +0200 |
commit | da4b74e593f1b1262e83824cc499bd855f1b4e3e (patch) | |
tree | 95b56aa073a85022c4218375e1c788894f01c2b6 | |
parent | 121c3bc96a273790414ae114082053cb649fc49a (diff) | |
download | sink-da4b74e593f1b1262e83824cc499bd855f1b4e3e.tar.gz sink-da4b74e593f1b1262e83824cc499bd855f1b4e3e.zip |
Avoid overwriting local changes with remote modifications.
The case we ran into is the following:
* Fetching the full payload and marking all messages of a thread as read
happens simultaneously.
* The local modification to mark as read gets immediately overwritten
when the full payload arrives.
* Eventually the modification gets replayed to the server though (and
the reversal isn't because coming from the source), so on next sync the
situation fixes itself.
To be able to improve this we try to protect local modifications in that
properties that have been modified since baseRevision (which currently
isn't, but should be equal to the last to the server replayed revision)
are not overwritten. This conflict resolution strategy thus always
prefers local modifications. baseRevision is currently set to the
current maximum revision of the store at the time when the resource
creates the modification.
-rw-r--r-- | common/entitybuffer.cpp | 6 | ||||
-rw-r--r-- | common/entitybuffer.h | 6 | ||||
-rw-r--r-- | common/pipeline.cpp | 14 | ||||
-rw-r--r-- | common/storage/entitystore.cpp | 32 | ||||
-rw-r--r-- | common/storage/entitystore.h | 4 | ||||
-rw-r--r-- | tests/pipelinetest.cpp | 65 |
6 files changed, 106 insertions, 21 deletions
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp index 32583cc..fa33dcc 100644 --- a/common/entitybuffer.cpp +++ b/common/entitybuffer.cpp | |||
@@ -32,7 +32,7 @@ const Sink::Entity &EntityBuffer::entity() const | |||
32 | return *mEntity; | 32 | return *mEntity; |
33 | } | 33 | } |
34 | 34 | ||
35 | const uint8_t *EntityBuffer::resourceBuffer() | 35 | const uint8_t *EntityBuffer::resourceBuffer() const |
36 | { | 36 | { |
37 | if (!mEntity) { | 37 | if (!mEntity) { |
38 | qDebug() << "no buffer"; | 38 | qDebug() << "no buffer"; |
@@ -41,7 +41,7 @@ const uint8_t *EntityBuffer::resourceBuffer() | |||
41 | return mEntity->resource()->Data(); | 41 | return mEntity->resource()->Data(); |
42 | } | 42 | } |
43 | 43 | ||
44 | const uint8_t *EntityBuffer::metadataBuffer() | 44 | const uint8_t *EntityBuffer::metadataBuffer() const |
45 | { | 45 | { |
46 | if (!mEntity) { | 46 | if (!mEntity) { |
47 | return nullptr; | 47 | return nullptr; |
@@ -49,7 +49,7 @@ const uint8_t *EntityBuffer::metadataBuffer() | |||
49 | return mEntity->metadata()->Data(); | 49 | return mEntity->metadata()->Data(); |
50 | } | 50 | } |
51 | 51 | ||
52 | const uint8_t *EntityBuffer::localBuffer() | 52 | const uint8_t *EntityBuffer::localBuffer() const |
53 | { | 53 | { |
54 | if (!mEntity) { | 54 | if (!mEntity) { |
55 | return nullptr; | 55 | return nullptr; |
diff --git a/common/entitybuffer.h b/common/entitybuffer.h index d73a138..31e6aed 100644 --- a/common/entitybuffer.h +++ b/common/entitybuffer.h | |||
@@ -25,9 +25,9 @@ public: | |||
25 | * Note that @param data will need to remain valid and the data is not copied. | 25 | * Note that @param data will need to remain valid and the data is not copied. |
26 | */ | 26 | */ |
27 | EntityBuffer(const QByteArray &data); | 27 | EntityBuffer(const QByteArray &data); |
28 | const uint8_t *resourceBuffer(); | 28 | const uint8_t *resourceBuffer() const; |
29 | const uint8_t *metadataBuffer(); | 29 | const uint8_t *metadataBuffer() const; |
30 | const uint8_t *localBuffer(); | 30 | const uint8_t *localBuffer() const; |
31 | const Entity &entity() const; | 31 | const Entity &entity() const; |
32 | bool isValid() const; | 32 | bool isValid() const; |
33 | 33 | ||
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 4afe9f3..cbdc91d 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -258,7 +258,19 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
258 | return KAsync::error<qint64>(0); | 258 | return KAsync::error<qint64>(0); |
259 | } | 259 | } |
260 | 260 | ||
261 | auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions); | 261 | //We avoid overwriting local changes that haven't been played back yet with remote modifications |
262 | QSet<QByteArray> excludeProperties; | ||
263 | if (!replayToSource) { //We assume this means the change is coming from the source already | ||
264 | d->entityStore.readRevisions(bufferType, diff.identifier(), baseRevision, [&] (const QByteArray &uid, qint64 revision, const Sink::EntityBuffer &entity) { | ||
265 | if (entity.metadataBuffer()) { | ||
266 | if (auto metadata = GetMetadata(entity.metadataBuffer())) { | ||
267 | excludeProperties += BufferUtils::fromVector(*metadata->modifiedProperties()).toSet(); | ||
268 | } | ||
269 | } | ||
270 | }); | ||
271 | } | ||
272 | |||
273 | auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions, excludeProperties); | ||
262 | 274 | ||
263 | bool isMove = false; | 275 | bool isMove = false; |
264 | if (modifyEntity->targetResource()) { | 276 | if (modifyEntity->targetResource()) { |
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index c5b5ffc..dd6bbf0 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp | |||
@@ -247,22 +247,26 @@ bool EntityStore::add(const QByteArray &type, ApplicationDomainType entity, bool | |||
247 | return true; | 247 | return true; |
248 | } | 248 | } |
249 | 249 | ||
250 | ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions) const | 250 | ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions, const QSet<QByteArray> &excludeProperties) const |
251 | { | 251 | { |
252 | SinkTraceCtx(d->logCtx) << "Applying diff: " << current.availableProperties() << "Deletions: " << deletions << "Changeset: " << diff.changedProperties(); | 252 | SinkTraceCtx(d->logCtx) << "Applying diff: " << current.availableProperties() << "Deletions: " << deletions << "Changeset: " << diff.changedProperties() << "Excluded: " << excludeProperties; |
253 | auto newEntity = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(current, current.availableProperties()); | 253 | auto newEntity = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(current, current.availableProperties()); |
254 | 254 | ||
255 | // Apply diff | 255 | // Apply diff |
256 | for (const auto &property : diff.changedProperties()) { | 256 | for (const auto &property : diff.changedProperties()) { |
257 | const auto value = diff.getProperty(property); | 257 | if (!excludeProperties.contains(property)) { |
258 | if (value.isValid()) { | 258 | const auto value = diff.getProperty(property); |
259 | newEntity.setProperty(property, value); | 259 | if (value.isValid()) { |
260 | newEntity.setProperty(property, value); | ||
261 | } | ||
260 | } | 262 | } |
261 | } | 263 | } |
262 | 264 | ||
263 | // Remove deletions | 265 | // Remove deletions |
264 | for (const auto &property : deletions) { | 266 | for (const auto &property : deletions) { |
265 | newEntity.setProperty(property, QVariant()); | 267 | if (!excludeProperties.contains(property)) { |
268 | newEntity.setProperty(property, QVariant()); | ||
269 | } | ||
266 | } | 270 | } |
267 | return newEntity; | 271 | return newEntity; |
268 | } | 272 | } |
@@ -639,6 +643,22 @@ bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) | |||
639 | return true; | 643 | return true; |
640 | } | 644 | } |
641 | 645 | ||
646 | void EntityStore::readRevisions(const QByteArray &type, const QByteArray &uid, qint64 startingRevision, const std::function<void(const QByteArray &uid, qint64 revision, const EntityBuffer &entity)> callback) | ||
647 | { | ||
648 | Q_ASSERT(d); | ||
649 | Q_ASSERT(!uid.isEmpty()); | ||
650 | DataStore::mainDatabase(d->transaction, type) | ||
651 | .scan(uid, | ||
652 | [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
653 | const auto revision = DataStore::revisionFromKey(key); | ||
654 | if (revision >= startingRevision) { | ||
655 | callback(DataStore::uidFromKey(key), revision, Sink::EntityBuffer(value.data(), value.size())); | ||
656 | } | ||
657 | return true; | ||
658 | }, | ||
659 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true); | ||
660 | |||
661 | } | ||
642 | 662 | ||
643 | qint64 EntityStore::maxRevision() | 663 | qint64 EntityStore::maxRevision() |
644 | { | 664 | { |
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h index ffa70b9..69de76c 100644 --- a/common/storage/entitystore.h +++ b/common/storage/entitystore.h | |||
@@ -49,7 +49,7 @@ public: | |||
49 | bool modify(const QByteArray &type, const ApplicationDomainType ¤t, ApplicationDomainType newEntity, bool replayToSource); | 49 | bool modify(const QByteArray &type, const ApplicationDomainType ¤t, ApplicationDomainType newEntity, bool replayToSource); |
50 | bool remove(const QByteArray &type, const ApplicationDomainType ¤t, bool replayToSource); | 50 | bool remove(const QByteArray &type, const ApplicationDomainType ¤t, bool replayToSource); |
51 | bool cleanupRevisions(qint64 revision); | 51 | bool cleanupRevisions(qint64 revision); |
52 | ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions) const; | 52 | ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions, const QSet<QByteArray> &excludeProperties = {}) const; |
53 | 53 | ||
54 | void startTransaction(Sink::Storage::DataStore::AccessMode); | 54 | void startTransaction(Sink::Storage::DataStore::AccessMode); |
55 | void commitTransaction(); | 55 | void commitTransaction(); |
@@ -122,6 +122,8 @@ public: | |||
122 | ///Db contains entity and entity is not yet removed | 122 | ///Db contains entity and entity is not yet removed |
123 | bool exists(const QByteArray &type, const QByteArray &uid); | 123 | bool exists(const QByteArray &type, const QByteArray &uid); |
124 | 124 | ||
125 | void readRevisions(const QByteArray &type, const QByteArray &uid, qint64 baseRevision, const std::function<void(const QByteArray &uid, qint64 revision, const EntityBuffer &entity)> callback); | ||
126 | |||
125 | qint64 maxRevision(); | 127 | qint64 maxRevision(); |
126 | 128 | ||
127 | Sink::Log::Context logContext() const; | 129 | Sink::Log::Context logContext() const; |
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 0268ec5..45e2fbb 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp | |||
@@ -101,23 +101,25 @@ QByteArray createEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb) | |||
101 | return command; | 101 | return command; |
102 | } | 102 | } |
103 | 103 | ||
104 | QByteArray modifyEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb, const QByteArray &uid, qint64 revision) | 104 | QByteArray modifyEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb, const QByteArray &uid, qint64 revision, QStringList modifiedProperties = {"summary"}, bool replayToSource = true) |
105 | { | 105 | { |
106 | flatbuffers::FlatBufferBuilder fbb; | 106 | flatbuffers::FlatBufferBuilder fbb; |
107 | auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Event>().toStdString().data()); | 107 | auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Event>().toStdString().data()); |
108 | auto id = fbb.CreateString(std::string(uid.constData(), uid.size())); | 108 | auto id = fbb.CreateString(std::string(uid.constData(), uid.size())); |
109 | auto summaryProperty = fbb.CreateString("summary"); | 109 | std::vector<flatbuffers::Offset<flatbuffers::String>> modifiedVector; |
110 | std::vector<flatbuffers::Offset<flatbuffers::String>> modified; | 110 | for (const auto &modified : modifiedProperties) { |
111 | modified.push_back(summaryProperty); | 111 | modifiedVector.push_back(fbb.CreateString(modified.toStdString())); |
112 | } | ||
112 | auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize()); | 113 | auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize()); |
113 | auto modifiedProperties = fbb.CreateVector(modified); | 114 | auto modifiedPropertiesVector = fbb.CreateVector(modifiedVector); |
114 | // auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); | ||
115 | Sink::Commands::ModifyEntityBuilder builder(fbb); | 115 | Sink::Commands::ModifyEntityBuilder builder(fbb); |
116 | builder.add_domainType(type); | 116 | builder.add_domainType(type); |
117 | builder.add_delta(delta); | 117 | builder.add_delta(delta); |
118 | builder.add_revision(revision); | 118 | builder.add_revision(revision); |
119 | builder.add_entityId(id); | 119 | builder.add_entityId(id); |
120 | builder.add_modifiedProperties(modifiedProperties); | 120 | builder.add_modifiedProperties(modifiedPropertiesVector); |
121 | builder.add_replayToSource(replayToSource); | ||
122 | |||
121 | auto location = builder.Finish(); | 123 | auto location = builder.Finish(); |
122 | Sink::Commands::FinishModifyEntityBuffer(fbb, location); | 124 | Sink::Commands::FinishModifyEntityBuffer(fbb, location); |
123 | 125 | ||
@@ -401,6 +403,55 @@ private slots: | |||
401 | QCOMPARE(testProcessor->deletedSummaries.at(0), QByteArray("summary2")); | 403 | QCOMPARE(testProcessor->deletedSummaries.at(0), QByteArray("summary2")); |
402 | } | 404 | } |
403 | } | 405 | } |
406 | |||
407 | void testModifyWithConflict() | ||
408 | { | ||
409 | flatbuffers::FlatBufferBuilder entityFbb; | ||
410 | auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); | ||
411 | |||
412 | Sink::Pipeline pipeline(getContext(), {"test"}); | ||
413 | |||
414 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); | ||
415 | |||
416 | // Create the initial revision | ||
417 | pipeline.startTransaction(); | ||
418 | pipeline.newEntity(command.constData(), command.size()); | ||
419 | pipeline.commit(); | ||
420 | |||
421 | // Get uid of written entity | ||
422 | auto keys = getKeys(instanceIdentifier(), "event.main"); | ||
423 | QCOMPARE(keys.size(), 1); | ||
424 | const auto key = keys.first(); | ||
425 | const auto uid = Sink::Storage::DataStore::uidFromKey(key); | ||
426 | |||
427 | //Simulate local modification | ||
428 | { | ||
429 | entityFbb.Clear(); | ||
430 | auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summaryLocal"), uid, 1, {"summary"}, true); | ||
431 | pipeline.startTransaction(); | ||
432 | pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()); | ||
433 | pipeline.commit(); | ||
434 | } | ||
435 | |||
436 | |||
437 | //Simulate remote modification | ||
438 | //We assume the remote modification is not overly smart and always marks all properties as changed. | ||
439 | { | ||
440 | entityFbb.Clear(); | ||
441 | auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summaryRemote", "descriptionRemote"), uid, 2, {"summary", "description"}, false); | ||
442 | pipeline.startTransaction(); | ||
443 | pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()); | ||
444 | pipeline.commit(); | ||
445 | } | ||
446 | |||
447 | // Ensure we've got the new revision with the modification | ||
448 | auto buffer = getEntity(instanceIdentifier(), "event.main", Sink::Storage::DataStore::assembleKey(uid, 3)); | ||
449 | QVERIFY(!buffer.isEmpty()); | ||
450 | Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); | ||
451 | auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); | ||
452 | QVERIFY2(adaptor->getProperty("summary").toString() == QString("summaryLocal"), "The local modification was reverted."); | ||
453 | QVERIFY2(adaptor->getProperty("description").toString() == QString("descriptionRemote"), "The remote modification was not applied."); | ||
454 | } | ||
404 | }; | 455 | }; |
405 | 456 | ||
406 | QTEST_MAIN(PipelineTest) | 457 | QTEST_MAIN(PipelineTest) |