summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp174
1 files changed, 87 insertions, 87 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index c7f323a..c7326d3 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -20,7 +20,7 @@
20 20
21static int sBatchSize = 100; 21static int sBatchSize = 100;
22 22
23using namespace Akonadi2; 23using namespace Sink;
24 24
25/** 25/**
26 * Replays changes from the storage one by one. 26 * Replays changes from the storage one by one.
@@ -105,8 +105,8 @@ public Q_SLOTS:
105 } 105 }
106 106
107private: 107private:
108 Akonadi2::Storage mStorage; 108 Sink::Storage mStorage;
109 Akonadi2::Storage mChangeReplayStore; 109 Sink::Storage mChangeReplayStore;
110 ReplayFunction mReplayFunction; 110 ReplayFunction mReplayFunction;
111}; 111};
112 112
@@ -118,7 +118,7 @@ class CommandProcessor : public QObject
118 Q_OBJECT 118 Q_OBJECT
119 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; 119 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction;
120public: 120public:
121 CommandProcessor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues) 121 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue*> commandQueues)
122 : QObject(), 122 : QObject(),
123 mPipeline(pipeline), 123 mPipeline(pipeline),
124 mCommandQueues(commandQueues), 124 mCommandQueues(commandQueues),
@@ -175,18 +175,18 @@ private slots:
175 }).exec(); 175 }).exec();
176 } 176 }
177 177
178 KAsync::Job<qint64> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) 178 KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand)
179 { 179 {
180 Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); 180 Log() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId());
181 //Throw command into appropriate pipeline 181 //Throw command into appropriate pipeline
182 switch (queuedCommand->commandId()) { 182 switch (queuedCommand->commandId()) {
183 case Akonadi2::Commands::DeleteEntityCommand: 183 case Sink::Commands::DeleteEntityCommand:
184 return mPipeline->deletedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 184 return mPipeline->deletedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
185 case Akonadi2::Commands::ModifyEntityCommand: 185 case Sink::Commands::ModifyEntityCommand:
186 return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 186 return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
187 case Akonadi2::Commands::CreateEntityCommand: 187 case Sink::Commands::CreateEntityCommand:
188 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 188 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
189 case Akonadi2::Commands::InspectionCommand: 189 case Sink::Commands::InspectionCommand:
190 if (mInspect) { 190 if (mInspect) {
191 return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() { 191 return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() {
192 return -1; 192 return -1;
@@ -203,16 +203,16 @@ private slots:
203 KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data) 203 KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data)
204 { 204 {
205 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); 205 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
206 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { 206 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
207 Warning() << "invalid buffer"; 207 Warning() << "invalid buffer";
208 // return KAsync::error<void, qint64>(1, "Invalid Buffer"); 208 // return KAsync::error<void, qint64>(1, "Invalid Buffer");
209 } 209 }
210 auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData()); 210 auto queuedCommand = Sink::GetQueuedCommand(data.constData());
211 const auto commandId = queuedCommand->commandId(); 211 const auto commandId = queuedCommand->commandId();
212 Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); 212 Trace() << "Dequeued Command: " << Sink::Commands::name(commandId);
213 return processQueuedCommand(queuedCommand).then<qint64, qint64>( 213 return processQueuedCommand(queuedCommand).then<qint64, qint64>(
214 [commandId](qint64 createdRevision) -> qint64 { 214 [commandId](qint64 createdRevision) -> qint64 {
215 Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); 215 Trace() << "Command pipeline processed: " << Sink::Commands::name(commandId);
216 return createdRevision; 216 return createdRevision;
217 } 217 }
218 , 218 ,
@@ -278,7 +278,7 @@ private slots:
278 } 278 }
279 279
280private: 280private:
281 Akonadi2::Pipeline *mPipeline; 281 Sink::Pipeline *mPipeline;
282 //Ordered by priority 282 //Ordered by priority
283 QList<MessageQueue*> mCommandQueues; 283 QList<MessageQueue*> mCommandQueues;
284 bool mProcessingLock; 284 bool mProcessingLock;
@@ -289,19 +289,19 @@ private:
289 289
290 290
291GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) 291GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline)
292 : Akonadi2::Resource(), 292 : Sink::Resource(),
293 mUserQueue(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".userqueue"), 293 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"),
294 mSynchronizerQueue(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), 294 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"),
295 mResourceInstanceIdentifier(resourceInstanceIdentifier), 295 mResourceInstanceIdentifier(resourceInstanceIdentifier),
296 mPipeline(pipeline ? pipeline : QSharedPointer<Akonadi2::Pipeline>::create(resourceInstanceIdentifier)), 296 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)),
297 mError(0), 297 mError(0),
298 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 298 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
299{ 299{
300 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); 300 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
301 mProcessor->setInspectionCommand([this](void const *command, size_t size) { 301 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
302 flatbuffers::Verifier verifier((const uint8_t *)command, size); 302 flatbuffers::Verifier verifier((const uint8_t *)command, size);
303 if (Akonadi2::Commands::VerifyInspectionBuffer(verifier)) { 303 if (Sink::Commands::VerifyInspectionBuffer(verifier)) {
304 auto buffer = Akonadi2::Commands::GetInspection(command); 304 auto buffer = Sink::Commands::GetInspection(command);
305 int inspectionType = buffer->type(); 305 int inspectionType = buffer->type();
306 306
307 QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id()); 307 QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id());
@@ -313,17 +313,17 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
313 QVariant expectedValue; 313 QVariant expectedValue;
314 s >> expectedValue; 314 s >> expectedValue;
315 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then<void>([=]() { 315 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then<void>([=]() {
316 Akonadi2::Notification n; 316 Sink::Notification n;
317 n.type = Akonadi2::Commands::NotificationType_Inspection; 317 n.type = Sink::Commands::NotificationType_Inspection;
318 n.id = inspectionId; 318 n.id = inspectionId;
319 n.code = Akonadi2::Commands::NotificationCode_Success; 319 n.code = Sink::Commands::NotificationCode_Success;
320 emit notify(n); 320 emit notify(n);
321 }, [=](int code, const QString &message) { 321 }, [=](int code, const QString &message) {
322 Akonadi2::Notification n; 322 Sink::Notification n;
323 n.type = Akonadi2::Commands::NotificationType_Inspection; 323 n.type = Sink::Commands::NotificationType_Inspection;
324 n.message = message; 324 n.message = message;
325 n.id = inspectionId; 325 n.id = inspectionId;
326 n.code = Akonadi2::Commands::NotificationCode_Failure; 326 n.code = Sink::Commands::NotificationCode_Failure;
327 emit notify(n); 327 emit notify(n);
328 }).exec(); 328 }).exec();
329 return KAsync::null<void>(); 329 return KAsync::null<void>();
@@ -334,7 +334,7 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
334 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 334 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
335 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { 335 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) {
336 //This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync) 336 //This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync)
337 auto synchronizationStore = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Akonadi2::Storage::ReadWrite); 337 auto synchronizationStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite);
338 return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore](){}); 338 return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore](){});
339 }); 339 });
340 enableChangeReplay(true); 340 enableChangeReplay(true);
@@ -370,13 +370,13 @@ void GenericResource::enableChangeReplay(bool enable)
370 } 370 }
371} 371}
372 372
373void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Akonadi2::Preprocessor*> &preprocessors) 373void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor*> &preprocessors)
374{ 374{
375 mPipeline->setPreprocessors(type, preprocessors); 375 mPipeline->setPreprocessors(type, preprocessors);
376 mPipeline->setAdaptorFactory(type, factory); 376 mPipeline->setAdaptorFactory(type, factory);
377} 377}
378 378
379KAsync::Job<void> GenericResource::replay(Akonadi2::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value) 379KAsync::Job<void> GenericResource::replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value)
380{ 380{
381 return KAsync::null<void>(); 381 return KAsync::null<void>();
382} 382}
@@ -384,18 +384,18 @@ KAsync::Job<void> GenericResource::replay(Akonadi2::Storage &synchronizationStor
384void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) 384void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier)
385{ 385{
386 Warning() << "Removing from generic resource"; 386 Warning() << "Removing from generic resource";
387 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadWrite).removeFromDisk(); 387 Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadWrite).removeFromDisk();
388 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); 388 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadWrite).removeFromDisk();
389 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); 389 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadWrite).removeFromDisk();
390 Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".changereplay", Akonadi2::Storage::ReadWrite).removeFromDisk(); 390 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadWrite).removeFromDisk();
391} 391}
392 392
393qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) 393qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier)
394{ 394{
395 auto size = Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadOnly).diskUsage(); 395 auto size = Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadOnly).diskUsage();
396 size += Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadOnly).diskUsage(); 396 size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadOnly).diskUsage();
397 size += Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadOnly).diskUsage(); 397 size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadOnly).diskUsage();
398 size += Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".changereplay", Akonadi2::Storage::ReadOnly).diskUsage(); 398 size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadOnly).diskUsage();
399 return size; 399 return size;
400} 400}
401 401
@@ -413,9 +413,9 @@ int GenericResource::error() const
413void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) 413void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data)
414{ 414{
415 flatbuffers::FlatBufferBuilder fbb; 415 flatbuffers::FlatBufferBuilder fbb;
416 auto commandData = Akonadi2::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); 416 auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size());
417 auto buffer = Akonadi2::CreateQueuedCommand(fbb, commandId, commandData); 417 auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData);
418 Akonadi2::FinishQueuedCommandBuffer(fbb, buffer); 418 Sink::FinishQueuedCommandBuffer(fbb, buffer);
419 mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); 419 mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize());
420} 420}
421 421
@@ -440,8 +440,8 @@ KAsync::Job<void> GenericResource::synchronizeWithSource()
440 Log() << " Synchronizing"; 440 Log() << " Synchronizing";
441 //Changereplay would deadlock otherwise when trying to open the synchronization store 441 //Changereplay would deadlock otherwise when trying to open the synchronization store
442 enableChangeReplay(false); 442 enableChangeReplay(false);
443 auto mainStore = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::storageLocation(), mResourceInstanceIdentifier, Akonadi2::Storage::ReadOnly); 443 auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly);
444 auto syncStore = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Akonadi2::Storage::ReadWrite); 444 auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite);
445 synchronizeWithSource(*mainStore, *syncStore).then<void>([this, mainStore, syncStore]() { 445 synchronizeWithSource(*mainStore, *syncStore).then<void>([this, mainStore, syncStore]() {
446 Log() << "Done Synchronizing"; 446 Log() << "Done Synchronizing";
447 enableChangeReplay(true); 447 enableChangeReplay(true);
@@ -449,7 +449,7 @@ KAsync::Job<void> GenericResource::synchronizeWithSource()
449 }); 449 });
450} 450}
451 451
452KAsync::Job<void> GenericResource::synchronizeWithSource(Akonadi2::Storage &mainStore, Akonadi2::Storage &synchronizationStore) 452KAsync::Job<void> GenericResource::synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore)
453{ 453{
454 return KAsync::null<void>(); 454 return KAsync::null<void>();
455} 455}
@@ -508,7 +508,7 @@ void GenericResource::setLowerBoundRevision(qint64 revision)
508 updateLowerBoundRevision(); 508 updateLowerBoundRevision();
509} 509}
510 510
511void GenericResource::createEntity(const QByteArray &akonadiId, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) 511void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
512{ 512{
513 //These changes are coming from the source 513 //These changes are coming from the source
514 const auto replayToSource = false; 514 const auto replayToSource = false;
@@ -516,45 +516,45 @@ void GenericResource::createEntity(const QByteArray &akonadiId, const QByteArray
516 adaptorFactory.createBuffer(domainObject, entityFbb); 516 adaptorFactory.createBuffer(domainObject, entityFbb);
517 flatbuffers::FlatBufferBuilder fbb; 517 flatbuffers::FlatBufferBuilder fbb;
518 //This is the resource type and not the domain type 518 //This is the resource type and not the domain type
519 auto entityId = fbb.CreateString(akonadiId.toStdString()); 519 auto entityId = fbb.CreateString(sinkId.toStdString());
520 auto type = fbb.CreateString(bufferType.toStdString()); 520 auto type = fbb.CreateString(bufferType.toStdString());
521 auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 521 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
522 auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); 522 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource);
523 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); 523 Sink::Commands::FinishCreateEntityBuffer(fbb, location);
524 callback(BufferUtils::extractBuffer(fbb)); 524 callback(BufferUtils::extractBuffer(fbb));
525} 525}
526 526
527void GenericResource::modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) 527void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
528{ 528{
529 //These changes are coming from the source 529 //These changes are coming from the source
530 const auto replayToSource = false; 530 const auto replayToSource = false;
531 flatbuffers::FlatBufferBuilder entityFbb; 531 flatbuffers::FlatBufferBuilder entityFbb;
532 adaptorFactory.createBuffer(domainObject, entityFbb); 532 adaptorFactory.createBuffer(domainObject, entityFbb);
533 flatbuffers::FlatBufferBuilder fbb; 533 flatbuffers::FlatBufferBuilder fbb;
534 auto entityId = fbb.CreateString(akonadiId.toStdString()); 534 auto entityId = fbb.CreateString(sinkId.toStdString());
535 //This is the resource type and not the domain type 535 //This is the resource type and not the domain type
536 auto type = fbb.CreateString(bufferType.toStdString()); 536 auto type = fbb.CreateString(bufferType.toStdString());
537 auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 537 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
538 //TODO removals 538 //TODO removals
539 auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); 539 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource);
540 Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location); 540 Sink::Commands::FinishModifyEntityBuffer(fbb, location);
541 callback(BufferUtils::extractBuffer(fbb)); 541 callback(BufferUtils::extractBuffer(fbb));
542} 542}
543 543
544void GenericResource::deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) 544void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback)
545{ 545{
546 //These changes are coming from the source 546 //These changes are coming from the source
547 const auto replayToSource = false; 547 const auto replayToSource = false;
548 flatbuffers::FlatBufferBuilder fbb; 548 flatbuffers::FlatBufferBuilder fbb;
549 auto entityId = fbb.CreateString(akonadiId.toStdString()); 549 auto entityId = fbb.CreateString(sinkId.toStdString());
550 //This is the resource type and not the domain type 550 //This is the resource type and not the domain type
551 auto type = fbb.CreateString(bufferType.toStdString()); 551 auto type = fbb.CreateString(bufferType.toStdString());
552 auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); 552 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource);
553 Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location); 553 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
554 callback(BufferUtils::extractBuffer(fbb)); 554 callback(BufferUtils::extractBuffer(fbb));
555} 555}
556 556
557void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) 557void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
558{ 558{
559 Index index("rid.mapping." + bufferType, transaction); 559 Index index("rid.mapping." + bufferType, transaction);
560 Index localIndex("localid.mapping." + bufferType, transaction); 560 Index localIndex("localid.mapping." + bufferType, transaction);
@@ -562,7 +562,7 @@ void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteAr
562 localIndex.add(localId, remoteId); 562 localIndex.add(localId, remoteId);
563} 563}
564 564
565void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) 565void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
566{ 566{
567 Index index("rid.mapping." + bufferType, transaction); 567 Index index("rid.mapping." + bufferType, transaction);
568 Index localIndex("localid.mapping." + bufferType, transaction); 568 Index localIndex("localid.mapping." + bufferType, transaction);
@@ -570,21 +570,21 @@ void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteAr
570 localIndex.remove(localId, remoteId); 570 localIndex.remove(localId, remoteId);
571} 571}
572 572
573QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) 573QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
574{ 574{
575 //Lookup local id for remote id, or insert a new pair otherwise 575 //Lookup local id for remote id, or insert a new pair otherwise
576 Index index("rid.mapping." + bufferType, transaction); 576 Index index("rid.mapping." + bufferType, transaction);
577 Index localIndex("localid.mapping." + bufferType, transaction); 577 Index localIndex("localid.mapping." + bufferType, transaction);
578 QByteArray akonadiId = index.lookup(remoteId); 578 QByteArray sinkId = index.lookup(remoteId);
579 if (akonadiId.isEmpty()) { 579 if (sinkId.isEmpty()) {
580 akonadiId = QUuid::createUuid().toString().toUtf8(); 580 sinkId = QUuid::createUuid().toString().toUtf8();
581 index.add(remoteId, akonadiId); 581 index.add(remoteId, sinkId);
582 localIndex.add(akonadiId, remoteId); 582 localIndex.add(sinkId, remoteId);
583 } 583 }
584 return akonadiId; 584 return sinkId;
585} 585}
586 586
587QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Akonadi2::Storage::Transaction &transaction) 587QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction)
588{ 588{
589 Index index("localid.mapping." + bufferType, transaction); 589 Index index("localid.mapping." + bufferType, transaction);
590 QByteArray remoteId = index.lookup(localId); 590 QByteArray remoteId = index.lookup(localId);
@@ -595,29 +595,29 @@ QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const Q
595 return remoteId; 595 return remoteId;
596} 596}
597 597
598void GenericResource::scanForRemovals(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) 598void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
599{ 599{
600 entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { 600 entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) {
601 auto akonadiId = Akonadi2::Storage::uidFromKey(key); 601 auto sinkId = Sink::Storage::uidFromKey(key);
602 Trace() << "Checking for removal " << key; 602 Trace() << "Checking for removal " << key;
603 const auto remoteId = resolveLocalId(bufferType, akonadiId, synchronizationTransaction); 603 const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction);
604 //If we have no remoteId, the entity hasn't been replayed to the source yet 604 //If we have no remoteId, the entity hasn't been replayed to the source yet
605 if (!remoteId.isEmpty()) { 605 if (!remoteId.isEmpty()) {
606 if (!exists(remoteId)) { 606 if (!exists(remoteId)) {
607 Trace() << "Found a removed entity: " << akonadiId; 607 Trace() << "Found a removed entity: " << sinkId;
608 deleteEntity(akonadiId, Akonadi2::Storage::maxRevision(transaction), bufferType, [this](const QByteArray &buffer) { 608 deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, [this](const QByteArray &buffer) {
609 enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::DeleteEntityCommand, buffer); 609 enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer);
610 }); 610 });
611 } 611 }
612 } 612 }
613 }); 613 });
614} 614}
615 615
616static QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> getLatest(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) 616static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory)
617{ 617{
618 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; 618 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
619 db.findLatest(uid, [&current, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 619 db.findLatest(uid, [&current, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
620 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 620 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
621 if (!buffer.isValid()) { 621 if (!buffer.isValid()) {
622 Warning() << "Read invalid buffer from disk"; 622 Warning() << "Read invalid buffer from disk";
623 } else { 623 } else {
@@ -625,35 +625,35 @@ static QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> getLatest(cons
625 } 625 }
626 return false; 626 return false;
627 }, 627 },
628 [](const Akonadi2::Storage::Error &error) { 628 [](const Sink::Storage::Error &error) {
629 Warning() << "Failed to read current value from storage: " << error.message; 629 Warning() << "Failed to read current value from storage: " << error.message;
630 }); 630 });
631 return current; 631 return current;
632} 632}
633 633
634void GenericResource::createOrModify(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Akonadi2::ApplicationDomain::ApplicationDomainType &entity) 634void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
635{ 635{
636 auto mainDatabase = transaction.openDatabase(bufferType + ".main"); 636 auto mainDatabase = transaction.openDatabase(bufferType + ".main");
637 const auto akonadiId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); 637 const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction);
638 const auto found = mainDatabase.contains(akonadiId); 638 const auto found = mainDatabase.contains(sinkId);
639 if (!found) { 639 if (!found) {
640 Trace() << "Found a new entity: " << remoteId; 640 Trace() << "Found a new entity: " << remoteId;
641 createEntity(akonadiId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { 641 createEntity(sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) {
642 enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, buffer); 642 enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer);
643 }); 643 });
644 } else { //modification 644 } else { //modification
645 if (auto current = getLatest(mainDatabase, akonadiId, adaptorFactory)) { 645 if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) {
646 bool changed = false; 646 bool changed = false;
647 for (const auto &property : entity.changedProperties()) { 647 for (const auto &property : entity.changedProperties()) {
648 if (entity.getProperty(property) != current->getProperty(property)) { 648 if (entity.getProperty(property) != current->getProperty(property)) {
649 Trace() << "Property changed " << akonadiId << property; 649 Trace() << "Property changed " << sinkId << property;
650 changed = true; 650 changed = true;
651 } 651 }
652 } 652 }
653 if (changed) { 653 if (changed) {
654 Trace() << "Found a modified entity: " << remoteId; 654 Trace() << "Found a modified entity: " << remoteId;
655 modifyEntity(akonadiId, Akonadi2::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { 655 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) {
656 enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::ModifyEntityCommand, buffer); 656 enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer);
657 }); 657 });
658 } 658 }
659 } else { 659 } else {