From 39dc9e84a5b2b8fa8a62abf370c971898695ad4a Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 14 Dec 2014 11:54:38 +0100 Subject: Threaded query processing. --- client/CMakeLists.txt | 3 ++- client/clientapi.h | 46 ++++++++++++++++++++++++++++++---------------- client/test/CMakeLists.txt | 2 +- client/threadboundary.cpp | 9 +++++++++ client/threadboundary.h | 27 +++++++++++++++++++++++++++ 5 files changed, 69 insertions(+), 18 deletions(-) create mode 100644 client/threadboundary.cpp create mode 100644 client/threadboundary.h (limited to 'client') diff --git a/client/CMakeLists.txt b/client/CMakeLists.txt index 8001d5f..1ebf5fd 100644 --- a/client/CMakeLists.txt +++ b/client/CMakeLists.txt @@ -4,9 +4,10 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) set(akonadi2client_SRCS resourceaccess.cpp + threadboundary.cpp ) -add_library(${PROJECT_NAME}_lib ${akonadi2client_SRCS}) +add_library(${PROJECT_NAME}_lib SHARED ${akonadi2client_SRCS}) target_link_libraries(${PROJECT_NAME}_lib akonadi2common) qt5_use_modules(${PROJECT_NAME}_lib Widgets Network) diff --git a/client/clientapi.h b/client/clientapi.h index 662f96c..a0020af 100644 --- a/client/clientapi.h +++ b/client/clientapi.h @@ -7,48 +7,60 @@ #include #include #include +#include #include +#include "threadboundary.h" namespace async { //This should abstract if we execute from eventloop or in thread. //It supposed to allow the caller to finish the current method before executing the runner. void run(const std::function &runner) { - //FIXME we should be using a Job instead of a timer - auto timer = new QTimer; - timer->setSingleShot(true); - QObject::connect(timer, &QTimer::timeout, runner); - QObject::connect(timer, &QTimer::timeout, timer, &QObject::deleteLater); - timer->start(0); + QtConcurrent::run(runner); + + // //FIXME we should be using a Job instead of a timer + // auto timer = new QTimer; + // timer->setSingleShot(true); + // QObject::connect(timer, &QTimer::timeout, runner); + // QObject::connect(timer, &QTimer::timeout, timer, &QObject::deleteLater); + // timer->start(0); }; /** * Query result set - * - * This should probably become part of a generic kasync library. - * - * Functional is nice because we don't have to store data in the emitter - * Non functional and storing may be the right thing because we want an in-memory representation of the set - * non-functional also allows us to batch move data across thread boundaries. */ template class ResultEmitter; /* - * The promise side for the result provider + * The promise side for the result emitter */ template class ResultProvider { public: + //Called from worker thread void add(const T &value) { - //the handler will be called in the other thread, protect - mResultEmitter->addHandler(value); + //We use the eventloop to call the addHandler directly from the main eventloop. + //That way the result emitter implementation doesn't have to care about threadsafety at all. + //The alternative would be to make all handlers of the emitter threadsafe. + auto emitter = mResultEmitter; + mResultEmitter->mThreadBoundary.callInMainThread([emitter, value]() { + if (emitter) { + emitter->addHandler(value); + } + }); } + //Called from worker thread void complete() { - mResultEmitter->completeHandler(); + auto emitter = mResultEmitter; + mResultEmitter->mThreadBoundary.callInMainThread([emitter]() { + if (emitter) { + emitter->completeHandler(); + } + }); } QSharedPointer > emitter() @@ -92,6 +104,7 @@ namespace async { std::function addHandler; // std::function removeHandler; std::function completeHandler; + ThreadBoundary mThreadBoundary; }; @@ -348,6 +361,7 @@ public: async::run([resultSet, query](){ // Query all resources and aggregate results // query tells us in which resources we're interested + // TODO: queries to individual resources could be parallelized for(const QString &resource : query.resources) { auto facade = FacadeFactory::instance().getFacade(resource); //We have to bind an instance to the function callback. Since we use a shared pointer this keeps the result provider instance (and thus also the emitter) alive. diff --git a/client/test/CMakeLists.txt b/client/test/CMakeLists.txt index 6453fc5..0d5da1b 100644 --- a/client/test/CMakeLists.txt +++ b/client/test/CMakeLists.txt @@ -5,7 +5,7 @@ macro(auto_tests) foreach(_testname ${ARGN}) add_executable(${_testname} ${_testname}.cpp ${store_SRCS}) qt5_use_modules(${_testname} Core Test) - target_link_libraries(${_testname} lmdb) + target_link_libraries(${_testname} lmdb akonadi2_client_lib) add_test(NAME ${_testname} COMMAND ${_testname}) endforeach(_testname) endmacro(auto_tests) diff --git a/client/threadboundary.cpp b/client/threadboundary.cpp new file mode 100644 index 0000000..cd30e74 --- /dev/null +++ b/client/threadboundary.cpp @@ -0,0 +1,9 @@ +#include "threadboundary.h" + +Q_DECLARE_METATYPE(std::function); + +namespace async { +ThreadBoundary::ThreadBoundary(): QObject() { qRegisterMetaType >("std::function"); } +ThreadBoundary:: ~ThreadBoundary() {} +} + diff --git a/client/threadboundary.h b/client/threadboundary.h new file mode 100644 index 0000000..8847849 --- /dev/null +++ b/client/threadboundary.h @@ -0,0 +1,27 @@ +#pragma once + +#include +#include + +namespace async { + /* + * A helper class to invoke a method in a different thread using the event loop. + * The ThreadBoundary object must live in the thread where the function should be called. + */ + class ThreadBoundary : public QObject { + Q_OBJECT + public: + ThreadBoundary(); + virtual ~ThreadBoundary(); + + //Call in worker thread + void callInMainThread(std::function f) { + QMetaObject::invokeMethod(this, "addValueInMainThread", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(std::function, f)); + } + public slots: + //Get's called in main thread by it's eventloop + void addValueInMainThread(std::function f) { + f(); + } + }; +} -- cgit v1.2.3