summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2018-04-01 09:41:28 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2018-04-01 10:02:41 +0200
commit8d2d52fcfd3c42a82b7f86c6f3c5009461f1de9f (patch)
tree042e2db12927d289fce3a2fab8486e03ac4c9049
parent53b76e66f5527a5f9442173279b0a01f1f07da46 (diff)
downloadsink-8d2d52fcfd3c42a82b7f86c6f3c5009461f1de9f.tar.gz
sink-8d2d52fcfd3c42a82b7f86c6f3c5009461f1de9f.zip
Avoid missing revision updates while a query is running.
Instead we have to remember that something has changed and rerun an incremental query.
-rw-r--r--common/queryrunner.cpp53
-rw-r--r--common/queryrunner.h9
-rw-r--r--tests/querytest.cpp51
3 files changed, 104 insertions, 9 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 2c50fca..c0c1d00 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -21,6 +21,8 @@
21#include <limits> 21#include <limits>
22#include <QTime> 22#include <QTime>
23#include <QPointer> 23#include <QPointer>
24#include <thread>
25#include <chrono>
24 26
25#include "commands.h" 27#include "commands.h"
26#include "asyncutils.h" 28#include "asyncutils.h"
@@ -97,6 +99,13 @@ QueryRunner<DomainType>::~QueryRunner()
97 SinkTraceCtx(mLogCtx) << "Stopped query"; 99 SinkTraceCtx(mLogCtx) << "Stopped query";
98} 100}
99 101
102
103template <class DomainType>
104void QueryRunner<DomainType>::delayNextQuery()
105{
106 mDelayNextQuery = true;
107}
108
100//This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize. 109//This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize.
101template <class DomainType> 110template <class DomainType>
102void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType) 111void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType)
@@ -115,11 +124,20 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &
115 auto resourceContext = mResourceContext; 124 auto resourceContext = mResourceContext;
116 auto logCtx = mLogCtx; 125 auto logCtx = mLogCtx;
117 auto state = mQueryState; 126 auto state = mQueryState;
127 bool addDelay = mDelayNextQuery;
128 mDelayNextQuery = false;
118 const bool runAsync = !query.synchronousQuery(); 129 const bool runAsync = !query.synchronousQuery();
119 //The lambda will be executed in a separate thread, so copy all arguments 130 //The lambda will be executed in a separate thread, so copy all arguments
120 async::run<ReplayResult>([=]() { 131 async::run<ReplayResult>([=]() {
121 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); 132 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
122 return worker.executeInitialQuery(query, *resultProvider, batchSize, state); 133 const auto result = worker.executeInitialQuery(query, *resultProvider, batchSize, state);
134
135 //For testing only
136 if (addDelay) {
137 std::this_thread::sleep_for(std::chrono::seconds(1));
138 }
139
140 return result;
123 }, runAsync) 141 }, runAsync)
124 .then([=](const ReplayResult &result) { 142 .then([=](const ReplayResult &result) {
125 if (!guardPtr) { 143 if (!guardPtr) {
@@ -137,7 +155,12 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &
137 resultProvider->initialResultSetComplete(result.replayedAll); 155 resultProvider->initialResultSetComplete(result.replayedAll);
138 if (mRequestFetchMore) { 156 if (mRequestFetchMore) {
139 mRequestFetchMore = false; 157 mRequestFetchMore = false;
158 //This code exists for incemental fetches, so we don't skip loading another set.
140 fetch(query, bufferType); 159 fetch(query, bufferType);
160 return;
161 }
162 if (mRevisionChangedMeanwhile) {
163 incrementalFetch(query, bufferType).exec();
141 } 164 }
142 }) 165 })
143 .exec(); 166 .exec();
@@ -146,15 +169,17 @@ void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &
146template <class DomainType> 169template <class DomainType>
147KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType) 170KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType)
148{ 171{
149 if (!mInitialQueryComplete) { 172 if (!mInitialQueryComplete && !mQueryInProgress) {
150 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; 173 //We rely on this codepath in the case of newly added resources to trigger the initial fetch.
151 fetch(query, bufferType); 174 fetch(query, bufferType);
152 return KAsync::null(); 175 return KAsync::null();
153 } 176 }
154 if (mQueryInProgress) { 177 if (mQueryInProgress) {
155 //Can happen if the revision come in quicker than we process them. 178 //If a query is already in progress we just remember to fetch again once the current query is done.
179 mRevisionChangedMeanwhile = true;
156 return KAsync::null(); 180 return KAsync::null();
157 } 181 }
182 mRevisionChangedMeanwhile = false;
158 auto resultProvider = mResultProvider; 183 auto resultProvider = mResultProvider;
159 auto resourceContext = mResourceContext; 184 auto resourceContext = mResourceContext;
160 auto logCtx = mLogCtx; 185 auto logCtx = mLogCtx;
@@ -162,22 +187,34 @@ KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &q
162 auto resultTransformation = mResultTransformation; 187 auto resultTransformation = mResultTransformation;
163 Q_ASSERT(!mQueryInProgress); 188 Q_ASSERT(!mQueryInProgress);
164 auto guardPtr = QPointer<QObject>(&guard); 189 auto guardPtr = QPointer<QObject>(&guard);
190 bool addDelay = mDelayNextQuery;
191 mDelayNextQuery = false;
165 return KAsync::start([&] { 192 return KAsync::start([&] {
166 mQueryInProgress = true; 193 mQueryInProgress = true;
167 }) 194 })
168 .then(async::run<ReplayResult>([=]() { 195 .then(async::run<ReplayResult>([=]() {
169 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); 196 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
170 return worker.executeIncrementalQuery(query, *resultProvider, state); 197 const auto result = worker.executeIncrementalQuery(query, *resultProvider, state);
198 ////For testing only
199 if (addDelay) {
200 SinkWarning() << "Sleeping in incremental query";
201 std::this_thread::sleep_for(std::chrono::seconds(1));
202 }
203
204 return result;
171 })) 205 }))
172 .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { 206 .then([query, this, resultProvider, guardPtr, bufferType](const ReplayResult &newRevisionAndReplayedEntities) {
173 if (!guardPtr) { 207 if (!guardPtr) {
174 //Not an error, the query can vanish at any time. 208 //Not an error, the query can vanish at any time.
175 return; 209 return KAsync::null();
176 } 210 }
177 mQueryInProgress = false; 211 mQueryInProgress = false;
178 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
179 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); 212 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
180 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); 213 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
214 if (mRevisionChangedMeanwhile) {
215 return incrementalFetch(query, bufferType);
216 }
217 return KAsync::null();
181 }); 218 });
182} 219}
183 220
diff --git a/common/queryrunner.h b/common/queryrunner.h
index 35093e2..e449570 100644
--- a/common/queryrunner.h
+++ b/common/queryrunner.h
@@ -77,7 +77,7 @@ private:
77 * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. 77 * QueryRunner has to keep ResourceAccess alive in order to keep getting updates.
78 */ 78 */
79template <typename DomainType> 79template <typename DomainType>
80class QueryRunner : public QueryRunnerBase 80class SINK_EXPORT QueryRunner : public QueryRunnerBase
81{ 81{
82public: 82public:
83 QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType, const Sink::Log::Context &logCtx); 83 QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType, const Sink::Log::Context &logCtx);
@@ -91,6 +91,11 @@ public:
91 91
92 typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter(); 92 typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter();
93 93
94 /**
95 * For testing only.
96 */
97 void delayNextQuery();
98
94private: 99private:
95 void fetch(const Sink::Query &query, const QByteArray &bufferType); 100 void fetch(const Sink::Query &query, const QByteArray &bufferType);
96 KAsync::Job<void> incrementalFetch(const Sink::Query &query, const QByteArray &bufferType); 101 KAsync::Job<void> incrementalFetch(const Sink::Query &query, const QByteArray &bufferType);
@@ -106,4 +111,6 @@ private:
106 bool mInitialQueryComplete = false; 111 bool mInitialQueryComplete = false;
107 bool mQueryInProgress = false; 112 bool mQueryInProgress = false;
108 bool mRequestFetchMore = false; 113 bool mRequestFetchMore = false;
114 bool mDelayNextQuery = false;
115 bool mRevisionChangedMeanwhile = false;
109}; 116};
diff --git a/tests/querytest.cpp b/tests/querytest.cpp
index 2a12979..81b7cdc 100644
--- a/tests/querytest.cpp
+++ b/tests/querytest.cpp
@@ -13,6 +13,8 @@
13#include "test.h" 13#include "test.h"
14#include "testutils.h" 14#include "testutils.h"
15#include "applicationdomaintype.h" 15#include "applicationdomaintype.h"
16#include "queryrunner.h"
17#include "adaptorfactoryregistry.h"
16 18
17#include <KMime/Message> 19#include <KMime/Message>
18 20
@@ -1229,6 +1231,55 @@ private slots:
1229 } 1231 }
1230 } 1232 }
1231 1233
1234 void testQueryRunnerDontMissUpdates()
1235 {
1236 // Setup
1237 auto folder1 = Folder::createEntity<Folder>("sink.dummy.instance1");
1238 VERIFYEXEC(Sink::Store::create<Folder>(folder1));
1239
1240 QDateTime now{QDate{2017, 2, 3}, QTime{10, 0, 0}};
1241
1242 auto createMail = [] (const QByteArray &messageid, const Folder &folder, const QDateTime &date, bool important) {
1243 auto mail = Mail::createEntity<Mail>("sink.dummy.instance1");
1244 mail.setExtractedMessageId(messageid);
1245 mail.setFolder(folder);
1246 mail.setExtractedDate(date);
1247 mail.setImportant(important);
1248 return mail;
1249 };
1250
1251 VERIFYEXEC(Sink::Store::create(createMail("mail1", folder1, now, false)));
1252
1253 // Ensure all local data is processed
1254 VERIFYEXEC(Sink::ResourceControl::flushMessageQueue("sink.dummy.instance1"));
1255
1256 Query query;
1257 query.setFlags(Query::LiveQuery);
1258
1259 Sink::ResourceContext resourceContext{"sink.dummy.instance1", "sink.dummy", Sink::AdaptorFactoryRegistry::instance().getFactories("sink.dummy")};
1260 Sink::Log::Context logCtx;
1261 auto runner = new QueryRunner<Mail>(query, resourceContext, ApplicationDomain::getTypeName<Mail>(), logCtx);
1262 runner->delayNextQuery();
1263
1264 auto emitter = runner->emitter();
1265 QList<Mail::Ptr> added;
1266 emitter->onAdded([&](Mail::Ptr mail) {
1267 added << mail;
1268 });
1269
1270 emitter->fetch();
1271 VERIFYEXEC(Sink::Store::create(createMail("mail2", folder1, now, false)));
1272 QTRY_COMPARE(added.size(), 2);
1273
1274 runner->delayNextQuery();
1275 VERIFYEXEC(Sink::Store::create(createMail("mail3", folder1, now, false)));
1276 //The second revision update is supposed to come in while the initial revision update is still in the query.
1277 //So wait a bit to make sure the query is currently runnning.
1278 QTest::qWait(500);
1279 VERIFYEXEC(Sink::Store::create(createMail("mail4", folder1, now, false)));
1280 QTRY_COMPARE(added.size(), 4);
1281 }
1282
1232 /* 1283 /*
1233 * This test is here to ensure we don't crash if we call removeFromDisk with a running query. 1284 * This test is here to ensure we don't crash if we call removeFromDisk with a running query.
1234 */ 1285 */