summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/asyncutils.h42
-rw-r--r--common/queryrunner.cpp45
2 files changed, 59 insertions, 28 deletions
diff --git a/common/asyncutils.h b/common/asyncutils.h
new file mode 100644
index 0000000..ddcc37c
--- /dev/null
+++ b/common/asyncutils.h
@@ -0,0 +1,42 @@
1/*
2 Copyright (c) 2015 Christian Mollekopf <mollekopf@kolabsys.com>
3
4 This library is free software; you can redistribute it and/or modify it
5 under the terms of the GNU Library General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or (at your
7 option) any later version.
8
9 This library is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12 License for more details.
13
14 You should have received a copy of the GNU Library General Public License
15 along with this library; see the file COPYING.LIB. If not, write to the
16 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17 02110-1301, USA.
18*/
19#pragma once
20
21#include <Async/Async>
22#include <QtConcurrent/QtConcurrentRun>
23#include <QFuture>
24#include <QFutureWatcher>
25
26namespace async {
27 template<typename T>
28 KAsync::Job<T> run(const std::function<T()> &f)
29 {
30 return KAsync::start<T>([f](KAsync::Future<T> &future) {
31 auto result = QtConcurrent::run(f);
32 auto watcher = new QFutureWatcher<T>;
33 watcher->setFuture(result);
34 QObject::connect(watcher, &QFutureWatcher<T>::finished, watcher, [&future, watcher]() {
35 future.setValue(watcher->future().result());
36 delete watcher;
37 future.setFinished();
38 });
39 });
40 }
41
42}
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index af232c3..89610e3 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -18,15 +18,13 @@
18*/ 18*/
19#include "queryrunner.h" 19#include "queryrunner.h"
20 20
21#include <QtConcurrent/QtConcurrentRun>
22#include <QFuture>
23#include <QFutureWatcher>
24#include <QTime> 21#include <QTime>
25#include "commands.h" 22#include "commands.h"
26#include "log.h" 23#include "log.h"
27#include "storage.h" 24#include "storage.h"
28#include "definitions.h" 25#include "definitions.h"
29#include "domainadaptor.h" 26#include "domainadaptor.h"
27#include "asyncutils.h"
30 28
31using namespace Akonadi2; 29using namespace Akonadi2;
32 30
@@ -75,45 +73,36 @@ QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi
75 Trace() << "Starting query"; 73 Trace() << "Starting query";
76 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. 74 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
77 mResultProvider->setFetcher([this, query, instanceIdentifier, factory, bufferType](const typename DomainType::Ptr &parent) { 75 mResultProvider->setFetcher([this, query, instanceIdentifier, factory, bufferType](const typename DomainType::Ptr &parent) {
78 Trace() << "Running fetcher"; 76 Trace() << "Running fetcher";
79 auto resultProvider = mResultProvider; 77 auto resultProvider = mResultProvider;
80 auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 { 78 async::run<qint64>([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 {
81 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); 79 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType);
82 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); 80 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider);
83 return newRevision; 81 return newRevision;
84 }); 82 })
85 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 83 .template then<void, qint64>([query, this](qint64 newRevision) {
86 if (query.liveQuery) { 84 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
87 auto watcher = new QFutureWatcher<qint64>; 85 if (query.liveQuery) {
88 watcher->setFuture(result);
89 QObject::connect(watcher, &QFutureWatcher<qint64>::finished, watcher, [this, watcher]() {
90 const auto newRevision = watcher->future().result();
91 mResourceAccess->sendRevisionReplayedCommand(newRevision); 86 mResourceAccess->sendRevisionReplayedCommand(newRevision);
92 delete watcher; 87 }
93 }); 88 }).exec();
94 }
95 }); 89 });
96 90
97 // In case of a live query we keep the runner for as long alive as the result provider exists 91 // In case of a live query we keep the runner for as long alive as the result provider exists
98 if (query.liveQuery) { 92 if (query.liveQuery) {
99 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 93 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
100 setQuery([this, query, instanceIdentifier, factory, bufferType] () -> KAsync::Job<void> { 94 setQuery([this, query, instanceIdentifier, factory, bufferType] () -> KAsync::Job<void> {
101 return KAsync::start<void>([this, query, instanceIdentifier, factory, bufferType](KAsync::Future<void> &future) { 95 auto resultProvider = mResultProvider;
102 auto resultProvider = mResultProvider; 96 return async::run<qint64>([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 {
103 auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 {
104 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); 97 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType);
105 const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); 98 const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider);
106 return newRevision; 99 return newRevision;
107 }); 100 })
108 auto watcher = new QFutureWatcher<qint64>; 101 .template then<void, qint64>([query, this](qint64 newRevision) {
109 watcher->setFuture(result); 102 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
110 QObject::connect(watcher, &QFutureWatcher<qint64>::finished, watcher, [this, &future, watcher]() {
111 const auto newRevision = watcher->future().result();
112 mResourceAccess->sendRevisionReplayedCommand(newRevision); 103 mResourceAccess->sendRevisionReplayedCommand(newRevision);
113 future.setFinished(); 104 })
114 delete watcher; 105 .template then<void>([](){});
115 });
116 });
117 }); 106 });
118 //Ensure the connection is open, if it wasn't already opened 107 //Ensure the connection is open, if it wasn't already opened
119 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates 108 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates