diff options
-rw-r--r-- | common/datastorequery.cpp | 163 | ||||
-rw-r--r-- | common/datastorequery.h | 18 | ||||
-rw-r--r-- | common/queryrunner.cpp | 119 | ||||
-rw-r--r-- | common/queryrunner.h | 4 | ||||
-rw-r--r-- | tests/querytest.cpp | 58 |
5 files changed, 244 insertions, 118 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 41d962c..3ba8f40 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp | |||
@@ -72,7 +72,7 @@ class Source : public FilterBase { | |||
72 | if (mIt == mIds.constEnd()) { | 72 | if (mIt == mIds.constEnd()) { |
73 | return false; | 73 | return false; |
74 | } | 74 | } |
75 | readEntity(*mIt, [callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | 75 | readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { |
76 | SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); | 76 | SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); |
77 | callback({entity, operation}); | 77 | callback({entity, operation}); |
78 | }); | 78 | }); |
@@ -115,7 +115,7 @@ public: | |||
115 | virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE { | 115 | virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE { |
116 | bool foundValue = false; | 116 | bool foundValue = false; |
117 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { | 117 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { |
118 | SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << result.operation; | 118 | SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << operationName(result.operation); |
119 | 119 | ||
120 | //Always accept removals. They can't match the filter since the data is gone. | 120 | //Always accept removals. They can't match the filter since the data is gone. |
121 | if (result.operation == Sink::Operation_Removal) { | 121 | if (result.operation == Sink::Operation_Removal) { |
@@ -167,17 +167,11 @@ public: | |||
167 | 167 | ||
168 | } | 168 | } |
169 | 169 | ||
170 | void process() { | ||
171 | if (operation == QueryBase::Reduce::Aggregator::Count) { | ||
172 | mResult = mResult.toInt() + 1; | ||
173 | } else { | ||
174 | Q_ASSERT(false); | ||
175 | } | ||
176 | } | ||
177 | |||
178 | void process(const QVariant &value) { | 170 | void process(const QVariant &value) { |
179 | if (operation == QueryBase::Reduce::Aggregator::Collect) { | 171 | if (operation == QueryBase::Reduce::Aggregator::Collect) { |
180 | mResult = mResult.toList() << value; | 172 | mResult = mResult.toList() << value; |
173 | } else if (operation == QueryBase::Reduce::Aggregator::Count) { | ||
174 | mResult = mResult.toInt() + 1; | ||
181 | } else { | 175 | } else { |
182 | Q_ASSERT(false); | 176 | Q_ASSERT(false); |
183 | } | 177 | } |
@@ -196,8 +190,8 @@ public: | |||
196 | QVariant mResult; | 190 | QVariant mResult; |
197 | }; | 191 | }; |
198 | 192 | ||
199 | QHash<QByteArray, QVariant> mAggregateValues; | ||
200 | QSet<QByteArray> mReducedValues; | 193 | QSet<QByteArray> mReducedValues; |
194 | QHash<QByteArray, QByteArray> mSelectedValues; | ||
201 | QByteArray mReductionProperty; | 195 | QByteArray mReductionProperty; |
202 | QByteArray mSelectionProperty; | 196 | QByteArray mSelectionProperty; |
203 | QueryBase::Reduce::Selector::Comparator mSelectionComparator; | 197 | QueryBase::Reduce::Selector::Comparator mSelectionComparator; |
@@ -231,51 +225,80 @@ public: | |||
231 | return false; | 225 | return false; |
232 | } | 226 | } |
233 | 227 | ||
228 | QByteArray reduceOnValue(const QVariant &reductionValue, QMap<QByteArray, QVariant> &aggregateValues) | ||
229 | { | ||
230 | QVariant selectionResultValue; | ||
231 | QByteArray selectionResult; | ||
232 | auto results = indexLookup(mReductionProperty, reductionValue); | ||
233 | for (auto &aggregator : mAggregators) { | ||
234 | aggregator.reset(); | ||
235 | } | ||
236 | |||
237 | for (const auto &r : results) { | ||
238 | readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | ||
239 | Q_ASSERT(operation != Sink::Operation_Removal); | ||
240 | for (auto &aggregator : mAggregators) { | ||
241 | if (!aggregator.property.isEmpty()) { | ||
242 | aggregator.process(entity.getProperty(aggregator.property)); | ||
243 | } else { | ||
244 | aggregator.process(QVariant{}); | ||
245 | } | ||
246 | } | ||
247 | auto selectionValue = entity.getProperty(mSelectionProperty); | ||
248 | if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) { | ||
249 | selectionResultValue = selectionValue; | ||
250 | selectionResult = entity.identifier(); | ||
251 | } | ||
252 | }); | ||
253 | } | ||
254 | |||
255 | for (auto &aggregator : mAggregators) { | ||
256 | aggregateValues.insert(aggregator.resultProperty, aggregator.result()); | ||
257 | } | ||
258 | return selectionResult; | ||
259 | } | ||
260 | |||
234 | bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { | 261 | bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { |
235 | bool foundValue = false; | 262 | bool foundValue = false; |
236 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { | 263 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { |
237 | auto reductionValue = result.entity.getProperty(mReductionProperty); | ||
238 | if (result.operation == Sink::Operation_Removal) { | 264 | if (result.operation == Sink::Operation_Removal) { |
239 | callback(result); | 265 | callback(result); |
240 | return false; | 266 | return false; |
241 | } | 267 | } |
242 | if (!mReducedValues.contains(getByteArray(reductionValue))) { | 268 | auto reductionValue = result.entity.getProperty(mReductionProperty); |
269 | const auto &reductionValueBa = getByteArray(reductionValue); | ||
270 | if (!mReducedValues.contains(reductionValueBa)) { | ||
243 | //Only reduce every value once. | 271 | //Only reduce every value once. |
244 | mReducedValues.insert(getByteArray(reductionValue)); | 272 | mReducedValues.insert(reductionValueBa); |
245 | QVariant selectionResultValue; | ||
246 | QByteArray selectionResult; | ||
247 | auto results = indexLookup(mReductionProperty, reductionValue); | ||
248 | for (auto &aggregator : mAggregators) { | ||
249 | aggregator.reset(); | ||
250 | } | ||
251 | |||
252 | QVariantList list; | ||
253 | for (const auto &r : results) { | ||
254 | readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | ||
255 | for (auto &aggregator : mAggregators) { | ||
256 | if (!aggregator.property.isEmpty()) { | ||
257 | aggregator.process(entity.getProperty(aggregator.property)); | ||
258 | } else { | ||
259 | aggregator.process(); | ||
260 | } | ||
261 | } | ||
262 | auto selectionValue = entity.getProperty(mSelectionProperty); | ||
263 | if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) { | ||
264 | selectionResultValue = selectionValue; | ||
265 | selectionResult = entity.identifier(); | ||
266 | } | ||
267 | }); | ||
268 | } | ||
269 | |||
270 | QMap<QByteArray, QVariant> aggregateValues; | 273 | QMap<QByteArray, QVariant> aggregateValues; |
271 | for (auto &aggregator : mAggregators) { | 274 | auto selectionResult = reduceOnValue(reductionValue, aggregateValues); |
272 | aggregateValues.insert(aggregator.resultProperty, aggregator.result()); | ||
273 | } | ||
274 | 275 | ||
276 | mSelectedValues.insert(reductionValueBa, selectionResult); | ||
275 | readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | 277 | readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { |
276 | callback({entity, operation, aggregateValues}); | 278 | callback({entity, operation, aggregateValues}); |
277 | foundValue = true; | 279 | foundValue = true; |
278 | }); | 280 | }); |
281 | } else { | ||
282 | //During initial query, do nothing. The lookup above will take care of it. | ||
283 | //During updates adjust the reduction according to the modification/addition or removal | ||
284 | if (mIncremental) { | ||
285 | //redo the reduction | ||
286 | QMap<QByteArray, QVariant> aggregateValues; | ||
287 | auto selectionResult = reduceOnValue(reductionValue, aggregateValues); | ||
288 | |||
289 | //TODO if old and new are the same a modification would be enough | ||
290 | auto oldSelectionResult = mSelectedValues.take(reductionValueBa); | ||
291 | //remove old result | ||
292 | readEntity(oldSelectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { | ||
293 | callback({entity, Sink::Operation_Removal}); | ||
294 | }); | ||
295 | |||
296 | //add new result | ||
297 | mSelectedValues.insert(reductionValueBa, selectionResult); | ||
298 | readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { | ||
299 | callback({entity, Sink::Operation_Creation, aggregateValues}); | ||
300 | }); | ||
301 | } | ||
279 | } | 302 | } |
280 | return false; | 303 | return false; |
281 | })) | 304 | })) |
@@ -330,9 +353,23 @@ public: | |||
330 | }; | 353 | }; |
331 | 354 | ||
332 | DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) | 355 | DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) |
333 | : mQuery(query), mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) | 356 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) |
357 | { | ||
358 | setupQuery(query); | ||
359 | } | ||
360 | |||
361 | DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store) | ||
362 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) | ||
334 | { | 363 | { |
335 | setupQuery(); | 364 | mCollector = state.mCollector; |
365 | mSource = state.mSource; | ||
366 | |||
367 | auto source = mCollector; | ||
368 | while (source) { | ||
369 | source->mDatastore = this; | ||
370 | source->mIncremental = true; | ||
371 | source = source->mSource; | ||
372 | } | ||
336 | } | 373 | } |
337 | 374 | ||
338 | DataStoreQuery::~DataStoreQuery() | 375 | DataStoreQuery::~DataStoreQuery() |
@@ -340,6 +377,14 @@ DataStoreQuery::~DataStoreQuery() | |||
340 | 377 | ||
341 | } | 378 | } |
342 | 379 | ||
380 | DataStoreQuery::State::Ptr DataStoreQuery::getState() | ||
381 | { | ||
382 | auto state = State::Ptr::create(); | ||
383 | state->mSource = mSource; | ||
384 | state->mCollector = mCollector; | ||
385 | return state; | ||
386 | } | ||
387 | |||
343 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) | 388 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) |
344 | { | 389 | { |
345 | mStore.readLatest(mType, key, resultCallback); | 390 | mStore.readLatest(mType, key, resultCallback); |
@@ -443,9 +488,10 @@ QByteArrayList DataStoreQuery::executeSubquery(const QueryBase &subquery) | |||
443 | return ids; | 488 | return ids; |
444 | } | 489 | } |
445 | 490 | ||
446 | void DataStoreQuery::setupQuery() | 491 | void DataStoreQuery::setupQuery(const Sink::QueryBase &query_) |
447 | { | 492 | { |
448 | auto baseFilters = mQuery.getBaseFilters(); | 493 | auto query = query_; |
494 | auto baseFilters = query.getBaseFilters(); | ||
449 | for (const auto &k : baseFilters.keys()) { | 495 | for (const auto &k : baseFilters.keys()) { |
450 | const auto comparator = baseFilters.value(k); | 496 | const auto comparator = baseFilters.value(k); |
451 | if (comparator.value.canConvert<Query>()) { | 497 | if (comparator.value.canConvert<Query>()) { |
@@ -454,44 +500,43 @@ void DataStoreQuery::setupQuery() | |||
454 | baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); | 500 | baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); |
455 | } | 501 | } |
456 | } | 502 | } |
457 | mQuery.setBaseFilters(baseFilters); | 503 | query.setBaseFilters(baseFilters); |
458 | 504 | ||
459 | FilterBase::Ptr baseSet; | 505 | FilterBase::Ptr baseSet; |
460 | QSet<QByteArray> remainingFilters = mQuery.getBaseFilters().keys().toSet(); | 506 | QSet<QByteArray> remainingFilters = query.getBaseFilters().keys().toSet(); |
461 | QByteArray appliedSorting; | 507 | QByteArray appliedSorting; |
462 | if (!mQuery.ids().isEmpty()) { | 508 | if (!query.ids().isEmpty()) { |
463 | mSource = Source::Ptr::create(mQuery.ids().toVector(), this); | 509 | mSource = Source::Ptr::create(query.ids().toVector(), this); |
464 | baseSet = mSource; | 510 | baseSet = mSource; |
465 | } else { | 511 | } else { |
466 | QSet<QByteArray> appliedFilters; | 512 | QSet<QByteArray> appliedFilters; |
467 | 513 | ||
468 | auto resultSet = mStore.indexLookup(mType, mQuery, appliedFilters, appliedSorting); | 514 | auto resultSet = mStore.indexLookup(mType, query, appliedFilters, appliedSorting); |
469 | remainingFilters = remainingFilters - appliedFilters; | 515 | remainingFilters = remainingFilters - appliedFilters; |
470 | 516 | ||
471 | // We do a full scan if there were no indexes available to create the initial set. | 517 | // We do a full scan if there were no indexes available to create the initial set. |
472 | if (appliedFilters.isEmpty()) { | 518 | if (appliedFilters.isEmpty()) { |
473 | // TODO this should be replaced by an index lookup on the uid index | ||
474 | mSource = Source::Ptr::create(mStore.fullScan(mType), this); | 519 | mSource = Source::Ptr::create(mStore.fullScan(mType), this); |
475 | } else { | 520 | } else { |
476 | mSource = Source::Ptr::create(resultSet, this); | 521 | mSource = Source::Ptr::create(resultSet, this); |
477 | } | 522 | } |
478 | baseSet = mSource; | 523 | baseSet = mSource; |
479 | } | 524 | } |
480 | if (!mQuery.getBaseFilters().isEmpty()) { | 525 | if (!query.getBaseFilters().isEmpty()) { |
481 | auto filter = Filter::Ptr::create(baseSet, this); | 526 | auto filter = Filter::Ptr::create(baseSet, this); |
482 | //For incremental queries the remaining filters are not sufficient | 527 | //For incremental queries the remaining filters are not sufficient |
483 | for (const auto &f : mQuery.getBaseFilters().keys()) { | 528 | for (const auto &f : query.getBaseFilters().keys()) { |
484 | filter->propertyFilter.insert(f, mQuery.getFilter(f)); | 529 | filter->propertyFilter.insert(f, query.getFilter(f)); |
485 | } | 530 | } |
486 | baseSet = filter; | 531 | baseSet = filter; |
487 | } | 532 | } |
488 | /* if (appliedSorting.isEmpty() && !mQuery.sortProperty.isEmpty()) { */ | 533 | /* if (appliedSorting.isEmpty() && !query.sortProperty.isEmpty()) { */ |
489 | /* //Apply manual sorting */ | 534 | /* //Apply manual sorting */ |
490 | /* baseSet = Sort::Ptr::create(baseSet, mQuery.sortProperty); */ | 535 | /* baseSet = Sort::Ptr::create(baseSet, query.sortProperty); */ |
491 | /* } */ | 536 | /* } */ |
492 | 537 | ||
493 | //Setup the rest of the filter stages on top of the base set | 538 | //Setup the rest of the filter stages on top of the base set |
494 | for (const auto &stage : mQuery.getFilterStages()) { | 539 | for (const auto &stage : query.getFilterStages()) { |
495 | if (auto filter = stage.dynamicCast<Query::Filter>()) { | 540 | if (auto filter = stage.dynamicCast<Query::Filter>()) { |
496 | auto f = Filter::Ptr::create(baseSet, this); | 541 | auto f = Filter::Ptr::create(baseSet, this); |
497 | f->propertyFilter = filter->propertyFilter; | 542 | f->propertyFilter = filter->propertyFilter; |
@@ -521,7 +566,7 @@ QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision | |||
521 | 566 | ||
522 | ResultSet DataStoreQuery::update(qint64 baseRevision) | 567 | ResultSet DataStoreQuery::update(qint64 baseRevision) |
523 | { | 568 | { |
524 | SinkTraceCtx(mLogCtx) << "Executing query update to revision " << baseRevision; | 569 | SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision; |
525 | auto incrementalResultSet = loadIncrementalResultSet(baseRevision); | 570 | auto incrementalResultSet = loadIncrementalResultSet(baseRevision); |
526 | SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; | 571 | SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; |
527 | mSource->add(incrementalResultSet); | 572 | mSource->add(incrementalResultSet); |
diff --git a/common/datastorequery.h b/common/datastorequery.h index 5a47685..a797782 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h | |||
@@ -26,6 +26,7 @@ | |||
26 | 26 | ||
27 | class Source; | 27 | class Source; |
28 | class Bloom; | 28 | class Bloom; |
29 | class Reduce; | ||
29 | class Filter; | 30 | class Filter; |
30 | class FilterBase; | 31 | class FilterBase; |
31 | 32 | ||
@@ -33,15 +34,25 @@ class DataStoreQuery { | |||
33 | friend class FilterBase; | 34 | friend class FilterBase; |
34 | friend class Source; | 35 | friend class Source; |
35 | friend class Bloom; | 36 | friend class Bloom; |
37 | friend class Reduce; | ||
36 | friend class Filter; | 38 | friend class Filter; |
37 | public: | 39 | public: |
38 | typedef QSharedPointer<DataStoreQuery> Ptr; | 40 | typedef QSharedPointer<DataStoreQuery> Ptr; |
39 | 41 | ||
42 | struct State { | ||
43 | typedef QSharedPointer<State> Ptr; | ||
44 | QSharedPointer<FilterBase> mCollector; | ||
45 | QSharedPointer<Source> mSource; | ||
46 | }; | ||
47 | |||
40 | DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store); | 48 | DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store); |
49 | DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store); | ||
41 | ~DataStoreQuery(); | 50 | ~DataStoreQuery(); |
42 | ResultSet execute(); | 51 | ResultSet execute(); |
43 | ResultSet update(qint64 baseRevision); | 52 | ResultSet update(qint64 baseRevision); |
44 | 53 | ||
54 | State::Ptr getState(); | ||
55 | |||
45 | private: | 56 | private: |
46 | 57 | ||
47 | typedef std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation)> FilterFunction; | 58 | typedef std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation)> FilterFunction; |
@@ -54,12 +65,10 @@ private: | |||
54 | ResultSet createFilteredSet(ResultSet &resultSet, const FilterFunction &); | 65 | ResultSet createFilteredSet(ResultSet &resultSet, const FilterFunction &); |
55 | QVector<QByteArray> loadIncrementalResultSet(qint64 baseRevision); | 66 | QVector<QByteArray> loadIncrementalResultSet(qint64 baseRevision); |
56 | 67 | ||
57 | void setupQuery(); | 68 | void setupQuery(const Sink::QueryBase &query_); |
58 | QByteArrayList executeSubquery(const Sink::QueryBase &subquery); | 69 | QByteArrayList executeSubquery(const Sink::QueryBase &subquery); |
59 | 70 | ||
60 | Sink::QueryBase mQuery; | ||
61 | const QByteArray mType; | 71 | const QByteArray mType; |
62 | bool mInitialQuery; | ||
63 | QSharedPointer<FilterBase> mCollector; | 72 | QSharedPointer<FilterBase> mCollector; |
64 | QSharedPointer<Source> mSource; | 73 | QSharedPointer<Source> mSource; |
65 | 74 | ||
@@ -102,7 +111,8 @@ public: | |||
102 | //Returns true for as long as a result is available | 111 | //Returns true for as long as a result is available |
103 | virtual bool next(const std::function<void(const ResultSet::Result &)> &callback) = 0; | 112 | virtual bool next(const std::function<void(const ResultSet::Result &)> &callback) = 0; |
104 | 113 | ||
105 | QSharedPointer<FilterBase> mSource; | 114 | FilterBase::Ptr mSource; |
106 | DataStoreQuery *mDatastore; | 115 | DataStoreQuery *mDatastore; |
116 | bool mIncremental = false; | ||
107 | }; | 117 | }; |
108 | 118 | ||
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 748320f..40880eb 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -33,6 +33,7 @@ struct ReplayResult { | |||
33 | qint64 newRevision; | 33 | qint64 newRevision; |
34 | qint64 replayedEntities; | 34 | qint64 replayedEntities; |
35 | bool replayedAll; | 35 | bool replayedAll; |
36 | DataStoreQuery::State::Ptr queryState; | ||
36 | }; | 37 | }; |
37 | 38 | ||
38 | /* | 39 | /* |
@@ -49,7 +50,7 @@ public: | |||
49 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); | 50 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); |
50 | virtual ~QueryWorker(); | 51 | virtual ~QueryWorker(); |
51 | 52 | ||
52 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 53 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); |
53 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); | 54 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); |
54 | 55 | ||
55 | private: | 56 | private: |
@@ -69,45 +70,41 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
69 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; | 70 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; |
70 | } | 71 | } |
71 | auto guardPtr = QPointer<QObject>(&guard); | 72 | auto guardPtr = QPointer<QObject>(&guard); |
72 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. | 73 | auto fetcher = [=](const typename DomainType::Ptr &parent) { |
73 | mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { | ||
74 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); | 74 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); |
75 | SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; | 75 | SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; |
76 | auto resultProvider = mResultProvider; | 76 | auto resultProvider = mResultProvider; |
77 | if (query.synchronousQuery()) { | 77 | auto resultTransformation = mResultTransformation; |
78 | QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation, mLogCtx); | 78 | auto offset = mOffset[parentId]; |
79 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); | 79 | auto batchSize = mBatchSize; |
80 | mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; | 80 | auto resourceContext = mResourceContext; |
81 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | 81 | auto logCtx = mLogCtx; |
82 | resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); | 82 | const bool runAsync = !query.synchronousQuery(); |
83 | } else { | 83 | //The lambda will be executed in a separate thread, so copy all arguments |
84 | auto resultTransformation = mResultTransformation; | 84 | async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { |
85 | auto offset = mOffset[parentId]; | 85 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); |
86 | auto batchSize = mBatchSize; | 86 | return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); |
87 | auto resourceContext = mResourceContext; | 87 | }, runAsync) |
88 | auto logCtx = mLogCtx; | 88 | .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { |
89 | //The lambda will be executed in a separate thread, so copy all arguments | 89 | if (!guardPtr) { |
90 | async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { | 90 | qWarning() << "The parent object is already gone"; |
91 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | 91 | return; |
92 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); | 92 | } |
93 | return newRevisionAndReplayedEntities; | 93 | mInitialQueryComplete = true; |
94 | mQueryState = result.queryState; | ||
95 | mOffset[parentId] += result.replayedEntities; | ||
96 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
97 | if (query.liveQuery()) { | ||
98 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); | ||
99 | } | ||
100 | resultProvider->setRevision(result.newRevision); | ||
101 | resultProvider->initialResultSetComplete(parent, result.replayedAll); | ||
94 | }) | 102 | }) |
95 | .template then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | 103 | .exec(); |
96 | if (!guardPtr) { | 104 | }; |
97 | qWarning() << "The parent object is already gone"; | 105 | |
98 | return; | 106 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. |
99 | } | 107 | mResultProvider->setFetcher(fetcher); |
100 | mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; | ||
101 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
102 | if (query.liveQuery()) { | ||
103 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | ||
104 | } | ||
105 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | ||
106 | resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); | ||
107 | }) | ||
108 | .exec(); | ||
109 | } | ||
110 | }); | ||
111 | 108 | ||
112 | // In case of a live query we keep the runner for as long alive as the result provider exists | 109 | // In case of a live query we keep the runner for as long alive as the result provider exists |
113 | if (query.liveQuery()) { | 110 | if (query.liveQuery()) { |
@@ -117,16 +114,26 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
117 | auto resultProvider = mResultProvider; | 114 | auto resultProvider = mResultProvider; |
118 | auto resourceContext = mResourceContext; | 115 | auto resourceContext = mResourceContext; |
119 | auto logCtx = mLogCtx; | 116 | auto logCtx = mLogCtx; |
120 | return async::run<ReplayResult>([=]() { | 117 | auto state = mQueryState; |
118 | if (!mInitialQueryComplete) { | ||
119 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | ||
120 | fetcher({}); | ||
121 | return KAsync::null(); | ||
122 | } | ||
123 | Q_ASSERT(!mQueryInProgress); | ||
124 | return KAsync::syncStart<void>([&] { | ||
125 | mQueryInProgress = true; | ||
126 | }) | ||
127 | .then(async::run<ReplayResult>([=]() { | ||
121 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx); | 128 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx); |
122 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); | 129 | return worker.executeIncrementalQuery(query, *resultProvider, state); |
123 | return newRevisionAndReplayedEntities; | 130 | })) |
124 | }) | 131 | .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { |
125 | .template then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | ||
126 | if (!guardPtr) { | 132 | if (!guardPtr) { |
127 | qWarning() << "The parent object is already gone"; | 133 | qWarning() << "The parent object is already gone"; |
128 | return; | 134 | return; |
129 | } | 135 | } |
136 | mQueryInProgress = false; | ||
130 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 137 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
131 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | 138 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); |
132 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | 139 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); |
@@ -201,14 +208,20 @@ void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, S | |||
201 | } | 208 | } |
202 | 209 | ||
203 | template <class DomainType> | 210 | template <class DomainType> |
204 | ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 211 | ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state) |
205 | { | 212 | { |
206 | QTime time; | 213 | QTime time; |
207 | time.start(); | 214 | time.start(); |
208 | 215 | ||
209 | const qint64 baseRevision = resultProvider.revision() + 1; | 216 | const qint64 baseRevision = resultProvider.revision() + 1; |
217 | SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision; | ||
218 | |||
210 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; | 219 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; |
211 | auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | 220 | if (!state) { |
221 | SinkWarningCtx(mLogCtx) << "No previous query state."; | ||
222 | return {0, 0, false, DataStoreQuery::State::Ptr{}}; | ||
223 | } | ||
224 | auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | ||
212 | auto resultSet = preparedQuery.update(baseRevision); | 225 | auto resultSet = preparedQuery.update(baseRevision); |
213 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 226 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
214 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { | 227 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { |
@@ -218,7 +231,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query | |||
218 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" | 231 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" |
219 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") | 232 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") |
220 | << "Incremental query took: " << Log::TraceTime(time.elapsed()); | 233 | << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
221 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; | 234 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; |
222 | } | 235 | } |
223 | 236 | ||
224 | template <class DomainType> | 237 | template <class DomainType> |
@@ -251,14 +264,14 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery( | |||
251 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" | 264 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" |
252 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") | 265 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") |
253 | << "Initial query took: " << Log::TraceTime(time.elapsed()); | 266 | << "Initial query took: " << Log::TraceTime(time.elapsed()); |
254 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; | 267 | |
268 | auto state = preparedQuery.getState(); | ||
269 | |||
270 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state}; | ||
255 | } | 271 | } |
256 | 272 | ||
257 | template class QueryRunner<Sink::ApplicationDomain::Contact>; | 273 | #define REGISTER_TYPE(T) \ |
258 | template class QueryRunner<Sink::ApplicationDomain::Folder>; | 274 | template class QueryRunner<T>; \ |
259 | template class QueryRunner<Sink::ApplicationDomain::Mail>; | 275 | template class QueryWorker<T>; \ |
260 | template class QueryRunner<Sink::ApplicationDomain::Event>; | 276 | |
261 | template class QueryWorker<Sink::ApplicationDomain::Contact>; | 277 | SINK_REGISTER_TYPES() |
262 | template class QueryWorker<Sink::ApplicationDomain::Folder>; | ||
263 | template class QueryWorker<Sink::ApplicationDomain::Mail>; | ||
264 | template class QueryWorker<Sink::ApplicationDomain::Event>; | ||
diff --git a/common/queryrunner.h b/common/queryrunner.h index 66dc68f..f5c7ead 100644 --- a/common/queryrunner.h +++ b/common/queryrunner.h | |||
@@ -24,6 +24,7 @@ | |||
24 | #include "resultprovider.h" | 24 | #include "resultprovider.h" |
25 | #include "query.h" | 25 | #include "query.h" |
26 | #include "log.h" | 26 | #include "log.h" |
27 | #include "datastorequery.h" | ||
27 | 28 | ||
28 | /** | 29 | /** |
29 | * Base clase because you can't have the Q_OBJECT macro in template classes | 30 | * Base clase because you can't have the Q_OBJECT macro in template classes |
@@ -101,4 +102,7 @@ private: | |||
101 | int mBatchSize; | 102 | int mBatchSize; |
102 | QObject guard; | 103 | QObject guard; |
103 | Sink::Log::Context mLogCtx; | 104 | Sink::Log::Context mLogCtx; |
105 | DataStoreQuery::State::Ptr mQueryState; | ||
106 | bool mInitialQueryComplete = false; | ||
107 | bool mQueryInProgress = false; | ||
104 | }; | 108 | }; |
diff --git a/tests/querytest.cpp b/tests/querytest.cpp index 0641c0d..328448f 100644 --- a/tests/querytest.cpp +++ b/tests/querytest.cpp | |||
@@ -731,7 +731,7 @@ private slots: | |||
731 | Query query; | 731 | Query query; |
732 | query.setId("testLivequeryUnmatch"); | 732 | query.setId("testLivequeryUnmatch"); |
733 | query.filter<Mail::Folder>(folder1); | 733 | query.filter<Mail::Folder>(folder1); |
734 | query.reduce<Mail::ThreadId>(Query::Reduce::Selector::max<Mail::Date>()).count("count").collect<Mail::Sender>("senders"); | 734 | query.reduce<Mail::ThreadId>(Query::Reduce::Selector::max<Mail::Date>()).count("count").collect<Mail::Folder>("folders"); |
735 | query.sort<Mail::Date>(); | 735 | query.sort<Mail::Date>(); |
736 | query.setFlags(Query::LiveQuery); | 736 | query.setFlags(Query::LiveQuery); |
737 | query.request<Mail::Unread>(); | 737 | query.request<Mail::Unread>(); |
@@ -747,7 +747,61 @@ private slots: | |||
747 | VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); | 747 | VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); |
748 | QTRY_COMPARE(model->rowCount(), 1); | 748 | QTRY_COMPARE(model->rowCount(), 1); |
749 | auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value<Mail::Ptr>(); | 749 | auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value<Mail::Ptr>(); |
750 | QCOMPARE(mail->getUnread(), true); | 750 | QTRY_COMPARE(mail->getUnread(), true); |
751 | QCOMPARE(mail->getProperty("count").toInt(), 1); | ||
752 | QCOMPARE(mail->getProperty("folders").toList().size(), 1); | ||
753 | } | ||
754 | |||
755 | void testReductionUpdate() | ||
756 | { | ||
757 | // Setup | ||
758 | auto folder1 = Folder::createEntity<Folder>("sink.dummy.instance1"); | ||
759 | VERIFYEXEC(Sink::Store::create<Folder>(folder1)); | ||
760 | |||
761 | auto folder2 = Folder::createEntity<Folder>("sink.dummy.instance1"); | ||
762 | VERIFYEXEC(Sink::Store::create<Folder>(folder2)); | ||
763 | |||
764 | QDateTime now{QDate{2017, 2, 3}, QTime{10, 0, 0}}; | ||
765 | QDateTime later{QDate{2017, 2, 3}, QTime{11, 0, 0}}; | ||
766 | |||
767 | auto mail1 = Mail::createEntity<Mail>("sink.dummy.instance1"); | ||
768 | mail1.setExtractedMessageId("mail1"); | ||
769 | mail1.setFolder(folder1); | ||
770 | mail1.setUnread(false); | ||
771 | mail1.setExtractedDate(now); | ||
772 | VERIFYEXEC(Sink::Store::create(mail1)); | ||
773 | |||
774 | // Ensure all local data is processed | ||
775 | VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); | ||
776 | |||
777 | Query query; | ||
778 | query.setId("testLivequeryUnmatch"); | ||
779 | query.setFlags(Query::LiveQuery); | ||
780 | query.filter<Mail::Folder>(folder1); | ||
781 | query.reduce<Mail::Folder>(Query::Reduce::Selector::max<Mail::Date>()).count("count").collect<Mail::Folder>("folders"); | ||
782 | query.sort<Mail::Date>(); | ||
783 | query.request<Mail::Unread>(); | ||
784 | query.request<Mail::MessageId>(); | ||
785 | |||
786 | auto model = Sink::Store::loadModel<Mail>(query); | ||
787 | QTRY_COMPARE(model->rowCount(), 1); | ||
788 | |||
789 | //The leader should change to mail2 after the modification | ||
790 | { | ||
791 | auto mail2 = Mail::createEntity<Mail>("sink.dummy.instance1"); | ||
792 | mail2.setExtractedMessageId("mail2"); | ||
793 | mail2.setFolder(folder1); | ||
794 | mail2.setUnread(false); | ||
795 | mail2.setExtractedDate(later); | ||
796 | VERIFYEXEC(Sink::Store::create(mail2)); | ||
797 | } | ||
798 | |||
799 | VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); | ||
800 | QTRY_COMPARE(model->rowCount(), 1); | ||
801 | auto mail = model->data(model->index(0, 0, QModelIndex{}), Sink::Store::DomainObjectRole).value<Mail::Ptr>(); | ||
802 | QTRY_COMPARE(mail->getMessageId(), QByteArray{"mail2"}); | ||
803 | QCOMPARE(mail->getProperty("count").toInt(), 2); | ||
804 | QCOMPARE(mail->getProperty("folders").toList().size(), 2); | ||
751 | } | 805 | } |
752 | 806 | ||
753 | void testBloom() | 807 | void testBloom() |