summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-12-20 16:02:38 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-12-20 16:02:38 +0100
commit1864024012213dc0a17c76e9755bf50a19944ec7 (patch)
tree06690c35c05c29deec30a3dd412d4043e2c956a3 /common/queryrunner.cpp
parent6625bb5a2145008ad47ae963e1546714b7342bf0 (diff)
downloadsink-1864024012213dc0a17c76e9755bf50a19944ec7.tar.gz
sink-1864024012213dc0a17c76e9755bf50a19944ec7.zip
Report when we don't have any more to fetch.
... so we can use that information in fetchMore.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp52
1 files changed, 30 insertions, 22 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index d6a90de..377e3b9 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -31,6 +31,12 @@ SINK_DEBUG_AREA("queryrunner")
31using namespace Sink; 31using namespace Sink;
32using namespace Sink::Storage; 32using namespace Sink::Storage;
33 33
34struct ReplayResult {
35 qint64 newRevision;
36 qint64 replayedEntities;
37 bool replayedAll;
38};
39
34/* 40/*
35 * This class wraps the actual query implementation. 41 * This class wraps the actual query implementation.
36 * 42 *
@@ -47,8 +53,8 @@ public:
47 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); 53 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation);
48 virtual ~QueryWorker(); 54 virtual ~QueryWorker();
49 55
50 QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 56 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
51 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); 57 ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
52 58
53private: 59private:
54 void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); 60 void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result);
@@ -64,7 +70,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
64{ 70{
65 SinkTrace() << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit(); 71 SinkTrace() << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit();
66 if (query.limit() && query.sortProperty().isEmpty()) { 72 if (query.limit() && query.sortProperty().isEmpty()) {
67 SinkWarning() << "A limited query without sorting is typically a bad idea."; 73 SinkWarning() << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get.";
68 } 74 }
69 auto guardPtr = QPointer<QObject>(&guard); 75 auto guardPtr = QPointer<QObject>(&guard);
70 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 76 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
@@ -74,31 +80,33 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
74 auto resultProvider = mResultProvider; 80 auto resultProvider = mResultProvider;
75 if (query.synchronousQuery()) { 81 if (query.synchronousQuery()) {
76 QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation); 82 QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation);
77 worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); 83 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize);
78 resultProvider->initialResultSetComplete(parent); 84 mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities;
85 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
86 resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll);
79 } else { 87 } else {
80 auto resultTransformation = mResultTransformation; 88 auto resultTransformation = mResultTransformation;
81 auto offset = mOffset[parentId]; 89 auto offset = mOffset[parentId];
82 auto batchSize = mBatchSize; 90 auto batchSize = mBatchSize;
83 auto resourceContext = mResourceContext; 91 auto resourceContext = mResourceContext;
84 //The lambda will be executed in a separate thread, so we're extra careful 92 //The lambda will be executed in a separate thread, so copy all arguments
85 async::run<QPair<qint64, qint64> >([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() { 93 async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() {
86 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation); 94 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation);
87 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); 95 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize);
88 return newRevisionAndReplayedEntities; 96 return newRevisionAndReplayedEntities;
89 }) 97 })
90 .template syncThen<void, QPair<qint64, qint64>>([this, parentId, query, parent, resultProvider, guardPtr](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { 98 .template syncThen<void, ReplayResult>([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
91 if (!guardPtr) { 99 if (!guardPtr) {
92 qWarning() << "The parent object is already gone"; 100 qWarning() << "The parent object is already gone";
93 return; 101 return;
94 } 102 }
95 mOffset[parentId] += newRevisionAndReplayedEntities.second; 103 mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities;
96 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 104 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
97 if (query.liveQuery()) { 105 if (query.liveQuery()) {
98 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); 106 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
99 } 107 }
100 resultProvider->setRevision(newRevisionAndReplayedEntities.first); 108 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
101 resultProvider->initialResultSetComplete(parent); 109 resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll);
102 }) 110 })
103 .exec(); 111 .exec();
104 } 112 }
@@ -111,19 +119,19 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
111 setQuery([=]() -> KAsync::Job<void> { 119 setQuery([=]() -> KAsync::Job<void> {
112 auto resultProvider = mResultProvider; 120 auto resultProvider = mResultProvider;
113 auto resourceContext = mResourceContext; 121 auto resourceContext = mResourceContext;
114 return async::run<QPair<qint64, qint64> >([=]() { 122 return async::run<ReplayResult>([=]() {
115 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation); 123 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation);
116 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); 124 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider);
117 return newRevisionAndReplayedEntities; 125 return newRevisionAndReplayedEntities;
118 }) 126 })
119 .template syncThen<void, QPair<qint64, qint64> >([query, this, resultProvider, guardPtr](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { 127 .template syncThen<void, ReplayResult>([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
120 if (!guardPtr) { 128 if (!guardPtr) {
121 qWarning() << "The parent object is already gone"; 129 qWarning() << "The parent object is already gone";
122 return; 130 return;
123 } 131 }
124 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 132 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
125 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); 133 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
126 resultProvider->setRevision(newRevisionAndReplayedEntities.first); 134 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
127 }); 135 });
128 }); 136 });
129 // Ensure the connection is open, if it wasn't already opened 137 // Ensure the connection is open, if it wasn't already opened
@@ -195,7 +203,7 @@ void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, S
195} 203}
196 204
197template <class DomainType> 205template <class DomainType>
198QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 206ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
199{ 207{
200 QTime time; 208 QTime time;
201 time.start(); 209 time.start();
@@ -205,16 +213,16 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sin
205 auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 213 auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore};
206 auto resultSet = preparedQuery.update(baseRevision); 214 auto resultSet = preparedQuery.update(baseRevision);
207 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 215 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
208 auto replayedEntities = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { 216 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) {
209 resultProviderCallback(query, resultProvider, result); 217 resultProviderCallback(query, resultProvider, result);
210 }); 218 });
211 219
212 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); 220 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
213 return qMakePair(entityStore.maxRevision(), replayedEntities); 221 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll};
214} 222}
215 223
216template <class DomainType> 224template <class DomainType>
217QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery( 225ReplayResult QueryWorker<DomainType>::executeInitialQuery(
218 const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) 226 const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize)
219{ 227{
220 QTime time; 228 QTime time;
@@ -236,12 +244,12 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
236 auto resultSet = preparedQuery.execute(); 244 auto resultSet = preparedQuery.execute();
237 245
238 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 246 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
239 auto replayedEntities = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { 247 auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) {
240 resultProviderCallback(query, resultProvider, result); 248 resultProviderCallback(query, resultProvider, result);
241 }); 249 });
242 250
243 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 251 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
244 return qMakePair(entityStore.maxRevision(), replayedEntities); 252 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll};
245} 253}
246 254
247template class QueryRunner<Sink::ApplicationDomain::Folder>; 255template class QueryRunner<Sink::ApplicationDomain::Folder>;