summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/datastorequery.cpp163
-rw-r--r--common/datastorequery.h18
-rw-r--r--common/queryrunner.cpp119
-rw-r--r--common/queryrunner.h4
-rw-r--r--tests/querytest.cpp58
5 files changed, 244 insertions, 118 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 41d962c..3ba8f40 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -72,7 +72,7 @@ class Source : public FilterBase {
72 if (mIt == mIds.constEnd()) { 72 if (mIt == mIds.constEnd()) {
73 return false; 73 return false;
74 } 74 }
75 readEntity(*mIt, [callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { 75 readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
76 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); 76 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation);
77 callback({entity, operation}); 77 callback({entity, operation});
78 }); 78 });
@@ -115,7 +115,7 @@ public:
115 virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE { 115 virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE {
116 bool foundValue = false; 116 bool foundValue = false;
117 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { 117 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
118 SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << result.operation; 118 SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << operationName(result.operation);
119 119
120 //Always accept removals. They can't match the filter since the data is gone. 120 //Always accept removals. They can't match the filter since the data is gone.
121 if (result.operation == Sink::Operation_Removal) { 121 if (result.operation == Sink::Operation_Removal) {
@@ -167,17 +167,11 @@ public:
167 167
168 } 168 }
169 169
170 void process() {
171 if (operation == QueryBase::Reduce::Aggregator::Count) {
172 mResult = mResult.toInt() + 1;
173 } else {
174 Q_ASSERT(false);
175 }
176 }
177
178 void process(const QVariant &value) { 170 void process(const QVariant &value) {
179 if (operation == QueryBase::Reduce::Aggregator::Collect) { 171 if (operation == QueryBase::Reduce::Aggregator::Collect) {
180 mResult = mResult.toList() << value; 172 mResult = mResult.toList() << value;
173 } else if (operation == QueryBase::Reduce::Aggregator::Count) {
174 mResult = mResult.toInt() + 1;
181 } else { 175 } else {
182 Q_ASSERT(false); 176 Q_ASSERT(false);
183 } 177 }
@@ -196,8 +190,8 @@ public:
196 QVariant mResult; 190 QVariant mResult;
197 }; 191 };
198 192
199 QHash<QByteArray, QVariant> mAggregateValues;
200 QSet<QByteArray> mReducedValues; 193 QSet<QByteArray> mReducedValues;
194 QHash<QByteArray, QByteArray> mSelectedValues;
201 QByteArray mReductionProperty; 195 QByteArray mReductionProperty;
202 QByteArray mSelectionProperty; 196 QByteArray mSelectionProperty;
203 QueryBase::Reduce::Selector::Comparator mSelectionComparator; 197 QueryBase::Reduce::Selector::Comparator mSelectionComparator;
@@ -231,51 +225,80 @@ public:
231 return false; 225 return false;
232 } 226 }
233 227
228 QByteArray reduceOnValue(const QVariant &reductionValue, QMap<QByteArray, QVariant> &aggregateValues)
229 {
230 QVariant selectionResultValue;
231 QByteArray selectionResult;
232 auto results = indexLookup(mReductionProperty, reductionValue);
233 for (auto &aggregator : mAggregators) {
234 aggregator.reset();
235 }
236
237 for (const auto &r : results) {
238 readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
239 Q_ASSERT(operation != Sink::Operation_Removal);
240 for (auto &aggregator : mAggregators) {
241 if (!aggregator.property.isEmpty()) {
242 aggregator.process(entity.getProperty(aggregator.property));
243 } else {
244 aggregator.process(QVariant{});
245 }
246 }
247 auto selectionValue = entity.getProperty(mSelectionProperty);
248 if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) {
249 selectionResultValue = selectionValue;
250 selectionResult = entity.identifier();
251 }
252 });
253 }
254
255 for (auto &aggregator : mAggregators) {
256 aggregateValues.insert(aggregator.resultProperty, aggregator.result());
257 }
258 return selectionResult;
259 }
260
234 bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { 261 bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE {
235 bool foundValue = false; 262 bool foundValue = false;
236 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { 263 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
237 auto reductionValue = result.entity.getProperty(mReductionProperty);
238 if (result.operation == Sink::Operation_Removal) { 264 if (result.operation == Sink::Operation_Removal) {
239 callback(result); 265 callback(result);
240 return false; 266 return false;
241 } 267 }
242 if (!mReducedValues.contains(getByteArray(reductionValue))) { 268 auto reductionValue = result.entity.getProperty(mReductionProperty);
269 const auto &reductionValueBa = getByteArray(reductionValue);
270 if (!mReducedValues.contains(reductionValueBa)) {
243 //Only reduce every value once. 271 //Only reduce every value once.
244 mReducedValues.insert(getByteArray(reductionValue)); 272 mReducedValues.insert(reductionValueBa);
245 QVariant selectionResultValue;
246 QByteArray selectionResult;
247 auto results = indexLookup(mReductionProperty, reductionValue);
248 for (auto &aggregator : mAggregators) {
249 aggregator.reset();
250 }
251
252 QVariantList list;
253 for (const auto &r : results) {
254 readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
255 for (auto &aggregator : mAggregators) {
256 if (!aggregator.property.isEmpty()) {
257 aggregator.process(entity.getProperty(aggregator.property));
258 } else {
259 aggregator.process();
260 }
261 }
262 auto selectionValue = entity.getProperty(mSelectionProperty);
263 if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) {
264 selectionResultValue = selectionValue;
265 selectionResult = entity.identifier();
266 }
267 });
268 }
269
270 QMap<QByteArray, QVariant> aggregateValues; 273 QMap<QByteArray, QVariant> aggregateValues;
271 for (auto &aggregator : mAggregators) { 274 auto selectionResult = reduceOnValue(reductionValue, aggregateValues);
272 aggregateValues.insert(aggregator.resultProperty, aggregator.result());
273 }
274 275
276 mSelectedValues.insert(reductionValueBa, selectionResult);
275 readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { 277 readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
276 callback({entity, operation, aggregateValues}); 278 callback({entity, operation, aggregateValues});
277 foundValue = true; 279 foundValue = true;
278 }); 280 });
281 } else {
282 //During initial query, do nothing. The lookup above will take care of it.
283 //During updates adjust the reduction according to the modification/addition or removal
284 if (mIncremental) {
285 //redo the reduction
286 QMap<QByteArray, QVariant> aggregateValues;
287 auto selectionResult = reduceOnValue(reductionValue, aggregateValues);
288
289 //TODO if old and new are the same a modification would be enough
290 auto oldSelectionResult = mSelectedValues.take(reductionValueBa);
291 //remove old result
292 readEntity(oldSelectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
293 callback({entity, Sink::Operation_Removal});
294 });
295
296 //add new result
297 mSelectedValues.insert(reductionValueBa, selectionResult);
298 readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
299 callback({entity, Sink::Operation_Creation, aggregateValues});
300 });
301 }
279 } 302 }
280 return false; 303 return false;
281 })) 304 }))
@@ -330,9 +353,23 @@ public:
330}; 353};
331 354
332DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) 355DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store)
333 : mQuery(query), mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) 356 : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery"))
357{
358 setupQuery(query);
359}
360
361DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store)
362 : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery"))
334{ 363{
335 setupQuery(); 364 mCollector = state.mCollector;
365 mSource = state.mSource;
366
367 auto source = mCollector;
368 while (source) {
369 source->mDatastore = this;
370 source->mIncremental = true;
371 source = source->mSource;
372 }
336} 373}
337 374
338DataStoreQuery::~DataStoreQuery() 375DataStoreQuery::~DataStoreQuery()
@@ -340,6 +377,14 @@ DataStoreQuery::~DataStoreQuery()
340 377
341} 378}
342 379
380DataStoreQuery::State::Ptr DataStoreQuery::getState()
381{
382 auto state = State::Ptr::create();
383 state->mSource = mSource;
384 state->mCollector = mCollector;
385 return state;
386}
387
343void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) 388void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback)
344{ 389{
345 mStore.readLatest(mType, key, resultCallback); 390 mStore.readLatest(mType, key, resultCallback);
@@ -443,9 +488,10 @@ QByteArrayList DataStoreQuery::executeSubquery(const QueryBase &subquery)
443 return ids; 488 return ids;
444} 489}
445 490
446void DataStoreQuery::setupQuery() 491void DataStoreQuery::setupQuery(const Sink::QueryBase &query_)
447{ 492{
448 auto baseFilters = mQuery.getBaseFilters(); 493 auto query = query_;
494 auto baseFilters = query.getBaseFilters();
449 for (const auto &k : baseFilters.keys()) { 495 for (const auto &k : baseFilters.keys()) {
450 const auto comparator = baseFilters.value(k); 496 const auto comparator = baseFilters.value(k);
451 if (comparator.value.canConvert<Query>()) { 497 if (comparator.value.canConvert<Query>()) {
@@ -454,44 +500,43 @@ void DataStoreQuery::setupQuery()
454 baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); 500 baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In));
455 } 501 }
456 } 502 }
457 mQuery.setBaseFilters(baseFilters); 503 query.setBaseFilters(baseFilters);
458 504
459 FilterBase::Ptr baseSet; 505 FilterBase::Ptr baseSet;
460 QSet<QByteArray> remainingFilters = mQuery.getBaseFilters().keys().toSet(); 506 QSet<QByteArray> remainingFilters = query.getBaseFilters().keys().toSet();
461 QByteArray appliedSorting; 507 QByteArray appliedSorting;
462 if (!mQuery.ids().isEmpty()) { 508 if (!query.ids().isEmpty()) {
463 mSource = Source::Ptr::create(mQuery.ids().toVector(), this); 509 mSource = Source::Ptr::create(query.ids().toVector(), this);
464 baseSet = mSource; 510 baseSet = mSource;
465 } else { 511 } else {
466 QSet<QByteArray> appliedFilters; 512 QSet<QByteArray> appliedFilters;
467 513
468 auto resultSet = mStore.indexLookup(mType, mQuery, appliedFilters, appliedSorting); 514 auto resultSet = mStore.indexLookup(mType, query, appliedFilters, appliedSorting);
469 remainingFilters = remainingFilters - appliedFilters; 515 remainingFilters = remainingFilters - appliedFilters;
470 516
471 // We do a full scan if there were no indexes available to create the initial set. 517 // We do a full scan if there were no indexes available to create the initial set.
472 if (appliedFilters.isEmpty()) { 518 if (appliedFilters.isEmpty()) {
473 // TODO this should be replaced by an index lookup on the uid index
474 mSource = Source::Ptr::create(mStore.fullScan(mType), this); 519 mSource = Source::Ptr::create(mStore.fullScan(mType), this);
475 } else { 520 } else {
476 mSource = Source::Ptr::create(resultSet, this); 521 mSource = Source::Ptr::create(resultSet, this);
477 } 522 }
478 baseSet = mSource; 523 baseSet = mSource;
479 } 524 }
480 if (!mQuery.getBaseFilters().isEmpty()) { 525 if (!query.getBaseFilters().isEmpty()) {
481 auto filter = Filter::Ptr::create(baseSet, this); 526 auto filter = Filter::Ptr::create(baseSet, this);
482 //For incremental queries the remaining filters are not sufficient 527 //For incremental queries the remaining filters are not sufficient
483 for (const auto &f : mQuery.getBaseFilters().keys()) { 528 for (const auto &f : query.getBaseFilters().keys()) {
484 filter->propertyFilter.insert(f, mQuery.getFilter(f)); 529 filter->propertyFilter.insert(f, query.getFilter(f));
485 } 530 }
486 baseSet = filter; 531 baseSet = filter;
487 } 532 }
488 /* if (appliedSorting.isEmpty() && !mQuery.sortProperty.isEmpty()) { */ 533 /* if (appliedSorting.isEmpty() && !query.sortProperty.isEmpty()) { */
489 /* //Apply manual sorting */ 534 /* //Apply manual sorting */
490 /* baseSet = Sort::Ptr::create(baseSet, mQuery.sortProperty); */ 535 /* baseSet = Sort::Ptr::create(baseSet, query.sortProperty); */
491 /* } */ 536 /* } */
492 537
493 //Setup the rest of the filter stages on top of the base set 538 //Setup the rest of the filter stages on top of the base set
494 for (const auto &stage : mQuery.getFilterStages()) { 539 for (const auto &stage : query.getFilterStages()) {
495 if (auto filter = stage.dynamicCast<Query::Filter>()) { 540 if (auto filter = stage.dynamicCast<Query::Filter>()) {
496 auto f = Filter::Ptr::create(baseSet, this); 541 auto f = Filter::Ptr::create(baseSet, this);
497 f->propertyFilter = filter->propertyFilter; 542 f->propertyFilter = filter->propertyFilter;
@@ -521,7 +566,7 @@ QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision
521 566
522ResultSet DataStoreQuery::update(qint64 baseRevision) 567ResultSet DataStoreQuery::update(qint64 baseRevision)
523{ 568{
524 SinkTraceCtx(mLogCtx) << "Executing query update to revision " << baseRevision; 569 SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision;
525 auto incrementalResultSet = loadIncrementalResultSet(baseRevision); 570 auto incrementalResultSet = loadIncrementalResultSet(baseRevision);
526 SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; 571 SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet;
527 mSource->add(incrementalResultSet); 572 mSource->add(incrementalResultSet);
diff --git a/common/datastorequery.h b/common/datastorequery.h
index 5a47685..a797782 100644
--- a/common/datastorequery.h
+++ b/common/datastorequery.h
@@ -26,6 +26,7 @@
26 26
27class Source; 27class Source;
28class Bloom; 28class Bloom;
29class Reduce;
29class Filter; 30class Filter;
30class FilterBase; 31class FilterBase;
31 32
@@ -33,15 +34,25 @@ class DataStoreQuery {
33 friend class FilterBase; 34 friend class FilterBase;
34 friend class Source; 35 friend class Source;
35 friend class Bloom; 36 friend class Bloom;
37 friend class Reduce;
36 friend class Filter; 38 friend class Filter;
37public: 39public:
38 typedef QSharedPointer<DataStoreQuery> Ptr; 40 typedef QSharedPointer<DataStoreQuery> Ptr;
39 41
42 struct State {
43 typedef QSharedPointer<State> Ptr;
44 QSharedPointer<FilterBase> mCollector;
45 QSharedPointer<Source> mSource;
46 };
47
40 DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store); 48 DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store);
49 DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store);
41 ~DataStoreQuery(); 50 ~DataStoreQuery();
42 ResultSet execute(); 51 ResultSet execute();
43 ResultSet update(qint64 baseRevision); 52 ResultSet update(qint64 baseRevision);
44 53
54 State::Ptr getState();
55
45private: 56private:
46 57
47 typedef std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation)> FilterFunction; 58 typedef std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation)> FilterFunction;
@@ -54,12 +65,10 @@ private:
54 ResultSet createFilteredSet(ResultSet &resultSet, const FilterFunction &); 65 ResultSet createFilteredSet(ResultSet &resultSet, const FilterFunction &);
55 QVector<QByteArray> loadIncrementalResultSet(qint64 baseRevision); 66 QVector<QByteArray> loadIncrementalResultSet(qint64 baseRevision);
56 67
57 void setupQuery(); 68 void setupQuery(const Sink::QueryBase &query_);
58 QByteArrayList executeSubquery(const Sink::QueryBase &subquery); 69 QByteArrayList executeSubquery(const Sink::QueryBase &subquery);
59 70
60 Sink::QueryBase mQuery;
61 const QByteArray mType; 71 const QByteArray mType;
62 bool mInitialQuery;
63 QSharedPointer<FilterBase> mCollector; 72 QSharedPointer<FilterBase> mCollector;
64 QSharedPointer<Source> mSource; 73 QSharedPointer<Source> mSource;
65 74
@@ -102,7 +111,8 @@ public:
102 //Returns true for as long as a result is available 111 //Returns true for as long as a result is available
103 virtual bool next(const std::function<void(const ResultSet::Result &)> &callback) = 0; 112 virtual bool next(const std::function<void(const ResultSet::Result &)> &callback) = 0;
104 113
105 QSharedPointer<FilterBase> mSource; 114 FilterBase::Ptr mSource;
106 DataStoreQuery *mDatastore; 115 DataStoreQuery *mDatastore;
116 bool mIncremental = false;
107}; 117};
108 118
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 748320f..40880eb 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -33,6 +33,7 @@ struct ReplayResult {
33 qint64 newRevision; 33 qint64 newRevision;
34 qint64 replayedEntities; 34 qint64 replayedEntities;
35 bool replayedAll; 35 bool replayedAll;
36 DataStoreQuery::State::Ptr queryState;
36}; 37};
37 38
38/* 39/*
@@ -49,7 +50,7 @@ public:
49 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); 50 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx);
50 virtual ~QueryWorker(); 51 virtual ~QueryWorker();
51 52
52 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 53 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state);
53 ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); 54 ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
54 55
55private: 56private:
@@ -69,45 +70,41 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
69 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; 70 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get.";
70 } 71 }
71 auto guardPtr = QPointer<QObject>(&guard); 72 auto guardPtr = QPointer<QObject>(&guard);
72 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 73 auto fetcher = [=](const typename DomainType::Ptr &parent) {
73 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) {
74 const QByteArray parentId = parent ? parent->identifier() : QByteArray(); 74 const QByteArray parentId = parent ? parent->identifier() : QByteArray();
75 SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; 75 SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize;
76 auto resultProvider = mResultProvider; 76 auto resultProvider = mResultProvider;
77 if (query.synchronousQuery()) { 77 auto resultTransformation = mResultTransformation;
78 QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation, mLogCtx); 78 auto offset = mOffset[parentId];
79 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); 79 auto batchSize = mBatchSize;
80 mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; 80 auto resourceContext = mResourceContext;
81 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); 81 auto logCtx = mLogCtx;
82 resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); 82 const bool runAsync = !query.synchronousQuery();
83 } else { 83 //The lambda will be executed in a separate thread, so copy all arguments
84 auto resultTransformation = mResultTransformation; 84 async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() {
85 auto offset = mOffset[parentId]; 85 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
86 auto batchSize = mBatchSize; 86 return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize);
87 auto resourceContext = mResourceContext; 87 }, runAsync)
88 auto logCtx = mLogCtx; 88 .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) {
89 //The lambda will be executed in a separate thread, so copy all arguments 89 if (!guardPtr) {
90 async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { 90 qWarning() << "The parent object is already gone";
91 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); 91 return;
92 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); 92 }
93 return newRevisionAndReplayedEntities; 93 mInitialQueryComplete = true;
94 mQueryState = result.queryState;
95 mOffset[parentId] += result.replayedEntities;
96 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
97 if (query.liveQuery()) {
98 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
99 }
100 resultProvider->setRevision(result.newRevision);
101 resultProvider->initialResultSetComplete(parent, result.replayedAll);
94 }) 102 })
95 .template then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { 103 .exec();
96 if (!guardPtr) { 104 };
97 qWarning() << "The parent object is already gone"; 105
98 return; 106 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
99 } 107 mResultProvider->setFetcher(fetcher);
100 mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities;
101 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
102 if (query.liveQuery()) {
103 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
104 }
105 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
106 resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll);
107 })
108 .exec();
109 }
110 });
111 108
112 // In case of a live query we keep the runner for as long alive as the result provider exists 109 // In case of a live query we keep the runner for as long alive as the result provider exists
113 if (query.liveQuery()) { 110 if (query.liveQuery()) {
@@ -117,16 +114,26 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
117 auto resultProvider = mResultProvider; 114 auto resultProvider = mResultProvider;
118 auto resourceContext = mResourceContext; 115 auto resourceContext = mResourceContext;
119 auto logCtx = mLogCtx; 116 auto logCtx = mLogCtx;
120 return async::run<ReplayResult>([=]() { 117 auto state = mQueryState;
118 if (!mInitialQueryComplete) {
119 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
120 fetcher({});
121 return KAsync::null();
122 }
123 Q_ASSERT(!mQueryInProgress);
124 return KAsync::syncStart<void>([&] {
125 mQueryInProgress = true;
126 })
127 .then(async::run<ReplayResult>([=]() {
121 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx); 128 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx);
122 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); 129 return worker.executeIncrementalQuery(query, *resultProvider, state);
123 return newRevisionAndReplayedEntities; 130 }))
124 }) 131 .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
125 .template then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
126 if (!guardPtr) { 132 if (!guardPtr) {
127 qWarning() << "The parent object is already gone"; 133 qWarning() << "The parent object is already gone";
128 return; 134 return;
129 } 135 }
136 mQueryInProgress = false;
130 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 137 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
131 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); 138 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
132 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); 139 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
@@ -201,14 +208,20 @@ void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, S
201} 208}
202 209
203template <class DomainType> 210template <class DomainType>
204ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 211ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state)
205{ 212{
206 QTime time; 213 QTime time;
207 time.start(); 214 time.start();
208 215
209 const qint64 baseRevision = resultProvider.revision() + 1; 216 const qint64 baseRevision = resultProvider.revision() + 1;
217 SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision;
218
210 auto entityStore = EntityStore{mResourceContext, mLogCtx}; 219 auto entityStore = EntityStore{mResourceContext, mLogCtx};
211 auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 220 if (!state) {
221 SinkWarningCtx(mLogCtx) << "No previous query state.";
222 return {0, 0, false, DataStoreQuery::State::Ptr{}};
223 }
224 auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore};
212 auto resultSet = preparedQuery.update(baseRevision); 225 auto resultSet = preparedQuery.update(baseRevision);
213 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 226 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
214 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { 227 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) {
@@ -218,7 +231,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
218 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" 231 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n"
219 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 232 << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
220 << "Incremental query took: " << Log::TraceTime(time.elapsed()); 233 << "Incremental query took: " << Log::TraceTime(time.elapsed());
221 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; 234 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()};
222} 235}
223 236
224template <class DomainType> 237template <class DomainType>
@@ -251,14 +264,14 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
251 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" 264 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n"
252 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 265 << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
253 << "Initial query took: " << Log::TraceTime(time.elapsed()); 266 << "Initial query took: " << Log::TraceTime(time.elapsed());
254 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; 267
268 auto state = preparedQuery.getState();
269
270 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state};
255} 271}
256 272
257template class QueryRunner<Sink::ApplicationDomain::Contact>; 273#define REGISTER_TYPE(T) \
258template class QueryRunner<Sink::ApplicationDomain::Folder>; 274 template class QueryRunner<T>; \
259template class QueryRunner<Sink::ApplicationDomain::Mail>; 275 template class QueryWorker<T>; \
260template class QueryRunner<Sink::ApplicationDomain::Event>; 276
261template class QueryWorker<Sink::ApplicationDomain::Contact>; 277SINK_REGISTER_TYPES()
262template class QueryWorker<Sink::ApplicationDomain::Folder>;
263template class QueryWorker<Sink::ApplicationDomain::Mail>;
264template class QueryWorker<Sink::ApplicationDomain::Event>;
diff --git a/common/queryrunner.h b/common/queryrunner.h
index 66dc68f..f5c7ead 100644
--- a/common/queryrunner.h
+++ b/common/queryrunner.h
@@ -24,6 +24,7 @@
24#include "resultprovider.h" 24#include "resultprovider.h"
25#include "query.h" 25#include "query.h"
26#include "log.h" 26#include "log.h"
27#include "datastorequery.h"
27 28
28/** 29/**
29 * Base clase because you can't have the Q_OBJECT macro in template classes 30 * Base clase because you can't have the Q_OBJECT macro in template classes
@@ -101,4 +102,7 @@ private:
101 int mBatchSize; 102 int mBatchSize;
102 QObject guard; 103 QObject guard;
103 Sink::Log::Context mLogCtx; 104 Sink::Log::Context mLogCtx;
105 DataStoreQuery::State::Ptr mQueryState;
106 bool mInitialQueryComplete = false;
107 bool mQueryInProgress = false;
104}; 108};
diff --git a/tests/querytest.cpp b/tests/querytest.cpp
index 0641c0d..328448f 100644
--- a/tests/querytest.cpp
+++ b/tests/querytest.cpp
@@ -731,7 +731,7 @@ private slots:
731 Query query; 731 Query query;
732 query.setId("testLivequeryUnmatch"); 732 query.setId("testLivequeryUnmatch");
733 query.filter<Mail::Folder>(folder1); 733 query.filter<Mail::Folder>(folder1);
734 query.reduce<Mail::ThreadId>(Query::Reduce::Selector::max<Mail::Date>()).count("count").collect<Mail::Sender>("senders"); 734 query.reduce<Mail::ThreadId>(Query::Reduce::Selector::max<Mail::Date>()).count("count").collect<Mail::Folder>("folders");
735 query.sort<Mail::Date>(); 735 query.sort<Mail::Date>();
736 query.setFlags(Query::LiveQuery); 736 query.setFlags(Query::LiveQuery);
737 query.request<Mail::Unread>(); 737 query.request<Mail::Unread>();
@@ -747,7 +747,61 @@ private slots:
747 VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); 747 VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1"));
748 QTRY_COMPARE(model->rowCount(), 1); 748 QTRY_COMPARE(model->rowCount(), 1);
749 auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value<Mail::Ptr>(); 749 auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value<Mail::Ptr>();
750 QCOMPARE(mail->getUnread(), true); 750 QTRY_COMPARE(mail->getUnread(), true);
751 QCOMPARE(mail->getProperty("count").toInt(), 1);
752 QCOMPARE(mail->getProperty("folders").toList().size(), 1);
753 }
754
755 void testReductionUpdate()
756 {
757 // Setup
758 auto folder1 = Folder::createEntity<Folder>("sink.dummy.instance1");
759 VERIFYEXEC(Sink::Store::create<Folder>(folder1));
760
761 auto folder2 = Folder::createEntity<Folder>("sink.dummy.instance1");
762 VERIFYEXEC(Sink::Store::create<Folder>(folder2));
763
764 QDateTime now{QDate{2017, 2, 3}, QTime{10, 0, 0}};
765 QDateTime later{QDate{2017, 2, 3}, QTime{11, 0, 0}};
766
767 auto mail1 = Mail::createEntity<Mail>("sink.dummy.instance1");
768 mail1.setExtractedMessageId("mail1");
769 mail1.setFolder(folder1);
770 mail1.setUnread(false);
771 mail1.setExtractedDate(now);
772 VERIFYEXEC(Sink::Store::create(mail1));
773
774 // Ensure all local data is processed
775 VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1"));
776
777 Query query;
778 query.setId("testLivequeryUnmatch");
779 query.setFlags(Query::LiveQuery);
780 query.filter<Mail::Folder>(folder1);
781 query.reduce<Mail::Folder>(Query::Reduce::Selector::max<Mail::Date>()).count("count").collect<Mail::Folder>("folders");
782 query.sort<Mail::Date>();
783 query.request<Mail::Unread>();
784 query.request<Mail::MessageId>();
785
786 auto model = Sink::Store::loadModel<Mail>(query);
787 QTRY_COMPARE(model->rowCount(), 1);
788
789 //The leader should change to mail2 after the modification
790 {
791 auto mail2 = Mail::createEntity<Mail>("sink.dummy.instance1");
792 mail2.setExtractedMessageId("mail2");
793 mail2.setFolder(folder1);
794 mail2.setUnread(false);
795 mail2.setExtractedDate(later);
796 VERIFYEXEC(Sink::Store::create(mail2));
797 }
798
799 VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1"));
800 QTRY_COMPARE(model->rowCount(), 1);
801 auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value<Mail::Ptr>();
802 QTRY_COMPARE(mail->getMessageId(), QByteArray{"mail2"});
803 QCOMPARE(mail->getProperty("count").toInt(), 2);
804 QCOMPARE(mail->getProperty("folders").toList().size(), 2);
751 } 805 }
752 806
753 void testBloom() 807 void testBloom()