summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/clientapi.h181
-rw-r--r--common/facade.cpp20
-rw-r--r--common/facade.h51
-rw-r--r--common/synclistresult.h54
-rw-r--r--common/test/clientapitest.cpp106
-rw-r--r--dummyresource/facade.cpp44
-rw-r--r--dummyresource/facade.h9
-rw-r--r--tests/dummyresourcebenchmark.cpp1
-rw-r--r--tests/dummyresourcetest.cpp2
10 files changed, 393 insertions, 76 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index b06718f..a97c7f9 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -29,6 +29,7 @@ set(command_SRCS
29 clientapi.cpp 29 clientapi.cpp
30 commands.cpp 30 commands.cpp
31 console.cpp 31 console.cpp
32 facade.cpp
32 pipeline.cpp 33 pipeline.cpp
33 domainadaptor.cpp 34 domainadaptor.cpp
34 resource.cpp 35 resource.cpp
diff --git a/common/clientapi.h b/common/clientapi.h
index 22448b3..c2b9493 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -49,17 +49,44 @@ namespace async {
49 */ 49 */
50 template<class T> 50 template<class T>
51 class ResultProvider { 51 class ResultProvider {
52 public: 52 private:
53 //Called from worker thread 53 void callInMainThreadOnEmitter(void (ResultEmitter<T>::*f)())
54 void add(const T &value)
55 { 54 {
56 //We use the eventloop to call the addHandler directly from the main eventloop. 55 //We use the eventloop to call the addHandler directly from the main eventloop.
57 //That way the result emitter implementation doesn't have to care about threadsafety at all. 56 //That way the result emitter implementation doesn't have to care about threadsafety at all.
58 //The alternative would be to make all handlers of the emitter threadsafe. 57 //The alternative would be to make all handlers of the emitter threadsafe.
59 auto emitter = mResultEmitter; 58 if (auto emitter = mResultEmitter.toStrongRef()) {
60 mResultEmitter->mThreadBoundary.callInMainThread([emitter, value]() { 59 auto weakEmitter = mResultEmitter;
61 if (emitter) { 60 //We don't want to keep the emitter alive here, so we only capture a weak reference
62 emitter->addHandler(value); 61 emitter->mThreadBoundary.callInMainThread([weakEmitter, f]() {
62 if (auto strongRef = weakEmitter.toStrongRef()) {
63 (strongRef.data()->*f)();
64 }
65 });
66 }
67 }
68
69 void callInMainThreadOnEmitter(const std::function<void()> &f)
70 {
71 //We use the eventloop to call the addHandler directly from the main eventloop.
72 //That way the result emitter implementation doesn't have to care about threadsafety at all.
73 //The alternative would be to make all handlers of the emitter threadsafe.
74 if (auto emitter = mResultEmitter.toStrongRef()) {
75 emitter->mThreadBoundary.callInMainThread([f]() {
76 f();
77 });
78 }
79 }
80
81 public:
82 //Called from worker thread
83 void add(const T &value)
84 {
85 //Because I don't know how to use bind
86 auto weakEmitter = mResultEmitter;
87 callInMainThreadOnEmitter([weakEmitter, value](){
88 if (auto strongRef = weakEmitter.toStrongRef()) {
89 strongRef->addHandler(value);
63 } 90 }
64 }); 91 });
65 } 92 }
@@ -67,25 +94,39 @@ namespace async {
67 //Called from worker thread 94 //Called from worker thread
68 void complete() 95 void complete()
69 { 96 {
70 auto emitter = mResultEmitter; 97 callInMainThreadOnEmitter(&ResultEmitter<T>::complete);
71 mResultEmitter->mThreadBoundary.callInMainThread([emitter]() {
72 if (emitter) {
73 emitter->completeHandler();
74 }
75 });
76 } 98 }
77 99
100 void clear()
101 {
102 callInMainThreadOnEmitter(&ResultEmitter<T>::clear);
103 }
104
105
78 QSharedPointer<ResultEmitter<T> > emitter() 106 QSharedPointer<ResultEmitter<T> > emitter()
79 { 107 {
80 if (!mResultEmitter) { 108 if (!mResultEmitter) {
81 mResultEmitter = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>()); 109 //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again
110 auto sharedPtr = QSharedPointer<ResultEmitter<T> >::create();
111 mResultEmitter = sharedPtr;
112 return sharedPtr;
82 } 113 }
83 114
84 return mResultEmitter; 115 return mResultEmitter.toStrongRef();
116 }
117
118 /**
119 * For lifetimemanagement only.
120 * We keep the runner alive as long as the result provider exists.
121 */
122 void setQueryRunner(const QSharedPointer<QObject> &runner)
123 {
124 mQueryRunner = runner;
85 } 125 }
86 126
87 private: 127 private:
88 QSharedPointer<ResultEmitter<T> > mResultEmitter; 128 QWeakPointer<ResultEmitter<T> > mResultEmitter;
129 QSharedPointer<QObject> mQueryRunner;
89 }; 130 };
90 131
91 /* 132 /*
@@ -99,7 +140,6 @@ namespace async {
99 * * build async interfaces with signals 140 * * build async interfaces with signals
100 * * build sync interfaces that block when accessing the value 141 * * build sync interfaces that block when accessing the value
101 * 142 *
102 * TODO: This should probably be merged with daniels futurebase used in Async
103 */ 143 */
104 template<class DomainType> 144 template<class DomainType>
105 class ResultEmitter { 145 class ResultEmitter {
@@ -113,51 +153,36 @@ namespace async {
113 { 153 {
114 completeHandler = handler; 154 completeHandler = handler;
115 } 155 }
156 void onClear(const std::function<void(void)> &handler)
157 {
158 clearHandler = handler;
159 }
116 160
117 private: 161 void add(const DomainType &value)
118 friend class ResultProvider<DomainType>; 162 {
119 std::function<void(const DomainType&)> addHandler; 163 addHandler(value);
120 // std::function<void(const T&)> removeHandler; 164 }
121 std::function<void(void)> completeHandler;
122 ThreadBoundary mThreadBoundary;
123 };
124
125 165
126 /* 166 void complete()
127 * A result set specialization that provides a syncronous list
128 */
129 template<class T>
130 class SyncListResult : public QList<T> {
131 public:
132 SyncListResult(const QSharedPointer<ResultEmitter<T> > &emitter)
133 :QList<T>(),
134 mComplete(false),
135 mEmitter(emitter)
136 { 167 {
137 emitter->onAdded([this](const T &value) { 168 completeHandler();
138 this->append(value);
139 });
140 emitter->onComplete([this]() {
141 mComplete = true;
142 auto loop = mWaitLoop.toStrongRef();
143 if (loop) {
144 loop->quit();
145 }
146 });
147 } 169 }
148 170
149 void exec() 171 void clear()
150 { 172 {
151 auto loop = QSharedPointer<QEventLoop>::create(); 173 clearHandler();
152 mWaitLoop = loop;
153 loop->exec(QEventLoop::ExcludeUserInputEvents);
154 } 174 }
155 175
156 private: 176 private:
157 bool mComplete; 177 friend class ResultProvider<DomainType>;
158 QWeakPointer<QEventLoop> mWaitLoop; 178
159 QSharedPointer<ResultEmitter<T> > mEmitter; 179 std::function<void(const DomainType&)> addHandler;
180 // std::function<void(const T&)> removeHandler;
181 std::function<void(void)> completeHandler;
182 std::function<void(void)> clearHandler;
183 ThreadBoundary mThreadBoundary;
160 }; 184 };
185
161} 186}
162 187
163namespace Akonadi2 { 188namespace Akonadi2 {
@@ -307,7 +332,7 @@ using namespace async;
307class Query 332class Query
308{ 333{
309public: 334public:
310 Query() : syncOnDemand(true), processAll(false) {} 335 Query() : syncOnDemand(true), processAll(false), liveQuery(false) {}
311 //Could also be a propertyFilter 336 //Could also be a propertyFilter
312 QByteArrayList resources; 337 QByteArrayList resources;
313 //Could also be a propertyFilter 338 //Could also be a propertyFilter
@@ -318,6 +343,8 @@ public:
318 QSet<QByteArray> requestedProperties; 343 QSet<QByteArray> requestedProperties;
319 bool syncOnDemand; 344 bool syncOnDemand;
320 bool processAll; 345 bool processAll;
346 //If live query is false, this query will not continuously be updated
347 bool liveQuery;
321}; 348};
322 349
323 350
@@ -337,7 +364,9 @@ public:
337 virtual Async::Job<void> create(const DomainType &domainObject) = 0; 364 virtual Async::Job<void> create(const DomainType &domainObject) = 0;
338 virtual Async::Job<void> modify(const DomainType &domainObject) = 0; 365 virtual Async::Job<void> modify(const DomainType &domainObject) = 0;
339 virtual Async::Job<void> remove(const DomainType &domainObject) = 0; 366 virtual Async::Job<void> remove(const DomainType &domainObject) = 0;
340 virtual Async::Job<void> load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0; 367 //TODO remove from public API
368 virtual Async::Job<qint64> load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0;
369 virtual Async::Job<void> load(const Query &query, const QSharedPointer<ResultProvider<typename DomainType::Ptr> > &resultProvider) { return Async::null<void>(); };
341}; 370};
342 371
343 372
@@ -349,6 +378,8 @@ public:
349 378
350class FacadeFactory { 379class FacadeFactory {
351public: 380public:
381 typedef std::function<void*(bool &externallyManaged)> FactoryFunction;
382
352 //FIXME: proper singleton implementation 383 //FIXME: proper singleton implementation
353 static FacadeFactory &instance() 384 static FacadeFactory &instance()
354 { 385 {
@@ -365,7 +396,7 @@ public:
365 void registerFacade(const QByteArray &resource) 396 void registerFacade(const QByteArray &resource)
366 { 397 {
367 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); 398 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>();
368 mFacadeRegistry.insert(key(resource, typeName), [](){ return new Facade; }); 399 mFacadeRegistry.insert(key(resource, typeName), [](bool &externallyManaged){ return new Facade; });
369 } 400 }
370 401
371 /* 402 /*
@@ -375,29 +406,51 @@ public:
375 * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer. 406 * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer.
376 * Supplied factory functions should therefore always return a new pointer (i.e. via clone()) 407 * Supplied factory functions should therefore always return a new pointer (i.e. via clone())
377 * 408 *
378 * FIXME the factory function should really be returning QSharedPointer<void>, which doesn't work (std::shared_pointer<void> would though). That way i.e. a test could keep the object alive until it's done. 409 * FIXME the factory function should really be returning QSharedPointer<void>, which doesn't work (std::shared_pointer<void> would though). That way i.e. a test could keep the object alive until it's done. As a workaround the factory function can define wether it manages the lifetime of the facade itself.
379 */ 410 */
380 template<class DomainType, class Facade> 411 template<class DomainType, class Facade>
381 void registerFacade(const QByteArray &resource, const std::function<void*(void)> &customFactoryFunction) 412 void registerFacade(const QByteArray &resource, const FactoryFunction &customFactoryFunction)
382 { 413 {
383 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); 414 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>();
384 mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction); 415 mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction);
385 } 416 }
386 417
418 /*
419 * Can be used to clear the factory.
420 *
421 * Primarily for testing.
422 */
423 void resetFactory()
424 {
425 mFacadeRegistry.clear();
426 }
427
428 static void doNothingDeleter(void *)
429 {
430 qWarning() << "Do nothing";
431 }
432
387 template<class DomainType> 433 template<class DomainType>
388 QSharedPointer<StoreFacade<DomainType> > getFacade(const QByteArray &resource) 434 QSharedPointer<StoreFacade<DomainType> > getFacade(const QByteArray &resource)
389 { 435 {
390 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); 436 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>();
391 auto factoryFunction = mFacadeRegistry.value(key(resource, typeName)); 437 auto factoryFunction = mFacadeRegistry.value(key(resource, typeName));
392 if (factoryFunction) { 438 if (factoryFunction) {
393 return QSharedPointer<StoreFacade<DomainType> >(static_cast<StoreFacade<DomainType>* >(factoryFunction())); 439 bool externallyManaged = false;
440 auto ptr = static_cast<StoreFacade<DomainType>* >(factoryFunction(externallyManaged));
441 if (externallyManaged) {
442 //Allows tests to manage the lifetime of injected facades themselves
443 return QSharedPointer<StoreFacade<DomainType> >(ptr, doNothingDeleter);
444 } else {
445 return QSharedPointer<StoreFacade<DomainType> >(ptr);
446 }
394 } 447 }
395 qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName; 448 qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName;
396 return QSharedPointer<StoreFacade<DomainType> >(); 449 return QSharedPointer<StoreFacade<DomainType> >();
397 } 450 }
398 451
399private: 452private:
400 QHash<QByteArray, std::function<void*(void)> > mFacadeRegistry; 453 QHash<QByteArray, FactoryFunction> mFacadeRegistry;
401}; 454};
402 455
403/** 456/**
@@ -416,7 +469,7 @@ public:
416 template <class DomainType> 469 template <class DomainType>
417 static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) 470 static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query)
418 { 471 {
419 QSharedPointer<ResultProvider<typename DomainType::Ptr> > resultSet(new ResultProvider<typename DomainType::Ptr>); 472 auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create();
420 473
421 //Execute the search in a thread. 474 //Execute the search in a thread.
422 //We must guarantee that the emitter is returned before the first result is emitted. 475 //We must guarantee that the emitter is returned before the first result is emitted.
@@ -428,15 +481,15 @@ public:
428 Async::Job<void> job = Async::null<void>(); 481 Async::Job<void> job = Async::null<void>();
429 for(const QByteArray &resource : query.resources) { 482 for(const QByteArray &resource : query.resources) {
430 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); 483 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource);
431 //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive.
432 std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1);
433 484
434 // TODO The following is a necessary hack to keep the facade alive. 485 // TODO The following is a necessary hack to keep the facade alive.
435 // Otherwise this would reduce to: 486 // Otherwise this would reduce to:
436 // job = job.then(facade->load(query, addCallback)); 487 // job = job.then(facade->load(query, addCallback));
437 // We somehow have to guarantee that the facade remains valid for the duration of the job 488 // We somehow have to guarantee that the facade remains valid for the duration of the job
438 job = job.then<void>([facade, query, addCallback](Async::Future<void> &future) { 489 // TODO: Use one result set per facade, and merge the results separately
439 Async::Job<void> j = facade->load(query, addCallback); 490 // resultSet->addSubset(facade->query(query));
491 job = job.then<void>([facade, query, resultSet](Async::Future<void> &future) {
492 Async::Job<void> j = facade->load(query, resultSet);
440 j.then<void>([&future, facade](Async::Future<void> &f) { 493 j.then<void>([&future, facade](Async::Future<void> &f) {
441 future.setFinished(); 494 future.setFinished();
442 f.setFinished(); 495 f.setFinished();
diff --git a/common/facade.cpp b/common/facade.cpp
new file mode 100644
index 0000000..e51b32a
--- /dev/null
+++ b/common/facade.cpp
@@ -0,0 +1,20 @@
1/*
2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20#include "facade.h"
diff --git a/common/facade.h b/common/facade.h
index 98bcb38..f9b5a83 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -30,6 +30,55 @@
30#include "domainadaptor.h" 30#include "domainadaptor.h"
31#include "entitybuffer.h" 31#include "entitybuffer.h"
32 32
33/**
34 * A QueryRunner runs a query and updates the corresponding result set.
35 *
36 * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work),
37 * and by how long a result set must be updated. If the query is one off the runner dies after the execution,
38 * otherwise it lives on the react to changes and updates the corresponding result set.
39 *
40 * QueryRunner has to keep ResourceAccess alive in order to keep getting updates.
41 */
42class QueryRunner : public QObject
43{
44 Q_OBJECT
45public:
46 typedef std::function<Async::Job<qint64>(qint64 oldRevision, qint64 newRevision)> QueryFunction;
47
48 QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {};
49 /**
50 * Starts query
51 */
52 Async::Job<void> run(qint64 newRevision = 0)
53 {
54 //TODO: JOBAPI: that last empty .then should not be necessary
55 return queryFunction(mLatestRevision, newRevision).then<void, qint64>([this](qint64 revision) {
56 mLatestRevision = revision;
57 }).then<void>([](){});
58 }
59
60 /**
61 *
62 */
63 void setQuery(const QueryFunction &query)
64 {
65 queryFunction = query;
66 }
67
68public slots:
69 /**
70 * Rerun query with new revision
71 */
72 void revisionChanged(qint64 newRevision)
73 {
74 run(newRevision).exec();
75 }
76
77private:
78 QueryFunction queryFunction;
79 qint64 mLatestRevision;
80};
81
33namespace Akonadi2 { 82namespace Akonadi2 {
34 class ResourceAccess; 83 class ResourceAccess;
35/** 84/**
@@ -80,7 +129,7 @@ protected:
80 return Async::null<void>(); 129 return Async::null<void>();
81 } 130 }
82 131
83private: 132protected:
84 //TODO use one resource access instance per application => make static 133 //TODO use one resource access instance per application => make static
85 QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess; 134 QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess;
86}; 135};
diff --git a/common/synclistresult.h b/common/synclistresult.h
new file mode 100644
index 0000000..5fa0efd
--- /dev/null
+++ b/common/synclistresult.h
@@ -0,0 +1,54 @@
1#pragma once
2
3#include <QList>
4#include <functional>
5#include <QSharedPointer>
6#include <clientapi.h>
7
8namespace async {
9
10/*
11* A result set specialization that provides a syncronous list.
12*
13* Only for testing purposes.
14*
15* WARNING: The nested eventloop can cause all sorts of trouble. Use only in testing code.
16*/
17template<class T>
18class SyncListResult : public QList<T> {
19public:
20 SyncListResult(const QSharedPointer<ResultEmitter<T> > &emitter)
21 :QList<T>(),
22 mComplete(false),
23 mEmitter(emitter)
24 {
25 emitter->onAdded([this](const T &value) {
26 this->append(value);
27 });
28 emitter->onComplete([this]() {
29 mComplete = true;
30 if (eventLoopAborter) {
31 eventLoopAborter();
32 //Be safe in case of a second invocation of the complete handler
33 eventLoopAborter = std::function<void()>();
34 }
35 });
36 emitter->onClear([this]() {
37 this->clear();
38 });
39 }
40
41 void exec()
42 {
43 QEventLoop eventLoop;
44 eventLoopAborter = [&eventLoop]() { eventLoop.quit(); };
45 eventLoop.exec();
46 }
47
48private:
49 bool mComplete;
50 QSharedPointer<ResultEmitter<T> > mEmitter;
51 std::function<void()> eventLoopAborter;
52};
53
54}
diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp
index 24b3fb9..789c656 100644
--- a/common/test/clientapitest.cpp
+++ b/common/test/clientapitest.cpp
@@ -3,6 +3,22 @@
3#include <functional> 3#include <functional>
4 4
5#include "../clientapi.h" 5#include "../clientapi.h"
6#include "../facade.h"
7#include "../synclistresult.h"
8
9class RevisionNotifier : public QObject
10{
11 Q_OBJECT
12public:
13 RevisionNotifier() : QObject() {};
14 void notify(qint64 revision)
15 {
16 emit revisionChanged(revision);
17 }
18
19Q_SIGNALS:
20 void revisionChanged(qint64);
21};
6 22
7class DummyResourceFacade : public Akonadi2::StoreFacade<Akonadi2::ApplicationDomain::Event> 23class DummyResourceFacade : public Akonadi2::StoreFacade<Akonadi2::ApplicationDomain::Event>
8{ 24{
@@ -11,18 +27,63 @@ public:
11 virtual Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; 27 virtual Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); };
12 virtual Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; 28 virtual Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); };
13 virtual Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; 29 virtual Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); };
14 virtual Async::Job<void> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) 30 virtual Async::Job<qint64> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback)
15 { 31 {
16 return Async::start<void>([this, resultCallback](Async::Future<void> &future) { 32 return Async::start<qint64>([this, resultCallback](Async::Future<qint64> &future) {
17 qDebug() << "load called"; 33 qDebug() << "load called";
18 for(const auto &result : results) { 34 for(const auto &result : results) {
19 resultCallback(result); 35 resultCallback(result);
20 } 36 }
37 future.setValue(0);
21 future.setFinished(); 38 future.setFinished();
22 }); 39 });
23 } 40 }
24 41
42 Async::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider)
43 {
44 auto runner = QSharedPointer<QueryRunner>::create(query);
45 //The runner only lives as long as the resultProvider
46 resultProvider->setQueryRunner(runner);
47 runner->setQuery([this, resultProvider, query](qint64 oldRevision, qint64 newRevision) -> Async::Job<qint64> {
48 qDebug() << "Creating query for revisions: " << oldRevision << newRevision;
49 return Async::start<qint64>([this, resultProvider, query](Async::Future<qint64> &future) {
50 //TODO only emit changes and don't replace everything
51 resultProvider->clear();
52 //rerun query
53 std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> addCallback = std::bind(&Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr>::add, resultProvider, std::placeholders::_1);
54 load(query, addCallback).then<void, qint64>([resultProvider, &future](qint64 queriedRevision) {
55 //TODO set revision in result provider?
56 //TODO update all existing results with new revision
57 resultProvider->complete();
58 future.setValue(queriedRevision);
59 future.setFinished();
60 }).exec();
61 });
62 });
63
64 //Ensure the notification is emitted in the right thread
65 //Otherwise we get crashes as we call revisionChanged from the test.
66 if (!notifier) {
67 notifier.reset(new RevisionNotifier);
68 }
69
70 //TODO somehow disconnect as resultNotifier is destroyed. Otherwise we keep the runner alive forever.
71 if (query.liveQuery) {
72 QObject::connect(notifier.data(), &RevisionNotifier::revisionChanged, [runner](qint64 newRevision) {
73 runner->revisionChanged(newRevision);
74 });
75 }
76
77 return Async::start<void>([runner](Async::Future<void> &future) {
78 runner->run().then<void>([&future]() {
79 //TODO if not live query, destroy runner.
80 future.setFinished();
81 }).exec();
82 });
83 }
84
25 QList<Akonadi2::ApplicationDomain::Event::Ptr> results; 85 QList<Akonadi2::ApplicationDomain::Event::Ptr> results;
86 QSharedPointer<RevisionNotifier> notifier;
26}; 87};
27 88
28class ClientAPITest : public QObject 89class ClientAPITest : public QObject
@@ -30,21 +91,60 @@ class ClientAPITest : public QObject
30 Q_OBJECT 91 Q_OBJECT
31private Q_SLOTS: 92private Q_SLOTS:
32 93
94 void initTestCase()
95 {
96 Akonadi2::FacadeFactory::instance().resetFactory();
97 }
98
33 void testLoad() 99 void testLoad()
34 { 100 {
35 DummyResourceFacade facade; 101 DummyResourceFacade facade;
36 facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>()); 102 facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>());
37 103
38 Akonadi2::FacadeFactory::instance().registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>("dummyresource", [facade](){ return new DummyResourceFacade(facade); }); 104 Akonadi2::FacadeFactory::instance().registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>("dummyresource",
105 [&facade](bool &externallyManaged) {
106 externallyManaged = true;
107 return &facade;
108 }
109 );
39 110
40 Akonadi2::Query query; 111 Akonadi2::Query query;
41 query.resources << "dummyresource"; 112 query.resources << "dummyresource";
113 query.liveQuery = false;
42 114
43 async::SyncListResult<Akonadi2::ApplicationDomain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::ApplicationDomain::Event>(query)); 115 async::SyncListResult<Akonadi2::ApplicationDomain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::ApplicationDomain::Event>(query));
44 result.exec(); 116 result.exec();
45 QCOMPARE(result.size(), 1); 117 QCOMPARE(result.size(), 1);
46 } 118 }
47 119
120 void testLiveQuery()
121 {
122 DummyResourceFacade facade;
123 facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>());
124
125 Akonadi2::FacadeFactory::instance().registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>("dummyresource",
126 [&facade](bool &externallManage){
127 externallManage = true;
128 return &facade;
129 }
130 );
131
132 Akonadi2::Query query;
133 query.resources << "dummyresource";
134 query.liveQuery = true;
135
136 async::SyncListResult<Akonadi2::ApplicationDomain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::ApplicationDomain::Event>(query));
137 result.exec();
138 QCOMPARE(result.size(), 1);
139
140 //Enter a second result
141 facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id2", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>());
142 qWarning() << &facade;
143 QVERIFY(facade.notifier);
144 facade.notifier->revisionChanged(2);
145 QTRY_COMPARE(result.size(), 2);
146 }
147
48}; 148};
49 149
50QTEST_MAIN(ClientAPITest) 150QTEST_MAIN(ClientAPITest)
diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp
index 1477fcf..f603c56 100644
--- a/dummyresource/facade.cpp
+++ b/dummyresource/facade.cpp
@@ -147,15 +147,51 @@ void DummyResourceFacade::readValue(QSharedPointer<Akonadi2::Storage> storage, c
147 }); 147 });
148} 148}
149 149
150Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) 150Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider)
151{ 151{
152 return synchronizeResource(query.syncOnDemand, query.processAll).then<void>([=](Async::Future<void> &future) { 152 auto runner = QSharedPointer<QueryRunner>::create(query);
153 //The runner only lives as long as the resultProvider
154 resultProvider->setQueryRunner(runner);
155 runner->setQuery([this, resultProvider, query](qint64 oldRevision, qint64 newRevision) -> Async::Job<qint64> {
156 return Async::start<qint64>([this, resultProvider, query](Async::Future<qint64> &future) {
157 //TODO only emit changes and don't replace everything
158 resultProvider->clear();
159 //rerun query
160 std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> addCallback = std::bind(&Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr>::add, resultProvider, std::placeholders::_1);
161 load(query, addCallback).then<void, qint64>([resultProvider, &future](qint64 queriedRevision) {
162 //TODO set revision in result provider?
163 //TODO update all existing results with new revision
164 resultProvider->complete();
165 future.setValue(queriedRevision);
166 future.setFinished();
167 }).exec();
168 });
169 });
170
171 auto resourceAccess = mResourceAccess;
172 //TODO we need to somehow disconnect this
173 QObject::connect(resourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, [runner](qint64 newRevision) {
174 runner->revisionChanged(newRevision);
175 });
176
177 return Async::start<void>([runner](Async::Future<void> &future) {
178 runner->run().then<void>([&future]() {
179 //TODO if not live query, destroy runner.
180 future.setFinished();
181 }).exec();
182 });
183}
184
185Async::Job<qint64> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback)
186{
187 return synchronizeResource(query.syncOnDemand, query.processAll).then<qint64>([=](Async::Future<qint64> &future) {
153 //Now that the sync is complete we can execute the query 188 //Now that the sync is complete we can execute the query
154 const auto preparedQuery = prepareQuery(query); 189 const auto preparedQuery = prepareQuery(query);
155 190
156 auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); 191 auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy");
157 192
158 //TODO use transaction over full query and record store revision. We'll need it to update the query. 193 storage->startTransaction(Akonadi2::Storage::ReadOnly);
194 const qint64 revision = storage->maxRevision();
159 195
160 //Index lookups 196 //Index lookups
161 QVector<QByteArray> keys; 197 QVector<QByteArray> keys;
@@ -177,6 +213,8 @@ Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const s
177 readValue(storage, key, resultCallback, preparedQuery); 213 readValue(storage, key, resultCallback, preparedQuery);
178 } 214 }
179 } 215 }
216 storage->abortTransaction();
217 future.setValue(revision);
180 future.setFinished(); 218 future.setFinished();
181 }); 219 });
182} 220}
diff --git a/dummyresource/facade.h b/dummyresource/facade.h
index 37ed81d..3ddfe15 100644
--- a/dummyresource/facade.h
+++ b/dummyresource/facade.h
@@ -34,10 +34,11 @@ class DummyResourceFacade : public Akonadi2::GenericFacade<Akonadi2::Application
34public: 34public:
35 DummyResourceFacade(); 35 DummyResourceFacade();
36 virtual ~DummyResourceFacade(); 36 virtual ~DummyResourceFacade();
37 virtual Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject); 37 Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE;
38 virtual Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject); 38 Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE;
39 virtual Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject); 39 Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE;
40 virtual Async::Job<void> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback); 40 Async::Job<qint64> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) Q_DECL_OVERRIDE;
41 Async::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider) Q_DECL_OVERRIDE;
41 42
42private: 43private:
43 void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local)>); 44 void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local)>);
diff --git a/tests/dummyresourcebenchmark.cpp b/tests/dummyresourcebenchmark.cpp
index d83cb70..e350747 100644
--- a/tests/dummyresourcebenchmark.cpp
+++ b/tests/dummyresourcebenchmark.cpp
@@ -7,6 +7,7 @@
7#include "clientapi.h" 7#include "clientapi.h"
8#include "commands.h" 8#include "commands.h"
9#include "entitybuffer.h" 9#include "entitybuffer.h"
10#include "synclistresult.h"
10 11
11#include "event_generated.h" 12#include "event_generated.h"
12#include "entity_generated.h" 13#include "entity_generated.h"
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp
index 5fed7cd..9523e5d 100644
--- a/tests/dummyresourcetest.cpp
+++ b/tests/dummyresourcetest.cpp
@@ -2,13 +2,13 @@
2 2
3#include <QString> 3#include <QString>
4 4
5// #include "dummycalendar_generated.h"
6#include "event_generated.h" 5#include "event_generated.h"
7#include "entity_generated.h" 6#include "entity_generated.h"
8#include "metadata_generated.h" 7#include "metadata_generated.h"
9#include "createentity_generated.h" 8#include "createentity_generated.h"
10#include "dummyresource/resourcefactory.h" 9#include "dummyresource/resourcefactory.h"
11#include "clientapi.h" 10#include "clientapi.h"
11#include "synclistresult.h"
12#include "commands.h" 12#include "commands.h"
13#include "entitybuffer.h" 13#include "entitybuffer.h"
14 14