diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-04-29 15:16:50 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-04-29 15:16:50 +0200 |
commit | 78d60438e8f1c962b6933431fe59ac44318d0352 (patch) | |
tree | f795e3f9641065d6233708cdbac6794f837ed0db /common/queryrunner.cpp | |
parent | 2be6033726b332fa78268989f0dacede4efc59bf (diff) | |
download | sink-78d60438e8f1c962b6933431fe59ac44318d0352.tar.gz sink-78d60438e8f1c962b6933431fe59ac44318d0352.zip |
Increase the offset by the actually replayed items.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 53 |
1 files changed, 27 insertions, 26 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 144c487..d86d26e 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -47,11 +47,11 @@ public: | |||
47 | const QueryRunnerBase::ResultTransformation &transformation); | 47 | const QueryRunnerBase::ResultTransformation &transformation); |
48 | virtual ~QueryWorker(); | 48 | virtual ~QueryWorker(); |
49 | 49 | ||
50 | qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 50 | QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); |
51 | qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); | 51 | QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); |
52 | 52 | ||
53 | private: | 53 | private: |
54 | void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize); | 54 | qint64 replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize); |
55 | 55 | ||
56 | void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, | 56 | void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, |
57 | const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); | 57 | const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); |
@@ -62,7 +62,7 @@ private: | |||
62 | ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, | 62 | ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, |
63 | const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); | 63 | const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); |
64 | std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query); | 64 | std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query); |
65 | qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, | 65 | QPair<qint64, qint64> load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, |
66 | Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize); | 66 | Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize); |
67 | 67 | ||
68 | private: | 68 | private: |
@@ -83,20 +83,20 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
83 | if (query.limit && query.sortProperty.isEmpty()) { | 83 | if (query.limit && query.sortProperty.isEmpty()) { |
84 | Warning() << "A limited query without sorting is typically a bad idea."; | 84 | Warning() << "A limited query without sorting is typically a bad idea."; |
85 | } | 85 | } |
86 | // We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. | 86 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. |
87 | mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { | 87 | mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { |
88 | Trace() << "Running fetcher. Offset: " << mOffset << " Batchsize: " << mBatchSize; | 88 | Trace() << "Running fetcher. Offset: " << mOffset << " Batchsize: " << mBatchSize; |
89 | auto resultProvider = mResultProvider; | 89 | auto resultProvider = mResultProvider; |
90 | async::run<qint64>([=]() -> qint64 { | 90 | async::run<QPair<qint64, qint64> >([=]() { |
91 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); | 91 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); |
92 | const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize); | 92 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize); |
93 | return newRevision; | 93 | return newRevisionAndReplayedEntities; |
94 | }) | 94 | }) |
95 | .template then<void, qint64>([query, this](qint64 newRevision) { | 95 | .template then<void, QPair<qint64, qint64>>([query, this](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { |
96 | mOffset += mBatchSize; | 96 | mOffset += newRevisionAndReplayedEntities.second; |
97 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 97 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
98 | if (query.liveQuery) { | 98 | if (query.liveQuery) { |
99 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | 99 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); |
100 | } | 100 | } |
101 | }) | 101 | }) |
102 | .exec(); | 102 | .exec(); |
@@ -107,14 +107,14 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
107 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | 107 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting |
108 | setQuery([=]() -> KAsync::Job<void> { | 108 | setQuery([=]() -> KAsync::Job<void> { |
109 | auto resultProvider = mResultProvider; | 109 | auto resultProvider = mResultProvider; |
110 | return async::run<qint64>([=]() -> qint64 { | 110 | return async::run<QPair<qint64, qint64> >([=]() { |
111 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); | 111 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); |
112 | const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); | 112 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); |
113 | return newRevision; | 113 | return newRevisionAndReplayedEntities; |
114 | }) | 114 | }) |
115 | .template then<void, qint64>([query, this](qint64 newRevision) { | 115 | .template then<void, QPair<qint64, qint64> >([query, this](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { |
116 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 116 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
117 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | 117 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); |
118 | }); | 118 | }); |
119 | }); | 119 | }); |
120 | // Ensure the connection is open, if it wasn't already opened | 120 | // Ensure the connection is open, if it wasn't already opened |
@@ -179,7 +179,7 @@ QueryWorker<DomainType>::~QueryWorker() | |||
179 | } | 179 | } |
180 | 180 | ||
181 | template <class DomainType> | 181 | template <class DomainType> |
182 | void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize) | 182 | qint64 QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties, int offset, int batchSize) |
183 | { | 183 | { |
184 | Trace() << "Skipping over " << offset << " results"; | 184 | Trace() << "Skipping over " << offset << " results"; |
185 | resultSet.skip(offset); | 185 | resultSet.skip(offset); |
@@ -214,6 +214,7 @@ void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProvid | |||
214 | }; | 214 | }; |
215 | Trace() << "Replayed " << counter << " results." | 215 | Trace() << "Replayed " << counter << " results." |
216 | << "Limit " << batchSize; | 216 | << "Limit " << batchSize; |
217 | return counter; | ||
217 | } | 218 | } |
218 | 219 | ||
219 | template <class DomainType> | 220 | template <class DomainType> |
@@ -394,7 +395,7 @@ QueryWorker<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, cons | |||
394 | } | 395 | } |
395 | 396 | ||
396 | template <class DomainType> | 397 | template <class DomainType> |
397 | qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, | 398 | QPair<qint64, qint64> QueryWorker<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, |
398 | Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize) | 399 | Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery, int offset, int batchSize) |
399 | { | 400 | { |
400 | QTime time; | 401 | QTime time; |
@@ -411,29 +412,29 @@ qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::functi | |||
411 | Trace() << "Base set retrieved. " << Log::TraceTime(time.elapsed()); | 412 | Trace() << "Base set retrieved. " << Log::TraceTime(time.elapsed()); |
412 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting); | 413 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting); |
413 | Trace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 414 | Trace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
414 | replaySet(filteredSet, resultProvider, query.requestedProperties, offset, batchSize); | 415 | auto replayedEntities = replaySet(filteredSet, resultProvider, query.requestedProperties, offset, batchSize); |
415 | Trace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed()); | 416 | Trace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed()); |
416 | resultProvider.setRevision(Sink::Storage::maxRevision(transaction)); | 417 | resultProvider.setRevision(Sink::Storage::maxRevision(transaction)); |
417 | return Sink::Storage::maxRevision(transaction); | 418 | return qMakePair(Sink::Storage::maxRevision(transaction), replayedEntities); |
418 | } | 419 | } |
419 | 420 | ||
420 | template <class DomainType> | 421 | template <class DomainType> |
421 | qint64 QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 422 | QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) |
422 | { | 423 | { |
423 | QTime time; | 424 | QTime time; |
424 | time.start(); | 425 | time.start(); |
425 | 426 | ||
426 | const qint64 baseRevision = resultProvider.revision() + 1; | 427 | const qint64 baseRevision = resultProvider.revision() + 1; |
427 | Trace() << "Running incremental query " << baseRevision; | 428 | Trace() << "Running incremental query " << baseRevision; |
428 | auto revision = load(query, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { | 429 | auto revisionAndReplayedEntities = load(query, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { |
429 | return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); | 430 | return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); |
430 | }, resultProvider, false, 0, 0); | 431 | }, resultProvider, false, 0, 0); |
431 | Trace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); | 432 | Trace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
432 | return revision; | 433 | return revisionAndReplayedEntities; |
433 | } | 434 | } |
434 | 435 | ||
435 | template <class DomainType> | 436 | template <class DomainType> |
436 | qint64 QueryWorker<DomainType>::executeInitialQuery( | 437 | QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery( |
437 | const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) | 438 | const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize) |
438 | { | 439 | { |
439 | QTime time; | 440 | QTime time; |
@@ -449,12 +450,12 @@ qint64 QueryWorker<DomainType>::executeInitialQuery( | |||
449 | modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); | 450 | modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); |
450 | } | 451 | } |
451 | } | 452 | } |
452 | auto revision = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { | 453 | auto revisionAndReplayedEntities = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { |
453 | return loadInitialResultSet(modifiedQuery, transaction, remainingFilters, remainingSorting); | 454 | return loadInitialResultSet(modifiedQuery, transaction, remainingFilters, remainingSorting); |
454 | }, resultProvider, true, offset, batchsize); | 455 | }, resultProvider, true, offset, batchsize); |
455 | Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | 456 | Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); |
456 | resultProvider.initialResultSetComplete(parent); | 457 | resultProvider.initialResultSetComplete(parent); |
457 | return revision; | 458 | return revisionAndReplayedEntities; |
458 | } | 459 | } |
459 | 460 | ||
460 | template class QueryRunner<Sink::ApplicationDomain::Folder>; | 461 | template class QueryRunner<Sink::ApplicationDomain::Folder>; |