diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-05-28 02:09:58 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-05-28 02:09:58 +0200 |
commit | b441386c4e138d19bbd79d578e0a2ff1b3f54a93 (patch) | |
tree | 1110b6ec00ce29a8bcd7f6db0717f4a483f50587 | |
parent | afb29c153daff23e491a350784ce6af5db5e28af (diff) | |
download | sink-b441386c4e138d19bbd79d578e0a2ff1b3f54a93.tar.gz sink-b441386c4e138d19bbd79d578e0a2ff1b3f54a93.zip |
Moved the classes to individual files
-rw-r--r-- | common/CMakeLists.txt | 4 | ||||
-rw-r--r-- | common/entitystore.cpp | 48 | ||||
-rw-r--r-- | common/entitystore.h | 53 | ||||
-rw-r--r-- | common/genericresource.cpp | 342 | ||||
-rw-r--r-- | common/genericresource.h | 141 | ||||
-rw-r--r-- | common/remoteidmap.cpp | 75 | ||||
-rw-r--r-- | common/remoteidmap.h | 61 | ||||
-rw-r--r-- | common/sourcewriteback.cpp | 124 | ||||
-rw-r--r-- | common/sourcewriteback.h | 64 | ||||
-rw-r--r-- | common/synchronizer.cpp | 176 | ||||
-rw-r--r-- | common/synchronizer.h | 95 | ||||
-rw-r--r-- | examples/dummyresource/resourcefactory.cpp | 2 |
12 files changed, 703 insertions, 482 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 79b627a..3c6a083 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -68,6 +68,10 @@ set(command_SRCS | |||
68 | query.cpp | 68 | query.cpp |
69 | changereplay.cpp | 69 | changereplay.cpp |
70 | adaptorfactoryregistry.cpp | 70 | adaptorfactoryregistry.cpp |
71 | synchronizer.cpp | ||
72 | entitystore.cpp | ||
73 | remoteidmap.cpp | ||
74 | sourcewriteback.cpp | ||
71 | ${storage_SRCS}) | 75 | ${storage_SRCS}) |
72 | 76 | ||
73 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) | 77 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) |
diff --git a/common/entitystore.cpp b/common/entitystore.cpp new file mode 100644 index 0000000..2c15abf --- /dev/null +++ b/common/entitystore.cpp | |||
@@ -0,0 +1,48 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #include "entitystore.h" | ||
21 | |||
22 | using namespace Sink; | ||
23 | |||
24 | EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) | ||
25 | : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier), | ||
26 | mTransaction(transaction) | ||
27 | { | ||
28 | |||
29 | } | ||
30 | |||
31 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | ||
32 | { | ||
33 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | ||
34 | db.findLatest(uid, | ||
35 | [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
36 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
37 | if (!buffer.isValid()) { | ||
38 | Warning() << "Read invalid buffer from disk"; | ||
39 | } else { | ||
40 | Trace() << "Found value " << key; | ||
41 | current = adaptorFactory.createAdaptor(buffer.entity()); | ||
42 | } | ||
43 | return false; | ||
44 | }, | ||
45 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); | ||
46 | return current; | ||
47 | } | ||
48 | |||
diff --git a/common/entitystore.h b/common/entitystore.h new file mode 100644 index 0000000..17156ec --- /dev/null +++ b/common/entitystore.h | |||
@@ -0,0 +1,53 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #pragma once | ||
21 | |||
22 | #include "sink_export.h" | ||
23 | #include <domainadaptor.h> | ||
24 | |||
25 | #include "storage.h" | ||
26 | #include "adaptorfactoryregistry.h" | ||
27 | |||
28 | namespace Sink { | ||
29 | |||
30 | class SINK_EXPORT EntityStore | ||
31 | { | ||
32 | public: | ||
33 | EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); | ||
34 | |||
35 | template<typename T> | ||
36 | T read(const QByteArray &identifier) const | ||
37 | { | ||
38 | auto typeName = ApplicationDomain::getTypeName<T>(); | ||
39 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | ||
40 | auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType)); | ||
41 | Q_ASSERT(bufferAdaptor); | ||
42 | return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor); | ||
43 | } | ||
44 | |||
45 | |||
46 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); | ||
47 | private: | ||
48 | QByteArray mResourceType; | ||
49 | QByteArray mResourceInstanceIdentifier; | ||
50 | Sink::Storage::Transaction &mTransaction; | ||
51 | }; | ||
52 | |||
53 | } | ||
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index cb2ef21..568e066 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -34,6 +34,7 @@ | |||
34 | #include "definitions.h" | 34 | #include "definitions.h" |
35 | #include "bufferutils.h" | 35 | #include "bufferutils.h" |
36 | #include "adaptorfactoryregistry.h" | 36 | #include "adaptorfactoryregistry.h" |
37 | #include "synchronizer.h" | ||
37 | 38 | ||
38 | #include <QUuid> | 39 | #include <QUuid> |
39 | #include <QDataStream> | 40 | #include <QDataStream> |
@@ -461,347 +462,6 @@ void GenericResource::setLowerBoundRevision(qint64 revision) | |||
461 | updateLowerBoundRevision(); | 462 | updateLowerBoundRevision(); |
462 | } | 463 | } |
463 | 464 | ||
464 | |||
465 | |||
466 | |||
467 | EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) | ||
468 | : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier), | ||
469 | mTransaction(transaction) | ||
470 | { | ||
471 | |||
472 | } | ||
473 | |||
474 | template<typename T> | ||
475 | T EntityStore::read(const QByteArray &identifier) const | ||
476 | { | ||
477 | auto typeName = ApplicationDomain::getTypeName<T>(); | ||
478 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | ||
479 | auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType)); | ||
480 | Q_ASSERT(bufferAdaptor); | ||
481 | return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor); | ||
482 | } | ||
483 | |||
484 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | ||
485 | { | ||
486 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | ||
487 | db.findLatest(uid, | ||
488 | [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
489 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
490 | if (!buffer.isValid()) { | ||
491 | Warning() << "Read invalid buffer from disk"; | ||
492 | } else { | ||
493 | Trace() << "Found value " << key; | ||
494 | current = adaptorFactory.createAdaptor(buffer.entity()); | ||
495 | } | ||
496 | return false; | ||
497 | }, | ||
498 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); | ||
499 | return current; | ||
500 | } | ||
501 | |||
502 | |||
503 | |||
504 | SyncStore::SyncStore(Sink::Storage::Transaction &transaction) | ||
505 | : mTransaction(transaction) | ||
506 | { | ||
507 | |||
508 | } | ||
509 | |||
510 | void SyncStore::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
511 | { | ||
512 | Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId); | ||
513 | Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId); | ||
514 | } | ||
515 | |||
516 | void SyncStore::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
517 | { | ||
518 | Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId); | ||
519 | Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId); | ||
520 | } | ||
521 | |||
522 | void SyncStore::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
523 | { | ||
524 | const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); | ||
525 | removeRemoteId(bufferType, localId, oldRemoteId); | ||
526 | recordRemoteId(bufferType, localId, remoteId); | ||
527 | } | ||
528 | |||
529 | QByteArray SyncStore::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId) | ||
530 | { | ||
531 | // Lookup local id for remote id, or insert a new pair otherwise | ||
532 | Index index("rid.mapping." + bufferType, mTransaction); | ||
533 | QByteArray sinkId = index.lookup(remoteId); | ||
534 | if (sinkId.isEmpty()) { | ||
535 | sinkId = QUuid::createUuid().toString().toUtf8(); | ||
536 | index.add(remoteId, sinkId); | ||
537 | Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId); | ||
538 | } | ||
539 | return sinkId; | ||
540 | } | ||
541 | |||
542 | QByteArray SyncStore::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId) | ||
543 | { | ||
544 | QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); | ||
545 | if (remoteId.isEmpty()) { | ||
546 | Warning() << "Couldn't find the remote id for " << localId; | ||
547 | return QByteArray(); | ||
548 | } | ||
549 | return remoteId; | ||
550 | } | ||
551 | |||
552 | |||
553 | |||
554 | |||
555 | |||
556 | |||
557 | |||
558 | |||
559 | Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
560 | : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), | ||
561 | mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), | ||
562 | mResourceType(resourceType), | ||
563 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | ||
564 | { | ||
565 | Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; | ||
566 | |||
567 | } | ||
568 | |||
569 | void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback) | ||
570 | { | ||
571 | mEnqueue = enqueueCommandCallback; | ||
572 | } | ||
573 | |||
574 | void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) | ||
575 | { | ||
576 | Q_ASSERT(mEnqueue); | ||
577 | mEnqueue(commandId, data); | ||
578 | } | ||
579 | |||
580 | EntityStore &Synchronizer::store() | ||
581 | { | ||
582 | if (!mEntityStore) { | ||
583 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); | ||
584 | } | ||
585 | return *mEntityStore; | ||
586 | } | ||
587 | |||
588 | SyncStore &Synchronizer::syncStore() | ||
589 | { | ||
590 | if (!mSyncStore) { | ||
591 | mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction); | ||
592 | } | ||
593 | return *mSyncStore; | ||
594 | } | ||
595 | |||
596 | void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
597 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | ||
598 | { | ||
599 | // These changes are coming from the source | ||
600 | const auto replayToSource = false; | ||
601 | flatbuffers::FlatBufferBuilder entityFbb; | ||
602 | adaptorFactory.createBuffer(domainObject, entityFbb); | ||
603 | flatbuffers::FlatBufferBuilder fbb; | ||
604 | // This is the resource type and not the domain type | ||
605 | auto entityId = fbb.CreateString(sinkId.toStdString()); | ||
606 | auto type = fbb.CreateString(bufferType.toStdString()); | ||
607 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | ||
608 | auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); | ||
609 | Sink::Commands::FinishCreateEntityBuffer(fbb, location); | ||
610 | callback(BufferUtils::extractBuffer(fbb)); | ||
611 | } | ||
612 | |||
613 | void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
614 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | ||
615 | { | ||
616 | // These changes are coming from the source | ||
617 | const auto replayToSource = false; | ||
618 | flatbuffers::FlatBufferBuilder entityFbb; | ||
619 | adaptorFactory.createBuffer(domainObject, entityFbb); | ||
620 | flatbuffers::FlatBufferBuilder fbb; | ||
621 | auto entityId = fbb.CreateString(sinkId.toStdString()); | ||
622 | // This is the resource type and not the domain type | ||
623 | auto type = fbb.CreateString(bufferType.toStdString()); | ||
624 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | ||
625 | // TODO removals | ||
626 | auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); | ||
627 | Sink::Commands::FinishModifyEntityBuffer(fbb, location); | ||
628 | callback(BufferUtils::extractBuffer(fbb)); | ||
629 | } | ||
630 | |||
631 | void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) | ||
632 | { | ||
633 | // These changes are coming from the source | ||
634 | const auto replayToSource = false; | ||
635 | flatbuffers::FlatBufferBuilder fbb; | ||
636 | auto entityId = fbb.CreateString(sinkId.toStdString()); | ||
637 | // This is the resource type and not the domain type | ||
638 | auto type = fbb.CreateString(bufferType.toStdString()); | ||
639 | auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); | ||
640 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); | ||
641 | callback(BufferUtils::extractBuffer(fbb)); | ||
642 | } | ||
643 | |||
644 | void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) | ||
645 | { | ||
646 | entryGenerator([this, bufferType, &exists](const QByteArray &key) { | ||
647 | auto sinkId = Sink::Storage::uidFromKey(key); | ||
648 | Trace() << "Checking for removal " << key; | ||
649 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); | ||
650 | // If we have no remoteId, the entity hasn't been replayed to the source yet | ||
651 | if (!remoteId.isEmpty()) { | ||
652 | if (!exists(remoteId)) { | ||
653 | Trace() << "Found a removed entity: " << sinkId; | ||
654 | deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, | ||
655 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); | ||
656 | } | ||
657 | } | ||
658 | }); | ||
659 | } | ||
660 | |||
661 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | ||
662 | { | ||
663 | Trace() << "Create or modify" << bufferType << remoteId; | ||
664 | auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); | ||
665 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | ||
666 | const auto found = mainDatabase.contains(sinkId); | ||
667 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); | ||
668 | if (!found) { | ||
669 | Trace() << "Found a new entity: " << remoteId; | ||
670 | createEntity( | ||
671 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); | ||
672 | } else { // modification | ||
673 | if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) { | ||
674 | bool changed = false; | ||
675 | for (const auto &property : entity.changedProperties()) { | ||
676 | if (entity.getProperty(property) != current->getProperty(property)) { | ||
677 | Trace() << "Property changed " << sinkId << property; | ||
678 | changed = true; | ||
679 | } | ||
680 | } | ||
681 | if (changed) { | ||
682 | Trace() << "Found a modified entity: " << remoteId; | ||
683 | modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, | ||
684 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); | ||
685 | } | ||
686 | } else { | ||
687 | Warning() << "Failed to get current entity"; | ||
688 | } | ||
689 | } | ||
690 | } | ||
691 | |||
692 | KAsync::Job<void> Synchronizer::synchronize() | ||
693 | { | ||
694 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
695 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
696 | return synchronizeWithSource().then<void>([this]() { | ||
697 | mTransaction.abort(); | ||
698 | mSyncTransaction.commit(); | ||
699 | mSyncStore.clear(); | ||
700 | mEntityStore.clear(); | ||
701 | }); | ||
702 | } | ||
703 | |||
704 | |||
705 | |||
706 | SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
707 | : ChangeReplay(resourceInstanceIdentifier), | ||
708 | mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), | ||
709 | mResourceType(resourceType), | ||
710 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | ||
711 | { | ||
712 | |||
713 | } | ||
714 | |||
715 | EntityStore &SourceWriteBack::store() | ||
716 | { | ||
717 | if (!mEntityStore) { | ||
718 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); | ||
719 | } | ||
720 | return *mEntityStore; | ||
721 | } | ||
722 | |||
723 | SyncStore &SourceWriteBack::syncStore() | ||
724 | { | ||
725 | if (!mSyncStore) { | ||
726 | mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction); | ||
727 | } | ||
728 | return *mSyncStore; | ||
729 | } | ||
730 | |||
731 | KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) | ||
732 | { | ||
733 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
734 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
735 | |||
736 | Sink::EntityBuffer buffer(value); | ||
737 | const Sink::Entity &entity = buffer.entity(); | ||
738 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | ||
739 | Q_ASSERT(metadataBuffer); | ||
740 | if (!metadataBuffer->replayToSource()) { | ||
741 | Trace() << "Change is coming from the source"; | ||
742 | return KAsync::null<void>(); | ||
743 | } | ||
744 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
745 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | ||
746 | const auto uid = Sink::Storage::uidFromKey(key); | ||
747 | QByteArray oldRemoteId; | ||
748 | |||
749 | if (operation != Sink::Operation_Creation) { | ||
750 | oldRemoteId = syncStore().resolveLocalId(type, uid); | ||
751 | } | ||
752 | Trace() << "Replaying " << key << type; | ||
753 | |||
754 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); | ||
755 | if (type == ENTITY_TYPE_FOLDER) { | ||
756 | auto folder = store().read<ApplicationDomain::Folder>(uid); | ||
757 | // const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
758 | job = replay(folder, operation, oldRemoteId); | ||
759 | } else if (type == ENTITY_TYPE_MAIL) { | ||
760 | auto mail = store().read<ApplicationDomain::Mail>(uid); | ||
761 | // const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
762 | job = replay(mail, operation, oldRemoteId); | ||
763 | } | ||
764 | |||
765 | return job.then<void, QByteArray>([this, operation, type, uid](const QByteArray &remoteId) { | ||
766 | Trace() << "Replayed change with remote id: " << remoteId; | ||
767 | if (operation == Sink::Operation_Creation) { | ||
768 | if (remoteId.isEmpty()) { | ||
769 | Warning() << "Returned an empty remoteId from the creation"; | ||
770 | } else { | ||
771 | syncStore().recordRemoteId(type, uid, remoteId); | ||
772 | } | ||
773 | } else if (operation == Sink::Operation_Modification) { | ||
774 | if (remoteId.isEmpty()) { | ||
775 | Warning() << "Returned an empty remoteId from the creation"; | ||
776 | } else { | ||
777 | syncStore().updateRemoteId(type, uid, remoteId); | ||
778 | } | ||
779 | } else if (operation == Sink::Operation_Removal) { | ||
780 | syncStore().removeRemoteId(type, uid, remoteId); | ||
781 | } else { | ||
782 | Warning() << "Unkown operation" << operation; | ||
783 | } | ||
784 | |||
785 | mTransaction.abort(); | ||
786 | mSyncTransaction.commit(); | ||
787 | mSyncStore.clear(); | ||
788 | mEntityStore.clear(); | ||
789 | }, [](int errorCode, const QString &errorMessage) { | ||
790 | Warning() << "Failed to replay change: " << errorMessage; | ||
791 | }); | ||
792 | } | ||
793 | |||
794 | KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) | ||
795 | { | ||
796 | return KAsync::null<QByteArray>(); | ||
797 | } | ||
798 | |||
799 | KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) | ||
800 | { | ||
801 | return KAsync::null<QByteArray>(); | ||
802 | } | ||
803 | |||
804 | |||
805 | #pragma clang diagnostic push | 465 | #pragma clang diagnostic push |
806 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" | 466 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" |
807 | #include "genericresource.moc" | 467 | #include "genericresource.moc" |
diff --git a/common/genericresource.h b/common/genericresource.h index 45d5d3a..4ed408d 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -82,147 +82,6 @@ private: | |||
82 | int mError; | 82 | int mError; |
83 | QTimer mCommitQueueTimer; | 83 | QTimer mCommitQueueTimer; |
84 | qint64 mClientLowerBoundRevision; | 84 | qint64 mClientLowerBoundRevision; |
85 | QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories; | ||
86 | }; | 85 | }; |
87 | 86 | ||
88 | class SINK_EXPORT SyncStore | ||
89 | { | ||
90 | public: | ||
91 | SyncStore(Sink::Storage::Transaction &); | ||
92 | |||
93 | /** | ||
94 | * Records a localId to remoteId mapping | ||
95 | */ | ||
96 | void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); | ||
97 | void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); | ||
98 | void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); | ||
99 | |||
100 | /** | ||
101 | * Tries to find a local id for the remote id, and creates a new local id otherwise. | ||
102 | * | ||
103 | * The new local id is recorded in the local to remote id mapping. | ||
104 | */ | ||
105 | QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId); | ||
106 | |||
107 | /** | ||
108 | * Tries to find a remote id for a local id. | ||
109 | * | ||
110 | * This can fail if the entity hasn't been written back to the server yet. | ||
111 | */ | ||
112 | QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId); | ||
113 | |||
114 | private: | ||
115 | Sink::Storage::Transaction &mTransaction; | ||
116 | }; | ||
117 | |||
118 | class SINK_EXPORT EntityStore | ||
119 | { | ||
120 | public: | ||
121 | EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); | ||
122 | |||
123 | template<typename T> | ||
124 | T read(const QByteArray &identifier) const; | ||
125 | |||
126 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); | ||
127 | private: | ||
128 | QByteArray mResourceType; | ||
129 | QByteArray mResourceInstanceIdentifier; | ||
130 | Sink::Storage::Transaction &mTransaction; | ||
131 | }; | ||
132 | |||
133 | /** | ||
134 | * Synchronize and add what we don't already have to local queue | ||
135 | */ | ||
136 | class SINK_EXPORT Synchronizer | ||
137 | { | ||
138 | public: | ||
139 | Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); | ||
140 | |||
141 | void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback); | ||
142 | KAsync::Job<void> synchronize(); | ||
143 | |||
144 | protected: | ||
145 | ///Calls the callback to enqueue the command | ||
146 | void enqueueCommand(int commandId, const QByteArray &data); | ||
147 | |||
148 | static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
149 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
150 | static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
151 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
152 | static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); | ||
153 | |||
154 | /** | ||
155 | * A synchronous algorithm to remove entities that are no longer existing. | ||
156 | * | ||
157 | * A list of entities is generated by @param entryGenerator. | ||
158 | * The entiry Generator typically iterates over an index to produce all existing entries. | ||
159 | * This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false, | ||
160 | * an entity delete command is enqueued. | ||
161 | * | ||
162 | * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. | ||
163 | */ | ||
164 | void scanForRemovals(const QByteArray &bufferType, | ||
165 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); | ||
166 | |||
167 | /** | ||
168 | * An algorithm to create or modify the entity. | ||
169 | * | ||
170 | * Depending on whether the entity is locally available, or has changed. | ||
171 | */ | ||
172 | void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); | ||
173 | |||
174 | //Read only access to main storage | ||
175 | EntityStore &store(); | ||
176 | |||
177 | //Read/Write access to sync storage | ||
178 | SyncStore &syncStore(); | ||
179 | |||
180 | virtual KAsync::Job<void> synchronizeWithSource() = 0; | ||
181 | |||
182 | private: | ||
183 | QSharedPointer<SyncStore> mSyncStore; | ||
184 | QSharedPointer<EntityStore> mEntityStore; | ||
185 | Sink::Storage mStorage; | ||
186 | Sink::Storage mSyncStorage; | ||
187 | QByteArray mResourceType; | ||
188 | QByteArray mResourceInstanceIdentifier; | ||
189 | Sink::Storage::Transaction mTransaction; | ||
190 | Sink::Storage::Transaction mSyncTransaction; | ||
191 | std::function<void(int commandId, const QByteArray &data)> mEnqueue; | ||
192 | }; | ||
193 | |||
194 | /** | ||
195 | * Replay changes to the source | ||
196 | */ | ||
197 | class SINK_EXPORT SourceWriteBack : public ChangeReplay | ||
198 | { | ||
199 | public: | ||
200 | SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier); | ||
201 | |||
202 | protected: | ||
203 | ///Base implementation calls the replay$Type calls | ||
204 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | ||
205 | |||
206 | protected: | ||
207 | ///Implement to write back changes to the server | ||
208 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); | ||
209 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); | ||
210 | |||
211 | //Read only access to main storage | ||
212 | EntityStore &store(); | ||
213 | |||
214 | //Read/Write access to sync storage | ||
215 | SyncStore &syncStore(); | ||
216 | |||
217 | private: | ||
218 | Sink::Storage mSyncStorage; | ||
219 | QSharedPointer<SyncStore> mSyncStore; | ||
220 | QSharedPointer<EntityStore> mEntityStore; | ||
221 | Sink::Storage::Transaction mTransaction; | ||
222 | Sink::Storage::Transaction mSyncTransaction; | ||
223 | QByteArray mResourceType; | ||
224 | QByteArray mResourceInstanceIdentifier; | ||
225 | }; | ||
226 | |||
227 | |||
228 | } | 87 | } |
diff --git a/common/remoteidmap.cpp b/common/remoteidmap.cpp new file mode 100644 index 0000000..f72369d --- /dev/null +++ b/common/remoteidmap.cpp | |||
@@ -0,0 +1,75 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #include "remoteidmap.h" | ||
21 | |||
22 | #include <QUuid> | ||
23 | #include "index.h" | ||
24 | #include "log.h" | ||
25 | |||
26 | using namespace Sink; | ||
27 | |||
28 | RemoteIdMap::RemoteIdMap(Sink::Storage::Transaction &transaction) | ||
29 | : mTransaction(transaction) | ||
30 | { | ||
31 | |||
32 | } | ||
33 | |||
34 | void RemoteIdMap::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
35 | { | ||
36 | Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId); | ||
37 | Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId); | ||
38 | } | ||
39 | |||
40 | void RemoteIdMap::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
41 | { | ||
42 | Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId); | ||
43 | Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId); | ||
44 | } | ||
45 | |||
46 | void RemoteIdMap::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
47 | { | ||
48 | const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); | ||
49 | removeRemoteId(bufferType, localId, oldRemoteId); | ||
50 | recordRemoteId(bufferType, localId, remoteId); | ||
51 | } | ||
52 | |||
53 | QByteArray RemoteIdMap::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId) | ||
54 | { | ||
55 | // Lookup local id for remote id, or insert a new pair otherwise | ||
56 | Index index("rid.mapping." + bufferType, mTransaction); | ||
57 | QByteArray sinkId = index.lookup(remoteId); | ||
58 | if (sinkId.isEmpty()) { | ||
59 | sinkId = QUuid::createUuid().toString().toUtf8(); | ||
60 | index.add(remoteId, sinkId); | ||
61 | Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId); | ||
62 | } | ||
63 | return sinkId; | ||
64 | } | ||
65 | |||
66 | QByteArray RemoteIdMap::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId) | ||
67 | { | ||
68 | QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); | ||
69 | if (remoteId.isEmpty()) { | ||
70 | Warning() << "Couldn't find the remote id for " << localId; | ||
71 | return QByteArray(); | ||
72 | } | ||
73 | return remoteId; | ||
74 | } | ||
75 | |||
diff --git a/common/remoteidmap.h b/common/remoteidmap.h new file mode 100644 index 0000000..12891dc --- /dev/null +++ b/common/remoteidmap.h | |||
@@ -0,0 +1,61 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #pragma once | ||
21 | |||
22 | #include "sink_export.h" | ||
23 | |||
24 | #include "storage.h" | ||
25 | |||
26 | namespace Sink { | ||
27 | |||
28 | /** | ||
29 | * A remoteId mapping | ||
30 | */ | ||
31 | class SINK_EXPORT RemoteIdMap | ||
32 | { | ||
33 | public: | ||
34 | RemoteIdMap(Sink::Storage::Transaction &); | ||
35 | |||
36 | /** | ||
37 | * Records a localId to remoteId mapping | ||
38 | */ | ||
39 | void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); | ||
40 | void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); | ||
41 | void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); | ||
42 | |||
43 | /** | ||
44 | * Tries to find a local id for the remote id, and creates a new local id otherwise. | ||
45 | * | ||
46 | * The new local id is recorded in the local to remote id mapping. | ||
47 | */ | ||
48 | QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId); | ||
49 | |||
50 | /** | ||
51 | * Tries to find a remote id for a local id. | ||
52 | * | ||
53 | * This can fail if the entity hasn't been written back to the server yet. | ||
54 | */ | ||
55 | QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId); | ||
56 | |||
57 | private: | ||
58 | Sink::Storage::Transaction &mTransaction; | ||
59 | }; | ||
60 | |||
61 | } | ||
diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp new file mode 100644 index 0000000..1ef20d2 --- /dev/null +++ b/common/sourcewriteback.cpp | |||
@@ -0,0 +1,124 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #include "sourcewriteback.h" | ||
21 | |||
22 | #include "definitions.h" | ||
23 | #include "log.h" | ||
24 | |||
25 | #define ENTITY_TYPE_MAIL "mail" | ||
26 | #define ENTITY_TYPE_FOLDER "folder" | ||
27 | |||
28 | using namespace Sink; | ||
29 | |||
30 | SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
31 | : ChangeReplay(resourceInstanceIdentifier), | ||
32 | mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), | ||
33 | mResourceType(resourceType), | ||
34 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | ||
35 | { | ||
36 | |||
37 | } | ||
38 | |||
39 | EntityStore &SourceWriteBack::store() | ||
40 | { | ||
41 | if (!mEntityStore) { | ||
42 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); | ||
43 | } | ||
44 | return *mEntityStore; | ||
45 | } | ||
46 | |||
47 | RemoteIdMap &SourceWriteBack::syncStore() | ||
48 | { | ||
49 | if (!mSyncStore) { | ||
50 | mSyncStore = QSharedPointer<RemoteIdMap>::create(mSyncTransaction); | ||
51 | } | ||
52 | return *mSyncStore; | ||
53 | } | ||
54 | |||
55 | KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) | ||
56 | { | ||
57 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
58 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
59 | |||
60 | Sink::EntityBuffer buffer(value); | ||
61 | const Sink::Entity &entity = buffer.entity(); | ||
62 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | ||
63 | Q_ASSERT(metadataBuffer); | ||
64 | if (!metadataBuffer->replayToSource()) { | ||
65 | Trace() << "Change is coming from the source"; | ||
66 | return KAsync::null<void>(); | ||
67 | } | ||
68 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
69 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | ||
70 | const auto uid = Sink::Storage::uidFromKey(key); | ||
71 | QByteArray oldRemoteId; | ||
72 | |||
73 | if (operation != Sink::Operation_Creation) { | ||
74 | oldRemoteId = syncStore().resolveLocalId(type, uid); | ||
75 | } | ||
76 | Trace() << "Replaying " << key << type; | ||
77 | |||
78 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); | ||
79 | if (type == ENTITY_TYPE_FOLDER) { | ||
80 | auto folder = store().read<ApplicationDomain::Folder>(uid); | ||
81 | job = replay(folder, operation, oldRemoteId); | ||
82 | } else if (type == ENTITY_TYPE_MAIL) { | ||
83 | auto mail = store().read<ApplicationDomain::Mail>(uid); | ||
84 | job = replay(mail, operation, oldRemoteId); | ||
85 | } | ||
86 | |||
87 | return job.then<void, QByteArray>([this, operation, type, uid](const QByteArray &remoteId) { | ||
88 | Trace() << "Replayed change with remote id: " << remoteId; | ||
89 | if (operation == Sink::Operation_Creation) { | ||
90 | if (remoteId.isEmpty()) { | ||
91 | Warning() << "Returned an empty remoteId from the creation"; | ||
92 | } else { | ||
93 | syncStore().recordRemoteId(type, uid, remoteId); | ||
94 | } | ||
95 | } else if (operation == Sink::Operation_Modification) { | ||
96 | if (remoteId.isEmpty()) { | ||
97 | Warning() << "Returned an empty remoteId from the creation"; | ||
98 | } else { | ||
99 | syncStore().updateRemoteId(type, uid, remoteId); | ||
100 | } | ||
101 | } else if (operation == Sink::Operation_Removal) { | ||
102 | syncStore().removeRemoteId(type, uid, remoteId); | ||
103 | } else { | ||
104 | Warning() << "Unkown operation" << operation; | ||
105 | } | ||
106 | |||
107 | mTransaction.abort(); | ||
108 | mSyncTransaction.commit(); | ||
109 | mSyncStore.clear(); | ||
110 | mEntityStore.clear(); | ||
111 | }, [](int errorCode, const QString &errorMessage) { | ||
112 | Warning() << "Failed to replay change: " << errorMessage; | ||
113 | }); | ||
114 | } | ||
115 | |||
116 | KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) | ||
117 | { | ||
118 | return KAsync::null<QByteArray>(); | ||
119 | } | ||
120 | |||
121 | KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) | ||
122 | { | ||
123 | return KAsync::null<QByteArray>(); | ||
124 | } | ||
diff --git a/common/sourcewriteback.h b/common/sourcewriteback.h new file mode 100644 index 0000000..8470e00 --- /dev/null +++ b/common/sourcewriteback.h | |||
@@ -0,0 +1,64 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #pragma once | ||
21 | |||
22 | #include "sink_export.h" | ||
23 | |||
24 | #include "changereplay.h" | ||
25 | #include "storage.h" | ||
26 | #include "entitystore.h" | ||
27 | #include "remoteidmap.h" | ||
28 | |||
29 | namespace Sink { | ||
30 | |||
31 | /** | ||
32 | * Replay changes to the source | ||
33 | */ | ||
34 | class SINK_EXPORT SourceWriteBack : public ChangeReplay | ||
35 | { | ||
36 | public: | ||
37 | SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier); | ||
38 | |||
39 | protected: | ||
40 | ///Base implementation calls the replay$Type calls | ||
41 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | ||
42 | |||
43 | protected: | ||
44 | ///Implement to write back changes to the server | ||
45 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); | ||
46 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); | ||
47 | |||
48 | //Read only access to main storage | ||
49 | EntityStore &store(); | ||
50 | |||
51 | //Read/Write access to sync storage | ||
52 | RemoteIdMap &syncStore(); | ||
53 | |||
54 | private: | ||
55 | Sink::Storage mSyncStorage; | ||
56 | QSharedPointer<RemoteIdMap> mSyncStore; | ||
57 | QSharedPointer<EntityStore> mEntityStore; | ||
58 | Sink::Storage::Transaction mTransaction; | ||
59 | Sink::Storage::Transaction mSyncTransaction; | ||
60 | QByteArray mResourceType; | ||
61 | QByteArray mResourceInstanceIdentifier; | ||
62 | }; | ||
63 | |||
64 | } | ||
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp new file mode 100644 index 0000000..fb0baaa --- /dev/null +++ b/common/synchronizer.cpp | |||
@@ -0,0 +1,176 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #include "synchronizer.h" | ||
21 | |||
22 | #include "definitions.h" | ||
23 | #include "commands.h" | ||
24 | #include "bufferutils.h" | ||
25 | #include "entitystore.h" | ||
26 | #include "remoteidmap.h" | ||
27 | #include "adaptorfactoryregistry.h" | ||
28 | #include "createentity_generated.h" | ||
29 | #include "modifyentity_generated.h" | ||
30 | #include "deleteentity_generated.h" | ||
31 | |||
32 | using namespace Sink; | ||
33 | |||
34 | Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
35 | : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), | ||
36 | mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), | ||
37 | mResourceType(resourceType), | ||
38 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | ||
39 | { | ||
40 | Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; | ||
41 | } | ||
42 | |||
43 | void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback) | ||
44 | { | ||
45 | mEnqueue = enqueueCommandCallback; | ||
46 | } | ||
47 | |||
48 | void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) | ||
49 | { | ||
50 | Q_ASSERT(mEnqueue); | ||
51 | mEnqueue(commandId, data); | ||
52 | } | ||
53 | |||
54 | EntityStore &Synchronizer::store() | ||
55 | { | ||
56 | if (!mEntityStore) { | ||
57 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); | ||
58 | } | ||
59 | return *mEntityStore; | ||
60 | } | ||
61 | |||
62 | RemoteIdMap &Synchronizer::syncStore() | ||
63 | { | ||
64 | if (!mSyncStore) { | ||
65 | mSyncStore = QSharedPointer<RemoteIdMap>::create(mSyncTransaction); | ||
66 | } | ||
67 | return *mSyncStore; | ||
68 | } | ||
69 | |||
70 | void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
71 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | ||
72 | { | ||
73 | // These changes are coming from the source | ||
74 | const auto replayToSource = false; | ||
75 | flatbuffers::FlatBufferBuilder entityFbb; | ||
76 | adaptorFactory.createBuffer(domainObject, entityFbb); | ||
77 | flatbuffers::FlatBufferBuilder fbb; | ||
78 | // This is the resource type and not the domain type | ||
79 | auto entityId = fbb.CreateString(sinkId.toStdString()); | ||
80 | auto type = fbb.CreateString(bufferType.toStdString()); | ||
81 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | ||
82 | auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); | ||
83 | Sink::Commands::FinishCreateEntityBuffer(fbb, location); | ||
84 | callback(BufferUtils::extractBuffer(fbb)); | ||
85 | } | ||
86 | |||
87 | void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
88 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | ||
89 | { | ||
90 | // These changes are coming from the source | ||
91 | const auto replayToSource = false; | ||
92 | flatbuffers::FlatBufferBuilder entityFbb; | ||
93 | adaptorFactory.createBuffer(domainObject, entityFbb); | ||
94 | flatbuffers::FlatBufferBuilder fbb; | ||
95 | auto entityId = fbb.CreateString(sinkId.toStdString()); | ||
96 | // This is the resource type and not the domain type | ||
97 | auto type = fbb.CreateString(bufferType.toStdString()); | ||
98 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | ||
99 | // FIXME removals | ||
100 | auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); | ||
101 | Sink::Commands::FinishModifyEntityBuffer(fbb, location); | ||
102 | callback(BufferUtils::extractBuffer(fbb)); | ||
103 | } | ||
104 | |||
105 | void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) | ||
106 | { | ||
107 | // These changes are coming from the source | ||
108 | const auto replayToSource = false; | ||
109 | flatbuffers::FlatBufferBuilder fbb; | ||
110 | auto entityId = fbb.CreateString(sinkId.toStdString()); | ||
111 | // This is the resource type and not the domain type | ||
112 | auto type = fbb.CreateString(bufferType.toStdString()); | ||
113 | auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); | ||
114 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); | ||
115 | callback(BufferUtils::extractBuffer(fbb)); | ||
116 | } | ||
117 | |||
118 | void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) | ||
119 | { | ||
120 | entryGenerator([this, bufferType, &exists](const QByteArray &key) { | ||
121 | auto sinkId = Sink::Storage::uidFromKey(key); | ||
122 | Trace() << "Checking for removal " << key; | ||
123 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); | ||
124 | // If we have no remoteId, the entity hasn't been replayed to the source yet | ||
125 | if (!remoteId.isEmpty()) { | ||
126 | if (!exists(remoteId)) { | ||
127 | Trace() << "Found a removed entity: " << sinkId; | ||
128 | deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, | ||
129 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); | ||
130 | } | ||
131 | } | ||
132 | }); | ||
133 | } | ||
134 | |||
135 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | ||
136 | { | ||
137 | Trace() << "Create or modify" << bufferType << remoteId; | ||
138 | auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); | ||
139 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | ||
140 | const auto found = mainDatabase.contains(sinkId); | ||
141 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); | ||
142 | if (!found) { | ||
143 | Trace() << "Found a new entity: " << remoteId; | ||
144 | createEntity( | ||
145 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); | ||
146 | } else { // modification | ||
147 | if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) { | ||
148 | bool changed = false; | ||
149 | for (const auto &property : entity.changedProperties()) { | ||
150 | if (entity.getProperty(property) != current->getProperty(property)) { | ||
151 | Trace() << "Property changed " << sinkId << property; | ||
152 | changed = true; | ||
153 | } | ||
154 | } | ||
155 | if (changed) { | ||
156 | Trace() << "Found a modified entity: " << remoteId; | ||
157 | modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, | ||
158 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); | ||
159 | } | ||
160 | } else { | ||
161 | Warning() << "Failed to get current entity"; | ||
162 | } | ||
163 | } | ||
164 | } | ||
165 | |||
166 | KAsync::Job<void> Synchronizer::synchronize() | ||
167 | { | ||
168 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
169 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
170 | return synchronizeWithSource().then<void>([this]() { | ||
171 | mTransaction.abort(); | ||
172 | mSyncTransaction.commit(); | ||
173 | mSyncStore.clear(); | ||
174 | mEntityStore.clear(); | ||
175 | }); | ||
176 | } | ||
diff --git a/common/synchronizer.h b/common/synchronizer.h new file mode 100644 index 0000000..61bca7d --- /dev/null +++ b/common/synchronizer.h | |||
@@ -0,0 +1,95 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #pragma once | ||
21 | |||
22 | #include "sink_export.h" | ||
23 | #include <QObject> | ||
24 | #include <Async/Async> | ||
25 | #include <domainadaptor.h> | ||
26 | |||
27 | #include "storage.h" | ||
28 | |||
29 | namespace Sink { | ||
30 | class EntityStore; | ||
31 | class RemoteIdMap; | ||
32 | |||
33 | /** | ||
34 | * Synchronize and add what we don't already have to local queue | ||
35 | */ | ||
36 | class SINK_EXPORT Synchronizer | ||
37 | { | ||
38 | public: | ||
39 | Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); | ||
40 | |||
41 | void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback); | ||
42 | KAsync::Job<void> synchronize(); | ||
43 | |||
44 | protected: | ||
45 | ///Calls the callback to enqueue the command | ||
46 | void enqueueCommand(int commandId, const QByteArray &data); | ||
47 | |||
48 | static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
49 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
50 | static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
51 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
52 | static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); | ||
53 | |||
54 | /** | ||
55 | * A synchronous algorithm to remove entities that are no longer existing. | ||
56 | * | ||
57 | * A list of entities is generated by @param entryGenerator. | ||
58 | * The entiry Generator typically iterates over an index to produce all existing entries. | ||
59 | * This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false, | ||
60 | * an entity delete command is enqueued. | ||
61 | * | ||
62 | * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. | ||
63 | */ | ||
64 | void scanForRemovals(const QByteArray &bufferType, | ||
65 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); | ||
66 | |||
67 | /** | ||
68 | * An algorithm to create or modify the entity. | ||
69 | * | ||
70 | * Depending on whether the entity is locally available, or has changed. | ||
71 | */ | ||
72 | void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); | ||
73 | |||
74 | //Read only access to main storage | ||
75 | EntityStore &store(); | ||
76 | |||
77 | //Read/Write access to sync storage | ||
78 | RemoteIdMap &syncStore(); | ||
79 | |||
80 | virtual KAsync::Job<void> synchronizeWithSource() = 0; | ||
81 | |||
82 | private: | ||
83 | QSharedPointer<RemoteIdMap> mSyncStore; | ||
84 | QSharedPointer<EntityStore> mEntityStore; | ||
85 | Sink::Storage mStorage; | ||
86 | Sink::Storage mSyncStorage; | ||
87 | QByteArray mResourceType; | ||
88 | QByteArray mResourceInstanceIdentifier; | ||
89 | Sink::Storage::Transaction mTransaction; | ||
90 | Sink::Storage::Transaction mSyncTransaction; | ||
91 | std::function<void(int commandId, const QByteArray &data)> mEnqueue; | ||
92 | }; | ||
93 | |||
94 | } | ||
95 | |||
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index 609d23e..1708cc5 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp | |||
@@ -36,6 +36,8 @@ | |||
36 | #include "facadefactory.h" | 36 | #include "facadefactory.h" |
37 | #include "indexupdater.h" | 37 | #include "indexupdater.h" |
38 | #include "adaptorfactoryregistry.h" | 38 | #include "adaptorfactoryregistry.h" |
39 | #include "synchronizer.h" | ||
40 | #include "remoteidmap.h" | ||
39 | #include <QDate> | 41 | #include <QDate> |
40 | #include <QUuid> | 42 | #include <QUuid> |
41 | 43 | ||