diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-03-21 22:06:12 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-03-22 11:32:53 +0100 |
commit | 86045e308c10c60cd7c4339d305cee1acb084760 (patch) | |
tree | b8fd3e91ffbdeb8ad2e78a61fe1d3426eb5874be | |
parent | daf96f7efec0538e161eab8e906a291015842e1e (diff) | |
download | sink-86045e308c10c60cd7c4339d305cee1acb084760.tar.gz sink-86045e308c10c60cd7c4339d305cee1acb084760.zip |
Maintain the query state instead of using the offset.
Instead of using the offset to skip over old results requires
recalculating them, and resulted in some cases in results being added
multiple times to the model.
By just maintaining the state we can apply the offset directly to the
base-set, and maintain the state in reduction etc. which is necessary to
continue streaming results while making sure we don't report anything
twice.
-rw-r--r-- | common/datastorequery.cpp | 9 | ||||
-rw-r--r-- | common/datastorequery.h | 2 | ||||
-rw-r--r-- | common/queryrunner.cpp | 38 | ||||
-rw-r--r-- | common/queryrunner.h | 3 |
4 files changed, 29 insertions, 23 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 0db59e1..2e0c348 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp | |||
@@ -144,7 +144,7 @@ public: | |||
144 | const auto property = entity.getProperty(filterProperty); | 144 | const auto property = entity.getProperty(filterProperty); |
145 | const auto comparator = propertyFilter.value(filterProperty); | 145 | const auto comparator = propertyFilter.value(filterProperty); |
146 | if (!comparator.matches(property)) { | 146 | if (!comparator.matches(property)) { |
147 | SinkTraceCtx(mDatastore->mLogCtx) << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; | 147 | SinkTraceCtx(mDatastore->mLogCtx) << "Filtering entity due to property mismatch on filter: " << entity.identifier() << "Property: " << filterProperty << property << " Filter:" << comparator.value; |
148 | return false; | 148 | return false; |
149 | } | 149 | } |
150 | } | 150 | } |
@@ -367,19 +367,22 @@ public: | |||
367 | DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) | 367 | DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) |
368 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) | 368 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) |
369 | { | 369 | { |
370 | //This is what we use during a new query | ||
370 | setupQuery(query); | 371 | setupQuery(query); |
371 | } | 372 | } |
372 | 373 | ||
373 | DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store) | 374 | DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store, bool incremental) |
374 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) | 375 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) |
375 | { | 376 | { |
377 | //This is what we use when fetching more data, without having a new revision with incremental=false | ||
378 | //And this is what we use when the data changed and we want to update with incremental = true | ||
376 | mCollector = state.mCollector; | 379 | mCollector = state.mCollector; |
377 | mSource = state.mSource; | 380 | mSource = state.mSource; |
378 | 381 | ||
379 | auto source = mCollector; | 382 | auto source = mCollector; |
380 | while (source) { | 383 | while (source) { |
381 | source->mDatastore = this; | 384 | source->mDatastore = this; |
382 | source->mIncremental = true; | 385 | source->mIncremental = incremental; |
383 | source = source->mSource; | 386 | source = source->mSource; |
384 | } | 387 | } |
385 | } | 388 | } |
diff --git a/common/datastorequery.h b/common/datastorequery.h index a797782..ee5f99e 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h | |||
@@ -46,7 +46,7 @@ public: | |||
46 | }; | 46 | }; |
47 | 47 | ||
48 | DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store); | 48 | DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store); |
49 | DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store); | 49 | DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store, bool incremental); |
50 | ~DataStoreQuery(); | 50 | ~DataStoreQuery(); |
51 | ResultSet execute(); | 51 | ResultSet execute(); |
52 | ResultSet update(qint64 baseRevision); | 52 | ResultSet update(qint64 baseRevision); |
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 802fc48..43f48c0 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -51,7 +51,7 @@ public: | |||
51 | virtual ~QueryWorker(); | 51 | virtual ~QueryWorker(); |
52 | 52 | ||
53 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); | 53 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); |
54 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); | 54 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state); |
55 | 55 | ||
56 | private: | 56 | private: |
57 | void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); | 57 | void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); |
@@ -72,18 +72,18 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
72 | auto guardPtr = QPointer<QObject>(&guard); | 72 | auto guardPtr = QPointer<QObject>(&guard); |
73 | auto fetcher = [=](const typename DomainType::Ptr &parent) { | 73 | auto fetcher = [=](const typename DomainType::Ptr &parent) { |
74 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); | 74 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); |
75 | SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; | 75 | SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; |
76 | auto resultProvider = mResultProvider; | 76 | auto resultProvider = mResultProvider; |
77 | auto resultTransformation = mResultTransformation; | 77 | auto resultTransformation = mResultTransformation; |
78 | auto offset = mOffset[parentId]; | ||
79 | auto batchSize = mBatchSize; | 78 | auto batchSize = mBatchSize; |
80 | auto resourceContext = mResourceContext; | 79 | auto resourceContext = mResourceContext; |
81 | auto logCtx = mLogCtx; | 80 | auto logCtx = mLogCtx; |
81 | auto state = mQueryState.value(parentId); | ||
82 | const bool runAsync = !query.synchronousQuery(); | 82 | const bool runAsync = !query.synchronousQuery(); |
83 | //The lambda will be executed in a separate thread, so copy all arguments | 83 | //The lambda will be executed in a separate thread, so copy all arguments |
84 | async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { | 84 | async::run<ReplayResult>([=]() { |
85 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | 85 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); |
86 | return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); | 86 | return worker.executeInitialQuery(query, parent, *resultProvider, batchSize, state); |
87 | }, runAsync) | 87 | }, runAsync) |
88 | .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { | 88 | .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { |
89 | if (!guardPtr) { | 89 | if (!guardPtr) { |
@@ -91,8 +91,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
91 | return; | 91 | return; |
92 | } | 92 | } |
93 | mInitialQueryComplete = true; | 93 | mInitialQueryComplete = true; |
94 | mQueryState = result.queryState; | 94 | mQueryState[parentId] = result.queryState; |
95 | mOffset[parentId] += result.replayedEntities; | ||
96 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 95 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
97 | if (query.liveQuery()) { | 96 | if (query.liveQuery()) { |
98 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); | 97 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); |
@@ -111,10 +110,11 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
111 | Q_ASSERT(!query.synchronousQuery()); | 110 | Q_ASSERT(!query.synchronousQuery()); |
112 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | 111 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting |
113 | setQuery([=]() -> KAsync::Job<void> { | 112 | setQuery([=]() -> KAsync::Job<void> { |
113 | const QByteArray parentId; | ||
114 | auto resultProvider = mResultProvider; | 114 | auto resultProvider = mResultProvider; |
115 | auto resourceContext = mResourceContext; | 115 | auto resourceContext = mResourceContext; |
116 | auto logCtx = mLogCtx; | 116 | auto logCtx = mLogCtx; |
117 | auto state = mQueryState; | 117 | auto state = mQueryState.value(parentId); |
118 | if (!mInitialQueryComplete) { | 118 | if (!mInitialQueryComplete) { |
119 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | 119 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; |
120 | fetcher({}); | 120 | fetcher({}); |
@@ -225,7 +225,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query | |||
225 | SinkWarningCtx(mLogCtx) << "No previous query state."; | 225 | SinkWarningCtx(mLogCtx) << "No previous query state."; |
226 | return {0, 0, false, DataStoreQuery::State::Ptr{}}; | 226 | return {0, 0, false, DataStoreQuery::State::Ptr{}}; |
227 | } | 227 | } |
228 | auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | 228 | auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, true}; |
229 | auto resultSet = preparedQuery.update(baseRevision); | 229 | auto resultSet = preparedQuery.update(baseRevision); |
230 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 230 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
231 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { | 231 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { |
@@ -240,7 +240,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query | |||
240 | 240 | ||
241 | template <class DomainType> | 241 | template <class DomainType> |
242 | ReplayResult QueryWorker<DomainType>::executeInitialQuery( | 242 | ReplayResult QueryWorker<DomainType>::executeInitialQuery( |
243 | const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) | 243 | const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state) |
244 | { | 244 | { |
245 | QTime time; | 245 | QTime time; |
246 | time.start(); | 246 | time.start(); |
@@ -257,11 +257,17 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery( | |||
257 | } | 257 | } |
258 | 258 | ||
259 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; | 259 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; |
260 | auto preparedQuery = DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | 260 | auto preparedQuery = [&] { |
261 | auto resultSet = preparedQuery.execute(); | 261 | if (state) { |
262 | return DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, false}; | ||
263 | } else { | ||
264 | return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | ||
265 | } | ||
266 | }(); | ||
267 | auto resultSet = preparedQuery.execute();; | ||
262 | 268 | ||
263 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 269 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed()); |
264 | auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { | 270 | auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { |
265 | resultProviderCallback(query, resultProvider, result); | 271 | resultProviderCallback(query, resultProvider, result); |
266 | }); | 272 | }); |
267 | 273 | ||
@@ -269,9 +275,7 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery( | |||
269 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") | 275 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") |
270 | << "Initial query took: " << Log::TraceTime(time.elapsed()); | 276 | << "Initial query took: " << Log::TraceTime(time.elapsed()); |
271 | 277 | ||
272 | auto state = preparedQuery.getState(); | 278 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; |
273 | |||
274 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state}; | ||
275 | } | 279 | } |
276 | 280 | ||
277 | #define REGISTER_TYPE(T) \ | 281 | #define REGISTER_TYPE(T) \ |
diff --git a/common/queryrunner.h b/common/queryrunner.h index f5c7ead..5308eac 100644 --- a/common/queryrunner.h +++ b/common/queryrunner.h | |||
@@ -98,11 +98,10 @@ private: | |||
98 | QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess; | 98 | QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess; |
99 | QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider; | 99 | QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider; |
100 | ResultTransformation mResultTransformation; | 100 | ResultTransformation mResultTransformation; |
101 | QHash<QByteArray, qint64> mOffset; | 101 | QHash<QByteArray, DataStoreQuery::State::Ptr> mQueryState; |
102 | int mBatchSize; | 102 | int mBatchSize; |
103 | QObject guard; | 103 | QObject guard; |
104 | Sink::Log::Context mLogCtx; | 104 | Sink::Log::Context mLogCtx; |
105 | DataStoreQuery::State::Ptr mQueryState; | ||
106 | bool mInitialQueryComplete = false; | 105 | bool mInitialQueryComplete = false; |
107 | bool mQueryInProgress = false; | 106 | bool mQueryInProgress = false; |
108 | }; | 107 | }; |