diff options
-rw-r--r-- | common/CMakeLists.txt | 2 | ||||
-rw-r--r-- | common/adaptorfactoryregistry.cpp | 68 | ||||
-rw-r--r-- | common/adaptorfactoryregistry.h | 64 | ||||
-rw-r--r-- | common/changereplay.cpp | 102 | ||||
-rw-r--r-- | common/changereplay.h | 68 | ||||
-rw-r--r-- | common/genericresource.cpp | 560 | ||||
-rw-r--r-- | common/genericresource.h | 151 | ||||
-rw-r--r-- | common/pipeline.cpp | 17 | ||||
-rw-r--r-- | common/pipeline.h | 3 | ||||
-rw-r--r-- | common/resource.cpp | 2 | ||||
-rw-r--r-- | common/resource.h | 2 | ||||
-rw-r--r-- | examples/dummyresource/resourcefactory.cpp | 163 | ||||
-rw-r--r-- | examples/dummyresource/resourcefactory.h | 6 | ||||
-rw-r--r-- | examples/maildirresource/maildirresource.h | 1 | ||||
-rw-r--r-- | tests/dummyresourcetest.cpp | 2 | ||||
-rw-r--r-- | tests/mailquerybenchmark.cpp | 5 | ||||
-rw-r--r-- | tests/pipelinebenchmark.cpp | 6 | ||||
-rw-r--r-- | tests/pipelinetest.cpp | 14 | ||||
-rw-r--r-- | tests/querytest.cpp | 2 | ||||
-rw-r--r-- | tests/testimplementations.h | 2 |
20 files changed, 854 insertions, 386 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index c269a85..79b627a 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -66,6 +66,8 @@ set(command_SRCS | |||
66 | domain/folder.cpp | 66 | domain/folder.cpp |
67 | test.cpp | 67 | test.cpp |
68 | query.cpp | 68 | query.cpp |
69 | changereplay.cpp | ||
70 | adaptorfactoryregistry.cpp | ||
69 | ${storage_SRCS}) | 71 | ${storage_SRCS}) |
70 | 72 | ||
71 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) | 73 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) |
diff --git a/common/adaptorfactoryregistry.cpp b/common/adaptorfactoryregistry.cpp new file mode 100644 index 0000000..323a02d --- /dev/null +++ b/common/adaptorfactoryregistry.cpp | |||
@@ -0,0 +1,68 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #include "adaptorfactoryregistry.h" | ||
21 | |||
22 | #include <QByteArray> | ||
23 | #include <QDebug> | ||
24 | #include <QMutex> | ||
25 | #include <functional> | ||
26 | #include <memory> | ||
27 | |||
28 | #include "domaintypeadaptorfactoryinterface.h" | ||
29 | #include "applicationdomaintype.h" | ||
30 | #include "log.h" | ||
31 | |||
32 | using namespace Sink; | ||
33 | |||
34 | AdaptorFactoryRegistry &AdaptorFactoryRegistry::instance() | ||
35 | { | ||
36 | // QMutexLocker locker(&sMutex); | ||
37 | static AdaptorFactoryRegistry *instance = 0; | ||
38 | if (!instance) { | ||
39 | instance = new AdaptorFactoryRegistry; | ||
40 | } | ||
41 | return *instance; | ||
42 | } | ||
43 | |||
44 | static QByteArray key(const QByteArray &resource, const QByteArray &type) | ||
45 | { | ||
46 | return resource + type; | ||
47 | } | ||
48 | |||
49 | AdaptorFactoryRegistry::AdaptorFactoryRegistry() | ||
50 | { | ||
51 | |||
52 | } | ||
53 | |||
54 | std::shared_ptr<DomainTypeAdaptorFactoryInterface> AdaptorFactoryRegistry::getFactory(const QByteArray &resource, const QByteArray &typeName) | ||
55 | { | ||
56 | const auto ptr = mRegistry.value(key(resource, typeName)); | ||
57 | //We have to check the pointer before the cast, otherwise a check would return true also for invalid instances. | ||
58 | if (!ptr) { | ||
59 | return std::shared_ptr<DomainTypeAdaptorFactoryInterface>(); | ||
60 | } | ||
61 | return std::static_pointer_cast<DomainTypeAdaptorFactoryInterface>(ptr); | ||
62 | } | ||
63 | |||
64 | void AdaptorFactoryRegistry::registerFactory(const QByteArray &resource, const std::shared_ptr<void> &instance, const QByteArray typeName) | ||
65 | { | ||
66 | mRegistry.insert(key(resource, typeName), instance); | ||
67 | } | ||
68 | |||
diff --git a/common/adaptorfactoryregistry.h b/common/adaptorfactoryregistry.h new file mode 100644 index 0000000..f06120a --- /dev/null +++ b/common/adaptorfactoryregistry.h | |||
@@ -0,0 +1,64 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | |||
21 | #pragma once | ||
22 | |||
23 | #include "sink_export.h" | ||
24 | #include <QByteArray> | ||
25 | #include <QDebug> | ||
26 | #include <QMutex> | ||
27 | #include <functional> | ||
28 | #include <memory> | ||
29 | |||
30 | #include "domaintypeadaptorfactoryinterface.h" | ||
31 | #include "applicationdomaintype.h" | ||
32 | #include "log.h" | ||
33 | |||
34 | namespace Sink { | ||
35 | |||
36 | /** | ||
37 | */ | ||
38 | class SINK_EXPORT AdaptorFactoryRegistry | ||
39 | { | ||
40 | public: | ||
41 | static AdaptorFactoryRegistry &instance(); | ||
42 | |||
43 | template <class DomainType, class Factory> | ||
44 | void registerFactory(const QByteArray &resource) | ||
45 | { | ||
46 | registerFactory(resource, std::make_shared<Factory>(), ApplicationDomain::getTypeName<DomainType>()); | ||
47 | } | ||
48 | |||
49 | template <class DomainType> | ||
50 | std::shared_ptr<DomainTypeAdaptorFactoryInterface> getFactory(const QByteArray &resource) | ||
51 | { | ||
52 | return getFactory(resource, ApplicationDomain::getTypeName<DomainType>()); | ||
53 | } | ||
54 | |||
55 | std::shared_ptr<DomainTypeAdaptorFactoryInterface> getFactory(const QByteArray &resource, const QByteArray &typeName); | ||
56 | |||
57 | private: | ||
58 | AdaptorFactoryRegistry(); | ||
59 | void registerFactory(const QByteArray &resource, const std::shared_ptr<void> &instance, const QByteArray typeName); | ||
60 | |||
61 | QHash<QByteArray, std::shared_ptr<void>> mRegistry; | ||
62 | static QMutex sMutex; | ||
63 | }; | ||
64 | } | ||
diff --git a/common/changereplay.cpp b/common/changereplay.cpp new file mode 100644 index 0000000..2447b6e --- /dev/null +++ b/common/changereplay.cpp | |||
@@ -0,0 +1,102 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #include "changereplay.h" | ||
21 | |||
22 | #include "entitybuffer.h" | ||
23 | #include "log.h" | ||
24 | #include "definitions.h" | ||
25 | #include "bufferutils.h" | ||
26 | |||
27 | using namespace Sink; | ||
28 | |||
29 | #undef DEBUG_AREA | ||
30 | #define DEBUG_AREA "resource.changereplay" | ||
31 | |||
32 | ChangeReplay::ChangeReplay(const QByteArray &resourceName) | ||
33 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite) | ||
34 | { | ||
35 | Trace() << "Created change replay: " << resourceName; | ||
36 | } | ||
37 | |||
38 | qint64 ChangeReplay::getLastReplayedRevision() | ||
39 | { | ||
40 | qint64 lastReplayedRevision = 0; | ||
41 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); | ||
42 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
43 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
44 | lastReplayedRevision = value.toLongLong(); | ||
45 | return false; | ||
46 | }, | ||
47 | [](const Storage::Error &) {}); | ||
48 | return lastReplayedRevision; | ||
49 | } | ||
50 | |||
51 | bool ChangeReplay::allChangesReplayed() | ||
52 | { | ||
53 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | ||
54 | Warning() << error.message; | ||
55 | })); | ||
56 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | ||
57 | Trace() << "All changes replayed " << topRevision << lastReplayedRevision; | ||
58 | return (lastReplayedRevision >= topRevision); | ||
59 | } | ||
60 | |||
61 | void ChangeReplay::revisionChanged() | ||
62 | { | ||
63 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | ||
64 | Warning() << error.message; | ||
65 | }); | ||
66 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | ||
67 | Warning() << error.message; | ||
68 | }); | ||
69 | qint64 lastReplayedRevision = 1; | ||
70 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
71 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
72 | lastReplayedRevision = value.toLongLong(); | ||
73 | return false; | ||
74 | }, | ||
75 | [](const Storage::Error &) {}); | ||
76 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | ||
77 | |||
78 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; | ||
79 | if (lastReplayedRevision <= topRevision) { | ||
80 | qint64 revision = lastReplayedRevision; | ||
81 | for (; revision <= topRevision; revision++) { | ||
82 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | ||
83 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | ||
84 | const auto key = Storage::assembleKey(uid, revision); | ||
85 | Storage::mainDatabase(mainStoreTransaction, type) | ||
86 | .scan(key, | ||
87 | [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { | ||
88 | replay(type, key, value).exec(); | ||
89 | // TODO make for loop async, and pass to async replay function together with type | ||
90 | Trace() << "Replaying " << key; | ||
91 | return false; | ||
92 | }, | ||
93 | [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); | ||
94 | } | ||
95 | revision--; | ||
96 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | ||
97 | replayStoreTransaction.commit(); | ||
98 | Trace() << "Replayed until " << revision; | ||
99 | } | ||
100 | emit changesReplayed(); | ||
101 | } | ||
102 | |||
diff --git a/common/changereplay.h b/common/changereplay.h new file mode 100644 index 0000000..a568060 --- /dev/null +++ b/common/changereplay.h | |||
@@ -0,0 +1,68 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #pragma once | ||
21 | |||
22 | #include "sink_export.h" | ||
23 | #include <QObject> | ||
24 | #include <Async/Async> | ||
25 | |||
26 | #include "storage.h" | ||
27 | |||
28 | namespace Sink { | ||
29 | |||
30 | /** | ||
31 | * Replays changes from the storage one by one. | ||
32 | * | ||
33 | * Uses a local database to: | ||
34 | * * Remember what changes have been replayed already. | ||
35 | * * store a mapping of remote to local buffers | ||
36 | */ | ||
37 | class SINK_EXPORT ChangeReplay : public QObject | ||
38 | { | ||
39 | Q_OBJECT | ||
40 | public: | ||
41 | ChangeReplay(const QByteArray &resourceName); | ||
42 | |||
43 | qint64 getLastReplayedRevision(); | ||
44 | bool allChangesReplayed(); | ||
45 | |||
46 | signals: | ||
47 | void changesReplayed(); | ||
48 | |||
49 | public slots: | ||
50 | void revisionChanged(); | ||
51 | |||
52 | protected: | ||
53 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; | ||
54 | Sink::Storage mStorage; | ||
55 | |||
56 | private: | ||
57 | Sink::Storage mChangeReplayStore; | ||
58 | }; | ||
59 | |||
60 | class NullChangeReplay : public ChangeReplay | ||
61 | { | ||
62 | public: | ||
63 | NullChangeReplay() : ChangeReplay("null") {} | ||
64 | KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return KAsync::null<void>(); } | ||
65 | }; | ||
66 | |||
67 | } | ||
68 | |||
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index eae6ead..cb2ef21 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -1,3 +1,22 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
1 | #include "genericresource.h" | 20 | #include "genericresource.h" |
2 | 21 | ||
3 | #include "entitybuffer.h" | 22 | #include "entitybuffer.h" |
@@ -14,6 +33,7 @@ | |||
14 | #include "log.h" | 33 | #include "log.h" |
15 | #include "definitions.h" | 34 | #include "definitions.h" |
16 | #include "bufferutils.h" | 35 | #include "bufferutils.h" |
36 | #include "adaptorfactoryregistry.h" | ||
17 | 37 | ||
18 | #include <QUuid> | 38 | #include <QUuid> |
19 | #include <QDataStream> | 39 | #include <QDataStream> |
@@ -30,96 +50,6 @@ static int sCommitInterval = 10; | |||
30 | using namespace Sink; | 50 | using namespace Sink; |
31 | 51 | ||
32 | #undef DEBUG_AREA | 52 | #undef DEBUG_AREA |
33 | #define DEBUG_AREA "resource.changereplay" | ||
34 | |||
35 | /** | ||
36 | * Replays changes from the storage one by one. | ||
37 | * | ||
38 | * Uses a local database to: | ||
39 | * * Remember what changes have been replayed already. | ||
40 | * * store a mapping of remote to local buffers | ||
41 | */ | ||
42 | class ChangeReplay : public QObject | ||
43 | { | ||
44 | Q_OBJECT | ||
45 | public: | ||
46 | typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction; | ||
47 | |||
48 | ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction) | ||
49 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayFunction(replayFunction) | ||
50 | { | ||
51 | } | ||
52 | |||
53 | qint64 getLastReplayedRevision() | ||
54 | { | ||
55 | qint64 lastReplayedRevision = 0; | ||
56 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); | ||
57 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
58 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
59 | lastReplayedRevision = value.toLongLong(); | ||
60 | return false; | ||
61 | }, | ||
62 | [](const Storage::Error &) {}); | ||
63 | return lastReplayedRevision; | ||
64 | } | ||
65 | |||
66 | bool allChangesReplayed() | ||
67 | { | ||
68 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); | ||
69 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | ||
70 | Trace() << "All changes replayed " << topRevision << lastReplayedRevision; | ||
71 | return (lastReplayedRevision >= topRevision); | ||
72 | } | ||
73 | |||
74 | signals: | ||
75 | void changesReplayed(); | ||
76 | |||
77 | public slots: | ||
78 | void revisionChanged() | ||
79 | { | ||
80 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); | ||
81 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); | ||
82 | qint64 lastReplayedRevision = 1; | ||
83 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
84 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
85 | lastReplayedRevision = value.toLongLong(); | ||
86 | return false; | ||
87 | }, | ||
88 | [](const Storage::Error &) {}); | ||
89 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | ||
90 | |||
91 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; | ||
92 | if (lastReplayedRevision <= topRevision) { | ||
93 | qint64 revision = lastReplayedRevision; | ||
94 | for (; revision <= topRevision; revision++) { | ||
95 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | ||
96 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | ||
97 | const auto key = Storage::assembleKey(uid, revision); | ||
98 | Storage::mainDatabase(mainStoreTransaction, type) | ||
99 | .scan(key, | ||
100 | [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { | ||
101 | mReplayFunction(type, key, value).exec(); | ||
102 | // TODO make for loop async, and pass to async replay function together with type | ||
103 | Trace() << "Replaying " << key; | ||
104 | return false; | ||
105 | }, | ||
106 | [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); | ||
107 | } | ||
108 | revision--; | ||
109 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | ||
110 | replayStoreTransaction.commit(); | ||
111 | Trace() << "Replayed until " << revision; | ||
112 | } | ||
113 | emit changesReplayed(); | ||
114 | } | ||
115 | |||
116 | private: | ||
117 | Sink::Storage mStorage; | ||
118 | Sink::Storage mChangeReplayStore; | ||
119 | ReplayFunction mReplayFunction; | ||
120 | }; | ||
121 | |||
122 | #undef DEBUG_AREA | ||
123 | #define DEBUG_AREA "resource.commandprocessor" | 53 | #define DEBUG_AREA "resource.commandprocessor" |
124 | 54 | ||
125 | /** | 55 | /** |
@@ -133,10 +63,9 @@ class CommandProcessor : public QObject | |||
133 | public: | 63 | public: |
134 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) | 64 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) |
135 | { | 65 | { |
136 | mPipeline->startTransaction(); | 66 | mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
137 | // FIXME Should be initialized to the current value of the change replay queue | 67 | Warning() << error.message; |
138 | mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); | 68 | })); |
139 | mPipeline->commit(); | ||
140 | 69 | ||
141 | for (auto queue : mCommandQueues) { | 70 | for (auto queue : mCommandQueues) { |
142 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); | 71 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); |
@@ -303,15 +232,22 @@ private: | |||
303 | #undef DEBUG_AREA | 232 | #undef DEBUG_AREA |
304 | #define DEBUG_AREA "resource" | 233 | #define DEBUG_AREA "resource" |
305 | 234 | ||
306 | GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) | 235 | GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer) |
307 | : Sink::Resource(), | 236 | : Sink::Resource(), |
308 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), | 237 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), |
309 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), | 238 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), |
239 | mResourceType(resourceType), | ||
310 | mResourceInstanceIdentifier(resourceInstanceIdentifier), | 240 | mResourceInstanceIdentifier(resourceInstanceIdentifier), |
311 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), | 241 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), |
242 | mChangeReplay(changeReplay), | ||
243 | mSynchronizer(synchronizer), | ||
312 | mError(0), | 244 | mError(0), |
313 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 245 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
314 | { | 246 | { |
247 | mPipeline->setResourceType(mResourceType); | ||
248 | mSynchronizer->setup([this](int commandId, const QByteArray &data) { | ||
249 | enqueueCommand(mSynchronizerQueue, commandId, data); | ||
250 | }); | ||
315 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); | 251 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); |
316 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | 252 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { |
317 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | 253 | flatbuffers::Verifier verifier((const uint8_t *)command, size); |
@@ -353,14 +289,9 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
353 | }); | 289 | }); |
354 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 290 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
355 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 291 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
356 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { | ||
357 | // This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync) | ||
358 | auto synchronizationStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); | ||
359 | return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore]() {}); | ||
360 | }); | ||
361 | enableChangeReplay(true); | 292 | enableChangeReplay(true); |
362 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); | 293 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); |
363 | mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); | 294 | mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); |
364 | 295 | ||
365 | mCommitQueueTimer.setInterval(sCommitInterval); | 296 | mCommitQueueTimer.setInterval(sCommitInterval); |
366 | mCommitQueueTimer.setSingleShot(true); | 297 | mCommitQueueTimer.setSingleShot(true); |
@@ -370,7 +301,6 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
370 | GenericResource::~GenericResource() | 301 | GenericResource::~GenericResource() |
371 | { | 302 | { |
372 | delete mProcessor; | 303 | delete mProcessor; |
373 | delete mSourceChangeReplay; | ||
374 | } | 304 | } |
375 | 305 | ||
376 | KAsync::Job<void> GenericResource::inspect( | 306 | KAsync::Job<void> GenericResource::inspect( |
@@ -383,86 +313,20 @@ KAsync::Job<void> GenericResource::inspect( | |||
383 | void GenericResource::enableChangeReplay(bool enable) | 313 | void GenericResource::enableChangeReplay(bool enable) |
384 | { | 314 | { |
385 | if (enable) { | 315 | if (enable) { |
386 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged, Qt::QueuedConnection); | 316 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); |
387 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 317 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
388 | mSourceChangeReplay->revisionChanged(); | 318 | mChangeReplay->revisionChanged(); |
389 | } else { | 319 | } else { |
390 | QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); | 320 | QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged); |
391 | QObject::disconnect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 321 | QObject::disconnect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
392 | } | 322 | } |
393 | } | 323 | } |
394 | 324 | ||
395 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) | 325 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) |
396 | { | 326 | { |
397 | mPipeline->setPreprocessors(type, preprocessors); | 327 | mPipeline->setPreprocessors(type, preprocessors); |
398 | mPipeline->setAdaptorFactory(type, factory); | ||
399 | mAdaptorFactories.insert(type, factory); | ||
400 | } | ||
401 | |||
402 | KAsync::Job<void> GenericResource::replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value) | ||
403 | { | ||
404 | Sink::EntityBuffer buffer(value); | ||
405 | const Sink::Entity &entity = buffer.entity(); | ||
406 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | ||
407 | Q_ASSERT(metadataBuffer); | ||
408 | if (!metadataBuffer->replayToSource()) { | ||
409 | Trace() << "Change is coming from the source"; | ||
410 | return KAsync::null<void>(); | ||
411 | } | ||
412 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
413 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | ||
414 | const auto uid = Sink::Storage::uidFromKey(key); | ||
415 | QByteArray oldRemoteId; | ||
416 | |||
417 | if (operation != Sink::Operation_Creation) { | ||
418 | auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadOnly); | ||
419 | oldRemoteId = resolveLocalId(type, uid, synchronizationTransaction); | ||
420 | } | ||
421 | Trace() << "Replaying " << key << type; | ||
422 | |||
423 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); | ||
424 | if (type == ENTITY_TYPE_FOLDER) { | ||
425 | const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
426 | job = replay(folder, operation, oldRemoteId); | ||
427 | } else if (type == ENTITY_TYPE_MAIL) { | ||
428 | const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
429 | job = replay(mail, operation, oldRemoteId); | ||
430 | } | ||
431 | |||
432 | return job.then<void, QByteArray>([=, &synchronizationStore](const QByteArray &remoteId) { | ||
433 | auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadWrite); | ||
434 | Trace() << "Replayed change with remote id: " << remoteId; | ||
435 | if (operation == Sink::Operation_Creation) { | ||
436 | if (remoteId.isEmpty()) { | ||
437 | Warning() << "Returned an empty remoteId from the creation"; | ||
438 | } else { | ||
439 | recordRemoteId(type, uid, remoteId, synchronizationTransaction); | ||
440 | } | ||
441 | } else if (operation == Sink::Operation_Modification) { | ||
442 | if (remoteId.isEmpty()) { | ||
443 | Warning() << "Returned an empty remoteId from the creation"; | ||
444 | } else { | ||
445 | updateRemoteId(type, uid, remoteId, synchronizationTransaction); | ||
446 | } | ||
447 | } else if (operation == Sink::Operation_Removal) { | ||
448 | removeRemoteId(type, uid, remoteId, synchronizationTransaction); | ||
449 | } else { | ||
450 | Warning() << "Unkown operation" << operation; | ||
451 | } | ||
452 | }, [](int errorCode, const QString &errorMessage) { | ||
453 | Warning() << "Failed to replay change: " << errorMessage; | ||
454 | }); | ||
455 | } | 328 | } |
456 | 329 | ||
457 | KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) | ||
458 | { | ||
459 | return KAsync::null<QByteArray>(); | ||
460 | } | ||
461 | |||
462 | KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) | ||
463 | { | ||
464 | return KAsync::null<QByteArray>(); | ||
465 | } | ||
466 | 330 | ||
467 | void GenericResource::removeDataFromDisk() | 331 | void GenericResource::removeDataFromDisk() |
468 | { | 332 | { |
@@ -528,10 +392,8 @@ KAsync::Job<void> GenericResource::synchronizeWithSource() | |||
528 | Log() << " Synchronizing"; | 392 | Log() << " Synchronizing"; |
529 | // Changereplay would deadlock otherwise when trying to open the synchronization store | 393 | // Changereplay would deadlock otherwise when trying to open the synchronization store |
530 | enableChangeReplay(false); | 394 | enableChangeReplay(false); |
531 | auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); | 395 | mSynchronizer->synchronize() |
532 | auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); | 396 | .then<void>([this, &future]() { |
533 | synchronizeWithSource(*mainStore, *syncStore) | ||
534 | .then<void>([this, mainStore, syncStore, &future]() { | ||
535 | Log() << "Done Synchronizing"; | 397 | Log() << "Done Synchronizing"; |
536 | enableChangeReplay(true); | 398 | enableChangeReplay(true); |
537 | future.setFinished(); | 399 | future.setFinished(); |
@@ -576,11 +438,11 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
576 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) | 438 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) |
577 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) | 439 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) |
578 | .then<void>([this](KAsync::Future<void> &f) { | 440 | .then<void>([this](KAsync::Future<void> &f) { |
579 | if (mSourceChangeReplay->allChangesReplayed()) { | 441 | if (mChangeReplay->allChangesReplayed()) { |
580 | f.setFinished(); | 442 | f.setFinished(); |
581 | } else { | 443 | } else { |
582 | auto context = new QObject; | 444 | auto context = new QObject; |
583 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { | 445 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { |
584 | delete context; | 446 | delete context; |
585 | f.setFinished(); | 447 | f.setFinished(); |
586 | }); | 448 | }); |
@@ -590,7 +452,7 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
590 | 452 | ||
591 | void GenericResource::updateLowerBoundRevision() | 453 | void GenericResource::updateLowerBoundRevision() |
592 | { | 454 | { |
593 | mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSourceChangeReplay->getLastReplayedRevision())); | 455 | mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mChangeReplay->getLastReplayedRevision())); |
594 | } | 456 | } |
595 | 457 | ||
596 | void GenericResource::setLowerBoundRevision(qint64 revision) | 458 | void GenericResource::setLowerBoundRevision(qint64 revision) |
@@ -599,7 +461,139 @@ void GenericResource::setLowerBoundRevision(qint64 revision) | |||
599 | updateLowerBoundRevision(); | 461 | updateLowerBoundRevision(); |
600 | } | 462 | } |
601 | 463 | ||
602 | void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 464 | |
465 | |||
466 | |||
467 | EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) | ||
468 | : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier), | ||
469 | mTransaction(transaction) | ||
470 | { | ||
471 | |||
472 | } | ||
473 | |||
474 | template<typename T> | ||
475 | T EntityStore::read(const QByteArray &identifier) const | ||
476 | { | ||
477 | auto typeName = ApplicationDomain::getTypeName<T>(); | ||
478 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | ||
479 | auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType)); | ||
480 | Q_ASSERT(bufferAdaptor); | ||
481 | return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor); | ||
482 | } | ||
483 | |||
484 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | ||
485 | { | ||
486 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | ||
487 | db.findLatest(uid, | ||
488 | [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
489 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
490 | if (!buffer.isValid()) { | ||
491 | Warning() << "Read invalid buffer from disk"; | ||
492 | } else { | ||
493 | Trace() << "Found value " << key; | ||
494 | current = adaptorFactory.createAdaptor(buffer.entity()); | ||
495 | } | ||
496 | return false; | ||
497 | }, | ||
498 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); | ||
499 | return current; | ||
500 | } | ||
501 | |||
502 | |||
503 | |||
504 | SyncStore::SyncStore(Sink::Storage::Transaction &transaction) | ||
505 | : mTransaction(transaction) | ||
506 | { | ||
507 | |||
508 | } | ||
509 | |||
510 | void SyncStore::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
511 | { | ||
512 | Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId); | ||
513 | Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId); | ||
514 | } | ||
515 | |||
516 | void SyncStore::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
517 | { | ||
518 | Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId); | ||
519 | Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId); | ||
520 | } | ||
521 | |||
522 | void SyncStore::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
523 | { | ||
524 | const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); | ||
525 | removeRemoteId(bufferType, localId, oldRemoteId); | ||
526 | recordRemoteId(bufferType, localId, remoteId); | ||
527 | } | ||
528 | |||
529 | QByteArray SyncStore::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId) | ||
530 | { | ||
531 | // Lookup local id for remote id, or insert a new pair otherwise | ||
532 | Index index("rid.mapping." + bufferType, mTransaction); | ||
533 | QByteArray sinkId = index.lookup(remoteId); | ||
534 | if (sinkId.isEmpty()) { | ||
535 | sinkId = QUuid::createUuid().toString().toUtf8(); | ||
536 | index.add(remoteId, sinkId); | ||
537 | Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId); | ||
538 | } | ||
539 | return sinkId; | ||
540 | } | ||
541 | |||
542 | QByteArray SyncStore::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId) | ||
543 | { | ||
544 | QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); | ||
545 | if (remoteId.isEmpty()) { | ||
546 | Warning() << "Couldn't find the remote id for " << localId; | ||
547 | return QByteArray(); | ||
548 | } | ||
549 | return remoteId; | ||
550 | } | ||
551 | |||
552 | |||
553 | |||
554 | |||
555 | |||
556 | |||
557 | |||
558 | |||
559 | Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
560 | : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), | ||
561 | mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), | ||
562 | mResourceType(resourceType), | ||
563 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | ||
564 | { | ||
565 | Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; | ||
566 | |||
567 | } | ||
568 | |||
569 | void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback) | ||
570 | { | ||
571 | mEnqueue = enqueueCommandCallback; | ||
572 | } | ||
573 | |||
574 | void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) | ||
575 | { | ||
576 | Q_ASSERT(mEnqueue); | ||
577 | mEnqueue(commandId, data); | ||
578 | } | ||
579 | |||
580 | EntityStore &Synchronizer::store() | ||
581 | { | ||
582 | if (!mEntityStore) { | ||
583 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); | ||
584 | } | ||
585 | return *mEntityStore; | ||
586 | } | ||
587 | |||
588 | SyncStore &Synchronizer::syncStore() | ||
589 | { | ||
590 | if (!mSyncStore) { | ||
591 | mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction); | ||
592 | } | ||
593 | return *mSyncStore; | ||
594 | } | ||
595 | |||
596 | void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
603 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | 597 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) |
604 | { | 598 | { |
605 | // These changes are coming from the source | 599 | // These changes are coming from the source |
@@ -616,7 +610,7 @@ void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &b | |||
616 | callback(BufferUtils::extractBuffer(fbb)); | 610 | callback(BufferUtils::extractBuffer(fbb)); |
617 | } | 611 | } |
618 | 612 | ||
619 | void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 613 | void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, |
620 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | 614 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) |
621 | { | 615 | { |
622 | // These changes are coming from the source | 616 | // These changes are coming from the source |
@@ -634,7 +628,7 @@ void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, co | |||
634 | callback(BufferUtils::extractBuffer(fbb)); | 628 | callback(BufferUtils::extractBuffer(fbb)); |
635 | } | 629 | } |
636 | 630 | ||
637 | void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) | 631 | void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) |
638 | { | 632 | { |
639 | // These changes are coming from the source | 633 | // These changes are coming from the source |
640 | const auto replayToSource = false; | 634 | const auto replayToSource = false; |
@@ -647,96 +641,36 @@ void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, co | |||
647 | callback(BufferUtils::extractBuffer(fbb)); | 641 | callback(BufferUtils::extractBuffer(fbb)); |
648 | } | 642 | } |
649 | 643 | ||
650 | void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | 644 | void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) |
651 | { | ||
652 | Index("rid.mapping." + bufferType, transaction).add(remoteId, localId); | ||
653 | ; | ||
654 | Index("localid.mapping." + bufferType, transaction).add(localId, remoteId); | ||
655 | } | ||
656 | |||
657 | void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
658 | { | ||
659 | Index("rid.mapping." + bufferType, transaction).remove(remoteId, localId); | ||
660 | Index("localid.mapping." + bufferType, transaction).remove(localId, remoteId); | ||
661 | } | ||
662 | |||
663 | void GenericResource::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
664 | { | ||
665 | const auto oldRemoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId); | ||
666 | removeRemoteId(bufferType, localId, oldRemoteId, transaction); | ||
667 | recordRemoteId(bufferType, localId, remoteId, transaction); | ||
668 | } | ||
669 | |||
670 | QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
671 | { | ||
672 | // Lookup local id for remote id, or insert a new pair otherwise | ||
673 | Index index("rid.mapping." + bufferType, transaction); | ||
674 | QByteArray sinkId = index.lookup(remoteId); | ||
675 | if (sinkId.isEmpty()) { | ||
676 | sinkId = QUuid::createUuid().toString().toUtf8(); | ||
677 | index.add(remoteId, sinkId); | ||
678 | Index("localid.mapping." + bufferType, transaction).add(sinkId, remoteId); | ||
679 | } | ||
680 | return sinkId; | ||
681 | } | ||
682 | |||
683 | QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction) | ||
684 | { | ||
685 | QByteArray remoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId); | ||
686 | if (remoteId.isEmpty()) { | ||
687 | Warning() << "Couldn't find the remote id for " << localId; | ||
688 | return QByteArray(); | ||
689 | } | ||
690 | return remoteId; | ||
691 | } | ||
692 | |||
693 | void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, | ||
694 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) | ||
695 | { | 645 | { |
696 | entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { | 646 | entryGenerator([this, bufferType, &exists](const QByteArray &key) { |
697 | auto sinkId = Sink::Storage::uidFromKey(key); | 647 | auto sinkId = Sink::Storage::uidFromKey(key); |
698 | Trace() << "Checking for removal " << key; | 648 | Trace() << "Checking for removal " << key; |
699 | const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction); | 649 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); |
700 | // If we have no remoteId, the entity hasn't been replayed to the source yet | 650 | // If we have no remoteId, the entity hasn't been replayed to the source yet |
701 | if (!remoteId.isEmpty()) { | 651 | if (!remoteId.isEmpty()) { |
702 | if (!exists(remoteId)) { | 652 | if (!exists(remoteId)) { |
703 | Trace() << "Found a removed entity: " << sinkId; | 653 | Trace() << "Found a removed entity: " << sinkId; |
704 | deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, | 654 | deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, |
705 | [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); }); | 655 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); |
706 | } | 656 | } |
707 | } | 657 | } |
708 | }); | 658 | }); |
709 | } | 659 | } |
710 | 660 | ||
711 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> GenericResource::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | 661 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) |
712 | { | ||
713 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | ||
714 | db.findLatest(uid, | ||
715 | [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
716 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
717 | if (!buffer.isValid()) { | ||
718 | Warning() << "Read invalid buffer from disk"; | ||
719 | } else { | ||
720 | current = adaptorFactory.createAdaptor(buffer.entity()); | ||
721 | } | ||
722 | return false; | ||
723 | }, | ||
724 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); | ||
725 | return current; | ||
726 | } | ||
727 | |||
728 | void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, | ||
729 | DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | ||
730 | { | 662 | { |
731 | auto mainDatabase = Storage::mainDatabase(transaction, bufferType); | 663 | Trace() << "Create or modify" << bufferType << remoteId; |
732 | const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); | 664 | auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); |
665 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | ||
733 | const auto found = mainDatabase.contains(sinkId); | 666 | const auto found = mainDatabase.contains(sinkId); |
667 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); | ||
734 | if (!found) { | 668 | if (!found) { |
735 | Trace() << "Found a new entity: " << remoteId; | 669 | Trace() << "Found a new entity: " << remoteId; |
736 | createEntity( | 670 | createEntity( |
737 | sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); }); | 671 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); |
738 | } else { // modification | 672 | } else { // modification |
739 | if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) { | 673 | if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) { |
740 | bool changed = false; | 674 | bool changed = false; |
741 | for (const auto &property : entity.changedProperties()) { | 675 | for (const auto &property : entity.changedProperties()) { |
742 | if (entity.getProperty(property) != current->getProperty(property)) { | 676 | if (entity.getProperty(property) != current->getProperty(property)) { |
@@ -746,8 +680,8 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si | |||
746 | } | 680 | } |
747 | if (changed) { | 681 | if (changed) { |
748 | Trace() << "Found a modified entity: " << remoteId; | 682 | Trace() << "Found a modified entity: " << remoteId; |
749 | modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, | 683 | modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, |
750 | [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); }); | 684 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); |
751 | } | 685 | } |
752 | } else { | 686 | } else { |
753 | Warning() << "Failed to get current entity"; | 687 | Warning() << "Failed to get current entity"; |
@@ -755,6 +689,118 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si | |||
755 | } | 689 | } |
756 | } | 690 | } |
757 | 691 | ||
692 | KAsync::Job<void> Synchronizer::synchronize() | ||
693 | { | ||
694 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
695 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
696 | return synchronizeWithSource().then<void>([this]() { | ||
697 | mTransaction.abort(); | ||
698 | mSyncTransaction.commit(); | ||
699 | mSyncStore.clear(); | ||
700 | mEntityStore.clear(); | ||
701 | }); | ||
702 | } | ||
703 | |||
704 | |||
705 | |||
706 | SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
707 | : ChangeReplay(resourceInstanceIdentifier), | ||
708 | mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), | ||
709 | mResourceType(resourceType), | ||
710 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | ||
711 | { | ||
712 | |||
713 | } | ||
714 | |||
715 | EntityStore &SourceWriteBack::store() | ||
716 | { | ||
717 | if (!mEntityStore) { | ||
718 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); | ||
719 | } | ||
720 | return *mEntityStore; | ||
721 | } | ||
722 | |||
723 | SyncStore &SourceWriteBack::syncStore() | ||
724 | { | ||
725 | if (!mSyncStore) { | ||
726 | mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction); | ||
727 | } | ||
728 | return *mSyncStore; | ||
729 | } | ||
730 | |||
731 | KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) | ||
732 | { | ||
733 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
734 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
735 | |||
736 | Sink::EntityBuffer buffer(value); | ||
737 | const Sink::Entity &entity = buffer.entity(); | ||
738 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | ||
739 | Q_ASSERT(metadataBuffer); | ||
740 | if (!metadataBuffer->replayToSource()) { | ||
741 | Trace() << "Change is coming from the source"; | ||
742 | return KAsync::null<void>(); | ||
743 | } | ||
744 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
745 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | ||
746 | const auto uid = Sink::Storage::uidFromKey(key); | ||
747 | QByteArray oldRemoteId; | ||
748 | |||
749 | if (operation != Sink::Operation_Creation) { | ||
750 | oldRemoteId = syncStore().resolveLocalId(type, uid); | ||
751 | } | ||
752 | Trace() << "Replaying " << key << type; | ||
753 | |||
754 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); | ||
755 | if (type == ENTITY_TYPE_FOLDER) { | ||
756 | auto folder = store().read<ApplicationDomain::Folder>(uid); | ||
757 | // const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
758 | job = replay(folder, operation, oldRemoteId); | ||
759 | } else if (type == ENTITY_TYPE_MAIL) { | ||
760 | auto mail = store().read<ApplicationDomain::Mail>(uid); | ||
761 | // const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
762 | job = replay(mail, operation, oldRemoteId); | ||
763 | } | ||
764 | |||
765 | return job.then<void, QByteArray>([this, operation, type, uid](const QByteArray &remoteId) { | ||
766 | Trace() << "Replayed change with remote id: " << remoteId; | ||
767 | if (operation == Sink::Operation_Creation) { | ||
768 | if (remoteId.isEmpty()) { | ||
769 | Warning() << "Returned an empty remoteId from the creation"; | ||
770 | } else { | ||
771 | syncStore().recordRemoteId(type, uid, remoteId); | ||
772 | } | ||
773 | } else if (operation == Sink::Operation_Modification) { | ||
774 | if (remoteId.isEmpty()) { | ||
775 | Warning() << "Returned an empty remoteId from the creation"; | ||
776 | } else { | ||
777 | syncStore().updateRemoteId(type, uid, remoteId); | ||
778 | } | ||
779 | } else if (operation == Sink::Operation_Removal) { | ||
780 | syncStore().removeRemoteId(type, uid, remoteId); | ||
781 | } else { | ||
782 | Warning() << "Unkown operation" << operation; | ||
783 | } | ||
784 | |||
785 | mTransaction.abort(); | ||
786 | mSyncTransaction.commit(); | ||
787 | mSyncStore.clear(); | ||
788 | mEntityStore.clear(); | ||
789 | }, [](int errorCode, const QString &errorMessage) { | ||
790 | Warning() << "Failed to replay change: " << errorMessage; | ||
791 | }); | ||
792 | } | ||
793 | |||
794 | KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) | ||
795 | { | ||
796 | return KAsync::null<QByteArray>(); | ||
797 | } | ||
798 | |||
799 | KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) | ||
800 | { | ||
801 | return KAsync::null<QByteArray>(); | ||
802 | } | ||
803 | |||
758 | 804 | ||
759 | #pragma clang diagnostic push | 805 | #pragma clang diagnostic push |
760 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" | 806 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" |
diff --git a/common/genericresource.h b/common/genericresource.h index c551e29..45d5d3a 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -24,14 +24,16 @@ | |||
24 | #include <messagequeue.h> | 24 | #include <messagequeue.h> |
25 | #include <flatbuffers/flatbuffers.h> | 25 | #include <flatbuffers/flatbuffers.h> |
26 | #include <domainadaptor.h> | 26 | #include <domainadaptor.h> |
27 | #include "changereplay.h" | ||
28 | |||
27 | #include <QTimer> | 29 | #include <QTimer> |
28 | 30 | ||
29 | class CommandProcessor; | 31 | class CommandProcessor; |
30 | class ChangeReplay; | ||
31 | 32 | ||
32 | namespace Sink { | 33 | namespace Sink { |
33 | class Pipeline; | 34 | class Pipeline; |
34 | class Preprocessor; | 35 | class Preprocessor; |
36 | class Synchronizer; | ||
35 | 37 | ||
36 | /** | 38 | /** |
37 | * Generic Resource implementation. | 39 | * Generic Resource implementation. |
@@ -39,7 +41,7 @@ class Preprocessor; | |||
39 | class SINK_EXPORT GenericResource : public Resource | 41 | class SINK_EXPORT GenericResource : public Resource |
40 | { | 42 | { |
41 | public: | 43 | public: |
42 | GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline = QSharedPointer<Pipeline>()); | 44 | GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer); |
43 | virtual ~GenericResource(); | 45 | virtual ~GenericResource(); |
44 | 46 | ||
45 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; | 47 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; |
@@ -64,41 +66,90 @@ protected: | |||
64 | 66 | ||
65 | void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors); | 67 | void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors); |
66 | 68 | ||
67 | ///Base implementation call the replay$Type calls | ||
68 | virtual KAsync::Job<void> replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value); | ||
69 | ///Implement to write back changes to the server | ||
70 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); | ||
71 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); | ||
72 | |||
73 | void onProcessorError(int errorCode, const QString &errorMessage); | 69 | void onProcessorError(int errorCode, const QString &errorMessage); |
74 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); | 70 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); |
75 | 71 | ||
76 | static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 72 | MessageQueue mUserQueue; |
77 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | 73 | MessageQueue mSynchronizerQueue; |
78 | static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 74 | QByteArray mResourceType; |
79 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | 75 | QByteArray mResourceInstanceIdentifier; |
80 | static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); | 76 | QSharedPointer<Pipeline> mPipeline; |
77 | |||
78 | private: | ||
79 | CommandProcessor *mProcessor; | ||
80 | QSharedPointer<ChangeReplay> mChangeReplay; | ||
81 | QSharedPointer<Synchronizer> mSynchronizer; | ||
82 | int mError; | ||
83 | QTimer mCommitQueueTimer; | ||
84 | qint64 mClientLowerBoundRevision; | ||
85 | QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories; | ||
86 | }; | ||
87 | |||
88 | class SINK_EXPORT SyncStore | ||
89 | { | ||
90 | public: | ||
91 | SyncStore(Sink::Storage::Transaction &); | ||
81 | 92 | ||
82 | /** | 93 | /** |
83 | * Records a localId to remoteId mapping | 94 | * Records a localId to remoteId mapping |
84 | */ | 95 | */ |
85 | void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 96 | void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); |
86 | void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 97 | void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); |
87 | void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 98 | void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId); |
88 | 99 | ||
89 | /** | 100 | /** |
90 | * Tries to find a local id for the remote id, and creates a new local id otherwise. | 101 | * Tries to find a local id for the remote id, and creates a new local id otherwise. |
91 | * | 102 | * |
92 | * The new local id is recorded in the local to remote id mapping. | 103 | * The new local id is recorded in the local to remote id mapping. |
93 | */ | 104 | */ |
94 | QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); | 105 | QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId); |
95 | 106 | ||
96 | /** | 107 | /** |
97 | * Tries to find a remote id for a local id. | 108 | * Tries to find a remote id for a local id. |
98 | * | 109 | * |
99 | * This can fail if the entity hasn't been written back to the server yet. | 110 | * This can fail if the entity hasn't been written back to the server yet. |
100 | */ | 111 | */ |
101 | QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction); | 112 | QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId); |
113 | |||
114 | private: | ||
115 | Sink::Storage::Transaction &mTransaction; | ||
116 | }; | ||
117 | |||
118 | class SINK_EXPORT EntityStore | ||
119 | { | ||
120 | public: | ||
121 | EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); | ||
122 | |||
123 | template<typename T> | ||
124 | T read(const QByteArray &identifier) const; | ||
125 | |||
126 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); | ||
127 | private: | ||
128 | QByteArray mResourceType; | ||
129 | QByteArray mResourceInstanceIdentifier; | ||
130 | Sink::Storage::Transaction &mTransaction; | ||
131 | }; | ||
132 | |||
133 | /** | ||
134 | * Synchronize and add what we don't already have to local queue | ||
135 | */ | ||
136 | class SINK_EXPORT Synchronizer | ||
137 | { | ||
138 | public: | ||
139 | Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); | ||
140 | |||
141 | void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback); | ||
142 | KAsync::Job<void> synchronize(); | ||
143 | |||
144 | protected: | ||
145 | ///Calls the callback to enqueue the command | ||
146 | void enqueueCommand(int commandId, const QByteArray &data); | ||
147 | |||
148 | static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
149 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
150 | static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
151 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); | ||
152 | static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); | ||
102 | 153 | ||
103 | /** | 154 | /** |
104 | * A synchronous algorithm to remove entities that are no longer existing. | 155 | * A synchronous algorithm to remove entities that are no longer existing. |
@@ -110,7 +161,7 @@ protected: | |||
110 | * | 161 | * |
111 | * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. | 162 | * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. |
112 | */ | 163 | */ |
113 | void scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, | 164 | void scanForRemovals(const QByteArray &bufferType, |
114 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); | 165 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); |
115 | 166 | ||
116 | /** | 167 | /** |
@@ -118,22 +169,60 @@ protected: | |||
118 | * | 169 | * |
119 | * Depending on whether the entity is locally available, or has changed. | 170 | * Depending on whether the entity is locally available, or has changed. |
120 | */ | 171 | */ |
121 | void createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, | 172 | void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); |
122 | const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); | ||
123 | 173 | ||
124 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); | 174 | //Read only access to main storage |
175 | EntityStore &store(); | ||
125 | 176 | ||
126 | MessageQueue mUserQueue; | 177 | //Read/Write access to sync storage |
127 | MessageQueue mSynchronizerQueue; | 178 | SyncStore &syncStore(); |
179 | |||
180 | virtual KAsync::Job<void> synchronizeWithSource() = 0; | ||
181 | |||
182 | private: | ||
183 | QSharedPointer<SyncStore> mSyncStore; | ||
184 | QSharedPointer<EntityStore> mEntityStore; | ||
185 | Sink::Storage mStorage; | ||
186 | Sink::Storage mSyncStorage; | ||
187 | QByteArray mResourceType; | ||
128 | QByteArray mResourceInstanceIdentifier; | 188 | QByteArray mResourceInstanceIdentifier; |
129 | QSharedPointer<Pipeline> mPipeline; | 189 | Sink::Storage::Transaction mTransaction; |
190 | Sink::Storage::Transaction mSyncTransaction; | ||
191 | std::function<void(int commandId, const QByteArray &data)> mEnqueue; | ||
192 | }; | ||
193 | |||
194 | /** | ||
195 | * Replay changes to the source | ||
196 | */ | ||
197 | class SINK_EXPORT SourceWriteBack : public ChangeReplay | ||
198 | { | ||
199 | public: | ||
200 | SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier); | ||
201 | |||
202 | protected: | ||
203 | ///Base implementation calls the replay$Type calls | ||
204 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | ||
205 | |||
206 | protected: | ||
207 | ///Implement to write back changes to the server | ||
208 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId); | ||
209 | virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId); | ||
210 | |||
211 | //Read only access to main storage | ||
212 | EntityStore &store(); | ||
213 | |||
214 | //Read/Write access to sync storage | ||
215 | SyncStore &syncStore(); | ||
130 | 216 | ||
131 | private: | 217 | private: |
132 | CommandProcessor *mProcessor; | 218 | Sink::Storage mSyncStorage; |
133 | ChangeReplay *mSourceChangeReplay; | 219 | QSharedPointer<SyncStore> mSyncStore; |
134 | int mError; | 220 | QSharedPointer<EntityStore> mEntityStore; |
135 | QTimer mCommitQueueTimer; | 221 | Sink::Storage::Transaction mTransaction; |
136 | qint64 mClientLowerBoundRevision; | 222 | Sink::Storage::Transaction mSyncTransaction; |
137 | QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories; | 223 | QByteArray mResourceType; |
224 | QByteArray mResourceInstanceIdentifier; | ||
138 | }; | 225 | }; |
226 | |||
227 | |||
139 | } | 228 | } |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 637a1b8..7863f67 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -34,6 +34,7 @@ | |||
34 | #include "entitybuffer.h" | 34 | #include "entitybuffer.h" |
35 | #include "log.h" | 35 | #include "log.h" |
36 | #include "domain/applicationdomaintype.h" | 36 | #include "domain/applicationdomaintype.h" |
37 | #include "adaptorfactoryregistry.h" | ||
37 | #include "definitions.h" | 38 | #include "definitions.h" |
38 | #include "bufferutils.h" | 39 | #include "bufferutils.h" |
39 | 40 | ||
@@ -52,11 +53,11 @@ public: | |||
52 | Storage storage; | 53 | Storage storage; |
53 | Storage::Transaction transaction; | 54 | Storage::Transaction transaction; |
54 | QHash<QString, QVector<Preprocessor *>> processors; | 55 | QHash<QString, QVector<Preprocessor *>> processors; |
55 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; | ||
56 | bool revisionChanged; | 56 | bool revisionChanged; |
57 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | 57 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); |
58 | QTime transactionTime; | 58 | QTime transactionTime; |
59 | int transactionItemCount; | 59 | int transactionItemCount; |
60 | QByteArray resourceType; | ||
60 | }; | 61 | }; |
61 | 62 | ||
62 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 63 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
@@ -84,9 +85,9 @@ void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preproc | |||
84 | d->processors[entityType] = processors; | 85 | d->processors[entityType] = processors; |
85 | } | 86 | } |
86 | 87 | ||
87 | void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) | 88 | void Pipeline::setResourceType(const QByteArray &resourceType) |
88 | { | 89 | { |
89 | d->adaptorFactory.insert(entityType, factory); | 90 | d->resourceType = resourceType; |
90 | } | 91 | } |
91 | 92 | ||
92 | void Pipeline::startTransaction() | 93 | void Pipeline::startTransaction() |
@@ -102,7 +103,9 @@ void Pipeline::startTransaction() | |||
102 | Trace() << "Starting transaction."; | 103 | Trace() << "Starting transaction."; |
103 | d->transactionTime.start(); | 104 | d->transactionTime.start(); |
104 | d->transactionItemCount = 0; | 105 | d->transactionItemCount = 0; |
105 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite)); | 106 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { |
107 | Warning() << error.message; | ||
108 | })); | ||
106 | } | 109 | } |
107 | 110 | ||
108 | void Pipeline::commit() | 111 | void Pipeline::commit() |
@@ -189,7 +192,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
189 | auto metadataBuffer = metadataBuilder.Finish(); | 192 | auto metadataBuffer = metadataBuilder.Finish(); |
190 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | 193 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
191 | 194 | ||
192 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 195 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
193 | if (!adaptorFactory) { | 196 | if (!adaptorFactory) { |
194 | Warning() << "no adaptor factory for type " << bufferType; | 197 | Warning() << "no adaptor factory for type " << bufferType; |
195 | return KAsync::error<qint64>(0); | 198 | return KAsync::error<qint64>(0); |
@@ -244,7 +247,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
244 | } | 247 | } |
245 | 248 | ||
246 | // TODO use only readPropertyMapper and writePropertyMapper | 249 | // TODO use only readPropertyMapper and writePropertyMapper |
247 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 250 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
248 | if (!adaptorFactory) { | 251 | if (!adaptorFactory) { |
249 | Warning() << "no adaptor factory for type " << bufferType; | 252 | Warning() << "no adaptor factory for type " << bufferType; |
250 | return KAsync::error<qint64>(0); | 253 | return KAsync::error<qint64>(0); |
@@ -373,7 +376,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
373 | flatbuffers::FlatBufferBuilder fbb; | 376 | flatbuffers::FlatBufferBuilder fbb; |
374 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | 377 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); |
375 | 378 | ||
376 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 379 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
377 | if (!adaptorFactory) { | 380 | if (!adaptorFactory) { |
378 | Warning() << "no adaptor factory for type " << bufferType; | 381 | Warning() << "no adaptor factory for type " << bufferType; |
379 | return KAsync::error<qint64>(0); | 382 | return KAsync::error<qint64>(0); |
diff --git a/common/pipeline.h b/common/pipeline.h index c65cbfd..2ca87a4 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -46,13 +46,12 @@ public: | |||
46 | 46 | ||
47 | Storage &storage() const; | 47 | Storage &storage() const; |
48 | 48 | ||
49 | void setResourceType(const QByteArray &resourceType); | ||
49 | void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); | 50 | void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); |
50 | void startTransaction(); | 51 | void startTransaction(); |
51 | void commit(); | 52 | void commit(); |
52 | Storage::Transaction &transaction(); | 53 | Storage::Transaction &transaction(); |
53 | 54 | ||
54 | void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); | ||
55 | |||
56 | KAsync::Job<qint64> newEntity(void const *command, size_t size); | 55 | KAsync::Job<qint64> newEntity(void const *command, size_t size); |
57 | KAsync::Job<qint64> modifiedEntity(void const *command, size_t size); | 56 | KAsync::Job<qint64> modifiedEntity(void const *command, size_t size); |
58 | KAsync::Job<qint64> deletedEntity(void const *command, size_t size); | 57 | KAsync::Job<qint64> deletedEntity(void const *command, size_t size); |
diff --git a/common/resource.cpp b/common/resource.cpp index 6713686..82c9fc8 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -26,6 +26,7 @@ | |||
26 | #include <QPointer> | 26 | #include <QPointer> |
27 | 27 | ||
28 | #include "facadefactory.h" | 28 | #include "facadefactory.h" |
29 | #include "adaptorfactoryregistry.h" | ||
29 | 30 | ||
30 | namespace Sink { | 31 | namespace Sink { |
31 | 32 | ||
@@ -110,6 +111,7 @@ ResourceFactory *ResourceFactory::load(const QString &resourceName) | |||
110 | if (factory) { | 111 | if (factory) { |
111 | Private::s_loadedFactories.insert(resourceName, factory); | 112 | Private::s_loadedFactories.insert(resourceName, factory); |
112 | factory->registerFacades(FacadeFactory::instance()); | 113 | factory->registerFacades(FacadeFactory::instance()); |
114 | factory->registerAdaptorFactories(AdaptorFactoryRegistry::instance()); | ||
113 | // TODO: if we need more data on it const QJsonObject json = loader.metaData()[QStringLiteral("MetaData")].toObject(); | 115 | // TODO: if we need more data on it const QJsonObject json = loader.metaData()[QStringLiteral("MetaData")].toObject(); |
114 | return factory; | 116 | return factory; |
115 | } else { | 117 | } else { |
diff --git a/common/resource.h b/common/resource.h index 0e7cd11..d6c3c5f 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -26,6 +26,7 @@ | |||
26 | 26 | ||
27 | namespace Sink { | 27 | namespace Sink { |
28 | class FacadeFactory; | 28 | class FacadeFactory; |
29 | class AdaptorFactoryRegistry; | ||
29 | 30 | ||
30 | /** | 31 | /** |
31 | * Resource interface | 32 | * Resource interface |
@@ -81,6 +82,7 @@ public: | |||
81 | 82 | ||
82 | virtual Resource *createResource(const QByteArray &instanceIdentifier) = 0; | 83 | virtual Resource *createResource(const QByteArray &instanceIdentifier) = 0; |
83 | virtual void registerFacades(FacadeFactory &factory) = 0; | 84 | virtual void registerFacades(FacadeFactory &factory) = 0; |
85 | virtual void registerAdaptorFactories(AdaptorFactoryRegistry ®istry) {}; | ||
84 | 86 | ||
85 | private: | 87 | private: |
86 | class Private; | 88 | class Private; |
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index 48858da..609d23e 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp | |||
@@ -35,6 +35,7 @@ | |||
35 | #include "definitions.h" | 35 | #include "definitions.h" |
36 | #include "facadefactory.h" | 36 | #include "facadefactory.h" |
37 | #include "indexupdater.h" | 37 | #include "indexupdater.h" |
38 | #include "adaptorfactoryregistry.h" | ||
38 | #include <QDate> | 39 | #include <QDate> |
39 | #include <QUuid> | 40 | #include <QUuid> |
40 | 41 | ||
@@ -43,8 +44,87 @@ | |||
43 | #define ENTITY_TYPE_MAIL "mail" | 44 | #define ENTITY_TYPE_MAIL "mail" |
44 | #define ENTITY_TYPE_FOLDER "folder" | 45 | #define ENTITY_TYPE_FOLDER "folder" |
45 | 46 | ||
47 | class DummySynchronizer : public Sink::Synchronizer { | ||
48 | public: | ||
49 | |||
50 | DummySynchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
51 | : Sink::Synchronizer(resourceType, resourceInstanceIdentifier) | ||
52 | { | ||
53 | |||
54 | } | ||
55 | |||
56 | Sink::ApplicationDomain::Event::Ptr createEvent(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) | ||
57 | { | ||
58 | static uint8_t rawData[100]; | ||
59 | auto event = Sink::ApplicationDomain::Event::Ptr::create(); | ||
60 | event->setProperty("summary", data.value("summary").toString()); | ||
61 | event->setProperty("remoteId", ridBuffer); | ||
62 | event->setProperty("description", data.value("description").toString()); | ||
63 | event->setProperty("attachment", QByteArray::fromRawData(reinterpret_cast<const char*>(rawData), 100)); | ||
64 | return event; | ||
65 | } | ||
66 | |||
67 | Sink::ApplicationDomain::Mail::Ptr createMail(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) | ||
68 | { | ||
69 | auto mail = Sink::ApplicationDomain::Mail::Ptr::create(); | ||
70 | mail->setProperty("subject", data.value("subject").toString()); | ||
71 | mail->setProperty("senderEmail", data.value("senderEmail").toString()); | ||
72 | mail->setProperty("senderName", data.value("senderName").toString()); | ||
73 | mail->setProperty("date", data.value("date").toString()); | ||
74 | mail->setProperty("folder", syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, data.value("parentFolder").toByteArray())); | ||
75 | mail->setProperty("unread", data.value("unread").toBool()); | ||
76 | mail->setProperty("important", data.value("important").toBool()); | ||
77 | return mail; | ||
78 | } | ||
79 | |||
80 | Sink::ApplicationDomain::Folder::Ptr createFolder(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) | ||
81 | { | ||
82 | auto folder = Sink::ApplicationDomain::Folder::Ptr::create(); | ||
83 | folder->setProperty("name", data.value("name").toString()); | ||
84 | folder->setProperty("icon", data.value("icon").toString()); | ||
85 | if (!data.value("parent").toString().isEmpty()) { | ||
86 | auto sinkId = syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, data.value("parent").toByteArray()); | ||
87 | folder->setProperty("parent", sinkId); | ||
88 | } | ||
89 | return folder; | ||
90 | } | ||
91 | |||
92 | void synchronize(const QByteArray &bufferType, const QMap<QString, QMap<QString, QVariant> > &data, std::function<Sink::ApplicationDomain::ApplicationDomainType::Ptr(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data)> createEntity) | ||
93 | { | ||
94 | auto time = QSharedPointer<QTime>::create(); | ||
95 | time->start(); | ||
96 | //TODO find items to remove | ||
97 | int count = 0; | ||
98 | for (auto it = data.constBegin(); it != data.constEnd(); it++) { | ||
99 | count++; | ||
100 | const auto remoteId = it.key().toUtf8(); | ||
101 | auto entity = createEntity(remoteId, it.value()); | ||
102 | createOrModify(bufferType, remoteId, *entity); | ||
103 | } | ||
104 | Trace() << "Sync of " << count << " entities of type " << bufferType << " done." << Sink::Log::TraceTime(time->elapsed()); | ||
105 | } | ||
106 | |||
107 | KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE | ||
108 | { | ||
109 | Log() << " Synchronizing with the source"; | ||
110 | return KAsync::start<void>([this]() { | ||
111 | synchronize(ENTITY_TYPE_EVENT, DummyStore::instance().events(), [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) { | ||
112 | return createEvent(ridBuffer, data); | ||
113 | }); | ||
114 | synchronize(ENTITY_TYPE_MAIL, DummyStore::instance().mails(), [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) { | ||
115 | return createMail(ridBuffer, data); | ||
116 | }); | ||
117 | synchronize(ENTITY_TYPE_FOLDER, DummyStore::instance().folders(), [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) { | ||
118 | return createFolder(ridBuffer, data); | ||
119 | }); | ||
120 | Log() << "Done Synchronizing"; | ||
121 | }); | ||
122 | } | ||
123 | |||
124 | }; | ||
125 | |||
46 | DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Sink::Pipeline> &pipeline) | 126 | DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Sink::Pipeline> &pipeline) |
47 | : Sink::GenericResource(instanceIdentifier, pipeline), | 127 | : Sink::GenericResource(PLUGIN_NAME, instanceIdentifier, pipeline, QSharedPointer<Sink::NullChangeReplay>::create(), QSharedPointer<DummySynchronizer>::create(PLUGIN_NAME, instanceIdentifier)), |
48 | mEventAdaptorFactory(QSharedPointer<DummyEventAdaptorFactory>::create()), | 128 | mEventAdaptorFactory(QSharedPointer<DummyEventAdaptorFactory>::create()), |
49 | mMailAdaptorFactory(QSharedPointer<DummyMailAdaptorFactory>::create()), | 129 | mMailAdaptorFactory(QSharedPointer<DummyMailAdaptorFactory>::create()), |
50 | mFolderAdaptorFactory(QSharedPointer<DummyFolderAdaptorFactory>::create()) | 130 | mFolderAdaptorFactory(QSharedPointer<DummyFolderAdaptorFactory>::create()) |
@@ -57,80 +137,9 @@ DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QShared | |||
57 | QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Event>); | 137 | QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Event>); |
58 | } | 138 | } |
59 | 139 | ||
60 | Sink::ApplicationDomain::Event::Ptr DummyResource::createEvent(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &transaction) | 140 | DummyResource::~DummyResource() |
61 | { | ||
62 | static uint8_t rawData[100]; | ||
63 | auto event = Sink::ApplicationDomain::Event::Ptr::create(); | ||
64 | event->setProperty("summary", data.value("summary").toString()); | ||
65 | event->setProperty("remoteId", ridBuffer); | ||
66 | event->setProperty("description", data.value("description").toString()); | ||
67 | event->setProperty("attachment", QByteArray::fromRawData(reinterpret_cast<const char*>(rawData), 100)); | ||
68 | return event; | ||
69 | } | ||
70 | |||
71 | Sink::ApplicationDomain::Mail::Ptr DummyResource::createMail(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &transaction) | ||
72 | { | 141 | { |
73 | auto mail = Sink::ApplicationDomain::Mail::Ptr::create(); | ||
74 | mail->setProperty("subject", data.value("subject").toString()); | ||
75 | mail->setProperty("senderEmail", data.value("senderEmail").toString()); | ||
76 | mail->setProperty("senderName", data.value("senderName").toString()); | ||
77 | mail->setProperty("date", data.value("date").toString()); | ||
78 | mail->setProperty("folder", resolveRemoteId(ENTITY_TYPE_FOLDER, data.value("parentFolder").toByteArray(), transaction)); | ||
79 | mail->setProperty("unread", data.value("unread").toBool()); | ||
80 | mail->setProperty("important", data.value("important").toBool()); | ||
81 | return mail; | ||
82 | } | ||
83 | 142 | ||
84 | Sink::ApplicationDomain::Folder::Ptr DummyResource::createFolder(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &transaction) | ||
85 | { | ||
86 | auto folder = Sink::ApplicationDomain::Folder::Ptr::create(); | ||
87 | folder->setProperty("name", data.value("name").toString()); | ||
88 | folder->setProperty("icon", data.value("icon").toString()); | ||
89 | if (!data.value("parent").toString().isEmpty()) { | ||
90 | auto sinkId = resolveRemoteId(ENTITY_TYPE_FOLDER, data.value("parent").toByteArray(), transaction); | ||
91 | folder->setProperty("parent", sinkId); | ||
92 | } | ||
93 | return folder; | ||
94 | } | ||
95 | |||
96 | void DummyResource::synchronize(const QByteArray &bufferType, const QMap<QString, QMap<QString, QVariant> > &data, Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<Sink::ApplicationDomain::ApplicationDomainType::Ptr(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &)> createEntity) | ||
97 | { | ||
98 | auto time = QSharedPointer<QTime>::create(); | ||
99 | time->start(); | ||
100 | //TODO find items to remove | ||
101 | int count = 0; | ||
102 | for (auto it = data.constBegin(); it != data.constEnd(); it++) { | ||
103 | count++; | ||
104 | const auto remoteId = it.key().toUtf8(); | ||
105 | auto entity = createEntity(remoteId, it.value(), synchronizationTransaction); | ||
106 | createOrModify(transaction, synchronizationTransaction, adaptorFactory, bufferType, remoteId, *entity); | ||
107 | } | ||
108 | Trace() << "Sync of " << count << " entities of type " << bufferType << " done." << Sink::Log::TraceTime(time->elapsed()); | ||
109 | } | ||
110 | |||
111 | KAsync::Job<void> DummyResource::synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore) | ||
112 | { | ||
113 | Log() << " Synchronizing"; | ||
114 | return KAsync::start<void>([this, &mainStore, &synchronizationStore]() { | ||
115 | auto transaction = mainStore.createTransaction(Sink::Storage::ReadOnly); | ||
116 | auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadWrite); | ||
117 | synchronize(ENTITY_TYPE_EVENT, DummyStore::instance().events(), transaction, synchronizationTransaction, *mEventAdaptorFactory, [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &synchronizationTransaction) { | ||
118 | return createEvent(ridBuffer, data, synchronizationTransaction); | ||
119 | }); | ||
120 | synchronize(ENTITY_TYPE_MAIL, DummyStore::instance().mails(), transaction, synchronizationTransaction, *mMailAdaptorFactory, [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &synchronizationTransaction) { | ||
121 | return createMail(ridBuffer, data, synchronizationTransaction); | ||
122 | }); | ||
123 | synchronize(ENTITY_TYPE_FOLDER, DummyStore::instance().folders(), transaction, synchronizationTransaction, *mFolderAdaptorFactory, [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &synchronizationTransaction) { | ||
124 | return createFolder(ridBuffer, data, synchronizationTransaction); | ||
125 | }); | ||
126 | Log() << "Done Synchronizing"; | ||
127 | }); | ||
128 | } | ||
129 | |||
130 | KAsync::Job<void> DummyResource::replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value) | ||
131 | { | ||
132 | Trace() << "Replaying " << key; | ||
133 | return KAsync::null<void>(); | ||
134 | } | 143 | } |
135 | 144 | ||
136 | void DummyResource::removeDataFromDisk() | 145 | void DummyResource::removeDataFromDisk() |
@@ -160,6 +169,7 @@ KAsync::Job<void> DummyResource::inspect(int inspectionType, const QByteArray &i | |||
160 | return KAsync::null<void>(); | 169 | return KAsync::null<void>(); |
161 | } | 170 | } |
162 | 171 | ||
172 | |||
163 | DummyResourceFactory::DummyResourceFactory(QObject *parent) | 173 | DummyResourceFactory::DummyResourceFactory(QObject *parent) |
164 | : Sink::ResourceFactory(parent) | 174 | : Sink::ResourceFactory(parent) |
165 | { | 175 | { |
@@ -178,3 +188,10 @@ void DummyResourceFactory::registerFacades(Sink::FacadeFactory &factory) | |||
178 | factory.registerFacade<Sink::ApplicationDomain::Folder, DummyResourceFolderFacade>(PLUGIN_NAME); | 188 | factory.registerFacade<Sink::ApplicationDomain::Folder, DummyResourceFolderFacade>(PLUGIN_NAME); |
179 | } | 189 | } |
180 | 190 | ||
191 | void DummyResourceFactory::registerAdaptorFactories(Sink::AdaptorFactoryRegistry ®istry) | ||
192 | { | ||
193 | registry.registerFactory<Sink::ApplicationDomain::Folder, DummyFolderAdaptorFactory>(PLUGIN_NAME); | ||
194 | registry.registerFactory<Sink::ApplicationDomain::Mail, DummyMailAdaptorFactory>(PLUGIN_NAME); | ||
195 | registry.registerFactory<Sink::ApplicationDomain::Event, DummyEventAdaptorFactory>(PLUGIN_NAME); | ||
196 | } | ||
197 | |||
diff --git a/examples/dummyresource/resourcefactory.h b/examples/dummyresource/resourcefactory.h index 865f6e5..f73eb32 100644 --- a/examples/dummyresource/resourcefactory.h +++ b/examples/dummyresource/resourcefactory.h | |||
@@ -37,13 +37,12 @@ class DummyResource : public Sink::GenericResource | |||
37 | { | 37 | { |
38 | public: | 38 | public: |
39 | DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Sink::Pipeline> &pipeline = QSharedPointer<Sink::Pipeline>()); | 39 | DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Sink::Pipeline> &pipeline = QSharedPointer<Sink::Pipeline>()); |
40 | KAsync::Job<void> synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore) Q_DECL_OVERRIDE; | 40 | virtual ~DummyResource(); |
41 | using GenericResource::synchronizeWithSource; | 41 | |
42 | void removeDataFromDisk() Q_DECL_OVERRIDE; | 42 | void removeDataFromDisk() Q_DECL_OVERRIDE; |
43 | static void removeFromDisk(const QByteArray &instanceIdentifier); | 43 | static void removeFromDisk(const QByteArray &instanceIdentifier); |
44 | KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE; | 44 | KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE; |
45 | private: | 45 | private: |
46 | KAsync::Job<void> replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | ||
47 | Sink::ApplicationDomain::Event::Ptr createEvent(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); | 46 | Sink::ApplicationDomain::Event::Ptr createEvent(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); |
48 | Sink::ApplicationDomain::Mail::Ptr createMail(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); | 47 | Sink::ApplicationDomain::Mail::Ptr createMail(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); |
49 | Sink::ApplicationDomain::Folder::Ptr createFolder(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); | 48 | Sink::ApplicationDomain::Folder::Ptr createFolder(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); |
@@ -65,5 +64,6 @@ public: | |||
65 | 64 | ||
66 | Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; | 65 | Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; |
67 | void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE; | 66 | void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE; |
67 | void registerAdaptorFactories(Sink::AdaptorFactoryRegistry ®istry) Q_DECL_OVERRIDE; | ||
68 | }; | 68 | }; |
69 | 69 | ||
diff --git a/examples/maildirresource/maildirresource.h b/examples/maildirresource/maildirresource.h index e58dc16..30c565c 100644 --- a/examples/maildirresource/maildirresource.h +++ b/examples/maildirresource/maildirresource.h | |||
@@ -74,5 +74,6 @@ public: | |||
74 | 74 | ||
75 | Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; | 75 | Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; |
76 | void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE; | 76 | void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE; |
77 | void registerDomainTypeAdaptors(Sink::AdaptorFactory &factory) Q_DECL_OVERRIDE; | ||
77 | }; | 78 | }; |
78 | 79 | ||
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index 58df2da..d41f235 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp | |||
@@ -11,6 +11,7 @@ | |||
11 | #include "modelresult.h" | 11 | #include "modelresult.h" |
12 | #include "pipeline.h" | 12 | #include "pipeline.h" |
13 | #include "log.h" | 13 | #include "log.h" |
14 | #include "test.h" | ||
14 | 15 | ||
15 | using namespace Sink; | 16 | using namespace Sink; |
16 | 17 | ||
@@ -28,6 +29,7 @@ class DummyResourceTest : public QObject | |||
28 | private slots: | 29 | private slots: |
29 | void initTestCase() | 30 | void initTestCase() |
30 | { | 31 | { |
32 | Sink::Test::initTest(); | ||
31 | Sink::Log::setDebugOutputLevel(Sink::Log::Trace); | 33 | Sink::Log::setDebugOutputLevel(Sink::Log::Trace); |
32 | auto factory = Sink::ResourceFactory::load("org.kde.dummy"); | 34 | auto factory = Sink::ResourceFactory::load("org.kde.dummy"); |
33 | QVERIFY(factory); | 35 | QVERIFY(factory); |
diff --git a/tests/mailquerybenchmark.cpp b/tests/mailquerybenchmark.cpp index 20ee63c..4e58899 100644 --- a/tests/mailquerybenchmark.cpp +++ b/tests/mailquerybenchmark.cpp | |||
@@ -33,6 +33,7 @@ | |||
33 | #include <common/pipeline.h> | 33 | #include <common/pipeline.h> |
34 | #include <common/index.h> | 34 | #include <common/index.h> |
35 | #include <common/indexupdater.h> | 35 | #include <common/indexupdater.h> |
36 | #include <common/adaptorfactoryregistry.h> | ||
36 | 37 | ||
37 | #include "hawd/dataset.h" | 38 | #include "hawd/dataset.h" |
38 | #include "hawd/formatter.h" | 39 | #include "hawd/formatter.h" |
@@ -59,12 +60,11 @@ class MailQueryBenchmark : public QObject | |||
59 | TestResource::removeFromDisk(resourceIdentifier); | 60 | TestResource::removeFromDisk(resourceIdentifier); |
60 | 61 | ||
61 | auto pipeline = QSharedPointer<Sink::Pipeline>::create(resourceIdentifier); | 62 | auto pipeline = QSharedPointer<Sink::Pipeline>::create(resourceIdentifier); |
63 | pipeline->setResourceType("test"); | ||
62 | 64 | ||
63 | auto mailFactory = QSharedPointer<TestMailAdaptorFactory>::create(); | ||
64 | auto indexer = QSharedPointer<DefaultIndexUpdater<Sink::ApplicationDomain::Mail>>::create(); | 65 | auto indexer = QSharedPointer<DefaultIndexUpdater<Sink::ApplicationDomain::Mail>>::create(); |
65 | 66 | ||
66 | pipeline->setPreprocessors("mail", QVector<Sink::Preprocessor *>() << indexer.data()); | 67 | pipeline->setPreprocessors("mail", QVector<Sink::Preprocessor *>() << indexer.data()); |
67 | pipeline->setAdaptorFactory("mail", mailFactory); | ||
68 | 68 | ||
69 | auto domainTypeAdaptorFactory = QSharedPointer<TestMailAdaptorFactory>::create(); | 69 | auto domainTypeAdaptorFactory = QSharedPointer<TestMailAdaptorFactory>::create(); |
70 | 70 | ||
@@ -149,6 +149,7 @@ private slots: | |||
149 | void init() | 149 | void init() |
150 | { | 150 | { |
151 | resourceIdentifier = "org.kde.test.instance1"; | 151 | resourceIdentifier = "org.kde.test.instance1"; |
152 | Sink::AdaptorFactoryRegistry::instance().registerFactory<Sink::ApplicationDomain::Mail, TestMailAdaptorFactory>("test"); | ||
152 | } | 153 | } |
153 | 154 | ||
154 | void test50k() | 155 | void test50k() |
diff --git a/tests/pipelinebenchmark.cpp b/tests/pipelinebenchmark.cpp index 0133a6c..51481fd 100644 --- a/tests/pipelinebenchmark.cpp +++ b/tests/pipelinebenchmark.cpp | |||
@@ -33,6 +33,7 @@ | |||
33 | #include <common/pipeline.h> | 33 | #include <common/pipeline.h> |
34 | #include <common/index.h> | 34 | #include <common/index.h> |
35 | #include <common/indexupdater.h> | 35 | #include <common/indexupdater.h> |
36 | #include <common/adaptorfactoryregistry.h> | ||
36 | 37 | ||
37 | #include "hawd/dataset.h" | 38 | #include "hawd/dataset.h" |
38 | #include "hawd/formatter.h" | 39 | #include "hawd/formatter.h" |
@@ -83,10 +84,8 @@ class PipelineBenchmark : public QObject | |||
83 | TestResource::removeFromDisk(resourceIdentifier); | 84 | TestResource::removeFromDisk(resourceIdentifier); |
84 | 85 | ||
85 | auto pipeline = QSharedPointer<Sink::Pipeline>::create(resourceIdentifier); | 86 | auto pipeline = QSharedPointer<Sink::Pipeline>::create(resourceIdentifier); |
86 | |||
87 | auto mailFactory = QSharedPointer<TestMailAdaptorFactory>::create(); | ||
88 | pipeline->setPreprocessors("mail", preprocessors); | 87 | pipeline->setPreprocessors("mail", preprocessors); |
89 | pipeline->setAdaptorFactory("mail", mailFactory); | 88 | pipeline->setResourceType("test"); |
90 | 89 | ||
91 | QTime time; | 90 | QTime time; |
92 | time.start(); | 91 | time.start(); |
@@ -131,6 +130,7 @@ private slots: | |||
131 | void init() | 130 | void init() |
132 | { | 131 | { |
133 | Sink::Log::setDebugOutputLevel(Sink::Log::Warning); | 132 | Sink::Log::setDebugOutputLevel(Sink::Log::Warning); |
133 | Sink::AdaptorFactoryRegistry::instance().registerFactory<Sink::ApplicationDomain::Mail, TestMailAdaptorFactory>("test"); | ||
134 | resourceIdentifier = "org.kde.test.instance1"; | 134 | resourceIdentifier = "org.kde.test.instance1"; |
135 | } | 135 | } |
136 | 136 | ||
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 9290050..6ea2041 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp | |||
@@ -19,6 +19,7 @@ | |||
19 | #include "log.h" | 19 | #include "log.h" |
20 | #include "domainadaptor.h" | 20 | #include "domainadaptor.h" |
21 | #include "definitions.h" | 21 | #include "definitions.h" |
22 | #include "adaptorfactoryregistry.h" | ||
22 | 23 | ||
23 | static void removeFromDisk(const QString &name) | 24 | static void removeFromDisk(const QString &name) |
24 | { | 25 | { |
@@ -185,6 +186,7 @@ private slots: | |||
185 | void initTestCase() | 186 | void initTestCase() |
186 | { | 187 | { |
187 | Sink::Log::setDebugOutputLevel(Sink::Log::Trace); | 188 | Sink::Log::setDebugOutputLevel(Sink::Log::Trace); |
189 | Sink::AdaptorFactoryRegistry::instance().registerFactory<Sink::ApplicationDomain::Event, TestEventAdaptorFactory>("test"); | ||
188 | } | 190 | } |
189 | 191 | ||
190 | void init() | 192 | void init() |
@@ -198,9 +200,7 @@ private slots: | |||
198 | auto command = createEntityCommand(createEvent(entityFbb)); | 200 | auto command = createEntityCommand(createEvent(entityFbb)); |
199 | 201 | ||
200 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); | 202 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); |
201 | 203 | pipeline.setResourceType("test"); | |
202 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); | ||
203 | pipeline.setAdaptorFactory("event", adaptorFactory); | ||
204 | 204 | ||
205 | pipeline.startTransaction(); | 205 | pipeline.startTransaction(); |
206 | pipeline.newEntity(command.constData(), command.size()); | 206 | pipeline.newEntity(command.constData(), command.size()); |
@@ -216,9 +216,9 @@ private slots: | |||
216 | auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); | 216 | auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); |
217 | 217 | ||
218 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); | 218 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); |
219 | pipeline.setResourceType("test"); | ||
219 | 220 | ||
220 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); | 221 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); |
221 | pipeline.setAdaptorFactory("event", adaptorFactory); | ||
222 | 222 | ||
223 | // Create the initial revision | 223 | // Create the initial revision |
224 | pipeline.startTransaction(); | 224 | pipeline.startTransaction(); |
@@ -265,9 +265,9 @@ private slots: | |||
265 | auto command = createEntityCommand(createEvent(entityFbb)); | 265 | auto command = createEntityCommand(createEvent(entityFbb)); |
266 | 266 | ||
267 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); | 267 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); |
268 | pipeline.setResourceType("test"); | ||
268 | 269 | ||
269 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); | 270 | auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); |
270 | pipeline.setAdaptorFactory("event", adaptorFactory); | ||
271 | 271 | ||
272 | // Create the initial revision | 272 | // Create the initial revision |
273 | pipeline.startTransaction(); | 273 | pipeline.startTransaction(); |
@@ -309,7 +309,7 @@ private slots: | |||
309 | flatbuffers::FlatBufferBuilder entityFbb; | 309 | flatbuffers::FlatBufferBuilder entityFbb; |
310 | auto command = createEntityCommand(createEvent(entityFbb)); | 310 | auto command = createEntityCommand(createEvent(entityFbb)); |
311 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); | 311 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); |
312 | pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create()); | 312 | pipeline.setResourceType("test"); |
313 | 313 | ||
314 | // Create the initial revision | 314 | // Create the initial revision |
315 | pipeline.startTransaction(); | 315 | pipeline.startTransaction(); |
@@ -346,9 +346,9 @@ private slots: | |||
346 | TestProcessor testProcessor; | 346 | TestProcessor testProcessor; |
347 | 347 | ||
348 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); | 348 | Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); |
349 | pipeline.setResourceType("test"); | ||
349 | pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << &testProcessor); | 350 | pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << &testProcessor); |
350 | pipeline.startTransaction(); | 351 | pipeline.startTransaction(); |
351 | pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create()); | ||
352 | 352 | ||
353 | // Actual test | 353 | // Actual test |
354 | { | 354 | { |
diff --git a/tests/querytest.cpp b/tests/querytest.cpp index a654931..6d7746e 100644 --- a/tests/querytest.cpp +++ b/tests/querytest.cpp | |||
@@ -9,6 +9,7 @@ | |||
9 | #include "resourceconfig.h" | 9 | #include "resourceconfig.h" |
10 | #include "log.h" | 10 | #include "log.h" |
11 | #include "modelresult.h" | 11 | #include "modelresult.h" |
12 | #include "test.h" | ||
12 | 13 | ||
13 | /** | 14 | /** |
14 | * Test of the query system using the dummy resource. | 15 | * Test of the query system using the dummy resource. |
@@ -21,6 +22,7 @@ class QueryTest : public QObject | |||
21 | private slots: | 22 | private slots: |
22 | void initTestCase() | 23 | void initTestCase() |
23 | { | 24 | { |
25 | Sink::Test::initTest(); | ||
24 | Sink::Log::setDebugOutputLevel(Sink::Log::Trace); | 26 | Sink::Log::setDebugOutputLevel(Sink::Log::Trace); |
25 | auto factory = Sink::ResourceFactory::load("org.kde.dummy"); | 27 | auto factory = Sink::ResourceFactory::load("org.kde.dummy"); |
26 | QVERIFY(factory); | 28 | QVERIFY(factory); |
diff --git a/tests/testimplementations.h b/tests/testimplementations.h index 688875d..197602c 100644 --- a/tests/testimplementations.h +++ b/tests/testimplementations.h | |||
@@ -107,7 +107,7 @@ public: | |||
107 | class TestResource : public Sink::GenericResource | 107 | class TestResource : public Sink::GenericResource |
108 | { | 108 | { |
109 | public: | 109 | public: |
110 | TestResource(const QByteArray &instanceIdentifier, QSharedPointer<Sink::Pipeline> pipeline) : Sink::GenericResource(instanceIdentifier, pipeline) | 110 | TestResource(const QByteArray &instanceIdentifier, QSharedPointer<Sink::Pipeline> pipeline) : Sink::GenericResource("test", instanceIdentifier, pipeline, QSharedPointer<Sink::ChangeReplay>(), QSharedPointer<Sink::Synchronizer>()) |
111 | { | 111 | { |
112 | } | 112 | } |
113 | 113 | ||