summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-12-29 10:01:42 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-12-29 10:01:42 +0100
commit664fb0fbfd21a25d1f86938a186f6ec9cea6d882 (patch)
treed819fb4eba44da46fd9c9e9d5bd051e0124f68ee
parent1998b8d35478205118cea5cc215c682b235434f1 (diff)
downloadsink-664fb0fbfd21a25d1f86938a186f6ec9cea6d882.tar.gz
sink-664fb0fbfd21a25d1f86938a186f6ec9cea6d882.zip
Mark commands with whether they need to be replayed
This way we don't have to try to figure out whether a change is coming from the source already.
-rw-r--r--common/commands/createentity.fbs1
-rw-r--r--common/commands/deleteentity.fbs1
-rw-r--r--common/commands/modifyentity.fbs1
-rw-r--r--common/metadata.fbs3
-rw-r--r--common/pipeline.cpp9
-rw-r--r--examples/maildirresource/maildirresource.cpp24
-rw-r--r--tests/domainadaptortest.cpp1
7 files changed, 23 insertions, 17 deletions
diff --git a/common/commands/createentity.fbs b/common/commands/createentity.fbs
index a5bc95c..5358dea 100644
--- a/common/commands/createentity.fbs
+++ b/common/commands/createentity.fbs
@@ -4,6 +4,7 @@ table CreateEntity {
4 entityId: string; 4 entityId: string;
5 domainType: string; 5 domainType: string;
6 delta: [ubyte]; 6 delta: [ubyte];
7 replayToSource: bool = true;
7} 8}
8 9
9root_type CreateEntity; 10root_type CreateEntity;
diff --git a/common/commands/deleteentity.fbs b/common/commands/deleteentity.fbs
index 4f32b54..9f865be 100644
--- a/common/commands/deleteentity.fbs
+++ b/common/commands/deleteentity.fbs
@@ -4,6 +4,7 @@ table DeleteEntity {
4 revision: ulong; 4 revision: ulong;
5 entityId: string; 5 entityId: string;
6 domainType: string; 6 domainType: string;
7 replayToSource: bool = true;
7} 8}
8 9
9root_type DeleteEntity; 10root_type DeleteEntity;
diff --git a/common/commands/modifyentity.fbs b/common/commands/modifyentity.fbs
index a59eb9b..03b543f 100644
--- a/common/commands/modifyentity.fbs
+++ b/common/commands/modifyentity.fbs
@@ -6,6 +6,7 @@ table ModifyEntity {
6 deletions: [string]; //A list of deleted properties 6 deletions: [string]; //A list of deleted properties
7 domainType: string; 7 domainType: string;
8 delta: [ubyte]; //Contains an entity buffer with all changed properties set 8 delta: [ubyte]; //Contains an entity buffer with all changed properties set
9 replayToSource: bool = true;
9} 10}
10 11
11root_type ModifyEntity; 12root_type ModifyEntity;
diff --git a/common/metadata.fbs b/common/metadata.fbs
index 1455238..0a709fe 100644
--- a/common/metadata.fbs
+++ b/common/metadata.fbs
@@ -4,8 +4,7 @@ enum Operation : byte { Creation = 1, Modification, Removal }
4 4
5table Metadata { 5table Metadata {
6 revision: ulong; 6 revision: ulong;
7 processed: bool = true; 7 replayToSource: bool = true;
8 processingProgress: [string];
9 operation: Operation = Modification; 8 operation: Operation = Modification;
10} 9}
11 10
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index f8b1fb1..06d8114 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -142,6 +142,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
142 } 142 }
143 auto createEntity = Akonadi2::Commands::GetCreateEntity(command); 143 auto createEntity = Akonadi2::Commands::GetCreateEntity(command);
144 144
145 const bool replayToSource = createEntity->replayToSource();
145 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); 146 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size());
146 { 147 {
147 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 148 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
@@ -175,8 +176,8 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
175 flatbuffers::FlatBufferBuilder metadataFbb; 176 flatbuffers::FlatBufferBuilder metadataFbb;
176 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); 177 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
177 metadataBuilder.add_revision(newRevision); 178 metadataBuilder.add_revision(newRevision);
178 metadataBuilder.add_processed(false);
179 metadataBuilder.add_operation(Akonadi2::Operation_Creation); 179 metadataBuilder.add_operation(Akonadi2::Operation_Creation);
180 metadataBuilder.add_replayToSource(replayToSource);
180 auto metadataBuffer = metadataBuilder.Finish(); 181 auto metadataBuffer = metadataBuilder.Finish();
181 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); 182 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
182 183
@@ -224,6 +225,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
224 Q_ASSERT(modifyEntity); 225 Q_ASSERT(modifyEntity);
225 226
226 const qint64 baseRevision = modifyEntity->revision(); 227 const qint64 baseRevision = modifyEntity->revision();
228 const bool replayToSource = modifyEntity->replayToSource();
227 //TODO rename modifyEntity->domainType to bufferType 229 //TODO rename modifyEntity->domainType to bufferType
228 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); 230 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
229 const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); 231 const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
@@ -291,8 +293,8 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
291 flatbuffers::FlatBufferBuilder metadataFbb; 293 flatbuffers::FlatBufferBuilder metadataFbb;
292 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); 294 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
293 metadataBuilder.add_revision(newRevision); 295 metadataBuilder.add_revision(newRevision);
294 metadataBuilder.add_processed(false);
295 metadataBuilder.add_operation(Akonadi2::Operation_Modification); 296 metadataBuilder.add_operation(Akonadi2::Operation_Modification);
297 metadataBuilder.add_replayToSource(replayToSource);
296 auto metadataBuffer = metadataBuilder.Finish(); 298 auto metadataBuffer = metadataBuilder.Finish();
297 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); 299 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
298 300
@@ -329,6 +331,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
329 } 331 }
330 auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command); 332 auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command);
331 333
334 const bool replayToSource = deleteEntity->replayToSource();
332 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 335 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
333 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 336 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
334 337
@@ -366,8 +369,8 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
366 flatbuffers::FlatBufferBuilder metadataFbb; 369 flatbuffers::FlatBufferBuilder metadataFbb;
367 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); 370 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
368 metadataBuilder.add_revision(newRevision); 371 metadataBuilder.add_revision(newRevision);
369 metadataBuilder.add_processed(false);
370 metadataBuilder.add_operation(Akonadi2::Operation_Removal); 372 metadataBuilder.add_operation(Akonadi2::Operation_Removal);
373 metadataBuilder.add_replayToSource(replayToSource);
371 auto metadataBuffer = metadataBuilder.Finish(); 374 auto metadataBuffer = metadataBuilder.Finish();
372 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); 375 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
373 376
diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp
index 8333f76..7fcf12e 100644
--- a/examples/maildirresource/maildirresource.cpp
+++ b/examples/maildirresource/maildirresource.cpp
@@ -132,6 +132,8 @@ QStringList MaildirResource::listAvailableFolders()
132 132
133static void createEntity(const QByteArray &akonadiId, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) 133static void createEntity(const QByteArray &akonadiId, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
134{ 134{
135 //These changes are coming from the source
136 const auto replayToSource = false;
135 flatbuffers::FlatBufferBuilder entityFbb; 137 flatbuffers::FlatBufferBuilder entityFbb;
136 adaptorFactory.createBuffer(domainObject, entityFbb); 138 adaptorFactory.createBuffer(domainObject, entityFbb);
137 flatbuffers::FlatBufferBuilder fbb; 139 flatbuffers::FlatBufferBuilder fbb;
@@ -139,13 +141,15 @@ static void createEntity(const QByteArray &akonadiId, const QByteArray &bufferTy
139 auto entityId = fbb.CreateString(akonadiId.toStdString()); 141 auto entityId = fbb.CreateString(akonadiId.toStdString());
140 auto type = fbb.CreateString(bufferType.toStdString()); 142 auto type = fbb.CreateString(bufferType.toStdString());
141 auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 143 auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
142 auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta); 144 auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource);
143 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); 145 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location);
144 callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 146 callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
145} 147}
146 148
147static void modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) 149static void modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
148{ 150{
151 //These changes are coming from the source
152 const auto replayToSource = false;
149 flatbuffers::FlatBufferBuilder entityFbb; 153 flatbuffers::FlatBufferBuilder entityFbb;
150 adaptorFactory.createBuffer(domainObject, entityFbb); 154 adaptorFactory.createBuffer(domainObject, entityFbb);
151 flatbuffers::FlatBufferBuilder fbb; 155 flatbuffers::FlatBufferBuilder fbb;
@@ -154,18 +158,20 @@ static void modifyEntity(const QByteArray &akonadiId, qint64 revision, const QBy
154 auto type = fbb.CreateString(bufferType.toStdString()); 158 auto type = fbb.CreateString(bufferType.toStdString());
155 auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 159 auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
156 //TODO removals 160 //TODO removals
157 auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta); 161 auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource);
158 Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location); 162 Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location);
159 callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 163 callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
160} 164}
161 165
162static void deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) 166static void deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback)
163{ 167{
168 //These changes are coming from the source
169 const auto replayToSource = false;
164 flatbuffers::FlatBufferBuilder fbb; 170 flatbuffers::FlatBufferBuilder fbb;
165 auto entityId = fbb.CreateString(akonadiId.toStdString()); 171 auto entityId = fbb.CreateString(akonadiId.toStdString());
166 //This is the resource type and not the domain type 172 //This is the resource type and not the domain type
167 auto type = fbb.CreateString(bufferType.toStdString()); 173 auto type = fbb.CreateString(bufferType.toStdString());
168 auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type); 174 auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource);
169 Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location); 175 Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location);
170 callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 176 callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
171} 177}
@@ -365,23 +371,19 @@ KAsync::Job<void> MaildirResource::replay(const QByteArray &type, const QByteArr
365 //This results in a deadlock during sync 371 //This results in a deadlock during sync
366 Akonadi2::Storage store(Akonadi2::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Akonadi2::Storage::ReadWrite); 372 Akonadi2::Storage store(Akonadi2::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Akonadi2::Storage::ReadWrite);
367 auto synchronizationTransaction = store.createTransaction(Akonadi2::Storage::ReadWrite); 373 auto synchronizationTransaction = store.createTransaction(Akonadi2::Storage::ReadWrite);
368 const auto uid = Akonadi2::Storage::uidFromKey(key);
369 const auto remoteId = resolveLocalId(type, uid, synchronizationTransaction);
370 374
371 Trace() << "Replaying " << key << type; 375 Trace() << "Replaying " << key << type;
372 if (type == ENTITY_TYPE_FOLDER) { 376 if (type == ENTITY_TYPE_FOLDER) {
373 Akonadi2::EntityBuffer buffer(value.data(), value.size()); 377 Akonadi2::EntityBuffer buffer(value.data(), value.size());
374 const Akonadi2::Entity &entity = buffer.entity(); 378 const Akonadi2::Entity &entity = buffer.entity();
375 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata()); 379 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
380 if (metadataBuffer && !metadataBuffer->replayToSource()) {
381 Trace() << "Change is coming from the source";
382 return KAsync::null<void>();
383 }
376 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; 384 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
377 const auto operation = metadataBuffer ? metadataBuffer->operation() : Akonadi2::Operation_Creation; 385 const auto operation = metadataBuffer ? metadataBuffer->operation() : Akonadi2::Operation_Creation;
378 if (operation == Akonadi2::Operation_Creation) { 386 if (operation == Akonadi2::Operation_Creation) {
379 //FIXME: This check only works for new entities
380 //Figure out wether we have replayed that revision already to the source
381 if (!remoteId.isEmpty()) {
382 Trace() << "Change is coming from the source";
383 return KAsync::null<void>();
384 }
385 const Akonadi2::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mFolderAdaptorFactory->createAdaptor(entity)); 387 const Akonadi2::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mFolderAdaptorFactory->createAdaptor(entity));
386 auto folderName = folder.getProperty("name").toString(); 388 auto folderName = folder.getProperty("name").toString();
387 //TODO handle non toplevel folders 389 //TODO handle non toplevel folders
diff --git a/tests/domainadaptortest.cpp b/tests/domainadaptortest.cpp
index f2300cb..b3d2a00 100644
--- a/tests/domainadaptortest.cpp
+++ b/tests/domainadaptortest.cpp
@@ -58,7 +58,6 @@ private Q_SLOTS:
58 flatbuffers::FlatBufferBuilder metadataFbb; 58 flatbuffers::FlatBufferBuilder metadataFbb;
59 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); 59 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
60 metadataBuilder.add_revision(1); 60 metadataBuilder.add_revision(1);
61 metadataBuilder.add_processed(false);
62 auto metadataBuffer = metadataBuilder.Finish(); 61 auto metadataBuffer = metadataBuilder.Finish();
63 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); 62 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
64 63