diff options
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 67 |
1 files changed, 45 insertions, 22 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index d4ace86..1f645e8 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -45,10 +45,10 @@ public: | |||
45 | virtual ~QueryWorker(); | 45 | virtual ~QueryWorker(); |
46 | 46 | ||
47 | qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 47 | qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); |
48 | qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 48 | qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); |
49 | 49 | ||
50 | private: | 50 | private: |
51 | void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties); | 51 | void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize); |
52 | 52 | ||
53 | void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); | 53 | void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); |
54 | 54 | ||
@@ -57,7 +57,7 @@ private: | |||
57 | 57 | ||
58 | ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); | 58 | ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); |
59 | std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query); | 59 | std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query); |
60 | qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery); | 60 | qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize); |
61 | 61 | ||
62 | private: | 62 | private: |
63 | QueryRunnerBase::ResultTransformation mResultTransformation; | 63 | QueryRunnerBase::ResultTransformation mResultTransformation; |
@@ -72,7 +72,9 @@ template<class DomainType> | |||
72 | QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) | 72 | QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) |
73 | : QueryRunnerBase(), | 73 | : QueryRunnerBase(), |
74 | mResourceAccess(resourceAccess), | 74 | mResourceAccess(resourceAccess), |
75 | mResultProvider(new ResultProvider<typename DomainType::Ptr>) | 75 | mResultProvider(new ResultProvider<typename DomainType::Ptr>), |
76 | mOffset(0), | ||
77 | mBatchSize(0) | ||
76 | { | 78 | { |
77 | Trace() << "Starting query"; | 79 | Trace() << "Starting query"; |
78 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. | 80 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. |
@@ -81,7 +83,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
81 | auto resultProvider = mResultProvider; | 83 | auto resultProvider = mResultProvider; |
82 | async::run<qint64>([=]() -> qint64 { | 84 | async::run<qint64>([=]() -> qint64 { |
83 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); | 85 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); |
84 | const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); | 86 | const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize); |
85 | return newRevision; | 87 | return newRevision; |
86 | }) | 88 | }) |
87 | .template then<void, qint64>([query, this](qint64 newRevision) { | 89 | .template then<void, qint64>([query, this](qint64 newRevision) { |
@@ -150,7 +152,6 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, | |||
150 | Warning() << "Error during query: " << error.message; | 152 | Warning() << "Error during query: " << error.message; |
151 | }); | 153 | }); |
152 | 154 | ||
153 | Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results"; | ||
154 | return ResultSet(keys); | 155 | return ResultSet(keys); |
155 | } | 156 | } |
156 | 157 | ||
@@ -174,15 +175,19 @@ QueryWorker<DomainType>::~QueryWorker() | |||
174 | } | 175 | } |
175 | 176 | ||
176 | template<class DomainType> | 177 | template<class DomainType> |
177 | void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties) | 178 | void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize) |
178 | { | 179 | { |
179 | int counter = 0; | 180 | int counter = 0; |
180 | while (resultSet.next([this, &resultProvider, &counter, &properties](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { | 181 | resultSet.skip(offset); |
182 | while (resultSet.next([this, &resultProvider, &counter, &properties, batchSize](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { | ||
181 | //FIXME allow maildir resource to set the mimeMessage property | 183 | //FIXME allow maildir resource to set the mimeMessage property |
182 | auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>(); | 184 | auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>(); |
183 | if (mResultTransformation) { | 185 | if (mResultTransformation) { |
184 | mResultTransformation(*valueCopy); | 186 | mResultTransformation(*valueCopy); |
185 | } | 187 | } |
188 | if (batchSize && counter >= batchSize) { | ||
189 | return false; | ||
190 | } | ||
186 | counter++; | 191 | counter++; |
187 | switch (operation) { | 192 | switch (operation) { |
188 | case Sink::Operation_Creation: | 193 | case Sink::Operation_Creation: |
@@ -217,7 +222,8 @@ void QueryWorker<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, | |||
217 | Q_ASSERT(metadataBuffer); | 222 | Q_ASSERT(metadataBuffer); |
218 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | 223 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; |
219 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | 224 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; |
220 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), operation); | 225 | auto adaptor = mDomainTypeAdaptorFactory->createAdaptor(entity); |
226 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); | ||
221 | return false; | 227 | return false; |
222 | }, | 228 | }, |
223 | [](const Sink::Storage::Error &error) { | 229 | [](const Sink::Storage::Error &error) { |
@@ -274,9 +280,10 @@ ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision, | |||
274 | template<class DomainType> | 280 | template<class DomainType> |
275 | ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) | 281 | ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) |
276 | { | 282 | { |
277 | auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); | 283 | bool sortingRequired = false; |
278 | 284 | if (initialQuery && sortingRequired) { | |
279 | if (initialQuery) { | 285 | //Sort the complete set by reading the sort property and filling into a sorted map |
286 | auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); | ||
280 | while (resultSet.next()) { | 287 | while (resultSet.next()) { |
281 | //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | 288 | //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) |
282 | readEntity(db, resultSet.id(), [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { | 289 | readEntity(db, resultSet.id(), [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { |
@@ -301,23 +308,39 @@ ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const | |||
301 | } | 308 | } |
302 | return false; | 309 | return false; |
303 | }; | 310 | }; |
304 | return ResultSet(generator); | 311 | |
312 | auto skip = [iterator]() { | ||
313 | iterator->next(); | ||
314 | }; | ||
315 | return ResultSet(generator, skip); | ||
305 | } else { | 316 | } else { |
306 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | 317 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); |
307 | ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { | 318 | ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { |
308 | if (resultSetPtr->next()) { | 319 | if (resultSetPtr->next()) { |
309 | //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | 320 | //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) |
310 | readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { | 321 | readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { |
311 | //Always remove removals, they probably don't match due to non-available properties | 322 | if (initialQuery) { |
312 | if ((operation == Sink::Operation_Removal) || filter(domainObject)) { | 323 | //We're not interested in removals during the initial query |
313 | callback(domainObject, operation); | 324 | if ((operation != Sink::Operation_Removal) && filter(domainObject)) { |
325 | //In the initial set every entity is new | ||
326 | callback(domainObject, Sink::Operation_Creation); | ||
327 | } | ||
328 | } else { | ||
329 | //Always remove removals, they probably don't match due to non-available properties | ||
330 | if ((operation == Sink::Operation_Removal) || filter(domainObject)) { | ||
331 | //TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) | ||
332 | callback(domainObject, operation); | ||
333 | } | ||
314 | } | 334 | } |
315 | }); | 335 | }); |
316 | return true; | 336 | return true; |
317 | } | 337 | } |
318 | return false; | 338 | return false; |
319 | }; | 339 | }; |
320 | return ResultSet(generator); | 340 | auto skip = [resultSetPtr]() { |
341 | resultSetPtr->skip(1); | ||
342 | }; | ||
343 | return ResultSet(generator, skip); | ||
321 | } | 344 | } |
322 | } | 345 | } |
323 | 346 | ||
@@ -347,7 +370,7 @@ std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &do | |||
347 | } | 370 | } |
348 | 371 | ||
349 | template<class DomainType> | 372 | template<class DomainType> |
350 | qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery) | 373 | qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize) |
351 | { | 374 | { |
352 | QTime time; | 375 | QTime time; |
353 | time.start(); | 376 | time.start(); |
@@ -364,7 +387,7 @@ qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::functi | |||
364 | Trace() << "Base set retrieved. " << time.elapsed(); | 387 | Trace() << "Base set retrieved. " << time.elapsed(); |
365 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, query.sortProperty); | 388 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, query.sortProperty); |
366 | Trace() << "Filtered set retrieved. " << time.elapsed(); | 389 | Trace() << "Filtered set retrieved. " << time.elapsed(); |
367 | replaySet(filteredSet, resultProvider, query.requestedProperties); | 390 | replaySet(filteredSet, resultProvider, query.requestedProperties, offset, batchSize); |
368 | Trace() << "Filtered set replayed. " << time.elapsed(); | 391 | Trace() << "Filtered set replayed. " << time.elapsed(); |
369 | resultProvider.setRevision(Sink::Storage::maxRevision(transaction)); | 392 | resultProvider.setRevision(Sink::Storage::maxRevision(transaction)); |
370 | return Sink::Storage::maxRevision(transaction); | 393 | return Sink::Storage::maxRevision(transaction); |
@@ -380,13 +403,13 @@ qint64 QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query | |||
380 | Trace() << "Running incremental query " << baseRevision; | 403 | Trace() << "Running incremental query " << baseRevision; |
381 | auto revision = load(query, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { | 404 | auto revision = load(query, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { |
382 | return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); | 405 | return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); |
383 | }, resultProvider, false); | 406 | }, resultProvider, false, 0, 0); |
384 | Trace() << "Incremental query took: " << time.elapsed() << " ms"; | 407 | Trace() << "Incremental query took: " << time.elapsed() << " ms"; |
385 | return revision; | 408 | return revision; |
386 | } | 409 | } |
387 | 410 | ||
388 | template<class DomainType> | 411 | template<class DomainType> |
389 | qint64 QueryWorker<DomainType>::executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 412 | qint64 QueryWorker<DomainType>::executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) |
390 | { | 413 | { |
391 | QTime time; | 414 | QTime time; |
392 | time.start(); | 415 | time.start(); |
@@ -403,7 +426,7 @@ qint64 QueryWorker<DomainType>::executeInitialQuery(const Sink::Query &query, co | |||
403 | } | 426 | } |
404 | auto revision = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { | 427 | auto revision = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { |
405 | return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); | 428 | return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); |
406 | }, resultProvider, true); | 429 | }, resultProvider, true, offset, batchsize); |
407 | Trace() << "Initial query took: " << time.elapsed() << " ms"; | 430 | Trace() << "Initial query took: " << time.elapsed() << " ms"; |
408 | resultProvider.initialResultSetComplete(parent); | 431 | resultProvider.initialResultSetComplete(parent); |
409 | return revision; | 432 | return revision; |