From da4b74e593f1b1262e83824cc499bd855f1b4e3e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 26 Jun 2018 10:24:58 +0200 Subject: Avoid overwriting local changes with remote modifications. The case we ran into is the following: * Fetching the full payload and marking all messages of a thread as read happens simultaneously. * The local modification to mark as read gets immediately overwritten when the full payload arrives. * Eventually the modification gets replayed to the server though (and the reversal isn't because coming from the source), so on next sync the situation fixes itself. To be able to improve this we try to protect local modifications in that properties that have been modified since baseRevision (which currently isn't, but should be equal to the last to the server replayed revision) are not overwritten. This conflict resolution strategy thus always prefers local modifications. baseRevision is currently set to the current maximum revision of the store at the time when the resource creates the modification. --- common/entitybuffer.cpp | 6 ++-- common/entitybuffer.h | 6 ++-- common/pipeline.cpp | 14 ++++++++- common/storage/entitystore.cpp | 32 +++++++++++++++++---- common/storage/entitystore.h | 4 ++- tests/pipelinetest.cpp | 65 +++++++++++++++++++++++++++++++++++++----- 6 files changed, 106 insertions(+), 21 deletions(-) diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp index 32583cc..fa33dcc 100644 --- a/common/entitybuffer.cpp +++ b/common/entitybuffer.cpp @@ -32,7 +32,7 @@ const Sink::Entity &EntityBuffer::entity() const return *mEntity; } -const uint8_t *EntityBuffer::resourceBuffer() +const uint8_t *EntityBuffer::resourceBuffer() const { if (!mEntity) { qDebug() << "no buffer"; @@ -41,7 +41,7 @@ const uint8_t *EntityBuffer::resourceBuffer() return mEntity->resource()->Data(); } -const uint8_t *EntityBuffer::metadataBuffer() +const uint8_t *EntityBuffer::metadataBuffer() const { if (!mEntity) { return nullptr; @@ -49,7 +49,7 @@ const uint8_t *EntityBuffer::metadataBuffer() return mEntity->metadata()->Data(); } -const uint8_t *EntityBuffer::localBuffer() +const uint8_t *EntityBuffer::localBuffer() const { if (!mEntity) { return nullptr; diff --git a/common/entitybuffer.h b/common/entitybuffer.h index d73a138..31e6aed 100644 --- a/common/entitybuffer.h +++ b/common/entitybuffer.h @@ -25,9 +25,9 @@ public: * Note that @param data will need to remain valid and the data is not copied. */ EntityBuffer(const QByteArray &data); - const uint8_t *resourceBuffer(); - const uint8_t *metadataBuffer(); - const uint8_t *localBuffer(); + const uint8_t *resourceBuffer() const; + const uint8_t *metadataBuffer() const; + const uint8_t *localBuffer() const; const Entity &entity() const; bool isValid() const; diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 4afe9f3..cbdc91d 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -258,7 +258,19 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) return KAsync::error(0); } - auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions); + //We avoid overwriting local changes that haven't been played back yet with remote modifications + QSet excludeProperties; + if (!replayToSource) { //We assume this means the change is coming from the source already + d->entityStore.readRevisions(bufferType, diff.identifier(), baseRevision, [&] (const QByteArray &uid, qint64 revision, const Sink::EntityBuffer &entity) { + if (entity.metadataBuffer()) { + if (auto metadata = GetMetadata(entity.metadataBuffer())) { + excludeProperties += BufferUtils::fromVector(*metadata->modifiedProperties()).toSet(); + } + } + }); + } + + auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions, excludeProperties); bool isMove = false; if (modifyEntity->targetResource()) { diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index c5b5ffc..dd6bbf0 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp @@ -247,22 +247,26 @@ bool EntityStore::add(const QByteArray &type, ApplicationDomainType entity, bool return true; } -ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions) const +ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions, const QSet &excludeProperties) const { - SinkTraceCtx(d->logCtx) << "Applying diff: " << current.availableProperties() << "Deletions: " << deletions << "Changeset: " << diff.changedProperties(); + SinkTraceCtx(d->logCtx) << "Applying diff: " << current.availableProperties() << "Deletions: " << deletions << "Changeset: " << diff.changedProperties() << "Excluded: " << excludeProperties; auto newEntity = *ApplicationDomainType::getInMemoryRepresentation(current, current.availableProperties()); // Apply diff for (const auto &property : diff.changedProperties()) { - const auto value = diff.getProperty(property); - if (value.isValid()) { - newEntity.setProperty(property, value); + if (!excludeProperties.contains(property)) { + const auto value = diff.getProperty(property); + if (value.isValid()) { + newEntity.setProperty(property, value); + } } } // Remove deletions for (const auto &property : deletions) { - newEntity.setProperty(property, QVariant()); + if (!excludeProperties.contains(property)) { + newEntity.setProperty(property, QVariant()); + } } return newEntity; } @@ -639,6 +643,22 @@ bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) return true; } +void EntityStore::readRevisions(const QByteArray &type, const QByteArray &uid, qint64 startingRevision, const std::function callback) +{ + Q_ASSERT(d); + Q_ASSERT(!uid.isEmpty()); + DataStore::mainDatabase(d->transaction, type) + .scan(uid, + [&](const QByteArray &key, const QByteArray &value) -> bool { + const auto revision = DataStore::revisionFromKey(key); + if (revision >= startingRevision) { + callback(DataStore::uidFromKey(key), revision, Sink::EntityBuffer(value.data(), value.size())); + } + return true; + }, + [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true); + +} qint64 EntityStore::maxRevision() { diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h index ffa70b9..69de76c 100644 --- a/common/storage/entitystore.h +++ b/common/storage/entitystore.h @@ -49,7 +49,7 @@ public: bool modify(const QByteArray &type, const ApplicationDomainType ¤t, ApplicationDomainType newEntity, bool replayToSource); bool remove(const QByteArray &type, const ApplicationDomainType ¤t, bool replayToSource); bool cleanupRevisions(qint64 revision); - ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions) const; + ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions, const QSet &excludeProperties = {}) const; void startTransaction(Sink::Storage::DataStore::AccessMode); void commitTransaction(); @@ -122,6 +122,8 @@ public: ///Db contains entity and entity is not yet removed bool exists(const QByteArray &type, const QByteArray &uid); + void readRevisions(const QByteArray &type, const QByteArray &uid, qint64 baseRevision, const std::function callback); + qint64 maxRevision(); Sink::Log::Context logContext() const; diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 0268ec5..45e2fbb 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp @@ -101,23 +101,25 @@ QByteArray createEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb) return command; } -QByteArray modifyEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb, const QByteArray &uid, qint64 revision) +QByteArray modifyEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb, const QByteArray &uid, qint64 revision, QStringList modifiedProperties = {"summary"}, bool replayToSource = true) { flatbuffers::FlatBufferBuilder fbb; auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName().toStdString().data()); auto id = fbb.CreateString(std::string(uid.constData(), uid.size())); - auto summaryProperty = fbb.CreateString("summary"); - std::vector> modified; - modified.push_back(summaryProperty); + std::vector> modifiedVector; + for (const auto &modified : modifiedProperties) { + modifiedVector.push_back(fbb.CreateString(modified.toStdString())); + } auto delta = fbb.CreateVector(entityFbb.GetBufferPointer(), entityFbb.GetSize()); - auto modifiedProperties = fbb.CreateVector(modified); - // auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); + auto modifiedPropertiesVector = fbb.CreateVector(modifiedVector); Sink::Commands::ModifyEntityBuilder builder(fbb); builder.add_domainType(type); builder.add_delta(delta); builder.add_revision(revision); builder.add_entityId(id); - builder.add_modifiedProperties(modifiedProperties); + builder.add_modifiedProperties(modifiedPropertiesVector); + builder.add_replayToSource(replayToSource); + auto location = builder.Finish(); Sink::Commands::FinishModifyEntityBuffer(fbb, location); @@ -401,6 +403,55 @@ private slots: QCOMPARE(testProcessor->deletedSummaries.at(0), QByteArray("summary2")); } } + + void testModifyWithConflict() + { + flatbuffers::FlatBufferBuilder entityFbb; + auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); + + Sink::Pipeline pipeline(getContext(), {"test"}); + + auto adaptorFactory = QSharedPointer::create(); + + // Create the initial revision + pipeline.startTransaction(); + pipeline.newEntity(command.constData(), command.size()); + pipeline.commit(); + + // Get uid of written entity + auto keys = getKeys(instanceIdentifier(), "event.main"); + QCOMPARE(keys.size(), 1); + const auto key = keys.first(); + const auto uid = Sink::Storage::DataStore::uidFromKey(key); + + //Simulate local modification + { + entityFbb.Clear(); + auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summaryLocal"), uid, 1, {"summary"}, true); + pipeline.startTransaction(); + pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()); + pipeline.commit(); + } + + + //Simulate remote modification + //We assume the remote modification is not overly smart and always marks all properties as changed. + { + entityFbb.Clear(); + auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summaryRemote", "descriptionRemote"), uid, 2, {"summary", "description"}, false); + pipeline.startTransaction(); + pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()); + pipeline.commit(); + } + + // Ensure we've got the new revision with the modification + auto buffer = getEntity(instanceIdentifier(), "event.main", Sink::Storage::DataStore::assembleKey(uid, 3)); + QVERIFY(!buffer.isEmpty()); + Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); + auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); + QVERIFY2(adaptor->getProperty("summary").toString() == QString("summaryLocal"), "The local modification was reverted."); + QVERIFY2(adaptor->getProperty("description").toString() == QString("descriptionRemote"), "The remote modification was not applied."); + } }; QTEST_MAIN(PipelineTest) -- cgit v1.2.3