summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-02-11 12:02:58 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-02-13 19:42:39 +0100
commit1259b236704e790fa1284a63ec537525bce23841 (patch)
tree85cd0491e56d2f604cc8aa291a49d20f8f73c684
parentb4bd3932aa2a8e841ed204b341bcbf65ba59c5b2 (diff)
downloadsink-1259b236704e790fa1284a63ec537525bce23841.tar.gz
sink-1259b236704e790fa1284a63ec537525bce23841.zip
Fixed reduction updates with stateful query.
Some filters need to maintain state between runs in order to be able to emit only what has changed. This now also make reduction work for live queries.
-rw-r--r--common/datastorequery.cpp163
-rw-r--r--common/datastorequery.h18
-rw-r--r--common/queryrunner.cpp119
-rw-r--r--common/queryrunner.h4
-rw-r--r--tests/querytest.cpp58
5 files changed, 244 insertions, 118 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 41d962c..3ba8f40 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -72,7 +72,7 @@ class Source : public FilterBase {
72 if (mIt == mIds.constEnd()) { 72 if (mIt == mIds.constEnd()) {
73 return false; 73 return false;
74 } 74 }
75 readEntity(*mIt, [callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { 75 readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
76 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); 76 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation);
77 callback({entity, operation}); 77 callback({entity, operation});
78 }); 78 });
@@ -115,7 +115,7 @@ public:
115 virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE { 115 virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE {
116 bool foundValue = false; 116 bool foundValue = false;
117 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { 117 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
118 SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << result.operation; 118 SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << operationName(result.operation);
119 119
120 //Always accept removals. They can't match the filter since the data is gone. 120 //Always accept removals. They can't match the filter since the data is gone.
121 if (result.operation == Sink::Operation_Removal) { 121 if (result.operation == Sink::Operation_Removal) {
@@ -167,17 +167,11 @@ public:
167 167
168 } 168 }
169 169
170 void process() {
171 if (operation == QueryBase::Reduce::Aggregator::Count) {
172 mResult = mResult.toInt() + 1;
173 } else {
174 Q_ASSERT(false);
175 }
176 }
177
178 void process(const QVariant &value) { 170 void process(const QVariant &value) {
179 if (operation == QueryBase::Reduce::Aggregator::Collect) { 171 if (operation == QueryBase::Reduce::Aggregator::Collect) {
180 mResult = mResult.toList() << value; 172 mResult = mResult.toList() << value;
173 } else if (operation == QueryBase::Reduce::Aggregator::Count) {
174 mResult = mResult.toInt() + 1;
181 } else { 175 } else {
182 Q_ASSERT(false); 176 Q_ASSERT(false);
183 } 177 }
@@ -196,8 +190,8 @@ public:
196 QVariant mResult; 190 QVariant mResult;
197 }; 191 };
198 192
199 QHash<QByteArray, QVariant> mAggregateValues;
200 QSet<QByteArray> mReducedValues; 193 QSet<QByteArray> mReducedValues;
194 QHash<QByteArray, QByteArray> mSelectedValues;
201 QByteArray mReductionProperty; 195 QByteArray mReductionProperty;
202 QByteArray mSelectionProperty; 196 QByteArray mSelectionProperty;
203 QueryBase::Reduce::Selector::Comparator mSelectionComparator; 197 QueryBase::Reduce::Selector::Comparator mSelectionComparator;
@@ -231,51 +225,80 @@ public:
231 return false; 225 return false;
232 } 226 }
233 227
228 QByteArray reduceOnValue(const QVariant &reductionValue, QMap<QByteArray, QVariant> &aggregateValues)
229 {
230 QVariant selectionResultValue;
231 QByteArray selectionResult;
232 auto results = indexLookup(mReductionProperty, reductionValue);
233 for (auto &aggregator : mAggregators) {
234 aggregator.reset();
235 }
236
237 for (const auto &r : results) {
238 readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
239 Q_ASSERT(operation != Sink::Operation_Removal);
240 for (auto &aggregator : mAggregators) {
241 if (!aggregator.property.isEmpty()) {
242 aggregator.process(entity.getProperty(aggregator.property));
243 } else {
244 aggregator.process(QVariant{});
245 }
246 }
247 auto selectionValue = entity.getProperty(mSelectionProperty);
248 if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) {
249 selectionResultValue = selectionValue;
250 selectionResult = entity.identifier();
251 }
252 });
253 }
254
255 for (auto &aggregator : mAggregators) {
256 aggregateValues.insert(aggregator.resultProperty, aggregator.result());
257 }
258 return selectionResult;
259 }
260
234 bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { 261 bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE {
235 bool foundValue = false; 262 bool foundValue = false;
236 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { 263 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
237 auto reductionValue = result.entity.getProperty(mReductionProperty);
238 if (result.operation == Sink::Operation_Removal) { 264 if (result.operation == Sink::Operation_Removal) {
239 callback(result); 265 callback(result);
240 return false; 266 return false;
241 } 267 }
242 if (!mReducedValues.contains(getByteArray(reductionValue))) { 268 auto reductionValue = result.entity.getProperty(mReductionProperty);
269 const auto &reductionValueBa = getByteArray(reductionValue);
270 if (!mReducedValues.contains(reductionValueBa)) {
243 //Only reduce every value once. 271 //Only reduce every value once.
244 mReducedValues.insert(getByteArray(reductionValue)); 272 mReducedValues.insert(reductionValueBa);
245 QVariant selectionResultValue;
246 QByteArray selectionResult;
247 auto results = indexLookup(mReductionProperty, reductionValue);
248 for (auto &aggregator : mAggregators) {
249 aggregator.reset();
250 }
251
252 QVariantList list;
253 for (const auto &r : results) {
254 readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
255 for (auto &aggregator : mAggregators) {
256 if (!aggregator.property.isEmpty()) {
257 aggregator.process(entity.getProperty(aggregator.property));
258 } else {
259 aggregator.process();
260 }
261 }
262 auto selectionValue = entity.getProperty(mSelectionProperty);
263 if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) {
264 selectionResultValue = selectionValue;
265 selectionResult = entity.identifier();
266 }
267 });
268 }
269
270 QMap<QByteArray, QVariant> aggregateValues; 273 QMap<QByteArray, QVariant> aggregateValues;
271 for (auto &aggregator : mAggregators) { 274 auto selectionResult = reduceOnValue(reductionValue, aggregateValues);
272 aggregateValues.insert(aggregator.resultProperty, aggregator.result());
273 }
274 275
276 mSelectedValues.insert(reductionValueBa, selectionResult);
275 readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { 277 readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
276 callback({entity, operation, aggregateValues}); 278 callback({entity, operation, aggregateValues});
277 foundValue = true; 279 foundValue = true;
278 }); 280 });
281 } else {
282 //During initial query, do nothing. The lookup above will take care of it.
283 //During updates adjust the reduction according to the modification/addition or removal
284 if (mIncremental) {
285 //redo the reduction
286 QMap<QByteArray, QVariant> aggregateValues;
287 auto selectionResult = reduceOnValue(reductionValue, aggregateValues);
288
289 //TODO if old and new are the same a modification would be enough
290 auto oldSelectionResult = mSelectedValues.take(reductionValueBa);
291 //remove old result
292 readEntity(oldSelectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
293 callback({entity, Sink::Operation_Removal});
294 });
295
296 //add new result
297 mSelectedValues.insert(reductionValueBa, selectionResult);
298 readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
299 callback({entity, Sink::Operation_Creation, aggregateValues});
300 });
301 }
279 } 302 }
280 return false; 303 return false;
281 })) 304 }))
@@ -330,9 +353,23 @@ public:
330}; 353};
331 354
332DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) 355DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store)
333 : mQuery(query), mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) 356 : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery"))
357{
358 setupQuery(query);
359}
360
361DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store)
362 : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery"))
334{ 363{
335 setupQuery(); 364 mCollector = state.mCollector;
365 mSource = state.mSource;
366
367 auto source = mCollector;
368 while (source) {
369 source->mDatastore = this;
370 source->mIncremental = true;
371 source = source->mSource;
372 }
336} 373}
337 374
338DataStoreQuery::~DataStoreQuery() 375DataStoreQuery::~DataStoreQuery()
@@ -340,6 +377,14 @@ DataStoreQuery::~DataStoreQuery()
340 377
341} 378}
342 379
380DataStoreQuery::State::Ptr DataStoreQuery::getState()
381{
382 auto state = State::Ptr::create();
383 state->mSource = mSource;
384 state->mCollector = mCollector;
385 return state;
386}
387
343void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) 388void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback)
344{ 389{
345 mStore.readLatest(mType, key, resultCallback); 390 mStore.readLatest(mType, key, resultCallback);
@@ -443,9 +488,10 @@ QByteArrayList DataStoreQuery::executeSubquery(const QueryBase &subquery)
443 return ids; 488 return ids;
444} 489}
445 490
446void DataStoreQuery::setupQuery() 491void DataStoreQuery::setupQuery(const Sink::QueryBase &query_)
447{ 492{
448 auto baseFilters = mQuery.getBaseFilters(); 493 auto query = query_;
494 auto baseFilters = query.getBaseFilters();
449 for (const auto &k : baseFilters.keys()) { 495 for (const auto &k : baseFilters.keys()) {
450 const auto comparator = baseFilters.value(k); 496 const auto comparator = baseFilters.value(k);
451 if (comparator.value.canConvert<Query>()) { 497 if (comparator.value.canConvert<Query>()) {
@@ -454,44 +500,43 @@ void DataStoreQuery::setupQuery()
454 baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); 500 baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In));
455 } 501 }
456 } 502 }
457 mQuery.setBaseFilters(baseFilters); 503 query.setBaseFilters(baseFilters);
458 504
459 FilterBase::Ptr baseSet; 505 FilterBase::Ptr baseSet;
460 QSet<QByteArray> remainingFilters = mQuery.getBaseFilters().keys().toSet(); 506 QSet<QByteArray> remainingFilters = query.getBaseFilters().keys().toSet();
461 QByteArray appliedSorting; 507 QByteArray appliedSorting;
462 if (!mQuery.ids().isEmpty()) { 508 if (!query.ids().isEmpty()) {
463 mSource = Source::Ptr::create(mQuery.ids().toVector(), this); 509 mSource = Source::Ptr::create(query.ids().toVector(), this);
464 baseSet = mSource; 510 baseSet = mSource;
465 } else { 511 } else {
466 QSet<QByteArray> appliedFilters; 512 QSet<QByteArray> appliedFilters;
467 513
468 auto resultSet = mStore.indexLookup(mType, mQuery, appliedFilters, appliedSorting); 514 auto resultSet = mStore.indexLookup(mType, query, appliedFilters, appliedSorting);
469 remainingFilters = remainingFilters - appliedFilters; 515 remainingFilters = remainingFilters - appliedFilters;
470 516
471 // We do a full scan if there were no indexes available to create the initial set. 517 // We do a full scan if there were no indexes available to create the initial set.
472 if (appliedFilters.isEmpty()) { 518 if (appliedFilters.isEmpty()) {
473 // TODO this should be replaced by an index lookup on the uid index
474 mSource = Source::Ptr::create(mStore.fullScan(mType), this); 519 mSource = Source::Ptr::create(mStore.fullScan(mType), this);
475 } else { 520 } else {
476 mSource = Source::Ptr::create(resultSet, this); 521 mSource = Source::Ptr::create(resultSet, this);
477 } 522 }
478 baseSet = mSource; 523 baseSet = mSource;
479 } 524 }
480 if (!mQuery.getBaseFilters().isEmpty()) { 525 if (!query.getBaseFilters().isEmpty()) {
481 auto filter = Filter::Ptr::create(baseSet, this); 526 auto filter = Filter::Ptr::create(baseSet, this);
482 //For incremental queries the remaining filters are not sufficient 527 //For incremental queries the remaining filters are not sufficient
483 for (const auto &f : mQuery.getBaseFilters().keys()) { 528 for (const auto &f : query.getBaseFilters().keys()) {
484 filter->propertyFilter.insert(f, mQuery.getFilter(f)); 529 filter->propertyFilter.insert(f, query.getFilter(f));
485 } 530 }
486 baseSet = filter; 531 baseSet = filter;
487 } 532 }
488 /* if (appliedSorting.isEmpty() && !mQuery.sortProperty.isEmpty()) { */ 533 /* if (appliedSorting.isEmpty() && !query.sortProperty.isEmpty()) { */
489 /* //Apply manual sorting */ 534 /* //Apply manual sorting */
490 /* baseSet = Sort::Ptr::create(baseSet, mQuery.sortProperty); */ 535 /* baseSet = Sort::Ptr::create(baseSet, query.sortProperty); */
491 /* } */ 536 /* } */
492 537
493 //Setup the rest of the filter stages on top of the base set 538 //Setup the rest of the filter stages on top of the base set
494 for (const auto &stage : mQuery.getFilterStages()) { 539 for (const auto &stage : query.getFilterStages()) {
495 if (auto filter = stage.dynamicCast<Query::Filter>()) { 540 if (auto filter = stage.dynamicCast<Query::Filter>()) {
496 auto f = Filter::Ptr::create(baseSet, this); 541 auto f = Filter::Ptr::create(baseSet, this);
497 f->propertyFilter = filter->propertyFilter; 542 f->propertyFilter = filter->propertyFilter;
@@ -521,7 +566,7 @@ QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision
521 566
522ResultSet DataStoreQuery::update(qint64 baseRevision) 567ResultSet DataStoreQuery::update(qint64 baseRevision)
523{ 568{
524 SinkTraceCtx(mLogCtx) << "Executing query update to revision " << baseRevision; 569 SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision;
525 auto incrementalResultSet = loadIncrementalResultSet(baseRevision); 570 auto incrementalResultSet = loadIncrementalResultSet(baseRevision);
526 SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; 571 SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet;
527 mSource->add(incrementalResultSet); 572 mSource->add(incrementalResultSet);
diff --git a/common/datastorequery.h b/common/datastorequery.h
index 5a47685..a797782 100644
--- a/common/datastorequery.h
+++ b/common/datastorequery.h
@@ -26,6 +26,7 @@
26 26
27class Source; 27class Source;
28class Bloom; 28class Bloom;
29class Reduce;
29class Filter; 30class Filter;
30class FilterBase; 31class FilterBase;
31 32
@@ -33,15 +34,25 @@ class DataStoreQuery {
33 friend class FilterBase; 34 friend class FilterBase;
34 friend class Source; 35 friend class Source;
35 friend class Bloom; 36 friend class Bloom;
37 friend class Reduce;
36 friend class Filter; 38 friend class Filter;
37public: 39public:
38 typedef QSharedPointer<DataStoreQuery> Ptr; 40 typedef QSharedPointer<DataStoreQuery> Ptr;
39 41
42 struct State {
43 typedef QSharedPointer<State> Ptr;
44 QSharedPointer<FilterBase> mCollector;
45 QSharedPointer<Source> mSource;
46 };
47
40 DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store); 48 DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store);
49 DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store);
41 ~DataStoreQuery(); 50 ~DataStoreQuery();
42 ResultSet execute(); 51 ResultSet execute();
43 ResultSet update(qint64 baseRevision); 52 ResultSet update(qint64 baseRevision);
44 53
54 State::Ptr getState();
55
45private: 56private:
46 57
47 typedef std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation)> FilterFunction; 58 typedef std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation)> FilterFunction;
@@ -54,12 +65,10 @@ private:
54 ResultSet createFilteredSet(ResultSet &resultSet, const FilterFunction &); 65 ResultSet createFilteredSet(ResultSet &resultSet, const FilterFunction &);
55 QVector<QByteArray> loadIncrementalResultSet(qint64 baseRevision); 66 QVector<QByteArray> loadIncrementalResultSet(qint64 baseRevision);
56 67
57 void setupQuery(); 68 void setupQuery(const Sink::QueryBase &query_);
58 QByteArrayList executeSubquery(const Sink::QueryBase &subquery); 69 QByteArrayList executeSubquery(const Sink::QueryBase &subquery);
59 70
60 Sink::QueryBase mQuery;
61 const QByteArray mType; 71 const QByteArray mType;
62 bool mInitialQuery;
63 QSharedPointer<FilterBase> mCollector; 72 QSharedPointer<FilterBase> mCollector;
64 QSharedPointer<Source> mSource; 73 QSharedPointer<Source> mSource;
65 74
@@ -102,7 +111,8 @@ public:
102 //Returns true for as long as a result is available 111 //Returns true for as long as a result is available
103 virtual bool next(const std::function<void(const ResultSet::Result &)> &callback) = 0; 112 virtual bool next(const std::function<void(const ResultSet::Result &)> &callback) = 0;
104 113
105 QSharedPointer<FilterBase> mSource; 114 FilterBase::Ptr mSource;
106 DataStoreQuery *mDatastore; 115 DataStoreQuery *mDatastore;
116 bool mIncremental = false;
107}; 117};
108 118
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 748320f..40880eb 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -33,6 +33,7 @@ struct ReplayResult {
33 qint64 newRevision; 33 qint64 newRevision;
34 qint64 replayedEntities; 34 qint64 replayedEntities;
35 bool replayedAll; 35 bool replayedAll;
36 DataStoreQuery::State::Ptr queryState;
36}; 37};
37 38
38/* 39/*
@@ -49,7 +50,7 @@ public:
49 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); 50 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx);
50 virtual ~QueryWorker(); 51 virtual ~QueryWorker();
51 52
52 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 53 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state);
53 ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); 54 ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
54 55
55private: 56private:
@@ -69,45 +70,41 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
69 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; 70 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get.";
70 } 71 }
71 auto guardPtr = QPointer<QObject>(&guard); 72 auto guardPtr = QPointer<QObject>(&guard);
72 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 73 auto fetcher = [=](const typename DomainType::Ptr &parent) {
73 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) {
74 const QByteArray parentId = parent ? parent->identifier() : QByteArray(); 74 const QByteArray parentId = parent ? parent->identifier() : QByteArray();
75 SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; 75 SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize;
76 auto resultProvider = mResultProvider; 76 auto resultProvider = mResultProvider;
77 if (query.synchronousQuery()) { 77 auto resultTransformation = mResultTransformation;
78 QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation, mLogCtx); 78 auto offset = mOffset[parentId];
79 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); 79 auto batchSize = mBatchSize;
80 mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; 80 auto resourceContext = mResourceContext;
81 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); 81 auto logCtx = mLogCtx;
82 resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); 82 const bool runAsync = !query.synchronousQuery();
83 } else { 83 //The lambda will be executed in a separate thread, so copy all arguments
84 auto resultTransformation = mResultTransformation; 84 async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() {
85 auto offset = mOffset[parentId]; 85 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
86 auto batchSize = mBatchSize; 86 return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize);
87 auto resourceContext = mResourceContext; 87 }, runAsync)
88 auto logCtx = mLogCtx; 88 .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) {
89 //The lambda will be executed in a separate thread, so copy all arguments 89 if (!guardPtr) {
90 async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { 90 qWarning() << "The parent object is already gone";
91 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); 91 return;
92 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); 92 }
93 return newRevisionAndReplayedEntities; 93 mInitialQueryComplete = true;
94 mQueryState = result.queryState;
95 mOffset[parentId] += result.replayedEntities;
96 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
97 if (query.liveQuery()) {
98 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
99 }
100 resultProvider->setRevision(result.newRevision);
101 resultProvider->initialResultSetComplete(parent, result.replayedAll);
94 }) 102 })
95 .template then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { 103 .exec();
96 if (!guardPtr) { 104 };
97 qWarning() << "The parent object is already gone"; 105
98 return; 106 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
99 } 107 mResultProvider->setFetcher(fetcher);
100 mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities;
101 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
102 if (query.liveQuery()) {
103 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
104 }
105 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
106 resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll);
107 })
108 .exec();
109 }
110 });
111 108
112 // In case of a live query we keep the runner for as long alive as the result provider exists 109 // In case of a live query we keep the runner for as long alive as the result provider exists
113 if (query.liveQuery()) { 110 if (query.liveQuery()) {
@@ -117,16 +114,26 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
117 auto resultProvider = mResultProvider; 114 auto resultProvider = mResultProvider;
118 auto resourceContext = mResourceContext; 115 auto resourceContext = mResourceContext;
119 auto logCtx = mLogCtx; 116 auto logCtx = mLogCtx;
120 return async::run<ReplayResult>([=]() { 117 auto state = mQueryState;
118 if (!mInitialQueryComplete) {
119 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
120 fetcher({});
121 return KAsync::null();
122 }
123 Q_ASSERT(!mQueryInProgress);
124 return KAsync::syncStart<void>([&] {
125 mQueryInProgress = true;
126 })
127 .then(async::run<ReplayResult>([=]() {
121 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx); 128 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx);
122 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); 129 return worker.executeIncrementalQuery(query, *resultProvider, state);
123 return newRevisionAndReplayedEntities; 130 }))
124 }) 131 .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
125 .template then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
126 if (!guardPtr) { 132 if (!guardPtr) {
127 qWarning() << "The parent object is already gone"; 133 qWarning() << "The parent object is already gone";
128 return; 134 return;
129 } 135 }
136 mQueryInProgress = false;
130 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 137 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
131 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); 138 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
132 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); 139 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
@@ -201,14 +208,20 @@ void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, S
201} 208}
202 209
203template <class DomainType> 210template <class DomainType>
204ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 211ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state)
205{ 212{
206 QTime time; 213 QTime time;
207 time.start(); 214 time.start();
208 215
209 const qint64 baseRevision = resultProvider.revision() + 1; 216 const qint64 baseRevision = resultProvider.revision() + 1;
217 SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision;
218
210 auto entityStore = EntityStore{mResourceContext, mLogCtx}; 219 auto entityStore = EntityStore{mResourceContext, mLogCtx};
211 auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 220 if (!state) {
221 SinkWarningCtx(mLogCtx) << "No previous query state.";
222 return {0, 0, false, DataStoreQuery::State::Ptr{}};
223 }
224 auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore};
212 auto resultSet = preparedQuery.update(baseRevision); 225 auto resultSet = preparedQuery.update(baseRevision);
213 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 226 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
214 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { 227 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) {
@@ -218,7 +231,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
218 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" 231 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n"
219 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 232 << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
220 << "Incremental query took: " << Log::TraceTime(time.elapsed()); 233 << "Incremental query took: " << Log::TraceTime(time.elapsed());
221 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; 234 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()};
222} 235}
223 236
224template <class DomainType> 237template <class DomainType>
@@ -251,14 +264,14 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
251 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" 264 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n"
252 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 265 << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
253 << "Initial query took: " << Log::TraceTime(time.elapsed()); 266 << "Initial query took: " << Log::TraceTime(time.elapsed());
254 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; 267
268 auto state = preparedQuery.getState();
269
270 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state};
255} 271}
256 272
257template class QueryRunner<Sink::ApplicationDomain::Contact>; 273#define REGISTER_TYPE(T) \
258template class QueryRunner<Sink::ApplicationDomain::Folder>; 274 template class QueryRunner<T>; \
259template class QueryRunner<Sink::ApplicationDomain::Mail>; 275 template class QueryWorker<T>; \
260template class QueryRunner<Sink::ApplicationDomain::Event>; 276
261template class QueryWorker<Sink::ApplicationDomain::Contact>; 277SINK_REGISTER_TYPES()
262template class QueryWorker<Sink::ApplicationDomain::Folder>;
263template class QueryWorker<Sink::ApplicationDomain::Mail>;
264template class QueryWorker<Sink::ApplicationDomain::Event>;
diff --git a/common/queryrunner.h b/common/queryrunner.h
index 66dc68f..f5c7ead 100644
--- a/common/queryrunner.h
+++ b/common/queryrunner.h
@@ -24,6 +24,7 @@
24#include "resultprovider.h" 24#include "resultprovider.h"
25#include "query.h" 25#include "query.h"
26#include "log.h" 26#include "log.h"
27#include "datastorequery.h"
27 28
28/** 29/**
29 * Base clase because you can't have the Q_OBJECT macro in template classes 30 * Base clase because you can't have the Q_OBJECT macro in template classes
@@ -101,4 +102,7 @@ private:
101 int mBatchSize; 102 int mBatchSize;
102 QObject guard; 103 QObject guard;
103 Sink::Log::Context mLogCtx; 104 Sink::Log::Context mLogCtx;
105 DataStoreQuery::State::Ptr mQueryState;
106 bool mInitialQueryComplete = false;
107 bool mQueryInProgress = false;
104}; 108};
diff --git a/tests/querytest.cpp b/tests/querytest.cpp
index 0641c0d..328448f 100644
--- a/tests/querytest.cpp
+++ b/tests/querytest.cpp
@@ -731,7 +731,7 @@ private slots:
731 Query query; 731 Query query;
732 query.setId("testLivequeryUnmatch"); 732 query.setId("testLivequeryUnmatch");
733 query.filter<Mail::Folder>(folder1); 733 query.filter<Mail::Folder>(folder1);
734 query.reduce<Mail::ThreadId>(Query::Reduce::Selector::max<Mail::Date>()).count("count").collect<Mail::Sender>("senders"); 734 query.reduce<Mail::ThreadId>(Query::Reduce::Selector::max<Mail::Date>()).count("count").collect<Mail::Folder>("folders");
735 query.sort<Mail::Date>(); 735 query.sort<Mail::Date>();
736 query.setFlags(Query::LiveQuery); 736 query.setFlags(Query::LiveQuery);
737 query.request<Mail::Unread>(); 737 query.request<Mail::Unread>();
@@ -747,7 +747,61 @@ private slots:
747 VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); 747 VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1"));
748 QTRY_COMPARE(model->rowCount(), 1); 748 QTRY_COMPARE(model->rowCount(), 1);
749 auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value<Mail::Ptr>(); 749 auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value<Mail::Ptr>();
750 QCOMPARE(mail->getUnread(), true); 750 QTRY_COMPARE(mail->getUnread(), true);
751 QCOMPARE(mail->getProperty("count").toInt(), 1);
752 QCOMPARE(mail->getProperty("folders").toList().size(), 1);
753 }
754
755 void testReductionUpdate()
756 {
757 // Setup
758 auto folder1 = Folder::createEntity<Folder>("sink.dummy.instance1");
759 VERIFYEXEC(Sink::Store::create<Folder>(folder1));
760
761 auto folder2 = Folder::createEntity<Folder>("sink.dummy.instance1");
762 VERIFYEXEC(Sink::Store::create<Folder>(folder2));
763
764 QDateTime now{QDate{2017, 2, 3}, QTime{10, 0, 0}};
765 QDateTime later{QDate{2017, 2, 3}, QTime{11, 0, 0}};
766
767 auto mail1 = Mail::createEntity<Mail>("sink.dummy.instance1");
768 mail1.setExtractedMessageId("mail1");
769 mail1.setFolder(folder1);
770 mail1.setUnread(false);
771 mail1.setExtractedDate(now);
772 VERIFYEXEC(Sink::Store::create(mail1));
773
774 // Ensure all local data is processed
775 VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1"));
776
777 Query query;
778 query.setId("testLivequeryUnmatch");
779 query.setFlags(Query::LiveQuery);
780 query.filter<Mail::Folder>(folder1);
781 query.reduce<Mail::Folder>(Query::Reduce::Selector::max<Mail::Date>()).count("count").collect<Mail::Folder>("folders");
782 query.sort<Mail::Date>();
783 query.request<Mail::Unread>();
784 query.request<Mail::MessageId>();
785
786 auto model = Sink::Store::loadModel<Mail>(query);
787 QTRY_COMPARE(model->rowCount(), 1);
788
789 //The leader should change to mail2 after the modification
790 {
791 auto mail2 = Mail::createEntity<Mail>("sink.dummy.instance1");
792 mail2.setExtractedMessageId("mail2");
793 mail2.setFolder(folder1);
794 mail2.setUnread(false);
795 mail2.setExtractedDate(later);
796 VERIFYEXEC(Sink::Store::create(mail2));
797 }
798
799 VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1"));
800 QTRY_COMPARE(model->rowCount(), 1);
801 auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value<Mail::Ptr>();
802 QTRY_COMPARE(mail->getMessageId(), QByteArray{"mail2"});
803 QCOMPARE(mail->getProperty("count").toInt(), 2);
804 QCOMPARE(mail->getProperty("folders").toList().size(), 2);
751 } 805 }
752 806
753 void testBloom() 807 void testBloom()