diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-02-11 12:02:58 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-02-13 19:42:39 +0100 |
commit | 1259b236704e790fa1284a63ec537525bce23841 (patch) | |
tree | 85cd0491e56d2f604cc8aa291a49d20f8f73c684 /common/datastorequery.cpp | |
parent | b4bd3932aa2a8e841ed204b341bcbf65ba59c5b2 (diff) | |
download | sink-1259b236704e790fa1284a63ec537525bce23841.tar.gz sink-1259b236704e790fa1284a63ec537525bce23841.zip |
Fixed reduction updates with stateful query.
Some filters need to maintain state between runs in order to be able to
emit only what has changed. This now also make reduction work for live
queries.
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r-- | common/datastorequery.cpp | 163 |
1 files changed, 104 insertions, 59 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 41d962c..3ba8f40 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp | |||
@@ -72,7 +72,7 @@ class Source : public FilterBase { | |||
72 | if (mIt == mIds.constEnd()) { | 72 | if (mIt == mIds.constEnd()) { |
73 | return false; | 73 | return false; |
74 | } | 74 | } |
75 | readEntity(*mIt, [callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | 75 | readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { |
76 | SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); | 76 | SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); |
77 | callback({entity, operation}); | 77 | callback({entity, operation}); |
78 | }); | 78 | }); |
@@ -115,7 +115,7 @@ public: | |||
115 | virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE { | 115 | virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE { |
116 | bool foundValue = false; | 116 | bool foundValue = false; |
117 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { | 117 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { |
118 | SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << result.operation; | 118 | SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << operationName(result.operation); |
119 | 119 | ||
120 | //Always accept removals. They can't match the filter since the data is gone. | 120 | //Always accept removals. They can't match the filter since the data is gone. |
121 | if (result.operation == Sink::Operation_Removal) { | 121 | if (result.operation == Sink::Operation_Removal) { |
@@ -167,17 +167,11 @@ public: | |||
167 | 167 | ||
168 | } | 168 | } |
169 | 169 | ||
170 | void process() { | ||
171 | if (operation == QueryBase::Reduce::Aggregator::Count) { | ||
172 | mResult = mResult.toInt() + 1; | ||
173 | } else { | ||
174 | Q_ASSERT(false); | ||
175 | } | ||
176 | } | ||
177 | |||
178 | void process(const QVariant &value) { | 170 | void process(const QVariant &value) { |
179 | if (operation == QueryBase::Reduce::Aggregator::Collect) { | 171 | if (operation == QueryBase::Reduce::Aggregator::Collect) { |
180 | mResult = mResult.toList() << value; | 172 | mResult = mResult.toList() << value; |
173 | } else if (operation == QueryBase::Reduce::Aggregator::Count) { | ||
174 | mResult = mResult.toInt() + 1; | ||
181 | } else { | 175 | } else { |
182 | Q_ASSERT(false); | 176 | Q_ASSERT(false); |
183 | } | 177 | } |
@@ -196,8 +190,8 @@ public: | |||
196 | QVariant mResult; | 190 | QVariant mResult; |
197 | }; | 191 | }; |
198 | 192 | ||
199 | QHash<QByteArray, QVariant> mAggregateValues; | ||
200 | QSet<QByteArray> mReducedValues; | 193 | QSet<QByteArray> mReducedValues; |
194 | QHash<QByteArray, QByteArray> mSelectedValues; | ||
201 | QByteArray mReductionProperty; | 195 | QByteArray mReductionProperty; |
202 | QByteArray mSelectionProperty; | 196 | QByteArray mSelectionProperty; |
203 | QueryBase::Reduce::Selector::Comparator mSelectionComparator; | 197 | QueryBase::Reduce::Selector::Comparator mSelectionComparator; |
@@ -231,51 +225,80 @@ public: | |||
231 | return false; | 225 | return false; |
232 | } | 226 | } |
233 | 227 | ||
228 | QByteArray reduceOnValue(const QVariant &reductionValue, QMap<QByteArray, QVariant> &aggregateValues) | ||
229 | { | ||
230 | QVariant selectionResultValue; | ||
231 | QByteArray selectionResult; | ||
232 | auto results = indexLookup(mReductionProperty, reductionValue); | ||
233 | for (auto &aggregator : mAggregators) { | ||
234 | aggregator.reset(); | ||
235 | } | ||
236 | |||
237 | for (const auto &r : results) { | ||
238 | readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | ||
239 | Q_ASSERT(operation != Sink::Operation_Removal); | ||
240 | for (auto &aggregator : mAggregators) { | ||
241 | if (!aggregator.property.isEmpty()) { | ||
242 | aggregator.process(entity.getProperty(aggregator.property)); | ||
243 | } else { | ||
244 | aggregator.process(QVariant{}); | ||
245 | } | ||
246 | } | ||
247 | auto selectionValue = entity.getProperty(mSelectionProperty); | ||
248 | if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) { | ||
249 | selectionResultValue = selectionValue; | ||
250 | selectionResult = entity.identifier(); | ||
251 | } | ||
252 | }); | ||
253 | } | ||
254 | |||
255 | for (auto &aggregator : mAggregators) { | ||
256 | aggregateValues.insert(aggregator.resultProperty, aggregator.result()); | ||
257 | } | ||
258 | return selectionResult; | ||
259 | } | ||
260 | |||
234 | bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { | 261 | bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { |
235 | bool foundValue = false; | 262 | bool foundValue = false; |
236 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { | 263 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { |
237 | auto reductionValue = result.entity.getProperty(mReductionProperty); | ||
238 | if (result.operation == Sink::Operation_Removal) { | 264 | if (result.operation == Sink::Operation_Removal) { |
239 | callback(result); | 265 | callback(result); |
240 | return false; | 266 | return false; |
241 | } | 267 | } |
242 | if (!mReducedValues.contains(getByteArray(reductionValue))) { | 268 | auto reductionValue = result.entity.getProperty(mReductionProperty); |
269 | const auto &reductionValueBa = getByteArray(reductionValue); | ||
270 | if (!mReducedValues.contains(reductionValueBa)) { | ||
243 | //Only reduce every value once. | 271 | //Only reduce every value once. |
244 | mReducedValues.insert(getByteArray(reductionValue)); | 272 | mReducedValues.insert(reductionValueBa); |
245 | QVariant selectionResultValue; | ||
246 | QByteArray selectionResult; | ||
247 | auto results = indexLookup(mReductionProperty, reductionValue); | ||
248 | for (auto &aggregator : mAggregators) { | ||
249 | aggregator.reset(); | ||
250 | } | ||
251 | |||
252 | QVariantList list; | ||
253 | for (const auto &r : results) { | ||
254 | readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | ||
255 | for (auto &aggregator : mAggregators) { | ||
256 | if (!aggregator.property.isEmpty()) { | ||
257 | aggregator.process(entity.getProperty(aggregator.property)); | ||
258 | } else { | ||
259 | aggregator.process(); | ||
260 | } | ||
261 | } | ||
262 | auto selectionValue = entity.getProperty(mSelectionProperty); | ||
263 | if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) { | ||
264 | selectionResultValue = selectionValue; | ||
265 | selectionResult = entity.identifier(); | ||
266 | } | ||
267 | }); | ||
268 | } | ||
269 | |||
270 | QMap<QByteArray, QVariant> aggregateValues; | 273 | QMap<QByteArray, QVariant> aggregateValues; |
271 | for (auto &aggregator : mAggregators) { | 274 | auto selectionResult = reduceOnValue(reductionValue, aggregateValues); |
272 | aggregateValues.insert(aggregator.resultProperty, aggregator.result()); | ||
273 | } | ||
274 | 275 | ||
276 | mSelectedValues.insert(reductionValueBa, selectionResult); | ||
275 | readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | 277 | readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { |
276 | callback({entity, operation, aggregateValues}); | 278 | callback({entity, operation, aggregateValues}); |
277 | foundValue = true; | 279 | foundValue = true; |
278 | }); | 280 | }); |
281 | } else { | ||
282 | //During initial query, do nothing. The lookup above will take care of it. | ||
283 | //During updates adjust the reduction according to the modification/addition or removal | ||
284 | if (mIncremental) { | ||
285 | //redo the reduction | ||
286 | QMap<QByteArray, QVariant> aggregateValues; | ||
287 | auto selectionResult = reduceOnValue(reductionValue, aggregateValues); | ||
288 | |||
289 | //TODO if old and new are the same a modification would be enough | ||
290 | auto oldSelectionResult = mSelectedValues.take(reductionValueBa); | ||
291 | //remove old result | ||
292 | readEntity(oldSelectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { | ||
293 | callback({entity, Sink::Operation_Removal}); | ||
294 | }); | ||
295 | |||
296 | //add new result | ||
297 | mSelectedValues.insert(reductionValueBa, selectionResult); | ||
298 | readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { | ||
299 | callback({entity, Sink::Operation_Creation, aggregateValues}); | ||
300 | }); | ||
301 | } | ||
279 | } | 302 | } |
280 | return false; | 303 | return false; |
281 | })) | 304 | })) |
@@ -330,9 +353,23 @@ public: | |||
330 | }; | 353 | }; |
331 | 354 | ||
332 | DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) | 355 | DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) |
333 | : mQuery(query), mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) | 356 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) |
357 | { | ||
358 | setupQuery(query); | ||
359 | } | ||
360 | |||
361 | DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store) | ||
362 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) | ||
334 | { | 363 | { |
335 | setupQuery(); | 364 | mCollector = state.mCollector; |
365 | mSource = state.mSource; | ||
366 | |||
367 | auto source = mCollector; | ||
368 | while (source) { | ||
369 | source->mDatastore = this; | ||
370 | source->mIncremental = true; | ||
371 | source = source->mSource; | ||
372 | } | ||
336 | } | 373 | } |
337 | 374 | ||
338 | DataStoreQuery::~DataStoreQuery() | 375 | DataStoreQuery::~DataStoreQuery() |
@@ -340,6 +377,14 @@ DataStoreQuery::~DataStoreQuery() | |||
340 | 377 | ||
341 | } | 378 | } |
342 | 379 | ||
380 | DataStoreQuery::State::Ptr DataStoreQuery::getState() | ||
381 | { | ||
382 | auto state = State::Ptr::create(); | ||
383 | state->mSource = mSource; | ||
384 | state->mCollector = mCollector; | ||
385 | return state; | ||
386 | } | ||
387 | |||
343 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) | 388 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) |
344 | { | 389 | { |
345 | mStore.readLatest(mType, key, resultCallback); | 390 | mStore.readLatest(mType, key, resultCallback); |
@@ -443,9 +488,10 @@ QByteArrayList DataStoreQuery::executeSubquery(const QueryBase &subquery) | |||
443 | return ids; | 488 | return ids; |
444 | } | 489 | } |
445 | 490 | ||
446 | void DataStoreQuery::setupQuery() | 491 | void DataStoreQuery::setupQuery(const Sink::QueryBase &query_) |
447 | { | 492 | { |
448 | auto baseFilters = mQuery.getBaseFilters(); | 493 | auto query = query_; |
494 | auto baseFilters = query.getBaseFilters(); | ||
449 | for (const auto &k : baseFilters.keys()) { | 495 | for (const auto &k : baseFilters.keys()) { |
450 | const auto comparator = baseFilters.value(k); | 496 | const auto comparator = baseFilters.value(k); |
451 | if (comparator.value.canConvert<Query>()) { | 497 | if (comparator.value.canConvert<Query>()) { |
@@ -454,44 +500,43 @@ void DataStoreQuery::setupQuery() | |||
454 | baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); | 500 | baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); |
455 | } | 501 | } |
456 | } | 502 | } |
457 | mQuery.setBaseFilters(baseFilters); | 503 | query.setBaseFilters(baseFilters); |
458 | 504 | ||
459 | FilterBase::Ptr baseSet; | 505 | FilterBase::Ptr baseSet; |
460 | QSet<QByteArray> remainingFilters = mQuery.getBaseFilters().keys().toSet(); | 506 | QSet<QByteArray> remainingFilters = query.getBaseFilters().keys().toSet(); |
461 | QByteArray appliedSorting; | 507 | QByteArray appliedSorting; |
462 | if (!mQuery.ids().isEmpty()) { | 508 | if (!query.ids().isEmpty()) { |
463 | mSource = Source::Ptr::create(mQuery.ids().toVector(), this); | 509 | mSource = Source::Ptr::create(query.ids().toVector(), this); |
464 | baseSet = mSource; | 510 | baseSet = mSource; |
465 | } else { | 511 | } else { |
466 | QSet<QByteArray> appliedFilters; | 512 | QSet<QByteArray> appliedFilters; |
467 | 513 | ||
468 | auto resultSet = mStore.indexLookup(mType, mQuery, appliedFilters, appliedSorting); | 514 | auto resultSet = mStore.indexLookup(mType, query, appliedFilters, appliedSorting); |
469 | remainingFilters = remainingFilters - appliedFilters; | 515 | remainingFilters = remainingFilters - appliedFilters; |
470 | 516 | ||
471 | // We do a full scan if there were no indexes available to create the initial set. | 517 | // We do a full scan if there were no indexes available to create the initial set. |
472 | if (appliedFilters.isEmpty()) { | 518 | if (appliedFilters.isEmpty()) { |
473 | // TODO this should be replaced by an index lookup on the uid index | ||
474 | mSource = Source::Ptr::create(mStore.fullScan(mType), this); | 519 | mSource = Source::Ptr::create(mStore.fullScan(mType), this); |
475 | } else { | 520 | } else { |
476 | mSource = Source::Ptr::create(resultSet, this); | 521 | mSource = Source::Ptr::create(resultSet, this); |
477 | } | 522 | } |
478 | baseSet = mSource; | 523 | baseSet = mSource; |
479 | } | 524 | } |
480 | if (!mQuery.getBaseFilters().isEmpty()) { | 525 | if (!query.getBaseFilters().isEmpty()) { |
481 | auto filter = Filter::Ptr::create(baseSet, this); | 526 | auto filter = Filter::Ptr::create(baseSet, this); |
482 | //For incremental queries the remaining filters are not sufficient | 527 | //For incremental queries the remaining filters are not sufficient |
483 | for (const auto &f : mQuery.getBaseFilters().keys()) { | 528 | for (const auto &f : query.getBaseFilters().keys()) { |
484 | filter->propertyFilter.insert(f, mQuery.getFilter(f)); | 529 | filter->propertyFilter.insert(f, query.getFilter(f)); |
485 | } | 530 | } |
486 | baseSet = filter; | 531 | baseSet = filter; |
487 | } | 532 | } |
488 | /* if (appliedSorting.isEmpty() && !mQuery.sortProperty.isEmpty()) { */ | 533 | /* if (appliedSorting.isEmpty() && !query.sortProperty.isEmpty()) { */ |
489 | /* //Apply manual sorting */ | 534 | /* //Apply manual sorting */ |
490 | /* baseSet = Sort::Ptr::create(baseSet, mQuery.sortProperty); */ | 535 | /* baseSet = Sort::Ptr::create(baseSet, query.sortProperty); */ |
491 | /* } */ | 536 | /* } */ |
492 | 537 | ||
493 | //Setup the rest of the filter stages on top of the base set | 538 | //Setup the rest of the filter stages on top of the base set |
494 | for (const auto &stage : mQuery.getFilterStages()) { | 539 | for (const auto &stage : query.getFilterStages()) { |
495 | if (auto filter = stage.dynamicCast<Query::Filter>()) { | 540 | if (auto filter = stage.dynamicCast<Query::Filter>()) { |
496 | auto f = Filter::Ptr::create(baseSet, this); | 541 | auto f = Filter::Ptr::create(baseSet, this); |
497 | f->propertyFilter = filter->propertyFilter; | 542 | f->propertyFilter = filter->propertyFilter; |
@@ -521,7 +566,7 @@ QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision | |||
521 | 566 | ||
522 | ResultSet DataStoreQuery::update(qint64 baseRevision) | 567 | ResultSet DataStoreQuery::update(qint64 baseRevision) |
523 | { | 568 | { |
524 | SinkTraceCtx(mLogCtx) << "Executing query update to revision " << baseRevision; | 569 | SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision; |
525 | auto incrementalResultSet = loadIncrementalResultSet(baseRevision); | 570 | auto incrementalResultSet = loadIncrementalResultSet(baseRevision); |
526 | SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; | 571 | SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; |
527 | mSource->add(incrementalResultSet); | 572 | mSource->add(incrementalResultSet); |