summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-06-09 17:27:29 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-06-09 17:27:29 +0200
commit555c373a0c4dfe386dcd2c88ae9548d95e307409 (patch)
tree57dd13d7e6d6b5c5929f15f4fafc19881deb84c0 /common/queryrunner.cpp
parent32a6f2ad3b66431c157e00ba5b1fb40c05e3c407 (diff)
downloadsink-555c373a0c4dfe386dcd2c88ae9548d95e307409.tar.gz
sink-555c373a0c4dfe386dcd2c88ae9548d95e307409.zip
Moved query logic to EntityReader to make it reusable in the resource.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp317
1 files changed, 39 insertions, 278 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index cb8157e..c6a6b86 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -27,6 +27,7 @@
27#include "definitions.h" 27#include "definitions.h"
28#include "domainadaptor.h" 28#include "domainadaptor.h"
29#include "asyncutils.h" 29#include "asyncutils.h"
30#include "entityreader.h"
30 31
31#undef DEBUG_AREA 32#undef DEBUG_AREA
32#define DEBUG_AREA "client.queryrunner" 33#define DEBUG_AREA "client.queryrunner"
@@ -51,27 +52,13 @@ public:
51 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); 52 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
52 53
53private: 54private:
54 qint64 replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize); 55 Storage::Transaction getTransaction();
56 std::function<bool(const typename DomainType::Ptr &, Sink::Operation)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
55 57
56 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key,
57 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback);
58
59 ResultSet loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting);
60 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
61
62 ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter,
63 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty);
64 std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query);
65 QPair<qint64, qint64> load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever,
66 Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize);
67
68private:
69 QueryRunnerBase::ResultTransformation mResultTransformation; 58 QueryRunnerBase::ResultTransformation mResultTransformation;
70 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; 59 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory;
71 QByteArray mResourceInstanceIdentifier; 60 QByteArray mResourceInstanceIdentifier;
72 QByteArray mBufferType;
73 QByteArray mId; //Used for identification in debug output 61 QByteArray mId; //Used for identification in debug output
74 Sink::Query mQuery;
75}; 62};
76 63
77#undef Trace 64#undef Trace
@@ -147,35 +134,13 @@ typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainTy
147} 134}
148 135
149 136
150static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
151{
152 // TODO use a result set with an iterator, to read values on demand
153 Trace() << "Looking for : " << bufferType;
154 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
155 QSet<QByteArray> keys;
156 Storage::mainDatabase(transaction, bufferType)
157 .scan(QByteArray(),
158 [&](const QByteArray &key, const QByteArray &value) -> bool {
159 if (keys.contains(Sink::Storage::uidFromKey(key))) {
160 //Not something that should persist if the replay works, so we keep a message for now.
161 Trace() << "Multiple revisions for key: " << key;
162 }
163 keys << Sink::Storage::uidFromKey(key);
164 return true;
165 },
166 [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; });
167
168 Trace() << "Full scan retrieved " << keys.size() << " results.";
169 return ResultSet(keys.toList().toVector());
170}
171
172#undef Trace 137#undef Trace
173#define Trace() Trace_area("client.queryrunner." + mId) 138#define Trace() Trace_area("client.queryrunner." + mId)
174 139
175template <class DomainType> 140template <class DomainType>
176QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, 141QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory,
177 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) 142 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation)
178 : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mBufferType(bufferType), mId(QUuid::createUuid().toByteArray()), mQuery(query) 143 : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mId(QUuid::createUuid().toByteArray())
179{ 144{
180 Trace() << "Starting query worker"; 145 Trace() << "Starting query worker";
181} 146}
@@ -187,228 +152,48 @@ QueryWorker<DomainType>::~QueryWorker()
187} 152}
188 153
189template <class DomainType> 154template <class DomainType>
190qint64 QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize) 155std::function<bool(const typename DomainType::Ptr &, Sink::Operation)> QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
191{ 156{
192 Trace() << "Skipping over " << offset << " results"; 157 return [this, &query, &resultProvider](const typename DomainType::Ptr &domainObject, Sink::Operation operation) -> bool {
193 resultSet.skip(offset); 158 auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*domainObject, query.requestedProperties).template staticCast<DomainType>();
194 int counter = 0; 159 if (mResultTransformation) {
195 while (!batchSize || (counter < batchSize)) { 160 mResultTransformation(*valueCopy);
196 const bool ret =
197 resultSet.next([this, &resultProvider, &counter, &properties, batchSize](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool {
198 counter++;
199 auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>();
200 if (mResultTransformation) {
201 mResultTransformation(*valueCopy);
202 }
203 switch (operation) {
204 case Sink::Operation_Creation:
205 // Trace() << "Got creation";
206 resultProvider.add(valueCopy);
207 break;
208 case Sink::Operation_Modification:
209 // Trace() << "Got modification";
210 resultProvider.modify(valueCopy);
211 break;
212 case Sink::Operation_Removal:
213 // Trace() << "Got removal";
214 resultProvider.remove(valueCopy);
215 break;
216 }
217 return true;
218 });
219 if (!ret) {
220 break;
221 } 161 }
222 }; 162 switch (operation) {
223 Trace() << "Replayed " << counter << " results." 163 case Sink::Operation_Creation:
224 << "Limit " << batchSize; 164 // Trace() << "Got creation";
225 return counter; 165 resultProvider.add(valueCopy);
226} 166 break;
227 167 case Sink::Operation_Modification:
228template <class DomainType> 168 // Trace() << "Got modification";
229void QueryWorker<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, 169 resultProvider.modify(valueCopy);
230 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) 170 break;
231{ 171 case Sink::Operation_Removal:
232 // This only works for a 1:1 mapping of resource to domain types. 172 // Trace() << "Got removal";
233 // Not i.e. for tags that are stored as flags in each entity of an imap store. 173 resultProvider.remove(valueCopy);
234 // additional properties that don't have a 1:1 mapping (such as separately stored tags), 174 break;
235 // could be added to the adaptor.
236 db.findLatest(key,
237 [=](const QByteArray &key, const QByteArray &value) -> bool {
238 Sink::EntityBuffer buffer(value.data(), value.size());
239 const Sink::Entity &entity = buffer.entity();
240 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
241 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
242 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
243 auto adaptor = mDomainTypeAdaptorFactory->createAdaptor(entity);
244 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation);
245 return false;
246 },
247 [&](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message << key; });
248}
249
250template <class DomainType>
251ResultSet QueryWorker<DomainType>::loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting)
252{
253 if (!query.ids.isEmpty()) {
254 return ResultSet(query.ids.toVector());
255 }
256 QSet<QByteArray> appliedFilters;
257 QByteArray appliedSorting;
258 auto resultSet = Sink::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, appliedSorting, transaction);
259 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
260 if (appliedSorting.isEmpty()) {
261 remainingSorting = query.sortProperty;
262 }
263
264 // We do a full scan if there were no indexes available to create the initial set.
265 if (appliedFilters.isEmpty()) {
266 // TODO this should be replaced by an index lookup as well
267 resultSet = fullScan(transaction, mBufferType);
268 }
269 return resultSet;
270}
271
272template <class DomainType>
273ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
274{
275 const auto bufferType = mBufferType;
276 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
277 remainingFilters = query.propertyFilter.keys().toSet();
278 return ResultSet([this, bufferType, revisionCounter, &transaction]() -> QByteArray {
279 const qint64 topRevision = Sink::Storage::maxRevision(transaction);
280 // Spit out the revision keys one by one.
281 while (*revisionCounter <= topRevision) {
282 const auto uid = Sink::Storage::getUidFromRevision(transaction, *revisionCounter);
283 const auto type = Sink::Storage::getTypeFromRevision(transaction, *revisionCounter);
284 // Trace() << "Revision" << *revisionCounter << type << uid;
285 Q_ASSERT(!uid.isEmpty());
286 Q_ASSERT(!type.isEmpty());
287 if (type != bufferType) {
288 // Skip revision
289 *revisionCounter += 1;
290 continue;
291 }
292 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
293 *revisionCounter += 1;
294 return key;
295 } 175 }
296 Trace() << "Finished reading incremental result set:" << *revisionCounter; 176 return true;
297 // We're done 177 };
298 return QByteArray();
299 });
300} 178}
301 179
302template <class DomainType> 180template <class DomainType>
303ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, 181QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
304 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty)
305{ 182{
306 const bool sortingRequired = !sortProperty.isEmpty(); 183 QTime time;
307 if (initialQuery && sortingRequired) { 184 time.start();
308 Trace() << "Sorting the resultset in memory according to property: " << sortProperty;
309 // Sort the complete set by reading the sort property and filling into a sorted map
310 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
311 while (resultSet.next()) {
312 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
313 readEntity(db, resultSet.id(),
314 [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
315 // We're not interested in removals during the initial query
316 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
317 if (!sortProperty.isEmpty()) {
318 const auto sortValue = domainObject->getProperty(sortProperty);
319 if (sortValue.type() == QVariant::DateTime) {
320 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier());
321 } else {
322 sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier());
323 }
324 } else {
325 sortedMap->insert(domainObject->identifier(), domainObject->identifier());
326 }
327 }
328 });
329 }
330 185
331 Trace() << "Sorted " << sortedMap->size() << " values."; 186 auto transaction = getTransaction();
332 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
333 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery](
334 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
335 if (iterator->hasNext()) {
336 readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject,
337 Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); });
338 return true;
339 }
340 return false;
341 };
342
343 auto skip = [iterator]() {
344 if (iterator->hasNext()) {
345 iterator->next();
346 }
347 };
348 return ResultSet(generator, skip);
349 } else {
350 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
351 ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](
352 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
353 if (resultSetPtr->next()) {
354 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
355 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
356 if (initialQuery) {
357 // We're not interested in removals during the initial query
358 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
359 // In the initial set every entity is new
360 callback(domainObject, Sink::Operation_Creation);
361 }
362 } else {
363 // Always remove removals, they probably don't match due to non-available properties
364 if ((operation == Sink::Operation_Removal) || filter(domainObject)) {
365 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
366 callback(domainObject, operation);
367 }
368 }
369 });
370 return true;
371 }
372 return false;
373 };
374 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
375 return ResultSet(generator, skip);
376 }
377}
378 187
379template <class DomainType> 188 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction);
380std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> 189 auto revisionAndReplayedEntities = reader.executeIncrementalQuery(query, resultProvider.revision(), resultProviderCallback(query, resultProvider));
381QueryWorker<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query) 190 Trace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
382{ 191 return revisionAndReplayedEntities;
383 return [this, remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
384 if (!query.ids.isEmpty()) {
385 if (!query.ids.contains(domainObject->identifier())) {
386 return false;
387 }
388 }
389 for (const auto &filterProperty : remainingFilters) {
390 const auto property = domainObject->getProperty(filterProperty);
391 if (property.isValid()) {
392 const auto comparator = query.propertyFilter.value(filterProperty);
393 if (!comparator.matches(property)) {
394 Trace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
395 return false;
396 }
397 } else {
398 Warning() << "Ignored property filter because value is invalid: " << filterProperty;
399 }
400 }
401 return true;
402 };
403} 192}
404 193
405template <class DomainType> 194template <class DomainType>
406QPair<qint64, qint64> QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, 195Storage::Transaction QueryWorker<DomainType>::getTransaction()
407 Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize)
408{ 196{
409 QTime time;
410 time.start();
411
412 Sink::Storage::Transaction transaction; 197 Sink::Storage::Transaction transaction;
413 { 198 {
414 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); 199 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier);
@@ -422,33 +207,7 @@ QPair<qint64, qint64> QueryWorker<DomainType>::load(const Sink::Query &query, co
422 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); 207 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier);
423 transaction = storage.createTransaction(Sink::Storage::ReadOnly); 208 transaction = storage.createTransaction(Sink::Storage::ReadOnly);
424 } 209 }
425 auto db = Storage::mainDatabase(transaction, mBufferType); 210 return transaction;
426
427 QSet<QByteArray> remainingFilters;
428 QByteArray remainingSorting;
429 auto resultSet = baseSetRetriever(transaction, remainingFilters, remainingSorting);
430 Trace() << "Base set retrieved. " << Log::TraceTime(time.elapsed());
431 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting);
432 Trace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
433 auto replayedEntities = replaySet(filteredSet, resultProvider, query.requestedProperties, offset, batchSize);
434 Trace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed());
435 resultProvider.setRevision(Sink::Storage::maxRevision(transaction));
436 return qMakePair(Sink::Storage::maxRevision(transaction), replayedEntities);
437}
438
439template <class DomainType>
440QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
441{
442 QTime time;
443 time.start();
444
445 const qint64 baseRevision = resultProvider.revision() + 1;
446 Trace() << "Running incremental query " << baseRevision;
447 auto revisionAndReplayedEntities = load(query, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
448 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
449 }, resultProvider, false, 0, 0);
450 Trace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
451 return revisionAndReplayedEntities;
452} 211}
453 212
454template <class DomainType> 213template <class DomainType>
@@ -468,9 +227,11 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
468 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(QVariant())); 227 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(QVariant()));
469 } 228 }
470 } 229 }
471 auto revisionAndReplayedEntities = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { 230
472 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters, remainingSorting); 231 auto transaction = getTransaction();
473 }, resultProvider, true, offset, batchsize); 232
233 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction);
234 auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider));
474 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 235 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed());
475 resultProvider.initialResultSetComplete(parent); 236 resultProvider.initialResultSetComplete(parent);
476 return revisionAndReplayedEntities; 237 return revisionAndReplayedEntities;