diff options
-rw-r--r-- | common/CMakeLists.txt | 1 | ||||
-rw-r--r-- | common/clientapi.h | 181 | ||||
-rw-r--r-- | common/facade.cpp | 20 | ||||
-rw-r--r-- | common/facade.h | 51 | ||||
-rw-r--r-- | common/synclistresult.h | 54 | ||||
-rw-r--r-- | common/test/clientapitest.cpp | 106 | ||||
-rw-r--r-- | dummyresource/facade.cpp | 44 | ||||
-rw-r--r-- | dummyresource/facade.h | 9 | ||||
-rw-r--r-- | tests/dummyresourcebenchmark.cpp | 1 | ||||
-rw-r--r-- | tests/dummyresourcetest.cpp | 2 |
10 files changed, 393 insertions, 76 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index b06718f..a97c7f9 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -29,6 +29,7 @@ set(command_SRCS | |||
29 | clientapi.cpp | 29 | clientapi.cpp |
30 | commands.cpp | 30 | commands.cpp |
31 | console.cpp | 31 | console.cpp |
32 | facade.cpp | ||
32 | pipeline.cpp | 33 | pipeline.cpp |
33 | domainadaptor.cpp | 34 | domainadaptor.cpp |
34 | resource.cpp | 35 | resource.cpp |
diff --git a/common/clientapi.h b/common/clientapi.h index 22448b3..c2b9493 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -49,17 +49,44 @@ namespace async { | |||
49 | */ | 49 | */ |
50 | template<class T> | 50 | template<class T> |
51 | class ResultProvider { | 51 | class ResultProvider { |
52 | public: | 52 | private: |
53 | //Called from worker thread | 53 | void callInMainThreadOnEmitter(void (ResultEmitter<T>::*f)()) |
54 | void add(const T &value) | ||
55 | { | 54 | { |
56 | //We use the eventloop to call the addHandler directly from the main eventloop. | 55 | //We use the eventloop to call the addHandler directly from the main eventloop. |
57 | //That way the result emitter implementation doesn't have to care about threadsafety at all. | 56 | //That way the result emitter implementation doesn't have to care about threadsafety at all. |
58 | //The alternative would be to make all handlers of the emitter threadsafe. | 57 | //The alternative would be to make all handlers of the emitter threadsafe. |
59 | auto emitter = mResultEmitter; | 58 | if (auto emitter = mResultEmitter.toStrongRef()) { |
60 | mResultEmitter->mThreadBoundary.callInMainThread([emitter, value]() { | 59 | auto weakEmitter = mResultEmitter; |
61 | if (emitter) { | 60 | //We don't want to keep the emitter alive here, so we only capture a weak reference |
62 | emitter->addHandler(value); | 61 | emitter->mThreadBoundary.callInMainThread([weakEmitter, f]() { |
62 | if (auto strongRef = weakEmitter.toStrongRef()) { | ||
63 | (strongRef.data()->*f)(); | ||
64 | } | ||
65 | }); | ||
66 | } | ||
67 | } | ||
68 | |||
69 | void callInMainThreadOnEmitter(const std::function<void()> &f) | ||
70 | { | ||
71 | //We use the eventloop to call the addHandler directly from the main eventloop. | ||
72 | //That way the result emitter implementation doesn't have to care about threadsafety at all. | ||
73 | //The alternative would be to make all handlers of the emitter threadsafe. | ||
74 | if (auto emitter = mResultEmitter.toStrongRef()) { | ||
75 | emitter->mThreadBoundary.callInMainThread([f]() { | ||
76 | f(); | ||
77 | }); | ||
78 | } | ||
79 | } | ||
80 | |||
81 | public: | ||
82 | //Called from worker thread | ||
83 | void add(const T &value) | ||
84 | { | ||
85 | //Because I don't know how to use bind | ||
86 | auto weakEmitter = mResultEmitter; | ||
87 | callInMainThreadOnEmitter([weakEmitter, value](){ | ||
88 | if (auto strongRef = weakEmitter.toStrongRef()) { | ||
89 | strongRef->addHandler(value); | ||
63 | } | 90 | } |
64 | }); | 91 | }); |
65 | } | 92 | } |
@@ -67,25 +94,39 @@ namespace async { | |||
67 | //Called from worker thread | 94 | //Called from worker thread |
68 | void complete() | 95 | void complete() |
69 | { | 96 | { |
70 | auto emitter = mResultEmitter; | 97 | callInMainThreadOnEmitter(&ResultEmitter<T>::complete); |
71 | mResultEmitter->mThreadBoundary.callInMainThread([emitter]() { | ||
72 | if (emitter) { | ||
73 | emitter->completeHandler(); | ||
74 | } | ||
75 | }); | ||
76 | } | 98 | } |
77 | 99 | ||
100 | void clear() | ||
101 | { | ||
102 | callInMainThreadOnEmitter(&ResultEmitter<T>::clear); | ||
103 | } | ||
104 | |||
105 | |||
78 | QSharedPointer<ResultEmitter<T> > emitter() | 106 | QSharedPointer<ResultEmitter<T> > emitter() |
79 | { | 107 | { |
80 | if (!mResultEmitter) { | 108 | if (!mResultEmitter) { |
81 | mResultEmitter = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>()); | 109 | //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again |
110 | auto sharedPtr = QSharedPointer<ResultEmitter<T> >::create(); | ||
111 | mResultEmitter = sharedPtr; | ||
112 | return sharedPtr; | ||
82 | } | 113 | } |
83 | 114 | ||
84 | return mResultEmitter; | 115 | return mResultEmitter.toStrongRef(); |
116 | } | ||
117 | |||
118 | /** | ||
119 | * For lifetimemanagement only. | ||
120 | * We keep the runner alive as long as the result provider exists. | ||
121 | */ | ||
122 | void setQueryRunner(const QSharedPointer<QObject> &runner) | ||
123 | { | ||
124 | mQueryRunner = runner; | ||
85 | } | 125 | } |
86 | 126 | ||
87 | private: | 127 | private: |
88 | QSharedPointer<ResultEmitter<T> > mResultEmitter; | 128 | QWeakPointer<ResultEmitter<T> > mResultEmitter; |
129 | QSharedPointer<QObject> mQueryRunner; | ||
89 | }; | 130 | }; |
90 | 131 | ||
91 | /* | 132 | /* |
@@ -99,7 +140,6 @@ namespace async { | |||
99 | * * build async interfaces with signals | 140 | * * build async interfaces with signals |
100 | * * build sync interfaces that block when accessing the value | 141 | * * build sync interfaces that block when accessing the value |
101 | * | 142 | * |
102 | * TODO: This should probably be merged with daniels futurebase used in Async | ||
103 | */ | 143 | */ |
104 | template<class DomainType> | 144 | template<class DomainType> |
105 | class ResultEmitter { | 145 | class ResultEmitter { |
@@ -113,51 +153,36 @@ namespace async { | |||
113 | { | 153 | { |
114 | completeHandler = handler; | 154 | completeHandler = handler; |
115 | } | 155 | } |
156 | void onClear(const std::function<void(void)> &handler) | ||
157 | { | ||
158 | clearHandler = handler; | ||
159 | } | ||
116 | 160 | ||
117 | private: | 161 | void add(const DomainType &value) |
118 | friend class ResultProvider<DomainType>; | 162 | { |
119 | std::function<void(const DomainType&)> addHandler; | 163 | addHandler(value); |
120 | // std::function<void(const T&)> removeHandler; | 164 | } |
121 | std::function<void(void)> completeHandler; | ||
122 | ThreadBoundary mThreadBoundary; | ||
123 | }; | ||
124 | |||
125 | 165 | ||
126 | /* | 166 | void complete() |
127 | * A result set specialization that provides a syncronous list | ||
128 | */ | ||
129 | template<class T> | ||
130 | class SyncListResult : public QList<T> { | ||
131 | public: | ||
132 | SyncListResult(const QSharedPointer<ResultEmitter<T> > &emitter) | ||
133 | :QList<T>(), | ||
134 | mComplete(false), | ||
135 | mEmitter(emitter) | ||
136 | { | 167 | { |
137 | emitter->onAdded([this](const T &value) { | 168 | completeHandler(); |
138 | this->append(value); | ||
139 | }); | ||
140 | emitter->onComplete([this]() { | ||
141 | mComplete = true; | ||
142 | auto loop = mWaitLoop.toStrongRef(); | ||
143 | if (loop) { | ||
144 | loop->quit(); | ||
145 | } | ||
146 | }); | ||
147 | } | 169 | } |
148 | 170 | ||
149 | void exec() | 171 | void clear() |
150 | { | 172 | { |
151 | auto loop = QSharedPointer<QEventLoop>::create(); | 173 | clearHandler(); |
152 | mWaitLoop = loop; | ||
153 | loop->exec(QEventLoop::ExcludeUserInputEvents); | ||
154 | } | 174 | } |
155 | 175 | ||
156 | private: | 176 | private: |
157 | bool mComplete; | 177 | friend class ResultProvider<DomainType>; |
158 | QWeakPointer<QEventLoop> mWaitLoop; | 178 | |
159 | QSharedPointer<ResultEmitter<T> > mEmitter; | 179 | std::function<void(const DomainType&)> addHandler; |
180 | // std::function<void(const T&)> removeHandler; | ||
181 | std::function<void(void)> completeHandler; | ||
182 | std::function<void(void)> clearHandler; | ||
183 | ThreadBoundary mThreadBoundary; | ||
160 | }; | 184 | }; |
185 | |||
161 | } | 186 | } |
162 | 187 | ||
163 | namespace Akonadi2 { | 188 | namespace Akonadi2 { |
@@ -307,7 +332,7 @@ using namespace async; | |||
307 | class Query | 332 | class Query |
308 | { | 333 | { |
309 | public: | 334 | public: |
310 | Query() : syncOnDemand(true), processAll(false) {} | 335 | Query() : syncOnDemand(true), processAll(false), liveQuery(false) {} |
311 | //Could also be a propertyFilter | 336 | //Could also be a propertyFilter |
312 | QByteArrayList resources; | 337 | QByteArrayList resources; |
313 | //Could also be a propertyFilter | 338 | //Could also be a propertyFilter |
@@ -318,6 +343,8 @@ public: | |||
318 | QSet<QByteArray> requestedProperties; | 343 | QSet<QByteArray> requestedProperties; |
319 | bool syncOnDemand; | 344 | bool syncOnDemand; |
320 | bool processAll; | 345 | bool processAll; |
346 | //If live query is false, this query will not continuously be updated | ||
347 | bool liveQuery; | ||
321 | }; | 348 | }; |
322 | 349 | ||
323 | 350 | ||
@@ -337,7 +364,9 @@ public: | |||
337 | virtual Async::Job<void> create(const DomainType &domainObject) = 0; | 364 | virtual Async::Job<void> create(const DomainType &domainObject) = 0; |
338 | virtual Async::Job<void> modify(const DomainType &domainObject) = 0; | 365 | virtual Async::Job<void> modify(const DomainType &domainObject) = 0; |
339 | virtual Async::Job<void> remove(const DomainType &domainObject) = 0; | 366 | virtual Async::Job<void> remove(const DomainType &domainObject) = 0; |
340 | virtual Async::Job<void> load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0; | 367 | //TODO remove from public API |
368 | virtual Async::Job<qint64> load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0; | ||
369 | virtual Async::Job<void> load(const Query &query, const QSharedPointer<ResultProvider<typename DomainType::Ptr> > &resultProvider) { return Async::null<void>(); }; | ||
341 | }; | 370 | }; |
342 | 371 | ||
343 | 372 | ||
@@ -349,6 +378,8 @@ public: | |||
349 | 378 | ||
350 | class FacadeFactory { | 379 | class FacadeFactory { |
351 | public: | 380 | public: |
381 | typedef std::function<void*(bool &externallyManaged)> FactoryFunction; | ||
382 | |||
352 | //FIXME: proper singleton implementation | 383 | //FIXME: proper singleton implementation |
353 | static FacadeFactory &instance() | 384 | static FacadeFactory &instance() |
354 | { | 385 | { |
@@ -365,7 +396,7 @@ public: | |||
365 | void registerFacade(const QByteArray &resource) | 396 | void registerFacade(const QByteArray &resource) |
366 | { | 397 | { |
367 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); | 398 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); |
368 | mFacadeRegistry.insert(key(resource, typeName), [](){ return new Facade; }); | 399 | mFacadeRegistry.insert(key(resource, typeName), [](bool &externallyManaged){ return new Facade; }); |
369 | } | 400 | } |
370 | 401 | ||
371 | /* | 402 | /* |
@@ -375,29 +406,51 @@ public: | |||
375 | * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer. | 406 | * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer. |
376 | * Supplied factory functions should therefore always return a new pointer (i.e. via clone()) | 407 | * Supplied factory functions should therefore always return a new pointer (i.e. via clone()) |
377 | * | 408 | * |
378 | * FIXME the factory function should really be returning QSharedPointer<void>, which doesn't work (std::shared_pointer<void> would though). That way i.e. a test could keep the object alive until it's done. | 409 | * FIXME the factory function should really be returning QSharedPointer<void>, which doesn't work (std::shared_pointer<void> would though). That way i.e. a test could keep the object alive until it's done. As a workaround the factory function can define wether it manages the lifetime of the facade itself. |
379 | */ | 410 | */ |
380 | template<class DomainType, class Facade> | 411 | template<class DomainType, class Facade> |
381 | void registerFacade(const QByteArray &resource, const std::function<void*(void)> &customFactoryFunction) | 412 | void registerFacade(const QByteArray &resource, const FactoryFunction &customFactoryFunction) |
382 | { | 413 | { |
383 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); | 414 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); |
384 | mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction); | 415 | mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction); |
385 | } | 416 | } |
386 | 417 | ||
418 | /* | ||
419 | * Can be used to clear the factory. | ||
420 | * | ||
421 | * Primarily for testing. | ||
422 | */ | ||
423 | void resetFactory() | ||
424 | { | ||
425 | mFacadeRegistry.clear(); | ||
426 | } | ||
427 | |||
428 | static void doNothingDeleter(void *) | ||
429 | { | ||
430 | qWarning() << "Do nothing"; | ||
431 | } | ||
432 | |||
387 | template<class DomainType> | 433 | template<class DomainType> |
388 | QSharedPointer<StoreFacade<DomainType> > getFacade(const QByteArray &resource) | 434 | QSharedPointer<StoreFacade<DomainType> > getFacade(const QByteArray &resource) |
389 | { | 435 | { |
390 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); | 436 | const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); |
391 | auto factoryFunction = mFacadeRegistry.value(key(resource, typeName)); | 437 | auto factoryFunction = mFacadeRegistry.value(key(resource, typeName)); |
392 | if (factoryFunction) { | 438 | if (factoryFunction) { |
393 | return QSharedPointer<StoreFacade<DomainType> >(static_cast<StoreFacade<DomainType>* >(factoryFunction())); | 439 | bool externallyManaged = false; |
440 | auto ptr = static_cast<StoreFacade<DomainType>* >(factoryFunction(externallyManaged)); | ||
441 | if (externallyManaged) { | ||
442 | //Allows tests to manage the lifetime of injected facades themselves | ||
443 | return QSharedPointer<StoreFacade<DomainType> >(ptr, doNothingDeleter); | ||
444 | } else { | ||
445 | return QSharedPointer<StoreFacade<DomainType> >(ptr); | ||
446 | } | ||
394 | } | 447 | } |
395 | qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName; | 448 | qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName; |
396 | return QSharedPointer<StoreFacade<DomainType> >(); | 449 | return QSharedPointer<StoreFacade<DomainType> >(); |
397 | } | 450 | } |
398 | 451 | ||
399 | private: | 452 | private: |
400 | QHash<QByteArray, std::function<void*(void)> > mFacadeRegistry; | 453 | QHash<QByteArray, FactoryFunction> mFacadeRegistry; |
401 | }; | 454 | }; |
402 | 455 | ||
403 | /** | 456 | /** |
@@ -416,7 +469,7 @@ public: | |||
416 | template <class DomainType> | 469 | template <class DomainType> |
417 | static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) | 470 | static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) |
418 | { | 471 | { |
419 | QSharedPointer<ResultProvider<typename DomainType::Ptr> > resultSet(new ResultProvider<typename DomainType::Ptr>); | 472 | auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); |
420 | 473 | ||
421 | //Execute the search in a thread. | 474 | //Execute the search in a thread. |
422 | //We must guarantee that the emitter is returned before the first result is emitted. | 475 | //We must guarantee that the emitter is returned before the first result is emitted. |
@@ -428,15 +481,15 @@ public: | |||
428 | Async::Job<void> job = Async::null<void>(); | 481 | Async::Job<void> job = Async::null<void>(); |
429 | for(const QByteArray &resource : query.resources) { | 482 | for(const QByteArray &resource : query.resources) { |
430 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); | 483 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); |
431 | //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. | ||
432 | std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1); | ||
433 | 484 | ||
434 | // TODO The following is a necessary hack to keep the facade alive. | 485 | // TODO The following is a necessary hack to keep the facade alive. |
435 | // Otherwise this would reduce to: | 486 | // Otherwise this would reduce to: |
436 | // job = job.then(facade->load(query, addCallback)); | 487 | // job = job.then(facade->load(query, addCallback)); |
437 | // We somehow have to guarantee that the facade remains valid for the duration of the job | 488 | // We somehow have to guarantee that the facade remains valid for the duration of the job |
438 | job = job.then<void>([facade, query, addCallback](Async::Future<void> &future) { | 489 | // TODO: Use one result set per facade, and merge the results separately |
439 | Async::Job<void> j = facade->load(query, addCallback); | 490 | // resultSet->addSubset(facade->query(query)); |
491 | job = job.then<void>([facade, query, resultSet](Async::Future<void> &future) { | ||
492 | Async::Job<void> j = facade->load(query, resultSet); | ||
440 | j.then<void>([&future, facade](Async::Future<void> &f) { | 493 | j.then<void>([&future, facade](Async::Future<void> &f) { |
441 | future.setFinished(); | 494 | future.setFinished(); |
442 | f.setFinished(); | 495 | f.setFinished(); |
diff --git a/common/facade.cpp b/common/facade.cpp new file mode 100644 index 0000000..e51b32a --- /dev/null +++ b/common/facade.cpp | |||
@@ -0,0 +1,20 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | |||
20 | #include "facade.h" | ||
diff --git a/common/facade.h b/common/facade.h index 98bcb38..f9b5a83 100644 --- a/common/facade.h +++ b/common/facade.h | |||
@@ -30,6 +30,55 @@ | |||
30 | #include "domainadaptor.h" | 30 | #include "domainadaptor.h" |
31 | #include "entitybuffer.h" | 31 | #include "entitybuffer.h" |
32 | 32 | ||
33 | /** | ||
34 | * A QueryRunner runs a query and updates the corresponding result set. | ||
35 | * | ||
36 | * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), | ||
37 | * and by how long a result set must be updated. If the query is one off the runner dies after the execution, | ||
38 | * otherwise it lives on the react to changes and updates the corresponding result set. | ||
39 | * | ||
40 | * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. | ||
41 | */ | ||
42 | class QueryRunner : public QObject | ||
43 | { | ||
44 | Q_OBJECT | ||
45 | public: | ||
46 | typedef std::function<Async::Job<qint64>(qint64 oldRevision, qint64 newRevision)> QueryFunction; | ||
47 | |||
48 | QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; | ||
49 | /** | ||
50 | * Starts query | ||
51 | */ | ||
52 | Async::Job<void> run(qint64 newRevision = 0) | ||
53 | { | ||
54 | //TODO: JOBAPI: that last empty .then should not be necessary | ||
55 | return queryFunction(mLatestRevision, newRevision).then<void, qint64>([this](qint64 revision) { | ||
56 | mLatestRevision = revision; | ||
57 | }).then<void>([](){}); | ||
58 | } | ||
59 | |||
60 | /** | ||
61 | * | ||
62 | */ | ||
63 | void setQuery(const QueryFunction &query) | ||
64 | { | ||
65 | queryFunction = query; | ||
66 | } | ||
67 | |||
68 | public slots: | ||
69 | /** | ||
70 | * Rerun query with new revision | ||
71 | */ | ||
72 | void revisionChanged(qint64 newRevision) | ||
73 | { | ||
74 | run(newRevision).exec(); | ||
75 | } | ||
76 | |||
77 | private: | ||
78 | QueryFunction queryFunction; | ||
79 | qint64 mLatestRevision; | ||
80 | }; | ||
81 | |||
33 | namespace Akonadi2 { | 82 | namespace Akonadi2 { |
34 | class ResourceAccess; | 83 | class ResourceAccess; |
35 | /** | 84 | /** |
@@ -80,7 +129,7 @@ protected: | |||
80 | return Async::null<void>(); | 129 | return Async::null<void>(); |
81 | } | 130 | } |
82 | 131 | ||
83 | private: | 132 | protected: |
84 | //TODO use one resource access instance per application => make static | 133 | //TODO use one resource access instance per application => make static |
85 | QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess; | 134 | QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess; |
86 | }; | 135 | }; |
diff --git a/common/synclistresult.h b/common/synclistresult.h new file mode 100644 index 0000000..5fa0efd --- /dev/null +++ b/common/synclistresult.h | |||
@@ -0,0 +1,54 @@ | |||
1 | #pragma once | ||
2 | |||
3 | #include <QList> | ||
4 | #include <functional> | ||
5 | #include <QSharedPointer> | ||
6 | #include <clientapi.h> | ||
7 | |||
8 | namespace async { | ||
9 | |||
10 | /* | ||
11 | * A result set specialization that provides a syncronous list. | ||
12 | * | ||
13 | * Only for testing purposes. | ||
14 | * | ||
15 | * WARNING: The nested eventloop can cause all sorts of trouble. Use only in testing code. | ||
16 | */ | ||
17 | template<class T> | ||
18 | class SyncListResult : public QList<T> { | ||
19 | public: | ||
20 | SyncListResult(const QSharedPointer<ResultEmitter<T> > &emitter) | ||
21 | :QList<T>(), | ||
22 | mComplete(false), | ||
23 | mEmitter(emitter) | ||
24 | { | ||
25 | emitter->onAdded([this](const T &value) { | ||
26 | this->append(value); | ||
27 | }); | ||
28 | emitter->onComplete([this]() { | ||
29 | mComplete = true; | ||
30 | if (eventLoopAborter) { | ||
31 | eventLoopAborter(); | ||
32 | //Be safe in case of a second invocation of the complete handler | ||
33 | eventLoopAborter = std::function<void()>(); | ||
34 | } | ||
35 | }); | ||
36 | emitter->onClear([this]() { | ||
37 | this->clear(); | ||
38 | }); | ||
39 | } | ||
40 | |||
41 | void exec() | ||
42 | { | ||
43 | QEventLoop eventLoop; | ||
44 | eventLoopAborter = [&eventLoop]() { eventLoop.quit(); }; | ||
45 | eventLoop.exec(); | ||
46 | } | ||
47 | |||
48 | private: | ||
49 | bool mComplete; | ||
50 | QSharedPointer<ResultEmitter<T> > mEmitter; | ||
51 | std::function<void()> eventLoopAborter; | ||
52 | }; | ||
53 | |||
54 | } | ||
diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp index 24b3fb9..789c656 100644 --- a/common/test/clientapitest.cpp +++ b/common/test/clientapitest.cpp | |||
@@ -3,6 +3,22 @@ | |||
3 | #include <functional> | 3 | #include <functional> |
4 | 4 | ||
5 | #include "../clientapi.h" | 5 | #include "../clientapi.h" |
6 | #include "../facade.h" | ||
7 | #include "../synclistresult.h" | ||
8 | |||
9 | class RevisionNotifier : public QObject | ||
10 | { | ||
11 | Q_OBJECT | ||
12 | public: | ||
13 | RevisionNotifier() : QObject() {}; | ||
14 | void notify(qint64 revision) | ||
15 | { | ||
16 | emit revisionChanged(revision); | ||
17 | } | ||
18 | |||
19 | Q_SIGNALS: | ||
20 | void revisionChanged(qint64); | ||
21 | }; | ||
6 | 22 | ||
7 | class DummyResourceFacade : public Akonadi2::StoreFacade<Akonadi2::ApplicationDomain::Event> | 23 | class DummyResourceFacade : public Akonadi2::StoreFacade<Akonadi2::ApplicationDomain::Event> |
8 | { | 24 | { |
@@ -11,18 +27,63 @@ public: | |||
11 | virtual Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; | 27 | virtual Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; |
12 | virtual Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; | 28 | virtual Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; |
13 | virtual Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; | 29 | virtual Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; |
14 | virtual Async::Job<void> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) | 30 | virtual Async::Job<qint64> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) |
15 | { | 31 | { |
16 | return Async::start<void>([this, resultCallback](Async::Future<void> &future) { | 32 | return Async::start<qint64>([this, resultCallback](Async::Future<qint64> &future) { |
17 | qDebug() << "load called"; | 33 | qDebug() << "load called"; |
18 | for(const auto &result : results) { | 34 | for(const auto &result : results) { |
19 | resultCallback(result); | 35 | resultCallback(result); |
20 | } | 36 | } |
37 | future.setValue(0); | ||
21 | future.setFinished(); | 38 | future.setFinished(); |
22 | }); | 39 | }); |
23 | } | 40 | } |
24 | 41 | ||
42 | Async::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider) | ||
43 | { | ||
44 | auto runner = QSharedPointer<QueryRunner>::create(query); | ||
45 | //The runner only lives as long as the resultProvider | ||
46 | resultProvider->setQueryRunner(runner); | ||
47 | runner->setQuery([this, resultProvider, query](qint64 oldRevision, qint64 newRevision) -> Async::Job<qint64> { | ||
48 | qDebug() << "Creating query for revisions: " << oldRevision << newRevision; | ||
49 | return Async::start<qint64>([this, resultProvider, query](Async::Future<qint64> &future) { | ||
50 | //TODO only emit changes and don't replace everything | ||
51 | resultProvider->clear(); | ||
52 | //rerun query | ||
53 | std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> addCallback = std::bind(&Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr>::add, resultProvider, std::placeholders::_1); | ||
54 | load(query, addCallback).then<void, qint64>([resultProvider, &future](qint64 queriedRevision) { | ||
55 | //TODO set revision in result provider? | ||
56 | //TODO update all existing results with new revision | ||
57 | resultProvider->complete(); | ||
58 | future.setValue(queriedRevision); | ||
59 | future.setFinished(); | ||
60 | }).exec(); | ||
61 | }); | ||
62 | }); | ||
63 | |||
64 | //Ensure the notification is emitted in the right thread | ||
65 | //Otherwise we get crashes as we call revisionChanged from the test. | ||
66 | if (!notifier) { | ||
67 | notifier.reset(new RevisionNotifier); | ||
68 | } | ||
69 | |||
70 | //TODO somehow disconnect as resultNotifier is destroyed. Otherwise we keep the runner alive forever. | ||
71 | if (query.liveQuery) { | ||
72 | QObject::connect(notifier.data(), &RevisionNotifier::revisionChanged, [runner](qint64 newRevision) { | ||
73 | runner->revisionChanged(newRevision); | ||
74 | }); | ||
75 | } | ||
76 | |||
77 | return Async::start<void>([runner](Async::Future<void> &future) { | ||
78 | runner->run().then<void>([&future]() { | ||
79 | //TODO if not live query, destroy runner. | ||
80 | future.setFinished(); | ||
81 | }).exec(); | ||
82 | }); | ||
83 | } | ||
84 | |||
25 | QList<Akonadi2::ApplicationDomain::Event::Ptr> results; | 85 | QList<Akonadi2::ApplicationDomain::Event::Ptr> results; |
86 | QSharedPointer<RevisionNotifier> notifier; | ||
26 | }; | 87 | }; |
27 | 88 | ||
28 | class ClientAPITest : public QObject | 89 | class ClientAPITest : public QObject |
@@ -30,21 +91,60 @@ class ClientAPITest : public QObject | |||
30 | Q_OBJECT | 91 | Q_OBJECT |
31 | private Q_SLOTS: | 92 | private Q_SLOTS: |
32 | 93 | ||
94 | void initTestCase() | ||
95 | { | ||
96 | Akonadi2::FacadeFactory::instance().resetFactory(); | ||
97 | } | ||
98 | |||
33 | void testLoad() | 99 | void testLoad() |
34 | { | 100 | { |
35 | DummyResourceFacade facade; | 101 | DummyResourceFacade facade; |
36 | facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>()); | 102 | facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>()); |
37 | 103 | ||
38 | Akonadi2::FacadeFactory::instance().registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>("dummyresource", [facade](){ return new DummyResourceFacade(facade); }); | 104 | Akonadi2::FacadeFactory::instance().registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>("dummyresource", |
105 | [&facade](bool &externallyManaged) { | ||
106 | externallyManaged = true; | ||
107 | return &facade; | ||
108 | } | ||
109 | ); | ||
39 | 110 | ||
40 | Akonadi2::Query query; | 111 | Akonadi2::Query query; |
41 | query.resources << "dummyresource"; | 112 | query.resources << "dummyresource"; |
113 | query.liveQuery = false; | ||
42 | 114 | ||
43 | async::SyncListResult<Akonadi2::ApplicationDomain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::ApplicationDomain::Event>(query)); | 115 | async::SyncListResult<Akonadi2::ApplicationDomain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::ApplicationDomain::Event>(query)); |
44 | result.exec(); | 116 | result.exec(); |
45 | QCOMPARE(result.size(), 1); | 117 | QCOMPARE(result.size(), 1); |
46 | } | 118 | } |
47 | 119 | ||
120 | void testLiveQuery() | ||
121 | { | ||
122 | DummyResourceFacade facade; | ||
123 | facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>()); | ||
124 | |||
125 | Akonadi2::FacadeFactory::instance().registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>("dummyresource", | ||
126 | [&facade](bool &externallManage){ | ||
127 | externallManage = true; | ||
128 | return &facade; | ||
129 | } | ||
130 | ); | ||
131 | |||
132 | Akonadi2::Query query; | ||
133 | query.resources << "dummyresource"; | ||
134 | query.liveQuery = true; | ||
135 | |||
136 | async::SyncListResult<Akonadi2::ApplicationDomain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::ApplicationDomain::Event>(query)); | ||
137 | result.exec(); | ||
138 | QCOMPARE(result.size(), 1); | ||
139 | |||
140 | //Enter a second result | ||
141 | facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id2", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>()); | ||
142 | qWarning() << &facade; | ||
143 | QVERIFY(facade.notifier); | ||
144 | facade.notifier->revisionChanged(2); | ||
145 | QTRY_COMPARE(result.size(), 2); | ||
146 | } | ||
147 | |||
48 | }; | 148 | }; |
49 | 149 | ||
50 | QTEST_MAIN(ClientAPITest) | 150 | QTEST_MAIN(ClientAPITest) |
diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp index 1477fcf..f603c56 100644 --- a/dummyresource/facade.cpp +++ b/dummyresource/facade.cpp | |||
@@ -147,15 +147,51 @@ void DummyResourceFacade::readValue(QSharedPointer<Akonadi2::Storage> storage, c | |||
147 | }); | 147 | }); |
148 | } | 148 | } |
149 | 149 | ||
150 | Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) | 150 | Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider) |
151 | { | 151 | { |
152 | return synchronizeResource(query.syncOnDemand, query.processAll).then<void>([=](Async::Future<void> &future) { | 152 | auto runner = QSharedPointer<QueryRunner>::create(query); |
153 | //The runner only lives as long as the resultProvider | ||
154 | resultProvider->setQueryRunner(runner); | ||
155 | runner->setQuery([this, resultProvider, query](qint64 oldRevision, qint64 newRevision) -> Async::Job<qint64> { | ||
156 | return Async::start<qint64>([this, resultProvider, query](Async::Future<qint64> &future) { | ||
157 | //TODO only emit changes and don't replace everything | ||
158 | resultProvider->clear(); | ||
159 | //rerun query | ||
160 | std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> addCallback = std::bind(&Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr>::add, resultProvider, std::placeholders::_1); | ||
161 | load(query, addCallback).then<void, qint64>([resultProvider, &future](qint64 queriedRevision) { | ||
162 | //TODO set revision in result provider? | ||
163 | //TODO update all existing results with new revision | ||
164 | resultProvider->complete(); | ||
165 | future.setValue(queriedRevision); | ||
166 | future.setFinished(); | ||
167 | }).exec(); | ||
168 | }); | ||
169 | }); | ||
170 | |||
171 | auto resourceAccess = mResourceAccess; | ||
172 | //TODO we need to somehow disconnect this | ||
173 | QObject::connect(resourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, [runner](qint64 newRevision) { | ||
174 | runner->revisionChanged(newRevision); | ||
175 | }); | ||
176 | |||
177 | return Async::start<void>([runner](Async::Future<void> &future) { | ||
178 | runner->run().then<void>([&future]() { | ||
179 | //TODO if not live query, destroy runner. | ||
180 | future.setFinished(); | ||
181 | }).exec(); | ||
182 | }); | ||
183 | } | ||
184 | |||
185 | Async::Job<qint64> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) | ||
186 | { | ||
187 | return synchronizeResource(query.syncOnDemand, query.processAll).then<qint64>([=](Async::Future<qint64> &future) { | ||
153 | //Now that the sync is complete we can execute the query | 188 | //Now that the sync is complete we can execute the query |
154 | const auto preparedQuery = prepareQuery(query); | 189 | const auto preparedQuery = prepareQuery(query); |
155 | 190 | ||
156 | auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); | 191 | auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); |
157 | 192 | ||
158 | //TODO use transaction over full query and record store revision. We'll need it to update the query. | 193 | storage->startTransaction(Akonadi2::Storage::ReadOnly); |
194 | const qint64 revision = storage->maxRevision(); | ||
159 | 195 | ||
160 | //Index lookups | 196 | //Index lookups |
161 | QVector<QByteArray> keys; | 197 | QVector<QByteArray> keys; |
@@ -177,6 +213,8 @@ Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const s | |||
177 | readValue(storage, key, resultCallback, preparedQuery); | 213 | readValue(storage, key, resultCallback, preparedQuery); |
178 | } | 214 | } |
179 | } | 215 | } |
216 | storage->abortTransaction(); | ||
217 | future.setValue(revision); | ||
180 | future.setFinished(); | 218 | future.setFinished(); |
181 | }); | 219 | }); |
182 | } | 220 | } |
diff --git a/dummyresource/facade.h b/dummyresource/facade.h index 37ed81d..3ddfe15 100644 --- a/dummyresource/facade.h +++ b/dummyresource/facade.h | |||
@@ -34,10 +34,11 @@ class DummyResourceFacade : public Akonadi2::GenericFacade<Akonadi2::Application | |||
34 | public: | 34 | public: |
35 | DummyResourceFacade(); | 35 | DummyResourceFacade(); |
36 | virtual ~DummyResourceFacade(); | 36 | virtual ~DummyResourceFacade(); |
37 | virtual Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject); | 37 | Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; |
38 | virtual Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject); | 38 | Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; |
39 | virtual Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject); | 39 | Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE; |
40 | virtual Async::Job<void> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback); | 40 | Async::Job<qint64> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) Q_DECL_OVERRIDE; |
41 | Async::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider) Q_DECL_OVERRIDE; | ||
41 | 42 | ||
42 | private: | 43 | private: |
43 | void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local)>); | 44 | void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local)>); |
diff --git a/tests/dummyresourcebenchmark.cpp b/tests/dummyresourcebenchmark.cpp index d83cb70..e350747 100644 --- a/tests/dummyresourcebenchmark.cpp +++ b/tests/dummyresourcebenchmark.cpp | |||
@@ -7,6 +7,7 @@ | |||
7 | #include "clientapi.h" | 7 | #include "clientapi.h" |
8 | #include "commands.h" | 8 | #include "commands.h" |
9 | #include "entitybuffer.h" | 9 | #include "entitybuffer.h" |
10 | #include "synclistresult.h" | ||
10 | 11 | ||
11 | #include "event_generated.h" | 12 | #include "event_generated.h" |
12 | #include "entity_generated.h" | 13 | #include "entity_generated.h" |
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index 5fed7cd..9523e5d 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp | |||
@@ -2,13 +2,13 @@ | |||
2 | 2 | ||
3 | #include <QString> | 3 | #include <QString> |
4 | 4 | ||
5 | // #include "dummycalendar_generated.h" | ||
6 | #include "event_generated.h" | 5 | #include "event_generated.h" |
7 | #include "entity_generated.h" | 6 | #include "entity_generated.h" |
8 | #include "metadata_generated.h" | 7 | #include "metadata_generated.h" |
9 | #include "createentity_generated.h" | 8 | #include "createentity_generated.h" |
10 | #include "dummyresource/resourcefactory.h" | 9 | #include "dummyresource/resourcefactory.h" |
11 | #include "clientapi.h" | 10 | #include "clientapi.h" |
11 | #include "synclistresult.h" | ||
12 | #include "commands.h" | 12 | #include "commands.h" |
13 | #include "entitybuffer.h" | 13 | #include "entitybuffer.h" |
14 | 14 | ||