diff options
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 174 |
1 files changed, 87 insertions, 87 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index c7f323a..c7326d3 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -20,7 +20,7 @@ | |||
20 | 20 | ||
21 | static int sBatchSize = 100; | 21 | static int sBatchSize = 100; |
22 | 22 | ||
23 | using namespace Akonadi2; | 23 | using namespace Sink; |
24 | 24 | ||
25 | /** | 25 | /** |
26 | * Replays changes from the storage one by one. | 26 | * Replays changes from the storage one by one. |
@@ -105,8 +105,8 @@ public Q_SLOTS: | |||
105 | } | 105 | } |
106 | 106 | ||
107 | private: | 107 | private: |
108 | Akonadi2::Storage mStorage; | 108 | Sink::Storage mStorage; |
109 | Akonadi2::Storage mChangeReplayStore; | 109 | Sink::Storage mChangeReplayStore; |
110 | ReplayFunction mReplayFunction; | 110 | ReplayFunction mReplayFunction; |
111 | }; | 111 | }; |
112 | 112 | ||
@@ -118,7 +118,7 @@ class CommandProcessor : public QObject | |||
118 | Q_OBJECT | 118 | Q_OBJECT |
119 | typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; | 119 | typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; |
120 | public: | 120 | public: |
121 | CommandProcessor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues) | 121 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue*> commandQueues) |
122 | : QObject(), | 122 | : QObject(), |
123 | mPipeline(pipeline), | 123 | mPipeline(pipeline), |
124 | mCommandQueues(commandQueues), | 124 | mCommandQueues(commandQueues), |
@@ -175,18 +175,18 @@ private slots: | |||
175 | }).exec(); | 175 | }).exec(); |
176 | } | 176 | } |
177 | 177 | ||
178 | KAsync::Job<qint64> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) | 178 | KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand) |
179 | { | 179 | { |
180 | Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); | 180 | Log() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); |
181 | //Throw command into appropriate pipeline | 181 | //Throw command into appropriate pipeline |
182 | switch (queuedCommand->commandId()) { | 182 | switch (queuedCommand->commandId()) { |
183 | case Akonadi2::Commands::DeleteEntityCommand: | 183 | case Sink::Commands::DeleteEntityCommand: |
184 | return mPipeline->deletedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | 184 | return mPipeline->deletedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
185 | case Akonadi2::Commands::ModifyEntityCommand: | 185 | case Sink::Commands::ModifyEntityCommand: |
186 | return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | 186 | return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
187 | case Akonadi2::Commands::CreateEntityCommand: | 187 | case Sink::Commands::CreateEntityCommand: |
188 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | 188 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); |
189 | case Akonadi2::Commands::InspectionCommand: | 189 | case Sink::Commands::InspectionCommand: |
190 | if (mInspect) { | 190 | if (mInspect) { |
191 | return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() { | 191 | return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() { |
192 | return -1; | 192 | return -1; |
@@ -203,16 +203,16 @@ private slots: | |||
203 | KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data) | 203 | KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data) |
204 | { | 204 | { |
205 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); | 205 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); |
206 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | 206 | if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { |
207 | Warning() << "invalid buffer"; | 207 | Warning() << "invalid buffer"; |
208 | // return KAsync::error<void, qint64>(1, "Invalid Buffer"); | 208 | // return KAsync::error<void, qint64>(1, "Invalid Buffer"); |
209 | } | 209 | } |
210 | auto queuedCommand = Akonadi2::GetQueuedCommand(data.constData()); | 210 | auto queuedCommand = Sink::GetQueuedCommand(data.constData()); |
211 | const auto commandId = queuedCommand->commandId(); | 211 | const auto commandId = queuedCommand->commandId(); |
212 | Trace() << "Dequeued Command: " << Akonadi2::Commands::name(commandId); | 212 | Trace() << "Dequeued Command: " << Sink::Commands::name(commandId); |
213 | return processQueuedCommand(queuedCommand).then<qint64, qint64>( | 213 | return processQueuedCommand(queuedCommand).then<qint64, qint64>( |
214 | [commandId](qint64 createdRevision) -> qint64 { | 214 | [commandId](qint64 createdRevision) -> qint64 { |
215 | Trace() << "Command pipeline processed: " << Akonadi2::Commands::name(commandId); | 215 | Trace() << "Command pipeline processed: " << Sink::Commands::name(commandId); |
216 | return createdRevision; | 216 | return createdRevision; |
217 | } | 217 | } |
218 | , | 218 | , |
@@ -278,7 +278,7 @@ private slots: | |||
278 | } | 278 | } |
279 | 279 | ||
280 | private: | 280 | private: |
281 | Akonadi2::Pipeline *mPipeline; | 281 | Sink::Pipeline *mPipeline; |
282 | //Ordered by priority | 282 | //Ordered by priority |
283 | QList<MessageQueue*> mCommandQueues; | 283 | QList<MessageQueue*> mCommandQueues; |
284 | bool mProcessingLock; | 284 | bool mProcessingLock; |
@@ -289,19 +289,19 @@ private: | |||
289 | 289 | ||
290 | 290 | ||
291 | GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) | 291 | GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) |
292 | : Akonadi2::Resource(), | 292 | : Sink::Resource(), |
293 | mUserQueue(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".userqueue"), | 293 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), |
294 | mSynchronizerQueue(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), | 294 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), |
295 | mResourceInstanceIdentifier(resourceInstanceIdentifier), | 295 | mResourceInstanceIdentifier(resourceInstanceIdentifier), |
296 | mPipeline(pipeline ? pipeline : QSharedPointer<Akonadi2::Pipeline>::create(resourceInstanceIdentifier)), | 296 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), |
297 | mError(0), | 297 | mError(0), |
298 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 298 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
299 | { | 299 | { |
300 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | 300 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); |
301 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | 301 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { |
302 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | 302 | flatbuffers::Verifier verifier((const uint8_t *)command, size); |
303 | if (Akonadi2::Commands::VerifyInspectionBuffer(verifier)) { | 303 | if (Sink::Commands::VerifyInspectionBuffer(verifier)) { |
304 | auto buffer = Akonadi2::Commands::GetInspection(command); | 304 | auto buffer = Sink::Commands::GetInspection(command); |
305 | int inspectionType = buffer->type(); | 305 | int inspectionType = buffer->type(); |
306 | 306 | ||
307 | QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id()); | 307 | QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id()); |
@@ -313,17 +313,17 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
313 | QVariant expectedValue; | 313 | QVariant expectedValue; |
314 | s >> expectedValue; | 314 | s >> expectedValue; |
315 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then<void>([=]() { | 315 | inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then<void>([=]() { |
316 | Akonadi2::Notification n; | 316 | Sink::Notification n; |
317 | n.type = Akonadi2::Commands::NotificationType_Inspection; | 317 | n.type = Sink::Commands::NotificationType_Inspection; |
318 | n.id = inspectionId; | 318 | n.id = inspectionId; |
319 | n.code = Akonadi2::Commands::NotificationCode_Success; | 319 | n.code = Sink::Commands::NotificationCode_Success; |
320 | emit notify(n); | 320 | emit notify(n); |
321 | }, [=](int code, const QString &message) { | 321 | }, [=](int code, const QString &message) { |
322 | Akonadi2::Notification n; | 322 | Sink::Notification n; |
323 | n.type = Akonadi2::Commands::NotificationType_Inspection; | 323 | n.type = Sink::Commands::NotificationType_Inspection; |
324 | n.message = message; | 324 | n.message = message; |
325 | n.id = inspectionId; | 325 | n.id = inspectionId; |
326 | n.code = Akonadi2::Commands::NotificationCode_Failure; | 326 | n.code = Sink::Commands::NotificationCode_Failure; |
327 | emit notify(n); | 327 | emit notify(n); |
328 | }).exec(); | 328 | }).exec(); |
329 | return KAsync::null<void>(); | 329 | return KAsync::null<void>(); |
@@ -334,7 +334,7 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
334 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 334 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
335 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { | 335 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { |
336 | //This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync) | 336 | //This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync) |
337 | auto synchronizationStore = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Akonadi2::Storage::ReadWrite); | 337 | auto synchronizationStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); |
338 | return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore](){}); | 338 | return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore](){}); |
339 | }); | 339 | }); |
340 | enableChangeReplay(true); | 340 | enableChangeReplay(true); |
@@ -370,13 +370,13 @@ void GenericResource::enableChangeReplay(bool enable) | |||
370 | } | 370 | } |
371 | } | 371 | } |
372 | 372 | ||
373 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Akonadi2::Preprocessor*> &preprocessors) | 373 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor*> &preprocessors) |
374 | { | 374 | { |
375 | mPipeline->setPreprocessors(type, preprocessors); | 375 | mPipeline->setPreprocessors(type, preprocessors); |
376 | mPipeline->setAdaptorFactory(type, factory); | 376 | mPipeline->setAdaptorFactory(type, factory); |
377 | } | 377 | } |
378 | 378 | ||
379 | KAsync::Job<void> GenericResource::replay(Akonadi2::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value) | 379 | KAsync::Job<void> GenericResource::replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value) |
380 | { | 380 | { |
381 | return KAsync::null<void>(); | 381 | return KAsync::null<void>(); |
382 | } | 382 | } |
@@ -384,18 +384,18 @@ KAsync::Job<void> GenericResource::replay(Akonadi2::Storage &synchronizationStor | |||
384 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) | 384 | void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) |
385 | { | 385 | { |
386 | Warning() << "Removing from generic resource"; | 386 | Warning() << "Removing from generic resource"; |
387 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadWrite).removeFromDisk(); | 387 | Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadWrite).removeFromDisk(); |
388 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); | 388 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadWrite).removeFromDisk(); |
389 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); | 389 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadWrite).removeFromDisk(); |
390 | Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".changereplay", Akonadi2::Storage::ReadWrite).removeFromDisk(); | 390 | Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadWrite).removeFromDisk(); |
391 | } | 391 | } |
392 | 392 | ||
393 | qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) | 393 | qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) |
394 | { | 394 | { |
395 | auto size = Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadOnly).diskUsage(); | 395 | auto size = Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadOnly).diskUsage(); |
396 | size += Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadOnly).diskUsage(); | 396 | size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadOnly).diskUsage(); |
397 | size += Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadOnly).diskUsage(); | 397 | size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadOnly).diskUsage(); |
398 | size += Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".changereplay", Akonadi2::Storage::ReadOnly).diskUsage(); | 398 | size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadOnly).diskUsage(); |
399 | return size; | 399 | return size; |
400 | } | 400 | } |
401 | 401 | ||
@@ -413,9 +413,9 @@ int GenericResource::error() const | |||
413 | void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) | 413 | void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) |
414 | { | 414 | { |
415 | flatbuffers::FlatBufferBuilder fbb; | 415 | flatbuffers::FlatBufferBuilder fbb; |
416 | auto commandData = Akonadi2::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); | 416 | auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); |
417 | auto buffer = Akonadi2::CreateQueuedCommand(fbb, commandId, commandData); | 417 | auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData); |
418 | Akonadi2::FinishQueuedCommandBuffer(fbb, buffer); | 418 | Sink::FinishQueuedCommandBuffer(fbb, buffer); |
419 | mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); | 419 | mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); |
420 | } | 420 | } |
421 | 421 | ||
@@ -440,8 +440,8 @@ KAsync::Job<void> GenericResource::synchronizeWithSource() | |||
440 | Log() << " Synchronizing"; | 440 | Log() << " Synchronizing"; |
441 | //Changereplay would deadlock otherwise when trying to open the synchronization store | 441 | //Changereplay would deadlock otherwise when trying to open the synchronization store |
442 | enableChangeReplay(false); | 442 | enableChangeReplay(false); |
443 | auto mainStore = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::storageLocation(), mResourceInstanceIdentifier, Akonadi2::Storage::ReadOnly); | 443 | auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); |
444 | auto syncStore = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Akonadi2::Storage::ReadWrite); | 444 | auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); |
445 | synchronizeWithSource(*mainStore, *syncStore).then<void>([this, mainStore, syncStore]() { | 445 | synchronizeWithSource(*mainStore, *syncStore).then<void>([this, mainStore, syncStore]() { |
446 | Log() << "Done Synchronizing"; | 446 | Log() << "Done Synchronizing"; |
447 | enableChangeReplay(true); | 447 | enableChangeReplay(true); |
@@ -449,7 +449,7 @@ KAsync::Job<void> GenericResource::synchronizeWithSource() | |||
449 | }); | 449 | }); |
450 | } | 450 | } |
451 | 451 | ||
452 | KAsync::Job<void> GenericResource::synchronizeWithSource(Akonadi2::Storage &mainStore, Akonadi2::Storage &synchronizationStore) | 452 | KAsync::Job<void> GenericResource::synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore) |
453 | { | 453 | { |
454 | return KAsync::null<void>(); | 454 | return KAsync::null<void>(); |
455 | } | 455 | } |
@@ -508,7 +508,7 @@ void GenericResource::setLowerBoundRevision(qint64 revision) | |||
508 | updateLowerBoundRevision(); | 508 | updateLowerBoundRevision(); |
509 | } | 509 | } |
510 | 510 | ||
511 | void GenericResource::createEntity(const QByteArray &akonadiId, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | 511 | void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) |
512 | { | 512 | { |
513 | //These changes are coming from the source | 513 | //These changes are coming from the source |
514 | const auto replayToSource = false; | 514 | const auto replayToSource = false; |
@@ -516,45 +516,45 @@ void GenericResource::createEntity(const QByteArray &akonadiId, const QByteArray | |||
516 | adaptorFactory.createBuffer(domainObject, entityFbb); | 516 | adaptorFactory.createBuffer(domainObject, entityFbb); |
517 | flatbuffers::FlatBufferBuilder fbb; | 517 | flatbuffers::FlatBufferBuilder fbb; |
518 | //This is the resource type and not the domain type | 518 | //This is the resource type and not the domain type |
519 | auto entityId = fbb.CreateString(akonadiId.toStdString()); | 519 | auto entityId = fbb.CreateString(sinkId.toStdString()); |
520 | auto type = fbb.CreateString(bufferType.toStdString()); | 520 | auto type = fbb.CreateString(bufferType.toStdString()); |
521 | auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | 521 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); |
522 | auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); | 522 | auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); |
523 | Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); | 523 | Sink::Commands::FinishCreateEntityBuffer(fbb, location); |
524 | callback(BufferUtils::extractBuffer(fbb)); | 524 | callback(BufferUtils::extractBuffer(fbb)); |
525 | } | 525 | } |
526 | 526 | ||
527 | void GenericResource::modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | 527 | void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) |
528 | { | 528 | { |
529 | //These changes are coming from the source | 529 | //These changes are coming from the source |
530 | const auto replayToSource = false; | 530 | const auto replayToSource = false; |
531 | flatbuffers::FlatBufferBuilder entityFbb; | 531 | flatbuffers::FlatBufferBuilder entityFbb; |
532 | adaptorFactory.createBuffer(domainObject, entityFbb); | 532 | adaptorFactory.createBuffer(domainObject, entityFbb); |
533 | flatbuffers::FlatBufferBuilder fbb; | 533 | flatbuffers::FlatBufferBuilder fbb; |
534 | auto entityId = fbb.CreateString(akonadiId.toStdString()); | 534 | auto entityId = fbb.CreateString(sinkId.toStdString()); |
535 | //This is the resource type and not the domain type | 535 | //This is the resource type and not the domain type |
536 | auto type = fbb.CreateString(bufferType.toStdString()); | 536 | auto type = fbb.CreateString(bufferType.toStdString()); |
537 | auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | 537 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); |
538 | //TODO removals | 538 | //TODO removals |
539 | auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); | 539 | auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); |
540 | Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location); | 540 | Sink::Commands::FinishModifyEntityBuffer(fbb, location); |
541 | callback(BufferUtils::extractBuffer(fbb)); | 541 | callback(BufferUtils::extractBuffer(fbb)); |
542 | } | 542 | } |
543 | 543 | ||
544 | void GenericResource::deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) | 544 | void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) |
545 | { | 545 | { |
546 | //These changes are coming from the source | 546 | //These changes are coming from the source |
547 | const auto replayToSource = false; | 547 | const auto replayToSource = false; |
548 | flatbuffers::FlatBufferBuilder fbb; | 548 | flatbuffers::FlatBufferBuilder fbb; |
549 | auto entityId = fbb.CreateString(akonadiId.toStdString()); | 549 | auto entityId = fbb.CreateString(sinkId.toStdString()); |
550 | //This is the resource type and not the domain type | 550 | //This is the resource type and not the domain type |
551 | auto type = fbb.CreateString(bufferType.toStdString()); | 551 | auto type = fbb.CreateString(bufferType.toStdString()); |
552 | auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); | 552 | auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); |
553 | Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location); | 553 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); |
554 | callback(BufferUtils::extractBuffer(fbb)); | 554 | callback(BufferUtils::extractBuffer(fbb)); |
555 | } | 555 | } |
556 | 556 | ||
557 | void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) | 557 | void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) |
558 | { | 558 | { |
559 | Index index("rid.mapping." + bufferType, transaction); | 559 | Index index("rid.mapping." + bufferType, transaction); |
560 | Index localIndex("localid.mapping." + bufferType, transaction); | 560 | Index localIndex("localid.mapping." + bufferType, transaction); |
@@ -562,7 +562,7 @@ void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteAr | |||
562 | localIndex.add(localId, remoteId); | 562 | localIndex.add(localId, remoteId); |
563 | } | 563 | } |
564 | 564 | ||
565 | void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) | 565 | void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) |
566 | { | 566 | { |
567 | Index index("rid.mapping." + bufferType, transaction); | 567 | Index index("rid.mapping." + bufferType, transaction); |
568 | Index localIndex("localid.mapping." + bufferType, transaction); | 568 | Index localIndex("localid.mapping." + bufferType, transaction); |
@@ -570,21 +570,21 @@ void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteAr | |||
570 | localIndex.remove(localId, remoteId); | 570 | localIndex.remove(localId, remoteId); |
571 | } | 571 | } |
572 | 572 | ||
573 | QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) | 573 | QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) |
574 | { | 574 | { |
575 | //Lookup local id for remote id, or insert a new pair otherwise | 575 | //Lookup local id for remote id, or insert a new pair otherwise |
576 | Index index("rid.mapping." + bufferType, transaction); | 576 | Index index("rid.mapping." + bufferType, transaction); |
577 | Index localIndex("localid.mapping." + bufferType, transaction); | 577 | Index localIndex("localid.mapping." + bufferType, transaction); |
578 | QByteArray akonadiId = index.lookup(remoteId); | 578 | QByteArray sinkId = index.lookup(remoteId); |
579 | if (akonadiId.isEmpty()) { | 579 | if (sinkId.isEmpty()) { |
580 | akonadiId = QUuid::createUuid().toString().toUtf8(); | 580 | sinkId = QUuid::createUuid().toString().toUtf8(); |
581 | index.add(remoteId, akonadiId); | 581 | index.add(remoteId, sinkId); |
582 | localIndex.add(akonadiId, remoteId); | 582 | localIndex.add(sinkId, remoteId); |
583 | } | 583 | } |
584 | return akonadiId; | 584 | return sinkId; |
585 | } | 585 | } |
586 | 586 | ||
587 | QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Akonadi2::Storage::Transaction &transaction) | 587 | QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction) |
588 | { | 588 | { |
589 | Index index("localid.mapping." + bufferType, transaction); | 589 | Index index("localid.mapping." + bufferType, transaction); |
590 | QByteArray remoteId = index.lookup(localId); | 590 | QByteArray remoteId = index.lookup(localId); |
@@ -595,29 +595,29 @@ QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const Q | |||
595 | return remoteId; | 595 | return remoteId; |
596 | } | 596 | } |
597 | 597 | ||
598 | void GenericResource::scanForRemovals(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) | 598 | void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) |
599 | { | 599 | { |
600 | entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { | 600 | entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { |
601 | auto akonadiId = Akonadi2::Storage::uidFromKey(key); | 601 | auto sinkId = Sink::Storage::uidFromKey(key); |
602 | Trace() << "Checking for removal " << key; | 602 | Trace() << "Checking for removal " << key; |
603 | const auto remoteId = resolveLocalId(bufferType, akonadiId, synchronizationTransaction); | 603 | const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction); |
604 | //If we have no remoteId, the entity hasn't been replayed to the source yet | 604 | //If we have no remoteId, the entity hasn't been replayed to the source yet |
605 | if (!remoteId.isEmpty()) { | 605 | if (!remoteId.isEmpty()) { |
606 | if (!exists(remoteId)) { | 606 | if (!exists(remoteId)) { |
607 | Trace() << "Found a removed entity: " << akonadiId; | 607 | Trace() << "Found a removed entity: " << sinkId; |
608 | deleteEntity(akonadiId, Akonadi2::Storage::maxRevision(transaction), bufferType, [this](const QByteArray &buffer) { | 608 | deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, [this](const QByteArray &buffer) { |
609 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::DeleteEntityCommand, buffer); | 609 | enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); |
610 | }); | 610 | }); |
611 | } | 611 | } |
612 | } | 612 | } |
613 | }); | 613 | }); |
614 | } | 614 | } |
615 | 615 | ||
616 | static QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> getLatest(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | 616 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) |
617 | { | 617 | { |
618 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; | 618 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; |
619 | db.findLatest(uid, [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 619 | db.findLatest(uid, [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { |
620 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 620 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
621 | if (!buffer.isValid()) { | 621 | if (!buffer.isValid()) { |
622 | Warning() << "Read invalid buffer from disk"; | 622 | Warning() << "Read invalid buffer from disk"; |
623 | } else { | 623 | } else { |
@@ -625,35 +625,35 @@ static QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> getLatest(cons | |||
625 | } | 625 | } |
626 | return false; | 626 | return false; |
627 | }, | 627 | }, |
628 | [](const Akonadi2::Storage::Error &error) { | 628 | [](const Sink::Storage::Error &error) { |
629 | Warning() << "Failed to read current value from storage: " << error.message; | 629 | Warning() << "Failed to read current value from storage: " << error.message; |
630 | }); | 630 | }); |
631 | return current; | 631 | return current; |
632 | } | 632 | } |
633 | 633 | ||
634 | void GenericResource::createOrModify(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Akonadi2::ApplicationDomain::ApplicationDomainType &entity) | 634 | void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) |
635 | { | 635 | { |
636 | auto mainDatabase = transaction.openDatabase(bufferType + ".main"); | 636 | auto mainDatabase = transaction.openDatabase(bufferType + ".main"); |
637 | const auto akonadiId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); | 637 | const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); |
638 | const auto found = mainDatabase.contains(akonadiId); | 638 | const auto found = mainDatabase.contains(sinkId); |
639 | if (!found) { | 639 | if (!found) { |
640 | Trace() << "Found a new entity: " << remoteId; | 640 | Trace() << "Found a new entity: " << remoteId; |
641 | createEntity(akonadiId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { | 641 | createEntity(sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { |
642 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, buffer); | 642 | enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); |
643 | }); | 643 | }); |
644 | } else { //modification | 644 | } else { //modification |
645 | if (auto current = getLatest(mainDatabase, akonadiId, adaptorFactory)) { | 645 | if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) { |
646 | bool changed = false; | 646 | bool changed = false; |
647 | for (const auto &property : entity.changedProperties()) { | 647 | for (const auto &property : entity.changedProperties()) { |
648 | if (entity.getProperty(property) != current->getProperty(property)) { | 648 | if (entity.getProperty(property) != current->getProperty(property)) { |
649 | Trace() << "Property changed " << akonadiId << property; | 649 | Trace() << "Property changed " << sinkId << property; |
650 | changed = true; | 650 | changed = true; |
651 | } | 651 | } |
652 | } | 652 | } |
653 | if (changed) { | 653 | if (changed) { |
654 | Trace() << "Found a modified entity: " << remoteId; | 654 | Trace() << "Found a modified entity: " << remoteId; |
655 | modifyEntity(akonadiId, Akonadi2::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { | 655 | modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { |
656 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::ModifyEntityCommand, buffer); | 656 | enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); |
657 | }); | 657 | }); |
658 | } | 658 | } |
659 | } else { | 659 | } else { |