summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp67
1 files changed, 45 insertions, 22 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index d4ace86..1f645e8 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -45,10 +45,10 @@ public:
45 virtual ~QueryWorker(); 45 virtual ~QueryWorker();
46 46
47 qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 47 qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
48 qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 48 qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
49 49
50private: 50private:
51 void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties); 51 void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize);
52 52
53 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); 53 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback);
54 54
@@ -57,7 +57,7 @@ private:
57 57
58 ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); 58 ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty);
59 std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query); 59 std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query);
60 qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery); 60 qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize);
61 61
62private: 62private:
63 QueryRunnerBase::ResultTransformation mResultTransformation; 63 QueryRunnerBase::ResultTransformation mResultTransformation;
@@ -72,7 +72,9 @@ template<class DomainType>
72QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) 72QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType)
73 : QueryRunnerBase(), 73 : QueryRunnerBase(),
74 mResourceAccess(resourceAccess), 74 mResourceAccess(resourceAccess),
75 mResultProvider(new ResultProvider<typename DomainType::Ptr>) 75 mResultProvider(new ResultProvider<typename DomainType::Ptr>),
76 mOffset(0),
77 mBatchSize(0)
76{ 78{
77 Trace() << "Starting query"; 79 Trace() << "Starting query";
78 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. 80 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
@@ -81,7 +83,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
81 auto resultProvider = mResultProvider; 83 auto resultProvider = mResultProvider;
82 async::run<qint64>([=]() -> qint64 { 84 async::run<qint64>([=]() -> qint64 {
83 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 85 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
84 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); 86 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize);
85 return newRevision; 87 return newRevision;
86 }) 88 })
87 .template then<void, qint64>([query, this](qint64 newRevision) { 89 .template then<void, qint64>([query, this](qint64 newRevision) {
@@ -150,7 +152,6 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction,
150 Warning() << "Error during query: " << error.message; 152 Warning() << "Error during query: " << error.message;
151 }); 153 });
152 154
153 Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results";
154 return ResultSet(keys); 155 return ResultSet(keys);
155} 156}
156 157
@@ -174,15 +175,19 @@ QueryWorker<DomainType>::~QueryWorker()
174} 175}
175 176
176template<class DomainType> 177template<class DomainType>
177void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties) 178void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize)
178{ 179{
179 int counter = 0; 180 int counter = 0;
180 while (resultSet.next([this, &resultProvider, &counter, &properties](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { 181 resultSet.skip(offset);
182 while (resultSet.next([this, &resultProvider, &counter, &properties, batchSize](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool {
181 //FIXME allow maildir resource to set the mimeMessage property 183 //FIXME allow maildir resource to set the mimeMessage property
182 auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>(); 184 auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>();
183 if (mResultTransformation) { 185 if (mResultTransformation) {
184 mResultTransformation(*valueCopy); 186 mResultTransformation(*valueCopy);
185 } 187 }
188 if (batchSize && counter >= batchSize) {
189 return false;
190 }
186 counter++; 191 counter++;
187 switch (operation) { 192 switch (operation) {
188 case Sink::Operation_Creation: 193 case Sink::Operation_Creation:
@@ -217,7 +222,8 @@ void QueryWorker<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db,
217 Q_ASSERT(metadataBuffer); 222 Q_ASSERT(metadataBuffer);
218 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; 223 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
219 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; 224 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
220 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), operation); 225 auto adaptor = mDomainTypeAdaptorFactory->createAdaptor(entity);
226 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation);
221 return false; 227 return false;
222 }, 228 },
223 [](const Sink::Storage::Error &error) { 229 [](const Sink::Storage::Error &error) {
@@ -274,9 +280,10 @@ ResultSet QueryWorker<DomainType>::loadIncrementalResultSet(qint64 baseRevision,
274template<class DomainType> 280template<class DomainType>
275ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) 281ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty)
276{ 282{
277 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); 283 bool sortingRequired = false;
278 284 if (initialQuery && sortingRequired) {
279 if (initialQuery) { 285 //Sort the complete set by reading the sort property and filling into a sorted map
286 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
280 while (resultSet.next()) { 287 while (resultSet.next()) {
281 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) 288 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
282 readEntity(db, resultSet.id(), [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { 289 readEntity(db, resultSet.id(), [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
@@ -301,23 +308,39 @@ ResultSet QueryWorker<DomainType>::filterAndSortSet(ResultSet &resultSet, const
301 } 308 }
302 return false; 309 return false;
303 }; 310 };
304 return ResultSet(generator); 311
312 auto skip = [iterator]() {
313 iterator->next();
314 };
315 return ResultSet(generator, skip);
305 } else { 316 } else {
306 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); 317 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
307 ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { 318 ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
308 if (resultSetPtr->next()) { 319 if (resultSetPtr->next()) {
309 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) 320 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
310 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { 321 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
311 //Always remove removals, they probably don't match due to non-available properties 322 if (initialQuery) {
312 if ((operation == Sink::Operation_Removal) || filter(domainObject)) { 323 //We're not interested in removals during the initial query
313 callback(domainObject, operation); 324 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
325 //In the initial set every entity is new
326 callback(domainObject, Sink::Operation_Creation);
327 }
328 } else {
329 //Always remove removals, they probably don't match due to non-available properties
330 if ((operation == Sink::Operation_Removal) || filter(domainObject)) {
331 //TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
332 callback(domainObject, operation);
333 }
314 } 334 }
315 }); 335 });
316 return true; 336 return true;
317 } 337 }
318 return false; 338 return false;
319 }; 339 };
320 return ResultSet(generator); 340 auto skip = [resultSetPtr]() {
341 resultSetPtr->skip(1);
342 };
343 return ResultSet(generator, skip);
321 } 344 }
322} 345}
323 346
@@ -347,7 +370,7 @@ std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &do
347} 370}
348 371
349template<class DomainType> 372template<class DomainType>
350qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery) 373qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize)
351{ 374{
352 QTime time; 375 QTime time;
353 time.start(); 376 time.start();
@@ -364,7 +387,7 @@ qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::functi
364 Trace() << "Base set retrieved. " << time.elapsed(); 387 Trace() << "Base set retrieved. " << time.elapsed();
365 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, query.sortProperty); 388 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, query.sortProperty);
366 Trace() << "Filtered set retrieved. " << time.elapsed(); 389 Trace() << "Filtered set retrieved. " << time.elapsed();
367 replaySet(filteredSet, resultProvider, query.requestedProperties); 390 replaySet(filteredSet, resultProvider, query.requestedProperties, offset, batchSize);
368 Trace() << "Filtered set replayed. " << time.elapsed(); 391 Trace() << "Filtered set replayed. " << time.elapsed();
369 resultProvider.setRevision(Sink::Storage::maxRevision(transaction)); 392 resultProvider.setRevision(Sink::Storage::maxRevision(transaction));
370 return Sink::Storage::maxRevision(transaction); 393 return Sink::Storage::maxRevision(transaction);
@@ -380,13 +403,13 @@ qint64 QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query
380 Trace() << "Running incremental query " << baseRevision; 403 Trace() << "Running incremental query " << baseRevision;
381 auto revision = load(query, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { 404 auto revision = load(query, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
382 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); 405 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
383 }, resultProvider, false); 406 }, resultProvider, false, 0, 0);
384 Trace() << "Incremental query took: " << time.elapsed() << " ms"; 407 Trace() << "Incremental query took: " << time.elapsed() << " ms";
385 return revision; 408 return revision;
386} 409}
387 410
388template<class DomainType> 411template<class DomainType>
389qint64 QueryWorker<DomainType>::executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 412qint64 QueryWorker<DomainType>::executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize)
390{ 413{
391 QTime time; 414 QTime time;
392 time.start(); 415 time.start();
@@ -403,7 +426,7 @@ qint64 QueryWorker<DomainType>::executeInitialQuery(const Sink::Query &query, co
403 } 426 }
404 auto revision = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { 427 auto revision = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
405 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); 428 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters);
406 }, resultProvider, true); 429 }, resultProvider, true, offset, batchsize);
407 Trace() << "Initial query took: " << time.elapsed() << " ms"; 430 Trace() << "Initial query took: " << time.elapsed() << " ms";
408 resultProvider.initialResultSetComplete(parent); 431 resultProvider.initialResultSetComplete(parent);
409 return revision; 432 return revision;