diff options
-rw-r--r-- | common/asyncutils.h | 42 | ||||
-rw-r--r-- | common/queryrunner.cpp | 45 |
2 files changed, 59 insertions, 28 deletions
diff --git a/common/asyncutils.h b/common/asyncutils.h new file mode 100644 index 0000000..ddcc37c --- /dev/null +++ b/common/asyncutils.h | |||
@@ -0,0 +1,42 @@ | |||
1 | /* | ||
2 | Copyright (c) 2015 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | |||
4 | This library is free software; you can redistribute it and/or modify it | ||
5 | under the terms of the GNU Library General Public License as published by | ||
6 | the Free Software Foundation; either version 2 of the License, or (at your | ||
7 | option) any later version. | ||
8 | |||
9 | This library is distributed in the hope that it will be useful, but WITHOUT | ||
10 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | ||
11 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public | ||
12 | License for more details. | ||
13 | |||
14 | You should have received a copy of the GNU Library General Public License | ||
15 | along with this library; see the file COPYING.LIB. If not, write to the | ||
16 | Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA | ||
17 | 02110-1301, USA. | ||
18 | */ | ||
19 | #pragma once | ||
20 | |||
21 | #include <Async/Async> | ||
22 | #include <QtConcurrent/QtConcurrentRun> | ||
23 | #include <QFuture> | ||
24 | #include <QFutureWatcher> | ||
25 | |||
26 | namespace async { | ||
27 | template<typename T> | ||
28 | KAsync::Job<T> run(const std::function<T()> &f) | ||
29 | { | ||
30 | return KAsync::start<T>([f](KAsync::Future<T> &future) { | ||
31 | auto result = QtConcurrent::run(f); | ||
32 | auto watcher = new QFutureWatcher<T>; | ||
33 | watcher->setFuture(result); | ||
34 | QObject::connect(watcher, &QFutureWatcher<T>::finished, watcher, [&future, watcher]() { | ||
35 | future.setValue(watcher->future().result()); | ||
36 | delete watcher; | ||
37 | future.setFinished(); | ||
38 | }); | ||
39 | }); | ||
40 | } | ||
41 | |||
42 | } | ||
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index af232c3..89610e3 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -18,15 +18,13 @@ | |||
18 | */ | 18 | */ |
19 | #include "queryrunner.h" | 19 | #include "queryrunner.h" |
20 | 20 | ||
21 | #include <QtConcurrent/QtConcurrentRun> | ||
22 | #include <QFuture> | ||
23 | #include <QFutureWatcher> | ||
24 | #include <QTime> | 21 | #include <QTime> |
25 | #include "commands.h" | 22 | #include "commands.h" |
26 | #include "log.h" | 23 | #include "log.h" |
27 | #include "storage.h" | 24 | #include "storage.h" |
28 | #include "definitions.h" | 25 | #include "definitions.h" |
29 | #include "domainadaptor.h" | 26 | #include "domainadaptor.h" |
27 | #include "asyncutils.h" | ||
30 | 28 | ||
31 | using namespace Akonadi2; | 29 | using namespace Akonadi2; |
32 | 30 | ||
@@ -75,45 +73,36 @@ QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi | |||
75 | Trace() << "Starting query"; | 73 | Trace() << "Starting query"; |
76 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. | 74 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. |
77 | mResultProvider->setFetcher([this, query, instanceIdentifier, factory, bufferType](const typename DomainType::Ptr &parent) { | 75 | mResultProvider->setFetcher([this, query, instanceIdentifier, factory, bufferType](const typename DomainType::Ptr &parent) { |
78 | Trace() << "Running fetcher"; | 76 | Trace() << "Running fetcher"; |
79 | auto resultProvider = mResultProvider; | 77 | auto resultProvider = mResultProvider; |
80 | auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 { | 78 | async::run<qint64>([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 { |
81 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); | 79 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); |
82 | const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); | 80 | const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); |
83 | return newRevision; | 81 | return newRevision; |
84 | }); | 82 | }) |
85 | //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 83 | .template then<void, qint64>([query, this](qint64 newRevision) { |
86 | if (query.liveQuery) { | 84 | //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
87 | auto watcher = new QFutureWatcher<qint64>; | 85 | if (query.liveQuery) { |
88 | watcher->setFuture(result); | ||
89 | QObject::connect(watcher, &QFutureWatcher<qint64>::finished, watcher, [this, watcher]() { | ||
90 | const auto newRevision = watcher->future().result(); | ||
91 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | 86 | mResourceAccess->sendRevisionReplayedCommand(newRevision); |
92 | delete watcher; | 87 | } |
93 | }); | 88 | }).exec(); |
94 | } | ||
95 | }); | 89 | }); |
96 | 90 | ||
97 | // In case of a live query we keep the runner for as long alive as the result provider exists | 91 | // In case of a live query we keep the runner for as long alive as the result provider exists |
98 | if (query.liveQuery) { | 92 | if (query.liveQuery) { |
99 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | 93 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting |
100 | setQuery([this, query, instanceIdentifier, factory, bufferType] () -> KAsync::Job<void> { | 94 | setQuery([this, query, instanceIdentifier, factory, bufferType] () -> KAsync::Job<void> { |
101 | return KAsync::start<void>([this, query, instanceIdentifier, factory, bufferType](KAsync::Future<void> &future) { | 95 | auto resultProvider = mResultProvider; |
102 | auto resultProvider = mResultProvider; | 96 | return async::run<qint64>([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 { |
103 | auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 { | ||
104 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); | 97 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); |
105 | const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); | 98 | const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); |
106 | return newRevision; | 99 | return newRevision; |
107 | }); | 100 | }) |
108 | auto watcher = new QFutureWatcher<qint64>; | 101 | .template then<void, qint64>([query, this](qint64 newRevision) { |
109 | watcher->setFuture(result); | 102 | //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
110 | QObject::connect(watcher, &QFutureWatcher<qint64>::finished, watcher, [this, &future, watcher]() { | ||
111 | const auto newRevision = watcher->future().result(); | ||
112 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | 103 | mResourceAccess->sendRevisionReplayedCommand(newRevision); |
113 | future.setFinished(); | 104 | }) |
114 | delete watcher; | 105 | .template then<void>([](){}); |
115 | }); | ||
116 | }); | ||
117 | }); | 106 | }); |
118 | //Ensure the connection is open, if it wasn't already opened | 107 | //Ensure the connection is open, if it wasn't already opened |
119 | //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates | 108 | //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates |