From 3df975bd9cd244cb941a3ce5068924ef3a05df4f Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 27 Dec 2015 11:51:19 +0100 Subject: Use KAsync::Job as abstraction for the threading implementation --- common/asyncutils.h | 42 ++++++++++++++++++++++++++++++++++++++++++ common/queryrunner.cpp | 45 +++++++++++++++++---------------------------- 2 files changed, 59 insertions(+), 28 deletions(-) create mode 100644 common/asyncutils.h (limited to 'common') diff --git a/common/asyncutils.h b/common/asyncutils.h new file mode 100644 index 0000000..ddcc37c --- /dev/null +++ b/common/asyncutils.h @@ -0,0 +1,42 @@ +/* + Copyright (c) 2015 Christian Mollekopf + + This library is free software; you can redistribute it and/or modify it + under the terms of the GNU Library General Public License as published by + the Free Software Foundation; either version 2 of the License, or (at your + option) any later version. + + This library is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public + License for more details. + + You should have received a copy of the GNU Library General Public License + along with this library; see the file COPYING.LIB. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. +*/ +#pragma once + +#include +#include +#include +#include + +namespace async { + template + KAsync::Job run(const std::function &f) + { + return KAsync::start([f](KAsync::Future &future) { + auto result = QtConcurrent::run(f); + auto watcher = new QFutureWatcher; + watcher->setFuture(result); + QObject::connect(watcher, &QFutureWatcher::finished, watcher, [&future, watcher]() { + future.setValue(watcher->future().result()); + delete watcher; + future.setFinished(); + }); + }); + } + +} diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index af232c3..89610e3 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -18,15 +18,13 @@ */ #include "queryrunner.h" -#include -#include -#include #include #include "commands.h" #include "log.h" #include "storage.h" #include "definitions.h" #include "domainadaptor.h" +#include "asyncutils.h" using namespace Akonadi2; @@ -75,45 +73,36 @@ QueryRunner::QueryRunner(const Akonadi2::Query &query, const Akonadi Trace() << "Starting query"; //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. mResultProvider->setFetcher([this, query, instanceIdentifier, factory, bufferType](const typename DomainType::Ptr &parent) { - Trace() << "Running fetcher"; - auto resultProvider = mResultProvider; - auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 { + Trace() << "Running fetcher"; + auto resultProvider = mResultProvider; + async::run([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 { QueryWorker worker(query, instanceIdentifier, factory, bufferType); const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); return newRevision; - }); - //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. - if (query.liveQuery) { - auto watcher = new QFutureWatcher; - watcher->setFuture(result); - QObject::connect(watcher, &QFutureWatcher::finished, watcher, [this, watcher]() { - const auto newRevision = watcher->future().result(); + }) + .template then([query, this](qint64 newRevision) { + //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. + if (query.liveQuery) { mResourceAccess->sendRevisionReplayedCommand(newRevision); - delete watcher; - }); - } + } + }).exec(); }); // In case of a live query we keep the runner for as long alive as the result provider exists if (query.liveQuery) { //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting setQuery([this, query, instanceIdentifier, factory, bufferType] () -> KAsync::Job { - return KAsync::start([this, query, instanceIdentifier, factory, bufferType](KAsync::Future &future) { - auto resultProvider = mResultProvider; - auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 { + auto resultProvider = mResultProvider; + return async::run([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 { QueryWorker worker(query, instanceIdentifier, factory, bufferType); const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); return newRevision; - }); - auto watcher = new QFutureWatcher; - watcher->setFuture(result); - QObject::connect(watcher, &QFutureWatcher::finished, watcher, [this, &future, watcher]() { - const auto newRevision = watcher->future().result(); + }) + .template then([query, this](qint64 newRevision) { + //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. mResourceAccess->sendRevisionReplayedCommand(newRevision); - future.setFinished(); - delete watcher; - }); - }); + }) + .template then([](){}); }); //Ensure the connection is open, if it wasn't already opened //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates -- cgit v1.2.3