From 77115bab30aa789f9af9fe49006e8747488f8a4c Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 15 Jun 2016 06:59:06 +0200 Subject: Moved thread-boundary crossing to the model. That way we avoid any unnecessary queuing for the sync API, and enable fine-tuning in the model code at a later stage. --- common/modelresult.cpp | 31 ++++++++++++-- common/modelresult.h | 7 ++-- common/resultprovider.h | 105 ++++++++++++++++------------------------------ common/threadboundary.cpp | 7 +++- 4 files changed, 73 insertions(+), 77 deletions(-) (limited to 'common') diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 4a85610..6feca43 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -20,9 +20,11 @@ #include "modelresult.h" #include +#include #include "domain/folder.h" #include "log.h" +#include "threadboundary.h" #undef DEBUG_AREA #define DEBUG_AREA "client.modelresult" @@ -63,12 +65,14 @@ qint64 ModelResult::parentId(const Ptr &value) template int ModelResult::rowCount(const QModelIndex &parent) const { + Q_ASSERT(QThread::currentThread() == this->thread()); return mTree[getIdentifier(parent)].size(); } template int ModelResult::columnCount(const QModelIndex &parent) const { + Q_ASSERT(QThread::currentThread() == this->thread()); return mPropertyColumns.size(); } @@ -86,6 +90,7 @@ QVariant ModelResult::headerData(int section, Qt::Orientation orientatio template QVariant ModelResult::data(const QModelIndex &index, int role) const { + Q_ASSERT(QThread::currentThread() == this->thread()); if (role == DomainObjectRole && index.isValid()) { Q_ASSERT(mEntities.contains(index.internalId())); return QVariant::fromValue(mEntities.value(index.internalId())); @@ -112,6 +117,7 @@ QVariant ModelResult::data(const QModelIndex &index, int role) const template QModelIndex ModelResult::index(int row, int column, const QModelIndex &parent) const { + Q_ASSERT(QThread::currentThread() == this->thread()); const auto id = getIdentifier(parent); const auto list = mTree.value(id); if (list.size() > row) { @@ -119,12 +125,14 @@ QModelIndex ModelResult::index(int row, int column, const QModelIndex &p return createIndex(row, column, childId); } Warning() << "Index not available " << row << column << parent; + Q_ASSERT(false); return QModelIndex(); } template QModelIndex ModelResult::createIndexFromId(const qint64 &id) const { + Q_ASSERT(QThread::currentThread() == this->thread()); if (id == 0) { return QModelIndex(); } @@ -185,6 +193,7 @@ void ModelResult::add(const Ptr &value) } if (mEntities.contains(childId)) { Warning() << "Entity already in model " << value->identifier(); + Q_ASSERT(false); return; } // qDebug() << "Inserting rows " << index << parent; @@ -216,6 +225,7 @@ void ModelResult::remove(const Ptr &value) template void ModelResult::fetchEntities(const QModelIndex &parent) { + Q_ASSERT(QThread::currentThread() == this->thread()); const auto id = getIdentifier(parent); mEntityChildrenFetchComplete.remove(id); mEntityChildrenFetched.insert(id); @@ -237,11 +247,26 @@ void ModelResult::setFetcher(const std::function void ModelResult::setEmitter(const typename Sink::ResultEmitter::Ptr &emitter) { + static async::ThreadBoundary threadBoundary; setFetcher([this](const Ptr &parent) { mEmitter->fetch(parent); }); - emitter->onAdded([this](const Ptr &value) { this->add(value); }); - emitter->onModified([this](const Ptr &value) { this->modify(value); }); - emitter->onRemoved([this](const Ptr &value) { this->remove(value); }); + + emitter->onAdded([this](const Ptr &value) { + threadBoundary.callInMainThread([this, value]() { + add(value); + }); + }); + emitter->onModified([this](const Ptr &value) { + threadBoundary.callInMainThread([this, value]() { + modify(value); + }); + }); + emitter->onRemoved([this](const Ptr &value) { + threadBoundary.callInMainThread([this, value]() { + remove(value); + }); + }); emitter->onInitialResultSetComplete([this](const Ptr &parent) { + Trace() << "Initial result set complete"; const qint64 parentId = parent ? qHash(*parent) : 0; const auto parentIndex = createIndexFromId(parentId); mEntityChildrenFetchComplete.insert(parentId); diff --git a/common/modelresult.h b/common/modelresult.h index 0f0c06a..64431da 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -54,15 +54,14 @@ public: bool canFetchMore(const QModelIndex &parent) const; void fetchMore(const QModelIndex &parent); + void setFetcher(const std::function &fetcher); + +private: void add(const Ptr &value); void modify(const Ptr &value); void remove(const Ptr &value); - - void setFetcher(const std::function &fetcher); - bool childrenFetched(const QModelIndex &) const; -private: qint64 parentId(const Ptr &value); QModelIndex createIndexFromId(const qint64 &id) const; void fetchEntities(const QModelIndex &parent); diff --git a/common/resultprovider.h b/common/resultprovider.h index b7d9272..1c1b0e9 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h @@ -20,10 +20,8 @@ #pragma once -#include #include #include -#include "threadboundary.h" #include "resultset.h" #include "log.h" @@ -75,33 +73,6 @@ private: template class ResultProvider : public ResultProviderInterface { -private: - void callInMainThreadOnEmitter(void (ResultEmitter::*f)()) - { - // We use the eventloop to call the addHandler directly from the main eventloop. - // That way the result emitter implementation doesn't have to care about threadsafety at all. - // The alternative would be to make all handlers of the emitter threadsafe. - if (auto emitter = mResultEmitter.toStrongRef()) { - auto weakEmitter = mResultEmitter; - // We don't want to keep the emitter alive here, so we only capture a weak reference - emitter->mThreadBoundary.callInMainThread([weakEmitter, f]() { - if (auto strongRef = weakEmitter.toStrongRef()) { - (strongRef.data()->*f)(); - } - }); - } - } - - void callInMainThreadOnEmitter(const std::function &f) - { - // We use the eventloop to call the addHandler directly from the main eventloop. - // That way the result emitter implementation doesn't have to care about threadsafety at all. - // The alternative would be to make all handlers of the emitter threadsafe. - if (auto emitter = mResultEmitter.toStrongRef()) { - emitter->mThreadBoundary.callInMainThread(f); - } - } - public: typedef QSharedPointer> Ptr; @@ -112,57 +83,45 @@ public: // Called from worker thread void add(const T &value) { - // Because I don't know how to use bind - auto weakEmitter = mResultEmitter; - callInMainThreadOnEmitter([weakEmitter, value]() { - if (auto strongRef = weakEmitter.toStrongRef()) { - strongRef->addHandler(value); - } - }); + if (auto strongRef = mResultEmitter.toStrongRef()) { + strongRef->addHandler(value); + } } void modify(const T &value) { - // Because I don't know how to use bind - auto weakEmitter = mResultEmitter; - callInMainThreadOnEmitter([weakEmitter, value]() { - if (auto strongRef = weakEmitter.toStrongRef()) { - strongRef->modifyHandler(value); - } - }); + if (auto strongRef = mResultEmitter.toStrongRef()) { + strongRef->modifyHandler(value); + } } void remove(const T &value) { - // Because I don't know how to use bind - auto weakEmitter = mResultEmitter; - callInMainThreadOnEmitter([weakEmitter, value]() { - if (auto strongRef = weakEmitter.toStrongRef()) { - strongRef->removeHandler(value); - } - }); + if (auto strongRef = mResultEmitter.toStrongRef()) { + strongRef->removeHandler(value); + } } void initialResultSetComplete(const T &parent) { - // Because I don't know how to use bind - auto weakEmitter = mResultEmitter; - callInMainThreadOnEmitter([weakEmitter, parent]() { - if (auto strongRef = weakEmitter.toStrongRef()) { - strongRef->initialResultSetComplete(parent); - } - }); + if (auto strongRef = mResultEmitter.toStrongRef()) { + strongRef->initialResultSetComplete(parent); + } } // Called from worker thread void complete() { - callInMainThreadOnEmitter(&ResultEmitter::complete); + if (auto strongRef = mResultEmitter.toStrongRef()) { + strongRef->complete(); + } } void clear() { - callInMainThreadOnEmitter(&ResultEmitter::clear); + if (auto strongRef = mResultEmitter.toStrongRef()) { + strongRef->clear(); + } } @@ -171,7 +130,7 @@ public: if (!mResultEmitter) { // We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again auto sharedPtr = QSharedPointer>(new ResultEmitter, [this](ResultEmitter *emitter) { - mThreadBoundary->callInMainThread([this]() { done(); }); + done(); delete emitter; }); mResultEmitter = sharedPtr; @@ -187,7 +146,6 @@ public: void onDone(const std::function &callback) { - mThreadBoundary = QSharedPointer::create(); mOnDoneCallback = callback; } @@ -205,7 +163,6 @@ public: private: void done() { - qWarning() << "done"; if (mOnDoneCallback) { auto callback = mOnDoneCallback; mOnDoneCallback = std::function(); @@ -216,7 +173,6 @@ private: QWeakPointer> mResultEmitter; std::function mOnDoneCallback; - QSharedPointer mThreadBoundary; std::function mFetcher; }; @@ -331,7 +287,6 @@ private: std::function clearHandler; std::function mFetcher; - async::ThreadBoundary mThreadBoundary; }; template @@ -350,31 +305,43 @@ public: emitter->onInitialResultSetComplete([this, ptr](const DomainType &parent) { auto hashValue = qHash(parent); mInitialResultSetInProgress.remove(hashValue, ptr); - // Normally a parent is only in a single resource, except the toplevel (invalid) parent - if (!mInitialResultSetInProgress.contains(hashValue)) { - this->initialResultSetComplete(parent); - } + callInitialResultCompleteIfDone(parent); }); emitter->onComplete([this]() { this->complete(); }); emitter->onClear([this]() { this->clear(); }); mEmitter << emitter; } + void callInitialResultCompleteIfDone(const DomainType &parent) + { + auto hashValue = qHash(parent); + // Normally a parent is only in a single resource, except the toplevel (invalid) parent + if (!mInitialResultSetInProgress.contains(hashValue) && mAllResultsFetched && !mResultEmitted) { + mResultEmitted = true; + this->initialResultSetComplete(parent); + } + } + void fetch(const DomainType &parent) Q_DECL_OVERRIDE { if (mEmitter.isEmpty()) { - Trace() << "No child emitters, the result is complete"; this->initialResultSetComplete(parent); } else { + mResultEmitted = false; + mAllResultsFetched = false; for (const auto &emitter : mEmitter) { mInitialResultSetInProgress.insert(qHash(parent), emitter.data()); emitter->fetch(parent); } + mAllResultsFetched = true; + callInitialResultCompleteIfDone(parent); } } private: QList::Ptr> mEmitter; QMultiMap *> mInitialResultSetInProgress; + bool mAllResultsFetched; + bool mResultEmitted; }; } diff --git a/common/threadboundary.cpp b/common/threadboundary.cpp index 705009b..22864dd 100644 --- a/common/threadboundary.cpp +++ b/common/threadboundary.cpp @@ -19,6 +19,7 @@ */ #include "threadboundary.h" +#include Q_DECLARE_METATYPE(std::function); @@ -39,7 +40,11 @@ void ThreadBoundary::callInMainThread(std::function f) * than the target thread is able to execute the function calls. In that case any captures will equally pile up, resulting * in significant memory usage i.e. due to Emitter::addHandler calls that each capture a domain object. */ - QMetaObject::invokeMethod(this, "runInMainThread", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(std::function, f)); + if (QThread::currentThread() == this->thread()) { + f(); + } else { + QMetaObject::invokeMethod(this, "runInMainThread", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(std::function, f)); + } } void ThreadBoundary::runInMainThread(std::function f) -- cgit v1.2.3