From 78d60438e8f1c962b6933431fe59ac44318d0352 Mon Sep 17 00:00:00 2001
From: Christian Mollekopf
Date: Fri, 29 Apr 2016 15:16:50 +0200
Subject: Increase the offset by the actually replayed items.

---
 common/queryrunner.cpp | 53 +++++++++++++++++++++++++-------------------------
 1 file changed, 27 insertions(+), 26 deletions(-)

diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 144c487..d86d26e 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -47,11 +47,11 @@ public:
         const QueryRunnerBase::ResultTransformation &transformation);
     virtual ~QueryWorker();
 
-    qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
-    qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
+    QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
+    QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
 
 private:
-    void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize);
+    qint64 replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize);
 
     void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback);
 
@@ -62,7 +62,7 @@ private:
     ResultSet filterAndSortSet(ResultSet &resultSet, const std::function &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty);
     std::function getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query);
 
-    qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever,
+    QPair<qint64, qint64> load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever,
         Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize);
 
 private:
@@ -83,20 +83,20 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
     if (query.limit && query.sortProperty.isEmpty()) {
        Warning() << "A limited query without sorting is typically a bad idea.";
     }
-    // We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
+    // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
     mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) {
         Trace() << "Running fetcher. Offset: " << mOffset << " Batchsize: " << mBatchSize;
Offset: " << mOffset << " Batchsize: " << mBatchSize; auto resultProvider = mResultProvider; - async::run([=]() -> qint64 { + async::run >([=]() { QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); - const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize); - return newRevision; + const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize); + return newRevisionAndReplayedEntities; }) - .template then([query, this](qint64 newRevision) { - mOffset += mBatchSize; + .template then>([query, this](const QPair &newRevisionAndReplayedEntities) { + mOffset += newRevisionAndReplayedEntities.second; // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. if (query.liveQuery) { - mResourceAccess->sendRevisionReplayedCommand(newRevision); + mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); } }) .exec(); @@ -107,14 +107,14 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting setQuery([=]() -> KAsync::Job { auto resultProvider = mResultProvider; - return async::run([=]() -> qint64 { + return async::run >([=]() { QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); - const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); - return newRevision; + const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); + return newRevisionAndReplayedEntities; }) - .template then([query, this](qint64 newRevision) { + .template then >([query, this](const QPair &newRevisionAndReplayedEntities) { // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. - mResourceAccess->sendRevisionReplayedCommand(newRevision); + mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); }); }); // Ensure the connection is open, if it wasn't already opened @@ -179,7 +179,7 @@ QueryWorker::~QueryWorker() } template -void QueryWorker::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface &resultProvider, const QList &properties, int offset, int batchSize) +qint64 QueryWorker::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface &resultProvider, const QList &properties, int offset, int batchSize) { Trace() << "Skipping over " << offset << " results"; resultSet.skip(offset); @@ -214,6 +214,7 @@ void QueryWorker::replaySet(ResultSet &resultSet, Sink::ResultProvid }; Trace() << "Replayed " << counter << " results." << "Limit " << batchSize; + return counter; } template @@ -394,7 +395,7 @@ QueryWorker::getFilter(const QSet remainingFilters, cons } template -qint64 QueryWorker::load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, +QPair QueryWorker::load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, Sink::ResultProviderInterface &resultProvider, bool initialQuery, int offset, int batchSize) { QTime time; @@ -411,29 +412,29 @@ qint64 QueryWorker::load(const Sink::Query &query, const std::functi Trace() << "Base set retrieved. 
" << Log::TraceTime(time.elapsed()); auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting); Trace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); - replaySet(filteredSet, resultProvider, query.requestedProperties, offset, batchSize); + auto replayedEntities = replaySet(filteredSet, resultProvider, query.requestedProperties, offset, batchSize); Trace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed()); resultProvider.setRevision(Sink::Storage::maxRevision(transaction)); - return Sink::Storage::maxRevision(transaction); + return qMakePair(Sink::Storage::maxRevision(transaction), replayedEntities); } template -qint64 QueryWorker::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) +QPair QueryWorker::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) { QTime time; time.start(); const qint64 baseRevision = resultProvider.revision() + 1; Trace() << "Running incremental query " << baseRevision; - auto revision = load(query, [&](Sink::Storage::Transaction &transaction, QSet &remainingFilters, QByteArray &remainingSorting) -> ResultSet { + auto revisionAndReplayedEntities = load(query, [&](Sink::Storage::Transaction &transaction, QSet &remainingFilters, QByteArray &remainingSorting) -> ResultSet { return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); }, resultProvider, false, 0, 0); Trace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); - return revision; + return revisionAndReplayedEntities; } template -qint64 QueryWorker::executeInitialQuery( +QPair QueryWorker::executeInitialQuery( const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize) { QTime time; @@ -449,12 +450,12 @@ qint64 QueryWorker::executeInitialQuery( modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); } } - auto revision = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet &remainingFilters, QByteArray &remainingSorting) -> ResultSet { + auto revisionAndReplayedEntities = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet &remainingFilters, QByteArray &remainingSorting) -> ResultSet { return loadInitialResultSet(modifiedQuery, transaction, remainingFilters, remainingSorting); }, resultProvider, true, offset, batchsize); Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); resultProvider.initialResultSetComplete(parent); - return revision; + return revisionAndReplayedEntities; } template class QueryRunner; -- cgit v1.2.3