diff options
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 53 |
1 files changed, 45 insertions, 8 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 2c50fca..c0c1d00 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -21,6 +21,8 @@ | |||
21 | #include <limits> | 21 | #include <limits> |
22 | #include <QTime> | 22 | #include <QTime> |
23 | #include <QPointer> | 23 | #include <QPointer> |
24 | #include <thread> | ||
25 | #include <chrono> | ||
24 | 26 | ||
25 | #include "commands.h" | 27 | #include "commands.h" |
26 | #include "asyncutils.h" | 28 | #include "asyncutils.h" |
@@ -97,6 +99,13 @@ QueryRunner<DomainType>::~QueryRunner() | |||
97 | SinkTraceCtx(mLogCtx) << "Stopped query"; | 99 | SinkTraceCtx(mLogCtx) << "Stopped query"; |
98 | } | 100 | } |
99 | 101 | ||
102 | |||
103 | template <class DomainType> | ||
104 | void QueryRunner<DomainType>::delayNextQuery() | ||
105 | { | ||
106 | mDelayNextQuery = true; | ||
107 | } | ||
108 | |||
100 | //This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize. | 109 | //This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize. |
101 | template <class DomainType> | 110 | template <class DomainType> |
102 | void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType) | 111 | void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType) |
@@ -115,11 +124,20 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray & | |||
115 | auto resourceContext = mResourceContext; | 124 | auto resourceContext = mResourceContext; |
116 | auto logCtx = mLogCtx; | 125 | auto logCtx = mLogCtx; |
117 | auto state = mQueryState; | 126 | auto state = mQueryState; |
127 | bool addDelay = mDelayNextQuery; | ||
128 | mDelayNextQuery = false; | ||
118 | const bool runAsync = !query.synchronousQuery(); | 129 | const bool runAsync = !query.synchronousQuery(); |
119 | //The lambda will be executed in a separate thread, so copy all arguments | 130 | //The lambda will be executed in a separate thread, so copy all arguments |
120 | async::run<ReplayResult>([=]() { | 131 | async::run<ReplayResult>([=]() { |
121 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | 132 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); |
122 | return worker.executeInitialQuery(query, *resultProvider, batchSize, state); | 133 | const auto result = worker.executeInitialQuery(query, *resultProvider, batchSize, state); |
134 | |||
135 | //For testing only | ||
136 | if (addDelay) { | ||
137 | std::this_thread::sleep_for(std::chrono::seconds(1)); | ||
138 | } | ||
139 | |||
140 | return result; | ||
123 | }, runAsync) | 141 | }, runAsync) |
124 | .then([=](const ReplayResult &result) { | 142 | .then([=](const ReplayResult &result) { |
125 | if (!guardPtr) { | 143 | if (!guardPtr) { |
@@ -137,7 +155,12 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray & | |||
137 | resultProvider->initialResultSetComplete(result.replayedAll); | 155 | resultProvider->initialResultSetComplete(result.replayedAll); |
138 | if (mRequestFetchMore) { | 156 | if (mRequestFetchMore) { |
139 | mRequestFetchMore = false; | 157 | mRequestFetchMore = false; |
158 | //This code exists for incemental fetches, so we don't skip loading another set. | ||
140 | fetch(query, bufferType); | 159 | fetch(query, bufferType); |
160 | return; | ||
161 | } | ||
162 | if (mRevisionChangedMeanwhile) { | ||
163 | incrementalFetch(query, bufferType).exec(); | ||
141 | } | 164 | } |
142 | }) | 165 | }) |
143 | .exec(); | 166 | .exec(); |
@@ -146,15 +169,17 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray & | |||
146 | template <class DomainType> | 169 | template <class DomainType> |
147 | KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType) | 170 | KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType) |
148 | { | 171 | { |
149 | if (!mInitialQueryComplete) { | 172 | if (!mInitialQueryComplete && !mQueryInProgress) { |
150 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | 173 | //We rely on this codepath in the case of newly added resources to trigger the initial fetch. |
151 | fetch(query, bufferType); | 174 | fetch(query, bufferType); |
152 | return KAsync::null(); | 175 | return KAsync::null(); |
153 | } | 176 | } |
154 | if (mQueryInProgress) { | 177 | if (mQueryInProgress) { |
155 | //Can happen if the revision come in quicker than we process them. | 178 | //If a query is already in progress we just remember to fetch again once the current query is done. |
179 | mRevisionChangedMeanwhile = true; | ||
156 | return KAsync::null(); | 180 | return KAsync::null(); |
157 | } | 181 | } |
182 | mRevisionChangedMeanwhile = false; | ||
158 | auto resultProvider = mResultProvider; | 183 | auto resultProvider = mResultProvider; |
159 | auto resourceContext = mResourceContext; | 184 | auto resourceContext = mResourceContext; |
160 | auto logCtx = mLogCtx; | 185 | auto logCtx = mLogCtx; |
@@ -162,22 +187,34 @@ KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &q | |||
162 | auto resultTransformation = mResultTransformation; | 187 | auto resultTransformation = mResultTransformation; |
163 | Q_ASSERT(!mQueryInProgress); | 188 | Q_ASSERT(!mQueryInProgress); |
164 | auto guardPtr = QPointer<QObject>(&guard); | 189 | auto guardPtr = QPointer<QObject>(&guard); |
190 | bool addDelay = mDelayNextQuery; | ||
191 | mDelayNextQuery = false; | ||
165 | return KAsync::start([&] { | 192 | return KAsync::start([&] { |
166 | mQueryInProgress = true; | 193 | mQueryInProgress = true; |
167 | }) | 194 | }) |
168 | .then(async::run<ReplayResult>([=]() { | 195 | .then(async::run<ReplayResult>([=]() { |
169 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | 196 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); |
170 | return worker.executeIncrementalQuery(query, *resultProvider, state); | 197 | const auto result = worker.executeIncrementalQuery(query, *resultProvider, state); |
198 | ////For testing only | ||
199 | if (addDelay) { | ||
200 | SinkWarning() << "Sleeping in incremental query"; | ||
201 | std::this_thread::sleep_for(std::chrono::seconds(1)); | ||
202 | } | ||
203 | |||
204 | return result; | ||
171 | })) | 205 | })) |
172 | .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | 206 | .then([query, this, resultProvider, guardPtr, bufferType](const ReplayResult &newRevisionAndReplayedEntities) { |
173 | if (!guardPtr) { | 207 | if (!guardPtr) { |
174 | //Not an error, the query can vanish at any time. | 208 | //Not an error, the query can vanish at any time. |
175 | return; | 209 | return KAsync::null(); |
176 | } | 210 | } |
177 | mQueryInProgress = false; | 211 | mQueryInProgress = false; |
178 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
179 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | 212 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); |
180 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | 213 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); |
214 | if (mRevisionChangedMeanwhile) { | ||
215 | return incrementalFetch(query, bufferType); | ||
216 | } | ||
217 | return KAsync::null(); | ||
181 | }); | 218 | }); |
182 | } | 219 | } |
183 | 220 | ||