From e661bd33b7bf8da546cbdbe23c9ddcf568930a1a Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 17 Oct 2016 22:32:44 +0200 Subject: Simplified replaySet --- common/queryrunner.cpp | 83 +++++++++++++++++++------------------------------- 1 file changed, 31 insertions(+), 52 deletions(-) (limited to 'common/queryrunner.cpp') diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index e7963a3..4422229 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -52,12 +52,11 @@ public: QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); virtual ~QueryWorker(); - qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); QPair executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider); QPair executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize); private: - std::function &)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider); + void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider, const ResultSet::Result &result); QueryRunnerBase::ResultTransformation mResultTransformation; ResourceContext mResourceContext; @@ -175,55 +174,32 @@ QueryWorker::~QueryWorker() } template -std::function &)> QueryWorker::resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) +void QueryWorker::resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider, const ResultSet::Result &result) { - return [this, &query, &resultProvider](const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap &aggregateValues) -> bool { - auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*domainObject, query.requestedProperties).template staticCast(); - for (auto it = aggregateValues.constBegin(); it != aggregateValues.constEnd(); it++) { - valueCopy->setProperty(it.key(), it.value()); - } - if (mResultTransformation) { - mResultTransformation(*valueCopy); - } - switch (operation) { - case Sink::Operation_Creation: - // SinkTrace() << "Got creation"; - resultProvider.add(valueCopy); - break; - case Sink::Operation_Modification: - // SinkTrace() << "Got modification"; - resultProvider.modify(valueCopy); - break; - case Sink::Operation_Removal: - // SinkTrace() << "Got removal"; - resultProvider.remove(valueCopy); - break; - } - return true; - }; -} - -template -qint64 QueryWorker::replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback) -{ - SinkTrace() << "Skipping over " << offset << " results"; - resultSet.skip(offset); - int counter = 0; - while (!batchSize || (counter < batchSize)) { - const bool ret = - resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { - counter++; - auto adaptor = mResourceContext.adaptorFactory().createAdaptor(result.buffer.entity()); - Q_ASSERT(adaptor); - return callback(QSharedPointer::create(mResourceContext.instanceId(), result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); - }); - if (!ret) { + auto adaptor = mResourceContext.adaptorFactory().createAdaptor(result.buffer.entity()); + Q_ASSERT(adaptor); + auto domainObject = DomainType{mResourceContext.instanceId(), result.uid, result.buffer.revision(), adaptor}; + auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(domainObject, query.requestedProperties).template staticCast(); + for (auto it = result.aggregateValues.constBegin(); it != result.aggregateValues.constEnd(); it++) { + valueCopy->setProperty(it.key(), it.value()); + } + if (mResultTransformation) { + mResultTransformation(*valueCopy); + } + switch (result.operation) { + case Sink::Operation_Creation: + // SinkTrace() << "Got creation"; + resultProvider.add(valueCopy); break; - } - }; - SinkTrace() << "Replayed " << counter << " results." - << "Limit " << batchSize; - return counter; + case Sink::Operation_Modification: + // SinkTrace() << "Got modification"; + resultProvider.modify(valueCopy); + break; + case Sink::Operation_Removal: + // SinkTrace() << "Got removal"; + resultProvider.remove(valueCopy); + break; + } } template @@ -238,9 +214,10 @@ QPair QueryWorker::executeIncrementalQuery(const Sin auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, entityStore); auto resultSet = preparedQuery->update(baseRevision); - SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); - auto replayedEntities = replaySet(resultSet, 0, 0, resultProviderCallback(query, resultProvider)); + auto replayedEntities = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { + resultProviderCallback(query, resultProvider, result); + }); SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); return qMakePair(entityStore->maxRevision(), replayedEntities); @@ -270,7 +247,9 @@ QPair QueryWorker::executeInitialQuery( auto resultSet = preparedQuery->execute(); SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); - auto replayedEntities = replaySet(resultSet, offset, batchsize, resultProviderCallback(query, resultProvider)); + auto replayedEntities = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { + resultProviderCallback(query, resultProvider, result); + }); SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); return qMakePair(entityStore->maxRevision(), replayedEntities); -- cgit v1.2.3