summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/entitybuffer.cpp6
-rw-r--r--common/entitybuffer.h6
-rw-r--r--common/pipeline.cpp14
-rw-r--r--common/storage/entitystore.cpp32
-rw-r--r--common/storage/entitystore.h4
-rw-r--r--tests/pipelinetest.cpp65
6 files changed, 106 insertions, 21 deletions
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp
index 32583cc..fa33dcc 100644
--- a/common/entitybuffer.cpp
+++ b/common/entitybuffer.cpp
@@ -32,7 +32,7 @@ const Sink::Entity &EntityBuffer::entity() const
32 return *mEntity; 32 return *mEntity;
33} 33}
34 34
35const uint8_t *EntityBuffer::resourceBuffer() 35const uint8_t *EntityBuffer::resourceBuffer() const
36{ 36{
37 if (!mEntity) { 37 if (!mEntity) {
38 qDebug() << "no buffer"; 38 qDebug() << "no buffer";
@@ -41,7 +41,7 @@ const uint8_t *EntityBuffer::resourceBuffer()
41 return mEntity->resource()->Data(); 41 return mEntity->resource()->Data();
42} 42}
43 43
44const uint8_t *EntityBuffer::metadataBuffer() 44const uint8_t *EntityBuffer::metadataBuffer() const
45{ 45{
46 if (!mEntity) { 46 if (!mEntity) {
47 return nullptr; 47 return nullptr;
@@ -49,7 +49,7 @@ const uint8_t *EntityBuffer::metadataBuffer()
49 return mEntity->metadata()->Data(); 49 return mEntity->metadata()->Data();
50} 50}
51 51
52const uint8_t *EntityBuffer::localBuffer() 52const uint8_t *EntityBuffer::localBuffer() const
53{ 53{
54 if (!mEntity) { 54 if (!mEntity) {
55 return nullptr; 55 return nullptr;
diff --git a/common/entitybuffer.h b/common/entitybuffer.h
index d73a138..31e6aed 100644
--- a/common/entitybuffer.h
+++ b/common/entitybuffer.h
@@ -25,9 +25,9 @@ public:
25 * Note that @param data will need to remain valid and the data is not copied. 25 * Note that @param data will need to remain valid and the data is not copied.
26 */ 26 */
27 EntityBuffer(const QByteArray &data); 27 EntityBuffer(const QByteArray &data);
28 const uint8_t *resourceBuffer(); 28 const uint8_t *resourceBuffer() const;
29 const uint8_t *metadataBuffer(); 29 const uint8_t *metadataBuffer() const;
30 const uint8_t *localBuffer(); 30 const uint8_t *localBuffer() const;
31 const Entity &entity() const; 31 const Entity &entity() const;
32 bool isValid() const; 32 bool isValid() const;
33 33
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 4afe9f3..cbdc91d 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -258,7 +258,19 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
258 return KAsync::error<qint64>(0); 258 return KAsync::error<qint64>(0);
259 } 259 }
260 260
261 auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions); 261 //We avoid overwriting local changes that haven't been played back yet with remote modifications
262 QSet<QByteArray> excludeProperties;
263 if (!replayToSource) { //We assume this means the change is coming from the source already
264 d->entityStore.readRevisions(bufferType, diff.identifier(), baseRevision, [&] (const QByteArray &uid, qint64 revision, const Sink::EntityBuffer &entity) {
265 if (entity.metadataBuffer()) {
266 if (auto metadata = GetMetadata(entity.metadataBuffer())) {
267 excludeProperties += BufferUtils::fromVector(*metadata->modifiedProperties()).toSet();
268 }
269 }
270 });
271 }
272
273 auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions, excludeProperties);
262 274
263 bool isMove = false; 275 bool isMove = false;
264 if (modifyEntity->targetResource()) { 276 if (modifyEntity->targetResource()) {
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index c5b5ffc..dd6bbf0 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -247,22 +247,26 @@ bool EntityStore::add(const QByteArray &type, ApplicationDomainType entity, bool
247 return true; 247 return true;
248} 248}
249 249
250ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomainType &current, const ApplicationDomainType &diff, const QByteArrayList &deletions) const 250ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomainType &current, const ApplicationDomainType &diff, const QByteArrayList &deletions, const QSet<QByteArray> &excludeProperties) const
251{ 251{
252 SinkTraceCtx(d->logCtx) << "Applying diff: " << current.availableProperties() << "Deletions: " << deletions << "Changeset: " << diff.changedProperties(); 252 SinkTraceCtx(d->logCtx) << "Applying diff: " << current.availableProperties() << "Deletions: " << deletions << "Changeset: " << diff.changedProperties() << "Excluded: " << excludeProperties;
253 auto newEntity = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(current, current.availableProperties()); 253 auto newEntity = *ApplicationDomainType::getInMemoryRepresentation<ApplicationDomainType>(current, current.availableProperties());
254 254
255 // Apply diff 255 // Apply diff
256 for (const auto &property : diff.changedProperties()) { 256 for (const auto &property : diff.changedProperties()) {
257 const auto value = diff.getProperty(property); 257 if (!excludeProperties.contains(property)) {
258 if (value.isValid()) { 258 const auto value = diff.getProperty(property);
259 newEntity.setProperty(property, value); 259 if (value.isValid()) {
260 newEntity.setProperty(property, value);
261 }
260 } 262 }
261 } 263 }
262 264
263 // Remove deletions 265 // Remove deletions
264 for (const auto &property : deletions) { 266 for (const auto &property : deletions) {
265 newEntity.setProperty(property, QVariant()); 267 if (!excludeProperties.contains(property)) {
268 newEntity.setProperty(property, QVariant());
269 }
266 } 270 }
267 return newEntity; 271 return newEntity;
268} 272}
@@ -639,6 +643,22 @@ bool EntityStore::exists(const QByteArray &type, const QByteArray &uid)
639 return true; 643 return true;
640} 644}
641 645
646void EntityStore::readRevisions(const QByteArray &type, const QByteArray &uid, qint64 startingRevision, const std::function<void(const QByteArray &uid, qint64 revision, const EntityBuffer &entity)> callback)
647{
648 Q_ASSERT(d);
649 Q_ASSERT(!uid.isEmpty());
650 DataStore::mainDatabase(d->transaction, type)
651 .scan(uid,
652 [&](const QByteArray &key, const QByteArray &value) -> bool {
653 const auto revision = DataStore::revisionFromKey(key);
654 if (revision >= startingRevision) {
655 callback(DataStore::uidFromKey(key), revision, Sink::EntityBuffer(value.data(), value.size()));
656 }
657 return true;
658 },
659 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true);
660
661}
642 662
643qint64 EntityStore::maxRevision() 663qint64 EntityStore::maxRevision()
644{ 664{
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h
index ffa70b9..69de76c 100644
--- a/common/storage/entitystore.h
+++ b/common/storage/entitystore.h
@@ -49,7 +49,7 @@ public:
49 bool modify(const QByteArray &type, const ApplicationDomainType &current, ApplicationDomainType newEntity, bool replayToSource); 49 bool modify(const QByteArray &type, const ApplicationDomainType &current, ApplicationDomainType newEntity, bool replayToSource);
50 bool remove(const QByteArray &type, const ApplicationDomainType &current, bool replayToSource); 50 bool remove(const QByteArray &type, const ApplicationDomainType &current, bool replayToSource);
51 bool cleanupRevisions(qint64 revision); 51 bool cleanupRevisions(qint64 revision);
52 ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomainType &current, const ApplicationDomainType &diff, const QByteArrayList &deletions) const; 52 ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomainType &current, const ApplicationDomainType &diff, const QByteArrayList &deletions, const QSet<QByteArray> &excludeProperties = {}) const;
53 53
54 void startTransaction(Sink::Storage::DataStore::AccessMode); 54 void startTransaction(Sink::Storage::DataStore::AccessMode);
55 void commitTransaction(); 55 void commitTransaction();
@@ -122,6 +122,8 @@ public:
122 ///Db contains entity and entity is not yet removed 122 ///Db contains entity and entity is not yet removed
123 bool exists(const QByteArray &type, const QByteArray &uid); 123 bool exists(const QByteArray &type, const QByteArray &uid);
124 124
125 void readRevisions(const QByteArray &type, const QByteArray &uid, qint64 baseRevision, const std::function<void(const QByteArray &uid, qint64 revision, const EntityBuffer &entity)> callback);
126
125 qint64 maxRevision(); 127 qint64 maxRevision();
126 128
127 Sink::Log::Context logContext() const; 129 Sink::Log::Context logContext() const;
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp
index 0268ec5..45e2fbb 100644
--- a/tests/pipelinetest.cpp
+++ b/tests/pipelinetest.cpp
@@ -101,23 +101,25 @@ QByteArray createEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb)
101 return command; 101 return command;
102} 102}
103 103
104QByteArray modifyEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb, const QByteArray &uid, qint64 revision) 104QByteArray modifyEntityCommand(const flatbuffers::FlatBufferBuilder &entityFbb, const QByteArray &uid, qint64 revision, QStringList modifiedProperties = {"summary"}, bool replayToSource = true)
105{ 105{
106 flatbuffers::FlatBufferBuilder fbb; 106 flatbuffers::FlatBufferBuilder fbb;
107 auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Event>().toStdString().data()); 107 auto type = fbb.CreateString(Sink::ApplicationDomain::getTypeName<Sink::ApplicationDomain::Event>().toStdString().data());
108 auto id = fbb.CreateString(std::string(uid.constData(), uid.size())); 108 auto id = fbb.CreateString(std::string(uid.constData(), uid.size()));
109 auto summaryProperty = fbb.CreateString("summary"); 109 std::vector<flatbuffers::Offset<flatbuffers::String>> modifiedVector;
110 std::vector<flatbuffers::Offset<flatbuffers::String>> modified; 110 for (const auto &modified : modifiedProperties) {
111 modified.push_back(summaryProperty); 111 modifiedVector.push_back(fbb.CreateString(modified.toStdString()));
112 }
112 auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize()); 113 auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize());
113 auto modifiedProperties = fbb.CreateVector(modified); 114 auto modifiedPropertiesVector = fbb.CreateVector(modifiedVector);
114 // auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size());
115 Sink::Commands::ModifyEntityBuilder builder(fbb); 115 Sink::Commands::ModifyEntityBuilder builder(fbb);
116 builder.add_domainType(type); 116 builder.add_domainType(type);
117 builder.add_delta(delta); 117 builder.add_delta(delta);
118 builder.add_revision(revision); 118 builder.add_revision(revision);
119 builder.add_entityId(id); 119 builder.add_entityId(id);
120 builder.add_modifiedProperties(modifiedProperties); 120 builder.add_modifiedProperties(modifiedPropertiesVector);
121 builder.add_replayToSource(replayToSource);
122
121 auto location = builder.Finish(); 123 auto location = builder.Finish();
122 Sink::Commands::FinishModifyEntityBuffer(fbb, location); 124 Sink::Commands::FinishModifyEntityBuffer(fbb, location);
123 125
@@ -401,6 +403,55 @@ private slots:
401 QCOMPARE(testProcessor->deletedSummaries.at(0), QByteArray("summary2")); 403 QCOMPARE(testProcessor->deletedSummaries.at(0), QByteArray("summary2"));
402 } 404 }
403 } 405 }
406
407 void testModifyWithConflict()
408 {
409 flatbuffers::FlatBufferBuilder entityFbb;
410 auto command = createEntityCommand(createEvent(entityFbb, "summary", "description"));
411
412 Sink::Pipeline pipeline(getContext(), {"test"});
413
414 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
415
416 // Create the initial revision
417 pipeline.startTransaction();
418 pipeline.newEntity(command.constData(), command.size());
419 pipeline.commit();
420
421 // Get uid of written entity
422 auto keys = getKeys(instanceIdentifier(), "event.main");
423 QCOMPARE(keys.size(), 1);
424 const auto key = keys.first();
425 const auto uid = Sink::Storage::DataStore::uidFromKey(key);
426
427 //Simulate local modification
428 {
429 entityFbb.Clear();
430 auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summaryLocal"), uid, 1, {"summary"}, true);
431 pipeline.startTransaction();
432 pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size());
433 pipeline.commit();
434 }
435
436
437 //Simulate remote modification
438 //We assume the remote modification is not overly smart and always marks all properties as changed.
439 {
440 entityFbb.Clear();
441 auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summaryRemote", "descriptionRemote"), uid, 2, {"summary", "description"}, false);
442 pipeline.startTransaction();
443 pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size());
444 pipeline.commit();
445 }
446
447 // Ensure we've got the new revision with the modification
448 auto buffer = getEntity(instanceIdentifier(), "event.main", Sink::Storage::DataStore::assembleKey(uid, 3));
449 QVERIFY(!buffer.isEmpty());
450 Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size());
451 auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity());
452 QVERIFY2(adaptor->getProperty("summary").toString() == QString("summaryLocal"), "The local modification was reverted.");
453 QVERIFY2(adaptor->getProperty("description").toString() == QString("descriptionRemote"), "The remote modification was not applied.");
454 }
404}; 455};
405 456
406QTEST_MAIN(PipelineTest) 457QTEST_MAIN(PipelineTest)