diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-12-27 10:50:18 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-12-27 10:52:56 +0100 |
commit | 2b012938ac0adaa173705c931e12f40184036183 (patch) | |
tree | ed1f65aa5c1435ef1a4dba6829d306bd1dfbf453 | |
parent | 5eb17e7eab0cbbed0f7b7df84d745f228446703d (diff) | |
download | sink-2b012938ac0adaa173705c931e12f40184036183.tar.gz sink-2b012938ac0adaa173705c931e12f40184036183.zip |
Threaded query runner implementation
All database access is now implemented in threads, to avoid
blocking the main thread. The resource communication still resides in
the main thread to keep the coordination simple.
With it comes a test that ensures we don't block the main thread for
too long.
-rw-r--r-- | common/queryrunner.cpp | 169 | ||||
-rw-r--r-- | common/queryrunner.h | 39 | ||||
-rw-r--r-- | tests/CMakeLists.txt | 2 | ||||
-rw-r--r-- | tests/modelinteractivitytest.cpp | 101 |
4 files changed, 235 insertions, 76 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 25c9d5b..af232c3 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -30,57 +30,89 @@ | |||
30 | 30 | ||
31 | using namespace Akonadi2; | 31 | using namespace Akonadi2; |
32 | 32 | ||
33 | static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) | 33 | /* |
34 | * This class wraps the actual query implementation. | ||
35 | * | ||
36 | * This is a worker object that can be moved to a thread to execute the query. | ||
37 | * The only interaction point is the ResultProvider, which handles the threadsafe reporting of the result. | ||
38 | */ | ||
39 | template<typename DomainType> | ||
40 | class QueryWorker : public QObject | ||
34 | { | 41 | { |
35 | //TODO use a result set with an iterator, to read values on demand | 42 | public: |
36 | QVector<QByteArray> keys; | 43 | QueryWorker(const Akonadi2::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType); |
37 | transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { | 44 | virtual ~QueryWorker(); |
38 | //Skip internals | 45 | |
39 | if (Akonadi2::Storage::isInternalKey(key)) { | 46 | qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); |
40 | return true; | 47 | qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); |
41 | } | 48 | |
42 | keys << Akonadi2::Storage::uidFromKey(key); | 49 | private: |
43 | return true; | 50 | static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties); |
44 | }, | 51 | |
45 | [](const Akonadi2::Storage::Error &error) { | 52 | void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback); |
46 | qWarning() << "Error during query: " << error.message; | 53 | |
47 | }); | 54 | ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); |
55 | ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); | ||
56 | |||
57 | ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); | ||
58 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query); | ||
59 | qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery); | ||
60 | |||
61 | private: | ||
62 | DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; | ||
63 | QByteArray mResourceInstanceIdentifier; | ||
64 | QByteArray mBufferType; | ||
65 | Akonadi2::Query mQuery; | ||
66 | }; | ||
48 | 67 | ||
49 | Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results"; | ||
50 | return ResultSet(keys); | ||
51 | } | ||
52 | 68 | ||
53 | template<class DomainType> | 69 | template<class DomainType> |
54 | QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) | 70 | QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) |
55 | : QueryRunnerBase(), | 71 | : QueryRunnerBase(), |
56 | mResultProvider(new ResultProvider<typename DomainType::Ptr>), | ||
57 | mResourceAccess(resourceAccess), | 72 | mResourceAccess(resourceAccess), |
58 | mDomainTypeAdaptorFactory(factory), | 73 | mResultProvider(new ResultProvider<typename DomainType::Ptr>) |
59 | mResourceInstanceIdentifier(instanceIdentifier), | ||
60 | mBufferType(bufferType), | ||
61 | mQuery(query) | ||
62 | { | 74 | { |
63 | Trace() << "Starting query"; | 75 | Trace() << "Starting query"; |
64 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. | 76 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. |
65 | mResultProvider->setFetcher([this, query](const typename DomainType::Ptr &parent) { | 77 | mResultProvider->setFetcher([this, query, instanceIdentifier, factory, bufferType](const typename DomainType::Ptr &parent) { |
66 | Trace() << "Running fetcher"; | 78 | Trace() << "Running fetcher"; |
67 | const qint64 newRevision = executeInitialQuery(query, parent, *mResultProvider); | 79 | auto resultProvider = mResultProvider; |
68 | //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 80 | auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 { |
69 | if (query.liveQuery) { | 81 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); |
70 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | 82 | const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); |
71 | } | 83 | return newRevision; |
84 | }); | ||
85 | //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
86 | if (query.liveQuery) { | ||
87 | auto watcher = new QFutureWatcher<qint64>; | ||
88 | watcher->setFuture(result); | ||
89 | QObject::connect(watcher, &QFutureWatcher<qint64>::finished, watcher, [this, watcher]() { | ||
90 | const auto newRevision = watcher->future().result(); | ||
91 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
92 | delete watcher; | ||
93 | }); | ||
94 | } | ||
72 | }); | 95 | }); |
73 | 96 | ||
74 | 97 | // In case of a live query we keep the runner for as long alive as the result provider exists | |
75 | //In case of a live query we keep the runner for as long alive as the result provider exists | ||
76 | if (query.liveQuery) { | 98 | if (query.liveQuery) { |
77 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | 99 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting |
78 | setQuery([this, query] () -> KAsync::Job<void> { | 100 | setQuery([this, query, instanceIdentifier, factory, bufferType] () -> KAsync::Job<void> { |
79 | return KAsync::start<void>([this, query](KAsync::Future<void> &future) { | 101 | return KAsync::start<void>([this, query, instanceIdentifier, factory, bufferType](KAsync::Future<void> &future) { |
80 | //TODO execute in thread | 102 | auto resultProvider = mResultProvider; |
81 | const qint64 newRevision = executeIncrementalQuery(query, *mResultProvider); | 103 | auto result = QtConcurrent::run([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 { |
82 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | 104 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); |
83 | future.setFinished(); | 105 | const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); |
106 | return newRevision; | ||
107 | }); | ||
108 | auto watcher = new QFutureWatcher<qint64>; | ||
109 | watcher->setFuture(result); | ||
110 | QObject::connect(watcher, &QFutureWatcher<qint64>::finished, watcher, [this, &future, watcher]() { | ||
111 | const auto newRevision = watcher->future().result(); | ||
112 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
113 | future.setFinished(); | ||
114 | delete watcher; | ||
115 | }); | ||
84 | }); | 116 | }); |
85 | }); | 117 | }); |
86 | //Ensure the connection is open, if it wasn't already opened | 118 | //Ensure the connection is open, if it wasn't already opened |
@@ -102,8 +134,48 @@ typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<Doma | |||
102 | return mResultProvider->emitter(); | 134 | return mResultProvider->emitter(); |
103 | } | 135 | } |
104 | 136 | ||
137 | |||
138 | |||
139 | static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
140 | { | ||
141 | //TODO use a result set with an iterator, to read values on demand | ||
142 | QVector<QByteArray> keys; | ||
143 | transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
144 | //Skip internals | ||
145 | if (Akonadi2::Storage::isInternalKey(key)) { | ||
146 | return true; | ||
147 | } | ||
148 | keys << Akonadi2::Storage::uidFromKey(key); | ||
149 | return true; | ||
150 | }, | ||
151 | [](const Akonadi2::Storage::Error &error) { | ||
152 | qWarning() << "Error during query: " << error.message; | ||
153 | }); | ||
154 | |||
155 | Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results"; | ||
156 | return ResultSet(keys); | ||
157 | } | ||
158 | |||
159 | |||
105 | template<class DomainType> | 160 | template<class DomainType> |
106 | void QueryRunner<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties) | 161 | QueryWorker<DomainType>::QueryWorker(const Akonadi2::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) |
162 | : QObject(), | ||
163 | mDomainTypeAdaptorFactory(factory), | ||
164 | mResourceInstanceIdentifier(instanceIdentifier), | ||
165 | mBufferType(bufferType), | ||
166 | mQuery(query) | ||
167 | { | ||
168 | Trace() << "Starting query worker"; | ||
169 | } | ||
170 | |||
171 | template<class DomainType> | ||
172 | QueryWorker<DomainType>::~QueryWorker() | ||
173 | { | ||
174 | Trace() << "Stopped query worker"; | ||
175 | } | ||
176 | |||
177 | template<class DomainType> | ||
178 | void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties) | ||
107 | { | 179 | { |
108 | int counter = 0; | 180 | int counter = 0; |
109 | while (resultSet.next([&resultProvider, &counter, &properties](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { | 181 | while (resultSet.next([&resultProvider, &counter, &properties](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { |
@@ -128,7 +200,7 @@ void QueryRunner<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultPr | |||
128 | } | 200 | } |
129 | 201 | ||
130 | template<class DomainType> | 202 | template<class DomainType> |
131 | void QueryRunner<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback) | 203 | void QueryWorker<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback) |
132 | { | 204 | { |
133 | //This only works for a 1:1 mapping of resource to domain types. | 205 | //This only works for a 1:1 mapping of resource to domain types. |
134 | //Not i.e. for tags that are stored as flags in each entity of an imap store. | 206 | //Not i.e. for tags that are stored as flags in each entity of an imap store. |
@@ -150,7 +222,7 @@ void QueryRunner<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase | |||
150 | } | 222 | } |
151 | 223 | ||
152 | template<class DomainType> | 224 | template<class DomainType> |
153 | ResultSet QueryRunner<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | 225 | ResultSet QueryWorker<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) |
154 | { | 226 | { |
155 | if (!query.ids.isEmpty()) { | 227 | if (!query.ids.isEmpty()) { |
156 | return ResultSet(query.ids.toVector()); | 228 | return ResultSet(query.ids.toVector()); |
@@ -168,7 +240,7 @@ ResultSet QueryRunner<DomainType>::loadInitialResultSet(const Akonadi2::Query &q | |||
168 | } | 240 | } |
169 | 241 | ||
170 | template<class DomainType> | 242 | template<class DomainType> |
171 | ResultSet QueryRunner<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | 243 | ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) |
172 | { | 244 | { |
173 | const auto bufferType = mBufferType; | 245 | const auto bufferType = mBufferType; |
174 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | 246 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); |
@@ -196,7 +268,7 @@ ResultSet QueryRunner<DomainType>::loadIncrementalResultSet(qint64 baseRevision, | |||
196 | } | 268 | } |
197 | 269 | ||
198 | template<class DomainType> | 270 | template<class DomainType> |
199 | ResultSet QueryRunner<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) | 271 | ResultSet QueryWorker<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) |
200 | { | 272 | { |
201 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | 273 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); |
202 | 274 | ||
@@ -225,9 +297,8 @@ ResultSet QueryRunner<DomainType>::filterSet(const ResultSet &resultSet, const s | |||
225 | return ResultSet(generator); | 297 | return ResultSet(generator); |
226 | } | 298 | } |
227 | 299 | ||
228 | |||
229 | template<class DomainType> | 300 | template<class DomainType> |
230 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryRunner<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query) | 301 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryWorker<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query) |
231 | { | 302 | { |
232 | return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | 303 | return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { |
233 | if (!query.ids.isEmpty()) { | 304 | if (!query.ids.isEmpty()) { |
@@ -252,7 +323,7 @@ std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr | |||
252 | } | 323 | } |
253 | 324 | ||
254 | template<class DomainType> | 325 | template<class DomainType> |
255 | qint64 QueryRunner<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery) | 326 | qint64 QueryWorker<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery) |
256 | { | 327 | { |
257 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); | 328 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); |
258 | storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { | 329 | storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { |
@@ -269,9 +340,8 @@ qint64 QueryRunner<DomainType>::load(const Akonadi2::Query &query, const std::fu | |||
269 | return Akonadi2::Storage::maxRevision(transaction); | 340 | return Akonadi2::Storage::maxRevision(transaction); |
270 | } | 341 | } |
271 | 342 | ||
272 | |||
273 | template<class DomainType> | 343 | template<class DomainType> |
274 | qint64 QueryRunner<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 344 | qint64 QueryWorker<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) |
275 | { | 345 | { |
276 | QTime time; | 346 | QTime time; |
277 | time.start(); | 347 | time.start(); |
@@ -286,7 +356,7 @@ qint64 QueryRunner<DomainType>::executeIncrementalQuery(const Akonadi2::Query &q | |||
286 | } | 356 | } |
287 | 357 | ||
288 | template<class DomainType> | 358 | template<class DomainType> |
289 | qint64 QueryRunner<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 359 | qint64 QueryWorker<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) |
290 | { | 360 | { |
291 | QTime time; | 361 | QTime time; |
292 | time.start(); | 362 | time.start(); |
@@ -312,3 +382,6 @@ qint64 QueryRunner<DomainType>::executeInitialQuery(const Akonadi2::Query &query | |||
312 | template class QueryRunner<Akonadi2::ApplicationDomain::Folder>; | 382 | template class QueryRunner<Akonadi2::ApplicationDomain::Folder>; |
313 | template class QueryRunner<Akonadi2::ApplicationDomain::Mail>; | 383 | template class QueryRunner<Akonadi2::ApplicationDomain::Mail>; |
314 | template class QueryRunner<Akonadi2::ApplicationDomain::Event>; | 384 | template class QueryRunner<Akonadi2::ApplicationDomain::Event>; |
385 | template class QueryWorker<Akonadi2::ApplicationDomain::Folder>; | ||
386 | template class QueryWorker<Akonadi2::ApplicationDomain::Mail>; | ||
387 | template class QueryWorker<Akonadi2::ApplicationDomain::Event>; | ||
diff --git a/common/queryrunner.h b/common/queryrunner.h index 8df0ecd..aba7912 100644 --- a/common/queryrunner.h +++ b/common/queryrunner.h | |||
@@ -20,7 +20,6 @@ | |||
20 | #pragma once | 20 | #pragma once |
21 | 21 | ||
22 | #include <QObject> | 22 | #include <QObject> |
23 | #include "facadeinterface.h" | ||
24 | #include "resourceaccess.h" | 23 | #include "resourceaccess.h" |
25 | #include "resultprovider.h" | 24 | #include "resultprovider.h" |
26 | #include "domaintypeadaptorfactoryinterface.h" | 25 | #include "domaintypeadaptorfactoryinterface.h" |
@@ -28,15 +27,8 @@ | |||
28 | #include "query.h" | 27 | #include "query.h" |
29 | 28 | ||
30 | /** | 29 | /** |
31 | * A QueryRunner runs a query and updates the corresponding result set. | 30 | * Base clase because you can't have the Q_OBJECT macro in template classes |
32 | * | ||
33 | * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), | ||
34 | * and by how long a result set must be updated. If the query is one off the runner dies after the execution, | ||
35 | * otherwise it lives on the react to changes and updates the corresponding result set. | ||
36 | * | ||
37 | * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. | ||
38 | */ | 31 | */ |
39 | |||
40 | class QueryRunnerBase : public QObject | 32 | class QueryRunnerBase : public QObject |
41 | { | 33 | { |
42 | Q_OBJECT | 34 | Q_OBJECT |
@@ -74,6 +66,15 @@ private: | |||
74 | QueryFunction queryFunction; | 66 | QueryFunction queryFunction; |
75 | }; | 67 | }; |
76 | 68 | ||
69 | /** | ||
70 | * A QueryRunner runs a query and updates the corresponding result set. | ||
71 | * | ||
72 | * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), | ||
73 | * and by how long a result set must be updated. If the query is one off the runner dies after the execution, | ||
74 | * otherwise it lives on the react to changes and updates the corresponding result set. | ||
75 | * | ||
76 | * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. | ||
77 | */ | ||
77 | template<typename DomainType> | 78 | template<typename DomainType> |
78 | class QueryRunner : public QueryRunnerBase | 79 | class QueryRunner : public QueryRunnerBase |
79 | { | 80 | { |
@@ -84,25 +85,7 @@ public: | |||
84 | typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr emitter(); | 85 | typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr emitter(); |
85 | 86 | ||
86 | private: | 87 | private: |
87 | static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties); | ||
88 | |||
89 | void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback); | ||
90 | |||
91 | ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); | ||
92 | ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); | ||
93 | |||
94 | ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); | ||
95 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query); | ||
96 | qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery); | ||
97 | qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
98 | qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
99 | |||
100 | private: | ||
101 | QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > mResultProvider; | ||
102 | QSharedPointer<Akonadi2::ResourceAccessInterface> mResourceAccess; | 88 | QSharedPointer<Akonadi2::ResourceAccessInterface> mResourceAccess; |
103 | DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; | 89 | QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > mResultProvider; |
104 | QByteArray mResourceInstanceIdentifier; | ||
105 | QByteArray mBufferType; | ||
106 | Akonadi2::Query mQuery; | ||
107 | }; | 90 | }; |
108 | 91 | ||
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 5d64511..1e0f6b5 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt | |||
@@ -50,11 +50,13 @@ auto_tests ( | |||
50 | querytest | 50 | querytest |
51 | databasepopulationandfacadequerybenchmark | 51 | databasepopulationandfacadequerybenchmark |
52 | dummyresourcewritebenchmark | 52 | dummyresourcewritebenchmark |
53 | modelinteractivitytest | ||
53 | ) | 54 | ) |
54 | target_link_libraries(dummyresourcetest akonadi2_resource_dummy) | 55 | target_link_libraries(dummyresourcetest akonadi2_resource_dummy) |
55 | target_link_libraries(dummyresourcebenchmark akonadi2_resource_dummy) | 56 | target_link_libraries(dummyresourcebenchmark akonadi2_resource_dummy) |
56 | target_link_libraries(dummyresourcewritebenchmark akonadi2_resource_dummy) | 57 | target_link_libraries(dummyresourcewritebenchmark akonadi2_resource_dummy) |
57 | target_link_libraries(querytest akonadi2_resource_dummy) | 58 | target_link_libraries(querytest akonadi2_resource_dummy) |
59 | target_link_libraries(modelinteractivitytest akonadi2_resource_dummy) | ||
58 | 60 | ||
59 | if (BUILD_MAILDIR) | 61 | if (BUILD_MAILDIR) |
60 | auto_tests ( | 62 | auto_tests ( |
diff --git a/tests/modelinteractivitytest.cpp b/tests/modelinteractivitytest.cpp new file mode 100644 index 0000000..52db932 --- /dev/null +++ b/tests/modelinteractivitytest.cpp | |||
@@ -0,0 +1,101 @@ | |||
1 | #include <QtTest> | ||
2 | |||
3 | #include <QString> | ||
4 | #include <iostream> | ||
5 | |||
6 | #include "dummyresource/resourcefactory.h" | ||
7 | #include "clientapi.h" | ||
8 | #include "commands.h" | ||
9 | #include "resourceconfig.h" | ||
10 | #include "log.h" | ||
11 | #include "modelresult.h" | ||
12 | |||
13 | static int blockingTime; | ||
14 | |||
15 | class TimeMeasuringApplication : public QCoreApplication | ||
16 | { | ||
17 | QElapsedTimer t; | ||
18 | public: | ||
19 | TimeMeasuringApplication(int& argc, char ** argv) : QCoreApplication(argc, argv) { } | ||
20 | virtual ~TimeMeasuringApplication() { } | ||
21 | |||
22 | virtual bool notify(QObject* receiver, QEvent* event) | ||
23 | { | ||
24 | t.start(); | ||
25 | const bool ret = QCoreApplication::notify(receiver, event); | ||
26 | if(t.elapsed() > 1) | ||
27 | std::cout << QString("processing event type %1 for object %2 took %3ms") | ||
28 | .arg((int)event->type()) | ||
29 | .arg(""/* receiver->objectName().toLocal8Bit().data()*/) | ||
30 | .arg((int)t.elapsed()) | ||
31 | .toStdString() << std::endl; | ||
32 | blockingTime += t.elapsed(); | ||
33 | return ret; | ||
34 | } | ||
35 | }; | ||
36 | |||
37 | /** | ||
38 | * Ensure that queries don't block the system for an extended period of time. | ||
39 | * | ||
40 | * This is done by ensuring that the event loop is never blocked. | ||
41 | */ | ||
42 | class ModelinteractivityTest : public QObject | ||
43 | { | ||
44 | Q_OBJECT | ||
45 | private Q_SLOTS: | ||
46 | void initTestCase() | ||
47 | { | ||
48 | Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Warning); | ||
49 | DummyResource::removeFromDisk("org.kde.dummy.instance1"); | ||
50 | ResourceConfig::addResource("org.kde.dummy.instance1", "org.kde.dummy"); | ||
51 | } | ||
52 | |||
53 | void cleanup() | ||
54 | { | ||
55 | Akonadi2::Store::shutdown(QByteArray("org.kde.dummy.instance1")).exec().waitForFinished(); | ||
56 | DummyResource::removeFromDisk("org.kde.dummy.instance1"); | ||
57 | Akonadi2::Store::start(QByteArray("org.kde.dummy.instance1")).exec().waitForFinished(); | ||
58 | } | ||
59 | |||
60 | void init() | ||
61 | { | ||
62 | } | ||
63 | |||
64 | void testSingle() | ||
65 | { | ||
66 | //Setup | ||
67 | { | ||
68 | Akonadi2::ApplicationDomain::Mail mail("org.kde.dummy.instance1"); | ||
69 | for (int i = 0; i < 1000; i++) { | ||
70 | Akonadi2::Store::create<Akonadi2::ApplicationDomain::Mail>(mail).exec().waitForFinished(); | ||
71 | } | ||
72 | } | ||
73 | |||
74 | Akonadi2::Query query; | ||
75 | query.resources << "org.kde.dummy.instance1"; | ||
76 | query.syncOnDemand = false; | ||
77 | query.processAll = true; | ||
78 | query.liveQuery = true; | ||
79 | |||
80 | Akonadi2::Store::synchronize(query).exec().waitForFinished(); | ||
81 | |||
82 | //Test | ||
83 | QTime time; | ||
84 | time.start(); | ||
85 | auto model = Akonadi2::Store::loadModel<Akonadi2::ApplicationDomain::Mail>(query); | ||
86 | blockingTime += time.elapsed(); | ||
87 | QTRY_VERIFY(model->data(QModelIndex(), Akonadi2::Store::ChildrenFetchedRole).toBool()); | ||
88 | //Never block longer than 10 ms | ||
89 | QVERIFY2(blockingTime < 10, QString("Total blocking time: %1").arg(blockingTime).toLatin1().data()); | ||
90 | } | ||
91 | }; | ||
92 | |||
93 | int main(int argc, char *argv[]) | ||
94 | { | ||
95 | blockingTime = 0; | ||
96 | TimeMeasuringApplication app(argc, argv); | ||
97 | ModelinteractivityTest tc; | ||
98 | return QTest::qExec(&tc, argc, argv); | ||
99 | } | ||
100 | |||
101 | #include "modelinteractivitytest.moc" | ||