diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-12-28 12:05:34 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2018-01-02 13:31:14 +0100 |
commit | 625190d311adfcf3f0436cfece82249a92489348 (patch) | |
tree | 1010950e14b57560ae90abe20a5657750ad27137 /common/queryrunner.cpp | |
parent | 11b790ba6f06141db802273628ce2d191982677e (diff) | |
download | sink-625190d311adfcf3f0436cfece82249a92489348.tar.gz sink-625190d311adfcf3f0436cfece82249a92489348.zip |
No parent query
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 22 |
1 files changed, 10 insertions, 12 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index f3a9af8..288ce27 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -51,7 +51,7 @@ public: | |||
51 | virtual ~QueryWorker(); | 51 | virtual ~QueryWorker(); |
52 | 52 | ||
53 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); | 53 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, DataStoreQuery::State::Ptr state); |
54 | ReplayResult executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state); | 54 | ReplayResult executeInitialQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state); |
55 | 55 | ||
56 | private: | 56 | private: |
57 | void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); | 57 | void resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const ResultSet::Result &result); |
@@ -70,34 +70,33 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
70 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; | 70 | SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; |
71 | } | 71 | } |
72 | auto guardPtr = QPointer<QObject>(&guard); | 72 | auto guardPtr = QPointer<QObject>(&guard); |
73 | auto fetcher = [=](const typename DomainType::Ptr &parent) { | 73 | auto fetcher = [=]() { |
74 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); | ||
75 | SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; | 74 | SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize; |
76 | auto resultProvider = mResultProvider; | 75 | auto resultProvider = mResultProvider; |
77 | auto resultTransformation = mResultTransformation; | 76 | auto resultTransformation = mResultTransformation; |
78 | auto batchSize = mBatchSize; | 77 | auto batchSize = mBatchSize; |
79 | auto resourceContext = mResourceContext; | 78 | auto resourceContext = mResourceContext; |
80 | auto logCtx = mLogCtx; | 79 | auto logCtx = mLogCtx; |
81 | auto state = mQueryState.value(parentId); | 80 | auto state = mQueryState; |
82 | const bool runAsync = !query.synchronousQuery(); | 81 | const bool runAsync = !query.synchronousQuery(); |
83 | //The lambda will be executed in a separate thread, so copy all arguments | 82 | //The lambda will be executed in a separate thread, so copy all arguments |
84 | async::run<ReplayResult>([=]() { | 83 | async::run<ReplayResult>([=]() { |
85 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); | 84 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); |
86 | return worker.executeInitialQuery(query, parent, *resultProvider, batchSize, state); | 85 | return worker.executeInitialQuery(query, *resultProvider, batchSize, state); |
87 | }, runAsync) | 86 | }, runAsync) |
88 | .then([this, parentId, query, parent, resultProvider, guardPtr](const ReplayResult &result) { | 87 | .then([this, query, resultProvider, guardPtr](const ReplayResult &result) { |
89 | if (!guardPtr) { | 88 | if (!guardPtr) { |
90 | //Not an error, the query can vanish at any time. | 89 | //Not an error, the query can vanish at any time. |
91 | return; | 90 | return; |
92 | } | 91 | } |
93 | mInitialQueryComplete = true; | 92 | mInitialQueryComplete = true; |
94 | mQueryState[parentId] = result.queryState; | 93 | mQueryState = result.queryState; |
95 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 94 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. |
96 | if (query.liveQuery()) { | 95 | if (query.liveQuery()) { |
97 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); | 96 | mResourceAccess->sendRevisionReplayedCommand(result.newRevision); |
98 | } | 97 | } |
99 | resultProvider->setRevision(result.newRevision); | 98 | resultProvider->setRevision(result.newRevision); |
100 | resultProvider->initialResultSetComplete(parent, result.replayedAll); | 99 | resultProvider->initialResultSetComplete(result.replayedAll); |
101 | }) | 100 | }) |
102 | .exec(); | 101 | .exec(); |
103 | }; | 102 | }; |
@@ -110,14 +109,13 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
110 | Q_ASSERT(!query.synchronousQuery()); | 109 | Q_ASSERT(!query.synchronousQuery()); |
111 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | 110 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting |
112 | setQuery([=]() -> KAsync::Job<void> { | 111 | setQuery([=]() -> KAsync::Job<void> { |
113 | const QByteArray parentId; | ||
114 | auto resultProvider = mResultProvider; | 112 | auto resultProvider = mResultProvider; |
115 | auto resourceContext = mResourceContext; | 113 | auto resourceContext = mResourceContext; |
116 | auto logCtx = mLogCtx; | 114 | auto logCtx = mLogCtx; |
117 | auto state = mQueryState.value(parentId); | 115 | auto state = mQueryState; |
118 | if (!mInitialQueryComplete) { | 116 | if (!mInitialQueryComplete) { |
119 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; | 117 | SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete"; |
120 | fetcher({}); | 118 | fetcher(); |
121 | return KAsync::null(); | 119 | return KAsync::null(); |
122 | } | 120 | } |
123 | if (mQueryInProgress) { | 121 | if (mQueryInProgress) { |
@@ -240,7 +238,7 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query | |||
240 | 238 | ||
241 | template <class DomainType> | 239 | template <class DomainType> |
242 | ReplayResult QueryWorker<DomainType>::executeInitialQuery( | 240 | ReplayResult QueryWorker<DomainType>::executeInitialQuery( |
243 | const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state) | 241 | const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int batchsize, DataStoreQuery::State::Ptr state) |
244 | { | 242 | { |
245 | QTime time; | 243 | QTime time; |
246 | time.start(); | 244 | time.start(); |