diff options
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 50 |
1 files changed, 25 insertions, 25 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 377e3b9..ab4d60b 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -47,10 +47,8 @@ template <typename DomainType> | |||
47 | class QueryWorker : public QObject | 47 | class QueryWorker : public QObject |
48 | { | 48 | { |
49 | typedef std::function<bool(const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap<QByteArray, QVariant> &aggregateValues)> ResultCallback; | 49 | typedef std::function<bool(const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap<QByteArray, QVariant> &aggregateValues)> ResultCallback; |
50 | // SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier, mId) | ||
51 | SINK_DEBUG_COMPONENT(mResourceContext.resourceInstanceIdentifier) | ||
52 | public: | 50 | public: |
53 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); | 51 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx); |
54 | virtual ~QueryWorker(); | 52 | virtual ~QueryWorker(); |
55 | 53 | ||
56 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 54 | ReplayResult executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); |
@@ -61,14 +59,14 @@ private: | |||
61 | 59 | ||
62 | QueryRunnerBase::ResultTransformation mResultTransformation; | 60 | QueryRunnerBase::ResultTransformation mResultTransformation; |
63 | ResourceContext mResourceContext; | 61 | ResourceContext mResourceContext; |
64 | QByteArray mId; //Used for identification in debug output | 62 | Sink::Log::Context mLogCtx; |
65 | }; | 63 | }; |
66 | 64 | ||
67 | template <class DomainType> | 65 | template <class DomainType> |
68 | QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType) | 66 | QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType, const Sink::Log::Context &logCtx) |
69 | : QueryRunnerBase(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess()), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit()) | 67 | : QueryRunnerBase(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess()), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit()), mLogCtx(logCtx.subContext("queryrunner")) |
70 | { | 68 | { |
71 | SinkTrace() << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit(); | 69 | SinkTraceCtx(mLogCtx) << "Starting query. Is live:" << query.liveQuery() << " Limit: " << query.limit(); |
72 | if (query.limit() && query.sortProperty().isEmpty()) { | 70 | if (query.limit() && query.sortProperty().isEmpty()) { |
73 | SinkWarning() << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; | 71 | SinkWarning() << "A limited query without sorting is typically a bad idea, because there is no telling what you're going to get."; |
74 | } | 72 | } |
@@ -76,10 +74,10 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
76 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. | 74 | // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. |
77 | mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { | 75 | mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { |
78 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); | 76 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); |
79 | SinkTrace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; | 77 | SinkTraceCtx(mLogCtx) << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; |
80 | auto resultProvider = mResultProvider; | 78 | auto resultProvider = mResultProvider; |
81 | if (query.synchronousQuery()) { | 79 | if (query.synchronousQuery()) { |
82 | QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation); | 80 | QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation, mLogCtx); |
83 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); | 81 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); |
84 | mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; | 82 | mOffset[parentId] += newRevisionAndReplayedEntities.replayedEntities; |
85 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); | 83 | resultProvider->setRevision(newRevisionAndReplayedEntities.newRevision); |
@@ -89,9 +87,10 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
89 | auto offset = mOffset[parentId]; | 87 | auto offset = mOffset[parentId]; |
90 | auto batchSize = mBatchSize; | 88 | auto batchSize = mBatchSize; |
91 | auto resourceContext = mResourceContext; | 89 | auto resourceContext = mResourceContext; |
90 | auto logCtx = mLogCtx; | ||
92 | //The lambda will be executed in a separate thread, so copy all arguments | 91 | //The lambda will be executed in a separate thread, so copy all arguments |
93 | async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() { | 92 | async::run<ReplayResult>([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent, logCtx]() { |
94 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation); | 93 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation, logCtx); |
95 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); | 94 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); |
96 | return newRevisionAndReplayedEntities; | 95 | return newRevisionAndReplayedEntities; |
97 | }) | 96 | }) |
@@ -119,8 +118,9 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
119 | setQuery([=]() -> KAsync::Job<void> { | 118 | setQuery([=]() -> KAsync::Job<void> { |
120 | auto resultProvider = mResultProvider; | 119 | auto resultProvider = mResultProvider; |
121 | auto resourceContext = mResourceContext; | 120 | auto resourceContext = mResourceContext; |
121 | auto logCtx = mLogCtx; | ||
122 | return async::run<ReplayResult>([=]() { | 122 | return async::run<ReplayResult>([=]() { |
123 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation); | 123 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation, logCtx); |
124 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); | 124 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); |
125 | return newRevisionAndReplayedEntities; | 125 | return newRevisionAndReplayedEntities; |
126 | }) | 126 | }) |
@@ -147,7 +147,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
147 | template <class DomainType> | 147 | template <class DomainType> |
148 | QueryRunner<DomainType>::~QueryRunner() | 148 | QueryRunner<DomainType>::~QueryRunner() |
149 | { | 149 | { |
150 | SinkTrace() << "Stopped query"; | 150 | SinkTraceCtx(mLogCtx) << "Stopped query"; |
151 | } | 151 | } |
152 | 152 | ||
153 | template <class DomainType> | 153 | template <class DomainType> |
@@ -164,16 +164,16 @@ typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainTy | |||
164 | 164 | ||
165 | template <class DomainType> | 165 | template <class DomainType> |
166 | QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const Sink::ResourceContext &resourceContext, | 166 | QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const Sink::ResourceContext &resourceContext, |
167 | const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) | 167 | const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation, const Sink::Log::Context &logCtx) |
168 | : QObject(), mResultTransformation(transformation), mResourceContext(resourceContext), mId(QUuid::createUuid().toByteArray()) | 168 | : QObject(), mResultTransformation(transformation), mResourceContext(resourceContext), mLogCtx(logCtx.subContext("worker")) |
169 | { | 169 | { |
170 | SinkTrace() << "Starting query worker"; | 170 | SinkTraceCtx(mLogCtx) << "Starting query worker"; |
171 | } | 171 | } |
172 | 172 | ||
173 | template <class DomainType> | 173 | template <class DomainType> |
174 | QueryWorker<DomainType>::~QueryWorker() | 174 | QueryWorker<DomainType>::~QueryWorker() |
175 | { | 175 | { |
176 | SinkTrace() << "Stopped query worker"; | 176 | SinkTraceCtx(mLogCtx) << "Stopped query worker"; |
177 | } | 177 | } |
178 | 178 | ||
179 | template <class DomainType> | 179 | template <class DomainType> |
@@ -209,15 +209,15 @@ ReplayResult QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query | |||
209 | time.start(); | 209 | time.start(); |
210 | 210 | ||
211 | const qint64 baseRevision = resultProvider.revision() + 1; | 211 | const qint64 baseRevision = resultProvider.revision() + 1; |
212 | auto entityStore = EntityStore{mResourceContext}; | 212 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; |
213 | auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | 213 | auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName<DomainType>(), entityStore}; |
214 | auto resultSet = preparedQuery.update(baseRevision); | 214 | auto resultSet = preparedQuery.update(baseRevision); |
215 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 215 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
216 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { | 216 | auto replayResult = resultSet.replaySet(0, 0, [this, query, &resultProvider](const ResultSet::Result &result) { |
217 | resultProviderCallback(query, resultProvider, result); | 217 | resultProviderCallback(query, resultProvider, result); |
218 | }); | 218 | }); |
219 | 219 | ||
220 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); | 220 | SinkTraceCtx(mLogCtx) << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
221 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; | 221 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; |
222 | } | 222 | } |
223 | 223 | ||
@@ -231,24 +231,24 @@ ReplayResult QueryWorker<DomainType>::executeInitialQuery( | |||
231 | auto modifiedQuery = query; | 231 | auto modifiedQuery = query; |
232 | if (!query.parentProperty().isEmpty()) { | 232 | if (!query.parentProperty().isEmpty()) { |
233 | if (parent) { | 233 | if (parent) { |
234 | SinkTrace() << "Running initial query for parent:" << parent->identifier(); | 234 | SinkTraceCtx(mLogCtx) << "Running initial query for parent:" << parent->identifier(); |
235 | modifiedQuery.filter(query.parentProperty(), Query::Comparator(QVariant::fromValue(Sink::ApplicationDomain::Reference{parent->identifier()}))); | 235 | modifiedQuery.filter(query.parentProperty(), Query::Comparator(QVariant::fromValue(Sink::ApplicationDomain::Reference{parent->identifier()}))); |
236 | } else { | 236 | } else { |
237 | SinkTrace() << "Running initial query for toplevel"; | 237 | SinkTraceCtx(mLogCtx) << "Running initial query for toplevel"; |
238 | modifiedQuery.filter(query.parentProperty(), Query::Comparator(QVariant{})); | 238 | modifiedQuery.filter(query.parentProperty(), Query::Comparator(QVariant{})); |
239 | } | 239 | } |
240 | } | 240 | } |
241 | 241 | ||
242 | auto entityStore = EntityStore{mResourceContext}; | 242 | auto entityStore = EntityStore{mResourceContext, mLogCtx}; |
243 | auto preparedQuery = DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore}; | 243 | auto preparedQuery = DataStoreQuery{modifiedQuery, ApplicationDomain::getTypeName<DomainType>(), entityStore}; |
244 | auto resultSet = preparedQuery.execute(); | 244 | auto resultSet = preparedQuery.execute(); |
245 | 245 | ||
246 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 246 | SinkTraceCtx(mLogCtx) << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
247 | auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { | 247 | auto replayResult = resultSet.replaySet(offset, batchsize, [this, query, &resultProvider](const ResultSet::Result &result) { |
248 | resultProviderCallback(query, resultProvider, result); | 248 | resultProviderCallback(query, resultProvider, result); |
249 | }); | 249 | }); |
250 | 250 | ||
251 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | 251 | SinkTraceCtx(mLogCtx) << "Initial query took: " << Log::TraceTime(time.elapsed()); |
252 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; | 252 | return {entityStore.maxRevision(), replayResult.replayedEntities, replayResult.replayedAll}; |
253 | } | 253 | } |
254 | 254 | ||