From 87695f52d5ac627cdd710f37c275fccdf920af0b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 6 Oct 2016 17:52:52 +0200 Subject: count as a first aggregation function --- common/datastorequery.cpp | 65 +++++++++++++++++++++++++---------------------- common/datastorequery.h | 2 +- common/entityreader.cpp | 14 +++++----- common/entityreader.h | 2 +- common/query.h | 33 ++++++++++++++++++++++++ common/queryrunner.cpp | 9 ++++--- common/resultset.cpp | 2 +- common/resultset.h | 10 +++++++- common/standardqueries.h | 2 +- 9 files changed, 93 insertions(+), 46 deletions(-) (limited to 'common') diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 7341675..8b14951 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -62,13 +62,13 @@ class Source : public FilterBase { mIt = mIds.constBegin(); } - bool next(const std::function &callback) Q_DECL_OVERRIDE + bool next(const std::function &callback) Q_DECL_OVERRIDE { if (mIt == mIds.constEnd()) { return false; } readEntity(*mIt, [callback](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { - callback(entityBuffer.operation(), uid, entityBuffer); + callback({uid, entityBuffer, entityBuffer.operation()}); }); mIt++; return mIt != mIds.constEnd(); @@ -86,7 +86,7 @@ public: } virtual ~Collector(){} - bool next(const std::function &callback) Q_DECL_OVERRIDE + bool next(const std::function &callback) Q_DECL_OVERRIDE { return mSource->next(callback); } @@ -106,26 +106,26 @@ public: virtual ~Filter(){} - bool next(const std::function &callback) Q_DECL_OVERRIDE { + bool next(const std::function &callback) Q_DECL_OVERRIDE { bool foundValue = false; - while(!foundValue && mSource->next([this, callback, &foundValue](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { - SinkTrace() << "Filter: " << uid << operation; + while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { + SinkTrace() << "Filter: " << result.uid << result.operation; //Always accept removals. They can't match the filter since the data is gone. - if (operation == Sink::Operation_Removal) { - SinkTrace() << "Removal: " << uid << operation; - callback(operation, uid, entityBuffer); + if (result.operation == Sink::Operation_Removal) { + SinkTrace() << "Removal: " << result.uid << result.operation; + callback(result); foundValue = true; - } else if (matchesFilter(uid, entityBuffer)) { - SinkTrace() << "Accepted: " << uid << operation; - callback(operation, uid, entityBuffer); + } else if (matchesFilter(result.uid, result.buffer)) { + SinkTrace() << "Accepted: " << result.uid << result.operation; + callback(result); foundValue = true; //TODO if something did not match the filter so far but does now, turn into an add operation. } else { - SinkTrace() << "Rejected: " << uid << operation; + SinkTrace() << "Rejected: " << result.uid << result.operation; //TODO emit a removal if we had the uid in the result set and this is a modification. //We don't know if this results in a removal from the dataset, so we emit a removal notification anyways - callback(Sink::Operation_Removal, uid, entityBuffer); + callback({result.uid, result.buffer, Sink::Operation_Removal, result.aggregateValues}); } return false; })) @@ -186,10 +186,10 @@ public: return false; } - bool next(const std::function &callback) Q_DECL_OVERRIDE { + bool next(const std::function &callback) Q_DECL_OVERRIDE { bool foundValue = false; - while(!foundValue && mSource->next([this, callback, &foundValue](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { - auto reductionValue = getProperty(entityBuffer.entity(), mReductionProperty); + while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { + auto reductionValue = getProperty(result.buffer.entity(), mReductionProperty); if (!mReducedValues.contains(getByteArray(reductionValue))) { //Only reduce every value once. mReducedValues.insert(getByteArray(reductionValue)); @@ -205,8 +205,11 @@ public: } }); } + int count = results.size(); readEntity(selectionResult, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { - callback(Sink::Operation_Creation, uid, entityBuffer); + QMap aggregateValues; + aggregateValues.insert("count", count); + callback({uid, entityBuffer, Sink::Operation_Creation, aggregateValues}); foundValue = true; }); } @@ -232,14 +235,14 @@ public: virtual ~Bloom(){} - bool next(const std::function &callback) Q_DECL_OVERRIDE { + bool next(const std::function &callback) Q_DECL_OVERRIDE { bool foundValue = false; - while(!foundValue && mSource->next([this, callback, &foundValue](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { - auto bloomValue = getProperty(entityBuffer.entity(), mBloomProperty); + while(!foundValue && mSource->next([this, callback, &foundValue](const ResultSet::Result &result) { + auto bloomValue = getProperty(result.buffer.entity(), mBloomProperty); auto results = indexLookup(mBloomProperty, bloomValue); for (const auto r : results) { readEntity(r, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { - callback(Sink::Operation_Creation, uid, entityBuffer); + callback({uid, entityBuffer, Sink::Operation_Creation}); foundValue = true; }); } @@ -398,8 +401,8 @@ QByteArrayList DataStoreQuery::executeSubquery(const Query &subquery) auto sub = prepareQuery(subquery.type, subquery, mTransaction); auto result = sub->execute(); QByteArrayList ids; - while (result.next([&ids](const QByteArray &uid, const Sink::EntityBuffer &, Sink::Operation) { - ids << uid; + while (result.next([&ids](const ResultSet::Result &result) { + ids << result.uid; })) {} return ids; @@ -502,9 +505,9 @@ ResultSet DataStoreQuery::update(qint64 baseRevision) SinkTrace() << "Changed: " << incrementalResultSet; mSource->add(incrementalResultSet); ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { - if (mCollector->next([this, callback](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &buffer) { - SinkTrace() << "Got incremental result: " << uid << operation; - callback(uid, buffer, operation); + if (mCollector->next([this, callback](const ResultSet::Result &result) { + SinkTrace() << "Got incremental result: " << result.uid << result.operation; + callback(result); })) { return true; @@ -520,10 +523,10 @@ ResultSet DataStoreQuery::execute() SinkTrace() << "Executing query"; ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { - if (mCollector->next([this, callback](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &buffer) { - if (operation != Sink::Operation_Removal) { - SinkTrace() << "Got initial result: " << uid << operation; - callback(uid, buffer, Sink::Operation_Creation); + if (mCollector->next([this, callback](const ResultSet::Result &result) { + if (result.operation != Sink::Operation_Removal) { + SinkTrace() << "Got initial result: " << result.uid << result.operation; + callback(ResultSet::Result{result.uid, result.buffer, Sink::Operation_Creation, result.aggregateValues}); } })) { diff --git a/common/datastorequery.h b/common/datastorequery.h index 03b4eac..164d721 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h @@ -107,7 +107,7 @@ public: virtual void skip() { mSource->skip(); }; //Returns true for as long as a result is available - virtual bool next(const std::function &callback) = 0; + virtual bool next(const std::function &callback) = 0; QSharedPointer mSource; DataStoreQuery *mDatastore; diff --git a/common/entityreader.cpp b/common/entityreader.cpp index bd973d0..cca1511 100644 --- a/common/entityreader.cpp +++ b/common/entityreader.cpp @@ -145,14 +145,14 @@ template void EntityReader::query(const Sink::Query &query, const std::function &callback) { executeInitialQuery(query, 0, 0, - [&callback](const typename DomainType::Ptr &value, Sink::Operation operation) -> bool { + [&callback](const typename DomainType::Ptr &value, Sink::Operation operation, const QMap &) -> bool { Q_ASSERT(operation == Sink::Operation_Creation); return callback(*value); }); } template -QPair EntityReader::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function &callback) +QPair EntityReader::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const ResultCallback &callback) { QTime time; time.start(); @@ -168,7 +168,7 @@ QPair EntityReader::executeInitialQuery(const Sink:: } template -QPair EntityReader::executeIncrementalQuery(const Sink::Query &query, qint64 lastRevision, const std::function &callback) +QPair EntityReader::executeIncrementalQuery(const Sink::Query &query, qint64 lastRevision, const ResultCallback &callback) { QTime time; time.start(); @@ -185,18 +185,18 @@ QPair EntityReader::executeIncrementalQuery(const Si } template -qint64 EntityReader::replaySet(ResultSet &resultSet, int offset, int batchSize, const std::function &callback) +qint64 EntityReader::replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback) { SinkTrace() << "Skipping over " << offset << " results"; resultSet.skip(offset); int counter = 0; while (!batchSize || (counter < batchSize)) { const bool ret = - resultSet.next([this, &counter, callback](const QByteArray &uid, const Sink::EntityBuffer &value, Sink::Operation operation) -> bool { + resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { counter++; - auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(value.entity()); + auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(result.buffer.entity()); Q_ASSERT(adaptor); - return callback(QSharedPointer::create(mResourceInstanceIdentifier, uid, value.revision(), adaptor), operation); + return callback(QSharedPointer::create(mResourceInstanceIdentifier, result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); }); if (!ret) { break; diff --git a/common/entityreader.h b/common/entityreader.h index f216453..1e7b086 100644 --- a/common/entityreader.h +++ b/common/entityreader.h @@ -48,7 +48,7 @@ namespace EntityReaderUtils { template class SINK_EXPORT EntityReader { - typedef std::function ResultCallback; + typedef std::function &aggregateValues)> ResultCallback; public: EntityReader(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); diff --git a/common/query.h b/common/query.h index 00ae086..3ab5acf 100644 --- a/common/query.h +++ b/common/query.h @@ -255,15 +255,48 @@ public: Comparator comparator; }; + class Aggregator { + public: + enum Operation { + Count, + Collect + }; + + Aggregator(const QByteArray &p, Operation o, const QByteArray &c = QByteArray()) + : resultProperty(p), + operation(o), + propertyToCollect(c) + { + } + + QByteArray resultProperty; + Operation operation; + QByteArray propertyToCollect; + }; + Reduce(const QByteArray &p, const Selector &s) : property(p), selector(s) { } + Reduce &count(const QByteArray &propertyName = "count") + { + aggregators << Aggregator(propertyName, Aggregator::Count); + return *this; + } + + template + Reduce &collect(const QByteArray &propertyName) + { + aggregators << Aggregator(propertyName, Aggregator::Collect, T::name); + return *this; + } + //Reduce on property QByteArray property; Selector selector; + QList aggregators; //TODO add aggregate functions like: //.count() diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 1835e1f..f037cfc 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -55,7 +55,7 @@ public: private: Storage::Transaction getTransaction(); - std::function resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider); + std::function &)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider); QueryRunnerBase::ResultTransformation mResultTransformation; DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; @@ -174,10 +174,13 @@ QueryWorker::~QueryWorker() } template -std::function QueryWorker::resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) +std::function &)> QueryWorker::resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) { - return [this, &query, &resultProvider](const typename DomainType::Ptr &domainObject, Sink::Operation operation) -> bool { + return [this, &query, &resultProvider](const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap &aggregateValues) -> bool { auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*domainObject, query.requestedProperties).template staticCast(); + for (auto it = aggregateValues.constBegin(); it != aggregateValues.constEnd(); it++) { + valueCopy->setProperty(it.key(), it.value()); + } if (mResultTransformation) { mResultTransformation(*valueCopy); } diff --git a/common/resultset.cpp b/common/resultset.cpp index c3ed5f6..13b5f16 100644 --- a/common/resultset.cpp +++ b/common/resultset.cpp @@ -79,7 +79,7 @@ bool ResultSet::next() return true; } } else { - next([](const QByteArray &, const Sink::EntityBuffer &, Sink::Operation) { return false; }); + next([](const Result &) { return false; }); } return false; } diff --git a/common/resultset.h b/common/resultset.h index 4e934fc..7b77417 100644 --- a/common/resultset.h +++ b/common/resultset.h @@ -19,6 +19,8 @@ #pragma once #include +#include +#include #include #include "metadata_generated.h" #include "entitybuffer.h" @@ -31,7 +33,13 @@ class ResultSet { public: - typedef std::function Callback; + struct Result { + QByteArray uid; + Sink::EntityBuffer buffer; + Sink::Operation operation; + QMap aggregateValues; + }; + typedef std::function Callback; typedef std::function ValueGenerator; typedef std::function IdGenerator; typedef std::function SkipValue; diff --git a/common/standardqueries.h b/common/standardqueries.h index 06ce396..07ce637 100644 --- a/common/standardqueries.h +++ b/common/standardqueries.h @@ -50,7 +50,7 @@ namespace StandardQueries { } query.filter(folder); query.sort(); - query.reduce(Query::Reduce::Selector::max()); + query.reduce(Query::Reduce::Selector::max()).count("count"); return query; } -- cgit v1.2.3