summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-04-13 20:15:14 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-04-15 09:30:32 +0200
commitc55054e899660f2d667af2c2e573a1267d47358e (patch)
tree0f547effcad0c20521f0bc047a9eb1d4130b052b
parent4652a39fc6869fc5af46367c35027b2b53478268 (diff)
downloadsink-c55054e899660f2d667af2c2e573a1267d47358e.tar.gz
sink-c55054e899660f2d667af2c2e573a1267d47358e.zip
Use a queryrunner to execute queries.
The queryrunner is responsible for running queries and keeping them up to date. This is required for self-updating queries. To get this to work properly the ResultProvider/emitter had to be fixed. The emitter now only lives as long as the client holds a reference to it, allowing the provider to detect when it is no longer necessary to keep the query alive (because noone is listening). In the process various lifetime issues have been fixed, that we're caused by lambdas capturing smartpointers, that then extended the lifetime of the associated objects unpredictably.
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/clientapi.h181
-rw-r--r--common/facade.cpp20
-rw-r--r--common/facade.h51
-rw-r--r--common/synclistresult.h54
-rw-r--r--common/test/clientapitest.cpp106
-rw-r--r--dummyresource/facade.cpp44
-rw-r--r--dummyresource/facade.h9
-rw-r--r--tests/dummyresourcebenchmark.cpp1
-rw-r--r--tests/dummyresourcetest.cpp2
10 files changed, 393 insertions, 76 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index b06718f..a97c7f9 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -29,6 +29,7 @@ set(command_SRCS
29 clientapi.cpp 29 clientapi.cpp
30 commands.cpp 30 commands.cpp
31 console.cpp 31 console.cpp
32 facade.cpp
32 pipeline.cpp 33 pipeline.cpp
33 domainadaptor.cpp 34 domainadaptor.cpp
34 resource.cpp 35 resource.cpp
diff --git a/common/clientapi.h b/common/clientapi.h
index 22448b3..c2b9493 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -49,17 +49,44 @@ namespace async {
49 */ 49 */
50 template<class T> 50 template<class T>
51 class ResultProvider { 51 class ResultProvider {
52 public: 52 private:
53 //Called from worker thread 53 void callInMainThreadOnEmitter(void (ResultEmitter<T>::*f)())
54 void add(const T &value)
55 { 54 {
56 //We use the eventloop to call the addHandler directly from the main eventloop. 55 //We use the eventloop to call the addHandler directly from the main eventloop.
57 //That way the result emitter implementation doesn't have to care about threadsafety at all. 56 //That way the result emitter implementation doesn't have to care about threadsafety at all.
58 //The alternative would be to make all handlers of the emitter threadsafe. 57 //The alternative would be to make all handlers of the emitter threadsafe.
59 auto emitter = mResultEmitter; 58 if (auto emitter = mResultEmitter.toStrongRef()) {
60 mResultEmitter->mThreadBoundary.callInMainThread([emitter, value]() { 59 auto weakEmitter = mResultEmitter;
61 if (emitter) { 60 //We don't want to keep the emitter alive here, so we only capture a weak reference
62 emitter->addHandler(value); 61 emitter->mThreadBoundary.callInMainThread([weakEmitter, f]() {
62 if (auto strongRef = weakEmitter.toStrongRef()) {
63 (strongRef.data()->*f)();
64 }
65 });
66 }
67 }
68
69 void callInMainThreadOnEmitter(const std::function<void()> &f)
70 {
71 //We use the eventloop to call the addHandler directly from the main eventloop.
72 //That way the result emitter implementation doesn't have to care about threadsafety at all.
73 //The alternative would be to make all handlers of the emitter threadsafe.
74 if (auto emitter = mResultEmitter.toStrongRef()) {
75 emitter->mThreadBoundary.callInMainThread([f]() {
76 f();
77 });
78 }
79 }
80
81 public:
82 //Called from worker thread
83 void add(const T &value)
84 {
85 //Because I don't know how to use bind
86 auto weakEmitter = mResultEmitter;
87 callInMainThreadOnEmitter([weakEmitter, value](){
88 if (auto strongRef = weakEmitter.toStrongRef()) {
89 strongRef->addHandler(value);
63 } 90 }
64 }); 91 });
65 } 92 }
@@ -67,25 +94,39 @@ namespace async {
67 //Called from worker thread 94 //Called from worker thread
68 void complete() 95 void complete()
69 { 96 {
70 auto emitter = mResultEmitter; 97 callInMainThreadOnEmitter(&ResultEmitter<T>::complete);
71 mResultEmitter->mThreadBoundary.callInMainThread([emitter]() {
72 if (emitter) {
73 emitter->completeHandler();
74 }
75 });
76 } 98 }
77 99
100 void clear()
101 {
102 callInMainThreadOnEmitter(&ResultEmitter<T>::clear);
103 }
104
105
78 QSharedPointer<ResultEmitter<T> > emitter() 106 QSharedPointer<ResultEmitter<T> > emitter()
79 { 107 {
80 if (!mResultEmitter) { 108 if (!mResultEmitter) {
81 mResultEmitter = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>()); 109 //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again
110 auto sharedPtr = QSharedPointer<ResultEmitter<T> >::create();
111 mResultEmitter = sharedPtr;
112 return sharedPtr;
82 } 113 }
83 114
84 return mResultEmitter; 115 return mResultEmitter.toStrongRef();
116 }
117
118 /**
119 * For lifetimemanagement only.
120 * We keep the runner alive as long as the result provider exists.
121 */
122 void setQueryRunner(const QSharedPointer<QObject> &runner)
123 {
124 mQueryRunner = runner;
85 } 125 }
86 126
87 private: 127 private:
88 QSharedPointer<ResultEmitter<T> > mResultEmitter; 128 QWeakPointer<ResultEmitter<T> > mResultEmitter;
129 QSharedPointer<QObject> mQueryRunner;
89 }; 130 };
90 131
91 /* 132 /*
@@ -99,7 +140,6 @@ namespace async {
99 * * build async interfaces with signals 140 * * build async interfaces with signals
100 * * build sync interfaces that block when accessing the value 141 * * build sync interfaces that block when accessing the value
101 * 142 *
102 * TODO: This should probably be merged with daniels futurebase used in Async
103 */ 143 */
104 template<class DomainType> 144 template<class DomainType>
105 class ResultEmitter { 145 class ResultEmitter {
@@ -113,51 +153,36 @@ namespace async {
113 { 153 {
114 completeHandler = handler; 154 completeHandler = handler;
115 } 155 }
156 void onClear(const std::function<void(void)> &handler)
157 {
158 clearHandler = handler;
159 }
116 160
117 private: 161 void add(const DomainType &value)
118 friend class ResultProvider<DomainType>; 162 {
119 std::function<void(const DomainType&)> addHandler; 163 addHandler(value);
120 // std::function<void(const T&)> removeHandler; 164 }
121 std::function<void(void)> completeHandler;
122 ThreadBoundary mThreadBoundary;
123 };
124
125 165
126 /* 166 void complete()
127 * A result set specialization that provides a syncronous list
128 */
129 template<class T>
130 class SyncListResult : public QList<T> {
131 public:
132 SyncListResult(const QSharedPointer<ResultEmitter<T> > &emitter)
133 :QList<T>(),
134 mComplete(false),
135 mEmitter(emitter)
136 { 167 {
137 emitter->onAdded([this](const T &value) { 168 completeHandler();
138 this->append(value);
139 });
140 emitter->onComplete([this]() {
141 mComplete = true;
142 auto loop = mWaitLoop.toStrongRef();
143 if (loop) {
144 loop->quit();
145 }
146 });
147 } 169 }
148 170
149 void exec() 171 void clear()
150 { 172 {
151 auto loop = QSharedPointer<QEventLoop>::create(); 173 clearHandler();
152 mWaitLoop = loop;
153 loop->exec(QEventLoop::ExcludeUserInputEvents);
154 } 174 }
155 175
156 private: 176 private:
157 bool mComplete; 177 friend class ResultProvider<DomainType>;
158 QWeakPointer<QEventLoop> mWaitLoop; 178
159 QSharedPointer<ResultEmitter<T> > mEmitter; 179 std::function<void(const DomainType&)> addHandler;
180 // std::function<void(const T&)> removeHandler;
181 std::function<void(void)> completeHandler;
182 std::function<void(void)> clearHandler;
183 ThreadBoundary mThreadBoundary;
160 }; 184 };
185
161} 186}
162 187
163namespace Akonadi2 { 188namespace Akonadi2 {
@@ -307,7 +332,7 @@ using namespace async;
307class Query 332class Query
308{ 333{
309public: 334public:
310 Query() : syncOnDemand(true), processAll(false) {} 335 Query() : syncOnDemand(true), processAll(false), liveQuery(false) {}
311 //Could also be a propertyFilter 336 //Could also be a propertyFilter
312 QByteArrayList resources; 337 QByteArrayList resources;
313 //Could also be a propertyFilter 338 //Could also be a propertyFilter
@@ -318,6 +343,8 @@ public:
318 QSet<QByteArray> requestedProperties; 343 QSet<QByteArray> requestedProperties;
319 bool syncOnDemand; 344 bool syncOnDemand;
320 bool processAll; 345 bool processAll;
346 //If live query is false, this query will not continuously be updated
347 bool liveQuery;
321}; 348};
322 349
323 350
@@ -337,7 +364,9 @@ public:
337 virtual Async::Job<void> create(const DomainType &domainObject) = 0; 364 virtual Async::Job<void> create(const DomainType &domainObject) = 0;
338 virtual Async::Job<void> modify(const DomainType &domainObject) = 0; 365 virtual Async::Job<void> modify(const DomainType &domainObject) = 0;
339 virtual Async::Job<void> remove(const DomainType &domainObject) = 0; 366 virtual Async::Job<void> remove(const DomainType &domainObject) = 0;
340 virtual Async::Job<void> load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0; 367 //TODO remove from public API
368 virtual Async::Job<qint64> load(const Query &query, const std::function<void(const typename DomainType::Ptr &)> &resultCallback) = 0;
369 virtual Async::Job<void> load(const Query &query, const QSharedPointer<ResultProvider<typename DomainType::Ptr> > &resultProvider) { return Async::null<void>(); };
341}; 370};
342 371
343 372
@@ -349,6 +378,8 @@ public:
349 378
350class FacadeFactory { 379class FacadeFactory {
351public: 380public:
381 typedef std::function<void*(bool &externallyManaged)> FactoryFunction;
382
352 //FIXME: proper singleton implementation 383 //FIXME: proper singleton implementation
353 static FacadeFactory &instance() 384 static FacadeFactory &instance()
354 { 385 {
@@ -365,7 +396,7 @@ public:
365 void registerFacade(const QByteArray &resource) 396 void registerFacade(const QByteArray &resource)
366 { 397 {
367 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); 398 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>();
368 mFacadeRegistry.insert(key(resource, typeName), [](){ return new Facade; }); 399 mFacadeRegistry.insert(key(resource, typeName), [](bool &externallyManaged){ return new Facade; });
369 } 400 }
370 401
371 /* 402 /*
@@ -375,29 +406,51 @@ public:
375 * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer. 406 * The facade factory takes ovnership of the pointer and typically deletes the instance via shared pointer.
376 * Supplied factory functions should therefore always return a new pointer (i.e. via clone()) 407 * Supplied factory functions should therefore always return a new pointer (i.e. via clone())
377 * 408 *
378 * FIXME the factory function should really be returning QSharedPointer<void>, which doesn't work (std::shared_pointer<void> would though). That way i.e. a test could keep the object alive until it's done. 409 * FIXME the factory function should really be returning QSharedPointer<void>, which doesn't work (std::shared_pointer<void> would though). That way i.e. a test could keep the object alive until it's done. As a workaround the factory function can define wether it manages the lifetime of the facade itself.
379 */ 410 */
380 template<class DomainType, class Facade> 411 template<class DomainType, class Facade>
381 void registerFacade(const QByteArray &resource, const std::function<void*(void)> &customFactoryFunction) 412 void registerFacade(const QByteArray &resource, const FactoryFunction &customFactoryFunction)
382 { 413 {
383 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); 414 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>();
384 mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction); 415 mFacadeRegistry.insert(key(resource, typeName), customFactoryFunction);
385 } 416 }
386 417
418 /*
419 * Can be used to clear the factory.
420 *
421 * Primarily for testing.
422 */
423 void resetFactory()
424 {
425 mFacadeRegistry.clear();
426 }
427
428 static void doNothingDeleter(void *)
429 {
430 qWarning() << "Do nothing";
431 }
432
387 template<class DomainType> 433 template<class DomainType>
388 QSharedPointer<StoreFacade<DomainType> > getFacade(const QByteArray &resource) 434 QSharedPointer<StoreFacade<DomainType> > getFacade(const QByteArray &resource)
389 { 435 {
390 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>(); 436 const QByteArray typeName = ApplicationDomain::getTypeName<DomainType>();
391 auto factoryFunction = mFacadeRegistry.value(key(resource, typeName)); 437 auto factoryFunction = mFacadeRegistry.value(key(resource, typeName));
392 if (factoryFunction) { 438 if (factoryFunction) {
393 return QSharedPointer<StoreFacade<DomainType> >(static_cast<StoreFacade<DomainType>* >(factoryFunction())); 439 bool externallyManaged = false;
440 auto ptr = static_cast<StoreFacade<DomainType>* >(factoryFunction(externallyManaged));
441 if (externallyManaged) {
442 //Allows tests to manage the lifetime of injected facades themselves
443 return QSharedPointer<StoreFacade<DomainType> >(ptr, doNothingDeleter);
444 } else {
445 return QSharedPointer<StoreFacade<DomainType> >(ptr);
446 }
394 } 447 }
395 qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName; 448 qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName;
396 return QSharedPointer<StoreFacade<DomainType> >(); 449 return QSharedPointer<StoreFacade<DomainType> >();
397 } 450 }
398 451
399private: 452private:
400 QHash<QByteArray, std::function<void*(void)> > mFacadeRegistry; 453 QHash<QByteArray, FactoryFunction> mFacadeRegistry;
401}; 454};
402 455
403/** 456/**
@@ -416,7 +469,7 @@ public:
416 template <class DomainType> 469 template <class DomainType>
417 static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) 470 static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query)
418 { 471 {
419 QSharedPointer<ResultProvider<typename DomainType::Ptr> > resultSet(new ResultProvider<typename DomainType::Ptr>); 472 auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create();
420 473
421 //Execute the search in a thread. 474 //Execute the search in a thread.
422 //We must guarantee that the emitter is returned before the first result is emitted. 475 //We must guarantee that the emitter is returned before the first result is emitted.
@@ -428,15 +481,15 @@ public:
428 Async::Job<void> job = Async::null<void>(); 481 Async::Job<void> job = Async::null<void>();
429 for(const QByteArray &resource : query.resources) { 482 for(const QByteArray &resource : query.resources) {
430 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); 483 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource);
431 //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive.
432 std::function<void(const typename DomainType::Ptr &)> addCallback = std::bind(&ResultProvider<typename DomainType::Ptr>::add, resultSet, std::placeholders::_1);
433 484
434 // TODO The following is a necessary hack to keep the facade alive. 485 // TODO The following is a necessary hack to keep the facade alive.
435 // Otherwise this would reduce to: 486 // Otherwise this would reduce to:
436 // job = job.then(facade->load(query, addCallback)); 487 // job = job.then(facade->load(query, addCallback));
437 // We somehow have to guarantee that the facade remains valid for the duration of the job 488 // We somehow have to guarantee that the facade remains valid for the duration of the job
438 job = job.then<void>([facade, query, addCallback](Async::Future<void> &future) { 489 // TODO: Use one result set per facade, and merge the results separately
439 Async::Job<void> j = facade->load(query, addCallback); 490 // resultSet->addSubset(facade->query(query));
491 job = job.then<void>([facade, query, resultSet](Async::Future<void> &future) {
492 Async::Job<void> j = facade->load(query, resultSet);
440 j.then<void>([&future, facade](Async::Future<void> &f) { 493 j.then<void>([&future, facade](Async::Future<void> &f) {
441 future.setFinished(); 494 future.setFinished();
442 f.setFinished(); 495 f.setFinished();
diff --git a/common/facade.cpp b/common/facade.cpp
new file mode 100644
index 0000000..e51b32a
--- /dev/null
+++ b/common/facade.cpp
@@ -0,0 +1,20 @@
1/*
2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20#include "facade.h"
diff --git a/common/facade.h b/common/facade.h
index 98bcb38..f9b5a83 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -30,6 +30,55 @@
30#include "domainadaptor.h" 30#include "domainadaptor.h"
31#include "entitybuffer.h" 31#include "entitybuffer.h"
32 32
33/**
34 * A QueryRunner runs a query and updates the corresponding result set.
35 *
36 * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work),
37 * and by how long a result set must be updated. If the query is one off the runner dies after the execution,
38 * otherwise it lives on the react to changes and updates the corresponding result set.
39 *
40 * QueryRunner has to keep ResourceAccess alive in order to keep getting updates.
41 */
42class QueryRunner : public QObject
43{
44 Q_OBJECT
45public:
46 typedef std::function<Async::Job<qint64>(qint64 oldRevision, qint64 newRevision)> QueryFunction;
47
48 QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {};
49 /**
50 * Starts query
51 */
52 Async::Job<void> run(qint64 newRevision = 0)
53 {
54 //TODO: JOBAPI: that last empty .then should not be necessary
55 return queryFunction(mLatestRevision, newRevision).then<void, qint64>([this](qint64 revision) {
56 mLatestRevision = revision;
57 }).then<void>([](){});
58 }
59
60 /**
61 *
62 */
63 void setQuery(const QueryFunction &query)
64 {
65 queryFunction = query;
66 }
67
68public slots:
69 /**
70 * Rerun query with new revision
71 */
72 void revisionChanged(qint64 newRevision)
73 {
74 run(newRevision).exec();
75 }
76
77private:
78 QueryFunction queryFunction;
79 qint64 mLatestRevision;
80};
81
33namespace Akonadi2 { 82namespace Akonadi2 {
34 class ResourceAccess; 83 class ResourceAccess;
35/** 84/**
@@ -80,7 +129,7 @@ protected:
80 return Async::null<void>(); 129 return Async::null<void>();
81 } 130 }
82 131
83private: 132protected:
84 //TODO use one resource access instance per application => make static 133 //TODO use one resource access instance per application => make static
85 QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess; 134 QSharedPointer<Akonadi2::ResourceAccess> mResourceAccess;
86}; 135};
diff --git a/common/synclistresult.h b/common/synclistresult.h
new file mode 100644
index 0000000..5fa0efd
--- /dev/null
+++ b/common/synclistresult.h
@@ -0,0 +1,54 @@
1#pragma once
2
3#include <QList>
4#include <functional>
5#include <QSharedPointer>
6#include <clientapi.h>
7
8namespace async {
9
10/*
11* A result set specialization that provides a syncronous list.
12*
13* Only for testing purposes.
14*
15* WARNING: The nested eventloop can cause all sorts of trouble. Use only in testing code.
16*/
17template<class T>
18class SyncListResult : public QList<T> {
19public:
20 SyncListResult(const QSharedPointer<ResultEmitter<T> > &emitter)
21 :QList<T>(),
22 mComplete(false),
23 mEmitter(emitter)
24 {
25 emitter->onAdded([this](const T &value) {
26 this->append(value);
27 });
28 emitter->onComplete([this]() {
29 mComplete = true;
30 if (eventLoopAborter) {
31 eventLoopAborter();
32 //Be safe in case of a second invocation of the complete handler
33 eventLoopAborter = std::function<void()>();
34 }
35 });
36 emitter->onClear([this]() {
37 this->clear();
38 });
39 }
40
41 void exec()
42 {
43 QEventLoop eventLoop;
44 eventLoopAborter = [&eventLoop]() { eventLoop.quit(); };
45 eventLoop.exec();
46 }
47
48private:
49 bool mComplete;
50 QSharedPointer<ResultEmitter<T> > mEmitter;
51 std::function<void()> eventLoopAborter;
52};
53
54}
diff --git a/common/test/clientapitest.cpp b/common/test/clientapitest.cpp
index 24b3fb9..789c656 100644
--- a/common/test/clientapitest.cpp
+++ b/common/test/clientapitest.cpp
@@ -3,6 +3,22 @@
3#include <functional> 3#include <functional>
4 4
5#include "../clientapi.h" 5#include "../clientapi.h"
6#include "../facade.h"
7#include "../synclistresult.h"
8
9class RevisionNotifier : public QObject
10{
11 Q_OBJECT
12public:
13 RevisionNotifier() : QObject() {};
14 void notify(qint64 revision)
15 {
16 emit revisionChanged(revision);
17 }
18
19Q_SIGNALS:
20 void revisionChanged(qint64);
21};
6 22
7class DummyResourceFacade : public Akonadi2::StoreFacade<Akonadi2::ApplicationDomain::Event> 23class DummyResourceFacade : public Akonadi2::StoreFacade<Akonadi2::ApplicationDomain::Event>
8{ 24{
@@ -11,18 +27,63 @@ public:
11 virtual Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; 27 virtual Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); };
12 virtual Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; 28 virtual Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); };
13 virtual Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); }; 29 virtual Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject){ return Async::null<void>(); };
14 virtual Async::Job<void> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) 30 virtual Async::Job<qint64> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback)
15 { 31 {
16 return Async::start<void>([this, resultCallback](Async::Future<void> &future) { 32 return Async::start<qint64>([this, resultCallback](Async::Future<qint64> &future) {
17 qDebug() << "load called"; 33 qDebug() << "load called";
18 for(const auto &result : results) { 34 for(const auto &result : results) {
19 resultCallback(result); 35 resultCallback(result);
20 } 36 }
37 future.setValue(0);
21 future.setFinished(); 38 future.setFinished();
22 }); 39 });
23 } 40 }
24 41
42 Async::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider)
43 {
44 auto runner = QSharedPointer<QueryRunner>::create(query);
45 //The runner only lives as long as the resultProvider
46 resultProvider->setQueryRunner(runner);
47 runner->setQuery([this, resultProvider, query](qint64 oldRevision, qint64 newRevision) -> Async::Job<qint64> {
48 qDebug() << "Creating query for revisions: " << oldRevision << newRevision;
49 return Async::start<qint64>([this, resultProvider, query](Async::Future<qint64> &future) {
50 //TODO only emit changes and don't replace everything
51 resultProvider->clear();
52 //rerun query
53 std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> addCallback = std::bind(&Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr>::add, resultProvider, std::placeholders::_1);
54 load(query, addCallback).then<void, qint64>([resultProvider, &future](qint64 queriedRevision) {
55 //TODO set revision in result provider?
56 //TODO update all existing results with new revision
57 resultProvider->complete();
58 future.setValue(queriedRevision);
59 future.setFinished();
60 }).exec();
61 });
62 });
63
64 //Ensure the notification is emitted in the right thread
65 //Otherwise we get crashes as we call revisionChanged from the test.
66 if (!notifier) {
67 notifier.reset(new RevisionNotifier);
68 }
69
70 //TODO somehow disconnect as resultNotifier is destroyed. Otherwise we keep the runner alive forever.
71 if (query.liveQuery) {
72 QObject::connect(notifier.data(), &RevisionNotifier::revisionChanged, [runner](qint64 newRevision) {
73 runner->revisionChanged(newRevision);
74 });
75 }
76
77 return Async::start<void>([runner](Async::Future<void> &future) {
78 runner->run().then<void>([&future]() {
79 //TODO if not live query, destroy runner.
80 future.setFinished();
81 }).exec();
82 });
83 }
84
25 QList<Akonadi2::ApplicationDomain::Event::Ptr> results; 85 QList<Akonadi2::ApplicationDomain::Event::Ptr> results;
86 QSharedPointer<RevisionNotifier> notifier;
26}; 87};
27 88
28class ClientAPITest : public QObject 89class ClientAPITest : public QObject
@@ -30,21 +91,60 @@ class ClientAPITest : public QObject
30 Q_OBJECT 91 Q_OBJECT
31private Q_SLOTS: 92private Q_SLOTS:
32 93
94 void initTestCase()
95 {
96 Akonadi2::FacadeFactory::instance().resetFactory();
97 }
98
33 void testLoad() 99 void testLoad()
34 { 100 {
35 DummyResourceFacade facade; 101 DummyResourceFacade facade;
36 facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>()); 102 facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>());
37 103
38 Akonadi2::FacadeFactory::instance().registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>("dummyresource", [facade](){ return new DummyResourceFacade(facade); }); 104 Akonadi2::FacadeFactory::instance().registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>("dummyresource",
105 [&facade](bool &externallyManaged) {
106 externallyManaged = true;
107 return &facade;
108 }
109 );
39 110
40 Akonadi2::Query query; 111 Akonadi2::Query query;
41 query.resources << "dummyresource"; 112 query.resources << "dummyresource";
113 query.liveQuery = false;
42 114
43 async::SyncListResult<Akonadi2::ApplicationDomain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::ApplicationDomain::Event>(query)); 115 async::SyncListResult<Akonadi2::ApplicationDomain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::ApplicationDomain::Event>(query));
44 result.exec(); 116 result.exec();
45 QCOMPARE(result.size(), 1); 117 QCOMPARE(result.size(), 1);
46 } 118 }
47 119
120 void testLiveQuery()
121 {
122 DummyResourceFacade facade;
123 facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>());
124
125 Akonadi2::FacadeFactory::instance().registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>("dummyresource",
126 [&facade](bool &externallManage){
127 externallManage = true;
128 return &facade;
129 }
130 );
131
132 Akonadi2::Query query;
133 query.resources << "dummyresource";
134 query.liveQuery = true;
135
136 async::SyncListResult<Akonadi2::ApplicationDomain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::ApplicationDomain::Event>(query));
137 result.exec();
138 QCOMPARE(result.size(), 1);
139
140 //Enter a second result
141 facade.results << QSharedPointer<Akonadi2::ApplicationDomain::Event>::create("resource", "id2", 0, QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor>());
142 qWarning() << &facade;
143 QVERIFY(facade.notifier);
144 facade.notifier->revisionChanged(2);
145 QTRY_COMPARE(result.size(), 2);
146 }
147
48}; 148};
49 149
50QTEST_MAIN(ClientAPITest) 150QTEST_MAIN(ClientAPITest)
diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp
index 1477fcf..f603c56 100644
--- a/dummyresource/facade.cpp
+++ b/dummyresource/facade.cpp
@@ -147,15 +147,51 @@ void DummyResourceFacade::readValue(QSharedPointer<Akonadi2::Storage> storage, c
147 }); 147 });
148} 148}
149 149
150Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) 150Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider)
151{ 151{
152 return synchronizeResource(query.syncOnDemand, query.processAll).then<void>([=](Async::Future<void> &future) { 152 auto runner = QSharedPointer<QueryRunner>::create(query);
153 //The runner only lives as long as the resultProvider
154 resultProvider->setQueryRunner(runner);
155 runner->setQuery([this, resultProvider, query](qint64 oldRevision, qint64 newRevision) -> Async::Job<qint64> {
156 return Async::start<qint64>([this, resultProvider, query](Async::Future<qint64> &future) {
157 //TODO only emit changes and don't replace everything
158 resultProvider->clear();
159 //rerun query
160 std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> addCallback = std::bind(&Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr>::add, resultProvider, std::placeholders::_1);
161 load(query, addCallback).then<void, qint64>([resultProvider, &future](qint64 queriedRevision) {
162 //TODO set revision in result provider?
163 //TODO update all existing results with new revision
164 resultProvider->complete();
165 future.setValue(queriedRevision);
166 future.setFinished();
167 }).exec();
168 });
169 });
170
171 auto resourceAccess = mResourceAccess;
172 //TODO we need to somehow disconnect this
173 QObject::connect(resourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, [runner](qint64 newRevision) {
174 runner->revisionChanged(newRevision);
175 });
176
177 return Async::start<void>([runner](Async::Future<void> &future) {
178 runner->run().then<void>([&future]() {
179 //TODO if not live query, destroy runner.
180 future.setFinished();
181 }).exec();
182 });
183}
184
185Async::Job<qint64> DummyResourceFacade::load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback)
186{
187 return synchronizeResource(query.syncOnDemand, query.processAll).then<qint64>([=](Async::Future<qint64> &future) {
153 //Now that the sync is complete we can execute the query 188 //Now that the sync is complete we can execute the query
154 const auto preparedQuery = prepareQuery(query); 189 const auto preparedQuery = prepareQuery(query);
155 190
156 auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); 191 auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy");
157 192
158 //TODO use transaction over full query and record store revision. We'll need it to update the query. 193 storage->startTransaction(Akonadi2::Storage::ReadOnly);
194 const qint64 revision = storage->maxRevision();
159 195
160 //Index lookups 196 //Index lookups
161 QVector<QByteArray> keys; 197 QVector<QByteArray> keys;
@@ -177,6 +213,8 @@ Async::Job<void> DummyResourceFacade::load(const Akonadi2::Query &query, const s
177 readValue(storage, key, resultCallback, preparedQuery); 213 readValue(storage, key, resultCallback, preparedQuery);
178 } 214 }
179 } 215 }
216 storage->abortTransaction();
217 future.setValue(revision);
180 future.setFinished(); 218 future.setFinished();
181 }); 219 });
182} 220}
diff --git a/dummyresource/facade.h b/dummyresource/facade.h
index 37ed81d..3ddfe15 100644
--- a/dummyresource/facade.h
+++ b/dummyresource/facade.h
@@ -34,10 +34,11 @@ class DummyResourceFacade : public Akonadi2::GenericFacade<Akonadi2::Application
34public: 34public:
35 DummyResourceFacade(); 35 DummyResourceFacade();
36 virtual ~DummyResourceFacade(); 36 virtual ~DummyResourceFacade();
37 virtual Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject); 37 Async::Job<void> create(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE;
38 virtual Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject); 38 Async::Job<void> modify(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE;
39 virtual Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject); 39 Async::Job<void> remove(const Akonadi2::ApplicationDomain::Event &domainObject) Q_DECL_OVERRIDE;
40 virtual Async::Job<void> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback); 40 Async::Job<qint64> load(const Akonadi2::Query &query, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback) Q_DECL_OVERRIDE;
41 Async::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<Akonadi2::ApplicationDomain::Event::Ptr> > &resultProvider) Q_DECL_OVERRIDE;
41 42
42private: 43private:
43 void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local)>); 44 void readValue(QSharedPointer<Akonadi2::Storage> storage, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::Event::Ptr &)> &resultCallback, std::function<bool(const std::string &key, DummyCalendar::DummyEvent const *buffer, Akonadi2::ApplicationDomain::Buffer::Event const *local)>);
diff --git a/tests/dummyresourcebenchmark.cpp b/tests/dummyresourcebenchmark.cpp
index d83cb70..e350747 100644
--- a/tests/dummyresourcebenchmark.cpp
+++ b/tests/dummyresourcebenchmark.cpp
@@ -7,6 +7,7 @@
7#include "clientapi.h" 7#include "clientapi.h"
8#include "commands.h" 8#include "commands.h"
9#include "entitybuffer.h" 9#include "entitybuffer.h"
10#include "synclistresult.h"
10 11
11#include "event_generated.h" 12#include "event_generated.h"
12#include "entity_generated.h" 13#include "entity_generated.h"
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp
index 5fed7cd..9523e5d 100644
--- a/tests/dummyresourcetest.cpp
+++ b/tests/dummyresourcetest.cpp
@@ -2,13 +2,13 @@
2 2
3#include <QString> 3#include <QString>
4 4
5// #include "dummycalendar_generated.h"
6#include "event_generated.h" 5#include "event_generated.h"
7#include "entity_generated.h" 6#include "entity_generated.h"
8#include "metadata_generated.h" 7#include "metadata_generated.h"
9#include "createentity_generated.h" 8#include "createentity_generated.h"
10#include "dummyresource/resourcefactory.h" 9#include "dummyresource/resourcefactory.h"
11#include "clientapi.h" 10#include "clientapi.h"
11#include "synclistresult.h"
12#include "commands.h" 12#include "commands.h"
13#include "entitybuffer.h" 13#include "entitybuffer.h"
14 14