summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp160
1 files changed, 160 insertions, 0 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 9fbcaaa..42153ec 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -4,12 +4,16 @@
4#include "pipeline.h" 4#include "pipeline.h"
5#include "queuedcommand_generated.h" 5#include "queuedcommand_generated.h"
6#include "createentity_generated.h" 6#include "createentity_generated.h"
7#include "modifyentity_generated.h"
8#include "deleteentity_generated.h"
7#include "domainadaptor.h" 9#include "domainadaptor.h"
8#include "commands.h" 10#include "commands.h"
9#include "index.h" 11#include "index.h"
10#include "log.h" 12#include "log.h"
11#include "definitions.h" 13#include "definitions.h"
12 14
15#include <QUuid>
16
13static int sBatchSize = 100; 17static int sBatchSize = 100;
14 18
15using namespace Akonadi2; 19using namespace Akonadi2;
@@ -52,6 +56,7 @@ public:
52 { 56 {
53 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); 57 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly));
54 const qint64 lastReplayedRevision = getLastReplayedRevision(); 58 const qint64 lastReplayedRevision = getLastReplayedRevision();
59 Trace() << "All changes replayed " << topRevision << lastReplayedRevision;
55 return (lastReplayedRevision >= topRevision); 60 return (lastReplayedRevision >= topRevision);
56 } 61 }
57 62
@@ -444,4 +449,159 @@ void GenericResource::setLowerBoundRevision(qint64 revision)
444 updateLowerBoundRevision(); 449 updateLowerBoundRevision();
445} 450}
446 451
452void GenericResource::createEntity(const QByteArray &akonadiId, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
453{
454 //These changes are coming from the source
455 const auto replayToSource = false;
456 flatbuffers::FlatBufferBuilder entityFbb;
457 adaptorFactory.createBuffer(domainObject, entityFbb);
458 flatbuffers::FlatBufferBuilder fbb;
459 //This is the resource type and not the domain type
460 auto entityId = fbb.CreateString(akonadiId.toStdString());
461 auto type = fbb.CreateString(bufferType.toStdString());
462 auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
463 auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource);
464 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location);
465 callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
466}
467
468void GenericResource::modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
469{
470 //These changes are coming from the source
471 const auto replayToSource = false;
472 flatbuffers::FlatBufferBuilder entityFbb;
473 adaptorFactory.createBuffer(domainObject, entityFbb);
474 flatbuffers::FlatBufferBuilder fbb;
475 auto entityId = fbb.CreateString(akonadiId.toStdString());
476 //This is the resource type and not the domain type
477 auto type = fbb.CreateString(bufferType.toStdString());
478 auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
479 //TODO removals
480 auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource);
481 Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location);
482 callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
483}
484
485void GenericResource::deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback)
486{
487 //These changes are coming from the source
488 const auto replayToSource = false;
489 flatbuffers::FlatBufferBuilder fbb;
490 auto entityId = fbb.CreateString(akonadiId.toStdString());
491 //This is the resource type and not the domain type
492 auto type = fbb.CreateString(bufferType.toStdString());
493 auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource);
494 Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location);
495 callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
496}
497
498void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction)
499{
500 Index index("rid.mapping." + bufferType, transaction);
501 Index localIndex("localid.mapping." + bufferType, transaction);
502 index.add(remoteId, localId);
503 localIndex.add(localId, remoteId);
504}
505
506void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction)
507{
508 Index index("rid.mapping." + bufferType, transaction);
509 Index localIndex("localid.mapping." + bufferType, transaction);
510 index.remove(remoteId, localId);
511 localIndex.remove(localId, remoteId);
512}
513
514QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction)
515{
516 //Lookup local id for remote id, or insert a new pair otherwise
517 Index index("rid.mapping." + bufferType, transaction);
518 Index localIndex("localid.mapping." + bufferType, transaction);
519 QByteArray akonadiId = index.lookup(remoteId);
520 if (akonadiId.isEmpty()) {
521 akonadiId = QUuid::createUuid().toString().toUtf8();
522 index.add(remoteId, akonadiId);
523 localIndex.add(akonadiId, remoteId);
524 }
525 return akonadiId;
526}
527
528QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Akonadi2::Storage::Transaction &transaction)
529{
530 Index index("localid.mapping." + bufferType, transaction);
531 QByteArray remoteId = index.lookup(localId);
532 if (remoteId.isEmpty()) {
533 Warning() << "Couldn't find the remote id for " << localId;
534 return QByteArray();
535 }
536 return remoteId;
537}
538
539void GenericResource::scanForRemovals(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
540{
541 entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) {
542 auto akonadiId = Akonadi2::Storage::uidFromKey(key);
543 Trace() << "Checking for removal " << key;
544 const auto remoteId = resolveLocalId(bufferType, akonadiId, synchronizationTransaction);
545 //If we have no remoteId, the entity hasn't been replayed to the source yet
546 if (!remoteId.isEmpty()) {
547 if (!exists(remoteId)) {
548 Trace() << "Found a removed entity: " << akonadiId;
549 deleteEntity(akonadiId, Akonadi2::Storage::maxRevision(transaction), bufferType, [this](const QByteArray &buffer) {
550 enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::DeleteEntityCommand, buffer);
551 });
552 }
553 }
554 });
555}
556
557static QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> getLatest(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory)
558{
559 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
560 db.findLatest(uid, [&current, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
561 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
562 if (!buffer.isValid()) {
563 Warning() << "Read invalid buffer from disk";
564 } else {
565 current = adaptorFactory.createAdaptor(buffer.entity());
566 }
567 return false;
568 },
569 [](const Akonadi2::Storage::Error &error) {
570 Warning() << "Failed to read current value from storage: " << error.message;
571 });
572 return current;
573}
574
575void GenericResource::createOrModify(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Akonadi2::ApplicationDomain::ApplicationDomainType &entity)
576{
577 auto mainDatabase = transaction.openDatabase(bufferType + ".main");
578 const auto akonadiId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction);
579 const auto found = mainDatabase.contains(akonadiId);
580 if (!found) {
581 Trace() << "Found a new entity: " << remoteId;
582 createEntity(akonadiId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) {
583 enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, buffer);
584 });
585 } else { //modification
586 if (auto current = getLatest(mainDatabase, akonadiId, adaptorFactory)) {
587 bool changed = false;
588 for (const auto &property : entity.changedProperties()) {
589 if (entity.getProperty(property) != current->getProperty(property)) {
590 Trace() << "Property changed " << akonadiId << property;
591 changed = true;
592 }
593 }
594 if (changed) {
595 Trace() << "Found a modified entity: " << remoteId;
596 modifyEntity(akonadiId, Akonadi2::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) {
597 enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::ModifyEntityCommand, buffer);
598 });
599 }
600 } else {
601 Warning() << "Failed to get current entity";
602 }
603 }
604}
605
606
447#include "genericresource.moc" 607#include "genericresource.moc"