summaryrefslogtreecommitdiffstats
path: root/common/resultprovider.h
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-06-15 06:59:06 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-06-15 07:12:13 +0200
commit77115bab30aa789f9af9fe49006e8747488f8a4c (patch)
tree5a370c6f25fdce3396d8172b509306f8f501d34f /common/resultprovider.h
parentbb70bdcd0eaf72ffc304536267a66c5de5eaf2e9 (diff)
downloadsink-77115bab30aa789f9af9fe49006e8747488f8a4c.tar.gz
sink-77115bab30aa789f9af9fe49006e8747488f8a4c.zip
Moved thread-boundary crossing to the model.
That way we avoid any unnecessary queuing for the sync API, and enable fine-tuning in the model code at a later stage.
Diffstat (limited to 'common/resultprovider.h')
-rw-r--r--common/resultprovider.h105
1 files changed, 36 insertions, 69 deletions
diff --git a/common/resultprovider.h b/common/resultprovider.h
index b7d9272..1c1b0e9 100644
--- a/common/resultprovider.h
+++ b/common/resultprovider.h
@@ -20,10 +20,8 @@
20 20
21#pragma once 21#pragma once
22 22
23#include <QThread>
24#include <functional> 23#include <functional>
25#include <memory> 24#include <memory>
26#include "threadboundary.h"
27#include "resultset.h" 25#include "resultset.h"
28#include "log.h" 26#include "log.h"
29 27
@@ -75,33 +73,6 @@ private:
75template <class T> 73template <class T>
76class ResultProvider : public ResultProviderInterface<T> 74class ResultProvider : public ResultProviderInterface<T>
77{ 75{
78private:
79 void callInMainThreadOnEmitter(void (ResultEmitter<T>::*f)())
80 {
81 // We use the eventloop to call the addHandler directly from the main eventloop.
82 // That way the result emitter implementation doesn't have to care about threadsafety at all.
83 // The alternative would be to make all handlers of the emitter threadsafe.
84 if (auto emitter = mResultEmitter.toStrongRef()) {
85 auto weakEmitter = mResultEmitter;
86 // We don't want to keep the emitter alive here, so we only capture a weak reference
87 emitter->mThreadBoundary.callInMainThread([weakEmitter, f]() {
88 if (auto strongRef = weakEmitter.toStrongRef()) {
89 (strongRef.data()->*f)();
90 }
91 });
92 }
93 }
94
95 void callInMainThreadOnEmitter(const std::function<void()> &f)
96 {
97 // We use the eventloop to call the addHandler directly from the main eventloop.
98 // That way the result emitter implementation doesn't have to care about threadsafety at all.
99 // The alternative would be to make all handlers of the emitter threadsafe.
100 if (auto emitter = mResultEmitter.toStrongRef()) {
101 emitter->mThreadBoundary.callInMainThread(f);
102 }
103 }
104
105public: 76public:
106 typedef QSharedPointer<ResultProvider<T>> Ptr; 77 typedef QSharedPointer<ResultProvider<T>> Ptr;
107 78
@@ -112,57 +83,45 @@ public:
112 // Called from worker thread 83 // Called from worker thread
113 void add(const T &value) 84 void add(const T &value)
114 { 85 {
115 // Because I don't know how to use bind 86 if (auto strongRef = mResultEmitter.toStrongRef()) {
116 auto weakEmitter = mResultEmitter; 87 strongRef->addHandler(value);
117 callInMainThreadOnEmitter([weakEmitter, value]() { 88 }
118 if (auto strongRef = weakEmitter.toStrongRef()) {
119 strongRef->addHandler(value);
120 }
121 });
122 } 89 }
123 90
124 void modify(const T &value) 91 void modify(const T &value)
125 { 92 {
126 // Because I don't know how to use bind 93 if (auto strongRef = mResultEmitter.toStrongRef()) {
127 auto weakEmitter = mResultEmitter; 94 strongRef->modifyHandler(value);
128 callInMainThreadOnEmitter([weakEmitter, value]() { 95 }
129 if (auto strongRef = weakEmitter.toStrongRef()) {
130 strongRef->modifyHandler(value);
131 }
132 });
133 } 96 }
134 97
135 void remove(const T &value) 98 void remove(const T &value)
136 { 99 {
137 // Because I don't know how to use bind 100 if (auto strongRef = mResultEmitter.toStrongRef()) {
138 auto weakEmitter = mResultEmitter; 101 strongRef->removeHandler(value);
139 callInMainThreadOnEmitter([weakEmitter, value]() { 102 }
140 if (auto strongRef = weakEmitter.toStrongRef()) {
141 strongRef->removeHandler(value);
142 }
143 });
144 } 103 }
145 104
146 void initialResultSetComplete(const T &parent) 105 void initialResultSetComplete(const T &parent)
147 { 106 {
148 // Because I don't know how to use bind 107 if (auto strongRef = mResultEmitter.toStrongRef()) {
149 auto weakEmitter = mResultEmitter; 108 strongRef->initialResultSetComplete(parent);
150 callInMainThreadOnEmitter([weakEmitter, parent]() { 109 }
151 if (auto strongRef = weakEmitter.toStrongRef()) {
152 strongRef->initialResultSetComplete(parent);
153 }
154 });
155 } 110 }
156 111
157 // Called from worker thread 112 // Called from worker thread
158 void complete() 113 void complete()
159 { 114 {
160 callInMainThreadOnEmitter(&ResultEmitter<T>::complete); 115 if (auto strongRef = mResultEmitter.toStrongRef()) {
116 strongRef->complete();
117 }
161 } 118 }
162 119
163 void clear() 120 void clear()
164 { 121 {
165 callInMainThreadOnEmitter(&ResultEmitter<T>::clear); 122 if (auto strongRef = mResultEmitter.toStrongRef()) {
123 strongRef->clear();
124 }
166 } 125 }
167 126
168 127
@@ -171,7 +130,7 @@ public:
171 if (!mResultEmitter) { 130 if (!mResultEmitter) {
172 // We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again 131 // We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again
173 auto sharedPtr = QSharedPointer<ResultEmitter<T>>(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter) { 132 auto sharedPtr = QSharedPointer<ResultEmitter<T>>(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter) {
174 mThreadBoundary->callInMainThread([this]() { done(); }); 133 done();
175 delete emitter; 134 delete emitter;
176 }); 135 });
177 mResultEmitter = sharedPtr; 136 mResultEmitter = sharedPtr;
@@ -187,7 +146,6 @@ public:
187 146
188 void onDone(const std::function<void()> &callback) 147 void onDone(const std::function<void()> &callback)
189 { 148 {
190 mThreadBoundary = QSharedPointer<async::ThreadBoundary>::create();
191 mOnDoneCallback = callback; 149 mOnDoneCallback = callback;
192 } 150 }
193 151
@@ -205,7 +163,6 @@ public:
205private: 163private:
206 void done() 164 void done()
207 { 165 {
208 qWarning() << "done";
209 if (mOnDoneCallback) { 166 if (mOnDoneCallback) {
210 auto callback = mOnDoneCallback; 167 auto callback = mOnDoneCallback;
211 mOnDoneCallback = std::function<void()>(); 168 mOnDoneCallback = std::function<void()>();
@@ -216,7 +173,6 @@ private:
216 173
217 QWeakPointer<ResultEmitter<T>> mResultEmitter; 174 QWeakPointer<ResultEmitter<T>> mResultEmitter;
218 std::function<void()> mOnDoneCallback; 175 std::function<void()> mOnDoneCallback;
219 QSharedPointer<async::ThreadBoundary> mThreadBoundary;
220 std::function<void(const T &parent)> mFetcher; 176 std::function<void(const T &parent)> mFetcher;
221}; 177};
222 178
@@ -331,7 +287,6 @@ private:
331 std::function<void(void)> clearHandler; 287 std::function<void(void)> clearHandler;
332 288
333 std::function<void(const DomainType &parent)> mFetcher; 289 std::function<void(const DomainType &parent)> mFetcher;
334 async::ThreadBoundary mThreadBoundary;
335}; 290};
336 291
337template <class DomainType> 292template <class DomainType>
@@ -350,31 +305,43 @@ public:
350 emitter->onInitialResultSetComplete([this, ptr](const DomainType &parent) { 305 emitter->onInitialResultSetComplete([this, ptr](const DomainType &parent) {
351 auto hashValue = qHash(parent); 306 auto hashValue = qHash(parent);
352 mInitialResultSetInProgress.remove(hashValue, ptr); 307 mInitialResultSetInProgress.remove(hashValue, ptr);
353 // Normally a parent is only in a single resource, except the toplevel (invalid) parent 308 callInitialResultCompleteIfDone(parent);
354 if (!mInitialResultSetInProgress.contains(hashValue)) {
355 this->initialResultSetComplete(parent);
356 }
357 }); 309 });
358 emitter->onComplete([this]() { this->complete(); }); 310 emitter->onComplete([this]() { this->complete(); });
359 emitter->onClear([this]() { this->clear(); }); 311 emitter->onClear([this]() { this->clear(); });
360 mEmitter << emitter; 312 mEmitter << emitter;
361 } 313 }
362 314
315 void callInitialResultCompleteIfDone(const DomainType &parent)
316 {
317 auto hashValue = qHash(parent);
318 // Normally a parent is only in a single resource, except the toplevel (invalid) parent
319 if (!mInitialResultSetInProgress.contains(hashValue) && mAllResultsFetched && !mResultEmitted) {
320 mResultEmitted = true;
321 this->initialResultSetComplete(parent);
322 }
323 }
324
363 void fetch(const DomainType &parent) Q_DECL_OVERRIDE 325 void fetch(const DomainType &parent) Q_DECL_OVERRIDE
364 { 326 {
365 if (mEmitter.isEmpty()) { 327 if (mEmitter.isEmpty()) {
366 Trace() << "No child emitters, the result is complete";
367 this->initialResultSetComplete(parent); 328 this->initialResultSetComplete(parent);
368 } else { 329 } else {
330 mResultEmitted = false;
331 mAllResultsFetched = false;
369 for (const auto &emitter : mEmitter) { 332 for (const auto &emitter : mEmitter) {
370 mInitialResultSetInProgress.insert(qHash(parent), emitter.data()); 333 mInitialResultSetInProgress.insert(qHash(parent), emitter.data());
371 emitter->fetch(parent); 334 emitter->fetch(parent);
372 } 335 }
336 mAllResultsFetched = true;
337 callInitialResultCompleteIfDone(parent);
373 } 338 }
374 } 339 }
375 340
376private: 341private:
377 QList<typename ResultEmitter<DomainType>::Ptr> mEmitter; 342 QList<typename ResultEmitter<DomainType>::Ptr> mEmitter;
378 QMultiMap<qint64, ResultEmitter<DomainType> *> mInitialResultSetInProgress; 343 QMultiMap<qint64, ResultEmitter<DomainType> *> mInitialResultSetInProgress;
344 bool mAllResultsFetched;
345 bool mResultEmitted;
379}; 346};
380} 347}