diff options
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 160 |
1 files changed, 160 insertions, 0 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 9fbcaaa..42153ec 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -4,12 +4,16 @@ | |||
4 | #include "pipeline.h" | 4 | #include "pipeline.h" |
5 | #include "queuedcommand_generated.h" | 5 | #include "queuedcommand_generated.h" |
6 | #include "createentity_generated.h" | 6 | #include "createentity_generated.h" |
7 | #include "modifyentity_generated.h" | ||
8 | #include "deleteentity_generated.h" | ||
7 | #include "domainadaptor.h" | 9 | #include "domainadaptor.h" |
8 | #include "commands.h" | 10 | #include "commands.h" |
9 | #include "index.h" | 11 | #include "index.h" |
10 | #include "log.h" | 12 | #include "log.h" |
11 | #include "definitions.h" | 13 | #include "definitions.h" |
12 | 14 | ||
15 | #include <QUuid> | ||
16 | |||
13 | static int sBatchSize = 100; | 17 | static int sBatchSize = 100; |
14 | 18 | ||
15 | using namespace Akonadi2; | 19 | using namespace Akonadi2; |
@@ -52,6 +56,7 @@ public: | |||
52 | { | 56 | { |
53 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); | 57 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); |
54 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | 58 | const qint64 lastReplayedRevision = getLastReplayedRevision(); |
59 | Trace() << "All changes replayed " << topRevision << lastReplayedRevision; | ||
55 | return (lastReplayedRevision >= topRevision); | 60 | return (lastReplayedRevision >= topRevision); |
56 | } | 61 | } |
57 | 62 | ||
@@ -444,4 +449,159 @@ void GenericResource::setLowerBoundRevision(qint64 revision) | |||
444 | updateLowerBoundRevision(); | 449 | updateLowerBoundRevision(); |
445 | } | 450 | } |
446 | 451 | ||
452 | void GenericResource::createEntity(const QByteArray &akonadiId, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | ||
453 | { | ||
454 | //These changes are coming from the source | ||
455 | const auto replayToSource = false; | ||
456 | flatbuffers::FlatBufferBuilder entityFbb; | ||
457 | adaptorFactory.createBuffer(domainObject, entityFbb); | ||
458 | flatbuffers::FlatBufferBuilder fbb; | ||
459 | //This is the resource type and not the domain type | ||
460 | auto entityId = fbb.CreateString(akonadiId.toStdString()); | ||
461 | auto type = fbb.CreateString(bufferType.toStdString()); | ||
462 | auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | ||
463 | auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); | ||
464 | Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); | ||
465 | callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | ||
466 | } | ||
467 | |||
468 | void GenericResource::modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | ||
469 | { | ||
470 | //These changes are coming from the source | ||
471 | const auto replayToSource = false; | ||
472 | flatbuffers::FlatBufferBuilder entityFbb; | ||
473 | adaptorFactory.createBuffer(domainObject, entityFbb); | ||
474 | flatbuffers::FlatBufferBuilder fbb; | ||
475 | auto entityId = fbb.CreateString(akonadiId.toStdString()); | ||
476 | //This is the resource type and not the domain type | ||
477 | auto type = fbb.CreateString(bufferType.toStdString()); | ||
478 | auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | ||
479 | //TODO removals | ||
480 | auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); | ||
481 | Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location); | ||
482 | callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | ||
483 | } | ||
484 | |||
485 | void GenericResource::deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) | ||
486 | { | ||
487 | //These changes are coming from the source | ||
488 | const auto replayToSource = false; | ||
489 | flatbuffers::FlatBufferBuilder fbb; | ||
490 | auto entityId = fbb.CreateString(akonadiId.toStdString()); | ||
491 | //This is the resource type and not the domain type | ||
492 | auto type = fbb.CreateString(bufferType.toStdString()); | ||
493 | auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); | ||
494 | Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location); | ||
495 | callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | ||
496 | } | ||
497 | |||
498 | void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) | ||
499 | { | ||
500 | Index index("rid.mapping." + bufferType, transaction); | ||
501 | Index localIndex("localid.mapping." + bufferType, transaction); | ||
502 | index.add(remoteId, localId); | ||
503 | localIndex.add(localId, remoteId); | ||
504 | } | ||
505 | |||
506 | void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) | ||
507 | { | ||
508 | Index index("rid.mapping." + bufferType, transaction); | ||
509 | Index localIndex("localid.mapping." + bufferType, transaction); | ||
510 | index.remove(remoteId, localId); | ||
511 | localIndex.remove(localId, remoteId); | ||
512 | } | ||
513 | |||
514 | QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) | ||
515 | { | ||
516 | //Lookup local id for remote id, or insert a new pair otherwise | ||
517 | Index index("rid.mapping." + bufferType, transaction); | ||
518 | Index localIndex("localid.mapping." + bufferType, transaction); | ||
519 | QByteArray akonadiId = index.lookup(remoteId); | ||
520 | if (akonadiId.isEmpty()) { | ||
521 | akonadiId = QUuid::createUuid().toString().toUtf8(); | ||
522 | index.add(remoteId, akonadiId); | ||
523 | localIndex.add(akonadiId, remoteId); | ||
524 | } | ||
525 | return akonadiId; | ||
526 | } | ||
527 | |||
528 | QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Akonadi2::Storage::Transaction &transaction) | ||
529 | { | ||
530 | Index index("localid.mapping." + bufferType, transaction); | ||
531 | QByteArray remoteId = index.lookup(localId); | ||
532 | if (remoteId.isEmpty()) { | ||
533 | Warning() << "Couldn't find the remote id for " << localId; | ||
534 | return QByteArray(); | ||
535 | } | ||
536 | return remoteId; | ||
537 | } | ||
538 | |||
539 | void GenericResource::scanForRemovals(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) | ||
540 | { | ||
541 | entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { | ||
542 | auto akonadiId = Akonadi2::Storage::uidFromKey(key); | ||
543 | Trace() << "Checking for removal " << key; | ||
544 | const auto remoteId = resolveLocalId(bufferType, akonadiId, synchronizationTransaction); | ||
545 | //If we have no remoteId, the entity hasn't been replayed to the source yet | ||
546 | if (!remoteId.isEmpty()) { | ||
547 | if (!exists(remoteId)) { | ||
548 | Trace() << "Found a removed entity: " << akonadiId; | ||
549 | deleteEntity(akonadiId, Akonadi2::Storage::maxRevision(transaction), bufferType, [this](const QByteArray &buffer) { | ||
550 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::DeleteEntityCommand, buffer); | ||
551 | }); | ||
552 | } | ||
553 | } | ||
554 | }); | ||
555 | } | ||
556 | |||
557 | static QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> getLatest(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | ||
558 | { | ||
559 | QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; | ||
560 | db.findLatest(uid, [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
561 | Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
562 | if (!buffer.isValid()) { | ||
563 | Warning() << "Read invalid buffer from disk"; | ||
564 | } else { | ||
565 | current = adaptorFactory.createAdaptor(buffer.entity()); | ||
566 | } | ||
567 | return false; | ||
568 | }, | ||
569 | [](const Akonadi2::Storage::Error &error) { | ||
570 | Warning() << "Failed to read current value from storage: " << error.message; | ||
571 | }); | ||
572 | return current; | ||
573 | } | ||
574 | |||
575 | void GenericResource::createOrModify(Akonadi2::Storage::Transaction &transaction, Akonadi2::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Akonadi2::ApplicationDomain::ApplicationDomainType &entity) | ||
576 | { | ||
577 | auto mainDatabase = transaction.openDatabase(bufferType + ".main"); | ||
578 | const auto akonadiId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); | ||
579 | const auto found = mainDatabase.contains(akonadiId); | ||
580 | if (!found) { | ||
581 | Trace() << "Found a new entity: " << remoteId; | ||
582 | createEntity(akonadiId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { | ||
583 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, buffer); | ||
584 | }); | ||
585 | } else { //modification | ||
586 | if (auto current = getLatest(mainDatabase, akonadiId, adaptorFactory)) { | ||
587 | bool changed = false; | ||
588 | for (const auto &property : entity.changedProperties()) { | ||
589 | if (entity.getProperty(property) != current->getProperty(property)) { | ||
590 | Trace() << "Property changed " << akonadiId << property; | ||
591 | changed = true; | ||
592 | } | ||
593 | } | ||
594 | if (changed) { | ||
595 | Trace() << "Found a modified entity: " << remoteId; | ||
596 | modifyEntity(akonadiId, Akonadi2::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { | ||
597 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::ModifyEntityCommand, buffer); | ||
598 | }); | ||
599 | } | ||
600 | } else { | ||
601 | Warning() << "Failed to get current entity"; | ||
602 | } | ||
603 | } | ||
604 | } | ||
605 | |||
606 | |||
447 | #include "genericresource.moc" | 607 | #include "genericresource.moc" |