summaryrefslogtreecommitdiffstats
path: root/common/datastorequery.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r--common/datastorequery.cpp163
1 files changed, 104 insertions, 59 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 41d962c..3ba8f40 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -72,7 +72,7 @@ class Source : public FilterBase {
72 if (mIt == mIds.constEnd()) { 72 if (mIt == mIds.constEnd()) {
73 return false; 73 return false;
74 } 74 }
75 readEntity(*mIt, [callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { 75 readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
76 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation); 76 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation);
77 callback({entity, operation}); 77 callback({entity, operation});
78 }); 78 });
@@ -115,7 +115,7 @@ public:
115 virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE { 115 virtual bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE {
116 bool foundValue = false; 116 bool foundValue = false;
117 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { 117 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
118 SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << result.operation; 118 SinkTraceCtx(mDatastore->mLogCtx) << "Filter: " << result.entity.identifier() << operationName(result.operation);
119 119
120 //Always accept removals. They can't match the filter since the data is gone. 120 //Always accept removals. They can't match the filter since the data is gone.
121 if (result.operation == Sink::Operation_Removal) { 121 if (result.operation == Sink::Operation_Removal) {
@@ -167,17 +167,11 @@ public:
167 167
168 } 168 }
169 169
170 void process() {
171 if (operation == QueryBase::Reduce::Aggregator::Count) {
172 mResult = mResult.toInt() + 1;
173 } else {
174 Q_ASSERT(false);
175 }
176 }
177
178 void process(const QVariant &value) { 170 void process(const QVariant &value) {
179 if (operation == QueryBase::Reduce::Aggregator::Collect) { 171 if (operation == QueryBase::Reduce::Aggregator::Collect) {
180 mResult = mResult.toList() << value; 172 mResult = mResult.toList() << value;
173 } else if (operation == QueryBase::Reduce::Aggregator::Count) {
174 mResult = mResult.toInt() + 1;
181 } else { 175 } else {
182 Q_ASSERT(false); 176 Q_ASSERT(false);
183 } 177 }
@@ -196,8 +190,8 @@ public:
196 QVariant mResult; 190 QVariant mResult;
197 }; 191 };
198 192
199 QHash<QByteArray, QVariant> mAggregateValues;
200 QSet<QByteArray> mReducedValues; 193 QSet<QByteArray> mReducedValues;
194 QHash<QByteArray, QByteArray> mSelectedValues;
201 QByteArray mReductionProperty; 195 QByteArray mReductionProperty;
202 QByteArray mSelectionProperty; 196 QByteArray mSelectionProperty;
203 QueryBase::Reduce::Selector::Comparator mSelectionComparator; 197 QueryBase::Reduce::Selector::Comparator mSelectionComparator;
@@ -231,51 +225,80 @@ public:
231 return false; 225 return false;
232 } 226 }
233 227
228 QByteArray reduceOnValue(const QVariant &reductionValue, QMap<QByteArray, QVariant> &aggregateValues)
229 {
230 QVariant selectionResultValue;
231 QByteArray selectionResult;
232 auto results = indexLookup(mReductionProperty, reductionValue);
233 for (auto &aggregator : mAggregators) {
234 aggregator.reset();
235 }
236
237 for (const auto &r : results) {
238 readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
239 Q_ASSERT(operation != Sink::Operation_Removal);
240 for (auto &aggregator : mAggregators) {
241 if (!aggregator.property.isEmpty()) {
242 aggregator.process(entity.getProperty(aggregator.property));
243 } else {
244 aggregator.process(QVariant{});
245 }
246 }
247 auto selectionValue = entity.getProperty(mSelectionProperty);
248 if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) {
249 selectionResultValue = selectionValue;
250 selectionResult = entity.identifier();
251 }
252 });
253 }
254
255 for (auto &aggregator : mAggregators) {
256 aggregateValues.insert(aggregator.resultProperty, aggregator.result());
257 }
258 return selectionResult;
259 }
260
234 bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE { 261 bool next(const std::function<void(const ResultSet::Result &)> &callback) Q_DECL_OVERRIDE {
235 bool foundValue = false; 262 bool foundValue = false;
236 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { 263 while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) {
237 auto reductionValue = result.entity.getProperty(mReductionProperty);
238 if (result.operation == Sink::Operation_Removal) { 264 if (result.operation == Sink::Operation_Removal) {
239 callback(result); 265 callback(result);
240 return false; 266 return false;
241 } 267 }
242 if (!mReducedValues.contains(getByteArray(reductionValue))) { 268 auto reductionValue = result.entity.getProperty(mReductionProperty);
269 const auto &reductionValueBa = getByteArray(reductionValue);
270 if (!mReducedValues.contains(reductionValueBa)) {
243 //Only reduce every value once. 271 //Only reduce every value once.
244 mReducedValues.insert(getByteArray(reductionValue)); 272 mReducedValues.insert(reductionValueBa);
245 QVariant selectionResultValue;
246 QByteArray selectionResult;
247 auto results = indexLookup(mReductionProperty, reductionValue);
248 for (auto &aggregator : mAggregators) {
249 aggregator.reset();
250 }
251
252 QVariantList list;
253 for (const auto &r : results) {
254 readEntity(r, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
255 for (auto &aggregator : mAggregators) {
256 if (!aggregator.property.isEmpty()) {
257 aggregator.process(entity.getProperty(aggregator.property));
258 } else {
259 aggregator.process();
260 }
261 }
262 auto selectionValue = entity.getProperty(mSelectionProperty);
263 if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) {
264 selectionResultValue = selectionValue;
265 selectionResult = entity.identifier();
266 }
267 });
268 }
269
270 QMap<QByteArray, QVariant> aggregateValues; 273 QMap<QByteArray, QVariant> aggregateValues;
271 for (auto &aggregator : mAggregators) { 274 auto selectionResult = reduceOnValue(reductionValue, aggregateValues);
272 aggregateValues.insert(aggregator.resultProperty, aggregator.result());
273 }
274 275
276 mSelectedValues.insert(reductionValueBa, selectionResult);
275 readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) { 277 readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
276 callback({entity, operation, aggregateValues}); 278 callback({entity, operation, aggregateValues});
277 foundValue = true; 279 foundValue = true;
278 }); 280 });
281 } else {
282 //During initial query, do nothing. The lookup above will take care of it.
283 //During updates adjust the reduction according to the modification/addition or removal
284 if (mIncremental) {
285 //redo the reduction
286 QMap<QByteArray, QVariant> aggregateValues;
287 auto selectionResult = reduceOnValue(reductionValue, aggregateValues);
288
289 //TODO if old and new are the same a modification would be enough
290 auto oldSelectionResult = mSelectedValues.take(reductionValueBa);
291 //remove old result
292 readEntity(oldSelectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
293 callback({entity, Sink::Operation_Removal});
294 });
295
296 //add new result
297 mSelectedValues.insert(reductionValueBa, selectionResult);
298 readEntity(selectionResult, [&, this](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation) {
299 callback({entity, Sink::Operation_Creation, aggregateValues});
300 });
301 }
279 } 302 }
280 return false; 303 return false;
281 })) 304 }))
@@ -330,9 +353,23 @@ public:
330}; 353};
331 354
332DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) 355DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store)
333 : mQuery(query), mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) 356 : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery"))
357{
358 setupQuery(query);
359}
360
361DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store)
362 : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery"))
334{ 363{
335 setupQuery(); 364 mCollector = state.mCollector;
365 mSource = state.mSource;
366
367 auto source = mCollector;
368 while (source) {
369 source->mDatastore = this;
370 source->mIncremental = true;
371 source = source->mSource;
372 }
336} 373}
337 374
338DataStoreQuery::~DataStoreQuery() 375DataStoreQuery::~DataStoreQuery()
@@ -340,6 +377,14 @@ DataStoreQuery::~DataStoreQuery()
340 377
341} 378}
342 379
380DataStoreQuery::State::Ptr DataStoreQuery::getState()
381{
382 auto state = State::Ptr::create();
383 state->mSource = mSource;
384 state->mCollector = mCollector;
385 return state;
386}
387
343void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) 388void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback)
344{ 389{
345 mStore.readLatest(mType, key, resultCallback); 390 mStore.readLatest(mType, key, resultCallback);
@@ -443,9 +488,10 @@ QByteArrayList DataStoreQuery::executeSubquery(const QueryBase &subquery)
443 return ids; 488 return ids;
444} 489}
445 490
446void DataStoreQuery::setupQuery() 491void DataStoreQuery::setupQuery(const Sink::QueryBase &query_)
447{ 492{
448 auto baseFilters = mQuery.getBaseFilters(); 493 auto query = query_;
494 auto baseFilters = query.getBaseFilters();
449 for (const auto &k : baseFilters.keys()) { 495 for (const auto &k : baseFilters.keys()) {
450 const auto comparator = baseFilters.value(k); 496 const auto comparator = baseFilters.value(k);
451 if (comparator.value.canConvert<Query>()) { 497 if (comparator.value.canConvert<Query>()) {
@@ -454,44 +500,43 @@ void DataStoreQuery::setupQuery()
454 baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In)); 500 baseFilters.insert(k, Query::Comparator(QVariant::fromValue(result), Query::Comparator::In));
455 } 501 }
456 } 502 }
457 mQuery.setBaseFilters(baseFilters); 503 query.setBaseFilters(baseFilters);
458 504
459 FilterBase::Ptr baseSet; 505 FilterBase::Ptr baseSet;
460 QSet<QByteArray> remainingFilters = mQuery.getBaseFilters().keys().toSet(); 506 QSet<QByteArray> remainingFilters = query.getBaseFilters().keys().toSet();
461 QByteArray appliedSorting; 507 QByteArray appliedSorting;
462 if (!mQuery.ids().isEmpty()) { 508 if (!query.ids().isEmpty()) {
463 mSource = Source::Ptr::create(mQuery.ids().toVector(), this); 509 mSource = Source::Ptr::create(query.ids().toVector(), this);
464 baseSet = mSource; 510 baseSet = mSource;
465 } else { 511 } else {
466 QSet<QByteArray> appliedFilters; 512 QSet<QByteArray> appliedFilters;
467 513
468 auto resultSet = mStore.indexLookup(mType, mQuery, appliedFilters, appliedSorting); 514 auto resultSet = mStore.indexLookup(mType, query, appliedFilters, appliedSorting);
469 remainingFilters = remainingFilters - appliedFilters; 515 remainingFilters = remainingFilters - appliedFilters;
470 516
471 // We do a full scan if there were no indexes available to create the initial set. 517 // We do a full scan if there were no indexes available to create the initial set.
472 if (appliedFilters.isEmpty()) { 518 if (appliedFilters.isEmpty()) {
473 // TODO this should be replaced by an index lookup on the uid index
474 mSource = Source::Ptr::create(mStore.fullScan(mType), this); 519 mSource = Source::Ptr::create(mStore.fullScan(mType), this);
475 } else { 520 } else {
476 mSource = Source::Ptr::create(resultSet, this); 521 mSource = Source::Ptr::create(resultSet, this);
477 } 522 }
478 baseSet = mSource; 523 baseSet = mSource;
479 } 524 }
480 if (!mQuery.getBaseFilters().isEmpty()) { 525 if (!query.getBaseFilters().isEmpty()) {
481 auto filter = Filter::Ptr::create(baseSet, this); 526 auto filter = Filter::Ptr::create(baseSet, this);
482 //For incremental queries the remaining filters are not sufficient 527 //For incremental queries the remaining filters are not sufficient
483 for (const auto &f : mQuery.getBaseFilters().keys()) { 528 for (const auto &f : query.getBaseFilters().keys()) {
484 filter->propertyFilter.insert(f, mQuery.getFilter(f)); 529 filter->propertyFilter.insert(f, query.getFilter(f));
485 } 530 }
486 baseSet = filter; 531 baseSet = filter;
487 } 532 }
488 /* if (appliedSorting.isEmpty() && !mQuery.sortProperty.isEmpty()) { */ 533 /* if (appliedSorting.isEmpty() && !query.sortProperty.isEmpty()) { */
489 /* //Apply manual sorting */ 534 /* //Apply manual sorting */
490 /* baseSet = Sort::Ptr::create(baseSet, mQuery.sortProperty); */ 535 /* baseSet = Sort::Ptr::create(baseSet, query.sortProperty); */
491 /* } */ 536 /* } */
492 537
493 //Setup the rest of the filter stages on top of the base set 538 //Setup the rest of the filter stages on top of the base set
494 for (const auto &stage : mQuery.getFilterStages()) { 539 for (const auto &stage : query.getFilterStages()) {
495 if (auto filter = stage.dynamicCast<Query::Filter>()) { 540 if (auto filter = stage.dynamicCast<Query::Filter>()) {
496 auto f = Filter::Ptr::create(baseSet, this); 541 auto f = Filter::Ptr::create(baseSet, this);
497 f->propertyFilter = filter->propertyFilter; 542 f->propertyFilter = filter->propertyFilter;
@@ -521,7 +566,7 @@ QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision
521 566
522ResultSet DataStoreQuery::update(qint64 baseRevision) 567ResultSet DataStoreQuery::update(qint64 baseRevision)
523{ 568{
524 SinkTraceCtx(mLogCtx) << "Executing query update to revision " << baseRevision; 569 SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision;
525 auto incrementalResultSet = loadIncrementalResultSet(baseRevision); 570 auto incrementalResultSet = loadIncrementalResultSet(baseRevision);
526 SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; 571 SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet;
527 mSource->add(incrementalResultSet); 572 mSource->add(incrementalResultSet);