diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-03-07 12:04:47 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-03-07 12:04:47 +0100 |
commit | 4ecb10d3b1294d03578c28467c0f3759b3496e0b (patch) | |
tree | d8110a1109a0510746e9083b3272dda872dfe4a2 | |
parent | 88a0c750a80e6efb207877cac73144266e62d5ff (diff) | |
download | sink-4ecb10d3b1294d03578c28467c0f3759b3496e0b.tar.gz sink-4ecb10d3b1294d03578c28467c0f3759b3496e0b.zip |
Resolved potential deadlock
When trying to reply to a mail from kube we ran into a deadlock.
The initial result callback is called from the main thread, and that can
thus directly lead to destruction of the emitter.
-rw-r--r-- | common/modelresult.cpp | 18 | ||||
-rw-r--r-- | common/resultprovider.h | 10 | ||||
-rw-r--r-- | tests/clientapitest.cpp | 20 |
3 files changed, 32 insertions, 16 deletions
diff --git a/common/modelresult.cpp b/common/modelresult.cpp index f935419..904766d 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp | |||
@@ -296,15 +296,15 @@ void ModelResult<T, Ptr>::setEmitter(const typename Sink::ResultEmitter<Ptr>::Pt | |||
296 | emitter->onInitialResultSetComplete([this, guard](const Ptr &parent, bool fetchedAll) { | 296 | emitter->onInitialResultSetComplete([this, guard](const Ptr &parent, bool fetchedAll) { |
297 | SinkTraceCtx(mLogCtx) << "Initial result set complete. Fetched all: " << fetchedAll; | 297 | SinkTraceCtx(mLogCtx) << "Initial result set complete. Fetched all: " << fetchedAll; |
298 | Q_ASSERT(guard); | 298 | Q_ASSERT(guard); |
299 | threadBoundary.callInMainThread([=]() { | 299 | Q_ASSERT(QThread::currentThread() == this->thread()); |
300 | const qint64 parentId = parent ? qHash(*parent) : 0; | 300 | |
301 | const auto parentIndex = createIndexFromId(parentId); | 301 | const qint64 parentId = parent ? qHash(*parent) : 0; |
302 | mEntityChildrenFetchComplete.insert(parentId); | 302 | const auto parentIndex = createIndexFromId(parentId); |
303 | if (fetchedAll) { | 303 | mEntityChildrenFetchComplete.insert(parentId); |
304 | mEntityAllChildrenFetched.insert(parentId); | 304 | if (fetchedAll) { |
305 | } | 305 | mEntityAllChildrenFetched.insert(parentId); |
306 | emit dataChanged(parentIndex, parentIndex, QVector<int>() << ChildrenFetchedRole); | 306 | } |
307 | }); | 307 | emit dataChanged(parentIndex, parentIndex, QVector<int>() << ChildrenFetchedRole); |
308 | }); | 308 | }); |
309 | mEmitter = emitter; | 309 | mEmitter = emitter; |
310 | } | 310 | } |
diff --git a/common/resultprovider.h b/common/resultprovider.h index a2ed0b5..d6feaf9 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h | |||
@@ -268,8 +268,9 @@ public: | |||
268 | 268 | ||
269 | void initialResultSetComplete(const DomainType &parent, bool replayedAll) | 269 | void initialResultSetComplete(const DomainType &parent, bool replayedAll) |
270 | { | 270 | { |
271 | QMutexLocker locker{&mMutex}; | 271 | //This callback is only ever called from the main thread, so we don't do any locking |
272 | if (initialResultSetCompleteHandler && guardOk()) { | 272 | if (initialResultSetCompleteHandler && guardOk()) { |
273 | //This can directly lead to our destruction and thus waitForMethodExecutionEnd | ||
273 | initialResultSetCompleteHandler(parent, replayedAll); | 274 | initialResultSetCompleteHandler(parent, replayedAll); |
274 | } | 275 | } |
275 | } | 276 | } |
@@ -313,6 +314,13 @@ private: | |||
313 | std::function<void(void)> clearHandler; | 314 | std::function<void(void)> clearHandler; |
314 | 315 | ||
315 | std::function<void(const DomainType &parent)> mFetcher; | 316 | std::function<void(const DomainType &parent)> mFetcher; |
317 | /* | ||
318 | * This mutex is here to protect the emitter from getting destroyed while the producer-thread (ResultProvider) is calling into it, | ||
319 | * and vice-verca, to protect the producer thread from calling into a destroyed emitter. | ||
320 | * | ||
321 | * This is necessary because Emitter and ResultProvider have lifetimes managed by two different threads. | ||
322 | * The emitter lives in the application thread, and the resultprovider in the query thread. | ||
323 | */ | ||
316 | QMutex mMutex; | 324 | QMutex mMutex; |
317 | bool mDone = false; | 325 | bool mDone = false; |
318 | }; | 326 | }; |
diff --git a/tests/clientapitest.cpp b/tests/clientapitest.cpp index da52afb..3955ebd 100644 --- a/tests/clientapitest.cpp +++ b/tests/clientapitest.cpp | |||
@@ -12,6 +12,12 @@ | |||
12 | #include "asyncutils.h" | 12 | #include "asyncutils.h" |
13 | 13 | ||
14 | template <typename T> | 14 | template <typename T> |
15 | struct Result { | ||
16 | QSharedPointer<T> parent; | ||
17 | bool fetchedAll; | ||
18 | }; | ||
19 | |||
20 | template <typename T> | ||
15 | class TestDummyResourceFacade : public Sink::StoreFacade<T> | 21 | class TestDummyResourceFacade : public Sink::StoreFacade<T> |
16 | { | 22 | { |
17 | public: | 23 | public: |
@@ -68,7 +74,7 @@ public: | |||
68 | auto emitter = resultProvider->emitter(); | 74 | auto emitter = resultProvider->emitter(); |
69 | 75 | ||
70 | resultProvider->setFetcher([query, resultProvider, this, ctx](const typename T::Ptr &parent) { | 76 | resultProvider->setFetcher([query, resultProvider, this, ctx](const typename T::Ptr &parent) { |
71 | async::run<int>([=] { | 77 | async::run<Result<T>>([=] { |
72 | if (parent) { | 78 | if (parent) { |
73 | SinkTraceCtx(ctx) << "Running the fetcher " << parent->identifier(); | 79 | SinkTraceCtx(ctx) << "Running the fetcher " << parent->identifier(); |
74 | } else { | 80 | } else { |
@@ -90,14 +96,16 @@ public: | |||
90 | SinkTraceCtx(ctx) << "Aborting early after " << count << "results."; | 96 | SinkTraceCtx(ctx) << "Aborting early after " << count << "results."; |
91 | offset = i + 1; | 97 | offset = i + 1; |
92 | bool fetchedAll = (i + 1 >= results.size()); | 98 | bool fetchedAll = (i + 1 >= results.size()); |
93 | resultProvider->initialResultSetComplete(parent, fetchedAll); | 99 | return Result<T>{parent, fetchedAll}; |
94 | return 0; | ||
95 | } | 100 | } |
96 | } | 101 | } |
97 | } | 102 | } |
98 | resultProvider->initialResultSetComplete(parent, true); | 103 | return Result<T>{parent, true}; |
99 | return 0; | 104 | }, runAsync) |
100 | }, runAsync).exec(); | 105 | .then([=] (const Result<T> &r) { |
106 | resultProvider->initialResultSetComplete(r.parent, r.fetchedAll); | ||
107 | }) | ||
108 | .exec(); | ||
101 | }); | 109 | }); |
102 | auto job = KAsync::start([query, resultProvider]() {}); | 110 | auto job = KAsync::start([query, resultProvider]() {}); |
103 | mResultProvider = resultProvider.data(); | 111 | mResultProvider = resultProvider.data(); |