From 2b012938ac0adaa173705c931e12f40184036183 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 27 Dec 2015 10:50:18 +0100 Subject: Threaded query runner implementation All database access is now implemented in threads, to avoid blocking the main thread. The resource communication still resides in the main thread to keep the coordination simple. With it comes a test that ensures we don't block the main thread for too long. --- common/queryrunner.cpp | 169 ++++++++++++++++++++++++++++----------- common/queryrunner.h | 39 +++------ tests/CMakeLists.txt | 2 + tests/modelinteractivitytest.cpp | 101 +++++++++++++++++++++++ 4 files changed, 235 insertions(+), 76 deletions(-) create mode 100644 tests/modelinteractivitytest.cpp diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 25c9d5b..af232c3 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -30,57 +30,89 @@ using namespace Akonadi2; -static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) +/* + * This class wraps the actual query implementation. + * + * This is a worker object that can be moved to a thread to execute the query. + * The only interaction point is the ResultProvider, which handles the threadsafe reporting of the result. + */ +template +class QueryWorker : public QObject { - //TODO use a result set with an iterator, to read values on demand - QVector keys; - transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { - //Skip internals - if (Akonadi2::Storage::isInternalKey(key)) { - return true; - } - keys << Akonadi2::Storage::uidFromKey(key); - return true; - }, - [](const Akonadi2::Storage::Error &error) { - qWarning() << "Error during query: " << error.message; - }); +public: + QueryWorker(const Akonadi2::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType); + virtual ~QueryWorker(); + + qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider); + qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider); + +private: + static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider, const QList &properties); + + void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback); + + ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); + ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); + + ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); + std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query); + qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery); + +private: + DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; + QByteArray mResourceInstanceIdentifier; + QByteArray mBufferType; + Akonadi2::Query mQuery; +}; - Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results"; - return ResultSet(keys); -} template QueryRunner::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) : QueryRunnerBase(), - mResultProvider(new ResultProvider), mResourceAccess(resourceAccess), - mDomainTypeAdaptorFactory(factory), - mResourceInstanceIdentifier(instanceIdentifier), - mBufferType(bufferType), - mQuery(query) + mResultProvider(new ResultProvider) { Trace() << "Starting query"; //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. - mResultProvider->setFetcher([this, query](const typename DomainType::Ptr &parent) { - Trace() << "Running fetcher"; - const qint64 newRevision = executeInitialQuery(query, parent, *mResultProvider); - //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. - if (query.liveQuery) { - mResourceAccess->sendRevisionReplayedCommand(newRevision); - } + mResultProvider->setFetcher([this, query, instanceIdentifier, factory, bufferType](const typename DomainType::Ptr &parent) { + Trace() << "Running fetcher"; + auto resultProvider = mResultProvider; + auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 { + QueryWorker worker(query, instanceIdentifier, factory, bufferType); + const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); + return newRevision; + }); + //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. + if (query.liveQuery) { + auto watcher = new QFutureWatcher; + watcher->setFuture(result); + QObject::connect(watcher, &QFutureWatcher::finished, watcher, [this, watcher]() { + const auto newRevision = watcher->future().result(); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + delete watcher; + }); + } }); - - //In case of a live query we keep the runner for as long alive as the result provider exists + // In case of a live query we keep the runner for as long alive as the result provider exists if (query.liveQuery) { //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting - setQuery([this, query] () -> KAsync::Job { - return KAsync::start([this, query](KAsync::Future &future) { - //TODO execute in thread - const qint64 newRevision = executeIncrementalQuery(query, *mResultProvider); - mResourceAccess->sendRevisionReplayedCommand(newRevision); - future.setFinished(); + setQuery([this, query, instanceIdentifier, factory, bufferType] () -> KAsync::Job { + return KAsync::start([this, query, instanceIdentifier, factory, bufferType](KAsync::Future &future) { + auto resultProvider = mResultProvider; + auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 { + QueryWorker worker(query, instanceIdentifier, factory, bufferType); + const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); + return newRevision; + }); + auto watcher = new QFutureWatcher; + watcher->setFuture(result); + QObject::connect(watcher, &QFutureWatcher::finished, watcher, [this, &future, watcher]() { + const auto newRevision = watcher->future().result(); + mResourceAccess->sendRevisionReplayedCommand(newRevision); + future.setFinished(); + delete watcher; + }); }); }); //Ensure the connection is open, if it wasn't already opened @@ -102,8 +134,48 @@ typename Akonadi2::ResultEmitter::Ptr QueryRunneremitter(); } + + +static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) +{ + //TODO use a result set with an iterator, to read values on demand + QVector keys; + transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { + //Skip internals + if (Akonadi2::Storage::isInternalKey(key)) { + return true; + } + keys << Akonadi2::Storage::uidFromKey(key); + return true; + }, + [](const Akonadi2::Storage::Error &error) { + qWarning() << "Error during query: " << error.message; + }); + + Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results"; + return ResultSet(keys); +} + + template -void QueryRunner::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider, const QList &properties) +QueryWorker::QueryWorker(const Akonadi2::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) + : QObject(), + mDomainTypeAdaptorFactory(factory), + mResourceInstanceIdentifier(instanceIdentifier), + mBufferType(bufferType), + mQuery(query) +{ + Trace() << "Starting query worker"; +} + +template +QueryWorker::~QueryWorker() +{ + Trace() << "Stopped query worker"; +} + +template +void QueryWorker::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider, const QList &properties) { int counter = 0; while (resultSet.next([&resultProvider, &counter, &properties](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { @@ -128,7 +200,7 @@ void QueryRunner::replaySet(ResultSet &resultSet, Akonadi2::ResultPr } template -void QueryRunner::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback) +void QueryWorker::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback) { //This only works for a 1:1 mapping of resource to domain types. //Not i.e. for tags that are stored as flags in each entity of an imap store. @@ -150,7 +222,7 @@ void QueryRunner::readEntity(const Akonadi2::Storage::NamedDatabase } template -ResultSet QueryRunner::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) +ResultSet QueryWorker::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) { if (!query.ids.isEmpty()) { return ResultSet(query.ids.toVector()); @@ -168,7 +240,7 @@ ResultSet QueryRunner::loadInitialResultSet(const Akonadi2::Query &q } template -ResultSet QueryRunner::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) +ResultSet QueryWorker::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) { const auto bufferType = mBufferType; auto revisionCounter = QSharedPointer::create(baseRevision); @@ -196,7 +268,7 @@ ResultSet QueryRunner::loadIncrementalResultSet(qint64 baseRevision, } template -ResultSet QueryRunner::filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) +ResultSet QueryWorker::filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) { auto resultSetPtr = QSharedPointer::create(resultSet); @@ -225,9 +297,8 @@ ResultSet QueryRunner::filterSet(const ResultSet &resultSet, const s return ResultSet(generator); } - template -std::function QueryRunner::getFilter(const QSet remainingFilters, const Akonadi2::Query &query) +std::function QueryWorker::getFilter(const QSet remainingFilters, const Akonadi2::Query &query) { return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { if (!query.ids.isEmpty()) { @@ -252,7 +323,7 @@ std::function -qint64 QueryRunner::load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery) +qint64 QueryWorker::load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery) { Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { @@ -269,9 +340,8 @@ qint64 QueryRunner::load(const Akonadi2::Query &query, const std::fu return Akonadi2::Storage::maxRevision(transaction); } - template -qint64 QueryRunner::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) +qint64 QueryWorker::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider) { QTime time; time.start(); @@ -286,7 +356,7 @@ qint64 QueryRunner::executeIncrementalQuery(const Akonadi2::Query &q } template -qint64 QueryRunner::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) +qint64 QueryWorker::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider) { QTime time; time.start(); @@ -312,3 +382,6 @@ qint64 QueryRunner::executeInitialQuery(const Akonadi2::Query &query template class QueryRunner; template class QueryRunner; template class QueryRunner; +template class QueryWorker; +template class QueryWorker; +template class QueryWorker; diff --git a/common/queryrunner.h b/common/queryrunner.h index 8df0ecd..aba7912 100644 --- a/common/queryrunner.h +++ b/common/queryrunner.h @@ -20,7 +20,6 @@ #pragma once #include -#include "facadeinterface.h" #include "resourceaccess.h" #include "resultprovider.h" #include "domaintypeadaptorfactoryinterface.h" @@ -28,15 +27,8 @@ #include "query.h" /** - * A QueryRunner runs a query and updates the corresponding result set. - * - * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), - * and by how long a result set must be updated. If the query is one off the runner dies after the execution, - * otherwise it lives on the react to changes and updates the corresponding result set. - * - * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. + * Base clase because you can't have the Q_OBJECT macro in template classes */ - class QueryRunnerBase : public QObject { Q_OBJECT @@ -74,6 +66,15 @@ private: QueryFunction queryFunction; }; +/** + * A QueryRunner runs a query and updates the corresponding result set. + * + * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), + * and by how long a result set must be updated. If the query is one off the runner dies after the execution, + * otherwise it lives on the react to changes and updates the corresponding result set. + * + * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. + */ template class QueryRunner : public QueryRunnerBase { @@ -84,25 +85,7 @@ public: typename Akonadi2::ResultEmitter::Ptr emitter(); private: - static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface &resultProvider, const QList &properties); - - void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback); - - ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); - ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); - - ResultSet filterSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); - std::function getFilter(const QSet remainingFilters, const Akonadi2::Query &query); - qint64 load(const Akonadi2::Query &query, const std::function &)> &baseSetRetriever, Akonadi2::ResultProviderInterface &resultProvider, bool initialQuery); - qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface &resultProvider); - qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface &resultProvider); - -private: - QSharedPointer > mResultProvider; QSharedPointer mResourceAccess; - DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; - QByteArray mResourceInstanceIdentifier; - QByteArray mBufferType; - Akonadi2::Query mQuery; + QSharedPointer > mResultProvider; }; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 5d64511..1e0f6b5 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -50,11 +50,13 @@ auto_tests ( querytest databasepopulationandfacadequerybenchmark dummyresourcewritebenchmark + modelinteractivitytest ) target_link_libraries(dummyresourcetest akonadi2_resource_dummy) target_link_libraries(dummyresourcebenchmark akonadi2_resource_dummy) target_link_libraries(dummyresourcewritebenchmark akonadi2_resource_dummy) target_link_libraries(querytest akonadi2_resource_dummy) +target_link_libraries(modelinteractivitytest akonadi2_resource_dummy) if (BUILD_MAILDIR) auto_tests ( diff --git a/tests/modelinteractivitytest.cpp b/tests/modelinteractivitytest.cpp new file mode 100644 index 0000000..52db932 --- /dev/null +++ b/tests/modelinteractivitytest.cpp @@ -0,0 +1,101 @@ +#include + +#include +#include + +#include "dummyresource/resourcefactory.h" +#include "clientapi.h" +#include "commands.h" +#include "resourceconfig.h" +#include "log.h" +#include "modelresult.h" + +static int blockingTime; + +class TimeMeasuringApplication : public QCoreApplication +{ + QElapsedTimer t; +public: + TimeMeasuringApplication(int& argc, char ** argv) : QCoreApplication(argc, argv) { } + virtual ~TimeMeasuringApplication() { } + + virtual bool notify(QObject* receiver, QEvent* event) + { + t.start(); + const bool ret = QCoreApplication::notify(receiver, event); + if(t.elapsed() > 1) + std::cout << QString("processing event type %1 for object %2 took %3ms") + .arg((int)event->type()) + .arg(""/* receiver->objectName().toLocal8Bit().data()*/) + .arg((int)t.elapsed()) + .toStdString() << std::endl; + blockingTime += t.elapsed(); + return ret; + } +}; + +/** + * Ensure that queries don't block the system for an extended period of time. + * + * This is done by ensuring that the event loop is never blocked. + */ +class ModelinteractivityTest : public QObject +{ + Q_OBJECT +private Q_SLOTS: + void initTestCase() + { + Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Warning); + DummyResource::removeFromDisk("org.kde.dummy.instance1"); + ResourceConfig::addResource("org.kde.dummy.instance1", "org.kde.dummy"); + } + + void cleanup() + { + Akonadi2::Store::shutdown(QByteArray("org.kde.dummy.instance1")).exec().waitForFinished(); + DummyResource::removeFromDisk("org.kde.dummy.instance1"); + Akonadi2::Store::start(QByteArray("org.kde.dummy.instance1")).exec().waitForFinished(); + } + + void init() + { + } + + void testSingle() + { + //Setup + { + Akonadi2::ApplicationDomain::Mail mail("org.kde.dummy.instance1"); + for (int i = 0; i < 1000; i++) { + Akonadi2::Store::create(mail).exec().waitForFinished(); + } + } + + Akonadi2::Query query; + query.resources << "org.kde.dummy.instance1"; + query.syncOnDemand = false; + query.processAll = true; + query.liveQuery = true; + + Akonadi2::Store::synchronize(query).exec().waitForFinished(); + + //Test + QTime time; + time.start(); + auto model = Akonadi2::Store::loadModel(query); + blockingTime += time.elapsed(); + QTRY_VERIFY(model->data(QModelIndex(), Akonadi2::Store::ChildrenFetchedRole).toBool()); + //Never block longer than 10 ms + QVERIFY2(blockingTime < 10, QString("Total blocking time: %1").arg(blockingTime).toLatin1().data()); + } +}; + +int main(int argc, char *argv[]) +{ + blockingTime = 0; + TimeMeasuringApplication app(argc, argv); + ModelinteractivityTest tc; + return QTest::qExec(&tc, argc, argv); +} + +#include "modelinteractivitytest.moc" -- cgit v1.2.3