diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-02-11 12:02:58 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-02-13 19:42:39 +0100 |
commit | 1259b236704e790fa1284a63ec537525bce23841 (patch) | |
tree | 85cd0491e56d2f604cc8aa291a49d20f8f73c684 | |
parent | b4bd3932aa2a8e841ed204b341bcbf65ba59c5b2 (diff) | |
download | sink-1259b236704e790fa1284a63ec537525bce23841.tar.gz sink-1259b236704e790fa1284a63ec537525bce23841.zip |
Fixed reduction updates with stateful query.
Some filters need to maintain state between runs in order to be able to
emit only what has changed. This now also makes reduction work for live
queries.
-rw-r--r-- | common/datastorequery.cpp | 163 | ||||
-rw-r--r-- | common/datastorequery.h | 18 | ||||
-rw-r--r-- | common/queryrunner.cpp | 119 | ||||
-rw-r--r-- | common/queryrunner.h | 4 | ||||
-rw-r--r-- | tests/querytest.cpp | 58 |
5 files changed, 244 insertions, 118 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 41d962c..3ba8f40 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp | |||
@@ -72,7 +72,7 @@ class Source : public FilterBase { | |||
72 | if (mIt == mIds.constEnd()) { | 72 | if (mIt == mIds.constEnd()) { |
73 | return false; | 73 | return false; |
74 | } | 74 | } |
75 | readEntity(*mIt, [callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | 75 | readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { |
76 | SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); | 76 | SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); |
77 | callback({entity, operation}); | 77 | callback({entity, operation}); |
78 | }); | 78 | }); |
@@ -115,7 +115,7 @@ public: | |||
115 | virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE { | 115 | virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE { |
116 | bool foundValue = false; | 116 | bool foundValue = false; |
117 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { | 117 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { |
118 | SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << result.operation; | 118 | SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << operationName(result.operation); |
119 | 119 | ||
120 | //Always accept removals. They can't match the filter since the data is gone. | 120 | //Always accept removals. They can't match the filter since the data is gone. |
121 | if (result.operation == Sink::Operation_Removal) { | 121 | if (result.operation == Sink::Operation_Removal) { |
@@ -167,17 +167,11 @@ public: | |||
167 | 167 | ||
168 | } | 168 | } |
169 | 169 | ||
170 | void process() { | ||
171 | if (operation == QueryBase::Reduce::Aggregator::Count) { | ||
172 | mResult = mResult.toInt() + 1; | ||
173 | } else { | ||
174 | Q_ASSERT(false); | ||
175 | } | ||
176 | } | ||
177 | |||
178 | void process(const QVariant &value) { | 170 | void process(const QVariant &value) { |
179 | if (operation == QueryBase::Reduce::Aggregator::Collect) { | 171 | if (operation == QueryBase::Reduce::Aggregator::Collect) { |
180 | mResult = mResult.toList() << value; | 172 | mResult = mResult.toList() << value; |
173 | } else if (operation == QueryBase::Reduce::Aggregator::Count) { | ||
174 | mResult = mResult.toInt() + 1; | ||
181 | } else { | 175 | } else { |
182 | Q_ASSERT(false); | 176 | Q_ASSERT(false); |
183 | } | 177 | } |
@@ -196,8 +190,8 @@ public: | |||
196 | QVariant mResult; | 190 | QVariant mResult; |
197 | }; | 191 | }; |
198 | 192 | ||
199 | QHash<QByteArray, QVariant> mAggregateValues; | ||
200 | QSet<QByteArray> mReducedValues; | 193 | QSet<QByteArray> mReducedValues; |
194 | QHash<QByteArray, QByteArray> mSelectedValues; | ||
201 | QByteArray mReductionProperty; | 195 | QByteArray mReductionProperty; |
202 | QByteArray mSelectionProperty; | 196 | QByteArray mSelectionProperty; |
203 | QueryBase::Reduce::Selector::Comparator mSelectionComparator; | 197 | QueryBase::Reduce::Selector::Comparator mSelectionComparator; |
@@ -231,51 +225,80 @@ public: | |||
231 | return false; | 225 | return false; |
232 | } | 226 | } |
233 | 227 | ||
228 | QByteArray reduceOnValue(const QVariant &reductionValue, QMap<QByteArray, QVariant> &aggregateValues) | ||
229 | { | ||
230 | QVariant selectionResultValue; | ||
231 | QByteArray selectionResult; | ||
232 | auto results = indexLookup(mReductionProperty, reductionValue); | ||
233 | for (auto &aggregator : mAggregators) { | ||
234 | aggregator.reset(); | ||
235 | } | ||
236 | |||
237 | for (const auto &r : results) { | ||
238 | readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | ||
239 | Q_ASSERT(operation != Sink::Operation_Removal); | ||
240 | for (auto &aggregator : mAggregators) { | ||
241 | if (!aggregator.property.isEmpty()) { | ||
242 | aggregator.process(entity.getProperty(aggregator.property)); | ||
243 | } else { | ||
244 | aggregator.process(QVariant{}); | ||
245 | } | ||
246 | } | ||
247 | auto selectionValue = entity.getProperty(mSelectionProperty); | ||
248 | if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) { | ||
249 | selectionResultValue = selectionValue; | ||
250 | selectionResult = entity.identifier(); | ||
251 | } | ||
252 | }); | ||
253 | } | ||
254 | |||
255 | for (auto &aggregator : mAggregators) { | ||
256 | aggregateValues.insert(aggregator.resultProperty, aggregator.result()); | ||
257 | } | ||
258 | return selectionResult; | ||
259 | } | ||
260 | |||
234 | bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { | 261 | bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { |
235 | bool foundValue = false; | 262 | bool foundValue = false; |
236 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { | 263 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { |
237 | auto reductionValue = result.entity.getProperty(mReductionProperty); | ||
238 | if (result.operation == Sink::Operation_Removal) { | 264 | if (result.operation == Sink::Operation_Removal) { |
239 | callback(result); | 265 | callback(result); |
240 | return false; | 266 | return false; |
241 | } | 267 | } |
242 | if (!mReducedValues.contains(getByteArray(reductionValue))) { | 268 | auto reductionValue = result.entity.getProperty(mReductionProperty); |
269 | const auto &reductionValueBa = getByteArray(reductionValue); | ||
270 | if (!mReducedValues.contains(reductionValueBa)) { | ||
243 | //Only reduce every value once. | 271 | //Only reduce every value once. |
244 | mReducedValues.insert(getByteArray(reductionValue)); | 272 | mReducedValues.insert(reductionValueBa); |
245 | QVariant selectionResultValue; | ||
246 | QByteArray selectionResult; | ||
247 | auto results = indexLookup(mReductionProperty, reductionValue); | ||
248 | for (auto &aggregator : mAggregators) { | ||
249 | aggregator.reset(); | ||
250 | } | ||
251 | |||
252 | QVariantList list; | ||
253 | for (const auto &r : results) { | ||
254 | readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | ||
255 | for (auto &aggregator : mAggregators) { | ||
256 | if (!aggregator.property.isEmpty()) { | ||
257 | aggregator.process(entity.getProperty(aggregator.property)); | ||
258 | } else { | ||
259 | aggregator.process(); | ||
260 | } | ||
261 | } | ||
262 | auto selectionValue = entity.getProperty(mSelectionProperty); | ||
263 | if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) { | ||
264 | selectionResultValue = selectionValue; | ||
265 | selectionResult = entity.identifier(); | ||
266 | } | ||
267 | }); | ||
268 | } | ||
269 | |||
270 | QMap<QByteArray, QVariant> aggregateValues; | 273 | QMap<QByteArray, QVariant> aggregateValues; |
271 | for (auto &aggregator : mAggregators) { | 274 | auto selectionResult = reduceOnValue(reductionValue, aggregateValues); |
272 | aggregateValues.insert(aggregator.resultProperty, aggregator.result()); | ||
273 | } | ||
274 | 275 | ||
276 | mSelectedValues.insert(reductionValueBa, selectionResult); | ||
275 | readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | 277 | readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { |
276 | callback({entity, operation, aggregateValues}); | 278 | callback({entity, operation, aggregateValues}); |
277 | foundValue = true; | 279 | foundValue = true; |
278 | }); | 280 | }); |
281 | } else { | ||
282 | //During initial query, do nothing. The lookup above will take care of it. | ||
283 | //During updates adjust the reduction according to the modification/addition or removal | ||
284 | if (mIncremental) { | ||
285 | //redo the reduction | ||
286 | QMap<QByteArray, QVariant> aggregateValues; | ||
287 | auto selectionResult = reduceOnValue(reductionValue, aggregateValues); | ||
288 | |||
289 | //TODO if old and new are the same a modification would be enough | ||
290 | auto oldSelectionResult = mSelectedValues.take(reductionValueBa); | ||
291 | //remove old result | ||
292 | readEntity(oldSelectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { | ||
293 | callback({entity, Sink::Operation_Removal}); | ||
294 | }); | ||
295 | |||
296 | //add new result | ||
297 | mSelectedValues.insert(reductionValueBa, selectionResult); | ||
298 | readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { | ||
299 | callback({entity, Sink::Operation_Creation, aggregateValues}); | ||
300 | }); | ||
301 | } | ||
279 | } | 302 | } |
280 | return false; | 303 | return false; |
281 | })) | 304 | })) |
@@ -330,9 +353,23 @@ public: | |||
330 | }; | 353 | }; |
331 | 354 | ||
332 | DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) | 355 | DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) |
333 | : mQuery(query), mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) | 356 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) |
357 | { | ||
358 | setupQuery(query); | ||
359 | } | ||
360 | |||
361 | DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store) | ||
362 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) | ||
334 | { | 363 | { |
335 | setupQuery(); | 364 | mCollector = state.mCollector; |
365 | mSource = state.mSource; | ||
366 | |||
367 | auto source = mCollector; | ||
368 | while (source) { | ||
369 | source->mDatastore = this; | ||
370 | source->mIncremental = true; | ||
371 | source = source->mSource; | ||
372 | } | ||
336 | } | 373 | } |
337 | 374 | ||
338 | DataStoreQuery::~DataStoreQuery() | 375 | DataStoreQuery::~DataStoreQuery() |
@@ -340,6 +377,14 @@ DataStoreQuery::~DataStoreQuery() | |||
340 | 377 | ||
341 | } | 378 | } |
342 | 379 | ||
380 | DataStoreQuery::State::Ptr DataStoreQuery::getState() | ||
381 | { | ||
382 | auto state = State::Ptr::create(); | ||
383 | state->mSource = mSource; | ||
384 | state->mCollector = mCollector; | ||
385 | return state; | ||
386 | } | ||
387 | |||
343 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) | 388 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) |
344 | { | 389 | { |
345 | mStore.readLatest(mType, key, resultCallback); | 390 | mStore.readLatest(mType, key, resultCallback); |
@@ -443,9 +488,10 @@ QByteArrayList DataStoreQuery::executeSubquery(const QueryBase &subquery) | |||
443 | return ids; | 488 | return ids; |
444 | } | 489 | } |
445 | 490 | ||
446 | void DataStoreQuery::setupQuery() | 491 | void DataStoreQuery::setupQuery(const Sink::QueryBase &query_) |
447 | { | 492 | { |
448 | auto baseFilters = mQuery.getBaseFilters(); | 493 | auto query = query_; |
494 | auto baseFilters = query.getBaseFilters(); | ||
449 | for (const auto &k : baseFilters.keys()) { | 495 | for (const auto &k : baseFilters.keys()) { |
450 | const auto comparator = baseFilters.value(k); | 496 | const auto comparator = baseFilters.value(k); |
451 | if (comparator.value.canConvert<Query>()) { | 497 | if (comparator.value.canConvert<Query>()) { |
@@ -454,44 +500,43 @@ void DataStoreQuery::setupQuery() | |||
454 | baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); | 500 | baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); |
455 | } | 501 | } |
456 | } | 502 | } |
457 | mQuery.setBaseFilters(baseFilters); | 503 | query.setBaseFilters(baseFilters); |
458 | 504 | ||
459 | FilterBase::Ptr baseSet; | 505 | FilterBase::Ptr baseSet; |
460 | QSet<QByteArray> remainingFilters = mQuery.getBaseFilters().keys().toSet(); | 506 | QSet<QByteArray> remainingFilters = query.getBaseFilters().keys().toSet(); |
461 | QByteArray appliedSorting; | 507 | QByteArray appliedSorting; |
462 | if (!mQuery.ids().isEmpty()) { | 508 | if (!query.ids().isEmpty()) { |
463 | mSource = Source::Ptr::create(mQuery.ids().toVector(), this); | 509 | mSource = Source::Ptr::create(query.ids().toVector(), this); |
464 | baseSet = mSource; | 510 | baseSet = mSource; |
465 | } else { | 511 | } else { |
466 | QSet<QByteArray> appliedFilters; | 512 | QSet<QByteArray> appliedFilters; |
467 | 513 | ||
468 | auto resultSet = mStore.indexLookup(mType, mQuery, appliedFilters, appliedSorting); | 514 | auto resultSet = mStore.indexLookup(mType, query, appliedFilters, appliedSorting); |
469 | remainingFilters = remainingFilters - appliedFilters; | 515 | remainingFilters = remainingFilters - appliedFilters; |
470 | 516 | ||
471 | // We do a full scan if there were no indexes available to create the initial set. | 517 | // We do a full scan if there were no indexes available to create the initial set. |
472 | if (appliedFilters.isEmpty()) { | 518 | if (appliedFilters.isEmpty()) { |
473 | // TODO this should be replaced by an index lookup on the uid index | ||
474 | mSource = Source::Ptr::create(mStore.fullScan(mType), this); | 519 | mSource = Source::Ptr::create(mStore.fullScan(mType), this); |
475 | } else { | 520 | } else { |
476 | mSource = Source::Ptr::create(resultSet, this); | 521 | mSource = Source::Ptr::create(resultSet, this); |
477 | } | 522 | } |
478 | baseSet = mSource; | 523 | baseSet = mSource; |
479 | } | 524 | } |
480 | if (!mQuery.getBaseFilters().isEmpty()) { | 525 | if (!query.getBaseFilters().isEmpty()) { |
481 | auto filter = Filter::Ptr::create(baseSet, this); | 526 | auto filter = Filter::Ptr::create(baseSet, this); |
482 | //For incremental queries the remaining filters are not sufficient | 527 | //For incremental queries the remaining filters are not sufficient |
483 | for (const auto &f : mQuery.getBaseFilters().keys()) { | 528 | for (const auto &f : query.getBaseFilters().keys()) { |
484 | filter->propertyFilter.insert(f, mQuery.getFilter(f)); | 529 | filter->propertyFilter.insert(f, query.getFilter(f)); |
485 | } | 530 | } |
486 | baseSet = filter; | 531 | baseSet = filter; |
487 | } | 532 | } |
488 | /* if (appliedSorting.isEmpty() && !mQuery.sortProperty.isEmpty()) { */ | 533 | /* if (appliedSorting.isEmpty() && !query.sortProperty.isEmpty()) { */ |
489 | /* //Apply manual sorting */ | 534 | /* //Apply manual sorting */ |
490 | /* baseSet = Sort::Ptr::create(baseSet, mQuery.sortProperty); */ | 535 | /* baseSet = Sort::Ptr::create(baseSet, query.sortProperty); */ |
491 | /* } */ | 536 | /* } */ |
492 | 537 | ||
493 | //Setup the rest of the filter stages on top of the base set | 538 | //Setup the rest of the filter stages on top of the base set |
494 | for (const auto &stage : mQuery.getFilterStages()) { | 539 | for (const auto &stage : query.getFilterStages()) { |
495 | if (auto filter = stage.dynamicCast<Query::Filter>()) { | 540 | if (auto filter = stage.dynamicCast<Query::Filter>()) { |
496 | auto f = Filter::Ptr::create(baseSet, this); | 541 | auto f = Filter::Ptr::create(baseSet, this); |
497 | f->propertyFilter = filter->propertyFilter; | 542 | f->propertyFilter = filter->propertyFilter; |
@@ -521,7 +566,7 @@ QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision | |||
521 | 566 | ||
522 | ResultSet DataStoreQuery::update(qint64 baseRevision) | 567 | ResultSet DataStoreQuery::update(qint64 baseRevision) |
523 | { | 568 | { |
524 | SinkTraceCtx(mLogCtx) << "Executing query update to revision " << baseRevision; | 569 | SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision; |
525 | auto incrementalResultSet = loadIncrementalResultSet(baseRevision); | 570 | auto incrementalResultSet = loadIncrementalResultSet(baseRevision); |
526 | SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; | 571 | SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; |
527 | mSource->add(incrementalResultSet); | 572 | mSource->add(incrementalResultSet); |
diff --git a/common/datastorequery.h b/common/datastorequery.h index 5a47685..a797782 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h | |||
@@ -26,6 +26,7 @@ | |||
26 | 26 | ||
27 | class Source; | 27 | class Source; |
28 | class Bloom; | 28 | class Bloom; |
29 | class Reduce; | ||
29 | class Filter; | 30 | class Filter; |
30 | class FilterBase; | 31 | class FilterBase; |
31 | 32 | ||
@@ -33,15 +34,25 @@ class DataStoreQuery { | |||
33 | friend class FilterBase; | 34 | friend class FilterBase; |
34 | friend class Source; | 35 | friend class Source; |
35 | friend class Bloom; | 36 | friend class Bloom; |
37 | friend class Reduce; | ||
36 | friend class Filter; | 38 | friend class Filter; |
37 | public: | 39 | public: |
38 | typedef QSharedPointer<DataStoreQuery> Ptr; | 40 | typedef QSharedPointer<DataStoreQuery> Ptr; |
39 | 41 | ||
42 | struct State { | ||
43 | typedef QSharedPointer<State> Ptr; | ||
44 | QSharedPointer<FilterBase> mCollector; | ||
45 | QSharedPointer<Source> mSource; | ||
46 | }; | ||
47 | |||
40 | DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store); | 48 | DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store); |
49 | DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store); | ||
41 | ~DataStoreQuery(); | 50 | ~DataStoreQuery(); |
42 | ResultSet execute(); | 51 | ResultSet execute(); |
43 | ResultSet update(qint64 baseRevision); | 52 | ResultSet update(qint64 baseRevision); |
44 | 53 | ||
54 | State::Ptr getState(); | ||
55 | |||
45 | private: | 56 | private: |
46 | 57 | ||
47 | typedef std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation)> FilterFunction; | 58 | typedef std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation)> FilterFunction; |
@@ -54,12 +65,10 @@ private: | |||
54 | ResultSet createFilteredSet(ResultSet &resultSet, const FilterFunction &); | 65 | ResultSet createFilteredSet(ResultSet &resultSet, const FilterFunction &); |
55 | QVector<QByteArray> loadIncrementalResultSet(qint64 baseRevision); | 66 | QVector<QByteArray> loadIncrementalResultSet(qint64 baseRevision); |
56 | 67 | ||
57 | void setupQuery(); | 68 | void setupQuery(const Sink::QueryBase &query_); |
58 | QByteArrayList executeSubquery(const Sink::QueryBase &subquery); | 69 | QByteArrayList executeSubquery(const Sink::QueryBase &subquery); |
59 | 70 | ||
60 | Sink::QueryBase mQuery; | ||
61 | const QByteArray mType; | 71 | const QByteArray mType; |
62 | bool mInitialQuery; | ||
63 | QSharedPointer<FilterBase> mCollector; | 72 | QSharedPointer<FilterBase> mCollector; |
64 | QSharedPointer<Source> mSource; | 73 | QSharedPointer<Source> mSource; |
65 | 74 | ||
@@ -102,7 +111,8 @@ public: | |||
102 | //Returns true for as long as a result is available | 111 | //Returns true for as long as a result is available |
103 | virtual bool next(const std::function<void(const ResultSet::Result &)> &callback) = 0; | 112 | virtual bool next(const std::function<void(const ResultSet::Result &)> &callback) = 0; |
104 | 113 | ||
105 | QSharedPointer<FilterBase> mSource; | 114 | FilterBase::Ptr mSource; |
106 | DataStoreQuery *mDatastore; | 115 | DataStoreQuery *mDatastore; |
116 | bool mIncremental = false; | ||
107 | }; | 117 | }; |
108 | 118 | ||
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 748320f..40880eb 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -33,6 +33,7 @@ struct ReplayResult { | |||
33 | qint64 newRevision; | 33 | qint64 newRevision; |
34 | qint64 replayedEntities; | 34 | qint64 replayedEntities; |
35 | bool replayedAll; | 35 | bool replayedAll; |
36 | DataStoreQuery::State::Ptr queryState; | ||
36 | }; | 37 | }; |
37 | 38 | ||
38 | /* | 39 | /* |
@@ -49,7 +50,7 @@ public: | |||
49 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); | 50 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); |
50 | virtual ~QueryWorker(); | 51 | virtual ~QueryWorker(); |
51 | 52 | ||
52 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 53 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); |
53 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); | 54 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); |
54 | 55 | ||
55 | private: | 56 | private: |
@@ -69,45 +70,41 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
69 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; | 70 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; |
70 | } | 71 | } |
71 | auto guardPtr = QPointer<QObject>(&guard); | 72 | auto guardPtr = QPointer<QObject>(&guard); |
72 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. | 73 | auto fetcher = [=](const typename DomainType::Ptr &parent) { |
73 | mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { | ||
74 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); | 74 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); |
75 | SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; | 75 | SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; |
76 | auto resultProvider = mResultProvider; | 76 | auto resultProvider = mResultProvider; |
77 | if (query.synchronousQuery()) { | 77 | auto resultTransformation = mResultTransformation; |
78 | QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation, mLogCtx); | 78 | auto offset = mOffset[parentId]; |
79 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); | 79 | auto batchSize = mBatchSize; |
80 | mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; | 80 | auto resourceContext = mResourceContext; |
81 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | 81 | auto logCtx = mLogCtx; |
82 | resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); | 82 | const bool runAsync = !query.synchronousQuery(); |
83 | } else { | 83 | //The lambda will be executed in a separate thread, so copy all arguments |
84 | auto resultTransformation = mResultTransformation; | 84 | async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { |
85 | auto offset = mOffset[parentId]; | 85 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); |
86 | auto batchSize = mBatchSize; | 86 | return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); |
87 | auto resourceContext = mResourceContext; | 87 | }, runAsync) |
88 | auto logCtx = mLogCtx; | 88 | .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { |
89 | //The lambda will be executed in a separate thread, so copy all arguments | 89 | if (!guardPtr) { |
90 | async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { | 90 | qWarning() << "The parent object is already gone"; |
91 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | 91 | return; |
92 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); | 92 | } |
93 | return newRevisionAndReplayedEntities; | 93 | mInitialQueryComplete = true; |
94 | mQueryState = result.queryState; | ||
95 | mOffset[parentId] += result.replayedEntities; | ||
96 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
97 | if (query.liveQuery()) { | ||
98 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); | ||
99 | } | ||
100 | resultProvider->setRevision(result.newRevision); | ||
101 | resultProvider->initialResultSetComplete(parent, result.replayedAll); | ||
94 | }) | 102 | }) |
95 | .template then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | 103 | .exec(); |
96 | if (!guardPtr) { | 104 | }; |
97 | qWarning() << "The parent object is already gone"; | 105 | |
98 | return; | 106 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. |
99 | } | 107 | mResultProvider->setFetcher(fetcher); |
100 | mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; | ||
101 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
102 | if (query.liveQuery()) { | ||
103 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | ||
104 | } | ||
105 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | ||
106 | resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); | ||
107 | }) | ||
108 | .exec(); | ||
109 | } | ||
110 | }); | ||
111 | 108 | ||
112 | // In case of a live query we keep the runner for as long alive as the result provider exists | 109 | // In case of a live query we keep the runner for as long alive as the result provider exists |
113 | if (query.liveQuery()) { | 110 | if (query.liveQuery()) { |
@@ -117,16 +114,26 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
117 | auto resultProvider = mResultProvider; | 114 | auto resultProvider = mResultProvider; |
118 | auto resourceContext = mResourceContext; | 115 | auto resourceContext = mResourceContext; |
119 | auto logCtx = mLogCtx; | 116 | auto logCtx = mLogCtx; |
120 | return async::run<ReplayResult>([=]() { | 117 | auto state = mQueryState; |
118 | if (!mInitialQueryComplete) { | ||
119 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | ||
120 | fetcher({}); | ||
121 | return KAsync::null(); | ||
122 | } | ||
123 | Q_ASSERT(!mQueryInProgress); | ||
124 | return KAsync::syncStart<void>([&] { | ||
125 | mQueryInProgress = true; | ||
126 | }) | ||
127 | .then(async::run<ReplayResult>([=]() { | ||
121 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx); | 128 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx); |
122 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); | 129 | return worker.executeIncrementalQuery(query, *resultProvider, state); |
123 | return newRevisionAndReplayedEntities; | 130 | })) |
124 | }) | 131 | .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { |
125 | .template then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | ||
126 | if (!guardPtr) { | 132 | if (!guardPtr) { |
127 | qWarning() << "The parent object is already gone"; | 133 | qWarning() << "The parent object is already gone"; |
128 | return; | 134 | return; |
129 | } | 135 | } |
136 | mQueryInProgress = false; | ||
130 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 137 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
131 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | 138 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); |
132 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | 139 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); |
@@ -201,14 +208,20 @@ void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, S | |||
201 | } | 208 | } |
202 | 209 | ||
203 | template <class DomainType> | 210 | template <class DomainType> |
204 | ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 211 | ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state) |
205 | { | 212 | { |
206 | QTime time; | 213 | QTime time; |
207 | time.start(); | 214 | time.start(); |
208 | 215 | ||
209 | const qint64 baseRevision = resultProvider.revision() + 1; | 216 | const qint64 baseRevision = resultProvider.revision() + 1; |
217 | SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision; | ||
218 | |||
210 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; | 219 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; |
211 | auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | 220 | if (!state) { |
221 | SinkWarningCtx(mLogCtx) << "No previous query state."; | ||
222 | return {0, 0, false, DataStoreQuery::State::Ptr{}}; | ||
223 | } | ||
224 | auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | ||
212 | auto resultSet = preparedQuery.update(baseRevision); | 225 | auto resultSet = preparedQuery.update(baseRevision); |
213 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 226 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
214 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { | 227 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { |
@@ -218,7 +231,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query | |||
218 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" | 231 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" |
219 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") | 232 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") |
220 | << "Incremental query took: " << Log::TraceTime(time.elapsed()); | 233 | << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
221 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; | 234 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; |
222 | } | 235 | } |
223 | 236 | ||
224 | template <class DomainType> | 237 | template <class DomainType> |
@@ -251,14 +264,14 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery( | |||
251 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" | 264 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" |
252 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") | 265 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") |
253 | << "Initial query took: " << Log::TraceTime(time.elapsed()); | 266 | << "Initial query took: " << Log::TraceTime(time.elapsed()); |
254 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; | 267 | |
268 | auto state = preparedQuery.getState(); | ||
269 | |||
270 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state}; | ||
255 | } | 271 | } |
256 | 272 | ||
257 | template class QueryRunner<Sink::ApplicationDomain::Contact>; | 273 | #define REGISTER_TYPE(T) \ |
258 | template class QueryRunner<Sink::ApplicationDomain::Folder>; | 274 | template class QueryRunner<T>; \ |
259 | template class QueryRunner<Sink::ApplicationDomain::Mail>; | 275 | template class QueryWorker<T>; \ |
260 | template class QueryRunner<Sink::ApplicationDomain::Event>; | 276 | |
261 | template class QueryWorker<Sink::ApplicationDomain::Contact>; | 277 | SINK_REGISTER_TYPES() |
262 | template class QueryWorker<Sink::ApplicationDomain::Folder>; | ||
263 | template class QueryWorker<Sink::ApplicationDomain::Mail>; | ||
264 | template class QueryWorker<Sink::ApplicationDomain::Event>; | ||
diff --git a/common/queryrunner.h b/common/queryrunner.h index 66dc68f..f5c7ead 100644 --- a/common/queryrunner.h +++ b/common/queryrunner.h | |||
@@ -24,6 +24,7 @@ | |||
24 | #include "resultprovider.h" | 24 | #include "resultprovider.h" |
25 | #include "query.h" | 25 | #include "query.h" |
26 | #include "log.h" | 26 | #include "log.h" |
27 | #include "datastorequery.h" | ||
27 | 28 | ||
28 | /** | 29 | /** |
29 | * Base clase because you can't have the Q_OBJECT macro in template classes | 30 | * Base clase because you can't have the Q_OBJECT macro in template classes |
@@ -101,4 +102,7 @@ private: | |||
101 | int mBatchSize; | 102 | int mBatchSize; |
102 | QObject guard; | 103 | QObject guard; |
103 | Sink::Log::Context mLogCtx; | 104 | Sink::Log::Context mLogCtx; |
105 | DataStoreQuery::State::Ptr mQueryState; | ||
106 | bool mInitialQueryComplete = false; | ||
107 | bool mQueryInProgress = false; | ||
104 | }; | 108 | }; |
diff --git a/tests/querytest.cpp b/tests/querytest.cpp index 0641c0d..328448f 100644 --- a/tests/querytest.cpp +++ b/tests/querytest.cpp | |||
@@ -731,7 +731,7 @@ private slots: | |||
731 | Query query; | 731 | Query query; |
732 | query.setId("testLivequeryUnmatch"); | 732 | query.setId("testLivequeryUnmatch"); |
733 | query.filter<Mail::Folder>(folder1); | 733 | query.filter<Mail::Folder>(folder1); |
734 | query.reduce<Mail::ThreadId>(Query::Reduce::Selector::max<Mail::Date>()).count("count").collect<Mail::Sender>("senders"); | 734 | query.reduce<Mail::ThreadId>(Query::Reduce::Selector::max<Mail::Date>()).count("count").collect<Mail::Folder>("folders"); |
735 | query.sort<Mail::Date>(); | 735 | query.sort<Mail::Date>(); |
736 | query.setFlags(Query::LiveQuery); | 736 | query.setFlags(Query::LiveQuery); |
737 | query.request<Mail::Unread>(); | 737 | query.request<Mail::Unread>(); |
@@ -747,7 +747,61 @@ private slots: | |||
747 | VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); | 747 | VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); |
748 | QTRY_COMPARE(model->rowCount(), 1); | 748 | QTRY_COMPARE(model->rowCount(), 1); |
749 | auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value<Mail::Ptr>(); | 749 | auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value<Mail::Ptr>(); |
750 | QCOMPARE(mail->getUnread(), true); | 750 | QTRY_COMPARE(mail->getUnread(), true); |
751 | QCOMPARE(mail->getProperty("count").toInt(), 1); | ||
752 | QCOMPARE(mail->getProperty("folders").toList().size(), 1); | ||
753 | } | ||
754 | |||
755 | void testReductionUpdate() | ||
756 | { | ||
757 | // Setup | ||
758 | auto folder1 = Folder::createEntity<Folder>("sink.dummy.instance1"); | ||
759 | VERIFYEXEC(Sink::Store::create<Folder>(folder1)); | ||
760 | |||
761 | auto folder2 = Folder::createEntity<Folder>("sink.dummy.instance1"); | ||
762 | VERIFYEXEC(Sink::Store::create<Folder>(folder2)); | ||
763 | |||
764 | QDateTime now{QDate{2017, 2, 3}, QTime{10, 0, 0}}; | ||
765 | QDateTime later{QDate{2017, 2, 3}, QTime{11, 0, 0}}; | ||
766 | |||
767 | auto mail1 = Mail::createEntity<Mail>("sink.dummy.instance1"); | ||
768 | mail1.setExtractedMessageId("mail1"); | ||
769 | mail1.setFolder(folder1); | ||
770 | mail1.setUnread(false); | ||
771 | mail1.setExtractedDate(now); | ||
772 | VERIFYEXEC(Sink::Store::create(mail1)); | ||
773 | |||
774 | // Ensure all local data is processed | ||
775 | VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); | ||
776 | |||
777 | Query query; | ||
778 | query.setId("testLivequeryUnmatch"); | ||
779 | query.setFlags(Query::LiveQuery); | ||
780 | query.filter<Mail::Folder>(folder1); | ||
781 | query.reduce<Mail::Folder>(Query::Reduce::Selector::max<Mail::Date>()).count("count").collect<Mail::Folder>("folders"); | ||
782 | query.sort<Mail::Date>(); | ||
783 | query.request<Mail::Unread>(); | ||
784 | query.request<Mail::MessageId>(); | ||
785 | |||
786 | auto model = Sink::Store::loadModel<Mail>(query); | ||
787 | QTRY_COMPARE(model->rowCount(), 1); | ||
788 | |||
789 | //The leader should change to mail2 after the modification | ||
790 | { | ||
791 | auto mail2 = Mail::createEntity<Mail>("sink.dummy.instance1"); | ||
792 | mail2.setExtractedMessageId("mail2"); | ||
793 | mail2.setFolder(folder1); | ||
794 | mail2.setUnread(false); | ||
795 | mail2.setExtractedDate(later); | ||
796 | VERIFYEXEC(Sink::Store::create(mail2)); | ||
797 | } | ||
798 | |||
799 | VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); | ||
800 | QTRY_COMPARE(model->rowCount(), 1); | ||
801 | auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value<Mail::Ptr>(); | ||
802 | QTRY_COMPARE(mail->getMessageId(), QByteArray{"mail2"}); | ||
803 | QCOMPARE(mail->getProperty("count").toInt(), 2); | ||
804 | QCOMPARE(mail->getProperty("folders").toList().size(), 2); | ||
751 | } | 805 | } |
752 | 806 | ||
753 | void testBloom() | 807 | void testBloom() |