diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2018-06-22 17:46:57 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2018-06-22 17:46:57 +0200 |
commit | ed17c92c9f3be95e9b280f2abc67f1c0ba48e8c4 (patch) | |
tree | d65e4a0a27fd851adf3d4d44e5beccb17f2fe7a6 | |
parent | d52dc4cfb2cb219aa2db25ae6e201b21bbc3079f (diff) | |
download | sink-ed17c92c9f3be95e9b280f2abc67f1c0ba48e8c4.tar.gz sink-ed17c92c9f3be95e9b280f2abc67f1c0ba48e8c4.zip |
Try harder to avoid storing a revision that is too high in the result
set.
We might miss some updates.
This should not normally ever happen if we assume that we have a
transaction from start to finish of the query (the maxRevision() call
should be equivalent. We do have some cornercases in our lmdb
implementation that breaks transactions when new databases are opened,
so we try to be extra safe this way....
Let's see if it works.
-rw-r--r-- | common/datastorequery.cpp | 2 | ||||
-rw-r--r-- | common/queryrunner.cpp | 11 |
2 files changed, 8 insertions, 5 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 0195cfc..fd910f8 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp | |||
@@ -665,7 +665,7 @@ QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision | |||
665 | 665 | ||
666 | ResultSet DataStoreQuery::update(qint64 baseRevision) | 666 | ResultSet DataStoreQuery::update(qint64 baseRevision) |
667 | { | 667 | { |
668 | SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision; | 668 | SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision << " to revision " << mStore.maxRevision(); |
669 | auto incrementalResultSet = loadIncrementalResultSet(baseRevision); | 669 | auto incrementalResultSet = loadIncrementalResultSet(baseRevision); |
670 | SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; | 670 | SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; |
671 | mSource->add(incrementalResultSet); | 671 | mSource->add(incrementalResultSet); |
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 95a2fb4..0977940 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -295,9 +295,10 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query | |||
295 | time.start(); | 295 | time.start(); |
296 | 296 | ||
297 | const qint64 baseRevision = resultProvider.revision() + 1; | 297 | const qint64 baseRevision = resultProvider.revision() + 1; |
298 | SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision; | ||
299 | 298 | ||
300 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; | 299 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; |
300 | const qint64 topRevision = entityStore.maxRevision(); | ||
301 | SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision << " to revision " << topRevision; | ||
301 | if (!state) { | 302 | if (!state) { |
302 | SinkWarningCtx(mLogCtx) << "No previous query state."; | 303 | SinkWarningCtx(mLogCtx) << "No previous query state."; |
303 | return {0, 0, false, DataStoreQuery::State::Ptr{}}; | 304 | return {0, 0, false, DataStoreQuery::State::Ptr{}}; |
@@ -309,10 +310,10 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query | |||
309 | resultProviderCallback(query, resultProvider, result); | 310 | resultProviderCallback(query, resultProvider, result); |
310 | }); | 311 | }); |
311 | preparedQuery.updateComplete(); | 312 | preparedQuery.updateComplete(); |
312 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" | 313 | SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results until revision: " << topRevision << "\n" |
313 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") | 314 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") |
314 | << "Incremental query took: " << Log::TraceTime(time.elapsed()); | 315 | << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
315 | return {entityStore.maxRevision(), replayResult.replayedEntities, false, preparedQuery.getState()}; | 316 | return {topRevision, replayResult.replayedEntities, false, preparedQuery.getState()}; |
316 | } | 317 | } |
317 | 318 | ||
318 | template <class DomainType> | 319 | template <class DomainType> |
@@ -323,6 +324,8 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery( | |||
323 | time.start(); | 324 | time.start(); |
324 | 325 | ||
325 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; | 326 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; |
327 | const qint64 topRevision = entityStore.maxRevision(); | ||
328 | SinkTraceCtx(mLogCtx) << "Running query from revision: " << topRevision; | ||
326 | auto preparedQuery = [&] { | 329 | auto preparedQuery = [&] { |
327 | if (state) { | 330 | if (state) { |
328 | return DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, false}; | 331 | return DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, false}; |
@@ -341,7 +344,7 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery( | |||
341 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") | 344 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") |
342 | << "Initial query took: " << Log::TraceTime(time.elapsed()); | 345 | << "Initial query took: " << Log::TraceTime(time.elapsed()); |
343 | 346 | ||
344 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; | 347 | return {topRevision, replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; |
345 | } | 348 | } |
346 | 349 | ||
347 | #define REGISTER_TYPE(T) \ | 350 | #define REGISTER_TYPE(T) \ |