diff options
-rw-r--r-- | common/entitybuffer.cpp | 6 | ||||
-rw-r--r-- | common/entitybuffer.h | 6 | ||||
-rw-r--r-- | common/pipeline.cpp | 14 | ||||
-rw-r--r-- | common/storage/entitystore.cpp | 32 | ||||
-rw-r--r-- | common/storage/entitystore.h | 4 | ||||
-rw-r--r-- | tests/pipelinetest.cpp | 65 |
6 files changed, 106 insertions, 21 deletions
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp index 32583cc..fa33dcc 100644 --- a/common/entitybuffer.cpp +++ b/common/entitybuffer.cpp | |||
@@ -32,7 +32,7 @@ const Sink::Entity &EntityBuffer::entity() const | |||
32 | return *mEntity; | 32 | return *mEntity; |
33 | } | 33 | } |
34 | 34 | ||
35 | const uint8_t *EntityBuffer::resourceBuffer() | 35 | const uint8_t *EntityBuffer::resourceBuffer() const |
36 | { | 36 | { |
37 | if (!mEntity) { | 37 | if (!mEntity) { |
38 | qDebug() << "no buffer"; | 38 | qDebug() << "no buffer"; |
@@ -41,7 +41,7 @@ const uint8_t *EntityBuffer::resourceBuffer() | |||
41 | return mEntity->resource()->Data(); | 41 | return mEntity->resource()->Data(); |
42 | } | 42 | } |
43 | 43 | ||
44 | const uint8_t *EntityBuffer::metadataBuffer() | 44 | const uint8_t *EntityBuffer::metadataBuffer() const |
45 | { | 45 | { |
46 | if (!mEntity) { | 46 | if (!mEntity) { |
47 | return nullptr; | 47 | return nullptr; |
@@ -49,7 +49,7 @@ const uint8_t *EntityBuffer::metadataBuffer() | |||
49 | return mEntity->metadata()->Data(); | 49 | return mEntity->metadata()->Data(); |
50 | } | 50 | } |
51 | 51 | ||
52 | const uint8_t *EntityBuffer::localBuffer() | 52 | const uint8_t *EntityBuffer::localBuffer() const |
53 | { | 53 | { |
54 | if (!mEntity) { | 54 | if (!mEntity) { |
55 | return nullptr; | 55 | return nullptr; |
diff --git a/common/entitybuffer.h b/common/entitybuffer.h index d73a138..31e6aed 100644 --- a/common/entitybuffer.h +++ b/common/entitybuffer.h | |||
@@ -25,9 +25,9 @@ public: | |||
25 | * Note that @param data will need to remain valid and the data is not copied. | 25 | * Note that @param data will need to remain valid and the data is not copied. |
26 | */ | 26 | */ |
27 | EntityBuffer(const QByteArray &data); | 27 | EntityBuffer(const QByteArray &data); |
28 | const uint8_t *resourceBuffer(); | 28 | const uint8_t *resourceBuffer() const; |
29 | const uint8_t *metadataBuffer(); | 29 | const uint8_t *metadataBuffer() const; |
30 | const uint8_t *localBuffer(); | 30 | const uint8_t *localBuffer() const; |
31 | const Entity &entity() const; | 31 | const Entity &entity() const; |
32 | bool isValid() const; | 32 | bool isValid() const; |
33 | 33 | ||
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 4afe9f3..cbdc91d 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -258,7 +258,19 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
258 | return KAsync::error<qint64>(0); | 258 | return KAsync::error<qint64>(0); |
259 | } | 259 | } |
260 | 260 | ||
261 | auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions); | 261 | //We avoid overwriting local changes that haven't been played back yet with remote modifications |
262 | QSet<QByteArray> excludeProperties; | ||
263 | if (!replayToSource) { //We assume this means the change is coming from the source already | ||
264 | d->entityStore.readRevisions(bufferType, diff.identifier(), baseRevision, [&] (const QByteArray &uid, qint64 revision, const Sink::EntityBuffer &entity) { | ||
265 | if (entity.metadataBuffer()) { | ||
266 | if (auto metadata = GetMetadata(entity.metadataBuffer())) { | ||
267 | excludeProperties += BufferUtils::fromVector(*metadata->modifiedProperties()).toSet(); | ||
268 | } | ||
269 | } | ||
270 | }); | ||
271 | } | ||
272 | |||
273 | auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions, excludeProperties); | ||
262 | 274 | ||
263 | bool isMove = false; | 275 | bool isMove = false; |
264 | if (modifyEntity->targetResource()) { | 276 | if (modifyEntity->targetResource()) { |
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index c5b5ffc..dd6bbf0 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp | |||
@@ -247,22 +247,26 @@ bool EntityStore::add(const QByteArray &type, ApplicationDomainType entity, bool | |||
247 | return true; | 247 | return true; |
248 | } | 248 | } |
249 | 249 | ||
250 | ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions) const | 250 | ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions, const QSet<QByteArray> &excludeProperties) const |
251 | { | 251 | { |
252 | SinkTraceCtx(d->logCtx) << "Applying diff: " << current.availableProperties() << "Deletions: " << deletions << "Changeset: " << diff.changedProperties(); | 252 | SinkTraceCtx(d->logCtx) << "Applying diff: " << current.availableProperties() << "Deletions: " << deletions << "Changeset: " << diff.changedProperties() << "Excluded: " << excludeProperties; |
253 | auto newEntity = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(current, current.availableProperties()); | 253 | auto newEntity = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(current, current.availableProperties()); |
254 | 254 | ||
255 | // Apply diff | 255 | // Apply diff |
256 | for (const auto &property : diff.changedProperties()) { | 256 | for (const auto &property : diff.changedProperties()) { |
257 | const auto value = diff.getProperty(property); | 257 | if (!excludeProperties.contains(property)) { |
258 | if (value.isValid()) { | 258 | const auto value = diff.getProperty(property); |
259 | newEntity.setProperty(property, value); | 259 | if (value.isValid()) { |
260 | newEntity.setProperty(property, value); | ||
261 | } | ||
260 | } | 262 | } |
261 | } | 263 | } |
262 | 264 | ||
263 | // Remove deletions | 265 | // Remove deletions |
264 | for (const auto &property : deletions) { | 266 | for (const auto &property : deletions) { |
265 | newEntity.setProperty(property, QVariant()); | 267 | if (!excludeProperties.contains(property)) { |
268 | newEntity.setProperty(property, QVariant()); | ||
269 | } | ||
266 | } | 270 | } |
267 | return newEntity; | 271 | return newEntity; |
268 | } | 272 | } |
@@ -639,6 +643,22 @@ bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) | |||
639 | return true; | 643 | return true; |
640 | } | 644 | } |
641 | 645 | ||
646 | void EntityStore::readRevisions(const QByteArray &type, const QByteArray &uid, qint64 startingRevision, const std::function<void(const QByteArray &uid, qint64 revision, const EntityBuffer &entity)> callback) | ||
647 | { | ||
648 | Q_ASSERT(d); | ||
649 | Q_ASSERT(!uid.isEmpty()); | ||
650 | DataStore::mainDatabase(d->transaction, type) | ||
651 | .scan(uid, | ||
652 | [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
653 | const auto revision = DataStore::revisionFromKey(key); | ||
654 | if (revision >= startingRevision) { | ||
655 | callback(DataStore::uidFromKey(key), revision, Sink::EntityBuffer(value.data(), value.size())); | ||
656 | } | ||
657 | return true; | ||
658 | }, | ||
659 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true); | ||
660 | |||
661 | } | ||
642 | 662 | ||
643 | qint64 EntityStore::maxRevision() | 663 | qint64 EntityStore::maxRevision() |
644 | { | 664 | { |
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h index ffa70b9..69de76c 100644 --- a/common/storage/entitystore.h +++ b/common/storage/entitystore.h | |||
@@ -49,7 +49,7 @@ public: | |||
49 | bool modify(const QByteArray &type, const ApplicationDomainType ¤t, ApplicationDomainType newEntity, bool replayToSource); | 49 | bool modify(const QByteArray &type, const ApplicationDomainType ¤t, ApplicationDomainType newEntity, bool replayToSource); |
50 | bool remove(const QByteArray &type, const ApplicationDomainType ¤t, bool replayToSource); | 50 | bool remove(const QByteArray &type, const ApplicationDomainType ¤t, bool replayToSource); |
51 | bool cleanupRevisions(qint64 revision); | 51 | bool cleanupRevisions(qint64 revision); |
52 | ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions) const; | 52 | ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomainType ¤t, const ApplicationDomainType &diff, const QByteArrayList &deletions, const QSet<QByteArray> &excludeProperties = {}) const; |
53 | 53 | ||
54 | void startTransaction(Sink::Storage::DataStore::AccessMode); | 54 | void startTransaction(Sink::Storage::DataStore::AccessMode); |
55 | void commitTransaction(); | 55 | void commitTransaction(); |
@@ -122,6 +122,8 @@ public: | |||
122 | ///Db contains entity and entity is not yet removed | 122 | ///Db contains entity and entity is not yet removed |
123 | bool exists(const QByteArray &type, const QByteArray &uid); | 123 | bool exists(const QByteArray &type, const QByteArray &uid); |
124 | 124 | ||
125 | void readRevisions(const QByteArray &type, const QByteArray &uid, qint64 baseRevision, const std::function<void(const QByteArray &uid, qint64 revision, const EntityBuffer &entity)> callback); | ||
126 | |||
125 | qint64 maxRevision(); | 127 | qint64 maxRevision(); |
126 | 128 | ||
127 | Sink::Log::Context logContext() const; | 129 | Sink::Log::Context logContext() const; |
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 0268ec5..45e2fbb 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp | |||
@@ -101,23 +101,25 @@ QByteArray createEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb) | |||
101 | return command; | 101 | return command; |
102 | } | 102 | } |
103 | 103 | ||
104 | QByteArray modifyEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb, const QByteArray &uid, qint64 revision) | 104 | QByteArray modifyEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb, const QByteArray &uid, qint64 revision, QStringList modifiedProperties = {"summary"}, bool replayToSource = true) |
105 | { | 105 | { |
106 | flatbuffers::FlatBufferBuilder fbb; | 106 | flatbuffers::FlatBufferBuilder fbb; |
107 | auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Event>().toStdString().data()); | 107 | auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Event>().toStdString().data()); |
108 | auto id = fbb.CreateString(std::string(uid.constData(), uid.size())); | 108 | auto id = fbb.CreateString(std::string(uid.constData(), uid.size())); |
109 | auto summaryProperty = fbb.CreateString("summary"); | 109 | std::vector<flatbuffers::Offset<flatbuffers::String>> modifiedVector; |
110 | std::vector<flatbuffers::Offset<flatbuffers::String>> modified; | 110 | for (const auto &modified : modifiedProperties) { |
111 | modified.push_back(summaryProperty); | 111 | modifiedVector.push_back(fbb.CreateString(modified.toStdString())); |
112 | } | ||
112 | auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize()); | 113 | auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize()); |
113 | auto modifiedProperties = fbb.CreateVector(modified); | 114 | auto modifiedPropertiesVector = fbb.CreateVector(modifiedVector); |
114 | // auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); | ||
115 | Sink::Commands::ModifyEntityBuilder builder(fbb); | 115 | Sink::Commands::ModifyEntityBuilder builder(fbb); |
116 | builder.add_domainType(type); | 116 | builder.add_domainType(type); |
117 | builder.add_delta(delta); | 117 | builder.add_delta(delta); |
118 | builder.add_revision(revision); | 118 | builder.add_revision(revision); |
119 | builder.add_entityId(id); | 119 | builder.add_entityId(id); |
120 | builder.add_modifiedProperties(modifiedProperties); | 120 | builder.add_modifiedProperties(modifiedPropertiesVector); |
121 | builder.add_replayToSource(replayToSource); | ||
122 | |||
121 | auto location = builder.Finish(); | 123 | auto location = builder.Finish(); |
122 | Sink::Commands::FinishModifyEntityBuffer(fbb, location); | 124 | Sink::Commands::FinishModifyEntityBuffer(fbb, location); |
123 | 125 | ||
@@ -401,6 +403,55 @@ private slots: | |||
401 | QCOMPARE(testProcessor->deletedSummaries.at(0), QByteArray("summary2")); | 403 | QCOMPARE(testProcessor->deletedSummaries.at(0), QByteArray("summary2")); |
402 | } | 404 | } |
403 | } | 405 | } |
406 | |||
407 | void testModifyWithConflict() | ||
408 | { | ||
409 | flatbuffers::FlatBufferBuilder entityFbb; | ||
410 | auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); | ||
411 | |||
412 | Sink::Pipeline pipeline(getContext(), {"test"}); | ||
413 | |||
414 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); | ||
415 | |||
416 | // Create the initial revision | ||
417 | pipeline.startTransaction(); | ||
418 | pipeline.newEntity(command.constData(), command.size()); | ||
419 | pipeline.commit(); | ||
420 | |||
421 | // Get uid of written entity | ||
422 | auto keys = getKeys(instanceIdentifier(), "event.main"); | ||
423 | QCOMPARE(keys.size(), 1); | ||
424 | const auto key = keys.first(); | ||
425 | const auto uid = Sink::Storage::DataStore::uidFromKey(key); | ||
426 | |||
427 | //Simulate local modification | ||
428 | { | ||
429 | entityFbb.Clear(); | ||
430 | auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summaryLocal"), uid, 1, {"summary"}, true); | ||
431 | pipeline.startTransaction(); | ||
432 | pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()); | ||
433 | pipeline.commit(); | ||
434 | } | ||
435 | |||
436 | |||
437 | //Simulate remote modification | ||
438 | //We assume the remote modification is not overly smart and always marks all properties as changed. | ||
439 | { | ||
440 | entityFbb.Clear(); | ||
441 | auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summaryRemote", "descriptionRemote"), uid, 2, {"summary", "description"}, false); | ||
442 | pipeline.startTransaction(); | ||
443 | pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()); | ||
444 | pipeline.commit(); | ||
445 | } | ||
446 | |||
447 | // Ensure we've got the new revision with the modification | ||
448 | auto buffer = getEntity(instanceIdentifier(), "event.main", Sink::Storage::DataStore::assembleKey(uid, 3)); | ||
449 | QVERIFY(!buffer.isEmpty()); | ||
450 | Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); | ||
451 | auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); | ||
452 | QVERIFY2(adaptor->getProperty("summary").toString() == QString("summaryLocal"), "The local modification was reverted."); | ||
453 | QVERIFY2(adaptor->getProperty("description").toString() == QString("descriptionRemote"), "The remote modification was not applied."); | ||
454 | } | ||
404 | }; | 455 | }; |
405 | 456 | ||
406 | QTEST_MAIN(PipelineTest) | 457 | QTEST_MAIN(PipelineTest) |