summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp106
1 files changed, 58 insertions, 48 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index f037cfc..e7963a3 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -28,11 +28,13 @@
28#include "definitions.h" 28#include "definitions.h"
29#include "domainadaptor.h" 29#include "domainadaptor.h"
30#include "asyncutils.h" 30#include "asyncutils.h"
31#include "entityreader.h" 31#include "storage.h"
32#include "datastorequery.h"
32 33
33SINK_DEBUG_AREA("queryrunner") 34SINK_DEBUG_AREA("queryrunner")
34 35
35using namespace Sink; 36using namespace Sink;
37using namespace Sink::Storage;
36 38
37/* 39/*
38 * This class wraps the actual query implementation. 40 * This class wraps the actual query implementation.
@@ -43,30 +45,28 @@ using namespace Sink;
43template <typename DomainType> 45template <typename DomainType>
44class QueryWorker : public QObject 46class QueryWorker : public QObject
45{ 47{
48 typedef std::function<bool(const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap<QByteArray, QVariant> &aggregateValues)> ResultCallback;
46 // SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier, mId) 49 // SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier, mId)
47 SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier) 50 SINK_DEBUG_COMPONENT(mResourceContext.resourceInstanceIdentifier)
48public: 51public:
49 QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, 52 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation);
50 const QueryRunnerBase::ResultTransformation &transformation);
51 virtual ~QueryWorker(); 53 virtual ~QueryWorker();
52 54
55 qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback);
53 QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 56 QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
54 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); 57 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
55 58
56private: 59private:
57 Storage::Transaction getTransaction();
58 std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap<QByteArray, QVariant> &)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 60 std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap<QByteArray, QVariant> &)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
59 61
60 QueryRunnerBase::ResultTransformation mResultTransformation; 62 QueryRunnerBase::ResultTransformation mResultTransformation;
61 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; 63 ResourceContext mResourceContext;
62 QByteArray mResourceInstanceIdentifier;
63 QByteArray mId; //Used for identification in debug output 64 QByteArray mId; //Used for identification in debug output
64}; 65};
65 66
66template <class DomainType> 67template <class DomainType>
67QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, 68QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType)
68 const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) 69 : QueryRunnerBase(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess()), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit)
69 : QueryRunnerBase(), mResourceInstanceIdentifier(instanceIdentifier), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit)
70{ 70{
71 SinkTrace() << "Starting query"; 71 SinkTrace() << "Starting query";
72 if (query.limit && query.sortProperty.isEmpty()) { 72 if (query.limit && query.sortProperty.isEmpty()) {
@@ -79,16 +79,17 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
79 SinkTrace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; 79 SinkTrace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize;
80 auto resultProvider = mResultProvider; 80 auto resultProvider = mResultProvider;
81 if (query.synchronousQuery) { 81 if (query.synchronousQuery) {
82 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 82 QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation);
83 worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); 83 worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize);
84 resultProvider->initialResultSetComplete(parent); 84 resultProvider->initialResultSetComplete(parent);
85 } else { 85 } else {
86 auto resultTransformation = mResultTransformation; 86 auto resultTransformation = mResultTransformation;
87 auto offset = mOffset[parentId]; 87 auto offset = mOffset[parentId];
88 auto batchSize = mBatchSize; 88 auto batchSize = mBatchSize;
89 auto resourceContext = mResourceContext;
89 //The lambda will be executed in a separate thread, so we're extra careful 90 //The lambda will be executed in a separate thread, so we're extra careful
90 async::run<QPair<qint64, qint64> >([resultTransformation, offset, batchSize, query, bufferType, instanceIdentifier, factory, resultProvider, parent]() { 91 async::run<QPair<qint64, qint64> >([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() {
91 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, resultTransformation); 92 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation);
92 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); 93 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize);
93 return newRevisionAndReplayedEntities; 94 return newRevisionAndReplayedEntities;
94 }) 95 })
@@ -115,8 +116,9 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
115 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 116 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
116 setQuery([=]() -> KAsync::Job<void> { 117 setQuery([=]() -> KAsync::Job<void> {
117 auto resultProvider = mResultProvider; 118 auto resultProvider = mResultProvider;
119 auto resourceContext = mResourceContext;
118 return async::run<QPair<qint64, qint64> >([=]() { 120 return async::run<QPair<qint64, qint64> >([=]() {
119 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 121 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation);
120 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); 122 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider);
121 return newRevisionAndReplayedEntities; 123 return newRevisionAndReplayedEntities;
122 }) 124 })
@@ -158,11 +160,10 @@ typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainTy
158 return mResultProvider->emitter(); 160 return mResultProvider->emitter();
159} 161}
160 162
161
162template <class DomainType> 163template <class DomainType>
163QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, 164QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const Sink::ResourceContext &resourceContext,
164 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) 165 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation)
165 : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mId(QUuid::createUuid().toByteArray()) 166 : QObject(), mResultTransformation(transformation), mResourceContext(resourceContext), mId(QUuid::createUuid().toByteArray())
166{ 167{
167 SinkTrace() << "Starting query worker"; 168 SinkTrace() << "Starting query worker";
168} 169}
@@ -203,41 +204,46 @@ std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap
203} 204}
204 205
205template <class DomainType> 206template <class DomainType>
207qint64 QueryWorker<DomainType>::replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback)
208{
209 SinkTrace() << "Skipping over " << offset << " results";
210 resultSet.skip(offset);
211 int counter = 0;
212 while (!batchSize || (counter < batchSize)) {
213 const bool ret =
214 resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool {
215 counter++;
216 auto adaptor = mResourceContext.adaptorFactory<DomainType>().createAdaptor(result.buffer.entity());
217 Q_ASSERT(adaptor);
218 return callback(QSharedPointer<DomainType>::create(mResourceContext.instanceId(), result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues);
219 });
220 if (!ret) {
221 break;
222 }
223 };
224 SinkTrace() << "Replayed " << counter << " results."
225 << "Limit " << batchSize;
226 return counter;
227}
228
229template <class DomainType>
206QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 230QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
207{ 231{
208 QTime time; 232 QTime time;
209 time.start(); 233 time.start();
210 234
211 auto transaction = getTransaction(); 235 auto entityStore = EntityStore::Ptr::create(mResourceContext);
212 236
213 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); 237 const qint64 baseRevision = resultProvider.revision() + 1;
214 auto revisionAndReplayedEntities = reader.executeIncrementalQuery(query, resultProvider.revision(), resultProviderCallback(query, resultProvider));
215 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
216 return revisionAndReplayedEntities;
217}
218 238
219template <class DomainType> 239 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, entityStore);
220Storage::Transaction QueryWorker<DomainType>::getTransaction() 240 auto resultSet = preparedQuery->update(baseRevision);
221{
222 Sink::Storage::Transaction transaction;
223 {
224 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier);
225 if (!storage.exists()) {
226 //This is not an error if the resource wasn't started before
227 SinkLog() << "Store doesn't exist: " << mResourceInstanceIdentifier;
228 return Sink::Storage::Transaction();
229 }
230 storage.setDefaultErrorHandler([this](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.store << error.message; });
231 transaction = storage.createTransaction(Sink::Storage::ReadOnly);
232 }
233 241
234 //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. 242 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
235 //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). 243 auto replayedEntities = replaySet(resultSet, 0, 0, resultProviderCallback(query, resultProvider));
236 while (!transaction.validateNamedDatabases()) { 244
237 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); 245 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
238 transaction = storage.createTransaction(Sink::Storage::ReadOnly); 246 return qMakePair(entityStore->maxRevision(), replayedEntities);
239 }
240 return transaction;
241} 247}
242 248
243template <class DomainType> 249template <class DomainType>
@@ -258,12 +264,16 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
258 } 264 }
259 } 265 }
260 266
261 auto transaction = getTransaction(); 267 auto entityStore = EntityStore::Ptr::create(mResourceContext);
268
269 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, entityStore);
270 auto resultSet = preparedQuery->execute();
271
272 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
273 auto replayedEntities = replaySet(resultSet, offset, batchsize, resultProviderCallback(query, resultProvider));
262 274
263 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction);
264 auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider));
265 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 275 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
266 return revisionAndReplayedEntities; 276 return qMakePair(entityStore->maxRevision(), replayedEntities);
267} 277}
268 278
269template class QueryRunner<Sink::ApplicationDomain::Folder>; 279template class QueryRunner<Sink::ApplicationDomain::Folder>;