summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-06-10 09:37:08 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-06-10 09:37:08 +0200
commite382924f1a90b5a27eba2e8c5981f6a4fe7892c9 (patch)
tree315780c4699be92d20121351bca4154c4fff8994 /common
parentf254153ac1571d5ab31a4f17fba2db9256bc24d3 (diff)
downloadsink-e382924f1a90b5a27eba2e8c5981f6a4fe7892c9.tar.gz
sink-e382924f1a90b5a27eba2e8c5981f6a4fe7892c9.zip
Fixed incremental queries
The incremental querying broke as soon as a revision update came in since it would nuke the base-set. This fixes it, but it's definitely not pretty.
Diffstat (limited to 'common')
-rw-r--r--common/datastorequery.cpp41
-rw-r--r--common/datastorequery.h1
-rw-r--r--common/queryrunner.cpp6
3 files changed, 35 insertions, 13 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 2e0c348..4c95606 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -43,6 +43,8 @@ class Source : public FilterBase {
43 43
44 QVector<QByteArray> mIds; 44 QVector<QByteArray> mIds;
45 QVector<QByteArray>::ConstIterator mIt; 45 QVector<QByteArray>::ConstIterator mIt;
46 QVector<QByteArray> mIncrementalIds;
47 QVector<QByteArray>::ConstIterator mIncrementalIt;
46 48
47 Source (const QVector<QByteArray> &ids, DataStoreQuery *store) 49 Source (const QVector<QByteArray> &ids, DataStoreQuery *store)
48 : FilterBase(store), 50 : FilterBase(store),
@@ -63,21 +65,36 @@ class Source : public FilterBase {
63 65
64 void add(const QVector<QByteArray> &ids) 66 void add(const QVector<QByteArray> &ids)
65 { 67 {
66 mIds = ids; 68 mIncrementalIds = ids;
67 mIt = mIds.constBegin(); 69 mIncrementalIt = mIncrementalIds.constBegin();
68 } 70 }
69 71
70 bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE 72 bool next(const std::function<void(const ResultSet::Result &result)> &callback) Q_DECL_OVERRIDE
71 { 73 {
72 if (mIt == mIds.constEnd()) { 74 if (!mIncrementalIds.isEmpty()) {
73 return false; 75 if (mIncrementalIt == mIncrementalIds.constEnd()) {
76 return false;
77 }
78 readEntity(*mIncrementalIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
79 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation);
80 callback({entity, operation});
81 });
82 mIncrementalIt++;
83 if (mIncrementalIt == mIncrementalIds.constEnd()) {
84 return false;
85 }
86 return true;
87 } else {
88 if (mIt == mIds.constEnd()) {
89 return false;
90 }
91 readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
92 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation);
93 callback({entity, operation});
94 });
95 mIt++;
96 return mIt != mIds.constEnd();
74 } 97 }
75 readEntity(*mIt, [this, callback](const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Operation operation) {
76 SinkTraceCtx(mDatastore->mLogCtx) << "Source: Read entity: " << entity.identifier() << operationName(operation);
77 callback({entity, operation});
78 });
79 mIt++;
80 return mIt != mIds.constEnd();
81 } 98 }
82}; 99};
83 100
@@ -599,6 +616,10 @@ ResultSet DataStoreQuery::update(qint64 baseRevision)
599 return ResultSet(generator, [this]() { mCollector->skip(); }); 616 return ResultSet(generator, [this]() { mCollector->skip(); });
600} 617}
601 618
619void DataStoreQuery::updateComplete()
620{
621 mSource->mIncrementalIds.clear();
622}
602 623
603ResultSet DataStoreQuery::execute() 624ResultSet DataStoreQuery::execute()
604{ 625{
diff --git a/common/datastorequery.h b/common/datastorequery.h
index ee5f99e..de4ae26 100644
--- a/common/datastorequery.h
+++ b/common/datastorequery.h
@@ -50,6 +50,7 @@ public:
50 ~DataStoreQuery(); 50 ~DataStoreQuery();
51 ResultSet execute(); 51 ResultSet execute();
52 ResultSet update(qint64 baseRevision); 52 ResultSet update(qint64 baseRevision);
53 void updateComplete();
53 54
54 State::Ptr getState(); 55 State::Ptr getState();
55 56
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 43f48c0..f196965 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -231,11 +231,11 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
231 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { 231 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) {
232 resultProviderCallback(query, resultProvider, result); 232 resultProviderCallback(query, resultProvider, result);
233 }); 233 });
234 234 preparedQuery.updateComplete();
235 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n" 235 SinkTraceCtx(mLogCtx) << "Replayed " << replayResult.replayedEntities << " results.\n"
236 << (replayResult.replayedAll ? "Replayed all available results.\n" : "") 236 << (replayResult.replayedAll ? "Replayed all available results.\n" : "")
237 << "Incremental query took: " << Log::TraceTime(time.elapsed()); 237 << "Incremental query took: " << Log::TraceTime(time.elapsed());
238 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll, preparedQuery.getState()}; 238 return {entityStore.maxRevision(), replayResult.replayedEntities, false, preparedQuery.getState()};
239} 239}
240 240
241template <class DomainType> 241template <class DomainType>
@@ -264,7 +264,7 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
264 return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 264 return DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore};
265 } 265 }
266 }(); 266 }();
267 auto resultSet = preparedQuery.execute();; 267 auto resultSet = preparedQuery.execute();
268 268
269 SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed()); 269 SinkTraceCtx(mLogCtx) << "Filtered set retrieved." << Log::TraceTime(time.elapsed());
270 auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { 270 auto replayResult = resultSet.replaySet(0, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) {