summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/CMakeLists.txt2
-rw-r--r--common/adaptorfactoryregistry.cpp68
-rw-r--r--common/adaptorfactoryregistry.h64
-rw-r--r--common/changereplay.cpp102
-rw-r--r--common/changereplay.h68
-rw-r--r--common/genericresource.cpp560
-rw-r--r--common/genericresource.h151
-rw-r--r--common/pipeline.cpp17
-rw-r--r--common/pipeline.h3
-rw-r--r--common/resource.cpp2
-rw-r--r--common/resource.h2
-rw-r--r--examples/dummyresource/resourcefactory.cpp163
-rw-r--r--examples/dummyresource/resourcefactory.h6
-rw-r--r--examples/maildirresource/maildirresource.h1
-rw-r--r--tests/dummyresourcetest.cpp2
-rw-r--r--tests/mailquerybenchmark.cpp5
-rw-r--r--tests/pipelinebenchmark.cpp6
-rw-r--r--tests/pipelinetest.cpp14
-rw-r--r--tests/querytest.cpp2
-rw-r--r--tests/testimplementations.h2
20 files changed, 854 insertions, 386 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index c269a85..79b627a 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -66,6 +66,8 @@ set(command_SRCS
66 domain/folder.cpp 66 domain/folder.cpp
67 test.cpp 67 test.cpp
68 query.cpp 68 query.cpp
69 changereplay.cpp
70 adaptorfactoryregistry.cpp
69 ${storage_SRCS}) 71 ${storage_SRCS})
70 72
71add_library(${PROJECT_NAME} SHARED ${command_SRCS}) 73add_library(${PROJECT_NAME} SHARED ${command_SRCS})
diff --git a/common/adaptorfactoryregistry.cpp b/common/adaptorfactoryregistry.cpp
new file mode 100644
index 0000000..323a02d
--- /dev/null
+++ b/common/adaptorfactoryregistry.cpp
@@ -0,0 +1,68 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "adaptorfactoryregistry.h"
21
22#include <QByteArray>
23#include <QDebug>
24#include <QMutex>
25#include <functional>
26#include <memory>
27
28#include "domaintypeadaptorfactoryinterface.h"
29#include "applicationdomaintype.h"
30#include "log.h"
31
32using namespace Sink;
33
34AdaptorFactoryRegistry &AdaptorFactoryRegistry::instance()
35{
36 // QMutexLocker locker(&sMutex);
37 static AdaptorFactoryRegistry *instance = 0;
38 if (!instance) {
39 instance = new AdaptorFactoryRegistry;
40 }
41 return *instance;
42}
43
44static QByteArray key(const QByteArray &resource, const QByteArray &type)
45{
46 return resource + type;
47}
48
49AdaptorFactoryRegistry::AdaptorFactoryRegistry()
50{
51
52}
53
54std::shared_ptr<DomainTypeAdaptorFactoryInterface> AdaptorFactoryRegistry::getFactory(const QByteArray &resource, const QByteArray &typeName)
55{
56 const auto ptr = mRegistry.value(key(resource, typeName));
57 //We have to check the pointer before the cast, otherwise a check would return true also for invalid instances.
58 if (!ptr) {
59 return std::shared_ptr<DomainTypeAdaptorFactoryInterface>();
60 }
61 return std::static_pointer_cast<DomainTypeAdaptorFactoryInterface>(ptr);
62}
63
64void AdaptorFactoryRegistry::registerFactory(const QByteArray &resource, const std::shared_ptr<void> &instance, const QByteArray typeName)
65{
66 mRegistry.insert(key(resource, typeName), instance);
67}
68
diff --git a/common/adaptorfactoryregistry.h b/common/adaptorfactoryregistry.h
new file mode 100644
index 0000000..f06120a
--- /dev/null
+++ b/common/adaptorfactoryregistry.h
@@ -0,0 +1,64 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include "sink_export.h"
24#include <QByteArray>
25#include <QDebug>
26#include <QMutex>
27#include <functional>
28#include <memory>
29
30#include "domaintypeadaptorfactoryinterface.h"
31#include "applicationdomaintype.h"
32#include "log.h"
33
34namespace Sink {
35
36/**
37 */
38class SINK_EXPORT AdaptorFactoryRegistry
39{
40public:
41 static AdaptorFactoryRegistry &instance();
42
43 template <class DomainType, class Factory>
44 void registerFactory(const QByteArray &resource)
45 {
46 registerFactory(resource, std::make_shared<Factory>(), ApplicationDomain::getTypeName<DomainType>());
47 }
48
49 template <class DomainType>
50 std::shared_ptr<DomainTypeAdaptorFactoryInterface> getFactory(const QByteArray &resource)
51 {
52 return getFactory(resource, ApplicationDomain::getTypeName<DomainType>());
53 }
54
55 std::shared_ptr<DomainTypeAdaptorFactoryInterface> getFactory(const QByteArray &resource, const QByteArray &typeName);
56
57private:
58 AdaptorFactoryRegistry();
59 void registerFactory(const QByteArray &resource, const std::shared_ptr<void> &instance, const QByteArray typeName);
60
61 QHash<QByteArray, std::shared_ptr<void>> mRegistry;
62 static QMutex sMutex;
63};
64}
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
new file mode 100644
index 0000000..2447b6e
--- /dev/null
+++ b/common/changereplay.cpp
@@ -0,0 +1,102 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "changereplay.h"
21
22#include "entitybuffer.h"
23#include "log.h"
24#include "definitions.h"
25#include "bufferutils.h"
26
27using namespace Sink;
28
29#undef DEBUG_AREA
30#define DEBUG_AREA "resource.changereplay"
31
32ChangeReplay::ChangeReplay(const QByteArray &resourceName)
33 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite)
34{
35 Trace() << "Created change replay: " << resourceName;
36}
37
38qint64 ChangeReplay::getLastReplayedRevision()
39{
40 qint64 lastReplayedRevision = 0;
41 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly);
42 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
43 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
44 lastReplayedRevision = value.toLongLong();
45 return false;
46 },
47 [](const Storage::Error &) {});
48 return lastReplayedRevision;
49}
50
51bool ChangeReplay::allChangesReplayed()
52{
53 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
54 Warning() << error.message;
55 }));
56 const qint64 lastReplayedRevision = getLastReplayedRevision();
57 Trace() << "All changes replayed " << topRevision << lastReplayedRevision;
58 return (lastReplayedRevision >= topRevision);
59}
60
61void ChangeReplay::revisionChanged()
62{
63 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
64 Warning() << error.message;
65 });
66 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
67 Warning() << error.message;
68 });
69 qint64 lastReplayedRevision = 1;
70 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
71 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
72 lastReplayedRevision = value.toLongLong();
73 return false;
74 },
75 [](const Storage::Error &) {});
76 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction);
77
78 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision;
79 if (lastReplayedRevision <= topRevision) {
80 qint64 revision = lastReplayedRevision;
81 for (; revision <= topRevision; revision++) {
82 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
83 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision);
84 const auto key = Storage::assembleKey(uid, revision);
85 Storage::mainDatabase(mainStoreTransaction, type)
86 .scan(key,
87 [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool {
88 replay(type, key, value).exec();
89 // TODO make for loop async, and pass to async replay function together with type
90 Trace() << "Replaying " << key;
91 return false;
92 },
93 [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; });
94 }
95 revision--;
96 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
97 replayStoreTransaction.commit();
98 Trace() << "Replayed until " << revision;
99 }
100 emit changesReplayed();
101}
102
diff --git a/common/changereplay.h b/common/changereplay.h
new file mode 100644
index 0000000..a568060
--- /dev/null
+++ b/common/changereplay.h
@@ -0,0 +1,68 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#pragma once
21
22#include "sink_export.h"
23#include <QObject>
24#include <Async/Async>
25
26#include "storage.h"
27
28namespace Sink {
29
30/**
31 * Replays changes from the storage one by one.
32 *
33 * Uses a local database to:
34 * * Remember what changes have been replayed already.
35 * * store a mapping of remote to local buffers
36 */
37class SINK_EXPORT ChangeReplay : public QObject
38{
39 Q_OBJECT
40public:
41 ChangeReplay(const QByteArray &resourceName);
42
43 qint64 getLastReplayedRevision();
44 bool allChangesReplayed();
45
46signals:
47 void changesReplayed();
48
49public slots:
50 void revisionChanged();
51
52protected:
53 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0;
54 Sink::Storage mStorage;
55
56private:
57 Sink::Storage mChangeReplayStore;
58};
59
60class NullChangeReplay : public ChangeReplay
61{
62 public:
63 NullChangeReplay() : ChangeReplay("null") {}
64 KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return KAsync::null<void>(); }
65};
66
67}
68
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index eae6ead..cb2ef21 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -1,3 +1,22 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
1#include "genericresource.h" 20#include "genericresource.h"
2 21
3#include "entitybuffer.h" 22#include "entitybuffer.h"
@@ -14,6 +33,7 @@
14#include "log.h" 33#include "log.h"
15#include "definitions.h" 34#include "definitions.h"
16#include "bufferutils.h" 35#include "bufferutils.h"
36#include "adaptorfactoryregistry.h"
17 37
18#include <QUuid> 38#include <QUuid>
19#include <QDataStream> 39#include <QDataStream>
@@ -30,96 +50,6 @@ static int sCommitInterval = 10;
30using namespace Sink; 50using namespace Sink;
31 51
32#undef DEBUG_AREA 52#undef DEBUG_AREA
33#define DEBUG_AREA "resource.changereplay"
34
35/**
36 * Replays changes from the storage one by one.
37 *
38 * Uses a local database to:
39 * * Remember what changes have been replayed already.
40 * * store a mapping of remote to local buffers
41 */
42class ChangeReplay : public QObject
43{
44 Q_OBJECT
45public:
46 typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction;
47
48 ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction)
49 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayFunction(replayFunction)
50 {
51 }
52
53 qint64 getLastReplayedRevision()
54 {
55 qint64 lastReplayedRevision = 0;
56 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly);
57 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
58 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
59 lastReplayedRevision = value.toLongLong();
60 return false;
61 },
62 [](const Storage::Error &) {});
63 return lastReplayedRevision;
64 }
65
66 bool allChangesReplayed()
67 {
68 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly));
69 const qint64 lastReplayedRevision = getLastReplayedRevision();
70 Trace() << "All changes replayed " << topRevision << lastReplayedRevision;
71 return (lastReplayedRevision >= topRevision);
72 }
73
74signals:
75 void changesReplayed();
76
77public slots:
78 void revisionChanged()
79 {
80 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly);
81 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite);
82 qint64 lastReplayedRevision = 1;
83 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
84 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
85 lastReplayedRevision = value.toLongLong();
86 return false;
87 },
88 [](const Storage::Error &) {});
89 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction);
90
91 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision;
92 if (lastReplayedRevision <= topRevision) {
93 qint64 revision = lastReplayedRevision;
94 for (; revision <= topRevision; revision++) {
95 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
96 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision);
97 const auto key = Storage::assembleKey(uid, revision);
98 Storage::mainDatabase(mainStoreTransaction, type)
99 .scan(key,
100 [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool {
101 mReplayFunction(type, key, value).exec();
102 // TODO make for loop async, and pass to async replay function together with type
103 Trace() << "Replaying " << key;
104 return false;
105 },
106 [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; });
107 }
108 revision--;
109 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
110 replayStoreTransaction.commit();
111 Trace() << "Replayed until " << revision;
112 }
113 emit changesReplayed();
114 }
115
116private:
117 Sink::Storage mStorage;
118 Sink::Storage mChangeReplayStore;
119 ReplayFunction mReplayFunction;
120};
121
122#undef DEBUG_AREA
123#define DEBUG_AREA "resource.commandprocessor" 53#define DEBUG_AREA "resource.commandprocessor"
124 54
125/** 55/**
@@ -133,10 +63,9 @@ class CommandProcessor : public QObject
133public: 63public:
134 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) 64 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false)
135 { 65 {
136 mPipeline->startTransaction(); 66 mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
137 // FIXME Should be initialized to the current value of the change replay queue 67 Warning() << error.message;
138 mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); 68 }));
139 mPipeline->commit();
140 69
141 for (auto queue : mCommandQueues) { 70 for (auto queue : mCommandQueues) {
142 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); 71 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process);
@@ -303,15 +232,22 @@ private:
303#undef DEBUG_AREA 232#undef DEBUG_AREA
304#define DEBUG_AREA "resource" 233#define DEBUG_AREA "resource"
305 234
306GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) 235GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer)
307 : Sink::Resource(), 236 : Sink::Resource(),
308 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), 237 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"),
309 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), 238 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"),
239 mResourceType(resourceType),
310 mResourceInstanceIdentifier(resourceInstanceIdentifier), 240 mResourceInstanceIdentifier(resourceInstanceIdentifier),
311 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), 241 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)),
242 mChangeReplay(changeReplay),
243 mSynchronizer(synchronizer),
312 mError(0), 244 mError(0),
313 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 245 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
314{ 246{
247 mPipeline->setResourceType(mResourceType);
248 mSynchronizer->setup([this](int commandId, const QByteArray &data) {
249 enqueueCommand(mSynchronizerQueue, commandId, data);
250 });
315 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); 251 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue);
316 mProcessor->setInspectionCommand([this](void const *command, size_t size) { 252 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
317 flatbuffers::Verifier verifier((const uint8_t *)command, size); 253 flatbuffers::Verifier verifier((const uint8_t *)command, size);
@@ -353,14 +289,9 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
353 }); 289 });
354 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 290 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
355 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 291 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
356 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) {
357 // This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync)
358 auto synchronizationStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite);
359 return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore]() {});
360 });
361 enableChangeReplay(true); 292 enableChangeReplay(true);
362 mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); 293 mClientLowerBoundRevision = mPipeline->cleanedUpRevision();
363 mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); 294 mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision());
364 295
365 mCommitQueueTimer.setInterval(sCommitInterval); 296 mCommitQueueTimer.setInterval(sCommitInterval);
366 mCommitQueueTimer.setSingleShot(true); 297 mCommitQueueTimer.setSingleShot(true);
@@ -370,7 +301,6 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
370GenericResource::~GenericResource() 301GenericResource::~GenericResource()
371{ 302{
372 delete mProcessor; 303 delete mProcessor;
373 delete mSourceChangeReplay;
374} 304}
375 305
376KAsync::Job<void> GenericResource::inspect( 306KAsync::Job<void> GenericResource::inspect(
@@ -383,86 +313,20 @@ KAsync::Job<void> GenericResource::inspect(
383void GenericResource::enableChangeReplay(bool enable) 313void GenericResource::enableChangeReplay(bool enable)
384{ 314{
385 if (enable) { 315 if (enable) {
386 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged, Qt::QueuedConnection); 316 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
387 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 317 QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
388 mSourceChangeReplay->revisionChanged(); 318 mChangeReplay->revisionChanged();
389 } else { 319 } else {
390 QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); 320 QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged);
391 QObject::disconnect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 321 QObject::disconnect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
392 } 322 }
393} 323}
394 324
395void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) 325void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors)
396{ 326{
397 mPipeline->setPreprocessors(type, preprocessors); 327 mPipeline->setPreprocessors(type, preprocessors);
398 mPipeline->setAdaptorFactory(type, factory);
399 mAdaptorFactories.insert(type, factory);
400}
401
402KAsync::Job<void> GenericResource::replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value)
403{
404 Sink::EntityBuffer buffer(value);
405 const Sink::Entity &entity = buffer.entity();
406 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
407 Q_ASSERT(metadataBuffer);
408 if (!metadataBuffer->replayToSource()) {
409 Trace() << "Change is coming from the source";
410 return KAsync::null<void>();
411 }
412 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
413 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
414 const auto uid = Sink::Storage::uidFromKey(key);
415 QByteArray oldRemoteId;
416
417 if (operation != Sink::Operation_Creation) {
418 auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadOnly);
419 oldRemoteId = resolveLocalId(type, uid, synchronizationTransaction);
420 }
421 Trace() << "Replaying " << key << type;
422
423 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>();
424 if (type == ENTITY_TYPE_FOLDER) {
425 const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
426 job = replay(folder, operation, oldRemoteId);
427 } else if (type == ENTITY_TYPE_MAIL) {
428 const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
429 job = replay(mail, operation, oldRemoteId);
430 }
431
432 return job.then<void, QByteArray>([=, &synchronizationStore](const QByteArray &remoteId) {
433 auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadWrite);
434 Trace() << "Replayed change with remote id: " << remoteId;
435 if (operation == Sink::Operation_Creation) {
436 if (remoteId.isEmpty()) {
437 Warning() << "Returned an empty remoteId from the creation";
438 } else {
439 recordRemoteId(type, uid, remoteId, synchronizationTransaction);
440 }
441 } else if (operation == Sink::Operation_Modification) {
442 if (remoteId.isEmpty()) {
443 Warning() << "Returned an empty remoteId from the creation";
444 } else {
445 updateRemoteId(type, uid, remoteId, synchronizationTransaction);
446 }
447 } else if (operation == Sink::Operation_Removal) {
448 removeRemoteId(type, uid, remoteId, synchronizationTransaction);
449 } else {
450 Warning() << "Unkown operation" << operation;
451 }
452 }, [](int errorCode, const QString &errorMessage) {
453 Warning() << "Failed to replay change: " << errorMessage;
454 });
455} 328}
456 329
457KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &)
458{
459 return KAsync::null<QByteArray>();
460}
461
462KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &)
463{
464 return KAsync::null<QByteArray>();
465}
466 330
467void GenericResource::removeDataFromDisk() 331void GenericResource::removeDataFromDisk()
468{ 332{
@@ -528,10 +392,8 @@ KAsync::Job<void> GenericResource::synchronizeWithSource()
528 Log() << " Synchronizing"; 392 Log() << " Synchronizing";
529 // Changereplay would deadlock otherwise when trying to open the synchronization store 393 // Changereplay would deadlock otherwise when trying to open the synchronization store
530 enableChangeReplay(false); 394 enableChangeReplay(false);
531 auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); 395 mSynchronizer->synchronize()
532 auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); 396 .then<void>([this, &future]() {
533 synchronizeWithSource(*mainStore, *syncStore)
534 .then<void>([this, mainStore, syncStore, &future]() {
535 Log() << "Done Synchronizing"; 397 Log() << "Done Synchronizing";
536 enableChangeReplay(true); 398 enableChangeReplay(true);
537 future.setFinished(); 399 future.setFinished();
@@ -576,11 +438,11 @@ KAsync::Job<void> GenericResource::processAllMessages()
576 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) 438 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); })
577 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) 439 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); })
578 .then<void>([this](KAsync::Future<void> &f) { 440 .then<void>([this](KAsync::Future<void> &f) {
579 if (mSourceChangeReplay->allChangesReplayed()) { 441 if (mChangeReplay->allChangesReplayed()) {
580 f.setFinished(); 442 f.setFinished();
581 } else { 443 } else {
582 auto context = new QObject; 444 auto context = new QObject;
583 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { 445 QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, context, [&f, context]() {
584 delete context; 446 delete context;
585 f.setFinished(); 447 f.setFinished();
586 }); 448 });
@@ -590,7 +452,7 @@ KAsync::Job<void> GenericResource::processAllMessages()
590 452
591void GenericResource::updateLowerBoundRevision() 453void GenericResource::updateLowerBoundRevision()
592{ 454{
593 mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSourceChangeReplay->getLastReplayedRevision())); 455 mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mChangeReplay->getLastReplayedRevision()));
594} 456}
595 457
596void GenericResource::setLowerBoundRevision(qint64 revision) 458void GenericResource::setLowerBoundRevision(qint64 revision)
@@ -599,7 +461,139 @@ void GenericResource::setLowerBoundRevision(qint64 revision)
599 updateLowerBoundRevision(); 461 updateLowerBoundRevision();
600} 462}
601 463
602void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 464
465
466
467EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction)
468 : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier),
469 mTransaction(transaction)
470{
471
472}
473
474template<typename T>
475T EntityStore::read(const QByteArray &identifier) const
476{
477 auto typeName = ApplicationDomain::getTypeName<T>();
478 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName);
479 auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType));
480 Q_ASSERT(bufferAdaptor);
481 return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor);
482}
483
484QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory)
485{
486 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
487 db.findLatest(uid,
488 [&current, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
489 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
490 if (!buffer.isValid()) {
491 Warning() << "Read invalid buffer from disk";
492 } else {
493 Trace() << "Found value " << key;
494 current = adaptorFactory.createAdaptor(buffer.entity());
495 }
496 return false;
497 },
498 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
499 return current;
500}
501
502
503
504SyncStore::SyncStore(Sink::Storage::Transaction &transaction)
505 : mTransaction(transaction)
506{
507
508}
509
510void SyncStore::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
511{
512 Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId);
513 Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId);
514}
515
516void SyncStore::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
517{
518 Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId);
519 Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId);
520}
521
522void SyncStore::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId)
523{
524 const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId);
525 removeRemoteId(bufferType, localId, oldRemoteId);
526 recordRemoteId(bufferType, localId, remoteId);
527}
528
529QByteArray SyncStore::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId)
530{
531 // Lookup local id for remote id, or insert a new pair otherwise
532 Index index("rid.mapping." + bufferType, mTransaction);
533 QByteArray sinkId = index.lookup(remoteId);
534 if (sinkId.isEmpty()) {
535 sinkId = QUuid::createUuid().toString().toUtf8();
536 index.add(remoteId, sinkId);
537 Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId);
538 }
539 return sinkId;
540}
541
542QByteArray SyncStore::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId)
543{
544 QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId);
545 if (remoteId.isEmpty()) {
546 Warning() << "Couldn't find the remote id for " << localId;
547 return QByteArray();
548 }
549 return remoteId;
550}
551
552
553
554
555
556
557
558
559Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
560 : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly),
561 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite),
562 mResourceType(resourceType),
563 mResourceInstanceIdentifier(resourceInstanceIdentifier)
564{
565 Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier;
566
567}
568
569void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback)
570{
571 mEnqueue = enqueueCommandCallback;
572}
573
574void Synchronizer::enqueueCommand(int commandId, const QByteArray &data)
575{
576 Q_ASSERT(mEnqueue);
577 mEnqueue(commandId, data);
578}
579
580EntityStore &Synchronizer::store()
581{
582 if (!mEntityStore) {
583 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction);
584 }
585 return *mEntityStore;
586}
587
588SyncStore &Synchronizer::syncStore()
589{
590 if (!mSyncStore) {
591 mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction);
592 }
593 return *mSyncStore;
594}
595
596void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
603 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) 597 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
604{ 598{
605 // These changes are coming from the source 599 // These changes are coming from the source
@@ -616,7 +610,7 @@ void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &b
616 callback(BufferUtils::extractBuffer(fbb)); 610 callback(BufferUtils::extractBuffer(fbb));
617} 611}
618 612
619void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 613void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
620 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) 614 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
621{ 615{
622 // These changes are coming from the source 616 // These changes are coming from the source
@@ -634,7 +628,7 @@ void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, co
634 callback(BufferUtils::extractBuffer(fbb)); 628 callback(BufferUtils::extractBuffer(fbb));
635} 629}
636 630
637void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) 631void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback)
638{ 632{
639 // These changes are coming from the source 633 // These changes are coming from the source
640 const auto replayToSource = false; 634 const auto replayToSource = false;
@@ -647,96 +641,36 @@ void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, co
647 callback(BufferUtils::extractBuffer(fbb)); 641 callback(BufferUtils::extractBuffer(fbb));
648} 642}
649 643
650void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) 644void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
651{
652 Index("rid.mapping." + bufferType, transaction).add(remoteId, localId);
653 ;
654 Index("localid.mapping." + bufferType, transaction).add(localId, remoteId);
655}
656
657void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
658{
659 Index("rid.mapping." + bufferType, transaction).remove(remoteId, localId);
660 Index("localid.mapping." + bufferType, transaction).remove(localId, remoteId);
661}
662
663void GenericResource::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
664{
665 const auto oldRemoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId);
666 removeRemoteId(bufferType, localId, oldRemoteId, transaction);
667 recordRemoteId(bufferType, localId, remoteId, transaction);
668}
669
670QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
671{
672 // Lookup local id for remote id, or insert a new pair otherwise
673 Index index("rid.mapping." + bufferType, transaction);
674 QByteArray sinkId = index.lookup(remoteId);
675 if (sinkId.isEmpty()) {
676 sinkId = QUuid::createUuid().toString().toUtf8();
677 index.add(remoteId, sinkId);
678 Index("localid.mapping." + bufferType, transaction).add(sinkId, remoteId);
679 }
680 return sinkId;
681}
682
683QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction)
684{
685 QByteArray remoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId);
686 if (remoteId.isEmpty()) {
687 Warning() << "Couldn't find the remote id for " << localId;
688 return QByteArray();
689 }
690 return remoteId;
691}
692
693void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType,
694 const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
695{ 645{
696 entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { 646 entryGenerator([this, bufferType, &exists](const QByteArray &key) {
697 auto sinkId = Sink::Storage::uidFromKey(key); 647 auto sinkId = Sink::Storage::uidFromKey(key);
698 Trace() << "Checking for removal " << key; 648 Trace() << "Checking for removal " << key;
699 const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction); 649 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId);
700 // If we have no remoteId, the entity hasn't been replayed to the source yet 650 // If we have no remoteId, the entity hasn't been replayed to the source yet
701 if (!remoteId.isEmpty()) { 651 if (!remoteId.isEmpty()) {
702 if (!exists(remoteId)) { 652 if (!exists(remoteId)) {
703 Trace() << "Found a removed entity: " << sinkId; 653 Trace() << "Found a removed entity: " << sinkId;
704 deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, 654 deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType,
705 [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); }); 655 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); });
706 } 656 }
707 } 657 }
708 }); 658 });
709} 659}
710 660
711QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> GenericResource::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) 661void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
712{
713 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
714 db.findLatest(uid,
715 [&current, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
716 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
717 if (!buffer.isValid()) {
718 Warning() << "Read invalid buffer from disk";
719 } else {
720 current = adaptorFactory.createAdaptor(buffer.entity());
721 }
722 return false;
723 },
724 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
725 return current;
726}
727
728void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction,
729 DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
730{ 662{
731 auto mainDatabase = Storage::mainDatabase(transaction, bufferType); 663 Trace() << "Create or modify" << bufferType << remoteId;
732 const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); 664 auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType);
665 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
733 const auto found = mainDatabase.contains(sinkId); 666 const auto found = mainDatabase.contains(sinkId);
667 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType);
734 if (!found) { 668 if (!found) {
735 Trace() << "Found a new entity: " << remoteId; 669 Trace() << "Found a new entity: " << remoteId;
736 createEntity( 670 createEntity(
737 sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); }); 671 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
738 } else { // modification 672 } else { // modification
739 if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) { 673 if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) {
740 bool changed = false; 674 bool changed = false;
741 for (const auto &property : entity.changedProperties()) { 675 for (const auto &property : entity.changedProperties()) {
742 if (entity.getProperty(property) != current->getProperty(property)) { 676 if (entity.getProperty(property) != current->getProperty(property)) {
@@ -746,8 +680,8 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si
746 } 680 }
747 if (changed) { 681 if (changed) {
748 Trace() << "Found a modified entity: " << remoteId; 682 Trace() << "Found a modified entity: " << remoteId;
749 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, 683 modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory,
750 [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); }); 684 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
751 } 685 }
752 } else { 686 } else {
753 Warning() << "Failed to get current entity"; 687 Warning() << "Failed to get current entity";
@@ -755,6 +689,118 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si
755 } 689 }
756} 690}
757 691
692KAsync::Job<void> Synchronizer::synchronize()
693{
694 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
695 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
696 return synchronizeWithSource().then<void>([this]() {
697 mTransaction.abort();
698 mSyncTransaction.commit();
699 mSyncStore.clear();
700 mEntityStore.clear();
701 });
702}
703
704
705
706SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
707 : ChangeReplay(resourceInstanceIdentifier),
708 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite),
709 mResourceType(resourceType),
710 mResourceInstanceIdentifier(resourceInstanceIdentifier)
711{
712
713}
714
715EntityStore &SourceWriteBack::store()
716{
717 if (!mEntityStore) {
718 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction);
719 }
720 return *mEntityStore;
721}
722
723SyncStore &SourceWriteBack::syncStore()
724{
725 if (!mSyncStore) {
726 mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction);
727 }
728 return *mSyncStore;
729}
730
731KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
732{
733 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
734 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
735
736 Sink::EntityBuffer buffer(value);
737 const Sink::Entity &entity = buffer.entity();
738 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
739 Q_ASSERT(metadataBuffer);
740 if (!metadataBuffer->replayToSource()) {
741 Trace() << "Change is coming from the source";
742 return KAsync::null<void>();
743 }
744 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
745 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
746 const auto uid = Sink::Storage::uidFromKey(key);
747 QByteArray oldRemoteId;
748
749 if (operation != Sink::Operation_Creation) {
750 oldRemoteId = syncStore().resolveLocalId(type, uid);
751 }
752 Trace() << "Replaying " << key << type;
753
754 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>();
755 if (type == ENTITY_TYPE_FOLDER) {
756 auto folder = store().read<ApplicationDomain::Folder>(uid);
757 // const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
758 job = replay(folder, operation, oldRemoteId);
759 } else if (type == ENTITY_TYPE_MAIL) {
760 auto mail = store().read<ApplicationDomain::Mail>(uid);
761 // const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity));
762 job = replay(mail, operation, oldRemoteId);
763 }
764
765 return job.then<void, QByteArray>([this, operation, type, uid](const QByteArray &remoteId) {
766 Trace() << "Replayed change with remote id: " << remoteId;
767 if (operation == Sink::Operation_Creation) {
768 if (remoteId.isEmpty()) {
769 Warning() << "Returned an empty remoteId from the creation";
770 } else {
771 syncStore().recordRemoteId(type, uid, remoteId);
772 }
773 } else if (operation == Sink::Operation_Modification) {
774 if (remoteId.isEmpty()) {
775 Warning() << "Returned an empty remoteId from the creation";
776 } else {
777 syncStore().updateRemoteId(type, uid, remoteId);
778 }
779 } else if (operation == Sink::Operation_Removal) {
780 syncStore().removeRemoteId(type, uid, remoteId);
781 } else {
782 Warning() << "Unkown operation" << operation;
783 }
784
785 mTransaction.abort();
786 mSyncTransaction.commit();
787 mSyncStore.clear();
788 mEntityStore.clear();
789 }, [](int errorCode, const QString &errorMessage) {
790 Warning() << "Failed to replay change: " << errorMessage;
791 });
792}
793
794KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &)
795{
796 return KAsync::null<QByteArray>();
797}
798
799KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &)
800{
801 return KAsync::null<QByteArray>();
802}
803
758 804
759#pragma clang diagnostic push 805#pragma clang diagnostic push
760#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" 806#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
diff --git a/common/genericresource.h b/common/genericresource.h
index c551e29..45d5d3a 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -24,14 +24,16 @@
24#include <messagequeue.h> 24#include <messagequeue.h>
25#include <flatbuffers/flatbuffers.h> 25#include <flatbuffers/flatbuffers.h>
26#include <domainadaptor.h> 26#include <domainadaptor.h>
27#include "changereplay.h"
28
27#include <QTimer> 29#include <QTimer>
28 30
29class CommandProcessor; 31class CommandProcessor;
30class ChangeReplay;
31 32
32namespace Sink { 33namespace Sink {
33class Pipeline; 34class Pipeline;
34class Preprocessor; 35class Preprocessor;
36class Synchronizer;
35 37
36/** 38/**
37 * Generic Resource implementation. 39 * Generic Resource implementation.
@@ -39,7 +41,7 @@ class Preprocessor;
39class SINK_EXPORT GenericResource : public Resource 41class SINK_EXPORT GenericResource : public Resource
40{ 42{
41public: 43public:
42 GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline = QSharedPointer<Pipeline>()); 44 GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer);
43 virtual ~GenericResource(); 45 virtual ~GenericResource();
44 46
45 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; 47 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE;
@@ -64,41 +66,90 @@ protected:
64 66
65 void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors); 67 void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors);
66 68
67 ///Base implementation call the replay$Type calls
68 virtual KAsync::Job<void> replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value);
69 ///Implement to write back changes to the server
70 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId);
71 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId);
72
73 void onProcessorError(int errorCode, const QString &errorMessage); 69 void onProcessorError(int errorCode, const QString &errorMessage);
74 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); 70 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data);
75 71
76 static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 72 MessageQueue mUserQueue;
77 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); 73 MessageQueue mSynchronizerQueue;
78 static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 74 QByteArray mResourceType;
79 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); 75 QByteArray mResourceInstanceIdentifier;
80 static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback); 76 QSharedPointer<Pipeline> mPipeline;
77
78private:
79 CommandProcessor *mProcessor;
80 QSharedPointer<ChangeReplay> mChangeReplay;
81 QSharedPointer<Synchronizer> mSynchronizer;
82 int mError;
83 QTimer mCommitQueueTimer;
84 qint64 mClientLowerBoundRevision;
85 QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories;
86};
87
88class SINK_EXPORT SyncStore
89{
90public:
91 SyncStore(Sink::Storage::Transaction &);
81 92
82 /** 93 /**
83 * Records a localId to remoteId mapping 94 * Records a localId to remoteId mapping
84 */ 95 */
85 void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); 96 void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId);
86 void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); 97 void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId);
87 void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); 98 void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId);
88 99
89 /** 100 /**
90 * Tries to find a local id for the remote id, and creates a new local id otherwise. 101 * Tries to find a local id for the remote id, and creates a new local id otherwise.
91 * 102 *
92 * The new local id is recorded in the local to remote id mapping. 103 * The new local id is recorded in the local to remote id mapping.
93 */ 104 */
94 QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); 105 QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId);
95 106
96 /** 107 /**
97 * Tries to find a remote id for a local id. 108 * Tries to find a remote id for a local id.
98 * 109 *
99 * This can fail if the entity hasn't been written back to the server yet. 110 * This can fail if the entity hasn't been written back to the server yet.
100 */ 111 */
101 QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction); 112 QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId);
113
114private:
115 Sink::Storage::Transaction &mTransaction;
116};
117
118class SINK_EXPORT EntityStore
119{
120public:
121 EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction);
122
123 template<typename T>
124 T read(const QByteArray &identifier) const;
125
126 static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory);
127private:
128 QByteArray mResourceType;
129 QByteArray mResourceInstanceIdentifier;
130 Sink::Storage::Transaction &mTransaction;
131};
132
133/**
134 * Synchronize and add what we don't already have to local queue
135 */
136class SINK_EXPORT Synchronizer
137{
138public:
139 Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier);
140
141 void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback);
142 KAsync::Job<void> synchronize();
143
144protected:
145 ///Calls the callback to enqueue the command
146 void enqueueCommand(int commandId, const QByteArray &data);
147
148 static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
149 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback);
150 static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject,
151 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback);
152 static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback);
102 153
103 /** 154 /**
104 * A synchronous algorithm to remove entities that are no longer existing. 155 * A synchronous algorithm to remove entities that are no longer existing.
@@ -110,7 +161,7 @@ protected:
110 * 161 *
111 * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. 162 * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous.
112 */ 163 */
113 void scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, 164 void scanForRemovals(const QByteArray &bufferType,
114 const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); 165 const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists);
115 166
116 /** 167 /**
@@ -118,22 +169,60 @@ protected:
118 * 169 *
119 * Depending on whether the entity is locally available, or has changed. 170 * Depending on whether the entity is locally available, or has changed.
120 */ 171 */
121 void createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, 172 void createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
122 const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
123 173
124 static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); 174 //Read only access to main storage
175 EntityStore &store();
125 176
126 MessageQueue mUserQueue; 177 //Read/Write access to sync storage
127 MessageQueue mSynchronizerQueue; 178 SyncStore &syncStore();
179
180 virtual KAsync::Job<void> synchronizeWithSource() = 0;
181
182private:
183 QSharedPointer<SyncStore> mSyncStore;
184 QSharedPointer<EntityStore> mEntityStore;
185 Sink::Storage mStorage;
186 Sink::Storage mSyncStorage;
187 QByteArray mResourceType;
128 QByteArray mResourceInstanceIdentifier; 188 QByteArray mResourceInstanceIdentifier;
129 QSharedPointer<Pipeline> mPipeline; 189 Sink::Storage::Transaction mTransaction;
190 Sink::Storage::Transaction mSyncTransaction;
191 std::function<void(int commandId, const QByteArray &data)> mEnqueue;
192};
193
194/**
195 * Replay changes to the source
196 */
197class SINK_EXPORT SourceWriteBack : public ChangeReplay
198{
199public:
200 SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier);
201
202protected:
203 ///Base implementation calls the replay$Type calls
204 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
205
206protected:
207 ///Implement to write back changes to the server
208 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Mail &, Sink::Operation, const QByteArray &oldRemoteId);
209 virtual KAsync::Job<QByteArray> replay(const Sink::ApplicationDomain::Folder &, Sink::Operation, const QByteArray &oldRemoteId);
210
211 //Read only access to main storage
212 EntityStore &store();
213
214 //Read/Write access to sync storage
215 SyncStore &syncStore();
130 216
131private: 217private:
132 CommandProcessor *mProcessor; 218 Sink::Storage mSyncStorage;
133 ChangeReplay *mSourceChangeReplay; 219 QSharedPointer<SyncStore> mSyncStore;
134 int mError; 220 QSharedPointer<EntityStore> mEntityStore;
135 QTimer mCommitQueueTimer; 221 Sink::Storage::Transaction mTransaction;
136 qint64 mClientLowerBoundRevision; 222 Sink::Storage::Transaction mSyncTransaction;
137 QHash<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> mAdaptorFactories; 223 QByteArray mResourceType;
224 QByteArray mResourceInstanceIdentifier;
138}; 225};
226
227
139} 228}
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 637a1b8..7863f67 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -34,6 +34,7 @@
34#include "entitybuffer.h" 34#include "entitybuffer.h"
35#include "log.h" 35#include "log.h"
36#include "domain/applicationdomaintype.h" 36#include "domain/applicationdomaintype.h"
37#include "adaptorfactoryregistry.h"
37#include "definitions.h" 38#include "definitions.h"
38#include "bufferutils.h" 39#include "bufferutils.h"
39 40
@@ -52,11 +53,11 @@ public:
52 Storage storage; 53 Storage storage;
53 Storage::Transaction transaction; 54 Storage::Transaction transaction;
54 QHash<QString, QVector<Preprocessor *>> processors; 55 QHash<QString, QVector<Preprocessor *>> processors;
55 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory;
56 bool revisionChanged; 56 bool revisionChanged;
57 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); 57 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
58 QTime transactionTime; 58 QTime transactionTime;
59 int transactionItemCount; 59 int transactionItemCount;
60 QByteArray resourceType;
60}; 61};
61 62
62void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 63void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
@@ -84,9 +85,9 @@ void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preproc
84 d->processors[entityType] = processors; 85 d->processors[entityType] = processors;
85} 86}
86 87
87void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) 88void Pipeline::setResourceType(const QByteArray &resourceType)
88{ 89{
89 d->adaptorFactory.insert(entityType, factory); 90 d->resourceType = resourceType;
90} 91}
91 92
92void Pipeline::startTransaction() 93void Pipeline::startTransaction()
@@ -102,7 +103,9 @@ void Pipeline::startTransaction()
102 Trace() << "Starting transaction."; 103 Trace() << "Starting transaction.";
103 d->transactionTime.start(); 104 d->transactionTime.start();
104 d->transactionItemCount = 0; 105 d->transactionItemCount = 0;
105 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite)); 106 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
107 Warning() << error.message;
108 }));
106} 109}
107 110
108void Pipeline::commit() 111void Pipeline::commit()
@@ -189,7 +192,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
189 auto metadataBuffer = metadataBuilder.Finish(); 192 auto metadataBuffer = metadataBuilder.Finish();
190 FinishMetadataBuffer(metadataFbb, metadataBuffer); 193 FinishMetadataBuffer(metadataFbb, metadataBuffer);
191 194
192 auto adaptorFactory = d->adaptorFactory.value(bufferType); 195 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
193 if (!adaptorFactory) { 196 if (!adaptorFactory) {
194 Warning() << "no adaptor factory for type " << bufferType; 197 Warning() << "no adaptor factory for type " << bufferType;
195 return KAsync::error<qint64>(0); 198 return KAsync::error<qint64>(0);
@@ -244,7 +247,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
244 } 247 }
245 248
246 // TODO use only readPropertyMapper and writePropertyMapper 249 // TODO use only readPropertyMapper and writePropertyMapper
247 auto adaptorFactory = d->adaptorFactory.value(bufferType); 250 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
248 if (!adaptorFactory) { 251 if (!adaptorFactory) {
249 Warning() << "no adaptor factory for type " << bufferType; 252 Warning() << "no adaptor factory for type " << bufferType;
250 return KAsync::error<qint64>(0); 253 return KAsync::error<qint64>(0);
@@ -373,7 +376,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
373 flatbuffers::FlatBufferBuilder fbb; 376 flatbuffers::FlatBufferBuilder fbb;
374 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); 377 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
375 378
376 auto adaptorFactory = d->adaptorFactory.value(bufferType); 379 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
377 if (!adaptorFactory) { 380 if (!adaptorFactory) {
378 Warning() << "no adaptor factory for type " << bufferType; 381 Warning() << "no adaptor factory for type " << bufferType;
379 return KAsync::error<qint64>(0); 382 return KAsync::error<qint64>(0);
diff --git a/common/pipeline.h b/common/pipeline.h
index c65cbfd..2ca87a4 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -46,13 +46,12 @@ public:
46 46
47 Storage &storage() const; 47 Storage &storage() const;
48 48
49 void setResourceType(const QByteArray &resourceType);
49 void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); 50 void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors);
50 void startTransaction(); 51 void startTransaction();
51 void commit(); 52 void commit();
52 Storage::Transaction &transaction(); 53 Storage::Transaction &transaction();
53 54
54 void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory);
55
56 KAsync::Job<qint64> newEntity(void const *command, size_t size); 55 KAsync::Job<qint64> newEntity(void const *command, size_t size);
57 KAsync::Job<qint64> modifiedEntity(void const *command, size_t size); 56 KAsync::Job<qint64> modifiedEntity(void const *command, size_t size);
58 KAsync::Job<qint64> deletedEntity(void const *command, size_t size); 57 KAsync::Job<qint64> deletedEntity(void const *command, size_t size);
diff --git a/common/resource.cpp b/common/resource.cpp
index 6713686..82c9fc8 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -26,6 +26,7 @@
26#include <QPointer> 26#include <QPointer>
27 27
28#include "facadefactory.h" 28#include "facadefactory.h"
29#include "adaptorfactoryregistry.h"
29 30
30namespace Sink { 31namespace Sink {
31 32
@@ -110,6 +111,7 @@ ResourceFactory *ResourceFactory::load(const QString &resourceName)
110 if (factory) { 111 if (factory) {
111 Private::s_loadedFactories.insert(resourceName, factory); 112 Private::s_loadedFactories.insert(resourceName, factory);
112 factory->registerFacades(FacadeFactory::instance()); 113 factory->registerFacades(FacadeFactory::instance());
114 factory->registerAdaptorFactories(AdaptorFactoryRegistry::instance());
113 // TODO: if we need more data on it const QJsonObject json = loader.metaData()[QStringLiteral("MetaData")].toObject(); 115 // TODO: if we need more data on it const QJsonObject json = loader.metaData()[QStringLiteral("MetaData")].toObject();
114 return factory; 116 return factory;
115 } else { 117 } else {
diff --git a/common/resource.h b/common/resource.h
index 0e7cd11..d6c3c5f 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -26,6 +26,7 @@
26 26
27namespace Sink { 27namespace Sink {
28class FacadeFactory; 28class FacadeFactory;
29class AdaptorFactoryRegistry;
29 30
30/** 31/**
31 * Resource interface 32 * Resource interface
@@ -81,6 +82,7 @@ public:
81 82
82 virtual Resource *createResource(const QByteArray &instanceIdentifier) = 0; 83 virtual Resource *createResource(const QByteArray &instanceIdentifier) = 0;
83 virtual void registerFacades(FacadeFactory &factory) = 0; 84 virtual void registerFacades(FacadeFactory &factory) = 0;
85 virtual void registerAdaptorFactories(AdaptorFactoryRegistry &registry) {};
84 86
85private: 87private:
86 class Private; 88 class Private;
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp
index 48858da..609d23e 100644
--- a/examples/dummyresource/resourcefactory.cpp
+++ b/examples/dummyresource/resourcefactory.cpp
@@ -35,6 +35,7 @@
35#include "definitions.h" 35#include "definitions.h"
36#include "facadefactory.h" 36#include "facadefactory.h"
37#include "indexupdater.h" 37#include "indexupdater.h"
38#include "adaptorfactoryregistry.h"
38#include <QDate> 39#include <QDate>
39#include <QUuid> 40#include <QUuid>
40 41
@@ -43,8 +44,87 @@
43#define ENTITY_TYPE_MAIL "mail" 44#define ENTITY_TYPE_MAIL "mail"
44#define ENTITY_TYPE_FOLDER "folder" 45#define ENTITY_TYPE_FOLDER "folder"
45 46
47class DummySynchronizer : public Sink::Synchronizer {
48 public:
49
50 DummySynchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
51 : Sink::Synchronizer(resourceType, resourceInstanceIdentifier)
52 {
53
54 }
55
56 Sink::ApplicationDomain::Event::Ptr createEvent(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data)
57 {
58 static uint8_t rawData[100];
59 auto event = Sink::ApplicationDomain::Event::Ptr::create();
60 event->setProperty("summary", data.value("summary").toString());
61 event->setProperty("remoteId", ridBuffer);
62 event->setProperty("description", data.value("description").toString());
63 event->setProperty("attachment", QByteArray::fromRawData(reinterpret_cast<const char*>(rawData), 100));
64 return event;
65 }
66
67 Sink::ApplicationDomain::Mail::Ptr createMail(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data)
68 {
69 auto mail = Sink::ApplicationDomain::Mail::Ptr::create();
70 mail->setProperty("subject", data.value("subject").toString());
71 mail->setProperty("senderEmail", data.value("senderEmail").toString());
72 mail->setProperty("senderName", data.value("senderName").toString());
73 mail->setProperty("date", data.value("date").toString());
74 mail->setProperty("folder", syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, data.value("parentFolder").toByteArray()));
75 mail->setProperty("unread", data.value("unread").toBool());
76 mail->setProperty("important", data.value("important").toBool());
77 return mail;
78 }
79
80 Sink::ApplicationDomain::Folder::Ptr createFolder(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data)
81 {
82 auto folder = Sink::ApplicationDomain::Folder::Ptr::create();
83 folder->setProperty("name", data.value("name").toString());
84 folder->setProperty("icon", data.value("icon").toString());
85 if (!data.value("parent").toString().isEmpty()) {
86 auto sinkId = syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, data.value("parent").toByteArray());
87 folder->setProperty("parent", sinkId);
88 }
89 return folder;
90 }
91
92 void synchronize(const QByteArray &bufferType, const QMap<QString, QMap<QString, QVariant> > &data, std::function<Sink::ApplicationDomain::ApplicationDomainType::Ptr(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data)> createEntity)
93 {
94 auto time = QSharedPointer<QTime>::create();
95 time->start();
96 //TODO find items to remove
97 int count = 0;
98 for (auto it = data.constBegin(); it != data.constEnd(); it++) {
99 count++;
100 const auto remoteId = it.key().toUtf8();
101 auto entity = createEntity(remoteId, it.value());
102 createOrModify(bufferType, remoteId, *entity);
103 }
104 Trace() << "Sync of " << count << " entities of type " << bufferType << " done." << Sink::Log::TraceTime(time->elapsed());
105 }
106
107 KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE
108 {
109 Log() << " Synchronizing with the source";
110 return KAsync::start<void>([this]() {
111 synchronize(ENTITY_TYPE_EVENT, DummyStore::instance().events(), [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) {
112 return createEvent(ridBuffer, data);
113 });
114 synchronize(ENTITY_TYPE_MAIL, DummyStore::instance().mails(), [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) {
115 return createMail(ridBuffer, data);
116 });
117 synchronize(ENTITY_TYPE_FOLDER, DummyStore::instance().folders(), [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data) {
118 return createFolder(ridBuffer, data);
119 });
120 Log() << "Done Synchronizing";
121 });
122 }
123
124};
125
46DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Sink::Pipeline> &pipeline) 126DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Sink::Pipeline> &pipeline)
47 : Sink::GenericResource(instanceIdentifier, pipeline), 127 : Sink::GenericResource(PLUGIN_NAME, instanceIdentifier, pipeline, QSharedPointer<Sink::NullChangeReplay>::create(), QSharedPointer<DummySynchronizer>::create(PLUGIN_NAME, instanceIdentifier)),
48 mEventAdaptorFactory(QSharedPointer<DummyEventAdaptorFactory>::create()), 128 mEventAdaptorFactory(QSharedPointer<DummyEventAdaptorFactory>::create()),
49 mMailAdaptorFactory(QSharedPointer<DummyMailAdaptorFactory>::create()), 129 mMailAdaptorFactory(QSharedPointer<DummyMailAdaptorFactory>::create()),
50 mFolderAdaptorFactory(QSharedPointer<DummyFolderAdaptorFactory>::create()) 130 mFolderAdaptorFactory(QSharedPointer<DummyFolderAdaptorFactory>::create())
@@ -57,80 +137,9 @@ DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QShared
57 QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Event>); 137 QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Event>);
58} 138}
59 139
60Sink::ApplicationDomain::Event::Ptr DummyResource::createEvent(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &transaction) 140DummyResource::~DummyResource()
61{
62 static uint8_t rawData[100];
63 auto event = Sink::ApplicationDomain::Event::Ptr::create();
64 event->setProperty("summary", data.value("summary").toString());
65 event->setProperty("remoteId", ridBuffer);
66 event->setProperty("description", data.value("description").toString());
67 event->setProperty("attachment", QByteArray::fromRawData(reinterpret_cast<const char*>(rawData), 100));
68 return event;
69}
70
71Sink::ApplicationDomain::Mail::Ptr DummyResource::createMail(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &transaction)
72{ 141{
73 auto mail = Sink::ApplicationDomain::Mail::Ptr::create();
74 mail->setProperty("subject", data.value("subject").toString());
75 mail->setProperty("senderEmail", data.value("senderEmail").toString());
76 mail->setProperty("senderName", data.value("senderName").toString());
77 mail->setProperty("date", data.value("date").toString());
78 mail->setProperty("folder", resolveRemoteId(ENTITY_TYPE_FOLDER, data.value("parentFolder").toByteArray(), transaction));
79 mail->setProperty("unread", data.value("unread").toBool());
80 mail->setProperty("important", data.value("important").toBool());
81 return mail;
82}
83 142
84Sink::ApplicationDomain::Folder::Ptr DummyResource::createFolder(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &transaction)
85{
86 auto folder = Sink::ApplicationDomain::Folder::Ptr::create();
87 folder->setProperty("name", data.value("name").toString());
88 folder->setProperty("icon", data.value("icon").toString());
89 if (!data.value("parent").toString().isEmpty()) {
90 auto sinkId = resolveRemoteId(ENTITY_TYPE_FOLDER, data.value("parent").toByteArray(), transaction);
91 folder->setProperty("parent", sinkId);
92 }
93 return folder;
94}
95
96void DummyResource::synchronize(const QByteArray &bufferType, const QMap<QString, QMap<QString, QVariant> > &data, Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<Sink::ApplicationDomain::ApplicationDomainType::Ptr(const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &)> createEntity)
97{
98 auto time = QSharedPointer<QTime>::create();
99 time->start();
100 //TODO find items to remove
101 int count = 0;
102 for (auto it = data.constBegin(); it != data.constEnd(); it++) {
103 count++;
104 const auto remoteId = it.key().toUtf8();
105 auto entity = createEntity(remoteId, it.value(), synchronizationTransaction);
106 createOrModify(transaction, synchronizationTransaction, adaptorFactory, bufferType, remoteId, *entity);
107 }
108 Trace() << "Sync of " << count << " entities of type " << bufferType << " done." << Sink::Log::TraceTime(time->elapsed());
109}
110
111KAsync::Job<void> DummyResource::synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore)
112{
113 Log() << " Synchronizing";
114 return KAsync::start<void>([this, &mainStore, &synchronizationStore]() {
115 auto transaction = mainStore.createTransaction(Sink::Storage::ReadOnly);
116 auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadWrite);
117 synchronize(ENTITY_TYPE_EVENT, DummyStore::instance().events(), transaction, synchronizationTransaction, *mEventAdaptorFactory, [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &synchronizationTransaction) {
118 return createEvent(ridBuffer, data, synchronizationTransaction);
119 });
120 synchronize(ENTITY_TYPE_MAIL, DummyStore::instance().mails(), transaction, synchronizationTransaction, *mMailAdaptorFactory, [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &synchronizationTransaction) {
121 return createMail(ridBuffer, data, synchronizationTransaction);
122 });
123 synchronize(ENTITY_TYPE_FOLDER, DummyStore::instance().folders(), transaction, synchronizationTransaction, *mFolderAdaptorFactory, [this](const QByteArray &ridBuffer, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &synchronizationTransaction) {
124 return createFolder(ridBuffer, data, synchronizationTransaction);
125 });
126 Log() << "Done Synchronizing";
127 });
128}
129
130KAsync::Job<void> DummyResource::replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value)
131{
132 Trace() << "Replaying " << key;
133 return KAsync::null<void>();
134} 143}
135 144
136void DummyResource::removeDataFromDisk() 145void DummyResource::removeDataFromDisk()
@@ -160,6 +169,7 @@ KAsync::Job<void> DummyResource::inspect(int inspectionType, const QByteArray &i
160 return KAsync::null<void>(); 169 return KAsync::null<void>();
161} 170}
162 171
172
163DummyResourceFactory::DummyResourceFactory(QObject *parent) 173DummyResourceFactory::DummyResourceFactory(QObject *parent)
164 : Sink::ResourceFactory(parent) 174 : Sink::ResourceFactory(parent)
165{ 175{
@@ -178,3 +188,10 @@ void DummyResourceFactory::registerFacades(Sink::FacadeFactory &factory)
178 factory.registerFacade<Sink::ApplicationDomain::Folder, DummyResourceFolderFacade>(PLUGIN_NAME); 188 factory.registerFacade<Sink::ApplicationDomain::Folder, DummyResourceFolderFacade>(PLUGIN_NAME);
179} 189}
180 190
191void DummyResourceFactory::registerAdaptorFactories(Sink::AdaptorFactoryRegistry &registry)
192{
193 registry.registerFactory<Sink::ApplicationDomain::Folder, DummyFolderAdaptorFactory>(PLUGIN_NAME);
194 registry.registerFactory<Sink::ApplicationDomain::Mail, DummyMailAdaptorFactory>(PLUGIN_NAME);
195 registry.registerFactory<Sink::ApplicationDomain::Event, DummyEventAdaptorFactory>(PLUGIN_NAME);
196}
197
diff --git a/examples/dummyresource/resourcefactory.h b/examples/dummyresource/resourcefactory.h
index 865f6e5..f73eb32 100644
--- a/examples/dummyresource/resourcefactory.h
+++ b/examples/dummyresource/resourcefactory.h
@@ -37,13 +37,12 @@ class DummyResource : public Sink::GenericResource
37{ 37{
38public: 38public:
39 DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Sink::Pipeline> &pipeline = QSharedPointer<Sink::Pipeline>()); 39 DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer<Sink::Pipeline> &pipeline = QSharedPointer<Sink::Pipeline>());
40 KAsync::Job<void> synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore) Q_DECL_OVERRIDE; 40 virtual ~DummyResource();
41 using GenericResource::synchronizeWithSource; 41
42 void removeDataFromDisk() Q_DECL_OVERRIDE; 42 void removeDataFromDisk() Q_DECL_OVERRIDE;
43 static void removeFromDisk(const QByteArray &instanceIdentifier); 43 static void removeFromDisk(const QByteArray &instanceIdentifier);
44 KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE; 44 KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE;
45private: 45private:
46 KAsync::Job<void> replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE;
47 Sink::ApplicationDomain::Event::Ptr createEvent(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); 46 Sink::ApplicationDomain::Event::Ptr createEvent(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &);
48 Sink::ApplicationDomain::Mail::Ptr createMail(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); 47 Sink::ApplicationDomain::Mail::Ptr createMail(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &);
49 Sink::ApplicationDomain::Folder::Ptr createFolder(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &); 48 Sink::ApplicationDomain::Folder::Ptr createFolder(const QByteArray &rid, const QMap<QString, QVariant> &data, Sink::Storage::Transaction &);
@@ -65,5 +64,6 @@ public:
65 64
66 Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; 65 Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE;
67 void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE; 66 void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE;
67 void registerAdaptorFactories(Sink::AdaptorFactoryRegistry &registry) Q_DECL_OVERRIDE;
68}; 68};
69 69
diff --git a/examples/maildirresource/maildirresource.h b/examples/maildirresource/maildirresource.h
index e58dc16..30c565c 100644
--- a/examples/maildirresource/maildirresource.h
+++ b/examples/maildirresource/maildirresource.h
@@ -74,5 +74,6 @@ public:
74 74
75 Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; 75 Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE;
76 void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE; 76 void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE;
77 void registerDomainTypeAdaptors(Sink::AdaptorFactory &factory) Q_DECL_OVERRIDE;
77}; 78};
78 79
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp
index 58df2da..d41f235 100644
--- a/tests/dummyresourcetest.cpp
+++ b/tests/dummyresourcetest.cpp
@@ -11,6 +11,7 @@
11#include "modelresult.h" 11#include "modelresult.h"
12#include "pipeline.h" 12#include "pipeline.h"
13#include "log.h" 13#include "log.h"
14#include "test.h"
14 15
15using namespace Sink; 16using namespace Sink;
16 17
@@ -28,6 +29,7 @@ class DummyResourceTest : public QObject
28private slots: 29private slots:
29 void initTestCase() 30 void initTestCase()
30 { 31 {
32 Sink::Test::initTest();
31 Sink::Log::setDebugOutputLevel(Sink::Log::Trace); 33 Sink::Log::setDebugOutputLevel(Sink::Log::Trace);
32 auto factory = Sink::ResourceFactory::load("org.kde.dummy"); 34 auto factory = Sink::ResourceFactory::load("org.kde.dummy");
33 QVERIFY(factory); 35 QVERIFY(factory);
diff --git a/tests/mailquerybenchmark.cpp b/tests/mailquerybenchmark.cpp
index 20ee63c..4e58899 100644
--- a/tests/mailquerybenchmark.cpp
+++ b/tests/mailquerybenchmark.cpp
@@ -33,6 +33,7 @@
33#include <common/pipeline.h> 33#include <common/pipeline.h>
34#include <common/index.h> 34#include <common/index.h>
35#include <common/indexupdater.h> 35#include <common/indexupdater.h>
36#include <common/adaptorfactoryregistry.h>
36 37
37#include "hawd/dataset.h" 38#include "hawd/dataset.h"
38#include "hawd/formatter.h" 39#include "hawd/formatter.h"
@@ -59,12 +60,11 @@ class MailQueryBenchmark : public QObject
59 TestResource::removeFromDisk(resourceIdentifier); 60 TestResource::removeFromDisk(resourceIdentifier);
60 61
61 auto pipeline = QSharedPointer<Sink::Pipeline>::create(resourceIdentifier); 62 auto pipeline = QSharedPointer<Sink::Pipeline>::create(resourceIdentifier);
63 pipeline->setResourceType("test");
62 64
63 auto mailFactory = QSharedPointer<TestMailAdaptorFactory>::create();
64 auto indexer = QSharedPointer<DefaultIndexUpdater<Sink::ApplicationDomain::Mail>>::create(); 65 auto indexer = QSharedPointer<DefaultIndexUpdater<Sink::ApplicationDomain::Mail>>::create();
65 66
66 pipeline->setPreprocessors("mail", QVector<Sink::Preprocessor *>() << indexer.data()); 67 pipeline->setPreprocessors("mail", QVector<Sink::Preprocessor *>() << indexer.data());
67 pipeline->setAdaptorFactory("mail", mailFactory);
68 68
69 auto domainTypeAdaptorFactory = QSharedPointer<TestMailAdaptorFactory>::create(); 69 auto domainTypeAdaptorFactory = QSharedPointer<TestMailAdaptorFactory>::create();
70 70
@@ -149,6 +149,7 @@ private slots:
149 void init() 149 void init()
150 { 150 {
151 resourceIdentifier = "org.kde.test.instance1"; 151 resourceIdentifier = "org.kde.test.instance1";
152 Sink::AdaptorFactoryRegistry::instance().registerFactory<Sink::ApplicationDomain::Mail, TestMailAdaptorFactory>("test");
152 } 153 }
153 154
154 void test50k() 155 void test50k()
diff --git a/tests/pipelinebenchmark.cpp b/tests/pipelinebenchmark.cpp
index 0133a6c..51481fd 100644
--- a/tests/pipelinebenchmark.cpp
+++ b/tests/pipelinebenchmark.cpp
@@ -33,6 +33,7 @@
33#include <common/pipeline.h> 33#include <common/pipeline.h>
34#include <common/index.h> 34#include <common/index.h>
35#include <common/indexupdater.h> 35#include <common/indexupdater.h>
36#include <common/adaptorfactoryregistry.h>
36 37
37#include "hawd/dataset.h" 38#include "hawd/dataset.h"
38#include "hawd/formatter.h" 39#include "hawd/formatter.h"
@@ -83,10 +84,8 @@ class PipelineBenchmark : public QObject
83 TestResource::removeFromDisk(resourceIdentifier); 84 TestResource::removeFromDisk(resourceIdentifier);
84 85
85 auto pipeline = QSharedPointer<Sink::Pipeline>::create(resourceIdentifier); 86 auto pipeline = QSharedPointer<Sink::Pipeline>::create(resourceIdentifier);
86
87 auto mailFactory = QSharedPointer<TestMailAdaptorFactory>::create();
88 pipeline->setPreprocessors("mail", preprocessors); 87 pipeline->setPreprocessors("mail", preprocessors);
89 pipeline->setAdaptorFactory("mail", mailFactory); 88 pipeline->setResourceType("test");
90 89
91 QTime time; 90 QTime time;
92 time.start(); 91 time.start();
@@ -131,6 +130,7 @@ private slots:
131 void init() 130 void init()
132 { 131 {
133 Sink::Log::setDebugOutputLevel(Sink::Log::Warning); 132 Sink::Log::setDebugOutputLevel(Sink::Log::Warning);
133 Sink::AdaptorFactoryRegistry::instance().registerFactory<Sink::ApplicationDomain::Mail, TestMailAdaptorFactory>("test");
134 resourceIdentifier = "org.kde.test.instance1"; 134 resourceIdentifier = "org.kde.test.instance1";
135 } 135 }
136 136
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp
index 9290050..6ea2041 100644
--- a/tests/pipelinetest.cpp
+++ b/tests/pipelinetest.cpp
@@ -19,6 +19,7 @@
19#include "log.h" 19#include "log.h"
20#include "domainadaptor.h" 20#include "domainadaptor.h"
21#include "definitions.h" 21#include "definitions.h"
22#include "adaptorfactoryregistry.h"
22 23
23static void removeFromDisk(const QString &name) 24static void removeFromDisk(const QString &name)
24{ 25{
@@ -185,6 +186,7 @@ private slots:
185 void initTestCase() 186 void initTestCase()
186 { 187 {
187 Sink::Log::setDebugOutputLevel(Sink::Log::Trace); 188 Sink::Log::setDebugOutputLevel(Sink::Log::Trace);
189 Sink::AdaptorFactoryRegistry::instance().registerFactory<Sink::ApplicationDomain::Event, TestEventAdaptorFactory>("test");
188 } 190 }
189 191
190 void init() 192 void init()
@@ -198,9 +200,7 @@ private slots:
198 auto command = createEntityCommand(createEvent(entityFbb)); 200 auto command = createEntityCommand(createEvent(entityFbb));
199 201
200 Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); 202 Sink::Pipeline pipeline("org.kde.pipelinetest.instance1");
201 203 pipeline.setResourceType("test");
202 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
203 pipeline.setAdaptorFactory("event", adaptorFactory);
204 204
205 pipeline.startTransaction(); 205 pipeline.startTransaction();
206 pipeline.newEntity(command.constData(), command.size()); 206 pipeline.newEntity(command.constData(), command.size());
@@ -216,9 +216,9 @@ private slots:
216 auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); 216 auto command = createEntityCommand(createEvent(entityFbb, "summary", "description"));
217 217
218 Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); 218 Sink::Pipeline pipeline("org.kde.pipelinetest.instance1");
219 pipeline.setResourceType("test");
219 220
220 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); 221 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
221 pipeline.setAdaptorFactory("event", adaptorFactory);
222 222
223 // Create the initial revision 223 // Create the initial revision
224 pipeline.startTransaction(); 224 pipeline.startTransaction();
@@ -265,9 +265,9 @@ private slots:
265 auto command = createEntityCommand(createEvent(entityFbb)); 265 auto command = createEntityCommand(createEvent(entityFbb));
266 266
267 Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); 267 Sink::Pipeline pipeline("org.kde.pipelinetest.instance1");
268 pipeline.setResourceType("test");
268 269
269 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); 270 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
270 pipeline.setAdaptorFactory("event", adaptorFactory);
271 271
272 // Create the initial revision 272 // Create the initial revision
273 pipeline.startTransaction(); 273 pipeline.startTransaction();
@@ -309,7 +309,7 @@ private slots:
309 flatbuffers::FlatBufferBuilder entityFbb; 309 flatbuffers::FlatBufferBuilder entityFbb;
310 auto command = createEntityCommand(createEvent(entityFbb)); 310 auto command = createEntityCommand(createEvent(entityFbb));
311 Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); 311 Sink::Pipeline pipeline("org.kde.pipelinetest.instance1");
312 pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create()); 312 pipeline.setResourceType("test");
313 313
314 // Create the initial revision 314 // Create the initial revision
315 pipeline.startTransaction(); 315 pipeline.startTransaction();
@@ -346,9 +346,9 @@ private slots:
346 TestProcessor testProcessor; 346 TestProcessor testProcessor;
347 347
348 Sink::Pipeline pipeline("org.kde.pipelinetest.instance1"); 348 Sink::Pipeline pipeline("org.kde.pipelinetest.instance1");
349 pipeline.setResourceType("test");
349 pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << &testProcessor); 350 pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << &testProcessor);
350 pipeline.startTransaction(); 351 pipeline.startTransaction();
351 pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create());
352 352
353 // Actual test 353 // Actual test
354 { 354 {
diff --git a/tests/querytest.cpp b/tests/querytest.cpp
index a654931..6d7746e 100644
--- a/tests/querytest.cpp
+++ b/tests/querytest.cpp
@@ -9,6 +9,7 @@
9#include "resourceconfig.h" 9#include "resourceconfig.h"
10#include "log.h" 10#include "log.h"
11#include "modelresult.h" 11#include "modelresult.h"
12#include "test.h"
12 13
13/** 14/**
14 * Test of the query system using the dummy resource. 15 * Test of the query system using the dummy resource.
@@ -21,6 +22,7 @@ class QueryTest : public QObject
21private slots: 22private slots:
22 void initTestCase() 23 void initTestCase()
23 { 24 {
25 Sink::Test::initTest();
24 Sink::Log::setDebugOutputLevel(Sink::Log::Trace); 26 Sink::Log::setDebugOutputLevel(Sink::Log::Trace);
25 auto factory = Sink::ResourceFactory::load("org.kde.dummy"); 27 auto factory = Sink::ResourceFactory::load("org.kde.dummy");
26 QVERIFY(factory); 28 QVERIFY(factory);
diff --git a/tests/testimplementations.h b/tests/testimplementations.h
index 688875d..197602c 100644
--- a/tests/testimplementations.h
+++ b/tests/testimplementations.h
@@ -107,7 +107,7 @@ public:
107class TestResource : public Sink::GenericResource 107class TestResource : public Sink::GenericResource
108{ 108{
109public: 109public:
110 TestResource(const QByteArray &instanceIdentifier, QSharedPointer<Sink::Pipeline> pipeline) : Sink::GenericResource(instanceIdentifier, pipeline) 110 TestResource(const QByteArray &instanceIdentifier, QSharedPointer<Sink::Pipeline> pipeline) : Sink::GenericResource("test", instanceIdentifier, pipeline, QSharedPointer<Sink::ChangeReplay>(), QSharedPointer<Sink::Synchronizer>())
111 { 111 {
112 } 112 }
113 113