diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-12-20 16:02:38 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-12-20 16:02:38 +0100 |
commit | 1864024012213dc0a17c76e9755bf50a19944ec7 (patch) | |
tree | 06690c35c05c29deec30a3dd412d4043e2c956a3 /common/queryrunner.cpp | |
parent | 6625bb5a2145008ad47ae963e1546714b7342bf0 (diff) | |
download | sink-1864024012213dc0a17c76e9755bf50a19944ec7.tar.gz sink-1864024012213dc0a17c76e9755bf50a19944ec7.zip |
Report when we don't have any more to fetch.
... so we can use that information in fetchMore.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 52 |
1 files changed, 30 insertions, 22 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index d6a90de..377e3b9 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -31,6 +31,12 @@ SINK_DEBUG_AREA("queryrunner") | |||
31 | using namespace Sink; | 31 | using namespace Sink; |
32 | using namespace Sink::Storage; | 32 | using namespace Sink::Storage; |
33 | 33 | ||
34 | struct ReplayResult { | ||
35 | qint64 newRevision; | ||
36 | qint64 replayedEntities; | ||
37 | bool replayedAll; | ||
38 | }; | ||
39 | |||
34 | /* | 40 | /* |
35 | * This class wraps the actual query implementation. | 41 | * This class wraps the actual query implementation. |
36 | * | 42 | * |
@@ -47,8 +53,8 @@ public: | |||
47 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); | 53 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); |
48 | virtual ~QueryWorker(); | 54 | virtual ~QueryWorker(); |
49 | 55 | ||
50 | QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 56 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); |
51 | QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); | 57 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); |
52 | 58 | ||
53 | private: | 59 | private: |
54 | void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); | 60 | void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); |
@@ -64,7 +70,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
64 | { | 70 | { |
65 | SinkTrace() << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit(); | 71 | SinkTrace() << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit(); |
66 | if (query.limit() && query.sortProperty().isEmpty()) { | 72 | if (query.limit() && query.sortProperty().isEmpty()) { |
67 | SinkWarning() << "A limited query without sorting is typically a bad idea."; | 73 | SinkWarning() << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; |
68 | } | 74 | } |
69 | auto guardPtr = QPointer<QObject>(&guard); | 75 | auto guardPtr = QPointer<QObject>(&guard); |
70 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. | 76 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. |
@@ -74,31 +80,33 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
74 | auto resultProvider = mResultProvider; | 80 | auto resultProvider = mResultProvider; |
75 | if (query.synchronousQuery()) { | 81 | if (query.synchronousQuery()) { |
76 | QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation); | 82 | QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation); |
77 | worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); | 83 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); |
78 | resultProvider->initialResultSetComplete(parent); | 84 | mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; |
85 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | ||
86 | resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); | ||
79 | } else { | 87 | } else { |
80 | auto resultTransformation = mResultTransformation; | 88 | auto resultTransformation = mResultTransformation; |
81 | auto offset = mOffset[parentId]; | 89 | auto offset = mOffset[parentId]; |
82 | auto batchSize = mBatchSize; | 90 | auto batchSize = mBatchSize; |
83 | auto resourceContext = mResourceContext; | 91 | auto resourceContext = mResourceContext; |
84 | //The lambda will be executed in a separate thread, so we're extra careful | 92 | //The lambda will be executed in a separate thread, so copy all arguments |
85 | async::run<QPair<qint64, qint64> >([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() { | 93 | async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() { |
86 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation); | 94 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation); |
87 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); | 95 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); |
88 | return newRevisionAndReplayedEntities; | 96 | return newRevisionAndReplayedEntities; |
89 | }) | 97 | }) |
90 | .template syncThen<void, QPair<qint64, qint64>>([this, parentId, query, parent, resultProvider, guardPtr](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { | 98 | .template syncThen<void, ReplayResult>([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { |
91 | if (!guardPtr) { | 99 | if (!guardPtr) { |
92 | qWarning() << "The parent object is already gone"; | 100 | qWarning() << "The parent object is already gone"; |
93 | return; | 101 | return; |
94 | } | 102 | } |
95 | mOffset[parentId] += newRevisionAndReplayedEntities.second; | 103 | mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; |
96 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 104 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
97 | if (query.liveQuery()) { | 105 | if (query.liveQuery()) { |
98 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); | 106 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); |
99 | } | 107 | } |
100 | resultProvider->setRevision(newRevisionAndReplayedEntities.first); | 108 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); |
101 | resultProvider->initialResultSetComplete(parent); | 109 | resultProvider->initialResultSetComplete(parent, newRevisionAndReplayedEntities.replayedAll); |
102 | }) | 110 | }) |
103 | .exec(); | 111 | .exec(); |
104 | } | 112 | } |
@@ -111,19 +119,19 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
111 | setQuery([=]() -> KAsync::Job<void> { | 119 | setQuery([=]() -> KAsync::Job<void> { |
112 | auto resultProvider = mResultProvider; | 120 | auto resultProvider = mResultProvider; |
113 | auto resourceContext = mResourceContext; | 121 | auto resourceContext = mResourceContext; |
114 | return async::run<QPair<qint64, qint64> >([=]() { | 122 | return async::run<ReplayResult>([=]() { |
115 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation); | 123 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation); |
116 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); | 124 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); |
117 | return newRevisionAndReplayedEntities; | 125 | return newRevisionAndReplayedEntities; |
118 | }) | 126 | }) |
119 | .template syncThen<void, QPair<qint64, qint64> >([query, this, resultProvider, guardPtr](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { | 127 | .template syncThen<void, ReplayResult>([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { |
120 | if (!guardPtr) { | 128 | if (!guardPtr) { |
121 | qWarning() << "The parent object is already gone"; | 129 | qWarning() << "The parent object is already gone"; |
122 | return; | 130 | return; |
123 | } | 131 | } |
124 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 132 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
125 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); | 133 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); |
126 | resultProvider->setRevision(newRevisionAndReplayedEntities.first); | 134 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); |
127 | }); | 135 | }); |
128 | }); | 136 | }); |
129 | // Ensure the connection is open, if it wasn't already opened | 137 | // Ensure the connection is open, if it wasn't already opened |
@@ -195,7 +203,7 @@ void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, S | |||
195 | } | 203 | } |
196 | 204 | ||
197 | template <class DomainType> | 205 | template <class DomainType> |
198 | QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 206 | ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) |
199 | { | 207 | { |
200 | QTime time; | 208 | QTime time; |
201 | time.start(); | 209 | time.start(); |
@@ -205,16 +213,16 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sin | |||
205 | auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | 213 | auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; |
206 | auto resultSet = preparedQuery.update(baseRevision); | 214 | auto resultSet = preparedQuery.update(baseRevision); |
207 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 215 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
208 | auto replayedEntities = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { | 216 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { |
209 | resultProviderCallback(query, resultProvider, result); | 217 | resultProviderCallback(query, resultProvider, result); |
210 | }); | 218 | }); |
211 | 219 | ||
212 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); | 220 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
213 | return qMakePair(entityStore.maxRevision(), replayedEntities); | 221 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; |
214 | } | 222 | } |
215 | 223 | ||
216 | template <class DomainType> | 224 | template <class DomainType> |
217 | QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery( | 225 | ReplayResult QueryWorker<DomainType>::executeInitialQuery( |
218 | const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) | 226 | const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) |
219 | { | 227 | { |
220 | QTime time; | 228 | QTime time; |
@@ -236,12 +244,12 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery( | |||
236 | auto resultSet = preparedQuery.execute(); | 244 | auto resultSet = preparedQuery.execute(); |
237 | 245 | ||
238 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 246 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
239 | auto replayedEntities = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { | 247 | auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { |
240 | resultProviderCallback(query, resultProvider, result); | 248 | resultProviderCallback(query, resultProvider, result); |
241 | }); | 249 | }); |
242 | 250 | ||
243 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | 251 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); |
244 | return qMakePair(entityStore.maxRevision(), replayedEntities); | 252 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; |
245 | } | 253 | } |
246 | 254 | ||
247 | template class QueryRunner<Sink::ApplicationDomain::Folder>; | 255 | template class QueryRunner<Sink::ApplicationDomain::Folder>; |