summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2018-06-22 17:46:57 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2018-06-22 17:46:57 +0200
commited17c92c9f3be95e9b280f2abc67f1c0ba48e8c4 (patch)
treed65e4a0a27fd851adf3d4d44e5beccb17f2fe7a6 /common/queryrunner.cpp
parentd52dc4cfb2cb219aa2db25ae6e201b21bbc3079f (diff)
downloadsink-ed17c92c9f3be95e9b280f2abc67f1c0ba48e8c4.tar.gz
sink-ed17c92c9f3be95e9b280f2abc67f1c0ba48e8c4.zip
Try harder to avoid storing a revision that is too high in the result
set. We might miss some updates. This should not normally ever happen if we assume that we have a transaction from start to finish of the query (the maxRevision() call should be equivalent. We do have some cornercases in our lmdb implementation that breaks transactions when new databases are opened, so we try to be extra safe this way.... Let's see if it works.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp11
1 files changed, 7 insertions, 4 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 95a2fb4..0977940 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -295,9 +295,10 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
295 time.start(); 295 time.start();
296 296
297 const qint64 baseRevision = resultProvider.revision() + 1; 297 const qint64 baseRevision = resultProvider.revision() + 1;
298 SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision;
299 298
300 auto entityStore = EntityStore{mResourceContext, mLogCtx}; 299 auto entityStore = EntityStore{mResourceContext, mLogCtx};
300 const qint64 topRevision = entityStore.maxRevision();
301 SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision << " to revision " << topRevision;
301 if (!state) { 302 if (!state) {
302 SinkWarningCtx(mLogCtx) << "No previous query state."; 303 SinkWarningCtx(mLogCtx) << "No previous query state.";
303 return {0, 0, false, DataStoreQuery::State::Ptr{}}; 304 return {0, 0, false, DataStoreQuery::State::Ptr{}};
@@ -309,10 +310,10 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
309 resultProviderCallback(query, resultProvider, result); 310 resultProviderCallback(query, resultProvider, result);
310 }); 311 });
311 preparedQuery.updateComplete(); 312 preparedQuery.updateComplete();
312 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" 313 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results until revision: " << topRevision << "\n"
313 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 314 << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
314 << "Incremental query took: " << Log::TraceTime(time.elapsed()); 315 << "Incremental query took: " << Log::TraceTime(time.elapsed());
315 return {entityStore.maxRevision(), replayResult.replayedEntities, false, preparedQuery.getState()}; 316 return {topRevision, replayResult.replayedEntities, false, preparedQuery.getState()};
316} 317}
317 318
318template <class DomainType> 319template <class DomainType>
@@ -323,6 +324,8 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
323 time.start(); 324 time.start();
324 325
325 auto entityStore = EntityStore{mResourceContext, mLogCtx}; 326 auto entityStore = EntityStore{mResourceContext, mLogCtx};
327 const qint64 topRevision = entityStore.maxRevision();
328 SinkTraceCtx(mLogCtx) << "Running query from revision: " << topRevision;
326 auto preparedQuery = [&] { 329 auto preparedQuery = [&] {
327 if (state) { 330 if (state) {
328 return DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, false}; 331 return DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, false};
@@ -341,7 +344,7 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
341 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 344 << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
342 << "Initial query took: " << Log::TraceTime(time.elapsed()); 345 << "Initial query took: " << Log::TraceTime(time.elapsed());
343 346
344 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; 347 return {topRevision, replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()};
345} 348}
346 349
347#define REGISTER_TYPE(T) \ 350#define REGISTER_TYPE(T) \