summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-16 14:55:20 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-21 09:02:21 +0200
commit237b9ae4113e7a9f489632296941becb71afdb45 (patch)
tree01cde58f495944f01cad9d282391d4efd2897141 /common
parent95d11bf0be98a4e3c08502fe23417b800233ce14 (diff)
downloadsink-237b9ae4113e7a9f489632296941becb71afdb45.tar.gz
sink-237b9ae4113e7a9f489632296941becb71afdb45.zip
Refactor how the storage is used.
This is the initial refactoring to improve how we deal with the storage. It does a couple of things: * Rename Sink::Storage to Sink::Storage::DataStore to free up the Sink::Storage namespace * Introduce a Sink::ResourceContext to have a single object that can be passed around containing everything that is necessary to operate on a resource. This is a lot better than the multiple separate parameters that we used to pass around all over the place, while still allowing for dependency injection for tests. * Tie storage access together using the new EntityStore that directly works with ApplicationDomainTypes. This gives us a central place where main storage, indexes and buffer adaptors are tied together, which will also give us a place to implement external indexes, such as a fulltextindex using xapian. * Use ApplicationDomainTypes as the default way to pass around entities. Instead of using various ways to pass around entities (buffers, buffer adaptors, ApplicationDomainTypes), only use a single way. The old approach was confusing, and was only done as: * optimization; really shouldn't be necessary and otherwise I'm sure we can find better ways to optimize ApplicationDomainType itself. * a way to account for entities that have multiple buffers, a concept that I no longer deem relevant. While this commit does the bulk of the work to get there, the following commits will refactor more stuff to get things back to normal.
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/adaptorfactoryregistry.cpp12
-rw-r--r--common/adaptorfactoryregistry.h3
-rw-r--r--common/changereplay.cpp33
-rw-r--r--common/changereplay.h11
-rw-r--r--common/datastorequery.cpp62
-rw-r--r--common/datastorequery.h9
-rw-r--r--common/domain/applicationdomaintype.cpp2
-rw-r--r--common/domain/applicationdomaintype.h2
-rw-r--r--common/domain/event.cpp15
-rw-r--r--common/domain/event.h11
-rw-r--r--common/domain/folder.cpp17
-rw-r--r--common/domain/folder.h10
-rw-r--r--common/domain/mail.cpp52
-rw-r--r--common/domain/mail.h11
-rw-r--r--common/domainadaptor.h7
-rw-r--r--common/domaintypeadaptorfactoryinterface.h4
-rw-r--r--common/entityreader.cpp209
-rw-r--r--common/entityreader.h16
-rw-r--r--common/entitystore.cpp5
-rw-r--r--common/entitystore.h30
-rw-r--r--common/facade.cpp22
-rw-r--r--common/facade.h18
-rw-r--r--common/facadefactory.cpp3
-rw-r--r--common/facadefactory.h7
-rw-r--r--common/genericresource.cpp33
-rw-r--r--common/genericresource.h5
-rw-r--r--common/index.cpp12
-rw-r--r--common/index.h8
-rw-r--r--common/indexupdater.h16
-rw-r--r--common/listener.cpp6
-rw-r--r--common/mailpreprocessor.cpp10
-rw-r--r--common/mailpreprocessor.h10
-rw-r--r--common/messagequeue.cpp20
-rw-r--r--common/messagequeue.h4
-rw-r--r--common/pipeline.cpp99
-rw-r--r--common/pipeline.h44
-rw-r--r--common/queryrunner.cpp106
-rw-r--r--common/queryrunner.h9
-rw-r--r--common/remoteidmap.cpp6
-rw-r--r--common/remoteidmap.h4
-rw-r--r--common/resource.h3
-rw-r--r--common/resourcecontext.h77
-rw-r--r--common/sourcewriteback.cpp31
-rw-r--r--common/sourcewriteback.h13
-rw-r--r--common/specialpurposepreprocessor.cpp48
-rw-r--r--common/specialpurposepreprocessor.h8
-rw-r--r--common/storage.h52
-rw-r--r--common/storage/entitystore.cpp338
-rw-r--r--common/storage/entitystore.h109
-rw-r--r--common/storage_common.cpp50
-rw-r--r--common/storage_lmdb.cpp100
-rw-r--r--common/store.cpp2
-rw-r--r--common/synchronizer.cpp148
-rw-r--r--common/synchronizer.h31
-rw-r--r--common/test.cpp4
-rw-r--r--common/typeindex.cpp24
-rw-r--r--common/typeindex.h20
58 files changed, 1259 insertions, 763 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 84fe474..e1e7a51 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -76,6 +76,7 @@ set(command_SRCS
76 mailpreprocessor.cpp 76 mailpreprocessor.cpp
77 specialpurposepreprocessor.cpp 77 specialpurposepreprocessor.cpp
78 datastorequery.cpp 78 datastorequery.cpp
79 storage/entitystore.cpp
79 ${storage_SRCS}) 80 ${storage_SRCS})
80 81
81add_library(${PROJECT_NAME} SHARED ${command_SRCS}) 82add_library(${PROJECT_NAME} SHARED ${command_SRCS})
diff --git a/common/adaptorfactoryregistry.cpp b/common/adaptorfactoryregistry.cpp
index 323a02d..91b5a4c 100644
--- a/common/adaptorfactoryregistry.cpp
+++ b/common/adaptorfactoryregistry.cpp
@@ -61,8 +61,20 @@ std::shared_ptr<DomainTypeAdaptorFactoryInterface> AdaptorFactoryRegistry::getFa
61 return std::static_pointer_cast<DomainTypeAdaptorFactoryInterface>(ptr); 61 return std::static_pointer_cast<DomainTypeAdaptorFactoryInterface>(ptr);
62} 62}
63 63
64QMap<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> AdaptorFactoryRegistry::getFactories(const QByteArray &resource)
65{
66 QMap<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> map;
67 for (const auto &type : mTypes.values(resource)) {
68 auto f = getFactory(resource, type);
69 //Convert the std::shared_ptr to a QSharedPointer
70 map.insert(type, DomainTypeAdaptorFactoryInterface::Ptr(f.get(), [](DomainTypeAdaptorFactoryInterface *) {}));
71 }
72 return map;
73}
74
64void AdaptorFactoryRegistry::registerFactory(const QByteArray &resource, const std::shared_ptr<void> &instance, const QByteArray typeName) 75void AdaptorFactoryRegistry::registerFactory(const QByteArray &resource, const std::shared_ptr<void> &instance, const QByteArray typeName)
65{ 76{
77 mTypes.insert(resource, typeName);
66 mRegistry.insert(key(resource, typeName), instance); 78 mRegistry.insert(key(resource, typeName), instance);
67} 79}
68 80
diff --git a/common/adaptorfactoryregistry.h b/common/adaptorfactoryregistry.h
index f06120a..47f2612 100644
--- a/common/adaptorfactoryregistry.h
+++ b/common/adaptorfactoryregistry.h
@@ -54,11 +54,14 @@ public:
54 54
55 std::shared_ptr<DomainTypeAdaptorFactoryInterface> getFactory(const QByteArray &resource, const QByteArray &typeName); 55 std::shared_ptr<DomainTypeAdaptorFactoryInterface> getFactory(const QByteArray &resource, const QByteArray &typeName);
56 56
57 QMap<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> getFactories(const QByteArray &resource);
58
57private: 59private:
58 AdaptorFactoryRegistry(); 60 AdaptorFactoryRegistry();
59 void registerFactory(const QByteArray &resource, const std::shared_ptr<void> &instance, const QByteArray typeName); 61 void registerFactory(const QByteArray &resource, const std::shared_ptr<void> &instance, const QByteArray typeName);
60 62
61 QHash<QByteArray, std::shared_ptr<void>> mRegistry; 63 QHash<QByteArray, std::shared_ptr<void>> mRegistry;
64 QMultiHash<QByteArray, QByteArray> mTypes;
62 static QMutex sMutex; 65 static QMutex sMutex;
63}; 66};
64} 67}
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index e3b7158..6e58564 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -27,31 +27,32 @@
27#include <QTimer> 27#include <QTimer>
28 28
29using namespace Sink; 29using namespace Sink;
30using namespace Sink::Storage;
30 31
31SINK_DEBUG_AREA("changereplay"); 32SINK_DEBUG_AREA("changereplay");
32 33
33ChangeReplay::ChangeReplay(const QByteArray &resourceName) 34ChangeReplay::ChangeReplay(const ResourceContext &resourceContext)
34 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false) 35 : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false)
35{ 36{
36 SinkTrace() << "Created change replay: " << resourceName; 37 SinkTrace() << "Created change replay: " << resourceContext.instanceId();
37} 38}
38 39
39qint64 ChangeReplay::getLastReplayedRevision() 40qint64 ChangeReplay::getLastReplayedRevision()
40{ 41{
41 qint64 lastReplayedRevision = 0; 42 qint64 lastReplayedRevision = 0;
42 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); 43 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly);
43 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 44 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
44 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { 45 [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
45 lastReplayedRevision = value.toLongLong(); 46 lastReplayedRevision = value.toLongLong();
46 return false; 47 return false;
47 }, 48 },
48 [](const Storage::Error &) {}); 49 [](const DataStore::Error &) {});
49 return lastReplayedRevision; 50 return lastReplayedRevision;
50} 51}
51 52
52bool ChangeReplay::allChangesReplayed() 53bool ChangeReplay::allChangesReplayed()
53{ 54{
54 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 55 const qint64 topRevision = DataStore::maxRevision(mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) {
55 SinkWarning() << error.message; 56 SinkWarning() << error.message;
56 })); 57 }));
57 const qint64 lastReplayedRevision = getLastReplayedRevision(); 58 const qint64 lastReplayedRevision = getLastReplayedRevision();
@@ -61,7 +62,7 @@ bool ChangeReplay::allChangesReplayed()
61 62
62void ChangeReplay::recordReplayedRevision(qint64 revision) 63void ChangeReplay::recordReplayedRevision(qint64 revision)
63{ 64{
64 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 65 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadWrite, [](const Sink::Storage::DataStore::Error &error) {
65 SinkWarning() << error.message; 66 SinkWarning() << error.message;
66 }); 67 });
67 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); 68 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
@@ -74,10 +75,10 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
74 auto topRevision = QSharedPointer<qint64>::create(0); 75 auto topRevision = QSharedPointer<qint64>::create(0);
75 return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() { 76 return KAsync::syncStart<void>([this, lastReplayedRevision, topRevision]() {
76 mReplayInProgress = true; 77 mReplayInProgress = true;
77 mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 78 mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) {
78 SinkWarning() << error.message; 79 SinkWarning() << error.message;
79 }); 80 });
80 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 81 auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) {
81 SinkWarning() << error.message; 82 SinkWarning() << error.message;
82 }); 83 });
83 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 84 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
@@ -85,8 +86,8 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
85 *lastReplayedRevision = value.toLongLong(); 86 *lastReplayedRevision = value.toLongLong();
86 return false; 87 return false;
87 }, 88 },
88 [](const Storage::Error &) {}); 89 [](const DataStore::Error &) {});
89 *topRevision = Storage::maxRevision(mMainStoreTransaction); 90 *topRevision = DataStore::maxRevision(mMainStoreTransaction);
90 SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; 91 SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision;
91 }) 92 })
92 .then(KAsync::dowhile( 93 .then(KAsync::dowhile(
@@ -98,11 +99,11 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
98 qint64 revision = *lastReplayedRevision + 1; 99 qint64 revision = *lastReplayedRevision + 1;
99 KAsync::Job<void> replayJob = KAsync::null<void>(); 100 KAsync::Job<void> replayJob = KAsync::null<void>();
100 while (revision <= *topRevision) { 101 while (revision <= *topRevision) {
101 const auto uid = Storage::getUidFromRevision(mMainStoreTransaction, revision); 102 const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision);
102 const auto type = Storage::getTypeFromRevision(mMainStoreTransaction, revision); 103 const auto type = DataStore::getTypeFromRevision(mMainStoreTransaction, revision);
103 const auto key = Storage::assembleKey(uid, revision); 104 const auto key = DataStore::assembleKey(uid, revision);
104 bool exitLoop = false; 105 bool exitLoop = false;
105 Storage::mainDatabase(mMainStoreTransaction, type) 106 DataStore::mainDatabase(mMainStoreTransaction, type)
106 .scan(key, 107 .scan(key,
107 [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { 108 [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool {
108 SinkTrace() << "Replaying " << key; 109 SinkTrace() << "Replaying " << key;
@@ -123,7 +124,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
123 } 124 }
124 return false; 125 return false;
125 }, 126 },
126 [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); 127 [key](const DataStore::Error &) { SinkError() << "Failed to replay change " << key; });
127 if (exitLoop) { 128 if (exitLoop) {
128 break; 129 break;
129 } 130 }
diff --git a/common/changereplay.h b/common/changereplay.h
index 88d6ce3..e86c4f2 100644
--- a/common/changereplay.h
+++ b/common/changereplay.h
@@ -24,6 +24,7 @@
24#include <Async/Async> 24#include <Async/Async>
25 25
26#include "storage.h" 26#include "storage.h"
27#include "resourcecontext.h"
27 28
28namespace Sink { 29namespace Sink {
29 30
@@ -38,7 +39,7 @@ class SINK_EXPORT ChangeReplay : public QObject
38{ 39{
39 Q_OBJECT 40 Q_OBJECT
40public: 41public:
41 ChangeReplay(const QByteArray &resourceName); 42 ChangeReplay(const ResourceContext &resourceContext);
42 43
43 qint64 getLastReplayedRevision(); 44 qint64 getLastReplayedRevision();
44 bool allChangesReplayed(); 45 bool allChangesReplayed();
@@ -53,20 +54,20 @@ public slots:
53protected: 54protected:
54 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; 55 virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0;
55 virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; 56 virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0;
56 Sink::Storage mStorage; 57 Sink::Storage::DataStore mStorage;
57 58
58private: 59private:
59 void recordReplayedRevision(qint64 revision); 60 void recordReplayedRevision(qint64 revision);
60 KAsync::Job<void> replayNextRevision(); 61 KAsync::Job<void> replayNextRevision();
61 Sink::Storage mChangeReplayStore; 62 Sink::Storage::DataStore mChangeReplayStore;
62 bool mReplayInProgress; 63 bool mReplayInProgress;
63 Sink::Storage::Transaction mMainStoreTransaction; 64 Sink::Storage::DataStore::Transaction mMainStoreTransaction;
64}; 65};
65 66
66class NullChangeReplay : public ChangeReplay 67class NullChangeReplay : public ChangeReplay
67{ 68{
68public: 69public:
69 NullChangeReplay(const QByteArray &resourceName) : ChangeReplay(resourceName) {} 70 NullChangeReplay(const ResourceContext &resourceContext) : ChangeReplay(resourceContext) {}
70 KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return KAsync::null<void>(); } 71 KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return KAsync::null<void>(); }
71 bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return false; } 72 bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return false; }
72}; 73};
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 7b7d3a3..d4a83b1 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -28,6 +28,7 @@
28#include "event.h" 28#include "event.h"
29 29
30using namespace Sink; 30using namespace Sink;
31using namespace Sink::Storage;
31 32
32 33
33SINK_DEBUG_AREA("datastorequery") 34SINK_DEBUG_AREA("datastorequery")
@@ -299,42 +300,18 @@ public:
299 } 300 }
300}; 301};
301 302
302DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty) 303DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, EntityStore::Ptr store, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty)
303 : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) 304 : mQuery(query), mType(type), mTypeIndex(typeIndex), mGetProperty(getProperty), mStore(store)
304{ 305{
305 setupQuery(); 306 setupQuery();
306} 307}
307 308
308static inline QVector<QByteArray> fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
309{
310 // TODO use a result set with an iterator, to read values on demand
311 SinkTrace() << "Looking for : " << bufferType;
312 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
313 QSet<QByteArray> keys;
314 Storage::mainDatabase(transaction, bufferType)
315 .scan(QByteArray(),
316 [&](const QByteArray &key, const QByteArray &value) -> bool {
317 if (keys.contains(Sink::Storage::uidFromKey(key))) {
318 //Not something that should persist if the replay works, so we keep a message for now.
319 SinkTrace() << "Multiple revisions for key: " << key;
320 }
321 keys << Sink::Storage::uidFromKey(key);
322 return true;
323 },
324 [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; });
325
326 SinkTrace() << "Full scan retrieved " << keys.size() << " results.";
327 return keys.toList().toVector();
328}
329
330void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) 309void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback)
331{ 310{
332 mDb.findLatest(key, 311 mStore->readLatest(mType, key, [=](const QByteArray &key, const Sink::EntityBuffer &buffer) {
333 [=](const QByteArray &key, const QByteArray &value) -> bool { 312 resultCallback(DataStore::uidFromKey(key), buffer);
334 resultCallback(Sink::Storage::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size()));
335 return false; 313 return false;
336 }, 314 });
337 [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; });
338} 315}
339 316
340QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property) 317QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property)
@@ -344,7 +321,7 @@ QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArra
344 321
345QVector<QByteArray> DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value) 322QVector<QByteArray> DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value)
346{ 323{
347 return mTypeIndex.lookup(property, value, mTransaction); 324 return mStore->indexLookup(mType, property, value);
348} 325}
349 326
350/* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */ 327/* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */
@@ -444,7 +421,7 @@ QSharedPointer<DataStoreQuery> prepareQuery(const QByteArray &type, Args && ...
444QByteArrayList DataStoreQuery::executeSubquery(const Query &subquery) 421QByteArrayList DataStoreQuery::executeSubquery(const Query &subquery)
445{ 422{
446 Q_ASSERT(!subquery.type.isEmpty()); 423 Q_ASSERT(!subquery.type.isEmpty());
447 auto sub = prepareQuery(subquery.type, subquery, mTransaction); 424 auto sub = prepareQuery(subquery.type, subquery, mStore);
448 auto result = sub->execute(); 425 auto result = sub->execute();
449 QByteArrayList ids; 426 QByteArrayList ids;
450 while (result.next([&ids](const ResultSet::Result &result) { 427 while (result.next([&ids](const ResultSet::Result &result) {
@@ -476,13 +453,13 @@ void DataStoreQuery::setupQuery()
476 } else { 453 } else {
477 QSet<QByteArray> appliedFilters; 454 QSet<QByteArray> appliedFilters;
478 455
479 auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction); 456 auto resultSet = mStore->indexLookup(mType, mQuery, appliedFilters, appliedSorting);
480 remainingFilters = remainingFilters - appliedFilters; 457 remainingFilters = remainingFilters - appliedFilters;
481 458
482 // We do a full scan if there were no indexes available to create the initial set. 459 // We do a full scan if there were no indexes available to create the initial set.
483 if (appliedFilters.isEmpty()) { 460 if (appliedFilters.isEmpty()) {
484 // TODO this should be replaced by an index lookup on the uid index 461 // TODO this should be replaced by an index lookup on the uid index
485 mSource = Source::Ptr::create(fullScan(mTransaction, mType), this); 462 mSource = Source::Ptr::create(mStore->fullScan(mType), this);
486 } else { 463 } else {
487 mSource = Source::Ptr::create(resultSet, this); 464 mSource = Source::Ptr::create(resultSet, this);
488 } 465 }
@@ -523,26 +500,11 @@ void DataStoreQuery::setupQuery()
523 500
524QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision) 501QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision)
525{ 502{
526 const auto bufferType = mType;
527 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); 503 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
528 QVector<QByteArray> changedKeys; 504 QVector<QByteArray> changedKeys;
529 const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); 505 mStore->readRevisions(baseRevision, mType, [&](const QByteArray &key) {
530 // Spit out the revision keys one by one.
531 while (*revisionCounter <= topRevision) {
532 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
533 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
534 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
535 Q_ASSERT(!uid.isEmpty());
536 Q_ASSERT(!type.isEmpty());
537 if (type != bufferType) {
538 // Skip revision
539 *revisionCounter += 1;
540 continue;
541 }
542 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
543 *revisionCounter += 1;
544 changedKeys << key; 506 changedKeys << key;
545 } 507 });
546 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; 508 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter;
547 return changedKeys; 509 return changedKeys;
548} 510}
diff --git a/common/datastorequery.h b/common/datastorequery.h
index 164d721..4cf25b2 100644
--- a/common/datastorequery.h
+++ b/common/datastorequery.h
@@ -25,6 +25,7 @@
25#include "query.h" 25#include "query.h"
26#include "entitybuffer.h" 26#include "entitybuffer.h"
27#include "log.h" 27#include "log.h"
28#include "storage/entitystore.h"
28 29
29 30
30class Source; 31class Source;
@@ -35,11 +36,11 @@ class DataStoreQuery {
35public: 36public:
36 typedef QSharedPointer<DataStoreQuery> Ptr; 37 typedef QSharedPointer<DataStoreQuery> Ptr;
37 38
38 DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty); 39 DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::EntityStore::Ptr store, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty);
39 ResultSet execute(); 40 ResultSet execute();
40 ResultSet update(qint64 baseRevision); 41 ResultSet update(qint64 baseRevision);
41 42
42protected: 43private:
43 44
44 typedef std::function<bool(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> FilterFunction; 45 typedef std::function<bool(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> FilterFunction;
45 typedef std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> BufferCallback; 46 typedef std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> BufferCallback;
@@ -56,15 +57,15 @@ protected:
56 QByteArrayList executeSubquery(const Sink::Query &subquery); 57 QByteArrayList executeSubquery(const Sink::Query &subquery);
57 58
58 Sink::Query mQuery; 59 Sink::Query mQuery;
59 Sink::Storage::Transaction &mTransaction;
60 const QByteArray mType; 60 const QByteArray mType;
61 TypeIndex &mTypeIndex; 61 TypeIndex &mTypeIndex;
62 Sink::Storage::NamedDatabase mDb;
63 std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> mGetProperty; 62 std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> mGetProperty;
64 bool mInitialQuery; 63 bool mInitialQuery;
65 QSharedPointer<FilterBase> mCollector; 64 QSharedPointer<FilterBase> mCollector;
66 QSharedPointer<Source> mSource; 65 QSharedPointer<Source> mSource;
67 66
67 QSharedPointer<Sink::Storage::EntityStore> mStore;
68
68 SINK_DEBUG_COMPONENT(mType) 69 SINK_DEBUG_COMPONENT(mType)
69}; 70};
70 71
diff --git a/common/domain/applicationdomaintype.cpp b/common/domain/applicationdomaintype.cpp
index 2a0d977..3109966 100644
--- a/common/domain/applicationdomaintype.cpp
+++ b/common/domain/applicationdomaintype.cpp
@@ -73,7 +73,7 @@ ApplicationDomainType::~ApplicationDomainType()
73 73
74QByteArray ApplicationDomainType::generateUid() 74QByteArray ApplicationDomainType::generateUid()
75{ 75{
76 return Sink::Storage::generateUid(); 76 return Sink::Storage::DataStore::generateUid();
77} 77}
78 78
79bool ApplicationDomainType::hasProperty(const QByteArray &key) const 79bool ApplicationDomainType::hasProperty(const QByteArray &key) const
diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h
index e581e07..39ce2b9 100644
--- a/common/domain/applicationdomaintype.h
+++ b/common/domain/applicationdomaintype.h
@@ -241,6 +241,8 @@ struct SINK_EXPORT SinkResource : public ApplicationDomainType {
241struct SINK_EXPORT Entity : public ApplicationDomainType { 241struct SINK_EXPORT Entity : public ApplicationDomainType {
242 typedef QSharedPointer<Entity> Ptr; 242 typedef QSharedPointer<Entity> Ptr;
243 using ApplicationDomainType::ApplicationDomainType; 243 using ApplicationDomainType::ApplicationDomainType;
244 Entity() = default;
245 Entity(const ApplicationDomainType &other) : ApplicationDomainType(other) {}
244 virtual ~Entity(); 246 virtual ~Entity();
245}; 247};
246 248
diff --git a/common/domain/event.cpp b/common/domain/event.cpp
index f3abd62..d801592 100644
--- a/common/domain/event.cpp
+++ b/common/domain/event.cpp
@@ -42,23 +42,28 @@ static QMutex sMutex;
42 42
43using namespace Sink::ApplicationDomain; 43using namespace Sink::ApplicationDomain;
44 44
45void TypeImplementation<Event>::configureIndex(TypeIndex &index)
46{
47 index.addProperty<QByteArray>(Event::Uid::name);
48}
49
45static TypeIndex &getIndex() 50static TypeIndex &getIndex()
46{ 51{
47 QMutexLocker locker(&sMutex); 52 QMutexLocker locker(&sMutex);
48 static TypeIndex *index = 0; 53 static TypeIndex *index = 0;
49 if (!index) { 54 if (!index) {
50 index = new TypeIndex("event"); 55 index = new TypeIndex("event");
51 index->addProperty<QByteArray>("uid"); 56 TypeImplementation<Event>::configureIndex(*index);
52 } 57 }
53 return *index; 58 return *index;
54} 59}
55 60
56void TypeImplementation<Event>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) 61void TypeImplementation<Event>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction)
57{ 62{
58 return getIndex().add(identifier, bufferAdaptor, transaction); 63 return getIndex().add(identifier, bufferAdaptor, transaction);
59} 64}
60 65
61void TypeImplementation<Event>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) 66void TypeImplementation<Event>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction)
62{ 67{
63 return getIndex().remove(identifier, bufferAdaptor, transaction); 68 return getIndex().remove(identifier, bufferAdaptor, transaction);
64} 69}
@@ -83,10 +88,10 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Event>::BufferBuilder> > T
83 return propertyMapper; 88 return propertyMapper;
84} 89}
85 90
86DataStoreQuery::Ptr TypeImplementation<Event>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) 91DataStoreQuery::Ptr TypeImplementation<Event>::prepareQuery(const Sink::Query &query, Sink::Storage::EntityStore::Ptr store)
87{ 92{
88 auto mapper = initializeReadPropertyMapper(); 93 auto mapper = initializeReadPropertyMapper();
89 return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName<Event>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { 94 return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName<Event>(), store, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) {
90 95
91 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local()); 96 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local());
92 return mapper->getProperty(property, localBuffer); 97 return mapper->getProperty(property, localBuffer);
diff --git a/common/domain/event.h b/common/domain/event.h
index 684b58e..ce9691d 100644
--- a/common/domain/event.h
+++ b/common/domain/event.h
@@ -21,6 +21,7 @@
21#include "applicationdomaintype.h" 21#include "applicationdomaintype.h"
22 22
23#include "storage.h" 23#include "storage.h"
24#include "storage/entitystore.h"
24 25
25class ResultSet; 26class ResultSet;
26class QByteArray; 27class QByteArray;
@@ -32,6 +33,8 @@ class WritePropertyMapper;
32 33
33class DataStoreQuery; 34class DataStoreQuery;
34 35
36class TypeIndex;
37
35namespace Sink { 38namespace Sink {
36 class Query; 39 class Query;
37 40
@@ -51,10 +54,12 @@ class TypeImplementation<Sink::ApplicationDomain::Event> {
51public: 54public:
52 typedef Sink::ApplicationDomain::Buffer::Event Buffer; 55 typedef Sink::ApplicationDomain::Buffer::Event Buffer;
53 typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; 56 typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder;
57 static void configureIndex(TypeIndex &index);
54 static QSet<QByteArray> indexedProperties(); 58 static QSet<QByteArray> indexedProperties();
55 static QSharedPointer<DataStoreQuery> prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); 59 static QSharedPointer<DataStoreQuery> prepareQuery(const Sink::Query &query, Sink::Storage::EntityStore::Ptr store);
56 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 60
57 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 61 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction);
62 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction);
58 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); 63 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
59 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); 64 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper();
60}; 65};
diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp
index 824fa0b..f04a3e7 100644
--- a/common/domain/folder.cpp
+++ b/common/domain/folder.cpp
@@ -44,25 +44,30 @@ static QMutex sMutex;
44 44
45using namespace Sink::ApplicationDomain; 45using namespace Sink::ApplicationDomain;
46 46
47void TypeImplementation<Folder>::configureIndex(TypeIndex &index)
48{
49 index.addProperty<QByteArray>(Folder::Parent::name);
50 index.addProperty<QString>(Folder::Name::name);
51}
52
47static TypeIndex &getIndex() 53static TypeIndex &getIndex()
48{ 54{
49 QMutexLocker locker(&sMutex); 55 QMutexLocker locker(&sMutex);
50 static TypeIndex *index = 0; 56 static TypeIndex *index = 0;
51 if (!index) { 57 if (!index) {
52 index = new TypeIndex("folder"); 58 index = new TypeIndex("folder");
53 index->addProperty<QByteArray>("parent"); 59 TypeImplementation<Folder>::configureIndex(*index);
54 index->addProperty<QString>("name");
55 } 60 }
56 return *index; 61 return *index;
57} 62}
58 63
59void TypeImplementation<Folder>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) 64void TypeImplementation<Folder>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction)
60{ 65{
61 SinkTrace() << "Indexing " << identifier; 66 SinkTrace() << "Indexing " << identifier;
62 getIndex().add(identifier, bufferAdaptor, transaction); 67 getIndex().add(identifier, bufferAdaptor, transaction);
63} 68}
64 69
65void TypeImplementation<Folder>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) 70void TypeImplementation<Folder>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction)
66{ 71{
67 getIndex().remove(identifier, bufferAdaptor, transaction); 72 getIndex().remove(identifier, bufferAdaptor, transaction);
68} 73}
@@ -87,10 +92,10 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Folder>::BufferBuilder> >
87 return propertyMapper; 92 return propertyMapper;
88} 93}
89 94
90DataStoreQuery::Ptr TypeImplementation<Folder>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) 95DataStoreQuery::Ptr TypeImplementation<Folder>::prepareQuery(const Sink::Query &query, Sink::Storage::EntityStore::Ptr store)
91{ 96{
92 auto mapper = initializeReadPropertyMapper(); 97 auto mapper = initializeReadPropertyMapper();
93 return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName<Folder>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { 98 return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName<Folder>(), store, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) {
94 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local()); 99 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local());
95 return mapper->getProperty(property, localBuffer); 100 return mapper->getProperty(property, localBuffer);
96 }); 101 });
diff --git a/common/domain/folder.h b/common/domain/folder.h
index e4631de..0a52b01 100644
--- a/common/domain/folder.h
+++ b/common/domain/folder.h
@@ -21,6 +21,7 @@
21#include "applicationdomaintype.h" 21#include "applicationdomaintype.h"
22 22
23#include "storage.h" 23#include "storage.h"
24#include "storage/entitystore.h"
24 25
25class ResultSet; 26class ResultSet;
26class QByteArray; 27class QByteArray;
@@ -31,6 +32,8 @@ class ReadPropertyMapper;
31template<typename T> 32template<typename T>
32class WritePropertyMapper; 33class WritePropertyMapper;
33 34
35class TypeIndex;
36
34namespace Sink { 37namespace Sink {
35 class Query; 38 class Query;
36 39
@@ -45,10 +48,11 @@ class TypeImplementation<Sink::ApplicationDomain::Folder> {
45public: 48public:
46 typedef Sink::ApplicationDomain::Buffer::Folder Buffer; 49 typedef Sink::ApplicationDomain::Buffer::Folder Buffer;
47 typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; 50 typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder;
48 static QSharedPointer<DataStoreQuery> prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); 51 static void configureIndex(TypeIndex &index);
52 static QSharedPointer<DataStoreQuery> prepareQuery(const Sink::Query &query, Sink::Storage::EntityStore::Ptr store);
49 static QSet<QByteArray> indexedProperties(); 53 static QSet<QByteArray> indexedProperties();
50 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 54 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction);
51 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 55 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction);
52 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); 56 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
53 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); 57 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper();
54}; 58};
diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp
index 2b6eb84..1b46e28 100644
--- a/common/domain/mail.cpp
+++ b/common/domain/mail.cpp
@@ -45,25 +45,31 @@ static QMutex sMutex;
45using namespace Sink; 45using namespace Sink;
46using namespace Sink::ApplicationDomain; 46using namespace Sink::ApplicationDomain;
47 47
48void TypeImplementation<Mail>::configureIndex(TypeIndex &index)
49{
50 index.addProperty<QByteArray>(Mail::Uid::name);
51 index.addProperty<QByteArray>(Mail::Sender::name);
52 index.addProperty<QByteArray>(Mail::SenderName::name);
53 /* index->addProperty<QString>(Mail::Subject::name); */
54 /* index->addFulltextProperty<QString>(Mail::Subject::name); */
55 index.addProperty<QDateTime>(Mail::Date::name);
56 index.addProperty<QByteArray>(Mail::Folder::name);
57 index.addPropertyWithSorting<QByteArray, QDateTime>(Mail::Folder::name, Mail::Date::name);
58 index.addProperty<QByteArray>(Mail::MessageId::name);
59 index.addProperty<QByteArray>(Mail::ParentMessageId::name);
60
61 index.addProperty<Mail::MessageId>();
62 index.addSecondaryProperty<Mail::MessageId, Mail::ThreadId>();
63 index.addSecondaryProperty<Mail::ThreadId, Mail::MessageId>();
64}
65
48static TypeIndex &getIndex() 66static TypeIndex &getIndex()
49{ 67{
50 QMutexLocker locker(&sMutex); 68 QMutexLocker locker(&sMutex);
51 static TypeIndex *index = 0; 69 static TypeIndex *index = 0;
52 if (!index) { 70 if (!index) {
53 index = new TypeIndex("mail"); 71 index = new TypeIndex("mail");
54 index->addProperty<QByteArray>(Mail::Uid::name); 72 TypeImplementation<Mail>::configureIndex(*index);
55 index->addProperty<QByteArray>(Mail::Sender::name);
56 index->addProperty<QByteArray>(Mail::SenderName::name);
57 index->addProperty<QString>(Mail::Subject::name);
58 index->addProperty<QDateTime>(Mail::Date::name);
59 index->addProperty<QByteArray>(Mail::Folder::name);
60 index->addPropertyWithSorting<QByteArray, QDateTime>(Mail::Folder::name, Mail::Date::name);
61 index->addProperty<QByteArray>(Mail::MessageId::name);
62 index->addProperty<QByteArray>(Mail::ParentMessageId::name);
63
64 index->addProperty<Mail::MessageId>();
65 index->addSecondaryProperty<Mail::MessageId, Mail::ThreadId>();
66 index->addSecondaryProperty<Mail::ThreadId, Mail::MessageId>();
67 } 73 }
68 return *index; 74 return *index;
69} 75}
@@ -122,7 +128,7 @@ static QString stripOffPrefixes(const QString &subject)
122} 128}
123 129
124 130
125static void updateThreadingIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) 131static void updateThreadingIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction)
126{ 132{
127 auto messageId = bufferAdaptor.getProperty(Mail::MessageId::name); 133 auto messageId = bufferAdaptor.getProperty(Mail::MessageId::name);
128 auto parentMessageId = bufferAdaptor.getProperty(Mail::ParentMessageId::name); 134 auto parentMessageId = bufferAdaptor.getProperty(Mail::ParentMessageId::name);
@@ -164,16 +170,17 @@ static void updateThreadingIndex(const QByteArray &identifier, const BufferAdapt
164 } 170 }
165} 171}
166 172
167void TypeImplementation<Mail>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) 173void TypeImplementation<Mail>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction)
168{ 174{
169 SinkTrace() << "Indexing " << identifier; 175 SinkTrace() << "Indexing " << identifier;
170 getIndex().add(identifier, bufferAdaptor, transaction); 176 getIndex().add(identifier, bufferAdaptor, transaction);
171 updateThreadingIndex(identifier, bufferAdaptor, transaction); 177 updateThreadingIndex(identifier, bufferAdaptor, transaction);
172} 178}
173 179
174void TypeImplementation<Mail>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) 180void TypeImplementation<Mail>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction)
175{ 181{
176 getIndex().remove(identifier, bufferAdaptor, transaction); 182 getIndex().remove(identifier, bufferAdaptor, transaction);
183 //TODO cleanup threading index
177} 184}
178 185
179QSharedPointer<ReadPropertyMapper<TypeImplementation<Mail>::Buffer> > TypeImplementation<Mail>::initializeReadPropertyMapper() 186QSharedPointer<ReadPropertyMapper<TypeImplementation<Mail>::Buffer> > TypeImplementation<Mail>::initializeReadPropertyMapper()
@@ -218,18 +225,21 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Mail>::BufferBuilder> > Ty
218} 225}
219 226
220 227
221DataStoreQuery::Ptr TypeImplementation<Mail>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) 228DataStoreQuery::Ptr TypeImplementation<Mail>::prepareQuery(const Sink::Query &query, Sink::Storage::EntityStore::Ptr store)
222{ 229{
223 auto mapper = initializeReadPropertyMapper(); 230 auto mapper = initializeReadPropertyMapper();
224 return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName<Mail>(), transaction, getIndex(), [mapper, &transaction](const Sink::Entity &entity, const QByteArray &property) -> QVariant { 231 return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName<Mail>(), store, getIndex(), [mapper, store](const Sink::Entity &entity, const QByteArray &property) -> QVariant {
225 if (property == Mail::ThreadId::name) { 232 if (property == Mail::ThreadId::name) {
226 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local()); 233 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local());
227 Q_ASSERT(localBuffer); 234 Q_ASSERT(localBuffer);
228 auto messageId = mapper->getProperty(Mail::MessageId::name, localBuffer); 235 auto messageId = mapper->getProperty(Mail::MessageId::name, localBuffer);
236 //FIXME
229 //This is an index property that we have too lookup 237 //This is an index property that we have too lookup
230 auto thread = getIndex().secondaryLookup<Mail::MessageId, Mail::ThreadId>(messageId, transaction); 238 /* auto thread = getIndex().secondaryLookup<Mail::MessageId, Mail::ThreadId>(messageId); */
231 Q_ASSERT(!thread.isEmpty()); 239 /* auto thread = store->secondaryLookup<Mail::MessageId, Mail::ThreadId>(messageId); */
232 return thread.first(); 240 /* Q_ASSERT(!thread.isEmpty()); */
241 /* return thread.first(); */
242 return QVariant();
233 } else { 243 } else {
234 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local()); 244 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local());
235 Q_ASSERT(localBuffer); 245 Q_ASSERT(localBuffer);
diff --git a/common/domain/mail.h b/common/domain/mail.h
index ea3ef9e..6c1f670 100644
--- a/common/domain/mail.h
+++ b/common/domain/mail.h
@@ -21,7 +21,7 @@
21#include "applicationdomaintype.h" 21#include "applicationdomaintype.h"
22 22
23#include "storage.h" 23#include "storage.h"
24#include "datastorequery.h" 24#include "storage/entitystore.h"
25 25
26class ResultSet; 26class ResultSet;
27class QByteArray; 27class QByteArray;
@@ -32,6 +32,8 @@ class ReadPropertyMapper;
32template<typename T> 32template<typename T>
33class WritePropertyMapper; 33class WritePropertyMapper;
34 34
35class TypeIndex;
36
35namespace Sink { 37namespace Sink {
36 class Query; 38 class Query;
37 39
@@ -46,10 +48,11 @@ class TypeImplementation<Sink::ApplicationDomain::Mail> {
46public: 48public:
47 typedef Sink::ApplicationDomain::Buffer::Mail Buffer; 49 typedef Sink::ApplicationDomain::Buffer::Mail Buffer;
48 typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; 50 typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder;
49 static QSharedPointer<DataStoreQuery> prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); 51 static void configureIndex(TypeIndex &index);
52 static QSharedPointer<DataStoreQuery> prepareQuery(const Sink::Query &query, Sink::Storage::EntityStore::Ptr storage);
50 static QSet<QByteArray> indexedProperties(); 53 static QSet<QByteArray> indexedProperties();
51 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 54 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction);
52 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 55 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction);
53 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); 56 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
54 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); 57 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper();
55}; 58};
diff --git a/common/domainadaptor.h b/common/domainadaptor.h
index 16fc8c2..6a9d755 100644
--- a/common/domainadaptor.h
+++ b/common/domainadaptor.h
@@ -164,7 +164,7 @@ public:
164 return adaptor; 164 return adaptor;
165 } 165 }
166 166
167 virtual void 167 virtual bool
168 createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainObject, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) Q_DECL_OVERRIDE 168 createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainObject, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) Q_DECL_OVERRIDE
169 { 169 {
170 flatbuffers::FlatBufferBuilder localFbb; 170 flatbuffers::FlatBufferBuilder localFbb;
@@ -180,15 +180,16 @@ public:
180 } 180 }
181 181
182 Sink::EntityBuffer::assembleEntityBuffer(fbb, metadataData, metadataSize, resFbb.GetBufferPointer(), resFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize()); 182 Sink::EntityBuffer::assembleEntityBuffer(fbb, metadataData, metadataSize, resFbb.GetBufferPointer(), resFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize());
183 return true;
183 } 184 }
184 185
185 virtual void createBuffer(const QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> &bufferAdaptor, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) Q_DECL_OVERRIDE 186 virtual bool createBuffer(const QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> &bufferAdaptor, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) Q_DECL_OVERRIDE
186 { 187 {
187 //TODO rewrite the unterlying functions so we don't have to wrap the bufferAdaptor 188 //TODO rewrite the unterlying functions so we don't have to wrap the bufferAdaptor
188 auto newObject = Sink::ApplicationDomain::ApplicationDomainType("", "", 0, bufferAdaptor); 189 auto newObject = Sink::ApplicationDomain::ApplicationDomainType("", "", 0, bufferAdaptor);
189 //Serialize all properties 190 //Serialize all properties
190 newObject.setChangedProperties(bufferAdaptor->availableProperties().toSet()); 191 newObject.setChangedProperties(bufferAdaptor->availableProperties().toSet());
191 createBuffer(newObject, fbb, metadataData, metadataSize); 192 return createBuffer(newObject, fbb, metadataData, metadataSize);
192 } 193 }
193 194
194 195
diff --git a/common/domaintypeadaptorfactoryinterface.h b/common/domaintypeadaptorfactoryinterface.h
index b498796..8829c87 100644
--- a/common/domaintypeadaptorfactoryinterface.h
+++ b/common/domaintypeadaptorfactoryinterface.h
@@ -44,7 +44,7 @@ public:
44 * 44 *
45 * Note that this only serialized parameters that are part of ApplicationDomainType::changedProperties() 45 * Note that this only serialized parameters that are part of ApplicationDomainType::changedProperties()
46 */ 46 */
47 virtual void 47 virtual bool
48 createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; 48 createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0;
49 virtual void createBuffer(const QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> &bufferAdaptor, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; 49 virtual bool createBuffer(const QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> &bufferAdaptor, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0;
50}; 50};
diff --git a/common/entityreader.cpp b/common/entityreader.cpp
index cca1511..c49d1f7 100644
--- a/common/entityreader.cpp
+++ b/common/entityreader.cpp
@@ -28,75 +28,82 @@ SINK_DEBUG_AREA("entityreader")
28 28
29using namespace Sink; 29using namespace Sink;
30 30
31QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) 31/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */
32{ 32/* { */
33 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 33/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */
34 db.findLatest(uid, 34/* db.findLatest(uid, */
35 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { 35/* [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */
36 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 36/* Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); */
37 if (!buffer.isValid()) { 37/* if (!buffer.isValid()) { */
38 SinkWarning() << "Read invalid buffer from disk"; 38/* SinkWarning() << "Read invalid buffer from disk"; */
39 } else { 39/* } else { */
40 SinkTrace() << "Found value " << key; 40/* SinkTrace() << "Found value " << key; */
41 current = adaptorFactory.createAdaptor(buffer.entity()); 41/* current = adaptorFactory.createAdaptor(buffer.entity()); */
42 retrievedRevision = Sink::Storage::revisionFromKey(key); 42/* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */
43 } 43/* } */
44 return false; 44/* return false; */
45 }, 45/* }, */
46 [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); 46/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */
47 return current; 47/* return current; */
48} 48/* } */
49 49
50QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) 50/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */
51{ 51/* { */
52 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 52/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */
53 db.scan(key, 53/* db.scan(key, */
54 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { 54/* [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */
55 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 55/* Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); */
56 if (!buffer.isValid()) { 56/* if (!buffer.isValid()) { */
57 SinkWarning() << "Read invalid buffer from disk"; 57/* SinkWarning() << "Read invalid buffer from disk"; */
58 } else { 58/* } else { */
59 current = adaptorFactory.createAdaptor(buffer.entity()); 59/* current = adaptorFactory.createAdaptor(buffer.entity()); */
60 retrievedRevision = Sink::Storage::revisionFromKey(key); 60/* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */
61 } 61/* } */
62 return false; 62/* return false; */
63 }, 63/* }, */
64 [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); 64/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */
65 return current; 65/* return current; */
66} 66/* } */
67 67
68QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) 68/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPrevious(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */
69{ 69/* { */
70 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 70/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */
71 qint64 latestRevision = 0; 71/* qint64 latestRevision = 0; */
72 db.scan(uid, 72/* db.scan(uid, */
73 [&current, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { 73/* [&current, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { */
74 auto foundRevision = Sink::Storage::revisionFromKey(key); 74/* auto foundRevision = Sink::Storage::DataStore::revisionFromKey(key); */
75 if (foundRevision < revision && foundRevision > latestRevision) { 75/* if (foundRevision < revision && foundRevision > latestRevision) { */
76 latestRevision = foundRevision; 76/* latestRevision = foundRevision; */
77 } 77/* } */
78 return true; 78/* return true; */
79 }, 79/* }, */
80 [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); 80/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); */
81 return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); 81/* return get(db, Sink::Storage::DataStore::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); */
82} 82/* } */
83
84/* template <class DomainType> */
85/* EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */
86/* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */
87/* mTransaction(transaction), */
88/* mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory<DomainType>(resourceType)), */
89/* mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) */
90/* { */
91/* Q_ASSERT(!resourceType.isEmpty()); */
92/* Q_ASSERT(mDomainTypeAdaptorFactoryPtr); */
93/* } */
94
95/* template <class DomainType> */
96/* EntityReader<DomainType>::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */
97/* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */
98/* mTransaction(transaction), */
99/* mDomainTypeAdaptorFactory(domainTypeAdaptorFactory) */
100/* { */
101
102/* } */
83 103
84template <class DomainType> 104template <class DomainType>
85EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) 105EntityReader<DomainType>::EntityReader(Storage::EntityStore &entityStore)
86 : mResourceInstanceIdentifier(resourceInstanceIdentifier), 106 : mEntityStore(entityStore)
87 mTransaction(transaction),
88 mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory<DomainType>(resourceType)),
89 mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr)
90{
91 Q_ASSERT(!resourceType.isEmpty());
92 Q_ASSERT(mDomainTypeAdaptorFactoryPtr);
93}
94
95template <class DomainType>
96EntityReader<DomainType>::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction)
97 : mResourceInstanceIdentifier(resourceInstanceIdentifier),
98 mTransaction(transaction),
99 mDomainTypeAdaptorFactory(domainTypeAdaptorFactory)
100{ 107{
101 108
102} 109}
@@ -105,40 +112,28 @@ template <class DomainType>
105DomainType EntityReader<DomainType>::read(const QByteArray &identifier) const 112DomainType EntityReader<DomainType>::read(const QByteArray &identifier) const
106{ 113{
107 auto typeName = ApplicationDomain::getTypeName<DomainType>(); 114 auto typeName = ApplicationDomain::getTypeName<DomainType>();
108 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); 115 return mEntityStore.readLatest<DomainType>(identifier);
109 qint64 retrievedRevision = 0;
110 auto bufferAdaptor = EntityReaderUtils::getLatest(mainDatabase, identifier, mDomainTypeAdaptorFactory, retrievedRevision);
111 if (!bufferAdaptor) {
112 return DomainType();
113 }
114 return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor);
115} 116}
116 117
117template <class DomainType> 118template <class DomainType>
118DomainType EntityReader<DomainType>::readFromKey(const QByteArray &key) const 119DomainType EntityReader<DomainType>::readFromKey(const QByteArray &key) const
119{ 120{
120 auto typeName = ApplicationDomain::getTypeName<DomainType>(); 121 /* auto typeName = ApplicationDomain::getTypeName<DomainType>(); */
121 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); 122 /* auto mainDatabase = Storage::DataStore::mainDatabase(mTransaction, typeName); */
122 qint64 retrievedRevision = 0; 123 /* qint64 retrievedRevision = 0; */
123 auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); 124 /* auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); */
124 const auto identifier = Storage::uidFromKey(key); 125 /* const auto identifier = Storage::DataStore::uidFromKey(key); */
125 if (!bufferAdaptor) { 126 /* if (!bufferAdaptor) { */
126 return DomainType(); 127 /* return DomainType(); */
127 } 128 /* } */
128 return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); 129 /* return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); */
130 return mEntityStore.readEntity<DomainType>(key);
129} 131}
130 132
131template <class DomainType> 133template <class DomainType>
132DomainType EntityReader<DomainType>::readPrevious(const QByteArray &uid, qint64 revision) const 134DomainType EntityReader<DomainType>::readPrevious(const QByteArray &uid, qint64 revision) const
133{ 135{
134 auto typeName = ApplicationDomain::getTypeName<DomainType>(); 136 return mEntityStore.readPrevious<DomainType>(uid, revision);
135 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName);
136 qint64 retrievedRevision = 0;
137 auto bufferAdaptor = EntityReaderUtils::getPrevious(mainDatabase, uid, revision, mDomainTypeAdaptorFactory, retrievedRevision);
138 if (!bufferAdaptor) {
139 return DomainType();
140 }
141 return DomainType(mResourceInstanceIdentifier, uid, retrievedRevision, bufferAdaptor);
142} 137}
143 138
144template <class DomainType> 139template <class DomainType>
@@ -157,14 +152,14 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::
157 QTime time; 152 QTime time;
158 time.start(); 153 time.start();
159 154
160 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); 155 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){}));
161 auto resultSet = preparedQuery->execute(); 156 auto resultSet = preparedQuery->execute();
162 157
163 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 158 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
164 auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); 159 auto replayedEntities = replaySet(resultSet, offset, batchsize, callback);
165 160
166 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 161 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
167 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); 162 return qMakePair(mEntityStore.maxRevision(), replayedEntities);
168} 163}
169 164
170template <class DomainType> 165template <class DomainType>
@@ -174,14 +169,14 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si
174 time.start(); 169 time.start();
175 const qint64 baseRevision = lastRevision + 1; 170 const qint64 baseRevision = lastRevision + 1;
176 171
177 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); 172 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){}));
178 auto resultSet = preparedQuery->update(baseRevision); 173 auto resultSet = preparedQuery->update(baseRevision);
179 174
180 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 175 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
181 auto replayedEntities = replaySet(resultSet, 0, 0, callback); 176 auto replayedEntities = replaySet(resultSet, 0, 0, callback);
182 177
183 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); 178 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
184 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); 179 return qMakePair(mEntityStore.maxRevision(), replayedEntities);
185} 180}
186 181
187template <class DomainType> 182template <class DomainType>
@@ -190,18 +185,18 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int
190 SinkTrace() << "Skipping over " << offset << " results"; 185 SinkTrace() << "Skipping over " << offset << " results";
191 resultSet.skip(offset); 186 resultSet.skip(offset);
192 int counter = 0; 187 int counter = 0;
193 while (!batchSize || (counter < batchSize)) { 188 /* while (!batchSize || (counter < batchSize)) { */
194 const bool ret = 189 /* const bool ret = */
195 resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { 190 /* resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { */
196 counter++; 191 /* counter++; */
197 auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(result.buffer.entity()); 192 /* auto adaptor = mResourceContext.adaptorFactory<DomainType>().createAdaptor(result.buffer.entity()); */
198 Q_ASSERT(adaptor); 193 /* Q_ASSERT(adaptor); */
199 return callback(QSharedPointer<DomainType>::create(mResourceInstanceIdentifier, result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); 194 /* return callback(QSharedPointer<DomainType>::create(mResourceContext, result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); */
200 }); 195 /* }); */
201 if (!ret) { 196 /* if (!ret) { */
202 break; 197 /* break; */
203 } 198 /* } */
204 }; 199 /* }; */
205 SinkTrace() << "Replayed " << counter << " results." 200 SinkTrace() << "Replayed " << counter << " results."
206 << "Limit " << batchSize; 201 << "Limit " << batchSize;
207 return counter; 202 return counter;
diff --git a/common/entityreader.h b/common/entityreader.h
index 1e7b086..a641106 100644
--- a/common/entityreader.h
+++ b/common/entityreader.h
@@ -30,9 +30,9 @@
30namespace Sink { 30namespace Sink {
31 31
32namespace EntityReaderUtils { 32namespace EntityReaderUtils {
33 SINK_EXPORT QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); 33 SINK_EXPORT QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision);
34 SINK_EXPORT QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); 34 SINK_EXPORT QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> get(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision);
35 SINK_EXPORT QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); 35 SINK_EXPORT QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getPrevious(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision);
36}; 36};
37 37
38/** 38/**
@@ -41,7 +41,7 @@ namespace EntityReaderUtils {
41 * All callbacks will be called before the end of the function. 41 * All callbacks will be called before the end of the function.
42 * The caller must ensure passed in references remain valid for the lifetime of the object. 42 * The caller must ensure passed in references remain valid for the lifetime of the object.
43 * 43 *
44 * This class is meaent to be instantiated temporarily during reads on the stack. 44 * This class is meant to be instantiated temporarily during reads on the stack.
45 * 45 *
46 * Note that all objects returned in callbacks are only valid during the execution of the callback and may start pointing into invalid memory if shallow-copied. 46 * Note that all objects returned in callbacks are only valid during the execution of the callback and may start pointing into invalid memory if shallow-copied.
47 */ 47 */
@@ -51,8 +51,7 @@ class SINK_EXPORT EntityReader
51 typedef std::function<bool(const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap<QByteArray, QVariant> &aggregateValues)> ResultCallback; 51 typedef std::function<bool(const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap<QByteArray, QVariant> &aggregateValues)> ResultCallback;
52 52
53public: 53public:
54 EntityReader(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); 54 EntityReader(Storage::EntityStore &store);
55 EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction);
56 55
57 /** 56 /**
58 * Reads the latest revision of an entity identified by @param uid 57 * Reads the latest revision of an entity identified by @param uid
@@ -90,10 +89,7 @@ private:
90 qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); 89 qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback);
91 90
92private: 91private:
93 QByteArray mResourceInstanceIdentifier; 92 Sink::Storage::EntityStore &mEntityStore;
94 Sink::Storage::Transaction &mTransaction;
95 std::shared_ptr<DomainTypeAdaptorFactoryInterface> mDomainTypeAdaptorFactoryPtr;
96 DomainTypeAdaptorFactoryInterface &mDomainTypeAdaptorFactory;
97}; 93};
98 94
99} 95}
diff --git a/common/entitystore.cpp b/common/entitystore.cpp
index 5fb213d..b7b03aa 100644
--- a/common/entitystore.cpp
+++ b/common/entitystore.cpp
@@ -21,9 +21,8 @@
21 21
22using namespace Sink; 22using namespace Sink;
23 23
24EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) 24EntityStore::EntityStore(Storage::EntityStore &store_)
25 : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier), 25 : store(store_)
26 mTransaction(transaction)
27{ 26{
28 27
29} 28}
diff --git a/common/entitystore.h b/common/entitystore.h
index 6bfe414..3d9ca36 100644
--- a/common/entitystore.h
+++ b/common/entitystore.h
@@ -20,50 +20,42 @@
20#pragma once 20#pragma once
21 21
22#include "sink_export.h" 22#include "sink_export.h"
23#include <domainadaptor.h>
24 23
25#include "storage.h" 24#include "storage/entitystore.h"
26#include "adaptorfactoryregistry.h"
27#include "entityreader.h"
28 25
29namespace Sink { 26namespace Sink {
30 27
31class SINK_EXPORT EntityStore 28class SINK_EXPORT EntityStore
32{ 29{
33public: 30public:
34 EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); 31 EntityStore(Storage::EntityStore &store);
35 32
36 template<typename T> 33 template<typename T>
37 T read(const QByteArray &identifier) const 34 T read(const QByteArray &identifier) const
38 { 35 {
39 EntityReader<T> reader(mResourceType, mResourceInstanceIdentifier, mTransaction); 36 return store.readLatest<T>(identifier);
40 return reader.read(identifier);
41 } 37 }
42 38
43 template<typename T> 39 template<typename T>
44 T readFromKey(const QByteArray &key) const 40 T readFromKey(const QByteArray &key) const
45 { 41 {
46 EntityReader<T> reader(mResourceType, mResourceInstanceIdentifier, mTransaction); 42 return store.readEntity<T>(key);
47 return reader.readFromKey(key);
48 } 43 }
49 44
50 template<typename T> 45 template<typename T>
51 T readPrevious(const QByteArray &uid, qint64 revision) const 46 T readPrevious(const QByteArray &uid, qint64 revision) const
52 { 47 {
53 EntityReader<T> reader(mResourceType, mResourceInstanceIdentifier, mTransaction); 48 return store.readPrevious<T>(uid, revision);
54 return reader.readPrevious(uid, revision);
55 } 49 }
56 50
57 template<typename T> 51 /* template<typename T> */
58 EntityReader<T> reader() 52 /* EntityReader<T> reader() */
59 { 53 /* { */
60 return EntityReader<T>(mResourceType, mResourceInstanceIdentifier, mTransaction); 54 /* return EntityReader<T>(mResourceType, mResourceInstanceIdentifier, mTransaction); */
61 } 55 /* } */
62 56
63private: 57private:
64 QByteArray mResourceType; 58 Sink::Storage::EntityStore &store;
65 QByteArray mResourceInstanceIdentifier;
66 Sink::Storage::Transaction &mTransaction;
67}; 59};
68 60
69} 61}
diff --git a/common/facade.cpp b/common/facade.cpp
index 72f7414..3ec58e3 100644
--- a/common/facade.cpp
+++ b/common/facade.cpp
@@ -31,13 +31,9 @@
31using namespace Sink; 31using namespace Sink;
32 32
33template <class DomainType> 33template <class DomainType>
34GenericFacade<DomainType>::GenericFacade( 34GenericFacade<DomainType>::GenericFacade(const ResourceContext &context)
35 const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory, const QSharedPointer<Sink::ResourceAccessInterface> resourceAccess) 35 : Sink::StoreFacade<DomainType>(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess())
36 : Sink::StoreFacade<DomainType>(), mResourceAccess(resourceAccess), mDomainTypeAdaptorFactory(adaptorFactory), mResourceInstanceIdentifier(resourceIdentifier)
37{ 36{
38 if (!mResourceAccess) {
39 mResourceAccess = ResourceAccessFactory::instance().getAccess(resourceIdentifier, ResourceConfig::getResourceType(resourceIdentifier));
40 }
41} 37}
42 38
43template <class DomainType> 39template <class DomainType>
@@ -55,25 +51,23 @@ QByteArray GenericFacade<DomainType>::bufferTypeForDomainType()
55template <class DomainType> 51template <class DomainType>
56KAsync::Job<void> GenericFacade<DomainType>::create(const DomainType &domainObject) 52KAsync::Job<void> GenericFacade<DomainType>::create(const DomainType &domainObject)
57{ 53{
58 if (!mDomainTypeAdaptorFactory) { 54 flatbuffers::FlatBufferBuilder entityFbb;
55 if (!mResourceContext.adaptorFactory<DomainType>().createBuffer(domainObject, entityFbb)) {
59 SinkWarning() << "No domain type adaptor factory available"; 56 SinkWarning() << "No domain type adaptor factory available";
60 return KAsync::error<void>(); 57 return KAsync::error<void>();
61 } 58 }
62 flatbuffers::FlatBufferBuilder entityFbb;
63 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
64 return mResourceAccess->sendCreateCommand(domainObject.identifier(), bufferTypeForDomainType(), BufferUtils::extractBuffer(entityFbb)); 59 return mResourceAccess->sendCreateCommand(domainObject.identifier(), bufferTypeForDomainType(), BufferUtils::extractBuffer(entityFbb));
65} 60}
66 61
67template <class DomainType> 62template <class DomainType>
68KAsync::Job<void> GenericFacade<DomainType>::modify(const DomainType &domainObject) 63KAsync::Job<void> GenericFacade<DomainType>::modify(const DomainType &domainObject)
69{ 64{
70 if (!mDomainTypeAdaptorFactory) { 65 SinkTrace() << "Modifying entity: " << domainObject.identifier() << domainObject.changedProperties();
66 flatbuffers::FlatBufferBuilder entityFbb;
67 if (!mResourceContext.adaptorFactory<DomainType>().createBuffer(domainObject, entityFbb)) {
71 SinkWarning() << "No domain type adaptor factory available"; 68 SinkWarning() << "No domain type adaptor factory available";
72 return KAsync::error<void>(); 69 return KAsync::error<void>();
73 } 70 }
74 SinkTrace() << "Modifying entity: " << domainObject.identifier() << domainObject.changedProperties();
75 flatbuffers::FlatBufferBuilder entityFbb;
76 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
77 return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), BufferUtils::extractBuffer(entityFbb), domainObject.changedProperties()); 71 return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), BufferUtils::extractBuffer(entityFbb), domainObject.changedProperties());
78} 72}
79 73
@@ -87,7 +81,7 @@ template <class DomainType>
87QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> GenericFacade<DomainType>::load(const Sink::Query &query) 81QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> GenericFacade<DomainType>::load(const Sink::Query &query)
88{ 82{
89 // The runner lives for the lifetime of the query 83 // The runner lives for the lifetime of the query
90 auto runner = new QueryRunner<DomainType>(query, mResourceAccess, mResourceInstanceIdentifier, mDomainTypeAdaptorFactory, bufferTypeForDomainType()); 84 auto runner = new QueryRunner<DomainType>(query, mResourceContext, bufferTypeForDomainType());
91 runner->setResultTransformation(mResultTransformation); 85 runner->setResultTransformation(mResultTransformation);
92 return qMakePair(KAsync::null<void>(), runner->emitter()); 86 return qMakePair(KAsync::null<void>(), runner->emitter());
93} 87}
diff --git a/common/facade.h b/common/facade.h
index b193580..50d93e0 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -28,6 +28,7 @@
28#include "resourceaccess.h" 28#include "resourceaccess.h"
29#include "domaintypeadaptorfactoryinterface.h" 29#include "domaintypeadaptorfactoryinterface.h"
30#include "storage.h" 30#include "storage.h"
31#include "resourcecontext.h"
31 32
32namespace Sink { 33namespace Sink {
33 34
@@ -48,7 +49,7 @@ class SINK_EXPORT GenericFacade : public Sink::StoreFacade<DomainType>
48{ 49{
49protected: 50protected:
50 SINK_DEBUG_AREA("facade") 51 SINK_DEBUG_AREA("facade")
51 SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier) 52 SINK_DEBUG_COMPONENT(mResourceContext.resourceInstanceIdentifier)
52public: 53public:
53 /** 54 /**
54 * Create a new GenericFacade 55 * Create a new GenericFacade
@@ -56,8 +57,7 @@ public:
56 * @param resourceIdentifier is the identifier of the resource instance 57 * @param resourceIdentifier is the identifier of the resource instance
57 * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa 58 * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa
58 */ 59 */
59 GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), 60 GenericFacade(const ResourceContext &context);
60 const QSharedPointer<Sink::ResourceAccessInterface> resourceAccess = QSharedPointer<Sink::ResourceAccessInterface>());
61 virtual ~GenericFacade(); 61 virtual ~GenericFacade();
62 62
63 static QByteArray bufferTypeForDomainType(); 63 static QByteArray bufferTypeForDomainType();
@@ -68,20 +68,18 @@ public:
68 68
69protected: 69protected:
70 std::function<void(Sink::ApplicationDomain::ApplicationDomainType &domainObject)> mResultTransformation; 70 std::function<void(Sink::ApplicationDomain::ApplicationDomainType &domainObject)> mResultTransformation;
71 // TODO use one resource access instance per application & per resource 71 ResourceContext mResourceContext;
72 QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess; 72 Sink::ResourceAccessInterface::Ptr mResourceAccess;
73 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory;
74 QByteArray mResourceInstanceIdentifier;
75}; 73};
76 74
77/** 75/**
78 * A default facade implemenation that simply instantiates a generic resource with the given DomainTypeAdaptorFactory 76 * A default facade implemenation that simply instantiates a generic resource
79 */ 77 */
80template<typename DomainType, typename DomainTypeAdaptorFactory> 78template<typename DomainType>
81class DefaultFacade : public GenericFacade<DomainType> 79class DefaultFacade : public GenericFacade<DomainType>
82{ 80{
83public: 81public:
84 DefaultFacade(const QByteArray &resourceIdentifier) : GenericFacade<DomainType>(resourceIdentifier, QSharedPointer<DomainTypeAdaptorFactory>::create()) {} 82 DefaultFacade(const ResourceContext &context) : GenericFacade<DomainType>(context) {}
85 virtual ~DefaultFacade(){} 83 virtual ~DefaultFacade(){}
86}; 84};
87 85
diff --git a/common/facadefactory.cpp b/common/facadefactory.cpp
index b5a0ff2..107d575 100644
--- a/common/facadefactory.cpp
+++ b/common/facadefactory.cpp
@@ -21,6 +21,7 @@
21 21
22#include "resourcefacade.h" 22#include "resourcefacade.h"
23#include "resource.h" 23#include "resource.h"
24#include "adaptorfactoryregistry.h"
24 25
25using namespace Sink; 26using namespace Sink;
26 27
@@ -72,7 +73,7 @@ std::shared_ptr<void> FacadeFactory::getFacade(const QByteArray &resource, const
72 } 73 }
73 74
74 if (auto factoryFunction = mFacadeRegistry.value(k)) { 75 if (auto factoryFunction = mFacadeRegistry.value(k)) {
75 return factoryFunction(instanceIdentifier); 76 return factoryFunction(ResourceContext{instanceIdentifier, resource, AdaptorFactoryRegistry::instance().getFactories(resource)});
76 } 77 }
77 qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName; 78 qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName;
78 return std::shared_ptr<void>(); 79 return std::shared_ptr<void>();
diff --git a/common/facadefactory.h b/common/facadefactory.h
index 7313970..8d41705 100644
--- a/common/facadefactory.h
+++ b/common/facadefactory.h
@@ -29,6 +29,7 @@
29 29
30#include "facadeinterface.h" 30#include "facadeinterface.h"
31#include "applicationdomaintype.h" 31#include "applicationdomaintype.h"
32#include "resourcecontext.h"
32#include "log.h" 33#include "log.h"
33 34
34namespace Sink { 35namespace Sink {
@@ -41,7 +42,7 @@ namespace Sink {
41class SINK_EXPORT FacadeFactory 42class SINK_EXPORT FacadeFactory
42{ 43{
43public: 44public:
44 typedef std::function<std::shared_ptr<void>(const QByteArray &)> FactoryFunction; 45 typedef std::function<std::shared_ptr<void>(const ResourceContext &)> FactoryFunction;
45 46
46 void registerStaticFacades(); 47 void registerStaticFacades();
47 48
@@ -52,13 +53,13 @@ public:
52 template <class DomainType, class Facade> 53 template <class DomainType, class Facade>
53 void registerFacade(const QByteArray &resource) 54 void registerFacade(const QByteArray &resource)
54 { 55 {
55 registerFacade(resource, [](const QByteArray &instanceIdentifier) { return std::make_shared<Facade>(instanceIdentifier); }, ApplicationDomain::getTypeName<DomainType>()); 56 registerFacade(resource, [](const ResourceContext &context) { return std::make_shared<Facade>(context); }, ApplicationDomain::getTypeName<DomainType>());
56 } 57 }
57 58
58 template <class DomainType, class Facade> 59 template <class DomainType, class Facade>
59 void registerFacade() 60 void registerFacade()
60 { 61 {
61 registerFacade(QByteArray(), [](const QByteArray &) { return std::make_shared<Facade>(); }, ApplicationDomain::getTypeName<DomainType>()); 62 registerFacade(QByteArray(), [](const ResourceContext &) { return std::make_shared<Facade>(); }, ApplicationDomain::getTypeName<DomainType>());
62 } 63 }
63 64
64 /* 65 /*
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index ef6edc8..e0d395a 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -45,6 +45,7 @@ static int sBatchSize = 100;
45static int sCommitInterval = 10; 45static int sCommitInterval = 10;
46 46
47using namespace Sink; 47using namespace Sink;
48using namespace Sink::Storage;
48 49
49/** 50/**
50 * Drives the pipeline using the output from all command queues 51 * Drives the pipeline using the output from all command queues
@@ -58,7 +59,7 @@ class CommandProcessor : public QObject
58public: 59public:
59 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) 60 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false)
60 { 61 {
61 mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 62 mLowerBoundRevision = DataStore::maxRevision(mPipeline->storage().createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) {
62 SinkWarning() << error.message; 63 SinkWarning() << error.message;
63 })); 64 }));
64 65
@@ -226,17 +227,15 @@ private:
226 InspectionFunction mInspect; 227 InspectionFunction mInspect;
227}; 228};
228 229
229GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline ) 230GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline )
230 : Sink::Resource(), 231 : Sink::Resource(),
231 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), 232 mResourceContext(resourceContext),
232 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), 233 mUserQueue(Sink::storageLocation(), resourceContext.instanceId() + ".userqueue"),
233 mResourceType(resourceType), 234 mSynchronizerQueue(Sink::storageLocation(), resourceContext.instanceId() + ".synchronizerqueue"),
234 mResourceInstanceIdentifier(resourceInstanceIdentifier), 235 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext)),
235 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)),
236 mError(0), 236 mError(0),
237 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 237 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
238{ 238{
239 mPipeline->setResourceType(mResourceType);
240 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); 239 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue));
241 mProcessor->setInspectionCommand([this](void const *command, size_t size) { 240 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
242 flatbuffers::Verifier verifier((const uint8_t *)command, size); 241 flatbuffers::Verifier verifier((const uint8_t *)command, size);
@@ -357,19 +356,19 @@ void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &chan
357 356
358void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) 357void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier)
359{ 358{
360 Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadWrite).removeFromDisk(); 359 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk();
361 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadWrite).removeFromDisk(); 360 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
362 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadWrite).removeFromDisk(); 361 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
363 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadWrite).removeFromDisk(); 362 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
364 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::ReadWrite).removeFromDisk(); 363 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
365} 364}
366 365
367qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) 366qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier)
368{ 367{
369 auto size = Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadOnly).diskUsage(); 368 auto size = Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadOnly).diskUsage();
370 size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadOnly).diskUsage(); 369 size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadOnly).diskUsage();
371 size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadOnly).diskUsage(); 370 size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadOnly).diskUsage();
372 size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadOnly).diskUsage(); 371 size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadOnly).diskUsage();
373 return size; 372 return size;
374} 373}
375 374
diff --git a/common/genericresource.h b/common/genericresource.h
index ec43939..687e307 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -43,7 +43,7 @@ class SINK_EXPORT GenericResource : public Resource
43protected: 43protected:
44 SINK_DEBUG_AREA("resource") 44 SINK_DEBUG_AREA("resource")
45public: 45public:
46 GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline); 46 GenericResource(const Sink::ResourceContext &context, const QSharedPointer<Pipeline> &pipeline);
47 virtual ~GenericResource(); 47 virtual ~GenericResource();
48 48
49 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; 49 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE;
@@ -71,10 +71,9 @@ protected:
71 void onProcessorError(int errorCode, const QString &errorMessage); 71 void onProcessorError(int errorCode, const QString &errorMessage);
72 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); 72 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data);
73 73
74 ResourceContext mResourceContext;
74 MessageQueue mUserQueue; 75 MessageQueue mUserQueue;
75 MessageQueue mSynchronizerQueue; 76 MessageQueue mSynchronizerQueue;
76 QByteArray mResourceType;
77 QByteArray mResourceInstanceIdentifier;
78 QSharedPointer<Pipeline> mPipeline; 77 QSharedPointer<Pipeline> mPipeline;
79 78
80private: 79private:
diff --git a/common/index.cpp b/common/index.cpp
index beed45c..c864e77 100644
--- a/common/index.cpp
+++ b/common/index.cpp
@@ -4,15 +4,15 @@
4 4
5SINK_DEBUG_AREA("index") 5SINK_DEBUG_AREA("index")
6 6
7Index::Index(const QString &storageRoot, const QString &name, Sink::Storage::AccessMode mode) 7Index::Index(const QString &storageRoot, const QString &name, Sink::Storage::DataStore::AccessMode mode)
8 : mTransaction(Sink::Storage(storageRoot, name, mode).createTransaction(mode)), 8 : mTransaction(Sink::Storage::DataStore(storageRoot, name, mode).createTransaction(mode)),
9 mDb(mTransaction.openDatabase(name.toLatin1(), std::function<void(const Sink::Storage::Error &)>(), true)), 9 mDb(mTransaction.openDatabase(name.toLatin1(), std::function<void(const Sink::Storage::DataStore::Error &)>(), true)),
10 mName(name) 10 mName(name)
11{ 11{
12} 12}
13 13
14Index::Index(const QByteArray &name, Sink::Storage::Transaction &transaction) 14Index::Index(const QByteArray &name, Sink::Storage::DataStore::Transaction &transaction)
15 : mDb(transaction.openDatabase(name, std::function<void(const Sink::Storage::Error &)>(), true)), mName(name) 15 : mDb(transaction.openDatabase(name, std::function<void(const Sink::Storage::DataStore::Error &)>(), true)), mName(name)
16{ 16{
17} 17}
18 18
@@ -33,7 +33,7 @@ void Index::lookup(const QByteArray &key, const std::function<void(const QByteAr
33 resultHandler(value); 33 resultHandler(value);
34 return true; 34 return true;
35 }, 35 },
36 [this, errorHandler](const Sink::Storage::Error &error) { 36 [this, errorHandler](const Sink::Storage::DataStore::Error &error) {
37 SinkWarning() << "Error while retrieving value" << error.message; 37 SinkWarning() << "Error while retrieving value" << error.message;
38 errorHandler(Error(error.store, error.code, error.message)); 38 errorHandler(Error(error.store, error.code, error.message));
39 }, 39 },
diff --git a/common/index.h b/common/index.h
index bfedf9a..cfcc7a0 100644
--- a/common/index.h
+++ b/common/index.h
@@ -29,8 +29,8 @@ public:
29 int code; 29 int code;
30 }; 30 };
31 31
32 Index(const QString &storageRoot, const QString &name, Sink::Storage::AccessMode mode = Sink::Storage::ReadOnly); 32 Index(const QString &storageRoot, const QString &name, Sink::Storage::DataStore::AccessMode mode = Sink::Storage::DataStore::ReadOnly);
33 Index(const QByteArray &name, Sink::Storage::Transaction &); 33 Index(const QByteArray &name, Sink::Storage::DataStore::Transaction &);
34 34
35 void add(const QByteArray &key, const QByteArray &value); 35 void add(const QByteArray &key, const QByteArray &value);
36 void remove(const QByteArray &key, const QByteArray &value); 36 void remove(const QByteArray &key, const QByteArray &value);
@@ -41,8 +41,8 @@ public:
41 41
42private: 42private:
43 Q_DISABLE_COPY(Index); 43 Q_DISABLE_COPY(Index);
44 Sink::Storage::Transaction mTransaction; 44 Sink::Storage::DataStore::Transaction mTransaction;
45 Sink::Storage::NamedDatabase mDb; 45 Sink::Storage::DataStore::NamedDatabase mDb;
46 QString mName; 46 QString mName;
47 SINK_DEBUG_COMPONENT(mName.toLatin1()) 47 SINK_DEBUG_COMPONENT(mName.toLatin1())
48}; 48};
diff --git a/common/indexupdater.h b/common/indexupdater.h
index 79499c3..221a4ed 100644
--- a/common/indexupdater.h
+++ b/common/indexupdater.h
@@ -28,32 +28,32 @@ public:
28 { 28 {
29 } 29 }
30 30
31 void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE 31 void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
32 { 32 {
33 add(newEntity.getProperty(mProperty), uid, transaction); 33 add(newEntity.getProperty(mProperty), uid, transaction);
34 } 34 }
35 35
36 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, 36 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity,
37 Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE 37 Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
38 { 38 {
39 remove(oldEntity.getProperty(mProperty), uid, transaction); 39 remove(oldEntity.getProperty(mProperty), uid, transaction);
40 add(newEntity.getProperty(mProperty), uid, transaction); 40 add(newEntity.getProperty(mProperty), uid, transaction);
41 } 41 }
42 42
43 void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE 43 void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
44 { 44 {
45 remove(oldEntity.getProperty(mProperty), uid, transaction); 45 remove(oldEntity.getProperty(mProperty), uid, transaction);
46 } 46 }
47 47
48private: 48private:
49 void add(const QVariant &value, const QByteArray &uid, Sink::Storage::Transaction &transaction) 49 void add(const QVariant &value, const QByteArray &uid, Sink::Storage::DataStore::Transaction &transaction)
50 { 50 {
51 if (value.isValid()) { 51 if (value.isValid()) {
52 Index(mIndexIdentifier, transaction).add(value.toByteArray(), uid); 52 Index(mIndexIdentifier, transaction).add(value.toByteArray(), uid);
53 } 53 }
54 } 54 }
55 55
56 void remove(const QVariant &value, const QByteArray &uid, Sink::Storage::Transaction &transaction) 56 void remove(const QVariant &value, const QByteArray &uid, Sink::Storage::DataStore::Transaction &transaction)
57 { 57 {
58 if (value.isValid()) { 58 if (value.isValid()) {
59 const auto data = value.toByteArray(); 59 const auto data = value.toByteArray();
@@ -72,19 +72,19 @@ template <typename DomainType>
72class DefaultIndexUpdater : public Sink::Preprocessor 72class DefaultIndexUpdater : public Sink::Preprocessor
73{ 73{
74public: 74public:
75 void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE 75 void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
76 { 76 {
77 Sink::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction); 77 Sink::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction);
78 } 78 }
79 79
80 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, 80 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity,
81 Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE 81 Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
82 { 82 {
83 Sink::ApplicationDomain::TypeImplementation<DomainType>::removeIndex(uid, oldEntity, transaction); 83 Sink::ApplicationDomain::TypeImplementation<DomainType>::removeIndex(uid, oldEntity, transaction);
84 Sink::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction); 84 Sink::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction);
85 } 85 }
86 86
87 void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE 87 void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
88 { 88 {
89 Sink::ApplicationDomain::TypeImplementation<DomainType>::removeIndex(uid, oldEntity, transaction); 89 Sink::ApplicationDomain::TypeImplementation<DomainType>::removeIndex(uid, oldEntity, transaction);
90 } 90 }
diff --git a/common/listener.cpp b/common/listener.cpp
index 1a8f392..0742017 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -23,6 +23,8 @@
23#include "common/resource.h" 23#include "common/resource.h"
24#include "common/log.h" 24#include "common/log.h"
25#include "common/definitions.h" 25#include "common/definitions.h"
26#include "common/resourcecontext.h"
27#include "common/adaptorfactoryregistry.h"
26 28
27// commands 29// commands
28#include "common/commandcompletion_generated.h" 30#include "common/commandcompletion_generated.h"
@@ -455,8 +457,8 @@ void Listener::notify(const Sink::Notification &notification)
455Sink::Resource &Listener::loadResource() 457Sink::Resource &Listener::loadResource()
456{ 458{
457 if (!m_resource) { 459 if (!m_resource) {
458 if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { 460 if (auto resourceFactory = Sink::ResourceFactory::load(m_resourceName)) {
459 m_resource = std::unique_ptr<Sink::Resource>(resourceFactory->createResource(m_resourceInstanceIdentifier)); 461 m_resource = std::unique_ptr<Sink::Resource>(resourceFactory->createResource(Sink::ResourceContext{m_resourceInstanceIdentifier, m_resourceName, Sink::AdaptorFactoryRegistry::instance().getFactories(m_resourceName)}));
460 if (!m_resource) { 462 if (!m_resource) {
461 SinkError() << "Failed to instantiate the resource " << m_resourceName; 463 SinkError() << "Failed to instantiate the resource " << m_resourceName;
462 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource); 464 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource);
diff --git a/common/mailpreprocessor.cpp b/common/mailpreprocessor.cpp
index ec5748f..b978323 100644
--- a/common/mailpreprocessor.cpp
+++ b/common/mailpreprocessor.cpp
@@ -116,7 +116,7 @@ static void updatedIndexedProperties(Sink::ApplicationDomain::Mail &mail, KMime:
116 } 116 }
117} 117}
118 118
119void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::Transaction &transaction) 119void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction)
120{ 120{
121 MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(mail.getMimeMessagePath())); 121 MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(mail.getMimeMessagePath()));
122 auto msg = mimeMessageReader.mimeMessage(); 122 auto msg = mimeMessageReader.mimeMessage();
@@ -125,7 +125,7 @@ void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink:
125 } 125 }
126} 126}
127 127
128void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::Transaction &transaction) 128void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::DataStore::Transaction &transaction)
129{ 129{
130 MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(newMail.getMimeMessagePath())); 130 MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(newMail.getMimeMessagePath()));
131 auto msg = mimeMessageReader.mimeMessage(); 131 auto msg = mimeMessageReader.mimeMessage();
@@ -161,21 +161,21 @@ QString MimeMessageMover::moveMessage(const QString &oldPath, const Sink::Applic
161 return oldPath; 161 return oldPath;
162} 162}
163 163
164void MimeMessageMover::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::Transaction &transaction) 164void MimeMessageMover::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction)
165{ 165{
166 if (!mail.getMimeMessagePath().isEmpty()) { 166 if (!mail.getMimeMessagePath().isEmpty()) {
167 mail.setMimeMessagePath(moveMessage(mail.getMimeMessagePath(), mail)); 167 mail.setMimeMessagePath(moveMessage(mail.getMimeMessagePath(), mail));
168 } 168 }
169} 169}
170 170
171void MimeMessageMover::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail, Sink::Storage::Transaction &transaction) 171void MimeMessageMover::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail, Sink::Storage::DataStore::Transaction &transaction)
172{ 172{
173 if (!newMail.getMimeMessagePath().isEmpty()) { 173 if (!newMail.getMimeMessagePath().isEmpty()) {
174 newMail.setMimeMessagePath(moveMessage(newMail.getMimeMessagePath(), newMail)); 174 newMail.setMimeMessagePath(moveMessage(newMail.getMimeMessagePath(), newMail));
175 } 175 }
176} 176}
177 177
178void MimeMessageMover::deletedEntity(const Sink::ApplicationDomain::Mail &mail, Sink::Storage::Transaction &transaction) 178void MimeMessageMover::deletedEntity(const Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction)
179{ 179{
180 QFile::remove(mail.getMimeMessagePath()); 180 QFile::remove(mail.getMimeMessagePath());
181} 181}
diff --git a/common/mailpreprocessor.h b/common/mailpreprocessor.h
index b7cd0e7..c66517e 100644
--- a/common/mailpreprocessor.h
+++ b/common/mailpreprocessor.h
@@ -24,8 +24,8 @@ class SINK_EXPORT MailPropertyExtractor : public Sink::EntityPreprocessor<Sink::
24{ 24{
25public: 25public:
26 virtual ~MailPropertyExtractor(){} 26 virtual ~MailPropertyExtractor(){}
27 virtual void newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE; 27 virtual void newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE;
28 virtual void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE; 28 virtual void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE;
29protected: 29protected:
30 virtual QString getFilePathFromMimeMessagePath(const QString &) const; 30 virtual QString getFilePathFromMimeMessagePath(const QString &) const;
31}; 31};
@@ -36,9 +36,9 @@ public:
36 MimeMessageMover(); 36 MimeMessageMover();
37 virtual ~MimeMessageMover(){} 37 virtual ~MimeMessageMover(){}
38 38
39 void newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE; 39 void newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE;
40 void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE; 40 void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE;
41 void deletedEntity(const Sink::ApplicationDomain::Mail &mail, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE; 41 void deletedEntity(const Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE;
42 42
43private: 43private:
44 QString moveMessage(const QString &oldPath, const Sink::ApplicationDomain::Mail &mail); 44 QString moveMessage(const QString &oldPath, const Sink::ApplicationDomain::Mail &mail);
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index e050bcd..0fcbf99 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -5,7 +5,7 @@
5 5
6SINK_DEBUG_AREA("messagequeue") 6SINK_DEBUG_AREA("messagequeue")
7 7
8MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) 8MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::DataStore::ReadWrite)
9{ 9{
10} 10}
11 11
@@ -27,13 +27,13 @@ void MessageQueue::startTransaction()
27 return; 27 return;
28 } 28 }
29 processRemovals(); 29 processRemovals();
30 mWriteTransaction = mStorage.createTransaction(Sink::Storage::ReadWrite); 30 mWriteTransaction = mStorage.createTransaction(Sink::Storage::DataStore::ReadWrite);
31} 31}
32 32
33void MessageQueue::commit() 33void MessageQueue::commit()
34{ 34{
35 mWriteTransaction.commit(); 35 mWriteTransaction.commit();
36 mWriteTransaction = Sink::Storage::Transaction(); 36 mWriteTransaction = Sink::Storage::DataStore::Transaction();
37 processRemovals(); 37 processRemovals();
38 emit messageReady(); 38 emit messageReady();
39} 39}
@@ -45,10 +45,10 @@ void MessageQueue::enqueue(const QByteArray &value)
45 implicitTransaction = true; 45 implicitTransaction = true;
46 startTransaction(); 46 startTransaction();
47 } 47 }
48 const qint64 revision = Sink::Storage::maxRevision(mWriteTransaction) + 1; 48 const qint64 revision = Sink::Storage::DataStore::maxRevision(mWriteTransaction) + 1;
49 const QByteArray key = QString("%1").arg(revision).toUtf8(); 49 const QByteArray key = QString("%1").arg(revision).toUtf8();
50 mWriteTransaction.openDatabase().write(key, value); 50 mWriteTransaction.openDatabase().write(key, value);
51 Sink::Storage::setMaxRevision(mWriteTransaction, revision); 51 Sink::Storage::DataStore::setMaxRevision(mWriteTransaction, revision);
52 if (implicitTransaction) { 52 if (implicitTransaction) {
53 commit(); 53 commit();
54 } 54 }
@@ -59,7 +59,7 @@ void MessageQueue::processRemovals()
59 if (mWriteTransaction) { 59 if (mWriteTransaction) {
60 return; 60 return;
61 } 61 }
62 auto transaction = mStorage.createTransaction(Sink::Storage::ReadWrite); 62 auto transaction = mStorage.createTransaction(Sink::Storage::DataStore::ReadWrite);
63 for (const auto &key : mPendingRemoval) { 63 for (const auto &key : mPendingRemoval) {
64 transaction.openDatabase().remove(key); 64 transaction.openDatabase().remove(key);
65 } 65 }
@@ -82,7 +82,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
82 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) { 82 return KAsync::start<void>([this, maxBatchSize, resultHandler, resultCount](KAsync::Future<void> &future) {
83 int count = 0; 83 int count = 0;
84 QList<KAsync::Future<void>> waitCondition; 84 QList<KAsync::Future<void>> waitCondition;
85 mStorage.createTransaction(Sink::Storage::ReadOnly) 85 mStorage.createTransaction(Sink::Storage::DataStore::ReadOnly)
86 .openDatabase() 86 .openDatabase()
87 .scan("", 87 .scan("",
88 [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { 88 [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool {
@@ -101,7 +101,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
101 } 101 }
102 return false; 102 return false;
103 }, 103 },
104 [](const Sink::Storage::Error &error) { 104 [](const Sink::Storage::DataStore::Error &error) {
105 SinkError() << "Error while retrieving value" << error.message; 105 SinkError() << "Error while retrieving value" << error.message;
106 // errorHandler(Error(error.store, error.code, error.message)); 106 // errorHandler(Error(error.store, error.code, error.message));
107 }); 107 });
@@ -126,7 +126,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
126bool MessageQueue::isEmpty() 126bool MessageQueue::isEmpty()
127{ 127{
128 int count = 0; 128 int count = 0;
129 auto t = mStorage.createTransaction(Sink::Storage::ReadOnly); 129 auto t = mStorage.createTransaction(Sink::Storage::DataStore::ReadOnly);
130 auto db = t.openDatabase(); 130 auto db = t.openDatabase();
131 if (db) { 131 if (db) {
132 db.scan("", 132 db.scan("",
@@ -137,7 +137,7 @@ bool MessageQueue::isEmpty()
137 } 137 }
138 return true; 138 return true;
139 }, 139 },
140 [](const Sink::Storage::Error &error) { SinkError() << "Error while checking if empty" << error.message; }); 140 [](const Sink::Storage::DataStore::Error &error) { SinkError() << "Error while checking if empty" << error.message; });
141 } 141 }
142 return count == 0; 142 return count == 0;
143} 143}
diff --git a/common/messagequeue.h b/common/messagequeue.h
index 6f0bddb..f23ddcf 100644
--- a/common/messagequeue.h
+++ b/common/messagequeue.h
@@ -56,7 +56,7 @@ private slots:
56 56
57private: 57private:
58 Q_DISABLE_COPY(MessageQueue); 58 Q_DISABLE_COPY(MessageQueue);
59 Sink::Storage mStorage; 59 Sink::Storage::DataStore mStorage;
60 Sink::Storage::Transaction mWriteTransaction; 60 Sink::Storage::DataStore::Transaction mWriteTransaction;
61 QByteArrayList mPendingRemoval; 61 QByteArrayList mPendingRemoval;
62}; 62};
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index ce864f7..e257857 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -40,45 +40,45 @@
40 40
41SINK_DEBUG_AREA("pipeline") 41SINK_DEBUG_AREA("pipeline")
42 42
43namespace Sink { 43using namespace Sink;
44using namespace Sink::Storage;
44 45
45class Pipeline::Private 46class Pipeline::Private
46{ 47{
47public: 48public:
48 Private(const QString &resourceName) : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), revisionChanged(false), resourceInstanceIdentifier(resourceName.toUtf8()) 49 Private(const ResourceContext &context) : resourceContext(context), storage(Sink::storageLocation(), context.instanceId(), DataStore::ReadWrite), revisionChanged(false)
49 { 50 {
50 } 51 }
51 52
52 Storage storage; 53 ResourceContext resourceContext;
53 Storage::Transaction transaction; 54 DataStore storage;
55 DataStore::Transaction transaction;
54 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; 56 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors;
55 bool revisionChanged; 57 bool revisionChanged;
56 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); 58 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
57 QTime transactionTime; 59 QTime transactionTime;
58 int transactionItemCount; 60 int transactionItemCount;
59 QByteArray resourceType;
60 QByteArray resourceInstanceIdentifier;
61}; 61};
62 62
63void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 63void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
64{ 64{
65 SinkTrace() << "Committing new revision: " << uid << newRevision; 65 SinkTrace() << "Committing new revision: " << uid << newRevision;
66 Storage::mainDatabase(transaction, bufferType) 66 DataStore::mainDatabase(transaction, bufferType)
67 .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), 67 .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
68 [uid, newRevision](const Storage::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); 68 [uid, newRevision](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; });
69 revisionChanged = true; 69 revisionChanged = true;
70 Storage::setMaxRevision(transaction, newRevision); 70 DataStore::setMaxRevision(transaction, newRevision);
71 Storage::recordRevision(transaction, newRevision, uid, bufferType); 71 DataStore::recordRevision(transaction, newRevision, uid, bufferType);
72} 72}
73 73
74 74
75Pipeline::Pipeline(const QString &resourceName, QObject *parent) : QObject(parent), d(new Private(resourceName)) 75Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context))
76{ 76{
77} 77}
78 78
79Pipeline::~Pipeline() 79Pipeline::~Pipeline()
80{ 80{
81 d->transaction = Storage::Transaction(); 81 d->transaction = DataStore::Transaction();
82} 82}
83 83
84void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) 84void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors)
@@ -86,16 +86,11 @@ void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preproc
86 auto &list = d->processors[entityType]; 86 auto &list = d->processors[entityType];
87 list.clear(); 87 list.clear();
88 for (auto p : processors) { 88 for (auto p : processors) {
89 p->setup(d->resourceType, d->resourceInstanceIdentifier, this); 89 p->setup(d->resourceContext.resourceType, d->resourceContext.instanceId(), this);
90 list.append(QSharedPointer<Preprocessor>(p)); 90 list.append(QSharedPointer<Preprocessor>(p));
91 } 91 }
92} 92}
93 93
94void Pipeline::setResourceType(const QByteArray &resourceType)
95{
96 d->resourceType = resourceType;
97}
98
99void Pipeline::startTransaction() 94void Pipeline::startTransaction()
100{ 95{
101 // TODO call for all types 96 // TODO call for all types
@@ -109,7 +104,7 @@ void Pipeline::startTransaction()
109 SinkTrace() << "Starting transaction."; 104 SinkTrace() << "Starting transaction.";
110 d->transactionTime.start(); 105 d->transactionTime.start();
111 d->transactionItemCount = 0; 106 d->transactionItemCount = 0;
112 d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 107 d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) {
113 SinkWarning() << error.message; 108 SinkWarning() << error.message;
114 }); 109 });
115 110
@@ -119,7 +114,7 @@ void Pipeline::startTransaction()
119 if (d->storage.exists()) { 114 if (d->storage.exists()) {
120 while (!d->transaction.validateNamedDatabases()) { 115 while (!d->transaction.validateNamedDatabases()) {
121 SinkWarning() << "Opened an invalid transaction!!!!!!"; 116 SinkWarning() << "Opened an invalid transaction!!!!!!";
122 d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 117 d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) {
123 SinkWarning() << error.message; 118 SinkWarning() << error.message;
124 }); 119 });
125 } 120 }
@@ -135,29 +130,29 @@ void Pipeline::commit()
135 // } 130 // }
136 if (!d->revisionChanged) { 131 if (!d->revisionChanged) {
137 d->transaction.abort(); 132 d->transaction.abort();
138 d->transaction = Storage::Transaction(); 133 d->transaction = DataStore::Transaction();
139 return; 134 return;
140 } 135 }
141 const auto revision = Storage::maxRevision(d->transaction); 136 const auto revision = DataStore::maxRevision(d->transaction);
142 const auto elapsed = d->transactionTime.elapsed(); 137 const auto elapsed = d->transactionTime.elapsed();
143 SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " 138 SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
144 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; 139 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]";
145 if (d->transaction) { 140 if (d->transaction) {
146 d->transaction.commit(); 141 d->transaction.commit();
147 } 142 }
148 d->transaction = Storage::Transaction(); 143 d->transaction = DataStore::Transaction();
149 if (d->revisionChanged) { 144 if (d->revisionChanged) {
150 d->revisionChanged = false; 145 d->revisionChanged = false;
151 emit revisionUpdated(revision); 146 emit revisionUpdated(revision);
152 } 147 }
153} 148}
154 149
155Storage::Transaction &Pipeline::transaction() 150DataStore::Transaction &Pipeline::transaction()
156{ 151{
157 return d->transaction; 152 return d->transaction;
158} 153}
159 154
160Storage &Pipeline::storage() const 155DataStore &Pipeline::storage() const
161{ 156{
162 return d->storage; 157 return d->storage;
163} 158}
@@ -180,14 +175,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
180 QByteArray key; 175 QByteArray key;
181 if (createEntity->entityId()) { 176 if (createEntity->entityId()) {
182 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); 177 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size());
183 if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { 178 if (DataStore::mainDatabase(d->transaction, bufferType).contains(key)) {
184 SinkError() << "An entity with this id already exists: " << key; 179 SinkError() << "An entity with this id already exists: " << key;
185 return KAsync::error<qint64>(0); 180 return KAsync::error<qint64>(0);
186 } 181 }
187 } 182 }
188 183
189 if (key.isEmpty()) { 184 if (key.isEmpty()) {
190 key = Sink::Storage::generateUid(); 185 key = DataStore::generateUid();
191 } 186 }
192 SinkTrace() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 187 SinkTrace() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
193 Q_ASSERT(!key.isEmpty()); 188 Q_ASSERT(!key.isEmpty());
@@ -205,7 +200,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
205 return KAsync::error<qint64>(0); 200 return KAsync::error<qint64>(0);
206 } 201 }
207 202
208 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 203 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
209 if (!adaptorFactory) { 204 if (!adaptorFactory) {
210 SinkWarning() << "no adaptor factory for type " << bufferType; 205 SinkWarning() << "no adaptor factory for type " << bufferType;
211 return KAsync::error<qint64>(0); 206 return KAsync::error<qint64>(0);
@@ -214,10 +209,10 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
214 auto adaptor = adaptorFactory->createAdaptor(*entity); 209 auto adaptor = adaptorFactory->createAdaptor(*entity);
215 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); 210 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties());
216 foreach (const auto &processor, d->processors[bufferType]) { 211 foreach (const auto &processor, d->processors[bufferType]) {
217 processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); 212 processor->newEntity(key, DataStore::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction);
218 } 213 }
219 //The maxRevision may have changed meanwhile if the entity created sub-entities 214 //The maxRevision may have changed meanwhile if the entity created sub-entities
220 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; 215 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
221 216
222 // Add metadata buffer 217 // Add metadata buffer
223 flatbuffers::FlatBufferBuilder metadataFbb; 218 flatbuffers::FlatBufferBuilder metadataFbb;
@@ -233,6 +228,8 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
233 228
234 d->storeNewRevision(newRevision, fbb, bufferType, key); 229 d->storeNewRevision(newRevision, fbb, bufferType, key);
235 230
231 //FIXME entityStore->create(bufferType, memoryAdaptor, replayToSource)
232
236 return KAsync::value(newRevision); 233 return KAsync::value(newRevision);
237} 234}
238 235
@@ -273,7 +270,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
273 } 270 }
274 271
275 // TODO use only readPropertyMapper and writePropertyMapper 272 // TODO use only readPropertyMapper and writePropertyMapper
276 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 273 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
277 if (!adaptorFactory) { 274 if (!adaptorFactory) {
278 SinkWarning() << "no adaptor factory for type " << bufferType; 275 SinkWarning() << "no adaptor factory for type " << bufferType;
279 return KAsync::error<qint64>(0); 276 return KAsync::error<qint64>(0);
@@ -284,7 +281,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
284 auto diff = adaptorFactory->createAdaptor(*diffEntity); 281 auto diff = adaptorFactory->createAdaptor(*diffEntity);
285 282
286 QSharedPointer<ApplicationDomain::BufferAdaptor> current; 283 QSharedPointer<ApplicationDomain::BufferAdaptor> current;
287 Storage::mainDatabase(d->transaction, bufferType) 284 DataStore::mainDatabase(d->transaction, bufferType)
288 .findLatest(key, 285 .findLatest(key,
289 [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 286 [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
290 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 287 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
@@ -295,7 +292,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
295 } 292 }
296 return false; 293 return false;
297 }, 294 },
298 [baseRevision](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); 295 [baseRevision](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; });
299 296
300 if (!current) { 297 if (!current) {
301 SinkWarning() << "Failed to read local value " << key; 298 SinkWarning() << "Failed to read local value " << key;
@@ -323,10 +320,10 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
323 320
324 newAdaptor->resetChangedProperties(); 321 newAdaptor->resetChangedProperties();
325 foreach (const auto &processor, d->processors[bufferType]) { 322 foreach (const auto &processor, d->processors[bufferType]) {
326 processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); 323 processor->modifiedEntity(key, DataStore::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction);
327 } 324 }
328 //The maxRevision may have changed meanwhile if the entity created sub-entities 325 //The maxRevision may have changed meanwhile if the entity created sub-entities
329 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; 326 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
330 327
331 // Add metadata buffer 328 // Add metadata buffer
332 flatbuffers::FlatBufferBuilder metadataFbb; 329 flatbuffers::FlatBufferBuilder metadataFbb;
@@ -369,7 +366,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
369 366
370 bool found = false; 367 bool found = false;
371 bool alreadyRemoved = false; 368 bool alreadyRemoved = false;
372 Storage::mainDatabase(d->transaction, bufferType) 369 DataStore::mainDatabase(d->transaction, bufferType)
373 .findLatest(key, 370 .findLatest(key,
374 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { 371 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
375 auto entity = GetEntity(data.data()); 372 auto entity = GetEntity(data.data());
@@ -382,7 +379,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
382 } 379 }
383 return false; 380 return false;
384 }, 381 },
385 [](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); 382 [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; });
386 383
387 if (!found) { 384 if (!found) {
388 SinkWarning() << "Failed to find entity " << key; 385 SinkWarning() << "Failed to find entity " << key;
@@ -393,7 +390,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
393 return KAsync::error<qint64>(0); 390 return KAsync::error<qint64>(0);
394 } 391 }
395 392
396 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; 393 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
397 394
398 // Add metadata buffer 395 // Add metadata buffer
399 flatbuffers::FlatBufferBuilder metadataFbb; 396 flatbuffers::FlatBufferBuilder metadataFbb;
@@ -407,14 +404,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
407 flatbuffers::FlatBufferBuilder fbb; 404 flatbuffers::FlatBufferBuilder fbb;
408 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); 405 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
409 406
410 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 407 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
411 if (!adaptorFactory) { 408 if (!adaptorFactory) {
412 SinkWarning() << "no adaptor factory for type " << bufferType; 409 SinkWarning() << "no adaptor factory for type " << bufferType;
413 return KAsync::error<qint64>(0); 410 return KAsync::error<qint64>(0);
414 } 411 }
415 412
416 QSharedPointer<ApplicationDomain::BufferAdaptor> current; 413 QSharedPointer<ApplicationDomain::BufferAdaptor> current;
417 Storage::mainDatabase(d->transaction, bufferType) 414 DataStore::mainDatabase(d->transaction, bufferType)
418 .findLatest(key, 415 .findLatest(key,
419 [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool { 416 [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
420 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 417 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
@@ -425,7 +422,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
425 } 422 }
426 return false; 423 return false;
427 }, 424 },
428 [this](const Storage::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; }); 425 [this](const DataStore::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; });
429 426
430 d->storeNewRevision(newRevision, fbb, bufferType, key); 427 d->storeNewRevision(newRevision, fbb, bufferType, key);
431 428
@@ -439,10 +436,10 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
439void Pipeline::cleanupRevision(qint64 revision) 436void Pipeline::cleanupRevision(qint64 revision)
440{ 437{
441 d->revisionChanged = true; 438 d->revisionChanged = true;
442 const auto uid = Storage::getUidFromRevision(d->transaction, revision); 439 const auto uid = DataStore::getUidFromRevision(d->transaction, revision);
443 const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); 440 const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision);
444 SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; 441 SinkTrace() << "Cleaning up revision " << revision << uid << bufferType;
445 Storage::mainDatabase(d->transaction, bufferType) 442 DataStore::mainDatabase(d->transaction, bufferType)
446 .scan(uid, 443 .scan(uid,
447 [&](const QByteArray &key, const QByteArray &data) -> bool { 444 [&](const QByteArray &key, const QByteArray &data) -> bool {
448 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 445 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
@@ -453,20 +450,20 @@ void Pipeline::cleanupRevision(qint64 revision)
453 const qint64 rev = metadata->revision(); 450 const qint64 rev = metadata->revision();
454 // Remove old revisions, and the current if the entity has already been removed 451 // Remove old revisions, and the current if the entity has already been removed
455 if (rev < revision || metadata->operation() == Operation_Removal) { 452 if (rev < revision || metadata->operation() == Operation_Removal) {
456 Storage::removeRevision(d->transaction, rev); 453 DataStore::removeRevision(d->transaction, rev);
457 Storage::mainDatabase(d->transaction, bufferType).remove(key); 454 DataStore::mainDatabase(d->transaction, bufferType).remove(key);
458 } 455 }
459 } 456 }
460 457
461 return true; 458 return true;
462 }, 459 },
463 [](const Storage::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); 460 [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true);
464 Storage::setCleanedUpRevision(d->transaction, revision); 461 DataStore::setCleanedUpRevision(d->transaction, revision);
465} 462}
466 463
467qint64 Pipeline::cleanedUpRevision() 464qint64 Pipeline::cleanedUpRevision()
468{ 465{
469 return Storage::cleanedUpRevision(d->transaction); 466 return DataStore::cleanedUpRevision(d->transaction);
470} 467}
471 468
472class Preprocessor::Private { 469class Preprocessor::Private {
@@ -523,8 +520,6 @@ void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomain
523 d->pipeline->newEntity(data, data.size()).exec(); 520 d->pipeline->newEntity(data, data.size()).exec();
524} 521}
525 522
526} // namespace Sink
527
528#pragma clang diagnostic push 523#pragma clang diagnostic push
529#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" 524#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
530#include "moc_pipeline.cpp" 525#include "moc_pipeline.cpp"
diff --git a/common/pipeline.h b/common/pipeline.h
index ef89cf0..bf94017 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -1,5 +1,6 @@
1/* 1/*
2 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> 2 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org>
3 * Copyright (C) 2015 Christian Mollekopf <mollekopf@kolabsys.com>
3 * 4 *
4 * This library is free software; you can redistribute it and/or 5 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public 6 * modify it under the terms of the GNU Lesser General Public
@@ -41,16 +42,15 @@ class SINK_EXPORT Pipeline : public QObject
41 Q_OBJECT 42 Q_OBJECT
42 43
43public: 44public:
44 Pipeline(const QString &storagePath, QObject *parent = 0); 45 Pipeline(const ResourceContext &context);
45 ~Pipeline(); 46 ~Pipeline();
46 47
47 Storage &storage() const; 48 Storage::DataStore &storage() const;
48 49
49 void setResourceType(const QByteArray &resourceType);
50 void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); 50 void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors);
51 void startTransaction(); 51 void startTransaction();
52 void commit(); 52 void commit();
53 Storage::Transaction &transaction(); 53 Storage::DataStore::Transaction &transaction();
54 54
55 KAsync::Job<qint64> newEntity(void const *command, size_t size); 55 KAsync::Job<qint64> newEntity(void const *command, size_t size);
56 KAsync::Job<qint64> modifiedEntity(void const *command, size_t size); 56 KAsync::Job<qint64> modifiedEntity(void const *command, size_t size);
@@ -82,10 +82,10 @@ public:
82 virtual ~Preprocessor(); 82 virtual ~Preprocessor();
83 83
84 virtual void startBatch(); 84 virtual void startBatch();
85 virtual void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) {}; 85 virtual void newEntity(const QByteArray &uid, qint64 revision, ApplicationDomain::BufferAdaptor &newEntity, Storage::DataStore::Transaction &transaction) {};
86 virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, 86 virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity,
87 Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) {}; 87 ApplicationDomain::BufferAdaptor &newEntity, Storage::DataStore::Transaction &transaction) {};
88 virtual void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) {}; 88 virtual void deletedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, Storage::DataStore::Transaction &transaction) {};
89 virtual void finalize(); 89 virtual void finalize();
90 90
91 void setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *); 91 void setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *);
@@ -94,9 +94,9 @@ protected:
94 template <typename DomainType> 94 template <typename DomainType>
95 void createEntity(const DomainType &entity) 95 void createEntity(const DomainType &entity)
96 { 96 {
97 createEntity(entity, Sink::ApplicationDomain::getTypeName<DomainType>()); 97 createEntity(entity, ApplicationDomain::getTypeName<DomainType>());
98 } 98 }
99 void createEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &type); 99 void createEntity(const ApplicationDomain::ApplicationDomainType &entity, const QByteArray &type);
100 100
101 QByteArray resourceInstanceIdentifier() const; 101 QByteArray resourceInstanceIdentifier() const;
102 102
@@ -110,27 +110,27 @@ template<typename DomainType>
110class SINK_EXPORT EntityPreprocessor: public Preprocessor 110class SINK_EXPORT EntityPreprocessor: public Preprocessor
111{ 111{
112public: 112public:
113 virtual void newEntity(DomainType &, Sink::Storage::Transaction &transaction) {}; 113 virtual void newEntity(DomainType &, Storage::DataStore::Transaction &transaction) {};
114 virtual void modifiedEntity(const DomainType &oldEntity, DomainType &newEntity, Sink::Storage::Transaction &transaction) {}; 114 virtual void modifiedEntity(const DomainType &oldEntity, DomainType &newEntity, Storage::DataStore::Transaction &transaction) {};
115 virtual void deletedEntity(const DomainType &oldEntity, Sink::Storage::Transaction &transaction) {}; 115 virtual void deletedEntity(const DomainType &oldEntity, Storage::DataStore::Transaction &transaction) {};
116 116
117private: 117private:
118 static void nullDeleter(Sink::ApplicationDomain::BufferAdaptor *) {} 118 static void nullDeleter(ApplicationDomain::BufferAdaptor *) {}
119 virtual void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE 119 virtual void newEntity(const QByteArray &uid, qint64 revision, ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
120 { 120 {
121 auto o = DomainType("", uid, revision, QSharedPointer<Sink::ApplicationDomain::BufferAdaptor>(&bufferAdaptor, nullDeleter)); 121 auto o = DomainType("", uid, revision, QSharedPointer<ApplicationDomain::BufferAdaptor>(&bufferAdaptor, nullDeleter));
122 newEntity(o, transaction); 122 newEntity(o, transaction);
123 } 123 }
124 124
125 virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, 125 virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity,
126 Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE 126 ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
127 { 127 {
128 auto o = DomainType("", uid, revision, QSharedPointer<Sink::ApplicationDomain::BufferAdaptor>(&bufferAdaptor, nullDeleter)); 128 auto o = DomainType("", uid, revision, QSharedPointer<ApplicationDomain::BufferAdaptor>(&bufferAdaptor, nullDeleter));
129 modifiedEntity(DomainType("", uid, 0, QSharedPointer<Sink::ApplicationDomain::BufferAdaptor>(const_cast<Sink::ApplicationDomain::BufferAdaptor*>(&oldEntity), nullDeleter)), o, transaction); 129 modifiedEntity(DomainType("", uid, 0, QSharedPointer<ApplicationDomain::BufferAdaptor>(const_cast<ApplicationDomain::BufferAdaptor*>(&oldEntity), nullDeleter)), o, transaction);
130 } 130 }
131 virtual void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE 131 virtual void deletedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
132 { 132 {
133 deletedEntity(DomainType("", uid, revision, QSharedPointer<Sink::ApplicationDomain::BufferAdaptor>(const_cast<Sink::ApplicationDomain::BufferAdaptor*>(&bufferAdaptor), nullDeleter)), transaction); 133 deletedEntity(DomainType("", uid, revision, QSharedPointer<ApplicationDomain::BufferAdaptor>(const_cast<ApplicationDomain::BufferAdaptor*>(&bufferAdaptor), nullDeleter)), transaction);
134 } 134 }
135}; 135};
136 136
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index f037cfc..e7963a3 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -28,11 +28,13 @@
28#include "definitions.h" 28#include "definitions.h"
29#include "domainadaptor.h" 29#include "domainadaptor.h"
30#include "asyncutils.h" 30#include "asyncutils.h"
31#include "entityreader.h" 31#include "storage.h"
32#include "datastorequery.h"
32 33
33SINK_DEBUG_AREA("queryrunner") 34SINK_DEBUG_AREA("queryrunner")
34 35
35using namespace Sink; 36using namespace Sink;
37using namespace Sink::Storage;
36 38
37/* 39/*
38 * This class wraps the actual query implementation. 40 * This class wraps the actual query implementation.
@@ -43,30 +45,28 @@ using namespace Sink;
43template <typename DomainType> 45template <typename DomainType>
44class QueryWorker : public QObject 46class QueryWorker : public QObject
45{ 47{
48 typedef std::function<bool(const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap<QByteArray, QVariant> &aggregateValues)> ResultCallback;
46 // SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier, mId) 49 // SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier, mId)
47 SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier) 50 SINK_DEBUG_COMPONENT(mResourceContext.resourceInstanceIdentifier)
48public: 51public:
49 QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, 52 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation);
50 const QueryRunnerBase::ResultTransformation &transformation);
51 virtual ~QueryWorker(); 53 virtual ~QueryWorker();
52 54
55 qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback);
53 QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 56 QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
54 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); 57 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
55 58
56private: 59private:
57 Storage::Transaction getTransaction();
58 std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap<QByteArray, QVariant> &)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 60 std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap<QByteArray, QVariant> &)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
59 61
60 QueryRunnerBase::ResultTransformation mResultTransformation; 62 QueryRunnerBase::ResultTransformation mResultTransformation;
61 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; 63 ResourceContext mResourceContext;
62 QByteArray mResourceInstanceIdentifier;
63 QByteArray mId; //Used for identification in debug output 64 QByteArray mId; //Used for identification in debug output
64}; 65};
65 66
66template <class DomainType> 67template <class DomainType>
67QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, 68QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType)
68 const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) 69 : QueryRunnerBase(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess()), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit)
69 : QueryRunnerBase(), mResourceInstanceIdentifier(instanceIdentifier), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit)
70{ 70{
71 SinkTrace() << "Starting query"; 71 SinkTrace() << "Starting query";
72 if (query.limit && query.sortProperty.isEmpty()) { 72 if (query.limit && query.sortProperty.isEmpty()) {
@@ -79,16 +79,17 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
79 SinkTrace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; 79 SinkTrace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize;
80 auto resultProvider = mResultProvider; 80 auto resultProvider = mResultProvider;
81 if (query.synchronousQuery) { 81 if (query.synchronousQuery) {
82 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 82 QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation);
83 worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); 83 worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize);
84 resultProvider->initialResultSetComplete(parent); 84 resultProvider->initialResultSetComplete(parent);
85 } else { 85 } else {
86 auto resultTransformation = mResultTransformation; 86 auto resultTransformation = mResultTransformation;
87 auto offset = mOffset[parentId]; 87 auto offset = mOffset[parentId];
88 auto batchSize = mBatchSize; 88 auto batchSize = mBatchSize;
89 auto resourceContext = mResourceContext;
89 //The lambda will be executed in a separate thread, so we're extra careful 90 //The lambda will be executed in a separate thread, so we're extra careful
90 async::run<QPair<qint64, qint64> >([resultTransformation, offset, batchSize, query, bufferType, instanceIdentifier, factory, resultProvider, parent]() { 91 async::run<QPair<qint64, qint64> >([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() {
91 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, resultTransformation); 92 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation);
92 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); 93 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize);
93 return newRevisionAndReplayedEntities; 94 return newRevisionAndReplayedEntities;
94 }) 95 })
@@ -115,8 +116,9 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
115 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 116 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
116 setQuery([=]() -> KAsync::Job<void> { 117 setQuery([=]() -> KAsync::Job<void> {
117 auto resultProvider = mResultProvider; 118 auto resultProvider = mResultProvider;
119 auto resourceContext = mResourceContext;
118 return async::run<QPair<qint64, qint64> >([=]() { 120 return async::run<QPair<qint64, qint64> >([=]() {
119 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 121 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation);
120 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); 122 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider);
121 return newRevisionAndReplayedEntities; 123 return newRevisionAndReplayedEntities;
122 }) 124 })
@@ -158,11 +160,10 @@ typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainTy
158 return mResultProvider->emitter(); 160 return mResultProvider->emitter();
159} 161}
160 162
161
162template <class DomainType> 163template <class DomainType>
163QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, 164QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const Sink::ResourceContext &resourceContext,
164 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) 165 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation)
165 : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mId(QUuid::createUuid().toByteArray()) 166 : QObject(), mResultTransformation(transformation), mResourceContext(resourceContext), mId(QUuid::createUuid().toByteArray())
166{ 167{
167 SinkTrace() << "Starting query worker"; 168 SinkTrace() << "Starting query worker";
168} 169}
@@ -203,41 +204,46 @@ std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap
203} 204}
204 205
205template <class DomainType> 206template <class DomainType>
207qint64 QueryWorker<DomainType>::replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback)
208{
209 SinkTrace() << "Skipping over " << offset << " results";
210 resultSet.skip(offset);
211 int counter = 0;
212 while (!batchSize || (counter < batchSize)) {
213 const bool ret =
214 resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool {
215 counter++;
216 auto adaptor = mResourceContext.adaptorFactory<DomainType>().createAdaptor(result.buffer.entity());
217 Q_ASSERT(adaptor);
218 return callback(QSharedPointer<DomainType>::create(mResourceContext.instanceId(), result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues);
219 });
220 if (!ret) {
221 break;
222 }
223 };
224 SinkTrace() << "Replayed " << counter << " results."
225 << "Limit " << batchSize;
226 return counter;
227}
228
229template <class DomainType>
206QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 230QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
207{ 231{
208 QTime time; 232 QTime time;
209 time.start(); 233 time.start();
210 234
211 auto transaction = getTransaction(); 235 auto entityStore = EntityStore::Ptr::create(mResourceContext);
212 236
213 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); 237 const qint64 baseRevision = resultProvider.revision() + 1;
214 auto revisionAndReplayedEntities = reader.executeIncrementalQuery(query, resultProvider.revision(), resultProviderCallback(query, resultProvider));
215 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
216 return revisionAndReplayedEntities;
217}
218 238
219template <class DomainType> 239 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, entityStore);
220Storage::Transaction QueryWorker<DomainType>::getTransaction() 240 auto resultSet = preparedQuery->update(baseRevision);
221{
222 Sink::Storage::Transaction transaction;
223 {
224 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier);
225 if (!storage.exists()) {
226 //This is not an error if the resource wasn't started before
227 SinkLog() << "Store doesn't exist: " << mResourceInstanceIdentifier;
228 return Sink::Storage::Transaction();
229 }
230 storage.setDefaultErrorHandler([this](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.store << error.message; });
231 transaction = storage.createTransaction(Sink::Storage::ReadOnly);
232 }
233 241
234 //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. 242 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
235 //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). 243 auto replayedEntities = replaySet(resultSet, 0, 0, resultProviderCallback(query, resultProvider));
236 while (!transaction.validateNamedDatabases()) { 244
237 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); 245 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
238 transaction = storage.createTransaction(Sink::Storage::ReadOnly); 246 return qMakePair(entityStore->maxRevision(), replayedEntities);
239 }
240 return transaction;
241} 247}
242 248
243template <class DomainType> 249template <class DomainType>
@@ -258,12 +264,16 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
258 } 264 }
259 } 265 }
260 266
261 auto transaction = getTransaction(); 267 auto entityStore = EntityStore::Ptr::create(mResourceContext);
268
269 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, entityStore);
270 auto resultSet = preparedQuery->execute();
271
272 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
273 auto replayedEntities = replaySet(resultSet, offset, batchsize, resultProviderCallback(query, resultProvider));
262 274
263 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction);
264 auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider));
265 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 275 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
266 return revisionAndReplayedEntities; 276 return qMakePair(entityStore->maxRevision(), replayedEntities);
267} 277}
268 278
269template class QueryRunner<Sink::ApplicationDomain::Folder>; 279template class QueryRunner<Sink::ApplicationDomain::Folder>;
diff --git a/common/queryrunner.h b/common/queryrunner.h
index 78aabf6..9bd4791 100644
--- a/common/queryrunner.h
+++ b/common/queryrunner.h
@@ -20,10 +20,10 @@
20#pragma once 20#pragma once
21 21
22#include <QObject> 22#include <QObject>
23#include "resourcecontext.h"
23#include "resourceaccess.h" 24#include "resourceaccess.h"
24#include "resultprovider.h" 25#include "resultprovider.h"
25#include "domaintypeadaptorfactoryinterface.h" 26#include "domaintypeadaptorfactoryinterface.h"
26#include "storage.h"
27#include "query.h" 27#include "query.h"
28#include "log.h" 28#include "log.h"
29 29
@@ -84,8 +84,7 @@ template <typename DomainType>
84class QueryRunner : public QueryRunnerBase 84class QueryRunner : public QueryRunnerBase
85{ 85{
86public: 86public:
87 QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, 87 QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType);
88 const QByteArray &bufferType);
89 virtual ~QueryRunner(); 88 virtual ~QueryRunner();
90 89
91 /** 90 /**
@@ -97,8 +96,8 @@ public:
97 typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter(); 96 typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter();
98 97
99private: 98private:
100 QByteArray mResourceInstanceIdentifier; 99 Sink::ResourceContext mResourceContext;
101 SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier) 100 SINK_DEBUG_COMPONENT(mResourceContext.resourceInstanceIdentifier)
102 QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess; 101 QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess;
103 QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider; 102 QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider;
104 ResultTransformation mResultTransformation; 103 ResultTransformation mResultTransformation;
diff --git a/common/remoteidmap.cpp b/common/remoteidmap.cpp
index 2c3e5c7..da57cf6 100644
--- a/common/remoteidmap.cpp
+++ b/common/remoteidmap.cpp
@@ -27,7 +27,7 @@ using namespace Sink;
27 27
28SINK_DEBUG_AREA("remoteidmap") 28SINK_DEBUG_AREA("remoteidmap")
29 29
30RemoteIdMap::RemoteIdMap(Sink::Storage::Transaction &transaction) 30RemoteIdMap::RemoteIdMap(Sink::Storage::DataStore::Transaction &transaction)
31 : mTransaction(transaction) 31 : mTransaction(transaction)
32{ 32{
33 33
@@ -58,7 +58,7 @@ QByteArray RemoteIdMap::resolveRemoteId(const QByteArray &bufferType, const QByt
58 Index index("rid.mapping." + bufferType, mTransaction); 58 Index index("rid.mapping." + bufferType, mTransaction);
59 QByteArray sinkId = index.lookup(remoteId); 59 QByteArray sinkId = index.lookup(remoteId);
60 if (sinkId.isEmpty()) { 60 if (sinkId.isEmpty()) {
61 sinkId = Sink::Storage::generateUid(); 61 sinkId = Sink::Storage::DataStore::generateUid();
62 index.add(remoteId, sinkId); 62 index.add(remoteId, sinkId);
63 Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId); 63 Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId);
64 } 64 }
@@ -81,7 +81,7 @@ QByteArray RemoteIdMap::readValue(const QByteArray &key)
81 mTransaction.openDatabase("values").scan(key, [&value](const QByteArray &, const QByteArray &v) { 81 mTransaction.openDatabase("values").scan(key, [&value](const QByteArray &, const QByteArray &v) {
82 value = v; 82 value = v;
83 return false; 83 return false;
84 }, [](const Sink::Storage::Error &) { 84 }, [](const Sink::Storage::DataStore::Error &) {
85 //Ignore errors because we may not find the value 85 //Ignore errors because we may not find the value
86 }); 86 });
87 return value; 87 return value;
diff --git a/common/remoteidmap.h b/common/remoteidmap.h
index bf08621..32c5efd 100644
--- a/common/remoteidmap.h
+++ b/common/remoteidmap.h
@@ -31,7 +31,7 @@ namespace Sink {
31class SINK_EXPORT RemoteIdMap 31class SINK_EXPORT RemoteIdMap
32{ 32{
33public: 33public:
34 RemoteIdMap(Sink::Storage::Transaction &); 34 RemoteIdMap(Sink::Storage::DataStore::Transaction &);
35 35
36 /** 36 /**
37 * Records a localId to remoteId mapping 37 * Records a localId to remoteId mapping
@@ -58,7 +58,7 @@ public:
58 void writeValue(const QByteArray &key, const QByteArray &value); 58 void writeValue(const QByteArray &key, const QByteArray &value);
59 59
60private: 60private:
61 Sink::Storage::Transaction &mTransaction; 61 Sink::Storage::DataStore::Transaction &mTransaction;
62}; 62};
63 63
64} 64}
diff --git a/common/resource.h b/common/resource.h
index d468aca..426585d 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -27,6 +27,7 @@
27namespace Sink { 27namespace Sink {
28class FacadeFactory; 28class FacadeFactory;
29class AdaptorFactoryRegistry; 29class AdaptorFactoryRegistry;
30class ResourceContext;
30 31
31/** 32/**
32 * Resource interface 33 * Resource interface
@@ -75,7 +76,7 @@ public:
75 ResourceFactory(QObject *parent); 76 ResourceFactory(QObject *parent);
76 virtual ~ResourceFactory(); 77 virtual ~ResourceFactory();
77 78
78 virtual Resource *createResource(const QByteArray &instanceIdentifier) = 0; 79 virtual Resource *createResource(const ResourceContext &context) = 0;
79 virtual void registerFacades(FacadeFactory &factory) = 0; 80 virtual void registerFacades(FacadeFactory &factory) = 0;
80 virtual void registerAdaptorFactories(AdaptorFactoryRegistry &registry) {}; 81 virtual void registerAdaptorFactories(AdaptorFactoryRegistry &registry) {};
81 virtual void removeDataFromDisk(const QByteArray &instanceIdentifier) = 0; 82 virtual void removeDataFromDisk(const QByteArray &instanceIdentifier) = 0;
diff --git a/common/resourcecontext.h b/common/resourcecontext.h
new file mode 100644
index 0000000..6058ac7
--- /dev/null
+++ b/common/resourcecontext.h
@@ -0,0 +1,77 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#pragma once
21
22#include "domaintypeadaptorfactoryinterface.h"
23#include "applicationdomaintype.h"
24#include "resourceaccess.h"
25#include <QByteArray>
26#include <QMap>
27
28namespace Sink {
29
30/*
31 * A context object that can be passed around so each part of the system knows in what context it works.
32 *
33 * This is necessary because we can't rely on a singleton or thread-local storage since multiple resources can be accessed from the same thread/process.
34 */
35struct ResourceContext {
36 const QByteArray resourceInstanceIdentifier;
37 const QByteArray resourceType;
38 QMap<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactories;
39 //TODO prehaps use a weak pointer to not mess up lifetime management
40 ResourceAccessInterface::Ptr mResourceAccess;
41
42
43 ResourceContext(const QByteArray &identifier, const QByteArray &resourceType_, const QMap<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr> &factories = QMap<QByteArray, DomainTypeAdaptorFactoryInterface::Ptr>())
44 : resourceInstanceIdentifier(identifier),
45 resourceType(resourceType_),
46 adaptorFactories(factories)
47 {
48 }
49
50 QByteArray instanceId() const
51 {
52 return resourceInstanceIdentifier;
53 }
54
55 DomainTypeAdaptorFactoryInterface &adaptorFactory(const QByteArray &type) const
56 {
57 auto factory = adaptorFactories.value(type);
58 Q_ASSERT(factory);
59 return *factory;
60 }
61
62 template<typename DomainType>
63 DomainTypeAdaptorFactoryInterface &adaptorFactory()
64 {
65 return adaptorFactory(ApplicationDomain::getTypeName<DomainType>());
66 }
67
68 ResourceAccessInterface::Ptr resourceAccess()
69 {
70 if (!mResourceAccess) {
71 mResourceAccess = ResourceAccessFactory::instance().getAccess(resourceInstanceIdentifier, resourceType);
72 }
73 return mResourceAccess;
74 }
75};
76
77}
diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp
index 702d8e3..204793e 100644
--- a/common/sourcewriteback.cpp
+++ b/common/sourcewriteback.cpp
@@ -22,6 +22,8 @@
22#include "definitions.h" 22#include "definitions.h"
23#include "log.h" 23#include "log.h"
24#include "bufferutils.h" 24#include "bufferutils.h"
25#include "entitybuffer.h"
26#include "entity_generated.h"
25 27
26#define ENTITY_TYPE_MAIL "mail" 28#define ENTITY_TYPE_MAIL "mail"
27#define ENTITY_TYPE_FOLDER "folder" 29#define ENTITY_TYPE_FOLDER "folder"
@@ -30,21 +32,21 @@ SINK_DEBUG_AREA("sourcewriteback")
30 32
31using namespace Sink; 33using namespace Sink;
32 34
33SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) 35SourceWriteBack::SourceWriteBack(const ResourceContext &context)
34 : ChangeReplay(resourceInstanceIdentifier), 36 : ChangeReplay(context),
35 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), 37 mResourceContext(context),
36 mResourceType(resourceType), 38 mSyncStorage(Sink::storageLocation(), context.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadWrite),
37 mResourceInstanceIdentifier(resourceInstanceIdentifier) 39 mEntityStore(QSharedPointer<Storage::EntityStore>::create(mResourceContext))
38{ 40{
39 41
40} 42}
41 43
42EntityStore &SourceWriteBack::store() 44EntityStore &SourceWriteBack::store()
43{ 45{
44 if (!mEntityStore) { 46 if (!mEntityStoreWrapper) {
45 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); 47 mEntityStoreWrapper = QSharedPointer<EntityStore>::create(*mEntityStore);
46 } 48 }
47 return *mEntityStore; 49 return *mEntityStoreWrapper;
48} 50}
49 51
50RemoteIdMap &SourceWriteBack::syncStore() 52RemoteIdMap &SourceWriteBack::syncStore()
@@ -76,15 +78,14 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr
76 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); 78 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
77 Q_ASSERT(metadataBuffer); 79 Q_ASSERT(metadataBuffer);
78 Q_ASSERT(!mSyncStore); 80 Q_ASSERT(!mSyncStore);
79 Q_ASSERT(!mEntityStore); 81 Q_ASSERT(!mEntityStoreWrapper);
80 Q_ASSERT(!mTransaction);
81 Q_ASSERT(!mSyncTransaction); 82 Q_ASSERT(!mSyncTransaction);
82 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); 83 mEntityStore->startTransaction(Storage::DataStore::ReadOnly);
83 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); 84 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::ReadWrite);
84 85
85 // const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; 86 // const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
86 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; 87 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
87 const auto uid = Sink::Storage::uidFromKey(key); 88 const auto uid = Sink::Storage::DataStore::uidFromKey(key);
88 const auto modifiedProperties = metadataBuffer->modifiedProperties() ? BufferUtils::fromVector(*metadataBuffer->modifiedProperties()) : QByteArrayList(); 89 const auto modifiedProperties = metadataBuffer->modifiedProperties() ? BufferUtils::fromVector(*metadataBuffer->modifiedProperties()) : QByteArrayList();
89 QByteArray oldRemoteId; 90 QByteArray oldRemoteId;
90 91
@@ -133,9 +134,9 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr
133 SinkWarning() << "Failed to replay change: " << error.errorMessage; 134 SinkWarning() << "Failed to replay change: " << error.errorMessage;
134 } 135 }
135 mSyncStore.clear(); 136 mSyncStore.clear();
136 mEntityStore.clear(); 137 mEntityStoreWrapper.clear();
137 mTransaction.abort();
138 mSyncTransaction.commit(); 138 mSyncTransaction.commit();
139 mEntityStore->abortTransaction();
139 }); 140 });
140} 141}
141 142
diff --git a/common/sourcewriteback.h b/common/sourcewriteback.h
index 8031573..327d1ad 100644
--- a/common/sourcewriteback.h
+++ b/common/sourcewriteback.h
@@ -25,6 +25,7 @@
25#include "storage.h" 25#include "storage.h"
26#include "entitystore.h" 26#include "entitystore.h"
27#include "remoteidmap.h" 27#include "remoteidmap.h"
28#include "metadata_generated.h"
28 29
29namespace Sink { 30namespace Sink {
30 31
@@ -34,7 +35,7 @@ namespace Sink {
34class SINK_EXPORT SourceWriteBack : public ChangeReplay 35class SINK_EXPORT SourceWriteBack : public ChangeReplay
35{ 36{
36public: 37public:
37 SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier); 38 SourceWriteBack(const ResourceContext &resourceContext);
38 39
39protected: 40protected:
40 ///Base implementation calls the replay$Type calls 41 ///Base implementation calls the replay$Type calls
@@ -58,12 +59,12 @@ protected:
58private: 59private:
59 //Read only access to main storage 60 //Read only access to main storage
60 EntityStore &store(); 61 EntityStore &store();
61 62 ResourceContext mResourceContext;
62 Sink::Storage mSyncStorage; 63 Sink::Storage::DataStore mSyncStorage;
63 QSharedPointer<RemoteIdMap> mSyncStore; 64 QSharedPointer<RemoteIdMap> mSyncStore;
64 QSharedPointer<EntityStore> mEntityStore; 65 QSharedPointer<Storage::EntityStore> mEntityStore;
65 Sink::Storage::Transaction mTransaction; 66 QSharedPointer<EntityStore> mEntityStoreWrapper;
66 Sink::Storage::Transaction mSyncTransaction; 67 Sink::Storage::DataStore::Transaction mSyncTransaction;
67 QByteArray mResourceType; 68 QByteArray mResourceType;
68 QByteArray mResourceInstanceIdentifier; 69 QByteArray mResourceInstanceIdentifier;
69}; 70};
diff --git a/common/specialpurposepreprocessor.cpp b/common/specialpurposepreprocessor.cpp
index b9ad94a..a6ee373 100644
--- a/common/specialpurposepreprocessor.cpp
+++ b/common/specialpurposepreprocessor.cpp
@@ -11,6 +11,8 @@ static QHash<QByteArray, QString> specialPurposeFolders()
11{ 11{
12 QHash<QByteArray, QString> hash; 12 QHash<QByteArray, QString> hash;
13 //FIXME localize 13 //FIXME localize
14 //TODO inbox
15 //TODO use standardized values
14 hash.insert("drafts", "Drafts"); 16 hash.insert("drafts", "Drafts");
15 hash.insert("trash", "Trash"); 17 hash.insert("trash", "Trash");
16 hash.insert("inbox", "Inbox"); 18 hash.insert("inbox", "Inbox");
@@ -45,31 +47,31 @@ QByteArray getSpecialPurposeType(const QString &name)
45 47
46SpecialPurposeProcessor::SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier) {} 48SpecialPurposeProcessor::SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier) {}
47 49
48QByteArray SpecialPurposeProcessor::ensureFolder(Sink::Storage::Transaction &transaction, const QByteArray &specialPurpose) 50QByteArray SpecialPurposeProcessor::ensureFolder(Sink::Storage::DataStore::Transaction &transaction, const QByteArray &specialPurpose)
49{ 51{
50 if (!mSpecialPurposeFolders.contains(specialPurpose)) { 52 /* if (!mSpecialPurposeFolders.contains(specialPurpose)) { */
51 //Try to find an existing drafts folder 53 /* //Try to find an existing drafts folder */
52 Sink::EntityReader<ApplicationDomain::Folder> reader(mResourceType, mResourceInstanceIdentifier, transaction); 54 /* Sink::EntityReader<ApplicationDomain::Folder> reader(mResourceType, mResourceInstanceIdentifier, transaction); */
53 reader.query(Sink::Query().filter<ApplicationDomain::Folder::SpecialPurpose>(Query::Comparator(specialPurpose, Query::Comparator::Contains)), 55 /* reader.query(Sink::Query().filter<ApplicationDomain::Folder::SpecialPurpose>(Query::Comparator(specialPurpose, Query::Comparator::Contains)), */
54 [this, specialPurpose](const ApplicationDomain::Folder &f) -> bool{ 56 /* [this, specialPurpose](const ApplicationDomain::Folder &f) -> bool{ */
55 mSpecialPurposeFolders.insert(specialPurpose, f.identifier()); 57 /* mSpecialPurposeFolders.insert(specialPurpose, f.identifier()); */
56 return false; 58 /* return false; */
57 }); 59 /* }); */
58 if (!mSpecialPurposeFolders.contains(specialPurpose)) { 60 /* if (!mSpecialPurposeFolders.contains(specialPurpose)) { */
59 SinkTrace() << "Failed to find a drafts folder, creating a new one"; 61 /* SinkTrace() << "Failed to find a drafts folder, creating a new one"; */
60 auto folder = ApplicationDomain::Folder::create(mResourceInstanceIdentifier); 62 /* auto folder = ApplicationDomain::Folder::create(mResourceInstanceIdentifier); */
61 folder.setSpecialPurpose(QByteArrayList() << specialPurpose); 63 /* folder.setSpecialPurpose(QByteArrayList() << specialPurpose); */
62 folder.setName(sSpecialPurposeFolders.value(specialPurpose)); 64 /* folder.setName(sSpecialPurposeFolders.value(specialPurpose)); */
63 folder.setIcon("folder"); 65 /* folder.setIcon("folder"); */
64 //This processes the pipeline synchronously 66 /* //This processes the pipeline synchronously */
65 createEntity(folder); 67 /* createEntity(folder); */
66 mSpecialPurposeFolders.insert(specialPurpose, folder.identifier()); 68 /* mSpecialPurposeFolders.insert(specialPurpose, folder.identifier()); */
67 } 69 /* } */
68 } 70 /* } */
69 return mSpecialPurposeFolders.value(specialPurpose); 71 return mSpecialPurposeFolders.value(specialPurpose);
70} 72}
71 73
72void SpecialPurposeProcessor::moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) 74void SpecialPurposeProcessor::moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction)
73{ 75{
74 if (newEntity.getProperty("trash").toBool()) { 76 if (newEntity.getProperty("trash").toBool()) {
75 newEntity.setProperty("folder", ensureFolder(transaction, "trash")); 77 newEntity.setProperty("folder", ensureFolder(transaction, "trash"));
@@ -80,12 +82,12 @@ void SpecialPurposeProcessor::moveToFolder(Sink::ApplicationDomain::BufferAdapto
80 } 82 }
81} 83}
82 84
83void SpecialPurposeProcessor::newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) 85void SpecialPurposeProcessor::newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction)
84{ 86{
85 moveToFolder(newEntity, transaction); 87 moveToFolder(newEntity, transaction);
86} 88}
87 89
88void SpecialPurposeProcessor::modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) 90void SpecialPurposeProcessor::modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction)
89{ 91{
90 moveToFolder(newEntity, transaction); 92 moveToFolder(newEntity, transaction);
91} 93}
diff --git a/common/specialpurposepreprocessor.h b/common/specialpurposepreprocessor.h
index a33701b..8b2d9e9 100644
--- a/common/specialpurposepreprocessor.h
+++ b/common/specialpurposepreprocessor.h
@@ -30,12 +30,12 @@ class SINK_EXPORT SpecialPurposeProcessor : public Sink::Preprocessor
30public: 30public:
31 SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); 31 SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier);
32 32
33 QByteArray ensureFolder(Sink::Storage::Transaction &transaction, const QByteArray &specialPurpose); 33 QByteArray ensureFolder(Sink::Storage::DataStore::Transaction &transaction, const QByteArray &specialPurpose);
34 34
35 void moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction); 35 void moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction);
36 36
37 void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE; 37 void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE;
38 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE; 38 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE;
39 39
40 QHash<QByteArray, QByteArray> mSpecialPurposeFolders; 40 QHash<QByteArray, QByteArray> mSpecialPurposeFolders;
41 QByteArray mResourceType; 41 QByteArray mResourceType;
diff --git a/common/storage.h b/common/storage.h
index 4ef20d5..e368b05 100644
--- a/common/storage.h
+++ b/common/storage.h
@@ -27,8 +27,9 @@
27#include <QString> 27#include <QString>
28 28
29namespace Sink { 29namespace Sink {
30namespace Storage {
30 31
31class SINK_EXPORT Storage 32class SINK_EXPORT DataStore
32{ 33{
33public: 34public:
34 enum AccessMode 35 enum AccessMode
@@ -66,16 +67,16 @@ public:
66 /** 67 /**
67 * Write a value 68 * Write a value
68 */ 69 */
69 bool write(const QByteArray &key, const QByteArray &value, const std::function<void(const Storage::Error &error)> &errorHandler = std::function<void(const Storage::Error &error)>()); 70 bool write(const QByteArray &key, const QByteArray &value, const std::function<void(const DataStore::Error &error)> &errorHandler = std::function<void(const DataStore::Error &error)>());
70 71
71 /** 72 /**
72 * Remove a key 73 * Remove a key
73 */ 74 */
74 void remove(const QByteArray &key, const std::function<void(const Storage::Error &error)> &errorHandler = std::function<void(const Storage::Error &error)>()); 75 void remove(const QByteArray &key, const std::function<void(const DataStore::Error &error)> &errorHandler = std::function<void(const DataStore::Error &error)>());
75 /** 76 /**
76 * Remove a key-value pair 77 * Remove a key-value pair
77 */ 78 */
78 void remove(const QByteArray &key, const QByteArray &value, const std::function<void(const Storage::Error &error)> &errorHandler = std::function<void(const Storage::Error &error)>()); 79 void remove(const QByteArray &key, const QByteArray &value, const std::function<void(const DataStore::Error &error)> &errorHandler = std::function<void(const DataStore::Error &error)>());
79 80
80 /** 81 /**
81 * Read values with a given key. 82 * Read values with a given key.
@@ -87,7 +88,7 @@ public:
87 * @return The number of values retrieved. 88 * @return The number of values retrieved.
88 */ 89 */
89 int scan(const QByteArray &key, const std::function<bool(const QByteArray &key, const QByteArray &value)> &resultHandler, 90 int scan(const QByteArray &key, const std::function<bool(const QByteArray &key, const QByteArray &value)> &resultHandler,
90 const std::function<void(const Storage::Error &error)> &errorHandler = std::function<void(const Storage::Error &error)>(), bool findSubstringKeys = false, bool skipInternalKeys = true) const; 91 const std::function<void(const DataStore::Error &error)> &errorHandler = std::function<void(const DataStore::Error &error)>(), bool findSubstringKeys = false, bool skipInternalKeys = true) const;
91 92
92 /** 93 /**
93 * Finds the last value in a series matched by prefix. 94 * Finds the last value in a series matched by prefix.
@@ -96,7 +97,7 @@ public:
96 * Note that this relies on a key scheme like $uid$revision. 97 * Note that this relies on a key scheme like $uid$revision.
97 */ 98 */
98 void findLatest(const QByteArray &uid, const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler, 99 void findLatest(const QByteArray &uid, const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler,
99 const std::function<void(const Storage::Error &error)> &errorHandler = std::function<void(const Storage::Error &error)>()) const; 100 const std::function<void(const DataStore::Error &error)> &errorHandler = std::function<void(const DataStore::Error &error)>()) const;
100 101
101 /** 102 /**
102 * Returns true if the database contains the substring key. 103 * Returns true if the database contains the substring key.
@@ -127,14 +128,14 @@ public:
127 public: 128 public:
128 Transaction(); 129 Transaction();
129 ~Transaction(); 130 ~Transaction();
130 bool commit(const std::function<void(const Storage::Error &error)> &errorHandler = std::function<void(const Storage::Error &error)>()); 131 bool commit(const std::function<void(const DataStore::Error &error)> &errorHandler = std::function<void(const DataStore::Error &error)>());
131 void abort(); 132 void abort();
132 133
133 QList<QByteArray> getDatabaseNames() const; 134 QList<QByteArray> getDatabaseNames() const;
134 bool validateNamedDatabases(); 135 bool validateNamedDatabases();
135 136
136 NamedDatabase openDatabase(const QByteArray &name = QByteArray("default"), 137 NamedDatabase openDatabase(const QByteArray &name = QByteArray("default"),
137 const std::function<void(const Storage::Error &error)> &errorHandler = std::function<void(const Storage::Error &error)>(), bool allowDuplicates = false) const; 138 const std::function<void(const DataStore::Error &error)> &errorHandler = std::function<void(const DataStore::Error &error)>(), bool allowDuplicates = false) const;
138 139
139 Transaction(Transaction &&other); 140 Transaction(Transaction &&other);
140 Transaction &operator=(Transaction &&other); 141 Transaction &operator=(Transaction &&other);
@@ -144,29 +145,29 @@ public:
144 private: 145 private:
145 Transaction(Transaction &other); 146 Transaction(Transaction &other);
146 Transaction &operator=(Transaction &other); 147 Transaction &operator=(Transaction &other);
147 friend Storage; 148 friend DataStore;
148 class Private; 149 class Private;
149 Transaction(Private *); 150 Transaction(Private *);
150 Private *d; 151 Private *d;
151 }; 152 };
152 153
153 Storage(const QString &storageRoot, const QString &name, AccessMode mode = ReadOnly); 154 DataStore(const QString &storageRoot, const QString &name, AccessMode mode = ReadOnly);
154 ~Storage(); 155 ~DataStore();
155 156
156 Transaction createTransaction(AccessMode mode = ReadWrite, const std::function<void(const Storage::Error &error)> &errorHandler = std::function<void(const Storage::Error &error)>()); 157 Transaction createTransaction(AccessMode mode = ReadWrite, const std::function<void(const DataStore::Error &error)> &errorHandler = std::function<void(const DataStore::Error &error)>());
157 158
158 /** 159 /**
159 * Set the default error handler. 160 * Set the default error handler.
160 */ 161 */
161 void setDefaultErrorHandler(const std::function<void(const Storage::Error &error)> &errorHandler); 162 void setDefaultErrorHandler(const std::function<void(const DataStore::Error &error)> &errorHandler);
162 std::function<void(const Storage::Error &error)> defaultErrorHandler() const; 163 std::function<void(const DataStore::Error &error)> defaultErrorHandler() const;
163 164
164 /** 165 /**
165 * A basic error handler that writes to std::cerr. 166 * A basic error handler that writes to std::cerr.
166 * 167 *
167 * Used if nothing else is configured. 168 * Used if nothing else is configured.
168 */ 169 */
169 static std::function<void(const Storage::Error &error)> basicErrorHandler(); 170 static std::function<void(const DataStore::Error &error)> basicErrorHandler();
170 171
171 qint64 diskUsage() const; 172 qint64 diskUsage() const;
172 void removeFromDisk() const; 173 void removeFromDisk() const;
@@ -178,16 +179,16 @@ public:
178 */ 179 */
179 static void clearEnv(); 180 static void clearEnv();
180 181
181 static qint64 maxRevision(const Sink::Storage::Transaction &); 182 static qint64 maxRevision(const Transaction &);
182 static void setMaxRevision(Sink::Storage::Transaction &, qint64 revision); 183 static void setMaxRevision(Transaction &, qint64 revision);
183 184
184 static qint64 cleanedUpRevision(const Sink::Storage::Transaction &); 185 static qint64 cleanedUpRevision(const Transaction &);
185 static void setCleanedUpRevision(Sink::Storage::Transaction &, qint64 revision); 186 static void setCleanedUpRevision(Transaction &, qint64 revision);
186 187
187 static QByteArray getUidFromRevision(const Sink::Storage::Transaction &, qint64 revision); 188 static QByteArray getUidFromRevision(const Transaction &, qint64 revision);
188 static QByteArray getTypeFromRevision(const Sink::Storage::Transaction &, qint64 revision); 189 static QByteArray getTypeFromRevision(const Transaction &, qint64 revision);
189 static void recordRevision(Sink::Storage::Transaction &, qint64 revision, const QByteArray &uid, const QByteArray &type); 190 static void recordRevision(Transaction &, qint64 revision, const QByteArray &uid, const QByteArray &type);
190 static void removeRevision(Sink::Storage::Transaction &, qint64 revision); 191 static void removeRevision(Transaction &, qint64 revision);
191 192
192 bool exists() const; 193 bool exists() const;
193 194
@@ -199,16 +200,17 @@ public:
199 static QByteArray uidFromKey(const QByteArray &key); 200 static QByteArray uidFromKey(const QByteArray &key);
200 static qint64 revisionFromKey(const QByteArray &key); 201 static qint64 revisionFromKey(const QByteArray &key);
201 202
202 static NamedDatabase mainDatabase(const Sink::Storage::Transaction &, const QByteArray &type); 203 static NamedDatabase mainDatabase(const Transaction &, const QByteArray &type);
203 204
204 static QByteArray generateUid(); 205 static QByteArray generateUid();
205 206
206private: 207private:
207 std::function<void(const Storage::Error &error)> mErrorHandler; 208 std::function<void(const DataStore::Error &error)> mErrorHandler;
208 209
209private: 210private:
210 class Private; 211 class Private;
211 Private *const d; 212 Private *const d;
212}; 213};
213 214
215}
214} // namespace Sink 216} // namespace Sink
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
new file mode 100644
index 0000000..9615eca
--- /dev/null
+++ b/common/storage/entitystore.cpp
@@ -0,0 +1,338 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "entitystore.h"
21
22#include "entitybuffer.h"
23#include "log.h"
24#include "typeindex.h"
25#include "definitions.h"
26#include "resourcecontext.h"
27#include "index.h"
28
29#include "mail.h"
30#include "folder.h"
31#include "event.h"
32
33using namespace Sink;
34using namespace Sink::Storage;
35
36SINK_DEBUG_AREA("entitystore");
37
38class EntityStore::Private {
39public:
40 Private(const ResourceContext &context) : resourceContext(context) {}
41
42 ResourceContext resourceContext;
43 DataStore::Transaction transaction;
44 QHash<QByteArray, QSharedPointer<TypeIndex> > indexByType;
45
46 DataStore::Transaction &getTransaction()
47 {
48 if (transaction) {
49 return transaction;
50 }
51
52 Sink::Storage::DataStore store(Sink::storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly);
53 transaction = store.createTransaction(DataStore::ReadOnly);
54 Q_ASSERT(transaction.validateNamedDatabases());
55 return transaction;
56 }
57
58 /* template<typename T> */
59 /* TypeIndex &typeIndex(const QByteArray &type) */
60 /* { */
61 /* if (indexByType.contains(type)) { */
62 /* return *indexByType.value(type); */
63 /* } */
64 /* auto index = QSharedPointer<TypeIndex>::create(type); */
65 /* ApplicationDomain::TypeImplementation<T>::configureIndex(*index); */
66 /* indexByType.insert(type, index); */
67 /* return *index; */
68 /* } */
69
70 TypeIndex &typeIndex(const QByteArray &type)
71 {
72 /* return applyType<typeIndex>(type); */
73 if (indexByType.contains(type)) {
74 return *indexByType.value(type);
75 }
76 auto index = QSharedPointer<TypeIndex>::create(type);
77 //TODO expand for all types
78 /* TypeHelper<type>::configureIndex(*index); */
79 // Try this: (T would i.e. become
80 // TypeHelper<ApplicationDomain::TypeImplementation>::T::configureIndex(*index);
81 if (type == ApplicationDomain::getTypeName<ApplicationDomain::Folder>()) {
82 ApplicationDomain::TypeImplementation<ApplicationDomain::Folder>::configureIndex(*index);
83 } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Mail>()) {
84 ApplicationDomain::TypeImplementation<ApplicationDomain::Mail>::configureIndex(*index);
85 } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Event>()) {
86 ApplicationDomain::TypeImplementation<ApplicationDomain::Event>::configureIndex(*index);
87 } else {
88 Q_ASSERT(false);
89 SinkError() << "Unkonwn type " << type;
90 }
91 indexByType.insert(type, index);
92 return *index;
93 }
94};
95
96EntityStore::EntityStore(const ResourceContext &context)
97 : d(new EntityStore::Private{context})
98{
99
100}
101
102void EntityStore::startTransaction(Sink::Storage::DataStore::AccessMode accessMode)
103{
104 Sink::Storage::DataStore store(Sink::storageLocation(), d->resourceContext.instanceId(), accessMode);
105 d->transaction = store.createTransaction(accessMode);
106 Q_ASSERT(d->transaction.validateNamedDatabases());
107}
108
109void EntityStore::commitTransaction()
110{
111 d->transaction.commit();
112 d->transaction = Storage::DataStore::Transaction();
113}
114
115void EntityStore::abortTransaction()
116{
117 d->transaction.abort();
118 d->transaction = Storage::DataStore::Transaction();
119}
120
121QVector<QByteArray> EntityStore::fullScan(const QByteArray &type)
122{
123 SinkTrace() << "Looking for : " << type;
124 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
125 QSet<QByteArray> keys;
126 DataStore::mainDatabase(d->getTransaction(), type)
127 .scan(QByteArray(),
128 [&](const QByteArray &key, const QByteArray &value) -> bool {
129 const auto uid = DataStore::uidFromKey(key);
130 if (keys.contains(uid)) {
131 //Not something that should persist if the replay works, so we keep a message for now.
132 SinkTrace() << "Multiple revisions for key: " << key;
133 }
134 keys << uid;
135 return true;
136 },
137 [](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message; });
138
139 SinkTrace() << "Full scan retrieved " << keys.size() << " results.";
140 return keys.toList().toVector();
141}
142
143QVector<QByteArray> EntityStore::indexLookup(const QByteArray &type, const Query &query, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting)
144{
145 return d->typeIndex(type).query(query, appliedFilters, appliedSorting, d->getTransaction());
146}
147
148QVector<QByteArray> EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value)
149{
150 return d->typeIndex(type).lookup(property, value, d->getTransaction());
151}
152
153void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value, const std::function<void(const QByteArray &uid)> &callback)
154{
155 auto list = d->typeIndex(type).lookup(property, value, d->getTransaction());
156 for (const auto &uid : list) {
157 callback(uid);
158 }
159 /* Index index(type + ".index." + property, d->transaction); */
160 /* index.lookup(value, [&](const QByteArray &sinkId) { */
161 /* callback(sinkId); */
162 /* }, */
163 /* [&](const Index::Error &error) { */
164 /* SinkWarning() << "Error in index: " << error.message << property; */
165 /* }); */
166}
167
168void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback)
169{
170 auto db = DataStore::mainDatabase(d->getTransaction(), type);
171 db.findLatest(uid,
172 [=](const QByteArray &key, const QByteArray &value) -> bool {
173 callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size()));
174 return false;
175 },
176 [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message << uid; });
177}
178
179void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback)
180{
181 readLatest(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) {
182 auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity());
183 callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor});
184 });
185}
186
187ApplicationDomain::ApplicationDomainType EntityStore::readLatest(const QByteArray &type, const QByteArray &uid)
188{
189 ApplicationDomain::ApplicationDomainType dt;
190 readLatest(type, uid, [&](const ApplicationDomain::ApplicationDomainType &entity) {
191 dt = entity;
192 });
193 return dt;
194}
195
196void EntityStore::readEntity(const QByteArray &type, const QByteArray &key, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback)
197{
198 auto db = DataStore::mainDatabase(d->getTransaction(), type);
199 db.scan(key,
200 [=](const QByteArray &key, const QByteArray &value) -> bool {
201 callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size()));
202 return false;
203 },
204 [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message << key; });
205}
206
207void EntityStore::readEntity(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback)
208{
209 readEntity(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) {
210 auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity());
211 callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor});
212 });
213}
214
215ApplicationDomain::ApplicationDomainType EntityStore::readEntity(const QByteArray &type, const QByteArray &uid)
216{
217 ApplicationDomain::ApplicationDomainType dt;
218 readEntity(type, uid, [&](const ApplicationDomain::ApplicationDomainType &entity) {
219 dt = entity;
220 });
221 return dt;
222}
223
224
225void EntityStore::readAll(const QByteArray &type, const std::function<void(const ApplicationDomain::ApplicationDomainType &entity)> &callback)
226{
227 auto db = DataStore::mainDatabase(d->getTransaction(), type);
228 db.scan("",
229 [=](const QByteArray &key, const QByteArray &value) -> bool {
230 auto uid = DataStore::uidFromKey(key);
231 auto buffer = Sink::EntityBuffer{value.data(), value.size()};
232 auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity());
233 callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor});
234 return true;
235 },
236 [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message; });
237}
238
239void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function<void(const QByteArray &key)> &callback)
240{
241 qint64 revisionCounter = baseRevision;
242 const qint64 topRevision = DataStore::maxRevision(d->getTransaction());
243 // Spit out the revision keys one by one.
244 while (revisionCounter <= topRevision) {
245 const auto uid = DataStore::getUidFromRevision(d->getTransaction(), revisionCounter);
246 const auto type = DataStore::getTypeFromRevision(d->getTransaction(), revisionCounter);
247 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
248 Q_ASSERT(!uid.isEmpty());
249 Q_ASSERT(!type.isEmpty());
250 if (type != expectedType) {
251 // Skip revision
252 revisionCounter++;
253 continue;
254 }
255 const auto key = DataStore::assembleKey(uid, revisionCounter);
256 revisionCounter++;
257 callback(key);
258 }
259}
260
261void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback)
262{
263 auto db = DataStore::mainDatabase(d->getTransaction(), type);
264 qint64 latestRevision = 0;
265 db.scan(uid,
266 [&latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool {
267 const auto foundRevision = Sink::Storage::DataStore::revisionFromKey(key);
268 if (foundRevision < revision && foundRevision > latestRevision) {
269 latestRevision = foundRevision;
270 }
271 return true;
272 },
273 [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true);
274 return readEntity(type, Sink::Storage::DataStore::assembleKey(uid, latestRevision), callback);
275}
276
277void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback)
278{
279 readPrevious(type, uid, revision, [&](const QByteArray &uid, const EntityBuffer &buffer) {
280 auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity());
281 callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor});
282 });
283}
284
285ApplicationDomain::ApplicationDomainType EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision)
286{
287 ApplicationDomain::ApplicationDomainType dt;
288 readPrevious(type, uid, revision, [&](const ApplicationDomain::ApplicationDomainType &entity) {
289 dt = entity;
290 });
291 return dt;
292}
293
294void EntityStore::readAllUids(const QByteArray &type, const std::function<void(const QByteArray &uid)> callback)
295{
296 //TODO use uid index instead
297 //FIXME we currently report each uid for every revision with the same uid
298 auto db = DataStore::mainDatabase(d->getTransaction(), type);
299 db.scan("",
300 [callback](const QByteArray &key, const QByteArray &) -> bool {
301 callback(Sink::Storage::DataStore::uidFromKey(key));
302 return true;
303 },
304 [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; });
305}
306
307bool EntityStore::contains(const QByteArray &type, const QByteArray &uid)
308{
309 return DataStore::mainDatabase(d->getTransaction(), type).contains(uid);
310}
311
312qint64 EntityStore::maxRevision()
313{
314 return DataStore::maxRevision(d->getTransaction());
315}
316
317/* DataStore::Transaction getTransaction() */
318/* { */
319/* Sink::Storage::DataStore::Transaction transaction; */
320/* { */
321/* Sink::Storage::DataStore storage(Sink::storageLocation(), mResourceInstanceIdentifier); */
322/* if (!storage.exists()) { */
323/* //This is not an error if the resource wasn't started before */
324/* SinkLog() << "Store doesn't exist: " << mResourceInstanceIdentifier; */
325/* return Sink::Storage::DataStore::Transaction(); */
326/* } */
327/* storage.setDefaultErrorHandler([this](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Error during query: " << error.store << error.message; }); */
328/* transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly); */
329/* } */
330
331/* //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. */
332/* //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). */
333/* while (!transaction.validateNamedDatabases()) { */
334/* Sink::Storage::DataStore storage(Sink::storageLocation(), mResourceInstanceIdentifier); */
335/* transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly); */
336/* } */
337/* return transaction; */
338/* } */
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h
new file mode 100644
index 0000000..de29e87
--- /dev/null
+++ b/common/storage/entitystore.h
@@ -0,0 +1,109 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#pragma once
21
22#include "sink_export.h"
23
24#include <memory>
25#include "domaintypeadaptorfactoryinterface.h"
26#include "query.h"
27#include "storage.h"
28#include "resourcecontext.h"
29
30namespace Sink {
31class EntityBuffer;
32namespace Storage {
33
34class SINK_EXPORT EntityStore
35{
36public:
37 typedef QSharedPointer<EntityStore> Ptr;
38 EntityStore(const ResourceContext &resourceContext);
39
40 void add(const ApplicationDomain::ApplicationDomainType &);
41 void modify(const ApplicationDomain::ApplicationDomainType &);
42 void remove(const ApplicationDomain::ApplicationDomainType &);
43
44 void startTransaction(Sink::Storage::DataStore::AccessMode);
45 void commitTransaction();
46 void abortTransaction();
47
48 QVector<QByteArray> fullScan(const QByteArray &type);
49 QVector<QByteArray> indexLookup(const QByteArray &type, const Query &query, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting);
50 QVector<QByteArray> indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value);
51 void indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value, const std::function<void(const QByteArray &uid)> &callback);
52 template<typename EntityType, typename PropertyType>
53 void indexLookup(const QVariant &value, const std::function<void(const QByteArray &uid)> &callback) {
54 return indexLookup(ApplicationDomain::getTypeName<EntityType>(), PropertyType::name, value, callback);
55 }
56
57 void readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback);
58 void readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomain::ApplicationDomainType &entity)> callback);
59
60 ApplicationDomain::ApplicationDomainType readLatest(const QByteArray &type, const QByteArray &uid);
61
62 template<typename T>
63 T readLatest(const QByteArray &uid) {
64 return T(readLatest(ApplicationDomain::getTypeName<T>(), uid));
65 }
66
67 void readEntity(const QByteArray &type, const QByteArray &uid, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback);
68 void readEntity(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomain::ApplicationDomainType &entity)> callback);
69 ApplicationDomain::ApplicationDomainType readEntity(const QByteArray &type, const QByteArray &key);
70
71 template<typename T>
72 T readEntity(const QByteArray &key) {
73 return T(readEntity(ApplicationDomain::getTypeName<T>(), key));
74 }
75
76
77 void readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback);
78 void readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function<void(const ApplicationDomain::ApplicationDomainType &entity)> callback);
79 ApplicationDomain::ApplicationDomainType readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision);
80
81 template<typename T>
82 T readPrevious(const QByteArray &uid, qint64 revision) {
83 return T(readPrevious(ApplicationDomain::getTypeName<T>(), uid, revision));
84 }
85
86 void readAllUids(const QByteArray &type, const std::function<void(const QByteArray &uid)> callback);
87
88 void readAll(const QByteArray &type, const std::function<void(const ApplicationDomain::ApplicationDomainType &entity)> &callback);
89
90 template<typename T>
91 void readAll(const std::function<void(const T &entity)> &callback) {
92 return readAll(ApplicationDomain::getTypeName<T>(), [&](const ApplicationDomain::ApplicationDomainType &entity) {
93 callback(T(entity));
94 });
95 }
96
97 void readRevisions(qint64 baseRevision, const QByteArray &type, const std::function<void(const QByteArray &key)> &callback);
98
99 bool contains(const QByteArray &type, const QByteArray &uid);
100
101 qint64 maxRevision();
102
103private:
104 class Private;
105 const QSharedPointer<Private> d;
106};
107
108}
109}
diff --git a/common/storage_common.cpp b/common/storage_common.cpp
index 1f2594e..60ef83d 100644
--- a/common/storage_common.cpp
+++ b/common/storage_common.cpp
@@ -27,26 +27,27 @@
27SINK_DEBUG_AREA("storage") 27SINK_DEBUG_AREA("storage")
28 28
29namespace Sink { 29namespace Sink {
30namespace Storage {
30 31
31static const char *s_internalPrefix = "__internal"; 32static const char *s_internalPrefix = "__internal";
32static const int s_internalPrefixSize = strlen(s_internalPrefix); 33static const int s_internalPrefixSize = strlen(s_internalPrefix);
33 34
34void errorHandler(const Storage::Error &error) 35void errorHandler(const DataStore::Error &error)
35{ 36{
36 SinkWarning() << "Database error in " << error.store << ", code " << error.code << ", message: " << error.message; 37 SinkWarning() << "Database error in " << error.store << ", code " << error.code << ", message: " << error.message;
37} 38}
38 39
39std::function<void(const Storage::Error &error)> Storage::basicErrorHandler() 40std::function<void(const DataStore::Error &error)> DataStore::basicErrorHandler()
40{ 41{
41 return errorHandler; 42 return errorHandler;
42} 43}
43 44
44void Storage::setDefaultErrorHandler(const std::function<void(const Storage::Error &error)> &errorHandler) 45void DataStore::setDefaultErrorHandler(const std::function<void(const DataStore::Error &error)> &errorHandler)
45{ 46{
46 mErrorHandler = errorHandler; 47 mErrorHandler = errorHandler;
47} 48}
48 49
49std::function<void(const Storage::Error &error)> Storage::defaultErrorHandler() const 50std::function<void(const DataStore::Error &error)> DataStore::defaultErrorHandler() const
50{ 51{
51 if (mErrorHandler) { 52 if (mErrorHandler) {
52 return mErrorHandler; 53 return mErrorHandler;
@@ -54,12 +55,12 @@ std::function<void(const Storage::Error &error)> Storage::defaultErrorHandler()
54 return basicErrorHandler(); 55 return basicErrorHandler();
55} 56}
56 57
57void Storage::setMaxRevision(Sink::Storage::Transaction &transaction, qint64 revision) 58void DataStore::setMaxRevision(DataStore::Transaction &transaction, qint64 revision)
58{ 59{
59 transaction.openDatabase().write("__internal_maxRevision", QByteArray::number(revision)); 60 transaction.openDatabase().write("__internal_maxRevision", QByteArray::number(revision));
60} 61}
61 62
62qint64 Storage::maxRevision(const Sink::Storage::Transaction &transaction) 63qint64 DataStore::maxRevision(const DataStore::Transaction &transaction)
63{ 64{
64 qint64 r = 0; 65 qint64 r = 0;
65 transaction.openDatabase().scan("__internal_maxRevision", 66 transaction.openDatabase().scan("__internal_maxRevision",
@@ -68,19 +69,19 @@ qint64 Storage::maxRevision(const Sink::Storage::Transaction &transaction)
68 return false; 69 return false;
69 }, 70 },
70 [](const Error &error) { 71 [](const Error &error) {
71 if (error.code != Sink::Storage::NotFound) { 72 if (error.code != DataStore::NotFound) {
72 SinkWarning() << "Coultn'd find the maximum revision."; 73 SinkWarning() << "Coultn'd find the maximum revision.";
73 } 74 }
74 }); 75 });
75 return r; 76 return r;
76} 77}
77 78
78void Storage::setCleanedUpRevision(Sink::Storage::Transaction &transaction, qint64 revision) 79void DataStore::setCleanedUpRevision(DataStore::Transaction &transaction, qint64 revision)
79{ 80{
80 transaction.openDatabase().write("__internal_cleanedUpRevision", QByteArray::number(revision)); 81 transaction.openDatabase().write("__internal_cleanedUpRevision", QByteArray::number(revision));
81} 82}
82 83
83qint64 Storage::cleanedUpRevision(const Sink::Storage::Transaction &transaction) 84qint64 DataStore::cleanedUpRevision(const DataStore::Transaction &transaction)
84{ 85{
85 qint64 r = 0; 86 qint64 r = 0;
86 transaction.openDatabase().scan("__internal_cleanedUpRevision", 87 transaction.openDatabase().scan("__internal_cleanedUpRevision",
@@ -89,14 +90,14 @@ qint64 Storage::cleanedUpRevision(const Sink::Storage::Transaction &transaction)
89 return false; 90 return false;
90 }, 91 },
91 [](const Error &error) { 92 [](const Error &error) {
92 if (error.code != Sink::Storage::NotFound) { 93 if (error.code != DataStore::NotFound) {
93 SinkWarning() << "Coultn'd find the maximum revision."; 94 SinkWarning() << "Coultn'd find the maximum revision.";
94 } 95 }
95 }); 96 });
96 return r; 97 return r;
97} 98}
98 99
99QByteArray Storage::getUidFromRevision(const Sink::Storage::Transaction &transaction, qint64 revision) 100QByteArray DataStore::getUidFromRevision(const DataStore::Transaction &transaction, qint64 revision)
100{ 101{
101 QByteArray uid; 102 QByteArray uid;
102 transaction.openDatabase("revisions") 103 transaction.openDatabase("revisions")
@@ -109,7 +110,7 @@ QByteArray Storage::getUidFromRevision(const Sink::Storage::Transaction &transac
109 return uid; 110 return uid;
110} 111}
111 112
112QByteArray Storage::getTypeFromRevision(const Sink::Storage::Transaction &transaction, qint64 revision) 113QByteArray DataStore::getTypeFromRevision(const DataStore::Transaction &transaction, qint64 revision)
113{ 114{
114 QByteArray type; 115 QByteArray type;
115 transaction.openDatabase("revisionType") 116 transaction.openDatabase("revisionType")
@@ -122,25 +123,25 @@ QByteArray Storage::getTypeFromRevision(const Sink::Storage::Transaction &transa
122 return type; 123 return type;
123} 124}
124 125
125void Storage::recordRevision(Sink::Storage::Transaction &transaction, qint64 revision, const QByteArray &uid, const QByteArray &type) 126void DataStore::recordRevision(DataStore::Transaction &transaction, qint64 revision, const QByteArray &uid, const QByteArray &type)
126{ 127{
127 // TODO use integerkeys 128 // TODO use integerkeys
128 transaction.openDatabase("revisions").write(QByteArray::number(revision), uid); 129 transaction.openDatabase("revisions").write(QByteArray::number(revision), uid);
129 transaction.openDatabase("revisionType").write(QByteArray::number(revision), type); 130 transaction.openDatabase("revisionType").write(QByteArray::number(revision), type);
130} 131}
131 132
132void Storage::removeRevision(Sink::Storage::Transaction &transaction, qint64 revision) 133void DataStore::removeRevision(DataStore::Transaction &transaction, qint64 revision)
133{ 134{
134 transaction.openDatabase("revisions").remove(QByteArray::number(revision)); 135 transaction.openDatabase("revisions").remove(QByteArray::number(revision));
135 transaction.openDatabase("revisionType").remove(QByteArray::number(revision)); 136 transaction.openDatabase("revisionType").remove(QByteArray::number(revision));
136} 137}
137 138
138bool Storage::isInternalKey(const char *key) 139bool DataStore::isInternalKey(const char *key)
139{ 140{
140 return key && strncmp(key, s_internalPrefix, s_internalPrefixSize) == 0; 141 return key && strncmp(key, s_internalPrefix, s_internalPrefixSize) == 0;
141} 142}
142 143
143bool Storage::isInternalKey(void *key, int size) 144bool DataStore::isInternalKey(void *key, int size)
144{ 145{
145 if (size < 1) { 146 if (size < 1) {
146 return false; 147 return false;
@@ -149,39 +150,39 @@ bool Storage::isInternalKey(void *key, int size)
149 return key && strncmp(static_cast<char *>(key), s_internalPrefix, (size > s_internalPrefixSize ? s_internalPrefixSize : size)) == 0; 150 return key && strncmp(static_cast<char *>(key), s_internalPrefix, (size > s_internalPrefixSize ? s_internalPrefixSize : size)) == 0;
150} 151}
151 152
152bool Storage::isInternalKey(const QByteArray &key) 153bool DataStore::isInternalKey(const QByteArray &key)
153{ 154{
154 return key.startsWith(s_internalPrefix); 155 return key.startsWith(s_internalPrefix);
155} 156}
156 157
157QByteArray Storage::assembleKey(const QByteArray &key, qint64 revision) 158QByteArray DataStore::assembleKey(const QByteArray &key, qint64 revision)
158{ 159{
159 Q_ASSERT(revision <= 9223372036854775807); 160 Q_ASSERT(revision <= 9223372036854775807);
160 Q_ASSERT(key.size() == 38); 161 Q_ASSERT(key.size() == 38);
161 return key + QByteArray::number(revision).rightJustified(19, '0', false); 162 return key + QByteArray::number(revision).rightJustified(19, '0', false);
162} 163}
163 164
164QByteArray Storage::uidFromKey(const QByteArray &key) 165QByteArray DataStore::uidFromKey(const QByteArray &key)
165{ 166{
166 return key.mid(0, 38); 167 return key.mid(0, 38);
167} 168}
168 169
169qint64 Storage::revisionFromKey(const QByteArray &key) 170qint64 DataStore::revisionFromKey(const QByteArray &key)
170{ 171{
171 return key.mid(39).toLongLong(); 172 return key.mid(39).toLongLong();
172} 173}
173 174
174QByteArray Storage::generateUid() 175QByteArray DataStore::generateUid()
175{ 176{
176 return QUuid::createUuid().toByteArray(); 177 return QUuid::createUuid().toByteArray();
177} 178}
178 179
179Storage::NamedDatabase Storage::mainDatabase(const Sink::Storage::Transaction &t, const QByteArray &type) 180DataStore::NamedDatabase DataStore::mainDatabase(const DataStore::Transaction &t, const QByteArray &type)
180{ 181{
181 return t.openDatabase(type + ".main"); 182 return t.openDatabase(type + ".main");
182} 183}
183 184
184bool Storage::NamedDatabase::contains(const QByteArray &uid) 185bool DataStore::NamedDatabase::contains(const QByteArray &uid)
185{ 186{
186 bool found = false; 187 bool found = false;
187 scan(uid, 188 scan(uid,
@@ -189,8 +190,9 @@ bool Storage::NamedDatabase::contains(const QByteArray &uid)
189 found = true; 190 found = true;
190 return false; 191 return false;
191 }, 192 },
192 [this](const Sink::Storage::Error &error) {}, true); 193 [this](const DataStore::Error &error) {}, true);
193 return found; 194 return found;
194} 195}
195 196
197}
196} // namespace Sink 198} // namespace Sink
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp
index 6f11af3..e418472 100644
--- a/common/storage_lmdb.cpp
+++ b/common/storage_lmdb.cpp
@@ -39,6 +39,7 @@ SINK_DEBUG_AREA("storage")
39// SINK_DEBUG_COMPONENT(d->storageRoot.toLatin1() + '/' + d->name.toLatin1()) 39// SINK_DEBUG_COMPONENT(d->storageRoot.toLatin1() + '/' + d->name.toLatin1())
40 40
41namespace Sink { 41namespace Sink {
42namespace Storage {
42 43
43QMutex sMutex; 44QMutex sMutex;
44QHash<QString, MDB_env *> sEnvironments; 45QHash<QString, MDB_env *> sEnvironments;
@@ -47,17 +48,17 @@ int getErrorCode(int e)
47{ 48{
48 switch (e) { 49 switch (e) {
49 case MDB_NOTFOUND: 50 case MDB_NOTFOUND:
50 return Storage::ErrorCodes::NotFound; 51 return DataStore::ErrorCodes::NotFound;
51 default: 52 default:
52 break; 53 break;
53 } 54 }
54 return -1; 55 return -1;
55} 56}
56 57
57class Storage::NamedDatabase::Private 58class DataStore::NamedDatabase::Private
58{ 59{
59public: 60public:
60 Private(const QByteArray &_db, bool _allowDuplicates, const std::function<void(const Storage::Error &error)> &_defaultErrorHandler, const QString &_name, MDB_txn *_txn) 61 Private(const QByteArray &_db, bool _allowDuplicates, const std::function<void(const DataStore::Error &error)> &_defaultErrorHandler, const QString &_name, MDB_txn *_txn)
61 : db(_db), transaction(_txn), allowDuplicates(_allowDuplicates), defaultErrorHandler(_defaultErrorHandler), name(_name) 62 : db(_db), transaction(_txn), allowDuplicates(_allowDuplicates), defaultErrorHandler(_defaultErrorHandler), name(_name)
62 { 63 {
63 } 64 }
@@ -70,10 +71,10 @@ public:
70 MDB_txn *transaction; 71 MDB_txn *transaction;
71 MDB_dbi dbi; 72 MDB_dbi dbi;
72 bool allowDuplicates; 73 bool allowDuplicates;
73 std::function<void(const Storage::Error &error)> defaultErrorHandler; 74 std::function<void(const DataStore::Error &error)> defaultErrorHandler;
74 QString name; 75 QString name;
75 76
76 bool openDatabase(bool readOnly, std::function<void(const Storage::Error &error)> errorHandler) 77 bool openDatabase(bool readOnly, std::function<void(const DataStore::Error &error)> errorHandler)
77 { 78 {
78 unsigned int flags = 0; 79 unsigned int flags = 0;
79 if (!readOnly) { 80 if (!readOnly) {
@@ -97,20 +98,20 @@ public:
97 } 98 }
98}; 99};
99 100
100Storage::NamedDatabase::NamedDatabase() : d(nullptr) 101DataStore::NamedDatabase::NamedDatabase() : d(nullptr)
101{ 102{
102} 103}
103 104
104Storage::NamedDatabase::NamedDatabase(NamedDatabase::Private *prv) : d(prv) 105DataStore::NamedDatabase::NamedDatabase(NamedDatabase::Private *prv) : d(prv)
105{ 106{
106} 107}
107 108
108Storage::NamedDatabase::NamedDatabase(NamedDatabase &&other) : d(nullptr) 109DataStore::NamedDatabase::NamedDatabase(NamedDatabase &&other) : d(nullptr)
109{ 110{
110 *this = std::move(other); 111 *this = std::move(other);
111} 112}
112 113
113Storage::NamedDatabase &Storage::NamedDatabase::operator=(Storage::NamedDatabase &&other) 114DataStore::NamedDatabase &DataStore::NamedDatabase::operator=(DataStore::NamedDatabase &&other)
114{ 115{
115 if (&other != this) { 116 if (&other != this) {
116 delete d; 117 delete d;
@@ -120,12 +121,12 @@ Storage::NamedDatabase &Storage::NamedDatabase::operator=(Storage::NamedDatabase
120 return *this; 121 return *this;
121} 122}
122 123
123Storage::NamedDatabase::~NamedDatabase() 124DataStore::NamedDatabase::~NamedDatabase()
124{ 125{
125 delete d; 126 delete d;
126} 127}
127 128
128bool Storage::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function<void(const Storage::Error &error)> &errorHandler) 129bool DataStore::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function<void(const DataStore::Error &error)> &errorHandler)
129{ 130{
130 if (!d || !d->transaction) { 131 if (!d || !d->transaction) {
131 Error error("", ErrorCodes::GenericError, "Not open"); 132 Error error("", ErrorCodes::GenericError, "Not open");
@@ -161,12 +162,12 @@ bool Storage::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sVa
161 return !rc; 162 return !rc;
162} 163}
163 164
164void Storage::NamedDatabase::remove(const QByteArray &k, const std::function<void(const Storage::Error &error)> &errorHandler) 165void DataStore::NamedDatabase::remove(const QByteArray &k, const std::function<void(const DataStore::Error &error)> &errorHandler)
165{ 166{
166 remove(k, QByteArray(), errorHandler); 167 remove(k, QByteArray(), errorHandler);
167} 168}
168 169
169void Storage::NamedDatabase::remove(const QByteArray &k, const QByteArray &value, const std::function<void(const Storage::Error &error)> &errorHandler) 170void DataStore::NamedDatabase::remove(const QByteArray &k, const QByteArray &value, const std::function<void(const DataStore::Error &error)> &errorHandler)
170{ 171{
171 if (!d || !d->transaction) { 172 if (!d || !d->transaction) {
172 if (d) { 173 if (d) {
@@ -195,8 +196,8 @@ void Storage::NamedDatabase::remove(const QByteArray &k, const QByteArray &value
195 } 196 }
196} 197}
197 198
198int Storage::NamedDatabase::scan(const QByteArray &k, const std::function<bool(const QByteArray &key, const QByteArray &value)> &resultHandler, 199int DataStore::NamedDatabase::scan(const QByteArray &k, const std::function<bool(const QByteArray &key, const QByteArray &value)> &resultHandler,
199 const std::function<void(const Storage::Error &error)> &errorHandler, bool findSubstringKeys, bool skipInternalKeys) const 200 const std::function<void(const DataStore::Error &error)> &errorHandler, bool findSubstringKeys, bool skipInternalKeys) const
200{ 201{
201 if (!d || !d->transaction) { 202 if (!d || !d->transaction) {
202 // Not an error. We rely on this to read nothing from non-existing databases. 203 // Not an error. We rely on this to read nothing from non-existing databases.
@@ -278,8 +279,8 @@ int Storage::NamedDatabase::scan(const QByteArray &k, const std::function<bool(c
278 return numberOfRetrievedValues; 279 return numberOfRetrievedValues;
279} 280}
280 281
281void Storage::NamedDatabase::findLatest(const QByteArray &k, const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler, 282void DataStore::NamedDatabase::findLatest(const QByteArray &k, const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler,
282 const std::function<void(const Storage::Error &error)> &errorHandler) const 283 const std::function<void(const DataStore::Error &error)> &errorHandler) const
283{ 284{
284 if (!d || !d->transaction) { 285 if (!d || !d->transaction) {
285 // Not an error. We rely on this to read nothing from non-existing databases. 286 // Not an error. We rely on this to read nothing from non-existing databases.
@@ -346,7 +347,7 @@ void Storage::NamedDatabase::findLatest(const QByteArray &k, const std::function
346 return; 347 return;
347} 348}
348 349
349qint64 Storage::NamedDatabase::getSize() 350qint64 DataStore::NamedDatabase::getSize()
350{ 351{
351 if (!d || !d->transaction) { 352 if (!d || !d->transaction) {
352 return -1; 353 return -1;
@@ -368,10 +369,10 @@ qint64 Storage::NamedDatabase::getSize()
368} 369}
369 370
370 371
371class Storage::Transaction::Private 372class DataStore::Transaction::Private
372{ 373{
373public: 374public:
374 Private(bool _requestRead, const std::function<void(const Storage::Error &error)> &_defaultErrorHandler, const QString &_name, MDB_env *_env) 375 Private(bool _requestRead, const std::function<void(const DataStore::Error &error)> &_defaultErrorHandler, const QString &_name, MDB_env *_env)
375 : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0) 376 : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0)
376 { 377 {
377 } 378 }
@@ -383,7 +384,7 @@ public:
383 MDB_txn *transaction; 384 MDB_txn *transaction;
384 MDB_dbi dbi; 385 MDB_dbi dbi;
385 bool requestedRead; 386 bool requestedRead;
386 std::function<void(const Storage::Error &error)> defaultErrorHandler; 387 std::function<void(const DataStore::Error &error)> defaultErrorHandler;
387 QString name; 388 QString name;
388 bool implicitCommit; 389 bool implicitCommit;
389 bool error; 390 bool error;
@@ -406,21 +407,21 @@ public:
406 } 407 }
407}; 408};
408 409
409Storage::Transaction::Transaction() : d(nullptr) 410DataStore::Transaction::Transaction() : d(nullptr)
410{ 411{
411} 412}
412 413
413Storage::Transaction::Transaction(Transaction::Private *prv) : d(prv) 414DataStore::Transaction::Transaction(Transaction::Private *prv) : d(prv)
414{ 415{
415 d->startTransaction(); 416 d->startTransaction();
416} 417}
417 418
418Storage::Transaction::Transaction(Transaction &&other) : d(nullptr) 419DataStore::Transaction::Transaction(Transaction &&other) : d(nullptr)
419{ 420{
420 *this = std::move(other); 421 *this = std::move(other);
421} 422}
422 423
423Storage::Transaction &Storage::Transaction::operator=(Storage::Transaction &&other) 424DataStore::Transaction &DataStore::Transaction::operator=(DataStore::Transaction &&other)
424{ 425{
425 if (&other != this) { 426 if (&other != this) {
426 delete d; 427 delete d;
@@ -430,7 +431,7 @@ Storage::Transaction &Storage::Transaction::operator=(Storage::Transaction &&oth
430 return *this; 431 return *this;
431} 432}
432 433
433Storage::Transaction::~Transaction() 434DataStore::Transaction::~Transaction()
434{ 435{
435 if (d && d->transaction) { 436 if (d && d->transaction) {
436 if (d->implicitCommit && !d->error) { 437 if (d->implicitCommit && !d->error) {
@@ -443,12 +444,12 @@ Storage::Transaction::~Transaction()
443 delete d; 444 delete d;
444} 445}
445 446
446Storage::Transaction::operator bool() const 447DataStore::Transaction::operator bool() const
447{ 448{
448 return (d && d->transaction); 449 return (d && d->transaction);
449} 450}
450 451
451bool Storage::Transaction::commit(const std::function<void(const Storage::Error &error)> &errorHandler) 452bool DataStore::Transaction::commit(const std::function<void(const DataStore::Error &error)> &errorHandler)
452{ 453{
453 if (!d || !d->transaction) { 454 if (!d || !d->transaction) {
454 return false; 455 return false;
@@ -467,7 +468,7 @@ bool Storage::Transaction::commit(const std::function<void(const Storage::Error
467 return !rc; 468 return !rc;
468} 469}
469 470
470void Storage::Transaction::abort() 471void DataStore::Transaction::abort()
471{ 472{
472 if (!d || !d->transaction) { 473 if (!d || !d->transaction) {
473 return; 474 return;
@@ -481,7 +482,7 @@ void Storage::Transaction::abort()
481 482
482//Ensure that we opened the correct database by comparing the expected identifier with the one 483//Ensure that we opened the correct database by comparing the expected identifier with the one
483//we write to the database on first open. 484//we write to the database on first open.
484static bool ensureCorrectDb(Storage::NamedDatabase &database, const QByteArray &db, bool readOnly) 485static bool ensureCorrectDb(DataStore::NamedDatabase &database, const QByteArray &db, bool readOnly)
485{ 486{
486 bool openedTheWrongDatabase = false; 487 bool openedTheWrongDatabase = false;
487 auto count = database.scan("__internal_dbname", [db, &openedTheWrongDatabase](const QByteArray &key, const QByteArray &value) ->bool { 488 auto count = database.scan("__internal_dbname", [db, &openedTheWrongDatabase](const QByteArray &key, const QByteArray &value) ->bool {
@@ -491,7 +492,7 @@ static bool ensureCorrectDb(Storage::NamedDatabase &database, const QByteArray &
491 } 492 }
492 return false; 493 return false;
493 }, 494 },
494 [](const Storage::Error &error) -> bool{ 495 [](const DataStore::Error &error) -> bool{
495 return false; 496 return false;
496 }, false); 497 }, false);
497 //This is the first time we open this database in a write transaction, write the db name 498 //This is the first time we open this database in a write transaction, write the db name
@@ -503,7 +504,7 @@ static bool ensureCorrectDb(Storage::NamedDatabase &database, const QByteArray &
503 return !openedTheWrongDatabase; 504 return !openedTheWrongDatabase;
504} 505}
505 506
506bool Storage::Transaction::validateNamedDatabases() 507bool DataStore::Transaction::validateNamedDatabases()
507{ 508{
508 auto databases = getDatabaseNames(); 509 auto databases = getDatabaseNames();
509 for (const auto &dbName : databases) { 510 for (const auto &dbName : databases) {
@@ -516,28 +517,28 @@ bool Storage::Transaction::validateNamedDatabases()
516 return true; 517 return true;
517} 518}
518 519
519Storage::NamedDatabase Storage::Transaction::openDatabase(const QByteArray &db, const std::function<void(const Storage::Error &error)> &errorHandler, bool allowDuplicates) const 520DataStore::NamedDatabase DataStore::Transaction::openDatabase(const QByteArray &db, const std::function<void(const DataStore::Error &error)> &errorHandler, bool allowDuplicates) const
520{ 521{
521 if (!d) { 522 if (!d) {
522 return Storage::NamedDatabase(); 523 return DataStore::NamedDatabase();
523 } 524 }
524 Q_ASSERT(d->transaction); 525 Q_ASSERT(d->transaction);
525 // We don't now if anything changed 526 // We don't now if anything changed
526 d->implicitCommit = true; 527 d->implicitCommit = true;
527 auto p = new Storage::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); 528 auto p = new DataStore::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction);
528 if (!p->openDatabase(d->requestedRead, errorHandler)) { 529 if (!p->openDatabase(d->requestedRead, errorHandler)) {
529 delete p; 530 delete p;
530 return Storage::NamedDatabase(); 531 return DataStore::NamedDatabase();
531 } 532 }
532 auto database = Storage::NamedDatabase(p); 533 auto database = DataStore::NamedDatabase(p);
533 if (!ensureCorrectDb(database, db, d->requestedRead)) { 534 if (!ensureCorrectDb(database, db, d->requestedRead)) {
534 SinkWarning() << "Failed to open the database" << db; 535 SinkWarning() << "Failed to open the database" << db;
535 return Storage::NamedDatabase(); 536 return DataStore::NamedDatabase();
536 } 537 }
537 return database; 538 return database;
538} 539}
539 540
540QList<QByteArray> Storage::Transaction::getDatabaseNames() const 541QList<QByteArray> DataStore::Transaction::getDatabaseNames() const
541{ 542{
542 if (!d) { 543 if (!d) {
543 SinkWarning() << "Invalid transaction"; 544 SinkWarning() << "Invalid transaction";
@@ -574,7 +575,7 @@ QList<QByteArray> Storage::Transaction::getDatabaseNames() const
574} 575}
575 576
576 577
577class Storage::Private 578class DataStore::Private
578{ 579{
579public: 580public:
580 Private(const QString &s, const QString &n, AccessMode m); 581 Private(const QString &s, const QString &n, AccessMode m);
@@ -587,7 +588,7 @@ public:
587 AccessMode mode; 588 AccessMode mode;
588}; 589};
589 590
590Storage::Private::Private(const QString &s, const QString &n, AccessMode m) : storageRoot(s), name(n), env(0), mode(m) 591DataStore::Private::Private(const QString &s, const QString &n, AccessMode m) : storageRoot(s), name(n), env(0), mode(m)
591{ 592{
592 const QString fullPath(storageRoot + '/' + name); 593 const QString fullPath(storageRoot + '/' + name);
593 QFileInfo dirInfo(fullPath); 594 QFileInfo dirInfo(fullPath);
@@ -639,27 +640,27 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m) : st
639 } 640 }
640} 641}
641 642
642Storage::Private::~Private() 643DataStore::Private::~Private()
643{ 644{
644 //We never close the environment (unless we remove the db), since we should only open the environment once per process (as per lmdb docs) 645 //We never close the environment (unless we remove the db), since we should only open the environment once per process (as per lmdb docs)
645 //and create storage instance from all over the place. Thus, we're not closing it here on purpose. 646 //and create storage instance from all over the place. Thus, we're not closing it here on purpose.
646} 647}
647 648
648Storage::Storage(const QString &storageRoot, const QString &name, AccessMode mode) : d(new Private(storageRoot, name, mode)) 649DataStore::DataStore(const QString &storageRoot, const QString &name, AccessMode mode) : d(new Private(storageRoot, name, mode))
649{ 650{
650} 651}
651 652
652Storage::~Storage() 653DataStore::~DataStore()
653{ 654{
654 delete d; 655 delete d;
655} 656}
656 657
657bool Storage::exists() const 658bool DataStore::exists() const
658{ 659{
659 return (d->env != 0); 660 return (d->env != 0);
660} 661}
661 662
662Storage::Transaction Storage::createTransaction(AccessMode type, const std::function<void(const Storage::Error &error)> &errorHandlerArg) 663DataStore::Transaction DataStore::createTransaction(AccessMode type, const std::function<void(const DataStore::Error &error)> &errorHandlerArg)
663{ 664{
664 auto errorHandler = errorHandlerArg ? errorHandlerArg : defaultErrorHandler(); 665 auto errorHandler = errorHandlerArg ? errorHandlerArg : defaultErrorHandler();
665 if (!d->env) { 666 if (!d->env) {
@@ -677,7 +678,7 @@ Storage::Transaction Storage::createTransaction(AccessMode type, const std::func
677 return Transaction(new Transaction::Private(requestedRead, defaultErrorHandler(), d->name, d->env)); 678 return Transaction(new Transaction::Private(requestedRead, defaultErrorHandler(), d->name, d->env));
678} 679}
679 680
680qint64 Storage::diskUsage() const 681qint64 DataStore::diskUsage() const
681{ 682{
682 QFileInfo info(d->storageRoot + '/' + d->name + "/data.mdb"); 683 QFileInfo info(d->storageRoot + '/' + d->name + "/data.mdb");
683 if (!info.exists()) { 684 if (!info.exists()) {
@@ -686,7 +687,7 @@ qint64 Storage::diskUsage() const
686 return info.size(); 687 return info.size();
687} 688}
688 689
689void Storage::removeFromDisk() const 690void DataStore::removeFromDisk() const
690{ 691{
691 const QString fullPath(d->storageRoot + '/' + d->name); 692 const QString fullPath(d->storageRoot + '/' + d->name);
692 QMutexLocker locker(&sMutex); 693 QMutexLocker locker(&sMutex);
@@ -701,7 +702,7 @@ void Storage::removeFromDisk() const
701 } 702 }
702} 703}
703 704
704void Storage::clearEnv() 705void DataStore::clearEnv()
705{ 706{
706 for (auto env : sEnvironments) { 707 for (auto env : sEnvironments) {
707 mdb_env_close(env); 708 mdb_env_close(env);
@@ -709,4 +710,5 @@ void Storage::clearEnv()
709 sEnvironments.clear(); 710 sEnvironments.clear();
710} 711}
711 712
713}
712} // namespace Sink 714} // namespace Sink
diff --git a/common/store.cpp b/common/store.cpp
index 0ecdcd2..52fec2e 100644
--- a/common/store.cpp
+++ b/common/store.cpp
@@ -230,7 +230,7 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
230{ 230{
231 // All databases are going to become invalid, nuke the environments 231 // All databases are going to become invalid, nuke the environments
232 // TODO: all clients should react to a notification the resource 232 // TODO: all clients should react to a notification the resource
233 Sink::Storage::clearEnv(); 233 Sink::Storage::DataStore::clearEnv();
234 SinkTrace() << "Remove data from disk " << identifier; 234 SinkTrace() << "Remove data from disk " << identifier;
235 auto time = QSharedPointer<QTime>::create(); 235 auto time = QSharedPointer<QTime>::create();
236 time->start(); 236 time->start();
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 53db82f..5ddd77c 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -24,7 +24,7 @@
24#include "bufferutils.h" 24#include "bufferutils.h"
25#include "entitystore.h" 25#include "entitystore.h"
26#include "remoteidmap.h" 26#include "remoteidmap.h"
27#include "adaptorfactoryregistry.h" 27#include "entityreader.h"
28#include "createentity_generated.h" 28#include "createentity_generated.h"
29#include "modifyentity_generated.h" 29#include "modifyentity_generated.h"
30#include "deleteentity_generated.h" 30#include "deleteentity_generated.h"
@@ -33,13 +33,12 @@ SINK_DEBUG_AREA("synchronizer")
33 33
34using namespace Sink; 34using namespace Sink;
35 35
36Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) 36Synchronizer::Synchronizer(const Sink::ResourceContext &context)
37 : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), 37 : mResourceContext(context),
38 mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), 38 mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)),
39 mResourceType(resourceType), 39 mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite)
40 mResourceInstanceIdentifier(resourceInstanceIdentifier)
41{ 40{
42 SinkTrace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; 41 SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
43} 42}
44 43
45Synchronizer::~Synchronizer() 44Synchronizer::~Synchronizer()
@@ -59,11 +58,9 @@ void Synchronizer::enqueueCommand(int commandId, const QByteArray &data)
59 mEnqueue(commandId, data); 58 mEnqueue(commandId, data);
60} 59}
61 60
62EntityStore &Synchronizer::store() 61Storage::EntityStore &Synchronizer::store()
63{ 62{
64 if (!mEntityStore) { 63 mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly);
65 mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, transaction());
66 }
67 return *mEntityStore; 64 return *mEntityStore;
68} 65}
69 66
@@ -75,13 +72,12 @@ RemoteIdMap &Synchronizer::syncStore()
75 return *mSyncStore; 72 return *mSyncStore;
76} 73}
77 74
78void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 75void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject)
79 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
80{ 76{
81 // These changes are coming from the source 77 // These changes are coming from the source
82 const auto replayToSource = false; 78 const auto replayToSource = false;
83 flatbuffers::FlatBufferBuilder entityFbb; 79 flatbuffers::FlatBufferBuilder entityFbb;
84 adaptorFactory.createBuffer(domainObject, entityFbb); 80 mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb);
85 flatbuffers::FlatBufferBuilder fbb; 81 flatbuffers::FlatBufferBuilder fbb;
86 // This is the resource type and not the domain type 82 // This is the resource type and not the domain type
87 auto entityId = fbb.CreateString(sinkId.toStdString()); 83 auto entityId = fbb.CreateString(sinkId.toStdString());
@@ -89,18 +85,17 @@ void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &buff
89 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 85 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
90 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); 86 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource);
91 Sink::Commands::FinishCreateEntityBuffer(fbb, location); 87 Sink::Commands::FinishCreateEntityBuffer(fbb, location);
92 callback(BufferUtils::extractBuffer(fbb)); 88 enqueueCommand(Sink::Commands::CreateEntityCommand, BufferUtils::extractBuffer(fbb));
93} 89}
94 90
95void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 91void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject)
96 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
97{ 92{
98 // FIXME removals 93 // FIXME removals
99 QByteArrayList deletedProperties; 94 QByteArrayList deletedProperties;
100 // These changes are coming from the source 95 // These changes are coming from the source
101 const auto replayToSource = false; 96 const auto replayToSource = false;
102 flatbuffers::FlatBufferBuilder entityFbb; 97 flatbuffers::FlatBufferBuilder entityFbb;
103 adaptorFactory.createBuffer(domainObject, entityFbb); 98 mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb);
104 flatbuffers::FlatBufferBuilder fbb; 99 flatbuffers::FlatBufferBuilder fbb;
105 auto entityId = fbb.CreateString(sinkId.toStdString()); 100 auto entityId = fbb.CreateString(sinkId.toStdString());
106 auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties()); 101 auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties());
@@ -110,10 +105,10 @@ void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const
110 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 105 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
111 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties); 106 auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties);
112 Sink::Commands::FinishModifyEntityBuffer(fbb, location); 107 Sink::Commands::FinishModifyEntityBuffer(fbb, location);
113 callback(BufferUtils::extractBuffer(fbb)); 108 enqueueCommand(Sink::Commands::ModifyEntityCommand, BufferUtils::extractBuffer(fbb));
114} 109}
115 110
116void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) 111void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType)
117{ 112{
118 // These changes are coming from the source 113 // These changes are coming from the source
119 const auto replayToSource = false; 114 const auto replayToSource = false;
@@ -123,63 +118,69 @@ void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const
123 auto type = fbb.CreateString(bufferType.toStdString()); 118 auto type = fbb.CreateString(bufferType.toStdString());
124 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); 119 auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource);
125 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 120 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
126 callback(BufferUtils::extractBuffer(fbb)); 121 enqueueCommand(Sink::Commands::DeleteEntityCommand, BufferUtils::extractBuffer(fbb));
127} 122}
128 123
129void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) 124void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists)
130{ 125{
131 entryGenerator([this, bufferType, &exists](const QByteArray &key) { 126 entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) {
132 auto sinkId = Sink::Storage::uidFromKey(key);
133 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); 127 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId);
134 SinkTrace() << "Checking for removal " << key << remoteId; 128 SinkTrace() << "Checking for removal " << sinkId << remoteId;
135 // If we have no remoteId, the entity hasn't been replayed to the source yet 129 // If we have no remoteId, the entity hasn't been replayed to the source yet
136 if (!remoteId.isEmpty()) { 130 if (!remoteId.isEmpty()) {
137 if (!exists(remoteId)) { 131 if (!exists(remoteId)) {
138 SinkTrace() << "Found a removed entity: " << sinkId; 132 SinkTrace() << "Found a removed entity: " << sinkId;
139 deleteEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, 133 deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType);
140 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); });
141 } 134 }
142 } 135 }
143 }); 136 });
144} 137}
145 138
146void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 139void Synchronizer::scanForRemovals(const QByteArray &bufferType, std::function<bool(const QByteArray &remoteId)> exists)
147{ 140{
148 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); 141 scanForRemovals(bufferType,
149 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 142 [this, &bufferType](const std::function<void(const QByteArray &)> &callback) {
150 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); 143 store().readAllUids(bufferType, [callback](const QByteArray &uid) {
151 Q_ASSERT(adaptorFactory); 144 callback(uid);
152 qint64 retrievedRevision = 0; 145 });
153 if (auto current = EntityReaderUtils::getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) { 146 },
147 exists
148 );
149}
150
151void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
152{
153 store.readLatest(bufferType, sinkId, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &current) {
154 bool changed = false; 154 bool changed = false;
155 for (const auto &property : entity.changedProperties()) { 155 for (const auto &property : entity.changedProperties()) {
156 if (entity.getProperty(property) != current->getProperty(property)) { 156 if (entity.getProperty(property) != current.getProperty(property)) {
157 SinkTrace() << "Property changed " << sinkId << property; 157 SinkTrace() << "Property changed " << sinkId << property;
158 changed = true; 158 changed = true;
159 } 159 }
160 } 160 }
161 if (changed) { 161 if (changed) {
162 SinkTrace() << "Found a modified entity: " << remoteId; 162 SinkTrace() << "Found a modified entity: " << sinkId;
163 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, 163 modifyEntity(sinkId, store.maxRevision(), bufferType, entity);
164 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
165 } 164 }
166 } else { 165 });
167 SinkWarning() << "Failed to get current entity"; 166}
168 } 167
168void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
169{
170 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
171 Storage::EntityStore store(mResourceContext);
172 modifyIfChanged(store, bufferType, sinkId, entity);
169} 173}
170 174
171void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 175void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
172{ 176{
173 SinkTrace() << "Create or modify" << bufferType << remoteId; 177 SinkTrace() << "Create or modify" << bufferType << remoteId;
174 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); 178 Storage::EntityStore store(mResourceContext);
175 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 179 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
176 const auto found = mainDatabase.contains(sinkId); 180 const auto found = store.contains(bufferType, sinkId);
177 if (!found) { 181 if (!found) {
178 SinkTrace() << "Found a new entity: " << remoteId; 182 SinkTrace() << "Found a new entity: " << remoteId;
179 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); 183 createEntity(sinkId, bufferType, entity);
180 Q_ASSERT(adaptorFactory);
181 createEntity(
182 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
183 } else { // modification 184 } else { // modification
184 modify(bufferType, remoteId, entity); 185 modify(bufferType, remoteId, entity);
185 } 186 }
@@ -190,10 +191,9 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
190{ 191{
191 192
192 SinkTrace() << "Create or modify" << bufferType << remoteId; 193 SinkTrace() << "Create or modify" << bufferType << remoteId;
193 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType);
194 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 194 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
195 const auto found = mainDatabase.contains(sinkId); 195 Storage::EntityStore store(mResourceContext);
196 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); 196 const auto found = store.contains(bufferType, sinkId);
197 if (!found) { 197 if (!found) {
198 if (!mergeCriteria.isEmpty()) { 198 if (!mergeCriteria.isEmpty()) {
199 Sink::Query query; 199 Sink::Query query;
@@ -201,7 +201,8 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
201 query.filter(it.key(), it.value()); 201 query.filter(it.key(), it.value());
202 } 202 }
203 bool merge = false; 203 bool merge = false;
204 Sink::EntityReader<DomainType> reader(mResourceType, mResourceInstanceIdentifier, transaction()); 204 Storage::EntityStore store(mResourceContext);
205 Sink::EntityReader<DomainType> reader(store);
205 reader.query(query, 206 reader.query(query,
206 [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{ 207 [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{
207 merge = true; 208 merge = true;
@@ -211,43 +212,21 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
211 }); 212 });
212 if (!merge) { 213 if (!merge) {
213 SinkTrace() << "Found a new entity: " << remoteId; 214 SinkTrace() << "Found a new entity: " << remoteId;
214 createEntity( 215 createEntity(sinkId, bufferType, entity);
215 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
216 } 216 }
217 } else { 217 } else {
218 SinkTrace() << "Found a new entity: " << remoteId; 218 SinkTrace() << "Found a new entity: " << remoteId;
219 createEntity( 219 createEntity(sinkId, bufferType, entity);
220 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
221 } 220 }
222 } else { // modification 221 } else { // modification
223 qint64 retrievedRevision = 0; 222 modifyIfChanged(store, bufferType, sinkId, entity);
224 if (auto current = EntityReaderUtils::getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) {
225 bool changed = false;
226 for (const auto &property : entity.changedProperties()) {
227 if (entity.getProperty(property) != current->getProperty(property)) {
228 SinkTrace() << "Property changed " << sinkId << property;
229 changed = true;
230 }
231 }
232 if (changed) {
233 SinkTrace() << "Found a modified entity: " << remoteId;
234 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory,
235 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
236 }
237 } else {
238 SinkWarning() << "Failed to get current entity";
239 }
240 } 223 }
241} 224}
242 225
243template<typename DomainType> 226template<typename DomainType>
244void Synchronizer::modify(const DomainType &entity) 227void Synchronizer::modify(const DomainType &entity)
245{ 228{
246 const auto bufferType = ApplicationDomain::getTypeName<DomainType>(); 229 modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName<DomainType>(), entity);
247 const auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType);
248 Q_ASSERT(adaptorFactory);
249 modifyEntity(entity.identifier(), entity.revision(), bufferType, entity, *adaptorFactory,
250 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
251} 230}
252 231
253KAsync::Job<void> Synchronizer::synchronize() 232KAsync::Job<void> Synchronizer::synchronize()
@@ -257,7 +236,6 @@ KAsync::Job<void> Synchronizer::synchronize()
257 mMessageQueue->startTransaction(); 236 mMessageQueue->startTransaction();
258 return synchronizeWithSource().syncThen<void>([this]() { 237 return synchronizeWithSource().syncThen<void>([this]() {
259 mSyncStore.clear(); 238 mSyncStore.clear();
260 mEntityStore.clear();
261 mMessageQueue->commit(); 239 mMessageQueue->commit();
262 mSyncInProgress = false; 240 mSyncInProgress = false;
263 }); 241 });
@@ -266,8 +244,7 @@ KAsync::Job<void> Synchronizer::synchronize()
266void Synchronizer::commit() 244void Synchronizer::commit()
267{ 245{
268 mMessageQueue->commit(); 246 mMessageQueue->commit();
269 mTransaction.abort(); 247 mEntityStore->abortTransaction();
270 mEntityStore.clear();
271 mSyncTransaction.commit(); 248 mSyncTransaction.commit();
272 mSyncStore.clear(); 249 mSyncStore.clear();
273 if (mSyncInProgress) { 250 if (mSyncInProgress) {
@@ -275,20 +252,11 @@ void Synchronizer::commit()
275 } 252 }
276} 253}
277 254
278Sink::Storage::Transaction &Synchronizer::transaction() 255Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction()
279{
280 if (!mTransaction) {
281 SinkTrace() << "Starting transaction";
282 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
283 }
284 return mTransaction;
285}
286
287Sink::Storage::Transaction &Synchronizer::syncTransaction()
288{ 256{
289 if (!mSyncTransaction) { 257 if (!mSyncTransaction) {
290 SinkTrace() << "Starting transaction"; 258 SinkTrace() << "Starting transaction";
291 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); 259 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite);
292 } 260 }
293 return mSyncTransaction; 261 return mSyncTransaction;
294} 262}
diff --git a/common/synchronizer.h b/common/synchronizer.h
index 5f60128..f3319f6 100644
--- a/common/synchronizer.h
+++ b/common/synchronizer.h
@@ -37,31 +37,28 @@ class RemoteIdMap;
37class SINK_EXPORT Synchronizer 37class SINK_EXPORT Synchronizer
38{ 38{
39public: 39public:
40 Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); 40 Synchronizer(const Sink::ResourceContext &resourceContext);
41 virtual ~Synchronizer(); 41 virtual ~Synchronizer();
42 42
43 void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &messageQueue); 43 void setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback, MessageQueue &messageQueue);
44 KAsync::Job<void> synchronize(); 44 KAsync::Job<void> synchronize();
45 45
46 //Read only access to main storage 46 //Read only access to main storage
47 EntityStore &store(); 47 Storage::EntityStore &store();
48 48
49 //Read/Write access to sync storage 49 //Read/Write access to sync storage
50 RemoteIdMap &syncStore(); 50 RemoteIdMap &syncStore();
51 51
52 void commit(); 52 void commit();
53 Sink::Storage::Transaction &transaction(); 53 Sink::Storage::DataStore::Transaction &syncTransaction();
54 Sink::Storage::Transaction &syncTransaction();
55 54
56protected: 55protected:
57 ///Calls the callback to enqueue the command 56 ///Calls the callback to enqueue the command
58 void enqueueCommand(int commandId, const QByteArray &data); 57 void enqueueCommand(int commandId, const QByteArray &data);
59 58
60 static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 59 void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject);
61 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback); 60 void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject);
62 static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, 61 void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType);
63 DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback);
64 static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback);
65 62
66 /** 63 /**
67 * A synchronous algorithm to remove entities that are no longer existing. 64 * A synchronous algorithm to remove entities that are no longer existing.
@@ -74,7 +71,8 @@ protected:
74 * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. 71 * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous.
75 */ 72 */
76 void scanForRemovals(const QByteArray &bufferType, 73 void scanForRemovals(const QByteArray &bufferType,
77 const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists); 74 const std::function<void(const std::function<void(const QByteArray &sinkId)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists);
75 void scanForRemovals(const QByteArray &bufferType, std::function<bool(const QByteArray &remoteId)> exists);
78 76
79 /** 77 /**
80 * An algorithm to create or modify the entity. 78 * An algorithm to create or modify the entity.
@@ -96,14 +94,13 @@ protected:
96 virtual KAsync::Job<void> synchronizeWithSource() = 0; 94 virtual KAsync::Job<void> synchronizeWithSource() = 0;
97 95
98private: 96private:
97 void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity);
98
99 Sink::ResourceContext mResourceContext;
100 Sink::Storage::EntityStore::Ptr mEntityStore;
99 QSharedPointer<RemoteIdMap> mSyncStore; 101 QSharedPointer<RemoteIdMap> mSyncStore;
100 QSharedPointer<EntityStore> mEntityStore; 102 Sink::Storage::DataStore mSyncStorage;
101 Sink::Storage mStorage; 103 Sink::Storage::DataStore::Transaction mSyncTransaction;
102 Sink::Storage mSyncStorage;
103 QByteArray mResourceType;
104 QByteArray mResourceInstanceIdentifier;
105 Sink::Storage::Transaction mTransaction;
106 Sink::Storage::Transaction mSyncTransaction;
107 std::function<void(int commandId, const QByteArray &data)> mEnqueue; 104 std::function<void(int commandId, const QByteArray &data)> mEnqueue;
108 MessageQueue *mMessageQueue; 105 MessageQueue *mMessageQueue;
109 bool mSyncInProgress; 106 bool mSyncInProgress;
diff --git a/common/test.cpp b/common/test.cpp
index 7bba125..0982293 100644
--- a/common/test.cpp
+++ b/common/test.cpp
@@ -104,11 +104,11 @@ public:
104 facade->mTestAccount = testAccount; 104 facade->mTestAccount = testAccount;
105 map.insert(instanceIdentifier, facade); 105 map.insert(instanceIdentifier, facade);
106 bool alwaysReturnFacade = instanceIdentifier.isEmpty(); 106 bool alwaysReturnFacade = instanceIdentifier.isEmpty();
107 Sink::FacadeFactory::instance().registerFacade<T, TestFacade<T>>("testresource", [alwaysReturnFacade](const QByteArray &instanceIdentifier) { 107 Sink::FacadeFactory::instance().registerFacade<T, TestFacade<T>>("testresource", [alwaysReturnFacade](const Sink::ResourceContext &context) {
108 if (alwaysReturnFacade) { 108 if (alwaysReturnFacade) {
109 return map.value(QByteArray()); 109 return map.value(QByteArray());
110 } 110 }
111 return map.value(instanceIdentifier); 111 return map.value(context.resourceInstanceIdentifier);
112 }); 112 });
113 return facade; 113 return facade;
114 } 114 }
diff --git a/common/typeindex.cpp b/common/typeindex.cpp
index 816e7ee..64c2a01 100644
--- a/common/typeindex.cpp
+++ b/common/typeindex.cpp
@@ -66,7 +66,7 @@ QByteArray TypeIndex::indexName(const QByteArray &property, const QByteArray &so
66template <> 66template <>
67void TypeIndex::addProperty<QByteArray>(const QByteArray &property) 67void TypeIndex::addProperty<QByteArray>(const QByteArray &property)
68{ 68{
69 auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) { 69 auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) {
70 // SinkTrace() << "Indexing " << mType + ".index." + property << value.toByteArray(); 70 // SinkTrace() << "Indexing " << mType + ".index." + property << value.toByteArray();
71 Index(indexName(property), transaction).add(getByteArray(value), identifier); 71 Index(indexName(property), transaction).add(getByteArray(value), identifier);
72 }; 72 };
@@ -77,7 +77,7 @@ void TypeIndex::addProperty<QByteArray>(const QByteArray &property)
77template <> 77template <>
78void TypeIndex::addProperty<QString>(const QByteArray &property) 78void TypeIndex::addProperty<QString>(const QByteArray &property)
79{ 79{
80 auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) { 80 auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) {
81 // SinkTrace() << "Indexing " << mType + ".index." + property << value.toByteArray(); 81 // SinkTrace() << "Indexing " << mType + ".index." + property << value.toByteArray();
82 Index(indexName(property), transaction).add(getByteArray(value), identifier); 82 Index(indexName(property), transaction).add(getByteArray(value), identifier);
83 }; 83 };
@@ -88,7 +88,7 @@ void TypeIndex::addProperty<QString>(const QByteArray &property)
88template <> 88template <>
89void TypeIndex::addProperty<QDateTime>(const QByteArray &property) 89void TypeIndex::addProperty<QDateTime>(const QByteArray &property)
90{ 90{
91 auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) { 91 auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) {
92 //SinkTrace() << "Indexing " << mType + ".index." + property << getByteArray(value); 92 //SinkTrace() << "Indexing " << mType + ".index." + property << getByteArray(value);
93 Index(indexName(property), transaction).add(getByteArray(value), identifier); 93 Index(indexName(property), transaction).add(getByteArray(value), identifier);
94 }; 94 };
@@ -99,7 +99,7 @@ void TypeIndex::addProperty<QDateTime>(const QByteArray &property)
99template <> 99template <>
100void TypeIndex::addPropertyWithSorting<QByteArray, QDateTime>(const QByteArray &property, const QByteArray &sortProperty) 100void TypeIndex::addPropertyWithSorting<QByteArray, QDateTime>(const QByteArray &property, const QByteArray &sortProperty)
101{ 101{
102 auto indexer = [=](const QByteArray &identifier, const QVariant &value, const QVariant &sortValue, Sink::Storage::Transaction &transaction) { 102 auto indexer = [=](const QByteArray &identifier, const QVariant &value, const QVariant &sortValue, Sink::Storage::DataStore::Transaction &transaction) {
103 const auto date = sortValue.toDateTime(); 103 const auto date = sortValue.toDateTime();
104 const auto propertyValue = getByteArray(value); 104 const auto propertyValue = getByteArray(value);
105 Index(indexName(property, sortProperty), transaction).add(propertyValue + toSortableByteArray(date), identifier); 105 Index(indexName(property, sortProperty), transaction).add(propertyValue + toSortableByteArray(date), identifier);
@@ -108,7 +108,7 @@ void TypeIndex::addPropertyWithSorting<QByteArray, QDateTime>(const QByteArray &
108 mSortedProperties.insert(property, sortProperty); 108 mSortedProperties.insert(property, sortProperty);
109} 109}
110 110
111void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) 111void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction)
112{ 112{
113 for (const auto &property : mProperties) { 113 for (const auto &property : mProperties) {
114 const auto value = bufferAdaptor.getProperty(property); 114 const auto value = bufferAdaptor.getProperty(property);
@@ -123,7 +123,7 @@ void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain:
123 } 123 }
124} 124}
125 125
126void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) 126void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction)
127{ 127{
128 for (const auto &property : mProperties) { 128 for (const auto &property : mProperties) {
129 const auto value = bufferAdaptor.getProperty(property); 129 const auto value = bufferAdaptor.getProperty(property);
@@ -159,7 +159,7 @@ static QVector<QByteArray> indexLookup(Index &index, Query::Comparator filter)
159 return keys; 159 return keys;
160} 160}
161 161
162QVector<QByteArray> TypeIndex::query(const Sink::Query &query, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction) 162QVector<QByteArray> TypeIndex::query(const Sink::Query &query, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::DataStore::Transaction &transaction)
163{ 163{
164 QVector<QByteArray> keys; 164 QVector<QByteArray> keys;
165 for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { 165 for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) {
@@ -185,7 +185,7 @@ QVector<QByteArray> TypeIndex::query(const Sink::Query &query, QSet<QByteArray>
185 return keys; 185 return keys;
186} 186}
187 187
188QVector<QByteArray> TypeIndex::lookup(const QByteArray &property, const QVariant &value, Sink::Storage::Transaction &transaction) 188QVector<QByteArray> TypeIndex::lookup(const QByteArray &property, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction)
189{ 189{
190 SinkTrace() << "Index lookup on property: " << property << mSecondaryProperties.keys() << mProperties; 190 SinkTrace() << "Index lookup on property: " << property << mSecondaryProperties.keys() << mProperties;
191 if (mProperties.contains(property)) { 191 if (mProperties.contains(property)) {
@@ -218,19 +218,19 @@ QVector<QByteArray> TypeIndex::lookup(const QByteArray &property, const QVariant
218} 218}
219 219
220template <> 220template <>
221void TypeIndex::index<QByteArray, QByteArray>(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::Transaction &transaction) 221void TypeIndex::index<QByteArray, QByteArray>(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction)
222{ 222{
223 Index(indexName(leftName + rightName), transaction).add(getByteArray(leftValue), getByteArray(rightValue)); 223 Index(indexName(leftName + rightName), transaction).add(getByteArray(leftValue), getByteArray(rightValue));
224} 224}
225 225
226template <> 226template <>
227void TypeIndex::index<QString, QByteArray>(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::Transaction &transaction) 227void TypeIndex::index<QString, QByteArray>(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction)
228{ 228{
229 Index(indexName(leftName + rightName), transaction).add(getByteArray(leftValue), getByteArray(rightValue)); 229 Index(indexName(leftName + rightName), transaction).add(getByteArray(leftValue), getByteArray(rightValue));
230} 230}
231 231
232template <> 232template <>
233QVector<QByteArray> TypeIndex::secondaryLookup<QByteArray>(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value, Sink::Storage::Transaction &transaction) 233QVector<QByteArray> TypeIndex::secondaryLookup<QByteArray>(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction)
234{ 234{
235 QVector<QByteArray> keys; 235 QVector<QByteArray> keys;
236 Index index(indexName(leftName + rightName), transaction); 236 Index index(indexName(leftName + rightName), transaction);
@@ -242,7 +242,7 @@ QVector<QByteArray> TypeIndex::secondaryLookup<QByteArray>(const QByteArray &lef
242} 242}
243 243
244template <> 244template <>
245QVector<QByteArray> TypeIndex::secondaryLookup<QString>(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value, Sink::Storage::Transaction &transaction) 245QVector<QByteArray> TypeIndex::secondaryLookup<QString>(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction)
246{ 246{
247 QVector<QByteArray> keys; 247 QVector<QByteArray> keys;
248 Index index(indexName(leftName + rightName), transaction); 248 Index index(indexName(leftName + rightName), transaction);
diff --git a/common/typeindex.h b/common/typeindex.h
index 4972e95..2638577 100644
--- a/common/typeindex.h
+++ b/common/typeindex.h
@@ -52,29 +52,29 @@ public:
52 { 52 {
53 mSecondaryProperties.insert(Left::name, Right::name); 53 mSecondaryProperties.insert(Left::name, Right::name);
54 } 54 }
55 void add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 55 void add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction);
56 void remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 56 void remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction);
57 57
58 QVector<QByteArray> query(const Sink::Query &query, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); 58 QVector<QByteArray> query(const Sink::Query &query, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::DataStore::Transaction &transaction);
59 QVector<QByteArray> lookup(const QByteArray &property, const QVariant &value, Sink::Storage::Transaction &transaction); 59 QVector<QByteArray> lookup(const QByteArray &property, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction);
60 60
61 template <typename Left, typename Right> 61 template <typename Left, typename Right>
62 QVector<QByteArray> secondaryLookup(const QVariant &value, Sink::Storage::Transaction &transaction) 62 QVector<QByteArray> secondaryLookup(const QVariant &value, Sink::Storage::DataStore::Transaction &transaction)
63 { 63 {
64 return secondaryLookup<typename Left::Type>(Left::name, Right::name, value, transaction); 64 return secondaryLookup<typename Left::Type>(Left::name, Right::name, value, transaction);
65 } 65 }
66 66
67 template <typename Type> 67 template <typename Type>
68 QVector<QByteArray> secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value, Sink::Storage::Transaction &transaction); 68 QVector<QByteArray> secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction);
69 69
70 template <typename Left, typename Right> 70 template <typename Left, typename Right>
71 void index(const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::Transaction &transaction) 71 void index(const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction)
72 { 72 {
73 index<typename Left::Type, typename Right::Type>(Left::name, Right::name, leftValue, rightValue, transaction); 73 index<typename Left::Type, typename Right::Type>(Left::name, Right::name, leftValue, rightValue, transaction);
74 } 74 }
75 75
76 template <typename LeftType, typename RightType> 76 template <typename LeftType, typename RightType>
77 void index(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::Transaction &transaction); 77 void index(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction);
78 78
79 79
80private: 80private:
@@ -85,6 +85,6 @@ private:
85 QMap<QByteArray, QByteArray> mSortedProperties; 85 QMap<QByteArray, QByteArray> mSortedProperties;
86 //<Property, ResultProperty> 86 //<Property, ResultProperty>
87 QMap<QByteArray, QByteArray> mSecondaryProperties; 87 QMap<QByteArray, QByteArray> mSecondaryProperties;
88 QHash<QByteArray, std::function<void(const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction)>> mIndexer; 88 QHash<QByteArray, std::function<void(const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction)>> mIndexer;
89 QHash<QByteArray, std::function<void(const QByteArray &identifier, const QVariant &value, const QVariant &sortValue, Sink::Storage::Transaction &transaction)>> mSortIndexer; 89 QHash<QByteArray, std::function<void(const QByteArray &identifier, const QVariant &value, const QVariant &sortValue, Sink::Storage::DataStore::Transaction &transaction)>> mSortIndexer;
90}; 90};