diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-06-15 06:59:06 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-06-15 07:12:13 +0200 |
commit | 77115bab30aa789f9af9fe49006e8747488f8a4c (patch) | |
tree | 5a370c6f25fdce3396d8172b509306f8f501d34f /common/resultprovider.h | |
parent | bb70bdcd0eaf72ffc304536267a66c5de5eaf2e9 (diff) | |
download | sink-77115bab30aa789f9af9fe49006e8747488f8a4c.tar.gz sink-77115bab30aa789f9af9fe49006e8747488f8a4c.zip |
Moved thread-boundary crossing to the model.
That way we avoid any unnecessary queuing for the sync API,
and enable fine-tuning in the model code at a later stage.
Diffstat (limited to 'common/resultprovider.h')
-rw-r--r-- | common/resultprovider.h | 105 |
1 files changed, 36 insertions, 69 deletions
diff --git a/common/resultprovider.h b/common/resultprovider.h index b7d9272..1c1b0e9 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h | |||
@@ -20,10 +20,8 @@ | |||
20 | 20 | ||
21 | #pragma once | 21 | #pragma once |
22 | 22 | ||
23 | #include <QThread> | ||
24 | #include <functional> | 23 | #include <functional> |
25 | #include <memory> | 24 | #include <memory> |
26 | #include "threadboundary.h" | ||
27 | #include "resultset.h" | 25 | #include "resultset.h" |
28 | #include "log.h" | 26 | #include "log.h" |
29 | 27 | ||
@@ -75,33 +73,6 @@ private: | |||
75 | template <class T> | 73 | template <class T> |
76 | class ResultProvider : public ResultProviderInterface<T> | 74 | class ResultProvider : public ResultProviderInterface<T> |
77 | { | 75 | { |
78 | private: | ||
79 | void callInMainThreadOnEmitter(void (ResultEmitter<T>::*f)()) | ||
80 | { | ||
81 | // We use the eventloop to call the addHandler directly from the main eventloop. | ||
82 | // That way the result emitter implementation doesn't have to care about threadsafety at all. | ||
83 | // The alternative would be to make all handlers of the emitter threadsafe. | ||
84 | if (auto emitter = mResultEmitter.toStrongRef()) { | ||
85 | auto weakEmitter = mResultEmitter; | ||
86 | // We don't want to keep the emitter alive here, so we only capture a weak reference | ||
87 | emitter->mThreadBoundary.callInMainThread([weakEmitter, f]() { | ||
88 | if (auto strongRef = weakEmitter.toStrongRef()) { | ||
89 | (strongRef.data()->*f)(); | ||
90 | } | ||
91 | }); | ||
92 | } | ||
93 | } | ||
94 | |||
95 | void callInMainThreadOnEmitter(const std::function<void()> &f) | ||
96 | { | ||
97 | // We use the eventloop to call the addHandler directly from the main eventloop. | ||
98 | // That way the result emitter implementation doesn't have to care about threadsafety at all. | ||
99 | // The alternative would be to make all handlers of the emitter threadsafe. | ||
100 | if (auto emitter = mResultEmitter.toStrongRef()) { | ||
101 | emitter->mThreadBoundary.callInMainThread(f); | ||
102 | } | ||
103 | } | ||
104 | |||
105 | public: | 76 | public: |
106 | typedef QSharedPointer<ResultProvider<T>> Ptr; | 77 | typedef QSharedPointer<ResultProvider<T>> Ptr; |
107 | 78 | ||
@@ -112,57 +83,45 @@ public: | |||
112 | // Called from worker thread | 83 | // Called from worker thread |
113 | void add(const T &value) | 84 | void add(const T &value) |
114 | { | 85 | { |
115 | // Because I don't know how to use bind | 86 | if (auto strongRef = mResultEmitter.toStrongRef()) { |
116 | auto weakEmitter = mResultEmitter; | 87 | strongRef->addHandler(value); |
117 | callInMainThreadOnEmitter([weakEmitter, value]() { | 88 | } |
118 | if (auto strongRef = weakEmitter.toStrongRef()) { | ||
119 | strongRef->addHandler(value); | ||
120 | } | ||
121 | }); | ||
122 | } | 89 | } |
123 | 90 | ||
124 | void modify(const T &value) | 91 | void modify(const T &value) |
125 | { | 92 | { |
126 | // Because I don't know how to use bind | 93 | if (auto strongRef = mResultEmitter.toStrongRef()) { |
127 | auto weakEmitter = mResultEmitter; | 94 | strongRef->modifyHandler(value); |
128 | callInMainThreadOnEmitter([weakEmitter, value]() { | 95 | } |
129 | if (auto strongRef = weakEmitter.toStrongRef()) { | ||
130 | strongRef->modifyHandler(value); | ||
131 | } | ||
132 | }); | ||
133 | } | 96 | } |
134 | 97 | ||
135 | void remove(const T &value) | 98 | void remove(const T &value) |
136 | { | 99 | { |
137 | // Because I don't know how to use bind | 100 | if (auto strongRef = mResultEmitter.toStrongRef()) { |
138 | auto weakEmitter = mResultEmitter; | 101 | strongRef->removeHandler(value); |
139 | callInMainThreadOnEmitter([weakEmitter, value]() { | 102 | } |
140 | if (auto strongRef = weakEmitter.toStrongRef()) { | ||
141 | strongRef->removeHandler(value); | ||
142 | } | ||
143 | }); | ||
144 | } | 103 | } |
145 | 104 | ||
146 | void initialResultSetComplete(const T &parent) | 105 | void initialResultSetComplete(const T &parent) |
147 | { | 106 | { |
148 | // Because I don't know how to use bind | 107 | if (auto strongRef = mResultEmitter.toStrongRef()) { |
149 | auto weakEmitter = mResultEmitter; | 108 | strongRef->initialResultSetComplete(parent); |
150 | callInMainThreadOnEmitter([weakEmitter, parent]() { | 109 | } |
151 | if (auto strongRef = weakEmitter.toStrongRef()) { | ||
152 | strongRef->initialResultSetComplete(parent); | ||
153 | } | ||
154 | }); | ||
155 | } | 110 | } |
156 | 111 | ||
157 | // Called from worker thread | 112 | // Called from worker thread |
158 | void complete() | 113 | void complete() |
159 | { | 114 | { |
160 | callInMainThreadOnEmitter(&ResultEmitter<T>::complete); | 115 | if (auto strongRef = mResultEmitter.toStrongRef()) { |
116 | strongRef->complete(); | ||
117 | } | ||
161 | } | 118 | } |
162 | 119 | ||
163 | void clear() | 120 | void clear() |
164 | { | 121 | { |
165 | callInMainThreadOnEmitter(&ResultEmitter<T>::clear); | 122 | if (auto strongRef = mResultEmitter.toStrongRef()) { |
123 | strongRef->clear(); | ||
124 | } | ||
166 | } | 125 | } |
167 | 126 | ||
168 | 127 | ||
@@ -171,7 +130,7 @@ public: | |||
171 | if (!mResultEmitter) { | 130 | if (!mResultEmitter) { |
172 | // We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again | 131 | // We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again |
173 | auto sharedPtr = QSharedPointer<ResultEmitter<T>>(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter) { | 132 | auto sharedPtr = QSharedPointer<ResultEmitter<T>>(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter) { |
174 | mThreadBoundary->callInMainThread([this]() { done(); }); | 133 | done(); |
175 | delete emitter; | 134 | delete emitter; |
176 | }); | 135 | }); |
177 | mResultEmitter = sharedPtr; | 136 | mResultEmitter = sharedPtr; |
@@ -187,7 +146,6 @@ public: | |||
187 | 146 | ||
188 | void onDone(const std::function<void()> &callback) | 147 | void onDone(const std::function<void()> &callback) |
189 | { | 148 | { |
190 | mThreadBoundary = QSharedPointer<async::ThreadBoundary>::create(); | ||
191 | mOnDoneCallback = callback; | 149 | mOnDoneCallback = callback; |
192 | } | 150 | } |
193 | 151 | ||
@@ -205,7 +163,6 @@ public: | |||
205 | private: | 163 | private: |
206 | void done() | 164 | void done() |
207 | { | 165 | { |
208 | qWarning() << "done"; | ||
209 | if (mOnDoneCallback) { | 166 | if (mOnDoneCallback) { |
210 | auto callback = mOnDoneCallback; | 167 | auto callback = mOnDoneCallback; |
211 | mOnDoneCallback = std::function<void()>(); | 168 | mOnDoneCallback = std::function<void()>(); |
@@ -216,7 +173,6 @@ private: | |||
216 | 173 | ||
217 | QWeakPointer<ResultEmitter<T>> mResultEmitter; | 174 | QWeakPointer<ResultEmitter<T>> mResultEmitter; |
218 | std::function<void()> mOnDoneCallback; | 175 | std::function<void()> mOnDoneCallback; |
219 | QSharedPointer<async::ThreadBoundary> mThreadBoundary; | ||
220 | std::function<void(const T &parent)> mFetcher; | 176 | std::function<void(const T &parent)> mFetcher; |
221 | }; | 177 | }; |
222 | 178 | ||
@@ -331,7 +287,6 @@ private: | |||
331 | std::function<void(void)> clearHandler; | 287 | std::function<void(void)> clearHandler; |
332 | 288 | ||
333 | std::function<void(const DomainType &parent)> mFetcher; | 289 | std::function<void(const DomainType &parent)> mFetcher; |
334 | async::ThreadBoundary mThreadBoundary; | ||
335 | }; | 290 | }; |
336 | 291 | ||
337 | template <class DomainType> | 292 | template <class DomainType> |
@@ -350,31 +305,43 @@ public: | |||
350 | emitter->onInitialResultSetComplete([this, ptr](const DomainType &parent) { | 305 | emitter->onInitialResultSetComplete([this, ptr](const DomainType &parent) { |
351 | auto hashValue = qHash(parent); | 306 | auto hashValue = qHash(parent); |
352 | mInitialResultSetInProgress.remove(hashValue, ptr); | 307 | mInitialResultSetInProgress.remove(hashValue, ptr); |
353 | // Normally a parent is only in a single resource, except the toplevel (invalid) parent | 308 | callInitialResultCompleteIfDone(parent); |
354 | if (!mInitialResultSetInProgress.contains(hashValue)) { | ||
355 | this->initialResultSetComplete(parent); | ||
356 | } | ||
357 | }); | 309 | }); |
358 | emitter->onComplete([this]() { this->complete(); }); | 310 | emitter->onComplete([this]() { this->complete(); }); |
359 | emitter->onClear([this]() { this->clear(); }); | 311 | emitter->onClear([this]() { this->clear(); }); |
360 | mEmitter << emitter; | 312 | mEmitter << emitter; |
361 | } | 313 | } |
362 | 314 | ||
315 | void callInitialResultCompleteIfDone(const DomainType &parent) | ||
316 | { | ||
317 | auto hashValue = qHash(parent); | ||
318 | // Normally a parent is only in a single resource, except the toplevel (invalid) parent | ||
319 | if (!mInitialResultSetInProgress.contains(hashValue) && mAllResultsFetched && !mResultEmitted) { | ||
320 | mResultEmitted = true; | ||
321 | this->initialResultSetComplete(parent); | ||
322 | } | ||
323 | } | ||
324 | |||
363 | void fetch(const DomainType &parent) Q_DECL_OVERRIDE | 325 | void fetch(const DomainType &parent) Q_DECL_OVERRIDE |
364 | { | 326 | { |
365 | if (mEmitter.isEmpty()) { | 327 | if (mEmitter.isEmpty()) { |
366 | Trace() << "No child emitters, the result is complete"; | ||
367 | this->initialResultSetComplete(parent); | 328 | this->initialResultSetComplete(parent); |
368 | } else { | 329 | } else { |
330 | mResultEmitted = false; | ||
331 | mAllResultsFetched = false; | ||
369 | for (const auto &emitter : mEmitter) { | 332 | for (const auto &emitter : mEmitter) { |
370 | mInitialResultSetInProgress.insert(qHash(parent), emitter.data()); | 333 | mInitialResultSetInProgress.insert(qHash(parent), emitter.data()); |
371 | emitter->fetch(parent); | 334 | emitter->fetch(parent); |
372 | } | 335 | } |
336 | mAllResultsFetched = true; | ||
337 | callInitialResultCompleteIfDone(parent); | ||
373 | } | 338 | } |
374 | } | 339 | } |
375 | 340 | ||
376 | private: | 341 | private: |
377 | QList<typename ResultEmitter<DomainType>::Ptr> mEmitter; | 342 | QList<typename ResultEmitter<DomainType>::Ptr> mEmitter; |
378 | QMultiMap<qint64, ResultEmitter<DomainType> *> mInitialResultSetInProgress; | 343 | QMultiMap<qint64, ResultEmitter<DomainType> *> mInitialResultSetInProgress; |
344 | bool mAllResultsFetched; | ||
345 | bool mResultEmitted; | ||
379 | }; | 346 | }; |
380 | } | 347 | } |