summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2018-02-15 10:19:08 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2018-02-15 11:08:52 +0100
commit531972042d4b610258c8af8a17ec3a99cd063dda (patch)
treeef4356ec141f0f1ccd756e8610b08553b866bf78 /common/queryrunner.cpp
parentf51963f057bcbdd175114433913a1c5f0eebd546 (diff)
downloadsink-531972042d4b610258c8af8a17ec3a99cd063dda.tar.gz
sink-531972042d4b610258c8af8a17ec3a99cd063dda.zip
Fixed crashes due to concurrently running queries.
A single QueryRunner should never have multiple workers running at the same time. We did not properly enforce this in case of incremental updates coming in. The only way I managed to reproduce the crash: * Open a large folder with lots of unread mail in kube * Select a mail in the maillist and hold the down button * This will: * Repeatedly call fetch more * Trigger lot's of mark as read modifications that result in notifications. * Eventually it crashes somewhere in EntityStore, likely because of concurrent access of the filter structure which is shared through the state. We now ensure in the single threaded portion of the code that we only ever run one worker at a time. If we did receive an update during, we remember that change and fetch more once we're done. To be able to call fetch again that portion was also factored out into a separate function.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp153
1 files changed, 86 insertions, 67 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 928e1e0..0ed4cb5 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -69,79 +69,14 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
69 if (query.limit() && query.sortProperty().isEmpty()) { 69 if (query.limit() && query.sortProperty().isEmpty()) {
70 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; 70 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get.";
71 } 71 }
72 auto guardPtr = QPointer<QObject>(&guard);
73 auto fetcher = [=]() {
74 SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
75 auto resultProvider = mResultProvider;
76 auto resultTransformation = mResultTransformation;
77 auto batchSize = mBatchSize;
78 auto resourceContext = mResourceContext;
79 auto logCtx = mLogCtx;
80 auto state = mQueryState;
81 const bool runAsync = !query.synchronousQuery();
82 //The lambda will be executed in a separate thread, so copy all arguments
83 async::run<ReplayResult>([=]() {
84 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
85 return worker.executeInitialQuery(query, *resultProvider, batchSize, state);
86 }, runAsync)
87 .then([this, query, resultProvider, guardPtr](const ReplayResult &result) {
88 if (!guardPtr) {
89 //Not an error, the query can vanish at any time.
90 return;
91 }
92 mInitialQueryComplete = true;
93 mQueryState = result.queryState;
94 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
95 if (query.liveQuery()) {
96 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
97 }
98 resultProvider->setRevision(result.newRevision);
99 resultProvider->initialResultSetComplete(result.replayedAll);
100 })
101 .exec();
102 };
103
104 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 72 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
105 mResultProvider->setFetcher(fetcher); 73 mResultProvider->setFetcher([this, query, bufferType] { fetch(query, bufferType); });
106 74
107 // In case of a live query we keep the runner for as long alive as the result provider exists 75 // In case of a live query we keep the runner for as long alive as the result provider exists
108 if (query.liveQuery()) { 76 if (query.liveQuery()) {
109 Q_ASSERT(!query.synchronousQuery()); 77 Q_ASSERT(!query.synchronousQuery());
110 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 78 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
111 setQuery([=]() -> KAsync::Job<void> { 79 setQuery([=]() { return incrementalFetch(query, bufferType); });
112 auto resultProvider = mResultProvider;
113 auto resourceContext = mResourceContext;
114 auto logCtx = mLogCtx;
115 auto state = mQueryState;
116 auto resultTransformation = mResultTransformation;
117 if (!mInitialQueryComplete) {
118 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
119 fetcher();
120 return KAsync::null();
121 }
122 if (mQueryInProgress) {
123 //Can happen if the revision come in quicker than we process them.
124 return KAsync::null();
125 }
126 Q_ASSERT(!mQueryInProgress);
127 return KAsync::start([&] {
128 mQueryInProgress = true;
129 })
130 .then(async::run<ReplayResult>([=]() {
131 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
132 return worker.executeIncrementalQuery(query, *resultProvider, state);
133 }))
134 .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
135 if (!guardPtr) {
136 //Not an error, the query can vanish at any time.
137 return;
138 }
139 mQueryInProgress = false;
140 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
141 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
142 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
143 });
144 });
145 // Ensure the connection is open, if it wasn't already opened 80 // Ensure the connection is open, if it wasn't already opened
146 // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates 81 // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
147 mResourceAccess->open(); 82 mResourceAccess->open();
@@ -158,6 +93,90 @@ QueryRunner<DomainType>::~QueryRunner()
158 SinkTraceCtx(mLogCtx) << "Stopped query"; 93 SinkTraceCtx(mLogCtx) << "Stopped query";
159} 94}
160 95
96//This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize.
97template <class DomainType>
98void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType)
99{
100 auto guardPtr = QPointer<QObject>(&guard);
101 SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
102 if (mQueryInProgress) {
103 SinkTraceCtx(mLogCtx) << "Query is already in progress, postponing: " << mBatchSize;
104 mRequestFetchMore = true;
105 return;
106 }
107 mQueryInProgress = true;
108 auto resultProvider = mResultProvider;
109 auto resultTransformation = mResultTransformation;
110 auto batchSize = mBatchSize;
111 auto resourceContext = mResourceContext;
112 auto logCtx = mLogCtx;
113 auto state = mQueryState;
114 const bool runAsync = !query.synchronousQuery();
115 //The lambda will be executed in a separate thread, so copy all arguments
116 async::run<ReplayResult>([=]() {
117 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
118 return worker.executeInitialQuery(query, *resultProvider, batchSize, state);
119 }, runAsync)
120 .then([=](const ReplayResult &result) {
121 if (!guardPtr) {
122 //Not an error, the query can vanish at any time.
123 return;
124 }
125 mInitialQueryComplete = true;
126 mQueryInProgress = false;
127 mQueryState = result.queryState;
128 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
129 if (query.liveQuery()) {
130 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
131 }
132 resultProvider->setRevision(result.newRevision);
133 resultProvider->initialResultSetComplete(result.replayedAll);
134 if (mRequestFetchMore) {
135 mRequestFetchMore = false;
136 fetch(query, bufferType);
137 }
138 })
139 .exec();
140}
141
142template <class DomainType>
143KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType)
144{
145 if (!mInitialQueryComplete) {
146 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
147 fetch(query, bufferType);
148 return KAsync::null();
149 }
150 if (mQueryInProgress) {
151 //Can happen if the revision come in quicker than we process them.
152 return KAsync::null();
153 }
154 auto resultProvider = mResultProvider;
155 auto resourceContext = mResourceContext;
156 auto logCtx = mLogCtx;
157 auto state = mQueryState;
158 auto resultTransformation = mResultTransformation;
159 Q_ASSERT(!mQueryInProgress);
160 auto guardPtr = QPointer<QObject>(&guard);
161 return KAsync::start([&] {
162 mQueryInProgress = true;
163 })
164 .then(async::run<ReplayResult>([=]() {
165 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
166 return worker.executeIncrementalQuery(query, *resultProvider, state);
167 }))
168 .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
169 if (!guardPtr) {
170 //Not an error, the query can vanish at any time.
171 return;
172 }
173 mQueryInProgress = false;
174 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
175 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
176 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
177 });
178}
179
161template <class DomainType> 180template <class DomainType>
162void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation) 181void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation)
163{ 182{