diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-05-28 00:24:53 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-05-28 00:24:53 +0200 |
commit | e9c75177590d8546ebd9425f16c4269a9c92f517 (patch) | |
tree | 8a953631e467d9df50657e22bd90954b7b71c990 | |
parent | 8f01eb530262d1442fc4fa0782a41e052412d43b (diff) | |
download | sink-e9c75177590d8546ebd9425f16c4269a9c92f517.tar.gz sink-e9c75177590d8546ebd9425f16c4269a9c92f517.zip |
Refactored the generic resource to use separate classes for
changereplay and synchronization.
This cleans up the API and avoids the excessive passing around of
transactions. It also provides more flexibility in eventually using
different synchronization strategies for different resources.
-rw-r--r-- | common/CMakeLists.txt | 2 | ||||
-rw-r--r-- | common/adaptorfactoryregistry.cpp | 68 | ||||
-rw-r--r-- | common/adaptorfactoryregistry.h | 64 | ||||
-rw-r--r-- | common/changereplay.cpp | 102 | ||||
-rw-r--r-- | common/changereplay.h | 68 | ||||
-rw-r--r-- | common/genericresource.cpp | 560 | ||||
-rw-r--r-- | common/genericresource.h | 151 | ||||
-rw-r--r-- | common/pipeline.cpp | 17 | ||||
-rw-r--r-- | common/pipeline.h | 3 | ||||
-rw-r--r-- | common/resource.cpp | 2 | ||||
-rw-r--r-- | common/resource.h | 2 | ||||
-rw-r--r-- | examples/dummyresource/resourcefactory.cpp | 163 | ||||
-rw-r--r-- | examples/dummyresource/resourcefactory.h | 6 | ||||
-rw-r--r-- | examples/maildirresource/maildirresource.h | 1 | ||||
-rw-r--r-- | tests/dummyresourcetest.cpp | 2 | ||||
-rw-r--r-- | tests/mailquerybenchmark.cpp | 5 | ||||
-rw-r--r-- | tests/pipelinebenchmark.cpp | 6 | ||||
-rw-r--r-- | tests/pipelinetest.cpp | 14 | ||||
-rw-r--r-- | tests/querytest.cpp | 2 | ||||
-rw-r--r-- | tests/testimplementations.h | 2 |
20 files changed, 854 insertions, 386 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index c269a85..79b627a 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -66,6 +66,8 @@ set(command_SRCS | |||
66 | domain/folder.cpp | 66 | domain/folder.cpp |
67 | test.cpp | 67 | test.cpp |
68 | query.cpp | 68 | query.cpp |
69 | changereplay.cpp | ||
70 | adaptorfactoryregistry.cpp | ||
69 | ${storage_SRCS}) | 71 | ${storage_SRCS}) |
70 | 72 | ||
71 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) | 73 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) |
diff --git a/common/adaptorfactoryregistry.cpp b/common/adaptorfactoryregistry.cpp new file mode 100644 index 0000000..323a02d --- /dev/null +++ b/common/adaptorfactoryregistry.cpp | |||
@@ -0,0 +1,68 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #include "adaptorfactoryregistry.h" | ||
21 | |||
22 | #include <QByteArray> | ||
23 | #include <QDebug> | ||
24 | #include <QMutex> | ||
25 | #include <functional> | ||
26 | #include <memory> | ||
27 | |||
28 | #include "domaintypeadaptorfactoryinterface.h" | ||
29 | #include "applicationdomaintype.h" | ||
30 | #include "log.h" | ||
31 | |||
32 | using namespace Sink; | ||
33 | |||
34 | AdaptorFactoryRegistry &AdaptorFactoryRegistry::instance() | ||
35 | { | ||
36 | // QMutexLocker locker(&sMutex); | ||
37 | static AdaptorFactoryRegistry *instance = 0; | ||
38 | if (!instance) { | ||
39 | instance = new AdaptorFactoryRegistry; | ||
40 | } | ||
41 | return *instance; | ||
42 | } | ||
43 | |||
44 | static QByteArray key(const QByteArray &resource, const QByteArray &type) | ||
45 | { | ||
46 | return resource + type; | ||
47 | } | ||
48 | |||
49 | AdaptorFactoryRegistry::AdaptorFactoryRegistry() | ||
50 | { | ||
51 | |||
52 | } | ||
53 | |||
54 | std::shared_ptr<DomainTypeAdaptorFactoryInterface> AdaptorFactoryRegistry::getFactory(const QByteArray &resource, const QByteArray &typeName) | ||
55 | { | ||
56 | const auto ptr = mRegistry.value(key(resource, typeName)); | ||
57 | //We have to check the pointer before the cast, otherwise a check would return true also for invalid instances. | ||
58 | if (!ptr) { | ||
59 | return std::shared_ptr<DomainTypeAdaptorFactoryInterface>(); | ||
60 | } | ||
61 | return std::static_pointer_cast<DomainTypeAdaptorFactoryInterface>(ptr); | ||
62 | } | ||
63 | |||
64 | void AdaptorFactoryRegistry::registerFactory(const QByteArray &resource, const std::shared_ptr<void> &instance, const QByteArray typeName) | ||
65 | { | ||
66 | mRegistry.insert(key(resource, typeName), instance); | ||
67 | } | ||
68 | |||
diff --git a/common/adaptorfactoryregistry.h b/common/adaptorfactoryregistry.h new file mode 100644 index 0000000..f06120a --- /dev/null +++ b/common/adaptorfactoryregistry.h | |||
@@ -0,0 +1,64 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | |||
21 | #pragma once | ||
22 | |||
23 | #include "sink_export.h" | ||
24 | #include <QByteArray> | ||
25 | #include <QDebug> | ||
26 | #include <QMutex> | ||
27 | #include <functional> | ||
28 | #include <memory> | ||
29 | |||
30 | #include "domaintypeadaptorfactoryinterface.h" | ||
31 | #include "applicationdomaintype.h" | ||
32 | #include "log.h" | ||
33 | |||
34 | namespace Sink { | ||
35 | |||
36 | /** | ||
37 | */ | ||
38 | class SINK_EXPORT AdaptorFactoryRegistry | ||
39 | { | ||
40 | public: | ||
41 | static AdaptorFactoryRegistry &instance(); | ||
42 | |||
43 | template <class DomainType, class Factory> | ||
44 | void registerFactory(const QByteArray &resource) | ||
45 | { | ||
46 | registerFactory(resource, std::make_shared<Factory>(), ApplicationDomain::getTypeName<DomainType>()); | ||
47 | } | ||
48 | |||
49 | template <class DomainType> | ||
50 | std::shared_ptr<DomainTypeAdaptorFactoryInterface> getFactory(const QByteArray &resource) | ||
51 | { | ||
52 | return getFactory(resource, ApplicationDomain::getTypeName<DomainType>()); | ||
53 | } | ||
54 | |||
55 | std::shared_ptr<DomainTypeAdaptorFactoryInterface> getFactory(const QByteArray &resource, const QByteArray &typeName); | ||
56 | |||
57 | private: | ||
58 | AdaptorFactoryRegistry(); | ||
59 | void registerFactory(const QByteArray &resource, const std::shared_ptr<void> &instance, const QByteArray typeName); | ||
60 | |||
61 | QHash<QByteArray, std::shared_ptr<void>> mRegistry; | ||
62 | static QMutex sMutex; | ||
63 | }; | ||
64 | } | ||
diff --git a/common/changereplay.cpp b/common/changereplay.cpp new file mode 100644 index 0000000..2447b6e --- /dev/null +++ b/common/changereplay.cpp | |||
@@ -0,0 +1,102 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #include "changereplay.h" | ||
21 | |||
22 | #include "entitybuffer.h" | ||
23 | #include "log.h" | ||
24 | #include "definitions.h" | ||
25 | #include "bufferutils.h" | ||
26 | |||
27 | using namespace Sink; | ||
28 | |||
29 | #undef DEBUG_AREA | ||
30 | #define DEBUG_AREA "resource.changereplay" | ||
31 | |||
32 | ChangeReplay::ChangeReplay(const QByteArray &resourceName) | ||
33 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite) | ||
34 | { | ||
35 | Trace() << "Created change replay: " << resourceName; | ||
36 | } | ||
37 | |||
38 | qint64 ChangeReplay::getLastReplayedRevision() | ||
39 | { | ||
40 | qint64 lastReplayedRevision = 0; | ||
41 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); | ||
42 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
43 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
44 | lastReplayedRevision = value.toLongLong(); | ||
45 | return false; | ||
46 | }, | ||
47 | [](const Storage::Error &) {}); | ||
48 | return lastReplayedRevision; | ||
49 | } | ||
50 | |||
51 | bool ChangeReplay::allChangesReplayed() | ||
52 | { | ||
53 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | ||
54 | Warning() << error.message; | ||
55 | })); | ||
56 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | ||
57 | Trace() << "All changes replayed " << topRevision << lastReplayedRevision; | ||
58 | return (lastReplayedRevision >= topRevision); | ||
59 | } | ||
60 | |||
61 | void ChangeReplay::revisionChanged() | ||
62 | { | ||
63 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | ||
64 | Warning() << error.message; | ||
65 | }); | ||
66 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | ||
67 | Warning() << error.message; | ||
68 | }); | ||
69 | qint64 lastReplayedRevision = 1; | ||
70 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
71 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
72 | lastReplayedRevision = value.toLongLong(); | ||
73 | return false; | ||
74 | }, | ||
75 | [](const Storage::Error &) {}); | ||
76 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | ||
77 | |||
78 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; | ||
79 | if (lastReplayedRevision <= topRevision) { | ||
80 | qint64 revision = lastReplayedRevision; | ||
81 | for (; revision <= topRevision; revision++) { | ||
82 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | ||
83 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | ||
84 | const auto key = Storage::assembleKey(uid, revision); | ||
85 | Storage::mainDatabase(mainStoreTransaction, type) | ||
86 | .scan(key, | ||
87 | [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { | ||
88 | replay(type, key, value).exec(); | ||
89 | // TODO make for loop async, and pass to async replay function together with type | ||
90 | Trace() << "Replaying " << key; | ||
91 | return false; | ||
92 | }, | ||
93 | [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); | ||
94 | } | ||
95 | revision--; | ||
96 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | ||
97 | replayStoreTransaction.commit(); | ||
98 | Trace() << "Replayed until " << revision; | ||
99 | } | ||
100 | emit changesReplayed(); | ||
101 | } | ||
102 | |||
diff --git a/common/changereplay.h b/common/changereplay.h new file mode 100644 index 0000000..a568060 --- /dev/null +++ b/common/changereplay.h | |||
@@ -0,0 +1,68 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #pragma once | ||
21 | |||
22 | #include "sink_export.h" | ||
23 | #include <QObject> | ||
24 | #include <Async/Async> | ||
25 | |||
26 | #include "storage.h" | ||
27 | |||
28 | namespace Sink { | ||
29 | |||
30 | /** | ||
31 | * Replays changes from the storage one by one. | ||
32 | * | ||
33 | * Uses a local database to: | ||
34 | * * Remember what changes have been replayed already. | ||
35 | * * store a mapping of remote to local buffers | ||
36 | */ | ||
37 | class SINK_EXPORT ChangeReplay : public QObject | ||
38 | { | ||
39 | Q_OBJECT | ||
40 | public: | ||
41 | ChangeReplay(const QByteArray &resourceName); | ||
42 | |||
43 | qint64 getLastReplayedRevision(); | ||
44 | bool allChangesReplayed(); | ||
45 | |||
46 | signals: | ||
47 | void changesReplayed(); | ||
48 | |||
49 | public slots: | ||
50 | void revisionChanged(); | ||
51 | |||
52 | protected: | ||
53 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; | ||
54 | Sink::Storage mStorage; | ||
55 | |||
56 | private: | ||
57 | Sink::Storage mChangeReplayStore; | ||
58 | }; | ||
59 | |||
60 | class NullChangeReplay : public ChangeReplay | ||
61 | { | ||
62 | public: | ||
63 | NullChangeReplay() : ChangeReplay("null") {} | ||
64 | KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return KAsync::null<void>(); } | ||
65 | }; | ||
66 | |||
67 | } | ||
68 | |||
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index eae6ead..cb2ef21 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -1,3 +1,22 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
1 | #include "genericresource.h" | 20 | #include "genericresource.h" |
2 | 21 | ||
3 | #include "entitybuffer.h" | 22 | #include "entitybuffer.h" |
@@ -14,6 +33,7 @@ | |||
14 | #include "log.h" | 33 | #include "log.h" |
15 | #include "definitions.h" | 34 | #include "definitions.h" |
16 | #include "bufferutils.h" | 35 | #include "bufferutils.h" |
36 | #include "adaptorfactoryregistry.h" | ||
17 | 37 | ||
18 | #include <QUuid> | 38 | #include <QUuid> |
19 | #include <QDataStream> | 39 | #include <QDataStream> |
@@ -30,96 +50,6 @@ static int sCommitInterval = 10; | |||
30 | using namespace Sink; | 50 | using namespace Sink; |
31 | 51 | ||
32 | #undef DEBUG_AREA | 52 | #undef DEBUG_AREA |
33 | #define DEBUG_AREA "resource.changereplay" | ||
34 | |||
35 | /** | ||
36 | * Replays changes from the storage one by one. | ||
37 | * | ||
38 | * Uses a local database to: | ||
39 | * * Remember what changes have been replayed already. | ||
40 | * * store a mapping of remote to local buffers | ||
41 | */ | ||
42 | class ChangeReplay : public QObject | ||
43 | { | ||
44 | Q_OBJECT | ||
45 | public: | ||
46 | typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction; | ||
47 | |||
48 | ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction) | ||
49 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayFunction(replayFunction) | ||
50 | { | ||
51 | } | ||
52 | |||
53 | qint64 getLastReplayedRevision() | ||
54 | { | ||
55 | qint64 lastReplayedRevision = 0; | ||
56 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); | ||
57 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
58 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
59 | lastReplayedRevision = value.toLongLong(); | ||
60 | return false; | ||
61 | }, | ||
62 | [](const Storage::Error &) {}); | ||
63 | return lastReplayedRevision; | ||
64 | } | ||
65 | |||
66 | bool allChangesReplayed() | ||
67 | { | ||
68 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); | ||
69 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | ||
70 | Trace() << "All changes replayed " << topRevision << lastReplayedRevision; | ||
71 | return (lastReplayedRevision >= topRevision); | ||
72 | } | ||
73 | |||
74 | signals: | ||
75 | void changesReplayed(); | ||
76 | |||
77 | public slots: | ||
78 | void revisionChanged() | ||
79 | { | ||
80 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); | ||
81 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); | ||
82 | qint64 lastReplayedRevision = 1; | ||
83 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
84 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
85 | lastReplayedRevision = value.toLongLong(); | ||
86 | return false; | ||
87 | }, | ||
88 | [](const Storage::Error &) {}); | ||
89 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | ||
90 | |||
91 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; | ||
92 | if (lastReplayedRevision <= topRevision) { | ||
93 | qint64 revision = lastReplayedRevision; | ||
94 | for (; revision <= topRevision; revision++) { | ||
95 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | ||
96 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | ||
97 | const auto key = Storage::assembleKey(uid, revision); | ||
98 | Storage::mainDatabase(mainStoreTransaction, type) | ||
99 | .scan(key, | ||
100 | [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { | ||
101 | mReplayFunction(type, key, value).exec(); | ||
102 | // TODO make for loop async, and pass to async replay function together with type | ||
103 | Trace() << "Replaying " << key; | ||
104 | return false; | ||
105 | }, | ||
106 | [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); | ||
107 | } | ||
108 | revision--; | ||
109 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | ||
110 | replayStoreTransaction.commit(); | ||
111 | Trace() << "Replayed until " << revision; | ||
112 | } | ||
113 | emit changesReplayed(); | ||
114 | } | ||
115 | |||
116 | private: | ||
117 | Sink::Storage mStorage; | ||
118 | Sink::Storage mChangeReplayStore; | ||
119 | ReplayFunction mReplayFunction; | ||
120 | }; | ||
121 | |||
122 | #undef DEBUG_AREA | ||
123 | #define DEBUG_AREA "resource.commandprocessor" | 53 | #define DEBUG_AREA "resource.commandprocessor" |
124 | 54 | ||
125 | /** | 55 | /** |
@@ -133,10 +63,9 @@ class CommandProcessor : public QObject | |||
133 | public: | 63 | public: |
134 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) | 64 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) |
135 | { | 65 | { |
136 | mPipeline->startTransaction(); | 66 | mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
137 | // FIXME Should be initialized to the current value of the change replay queue | 67 | Warning() << error.message; |
138 | mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); | 68 | })); |
139 | mPipeline->commit(); | ||
140 | 69 | ||
141 | for (auto queue : mCommandQueues) { | 70 | for (auto queue : mCommandQueues) { |
142 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); | 71 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); |
@@ -303,15 +232,22 @@ private: | |||
303 | #undef DEBUG_AREA | 232 | #undef DEBUG_AREA |
304 | #define DEBUG_AREA "resource" | 233 | #define DEBUG_AREA "resource" |
305 | 234 | ||
306 | GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) | 235 | GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer) |
307 | : Sink::Resource(), | 236 | : Sink::Resource(), |
308 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), | 237 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), |
309 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), | 238 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), |
239 | mResourceType(resourceType), | ||
310 | mResourceInstanceIdentifier(resourceInstanceIdentifier), | 240 | mResourceInstanceIdentifier(resourceInstanceIdentifier), |
311 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), | 241 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), |
242 | mChangeReplay(changeReplay), | ||
243 | mSynchronizer(synchronizer), | ||
312 | mError(0), | 244 | mError(0), |
313 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 245 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
314 | { | 246 | { |
247 | mPipeline->setResourceType(mResourceType); | ||
248 | mSynchronizer->setup([this](int commandId, const QByteArray &data) { | ||
249 | enqueueCommand(mSynchronizerQueue, commandId, data); | ||
250 | }); | ||
315 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); | 251 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); |
316 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | 252 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { |
317 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | 253 | flatbuffers::Verifier verifier((const uint8_t *)command, size); |
@@ -353,14 +289,9 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
353 | }); | 289 | }); |
354 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 290 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
355 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 291 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
356 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { | ||
357 | // This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync) | ||
358 | auto synchronizationStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); | ||
359 | return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore]() {}); | ||
360 | }); | ||
361 | enableChangeReplay(true); | 292 | enableChangeReplay(true); |
362 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); | 293 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); |
363 | mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); | 294 | mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); |
364 | 295 | ||
365 | mCommitQueueTimer.setInterval(sCommitInterval); | 296 | mCommitQueueTimer.setInterval(sCommitInterval); |
366 | mCommitQueueTimer.setSingleShot(true); | 297 | mCommitQueueTimer.setSingleShot(true); |
@@ -370,7 +301,6 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
370 | GenericResource::~GenericResource() | 301 | GenericResource::~GenericResource() |
371 | { | 302 | { |
372 | delete mProcessor; | 303 | delete mProcessor; |
373 | delete mSourceChangeReplay; | ||
374 | } | 304 | } |
375 | 305 | ||
376 | KAsync::Job<void> GenericResource::inspect( | 306 | KAsync::Job<void> GenericResource::inspect( |
@@ -383,86 +313,20 @@ KAsync::Job<void> GenericResource::inspect( | |||
383 | void GenericResource::enableChangeReplay(bool enable) | 313 | void GenericResource::enableChangeReplay(bool enable) |
384 | { | 314 | { |
385 | if (enable) { | 315 | if (enable) { |
386 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged, Qt::QueuedConnection); | 316 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); |
387 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 317 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
388 | mSourceChangeReplay->revisionChanged(); | 318 | mChangeReplay->revisionChanged(); |
389 | } else { | 319 | } else { |
390 | QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); | 320 | QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged); |
391 | QObject::disconnect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 321 | QObject::disconnect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
392 | } | 322 | } |
393 | } | 323 | } |
394 | 324 | ||
395 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) | 325 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) |
396 | { | 326 | { |
397 | mPipeline->setPreprocessors(type, preprocessors); | 327 | mPipeline->setPreprocessors(type, preprocessors); |
398 | mPipeline->setAdaptorFactory(type, factory); | ||
399 | mAdaptorFactories.insert(type, factory); | ||
400 | } | ||
401 | |||
402 | KAsync::Job<void> GenericResource::replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value) | ||
403 | { | ||
404 | Sink::EntityBuffer buffer(value); | ||
405 | const Sink::Entity &entity = buffer.entity(); | ||
406 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | ||
407 | Q_ASSERT(metadataBuffer); | ||
408 | if (!metadataBuffer->replayToSource()) { | ||
409 | Trace() << "Change is coming from the source"; | ||
410 | return KAsync::null<void>(); | ||
411 | } | ||
412 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
413 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | ||
414 | const auto uid = Sink::Storage::uidFromKey(key); | ||
415 | QByteArray oldRemoteId; | ||
416 | |||
417 | if (operation != Sink::Operation_Creation) { | ||
418 | auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadOnly); | ||
419 | oldRemoteId = resolveLocalId(type, uid, synchronizationTransaction); | ||
420 | } | ||
421 | Trace() << "Replaying " << key << type; | ||
422 | |||
423 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); | ||
424 | if (type == ENTITY_TYPE_FOLDER) { | ||
425 | const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
426 | job = replay(folder, operation, oldRemoteId); | ||
427 | } else if (type == ENTITY_TYPE_MAIL) { | ||
428 | const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
429 | job = replay(mail, operation, oldRemoteId); | ||
430 | } | ||
431 | |||
432 | return job.then<void, QByteArray>([=, &synchronizationStore](const QByteArray &remoteId) { | ||
433 | auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadWrite); | ||
434 | Trace() << "Replayed change with remote id: " << remoteId; | ||
435 | if (operation == Sink::Operation_Creation) { | ||
436 | if (remoteId.isEmpty()) { | ||
437 | Warning() << "Returned an empty remoteId from the creation"; | ||
438 | } else { | ||
439 | recordRemoteId(type, uid, remoteId, synchronizationTransaction); | ||
440 | } | ||
441 | } else if (operation == Sink::Operation_Modification) { | ||
442 | if (remoteId.isEmpty()) { | ||
443 | Warning() << "Returned an empty remoteId from the creation"; | ||
444 | } else { | ||
445 | updateRemoteId(type, uid, remoteId, synchronizationTransaction); | ||
446 | } | ||
447 | } else if (operation == Sink::Operation_Removal) { | ||
448 | removeRemoteId(type, uid, remoteId, synchronizationTransaction); | ||
449 | } else { | ||
450 | Warning() << "Unkown operation" << operation; | ||
451 | } | ||
452 | }, [](int errorCode, const QString &errorMessage) { | ||
453 | Warning() << "Failed to replay change: " << errorMessage; | ||
454 | }); | ||
455 | } | 328 | } |
456 | 329 | ||
457 | KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) | ||
458 | { | ||
459 | return KAsync::null<QByteArray>(); | ||
460 | } | ||
461 | |||
462 | KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) | ||
463 | { | ||
464 | return KAsync::null<QByteArray>(); | ||
465 | } | ||
466 | 330 | ||
467 | void GenericResource::removeDataFromDisk() | 331 | void GenericResource::removeDataFromDisk() |
468 | { | 332 | { |
@@ -528,10 +392,8 @@ KAsync::Job<void> GenericResource::synchronizeWithSource() | |||
528 | Log() << " Synchronizing"; | 392 | Log() << " Synchronizing"; |
529 | // Changereplay would deadlock otherwise when trying to open the synchronization store | 393 | // Changereplay would deadlock otherwise when trying to open the synchronization store |
530 | enableChangeReplay(false); | 394 | enableChangeReplay(false); |
531 | auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); | 395 | mSynchronizer->synchronize() |
532 | auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); | 396 | .then<void>([this, &future]() { |
533 | synchronizeWithSource(*mainStore, *syncStore) | ||
534 | .then<void>([this, mainStore, syncStore, &future]() { | ||
535 | Log() << "Done Synchronizing"; | 397 | Log() << "Done Synchronizing"; |
536 | enableChangeReplay(true); | 398 | enableChangeReplay(true); |
537 | future.setFinished(); | 399 | future.setFinished(); |
@@ -576,11 +438,11 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
576 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) | 438 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) |
577 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) | 439 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) |
578 | .then<void>([this](KAsync::Future<void> &f) { | 440 | .then<void>([this](KAsync::Future<void> &f) { |
579 | if (mSourceChangeReplay->allChangesReplayed()) { | 441 | if (mChangeReplay->allChangesReplayed()) { |
580 | f.setFinished(); | 442 | f.setFinished(); |
581 | } else { | 443 | } else { |
582 | auto context = new QObject; | 444 | auto context = new QObject; |
583 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { | 445 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { |
584 | delete context; | 446 | delete context; |
585 | f.setFinished(); | 447 | f.setFinished(); |
586 | }); | 448 | }); |
@@ -590,7 +452,7 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
590 | 452 | ||
591 | void GenericResource::updateLowerBoundRevision() | 453 | void GenericResource::updateLowerBoundRevision() |
592 | { | 454 | { |
593 | mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSourceChangeReplay->getLastReplayedRevision())); | 455 | mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mChangeReplay->getLastReplayedRevision())); |
594 | } | 456 | } |
595 | 457 | ||
596 | void GenericResource::setLowerBoundRevision(qint64 revision) | 458 | void GenericResource::setLowerBoundRevision(qint64 revision) |
@@ -599,7 +461,139 @@ void GenericResource::setLowerBoundRevision(qint64 revision) | |||
599 | updateLowerBoundRevision(); | 461 | updateLowerBoundRevision(); |
600 | } | 462 | } |
601 | 463 | ||
602 | void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 464 | |
465 | |||
466 | |||
467 | EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) | ||
468 | : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier), | ||
469 | mTransaction(transaction) | ||
470 | { | ||
471 | |||
472 | } | ||
473 | |||
474 | template<typename T> | ||
475 | T EntityStore::read(const QByteArray &identifier) const | ||
476 | { | ||
477 | auto typeName = ApplicationDomain::getTypeName<T>(); | ||
478 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | ||
479 | auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType)); | ||
480 | Q_ASSERT(bufferAdaptor); | ||
481 | return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor); | ||
482 | } | ||
483 | |||
484 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | ||
485 | { | ||
486 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | ||
487 | db.findLatest(uid, | ||
488 | [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
489 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
490 | if (!buffer.isValid()) { | ||
491 | Warning() << "Read invalid buffer from disk"; | ||
492 | } else { | ||
493 | Trace() << "Found value " << key; | ||
494 | current = adaptorFactory.createAdaptor(buffer.entity()); | ||
495 | } | ||
496 | return false; | ||
497 | }, | ||
498 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); | ||
499 | return current; | ||
500 | } | ||
501 | |||
502 | |||
503 | |||
504 | SyncStore::SyncStore(Sink::Storage::Transaction &transaction) | ||
505 | : mTransaction(transaction) | ||
506 | { | ||
507 | |||
508 | } | ||
509 | |||
510 | void SyncStore::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
511 | { | ||
512 | Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId); | ||
513 | Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId); | ||
514 | } | ||
515 | |||
516 | void SyncStore::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
517 | { | ||
518 | Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId); | ||
519 | Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId); | ||
520 | } | ||
521 | |||
522 | void SyncStore::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
523 | { | ||
524 | const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); | ||
525 | removeRemoteId(bufferType, localId, oldRemoteId); | ||
526 | recordRemoteId(bufferType, localId, remoteId); | ||
527 | } | ||
528 | |||
529 | QByteArray SyncStore::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId) | ||
530 | { | ||
531 | // Lookup local id for remote id, or insert a new pair otherwise | ||
532 | Index index("rid.mapping." + bufferType, mTransaction); | ||
533 | QByteArray sinkId = index.lookup(remoteId); | ||
534 | if (sinkId.isEmpty()) { | ||
535 | sinkId = QUuid::createUuid().toString().toUtf8(); | ||
536 | index.add(remoteId, sinkId); | ||
537 | Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId); | ||
538 | } | ||
539 | return sinkId; | ||
540 | } | ||
541 | |||
542 | QByteArray SyncStore::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId) | ||
543 | { | ||
544 | QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); | ||
545 | if (remoteId.isEmpty()) { | ||
546 | Warning() << "Couldn't find the remote id for " << localId; | ||
547 | return QByteArray(); | ||
548 | } | ||
549 | return remoteId; | ||
550 | } | ||
551 | |||
552 | |||
553 | |||
554 | |||
555 | |||
556 | |||
557 | |||
558 | |||
559 | Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
560 | : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), | ||
561 | mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), | ||
562 | mResourceType(resourceType), | ||
563 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | ||
564 | { | ||
565 | Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; | ||
566 | |||
567 | } | ||
568 | |||
569 | void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback) | ||
570 | { | ||
571 | mEnqueue = enqueueCommandCallback; | ||
572 | } | ||
573 | |||
574 | void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) | ||
575 | { | ||
576 | Q_ASSERT(mEnqueue); | ||
577 | mEnqueue(commandId, data); | ||
578 | } | ||
579 | |||
580 | EntityStore &Synchronizer::store() | ||
581 | { | ||
582 | if (!mEntityStore) { | ||
583 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); | ||
584 | } | ||
585 | return *mEntityStore; | ||
586 | } | ||
587 | |||
588 | SyncStore &Synchronizer::syncStore() | ||
589 | { | ||
590 | if (!mSyncStore) { | ||
591 | mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction); | ||
592 | } | ||
593 | return *mSyncStore; | ||
594 | } | ||
595 | |||
596 | void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
603 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | 597 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) |
604 | { | 598 | { |
605 | // These changes are coming from the source | 599 | // These changes are coming from the source |
@@ -616,7 +610,7 @@ void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &b | |||
616 | callback(BufferUtils::extractBuffer(fbb)); | 610 | callback(BufferUtils::extractBuffer(fbb)); |
617 | } | 611 | } |
618 | 612 | ||
619 | void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 613 | void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, |
620 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | 614 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) |
621 | { | 615 | { |
622 | // These changes are coming from the source | 616 | // These changes are coming from the source |
@@ -634,7 +628,7 @@ void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, co | |||
634 | callback(BufferUtils::extractBuffer(fbb)); | 628 | callback(BufferUtils::extractBuffer(fbb)); |
635 | } | 629 | } |
636 | 630 | ||
637 | void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) | 631 | void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) |
638 | { | 632 | { |
639 | // These changes are coming from the source | 633 | // These changes are coming from the source |
640 | const auto replayToSource = false; | 634 | const auto replayToSource = false; |
@@ -647,96 +641,36 @@ void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, co | |||
647 | callback(BufferUtils::extractBuffer(fbb)); | 641 | callback(BufferUtils::extractBuffer(fbb)); |
648 | } | 642 | } |
649 | 643 | ||
650 | void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | 644 | void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) |
651 | { | ||
652 | Index("rid.mapping." + bufferType, transaction).add(remoteId, localId); | ||
653 | ; | ||
654 | Index("localid.mapping." + bufferType, transaction).add(localId, remoteId); | ||
655 | } | ||
656 | |||
657 | void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
658 | { | ||
659 | Index("rid.mapping." + bufferType, transaction).remove(remoteId, localId); | ||
660 | Index("localid.mapping." + bufferType, transaction).remove(localId, remoteId); | ||
661 | } | ||
662 | |||
663 | void GenericResource::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
664 | { | ||
665 | const auto oldRemoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId); | ||
666 | removeRemoteId(bufferType, localId, oldRemoteId, transaction); | ||
667 | recordRemoteId(bufferType, localId, remoteId, transaction); | ||
668 | } | ||
669 | |||
670 | QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
671 | { | ||
672 | // Lookup local id for remote id, or insert a new pair otherwise | ||
673 | Index index("rid.mapping." + bufferType, transaction); | ||
674 | QByteArray sinkId = index.lookup(remoteId); | ||
675 | if (sinkId.isEmpty()) { | ||
676 | sinkId = QUuid::createUuid().toString().toUtf8(); | ||
677 | index.add(remoteId, sinkId); | ||
678 | Index("localid.mapping." + bufferType, transaction).add(sinkId, remoteId); | ||
679 | } | ||
680 | return sinkId; | ||
681 | } | ||
682 | |||
683 | QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction) | ||
684 | { | ||
685 | QByteArray remoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId); | ||
686 | if (remoteId.isEmpty()) { | ||
687 | Warning() << "Couldn't find the remote id for " << localId; | ||
688 | return QByteArray(); | ||
689 | } | ||
690 | return remoteId; | ||
691 | } | ||
692 | |||
693 | void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, | ||
694 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) | ||
695 | { | 645 | { |
696 | entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { | 646 | entryGenerator([this, bufferType, &exists](const QByteArray &key) { |
697 | auto sinkId = Sink::Storage::uidFromKey(key); | 647 | auto sinkId = Sink::Storage::uidFromKey(key); |
698 | Trace() << "Checking for removal " << key; | 648 | Trace() << "Checking for removal " << key; |
699 | const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction); | 649 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); |
700 | // If we have no remoteId, the entity hasn't been replayed to the source yet | 650 | // If we have no remoteId, the entity hasn't been replayed to the source yet |
701 | if (!remoteId.isEmpty()) { | 651 | if (!remoteId.isEmpty()) { |
702 | if (!exists(remoteId)) { | 652 | if (!exists(remoteId)) { |
703 | Trace() << "Found a removed entity: " << sinkId; | 653 | Trace() << "Found a removed entity: " << sinkId; |
704 | deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, | 654 | deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, |
705 | [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); }); | 655 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); |
706 | } | 656 | } |
707 | } | 657 | } |
708 | }); | 658 | }); |
709 | } | 659 | } |
710 | 660 | ||
711 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> GenericResource::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | 661 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) |
712 | { | ||
713 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | ||
714 | db.findLatest(uid, | ||
715 | [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
716 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
717 | if (!buffer.isValid()) { | ||
718 | Warning() << "Read invalid buffer from disk"; | ||
719 | } else { | ||
720 | current = adaptorFactory.createAdaptor(buffer.entity()); | ||
721 | } | ||
722 | return false; | ||
723 | }, | ||
724 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); | ||
725 | return current; | ||
726 | } | ||
727 | |||
728 | void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, | ||
729 | DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | ||
730 | { | 662 | { |
731 | auto mainDatabase = Storage::mainDatabase(transaction, bufferType); | 663 | Trace() << "Create or modify" << bufferType << remoteId; |
732 | const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); | 664 | auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); |
665 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | ||
733 | const auto found = mainDatabase.contains(sinkId); | 666 | const auto found = mainDatabase.contains(sinkId); |
667 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); | ||
734 | if (!found) { | 668 | if (!found) { |
735 | Trace() << "Found a new entity: " << remoteId; | 669 | Trace() << "Found a new entity: " << remoteId; |
736 | createEntity( | 670 | createEntity( |
737 | sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); }); | 671 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); |
738 | } else { // modification | 672 | } else { // modification |
739 | if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) { | 673 | if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) { |
740 | bool changed = false; | 674 | bool changed = false; |
741 | for (const auto &property : entity.changedProperties()) { | 675 | for (const auto &property : entity.changedProperties()) { |
742 | if (entity.getProperty(property) != current->getProperty(property)) { | 676 | if (entity.getProperty(property) != current->getProperty(property)) { |
@@ -746,8 +680,8 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si | |||
746 | } | 680 | } |
747 | if (changed) { | 681 | if (changed) { |
748 | Trace() << "Found a modified entity: " << remoteId; | 682 | Trace() << "Found a modified entity: " << remoteId; |
749 | modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, | 683 | modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, |
750 | [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); }); | 684 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); |
751 | } | 685 | } |
752 | } else { | 686 | } else { |
753 | Warning() << "Failed to get current entity"; | 687 | Warning() << "Failed to get current entity"; |
@@ -755,6 +689,118 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si | |||
755 | } | 689 | } |
756 | } | 690 | } |
757 | 691 | ||
692 | KAsync::Job<void> Synchronizer::synchronize() | ||
693 | { | ||
694 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
695 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
696 | return synchronizeWithSource().then<void>([this]() { | ||
697 | mTransaction.abort(); | ||
698 | mSyncTransaction.commit(); | ||
699 | mSyncStore.clear(); | ||
700 | mEntityStore.clear(); | ||
701 | }); | ||
702 | } | ||
703 | |||
704 | |||
705 | |||
706 | SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
707 | : ChangeReplay(resourceInstanceIdentifier), | ||
708 | mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), | ||
709 | mResourceType(resourceType), | ||
710 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | ||
711 | { | ||
712 | |||
713 | } | ||
714 | |||
715 | EntityStore &SourceWriteBack::store() | ||
716 | { | ||
717 | if (!mEntityStore) { | ||
718 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); | ||
719 | } | ||
720 | return *mEntityStore; | ||
721 | } | ||
722 | |||
723 | SyncStore &SourceWriteBack::syncStore() | ||
724 | { | ||
725 | if (!mSyncStore) { | ||
726 | mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction); | ||
727 | } | ||
728 | return *mSyncStore; | ||
729 | } | ||
730 | |||
731 | KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) | ||
732 | { | ||
733 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
734 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
735 | |||
736 | Sink::EntityBuffer buffer(value); | ||
737 | const Sink::Entity &entity = buffer.entity(); | ||
738 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | ||
739 | Q_ASSERT(metadataBuffer); | ||
740 | if (!metadataBuffer->replayToSource()) { | ||
741 | Trace() << "Change is coming from the source"; | ||
742 | return KAsync::null<void>(); | ||
743 | } | ||
744 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
745 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | ||
746 | const auto uid = Sink::Storage::uidFromKey(key); | ||
747 | QByteArray oldRemoteId; | ||
748 | |||
749 | if (operation != Sink::Operation_Creation) { | ||
750 | oldRemoteId = syncStore().resolveLocalId(type, uid); | ||
751 | } | ||
752 | Trace() << "Replaying " << key << type; | ||
753 | |||
754 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); | ||
755 | if (type == ENTITY_TYPE_FOLDER) { | ||
756 | auto folder = store().read<ApplicationDomain::Folder>(uid); | ||
757 | // const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
758 | job = replay(folder, operation, oldRemoteId); | ||
759 | } else if (type == ENTITY_TYPE_MAIL) { | ||
760 | auto mail = store().read<ApplicationDomain::Mail>(uid); | ||
761 | // const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
762 | job = replay(mail, operation, oldRemoteId); | ||
763 | } | ||
764 | |||
765 | return job.then<void, QByteArray>([this, operation, type, uid](const QByteArray &remoteId) { | ||
766 | Trace() << "Replayed change with remote id: " << remoteId; | ||
767 | if (operation == Sink::Operation_Creation) { | ||
768 | if (remoteId.isEmpty()) { | ||
769 | Warning() << "Returned an empty remoteId from the creation"; | ||
770 | } else { | ||
771 | syncStore().recordRemoteId(type, uid, remoteId); | ||
772 | } | ||
773 | } else if (operation == Sink::Operation_Modification) { | ||
774 | if (remoteId.isEmpty()) { | ||
775 | Warning() << "Returned an empty remoteId from the creation"; | ||
776 | } else { | ||
777 | syncStore().updateRemoteId(type, uid, remoteId); | ||
778 | } | ||
779 | } else if (operation == Sink::Operation_Removal) { | ||
780 | syncStore().removeRemoteId(type, uid, remoteId); | ||
781 | } else { | ||
782 | Warning() << "Unkown operation" << operation; | ||
783 | } | ||
784 | |||
785 | mTransaction.abort(); | ||
786 | mSyncTransaction.commit(); | ||
787 | mSyncStore.clear(); | ||
788 | mEntityStore.clear(); | ||
789 | }, [](int errorCode, const QString &errorMessage) { | ||
790 | Warning() << "Failed to replay change: " << errorMessage; | ||
791 | }); | ||
792 | } | ||
793 | |||
794 | KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) | ||
795 | { | ||
796 | return KAsync::null<QByteArray>(); | ||
797 | } | ||
798 | |||
799 | KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) | ||
800 | { | ||
801 | return KAsync::null<QByteArray>(); | ||
802 | } | ||
803 | |||
758 | 804 | ||
759 | #pragma clang diagnostic push | 805 | #pragma clang diagnostic push |
760 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" | 806 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" |
diff --git a/common/genericresource.h b/common/genericresource.h index c551e29..45d5d3a 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -24,14 +24,16 @@ | |||
24 | #include <messagequeue.h> | 24 | #include <messagequeue.h> |
25 | #include <flatbuffers/flatbuffers.h> | 25 | #include <flatbuffers/flatbuffers.h> |
26 | #include <domainadaptor.h> | 26 | #include <domainadaptor.h> |
27 | #include "changereplay.h" | ||
28 | |||
27 | #include <QTimer> | 29 | #include <QTimer> |
28 | 30 | ||
29 | class CommandProcessor; | 31 | class CommandProcessor; |
30 | class ChangeReplay; | ||
31 | 32 | ||
32 | namespace Sink { | 33 | namespace Sink { |
33 | class Pipeline; | 34 | class Pipeline; |
34 | class Preprocessor; | 35 | class Preprocessor; |
36 | class Synchronizer; | ||
35 | 37 | ||
36 | /** | 38 | /** |
37 | * Generic Resource implementation. | 39 | * Generic Resource implementation. |
@@ -39,7 +41,7 @@ class Preprocessor; | |||
39 | class SINK_EXPORT GenericResource : public Resource | 41 | class SINK_EXPORT GenericResource : public Resource |
40 | { | 42 | { |
41 | public: | 43 | public: |
42 | GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline = QSharedPointer<Pipeline>()); | 44 | GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer); |
43 | virtual ~GenericResource(); | 45 | virtual ~GenericResource(); |
44 | 46 | ||
45 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; | 47 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; |
@@ -64,41 +66,90 @@ protected: | |||
64 | 66 | ||
65 | void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors); | 67 | void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors); |
66 | 68 | ||
67 | ///Base implementation call the replay$Type calls | ||
68 | virtual KAsync::Job<void> replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value); | ||
69 | ///Implement to write back changes to the server | ||
70 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); | ||
71 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); | ||
72 | |||
73 | void onProcessorError(int errorCode, const QString &errorMessage); | 69 | void onProcessorError(int errorCode, const QString &errorMessage); |
74 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); | 70 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); |
75 | 71 | ||
76 | static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 72 | MessageQueue mUserQueue; |
77 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | 73 | MessageQueue mSynchronizerQueue; |
78 | static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 74 | QByteArray mResourceType; |
79 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | 75 | QByteArray mResourceInstanceIdentifier; |
80 | static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); | 76 | QSharedPointer<Pipeline> mPipeline; |
77 | |||
78 | private: | ||
79 | CommandProcessor *mProcessor; | ||
80 | QSharedPointer<ChangeReplay> mChangeReplay; | ||
81 | QSharedPointer<Synchronizer> mSynchronizer; | ||
82 | int mError; | ||
83 | QTimer mCommitQueueTimer; | ||
84 | qint64 mClientLowerBoundRevision; | ||
85 | QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories; | ||
86 | }; | ||
87 | |||
88 | class SINK_EXPORT SyncStore | ||
89 | { | ||
90 | public: | ||
91 | SyncStore(Sink::Storage::Transaction &); | ||
81 | 92 | ||
82 | /** | 93 | /** |
83 | * Records a localId to remoteId mapping | 94 | * Records a localId to remoteId mapping |
84 | */ | 95 | */ |
85 | void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 96 | void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); |
86 | void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 97 | void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); |
87 | void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 98 | void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); |
88 | 99 | ||
89 | /** | 100 | /** |
90 | * Tries to find a local id for the remote id, and creates a new local id otherwise. | 101 | * Tries to find a local id for the remote id, and creates a new local id otherwise. |
91 | * | 102 | * |
92 | * The new local id is recorded in the local to remote id mapping. | 103 | * The new local id is recorded in the local to remote id mapping. |
93 | */ | 104 | */ |
94 | QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 105 | QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId); |
95 | 106 | ||
96 | /** | 107 | /** |
97 | * Tries to find a remote id for a local id. | 108 | * Tries to find a remote id for a local id. |
98 | * | 109 | * |
99 | * This can fail if the entity hasn't been written back to the server yet. | 110 | * This can fail if the entity hasn't been written back to the server yet. |
100 | */ | 111 | */ |
101 | QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction); | 112 | QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId); |
113 | |||
114 | private: | ||
115 | Sink::Storage::Transaction &mTransaction; | ||
116 | }; | ||
117 | |||
118 | class SINK_EXPORT EntityStore | ||
119 | { | ||
120 | public: | ||
121 | EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); | ||
122 | |||
123 | template<typename T> | ||
124 | T read(const QByteArray &identifier) const; | ||
125 | |||
126 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); | ||
127 | private: | ||
128 | QByteArray mResourceType; | ||
129 | QByteArray mResourceInstanceIdentifier; | ||
130 | Sink::Storage::Transaction &mTransaction; | ||
131 | }; | ||
132 | |||
133 | /** | ||
134 | * Synchronize and add what we don't already have to local queue | ||
135 | */ | ||
136 | class SINK_EXPORT Synchronizer | ||
137 | { | ||
138 | public: | ||
139 | Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); | ||
140 | |||
141 | void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback); | ||
142 | KAsync::Job<void> synchronize(); | ||
143 | |||
144 | protected: | ||
145 | ///Calls the callback to enqueue the command | ||
146 | void enqueueCommand(int commandId, const QByteArray &data); | ||
147 | |||
148 | static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
149 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
150 | static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
151 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
152 | static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); | ||
102 | 153 | ||
103 | /** | 154 | /** |
104 | * A synchronous algorithm to remove entities that are no longer existing. | 155 | * A synchronous algorithm to remove entities that are no longer existing. |
@@ -110,7 +161,7 @@ protected: | |||
110 | * | 161 | * |
111 | * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. | 162 | * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. |
112 | */ | 163 | */ |
113 | void scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, | 164 | void scanForRemovals(const QByteArray &bufferType, |
114 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); | 165 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); |
115 | 166 | ||
116 | /** | 167 | /** |
@@ -118,22 +169,60 @@ protected: | |||
118 | * | 169 | * |
119 | * Depending on whether the entity is locally available, or has changed. | 170 | * Depending on whether the entity is locally available, or has changed. |
120 | */ | 171 | */ |
121 | void createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, | 172 | void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); |
122 | const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); | ||
123 | 173 | ||
124 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); | 174 | //Read only access to main storage |
175 | EntityStore &store(); | ||
125 | 176 | ||
126 | MessageQueue mUserQueue; | 177 | //Read/Write access to sync storage |
127 | MessageQueue mSynchronizerQueue; | 178 | SyncStore &syncStore(); |
179 | |||
180 | virtual KAsync::Job<void> synchronizeWithSource() = 0; | ||
181 | |||
182 | private: | ||
183 | QSharedPointer<SyncStore> mSyncStore; | ||
184 | QSharedPointer<EntityStore> mEntityStore; | ||
185 | Sink::Storage mStorage; | ||
186 | Sink::Storage mSyncStorage; | ||
187 | QByteArray mResourceType; | ||
128 | QByteArray mResourceInstanceIdentifier; | 188 | QByteArray mResourceInstanceIdentifier; |
129 | QSharedPointer<Pipeline> mPipeline; | 189 | Sink::Storage::Transaction mTransaction; |
190 | Sink::Storage::Transaction mSyncTransaction; | ||
191 | std::function<void(int commandId, const QByteArray &data)> mEnqueue; | ||
192 | }; | ||
193 | |||
194 | /** | ||
195 | * Replay changes to the source | ||
196 | */ | ||
197 | class SINK_EXPORT SourceWriteBack : public ChangeReplay | ||
198 | { | ||
199 | public: | ||
200 | SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier); | ||
201 | |||
202 | protected: | ||
203 | ///Base implementation calls the replay$Type calls | ||
204 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | ||
205 | |||
206 | protected: | ||
207 | ///Implement to write back changes to the server | ||
208 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); | ||
209 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); | ||
210 | |||
211 | //Read only access to main storage | ||
212 | EntityStore &store(); | ||
213 | |||
214 | //Read/Write access to sync storage | ||
215 | SyncStore &syncStore(); | ||
130 | 216 | ||
131 | private: | 217 | private: |
132 | CommandProcessor *mProcessor; | 218 | Sink::Storage mSyncStorage; |
133 | ChangeReplay *mSourceChangeReplay; | 219 | QSharedPointer<SyncStore> mSyncStore; |
134 | int mError; | 220 | QSharedPointer<EntityStore> mEntityStore; |
135 | QTimer mCommitQueueTimer; | 221 | Sink::Storage::Transaction mTransaction; |
136 | qint64 mClientLowerBoundRevision; | 222 | Sink::Storage::Transaction mSyncTransaction; |
137 | QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories; | 223 | QByteArray mResourceType; |
224 | QByteArray mResourceInstanceIdentifier; | ||
138 | }; | 225 | }; |
226 | |||
227 | |||
139 | } | 228 | } |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 637a1b8..7863f67 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -34,6 +34,7 @@ | |||
34 | #include "entitybuffer.h" | 34 | #include "entitybuffer.h" |
35 | #include "log.h" | 35 | #include "log.h" |
36 | #include "domain/applicationdomaintype.h" | 36 | #include "domain/applicationdomaintype.h" |
37 | #include "adaptorfactoryregistry.h" | ||
37 | #include "definitions.h" | 38 | #include "definitions.h" |
38 | #include "bufferutils.h" | 39 | #include "bufferutils.h" |
39 | 40 | ||
@@ -52,11 +53,11 @@ public: | |||
52 | Storage storage; | 53 | Storage storage; |
53 | Storage::Transaction transaction; | 54 | Storage::Transaction transaction; |
54 | QHash<QString, QVector<Preprocessor *>> processors; | 55 | QHash<QString, QVector<Preprocessor *>> processors; |
55 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; | ||
56 | bool revisionChanged; | 56 | bool revisionChanged; |
57 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | 57 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); |
58 | QTime transactionTime; | 58 | QTime transactionTime; |
59 | int transactionItemCount; | 59 | int transactionItemCount; |
60 | QByteArray resourceType; | ||
60 | }; | 61 | }; |
61 | 62 | ||
62 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 63 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
@@ -84,9 +85,9 @@ void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preproc | |||
84 | d->processors[entityType] = processors; | 85 | d->processors[entityType] = processors; |
85 | } | 86 | } |
86 | 87 | ||
87 | void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) | 88 | void Pipeline::setResourceType(const QByteArray &resourceType) |
88 | { | 89 | { |
89 | d->adaptorFactory.insert(entityType, factory); | 90 | d->resourceType = resourceType; |
90 | } | 91 | } |
91 | 92 | ||
92 | void Pipeline::startTransaction() | 93 | void Pipeline::startTransaction() |
@@ -102,7 +103,9 @@ void Pipeline::startTransaction() | |||
102 | Trace() << "Starting transaction."; | 103 | Trace() << "Starting transaction."; |
103 | d->transactionTime.start(); | 104 | d->transactionTime.start(); |
104 | d->transactionItemCount = 0; | 105 | d->transactionItemCount = 0; |
105 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite)); | 106 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { |
107 | Warning() << error.message; | ||
108 | })); | ||
106 | } | 109 | } |
107 | 110 | ||
108 | void Pipeline::commit() | 111 | void Pipeline::commit() |
@@ -189,7 +192,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
189 | auto metadataBuffer = metadataBuilder.Finish(); | 192 | auto metadataBuffer = metadataBuilder.Finish(); |
190 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | 193 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
191 | 194 | ||
192 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 195 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
193 | if (!adaptorFactory) { | 196 | if (!adaptorFactory) { |
194 | Warning() << "no adaptor factory for type " << bufferType; | 197 | Warning() << "no adaptor factory for type " << bufferType; |
195 | return KAsync::error<qint64>(0); | 198 | return KAsync::error<qint64>(0); |
@@ -244,7 +247,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
244 | } | 247 | } |
245 | 248 | ||
246 | // TODO use only readPropertyMapper and writePropertyMapper | 249 | // TODO use only readPropertyMapper and writePropertyMapper |
247 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 250 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
248 | if (!adaptorFactory) { | 251 | if (!adaptorFactory) { |
249 | Warning() << "no adaptor factory for type " << bufferType; | 252 | Warning() << "no adaptor factory for type " << bufferType; |
250 | return KAsync::error<qint64>(0); | 253 | return KAsync::error<qint64>(0); |
@@ -373,7 +376,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
373 | flatbuffers::FlatBufferBuilder fbb; | 376 | flatbuffers::FlatBufferBuilder fbb; |
374 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | 377 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); |
375 | 378 | ||
376 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 379 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
377 | if (!adaptorFactory) { | 380 | if (!adaptorFactory) { |
378 | Warning() << "no adaptor factory for type " << bufferType; | 381 | Warning() << "no adaptor factory for type " << bufferType; |
379 | return KAsync::error<qint64>(0); | 382 | return KAsync::error<qint64>(0); |
diff --git a/common/pipeline.h b/common/pipeline.h index c65cbfd..2ca87a4 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -46,13 +46,12 @@ public: | |||
46 | 46 | ||
47 | Storage &storage() const; | 47 | Storage &storage() const; |
48 | 48 | ||
49 | void setResourceType(const QByteArray &resourceType); | ||
49 | void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); | 50 | void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); |
50 | void startTransaction(); | 51 | void startTransaction(); |
51 | void commit(); | 52 | void commit(); |
52 | Storage::Transaction &transaction(); | 53 | Storage::Transaction &transaction(); |
53 | 54 | ||
54 | void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); | ||
55 | |||
56 | KAsync::Job<qint64> newEntity(void const *command, size_t size); | 55 | KAsync::Job<qint64> newEntity(void const *command, size_t size); |
57 | KAsync::Job<qint64> modifiedEntity(void const *command, size_t size); | 56 | KAsync::Job<qint64> modifiedEntity(void const *command, size_t size); |
58 | KAsync::Job<qint64> deletedEntity(void const *command, size_t size); | 57 | KAsync::Job<qint64> deletedEntity(void const *command, size_t size); |
diff --git a/common/resource.cpp b/common/resource.cpp index 6713686..82c9fc8 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -26,6 +26,7 @@ | |||
26 | #include <QPointer> | 26 | #include <QPointer> |
27 | 27 | ||
28 | #include "facadefactory.h" | 28 | #include "facadefactory.h" |
29 | #include "adaptorfactoryregistry.h" | ||
29 | 30 | ||
30 | namespace Sink { | 31 | namespace Sink { |
31 | 32 | ||
@@ -110,6 +111,7 @@ ResourceFactory *ResourceFactory::load(const QString &resourceName) | |||
110 | if (factory) { | 111 | if (factory) { |
111 | Private::s_loadedFactories.insert(resourceName, factory); | 112 | Private::s_loadedFactories.insert(resourceName, factory); |
112 | factory->registerFacades(FacadeFactory::instance()); | 113 | factory->registerFacades(FacadeFactory::instance()); |
114 | factory->registerAdaptorFactories(AdaptorFactoryRegistry::instance()); | ||
113 | // TODO: if we need more data on it const QJsonObject json = loader.metaData()[QStringLiteral("MetaData")].toObject(); | 115 | // TODO: if we need more data on it const QJsonObject json = loader.metaData()[QStringLiteral("MetaData")].toObject(); |
114 | return factory; | 116 | return factory; |
115 | } else { | 117 | } else { |
diff --git a/common/resource.h b/common/resource.h index 0e7cd11..d6c3c5f 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -26,6 +26,7 @@ | |||
26 | 26 | ||
27 | namespace Sink { | 27 | namespace Sink { |
28 | class FacadeFactory; | 28 | class FacadeFactory; |
29 | class AdaptorFactoryRegistry; | ||
29 | 30 | ||
30 | /** | 31 | /** |
31 | * Resource interface | 32 | * Resource interface |
@@ -81,6 +82,7 @@ public: | |||
81 | 82 | ||
82 | virtual Resource *createResource(const QByteArray &instanceIdentifier) = 0; | 83 | virtual Resource *createResource(const QByteArray &instanceIdentifier) = 0; |
83 | virtual void registerFacades(FacadeFactory &factory) = 0; | 84 | virtual void registerFacades(FacadeFactory &factory) = 0; |
85 | virtual void registerAdaptorFactories(AdaptorFactoryRegistry ®istry) {}; | ||
84 | 86 | ||
85 | private: | 87 | private: |
86 | class Private; | 88 | class Private; |
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index 48858da..609d23e 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp | |||
@@ -35,6 +35,7 @@ | |||
35 | #include "definitions.h" | 35 | #include "definitions.h" |
36 | #include "facadefactory.h" | 36 | #include "facadefactory.h" |
37 | #include "indexupdater.h" | 37 | #include "indexupdater.h" |
38 | #include "adaptorfactoryregistry.h" | ||
38 | #include <QDate> | 39 | #include <QDate> |
39 | #include <QUuid> | 40 | #include <QUuid> |
40 | 41 | ||
@@ -43,8 +44,87 @@ | |||
43 | #define ENTITY_TYPE_MAIL "mail" | 44 | #define ENTITY_TYPE_MAIL "mail" |
44 | #define ENTITY_TYPE_FOLDER "folder" | 45 | #define ENTITY_TYPE_FOLDER "folder" |
45 | 46 | ||
47 | class DummySynchronizer : public Sink::Synchronizer { | ||
48 | public: | ||
49 | |||
50 | DummySynchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
51 | : Sink::Synchronizer(resourceType, resourceInstanceIdentifier) | ||
52 | { | ||
53 | |||
54 | } | ||
55 | |||
56 | Sink::ApplicationDomain::Event::Ptr createEvent(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) | ||
57 | { | ||
58 | static uint8_t rawData[100]; | ||
59 | auto event = Sink::ApplicationDomain::Event::Ptr::create(); | ||
60 | event->setProperty("summary", data.value("summary").toString()); | ||
61 | event->setProperty("remoteId", ridBuffer); | ||
62 | event->setProperty("description", data.value("description").toString()); | ||
63 | event->setProperty("attachment", QByteArray::fromRawData(reinterpret_cast<const char*>(rawData), 100)); | ||
64 | return event; | ||
65 | } | ||
66 | |||
67 | Sink::ApplicationDomain::Mail::Ptr createMail(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) | ||
68 | { | ||
69 | auto mail = Sink::ApplicationDomain::Mail::Ptr::create(); | ||
70 | mail->setProperty("subject", data.value("subject").toString()); | ||
71 | mail->setProperty("senderEmail", data.value("senderEmail").toString()); | ||
72 | mail->setProperty("senderName", data.value("senderName").toString()); | ||
73 | mail->setProperty("date", data.value("date").toString()); | ||
74 | mail->setProperty("folder", syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, data.value("parentFolder").toByteArray())); | ||
75 | mail->setProperty("unread", data.value("unread").toBool()); | ||
76 | mail->setProperty("important", data.value("important").toBool()); | ||
77 | return mail; | ||
78 | } | ||
79 | |||
80 | Sink::ApplicationDomain::Folder::Ptr createFolder(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) | ||
81 | { | ||
82 | auto folder = Sink::ApplicationDomain::Folder::Ptr::create(); | ||
83 | folder->setProperty("name", data.value("name").toString()); | ||
84 | folder->setProperty("icon", data.value("icon").toString()); | ||
85 | if (!data.value("parent").toString().isEmpty()) { | ||
86 | auto sinkId = syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, data.value("parent").toByteArray()); | ||
87 | folder->setProperty("parent", sinkId); | ||
88 | } | ||
89 | return folder; | ||
90 | } | ||
91 | |||
92 | void synchronize(const QByteArray &bufferType, const QMap<QString, QMap<QString, QVariant> > &data, std::function<Sink::ApplicationDomain::ApplicationDomainType::Ptr(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data)> createEntity) | ||
93 | { | ||
94 | auto time = QSharedPointer<QTime>::create(); | ||
95 | time->start(); | ||
96 | //TODO find items to remove | ||
97 | int count = 0; | ||
98 | for (auto it = data.constBegin(); it != data.constEnd(); it++) { | ||
99 | count++; | ||
100 | const auto remoteId = it.key().toUtf8(); | ||
101 | auto entity = createEntity(remoteId, it.value()); | ||
102 | createOrModify(bufferType, remoteId, *entity); | ||
103 | } | ||
104 | Trace() << "Sync of " << count << " entities of type " << bufferType << " done." << Sink::Log::TraceTime(time->elapsed()); | ||
105 | } | ||
106 | |||
107 | KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE | ||
108 | { | ||
109 | Log() << " Synchronizing with the source"; | ||
110 | return KAsync::start<void>([this]() { | ||
111 | synchronize(ENTITY_TYPE_EVENT, DummyStore::instance().events(), [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) { | ||
112 | return createEvent(ridBuffer, data); | ||
113 | }); | ||
114 | synchronize(ENTITY_TYPE_MAIL, DummyStore::instance().mails(), [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) { | ||
115 | return createMail(ridBuffer, data); | ||
116 | }); | ||
117 | synchronize(ENTITY_TYPE_FOLDER, DummyStore::instance().folders(), [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) { | ||
118 | return createFolder(ridBuffer, data); | ||
119 | }); | ||
120 | Log() << "Done Synchronizing"; | ||
121 | }); | ||
122 | } | ||
123 | |||
124 | }; | ||
125 | |||
46 | DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Sink::Pipeline> &pipeline) | 126 | DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Sink::Pipeline> &pipeline) |
47 | : Sink::GenericResource(instanceIdentifier, pipeline), | 127 | : Sink::GenericResource(PLUGIN_NAME, instanceIdentifier, pipeline, QSharedPointer<Sink::NullChangeReplay>::create(), QSharedPointer<DummySynchronizer>::create(PLUGIN_NAME, instanceIdentifier)), |
48 | mEventAdaptorFactory(QSharedPointer<DummyEventAdaptorFactory>::create()), | 128 | mEventAdaptorFactory(QSharedPointer<DummyEventAdaptorFactory>::create()), |
49 | mMailAdaptorFactory(QSharedPointer<DummyMailAdaptorFactory>::create()), | 129 | mMailAdaptorFactory(QSharedPointer<DummyMailAdaptorFactory>::create()), |
50 | mFolderAdaptorFactory(QSharedPointer<DummyFolderAdaptorFactory>::create()) | 130 | mFolderAdaptorFactory(QSharedPointer<DummyFolderAdaptorFactory>::create()) |
@@ -57,80 +137,9 @@ DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QShared | |||
57 | QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Event>); | 137 | QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Event>); |
58 | } | 138 | } |
59 | 139 | ||
60 | Sink::ApplicationDomain::Event::Ptr DummyResource::createEvent(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &transaction) | 140 | DummyResource::~DummyResource() |
61 | { | ||
62 | static uint8_t rawData[100]; | ||
63 | auto event = Sink::ApplicationDomain::Event::Ptr::create(); | ||
64 | event->setProperty("summary", data.value("summary").toString()); | ||
65 | event->setProperty("remoteId", ridBuffer); | ||
66 | event->setProperty("description", data.value("description").toString()); | ||
67 | event->setProperty("attachment", QByteArray::fromRawData(reinterpret_cast<const char*>(rawData), 100)); | ||
68 | return event; | ||
69 | } | ||
70 | |||
71 | Sink::ApplicationDomain::Mail::Ptr DummyResource::createMail(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &transaction) | ||
72 | { | 141 | { |
73 | auto mail = Sink::ApplicationDomain::Mail::Ptr::create(); | ||
74 | mail->setProperty("subject", data.value("subject").toString()); | ||
75 | mail->setProperty("senderEmail", data.value("senderEmail").toString()); | ||
76 | mail->setProperty("senderName", data.value("senderName").toString()); | ||
77 | mail->setProperty("date", data.value("date").toString()); | ||
78 | mail->setProperty("folder", resolveRemoteId(ENTITY_TYPE_FOLDER, data.value("parentFolder").toByteArray(), transaction)); | ||
79 | mail->setProperty("unread", data.value("unread").toBool()); | ||
80 | mail->setProperty("important", data.value("important").toBool()); | ||
81 | return mail; | ||
82 | } | ||
83 | 142 | ||
84 | Sink::ApplicationDomain::Folder::Ptr DummyResource::createFolder(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &transaction) | ||
85 | { | ||
86 | auto folder = Sink::ApplicationDomain::Folder::Ptr::create(); | ||
87 | folder->setProperty("name", data.value("name").toString()); | ||
88 | folder->setProperty("icon", data.value("icon").toString()); | ||
89 | if (!data.value("parent").toString().isEmpty()) { | ||
90 | auto sinkId = resolveRemoteId(ENTITY_TYPE_FOLDER, data.value("parent").toByteArray(), transaction); | ||
91 | folder->setProperty("parent", sinkId); | ||
92 | } | ||
93 | return folder; | ||
94 | } | ||
95 | |||
96 | void DummyResource::synchronize(const QByteArray &bufferType, const QMap<QString, QMap<QString, QVariant> > &data, Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<Sink::ApplicationDomain::ApplicationDomainType::Ptr(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &)> createEntity) | ||
97 | { | ||
98 | auto time = QSharedPointer<QTime>::create(); | ||
99 | time->start(); | ||
100 | //TODO find items to remove | ||
101 | int count = 0; | ||
102 | for (auto it = data.constBegin(); it != data.constEnd(); it++) { | ||
103 | count++; | ||
104 | const auto remoteId = it.key().toUtf8(); | ||
105 | auto entity = createEntity(remoteId, it.value(), synchronizationTransaction); | ||
106 | createOrModify(transaction, synchronizationTransaction, adaptorFactory, bufferType, remoteId, *entity); | ||
107 | } | ||
108 | Trace() << "Sync of " << count << " entities of type " << bufferType << " done." << Sink::Log::TraceTime(time->elapsed()); | ||
109 | } | ||
110 | |||
111 | KAsync::Job<void> DummyResource::synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore) | ||
112 | { | ||
113 | Log() << " Synchronizing"; | ||
114 | return KAsync::start<void>([this, &mainStore, &synchronizationStore]() { | ||
115 | auto transaction = mainStore.createTransaction(Sink::Storage::ReadOnly); | ||
116 | auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadWrite); | ||
117 | synchronize(ENTITY_TYPE_EVENT, DummyStore::instance().events(), transaction, synchronizationTransaction, *mEventAdaptorFactory, [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &synchronizationTransaction) { | ||
118 | return createEvent(ridBuffer, data, synchronizationTransaction); | ||
119 | }); | ||
120 | synchronize(ENTITY_TYPE_MAIL, DummyStore::instance().mails(), transaction, synchronizationTransaction, *mMailAdaptorFactory, [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &synchronizationTransaction) { | ||
121 | return createMail(ridBuffer, data, synchronizationTransaction); | ||
122 | }); | ||
123 | synchronize(ENTITY_TYPE_FOLDER, DummyStore::instance().folders(), transaction, synchronizationTransaction, *mFolderAdaptorFactory, [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &synchronizationTransaction) { | ||
124 | return createFolder(ridBuffer, data, synchronizationTransaction); | ||
125 | }); | ||
126 | Log() << "Done Synchronizing"; | ||
127 | }); | ||
128 | } | ||
129 | |||
130 | KAsync::Job<void> DummyResource::replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value) | ||
131 | { | ||
132 | Trace() << "Replaying " << key; | ||
133 | return KAsync::null<void>(); | ||
134 | } | 143 | } |
135 | 144 | ||
136 | void DummyResource::removeDataFromDisk() | 145 | void DummyResource::removeDataFromDisk() |
@@ -160,6 +169,7 @@ KAsync::Job<void> DummyResource::inspect(int inspectionType, const QByteArray &i | |||
160 | return KAsync::null<void>(); | 169 | return KAsync::null<void>(); |
161 | } | 170 | } |
162 | 171 | ||
172 | |||
163 | DummyResourceFactory::DummyResourceFactory(QObject *parent) | 173 | DummyResourceFactory::DummyResourceFactory(QObject *parent) |
164 | : Sink::ResourceFactory(parent) | 174 | : Sink::ResourceFactory(parent) |
165 | { | 175 | { |
@@ -178,3 +188,10 @@ void DummyResourceFactory::registerFacades(Sink::FacadeFactory &factory) | |||
178 | factory.registerFacade<Sink::ApplicationDomain::Folder, DummyResourceFolderFacade>(PLUGIN_NAME); | 188 | factory.registerFacade<Sink::ApplicationDomain::Folder, DummyResourceFolderFacade>(PLUGIN_NAME); |
179 | } | 189 | } |
180 | 190 | ||
191 | void DummyResourceFactory::registerAdaptorFactories(Sink::AdaptorFactoryRegistry ®istry) | ||
192 | { | ||
193 | registry.registerFactory<Sink::ApplicationDomain::Folder, DummyFolderAdaptorFactory>(PLUGIN_NAME); | ||
194 | registry.registerFactory<Sink::ApplicationDomain::Mail, DummyMailAdaptorFactory>(PLUGIN_NAME); | ||
195 | registry.registerFactory<Sink::ApplicationDomain::Event, DummyEventAdaptorFactory>(PLUGIN_NAME); | ||
196 | } | ||
197 | |||
diff --git a/examples/dummyresource/resourcefactory.h b/examples/dummyresource/resourcefactory.h index 865f6e5..f73eb32 100644 --- a/examples/dummyresource/resourcefactory.h +++ b/examples/dummyresource/resourcefactory.h | |||
@@ -37,13 +37,12 @@ class DummyResource : public Sink::GenericResource | |||
37 | { | 37 | { |
38 | public: | 38 | public: |
39 | DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Sink::Pipeline> &pipeline = QSharedPointer<Sink::Pipeline>()); | 39 | DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Sink::Pipeline> &pipeline = QSharedPointer<Sink::Pipeline>()); |
40 | KAsync::Job<void> synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore) Q_DECL_OVERRIDE; | 40 | virtual ~DummyResource(); |
41 | using GenericResource::synchronizeWithSource; | 41 | |
42 | void removeDataFromDisk() Q_DECL_OVERRIDE; | 42 | void removeDataFromDisk() Q_DECL_OVERRIDE; |
43 | static void removeFromDisk(const QByteArray &instanceIdentifier); | 43 | static void removeFromDisk(const QByteArray &instanceIdentifier); |
44 | KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE; | 44 | KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE; |
45 | private: | 45 | private: |
46 | KAsync::Job<void> replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | ||
47 | Sink::ApplicationDomain::Event::Ptr createEvent(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); | 46 | Sink::ApplicationDomain::Event::Ptr createEvent(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); |
48 | Sink::ApplicationDomain::Mail::Ptr createMail(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); | 47 | Sink::ApplicationDomain::Mail::Ptr createMail(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); |
49 | Sink::ApplicationDomain::Folder::Ptr createFolder(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); | 48 | Sink::ApplicationDomain::Folder::Ptr createFolder(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); |
@@ -65,5 +64,6 @@ public: | |||
65 | 64 | ||
66 | Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; | 65 | Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; |
67 | void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE; | 66 | void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE; |
67 | void registerAdaptorFactories(Sink::AdaptorFactoryRegistry ®istry) Q_DECL_OVERRIDE; | ||
68 | }; | 68 | }; |
69 | 69 | ||
diff --git a/examples/maildirresource/maildirresource.h b/examples/maildirresource/maildirresource.h index e58dc16..30c565c 100644 --- a/examples/maildirresource/maildirresource.h +++ b/examples/maildirresource/maildirresource.h | |||
@@ -74,5 +74,6 @@ public: | |||
74 | 74 | ||
75 | Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; | 75 | Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; |
76 | void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE; | 76 | void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE; |
77 | void registerDomainTypeAdaptors(Sink::AdaptorFactory &factory) Q_DECL_OVERRIDE; | ||
77 | }; | 78 | }; |
78 | 79 | ||
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index 58df2da..d41f235 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp | |||
@@ -11,6 +11,7 @@ | |||
11 | #include "modelresult.h" | 11 | #include "modelresult.h" |
12 | #include "pipeline.h" | 12 | #include "pipeline.h" |
13 | #include "log.h" | 13 | #include "log.h" |
14 | #include "test.h" | ||
14 | 15 | ||
15 | using namespace Sink; | 16 | using namespace Sink; |
16 | 17 | ||
@@ -28,6 +29,7 @@ class DummyResourceTest : public QObject | |||
28 | private slots: | 29 | private slots: |
29 | void initTestCase() | 30 | void initTestCase() |
30 | { | 31 | { |
32 | Sink::Test::initTest(); | ||
31 | Sink::Log::setDebugOutputLevel(Sink::Log::Trace); | 33 | Sink::Log::setDebugOutputLevel(Sink::Log::Trace); |
32 | auto factory = Sink::ResourceFactory::load("org.kde.dummy"); | 34 | auto factory = Sink::ResourceFactory::load("org.kde.dummy"); |
33 | QVERIFY(factory); | 35 | QVERIFY(factory); |
diff --git a/tests/mailquerybenchmark.cpp b/tests/mailquerybenchmark.cpp index 20ee63c..4e58899 100644 --- a/tests/mailquerybenchmark.cpp +++ b/tests/mailquerybenchmark.cpp | |||
@@ -33,6 +33,7 @@ | |||
33 | #include <common/pipeline.h> | 33 | #include <common/pipeline.h> |
34 | #include <common/index.h> | 34 | #include <common/index.h> |
35 | #include <common/indexupdater.h> | 35 | #include <common/indexupdater.h> |
36 | #include <common/adaptorfactoryregistry.h> | ||
36 | 37 | ||
37 | #include "hawd/dataset.h" | 38 | #include "hawd/dataset.h" |
38 | #include "hawd/formatter.h" | 39 | #include "hawd/formatter.h" |
@@ -59,12 +60,11 @@ class MailQueryBenchmark : public QObject | |||
59 | TestResource::removeFromDisk(resourceIdentifier); | 60 | TestResource::removeFromDisk(resourceIdentifier); |
60 | 61 | ||
61 | auto pipeline = QSharedPointer<Sink::Pipeline>::create(resourceIdentifier); | 62 | auto pipeline = QSharedPointer<Sink::Pipeline>::create(resourceIdentifier); |
63 | pipeline->setResourceType("test"); | ||
62 | 64 | ||
63 | auto mailFactory = QSharedPointer<TestMailAdaptorFactory>::create(); | ||
64 | auto indexer = QSharedPointer<DefaultIndexUpdater<Sink::ApplicationDomain::Mail>>::create(); | 65 | auto indexer = QSharedPointer<DefaultIndexUpdater<Sink::ApplicationDomain::Mail>>::create(); |
65 | 66 | ||
66 | pipeline->setPreprocessors("mail", QVector<Sink::Preprocessor *>() << indexer.data()); | 67 | pipeline->setPreprocessors("mail", QVector<Sink::Preprocessor *>() << indexer.data()); |
67 | pipeline->setAdaptorFactory("mail", mailFactory); | ||
68 | 68 | ||
69 | auto domainTypeAdaptorFactory = QSharedPointer<TestMailAdaptorFactory>::create(); | 69 | auto domainTypeAdaptorFactory = QSharedPointer<TestMailAdaptorFactory>::create(); |
70 | 70 | ||
@@ -149,6 +149,7 @@ private slots: | |||
149 | void init() | 149 | void init() |
150 | { | 150 | { |
151 | resourceIdentifier = "org.kde.test.instance1"; | 151 | resourceIdentifier = "org.kde.test.instance1"; |
152 | Sink::AdaptorFactoryRegistry::instance().registerFactory<Sink::ApplicationDomain::Mail, TestMailAdaptorFactory>("test"); | ||
152 | } | 153 | } |
153 | 154 | ||
154 | void test50k() | 155 | void test50k() |
diff --git a/tests/pipelinebenchmark.cpp b/tests/pipelinebenchmark.cpp index 0133a6c..51481fd 100644 --- a/tests/pipelinebenchmark.cpp +++ b/tests/pipelinebenchmark.cpp | |||
@@ -33,6 +33,7 @@ | |||
33 | #include <common/pipeline.h> | 33 | #include <common/pipeline.h> |
34 | #include <common/index.h> | 34 | #include <common/index.h> |
35 | #include <common/indexupdater.h> | 35 | #include <common/indexupdater.h> |
36 | #include <common/adaptorfactoryregistry.h> | ||
36 | 37 | ||
37 | #include "hawd/dataset.h" | 38 | #include "hawd/dataset.h" |
38 | #include "hawd/formatter.h" | 39 | #include "hawd/formatter.h" |
@@ -83,10 +84,8 @@ class PipelineBenchmark : public QObject | |||
83 | TestResource::removeFromDisk(resourceIdentifier); | 84 | TestResource::removeFromDisk(resourceIdentifier); |
84 | 85 | ||
85 | auto pipeline = QSharedPointer<Sink::Pipeline>::create(resourceIdentifier); | 86 | auto pipeline = QSharedPointer<Sink::Pipeline>::create(resourceIdentifier); |
86 | |||
87 | auto mailFactory = QSharedPointer<TestMailAdaptorFactory>::create(); | ||
88 | pipeline->setPreprocessors("mail", preprocessors); | 87 | pipeline->setPreprocessors("mail", preprocessors); |
89 | pipeline->setAdaptorFactory("mail", mailFactory); | 88 | pipeline->setResourceType("test"); |
90 | 89 | ||
91 | QTime time; | 90 | QTime time; |
92 | time.start(); | 91 | time.start(); |
@@ -131,6 +130,7 @@ private slots: | |||
131 | void init() | 130 | void init() |
132 | { | 131 | { |
133 | Sink::Log::setDebugOutputLevel(Sink::Log::Warning); | 132 | Sink::Log::setDebugOutputLevel(Sink::Log::Warning); |
133 | Sink::AdaptorFactoryRegistry::instance().registerFactory<Sink::ApplicationDomain::Mail, TestMailAdaptorFactory>("test"); | ||
134 | resourceIdentifier = "org.kde.test.instance1"; | 134 | resourceIdentifier = "org.kde.test.instance1"; |
135 | } | 135 | } |
136 | 136 | ||
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 9290050..6ea2041 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp | |||
@@ -19,6 +19,7 @@ | |||
19 | #include "log.h" | 19 | #include "log.h" |
20 | #include "domainadaptor.h" | 20 | #include "domainadaptor.h" |
21 | #include "definitions.h" | 21 | #include "definitions.h" |
22 | #include "adaptorfactoryregistry.h" | ||
22 | 23 | ||
23 | static void removeFromDisk(const QString &name) | 24 | static void removeFromDisk(const QString &name) |
24 | { | 25 | { |
@@ -185,6 +186,7 @@ private slots: | |||
185 | void initTestCase() | 186 | void initTestCase() |
186 | { | 187 | { |
187 | Sink::Log::setDebugOutputLevel(Sink::Log::Trace); | 188 | Sink::Log::setDebugOutputLevel(Sink::Log::Trace); |
189 | Sink::AdaptorFactoryRegistry::instance().registerFactory<Sink::ApplicationDomain::Event, TestEventAdaptorFactory>("test"); | ||
188 | } | 190 | } |
189 | 191 | ||
190 | void init() | 192 | void init() |
@@ -198,9 +200,7 @@ private slots: | |||
198 | auto command = createEntityCommand(createEvent(entityFbb)); | 200 | auto command = createEntityCommand(createEvent(entityFbb)); |
199 | 201 | ||
200 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); | 202 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); |
201 | 203 | pipeline.setResourceType("test"); | |
202 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); | ||
203 | pipeline.setAdaptorFactory("event", adaptorFactory); | ||
204 | 204 | ||
205 | pipeline.startTransaction(); | 205 | pipeline.startTransaction(); |
206 | pipeline.newEntity(command.constData(), command.size()); | 206 | pipeline.newEntity(command.constData(), command.size()); |
@@ -216,9 +216,9 @@ private slots: | |||
216 | auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); | 216 | auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); |
217 | 217 | ||
218 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); | 218 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); |
219 | pipeline.setResourceType("test"); | ||
219 | 220 | ||
220 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); | 221 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); |
221 | pipeline.setAdaptorFactory("event", adaptorFactory); | ||
222 | 222 | ||
223 | // Create the initial revision | 223 | // Create the initial revision |
224 | pipeline.startTransaction(); | 224 | pipeline.startTransaction(); |
@@ -265,9 +265,9 @@ private slots: | |||
265 | auto command = createEntityCommand(createEvent(entityFbb)); | 265 | auto command = createEntityCommand(createEvent(entityFbb)); |
266 | 266 | ||
267 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); | 267 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); |
268 | pipeline.setResourceType("test"); | ||
268 | 269 | ||
269 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); | 270 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); |
270 | pipeline.setAdaptorFactory("event", adaptorFactory); | ||
271 | 271 | ||
272 | // Create the initial revision | 272 | // Create the initial revision |
273 | pipeline.startTransaction(); | 273 | pipeline.startTransaction(); |
@@ -309,7 +309,7 @@ private slots: | |||
309 | flatbuffers::FlatBufferBuilder entityFbb; | 309 | flatbuffers::FlatBufferBuilder entityFbb; |
310 | auto command = createEntityCommand(createEvent(entityFbb)); | 310 | auto command = createEntityCommand(createEvent(entityFbb)); |
311 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); | 311 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); |
312 | pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create()); | 312 | pipeline.setResourceType("test"); |
313 | 313 | ||
314 | // Create the initial revision | 314 | // Create the initial revision |
315 | pipeline.startTransaction(); | 315 | pipeline.startTransaction(); |
@@ -346,9 +346,9 @@ private slots: | |||
346 | TestProcessor testProcessor; | 346 | TestProcessor testProcessor; |
347 | 347 | ||
348 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); | 348 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); |
349 | pipeline.setResourceType("test"); | ||
349 | pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << &testProcessor); | 350 | pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << &testProcessor); |
350 | pipeline.startTransaction(); | 351 | pipeline.startTransaction(); |
351 | pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create()); | ||
352 | 352 | ||
353 | // Actual test | 353 | // Actual test |
354 | { | 354 | { |
diff --git a/tests/querytest.cpp b/tests/querytest.cpp index a654931..6d7746e 100644 --- a/tests/querytest.cpp +++ b/tests/querytest.cpp | |||
@@ -9,6 +9,7 @@ | |||
9 | #include "resourceconfig.h" | 9 | #include "resourceconfig.h" |
10 | #include "log.h" | 10 | #include "log.h" |
11 | #include "modelresult.h" | 11 | #include "modelresult.h" |
12 | #include "test.h" | ||
12 | 13 | ||
13 | /** | 14 | /** |
14 | * Test of the query system using the dummy resource. | 15 | * Test of the query system using the dummy resource. |
@@ -21,6 +22,7 @@ class QueryTest : public QObject | |||
21 | private slots: | 22 | private slots: |
22 | void initTestCase() | 23 | void initTestCase() |
23 | { | 24 | { |
25 | Sink::Test::initTest(); | ||
24 | Sink::Log::setDebugOutputLevel(Sink::Log::Trace); | 26 | Sink::Log::setDebugOutputLevel(Sink::Log::Trace); |
25 | auto factory = Sink::ResourceFactory::load("org.kde.dummy"); | 27 | auto factory = Sink::ResourceFactory::load("org.kde.dummy"); |
26 | QVERIFY(factory); | 28 | QVERIFY(factory); |
diff --git a/tests/testimplementations.h b/tests/testimplementations.h index 688875d..197602c 100644 --- a/tests/testimplementations.h +++ b/tests/testimplementations.h | |||
@@ -107,7 +107,7 @@ public: | |||
107 | class TestResource : public Sink::GenericResource | 107 | class TestResource : public Sink::GenericResource |
108 | { | 108 | { |
109 | public: | 109 | public: |
110 | TestResource(const QByteArray &instanceIdentifier, QSharedPointer<Sink::Pipeline> pipeline) : Sink::GenericResource(instanceIdentifier, pipeline) | 110 | TestResource(const QByteArray &instanceIdentifier, QSharedPointer<Sink::Pipeline> pipeline) : Sink::GenericResource("test", instanceIdentifier, pipeline, QSharedPointer<Sink::ChangeReplay>(), QSharedPointer<Sink::Synchronizer>()) |
111 | { | 111 | { |
112 | } | 112 | } |
113 | 113 | ||