summaryrefslogtreecommitdiffstats
path: root/common/datastorequery.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-02-11 12:02:58 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-02-13 19:42:39 +0100
commit1259b236704e790fa1284a63ec537525bce23841 (patch)
tree85cd0491e56d2f604cc8aa291a49d20f8f73c684 /common/datastorequery.cpp
parentb4bd3932aa2a8e841ed204b341bcbf65ba59c5b2 (diff)
downloadsink-1259b236704e790fa1284a63ec537525bce23841.tar.gz
sink-1259b236704e790fa1284a63ec537525bce23841.zip
Fixed reduction updates with stateful query.
Some filters need to maintain state between runs in order to be able to emit only what has changed. This now also make reduction work for live queries.
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r--common/datastorequery.cpp163
1 files changed, 104 insertions, 59 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 41d962c..3ba8f40 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -72,7 +72,7 @@ class Source : public FilterBase {
72 if (mIt == mIds.constEnd()) { 72 if (mIt == mIds.constEnd()) {
73 return false; 73 return false;
74 } 74 }
75 readEntity(*mIt, [callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { 75 readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
76 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); 76 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation);
77 callback({entity, operation}); 77 callback({entity, operation});
78 }); 78 });
@@ -115,7 +115,7 @@ public:
115 virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE { 115 virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE {
116 bool foundValue = false; 116 bool foundValue = false;
117 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { 117 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
118 SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << result.operation; 118 SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << operationName(result.operation);
119 119
120 //Always accept removals. They can't match the filter since the data is gone. 120 //Always accept removals. They can't match the filter since the data is gone.
121 if (result.operation == Sink::Operation_Removal) { 121 if (result.operation == Sink::Operation_Removal) {
@@ -167,17 +167,11 @@ public:
167 167
168 } 168 }
169 169
170 void process() {
171 if (operation == QueryBase::Reduce::Aggregator::Count) {
172 mResult = mResult.toInt() + 1;
173 } else {
174 Q_ASSERT(false);
175 }
176 }
177
178 void process(const QVariant &value) { 170 void process(const QVariant &value) {
179 if (operation == QueryBase::Reduce::Aggregator::Collect) { 171 if (operation == QueryBase::Reduce::Aggregator::Collect) {
180 mResult = mResult.toList() << value; 172 mResult = mResult.toList() << value;
173 } else if (operation == QueryBase::Reduce::Aggregator::Count) {
174 mResult = mResult.toInt() + 1;
181 } else { 175 } else {
182 Q_ASSERT(false); 176 Q_ASSERT(false);
183 } 177 }
@@ -196,8 +190,8 @@ public:
196 QVariant mResult; 190 QVariant mResult;
197 }; 191 };
198 192
199 QHash<QByteArray, QVariant> mAggregateValues;
200 QSet<QByteArray> mReducedValues; 193 QSet<QByteArray> mReducedValues;
194 QHash<QByteArray, QByteArray> mSelectedValues;
201 QByteArray mReductionProperty; 195 QByteArray mReductionProperty;
202 QByteArray mSelectionProperty; 196 QByteArray mSelectionProperty;
203 QueryBase::Reduce::Selector::Comparator mSelectionComparator; 197 QueryBase::Reduce::Selector::Comparator mSelectionComparator;
@@ -231,51 +225,80 @@ public:
231 return false; 225 return false;
232 } 226 }
233 227
228 QByteArray reduceOnValue(const QVariant &reductionValue, QMap<QByteArray, QVariant> &aggregateValues)
229 {
230 QVariant selectionResultValue;
231 QByteArray selectionResult;
232 auto results = indexLookup(mReductionProperty, reductionValue);
233 for (auto &aggregator : mAggregators) {
234 aggregator.reset();
235 }
236
237 for (const auto &r : results) {
238 readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
239 Q_ASSERT(operation != Sink::Operation_Removal);
240 for (auto &aggregator : mAggregators) {
241 if (!aggregator.property.isEmpty()) {
242 aggregator.process(entity.getProperty(aggregator.property));
243 } else {
244 aggregator.process(QVariant{});
245 }
246 }
247 auto selectionValue = entity.getProperty(mSelectionProperty);
248 if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) {
249 selectionResultValue = selectionValue;
250 selectionResult = entity.identifier();
251 }
252 });
253 }
254
255 for (auto &aggregator : mAggregators) {
256 aggregateValues.insert(aggregator.resultProperty, aggregator.result());
257 }
258 return selectionResult;
259 }
260
234 bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { 261 bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE {
235 bool foundValue = false; 262 bool foundValue = false;
236 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { 263 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
237 auto reductionValue = result.entity.getProperty(mReductionProperty);
238 if (result.operation == Sink::Operation_Removal) { 264 if (result.operation == Sink::Operation_Removal) {
239 callback(result); 265 callback(result);
240 return false; 266 return false;
241 } 267 }
242 if (!mReducedValues.contains(getByteArray(reductionValue))) { 268 auto reductionValue = result.entity.getProperty(mReductionProperty);
269 const auto &reductionValueBa = getByteArray(reductionValue);
270 if (!mReducedValues.contains(reductionValueBa)) {
243 //Only reduce every value once. 271 //Only reduce every value once.
244 mReducedValues.insert(getByteArray(reductionValue)); 272 mReducedValues.insert(reductionValueBa);
245 QVariant selectionResultValue;
246 QByteArray selectionResult;
247 auto results = indexLookup(mReductionProperty, reductionValue);
248 for (auto &aggregator : mAggregators) {
249 aggregator.reset();
250 }
251
252 QVariantList list;
253 for (const auto &r : results) {
254 readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
255 for (auto &aggregator : mAggregators) {
256 if (!aggregator.property.isEmpty()) {
257 aggregator.process(entity.getProperty(aggregator.property));
258 } else {
259 aggregator.process();
260 }
261 }
262 auto selectionValue = entity.getProperty(mSelectionProperty);
263 if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) {
264 selectionResultValue = selectionValue;
265 selectionResult = entity.identifier();
266 }
267 });
268 }
269
270 QMap<QByteArray, QVariant> aggregateValues; 273 QMap<QByteArray, QVariant> aggregateValues;
271 for (auto &aggregator : mAggregators) { 274 auto selectionResult = reduceOnValue(reductionValue, aggregateValues);
272 aggregateValues.insert(aggregator.resultProperty, aggregator.result());
273 }
274 275
276 mSelectedValues.insert(reductionValueBa, selectionResult);
275 readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { 277 readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
276 callback({entity, operation, aggregateValues}); 278 callback({entity, operation, aggregateValues});
277 foundValue = true; 279 foundValue = true;
278 }); 280 });
281 } else {
282 //During initial query, do nothing. The lookup above will take care of it.
283 //During updates adjust the reduction according to the modification/addition or removal
284 if (mIncremental) {
285 //redo the reduction
286 QMap<QByteArray, QVariant> aggregateValues;
287 auto selectionResult = reduceOnValue(reductionValue, aggregateValues);
288
289 //TODO if old and new are the same a modification would be enough
290 auto oldSelectionResult = mSelectedValues.take(reductionValueBa);
291 //remove old result
292 readEntity(oldSelectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
293 callback({entity, Sink::Operation_Removal});
294 });
295
296 //add new result
297 mSelectedValues.insert(reductionValueBa, selectionResult);
298 readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
299 callback({entity, Sink::Operation_Creation, aggregateValues});
300 });
301 }
279 } 302 }
280 return false; 303 return false;
281 })) 304 }))
@@ -330,9 +353,23 @@ public:
330}; 353};
331 354
332DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) 355DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store)
333 : mQuery(query), mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) 356 : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery"))
357{
358 setupQuery(query);
359}
360
361DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store)
362 : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery"))
334{ 363{
335 setupQuery(); 364 mCollector = state.mCollector;
365 mSource = state.mSource;
366
367 auto source = mCollector;
368 while (source) {
369 source->mDatastore = this;
370 source->mIncremental = true;
371 source = source->mSource;
372 }
336} 373}
337 374
338DataStoreQuery::~DataStoreQuery() 375DataStoreQuery::~DataStoreQuery()
@@ -340,6 +377,14 @@ DataStoreQuery::~DataStoreQuery()
340 377
341} 378}
342 379
380DataStoreQuery::State::Ptr DataStoreQuery::getState()
381{
382 auto state = State::Ptr::create();
383 state->mSource = mSource;
384 state->mCollector = mCollector;
385 return state;
386}
387
343void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) 388void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback)
344{ 389{
345 mStore.readLatest(mType, key, resultCallback); 390 mStore.readLatest(mType, key, resultCallback);
@@ -443,9 +488,10 @@ QByteArrayList DataStoreQuery::executeSubquery(const QueryBase &subquery)
443 return ids; 488 return ids;
444} 489}
445 490
446void DataStoreQuery::setupQuery() 491void DataStoreQuery::setupQuery(const Sink::QueryBase &query_)
447{ 492{
448 auto baseFilters = mQuery.getBaseFilters(); 493 auto query = query_;
494 auto baseFilters = query.getBaseFilters();
449 for (const auto &k : baseFilters.keys()) { 495 for (const auto &k : baseFilters.keys()) {
450 const auto comparator = baseFilters.value(k); 496 const auto comparator = baseFilters.value(k);
451 if (comparator.value.canConvert<Query>()) { 497 if (comparator.value.canConvert<Query>()) {
@@ -454,44 +500,43 @@ void DataStoreQuery::setupQuery()
454 baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); 500 baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In));
455 } 501 }
456 } 502 }
457 mQuery.setBaseFilters(baseFilters); 503 query.setBaseFilters(baseFilters);
458 504
459 FilterBase::Ptr baseSet; 505 FilterBase::Ptr baseSet;
460 QSet<QByteArray> remainingFilters = mQuery.getBaseFilters().keys().toSet(); 506 QSet<QByteArray> remainingFilters = query.getBaseFilters().keys().toSet();
461 QByteArray appliedSorting; 507 QByteArray appliedSorting;
462 if (!mQuery.ids().isEmpty()) { 508 if (!query.ids().isEmpty()) {
463 mSource = Source::Ptr::create(mQuery.ids().toVector(), this); 509 mSource = Source::Ptr::create(query.ids().toVector(), this);
464 baseSet = mSource; 510 baseSet = mSource;
465 } else { 511 } else {
466 QSet<QByteArray> appliedFilters; 512 QSet<QByteArray> appliedFilters;
467 513
468 auto resultSet = mStore.indexLookup(mType, mQuery, appliedFilters, appliedSorting); 514 auto resultSet = mStore.indexLookup(mType, query, appliedFilters, appliedSorting);
469 remainingFilters = remainingFilters - appliedFilters; 515 remainingFilters = remainingFilters - appliedFilters;
470 516
471 // We do a full scan if there were no indexes available to create the initial set. 517 // We do a full scan if there were no indexes available to create the initial set.
472 if (appliedFilters.isEmpty()) { 518 if (appliedFilters.isEmpty()) {
473 // TODO this should be replaced by an index lookup on the uid index
474 mSource = Source::Ptr::create(mStore.fullScan(mType), this); 519 mSource = Source::Ptr::create(mStore.fullScan(mType), this);
475 } else { 520 } else {
476 mSource = Source::Ptr::create(resultSet, this); 521 mSource = Source::Ptr::create(resultSet, this);
477 } 522 }
478 baseSet = mSource; 523 baseSet = mSource;
479 } 524 }
480 if (!mQuery.getBaseFilters().isEmpty()) { 525 if (!query.getBaseFilters().isEmpty()) {
481 auto filter = Filter::Ptr::create(baseSet, this); 526 auto filter = Filter::Ptr::create(baseSet, this);
482 //For incremental queries the remaining filters are not sufficient 527 //For incremental queries the remaining filters are not sufficient
483 for (const auto &f : mQuery.getBaseFilters().keys()) { 528 for (const auto &f : query.getBaseFilters().keys()) {
484 filter->propertyFilter.insert(f, mQuery.getFilter(f)); 529 filter->propertyFilter.insert(f, query.getFilter(f));
485 } 530 }
486 baseSet = filter; 531 baseSet = filter;
487 } 532 }
488 /* if (appliedSorting.isEmpty() && !mQuery.sortProperty.isEmpty()) { */ 533 /* if (appliedSorting.isEmpty() && !query.sortProperty.isEmpty()) { */
489 /* //Apply manual sorting */ 534 /* //Apply manual sorting */
490 /* baseSet = Sort::Ptr::create(baseSet, mQuery.sortProperty); */ 535 /* baseSet = Sort::Ptr::create(baseSet, query.sortProperty); */
491 /* } */ 536 /* } */
492 537
493 //Setup the rest of the filter stages on top of the base set 538 //Setup the rest of the filter stages on top of the base set
494 for (const auto &stage : mQuery.getFilterStages()) { 539 for (const auto &stage : query.getFilterStages()) {
495 if (auto filter = stage.dynamicCast<Query::Filter>()) { 540 if (auto filter = stage.dynamicCast<Query::Filter>()) {
496 auto f = Filter::Ptr::create(baseSet, this); 541 auto f = Filter::Ptr::create(baseSet, this);
497 f->propertyFilter = filter->propertyFilter; 542 f->propertyFilter = filter->propertyFilter;
@@ -521,7 +566,7 @@ QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision
521 566
522ResultSet DataStoreQuery::update(qint64 baseRevision) 567ResultSet DataStoreQuery::update(qint64 baseRevision)
523{ 568{
524 SinkTraceCtx(mLogCtx) << "Executing query update to revision " << baseRevision; 569 SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision;
525 auto incrementalResultSet = loadIncrementalResultSet(baseRevision); 570 auto incrementalResultSet = loadIncrementalResultSet(baseRevision);
526 SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; 571 SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet;
527 mSource->add(incrementalResultSet); 572 mSource->add(incrementalResultSet);