diff options
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 83 |
1 files changed, 31 insertions, 52 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index e7963a3..4422229 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -52,12 +52,11 @@ public: | |||
52 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); | 52 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); |
53 | virtual ~QueryWorker(); | 53 | virtual ~QueryWorker(); |
54 | 54 | ||
55 | qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); | ||
56 | QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 55 | QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); |
57 | QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); | 56 | QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); |
58 | 57 | ||
59 | private: | 58 | private: |
60 | std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap<QByteArray, QVariant> &)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 59 | void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); |
61 | 60 | ||
62 | QueryRunnerBase::ResultTransformation mResultTransformation; | 61 | QueryRunnerBase::ResultTransformation mResultTransformation; |
63 | ResourceContext mResourceContext; | 62 | ResourceContext mResourceContext; |
@@ -175,55 +174,32 @@ QueryWorker<DomainType>::~QueryWorker() | |||
175 | } | 174 | } |
176 | 175 | ||
177 | template <class DomainType> | 176 | template <class DomainType> |
178 | std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap<QByteArray, QVariant> &)> QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 177 | void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result) |
179 | { | 178 | { |
180 | return [this, &query, &resultProvider](const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap<QByteArray, QVariant> &aggregateValues) -> bool { | 179 | auto adaptor = mResourceContext.adaptorFactory<DomainType>().createAdaptor(result.buffer.entity()); |
181 | auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*domainObject, query.requestedProperties).template staticCast<DomainType>(); | 180 | Q_ASSERT(adaptor); |
182 | for (auto it = aggregateValues.constBegin(); it != aggregateValues.constEnd(); it++) { | 181 | auto domainObject = DomainType{mResourceContext.instanceId(), result.uid, result.buffer.revision(), adaptor}; |
183 | valueCopy->setProperty(it.key(), it.value()); | 182 | auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(domainObject, query.requestedProperties).template staticCast<DomainType>(); |
184 | } | 183 | for (auto it = result.aggregateValues.constBegin(); it != result.aggregateValues.constEnd(); it++) { |
185 | if (mResultTransformation) { | 184 | valueCopy->setProperty(it.key(), it.value()); |
186 | mResultTransformation(*valueCopy); | 185 | } |
187 | } | 186 | if (mResultTransformation) { |
188 | switch (operation) { | 187 | mResultTransformation(*valueCopy); |
189 | case Sink::Operation_Creation: | 188 | } |
190 | // SinkTrace() << "Got creation"; | 189 | switch (result.operation) { |
191 | resultProvider.add(valueCopy); | 190 | case Sink::Operation_Creation: |
192 | break; | 191 | // SinkTrace() << "Got creation"; |
193 | case Sink::Operation_Modification: | 192 | resultProvider.add(valueCopy); |
194 | // SinkTrace() << "Got modification"; | ||
195 | resultProvider.modify(valueCopy); | ||
196 | break; | ||
197 | case Sink::Operation_Removal: | ||
198 | // SinkTrace() << "Got removal"; | ||
199 | resultProvider.remove(valueCopy); | ||
200 | break; | ||
201 | } | ||
202 | return true; | ||
203 | }; | ||
204 | } | ||
205 | |||
206 | template <class DomainType> | ||
207 | qint64 QueryWorker<DomainType>::replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback) | ||
208 | { | ||
209 | SinkTrace() << "Skipping over " << offset << " results"; | ||
210 | resultSet.skip(offset); | ||
211 | int counter = 0; | ||
212 | while (!batchSize || (counter < batchSize)) { | ||
213 | const bool ret = | ||
214 | resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { | ||
215 | counter++; | ||
216 | auto adaptor = mResourceContext.adaptorFactory<DomainType>().createAdaptor(result.buffer.entity()); | ||
217 | Q_ASSERT(adaptor); | ||
218 | return callback(QSharedPointer<DomainType>::create(mResourceContext.instanceId(), result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); | ||
219 | }); | ||
220 | if (!ret) { | ||
221 | break; | 193 | break; |
222 | } | 194 | case Sink::Operation_Modification: |
223 | }; | 195 | // SinkTrace() << "Got modification"; |
224 | SinkTrace() << "Replayed " << counter << " results." | 196 | resultProvider.modify(valueCopy); |
225 | << "Limit " << batchSize; | 197 | break; |
226 | return counter; | 198 | case Sink::Operation_Removal: |
199 | // SinkTrace() << "Got removal"; | ||
200 | resultProvider.remove(valueCopy); | ||
201 | break; | ||
202 | } | ||
227 | } | 203 | } |
228 | 204 | ||
229 | template <class DomainType> | 205 | template <class DomainType> |
@@ -238,9 +214,10 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sin | |||
238 | 214 | ||
239 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, entityStore); | 215 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, entityStore); |
240 | auto resultSet = preparedQuery->update(baseRevision); | 216 | auto resultSet = preparedQuery->update(baseRevision); |
241 | |||
242 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 217 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
243 | auto replayedEntities = replaySet(resultSet, 0, 0, resultProviderCallback(query, resultProvider)); | 218 | auto replayedEntities = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { |
219 | resultProviderCallback(query, resultProvider, result); | ||
220 | }); | ||
244 | 221 | ||
245 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); | 222 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
246 | return qMakePair(entityStore->maxRevision(), replayedEntities); | 223 | return qMakePair(entityStore->maxRevision(), replayedEntities); |
@@ -270,7 +247,9 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery( | |||
270 | auto resultSet = preparedQuery->execute(); | 247 | auto resultSet = preparedQuery->execute(); |
271 | 248 | ||
272 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 249 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
273 | auto replayedEntities = replaySet(resultSet, offset, batchsize, resultProviderCallback(query, resultProvider)); | 250 | auto replayedEntities = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { |
251 | resultProviderCallback(query, resultProvider, result); | ||
252 | }); | ||
274 | 253 | ||
275 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | 254 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); |
276 | return qMakePair(entityStore->maxRevision(), replayedEntities); | 255 | return qMakePair(entityStore->maxRevision(), replayedEntities); |