diff options
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 153 |
1 files changed, 86 insertions, 67 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 928e1e0..0ed4cb5 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -69,79 +69,14 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
69 | if (query.limit() && query.sortProperty().isEmpty()) { | 69 | if (query.limit() && query.sortProperty().isEmpty()) { |
70 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; | 70 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; |
71 | } | 71 | } |
72 | auto guardPtr = QPointer<QObject>(&guard); | ||
73 | auto fetcher = [=]() { | ||
74 | SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; | ||
75 | auto resultProvider = mResultProvider; | ||
76 | auto resultTransformation = mResultTransformation; | ||
77 | auto batchSize = mBatchSize; | ||
78 | auto resourceContext = mResourceContext; | ||
79 | auto logCtx = mLogCtx; | ||
80 | auto state = mQueryState; | ||
81 | const bool runAsync = !query.synchronousQuery(); | ||
82 | //The lambda will be executed in a separate thread, so copy all arguments | ||
83 | async::run<ReplayResult>([=]() { | ||
84 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | ||
85 | return worker.executeInitialQuery(query, *resultProvider, batchSize, state); | ||
86 | }, runAsync) | ||
87 | .then([this, query, resultProvider, guardPtr](const ReplayResult &result) { | ||
88 | if (!guardPtr) { | ||
89 | //Not an error, the query can vanish at any time. | ||
90 | return; | ||
91 | } | ||
92 | mInitialQueryComplete = true; | ||
93 | mQueryState = result.queryState; | ||
94 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
95 | if (query.liveQuery()) { | ||
96 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); | ||
97 | } | ||
98 | resultProvider->setRevision(result.newRevision); | ||
99 | resultProvider->initialResultSetComplete(result.replayedAll); | ||
100 | }) | ||
101 | .exec(); | ||
102 | }; | ||
103 | |||
104 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. | 72 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. |
105 | mResultProvider->setFetcher(fetcher); | 73 | mResultProvider->setFetcher([this, query, bufferType] { fetch(query, bufferType); }); |
106 | 74 | ||
107 | // In case of a live query we keep the runner for as long alive as the result provider exists | 75 | // In case of a live query we keep the runner for as long alive as the result provider exists |
108 | if (query.liveQuery()) { | 76 | if (query.liveQuery()) { |
109 | Q_ASSERT(!query.synchronousQuery()); | 77 | Q_ASSERT(!query.synchronousQuery()); |
110 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | 78 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting |
111 | setQuery([=]() -> KAsync::Job<void> { | 79 | setQuery([=]() { return incrementalFetch(query, bufferType); }); |
112 | auto resultProvider = mResultProvider; | ||
113 | auto resourceContext = mResourceContext; | ||
114 | auto logCtx = mLogCtx; | ||
115 | auto state = mQueryState; | ||
116 | auto resultTransformation = mResultTransformation; | ||
117 | if (!mInitialQueryComplete) { | ||
118 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | ||
119 | fetcher(); | ||
120 | return KAsync::null(); | ||
121 | } | ||
122 | if (mQueryInProgress) { | ||
123 | //Can happen if the revision come in quicker than we process them. | ||
124 | return KAsync::null(); | ||
125 | } | ||
126 | Q_ASSERT(!mQueryInProgress); | ||
127 | return KAsync::start([&] { | ||
128 | mQueryInProgress = true; | ||
129 | }) | ||
130 | .then(async::run<ReplayResult>([=]() { | ||
131 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | ||
132 | return worker.executeIncrementalQuery(query, *resultProvider, state); | ||
133 | })) | ||
134 | .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | ||
135 | if (!guardPtr) { | ||
136 | //Not an error, the query can vanish at any time. | ||
137 | return; | ||
138 | } | ||
139 | mQueryInProgress = false; | ||
140 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
141 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | ||
142 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | ||
143 | }); | ||
144 | }); | ||
145 | // Ensure the connection is open, if it wasn't already opened | 80 | // Ensure the connection is open, if it wasn't already opened |
146 | // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates | 81 | // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates |
147 | mResourceAccess->open(); | 82 | mResourceAccess->open(); |
@@ -158,6 +93,90 @@ QueryRunner<DomainType>::~QueryRunner() | |||
158 | SinkTraceCtx(mLogCtx) << "Stopped query"; | 93 | SinkTraceCtx(mLogCtx) << "Stopped query"; |
159 | } | 94 | } |
160 | 95 | ||
96 | //This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize. | ||
97 | template <class DomainType> | ||
98 | void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType) | ||
99 | { | ||
100 | auto guardPtr = QPointer<QObject>(&guard); | ||
101 | SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; | ||
102 | if (mQueryInProgress) { | ||
103 | SinkTraceCtx(mLogCtx) << "Query is already in progress, postponing: " << mBatchSize; | ||
104 | mRequestFetchMore = true; | ||
105 | return; | ||
106 | } | ||
107 | mQueryInProgress = true; | ||
108 | auto resultProvider = mResultProvider; | ||
109 | auto resultTransformation = mResultTransformation; | ||
110 | auto batchSize = mBatchSize; | ||
111 | auto resourceContext = mResourceContext; | ||
112 | auto logCtx = mLogCtx; | ||
113 | auto state = mQueryState; | ||
114 | const bool runAsync = !query.synchronousQuery(); | ||
115 | //The lambda will be executed in a separate thread, so copy all arguments | ||
116 | async::run<ReplayResult>([=]() { | ||
117 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | ||
118 | return worker.executeInitialQuery(query, *resultProvider, batchSize, state); | ||
119 | }, runAsync) | ||
120 | .then([=](const ReplayResult &result) { | ||
121 | if (!guardPtr) { | ||
122 | //Not an error, the query can vanish at any time. | ||
123 | return; | ||
124 | } | ||
125 | mInitialQueryComplete = true; | ||
126 | mQueryInProgress = false; | ||
127 | mQueryState = result.queryState; | ||
128 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
129 | if (query.liveQuery()) { | ||
130 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); | ||
131 | } | ||
132 | resultProvider->setRevision(result.newRevision); | ||
133 | resultProvider->initialResultSetComplete(result.replayedAll); | ||
134 | if (mRequestFetchMore) { | ||
135 | mRequestFetchMore = false; | ||
136 | fetch(query, bufferType); | ||
137 | } | ||
138 | }) | ||
139 | .exec(); | ||
140 | } | ||
141 | |||
142 | template <class DomainType> | ||
143 | KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType) | ||
144 | { | ||
145 | if (!mInitialQueryComplete) { | ||
146 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | ||
147 | fetch(query, bufferType); | ||
148 | return KAsync::null(); | ||
149 | } | ||
150 | if (mQueryInProgress) { | ||
151 | //Can happen if the revision come in quicker than we process them. | ||
152 | return KAsync::null(); | ||
153 | } | ||
154 | auto resultProvider = mResultProvider; | ||
155 | auto resourceContext = mResourceContext; | ||
156 | auto logCtx = mLogCtx; | ||
157 | auto state = mQueryState; | ||
158 | auto resultTransformation = mResultTransformation; | ||
159 | Q_ASSERT(!mQueryInProgress); | ||
160 | auto guardPtr = QPointer<QObject>(&guard); | ||
161 | return KAsync::start([&] { | ||
162 | mQueryInProgress = true; | ||
163 | }) | ||
164 | .then(async::run<ReplayResult>([=]() { | ||
165 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | ||
166 | return worker.executeIncrementalQuery(query, *resultProvider, state); | ||
167 | })) | ||
168 | .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) { | ||
169 | if (!guardPtr) { | ||
170 | //Not an error, the query can vanish at any time. | ||
171 | return; | ||
172 | } | ||
173 | mQueryInProgress = false; | ||
174 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
175 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision); | ||
176 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | ||
177 | }); | ||
178 | } | ||
179 | |||
161 | template <class DomainType> | 180 | template <class DomainType> |
162 | void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation) | 181 | void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation) |
163 | { | 182 | { |