From fb0df0d8008ef05cfa94936e19c22dec0faaa8e4 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 15 Feb 2016 09:35:43 +0100 Subject: Prepared querying of batches and added a switch to enable/disable sorting --- common/queryrunner.cpp | 67 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 22 deletions(-) (limited to 'common/queryrunner.cpp') diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index d4ace86..1f645e8 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -45,10 +45,10 @@ public: virtual ~QueryWorker(); qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider); - qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider); + qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize); private: - void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface &resultProvider, const QList &properties); + void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface &resultProvider, const QList &properties, int offset, int batchSize); void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback); @@ -57,7 +57,7 @@ private: ResultSet filterAndSortSet(ResultSet &resultSet, const std::function &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); std::function getFilter(const QSet remainingFilters, const Sink::Query &query); - qint64 load(const Sink::Query &query, const std::function &)> &baseSetRetriever, Sink::ResultProviderInterface &resultProvider, bool initialQuery); + qint64 load(const Sink::Query &query, const std::function &)> &baseSetRetriever, Sink::ResultProviderInterface &resultProvider, bool initialQuery, int offset, int batchSize); private: QueryRunnerBase::ResultTransformation mResultTransformation; @@ -72,7 +72,9 @@ template QueryRunner::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) : QueryRunnerBase(), mResourceAccess(resourceAccess), - mResultProvider(new ResultProvider) + mResultProvider(new ResultProvider), + mOffset(0), + mBatchSize(0) { Trace() << "Starting query"; //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. @@ -81,7 +83,7 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou auto resultProvider = mResultProvider; async::run([=]() -> qint64 { QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); - const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); + const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize); return newRevision; }) .template then([query, this](qint64 newRevision) { @@ -150,7 +152,6 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, Warning() << "Error during query: " << error.message; }); - Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results"; return ResultSet(keys); } @@ -174,15 +175,19 @@ QueryWorker::~QueryWorker() } template -void QueryWorker::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface &resultProvider, const QList &properties) +void QueryWorker::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface &resultProvider, const QList &properties, int offset, int batchSize) { int counter = 0; - while (resultSet.next([this, &resultProvider, &counter, &properties](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { + resultSet.skip(offset); + while (resultSet.next([this, &resultProvider, &counter, &properties, batchSize](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { //FIXME allow maildir resource to set the mimeMessage property auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value, properties).template staticCast(); if (mResultTransformation) { mResultTransformation(*valueCopy); } + if (batchSize && counter >= batchSize) { + return false; + } counter++; switch (operation) { case Sink::Operation_Creation: @@ -217,7 +222,8 @@ void QueryWorker::readEntity(const Sink::Storage::NamedDatabase &db, Q_ASSERT(metadataBuffer); const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; - resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), operation); + auto adaptor = mDomainTypeAdaptorFactory->createAdaptor(entity); + resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); return false; }, [](const Sink::Storage::Error &error) { @@ -274,9 +280,10 @@ ResultSet QueryWorker::loadIncrementalResultSet(qint64 baseRevision, template ResultSet QueryWorker::filterAndSortSet(ResultSet &resultSet, const std::function &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) { - auto sortedMap = QSharedPointer>::create(); - - if (initialQuery) { + bool sortingRequired = false; + if (initialQuery && sortingRequired) { + //Sort the complete set by reading the sort property and filling into a sorted map + auto sortedMap = QSharedPointer>::create(); while (resultSet.next()) { //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) readEntity(db, resultSet.id(), [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { @@ -301,23 +308,39 @@ ResultSet QueryWorker::filterAndSortSet(ResultSet &resultSet, const } return false; }; - return ResultSet(generator); + + auto skip = [iterator]() { + iterator->next(); + }; + return ResultSet(generator, skip); } else { auto resultSetPtr = QSharedPointer::create(resultSet); ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](std::function callback) -> bool { if (resultSetPtr->next()) { //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { - //Always remove removals, they probably don't match due to non-available properties - if ((operation == Sink::Operation_Removal) || filter(domainObject)) { - callback(domainObject, operation); + if (initialQuery) { + //We're not interested in removals during the initial query + if ((operation != Sink::Operation_Removal) && filter(domainObject)) { + //In the initial set every entity is new + callback(domainObject, Sink::Operation_Creation); + } + } else { + //Always remove removals, they probably don't match due to non-available properties + if ((operation == Sink::Operation_Removal) || filter(domainObject)) { + //TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) + callback(domainObject, operation); + } } }); return true; } return false; }; - return ResultSet(generator); + auto skip = [resultSetPtr]() { + resultSetPtr->skip(1); + }; + return ResultSet(generator, skip); } } @@ -347,7 +370,7 @@ std::function -qint64 QueryWorker::load(const Sink::Query &query, const std::function &)> &baseSetRetriever, Sink::ResultProviderInterface &resultProvider, bool initialQuery) +qint64 QueryWorker::load(const Sink::Query &query, const std::function &)> &baseSetRetriever, Sink::ResultProviderInterface &resultProvider, bool initialQuery, int offset, int batchSize) { QTime time; time.start(); @@ -364,7 +387,7 @@ qint64 QueryWorker::load(const Sink::Query &query, const std::functi Trace() << "Base set retrieved. " << time.elapsed(); auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, query.sortProperty); Trace() << "Filtered set retrieved. " << time.elapsed(); - replaySet(filteredSet, resultProvider, query.requestedProperties); + replaySet(filteredSet, resultProvider, query.requestedProperties, offset, batchSize); Trace() << "Filtered set replayed. " << time.elapsed(); resultProvider.setRevision(Sink::Storage::maxRevision(transaction)); return Sink::Storage::maxRevision(transaction); @@ -380,13 +403,13 @@ qint64 QueryWorker::executeIncrementalQuery(const Sink::Query &query Trace() << "Running incremental query " << baseRevision; auto revision = load(query, [&](Sink::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); - }, resultProvider, false); + }, resultProvider, false, 0, 0); Trace() << "Incremental query took: " << time.elapsed() << " ms"; return revision; } template -qint64 QueryWorker::executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider) +qint64 QueryWorker::executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize) { QTime time; time.start(); @@ -403,7 +426,7 @@ qint64 QueryWorker::executeInitialQuery(const Sink::Query &query, co } auto revision = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet &remainingFilters) -> ResultSet { return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); - }, resultProvider, true); + }, resultProvider, true, offset, batchsize); Trace() << "Initial query took: " << time.elapsed() << " ms"; resultProvider.initialResultSetComplete(parent); return revision; -- cgit v1.2.3