diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2018-04-01 09:41:28 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2018-04-01 10:02:41 +0200 |
commit | 8d2d52fcfd3c42a82b7f86c6f3c5009461f1de9f (patch) | |
tree | 042e2db12927d289fce3a2fab8486e03ac4c9049 | |
parent | 53b76e66f5527a5f9442173279b0a01f1f07da46 (diff) | |
download | sink-8d2d52fcfd3c42a82b7f86c6f3c5009461f1de9f.tar.gz sink-8d2d52fcfd3c42a82b7f86c6f3c5009461f1de9f.zip |
Avoid missing revision updates while a query is running.
Instead we have to remember that something has changed and rerun an
incremental query.
-rw-r--r-- | common/queryrunner.cpp | 53 | ||||
-rw-r--r-- | common/queryrunner.h | 9 | ||||
-rw-r--r-- | tests/querytest.cpp | 51 |
3 files changed, 104 insertions, 9 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 2c50fca..c0c1d00 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -21,6 +21,8 @@ | |||
21 | #include <limits> | 21 | #include <limits> |
22 | #include <QTime> | 22 | #include <QTime> |
23 | #include <QPointer> | 23 | #include <QPointer> |
24 | #include <thread> | ||
25 | #include <chrono> | ||
24 | 26 | ||
25 | #include "commands.h" | 27 | #include "commands.h" |
26 | #include "asyncutils.h" | 28 | #include "asyncutils.h" |
@@ -97,6 +99,13 @@ QueryRunner<DomainType>::~QueryRunner() | |||
97 | SinkTraceCtx(mLogCtx) << "Stopped query"; | 99 | SinkTraceCtx(mLogCtx) << "Stopped query"; |
98 | } | 100 | } |
99 | 101 | ||
102 | |||
103 | template <class DomainType> | ||
104 | void QueryRunner<DomainType>::delayNextQuery() | ||
105 | { | ||
106 | mDelayNextQuery = true; | ||
107 | } | ||
108 | |||
100 | //This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize. | 109 | //This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize. |
101 | template <class DomainType> | 110 | template <class DomainType> |
102 | void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType) | 111 | void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType) |
@@ -115,11 +124,20 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray & | |||
115 | auto resourceContext = mResourceContext; | 124 | auto resourceContext = mResourceContext; |
116 | auto logCtx = mLogCtx; | 125 | auto logCtx = mLogCtx; |
117 | auto state = mQueryState; | 126 | auto state = mQueryState; |
127 | bool addDelay = mDelayNextQuery; | ||
128 | mDelayNextQuery = false; | ||
118 | const bool runAsync = !query.synchronousQuery(); | 129 | const bool runAsync = !query.synchronousQuery(); |
119 | //The lambda will be executed in a separate thread, so copy all arguments | 130 | //The lambda will be executed in a separate thread, so copy all arguments |
120 | async::run<ReplayResult>([=]() { | 131 | async::run<ReplayResult>([=]() { |
121 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | 132 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); |
122 | return worker.executeInitialQuery(query, *resultProvider, batchSize, state); | 133 | const auto result = worker.executeInitialQuery(query, *resultProvider, batchSize, state); |
134 | |||
135 | //For testing only | ||
136 | if (addDelay) { | ||
137 | std::this_thread::sleep_for(std::chrono::seconds(1)); | ||
138 | } | ||
139 | |||
140 | return result; | ||
123 | }, runAsync) | 141 | }, runAsync) |
124 | .then([=](const ReplayResult &result) { | 142 | .then([=](const ReplayResult &result) { |
125 | if (!guardPtr) { | 143 | if (!guardPtr) { |
@@ -137,7 +155,12 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray & | |||
137 | resultProvider->initialResultSetComplete(result.replayedAll); | 155 | resultProvider->initialResultSetComplete(result.replayedAll); |
138 | if (mRequestFetchMore) { | 156 | if (mRequestFetchMore) { |
139 | mRequestFetchMore = false; | 157 | mRequestFetchMore = false; |
158 | //This code exists for incemental fetches, so we don't skip loading another set. | ||
140 | fetch(query, bufferType); | 159 | fetch(query, bufferType); |
160 | return; | ||
161 | } | ||
162 | if (mRevisionChangedMeanwhile) { | ||
163 | incrementalFetch(query, bufferType).exec(); | ||
141 | } | 164 | } |
142 | }) | 165 | }) |
143 | .exec(); | 166 | .exec(); |
@@ -146,15 +169,17 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray & | |||
146 | template <class DomainType> | 169 | template <class DomainType> |
147 | KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType) | 170 | KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType) |
148 | { | 171 | { |
149 | if (!mInitialQueryComplete) { | 172 | if (!mInitialQueryComplete && !mQueryInProgress) { |
150 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | 173 | //We rely on this codepath in the case of newly added resources to trigger the initial fetch. |
151 | fetch(query, bufferType); | 174 | fetch(query, bufferType); |
152 | return KAsync::null(); | 175 | return KAsync::null(); |
153 | } | 176 | } |
154 | if (mQueryInProgress) { | 177 | if (mQueryInProgress) { |
155 | //Can happen if the revision come in quicker than we process them. | 178 | //If a query is already in progress we just remember to fetch again once the current query is done. |
179 | mRevisionChangedMeanwhile = true; | ||
156 | return KAsync::null(); | 180 | return KAsync::null(); |
157 | } | 181 | } |
182 | mRevisionChangedMeanwhile = false; | ||
158 | auto resultProvider = mResultProvider; | 183 | auto resultProvider = mResultProvider; |
159 | auto resourceContext = mResourceContext; | 184 | auto resourceContext = mResourceContext; |
160 | auto logCtx = mLogCtx; | 185 | auto logCtx = mLogCtx; |
@@ -162,22 +187,34 @@ KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &q | |||
162 | auto resultTransformation = mResultTransformation; | 187 | auto resultTransformation = mResultTransformation; |
163 | Q_ASSERT(!mQueryInProgress); | 188 | Q_ASSERT(!mQueryInProgress); |
164 | auto guardPtr = QPointer<QObject>(&guard); | 189 | auto guardPtr = QPointer<QObject>(&guard); |
190 | bool addDelay = mDelayNextQuery; | ||
191 | mDelayNextQuery = false; | ||
165 | return KAsync::start([&] { | 192 | return KAsync::start([&] { |
166 | mQueryInProgress = true; | 193 | mQueryInProgress = true; |
167 | }) | 194 | }) |
168 | .then(async::run<ReplayResult>([=]() { | 195 | .then(async::run<ReplayResult>([=]() { |
169 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | 196 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); |
170 | return worker.executeIncrementalQuery(query, *resultProvider, state); | 197 | const auto result = worker.executeIncrementalQuery(query, *resultProvider, state); |
198 | ////For testing only | ||
199 | if (addDelay) { | ||
200 | SinkWarning() << "Sleeping in incremental query"; | ||
201 | std::this_thread::sleep_for(std::chrono::seconds(1)); | ||
202 | } | ||
203 | |||
204 | return result; | ||
171 | })) | 205 | })) |
172 | .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | 206 | .then([query, this, resultProvider, guardPtr, bufferType](const ReplayResult &newRevisionAndReplayedEntities) { |
173 | if (!guardPtr) { | 207 | if (!guardPtr) { |
174 | //Not an error, the query can vanish at any time. | 208 | //Not an error, the query can vanish at any time. |
175 | return; | 209 | return KAsync::null(); |
176 | } | 210 | } |
177 | mQueryInProgress = false; | 211 | mQueryInProgress = false; |
178 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
179 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | 212 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); |
180 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | 213 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); |
214 | if (mRevisionChangedMeanwhile) { | ||
215 | return incrementalFetch(query, bufferType); | ||
216 | } | ||
217 | return KAsync::null(); | ||
181 | }); | 218 | }); |
182 | } | 219 | } |
183 | 220 | ||
diff --git a/common/queryrunner.h b/common/queryrunner.h index 35093e2..e449570 100644 --- a/common/queryrunner.h +++ b/common/queryrunner.h | |||
@@ -77,7 +77,7 @@ private: | |||
77 | * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. | 77 | * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. |
78 | */ | 78 | */ |
79 | template <typename DomainType> | 79 | template <typename DomainType> |
80 | class QueryRunner : public QueryRunnerBase | 80 | class SINK_EXPORT QueryRunner : public QueryRunnerBase |
81 | { | 81 | { |
82 | public: | 82 | public: |
83 | QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType, const Sink::Log::Context &logCtx); | 83 | QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType, const Sink::Log::Context &logCtx); |
@@ -91,6 +91,11 @@ public: | |||
91 | 91 | ||
92 | typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter(); | 92 | typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter(); |
93 | 93 | ||
94 | /** | ||
95 | * For testing only. | ||
96 | */ | ||
97 | void delayNextQuery(); | ||
98 | |||
94 | private: | 99 | private: |
95 | void fetch(const Sink::Query &query, const QByteArray &bufferType); | 100 | void fetch(const Sink::Query &query, const QByteArray &bufferType); |
96 | KAsync::Job<void> incrementalFetch(const Sink::Query &query, const QByteArray &bufferType); | 101 | KAsync::Job<void> incrementalFetch(const Sink::Query &query, const QByteArray &bufferType); |
@@ -106,4 +111,6 @@ private: | |||
106 | bool mInitialQueryComplete = false; | 111 | bool mInitialQueryComplete = false; |
107 | bool mQueryInProgress = false; | 112 | bool mQueryInProgress = false; |
108 | bool mRequestFetchMore = false; | 113 | bool mRequestFetchMore = false; |
114 | bool mDelayNextQuery = false; | ||
115 | bool mRevisionChangedMeanwhile = false; | ||
109 | }; | 116 | }; |
diff --git a/tests/querytest.cpp b/tests/querytest.cpp index 2a12979..81b7cdc 100644 --- a/tests/querytest.cpp +++ b/tests/querytest.cpp | |||
@@ -13,6 +13,8 @@ | |||
13 | #include "test.h" | 13 | #include "test.h" |
14 | #include "testutils.h" | 14 | #include "testutils.h" |
15 | #include "applicationdomaintype.h" | 15 | #include "applicationdomaintype.h" |
16 | #include "queryrunner.h" | ||
17 | #include "adaptorfactoryregistry.h" | ||
16 | 18 | ||
17 | #include <KMime/Message> | 19 | #include <KMime/Message> |
18 | 20 | ||
@@ -1229,6 +1231,55 @@ private slots: | |||
1229 | } | 1231 | } |
1230 | } | 1232 | } |
1231 | 1233 | ||
1234 | void testQueryRunnerDontMissUpdates() | ||
1235 | { | ||
1236 | // Setup | ||
1237 | auto folder1 = Folder::createEntity<Folder>("sink.dummy.instance1"); | ||
1238 | VERIFYEXEC(Sink::Store::create<Folder>(folder1)); | ||
1239 | |||
1240 | QDateTime now{QDate{2017, 2, 3}, QTime{10, 0, 0}}; | ||
1241 | |||
1242 | auto createMail = [] (const QByteArray &messageid, const Folder &folder, const QDateTime &date, bool important) { | ||
1243 | auto mail = Mail::createEntity<Mail>("sink.dummy.instance1"); | ||
1244 | mail.setExtractedMessageId(messageid); | ||
1245 | mail.setFolder(folder); | ||
1246 | mail.setExtractedDate(date); | ||
1247 | mail.setImportant(important); | ||
1248 | return mail; | ||
1249 | }; | ||
1250 | |||
1251 | VERIFYEXEC(Sink::Store::create(createMail("mail1", folder1, now, false))); | ||
1252 | |||
1253 | // Ensure all local data is processed | ||
1254 | VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1")); | ||
1255 | |||
1256 | Query query; | ||
1257 | query.setFlags(Query::LiveQuery); | ||
1258 | |||
1259 | Sink::ResourceContext resourceContext{"sink.dummy.instance1", "sink.dummy", Sink::AdaptorFactoryRegistry::instance().getFactories("sink.dummy")}; | ||
1260 | Sink::Log::Context logCtx; | ||
1261 | auto runner = new QueryRunner<Mail>(query, resourceContext, ApplicationDomain::getTypeName<Mail>(), logCtx); | ||
1262 | runner->delayNextQuery(); | ||
1263 | |||
1264 | auto emitter = runner->emitter(); | ||
1265 | QList<Mail::Ptr> added; | ||
1266 | emitter->onAdded([&](Mail::Ptr mail) { | ||
1267 | added << mail; | ||
1268 | }); | ||
1269 | |||
1270 | emitter->fetch(); | ||
1271 | VERIFYEXEC(Sink::Store::create(createMail("mail2", folder1, now, false))); | ||
1272 | QTRY_COMPARE(added.size(), 2); | ||
1273 | |||
1274 | runner->delayNextQuery(); | ||
1275 | VERIFYEXEC(Sink::Store::create(createMail("mail3", folder1, now, false))); | ||
1276 | //The second revision update is supposed to come in while the initial revision update is still in the query. | ||
1277 | //So wait a bit to make sure the query is currently runnning. | ||
1278 | QTest::qWait(500); | ||
1279 | VERIFYEXEC(Sink::Store::create(createMail("mail4", folder1, now, false))); | ||
1280 | QTRY_COMPARE(added.size(), 4); | ||
1281 | } | ||
1282 | |||
1232 | /* | 1283 | /* |
1233 | * This test is here to ensure we don't crash if we call removeFromDisk with a running query. | 1284 | * This test is here to ensure we don't crash if we call removeFromDisk with a running query. |
1234 | */ | 1285 | */ |