summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp83
1 files changed, 31 insertions, 52 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index e7963a3..4422229 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -52,12 +52,11 @@ public:
52 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); 52 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation);
53 virtual ~QueryWorker(); 53 virtual ~QueryWorker();
54 54
55 qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback);
56 QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 55 QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
57 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); 56 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
58 57
59private: 58private:
60 std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap<QByteArray, QVariant> &)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 59 void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result);
61 60
62 QueryRunnerBase::ResultTransformation mResultTransformation; 61 QueryRunnerBase::ResultTransformation mResultTransformation;
63 ResourceContext mResourceContext; 62 ResourceContext mResourceContext;
@@ -175,55 +174,32 @@ QueryWorker<DomainType>::~QueryWorker()
175} 174}
176 175
177template <class DomainType> 176template <class DomainType>
178std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap<QByteArray, QVariant> &)> QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 177void QueryWorker<DomainType>::resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result)
179{ 178{
180 return [this, &query, &resultProvider](const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap<QByteArray, QVariant> &aggregateValues) -> bool { 179 auto adaptor = mResourceContext.adaptorFactory<DomainType>().createAdaptor(result.buffer.entity());
181 auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*domainObject, query.requestedProperties).template staticCast<DomainType>(); 180 Q_ASSERT(adaptor);
182 for (auto it = aggregateValues.constBegin(); it != aggregateValues.constEnd(); it++) { 181 auto domainObject = DomainType{mResourceContext.instanceId(), result.uid, result.buffer.revision(), adaptor};
183 valueCopy->setProperty(it.key(), it.value()); 182 auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(domainObject, query.requestedProperties).template staticCast<DomainType>();
184 } 183 for (auto it = result.aggregateValues.constBegin(); it != result.aggregateValues.constEnd(); it++) {
185 if (mResultTransformation) { 184 valueCopy->setProperty(it.key(), it.value());
186 mResultTransformation(*valueCopy); 185 }
187 } 186 if (mResultTransformation) {
188 switch (operation) { 187 mResultTransformation(*valueCopy);
189 case Sink::Operation_Creation: 188 }
190 // SinkTrace() << "Got creation"; 189 switch (result.operation) {
191 resultProvider.add(valueCopy); 190 case Sink::Operation_Creation:
192 break; 191 // SinkTrace() << "Got creation";
193 case Sink::Operation_Modification: 192 resultProvider.add(valueCopy);
194 // SinkTrace() << "Got modification";
195 resultProvider.modify(valueCopy);
196 break;
197 case Sink::Operation_Removal:
198 // SinkTrace() << "Got removal";
199 resultProvider.remove(valueCopy);
200 break;
201 }
202 return true;
203 };
204}
205
206template <class DomainType>
207qint64 QueryWorker<DomainType>::replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback)
208{
209 SinkTrace() << "Skipping over " << offset << " results";
210 resultSet.skip(offset);
211 int counter = 0;
212 while (!batchSize || (counter < batchSize)) {
213 const bool ret =
214 resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool {
215 counter++;
216 auto adaptor = mResourceContext.adaptorFactory<DomainType>().createAdaptor(result.buffer.entity());
217 Q_ASSERT(adaptor);
218 return callback(QSharedPointer<DomainType>::create(mResourceContext.instanceId(), result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues);
219 });
220 if (!ret) {
221 break; 193 break;
222 } 194 case Sink::Operation_Modification:
223 }; 195 // SinkTrace() << "Got modification";
224 SinkTrace() << "Replayed " << counter << " results." 196 resultProvider.modify(valueCopy);
225 << "Limit " << batchSize; 197 break;
226 return counter; 198 case Sink::Operation_Removal:
199 // SinkTrace() << "Got removal";
200 resultProvider.remove(valueCopy);
201 break;
202 }
227} 203}
228 204
229template <class DomainType> 205template <class DomainType>
@@ -238,9 +214,10 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sin
238 214
239 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, entityStore); 215 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, entityStore);
240 auto resultSet = preparedQuery->update(baseRevision); 216 auto resultSet = preparedQuery->update(baseRevision);
241
242 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 217 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
243 auto replayedEntities = replaySet(resultSet, 0, 0, resultProviderCallback(query, resultProvider)); 218 auto replayedEntities = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) {
219 resultProviderCallback(query, resultProvider, result);
220 });
244 221
245 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); 222 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
246 return qMakePair(entityStore->maxRevision(), replayedEntities); 223 return qMakePair(entityStore->maxRevision(), replayedEntities);
@@ -270,7 +247,9 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
270 auto resultSet = preparedQuery->execute(); 247 auto resultSet = preparedQuery->execute();
271 248
272 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 249 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
273 auto replayedEntities = replaySet(resultSet, offset, batchsize, resultProviderCallback(query, resultProvider)); 250 auto replayedEntities = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) {
251 resultProviderCallback(query, resultProvider, result);
252 });
274 253
275 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 254 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
276 return qMakePair(entityStore->maxRevision(), replayedEntities); 255 return qMakePair(entityStore->maxRevision(), replayedEntities);