summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-12-28 12:05:34 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2018-01-02 13:31:14 +0100
commit625190d311adfcf3f0436cfece82249a92489348 (patch)
tree1010950e14b57560ae90abe20a5657750ad27137 /common/queryrunner.cpp
parent11b790ba6f06141db802273628ce2d191982677e (diff)
downloadsink-625190d311adfcf3f0436cfece82249a92489348.tar.gz
sink-625190d311adfcf3f0436cfece82249a92489348.zip
No parent query
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp22
1 files changed, 10 insertions, 12 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index f3a9af8..288ce27 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -51,7 +51,7 @@ public:
51 virtual ~QueryWorker(); 51 virtual ~QueryWorker();
52 52
53 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); 53 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state);
54 ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state); 54 ReplayResult executeInitialQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state);
55 55
56private: 56private:
57 void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); 57 void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result);
@@ -70,34 +70,33 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
70 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; 70 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get.";
71 } 71 }
72 auto guardPtr = QPointer<QObject>(&guard); 72 auto guardPtr = QPointer<QObject>(&guard);
73 auto fetcher = [=](const typename DomainType::Ptr &parent) { 73 auto fetcher = [=]() {
74 const QByteArray parentId = parent ? parent->identifier() : QByteArray();
75 SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; 74 SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
76 auto resultProvider = mResultProvider; 75 auto resultProvider = mResultProvider;
77 auto resultTransformation = mResultTransformation; 76 auto resultTransformation = mResultTransformation;
78 auto batchSize = mBatchSize; 77 auto batchSize = mBatchSize;
79 auto resourceContext = mResourceContext; 78 auto resourceContext = mResourceContext;
80 auto logCtx = mLogCtx; 79 auto logCtx = mLogCtx;
81 auto state = mQueryState.value(parentId); 80 auto state = mQueryState;
82 const bool runAsync = !query.synchronousQuery(); 81 const bool runAsync = !query.synchronousQuery();
83 //The lambda will be executed in a separate thread, so copy all arguments 82 //The lambda will be executed in a separate thread, so copy all arguments
84 async::run<ReplayResult>([=]() { 83 async::run<ReplayResult>([=]() {
85 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); 84 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
86 return worker.executeInitialQuery(query, parent, *resultProvider, batchSize, state); 85 return worker.executeInitialQuery(query, *resultProvider, batchSize, state);
87 }, runAsync) 86 }, runAsync)
88 .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { 87 .then([this, query, resultProvider, guardPtr](const ReplayResult &result) {
89 if (!guardPtr) { 88 if (!guardPtr) {
90 //Not an error, the query can vanish at any time. 89 //Not an error, the query can vanish at any time.
91 return; 90 return;
92 } 91 }
93 mInitialQueryComplete = true; 92 mInitialQueryComplete = true;
94 mQueryState[parentId] = result.queryState; 93 mQueryState = result.queryState;
95 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 94 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
96 if (query.liveQuery()) { 95 if (query.liveQuery()) {
97 mResourceAccess->sendRevisionReplayedCommand(result.newRevision); 96 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
98 } 97 }
99 resultProvider->setRevision(result.newRevision); 98 resultProvider->setRevision(result.newRevision);
100 resultProvider->initialResultSetComplete(parent, result.replayedAll); 99 resultProvider->initialResultSetComplete(result.replayedAll);
101 }) 100 })
102 .exec(); 101 .exec();
103 }; 102 };
@@ -110,14 +109,13 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
110 Q_ASSERT(!query.synchronousQuery()); 109 Q_ASSERT(!query.synchronousQuery());
111 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 110 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
112 setQuery([=]() -> KAsync::Job<void> { 111 setQuery([=]() -> KAsync::Job<void> {
113 const QByteArray parentId;
114 auto resultProvider = mResultProvider; 112 auto resultProvider = mResultProvider;
115 auto resourceContext = mResourceContext; 113 auto resourceContext = mResourceContext;
116 auto logCtx = mLogCtx; 114 auto logCtx = mLogCtx;
117 auto state = mQueryState.value(parentId); 115 auto state = mQueryState;
118 if (!mInitialQueryComplete) { 116 if (!mInitialQueryComplete) {
119 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; 117 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
120 fetcher({}); 118 fetcher();
121 return KAsync::null(); 119 return KAsync::null();
122 } 120 }
123 if (mQueryInProgress) { 121 if (mQueryInProgress) {
@@ -240,7 +238,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
240 238
241template <class DomainType> 239template <class DomainType>
242ReplayResult QueryWorker<DomainType>::executeInitialQuery( 240ReplayResult QueryWorker<DomainType>::executeInitialQuery(
243 const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state) 241 const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state)
244{ 242{
245 QTime time; 243 QTime time;
246 time.start(); 244 time.start();