summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp41
1 files changed, 18 insertions, 23 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 0be2ae1..2e2e96d 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -29,8 +29,7 @@
29#include "asyncutils.h" 29#include "asyncutils.h"
30#include "entityreader.h" 30#include "entityreader.h"
31 31
32#undef DEBUG_AREA 32SINK_DEBUG_AREA("queryrunner")
33#define DEBUG_AREA "client.queryrunner"
34 33
35using namespace Sink; 34using namespace Sink;
36 35
@@ -43,6 +42,8 @@ using namespace Sink;
43template <typename DomainType> 42template <typename DomainType>
44class QueryWorker : public QObject 43class QueryWorker : public QObject
45{ 44{
45 // SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier, mId)
46 SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier)
46public: 47public:
47 QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, 48 QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType,
48 const QueryRunnerBase::ResultTransformation &transformation); 49 const QueryRunnerBase::ResultTransformation &transformation);
@@ -61,22 +62,19 @@ private:
61 QByteArray mId; //Used for identification in debug output 62 QByteArray mId; //Used for identification in debug output
62}; 63};
63 64
64#undef Trace
65#define Trace() Trace_area(DEBUG_AREA)
66
67template <class DomainType> 65template <class DomainType>
68QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, 66QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier,
69 const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) 67 const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType)
70 : QueryRunnerBase(), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit) 68 : QueryRunnerBase(), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit)
71{ 69{
72 Trace() << "Starting query"; 70 SinkTrace() << "Starting query";
73 if (query.limit && query.sortProperty.isEmpty()) { 71 if (query.limit && query.sortProperty.isEmpty()) {
74 Warning() << "A limited query without sorting is typically a bad idea."; 72 SinkWarning() << "A limited query without sorting is typically a bad idea.";
75 } 73 }
76 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 74 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
77 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { 75 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) {
78 const QByteArray parentId = parent ? parent->identifier() : QByteArray(); 76 const QByteArray parentId = parent ? parent->identifier() : QByteArray();
79 Trace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; 77 SinkTrace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize;
80 auto resultProvider = mResultProvider; 78 auto resultProvider = mResultProvider;
81 if (query.synchronousQuery) { 79 if (query.synchronousQuery) {
82 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 80 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
@@ -131,7 +129,7 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
131template <class DomainType> 129template <class DomainType>
132QueryRunner<DomainType>::~QueryRunner() 130QueryRunner<DomainType>::~QueryRunner()
133{ 131{
134 Trace() << "Stopped query"; 132 SinkTrace() << "Stopped query";
135} 133}
136 134
137template <class DomainType> 135template <class DomainType>
@@ -147,21 +145,18 @@ typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainTy
147} 145}
148 146
149 147
150#undef Trace
151#define Trace() Trace_area("client.queryrunner." + mId)
152
153template <class DomainType> 148template <class DomainType>
154QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, 149QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory,
155 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) 150 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation)
156 : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mId(QUuid::createUuid().toByteArray()) 151 : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mId(QUuid::createUuid().toByteArray())
157{ 152{
158 Trace() << "Starting query worker"; 153 SinkTrace() << "Starting query worker";
159} 154}
160 155
161template <class DomainType> 156template <class DomainType>
162QueryWorker<DomainType>::~QueryWorker() 157QueryWorker<DomainType>::~QueryWorker()
163{ 158{
164 Trace() << "Stopped query worker"; 159 SinkTrace() << "Stopped query worker";
165} 160}
166 161
167template <class DomainType> 162template <class DomainType>
@@ -174,15 +169,15 @@ std::function<bool(const typename DomainType::Ptr &, Sink::Operation)> QueryWork
174 } 169 }
175 switch (operation) { 170 switch (operation) {
176 case Sink::Operation_Creation: 171 case Sink::Operation_Creation:
177 // Trace() << "Got creation"; 172 // SinkTrace() << "Got creation";
178 resultProvider.add(valueCopy); 173 resultProvider.add(valueCopy);
179 break; 174 break;
180 case Sink::Operation_Modification: 175 case Sink::Operation_Modification:
181 // Trace() << "Got modification"; 176 // SinkTrace() << "Got modification";
182 resultProvider.modify(valueCopy); 177 resultProvider.modify(valueCopy);
183 break; 178 break;
184 case Sink::Operation_Removal: 179 case Sink::Operation_Removal:
185 // Trace() << "Got removal"; 180 // SinkTrace() << "Got removal";
186 resultProvider.remove(valueCopy); 181 resultProvider.remove(valueCopy);
187 break; 182 break;
188 } 183 }
@@ -200,7 +195,7 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sin
200 195
201 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); 196 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction);
202 auto revisionAndReplayedEntities = reader.executeIncrementalQuery(query, resultProvider.revision(), resultProviderCallback(query, resultProvider)); 197 auto revisionAndReplayedEntities = reader.executeIncrementalQuery(query, resultProvider.revision(), resultProviderCallback(query, resultProvider));
203 Trace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); 198 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
204 return revisionAndReplayedEntities; 199 return revisionAndReplayedEntities;
205} 200}
206 201
@@ -212,10 +207,10 @@ Storage::Transaction QueryWorker<DomainType>::getTransaction()
212 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); 207 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier);
213 if (!storage.exists()) { 208 if (!storage.exists()) {
214 //This is not an error if the resource wasn't started before 209 //This is not an error if the resource wasn't started before
215 Log() << "Store doesn't exist: " << mResourceInstanceIdentifier; 210 SinkLog() << "Store doesn't exist: " << mResourceInstanceIdentifier;
216 return Sink::Storage::Transaction(); 211 return Sink::Storage::Transaction();
217 } 212 }
218 storage.setDefaultErrorHandler([](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.store << error.message; }); 213 storage.setDefaultErrorHandler([this](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.store << error.message; });
219 transaction = storage.createTransaction(Sink::Storage::ReadOnly); 214 transaction = storage.createTransaction(Sink::Storage::ReadOnly);
220 } 215 }
221 216
@@ -238,10 +233,10 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
238 auto modifiedQuery = query; 233 auto modifiedQuery = query;
239 if (!query.parentProperty.isEmpty()) { 234 if (!query.parentProperty.isEmpty()) {
240 if (parent) { 235 if (parent) {
241 Trace() << "Running initial query for parent:" << parent->identifier(); 236 SinkTrace() << "Running initial query for parent:" << parent->identifier();
242 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(parent->identifier())); 237 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(parent->identifier()));
243 } else { 238 } else {
244 Trace() << "Running initial query for toplevel"; 239 SinkTrace() << "Running initial query for toplevel";
245 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(QVariant())); 240 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(QVariant()));
246 } 241 }
247 } 242 }
@@ -250,7 +245,7 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
250 245
251 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); 246 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction);
252 auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider)); 247 auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider));
253 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 248 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
254 return revisionAndReplayedEntities; 249 return revisionAndReplayedEntities;
255} 250}
256 251