summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-06-14 13:26:21 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-06-15 07:12:13 +0200
commitbb70bdcd0eaf72ffc304536267a66c5de5eaf2e9 (patch)
tree659702de6c71ea8f6e15836e56fd44be625ab553 /common/queryrunner.cpp
parentbd64dd286ddc0c5732b9977f78dc945ac40f5b4f (diff)
downloadsink-bb70bdcd0eaf72ffc304536267a66c5de5eaf2e9.tar.gz
sink-bb70bdcd0eaf72ffc304536267a66c5de5eaf2e9.zip
Synchronous API
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp31
1 files changed, 19 insertions, 12 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index c6a6b86..f2a7753 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -78,23 +78,31 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
78 const QByteArray parentId = parent ? parent->identifier() : QByteArray(); 78 const QByteArray parentId = parent ? parent->identifier() : QByteArray();
79 Trace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; 79 Trace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize;
80 auto resultProvider = mResultProvider; 80 auto resultProvider = mResultProvider;
81 async::run<QPair<qint64, qint64> >([=]() { 81 if (query.synchronousQuery) {
82 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 82 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
83 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); 83 worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize);
84 return newRevisionAndReplayedEntities; 84 resultProvider->initialResultSetComplete(parent);
85 }) 85 } else {
86 .template then<void, QPair<qint64, qint64>>([=](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { 86 async::run<QPair<qint64, qint64> >([=]() {
87 mOffset[parentId] += newRevisionAndReplayedEntities.second; 87 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
88 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 88 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize);
89 if (query.liveQuery) { 89 return newRevisionAndReplayedEntities;
90 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first);
91 }
92 }) 90 })
93 .exec(); 91 .template then<void, QPair<qint64, qint64>>([=](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) {
92 mOffset[parentId] += newRevisionAndReplayedEntities.second;
93 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
94 if (query.liveQuery) {
95 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first);
96 }
97 resultProvider->initialResultSetComplete(parent);
98 })
99 .exec();
100 }
94 }); 101 });
95 102
96 // In case of a live query we keep the runner for as long alive as the result provider exists 103 // In case of a live query we keep the runner for as long alive as the result provider exists
97 if (query.liveQuery) { 104 if (query.liveQuery) {
105 Q_ASSERT(!query.synchronousQuery);
98 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 106 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
99 setQuery([=]() -> KAsync::Job<void> { 107 setQuery([=]() -> KAsync::Job<void> {
100 auto resultProvider = mResultProvider; 108 auto resultProvider = mResultProvider;
@@ -233,7 +241,6 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
233 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); 241 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction);
234 auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider)); 242 auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider));
235 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 243 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed());
236 resultProvider.initialResultSetComplete(parent);
237 return revisionAndReplayedEntities; 244 return revisionAndReplayedEntities;
238} 245}
239 246