diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/CMakeLists.txt | 2 | ||||
-rw-r--r-- | common/adaptorfactoryregistry.cpp | 68 | ||||
-rw-r--r-- | common/adaptorfactoryregistry.h | 64 | ||||
-rw-r--r-- | common/changereplay.cpp | 102 | ||||
-rw-r--r-- | common/changereplay.h | 68 | ||||
-rw-r--r-- | common/genericresource.cpp | 560 | ||||
-rw-r--r-- | common/genericresource.h | 151 | ||||
-rw-r--r-- | common/pipeline.cpp | 17 | ||||
-rw-r--r-- | common/pipeline.h | 3 | ||||
-rw-r--r-- | common/resource.cpp | 2 | ||||
-rw-r--r-- | common/resource.h | 2 |
11 files changed, 742 insertions, 297 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index c269a85..79b627a 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -66,6 +66,8 @@ set(command_SRCS | |||
66 | domain/folder.cpp | 66 | domain/folder.cpp |
67 | test.cpp | 67 | test.cpp |
68 | query.cpp | 68 | query.cpp |
69 | changereplay.cpp | ||
70 | adaptorfactoryregistry.cpp | ||
69 | ${storage_SRCS}) | 71 | ${storage_SRCS}) |
70 | 72 | ||
71 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) | 73 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) |
diff --git a/common/adaptorfactoryregistry.cpp b/common/adaptorfactoryregistry.cpp new file mode 100644 index 0000000..323a02d --- /dev/null +++ b/common/adaptorfactoryregistry.cpp | |||
@@ -0,0 +1,68 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #include "adaptorfactoryregistry.h" | ||
21 | |||
22 | #include <QByteArray> | ||
23 | #include <QDebug> | ||
24 | #include <QMutex> | ||
25 | #include <functional> | ||
26 | #include <memory> | ||
27 | |||
28 | #include "domaintypeadaptorfactoryinterface.h" | ||
29 | #include "applicationdomaintype.h" | ||
30 | #include "log.h" | ||
31 | |||
32 | using namespace Sink; | ||
33 | |||
34 | AdaptorFactoryRegistry &AdaptorFactoryRegistry::instance() | ||
35 | { | ||
36 | // QMutexLocker locker(&sMutex); | ||
37 | static AdaptorFactoryRegistry *instance = 0; | ||
38 | if (!instance) { | ||
39 | instance = new AdaptorFactoryRegistry; | ||
40 | } | ||
41 | return *instance; | ||
42 | } | ||
43 | |||
44 | static QByteArray key(const QByteArray &resource, const QByteArray &type) | ||
45 | { | ||
46 | return resource + type; | ||
47 | } | ||
48 | |||
49 | AdaptorFactoryRegistry::AdaptorFactoryRegistry() | ||
50 | { | ||
51 | |||
52 | } | ||
53 | |||
54 | std::shared_ptr<DomainTypeAdaptorFactoryInterface> AdaptorFactoryRegistry::getFactory(const QByteArray &resource, const QByteArray &typeName) | ||
55 | { | ||
56 | const auto ptr = mRegistry.value(key(resource, typeName)); | ||
57 | //We have to check the pointer before the cast, otherwise a check would return true also for invalid instances. | ||
58 | if (!ptr) { | ||
59 | return std::shared_ptr<DomainTypeAdaptorFactoryInterface>(); | ||
60 | } | ||
61 | return std::static_pointer_cast<DomainTypeAdaptorFactoryInterface>(ptr); | ||
62 | } | ||
63 | |||
64 | void AdaptorFactoryRegistry::registerFactory(const QByteArray &resource, const std::shared_ptr<void> &instance, const QByteArray typeName) | ||
65 | { | ||
66 | mRegistry.insert(key(resource, typeName), instance); | ||
67 | } | ||
68 | |||
diff --git a/common/adaptorfactoryregistry.h b/common/adaptorfactoryregistry.h new file mode 100644 index 0000000..f06120a --- /dev/null +++ b/common/adaptorfactoryregistry.h | |||
@@ -0,0 +1,64 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | |||
21 | #pragma once | ||
22 | |||
23 | #include "sink_export.h" | ||
24 | #include <QByteArray> | ||
25 | #include <QDebug> | ||
26 | #include <QMutex> | ||
27 | #include <functional> | ||
28 | #include <memory> | ||
29 | |||
30 | #include "domaintypeadaptorfactoryinterface.h" | ||
31 | #include "applicationdomaintype.h" | ||
32 | #include "log.h" | ||
33 | |||
34 | namespace Sink { | ||
35 | |||
36 | /** | ||
37 | */ | ||
38 | class SINK_EXPORT AdaptorFactoryRegistry | ||
39 | { | ||
40 | public: | ||
41 | static AdaptorFactoryRegistry &instance(); | ||
42 | |||
43 | template <class DomainType, class Factory> | ||
44 | void registerFactory(const QByteArray &resource) | ||
45 | { | ||
46 | registerFactory(resource, std::make_shared<Factory>(), ApplicationDomain::getTypeName<DomainType>()); | ||
47 | } | ||
48 | |||
49 | template <class DomainType> | ||
50 | std::shared_ptr<DomainTypeAdaptorFactoryInterface> getFactory(const QByteArray &resource) | ||
51 | { | ||
52 | return getFactory(resource, ApplicationDomain::getTypeName<DomainType>()); | ||
53 | } | ||
54 | |||
55 | std::shared_ptr<DomainTypeAdaptorFactoryInterface> getFactory(const QByteArray &resource, const QByteArray &typeName); | ||
56 | |||
57 | private: | ||
58 | AdaptorFactoryRegistry(); | ||
59 | void registerFactory(const QByteArray &resource, const std::shared_ptr<void> &instance, const QByteArray typeName); | ||
60 | |||
61 | QHash<QByteArray, std::shared_ptr<void>> mRegistry; | ||
62 | static QMutex sMutex; | ||
63 | }; | ||
64 | } | ||
diff --git a/common/changereplay.cpp b/common/changereplay.cpp new file mode 100644 index 0000000..2447b6e --- /dev/null +++ b/common/changereplay.cpp | |||
@@ -0,0 +1,102 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #include "changereplay.h" | ||
21 | |||
22 | #include "entitybuffer.h" | ||
23 | #include "log.h" | ||
24 | #include "definitions.h" | ||
25 | #include "bufferutils.h" | ||
26 | |||
27 | using namespace Sink; | ||
28 | |||
29 | #undef DEBUG_AREA | ||
30 | #define DEBUG_AREA "resource.changereplay" | ||
31 | |||
32 | ChangeReplay::ChangeReplay(const QByteArray &resourceName) | ||
33 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite) | ||
34 | { | ||
35 | Trace() << "Created change replay: " << resourceName; | ||
36 | } | ||
37 | |||
38 | qint64 ChangeReplay::getLastReplayedRevision() | ||
39 | { | ||
40 | qint64 lastReplayedRevision = 0; | ||
41 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); | ||
42 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
43 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
44 | lastReplayedRevision = value.toLongLong(); | ||
45 | return false; | ||
46 | }, | ||
47 | [](const Storage::Error &) {}); | ||
48 | return lastReplayedRevision; | ||
49 | } | ||
50 | |||
51 | bool ChangeReplay::allChangesReplayed() | ||
52 | { | ||
53 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | ||
54 | Warning() << error.message; | ||
55 | })); | ||
56 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | ||
57 | Trace() << "All changes replayed " << topRevision << lastReplayedRevision; | ||
58 | return (lastReplayedRevision >= topRevision); | ||
59 | } | ||
60 | |||
61 | void ChangeReplay::revisionChanged() | ||
62 | { | ||
63 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | ||
64 | Warning() << error.message; | ||
65 | }); | ||
66 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | ||
67 | Warning() << error.message; | ||
68 | }); | ||
69 | qint64 lastReplayedRevision = 1; | ||
70 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
71 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
72 | lastReplayedRevision = value.toLongLong(); | ||
73 | return false; | ||
74 | }, | ||
75 | [](const Storage::Error &) {}); | ||
76 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | ||
77 | |||
78 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; | ||
79 | if (lastReplayedRevision <= topRevision) { | ||
80 | qint64 revision = lastReplayedRevision; | ||
81 | for (; revision <= topRevision; revision++) { | ||
82 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | ||
83 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | ||
84 | const auto key = Storage::assembleKey(uid, revision); | ||
85 | Storage::mainDatabase(mainStoreTransaction, type) | ||
86 | .scan(key, | ||
87 | [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { | ||
88 | replay(type, key, value).exec(); | ||
89 | // TODO make for loop async, and pass to async replay function together with type | ||
90 | Trace() << "Replaying " << key; | ||
91 | return false; | ||
92 | }, | ||
93 | [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); | ||
94 | } | ||
95 | revision--; | ||
96 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | ||
97 | replayStoreTransaction.commit(); | ||
98 | Trace() << "Replayed until " << revision; | ||
99 | } | ||
100 | emit changesReplayed(); | ||
101 | } | ||
102 | |||
diff --git a/common/changereplay.h b/common/changereplay.h new file mode 100644 index 0000000..a568060 --- /dev/null +++ b/common/changereplay.h | |||
@@ -0,0 +1,68 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #pragma once | ||
21 | |||
22 | #include "sink_export.h" | ||
23 | #include <QObject> | ||
24 | #include <Async/Async> | ||
25 | |||
26 | #include "storage.h" | ||
27 | |||
28 | namespace Sink { | ||
29 | |||
30 | /** | ||
31 | * Replays changes from the storage one by one. | ||
32 | * | ||
33 | * Uses a local database to: | ||
34 | * * Remember what changes have been replayed already. | ||
35 | * * store a mapping of remote to local buffers | ||
36 | */ | ||
37 | class SINK_EXPORT ChangeReplay : public QObject | ||
38 | { | ||
39 | Q_OBJECT | ||
40 | public: | ||
41 | ChangeReplay(const QByteArray &resourceName); | ||
42 | |||
43 | qint64 getLastReplayedRevision(); | ||
44 | bool allChangesReplayed(); | ||
45 | |||
46 | signals: | ||
47 | void changesReplayed(); | ||
48 | |||
49 | public slots: | ||
50 | void revisionChanged(); | ||
51 | |||
52 | protected: | ||
53 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; | ||
54 | Sink::Storage mStorage; | ||
55 | |||
56 | private: | ||
57 | Sink::Storage mChangeReplayStore; | ||
58 | }; | ||
59 | |||
60 | class NullChangeReplay : public ChangeReplay | ||
61 | { | ||
62 | public: | ||
63 | NullChangeReplay() : ChangeReplay("null") {} | ||
64 | KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return KAsync::null<void>(); } | ||
65 | }; | ||
66 | |||
67 | } | ||
68 | |||
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index eae6ead..cb2ef21 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -1,3 +1,22 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
1 | #include "genericresource.h" | 20 | #include "genericresource.h" |
2 | 21 | ||
3 | #include "entitybuffer.h" | 22 | #include "entitybuffer.h" |
@@ -14,6 +33,7 @@ | |||
14 | #include "log.h" | 33 | #include "log.h" |
15 | #include "definitions.h" | 34 | #include "definitions.h" |
16 | #include "bufferutils.h" | 35 | #include "bufferutils.h" |
36 | #include "adaptorfactoryregistry.h" | ||
17 | 37 | ||
18 | #include <QUuid> | 38 | #include <QUuid> |
19 | #include <QDataStream> | 39 | #include <QDataStream> |
@@ -30,96 +50,6 @@ static int sCommitInterval = 10; | |||
30 | using namespace Sink; | 50 | using namespace Sink; |
31 | 51 | ||
32 | #undef DEBUG_AREA | 52 | #undef DEBUG_AREA |
33 | #define DEBUG_AREA "resource.changereplay" | ||
34 | |||
35 | /** | ||
36 | * Replays changes from the storage one by one. | ||
37 | * | ||
38 | * Uses a local database to: | ||
39 | * * Remember what changes have been replayed already. | ||
40 | * * store a mapping of remote to local buffers | ||
41 | */ | ||
42 | class ChangeReplay : public QObject | ||
43 | { | ||
44 | Q_OBJECT | ||
45 | public: | ||
46 | typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction; | ||
47 | |||
48 | ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction) | ||
49 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayFunction(replayFunction) | ||
50 | { | ||
51 | } | ||
52 | |||
53 | qint64 getLastReplayedRevision() | ||
54 | { | ||
55 | qint64 lastReplayedRevision = 0; | ||
56 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); | ||
57 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
58 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
59 | lastReplayedRevision = value.toLongLong(); | ||
60 | return false; | ||
61 | }, | ||
62 | [](const Storage::Error &) {}); | ||
63 | return lastReplayedRevision; | ||
64 | } | ||
65 | |||
66 | bool allChangesReplayed() | ||
67 | { | ||
68 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); | ||
69 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | ||
70 | Trace() << "All changes replayed " << topRevision << lastReplayedRevision; | ||
71 | return (lastReplayedRevision >= topRevision); | ||
72 | } | ||
73 | |||
74 | signals: | ||
75 | void changesReplayed(); | ||
76 | |||
77 | public slots: | ||
78 | void revisionChanged() | ||
79 | { | ||
80 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); | ||
81 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); | ||
82 | qint64 lastReplayedRevision = 1; | ||
83 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
84 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
85 | lastReplayedRevision = value.toLongLong(); | ||
86 | return false; | ||
87 | }, | ||
88 | [](const Storage::Error &) {}); | ||
89 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | ||
90 | |||
91 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; | ||
92 | if (lastReplayedRevision <= topRevision) { | ||
93 | qint64 revision = lastReplayedRevision; | ||
94 | for (; revision <= topRevision; revision++) { | ||
95 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | ||
96 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | ||
97 | const auto key = Storage::assembleKey(uid, revision); | ||
98 | Storage::mainDatabase(mainStoreTransaction, type) | ||
99 | .scan(key, | ||
100 | [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { | ||
101 | mReplayFunction(type, key, value).exec(); | ||
102 | // TODO make for loop async, and pass to async replay function together with type | ||
103 | Trace() << "Replaying " << key; | ||
104 | return false; | ||
105 | }, | ||
106 | [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); | ||
107 | } | ||
108 | revision--; | ||
109 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | ||
110 | replayStoreTransaction.commit(); | ||
111 | Trace() << "Replayed until " << revision; | ||
112 | } | ||
113 | emit changesReplayed(); | ||
114 | } | ||
115 | |||
116 | private: | ||
117 | Sink::Storage mStorage; | ||
118 | Sink::Storage mChangeReplayStore; | ||
119 | ReplayFunction mReplayFunction; | ||
120 | }; | ||
121 | |||
122 | #undef DEBUG_AREA | ||
123 | #define DEBUG_AREA "resource.commandprocessor" | 53 | #define DEBUG_AREA "resource.commandprocessor" |
124 | 54 | ||
125 | /** | 55 | /** |
@@ -133,10 +63,9 @@ class CommandProcessor : public QObject | |||
133 | public: | 63 | public: |
134 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) | 64 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) |
135 | { | 65 | { |
136 | mPipeline->startTransaction(); | 66 | mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
137 | // FIXME Should be initialized to the current value of the change replay queue | 67 | Warning() << error.message; |
138 | mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); | 68 | })); |
139 | mPipeline->commit(); | ||
140 | 69 | ||
141 | for (auto queue : mCommandQueues) { | 70 | for (auto queue : mCommandQueues) { |
142 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); | 71 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); |
@@ -303,15 +232,22 @@ private: | |||
303 | #undef DEBUG_AREA | 232 | #undef DEBUG_AREA |
304 | #define DEBUG_AREA "resource" | 233 | #define DEBUG_AREA "resource" |
305 | 234 | ||
306 | GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) | 235 | GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer) |
307 | : Sink::Resource(), | 236 | : Sink::Resource(), |
308 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), | 237 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), |
309 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), | 238 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), |
239 | mResourceType(resourceType), | ||
310 | mResourceInstanceIdentifier(resourceInstanceIdentifier), | 240 | mResourceInstanceIdentifier(resourceInstanceIdentifier), |
311 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), | 241 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), |
242 | mChangeReplay(changeReplay), | ||
243 | mSynchronizer(synchronizer), | ||
312 | mError(0), | 244 | mError(0), |
313 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 245 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
314 | { | 246 | { |
247 | mPipeline->setResourceType(mResourceType); | ||
248 | mSynchronizer->setup([this](int commandId, const QByteArray &data) { | ||
249 | enqueueCommand(mSynchronizerQueue, commandId, data); | ||
250 | }); | ||
315 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); | 251 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); |
316 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | 252 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { |
317 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | 253 | flatbuffers::Verifier verifier((const uint8_t *)command, size); |
@@ -353,14 +289,9 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
353 | }); | 289 | }); |
354 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 290 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
355 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 291 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
356 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { | ||
357 | // This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync) | ||
358 | auto synchronizationStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); | ||
359 | return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore]() {}); | ||
360 | }); | ||
361 | enableChangeReplay(true); | 292 | enableChangeReplay(true); |
362 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); | 293 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); |
363 | mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); | 294 | mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); |
364 | 295 | ||
365 | mCommitQueueTimer.setInterval(sCommitInterval); | 296 | mCommitQueueTimer.setInterval(sCommitInterval); |
366 | mCommitQueueTimer.setSingleShot(true); | 297 | mCommitQueueTimer.setSingleShot(true); |
@@ -370,7 +301,6 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
370 | GenericResource::~GenericResource() | 301 | GenericResource::~GenericResource() |
371 | { | 302 | { |
372 | delete mProcessor; | 303 | delete mProcessor; |
373 | delete mSourceChangeReplay; | ||
374 | } | 304 | } |
375 | 305 | ||
376 | KAsync::Job<void> GenericResource::inspect( | 306 | KAsync::Job<void> GenericResource::inspect( |
@@ -383,86 +313,20 @@ KAsync::Job<void> GenericResource::inspect( | |||
383 | void GenericResource::enableChangeReplay(bool enable) | 313 | void GenericResource::enableChangeReplay(bool enable) |
384 | { | 314 | { |
385 | if (enable) { | 315 | if (enable) { |
386 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged, Qt::QueuedConnection); | 316 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); |
387 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 317 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
388 | mSourceChangeReplay->revisionChanged(); | 318 | mChangeReplay->revisionChanged(); |
389 | } else { | 319 | } else { |
390 | QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); | 320 | QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged); |
391 | QObject::disconnect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 321 | QObject::disconnect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
392 | } | 322 | } |
393 | } | 323 | } |
394 | 324 | ||
395 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) | 325 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) |
396 | { | 326 | { |
397 | mPipeline->setPreprocessors(type, preprocessors); | 327 | mPipeline->setPreprocessors(type, preprocessors); |
398 | mPipeline->setAdaptorFactory(type, factory); | ||
399 | mAdaptorFactories.insert(type, factory); | ||
400 | } | ||
401 | |||
402 | KAsync::Job<void> GenericResource::replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value) | ||
403 | { | ||
404 | Sink::EntityBuffer buffer(value); | ||
405 | const Sink::Entity &entity = buffer.entity(); | ||
406 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | ||
407 | Q_ASSERT(metadataBuffer); | ||
408 | if (!metadataBuffer->replayToSource()) { | ||
409 | Trace() << "Change is coming from the source"; | ||
410 | return KAsync::null<void>(); | ||
411 | } | ||
412 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
413 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | ||
414 | const auto uid = Sink::Storage::uidFromKey(key); | ||
415 | QByteArray oldRemoteId; | ||
416 | |||
417 | if (operation != Sink::Operation_Creation) { | ||
418 | auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadOnly); | ||
419 | oldRemoteId = resolveLocalId(type, uid, synchronizationTransaction); | ||
420 | } | ||
421 | Trace() << "Replaying " << key << type; | ||
422 | |||
423 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); | ||
424 | if (type == ENTITY_TYPE_FOLDER) { | ||
425 | const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
426 | job = replay(folder, operation, oldRemoteId); | ||
427 | } else if (type == ENTITY_TYPE_MAIL) { | ||
428 | const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
429 | job = replay(mail, operation, oldRemoteId); | ||
430 | } | ||
431 | |||
432 | return job.then<void, QByteArray>([=, &synchronizationStore](const QByteArray &remoteId) { | ||
433 | auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadWrite); | ||
434 | Trace() << "Replayed change with remote id: " << remoteId; | ||
435 | if (operation == Sink::Operation_Creation) { | ||
436 | if (remoteId.isEmpty()) { | ||
437 | Warning() << "Returned an empty remoteId from the creation"; | ||
438 | } else { | ||
439 | recordRemoteId(type, uid, remoteId, synchronizationTransaction); | ||
440 | } | ||
441 | } else if (operation == Sink::Operation_Modification) { | ||
442 | if (remoteId.isEmpty()) { | ||
443 | Warning() << "Returned an empty remoteId from the creation"; | ||
444 | } else { | ||
445 | updateRemoteId(type, uid, remoteId, synchronizationTransaction); | ||
446 | } | ||
447 | } else if (operation == Sink::Operation_Removal) { | ||
448 | removeRemoteId(type, uid, remoteId, synchronizationTransaction); | ||
449 | } else { | ||
450 | Warning() << "Unkown operation" << operation; | ||
451 | } | ||
452 | }, [](int errorCode, const QString &errorMessage) { | ||
453 | Warning() << "Failed to replay change: " << errorMessage; | ||
454 | }); | ||
455 | } | 328 | } |
456 | 329 | ||
457 | KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) | ||
458 | { | ||
459 | return KAsync::null<QByteArray>(); | ||
460 | } | ||
461 | |||
462 | KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) | ||
463 | { | ||
464 | return KAsync::null<QByteArray>(); | ||
465 | } | ||
466 | 330 | ||
467 | void GenericResource::removeDataFromDisk() | 331 | void GenericResource::removeDataFromDisk() |
468 | { | 332 | { |
@@ -528,10 +392,8 @@ KAsync::Job<void> GenericResource::synchronizeWithSource() | |||
528 | Log() << " Synchronizing"; | 392 | Log() << " Synchronizing"; |
529 | // Changereplay would deadlock otherwise when trying to open the synchronization store | 393 | // Changereplay would deadlock otherwise when trying to open the synchronization store |
530 | enableChangeReplay(false); | 394 | enableChangeReplay(false); |
531 | auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); | 395 | mSynchronizer->synchronize() |
532 | auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); | 396 | .then<void>([this, &future]() { |
533 | synchronizeWithSource(*mainStore, *syncStore) | ||
534 | .then<void>([this, mainStore, syncStore, &future]() { | ||
535 | Log() << "Done Synchronizing"; | 397 | Log() << "Done Synchronizing"; |
536 | enableChangeReplay(true); | 398 | enableChangeReplay(true); |
537 | future.setFinished(); | 399 | future.setFinished(); |
@@ -576,11 +438,11 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
576 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) | 438 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) |
577 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) | 439 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) |
578 | .then<void>([this](KAsync::Future<void> &f) { | 440 | .then<void>([this](KAsync::Future<void> &f) { |
579 | if (mSourceChangeReplay->allChangesReplayed()) { | 441 | if (mChangeReplay->allChangesReplayed()) { |
580 | f.setFinished(); | 442 | f.setFinished(); |
581 | } else { | 443 | } else { |
582 | auto context = new QObject; | 444 | auto context = new QObject; |
583 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { | 445 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { |
584 | delete context; | 446 | delete context; |
585 | f.setFinished(); | 447 | f.setFinished(); |
586 | }); | 448 | }); |
@@ -590,7 +452,7 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
590 | 452 | ||
591 | void GenericResource::updateLowerBoundRevision() | 453 | void GenericResource::updateLowerBoundRevision() |
592 | { | 454 | { |
593 | mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSourceChangeReplay->getLastReplayedRevision())); | 455 | mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mChangeReplay->getLastReplayedRevision())); |
594 | } | 456 | } |
595 | 457 | ||
596 | void GenericResource::setLowerBoundRevision(qint64 revision) | 458 | void GenericResource::setLowerBoundRevision(qint64 revision) |
@@ -599,7 +461,139 @@ void GenericResource::setLowerBoundRevision(qint64 revision) | |||
599 | updateLowerBoundRevision(); | 461 | updateLowerBoundRevision(); |
600 | } | 462 | } |
601 | 463 | ||
602 | void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 464 | |
465 | |||
466 | |||
467 | EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) | ||
468 | : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier), | ||
469 | mTransaction(transaction) | ||
470 | { | ||
471 | |||
472 | } | ||
473 | |||
474 | template<typename T> | ||
475 | T EntityStore::read(const QByteArray &identifier) const | ||
476 | { | ||
477 | auto typeName = ApplicationDomain::getTypeName<T>(); | ||
478 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | ||
479 | auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType)); | ||
480 | Q_ASSERT(bufferAdaptor); | ||
481 | return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor); | ||
482 | } | ||
483 | |||
484 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | ||
485 | { | ||
486 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | ||
487 | db.findLatest(uid, | ||
488 | [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
489 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
490 | if (!buffer.isValid()) { | ||
491 | Warning() << "Read invalid buffer from disk"; | ||
492 | } else { | ||
493 | Trace() << "Found value " << key; | ||
494 | current = adaptorFactory.createAdaptor(buffer.entity()); | ||
495 | } | ||
496 | return false; | ||
497 | }, | ||
498 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); | ||
499 | return current; | ||
500 | } | ||
501 | |||
502 | |||
503 | |||
504 | SyncStore::SyncStore(Sink::Storage::Transaction &transaction) | ||
505 | : mTransaction(transaction) | ||
506 | { | ||
507 | |||
508 | } | ||
509 | |||
510 | void SyncStore::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
511 | { | ||
512 | Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId); | ||
513 | Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId); | ||
514 | } | ||
515 | |||
516 | void SyncStore::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
517 | { | ||
518 | Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId); | ||
519 | Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId); | ||
520 | } | ||
521 | |||
522 | void SyncStore::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
523 | { | ||
524 | const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); | ||
525 | removeRemoteId(bufferType, localId, oldRemoteId); | ||
526 | recordRemoteId(bufferType, localId, remoteId); | ||
527 | } | ||
528 | |||
529 | QByteArray SyncStore::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId) | ||
530 | { | ||
531 | // Lookup local id for remote id, or insert a new pair otherwise | ||
532 | Index index("rid.mapping." + bufferType, mTransaction); | ||
533 | QByteArray sinkId = index.lookup(remoteId); | ||
534 | if (sinkId.isEmpty()) { | ||
535 | sinkId = QUuid::createUuid().toString().toUtf8(); | ||
536 | index.add(remoteId, sinkId); | ||
537 | Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId); | ||
538 | } | ||
539 | return sinkId; | ||
540 | } | ||
541 | |||
542 | QByteArray SyncStore::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId) | ||
543 | { | ||
544 | QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); | ||
545 | if (remoteId.isEmpty()) { | ||
546 | Warning() << "Couldn't find the remote id for " << localId; | ||
547 | return QByteArray(); | ||
548 | } | ||
549 | return remoteId; | ||
550 | } | ||
551 | |||
552 | |||
553 | |||
554 | |||
555 | |||
556 | |||
557 | |||
558 | |||
559 | Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
560 | : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), | ||
561 | mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), | ||
562 | mResourceType(resourceType), | ||
563 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | ||
564 | { | ||
565 | Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; | ||
566 | |||
567 | } | ||
568 | |||
569 | void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback) | ||
570 | { | ||
571 | mEnqueue = enqueueCommandCallback; | ||
572 | } | ||
573 | |||
574 | void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) | ||
575 | { | ||
576 | Q_ASSERT(mEnqueue); | ||
577 | mEnqueue(commandId, data); | ||
578 | } | ||
579 | |||
580 | EntityStore &Synchronizer::store() | ||
581 | { | ||
582 | if (!mEntityStore) { | ||
583 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); | ||
584 | } | ||
585 | return *mEntityStore; | ||
586 | } | ||
587 | |||
588 | SyncStore &Synchronizer::syncStore() | ||
589 | { | ||
590 | if (!mSyncStore) { | ||
591 | mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction); | ||
592 | } | ||
593 | return *mSyncStore; | ||
594 | } | ||
595 | |||
596 | void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
603 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | 597 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) |
604 | { | 598 | { |
605 | // These changes are coming from the source | 599 | // These changes are coming from the source |
@@ -616,7 +610,7 @@ void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &b | |||
616 | callback(BufferUtils::extractBuffer(fbb)); | 610 | callback(BufferUtils::extractBuffer(fbb)); |
617 | } | 611 | } |
618 | 612 | ||
619 | void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 613 | void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, |
620 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | 614 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) |
621 | { | 615 | { |
622 | // These changes are coming from the source | 616 | // These changes are coming from the source |
@@ -634,7 +628,7 @@ void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, co | |||
634 | callback(BufferUtils::extractBuffer(fbb)); | 628 | callback(BufferUtils::extractBuffer(fbb)); |
635 | } | 629 | } |
636 | 630 | ||
637 | void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) | 631 | void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) |
638 | { | 632 | { |
639 | // These changes are coming from the source | 633 | // These changes are coming from the source |
640 | const auto replayToSource = false; | 634 | const auto replayToSource = false; |
@@ -647,96 +641,36 @@ void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, co | |||
647 | callback(BufferUtils::extractBuffer(fbb)); | 641 | callback(BufferUtils::extractBuffer(fbb)); |
648 | } | 642 | } |
649 | 643 | ||
650 | void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | 644 | void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) |
651 | { | ||
652 | Index("rid.mapping." + bufferType, transaction).add(remoteId, localId); | ||
653 | ; | ||
654 | Index("localid.mapping." + bufferType, transaction).add(localId, remoteId); | ||
655 | } | ||
656 | |||
657 | void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
658 | { | ||
659 | Index("rid.mapping." + bufferType, transaction).remove(remoteId, localId); | ||
660 | Index("localid.mapping." + bufferType, transaction).remove(localId, remoteId); | ||
661 | } | ||
662 | |||
663 | void GenericResource::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
664 | { | ||
665 | const auto oldRemoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId); | ||
666 | removeRemoteId(bufferType, localId, oldRemoteId, transaction); | ||
667 | recordRemoteId(bufferType, localId, remoteId, transaction); | ||
668 | } | ||
669 | |||
670 | QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
671 | { | ||
672 | // Lookup local id for remote id, or insert a new pair otherwise | ||
673 | Index index("rid.mapping." + bufferType, transaction); | ||
674 | QByteArray sinkId = index.lookup(remoteId); | ||
675 | if (sinkId.isEmpty()) { | ||
676 | sinkId = QUuid::createUuid().toString().toUtf8(); | ||
677 | index.add(remoteId, sinkId); | ||
678 | Index("localid.mapping." + bufferType, transaction).add(sinkId, remoteId); | ||
679 | } | ||
680 | return sinkId; | ||
681 | } | ||
682 | |||
683 | QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction) | ||
684 | { | ||
685 | QByteArray remoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId); | ||
686 | if (remoteId.isEmpty()) { | ||
687 | Warning() << "Couldn't find the remote id for " << localId; | ||
688 | return QByteArray(); | ||
689 | } | ||
690 | return remoteId; | ||
691 | } | ||
692 | |||
693 | void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, | ||
694 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) | ||
695 | { | 645 | { |
696 | entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { | 646 | entryGenerator([this, bufferType, &exists](const QByteArray &key) { |
697 | auto sinkId = Sink::Storage::uidFromKey(key); | 647 | auto sinkId = Sink::Storage::uidFromKey(key); |
698 | Trace() << "Checking for removal " << key; | 648 | Trace() << "Checking for removal " << key; |
699 | const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction); | 649 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); |
700 | // If we have no remoteId, the entity hasn't been replayed to the source yet | 650 | // If we have no remoteId, the entity hasn't been replayed to the source yet |
701 | if (!remoteId.isEmpty()) { | 651 | if (!remoteId.isEmpty()) { |
702 | if (!exists(remoteId)) { | 652 | if (!exists(remoteId)) { |
703 | Trace() << "Found a removed entity: " << sinkId; | 653 | Trace() << "Found a removed entity: " << sinkId; |
704 | deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, | 654 | deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, |
705 | [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); }); | 655 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); |
706 | } | 656 | } |
707 | } | 657 | } |
708 | }); | 658 | }); |
709 | } | 659 | } |
710 | 660 | ||
711 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> GenericResource::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | 661 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) |
712 | { | ||
713 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | ||
714 | db.findLatest(uid, | ||
715 | [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
716 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
717 | if (!buffer.isValid()) { | ||
718 | Warning() << "Read invalid buffer from disk"; | ||
719 | } else { | ||
720 | current = adaptorFactory.createAdaptor(buffer.entity()); | ||
721 | } | ||
722 | return false; | ||
723 | }, | ||
724 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); | ||
725 | return current; | ||
726 | } | ||
727 | |||
728 | void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, | ||
729 | DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | ||
730 | { | 662 | { |
731 | auto mainDatabase = Storage::mainDatabase(transaction, bufferType); | 663 | Trace() << "Create or modify" << bufferType << remoteId; |
732 | const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); | 664 | auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); |
665 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | ||
733 | const auto found = mainDatabase.contains(sinkId); | 666 | const auto found = mainDatabase.contains(sinkId); |
667 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); | ||
734 | if (!found) { | 668 | if (!found) { |
735 | Trace() << "Found a new entity: " << remoteId; | 669 | Trace() << "Found a new entity: " << remoteId; |
736 | createEntity( | 670 | createEntity( |
737 | sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); }); | 671 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); |
738 | } else { // modification | 672 | } else { // modification |
739 | if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) { | 673 | if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) { |
740 | bool changed = false; | 674 | bool changed = false; |
741 | for (const auto &property : entity.changedProperties()) { | 675 | for (const auto &property : entity.changedProperties()) { |
742 | if (entity.getProperty(property) != current->getProperty(property)) { | 676 | if (entity.getProperty(property) != current->getProperty(property)) { |
@@ -746,8 +680,8 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si | |||
746 | } | 680 | } |
747 | if (changed) { | 681 | if (changed) { |
748 | Trace() << "Found a modified entity: " << remoteId; | 682 | Trace() << "Found a modified entity: " << remoteId; |
749 | modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, | 683 | modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, |
750 | [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); }); | 684 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); |
751 | } | 685 | } |
752 | } else { | 686 | } else { |
753 | Warning() << "Failed to get current entity"; | 687 | Warning() << "Failed to get current entity"; |
@@ -755,6 +689,118 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si | |||
755 | } | 689 | } |
756 | } | 690 | } |
757 | 691 | ||
692 | KAsync::Job<void> Synchronizer::synchronize() | ||
693 | { | ||
694 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
695 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
696 | return synchronizeWithSource().then<void>([this]() { | ||
697 | mTransaction.abort(); | ||
698 | mSyncTransaction.commit(); | ||
699 | mSyncStore.clear(); | ||
700 | mEntityStore.clear(); | ||
701 | }); | ||
702 | } | ||
703 | |||
704 | |||
705 | |||
706 | SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
707 | : ChangeReplay(resourceInstanceIdentifier), | ||
708 | mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), | ||
709 | mResourceType(resourceType), | ||
710 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | ||
711 | { | ||
712 | |||
713 | } | ||
714 | |||
715 | EntityStore &SourceWriteBack::store() | ||
716 | { | ||
717 | if (!mEntityStore) { | ||
718 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); | ||
719 | } | ||
720 | return *mEntityStore; | ||
721 | } | ||
722 | |||
723 | SyncStore &SourceWriteBack::syncStore() | ||
724 | { | ||
725 | if (!mSyncStore) { | ||
726 | mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction); | ||
727 | } | ||
728 | return *mSyncStore; | ||
729 | } | ||
730 | |||
731 | KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) | ||
732 | { | ||
733 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
734 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
735 | |||
736 | Sink::EntityBuffer buffer(value); | ||
737 | const Sink::Entity &entity = buffer.entity(); | ||
738 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | ||
739 | Q_ASSERT(metadataBuffer); | ||
740 | if (!metadataBuffer->replayToSource()) { | ||
741 | Trace() << "Change is coming from the source"; | ||
742 | return KAsync::null<void>(); | ||
743 | } | ||
744 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
745 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | ||
746 | const auto uid = Sink::Storage::uidFromKey(key); | ||
747 | QByteArray oldRemoteId; | ||
748 | |||
749 | if (operation != Sink::Operation_Creation) { | ||
750 | oldRemoteId = syncStore().resolveLocalId(type, uid); | ||
751 | } | ||
752 | Trace() << "Replaying " << key << type; | ||
753 | |||
754 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); | ||
755 | if (type == ENTITY_TYPE_FOLDER) { | ||
756 | auto folder = store().read<ApplicationDomain::Folder>(uid); | ||
757 | // const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
758 | job = replay(folder, operation, oldRemoteId); | ||
759 | } else if (type == ENTITY_TYPE_MAIL) { | ||
760 | auto mail = store().read<ApplicationDomain::Mail>(uid); | ||
761 | // const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
762 | job = replay(mail, operation, oldRemoteId); | ||
763 | } | ||
764 | |||
765 | return job.then<void, QByteArray>([this, operation, type, uid](const QByteArray &remoteId) { | ||
766 | Trace() << "Replayed change with remote id: " << remoteId; | ||
767 | if (operation == Sink::Operation_Creation) { | ||
768 | if (remoteId.isEmpty()) { | ||
769 | Warning() << "Returned an empty remoteId from the creation"; | ||
770 | } else { | ||
771 | syncStore().recordRemoteId(type, uid, remoteId); | ||
772 | } | ||
773 | } else if (operation == Sink::Operation_Modification) { | ||
774 | if (remoteId.isEmpty()) { | ||
775 | Warning() << "Returned an empty remoteId from the creation"; | ||
776 | } else { | ||
777 | syncStore().updateRemoteId(type, uid, remoteId); | ||
778 | } | ||
779 | } else if (operation == Sink::Operation_Removal) { | ||
780 | syncStore().removeRemoteId(type, uid, remoteId); | ||
781 | } else { | ||
782 | Warning() << "Unkown operation" << operation; | ||
783 | } | ||
784 | |||
785 | mTransaction.abort(); | ||
786 | mSyncTransaction.commit(); | ||
787 | mSyncStore.clear(); | ||
788 | mEntityStore.clear(); | ||
789 | }, [](int errorCode, const QString &errorMessage) { | ||
790 | Warning() << "Failed to replay change: " << errorMessage; | ||
791 | }); | ||
792 | } | ||
793 | |||
794 | KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) | ||
795 | { | ||
796 | return KAsync::null<QByteArray>(); | ||
797 | } | ||
798 | |||
799 | KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) | ||
800 | { | ||
801 | return KAsync::null<QByteArray>(); | ||
802 | } | ||
803 | |||
758 | 804 | ||
759 | #pragma clang diagnostic push | 805 | #pragma clang diagnostic push |
760 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" | 806 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" |
diff --git a/common/genericresource.h b/common/genericresource.h index c551e29..45d5d3a 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -24,14 +24,16 @@ | |||
24 | #include <messagequeue.h> | 24 | #include <messagequeue.h> |
25 | #include <flatbuffers/flatbuffers.h> | 25 | #include <flatbuffers/flatbuffers.h> |
26 | #include <domainadaptor.h> | 26 | #include <domainadaptor.h> |
27 | #include "changereplay.h" | ||
28 | |||
27 | #include <QTimer> | 29 | #include <QTimer> |
28 | 30 | ||
29 | class CommandProcessor; | 31 | class CommandProcessor; |
30 | class ChangeReplay; | ||
31 | 32 | ||
32 | namespace Sink { | 33 | namespace Sink { |
33 | class Pipeline; | 34 | class Pipeline; |
34 | class Preprocessor; | 35 | class Preprocessor; |
36 | class Synchronizer; | ||
35 | 37 | ||
36 | /** | 38 | /** |
37 | * Generic Resource implementation. | 39 | * Generic Resource implementation. |
@@ -39,7 +41,7 @@ class Preprocessor; | |||
39 | class SINK_EXPORT GenericResource : public Resource | 41 | class SINK_EXPORT GenericResource : public Resource |
40 | { | 42 | { |
41 | public: | 43 | public: |
42 | GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline = QSharedPointer<Pipeline>()); | 44 | GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer); |
43 | virtual ~GenericResource(); | 45 | virtual ~GenericResource(); |
44 | 46 | ||
45 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; | 47 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; |
@@ -64,41 +66,90 @@ protected: | |||
64 | 66 | ||
65 | void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors); | 67 | void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors); |
66 | 68 | ||
67 | ///Base implementation call the replay$Type calls | ||
68 | virtual KAsync::Job<void> replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value); | ||
69 | ///Implement to write back changes to the server | ||
70 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); | ||
71 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); | ||
72 | |||
73 | void onProcessorError(int errorCode, const QString &errorMessage); | 69 | void onProcessorError(int errorCode, const QString &errorMessage); |
74 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); | 70 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); |
75 | 71 | ||
76 | static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 72 | MessageQueue mUserQueue; |
77 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | 73 | MessageQueue mSynchronizerQueue; |
78 | static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 74 | QByteArray mResourceType; |
79 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | 75 | QByteArray mResourceInstanceIdentifier; |
80 | static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); | 76 | QSharedPointer<Pipeline> mPipeline; |
77 | |||
78 | private: | ||
79 | CommandProcessor *mProcessor; | ||
80 | QSharedPointer<ChangeReplay> mChangeReplay; | ||
81 | QSharedPointer<Synchronizer> mSynchronizer; | ||
82 | int mError; | ||
83 | QTimer mCommitQueueTimer; | ||
84 | qint64 mClientLowerBoundRevision; | ||
85 | QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories; | ||
86 | }; | ||
87 | |||
88 | class SINK_EXPORT SyncStore | ||
89 | { | ||
90 | public: | ||
91 | SyncStore(Sink::Storage::Transaction &); | ||
81 | 92 | ||
82 | /** | 93 | /** |
83 | * Records a localId to remoteId mapping | 94 | * Records a localId to remoteId mapping |
84 | */ | 95 | */ |
85 | void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 96 | void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); |
86 | void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 97 | void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); |
87 | void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 98 | void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); |
88 | 99 | ||
89 | /** | 100 | /** |
90 | * Tries to find a local id for the remote id, and creates a new local id otherwise. | 101 | * Tries to find a local id for the remote id, and creates a new local id otherwise. |
91 | * | 102 | * |
92 | * The new local id is recorded in the local to remote id mapping. | 103 | * The new local id is recorded in the local to remote id mapping. |
93 | */ | 104 | */ |
94 | QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 105 | QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId); |
95 | 106 | ||
96 | /** | 107 | /** |
97 | * Tries to find a remote id for a local id. | 108 | * Tries to find a remote id for a local id. |
98 | * | 109 | * |
99 | * This can fail if the entity hasn't been written back to the server yet. | 110 | * This can fail if the entity hasn't been written back to the server yet. |
100 | */ | 111 | */ |
101 | QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction); | 112 | QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId); |
113 | |||
114 | private: | ||
115 | Sink::Storage::Transaction &mTransaction; | ||
116 | }; | ||
117 | |||
118 | class SINK_EXPORT EntityStore | ||
119 | { | ||
120 | public: | ||
121 | EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); | ||
122 | |||
123 | template<typename T> | ||
124 | T read(const QByteArray &identifier) const; | ||
125 | |||
126 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); | ||
127 | private: | ||
128 | QByteArray mResourceType; | ||
129 | QByteArray mResourceInstanceIdentifier; | ||
130 | Sink::Storage::Transaction &mTransaction; | ||
131 | }; | ||
132 | |||
133 | /** | ||
134 | * Synchronize and add what we don't already have to local queue | ||
135 | */ | ||
136 | class SINK_EXPORT Synchronizer | ||
137 | { | ||
138 | public: | ||
139 | Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); | ||
140 | |||
141 | void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback); | ||
142 | KAsync::Job<void> synchronize(); | ||
143 | |||
144 | protected: | ||
145 | ///Calls the callback to enqueue the command | ||
146 | void enqueueCommand(int commandId, const QByteArray &data); | ||
147 | |||
148 | static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
149 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
150 | static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
151 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
152 | static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); | ||
102 | 153 | ||
103 | /** | 154 | /** |
104 | * A synchronous algorithm to remove entities that are no longer existing. | 155 | * A synchronous algorithm to remove entities that are no longer existing. |
@@ -110,7 +161,7 @@ protected: | |||
110 | * | 161 | * |
111 | * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. | 162 | * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. |
112 | */ | 163 | */ |
113 | void scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, | 164 | void scanForRemovals(const QByteArray &bufferType, |
114 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); | 165 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); |
115 | 166 | ||
116 | /** | 167 | /** |
@@ -118,22 +169,60 @@ protected: | |||
118 | * | 169 | * |
119 | * Depending on whether the entity is locally available, or has changed. | 170 | * Depending on whether the entity is locally available, or has changed. |
120 | */ | 171 | */ |
121 | void createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, | 172 | void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); |
122 | const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); | ||
123 | 173 | ||
124 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); | 174 | //Read only access to main storage |
175 | EntityStore &store(); | ||
125 | 176 | ||
126 | MessageQueue mUserQueue; | 177 | //Read/Write access to sync storage |
127 | MessageQueue mSynchronizerQueue; | 178 | SyncStore &syncStore(); |
179 | |||
180 | virtual KAsync::Job<void> synchronizeWithSource() = 0; | ||
181 | |||
182 | private: | ||
183 | QSharedPointer<SyncStore> mSyncStore; | ||
184 | QSharedPointer<EntityStore> mEntityStore; | ||
185 | Sink::Storage mStorage; | ||
186 | Sink::Storage mSyncStorage; | ||
187 | QByteArray mResourceType; | ||
128 | QByteArray mResourceInstanceIdentifier; | 188 | QByteArray mResourceInstanceIdentifier; |
129 | QSharedPointer<Pipeline> mPipeline; | 189 | Sink::Storage::Transaction mTransaction; |
190 | Sink::Storage::Transaction mSyncTransaction; | ||
191 | std::function<void(int commandId, const QByteArray &data)> mEnqueue; | ||
192 | }; | ||
193 | |||
194 | /** | ||
195 | * Replay changes to the source | ||
196 | */ | ||
197 | class SINK_EXPORT SourceWriteBack : public ChangeReplay | ||
198 | { | ||
199 | public: | ||
200 | SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier); | ||
201 | |||
202 | protected: | ||
203 | ///Base implementation calls the replay$Type calls | ||
204 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | ||
205 | |||
206 | protected: | ||
207 | ///Implement to write back changes to the server | ||
208 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); | ||
209 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); | ||
210 | |||
211 | //Read only access to main storage | ||
212 | EntityStore &store(); | ||
213 | |||
214 | //Read/Write access to sync storage | ||
215 | SyncStore &syncStore(); | ||
130 | 216 | ||
131 | private: | 217 | private: |
132 | CommandProcessor *mProcessor; | 218 | Sink::Storage mSyncStorage; |
133 | ChangeReplay *mSourceChangeReplay; | 219 | QSharedPointer<SyncStore> mSyncStore; |
134 | int mError; | 220 | QSharedPointer<EntityStore> mEntityStore; |
135 | QTimer mCommitQueueTimer; | 221 | Sink::Storage::Transaction mTransaction; |
136 | qint64 mClientLowerBoundRevision; | 222 | Sink::Storage::Transaction mSyncTransaction; |
137 | QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories; | 223 | QByteArray mResourceType; |
224 | QByteArray mResourceInstanceIdentifier; | ||
138 | }; | 225 | }; |
226 | |||
227 | |||
139 | } | 228 | } |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 637a1b8..7863f67 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -34,6 +34,7 @@ | |||
34 | #include "entitybuffer.h" | 34 | #include "entitybuffer.h" |
35 | #include "log.h" | 35 | #include "log.h" |
36 | #include "domain/applicationdomaintype.h" | 36 | #include "domain/applicationdomaintype.h" |
37 | #include "adaptorfactoryregistry.h" | ||
37 | #include "definitions.h" | 38 | #include "definitions.h" |
38 | #include "bufferutils.h" | 39 | #include "bufferutils.h" |
39 | 40 | ||
@@ -52,11 +53,11 @@ public: | |||
52 | Storage storage; | 53 | Storage storage; |
53 | Storage::Transaction transaction; | 54 | Storage::Transaction transaction; |
54 | QHash<QString, QVector<Preprocessor *>> processors; | 55 | QHash<QString, QVector<Preprocessor *>> processors; |
55 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; | ||
56 | bool revisionChanged; | 56 | bool revisionChanged; |
57 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | 57 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); |
58 | QTime transactionTime; | 58 | QTime transactionTime; |
59 | int transactionItemCount; | 59 | int transactionItemCount; |
60 | QByteArray resourceType; | ||
60 | }; | 61 | }; |
61 | 62 | ||
62 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 63 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
@@ -84,9 +85,9 @@ void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preproc | |||
84 | d->processors[entityType] = processors; | 85 | d->processors[entityType] = processors; |
85 | } | 86 | } |
86 | 87 | ||
87 | void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) | 88 | void Pipeline::setResourceType(const QByteArray &resourceType) |
88 | { | 89 | { |
89 | d->adaptorFactory.insert(entityType, factory); | 90 | d->resourceType = resourceType; |
90 | } | 91 | } |
91 | 92 | ||
92 | void Pipeline::startTransaction() | 93 | void Pipeline::startTransaction() |
@@ -102,7 +103,9 @@ void Pipeline::startTransaction() | |||
102 | Trace() << "Starting transaction."; | 103 | Trace() << "Starting transaction."; |
103 | d->transactionTime.start(); | 104 | d->transactionTime.start(); |
104 | d->transactionItemCount = 0; | 105 | d->transactionItemCount = 0; |
105 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite)); | 106 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { |
107 | Warning() << error.message; | ||
108 | })); | ||
106 | } | 109 | } |
107 | 110 | ||
108 | void Pipeline::commit() | 111 | void Pipeline::commit() |
@@ -189,7 +192,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
189 | auto metadataBuffer = metadataBuilder.Finish(); | 192 | auto metadataBuffer = metadataBuilder.Finish(); |
190 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | 193 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
191 | 194 | ||
192 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 195 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
193 | if (!adaptorFactory) { | 196 | if (!adaptorFactory) { |
194 | Warning() << "no adaptor factory for type " << bufferType; | 197 | Warning() << "no adaptor factory for type " << bufferType; |
195 | return KAsync::error<qint64>(0); | 198 | return KAsync::error<qint64>(0); |
@@ -244,7 +247,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
244 | } | 247 | } |
245 | 248 | ||
246 | // TODO use only readPropertyMapper and writePropertyMapper | 249 | // TODO use only readPropertyMapper and writePropertyMapper |
247 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 250 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
248 | if (!adaptorFactory) { | 251 | if (!adaptorFactory) { |
249 | Warning() << "no adaptor factory for type " << bufferType; | 252 | Warning() << "no adaptor factory for type " << bufferType; |
250 | return KAsync::error<qint64>(0); | 253 | return KAsync::error<qint64>(0); |
@@ -373,7 +376,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
373 | flatbuffers::FlatBufferBuilder fbb; | 376 | flatbuffers::FlatBufferBuilder fbb; |
374 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | 377 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); |
375 | 378 | ||
376 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 379 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
377 | if (!adaptorFactory) { | 380 | if (!adaptorFactory) { |
378 | Warning() << "no adaptor factory for type " << bufferType; | 381 | Warning() << "no adaptor factory for type " << bufferType; |
379 | return KAsync::error<qint64>(0); | 382 | return KAsync::error<qint64>(0); |
diff --git a/common/pipeline.h b/common/pipeline.h index c65cbfd..2ca87a4 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -46,13 +46,12 @@ public: | |||
46 | 46 | ||
47 | Storage &storage() const; | 47 | Storage &storage() const; |
48 | 48 | ||
49 | void setResourceType(const QByteArray &resourceType); | ||
49 | void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); | 50 | void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); |
50 | void startTransaction(); | 51 | void startTransaction(); |
51 | void commit(); | 52 | void commit(); |
52 | Storage::Transaction &transaction(); | 53 | Storage::Transaction &transaction(); |
53 | 54 | ||
54 | void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); | ||
55 | |||
56 | KAsync::Job<qint64> newEntity(void const *command, size_t size); | 55 | KAsync::Job<qint64> newEntity(void const *command, size_t size); |
57 | KAsync::Job<qint64> modifiedEntity(void const *command, size_t size); | 56 | KAsync::Job<qint64> modifiedEntity(void const *command, size_t size); |
58 | KAsync::Job<qint64> deletedEntity(void const *command, size_t size); | 57 | KAsync::Job<qint64> deletedEntity(void const *command, size_t size); |
diff --git a/common/resource.cpp b/common/resource.cpp index 6713686..82c9fc8 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -26,6 +26,7 @@ | |||
26 | #include <QPointer> | 26 | #include <QPointer> |
27 | 27 | ||
28 | #include "facadefactory.h" | 28 | #include "facadefactory.h" |
29 | #include "adaptorfactoryregistry.h" | ||
29 | 30 | ||
30 | namespace Sink { | 31 | namespace Sink { |
31 | 32 | ||
@@ -110,6 +111,7 @@ ResourceFactory *ResourceFactory::load(const QString &resourceName) | |||
110 | if (factory) { | 111 | if (factory) { |
111 | Private::s_loadedFactories.insert(resourceName, factory); | 112 | Private::s_loadedFactories.insert(resourceName, factory); |
112 | factory->registerFacades(FacadeFactory::instance()); | 113 | factory->registerFacades(FacadeFactory::instance()); |
114 | factory->registerAdaptorFactories(AdaptorFactoryRegistry::instance()); | ||
113 | // TODO: if we need more data on it const QJsonObject json = loader.metaData()[QStringLiteral("MetaData")].toObject(); | 115 | // TODO: if we need more data on it const QJsonObject json = loader.metaData()[QStringLiteral("MetaData")].toObject(); |
114 | return factory; | 116 | return factory; |
115 | } else { | 117 | } else { |
diff --git a/common/resource.h b/common/resource.h index 0e7cd11..d6c3c5f 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -26,6 +26,7 @@ | |||
26 | 26 | ||
27 | namespace Sink { | 27 | namespace Sink { |
28 | class FacadeFactory; | 28 | class FacadeFactory; |
29 | class AdaptorFactoryRegistry; | ||
29 | 30 | ||
30 | /** | 31 | /** |
31 | * Resource interface | 32 | * Resource interface |
@@ -81,6 +82,7 @@ public: | |||
81 | 82 | ||
82 | virtual Resource *createResource(const QByteArray &instanceIdentifier) = 0; | 83 | virtual Resource *createResource(const QByteArray &instanceIdentifier) = 0; |
83 | virtual void registerFacades(FacadeFactory &factory) = 0; | 84 | virtual void registerFacades(FacadeFactory &factory) = 0; |
85 | virtual void registerAdaptorFactories(AdaptorFactoryRegistry ®istry) {}; | ||
84 | 86 | ||
85 | private: | 87 | private: |
86 | class Private; | 88 | class Private; |