diff options
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r-- | common/datastorequery.cpp | 163 |
1 files changed, 104 insertions, 59 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 41d962c..3ba8f40 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp | |||
@@ -72,7 +72,7 @@ class Source : public FilterBase { | |||
72 | if (mIt == mIds.constEnd()) { | 72 | if (mIt == mIds.constEnd()) { |
73 | return false; | 73 | return false; |
74 | } | 74 | } |
75 | readEntity(*mIt, [callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | 75 | readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { |
76 | SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); | 76 | SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); |
77 | callback({entity, operation}); | 77 | callback({entity, operation}); |
78 | }); | 78 | }); |
@@ -115,7 +115,7 @@ public: | |||
115 | virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE { | 115 | virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE { |
116 | bool foundValue = false; | 116 | bool foundValue = false; |
117 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { | 117 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { |
118 | SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << result.operation; | 118 | SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << operationName(result.operation); |
119 | 119 | ||
120 | //Always accept removals. They can't match the filter since the data is gone. | 120 | //Always accept removals. They can't match the filter since the data is gone. |
121 | if (result.operation == Sink::Operation_Removal) { | 121 | if (result.operation == Sink::Operation_Removal) { |
@@ -167,17 +167,11 @@ public: | |||
167 | 167 | ||
168 | } | 168 | } |
169 | 169 | ||
170 | void process() { | ||
171 | if (operation == QueryBase::Reduce::Aggregator::Count) { | ||
172 | mResult = mResult.toInt() + 1; | ||
173 | } else { | ||
174 | Q_ASSERT(false); | ||
175 | } | ||
176 | } | ||
177 | |||
178 | void process(const QVariant &value) { | 170 | void process(const QVariant &value) { |
179 | if (operation == QueryBase::Reduce::Aggregator::Collect) { | 171 | if (operation == QueryBase::Reduce::Aggregator::Collect) { |
180 | mResult = mResult.toList() << value; | 172 | mResult = mResult.toList() << value; |
173 | } else if (operation == QueryBase::Reduce::Aggregator::Count) { | ||
174 | mResult = mResult.toInt() + 1; | ||
181 | } else { | 175 | } else { |
182 | Q_ASSERT(false); | 176 | Q_ASSERT(false); |
183 | } | 177 | } |
@@ -196,8 +190,8 @@ public: | |||
196 | QVariant mResult; | 190 | QVariant mResult; |
197 | }; | 191 | }; |
198 | 192 | ||
199 | QHash<QByteArray, QVariant> mAggregateValues; | ||
200 | QSet<QByteArray> mReducedValues; | 193 | QSet<QByteArray> mReducedValues; |
194 | QHash<QByteArray, QByteArray> mSelectedValues; | ||
201 | QByteArray mReductionProperty; | 195 | QByteArray mReductionProperty; |
202 | QByteArray mSelectionProperty; | 196 | QByteArray mSelectionProperty; |
203 | QueryBase::Reduce::Selector::Comparator mSelectionComparator; | 197 | QueryBase::Reduce::Selector::Comparator mSelectionComparator; |
@@ -231,51 +225,80 @@ public: | |||
231 | return false; | 225 | return false; |
232 | } | 226 | } |
233 | 227 | ||
228 | QByteArray reduceOnValue(const QVariant &reductionValue, QMap<QByteArray, QVariant> &aggregateValues) | ||
229 | { | ||
230 | QVariant selectionResultValue; | ||
231 | QByteArray selectionResult; | ||
232 | auto results = indexLookup(mReductionProperty, reductionValue); | ||
233 | for (auto &aggregator : mAggregators) { | ||
234 | aggregator.reset(); | ||
235 | } | ||
236 | |||
237 | for (const auto &r : results) { | ||
238 | readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | ||
239 | Q_ASSERT(operation != Sink::Operation_Removal); | ||
240 | for (auto &aggregator : mAggregators) { | ||
241 | if (!aggregator.property.isEmpty()) { | ||
242 | aggregator.process(entity.getProperty(aggregator.property)); | ||
243 | } else { | ||
244 | aggregator.process(QVariant{}); | ||
245 | } | ||
246 | } | ||
247 | auto selectionValue = entity.getProperty(mSelectionProperty); | ||
248 | if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) { | ||
249 | selectionResultValue = selectionValue; | ||
250 | selectionResult = entity.identifier(); | ||
251 | } | ||
252 | }); | ||
253 | } | ||
254 | |||
255 | for (auto &aggregator : mAggregators) { | ||
256 | aggregateValues.insert(aggregator.resultProperty, aggregator.result()); | ||
257 | } | ||
258 | return selectionResult; | ||
259 | } | ||
260 | |||
234 | bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { | 261 | bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { |
235 | bool foundValue = false; | 262 | bool foundValue = false; |
236 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { | 263 | while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { |
237 | auto reductionValue = result.entity.getProperty(mReductionProperty); | ||
238 | if (result.operation == Sink::Operation_Removal) { | 264 | if (result.operation == Sink::Operation_Removal) { |
239 | callback(result); | 265 | callback(result); |
240 | return false; | 266 | return false; |
241 | } | 267 | } |
242 | if (!mReducedValues.contains(getByteArray(reductionValue))) { | 268 | auto reductionValue = result.entity.getProperty(mReductionProperty); |
269 | const auto &reductionValueBa = getByteArray(reductionValue); | ||
270 | if (!mReducedValues.contains(reductionValueBa)) { | ||
243 | //Only reduce every value once. | 271 | //Only reduce every value once. |
244 | mReducedValues.insert(getByteArray(reductionValue)); | 272 | mReducedValues.insert(reductionValueBa); |
245 | QVariant selectionResultValue; | ||
246 | QByteArray selectionResult; | ||
247 | auto results = indexLookup(mReductionProperty, reductionValue); | ||
248 | for (auto &aggregator : mAggregators) { | ||
249 | aggregator.reset(); | ||
250 | } | ||
251 | |||
252 | QVariantList list; | ||
253 | for (const auto &r : results) { | ||
254 | readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | ||
255 | for (auto &aggregator : mAggregators) { | ||
256 | if (!aggregator.property.isEmpty()) { | ||
257 | aggregator.process(entity.getProperty(aggregator.property)); | ||
258 | } else { | ||
259 | aggregator.process(); | ||
260 | } | ||
261 | } | ||
262 | auto selectionValue = entity.getProperty(mSelectionProperty); | ||
263 | if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) { | ||
264 | selectionResultValue = selectionValue; | ||
265 | selectionResult = entity.identifier(); | ||
266 | } | ||
267 | }); | ||
268 | } | ||
269 | |||
270 | QMap<QByteArray, QVariant> aggregateValues; | 273 | QMap<QByteArray, QVariant> aggregateValues; |
271 | for (auto &aggregator : mAggregators) { | 274 | auto selectionResult = reduceOnValue(reductionValue, aggregateValues); |
272 | aggregateValues.insert(aggregator.resultProperty, aggregator.result()); | ||
273 | } | ||
274 | 275 | ||
276 | mSelectedValues.insert(reductionValueBa, selectionResult); | ||
275 | readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { | 277 | readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { |
276 | callback({entity, operation, aggregateValues}); | 278 | callback({entity, operation, aggregateValues}); |
277 | foundValue = true; | 279 | foundValue = true; |
278 | }); | 280 | }); |
281 | } else { | ||
282 | //During initial query, do nothing. The lookup above will take care of it. | ||
283 | //During updates adjust the reduction according to the modification/addition or removal | ||
284 | if (mIncremental) { | ||
285 | //redo the reduction | ||
286 | QMap<QByteArray, QVariant> aggregateValues; | ||
287 | auto selectionResult = reduceOnValue(reductionValue, aggregateValues); | ||
288 | |||
289 | //TODO if old and new are the same a modification would be enough | ||
290 | auto oldSelectionResult = mSelectedValues.take(reductionValueBa); | ||
291 | //remove old result | ||
292 | readEntity(oldSelectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { | ||
293 | callback({entity, Sink::Operation_Removal}); | ||
294 | }); | ||
295 | |||
296 | //add new result | ||
297 | mSelectedValues.insert(reductionValueBa, selectionResult); | ||
298 | readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) { | ||
299 | callback({entity, Sink::Operation_Creation, aggregateValues}); | ||
300 | }); | ||
301 | } | ||
279 | } | 302 | } |
280 | return false; | 303 | return false; |
281 | })) | 304 | })) |
@@ -330,9 +353,23 @@ public: | |||
330 | }; | 353 | }; |
331 | 354 | ||
332 | DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) | 355 | DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) |
333 | : mQuery(query), mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) | 356 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) |
357 | { | ||
358 | setupQuery(query); | ||
359 | } | ||
360 | |||
361 | DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store) | ||
362 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) | ||
334 | { | 363 | { |
335 | setupQuery(); | 364 | mCollector = state.mCollector; |
365 | mSource = state.mSource; | ||
366 | |||
367 | auto source = mCollector; | ||
368 | while (source) { | ||
369 | source->mDatastore = this; | ||
370 | source->mIncremental = true; | ||
371 | source = source->mSource; | ||
372 | } | ||
336 | } | 373 | } |
337 | 374 | ||
338 | DataStoreQuery::~DataStoreQuery() | 375 | DataStoreQuery::~DataStoreQuery() |
@@ -340,6 +377,14 @@ DataStoreQuery::~DataStoreQuery() | |||
340 | 377 | ||
341 | } | 378 | } |
342 | 379 | ||
380 | DataStoreQuery::State::Ptr DataStoreQuery::getState() | ||
381 | { | ||
382 | auto state = State::Ptr::create(); | ||
383 | state->mSource = mSource; | ||
384 | state->mCollector = mCollector; | ||
385 | return state; | ||
386 | } | ||
387 | |||
343 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) | 388 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) |
344 | { | 389 | { |
345 | mStore.readLatest(mType, key, resultCallback); | 390 | mStore.readLatest(mType, key, resultCallback); |
@@ -443,9 +488,10 @@ QByteArrayList DataStoreQuery::executeSubquery(const QueryBase &subquery) | |||
443 | return ids; | 488 | return ids; |
444 | } | 489 | } |
445 | 490 | ||
446 | void DataStoreQuery::setupQuery() | 491 | void DataStoreQuery::setupQuery(const Sink::QueryBase &query_) |
447 | { | 492 | { |
448 | auto baseFilters = mQuery.getBaseFilters(); | 493 | auto query = query_; |
494 | auto baseFilters = query.getBaseFilters(); | ||
449 | for (const auto &k : baseFilters.keys()) { | 495 | for (const auto &k : baseFilters.keys()) { |
450 | const auto comparator = baseFilters.value(k); | 496 | const auto comparator = baseFilters.value(k); |
451 | if (comparator.value.canConvert<Query>()) { | 497 | if (comparator.value.canConvert<Query>()) { |
@@ -454,44 +500,43 @@ void DataStoreQuery::setupQuery() | |||
454 | baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); | 500 | baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); |
455 | } | 501 | } |
456 | } | 502 | } |
457 | mQuery.setBaseFilters(baseFilters); | 503 | query.setBaseFilters(baseFilters); |
458 | 504 | ||
459 | FilterBase::Ptr baseSet; | 505 | FilterBase::Ptr baseSet; |
460 | QSet<QByteArray> remainingFilters = mQuery.getBaseFilters().keys().toSet(); | 506 | QSet<QByteArray> remainingFilters = query.getBaseFilters().keys().toSet(); |
461 | QByteArray appliedSorting; | 507 | QByteArray appliedSorting; |
462 | if (!mQuery.ids().isEmpty()) { | 508 | if (!query.ids().isEmpty()) { |
463 | mSource = Source::Ptr::create(mQuery.ids().toVector(), this); | 509 | mSource = Source::Ptr::create(query.ids().toVector(), this); |
464 | baseSet = mSource; | 510 | baseSet = mSource; |
465 | } else { | 511 | } else { |
466 | QSet<QByteArray> appliedFilters; | 512 | QSet<QByteArray> appliedFilters; |
467 | 513 | ||
468 | auto resultSet = mStore.indexLookup(mType, mQuery, appliedFilters, appliedSorting); | 514 | auto resultSet = mStore.indexLookup(mType, query, appliedFilters, appliedSorting); |
469 | remainingFilters = remainingFilters - appliedFilters; | 515 | remainingFilters = remainingFilters - appliedFilters; |
470 | 516 | ||
471 | // We do a full scan if there were no indexes available to create the initial set. | 517 | // We do a full scan if there were no indexes available to create the initial set. |
472 | if (appliedFilters.isEmpty()) { | 518 | if (appliedFilters.isEmpty()) { |
473 | // TODO this should be replaced by an index lookup on the uid index | ||
474 | mSource = Source::Ptr::create(mStore.fullScan(mType), this); | 519 | mSource = Source::Ptr::create(mStore.fullScan(mType), this); |
475 | } else { | 520 | } else { |
476 | mSource = Source::Ptr::create(resultSet, this); | 521 | mSource = Source::Ptr::create(resultSet, this); |
477 | } | 522 | } |
478 | baseSet = mSource; | 523 | baseSet = mSource; |
479 | } | 524 | } |
480 | if (!mQuery.getBaseFilters().isEmpty()) { | 525 | if (!query.getBaseFilters().isEmpty()) { |
481 | auto filter = Filter::Ptr::create(baseSet, this); | 526 | auto filter = Filter::Ptr::create(baseSet, this); |
482 | //For incremental queries the remaining filters are not sufficient | 527 | //For incremental queries the remaining filters are not sufficient |
483 | for (const auto &f : mQuery.getBaseFilters().keys()) { | 528 | for (const auto &f : query.getBaseFilters().keys()) { |
484 | filter->propertyFilter.insert(f, mQuery.getFilter(f)); | 529 | filter->propertyFilter.insert(f, query.getFilter(f)); |
485 | } | 530 | } |
486 | baseSet = filter; | 531 | baseSet = filter; |
487 | } | 532 | } |
488 | /* if (appliedSorting.isEmpty() && !mQuery.sortProperty.isEmpty()) { */ | 533 | /* if (appliedSorting.isEmpty() && !query.sortProperty.isEmpty()) { */ |
489 | /* //Apply manual sorting */ | 534 | /* //Apply manual sorting */ |
490 | /* baseSet = Sort::Ptr::create(baseSet, mQuery.sortProperty); */ | 535 | /* baseSet = Sort::Ptr::create(baseSet, query.sortProperty); */ |
491 | /* } */ | 536 | /* } */ |
492 | 537 | ||
493 | //Setup the rest of the filter stages on top of the base set | 538 | //Setup the rest of the filter stages on top of the base set |
494 | for (const auto &stage : mQuery.getFilterStages()) { | 539 | for (const auto &stage : query.getFilterStages()) { |
495 | if (auto filter = stage.dynamicCast<Query::Filter>()) { | 540 | if (auto filter = stage.dynamicCast<Query::Filter>()) { |
496 | auto f = Filter::Ptr::create(baseSet, this); | 541 | auto f = Filter::Ptr::create(baseSet, this); |
497 | f->propertyFilter = filter->propertyFilter; | 542 | f->propertyFilter = filter->propertyFilter; |
@@ -521,7 +566,7 @@ QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision | |||
521 | 566 | ||
522 | ResultSet DataStoreQuery::update(qint64 baseRevision) | 567 | ResultSet DataStoreQuery::update(qint64 baseRevision) |
523 | { | 568 | { |
524 | SinkTraceCtx(mLogCtx) << "Executing query update to revision " << baseRevision; | 569 | SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision; |
525 | auto incrementalResultSet = loadIncrementalResultSet(baseRevision); | 570 | auto incrementalResultSet = loadIncrementalResultSet(baseRevision); |
526 | SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; | 571 | SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; |
527 | mSource->add(incrementalResultSet); | 572 | mSource->add(incrementalResultSet); |