summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp342
1 files changed, 1 insertions, 341 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index cb2ef21..568e066 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -34,6 +34,7 @@
34#include "definitions.h" 34#include "definitions.h"
35#include "bufferutils.h" 35#include "bufferutils.h"
36#include "adaptorfactoryregistry.h" 36#include "adaptorfactoryregistry.h"
37#include "synchronizer.h"
37 38
38#include <QUuid> 39#include <QUuid>
39#include <QDataStream> 40#include <QDataStream>
@@ -461,347 +462,6 @@ void GenericResource::setLowerBoundRevision(qint64 revision)
461 updateLowerBoundRevision(); 462 updateLowerBoundRevision();
462} 463}
463 464
464
465
466
467EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction)
468 : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier),
469 mTransaction(transaction)
470{
471
472}
473
474template<typename T>
475T EntityStore::read(const QByteArray &identifier) const
476{
477 auto typeName = ApplicationDomain::getTypeName<T>();
478 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName);
479 auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType));
480 Q_ASSERT(bufferAdaptor);
481 return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor);
482}
483
484QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory)
485{
486 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
487 db.findLatest(uid,
488 [&current, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
489 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
490 if (!buffer.isValid()) {
491 Warning() << "Read invalid buffer from disk";
492 } else {
493 Trace() << "Found value " << key;
494 current = adaptorFactory.createAdaptor(buffer.entity());
495 }
496 return false;
497 },
498 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
499 return current;
500}
501
502
503
504SyncStore::SyncStore(Sink::Storage::Transaction &transaction)
505 : mTransaction(transaction)
506{
507
508}
509
510void SyncStore::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
511{
512 Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId);
513 Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId);
514}
515
516void SyncStore::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
517{
518 Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId);
519 Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId);
520}
521
522void SyncStore::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
523{
524 const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId);
525 removeRemoteId(bufferType, localId, oldRemoteId);
526 recordRemoteId(bufferType, localId, remoteId);
527}
528
529QByteArray SyncStore::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId)
530{
531 // Lookup local id for remote id, or insert a new pair otherwise
532 Index index("rid.mapping." + bufferType, mTransaction);
533 QByteArray sinkId = index.lookup(remoteId);
534 if (sinkId.isEmpty()) {
535 sinkId = QUuid::createUuid().toString().toUtf8();
536 index.add(remoteId, sinkId);
537 Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId);
538 }
539 return sinkId;
540}
541
542QByteArray SyncStore::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId)
543{
544 QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId);
545 if (remoteId.isEmpty()) {
546 Warning() << "Couldn't find the remote id for " << localId;
547 return QByteArray();
548 }
549 return remoteId;
550}
551
552
553
554
555
556
557
558
559Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
560 : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly),
561 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite),
562 mResourceType(resourceType),
563 mResourceInstanceIdentifier(resourceInstanceIdentifier)
564{
565 Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier;
566
567}
568
569void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback)
570{
571 mEnqueue = enqueueCommandCallback;
572}
573
574void Synchronizer::enqueueCommand(int commandId, const QByteArray &data)
575{
576 Q_ASSERT(mEnqueue);
577 mEnqueue(commandId, data);
578}
579
580EntityStore &Synchronizer::store()
581{
582 if (!mEntityStore) {
583 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction);
584 }
585 return *mEntityStore;
586}
587
588SyncStore &Synchronizer::syncStore()
589{
590 if (!mSyncStore) {
591 mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction);
592 }
593 return *mSyncStore;
594}
595
596void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
597 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
598{
599 // These changes are coming from the source
600 const auto replayToSource = false;
601 flatbuffers::FlatBufferBuilder entityFbb;
602 adaptorFactory.createBuffer(domainObject, entityFbb);
603 flatbuffers::FlatBufferBuilder fbb;
604 // This is the resource type and not the domain type
605 auto entityId = fbb.CreateString(sinkId.toStdString());
606 auto type = fbb.CreateString(bufferType.toStdString());
607 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
608 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource);
609 Sink::Commands::FinishCreateEntityBuffer(fbb, location);
610 callback(BufferUtils::extractBuffer(fbb));
611}
612
613void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
614 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
615{
616 // These changes are coming from the source
617 const auto replayToSource = false;
618 flatbuffers::FlatBufferBuilder entityFbb;
619 adaptorFactory.createBuffer(domainObject, entityFbb);
620 flatbuffers::FlatBufferBuilder fbb;
621 auto entityId = fbb.CreateString(sinkId.toStdString());
622 // This is the resource type and not the domain type
623 auto type = fbb.CreateString(bufferType.toStdString());
624 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
625 // TODO removals
626 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource);
627 Sink::Commands::FinishModifyEntityBuffer(fbb, location);
628 callback(BufferUtils::extractBuffer(fbb));
629}
630
631void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback)
632{
633 // These changes are coming from the source
634 const auto replayToSource = false;
635 flatbuffers::FlatBufferBuilder fbb;
636 auto entityId = fbb.CreateString(sinkId.toStdString());
637 // This is the resource type and not the domain type
638 auto type = fbb.CreateString(bufferType.toStdString());
639 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource);
640 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
641 callback(BufferUtils::extractBuffer(fbb));
642}
643
644void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
645{
646 entryGenerator([this, bufferType, &exists](const QByteArray &key) {
647 auto sinkId = Sink::Storage::uidFromKey(key);
648 Trace() << "Checking for removal " << key;
649 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId);
650 // If we have no remoteId, the entity hasn't been replayed to the source yet
651 if (!remoteId.isEmpty()) {
652 if (!exists(remoteId)) {
653 Trace() << "Found a removed entity: " << sinkId;
654 deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType,
655 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); });
656 }
657 }
658 });
659}
660
661void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
662{
663 Trace() << "Create or modify" << bufferType << remoteId;
664 auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType);
665 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
666 const auto found = mainDatabase.contains(sinkId);
667 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType);
668 if (!found) {
669 Trace() << "Found a new entity: " << remoteId;
670 createEntity(
671 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
672 } else { // modification
673 if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) {
674 bool changed = false;
675 for (const auto &property : entity.changedProperties()) {
676 if (entity.getProperty(property) != current->getProperty(property)) {
677 Trace() << "Property changed " << sinkId << property;
678 changed = true;
679 }
680 }
681 if (changed) {
682 Trace() << "Found a modified entity: " << remoteId;
683 modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory,
684 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
685 }
686 } else {
687 Warning() << "Failed to get current entity";
688 }
689 }
690}
691
692KAsync::Job<void> Synchronizer::synchronize()
693{
694 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
695 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
696 return synchronizeWithSource().then<void>([this]() {
697 mTransaction.abort();
698 mSyncTransaction.commit();
699 mSyncStore.clear();
700 mEntityStore.clear();
701 });
702}
703
704
705
706SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
707 : ChangeReplay(resourceInstanceIdentifier),
708 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite),
709 mResourceType(resourceType),
710 mResourceInstanceIdentifier(resourceInstanceIdentifier)
711{
712
713}
714
715EntityStore &SourceWriteBack::store()
716{
717 if (!mEntityStore) {
718 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction);
719 }
720 return *mEntityStore;
721}
722
723SyncStore &SourceWriteBack::syncStore()
724{
725 if (!mSyncStore) {
726 mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction);
727 }
728 return *mSyncStore;
729}
730
731KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
732{
733 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
734 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
735
736 Sink::EntityBuffer buffer(value);
737 const Sink::Entity &entity = buffer.entity();
738 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
739 Q_ASSERT(metadataBuffer);
740 if (!metadataBuffer->replayToSource()) {
741 Trace() << "Change is coming from the source";
742 return KAsync::null<void>();
743 }
744 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
745 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
746 const auto uid = Sink::Storage::uidFromKey(key);
747 QByteArray oldRemoteId;
748
749 if (operation != Sink::Operation_Creation) {
750 oldRemoteId = syncStore().resolveLocalId(type, uid);
751 }
752 Trace() << "Replaying " << key << type;
753
754 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>();
755 if (type == ENTITY_TYPE_FOLDER) {
756 auto folder = store().read<ApplicationDomain::Folder>(uid);
757 // const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
758 job = replay(folder, operation, oldRemoteId);
759 } else if (type == ENTITY_TYPE_MAIL) {
760 auto mail = store().read<ApplicationDomain::Mail>(uid);
761 // const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
762 job = replay(mail, operation, oldRemoteId);
763 }
764
765 return job.then<void, QByteArray>([this, operation, type, uid](const QByteArray &remoteId) {
766 Trace() << "Replayed change with remote id: " << remoteId;
767 if (operation == Sink::Operation_Creation) {
768 if (remoteId.isEmpty()) {
769 Warning() << "Returned an empty remoteId from the creation";
770 } else {
771 syncStore().recordRemoteId(type, uid, remoteId);
772 }
773 } else if (operation == Sink::Operation_Modification) {
774 if (remoteId.isEmpty()) {
775 Warning() << "Returned an empty remoteId from the creation";
776 } else {
777 syncStore().updateRemoteId(type, uid, remoteId);
778 }
779 } else if (operation == Sink::Operation_Removal) {
780 syncStore().removeRemoteId(type, uid, remoteId);
781 } else {
782 Warning() << "Unkown operation" << operation;
783 }
784
785 mTransaction.abort();
786 mSyncTransaction.commit();
787 mSyncStore.clear();
788 mEntityStore.clear();
789 }, [](int errorCode, const QString &errorMessage) {
790 Warning() << "Failed to replay change: " << errorMessage;
791 });
792}
793
794KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &)
795{
796 return KAsync::null<QByteArray>();
797}
798
799KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &)
800{
801 return KAsync::null<QByteArray>();
802}
803
804
805#pragma clang diagnostic push 465#pragma clang diagnostic push
806#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" 466#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
807#include "genericresource.moc" 467#include "genericresource.moc"