summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/datastorequery.cpp12
-rw-r--r--common/datastorequery.h4
-rw-r--r--common/entityreader.cpp207
-rw-r--r--common/entityreader.h95
-rw-r--r--common/queryrunner.cpp8
-rw-r--r--common/specialpurposepreprocessor.cpp1
-rw-r--r--common/synchronizer.cpp20
8 files changed, 22 insertions, 326 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index fd8e898..e329d93 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -71,7 +71,6 @@ set(command_SRCS
71 synchronizer.cpp 71 synchronizer.cpp
72 remoteidmap.cpp 72 remoteidmap.cpp
73 sourcewriteback.cpp 73 sourcewriteback.cpp
74 entityreader.cpp
75 mailpreprocessor.cpp 74 mailpreprocessor.cpp
76 specialpurposepreprocessor.cpp 75 specialpurposepreprocessor.cpp
77 datastorequery.cpp 76 datastorequery.cpp
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index f56ac91..9f20ba9 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -298,7 +298,7 @@ public:
298 } 298 }
299}; 299};
300 300
301DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, EntityStore::Ptr store) 301DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, EntityStore &store)
302 : mQuery(query), mType(type), mStore(store) 302 : mQuery(query), mType(type), mStore(store)
303{ 303{
304 setupQuery(); 304 setupQuery();
@@ -306,12 +306,12 @@ DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type,
306 306
307void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) 307void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback)
308{ 308{
309 mStore->readLatest(mType, key, resultCallback); 309 mStore.readLatest(mType, key, resultCallback);
310} 310}
311 311
312QVector<QByteArray> DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value) 312QVector<QByteArray> DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value)
313{ 313{
314 return mStore->indexLookup(mType, property, value); 314 return mStore.indexLookup(mType, property, value);
315} 315}
316 316
317/* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */ 317/* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */
@@ -443,13 +443,13 @@ void DataStoreQuery::setupQuery()
443 } else { 443 } else {
444 QSet<QByteArray> appliedFilters; 444 QSet<QByteArray> appliedFilters;
445 445
446 auto resultSet = mStore->indexLookup(mType, mQuery, appliedFilters, appliedSorting); 446 auto resultSet = mStore.indexLookup(mType, mQuery, appliedFilters, appliedSorting);
447 remainingFilters = remainingFilters - appliedFilters; 447 remainingFilters = remainingFilters - appliedFilters;
448 448
449 // We do a full scan if there were no indexes available to create the initial set. 449 // We do a full scan if there were no indexes available to create the initial set.
450 if (appliedFilters.isEmpty()) { 450 if (appliedFilters.isEmpty()) {
451 // TODO this should be replaced by an index lookup on the uid index 451 // TODO this should be replaced by an index lookup on the uid index
452 mSource = Source::Ptr::create(mStore->fullScan(mType), this); 452 mSource = Source::Ptr::create(mStore.fullScan(mType), this);
453 } else { 453 } else {
454 mSource = Source::Ptr::create(resultSet, this); 454 mSource = Source::Ptr::create(resultSet, this);
455 } 455 }
@@ -492,7 +492,7 @@ QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision
492{ 492{
493 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); 493 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
494 QVector<QByteArray> changedKeys; 494 QVector<QByteArray> changedKeys;
495 mStore->readRevisions(baseRevision, mType, [&](const QByteArray &key) { 495 mStore.readRevisions(baseRevision, mType, [&](const QByteArray &key) {
496 changedKeys << key; 496 changedKeys << key;
497 }); 497 });
498 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; 498 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter;
diff --git a/common/datastorequery.h b/common/datastorequery.h
index 92235fd..25fee66 100644
--- a/common/datastorequery.h
+++ b/common/datastorequery.h
@@ -35,7 +35,7 @@ class DataStoreQuery {
35public: 35public:
36 typedef QSharedPointer<DataStoreQuery> Ptr; 36 typedef QSharedPointer<DataStoreQuery> Ptr;
37 37
38 DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::EntityStore::Ptr store); 38 DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::EntityStore &store);
39 ResultSet execute(); 39 ResultSet execute();
40 ResultSet update(qint64 baseRevision); 40 ResultSet update(qint64 baseRevision);
41 41
@@ -60,7 +60,7 @@ private:
60 QSharedPointer<FilterBase> mCollector; 60 QSharedPointer<FilterBase> mCollector;
61 QSharedPointer<Source> mSource; 61 QSharedPointer<Source> mSource;
62 62
63 QSharedPointer<Sink::Storage::EntityStore> mStore; 63 Sink::Storage::EntityStore &mStore;
64 64
65 SINK_DEBUG_COMPONENT(mType) 65 SINK_DEBUG_COMPONENT(mType)
66}; 66};
diff --git a/common/entityreader.cpp b/common/entityreader.cpp
deleted file mode 100644
index c49d1f7..0000000
--- a/common/entityreader.cpp
+++ /dev/null
@@ -1,207 +0,0 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "entityreader.h"
21
22#include "resultset.h"
23#include "storage.h"
24#include "query.h"
25#include "datastorequery.h"
26
27SINK_DEBUG_AREA("entityreader")
28
29using namespace Sink;
30
31/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */
32/* { */
33/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */
34/* db.findLatest(uid, */
35/* [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */
36/* Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); */
37/* if (!buffer.isValid()) { */
38/* SinkWarning() << "Read invalid buffer from disk"; */
39/* } else { */
40/* SinkTrace() << "Found value " << key; */
41/* current = adaptorFactory.createAdaptor(buffer.entity()); */
42/* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */
43/* } */
44/* return false; */
45/* }, */
46/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */
47/* return current; */
48/* } */
49
50/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */
51/* { */
52/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */
53/* db.scan(key, */
54/* [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */
55/* Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); */
56/* if (!buffer.isValid()) { */
57/* SinkWarning() << "Read invalid buffer from disk"; */
58/* } else { */
59/* current = adaptorFactory.createAdaptor(buffer.entity()); */
60/* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */
61/* } */
62/* return false; */
63/* }, */
64/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */
65/* return current; */
66/* } */
67
68/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPrevious(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */
69/* { */
70/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */
71/* qint64 latestRevision = 0; */
72/* db.scan(uid, */
73/* [&current, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { */
74/* auto foundRevision = Sink::Storage::DataStore::revisionFromKey(key); */
75/* if (foundRevision < revision && foundRevision > latestRevision) { */
76/* latestRevision = foundRevision; */
77/* } */
78/* return true; */
79/* }, */
80/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); */
81/* return get(db, Sink::Storage::DataStore::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); */
82/* } */
83
84/* template <class DomainType> */
85/* EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */
86/* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */
87/* mTransaction(transaction), */
88/* mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory<DomainType>(resourceType)), */
89/* mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) */
90/* { */
91/* Q_ASSERT(!resourceType.isEmpty()); */
92/* Q_ASSERT(mDomainTypeAdaptorFactoryPtr); */
93/* } */
94
95/* template <class DomainType> */
96/* EntityReader<DomainType>::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */
97/* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */
98/* mTransaction(transaction), */
99/* mDomainTypeAdaptorFactory(domainTypeAdaptorFactory) */
100/* { */
101
102/* } */
103
104template <class DomainType>
105EntityReader<DomainType>::EntityReader(Storage::EntityStore &entityStore)
106 : mEntityStore(entityStore)
107{
108
109}
110
111template <class DomainType>
112DomainType EntityReader<DomainType>::read(const QByteArray &identifier) const
113{
114 auto typeName = ApplicationDomain::getTypeName<DomainType>();
115 return mEntityStore.readLatest<DomainType>(identifier);
116}
117
118template <class DomainType>
119DomainType EntityReader<DomainType>::readFromKey(const QByteArray &key) const
120{
121 /* auto typeName = ApplicationDomain::getTypeName<DomainType>(); */
122 /* auto mainDatabase = Storage::DataStore::mainDatabase(mTransaction, typeName); */
123 /* qint64 retrievedRevision = 0; */
124 /* auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); */
125 /* const auto identifier = Storage::DataStore::uidFromKey(key); */
126 /* if (!bufferAdaptor) { */
127 /* return DomainType(); */
128 /* } */
129 /* return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); */
130 return mEntityStore.readEntity<DomainType>(key);
131}
132
133template <class DomainType>
134DomainType EntityReader<DomainType>::readPrevious(const QByteArray &uid, qint64 revision) const
135{
136 return mEntityStore.readPrevious<DomainType>(uid, revision);
137}
138
139template <class DomainType>
140void EntityReader<DomainType>::query(const Sink::Query &query, const std::function<bool(const DomainType &)> &callback)
141{
142 executeInitialQuery(query, 0, 0,
143 [&callback](const typename DomainType::Ptr &value, Sink::Operation operation, const QMap<QByteArray, QVariant> &) -> bool {
144 Q_ASSERT(operation == Sink::Operation_Creation);
145 return callback(*value);
146 });
147}
148
149template <class DomainType>
150QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const ResultCallback &callback)
151{
152 QTime time;
153 time.start();
154
155 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){}));
156 auto resultSet = preparedQuery->execute();
157
158 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
159 auto replayedEntities = replaySet(resultSet, offset, batchsize, callback);
160
161 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
162 return qMakePair(mEntityStore.maxRevision(), replayedEntities);
163}
164
165template <class DomainType>
166QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Sink::Query &query, qint64 lastRevision, const ResultCallback &callback)
167{
168 QTime time;
169 time.start();
170 const qint64 baseRevision = lastRevision + 1;
171
172 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){}));
173 auto resultSet = preparedQuery->update(baseRevision);
174
175 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
176 auto replayedEntities = replaySet(resultSet, 0, 0, callback);
177
178 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
179 return qMakePair(mEntityStore.maxRevision(), replayedEntities);
180}
181
182template <class DomainType>
183qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback)
184{
185 SinkTrace() << "Skipping over " << offset << " results";
186 resultSet.skip(offset);
187 int counter = 0;
188 /* while (!batchSize || (counter < batchSize)) { */
189 /* const bool ret = */
190 /* resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { */
191 /* counter++; */
192 /* auto adaptor = mResourceContext.adaptorFactory<DomainType>().createAdaptor(result.buffer.entity()); */
193 /* Q_ASSERT(adaptor); */
194 /* return callback(QSharedPointer<DomainType>::create(mResourceContext, result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); */
195 /* }); */
196 /* if (!ret) { */
197 /* break; */
198 /* } */
199 /* }; */
200 SinkTrace() << "Replayed " << counter << " results."
201 << "Limit " << batchSize;
202 return counter;
203}
204
205template class Sink::EntityReader<Sink::ApplicationDomain::Folder>;
206template class Sink::EntityReader<Sink::ApplicationDomain::Mail>;
207template class Sink::EntityReader<Sink::ApplicationDomain::Event>;
diff --git a/common/entityreader.h b/common/entityreader.h
deleted file mode 100644
index a641106..0000000
--- a/common/entityreader.h
+++ /dev/null
@@ -1,95 +0,0 @@
1
2/*
3 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) version 3, or any
9 * later version accepted by the membership of KDE e.V. (or its
10 * successor approved by the membership of KDE e.V.), which shall
11 * act as a proxy defined in Section 6 of version 3 of the license.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
20 */
21#pragma once
22
23#include "sink_export.h"
24#include <domainadaptor.h>
25
26#include "storage.h"
27#include "resultprovider.h"
28#include "adaptorfactoryregistry.h"
29
30namespace Sink {
31
32namespace EntityReaderUtils {
33 SINK_EXPORT QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision);
34 SINK_EXPORT QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> get(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision);
35 SINK_EXPORT QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getPrevious(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision);
36};
37
38/**
39 * A synchronous interface to read entities from the storage.
40 *
41 * All callbacks will be called before the end of the function.
42 * The caller must ensure passed in references remain valid for the lifetime of the object.
43 *
44 * This class is meant to be instantiated temporarily during reads on the stack.
45 *
46 * Note that all objects returned in callbacks are only valid during the execution of the callback and may start pointing into invalid memory if shallow-copied.
47 */
48template<typename DomainType>
49class SINK_EXPORT EntityReader
50{
51 typedef std::function<bool(const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap<QByteArray, QVariant> &aggregateValues)> ResultCallback;
52
53public:
54 EntityReader(Storage::EntityStore &store);
55
56 /**
57 * Reads the latest revision of an entity identified by @param uid
58 */
59 DomainType read(const QByteArray &uid) const;
60
61 /**
62 * Reads the revision of the entity identified by @param key (uid + revision)
63 */
64 DomainType readFromKey(const QByteArray &key) const;
65
66 /**
67 * Reads the (revision - 1) of an entity identified by @param uid
68 */
69 DomainType readPrevious(const QByteArray &uid, qint64 revision) const;
70
71 /**
72 * Reads all entities that match @param query.
73 */
74 void query(const Sink::Query &query, const std::function<bool(const DomainType &)> &callback);
75
76 /**
77 * Returns all entities that match @param query.
78 *
79 * @param offset and @param batchsize can be used to return paginated results.
80 */
81 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const ResultCallback &callback);
82
83 /**
84 * Returns all changed entities that match @param query starting from @param lastRevision
85 */
86 QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, qint64 lastRevision, const ResultCallback &callback);
87
88private:
89 qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback);
90
91private:
92 Sink::Storage::EntityStore &mEntityStore;
93};
94
95}
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 67e83ea..bc4b5fc 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -206,7 +206,7 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sin
206 time.start(); 206 time.start();
207 207
208 const qint64 baseRevision = resultProvider.revision() + 1; 208 const qint64 baseRevision = resultProvider.revision() + 1;
209 auto entityStore = EntityStore::Ptr::create(mResourceContext); 209 auto entityStore = EntityStore{mResourceContext};
210 auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 210 auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore};
211 auto resultSet = preparedQuery.update(baseRevision); 211 auto resultSet = preparedQuery.update(baseRevision);
212 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 212 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
@@ -215,7 +215,7 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sin
215 }); 215 });
216 216
217 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); 217 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
218 return qMakePair(entityStore->maxRevision(), replayedEntities); 218 return qMakePair(entityStore.maxRevision(), replayedEntities);
219} 219}
220 220
221template <class DomainType> 221template <class DomainType>
@@ -236,7 +236,7 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
236 } 236 }
237 } 237 }
238 238
239 auto entityStore = EntityStore::Ptr::create(mResourceContext); 239 auto entityStore = EntityStore{mResourceContext};
240 auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 240 auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore};
241 auto resultSet = preparedQuery.execute(); 241 auto resultSet = preparedQuery.execute();
242 242
@@ -246,7 +246,7 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
246 }); 246 });
247 247
248 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 248 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
249 return qMakePair(entityStore->maxRevision(), replayedEntities); 249 return qMakePair(entityStore.maxRevision(), replayedEntities);
250} 250}
251 251
252template class QueryRunner<Sink::ApplicationDomain::Folder>; 252template class QueryRunner<Sink::ApplicationDomain::Folder>;
diff --git a/common/specialpurposepreprocessor.cpp b/common/specialpurposepreprocessor.cpp
index a6ee373..0fd8e34 100644
--- a/common/specialpurposepreprocessor.cpp
+++ b/common/specialpurposepreprocessor.cpp
@@ -1,5 +1,4 @@
1#include "specialpurposepreprocessor.h" 1#include "specialpurposepreprocessor.h"
2#include "entityreader.h"
3#include "query.h" 2#include "query.h"
4#include "applicationdomaintype.h" 3#include "applicationdomaintype.h"
5 4
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 9bba9cf..206cf5e 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -23,7 +23,7 @@
23#include "commands.h" 23#include "commands.h"
24#include "bufferutils.h" 24#include "bufferutils.h"
25#include "remoteidmap.h" 25#include "remoteidmap.h"
26#include "entityreader.h" 26#include "datastorequery.h"
27#include "createentity_generated.h" 27#include "createentity_generated.h"
28#include "modifyentity_generated.h" 28#include "modifyentity_generated.h"
29#include "deleteentity_generated.h" 29#include "deleteentity_generated.h"
@@ -200,15 +200,15 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
200 query.filter(it.key(), it.value()); 200 query.filter(it.key(), it.value());
201 } 201 }
202 bool merge = false; 202 bool merge = false;
203 Storage::EntityStore store(mResourceContext); 203 Storage::EntityStore store{mResourceContext};
204 Sink::EntityReader<DomainType> reader(store); 204 DataStoreQuery dataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), store};
205 reader.query(query, 205 auto resultSet = dataStoreQuery.execute();
206 [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{ 206 resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) {
207 merge = true; 207 merge = true;
208 SinkTrace() << "Merging local entity with remote entity: " << o.identifier() << remoteId; 208 SinkTrace() << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId;
209 syncStore().recordRemoteId(bufferType, o.identifier(), remoteId); 209 syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId);
210 return false; 210 });
211 }); 211
212 if (!merge) { 212 if (!merge) {
213 SinkTrace() << "Found a new entity: " << remoteId; 213 SinkTrace() << "Found a new entity: " << remoteId;
214 createEntity(sinkId, bufferType, entity); 214 createEntity(sinkId, bufferType, entity);