diff options
-rw-r--r-- | common/datastorequery.cpp | 9 | ||||
-rw-r--r-- | common/datastorequery.h | 2 | ||||
-rw-r--r-- | common/queryrunner.cpp | 38 | ||||
-rw-r--r-- | common/queryrunner.h | 3 |
4 files changed, 29 insertions, 23 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 0db59e1..2e0c348 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp | |||
@@ -144,7 +144,7 @@ public: | |||
144 | const auto property = entity.getProperty(filterProperty); | 144 | const auto property = entity.getProperty(filterProperty); |
145 | const auto comparator = propertyFilter.value(filterProperty); | 145 | const auto comparator = propertyFilter.value(filterProperty); |
146 | if (!comparator.matches(property)) { | 146 | if (!comparator.matches(property)) { |
147 | SinkTraceCtx(mDatastore->mLogCtx) << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; | 147 | SinkTraceCtx(mDatastore->mLogCtx) << "Filtering entity due to property mismatch on filter: " << entity.identifier() << "Property: " << filterProperty << property << " Filter:" << comparator.value; |
148 | return false; | 148 | return false; |
149 | } | 149 | } |
150 | } | 150 | } |
@@ -367,19 +367,22 @@ public: | |||
367 | DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) | 367 | DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) |
368 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) | 368 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) |
369 | { | 369 | { |
370 | //This is what we use during a new query | ||
370 | setupQuery(query); | 371 | setupQuery(query); |
371 | } | 372 | } |
372 | 373 | ||
373 | DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store) | 374 | DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store, bool incremental) |
374 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) | 375 | : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) |
375 | { | 376 | { |
377 | //This is what we use when fetching more data, without having a new revision with incremental=false | ||
378 | //And this is what we use when the data changed and we want to update with incremental = true | ||
376 | mCollector = state.mCollector; | 379 | mCollector = state.mCollector; |
377 | mSource = state.mSource; | 380 | mSource = state.mSource; |
378 | 381 | ||
379 | auto source = mCollector; | 382 | auto source = mCollector; |
380 | while (source) { | 383 | while (source) { |
381 | source->mDatastore = this; | 384 | source->mDatastore = this; |
382 | source->mIncremental = true; | 385 | source->mIncremental = incremental; |
383 | source = source->mSource; | 386 | source = source->mSource; |
384 | } | 387 | } |
385 | } | 388 | } |
diff --git a/common/datastorequery.h b/common/datastorequery.h index a797782..ee5f99e 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h | |||
@@ -46,7 +46,7 @@ public: | |||
46 | }; | 46 | }; |
47 | 47 | ||
48 | DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store); | 48 | DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store); |
49 | DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store); | 49 | DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store, bool incremental); |
50 | ~DataStoreQuery(); | 50 | ~DataStoreQuery(); |
51 | ResultSet execute(); | 51 | ResultSet execute(); |
52 | ResultSet update(qint64 baseRevision); | 52 | ResultSet update(qint64 baseRevision); |
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 802fc48..43f48c0 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -51,7 +51,7 @@ public: | |||
51 | virtual ~QueryWorker(); | 51 | virtual ~QueryWorker(); |
52 | 52 | ||
53 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); | 53 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); |
54 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); | 54 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state); |
55 | 55 | ||
56 | private: | 56 | private: |
57 | void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); | 57 | void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); |
@@ -72,18 +72,18 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
72 | auto guardPtr = QPointer<QObject>(&guard); | 72 | auto guardPtr = QPointer<QObject>(&guard); |
73 | auto fetcher = [=](const typename DomainType::Ptr &parent) { | 73 | auto fetcher = [=](const typename DomainType::Ptr &parent) { |
74 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); | 74 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); |
75 | SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; | 75 | SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; |
76 | auto resultProvider = mResultProvider; | 76 | auto resultProvider = mResultProvider; |
77 | auto resultTransformation = mResultTransformation; | 77 | auto resultTransformation = mResultTransformation; |
78 | auto offset = mOffset[parentId]; | ||
79 | auto batchSize = mBatchSize; | 78 | auto batchSize = mBatchSize; |
80 | auto resourceContext = mResourceContext; | 79 | auto resourceContext = mResourceContext; |
81 | auto logCtx = mLogCtx; | 80 | auto logCtx = mLogCtx; |
81 | auto state = mQueryState.value(parentId); | ||
82 | const bool runAsync = !query.synchronousQuery(); | 82 | const bool runAsync = !query.synchronousQuery(); |
83 | //The lambda will be executed in a separate thread, so copy all arguments | 83 | //The lambda will be executed in a separate thread, so copy all arguments |
84 | async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { | 84 | async::run<ReplayResult>([=]() { |
85 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | 85 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); |
86 | return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); | 86 | return worker.executeInitialQuery(query, parent, *resultProvider, batchSize, state); |
87 | }, runAsync) | 87 | }, runAsync) |
88 | .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { | 88 | .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { |
89 | if (!guardPtr) { | 89 | if (!guardPtr) { |
@@ -91,8 +91,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
91 | return; | 91 | return; |
92 | } | 92 | } |
93 | mInitialQueryComplete = true; | 93 | mInitialQueryComplete = true; |
94 | mQueryState = result.queryState; | 94 | mQueryState[parentId] = result.queryState; |
95 | mOffset[parentId] += result.replayedEntities; | ||
96 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 95 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
97 | if (query.liveQuery()) { | 96 | if (query.liveQuery()) { |
98 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); | 97 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); |
@@ -111,10 +110,11 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
111 | Q_ASSERT(!query.synchronousQuery()); | 110 | Q_ASSERT(!query.synchronousQuery()); |
112 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | 111 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting |
113 | setQuery([=]() -> KAsync::Job<void> { | 112 | setQuery([=]() -> KAsync::Job<void> { |
113 | const QByteArray parentId; | ||
114 | auto resultProvider = mResultProvider; | 114 | auto resultProvider = mResultProvider; |
115 | auto resourceContext = mResourceContext; | 115 | auto resourceContext = mResourceContext; |
116 | auto logCtx = mLogCtx; | 116 | auto logCtx = mLogCtx; |
117 | auto state = mQueryState; | 117 | auto state = mQueryState.value(parentId); |
118 | if (!mInitialQueryComplete) { | 118 | if (!mInitialQueryComplete) { |
119 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | 119 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; |
120 | fetcher({}); | 120 | fetcher({}); |
@@ -225,7 +225,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query | |||
225 | SinkWarningCtx(mLogCtx) << "No previous query state."; | 225 | SinkWarningCtx(mLogCtx) << "No previous query state."; |
226 | return {0, 0, false, DataStoreQuery::State::Ptr{}}; | 226 | return {0, 0, false, DataStoreQuery::State::Ptr{}}; |
227 | } | 227 | } |
228 | auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | 228 | auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, true}; |
229 | auto resultSet = preparedQuery.update(baseRevision); | 229 | auto resultSet = preparedQuery.update(baseRevision); |
230 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 230 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
231 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { | 231 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { |
@@ -240,7 +240,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query | |||
240 | 240 | ||
241 | template <class DomainType> | 241 | template <class DomainType> |
242 | ReplayResult QueryWorker<DomainType>::executeInitialQuery( | 242 | ReplayResult QueryWorker<DomainType>::executeInitialQuery( |
243 | const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) | 243 | const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state) |
244 | { | 244 | { |
245 | QTime time; | 245 | QTime time; |
246 | time.start(); | 246 | time.start(); |
@@ -257,11 +257,17 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery( | |||
257 | } | 257 | } |
258 | 258 | ||
259 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; | 259 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; |
260 | auto preparedQuery = DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | 260 | auto preparedQuery = [&] { |
261 | auto resultSet = preparedQuery.execute(); | 261 | if (state) { |
262 | return DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, false}; | ||
263 | } else { | ||
264 | return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | ||
265 | } | ||
266 | }(); | ||
267 | auto resultSet = preparedQuery.execute();; | ||
262 | 268 | ||
263 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 269 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed()); |
264 | auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { | 270 | auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { |
265 | resultProviderCallback(query, resultProvider, result); | 271 | resultProviderCallback(query, resultProvider, result); |
266 | }); | 272 | }); |
267 | 273 | ||
@@ -269,9 +275,7 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery( | |||
269 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") | 275 | << (replayResult.replayedAll ? "Replayed all available results.\n" : "") |
270 | << "Initial query took: " << Log::TraceTime(time.elapsed()); | 276 | << "Initial query took: " << Log::TraceTime(time.elapsed()); |
271 | 277 | ||
272 | auto state = preparedQuery.getState(); | 278 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; |
273 | |||
274 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state}; | ||
275 | } | 279 | } |
276 | 280 | ||
277 | #define REGISTER_TYPE(T) \ | 281 | #define REGISTER_TYPE(T) \ |
diff --git a/common/queryrunner.h b/common/queryrunner.h index f5c7ead..5308eac 100644 --- a/common/queryrunner.h +++ b/common/queryrunner.h | |||
@@ -98,11 +98,10 @@ private: | |||
98 | QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess; | 98 | QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess; |
99 | QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider; | 99 | QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider; |
100 | ResultTransformation mResultTransformation; | 100 | ResultTransformation mResultTransformation; |
101 | QHash<QByteArray, qint64> mOffset; | 101 | QHash<QByteArray, DataStoreQuery::State::Ptr> mQueryState; |
102 | int mBatchSize; | 102 | int mBatchSize; |
103 | QObject guard; | 103 | QObject guard; |
104 | Sink::Log::Context mLogCtx; | 104 | Sink::Log::Context mLogCtx; |
105 | DataStoreQuery::State::Ptr mQueryState; | ||
106 | bool mInitialQueryComplete = false; | 105 | bool mInitialQueryComplete = false; |
107 | bool mQueryInProgress = false; | 106 | bool mQueryInProgress = false; |
108 | }; | 107 | }; |