From ed17c92c9f3be95e9b280f2abc67f1c0ba48e8c4 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 22 Jun 2018 17:46:57 +0200 Subject: Try harder to avoid storing a revision that is too high in the result set. We might miss some updates. This should not normally ever happen if we assume that we have a transaction from start to finish of the query (the maxRevision() call should be equivalent. We do have some cornercases in our lmdb implementation that breaks transactions when new databases are opened, so we try to be extra safe this way.... Let's see if it works. --- common/datastorequery.cpp | 2 +- common/queryrunner.cpp | 11 +++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 0195cfc..fd910f8 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -665,7 +665,7 @@ QVector DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision ResultSet DataStoreQuery::update(qint64 baseRevision) { - SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision; + SinkTraceCtx(mLogCtx) << "Executing query update from revision " << baseRevision << " to revision " << mStore.maxRevision(); auto incrementalResultSet = loadIncrementalResultSet(baseRevision); SinkTraceCtx(mLogCtx) << "Incremental changes: " << incrementalResultSet; mSource->add(incrementalResultSet); diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 95a2fb4..0977940 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -295,9 +295,10 @@ ReplayResult QueryWorker::executeIncrementalQuery(const Sink::Query time.start(); const qint64 baseRevision = resultProvider.revision() + 1; - SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision; auto entityStore = EntityStore{mResourceContext, mLogCtx}; + const qint64 topRevision = entityStore.maxRevision(); + SinkTraceCtx(mLogCtx) << "Running query update from revision: " << baseRevision << " to revision " << topRevision; if (!state) { SinkWarningCtx(mLogCtx) << "No previous query state."; return {0, 0, false, DataStoreQuery::State::Ptr{}}; @@ -309,10 +310,10 @@ ReplayResult QueryWorker::executeIncrementalQuery(const Sink::Query resultProviderCallback(query, resultProvider, result); }); preparedQuery.updateComplete(); - SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" + SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results until revision: " << topRevision << "\n" << (replayResult.replayedAll ? "Replayed all available results.\n" : "") << "Incremental query took: " << Log::TraceTime(time.elapsed()); - return {entityStore.maxRevision(), replayResult.replayedEntities, false, preparedQuery.getState()}; + return {topRevision, replayResult.replayedEntities, false, preparedQuery.getState()}; } template @@ -323,6 +324,8 @@ ReplayResult QueryWorker::executeInitialQuery( time.start(); auto entityStore = EntityStore{mResourceContext, mLogCtx}; + const qint64 topRevision = entityStore.maxRevision(); + SinkTraceCtx(mLogCtx) << "Running query from revision: " << topRevision; auto preparedQuery = [&] { if (state) { return DataStoreQuery{*state, ApplicationDomain::getTypeName(), entityStore, false}; @@ -341,7 +344,7 @@ ReplayResult QueryWorker::executeInitialQuery( << (replayResult.replayedAll ? "Replayed all available results.\n" : "") << "Initial query took: " << Log::TraceTime(time.elapsed()); - return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; + return {topRevision, replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; } #define REGISTER_TYPE(T) \ -- cgit v1.2.3