summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-12-27 11:51:19 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-12-27 11:51:19 +0100
commit3df975bd9cd244cb941a3ce5068924ef3a05df4f (patch)
tree436535ac2732b6b729d8d07f9dd4adcf422d8f87 /common/queryrunner.cpp
parente490c4798253a418d9fda17f40eff822f8d6ae36 (diff)
downloadsink-3df975bd9cd244cb941a3ce5068924ef3a05df4f.tar.gz
sink-3df975bd9cd244cb941a3ce5068924ef3a05df4f.zip
Use KAsync::Job as abstraction for the threading implementation
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp45
1 files changed, 17 insertions, 28 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index af232c3..89610e3 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -18,15 +18,13 @@
18*/ 18*/
19#include "queryrunner.h" 19#include "queryrunner.h"
20 20
21#include <QtConcurrent/QtConcurrentRun>
22#include <QFuture>
23#include <QFutureWatcher>
24#include <QTime> 21#include <QTime>
25#include "commands.h" 22#include "commands.h"
26#include "log.h" 23#include "log.h"
27#include "storage.h" 24#include "storage.h"
28#include "definitions.h" 25#include "definitions.h"
29#include "domainadaptor.h" 26#include "domainadaptor.h"
27#include "asyncutils.h"
30 28
31using namespace Akonadi2; 29using namespace Akonadi2;
32 30
@@ -75,45 +73,36 @@ QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi
75 Trace() << "Starting query"; 73 Trace() << "Starting query";
76 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. 74 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
77 mResultProvider->setFetcher([this, query, instanceIdentifier, factory, bufferType](const typename DomainType::Ptr &parent) { 75 mResultProvider->setFetcher([this, query, instanceIdentifier, factory, bufferType](const typename DomainType::Ptr &parent) {
78 Trace() << "Running fetcher"; 76 Trace() << "Running fetcher";
79 auto resultProvider = mResultProvider; 77 auto resultProvider = mResultProvider;
80 auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 { 78 async::run<qint64>([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 {
81 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); 79 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType);
82 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); 80 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider);
83 return newRevision; 81 return newRevision;
84 }); 82 })
85 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 83 .template then<void, qint64>([query, this](qint64 newRevision) {
86 if (query.liveQuery) { 84 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
87 auto watcher = new QFutureWatcher<qint64>; 85 if (query.liveQuery) {
88 watcher->setFuture(result);
89 QObject::connect(watcher, &QFutureWatcher<qint64>::finished, watcher, [this, watcher]() {
90 const auto newRevision = watcher->future().result();
91 mResourceAccess->sendRevisionReplayedCommand(newRevision); 86 mResourceAccess->sendRevisionReplayedCommand(newRevision);
92 delete watcher; 87 }
93 }); 88 }).exec();
94 }
95 }); 89 });
96 90
97 // In case of a live query we keep the runner for as long alive as the result provider exists 91 // In case of a live query we keep the runner for as long alive as the result provider exists
98 if (query.liveQuery) { 92 if (query.liveQuery) {
99 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 93 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
100 setQuery([this, query, instanceIdentifier, factory, bufferType] () -> KAsync::Job<void> { 94 setQuery([this, query, instanceIdentifier, factory, bufferType] () -> KAsync::Job<void> {
101 return KAsync::start<void>([this, query, instanceIdentifier, factory, bufferType](KAsync::Future<void> &future) { 95 auto resultProvider = mResultProvider;
102 auto resultProvider = mResultProvider; 96 return async::run<qint64>([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 {
103 auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 {
104 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); 97 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType);
105 const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); 98 const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider);
106 return newRevision; 99 return newRevision;
107 }); 100 })
108 auto watcher = new QFutureWatcher<qint64>; 101 .template then<void, qint64>([query, this](qint64 newRevision) {
109 watcher->setFuture(result); 102 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
110 QObject::connect(watcher, &QFutureWatcher<qint64>::finished, watcher, [this, &future, watcher]() {
111 const auto newRevision = watcher->future().result();
112 mResourceAccess->sendRevisionReplayedCommand(newRevision); 103 mResourceAccess->sendRevisionReplayedCommand(newRevision);
113 future.setFinished(); 104 })
114 delete watcher; 105 .template then<void>([](){});
115 });
116 });
117 }); 106 });
118 //Ensure the connection is open, if it wasn't already opened 107 //Ensure the connection is open, if it wasn't already opened
119 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates 108 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates