summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-03-21 22:06:12 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-03-22 11:32:53 +0100
commit86045e308c10c60cd7c4339d305cee1acb084760 (patch)
treeb8fd3e91ffbdeb8ad2e78a61fe1d3426eb5874be
parentdaf96f7efec0538e161eab8e906a291015842e1e (diff)
downloadsink-86045e308c10c60cd7c4339d305cee1acb084760.tar.gz
sink-86045e308c10c60cd7c4339d305cee1acb084760.zip
Maintain the query state instead of using the offset.
Instead of using the offset to skip over old results requires recalculating them, and resulted in some cases in results being added multiple times to the model. By just maintaining the state we can apply the offset directly to the base-set, and maintain the state in reduction etc. which is necessary to continue streaming results while making sure we don't report anything twice.
-rw-r--r--common/datastorequery.cpp9
-rw-r--r--common/datastorequery.h2
-rw-r--r--common/queryrunner.cpp38
-rw-r--r--common/queryrunner.h3
4 files changed, 29 insertions, 23 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 0db59e1..2e0c348 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -144,7 +144,7 @@ public:
144 const auto property = entity.getProperty(filterProperty); 144 const auto property = entity.getProperty(filterProperty);
145 const auto comparator = propertyFilter.value(filterProperty); 145 const auto comparator = propertyFilter.value(filterProperty);
146 if (!comparator.matches(property)) { 146 if (!comparator.matches(property)) {
147 SinkTraceCtx(mDatastore->mLogCtx) << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; 147 SinkTraceCtx(mDatastore->mLogCtx) << "Filtering entity due to property mismatch on filter: " << entity.identifier() << "Property: " << filterProperty << property << " Filter:" << comparator.value;
148 return false; 148 return false;
149 } 149 }
150 } 150 }
@@ -367,19 +367,22 @@ public:
367DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store) 367DataStoreQuery::DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, EntityStore &store)
368 : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) 368 : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery"))
369{ 369{
370 //This is what we use during a new query
370 setupQuery(query); 371 setupQuery(query);
371} 372}
372 373
373DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store) 374DataStoreQuery::DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store, bool incremental)
374 : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery")) 375 : mType(type), mStore(store), mLogCtx(store.logContext().subContext("datastorequery"))
375{ 376{
377 //This is what we use when fetching more data, without having a new revision with incremental=false
378 //And this is what we use when the data changed and we want to update with incremental = true
376 mCollector = state.mCollector; 379 mCollector = state.mCollector;
377 mSource = state.mSource; 380 mSource = state.mSource;
378 381
379 auto source = mCollector; 382 auto source = mCollector;
380 while (source) { 383 while (source) {
381 source->mDatastore = this; 384 source->mDatastore = this;
382 source->mIncremental = true; 385 source->mIncremental = incremental;
383 source = source->mSource; 386 source = source->mSource;
384 } 387 }
385} 388}
diff --git a/common/datastorequery.h b/common/datastorequery.h
index a797782..ee5f99e 100644
--- a/common/datastorequery.h
+++ b/common/datastorequery.h
@@ -46,7 +46,7 @@ public:
46 }; 46 };
47 47
48 DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store); 48 DataStoreQuery(const Sink::QueryBase &query, const QByteArray &type, Sink::Storage::EntityStore &store);
49 DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store); 49 DataStoreQuery(const DataStoreQuery::State &state, const QByteArray &type, Sink::Storage::EntityStore &store, bool incremental);
50 ~DataStoreQuery(); 50 ~DataStoreQuery();
51 ResultSet execute(); 51 ResultSet execute();
52 ResultSet update(qint64 baseRevision); 52 ResultSet update(qint64 baseRevision);
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 802fc48..43f48c0 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -51,7 +51,7 @@ public:
51 virtual ~QueryWorker(); 51 virtual ~QueryWorker();
52 52
53 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); 53 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state);
54 ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); 54 ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state);
55 55
56private: 56private:
57 void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); 57 void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result);
@@ -72,18 +72,18 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
72 auto guardPtr = QPointer<QObject>(&guard); 72 auto guardPtr = QPointer<QObject>(&guard);
73 auto fetcher = [=](const typename DomainType::Ptr &parent) { 73 auto fetcher = [=](const typename DomainType::Ptr &parent) {
74 const QByteArray parentId = parent ? parent->identifier() : QByteArray(); 74 const QByteArray parentId = parent ? parent->identifier() : QByteArray();
75 SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; 75 SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
76 auto resultProvider = mResultProvider; 76 auto resultProvider = mResultProvider;
77 auto resultTransformation = mResultTransformation; 77 auto resultTransformation = mResultTransformation;
78 auto offset = mOffset[parentId];
79 auto batchSize = mBatchSize; 78 auto batchSize = mBatchSize;
80 auto resourceContext = mResourceContext; 79 auto resourceContext = mResourceContext;
81 auto logCtx = mLogCtx; 80 auto logCtx = mLogCtx;
81 auto state = mQueryState.value(parentId);
82 const bool runAsync = !query.synchronousQuery(); 82 const bool runAsync = !query.synchronousQuery();
83 //The lambda will be executed in a separate thread, so copy all arguments 83 //The lambda will be executed in a separate thread, so copy all arguments
84 async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { 84 async::run<ReplayResult>([=]() {
85 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); 85 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
86 return worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); 86 return worker.executeInitialQuery(query, parent, *resultProvider, batchSize, state);
87 }, runAsync) 87 }, runAsync)
88 .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { 88 .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) {
89 if (!guardPtr) { 89 if (!guardPtr) {
@@ -91,8 +91,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
91 return; 91 return;
92 } 92 }
93 mInitialQueryComplete = true; 93 mInitialQueryComplete = true;
94 mQueryState = result.queryState; 94 mQueryState[parentId] = result.queryState;
95 mOffset[parentId] += result.replayedEntities;
96 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 95 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
97 if (query.liveQuery()) { 96 if (query.liveQuery()) {
98 mResourceAccess->sendRevisionReplayedCommand(result.newRevision); 97 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
@@ -111,10 +110,11 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
111 Q_ASSERT(!query.synchronousQuery()); 110 Q_ASSERT(!query.synchronousQuery());
112 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 111 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
113 setQuery([=]() -> KAsync::Job<void> { 112 setQuery([=]() -> KAsync::Job<void> {
113 const QByteArray parentId;
114 auto resultProvider = mResultProvider; 114 auto resultProvider = mResultProvider;
115 auto resourceContext = mResourceContext; 115 auto resourceContext = mResourceContext;
116 auto logCtx = mLogCtx; 116 auto logCtx = mLogCtx;
117 auto state = mQueryState; 117 auto state = mQueryState.value(parentId);
118 if (!mInitialQueryComplete) { 118 if (!mInitialQueryComplete) {
119 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; 119 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
120 fetcher({}); 120 fetcher({});
@@ -225,7 +225,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
225 SinkWarningCtx(mLogCtx) << "No previous query state."; 225 SinkWarningCtx(mLogCtx) << "No previous query state.";
226 return {0, 0, false, DataStoreQuery::State::Ptr{}}; 226 return {0, 0, false, DataStoreQuery::State::Ptr{}};
227 } 227 }
228 auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 228 auto preparedQuery = DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, true};
229 auto resultSet = preparedQuery.update(baseRevision); 229 auto resultSet = preparedQuery.update(baseRevision);
230 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 230 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
231 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { 231 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) {
@@ -240,7 +240,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
240 240
241template <class DomainType> 241template <class DomainType>
242ReplayResult QueryWorker<DomainType>::executeInitialQuery( 242ReplayResult QueryWorker<DomainType>::executeInitialQuery(
243 const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) 243 const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state)
244{ 244{
245 QTime time; 245 QTime time;
246 time.start(); 246 time.start();
@@ -257,11 +257,17 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
257 } 257 }
258 258
259 auto entityStore = EntityStore{mResourceContext, mLogCtx}; 259 auto entityStore = EntityStore{mResourceContext, mLogCtx};
260 auto preparedQuery = DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 260 auto preparedQuery = [&] {
261 auto resultSet = preparedQuery.execute(); 261 if (state) {
262 return DataStoreQuery{*state, ApplicationDomain::getTypeName<DomainType>(), entityStore, false};
263 } else {
264 return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore};
265 }
266 }();
267 auto resultSet = preparedQuery.execute();;
262 268
263 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 269 SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed());
264 auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { 270 auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) {
265 resultProviderCallback(query, resultProvider, result); 271 resultProviderCallback(query, resultProvider, result);
266 }); 272 });
267 273
@@ -269,9 +275,7 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
269 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 275 << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
270 << "Initial query took: " << Log::TraceTime(time.elapsed()); 276 << "Initial query took: " << Log::TraceTime(time.elapsed());
271 277
272 auto state = preparedQuery.getState(); 278 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()};
273
274 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, state};
275} 279}
276 280
277#define REGISTER_TYPE(T) \ 281#define REGISTER_TYPE(T) \
diff --git a/common/queryrunner.h b/common/queryrunner.h
index f5c7ead..5308eac 100644
--- a/common/queryrunner.h
+++ b/common/queryrunner.h
@@ -98,11 +98,10 @@ private:
98 QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess; 98 QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess;
99 QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider; 99 QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider;
100 ResultTransformation mResultTransformation; 100 ResultTransformation mResultTransformation;
101 QHash<QByteArray, qint64> mOffset; 101 QHash<QByteArray, DataStoreQuery::State::Ptr> mQueryState;
102 int mBatchSize; 102 int mBatchSize;
103 QObject guard; 103 QObject guard;
104 Sink::Log::Context mLogCtx; 104 Sink::Log::Context mLogCtx;
105 DataStoreQuery::State::Ptr mQueryState;
106 bool mInitialQueryComplete = false; 105 bool mInitialQueryComplete = false;
107 bool mQueryInProgress = false; 106 bool mQueryInProgress = false;
108}; 107};