summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2014-12-14 11:54:38 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2014-12-14 19:14:42 +0100
commit39dc9e84a5b2b8fa8a62abf370c971898695ad4a (patch)
tree80c6880b0002d94b06cc344479df20b95346233a
parent4dceb1b07227445b88a09cc1b88bfc5878a322bb (diff)
downloadsink-39dc9e84a5b2b8fa8a62abf370c971898695ad4a.tar.gz
sink-39dc9e84a5b2b8fa8a62abf370c971898695ad4a.zip
Threaded query processing.
-rw-r--r--client/CMakeLists.txt3
-rw-r--r--client/clientapi.h46
-rw-r--r--client/test/CMakeLists.txt2
-rw-r--r--client/threadboundary.cpp9
-rw-r--r--client/threadboundary.h27
5 files changed, 69 insertions, 18 deletions
diff --git a/client/CMakeLists.txt b/client/CMakeLists.txt
index 8001d5f..1ebf5fd 100644
--- a/client/CMakeLists.txt
+++ b/client/CMakeLists.txt
@@ -4,9 +4,10 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
4 4
5set(akonadi2client_SRCS 5set(akonadi2client_SRCS
6 resourceaccess.cpp 6 resourceaccess.cpp
7 threadboundary.cpp
7) 8)
8 9
9add_library(${PROJECT_NAME}_lib ${akonadi2client_SRCS}) 10add_library(${PROJECT_NAME}_lib SHARED ${akonadi2client_SRCS})
10target_link_libraries(${PROJECT_NAME}_lib akonadi2common) 11target_link_libraries(${PROJECT_NAME}_lib akonadi2common)
11qt5_use_modules(${PROJECT_NAME}_lib Widgets Network) 12qt5_use_modules(${PROJECT_NAME}_lib Widgets Network)
12 13
diff --git a/client/clientapi.h b/client/clientapi.h
index 662f96c..a0020af 100644
--- a/client/clientapi.h
+++ b/client/clientapi.h
@@ -7,48 +7,60 @@
7#include <QTimer> 7#include <QTimer>
8#include <QDebug> 8#include <QDebug>
9#include <QEventLoop> 9#include <QEventLoop>
10#include <QtConcurrent/QtConcurrentRun>
10#include <functional> 11#include <functional>
12#include "threadboundary.h"
11 13
12namespace async { 14namespace async {
13 //This should abstract if we execute from eventloop or in thread. 15 //This should abstract if we execute from eventloop or in thread.
14 //It supposed to allow the caller to finish the current method before executing the runner. 16 //It supposed to allow the caller to finish the current method before executing the runner.
15 void run(const std::function<void()> &runner) { 17 void run(const std::function<void()> &runner) {
16 //FIXME we should be using a Job instead of a timer 18 QtConcurrent::run(runner);
17 auto timer = new QTimer; 19
18 timer->setSingleShot(true); 20 // //FIXME we should be using a Job instead of a timer
19 QObject::connect(timer, &QTimer::timeout, runner); 21 // auto timer = new QTimer;
20 QObject::connect(timer, &QTimer::timeout, timer, &QObject::deleteLater); 22 // timer->setSingleShot(true);
21 timer->start(0); 23 // QObject::connect(timer, &QTimer::timeout, runner);
24 // QObject::connect(timer, &QTimer::timeout, timer, &QObject::deleteLater);
25 // timer->start(0);
22 }; 26 };
23 27
24 /** 28 /**
25 * Query result set 29 * Query result set
26 *
27 * This should probably become part of a generic kasync library.
28 *
29 * Functional is nice because we don't have to store data in the emitter
30 * Non functional and storing may be the right thing because we want an in-memory representation of the set
31 * non-functional also allows us to batch move data across thread boundaries.
32 */ 30 */
33 31
34 template<class T> 32 template<class T>
35 class ResultEmitter; 33 class ResultEmitter;
36 34
37 /* 35 /*
38 * The promise side for the result provider 36 * The promise side for the result emitter
39 */ 37 */
40 template<class T> 38 template<class T>
41 class ResultProvider { 39 class ResultProvider {
42 public: 40 public:
41 //Called from worker thread
43 void add(const T &value) 42 void add(const T &value)
44 { 43 {
45 //the handler will be called in the other thread, protect 44 //We use the eventloop to call the addHandler directly from the main eventloop.
46 mResultEmitter->addHandler(value); 45 //That way the result emitter implementation doesn't have to care about threadsafety at all.
46 //The alternative would be to make all handlers of the emitter threadsafe.
47 auto emitter = mResultEmitter;
48 mResultEmitter->mThreadBoundary.callInMainThread([emitter, value]() {
49 if (emitter) {
50 emitter->addHandler(value);
51 }
52 });
47 } 53 }
48 54
55 //Called from worker thread
49 void complete() 56 void complete()
50 { 57 {
51 mResultEmitter->completeHandler(); 58 auto emitter = mResultEmitter;
59 mResultEmitter->mThreadBoundary.callInMainThread([emitter]() {
60 if (emitter) {
61 emitter->completeHandler();
62 }
63 });
52 } 64 }
53 65
54 QSharedPointer<ResultEmitter<T> > emitter() 66 QSharedPointer<ResultEmitter<T> > emitter()
@@ -92,6 +104,7 @@ namespace async {
92 std::function<void(const DomainType&)> addHandler; 104 std::function<void(const DomainType&)> addHandler;
93 // std::function<void(const T&)> removeHandler; 105 // std::function<void(const T&)> removeHandler;
94 std::function<void(void)> completeHandler; 106 std::function<void(void)> completeHandler;
107 ThreadBoundary mThreadBoundary;
95 }; 108 };
96 109
97 110
@@ -348,6 +361,7 @@ public:
348 async::run([resultSet, query](){ 361 async::run([resultSet, query](){
349 // Query all resources and aggregate results 362 // Query all resources and aggregate results
350 // query tells us in which resources we're interested 363 // query tells us in which resources we're interested
364 // TODO: queries to individual resources could be parallelized
351 for(const QString &resource : query.resources) { 365 for(const QString &resource : query.resources) {
352 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource); 366 auto facade = FacadeFactory::instance().getFacade<DomainType>(resource);
353 //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. 367 //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive.
diff --git a/client/test/CMakeLists.txt b/client/test/CMakeLists.txt
index 6453fc5..0d5da1b 100644
--- a/client/test/CMakeLists.txt
+++ b/client/test/CMakeLists.txt
@@ -5,7 +5,7 @@ macro(auto_tests)
5 foreach(_testname ${ARGN}) 5 foreach(_testname ${ARGN})
6 add_executable(${_testname} ${_testname}.cpp ${store_SRCS}) 6 add_executable(${_testname} ${_testname}.cpp ${store_SRCS})
7 qt5_use_modules(${_testname} Core Test) 7 qt5_use_modules(${_testname} Core Test)
8 target_link_libraries(${_testname} lmdb) 8 target_link_libraries(${_testname} lmdb akonadi2_client_lib)
9 add_test(NAME ${_testname} COMMAND ${_testname}) 9 add_test(NAME ${_testname} COMMAND ${_testname})
10 endforeach(_testname) 10 endforeach(_testname)
11endmacro(auto_tests) 11endmacro(auto_tests)
diff --git a/client/threadboundary.cpp b/client/threadboundary.cpp
new file mode 100644
index 0000000..cd30e74
--- /dev/null
+++ b/client/threadboundary.cpp
@@ -0,0 +1,9 @@
1#include "threadboundary.h"
2
3Q_DECLARE_METATYPE(std::function<void()>);
4
5namespace async {
6ThreadBoundary::ThreadBoundary(): QObject() { qRegisterMetaType<std::function<void()> >("std::function<void()>"); }
7ThreadBoundary:: ~ThreadBoundary() {}
8}
9
diff --git a/client/threadboundary.h b/client/threadboundary.h
new file mode 100644
index 0000000..8847849
--- /dev/null
+++ b/client/threadboundary.h
@@ -0,0 +1,27 @@
1#pragma once
2
3#include <QObject>
4#include <functional>
5
6namespace async {
7 /*
8 * A helper class to invoke a method in a different thread using the event loop.
9 * The ThreadBoundary object must live in the thread where the function should be called.
10 */
11 class ThreadBoundary : public QObject {
12 Q_OBJECT
13 public:
14 ThreadBoundary();
15 virtual ~ThreadBoundary();
16
17 //Call in worker thread
18 void callInMainThread(std::function<void()> f) {
19 QMetaObject::invokeMethod(this, "addValueInMainThread", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(std::function<void()>, f));
20 }
21 public slots:
22 //Get's called in main thread by it's eventloop
23 void addValueInMainThread(std::function<void()> f) {
24 f();
25 }
26 };
27}