summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/entityreader.cpp412
-rw-r--r--common/entityreader.h110
-rw-r--r--common/entitystore.cpp52
-rw-r--r--common/entitystore.h37
-rw-r--r--common/query.h40
-rw-r--r--common/queryrunner.cpp317
-rw-r--r--common/synchronizer.cpp2
8 files changed, 594 insertions, 377 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 3c6a083..752e4e4 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -72,6 +72,7 @@ set(command_SRCS
72 entitystore.cpp 72 entitystore.cpp
73 remoteidmap.cpp 73 remoteidmap.cpp
74 sourcewriteback.cpp 74 sourcewriteback.cpp
75 entityreader.cpp
75 ${storage_SRCS}) 76 ${storage_SRCS})
76 77
77add_library(${PROJECT_NAME} SHARED ${command_SRCS}) 78add_library(${PROJECT_NAME} SHARED ${command_SRCS})
diff --git a/common/entityreader.cpp b/common/entityreader.cpp
new file mode 100644
index 0000000..b29b2a3
--- /dev/null
+++ b/common/entityreader.cpp
@@ -0,0 +1,412 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "entityreader.h"
21
22#include "resultset.h"
23#include "storage.h"
24#include "query.h"
25
26using namespace Sink;
27
28QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision)
29{
30 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
31 db.findLatest(uid,
32 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool {
33 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
34 if (!buffer.isValid()) {
35 Warning() << "Read invalid buffer from disk";
36 } else {
37 Trace() << "Found value " << key;
38 current = adaptorFactory.createAdaptor(buffer.entity());
39 retrievedRevision = Sink::Storage::revisionFromKey(key);
40 }
41 return false;
42 },
43 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
44 return current;
45}
46
47QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision)
48{
49 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
50 db.scan(key,
51 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool {
52 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
53 if (!buffer.isValid()) {
54 Warning() << "Read invalid buffer from disk";
55 } else {
56 current = adaptorFactory.createAdaptor(buffer.entity());
57 retrievedRevision = Sink::Storage::revisionFromKey(key);
58 }
59 return false;
60 },
61 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
62 return current;
63}
64
65QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision)
66{
67 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
68 qint64 latestRevision = 0;
69 db.scan(uid,
70 [&current, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool {
71 auto foundRevision = Sink::Storage::revisionFromKey(key);
72 if (foundRevision < revision && foundRevision > latestRevision) {
73 latestRevision = foundRevision;
74 }
75 return true;
76 },
77 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }, true);
78 return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision);
79}
80
81template <class DomainType>
82EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction)
83 : mResourceInstanceIdentifier(resourceInstanceIdentifier),
84 mTransaction(transaction),
85 mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory<DomainType>(resourceType)),
86 mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr)
87{
88
89}
90
91template <class DomainType>
92EntityReader<DomainType>::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction)
93 : mResourceInstanceIdentifier(resourceInstanceIdentifier),
94 mTransaction(transaction),
95 mDomainTypeAdaptorFactory(domainTypeAdaptorFactory)
96{
97
98}
99
100template <class DomainType>
101DomainType EntityReader<DomainType>::read(const QByteArray &identifier) const
102{
103 auto typeName = ApplicationDomain::getTypeName<DomainType>();
104 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName);
105 qint64 retrievedRevision = 0;
106 auto bufferAdaptor = EntityReaderUtils::getLatest(mainDatabase, identifier, mDomainTypeAdaptorFactory, retrievedRevision);
107 if (!bufferAdaptor) {
108 return DomainType();
109 }
110 return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor);
111}
112
113template <class DomainType>
114DomainType EntityReader<DomainType>::readFromKey(const QByteArray &key) const
115{
116 auto typeName = ApplicationDomain::getTypeName<DomainType>();
117 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName);
118 qint64 retrievedRevision = 0;
119 auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision);
120 const auto identifier = Storage::uidFromKey(key);
121 if (!bufferAdaptor) {
122 return DomainType();
123 }
124 return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor);
125}
126
127template <class DomainType>
128DomainType EntityReader<DomainType>::readPrevious(const QByteArray &uid, qint64 revision) const
129{
130 auto typeName = ApplicationDomain::getTypeName<DomainType>();
131 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName);
132 qint64 retrievedRevision = 0;
133 auto bufferAdaptor = EntityReaderUtils::getPrevious(mainDatabase, uid, revision, mDomainTypeAdaptorFactory, retrievedRevision);
134 if (!bufferAdaptor) {
135 return DomainType();
136 }
137 return DomainType(mResourceInstanceIdentifier, uid, retrievedRevision, bufferAdaptor);
138}
139
140template <class DomainType>
141void EntityReader<DomainType>::query(const Sink::Query &query, const std::function<bool(const DomainType &)> &callback)
142{
143 executeInitialQuery(query, 0, 0,
144 [&callback](const typename DomainType::Ptr &value, Sink::Operation operation) -> bool {
145 Q_ASSERT(operation == Sink::Operation_Creation);
146 return callback(*value);
147 });
148}
149
150template <class DomainType>
151void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key,
152 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback)
153{
154 db.findLatest(key,
155 [=](const QByteArray &key, const QByteArray &value) -> bool {
156 Sink::EntityBuffer buffer(value.data(), value.size());
157 const Sink::Entity &entity = buffer.entity();
158 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
159 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
160 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
161 auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity);
162 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation);
163 return false;
164 },
165 [&](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message << key; });
166}
167
168static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
169{
170 // TODO use a result set with an iterator, to read values on demand
171 Trace() << "Looking for : " << bufferType;
172 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
173 QSet<QByteArray> keys;
174 Storage::mainDatabase(transaction, bufferType)
175 .scan(QByteArray(),
176 [&](const QByteArray &key, const QByteArray &value) -> bool {
177 if (keys.contains(Sink::Storage::uidFromKey(key))) {
178 //Not something that should persist if the replay works, so we keep a message for now.
179 Trace() << "Multiple revisions for key: " << key;
180 }
181 keys << Sink::Storage::uidFromKey(key);
182 return true;
183 },
184 [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; });
185
186 Trace() << "Full scan retrieved " << keys.size() << " results.";
187 return ResultSet(keys.toList().toVector());
188}
189
190template <class DomainType>
191ResultSet EntityReader<DomainType>::loadInitialResultSet(const Sink::Query &query, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting)
192{
193 if (!query.ids.isEmpty()) {
194 return ResultSet(query.ids.toVector());
195 }
196 QSet<QByteArray> appliedFilters;
197 QByteArray appliedSorting;
198 auto resultSet = Sink::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, appliedSorting, mTransaction);
199 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
200 if (appliedSorting.isEmpty()) {
201 remainingSorting = query.sortProperty;
202 }
203
204 // We do a full scan if there were no indexes available to create the initial set.
205 if (appliedFilters.isEmpty()) {
206 // TODO this should be replaced by an index lookup as well
207 resultSet = fullScan(mTransaction, ApplicationDomain::getTypeName<DomainType>());
208 }
209 return resultSet;
210}
211
212template <class DomainType>
213ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet<QByteArray> &remainingFilters)
214{
215 const auto bufferType = ApplicationDomain::getTypeName<DomainType>();
216 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
217 remainingFilters = query.propertyFilter.keys().toSet();
218 return ResultSet([this, bufferType, revisionCounter]() -> QByteArray {
219 const qint64 topRevision = Sink::Storage::maxRevision(mTransaction);
220 // Spit out the revision keys one by one.
221 while (*revisionCounter <= topRevision) {
222 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
223 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
224 // Trace() << "Revision" << *revisionCounter << type << uid;
225 Q_ASSERT(!uid.isEmpty());
226 Q_ASSERT(!type.isEmpty());
227 if (type != bufferType) {
228 // Skip revision
229 *revisionCounter += 1;
230 continue;
231 }
232 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
233 *revisionCounter += 1;
234 return key;
235 }
236 Trace() << "Finished reading incremental result set:" << *revisionCounter;
237 // We're done
238 return QByteArray();
239 });
240}
241
242template <class DomainType>
243ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter,
244 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty)
245{
246 const bool sortingRequired = !sortProperty.isEmpty();
247 if (initialQuery && sortingRequired) {
248 Trace() << "Sorting the resultset in memory according to property: " << sortProperty;
249 // Sort the complete set by reading the sort property and filling into a sorted map
250 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
251 while (resultSet.next()) {
252 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
253 readEntity(db, resultSet.id(),
254 [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
255 // We're not interested in removals during the initial query
256 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
257 if (!sortProperty.isEmpty()) {
258 const auto sortValue = domainObject->getProperty(sortProperty);
259 if (sortValue.type() == QVariant::DateTime) {
260 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier());
261 } else {
262 sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier());
263 }
264 } else {
265 sortedMap->insert(domainObject->identifier(), domainObject->identifier());
266 }
267 }
268 });
269 }
270
271 Trace() << "Sorted " << sortedMap->size() << " values.";
272 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
273 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery](
274 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
275 if (iterator->hasNext()) {
276 readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject,
277 Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); });
278 return true;
279 }
280 return false;
281 };
282
283 auto skip = [iterator]() {
284 if (iterator->hasNext()) {
285 iterator->next();
286 }
287 };
288 return ResultSet(generator, skip);
289 } else {
290 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
291 ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](
292 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
293 if (resultSetPtr->next()) {
294 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
295 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
296 if (initialQuery) {
297 // We're not interested in removals during the initial query
298 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
299 // In the initial set every entity is new
300 callback(domainObject, Sink::Operation_Creation);
301 }
302 } else {
303 // Always remove removals, they probably don't match due to non-available properties
304 if ((operation == Sink::Operation_Removal) || filter(domainObject)) {
305 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
306 callback(domainObject, operation);
307 }
308 }
309 });
310 return true;
311 }
312 return false;
313 };
314 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
315 return ResultSet(generator, skip);
316 }
317}
318
319template <class DomainType>
320QPair<qint64, qint64> EntityReader<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
321{
322 QTime time;
323 time.start();
324
325 auto db = Storage::mainDatabase(mTransaction, ApplicationDomain::getTypeName<DomainType>());
326
327 QSet<QByteArray> remainingFilters;
328 QByteArray remainingSorting;
329 auto resultSet = baseSetRetriever(remainingFilters, remainingSorting);
330 Trace() << "Base set retrieved. " << Log::TraceTime(time.elapsed());
331 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting);
332 Trace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
333 auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback);
334 // Trace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed());
335 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
336}
337
338template <class DomainType>
339QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
340{
341 QTime time;
342 time.start();
343 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
344 return loadInitialResultSet(query, remainingFilters, remainingSorting);
345 }, true, offset, batchsize, callback);
346 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed());
347 return revisionAndReplayedEntities;
348}
349
350template <class DomainType>
351QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Sink::Query &query, qint64 lastRevision, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
352{
353 QTime time;
354 time.start();
355 const qint64 baseRevision = lastRevision + 1;
356 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
357 return loadIncrementalResultSet(baseRevision, query, remainingFilters);
358 }, false, 0, 0, callback);
359 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed());
360 return revisionAndReplayedEntities;
361}
362
363template <class DomainType>
364std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)>
365EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query)
366{
367 return [this, remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
368 if (!query.ids.isEmpty()) {
369 if (!query.ids.contains(domainObject->identifier())) {
370 return false;
371 }
372 }
373 for (const auto &filterProperty : remainingFilters) {
374 const auto property = domainObject->getProperty(filterProperty);
375 if (property.isValid()) {
376 const auto comparator = query.propertyFilter.value(filterProperty);
377 if (!comparator.matches(property)) {
378 Trace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
379 return false;
380 }
381 } else {
382 Warning() << "Ignored property filter because value is invalid: " << filterProperty;
383 }
384 }
385 return true;
386 };
387}
388
389template <class DomainType>
390qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
391{
392 Trace() << "Skipping over " << offset << " results";
393 resultSet.skip(offset);
394 int counter = 0;
395 while (!batchSize || (counter < batchSize)) {
396 const bool ret =
397 resultSet.next([&counter, callback](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool {
398 counter++;
399 return callback(value.staticCast<DomainType>(), operation);
400 });
401 if (!ret) {
402 break;
403 }
404 };
405 Trace() << "Replayed " << counter << " results."
406 << "Limit " << batchSize;
407 return counter;
408}
409
410template class Sink::EntityReader<Sink::ApplicationDomain::Folder>;
411template class Sink::EntityReader<Sink::ApplicationDomain::Mail>;
412template class Sink::EntityReader<Sink::ApplicationDomain::Event>;
diff --git a/common/entityreader.h b/common/entityreader.h
new file mode 100644
index 0000000..a479679
--- /dev/null
+++ b/common/entityreader.h
@@ -0,0 +1,110 @@
1
2/*
3 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) version 3, or any
9 * later version accepted by the membership of KDE e.V. (or its
10 * successor approved by the membership of KDE e.V.), which shall
11 * act as a proxy defined in Section 6 of version 3 of the license.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
20 */
21#pragma once
22
23#include "sink_export.h"
24#include <domainadaptor.h>
25
26#include "storage.h"
27#include "resultprovider.h"
28#include "adaptorfactoryregistry.h"
29
30namespace Sink {
31
32namespace EntityReaderUtils {
33 SINK_EXPORT QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision);
34 SINK_EXPORT QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision);
35 SINK_EXPORT QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision);
36};
37
38/**
39 * A synchronous interface to read entities from the storage.
40 *
41 * All callbacks will be called before the end of the function.
42 * The caller must ensure passed in references remain valid for the lifetime of the object.
43 *
44 * This class is meaent to be instantiated temporarily during reads on the stack.
45 *
46 * Note that all objects returned in callbacks are only valid during the execution of the callback and may start pointing into invalid memory if shallow-copied.
47 */
48template<typename DomainType>
49class SINK_EXPORT EntityReader
50{
51 typedef std::function<bool(const typename DomainType::Ptr &domainObject, Sink::Operation operation)> ResultCallback;
52
53public:
54 EntityReader(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction);
55 EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction);
56
57 /**
58 * Reads the latest revision of an entity identified by @param uid
59 */
60 DomainType read(const QByteArray &uid) const;
61
62 /**
63 * Reads the revision of the entity identified by @param key (uid + revision)
64 */
65 DomainType readFromKey(const QByteArray &key) const;
66
67 /**
68 * Reads the (revision - 1) of an entity identified by @param uid
69 */
70 DomainType readPrevious(const QByteArray &uid, qint64 revision) const;
71
72 /**
73 * Reads all entities that match @param query.
74 */
75 void query(const Sink::Query &query, const std::function<bool(const DomainType &)> &callback);
76
77 /**
78 * Returns all entities that match @param query.
79 *
80 * @param offset and @param batchsize can be used to return paginated results.
81 */
82 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const ResultCallback &callback);
83
84 /**
85 * Returns all changed entities that match @param query starting from @param lastRevision
86 */
87 QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, qint64 lastRevision, const ResultCallback &callback);
88
89private:
90 qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback);
91
92 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key,
93 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback);
94
95 ResultSet loadInitialResultSet(const Sink::Query &query, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting);
96 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet<QByteArray> &remainingFilters);
97
98 ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter,
99 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty);
100 std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query);
101 QPair<qint64, qint64> load(const Sink::Query &query, const std::function<ResultSet(QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const ResultCallback &callback);
102
103private:
104 QByteArray mResourceInstanceIdentifier;
105 Sink::Storage::Transaction &mTransaction;
106 std::shared_ptr<DomainTypeAdaptorFactoryInterface> mDomainTypeAdaptorFactoryPtr;
107 DomainTypeAdaptorFactoryInterface &mDomainTypeAdaptorFactory;
108};
109
110}
diff --git a/common/entitystore.cpp b/common/entitystore.cpp
index 5296d53..5fb213d 100644
--- a/common/entitystore.cpp
+++ b/common/entitystore.cpp
@@ -28,55 +28,3 @@ EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resou
28 28
29} 29}
30 30
31QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision)
32{
33 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
34 db.findLatest(uid,
35 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool {
36 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
37 if (!buffer.isValid()) {
38 Warning() << "Read invalid buffer from disk";
39 } else {
40 Trace() << "Found value " << key;
41 current = adaptorFactory.createAdaptor(buffer.entity());
42 retrievedRevision = Sink::Storage::revisionFromKey(key);
43 }
44 return false;
45 },
46 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
47 return current;
48}
49
50QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision)
51{
52 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
53 db.scan(key,
54 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool {
55 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
56 if (!buffer.isValid()) {
57 Warning() << "Read invalid buffer from disk";
58 } else {
59 current = adaptorFactory.createAdaptor(buffer.entity());
60 retrievedRevision = Sink::Storage::revisionFromKey(key);
61 }
62 return false;
63 },
64 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
65 return current;
66}
67
68QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision)
69{
70 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
71 qint64 latestRevision = 0;
72 db.scan(uid,
73 [&current, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool {
74 auto foundRevision = Sink::Storage::revisionFromKey(key);
75 if (foundRevision < revision && foundRevision > latestRevision) {
76 latestRevision = foundRevision;
77 }
78 return true;
79 },
80 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }, true);
81 return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision);
82}
diff --git a/common/entitystore.h b/common/entitystore.h
index 24f43b1..b795b26 100644
--- a/common/entitystore.h
+++ b/common/entitystore.h
@@ -24,6 +24,7 @@
24 24
25#include "storage.h" 25#include "storage.h"
26#include "adaptorfactoryregistry.h" 26#include "adaptorfactoryregistry.h"
27#include "entityreader.h"
27 28
28namespace Sink { 29namespace Sink {
29 30
@@ -35,48 +36,24 @@ public:
35 template<typename T> 36 template<typename T>
36 T read(const QByteArray &identifier) const 37 T read(const QByteArray &identifier) const
37 { 38 {
38 auto typeName = ApplicationDomain::getTypeName<T>(); 39 EntityReader<T> reader(mResourceType, mResourceInstanceIdentifier, mTransaction);
39 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); 40 return reader.read(identifier);
40 qint64 retrievedRevision = 0;
41 auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType), retrievedRevision);
42 if (!bufferAdaptor) {
43 return T();
44 }
45 return T(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor);
46 } 41 }
47 42
48 template<typename T> 43 template<typename T>
49 T readFromKey(const QByteArray &key) const 44 T readFromKey(const QByteArray &key) const
50 { 45 {
51 auto typeName = ApplicationDomain::getTypeName<T>(); 46 EntityReader<T> reader(mResourceType, mResourceInstanceIdentifier, mTransaction);
52 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); 47 return reader.readFromKey(key);
53 qint64 retrievedRevision = 0;
54 auto bufferAdaptor = get(mainDatabase, key, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType), retrievedRevision);
55 const auto identifier = Storage::uidFromKey(key);
56 if (!bufferAdaptor) {
57 return T();
58 }
59 return T(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor);
60 } 48 }
61 49
62 template<typename T> 50 template<typename T>
63 T readPrevious(const QByteArray &uid, qint64 revision) const 51 T readPrevious(const QByteArray &uid, qint64 revision) const
64 { 52 {
65 auto typeName = ApplicationDomain::getTypeName<T>(); 53 EntityReader<T> reader(mResourceType, mResourceInstanceIdentifier, mTransaction);
66 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); 54 return reader.readPrevious(uid, revision);
67 qint64 retrievedRevision = 0;
68 auto bufferAdaptor = getPrevious(mainDatabase, uid, revision, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType), retrievedRevision);
69 if (!bufferAdaptor) {
70 return T();
71 }
72 return T(mResourceInstanceIdentifier, uid, retrievedRevision, bufferAdaptor);
73 } 55 }
74 56
75
76
77 static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision);
78 static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision);
79 static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision);
80private: 57private:
81 QByteArray mResourceType; 58 QByteArray mResourceType;
82 QByteArray mResourceInstanceIdentifier; 59 QByteArray mResourceInstanceIdentifier;
diff --git a/common/query.h b/common/query.h
index 8a7edf6..717ed75 100644
--- a/common/query.h
+++ b/common/query.h
@@ -40,6 +40,23 @@ public:
40 }; 40 };
41 Q_DECLARE_FLAGS(Flags, Flag) 41 Q_DECLARE_FLAGS(Flags, Flag)
42 42
43 struct Comparator {
44 enum Comparators {
45 Invalid,
46 Equals,
47 Contains
48 };
49
50 Comparator();
51 Comparator(const QVariant &v);
52 Comparator(const QVariant &v, Comparators c);
53 bool matches(const QVariant &v) const;
54
55 QVariant value;
56 Comparators comparator;
57 };
58
59
43 static Query PropertyFilter(const QByteArray &key, const QVariant &value) 60 static Query PropertyFilter(const QByteArray &key, const QVariant &value)
44 { 61 {
45 Query query; 62 Query query;
@@ -161,6 +178,13 @@ public:
161 } 178 }
162 179
163 template <typename T> 180 template <typename T>
181 Query &filter(const Comparator &comparator)
182 {
183 propertyFilter.insert(T::name, comparator);
184 return *this;
185 }
186
187 template <typename T>
164 Query &filter(const ApplicationDomain::Entity &value) 188 Query &filter(const ApplicationDomain::Entity &value)
165 { 189 {
166 propertyFilter.insert(T::name, QVariant::fromValue(value.identifier())); 190 propertyFilter.insert(T::name, QVariant::fromValue(value.identifier()));
@@ -199,22 +223,6 @@ public:
199 return lhs; 223 return lhs;
200 } 224 }
201 225
202 struct Comparator {
203 enum Comparators {
204 Invalid,
205 Equals,
206 Contains
207 };
208
209 Comparator();
210 Comparator(const QVariant &v);
211 Comparator(const QVariant &v, Comparators c);
212 bool matches(const QVariant &v) const;
213
214 QVariant value;
215 Comparators comparator;
216 };
217
218 QByteArrayList resources; 226 QByteArrayList resources;
219 QByteArrayList accounts; 227 QByteArrayList accounts;
220 QByteArrayList ids; 228 QByteArrayList ids;
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index cb8157e..c6a6b86 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -27,6 +27,7 @@
27#include "definitions.h" 27#include "definitions.h"
28#include "domainadaptor.h" 28#include "domainadaptor.h"
29#include "asyncutils.h" 29#include "asyncutils.h"
30#include "entityreader.h"
30 31
31#undef DEBUG_AREA 32#undef DEBUG_AREA
32#define DEBUG_AREA "client.queryrunner" 33#define DEBUG_AREA "client.queryrunner"
@@ -51,27 +52,13 @@ public:
51 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); 52 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
52 53
53private: 54private:
54 qint64 replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize); 55 Storage::Transaction getTransaction();
56 std::function<bool(const typename DomainType::Ptr &, Sink::Operation)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
55 57
56 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key,
57 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback);
58
59 ResultSet loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting);
60 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
61
62 ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter,
63 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty);
64 std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query);
65 QPair<qint64, qint64> load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever,
66 Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize);
67
68private:
69 QueryRunnerBase::ResultTransformation mResultTransformation; 58 QueryRunnerBase::ResultTransformation mResultTransformation;
70 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; 59 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory;
71 QByteArray mResourceInstanceIdentifier; 60 QByteArray mResourceInstanceIdentifier;
72 QByteArray mBufferType;
73 QByteArray mId; //Used for identification in debug output 61 QByteArray mId; //Used for identification in debug output
74 Sink::Query mQuery;
75}; 62};
76 63
77#undef Trace 64#undef Trace
@@ -147,35 +134,13 @@ typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainTy
147} 134}
148 135
149 136
150static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
151{
152 // TODO use a result set with an iterator, to read values on demand
153 Trace() << "Looking for : " << bufferType;
154 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
155 QSet<QByteArray> keys;
156 Storage::mainDatabase(transaction, bufferType)
157 .scan(QByteArray(),
158 [&](const QByteArray &key, const QByteArray &value) -> bool {
159 if (keys.contains(Sink::Storage::uidFromKey(key))) {
160 //Not something that should persist if the replay works, so we keep a message for now.
161 Trace() << "Multiple revisions for key: " << key;
162 }
163 keys << Sink::Storage::uidFromKey(key);
164 return true;
165 },
166 [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; });
167
168 Trace() << "Full scan retrieved " << keys.size() << " results.";
169 return ResultSet(keys.toList().toVector());
170}
171
172#undef Trace 137#undef Trace
173#define Trace() Trace_area("client.queryrunner." + mId) 138#define Trace() Trace_area("client.queryrunner." + mId)
174 139
175template <class DomainType> 140template <class DomainType>
176QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, 141QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory,
177 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) 142 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation)
178 : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mBufferType(bufferType), mId(QUuid::createUuid().toByteArray()), mQuery(query) 143 : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mId(QUuid::createUuid().toByteArray())
179{ 144{
180 Trace() << "Starting query worker"; 145 Trace() << "Starting query worker";
181} 146}
@@ -187,228 +152,48 @@ QueryWorker<DomainType>::~QueryWorker()
187} 152}
188 153
189template <class DomainType> 154template <class DomainType>
190qint64 QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize) 155std::function<bool(const typename DomainType::Ptr &, Sink::Operation)> QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
191{ 156{
192 Trace() << "Skipping over " << offset << " results"; 157 return [this, &query, &resultProvider](const typename DomainType::Ptr &domainObject, Sink::Operation operation) -> bool {
193 resultSet.skip(offset); 158 auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*domainObject, query.requestedProperties).template staticCast<DomainType>();
194 int counter = 0; 159 if (mResultTransformation) {
195 while (!batchSize || (counter < batchSize)) { 160 mResultTransformation(*valueCopy);
196 const bool ret =
197 resultSet.next([this, &resultProvider, &counter, &properties, batchSize](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool {
198 counter++;
199 auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>();
200 if (mResultTransformation) {
201 mResultTransformation(*valueCopy);
202 }
203 switch (operation) {
204 case Sink::Operation_Creation:
205 // Trace() << "Got creation";
206 resultProvider.add(valueCopy);
207 break;
208 case Sink::Operation_Modification:
209 // Trace() << "Got modification";
210 resultProvider.modify(valueCopy);
211 break;
212 case Sink::Operation_Removal:
213 // Trace() << "Got removal";
214 resultProvider.remove(valueCopy);
215 break;
216 }
217 return true;
218 });
219 if (!ret) {
220 break;
221 } 161 }
222 }; 162 switch (operation) {
223 Trace() << "Replayed " << counter << " results." 163 case Sink::Operation_Creation:
224 << "Limit " << batchSize; 164 // Trace() << "Got creation";
225 return counter; 165 resultProvider.add(valueCopy);
226} 166 break;
227 167 case Sink::Operation_Modification:
228template <class DomainType> 168 // Trace() << "Got modification";
229void QueryWorker<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, 169 resultProvider.modify(valueCopy);
230 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) 170 break;
231{ 171 case Sink::Operation_Removal:
232 // This only works for a 1:1 mapping of resource to domain types. 172 // Trace() << "Got removal";
233 // Not i.e. for tags that are stored as flags in each entity of an imap store. 173 resultProvider.remove(valueCopy);
234 // additional properties that don't have a 1:1 mapping (such as separately stored tags), 174 break;
235 // could be added to the adaptor.
236 db.findLatest(key,
237 [=](const QByteArray &key, const QByteArray &value) -> bool {
238 Sink::EntityBuffer buffer(value.data(), value.size());
239 const Sink::Entity &entity = buffer.entity();
240 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
241 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
242 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
243 auto adaptor = mDomainTypeAdaptorFactory->createAdaptor(entity);
244 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation);
245 return false;
246 },
247 [&](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message << key; });
248}
249
250template <class DomainType>
251ResultSet QueryWorker<DomainType>::loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting)
252{
253 if (!query.ids.isEmpty()) {
254 return ResultSet(query.ids.toVector());
255 }
256 QSet<QByteArray> appliedFilters;
257 QByteArray appliedSorting;
258 auto resultSet = Sink::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, appliedSorting, transaction);
259 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
260 if (appliedSorting.isEmpty()) {
261 remainingSorting = query.sortProperty;
262 }
263
264 // We do a full scan if there were no indexes available to create the initial set.
265 if (appliedFilters.isEmpty()) {
266 // TODO this should be replaced by an index lookup as well
267 resultSet = fullScan(transaction, mBufferType);
268 }
269 return resultSet;
270}
271
272template <class DomainType>
273ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
274{
275 const auto bufferType = mBufferType;
276 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
277 remainingFilters = query.propertyFilter.keys().toSet();
278 return ResultSet([this, bufferType, revisionCounter, &transaction]() -> QByteArray {
279 const qint64 topRevision = Sink::Storage::maxRevision(transaction);
280 // Spit out the revision keys one by one.
281 while (*revisionCounter <= topRevision) {
282 const auto uid = Sink::Storage::getUidFromRevision(transaction, *revisionCounter);
283 const auto type = Sink::Storage::getTypeFromRevision(transaction, *revisionCounter);
284 // Trace() << "Revision" << *revisionCounter << type << uid;
285 Q_ASSERT(!uid.isEmpty());
286 Q_ASSERT(!type.isEmpty());
287 if (type != bufferType) {
288 // Skip revision
289 *revisionCounter += 1;
290 continue;
291 }
292 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
293 *revisionCounter += 1;
294 return key;
295 } 175 }
296 Trace() << "Finished reading incremental result set:" << *revisionCounter; 176 return true;
297 // We're done 177 };
298 return QByteArray();
299 });
300} 178}
301 179
302template <class DomainType> 180template <class DomainType>
303ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, 181QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
304 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty)
305{ 182{
306 const bool sortingRequired = !sortProperty.isEmpty(); 183 QTime time;
307 if (initialQuery && sortingRequired) { 184 time.start();
308 Trace() << "Sorting the resultset in memory according to property: " << sortProperty;
309 // Sort the complete set by reading the sort property and filling into a sorted map
310 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
311 while (resultSet.next()) {
312 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
313 readEntity(db, resultSet.id(),
314 [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
315 // We're not interested in removals during the initial query
316 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
317 if (!sortProperty.isEmpty()) {
318 const auto sortValue = domainObject->getProperty(sortProperty);
319 if (sortValue.type() == QVariant::DateTime) {
320 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier());
321 } else {
322 sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier());
323 }
324 } else {
325 sortedMap->insert(domainObject->identifier(), domainObject->identifier());
326 }
327 }
328 });
329 }
330 185
331 Trace() << "Sorted " << sortedMap->size() << " values."; 186 auto transaction = getTransaction();
332 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
333 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery](
334 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
335 if (iterator->hasNext()) {
336 readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject,
337 Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); });
338 return true;
339 }
340 return false;
341 };
342
343 auto skip = [iterator]() {
344 if (iterator->hasNext()) {
345 iterator->next();
346 }
347 };
348 return ResultSet(generator, skip);
349 } else {
350 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
351 ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](
352 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
353 if (resultSetPtr->next()) {
354 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
355 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
356 if (initialQuery) {
357 // We're not interested in removals during the initial query
358 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
359 // In the initial set every entity is new
360 callback(domainObject, Sink::Operation_Creation);
361 }
362 } else {
363 // Always remove removals, they probably don't match due to non-available properties
364 if ((operation == Sink::Operation_Removal) || filter(domainObject)) {
365 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
366 callback(domainObject, operation);
367 }
368 }
369 });
370 return true;
371 }
372 return false;
373 };
374 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
375 return ResultSet(generator, skip);
376 }
377}
378 187
379template <class DomainType> 188 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction);
380std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> 189 auto revisionAndReplayedEntities = reader.executeIncrementalQuery(query, resultProvider.revision(), resultProviderCallback(query, resultProvider));
381QueryWorker<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query) 190 Trace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
382{ 191 return revisionAndReplayedEntities;
383 return [this, remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
384 if (!query.ids.isEmpty()) {
385 if (!query.ids.contains(domainObject->identifier())) {
386 return false;
387 }
388 }
389 for (const auto &filterProperty : remainingFilters) {
390 const auto property = domainObject->getProperty(filterProperty);
391 if (property.isValid()) {
392 const auto comparator = query.propertyFilter.value(filterProperty);
393 if (!comparator.matches(property)) {
394 Trace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
395 return false;
396 }
397 } else {
398 Warning() << "Ignored property filter because value is invalid: " << filterProperty;
399 }
400 }
401 return true;
402 };
403} 192}
404 193
405template <class DomainType> 194template <class DomainType>
406QPair<qint64, qint64> QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, 195Storage::Transaction QueryWorker<DomainType>::getTransaction()
407 Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize)
408{ 196{
409 QTime time;
410 time.start();
411
412 Sink::Storage::Transaction transaction; 197 Sink::Storage::Transaction transaction;
413 { 198 {
414 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); 199 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier);
@@ -422,33 +207,7 @@ QPair<qint64, qint64> QueryWorker<DomainType>::load(const Sink::Query &query, co
422 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); 207 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier);
423 transaction = storage.createTransaction(Sink::Storage::ReadOnly); 208 transaction = storage.createTransaction(Sink::Storage::ReadOnly);
424 } 209 }
425 auto db = Storage::mainDatabase(transaction, mBufferType); 210 return transaction;
426
427 QSet<QByteArray> remainingFilters;
428 QByteArray remainingSorting;
429 auto resultSet = baseSetRetriever(transaction, remainingFilters, remainingSorting);
430 Trace() << "Base set retrieved. " << Log::TraceTime(time.elapsed());
431 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting);
432 Trace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
433 auto replayedEntities = replaySet(filteredSet, resultProvider, query.requestedProperties, offset, batchSize);
434 Trace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed());
435 resultProvider.setRevision(Sink::Storage::maxRevision(transaction));
436 return qMakePair(Sink::Storage::maxRevision(transaction), replayedEntities);
437}
438
439template <class DomainType>
440QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
441{
442 QTime time;
443 time.start();
444
445 const qint64 baseRevision = resultProvider.revision() + 1;
446 Trace() << "Running incremental query " << baseRevision;
447 auto revisionAndReplayedEntities = load(query, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
448 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
449 }, resultProvider, false, 0, 0);
450 Trace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
451 return revisionAndReplayedEntities;
452} 211}
453 212
454template <class DomainType> 213template <class DomainType>
@@ -468,9 +227,11 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
468 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(QVariant())); 227 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(QVariant()));
469 } 228 }
470 } 229 }
471 auto revisionAndReplayedEntities = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { 230
472 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters, remainingSorting); 231 auto transaction = getTransaction();
473 }, resultProvider, true, offset, batchsize); 232
233 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction);
234 auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider));
474 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 235 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed());
475 resultProvider.initialResultSetComplete(parent); 236 resultProvider.initialResultSetComplete(parent);
476 return revisionAndReplayedEntities; 237 return revisionAndReplayedEntities;
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 1bac5d9..0314997 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -145,7 +145,7 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
145 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); 145 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
146 } else { // modification 146 } else { // modification
147 qint64 retrievedRevision = 0; 147 qint64 retrievedRevision = 0;
148 if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) { 148 if (auto current = EntityReaderUtils::getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) {
149 bool changed = false; 149 bool changed = false;
150 for (const auto &property : entity.changedProperties()) { 150 for (const auto &property : entity.changedProperties()) {
151 if (entity.getProperty(property) != current->getProperty(property)) { 151 if (entity.getProperty(property) != current->getProperty(property)) {