summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-11 11:55:29 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-11 11:55:29 +0200
commit3a3118e768e1447dc7524328e84b8d7faef81fe1 (patch)
treeaf5582170ed6164fffc9365f34b17bf449c0db40 /common/queryrunner.cpp
parentf9379318d801df204cc50385c5eca1f28e91755e (diff)
parentce2fd2666f084eebe443598f6f3740a02913091e (diff)
downloadsink-3a3118e768e1447dc7524328e84b8d7faef81fe1.tar.gz
sink-3a3118e768e1447dc7524328e84b8d7faef81fe1.zip
Merge branch 'feature/notifications' into develop
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp44
1 files changed, 21 insertions, 23 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 78a4b94..2e2e96d 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -29,8 +29,7 @@
29#include "asyncutils.h" 29#include "asyncutils.h"
30#include "entityreader.h" 30#include "entityreader.h"
31 31
32#undef DEBUG_AREA 32SINK_DEBUG_AREA("queryrunner")
33#define DEBUG_AREA "client.queryrunner"
34 33
35using namespace Sink; 34using namespace Sink;
36 35
@@ -43,6 +42,8 @@ using namespace Sink;
43template <typename DomainType> 42template <typename DomainType>
44class QueryWorker : public QObject 43class QueryWorker : public QObject
45{ 44{
45 // SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier, mId)
46 SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier)
46public: 47public:
47 QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, 48 QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType,
48 const QueryRunnerBase::ResultTransformation &transformation); 49 const QueryRunnerBase::ResultTransformation &transformation);
@@ -61,22 +62,19 @@ private:
61 QByteArray mId; //Used for identification in debug output 62 QByteArray mId; //Used for identification in debug output
62}; 63};
63 64
64#undef Trace
65#define Trace() Trace_area(DEBUG_AREA)
66
67template <class DomainType> 65template <class DomainType>
68QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, 66QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier,
69 const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) 67 const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType)
70 : QueryRunnerBase(), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit) 68 : QueryRunnerBase(), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit)
71{ 69{
72 Trace() << "Starting query"; 70 SinkTrace() << "Starting query";
73 if (query.limit && query.sortProperty.isEmpty()) { 71 if (query.limit && query.sortProperty.isEmpty()) {
74 Warning() << "A limited query without sorting is typically a bad idea."; 72 SinkWarning() << "A limited query without sorting is typically a bad idea.";
75 } 73 }
76 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 74 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
77 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { 75 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) {
78 const QByteArray parentId = parent ? parent->identifier() : QByteArray(); 76 const QByteArray parentId = parent ? parent->identifier() : QByteArray();
79 Trace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; 77 SinkTrace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize;
80 auto resultProvider = mResultProvider; 78 auto resultProvider = mResultProvider;
81 if (query.synchronousQuery) { 79 if (query.synchronousQuery) {
82 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 80 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
@@ -123,12 +121,15 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
123 mResourceAccess->open(); 121 mResourceAccess->open();
124 QObject::connect(mResourceAccess.data(), &Sink::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged); 122 QObject::connect(mResourceAccess.data(), &Sink::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged);
125 } 123 }
124 mResultProvider->onDone([this]() {
125 delete this;
126 });
126} 127}
127 128
128template <class DomainType> 129template <class DomainType>
129QueryRunner<DomainType>::~QueryRunner() 130QueryRunner<DomainType>::~QueryRunner()
130{ 131{
131 Trace() << "Stopped query"; 132 SinkTrace() << "Stopped query";
132} 133}
133 134
134template <class DomainType> 135template <class DomainType>
@@ -144,21 +145,18 @@ typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainTy
144} 145}
145 146
146 147
147#undef Trace
148#define Trace() Trace_area("client.queryrunner." + mId)
149
150template <class DomainType> 148template <class DomainType>
151QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, 149QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory,
152 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) 150 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation)
153 : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mId(QUuid::createUuid().toByteArray()) 151 : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mId(QUuid::createUuid().toByteArray())
154{ 152{
155 Trace() << "Starting query worker"; 153 SinkTrace() << "Starting query worker";
156} 154}
157 155
158template <class DomainType> 156template <class DomainType>
159QueryWorker<DomainType>::~QueryWorker() 157QueryWorker<DomainType>::~QueryWorker()
160{ 158{
161 Trace() << "Stopped query worker"; 159 SinkTrace() << "Stopped query worker";
162} 160}
163 161
164template <class DomainType> 162template <class DomainType>
@@ -171,15 +169,15 @@ std::function<bool(const typename DomainType::Ptr &, Sink::Operation)> QueryWork
171 } 169 }
172 switch (operation) { 170 switch (operation) {
173 case Sink::Operation_Creation: 171 case Sink::Operation_Creation:
174 // Trace() << "Got creation"; 172 // SinkTrace() << "Got creation";
175 resultProvider.add(valueCopy); 173 resultProvider.add(valueCopy);
176 break; 174 break;
177 case Sink::Operation_Modification: 175 case Sink::Operation_Modification:
178 // Trace() << "Got modification"; 176 // SinkTrace() << "Got modification";
179 resultProvider.modify(valueCopy); 177 resultProvider.modify(valueCopy);
180 break; 178 break;
181 case Sink::Operation_Removal: 179 case Sink::Operation_Removal:
182 // Trace() << "Got removal"; 180 // SinkTrace() << "Got removal";
183 resultProvider.remove(valueCopy); 181 resultProvider.remove(valueCopy);
184 break; 182 break;
185 } 183 }
@@ -197,7 +195,7 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sin
197 195
198 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); 196 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction);
199 auto revisionAndReplayedEntities = reader.executeIncrementalQuery(query, resultProvider.revision(), resultProviderCallback(query, resultProvider)); 197 auto revisionAndReplayedEntities = reader.executeIncrementalQuery(query, resultProvider.revision(), resultProviderCallback(query, resultProvider));
200 Trace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); 198 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
201 return revisionAndReplayedEntities; 199 return revisionAndReplayedEntities;
202} 200}
203 201
@@ -209,10 +207,10 @@ Storage::Transaction QueryWorker<DomainType>::getTransaction()
209 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); 207 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier);
210 if (!storage.exists()) { 208 if (!storage.exists()) {
211 //This is not an error if the resource wasn't started before 209 //This is not an error if the resource wasn't started before
212 Log() << "Store doesn't exist: " << mResourceInstanceIdentifier; 210 SinkLog() << "Store doesn't exist: " << mResourceInstanceIdentifier;
213 return Sink::Storage::Transaction(); 211 return Sink::Storage::Transaction();
214 } 212 }
215 storage.setDefaultErrorHandler([](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.store << error.message; }); 213 storage.setDefaultErrorHandler([this](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.store << error.message; });
216 transaction = storage.createTransaction(Sink::Storage::ReadOnly); 214 transaction = storage.createTransaction(Sink::Storage::ReadOnly);
217 } 215 }
218 216
@@ -235,10 +233,10 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
235 auto modifiedQuery = query; 233 auto modifiedQuery = query;
236 if (!query.parentProperty.isEmpty()) { 234 if (!query.parentProperty.isEmpty()) {
237 if (parent) { 235 if (parent) {
238 Trace() << "Running initial query for parent:" << parent->identifier(); 236 SinkTrace() << "Running initial query for parent:" << parent->identifier();
239 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(parent->identifier())); 237 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(parent->identifier()));
240 } else { 238 } else {
241 Trace() << "Running initial query for toplevel"; 239 SinkTrace() << "Running initial query for toplevel";
242 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(QVariant())); 240 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(QVariant()));
243 } 241 }
244 } 242 }
@@ -247,7 +245,7 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
247 245
248 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); 246 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction);
249 auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider)); 247 auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider));
250 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 248 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
251 return revisionAndReplayedEntities; 249 return revisionAndReplayedEntities;
252} 250}
253 251