summaryrefslogtreecommitdiffstats
path: root/common/queryrunner.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-16 14:55:20 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-21 09:02:21 +0200
commit237b9ae4113e7a9f489632296941becb71afdb45 (patch)
tree01cde58f495944f01cad9d282391d4efd2897141 /common/queryrunner.cpp
parent95d11bf0be98a4e3c08502fe23417b800233ce14 (diff)
downloadsink-237b9ae4113e7a9f489632296941becb71afdb45.tar.gz
sink-237b9ae4113e7a9f489632296941becb71afdb45.zip
Refactor how the storage is used.
This is the initial refactoring to improve how we deal with the storage. It does a couple of things: * Rename Sink::Storage to Sink::Storage::DataStore to free up the Sink::Storage namespace * Introduce a Sink::ResourceContext to have a single object that can be passed around containing everything that is necessary to operate on a resource. This is a lot better than the multiple separate parameters that we used to pass around all over the place, while still allowing for dependency injection for tests. * Tie storage access together using the new EntityStore that directly works with ApplicationDomainTypes. This gives us a central place where main storage, indexes and buffer adaptors are tied together, which will also give us a place to implement external indexes, such as a fulltextindex using xapian. * Use ApplicationDomainTypes as the default way to pass around entities. Instead of using various ways to pass around entities (buffers, buffer adaptors, ApplicationDomainTypes), only use a single way. The old approach was confusing, and was only done as: * optimization; really shouldn't be necessary and otherwise I'm sure we can find better ways to optimize ApplicationDomainType itself. * a way to account for entities that have multiple buffers, a concept that I no longer deem relevant. While this commit does the bulk of the work to get there, the following commits will refactor more stuff to get things back to normal.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r--common/queryrunner.cpp106
1 files changed, 58 insertions, 48 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index f037cfc..e7963a3 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -28,11 +28,13 @@
28#include "definitions.h" 28#include "definitions.h"
29#include "domainadaptor.h" 29#include "domainadaptor.h"
30#include "asyncutils.h" 30#include "asyncutils.h"
31#include "entityreader.h" 31#include "storage.h"
32#include "datastorequery.h"
32 33
33SINK_DEBUG_AREA("queryrunner") 34SINK_DEBUG_AREA("queryrunner")
34 35
35using namespace Sink; 36using namespace Sink;
37using namespace Sink::Storage;
36 38
37/* 39/*
38 * This class wraps the actual query implementation. 40 * This class wraps the actual query implementation.
@@ -43,30 +45,28 @@ using namespace Sink;
43template <typename DomainType> 45template <typename DomainType>
44class QueryWorker : public QObject 46class QueryWorker : public QObject
45{ 47{
48 typedef std::function<bool(const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap<QByteArray, QVariant> &aggregateValues)> ResultCallback;
46 // SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier, mId) 49 // SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier, mId)
47 SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier) 50 SINK_DEBUG_COMPONENT(mResourceContext.resourceInstanceIdentifier)
48public: 51public:
49 QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, 52 QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation);
50 const QueryRunnerBase::ResultTransformation &transformation);
51 virtual ~QueryWorker(); 53 virtual ~QueryWorker();
52 54
55 qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback);
53 QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 56 QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
54 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); 57 QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize);
55 58
56private: 59private:
57 Storage::Transaction getTransaction();
58 std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap<QByteArray, QVariant> &)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 60 std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap<QByteArray, QVariant> &)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
59 61
60 QueryRunnerBase::ResultTransformation mResultTransformation; 62 QueryRunnerBase::ResultTransformation mResultTransformation;
61 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; 63 ResourceContext mResourceContext;
62 QByteArray mResourceInstanceIdentifier;
63 QByteArray mId; //Used for identification in debug output 64 QByteArray mId; //Used for identification in debug output
64}; 65};
65 66
66template <class DomainType> 67template <class DomainType>
67QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, 68QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType)
68 const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) 69 : QueryRunnerBase(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess()), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit)
69 : QueryRunnerBase(), mResourceInstanceIdentifier(instanceIdentifier), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit)
70{ 70{
71 SinkTrace() << "Starting query"; 71 SinkTrace() << "Starting query";
72 if (query.limit && query.sortProperty.isEmpty()) { 72 if (query.limit && query.sortProperty.isEmpty()) {
@@ -79,16 +79,17 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
79 SinkTrace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; 79 SinkTrace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize;
80 auto resultProvider = mResultProvider; 80 auto resultProvider = mResultProvider;
81 if (query.synchronousQuery) { 81 if (query.synchronousQuery) {
82 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 82 QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation);
83 worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); 83 worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize);
84 resultProvider->initialResultSetComplete(parent); 84 resultProvider->initialResultSetComplete(parent);
85 } else { 85 } else {
86 auto resultTransformation = mResultTransformation; 86 auto resultTransformation = mResultTransformation;
87 auto offset = mOffset[parentId]; 87 auto offset = mOffset[parentId];
88 auto batchSize = mBatchSize; 88 auto batchSize = mBatchSize;
89 auto resourceContext = mResourceContext;
89 //The lambda will be executed in a separate thread, so we're extra careful 90 //The lambda will be executed in a separate thread, so we're extra careful
90 async::run<QPair<qint64, qint64> >([resultTransformation, offset, batchSize, query, bufferType, instanceIdentifier, factory, resultProvider, parent]() { 91 async::run<QPair<qint64, qint64> >([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() {
91 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, resultTransformation); 92 QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation);
92 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); 93 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize);
93 return newRevisionAndReplayedEntities; 94 return newRevisionAndReplayedEntities;
94 }) 95 })
@@ -115,8 +116,9 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
115 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 116 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
116 setQuery([=]() -> KAsync::Job<void> { 117 setQuery([=]() -> KAsync::Job<void> {
117 auto resultProvider = mResultProvider; 118 auto resultProvider = mResultProvider;
119 auto resourceContext = mResourceContext;
118 return async::run<QPair<qint64, qint64> >([=]() { 120 return async::run<QPair<qint64, qint64> >([=]() {
119 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 121 QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation);
120 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); 122 const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider);
121 return newRevisionAndReplayedEntities; 123 return newRevisionAndReplayedEntities;
122 }) 124 })
@@ -158,11 +160,10 @@ typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainTy
158 return mResultProvider->emitter(); 160 return mResultProvider->emitter();
159} 161}
160 162
161
162template <class DomainType> 163template <class DomainType>
163QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, 164QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const Sink::ResourceContext &resourceContext,
164 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) 165 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation)
165 : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mId(QUuid::createUuid().toByteArray()) 166 : QObject(), mResultTransformation(transformation), mResourceContext(resourceContext), mId(QUuid::createUuid().toByteArray())
166{ 167{
167 SinkTrace() << "Starting query worker"; 168 SinkTrace() << "Starting query worker";
168} 169}
@@ -203,41 +204,46 @@ std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap
203} 204}
204 205
205template <class DomainType> 206template <class DomainType>
207qint64 QueryWorker<DomainType>::replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback)
208{
209 SinkTrace() << "Skipping over " << offset << " results";
210 resultSet.skip(offset);
211 int counter = 0;
212 while (!batchSize || (counter < batchSize)) {
213 const bool ret =
214 resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool {
215 counter++;
216 auto adaptor = mResourceContext.adaptorFactory<DomainType>().createAdaptor(result.buffer.entity());
217 Q_ASSERT(adaptor);
218 return callback(QSharedPointer<DomainType>::create(mResourceContext.instanceId(), result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues);
219 });
220 if (!ret) {
221 break;
222 }
223 };
224 SinkTrace() << "Replayed " << counter << " results."
225 << "Limit " << batchSize;
226 return counter;
227}
228
229template <class DomainType>
206QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) 230QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
207{ 231{
208 QTime time; 232 QTime time;
209 time.start(); 233 time.start();
210 234
211 auto transaction = getTransaction(); 235 auto entityStore = EntityStore::Ptr::create(mResourceContext);
212 236
213 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); 237 const qint64 baseRevision = resultProvider.revision() + 1;
214 auto revisionAndReplayedEntities = reader.executeIncrementalQuery(query, resultProvider.revision(), resultProviderCallback(query, resultProvider));
215 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
216 return revisionAndReplayedEntities;
217}
218 238
219template <class DomainType> 239 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, entityStore);
220Storage::Transaction QueryWorker<DomainType>::getTransaction() 240 auto resultSet = preparedQuery->update(baseRevision);
221{
222 Sink::Storage::Transaction transaction;
223 {
224 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier);
225 if (!storage.exists()) {
226 //This is not an error if the resource wasn't started before
227 SinkLog() << "Store doesn't exist: " << mResourceInstanceIdentifier;
228 return Sink::Storage::Transaction();
229 }
230 storage.setDefaultErrorHandler([this](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.store << error.message; });
231 transaction = storage.createTransaction(Sink::Storage::ReadOnly);
232 }
233 241
234 //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. 242 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
235 //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). 243 auto replayedEntities = replaySet(resultSet, 0, 0, resultProviderCallback(query, resultProvider));
236 while (!transaction.validateNamedDatabases()) { 244
237 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); 245 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
238 transaction = storage.createTransaction(Sink::Storage::ReadOnly); 246 return qMakePair(entityStore->maxRevision(), replayedEntities);
239 }
240 return transaction;
241} 247}
242 248
243template <class DomainType> 249template <class DomainType>
@@ -258,12 +264,16 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
258 } 264 }
259 } 265 }
260 266
261 auto transaction = getTransaction(); 267 auto entityStore = EntityStore::Ptr::create(mResourceContext);
268
269 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, entityStore);
270 auto resultSet = preparedQuery->execute();
271
272 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
273 auto replayedEntities = replaySet(resultSet, offset, batchsize, resultProviderCallback(query, resultProvider));
262 274
263 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction);
264 auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider));
265 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 275 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
266 return revisionAndReplayedEntities; 276 return qMakePair(entityStore->maxRevision(), replayedEntities);
267} 277}
268 278
269template class QueryRunner<Sink::ApplicationDomain::Folder>; 279template class QueryRunner<Sink::ApplicationDomain::Folder>;