summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-12-27 10:50:18 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-12-27 10:52:56 +0100
commit2b012938ac0adaa173705c931e12f40184036183 (patch)
treeed1f65aa5c1435ef1a4dba6829d306bd1dfbf453 /common/queryrunner.cpp
parent5eb17e7eab0cbbed0f7b7df84d745f228446703d (diff)
downloadsink-2b012938ac0adaa173705c931e12f40184036183.tar.gz
sink-2b012938ac0adaa173705c931e12f40184036183.zip
Threaded query runner implementation
All database access is now implemented in threads, to avoid blocking the main thread. The resource communication still resides in the main thread to keep the coordination simple. With it comes a test that ensures we don't block the main thread for too long.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp169
1 files changed, 121 insertions, 48 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 25c9d5b..af232c3 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -30,57 +30,89 @@
30 30
31using namespace Akonadi2; 31using namespace Akonadi2;
32 32
33static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) 33/*
34 * This class wraps the actual query implementation.
35 *
36 * This is a worker object that can be moved to a thread to execute the query.
37 * The only interaction point is the ResultProvider, which handles the threadsafe reporting of the result.
38 */
39template<typename DomainType>
40class QueryWorker : public QObject
34{ 41{
35 //TODO use a result set with an iterator, to read values on demand 42public:
36 QVector<QByteArray> keys; 43 QueryWorker(const Akonadi2::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType);
37 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { 44 virtual ~QueryWorker();
38 //Skip internals 45
39 if (Akonadi2::Storage::isInternalKey(key)) { 46 qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
40 return true; 47 qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
41 } 48
42 keys << Akonadi2::Storage::uidFromKey(key); 49private:
43 return true; 50 static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties);
44 }, 51
45 [](const Akonadi2::Storage::Error &error) { 52 void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback);
46 qWarning() << "Error during query: " << error.message; 53
47 }); 54 ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
55 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
56
57 ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery);
58 std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query);
59 qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery);
60
61private:
62 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory;
63 QByteArray mResourceInstanceIdentifier;
64 QByteArray mBufferType;
65 Akonadi2::Query mQuery;
66};
48 67
49 Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results";
50 return ResultSet(keys);
51}
52 68
53template<class DomainType> 69template<class DomainType>
54QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) 70QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType)
55 : QueryRunnerBase(), 71 : QueryRunnerBase(),
56 mResultProvider(new ResultProvider<typename DomainType::Ptr>),
57 mResourceAccess(resourceAccess), 72 mResourceAccess(resourceAccess),
58 mDomainTypeAdaptorFactory(factory), 73 mResultProvider(new ResultProvider<typename DomainType::Ptr>)
59 mResourceInstanceIdentifier(instanceIdentifier),
60 mBufferType(bufferType),
61 mQuery(query)
62{ 74{
63 Trace() << "Starting query"; 75 Trace() << "Starting query";
64 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. 76 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
65 mResultProvider->setFetcher([this, query](const typename DomainType::Ptr &parent) { 77 mResultProvider->setFetcher([this, query, instanceIdentifier, factory, bufferType](const typename DomainType::Ptr &parent) {
66 Trace() << "Running fetcher"; 78 Trace() << "Running fetcher";
67 const qint64 newRevision = executeInitialQuery(query, parent, *mResultProvider); 79 auto resultProvider = mResultProvider;
68 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 80 auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 {
69 if (query.liveQuery) { 81 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType);
70 mResourceAccess->sendRevisionReplayedCommand(newRevision); 82 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider);
71 } 83 return newRevision;
84 });
85 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
86 if (query.liveQuery) {
87 auto watcher = new QFutureWatcher<qint64>;
88 watcher->setFuture(result);
89 QObject::connect(watcher, &QFutureWatcher<qint64>::finished, watcher, [this, watcher]() {
90 const auto newRevision = watcher->future().result();
91 mResourceAccess->sendRevisionReplayedCommand(newRevision);
92 delete watcher;
93 });
94 }
72 }); 95 });
73 96
74 97 // In case of a live query we keep the runner for as long alive as the result provider exists
75 //In case of a live query we keep the runner for as long alive as the result provider exists
76 if (query.liveQuery) { 98 if (query.liveQuery) {
77 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 99 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
78 setQuery([this, query] () -> KAsync::Job<void> { 100 setQuery([this, query, instanceIdentifier, factory, bufferType] () -> KAsync::Job<void> {
79 return KAsync::start<void>([this, query](KAsync::Future<void> &future) { 101 return KAsync::start<void>([this, query, instanceIdentifier, factory, bufferType](KAsync::Future<void> &future) {
80 //TODO execute in thread 102 auto resultProvider = mResultProvider;
81 const qint64 newRevision = executeIncrementalQuery(query, *mResultProvider); 103 auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 {
82 mResourceAccess->sendRevisionReplayedCommand(newRevision); 104 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType);
83 future.setFinished(); 105 const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider);
106 return newRevision;
107 });
108 auto watcher = new QFutureWatcher<qint64>;
109 watcher->setFuture(result);
110 QObject::connect(watcher, &QFutureWatcher<qint64>::finished, watcher, [this, &future, watcher]() {
111 const auto newRevision = watcher->future().result();
112 mResourceAccess->sendRevisionReplayedCommand(newRevision);
113 future.setFinished();
114 delete watcher;
115 });
84 }); 116 });
85 }); 117 });
86 //Ensure the connection is open, if it wasn't already opened 118 //Ensure the connection is open, if it wasn't already opened
@@ -102,8 +134,48 @@ typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<Doma
102 return mResultProvider->emitter(); 134 return mResultProvider->emitter();
103} 135}
104 136
137
138
139static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
140{
141 //TODO use a result set with an iterator, to read values on demand
142 QVector<QByteArray> keys;
143 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
144 //Skip internals
145 if (Akonadi2::Storage::isInternalKey(key)) {
146 return true;
147 }
148 keys << Akonadi2::Storage::uidFromKey(key);
149 return true;
150 },
151 [](const Akonadi2::Storage::Error &error) {
152 qWarning() << "Error during query: " << error.message;
153 });
154
155 Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results";
156 return ResultSet(keys);
157}
158
159
105template<class DomainType> 160template<class DomainType>
106void QueryRunner<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties) 161QueryWorker<DomainType>::QueryWorker(const Akonadi2::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType)
162 : QObject(),
163 mDomainTypeAdaptorFactory(factory),
164 mResourceInstanceIdentifier(instanceIdentifier),
165 mBufferType(bufferType),
166 mQuery(query)
167{
168 Trace() << "Starting query worker";
169}
170
171template<class DomainType>
172QueryWorker<DomainType>::~QueryWorker()
173{
174 Trace() << "Stopped query worker";
175}
176
177template<class DomainType>
178void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties)
107{ 179{
108 int counter = 0; 180 int counter = 0;
109 while (resultSet.next([&resultProvider, &counter, &properties](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { 181 while (resultSet.next([&resultProvider, &counter, &properties](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
@@ -128,7 +200,7 @@ void QueryRunner<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultPr
128} 200}
129 201
130template<class DomainType> 202template<class DomainType>
131void QueryRunner<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback) 203void QueryWorker<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
132{ 204{
133 //This only works for a 1:1 mapping of resource to domain types. 205 //This only works for a 1:1 mapping of resource to domain types.
134 //Not i.e. for tags that are stored as flags in each entity of an imap store. 206 //Not i.e. for tags that are stored as flags in each entity of an imap store.
@@ -150,7 +222,7 @@ void QueryRunner<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase
150} 222}
151 223
152template<class DomainType> 224template<class DomainType>
153ResultSet QueryRunner<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) 225ResultSet QueryWorker<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
154{ 226{
155 if (!query.ids.isEmpty()) { 227 if (!query.ids.isEmpty()) {
156 return ResultSet(query.ids.toVector()); 228 return ResultSet(query.ids.toVector());
@@ -168,7 +240,7 @@ ResultSet QueryRunner<DomainType>::loadInitialResultSet(const Akonadi2::Query &q
168} 240}
169 241
170template<class DomainType> 242template<class DomainType>
171ResultSet QueryRunner<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) 243ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
172{ 244{
173 const auto bufferType = mBufferType; 245 const auto bufferType = mBufferType;
174 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); 246 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
@@ -196,7 +268,7 @@ ResultSet QueryRunner<DomainType>::loadIncrementalResultSet(qint64 baseRevision,
196} 268}
197 269
198template<class DomainType> 270template<class DomainType>
199ResultSet QueryRunner<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) 271ResultSet QueryWorker<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery)
200{ 272{
201 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); 273 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
202 274
@@ -225,9 +297,8 @@ ResultSet QueryRunner<DomainType>::filterSet(const ResultSet &resultSet, const s
225 return ResultSet(generator); 297 return ResultSet(generator);
226} 298}
227 299
228
229template<class DomainType> 300template<class DomainType>
230std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryRunner<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query) 301std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryWorker<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query)
231{ 302{
232 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { 303 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
233 if (!query.ids.isEmpty()) { 304 if (!query.ids.isEmpty()) {
@@ -252,7 +323,7 @@ std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr
252} 323}
253 324
254template<class DomainType> 325template<class DomainType>
255qint64 QueryRunner<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery) 326qint64 QueryWorker<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery)
256{ 327{
257 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); 328 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
258 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { 329 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
@@ -269,9 +340,8 @@ qint64 QueryRunner<DomainType>::load(const Akonadi2::Query &query, const std::fu
269 return Akonadi2::Storage::maxRevision(transaction); 340 return Akonadi2::Storage::maxRevision(transaction);
270} 341}
271 342
272
273template<class DomainType> 343template<class DomainType>
274qint64 QueryRunner<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 344qint64 QueryWorker<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
275{ 345{
276 QTime time; 346 QTime time;
277 time.start(); 347 time.start();
@@ -286,7 +356,7 @@ qint64 QueryRunner<DomainType>::executeIncrementalQuery(const Akonadi2::Query &q
286} 356}
287 357
288template<class DomainType> 358template<class DomainType>
289qint64 QueryRunner<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 359qint64 QueryWorker<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
290{ 360{
291 QTime time; 361 QTime time;
292 time.start(); 362 time.start();
@@ -312,3 +382,6 @@ qint64 QueryRunner<DomainType>::executeInitialQuery(const Akonadi2::Query &query
312template class QueryRunner<Akonadi2::ApplicationDomain::Folder>; 382template class QueryRunner<Akonadi2::ApplicationDomain::Folder>;
313template class QueryRunner<Akonadi2::ApplicationDomain::Mail>; 383template class QueryRunner<Akonadi2::ApplicationDomain::Mail>;
314template class QueryRunner<Akonadi2::ApplicationDomain::Event>; 384template class QueryRunner<Akonadi2::ApplicationDomain::Event>;
385template class QueryWorker<Akonadi2::ApplicationDomain::Folder>;
386template class QueryWorker<Akonadi2::ApplicationDomain::Mail>;
387template class QueryWorker<Akonadi2::ApplicationDomain::Event>;