summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp153
1 files changed, 86 insertions, 67 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 928e1e0..0ed4cb5 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -69,79 +69,14 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
69 if (query.limit() && query.sortProperty().isEmpty()) { 69 if (query.limit() && query.sortProperty().isEmpty()) {
70 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; 70 SinkWarningCtx(mLogCtx) << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get.";
71 } 71 }
72 auto guardPtr = QPointer<QObject>(&guard);
73 auto fetcher = [=]() {
74 SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
75 auto resultProvider = mResultProvider;
76 auto resultTransformation = mResultTransformation;
77 auto batchSize = mBatchSize;
78 auto resourceContext = mResourceContext;
79 auto logCtx = mLogCtx;
80 auto state = mQueryState;
81 const bool runAsync = !query.synchronousQuery();
82 //The lambda will be executed in a separate thread, so copy all arguments
83 async::run<ReplayResult>([=]() {
84 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
85 return worker.executeInitialQuery(query, *resultProvider, batchSize, state);
86 }, runAsync)
87 .then([this, query, resultProvider, guardPtr](const ReplayResult &result) {
88 if (!guardPtr) {
89 //Not an error, the query can vanish at any time.
90 return;
91 }
92 mInitialQueryComplete = true;
93 mQueryState = result.queryState;
94 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
95 if (query.liveQuery()) {
96 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
97 }
98 resultProvider->setRevision(result.newRevision);
99 resultProvider->initialResultSetComplete(result.replayedAll);
100 })
101 .exec();
102 };
103
104 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 72 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
105 mResultProvider->setFetcher(fetcher); 73 mResultProvider->setFetcher([this, query, bufferType] { fetch(query, bufferType); });
106 74
107 // In case of a live query we keep the runner for as long alive as the result provider exists 75 // In case of a live query we keep the runner for as long alive as the result provider exists
108 if (query.liveQuery()) { 76 if (query.liveQuery()) {
109 Q_ASSERT(!query.synchronousQuery()); 77 Q_ASSERT(!query.synchronousQuery());
110 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 78 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
111 setQuery([=]() -> KAsync::Job<void> { 79 setQuery([=]() { return incrementalFetch(query, bufferType); });
112 auto resultProvider = mResultProvider;
113 auto resourceContext = mResourceContext;
114 auto logCtx = mLogCtx;
115 auto state = mQueryState;
116 auto resultTransformation = mResultTransformation;
117 if (!mInitialQueryComplete) {
118 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
119 fetcher();
120 return KAsync::null();
121 }
122 if (mQueryInProgress) {
123 //Can happen if the revision come in quicker than we process them.
124 return KAsync::null();
125 }
126 Q_ASSERT(!mQueryInProgress);
127 return KAsync::start([&] {
128 mQueryInProgress = true;
129 })
130 .then(async::run<ReplayResult>([=]() {
131 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
132 return worker.executeIncrementalQuery(query, *resultProvider, state);
133 }))
134 .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
135 if (!guardPtr) {
136 //Not an error, the query can vanish at any time.
137 return;
138 }
139 mQueryInProgress = false;
140 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
141 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
142 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
143 });
144 });
145 // Ensure the connection is open, if it wasn't already opened 80 // Ensure the connection is open, if it wasn't already opened
146 // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates 81 // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
147 mResourceAccess->open(); 82 mResourceAccess->open();
@@ -158,6 +93,90 @@ QueryRunner<DomainType>::~QueryRunner()
158 SinkTraceCtx(mLogCtx) << "Stopped query"; 93 SinkTraceCtx(mLogCtx) << "Stopped query";
159} 94}
160 95
96//This function triggers the initial fetch, and then subsequent calls will simply fetch more data of mBatchSize.
97template <class DomainType>
98void QueryRunner<DomainType>::fetch(const Sink::Query &query, const QByteArray &bufferType)
99{
100 auto guardPtr = QPointer<QObject>(&guard);
101 SinkTraceCtx(mLogCtx) << "Running fetcher. Batchsize: " << mBatchSize;
102 if (mQueryInProgress) {
103 SinkTraceCtx(mLogCtx) << "Query is already in progress, postponing: " << mBatchSize;
104 mRequestFetchMore = true;
105 return;
106 }
107 mQueryInProgress = true;
108 auto resultProvider = mResultProvider;
109 auto resultTransformation = mResultTransformation;
110 auto batchSize = mBatchSize;
111 auto resourceContext = mResourceContext;
112 auto logCtx = mLogCtx;
113 auto state = mQueryState;
114 const bool runAsync = !query.synchronousQuery();
115 //The lambda will be executed in a separate thread, so copy all arguments
116 async::run<ReplayResult>([=]() {
117 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
118 return worker.executeInitialQuery(query, *resultProvider, batchSize, state);
119 }, runAsync)
120 .then([=](const ReplayResult &result) {
121 if (!guardPtr) {
122 //Not an error, the query can vanish at any time.
123 return;
124 }
125 mInitialQueryComplete = true;
126 mQueryInProgress = false;
127 mQueryState = result.queryState;
128 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
129 if (query.liveQuery()) {
130 mResourceAccess->sendRevisionReplayedCommand(result.newRevision);
131 }
132 resultProvider->setRevision(result.newRevision);
133 resultProvider->initialResultSetComplete(result.replayedAll);
134 if (mRequestFetchMore) {
135 mRequestFetchMore = false;
136 fetch(query, bufferType);
137 }
138 })
139 .exec();
140}
141
142template <class DomainType>
143KAsync::Job<void> QueryRunner<DomainType>::incrementalFetch(const Sink::Query &query, const QByteArray &bufferType)
144{
145 if (!mInitialQueryComplete) {
146 SinkWarningCtx(mLogCtx) << "Can't start the incremental query before the initial query is complete";
147 fetch(query, bufferType);
148 return KAsync::null();
149 }
150 if (mQueryInProgress) {
151 //Can happen if the revision come in quicker than we process them.
152 return KAsync::null();
153 }
154 auto resultProvider = mResultProvider;
155 auto resourceContext = mResourceContext;
156 auto logCtx = mLogCtx;
157 auto state = mQueryState;
158 auto resultTransformation = mResultTransformation;
159 Q_ASSERT(!mQueryInProgress);
160 auto guardPtr = QPointer<QObject>(&guard);
161 return KAsync::start([&] {
162 mQueryInProgress = true;
163 })
164 .then(async::run<ReplayResult>([=]() {
165 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
166 return worker.executeIncrementalQuery(query, *resultProvider, state);
167 }))
168 .then([query, this, resultProvider, guardPtr](const ReplayResult &newRevisionAndReplayedEntities) {
169 if (!guardPtr) {
170 //Not an error, the query can vanish at any time.
171 return;
172 }
173 mQueryInProgress = false;
174 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
175 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.newRevision);
176 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
177 });
178}
179
161template <class DomainType> 180template <class DomainType>
162void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation) 181void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation)
163{ 182{