summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp50
1 files changed, 25 insertions, 25 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 377e3b9..ab4d60b 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -47,10 +47,8 @@ template <typename DomainType>
47class QueryWorker : public QObject 47class QueryWorker : public QObject
48{ 48{
49 typedef std::function<bool(const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap<QByteArray, QVariant> &aggregateValues)> ResultCallback; 49 typedef std::function<bool(const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap<QByteArray, QVariant> &aggregateValues)> ResultCallback;
50 // SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier, mId)
51 SINK_DEBUG_COMPONENT(mResourceContext.resourceInstanceIdentifier)
52public: 50public:
53 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); 51 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx);
54 virtual ~QueryWorker(); 52 virtual ~QueryWorker();
55 53
56 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 54 ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
@@ -61,14 +59,14 @@ private:
61 59
62 QueryRunnerBase::ResultTransformation mResultTransformation; 60 QueryRunnerBase::ResultTransformation mResultTransformation;
63 ResourceContext mResourceContext; 61 ResourceContext mResourceContext;
64 QByteArray mId; //Used for identification in debug output 62 Sink::Log::Context mLogCtx;
65}; 63};
66 64
67template <class DomainType> 65template <class DomainType>
68QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType) 66QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType, const Sink::Log::Context &logCtx)
69 : QueryRunnerBase(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess()), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit()) 67 : QueryRunnerBase(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess()), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit()), mLogCtx(logCtx.subContext("queryrunner"))
70{ 68{
71 SinkTrace() << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit(); 69 SinkTraceCtx(mLogCtx) << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit();
72 if (query.limit() && query.sortProperty().isEmpty()) { 70 if (query.limit() && query.sortProperty().isEmpty()) {
73 SinkWarning() << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; 71 SinkWarning() << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get.";
74 } 72 }
@@ -76,10 +74,10 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
76 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 74 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
77 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { 75 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) {
78 const QByteArray parentId = parent ? parent->identifier() : QByteArray(); 76 const QByteArray parentId = parent ? parent->identifier() : QByteArray();
79 SinkTrace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; 77 SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize;
80 auto resultProvider = mResultProvider; 78 auto resultProvider = mResultProvider;
81 if (query.synchronousQuery()) { 79 if (query.synchronousQuery()) {
82 QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation); 80 QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation, mLogCtx);
83 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); 81 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize);
84 mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; 82 mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities;
85 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); 83 resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision);
@@ -89,9 +87,10 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
89 auto offset = mOffset[parentId]; 87 auto offset = mOffset[parentId];
90 auto batchSize = mBatchSize; 88 auto batchSize = mBatchSize;
91 auto resourceContext = mResourceContext; 89 auto resourceContext = mResourceContext;
90 auto logCtx = mLogCtx;
92 //The lambda will be executed in a separate thread, so copy all arguments 91 //The lambda will be executed in a separate thread, so copy all arguments
93 async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() { 92 async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() {
94 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation); 93 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx);
95 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); 94 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize);
96 return newRevisionAndReplayedEntities; 95 return newRevisionAndReplayedEntities;
97 }) 96 })
@@ -119,8 +118,9 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
119 setQuery([=]() -> KAsync::Job<void> { 118 setQuery([=]() -> KAsync::Job<void> {
120 auto resultProvider = mResultProvider; 119 auto resultProvider = mResultProvider;
121 auto resourceContext = mResourceContext; 120 auto resourceContext = mResourceContext;
121 auto logCtx = mLogCtx;
122 return async::run<ReplayResult>([=]() { 122 return async::run<ReplayResult>([=]() {
123 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation); 123 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx);
124 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); 124 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider);
125 return newRevisionAndReplayedEntities; 125 return newRevisionAndReplayedEntities;
126 }) 126 })
@@ -147,7 +147,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
147template <class DomainType> 147template <class DomainType>
148QueryRunner<DomainType>::~QueryRunner() 148QueryRunner<DomainType>::~QueryRunner()
149{ 149{
150 SinkTrace() << "Stopped query"; 150 SinkTraceCtx(mLogCtx) << "Stopped query";
151} 151}
152 152
153template <class DomainType> 153template <class DomainType>
@@ -164,16 +164,16 @@ typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainTy
164 164
165template <class DomainType> 165template <class DomainType>
166QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const Sink::ResourceContext &resourceContext, 166QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const Sink::ResourceContext &resourceContext,
167 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) 167 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx)
168 : QObject(), mResultTransformation(transformation), mResourceContext(resourceContext), mId(QUuid::createUuid().toByteArray()) 168 : QObject(), mResultTransformation(transformation), mResourceContext(resourceContext), mLogCtx(logCtx.subContext("worker"))
169{ 169{
170 SinkTrace() << "Starting query worker"; 170 SinkTraceCtx(mLogCtx) << "Starting query worker";
171} 171}
172 172
173template <class DomainType> 173template <class DomainType>
174QueryWorker<DomainType>::~QueryWorker() 174QueryWorker<DomainType>::~QueryWorker()
175{ 175{
176 SinkTrace() << "Stopped query worker"; 176 SinkTraceCtx(mLogCtx) << "Stopped query worker";
177} 177}
178 178
179template <class DomainType> 179template <class DomainType>
@@ -209,15 +209,15 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query
209 time.start(); 209 time.start();
210 210
211 const qint64 baseRevision = resultProvider.revision() + 1; 211 const qint64 baseRevision = resultProvider.revision() + 1;
212 auto entityStore = EntityStore{mResourceContext}; 212 auto entityStore = EntityStore{mResourceContext, mLogCtx};
213 auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 213 auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore};
214 auto resultSet = preparedQuery.update(baseRevision); 214 auto resultSet = preparedQuery.update(baseRevision);
215 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 215 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
216 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { 216 auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) {
217 resultProviderCallback(query, resultProvider, result); 217 resultProviderCallback(query, resultProvider, result);
218 }); 218 });
219 219
220 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); 220 SinkTraceCtx(mLogCtx) << "Incremental query took: " << Log::TraceTime(time.elapsed());
221 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; 221 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll};
222} 222}
223 223
@@ -231,24 +231,24 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery(
231 auto modifiedQuery = query; 231 auto modifiedQuery = query;
232 if (!query.parentProperty().isEmpty()) { 232 if (!query.parentProperty().isEmpty()) {
233 if (parent) { 233 if (parent) {
234 SinkTrace() << "Running initial query for parent:" << parent->identifier(); 234 SinkTraceCtx(mLogCtx) << "Running initial query for parent:" << parent->identifier();
235 modifiedQuery.filter(query.parentProperty(), Query::Comparator(QVariant::fromValue(Sink::ApplicationDomain::Reference{parent->identifier()}))); 235 modifiedQuery.filter(query.parentProperty(), Query::Comparator(QVariant::fromValue(Sink::ApplicationDomain::Reference{parent->identifier()})));
236 } else { 236 } else {
237 SinkTrace() << "Running initial query for toplevel"; 237 SinkTraceCtx(mLogCtx) << "Running initial query for toplevel";
238 modifiedQuery.filter(query.parentProperty(), Query::Comparator(QVariant{})); 238 modifiedQuery.filter(query.parentProperty(), Query::Comparator(QVariant{}));
239 } 239 }
240 } 240 }
241 241
242 auto entityStore = EntityStore{mResourceContext}; 242 auto entityStore = EntityStore{mResourceContext, mLogCtx};
243 auto preparedQuery = DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore}; 243 auto preparedQuery = DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore};
244 auto resultSet = preparedQuery.execute(); 244 auto resultSet = preparedQuery.execute();
245 245
246 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 246 SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
247 auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { 247 auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) {
248 resultProviderCallback(query, resultProvider, result); 248 resultProviderCallback(query, resultProvider, result);
249 }); 249 });
250 250
251 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 251 SinkTraceCtx(mLogCtx) << "Initial query took: " << Log::TraceTime(time.elapsed());
252 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; 252 return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll};
253} 253}
254 254