summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-04-29 15:16:50 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-04-29 15:16:50 +0200
commit78d60438e8f1c962b6933431fe59ac44318d0352 (patch)
treef795e3f9641065d6233708cdbac6794f837ed0db /common/queryrunner.cpp
parent2be6033726b332fa78268989f0dacede4efc59bf (diff)
downloadsink-78d60438e8f1c962b6933431fe59ac44318d0352.tar.gz
sink-78d60438e8f1c962b6933431fe59ac44318d0352.zip
Increase the offset by the actually replayed items.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp53
1 files changed, 27 insertions, 26 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 144c487..d86d26e 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -47,11 +47,11 @@ public:
47 const QueryRunnerBase::ResultTransformation &transformation); 47 const QueryRunnerBase::ResultTransformation &transformation);
48 virtual ~QueryWorker(); 48 virtual ~QueryWorker();
49 49
50 qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 50 QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
51 qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); 51 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
52 52
53private: 53private:
54 void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize); 54 qint64 replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize);
55 55
56 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, 56 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key,
57 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); 57 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback);
@@ -62,7 +62,7 @@ private:
62 ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, 62 ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter,
63 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); 63 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty);
64 std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query); 64 std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query);
65 qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, 65 QPair<qint64, qint64> load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever,
66 Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize); 66 Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize);
67 67
68private: 68private:
@@ -83,20 +83,20 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
83 if (query.limit && query.sortProperty.isEmpty()) { 83 if (query.limit && query.sortProperty.isEmpty()) {
84 Warning() << "A limited query without sorting is typically a bad idea."; 84 Warning() << "A limited query without sorting is typically a bad idea.";
85 } 85 }
86 // We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. 86 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
87 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { 87 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) {
88 Trace() << "Running fetcher. Offset: " << mOffset << " Batchsize: " << mBatchSize; 88 Trace() << "Running fetcher. Offset: " << mOffset << " Batchsize: " << mBatchSize;
89 auto resultProvider = mResultProvider; 89 auto resultProvider = mResultProvider;
90 async::run<qint64>([=]() -> qint64 { 90 async::run<QPair<qint64, qint64> >([=]() {
91 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 91 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
92 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize); 92 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize);
93 return newRevision; 93 return newRevisionAndReplayedEntities;
94 }) 94 })
95 .template then<void, qint64>([query, this](qint64 newRevision) { 95 .template then<void, QPair<qint64, qint64>>([query, this](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) {
96 mOffset += mBatchSize; 96 mOffset += newRevisionAndReplayedEntities.second;
97 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 97 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
98 if (query.liveQuery) { 98 if (query.liveQuery) {
99 mResourceAccess->sendRevisionReplayedCommand(newRevision); 99 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first);
100 } 100 }
101 }) 101 })
102 .exec(); 102 .exec();
@@ -107,14 +107,14 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
107 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 107 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
108 setQuery([=]() -> KAsync::Job<void> { 108 setQuery([=]() -> KAsync::Job<void> {
109 auto resultProvider = mResultProvider; 109 auto resultProvider = mResultProvider;
110 return async::run<qint64>([=]() -> qint64 { 110 return async::run<QPair<qint64, qint64> >([=]() {
111 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 111 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
112 const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); 112 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider);
113 return newRevision; 113 return newRevisionAndReplayedEntities;
114 }) 114 })
115 .template then<void, qint64>([query, this](qint64 newRevision) { 115 .template then<void, QPair<qint64, qint64> >([query, this](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) {
116 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 116 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
117 mResourceAccess->sendRevisionReplayedCommand(newRevision); 117 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first);
118 }); 118 });
119 }); 119 });
120 // Ensure the connection is open, if it wasn't already opened 120 // Ensure the connection is open, if it wasn't already opened
@@ -179,7 +179,7 @@ QueryWorker<DomainType>::~QueryWorker()
179} 179}
180 180
181template <class DomainType> 181template <class DomainType>
182void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize) 182qint64 QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize)
183{ 183{
184 Trace() << "Skipping over " << offset << " results"; 184 Trace() << "Skipping over " << offset << " results";
185 resultSet.skip(offset); 185 resultSet.skip(offset);
@@ -214,6 +214,7 @@ void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProvid
214 }; 214 };
215 Trace() << "Replayed " << counter << " results." 215 Trace() << "Replayed " << counter << " results."
216 << "Limit " << batchSize; 216 << "Limit " << batchSize;
217 return counter;
217} 218}
218 219
219template <class DomainType> 220template <class DomainType>
@@ -394,7 +395,7 @@ QueryWorker<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, cons
394} 395}
395 396
396template <class DomainType> 397template <class DomainType>
397qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, 398QPair<qint64, qint64> QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever,
398 Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize) 399 Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize)
399{ 400{
400 QTime time; 401 QTime time;
@@ -411,29 +412,29 @@ qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::functi
411 Trace() << "Base set retrieved. " << Log::TraceTime(time.elapsed()); 412 Trace() << "Base set retrieved. " << Log::TraceTime(time.elapsed());
412 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting); 413 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting);
413 Trace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 414 Trace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
414 replaySet(filteredSet, resultProvider, query.requestedProperties, offset, batchSize); 415 auto replayedEntities = replaySet(filteredSet, resultProvider, query.requestedProperties, offset, batchSize);
415 Trace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed()); 416 Trace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed());
416 resultProvider.setRevision(Sink::Storage::maxRevision(transaction)); 417 resultProvider.setRevision(Sink::Storage::maxRevision(transaction));
417 return Sink::Storage::maxRevision(transaction); 418 return qMakePair(Sink::Storage::maxRevision(transaction), replayedEntities);
418} 419}
419 420
420template <class DomainType> 421template <class DomainType>
421qint64 QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 422QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
422{ 423{
423 QTime time; 424 QTime time;
424 time.start(); 425 time.start();
425 426
426 const qint64 baseRevision = resultProvider.revision() + 1; 427 const qint64 baseRevision = resultProvider.revision() + 1;
427 Trace() << "Running incremental query " << baseRevision; 428 Trace() << "Running incremental query " << baseRevision;
428 auto revision = load(query, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { 429 auto revisionAndReplayedEntities = load(query, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
429 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); 430 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
430 }, resultProvider, false, 0, 0); 431 }, resultProvider, false, 0, 0);
431 Trace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); 432 Trace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
432 return revision; 433 return revisionAndReplayedEntities;
433} 434}
434 435
435template <class DomainType> 436template <class DomainType>
436qint64 QueryWorker<DomainType>::executeInitialQuery( 437QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
437 const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) 438 const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize)
438{ 439{
439 QTime time; 440 QTime time;
@@ -449,12 +450,12 @@ qint64 QueryWorker<DomainType>::executeInitialQuery(
449 modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); 450 modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant());
450 } 451 }
451 } 452 }
452 auto revision = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { 453 auto revisionAndReplayedEntities = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
453 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters, remainingSorting); 454 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters, remainingSorting);
454 }, resultProvider, true, offset, batchsize); 455 }, resultProvider, true, offset, batchsize);
455 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 456 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed());
456 resultProvider.initialResultSetComplete(parent); 457 resultProvider.initialResultSetComplete(parent);
457 return revision; 458 return revisionAndReplayedEntities;
458} 459}
459 460
460template class QueryRunner<Sink::ApplicationDomain::Folder>; 461template class QueryRunner<Sink::ApplicationDomain::Folder>;