diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-12-27 10:50:18 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-12-27 10:52:56 +0100 |
commit | 2b012938ac0adaa173705c931e12f40184036183 (patch) | |
tree | ed1f65aa5c1435ef1a4dba6829d306bd1dfbf453 /common/queryrunner.cpp | |
parent | 5eb17e7eab0cbbed0f7b7df84d745f228446703d (diff) | |
download | sink-2b012938ac0adaa173705c931e12f40184036183.tar.gz sink-2b012938ac0adaa173705c931e12f40184036183.zip |
Threaded query runner implementation
All database access is now implemented in threads, to avoid
blocking the main thread. The resource communication still resides in
the main thread to keep the coordination simple.
With it comes a test that ensures we don't block the main thread for
too long.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 169 |
1 files changed, 121 insertions, 48 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 25c9d5b..af232c3 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -30,57 +30,89 @@ | |||
30 | 30 | ||
31 | using namespace Akonadi2; | 31 | using namespace Akonadi2; |
32 | 32 | ||
33 | static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) | 33 | /* |
34 | * This class wraps the actual query implementation. | ||
35 | * | ||
36 | * This is a worker object that can be moved to a thread to execute the query. | ||
37 | * The only interaction point is the ResultProvider, which handles the threadsafe reporting of the result. | ||
38 | */ | ||
39 | template<typename DomainType> | ||
40 | class QueryWorker : public QObject | ||
34 | { | 41 | { |
35 | //TODO use a result set with an iterator, to read values on demand | 42 | public: |
36 | QVector<QByteArray> keys; | 43 | QueryWorker(const Akonadi2::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType); |
37 | transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { | 44 | virtual ~QueryWorker(); |
38 | //Skip internals | 45 | |
39 | if (Akonadi2::Storage::isInternalKey(key)) { | 46 | qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); |
40 | return true; | 47 | qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); |
41 | } | 48 | |
42 | keys << Akonadi2::Storage::uidFromKey(key); | 49 | private: |
43 | return true; | 50 | static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties); |
44 | }, | 51 | |
45 | [](const Akonadi2::Storage::Error &error) { | 52 | void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback); |
46 | qWarning() << "Error during query: " << error.message; | 53 | |
47 | }); | 54 | ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); |
55 | ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); | ||
56 | |||
57 | ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); | ||
58 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query); | ||
59 | qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery); | ||
60 | |||
61 | private: | ||
62 | DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; | ||
63 | QByteArray mResourceInstanceIdentifier; | ||
64 | QByteArray mBufferType; | ||
65 | Akonadi2::Query mQuery; | ||
66 | }; | ||
48 | 67 | ||
49 | Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results"; | ||
50 | return ResultSet(keys); | ||
51 | } | ||
52 | 68 | ||
53 | template<class DomainType> | 69 | template<class DomainType> |
54 | QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) | 70 | QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) |
55 | : QueryRunnerBase(), | 71 | : QueryRunnerBase(), |
56 | mResultProvider(new ResultProvider<typename DomainType::Ptr>), | ||
57 | mResourceAccess(resourceAccess), | 72 | mResourceAccess(resourceAccess), |
58 | mDomainTypeAdaptorFactory(factory), | 73 | mResultProvider(new ResultProvider<typename DomainType::Ptr>) |
59 | mResourceInstanceIdentifier(instanceIdentifier), | ||
60 | mBufferType(bufferType), | ||
61 | mQuery(query) | ||
62 | { | 74 | { |
63 | Trace() << "Starting query"; | 75 | Trace() << "Starting query"; |
64 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. | 76 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. |
65 | mResultProvider->setFetcher([this, query](const typename DomainType::Ptr &parent) { | 77 | mResultProvider->setFetcher([this, query, instanceIdentifier, factory, bufferType](const typename DomainType::Ptr &parent) { |
66 | Trace() << "Running fetcher"; | 78 | Trace() << "Running fetcher"; |
67 | const qint64 newRevision = executeInitialQuery(query, parent, *mResultProvider); | 79 | auto resultProvider = mResultProvider; |
68 | //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 80 | auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 { |
69 | if (query.liveQuery) { | 81 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); |
70 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | 82 | const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); |
71 | } | 83 | return newRevision; |
84 | }); | ||
85 | //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
86 | if (query.liveQuery) { | ||
87 | auto watcher = new QFutureWatcher<qint64>; | ||
88 | watcher->setFuture(result); | ||
89 | QObject::connect(watcher, &QFutureWatcher<qint64>::finished, watcher, [this, watcher]() { | ||
90 | const auto newRevision = watcher->future().result(); | ||
91 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
92 | delete watcher; | ||
93 | }); | ||
94 | } | ||
72 | }); | 95 | }); |
73 | 96 | ||
74 | 97 | // In case of a live query we keep the runner for as long alive as the result provider exists | |
75 | //In case of a live query we keep the runner for as long alive as the result provider exists | ||
76 | if (query.liveQuery) { | 98 | if (query.liveQuery) { |
77 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | 99 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting |
78 | setQuery([this, query] () -> KAsync::Job<void> { | 100 | setQuery([this, query, instanceIdentifier, factory, bufferType] () -> KAsync::Job<void> { |
79 | return KAsync::start<void>([this, query](KAsync::Future<void> &future) { | 101 | return KAsync::start<void>([this, query, instanceIdentifier, factory, bufferType](KAsync::Future<void> &future) { |
80 | //TODO execute in thread | 102 | auto resultProvider = mResultProvider; |
81 | const qint64 newRevision = executeIncrementalQuery(query, *mResultProvider); | 103 | auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 { |
82 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | 104 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); |
83 | future.setFinished(); | 105 | const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); |
106 | return newRevision; | ||
107 | }); | ||
108 | auto watcher = new QFutureWatcher<qint64>; | ||
109 | watcher->setFuture(result); | ||
110 | QObject::connect(watcher, &QFutureWatcher<qint64>::finished, watcher, [this, &future, watcher]() { | ||
111 | const auto newRevision = watcher->future().result(); | ||
112 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
113 | future.setFinished(); | ||
114 | delete watcher; | ||
115 | }); | ||
84 | }); | 116 | }); |
85 | }); | 117 | }); |
86 | //Ensure the connection is open, if it wasn't already opened | 118 | //Ensure the connection is open, if it wasn't already opened |
@@ -102,8 +134,48 @@ typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<Doma | |||
102 | return mResultProvider->emitter(); | 134 | return mResultProvider->emitter(); |
103 | } | 135 | } |
104 | 136 | ||
137 | |||
138 | |||
139 | static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
140 | { | ||
141 | //TODO use a result set with an iterator, to read values on demand | ||
142 | QVector<QByteArray> keys; | ||
143 | transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
144 | //Skip internals | ||
145 | if (Akonadi2::Storage::isInternalKey(key)) { | ||
146 | return true; | ||
147 | } | ||
148 | keys << Akonadi2::Storage::uidFromKey(key); | ||
149 | return true; | ||
150 | }, | ||
151 | [](const Akonadi2::Storage::Error &error) { | ||
152 | qWarning() << "Error during query: " << error.message; | ||
153 | }); | ||
154 | |||
155 | Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results"; | ||
156 | return ResultSet(keys); | ||
157 | } | ||
158 | |||
159 | |||
105 | template<class DomainType> | 160 | template<class DomainType> |
106 | void QueryRunner<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties) | 161 | QueryWorker<DomainType>::QueryWorker(const Akonadi2::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) |
162 | : QObject(), | ||
163 | mDomainTypeAdaptorFactory(factory), | ||
164 | mResourceInstanceIdentifier(instanceIdentifier), | ||
165 | mBufferType(bufferType), | ||
166 | mQuery(query) | ||
167 | { | ||
168 | Trace() << "Starting query worker"; | ||
169 | } | ||
170 | |||
171 | template<class DomainType> | ||
172 | QueryWorker<DomainType>::~QueryWorker() | ||
173 | { | ||
174 | Trace() << "Stopped query worker"; | ||
175 | } | ||
176 | |||
177 | template<class DomainType> | ||
178 | void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties) | ||
107 | { | 179 | { |
108 | int counter = 0; | 180 | int counter = 0; |
109 | while (resultSet.next([&resultProvider, &counter, &properties](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { | 181 | while (resultSet.next([&resultProvider, &counter, &properties](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { |
@@ -128,7 +200,7 @@ void QueryRunner<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultPr | |||
128 | } | 200 | } |
129 | 201 | ||
130 | template<class DomainType> | 202 | template<class DomainType> |
131 | void QueryRunner<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback) | 203 | void QueryWorker<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback) |
132 | { | 204 | { |
133 | //This only works for a 1:1 mapping of resource to domain types. | 205 | //This only works for a 1:1 mapping of resource to domain types. |
134 | //Not i.e. for tags that are stored as flags in each entity of an imap store. | 206 | //Not i.e. for tags that are stored as flags in each entity of an imap store. |
@@ -150,7 +222,7 @@ void QueryRunner<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase | |||
150 | } | 222 | } |
151 | 223 | ||
152 | template<class DomainType> | 224 | template<class DomainType> |
153 | ResultSet QueryRunner<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | 225 | ResultSet QueryWorker<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) |
154 | { | 226 | { |
155 | if (!query.ids.isEmpty()) { | 227 | if (!query.ids.isEmpty()) { |
156 | return ResultSet(query.ids.toVector()); | 228 | return ResultSet(query.ids.toVector()); |
@@ -168,7 +240,7 @@ ResultSet QueryRunner<DomainType>::loadInitialResultSet(const Akonadi2::Query &q | |||
168 | } | 240 | } |
169 | 241 | ||
170 | template<class DomainType> | 242 | template<class DomainType> |
171 | ResultSet QueryRunner<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | 243 | ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) |
172 | { | 244 | { |
173 | const auto bufferType = mBufferType; | 245 | const auto bufferType = mBufferType; |
174 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | 246 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); |
@@ -196,7 +268,7 @@ ResultSet QueryRunner<DomainType>::loadIncrementalResultSet(qint64 baseRevision, | |||
196 | } | 268 | } |
197 | 269 | ||
198 | template<class DomainType> | 270 | template<class DomainType> |
199 | ResultSet QueryRunner<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) | 271 | ResultSet QueryWorker<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) |
200 | { | 272 | { |
201 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | 273 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); |
202 | 274 | ||
@@ -225,9 +297,8 @@ ResultSet QueryRunner<DomainType>::filterSet(const ResultSet &resultSet, const s | |||
225 | return ResultSet(generator); | 297 | return ResultSet(generator); |
226 | } | 298 | } |
227 | 299 | ||
228 | |||
229 | template<class DomainType> | 300 | template<class DomainType> |
230 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryRunner<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query) | 301 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryWorker<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query) |
231 | { | 302 | { |
232 | return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | 303 | return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { |
233 | if (!query.ids.isEmpty()) { | 304 | if (!query.ids.isEmpty()) { |
@@ -252,7 +323,7 @@ std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr | |||
252 | } | 323 | } |
253 | 324 | ||
254 | template<class DomainType> | 325 | template<class DomainType> |
255 | qint64 QueryRunner<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery) | 326 | qint64 QueryWorker<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery) |
256 | { | 327 | { |
257 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); | 328 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); |
258 | storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { | 329 | storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { |
@@ -269,9 +340,8 @@ qint64 QueryRunner<DomainType>::load(const Akonadi2::Query &query, const std::fu | |||
269 | return Akonadi2::Storage::maxRevision(transaction); | 340 | return Akonadi2::Storage::maxRevision(transaction); |
270 | } | 341 | } |
271 | 342 | ||
272 | |||
273 | template<class DomainType> | 343 | template<class DomainType> |
274 | qint64 QueryRunner<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 344 | qint64 QueryWorker<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) |
275 | { | 345 | { |
276 | QTime time; | 346 | QTime time; |
277 | time.start(); | 347 | time.start(); |
@@ -286,7 +356,7 @@ qint64 QueryRunner<DomainType>::executeIncrementalQuery(const Akonadi2::Query &q | |||
286 | } | 356 | } |
287 | 357 | ||
288 | template<class DomainType> | 358 | template<class DomainType> |
289 | qint64 QueryRunner<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 359 | qint64 QueryWorker<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) |
290 | { | 360 | { |
291 | QTime time; | 361 | QTime time; |
292 | time.start(); | 362 | time.start(); |
@@ -312,3 +382,6 @@ qint64 QueryRunner<DomainType>::executeInitialQuery(const Akonadi2::Query &query | |||
312 | template class QueryRunner<Akonadi2::ApplicationDomain::Folder>; | 382 | template class QueryRunner<Akonadi2::ApplicationDomain::Folder>; |
313 | template class QueryRunner<Akonadi2::ApplicationDomain::Mail>; | 383 | template class QueryRunner<Akonadi2::ApplicationDomain::Mail>; |
314 | template class QueryRunner<Akonadi2::ApplicationDomain::Event>; | 384 | template class QueryRunner<Akonadi2::ApplicationDomain::Event>; |
385 | template class QueryWorker<Akonadi2::ApplicationDomain::Folder>; | ||
386 | template class QueryWorker<Akonadi2::ApplicationDomain::Mail>; | ||
387 | template class QueryWorker<Akonadi2::ApplicationDomain::Event>; | ||