-rw-r--r--   common/datastorequery.cpp    9
-rw-r--r--   common/datastorequery.h      2
-rw-r--r--   common/queryrunner.cpp      38
-rw-r--r--   common/queryrunner.h         3
4 files changed, 29 insertions, 23 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 0db59e1..2e0c348 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -144,7 +144,7 @@ public:
             const auto property = entity.getProperty(filterProperty);
             const auto comparator = propertyFilter.value(filterProperty);
             if (!comparator.matches(property)) {
-                SinkTraceCtx(mDatastore->mLogCtx) << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
+                SinkTraceCtx(mDatastore->mLogCtx) << "Filtering entity due to property mismatch on filter: " << entity.identifier() << "Property: " << filterProperty << property << " Filter:" << comparator.value;
                 return false;
             }
         }
@@ -367,19 +367,22 @@ public:
 DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store)
     : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery"))
 {
+    //This is what we use during a new query
     setupQuery(query);
 }
 
-DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store)
+DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store, bool incremental)
     : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery"))
 {
+    //This is what we use when fetching more data, without having a new revision with incremental=false
+    //And this is what we use when the data changed and we want to update with incremental = true
     mCollector = state.mCollector;
     mSource = state.mSource;
 
     auto source = mCollector;
     while (source) {
         source->mDatastore = this;
-        source->mIncremental = true;
+        source->mIncremental = incremental;
         source = source->mSource;
     }
 }
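
Note on the new constructor flag: incremental selects between the two ways a DataStoreQuery is rebuilt from a saved State. A minimal sketch of the two call patterns, mirroring how queryrunner.cpp (below) uses them; the variable names are illustrative only:

    // Continue paging an initial query: filters run over the full set, so incremental = false.
    auto pagedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, false};
    auto pagedSet = pagedQuery.execute();

    // React to changed data: only the delta since baseRevision is processed, so incremental = true.
    auto updateQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, true};
    auto changedSet = updateQuery.update(baseRevision);
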
diff --git a/common/datastorequery.h b/common/datastorequery.h
index a797782..ee5f99e 100644
--- a/common/datastorequery.h
+++ b/common/datastorequery.h
@@ -46,7 +46,7 @@ public:
     };
 
     DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store);
-    DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store);
+    DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store, bool incremental);
     ~DataStoreQuery();
     ResultSet execute();
     ResultSet update(qint64 baseRevision);
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 802fc48..43f48c0 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -51,7 +51,7 @@ public:
     virtual ~QueryWorker();
 
     ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state);
-    ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
+    ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state);
 
 private:
     void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result);
@@ -72,18 +72,18 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
     auto guardPtr = QPointer<QObject>(&guard);
     auto fetcher = [=](const typename DomainType::Ptr &parent) {
         const QByteArray parentId = parent ? parent->identifier() : QByteArray();
-        SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize;
+        SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
         auto resultProvider = mResultProvider;
         auto resultTransformation = mResultTransformation;
-        auto offset = mOffset[parentId];
         auto batchSize = mBatchSize;
         auto resourceContext = mResourceContext;
         auto logCtx = mLogCtx;
+        auto state = mQueryState.value(parentId);
         const bool runAsync = !query.synchronousQuery();
         //The lambda will be executed in a separate thread, so copy all arguments
-        async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() {
+        async::run<ReplayResult>([=]() {
             QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
-            return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize);
+            return worker.executeInitialQuery(query, parent, *resultProvider, batchSize, state);
         }, runAsync)
         .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) {
             if (!guardPtr) {
@@ -91,8 +91,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
                 return;
             }
             mInitialQueryComplete = true;
-            mQueryState = result.queryState;
-            mOffset[parentId] += result.replayedEntities;
+            mQueryState[parentId] = result.queryState;
             // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
             if (query.liveQuery()) {
                 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
@@ -111,10 +110,11 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
         Q_ASSERT(!query.synchronousQuery());
         // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
         setQuery([=]() -> KAsync::Job<void> {
+            const QByteArray parentId;
             auto resultProvider = mResultProvider;
             auto resourceContext = mResourceContext;
             auto logCtx = mLogCtx;
-            auto state = mQueryState;
+            auto state = mQueryState.value(parentId);
             if (!mInitialQueryComplete) {
                 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
                 fetcher({});
@@ -225,7 +225,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
         SinkWarningCtx(mLogCtx) << "No previous query state.";
         return {0, 0, false, DataStoreQuery::State::Ptr{}};
     }
-    auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore};
+    auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, true};
     auto resultSet = preparedQuery.update(baseRevision);
     SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
     auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) {
@@ -240,7 +240,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
 
 template <class DomainType>
 ReplayResult QueryWorker<DomainType>::executeInitialQuery(
-    const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize)
+    const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state)
 {
     QTime time;
     time.start();
@@ -257,11 +257,17 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
     }
 
     auto entityStore = EntityStore{mResourceContext, mLogCtx};
-    auto preparedQuery = DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore};
-    auto resultSet = preparedQuery.execute();
+    auto preparedQuery = [&] {
+        if (state) {
+            return DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, false};
+        } else {
+            return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore};
+        }
+    }();
+    auto resultSet = preparedQuery.execute();
 
-    SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
-    auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) {
+    SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed());
+    auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) {
         resultProviderCallback(query, resultProvider, result);
     });
 
@@ -269,9 +275,7 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
         << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
         << "Initial query took: " << Log::TraceTime(time.elapsed());
 
-    auto state = preparedQuery.getState();
-
-    return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state};
+    return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()};
 }
 
 #define REGISTER_TYPE(T) \
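
With the state-based paging above, executeInitialQuery no longer skips offset rows on every fetch; it resumes the saved DataStoreQuery::State (incremental = false) and always replays from 0 up to batchsize. A rough sketch of the resulting fetch sequence; the driver code is hypothetical and not part of this commit:

    DataStoreQuery::State::Ptr state;  // empty on the very first fetcher run
    auto first = worker.executeInitialQuery(query, parent, *resultProvider, batchSize, state);
    state = first.queryState;          // QueryRunner stores this per parentId in mQueryState
    // A later fetcher call for the same parent resumes from the saved state instead of re-skipping rows:
    auto next = worker.executeInitialQuery(query, parent, *resultProvider, batchSize, state);
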
diff --git a/common/queryrunner.h b/common/queryrunner.h
index f5c7ead..5308eac 100644
--- a/common/queryrunner.h
+++ b/common/queryrunner.h
@@ -98,11 +98,10 @@ private:
     QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess;
     QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider;
     ResultTransformation mResultTransformation;
-    QHash<QByteArray, qint64> mOffset;
+    QHash<QByteArray, DataStoreQuery::State::Ptr> mQueryState;
     int mBatchSize;
     QObject guard;
     Sink::Log::Context mLogCtx;
-    DataStoreQuery::State::Ptr mQueryState;
     bool mInitialQueryComplete = false;
     bool mQueryInProgress = false;
 };
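
The per-parent offset bookkeeping is thus replaced by per-parent query state, shared by the initial fetcher and the incremental update path. The access pattern, as used in queryrunner.cpp above:

    QHash<QByteArray, DataStoreQuery::State::Ptr> mQueryState;
    // store after a fetch:  mQueryState[parentId] = result.queryState;
    // resume a later fetch: auto state = mQueryState.value(parentId);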