From 237b9ae4113e7a9f489632296941becb71afdb45 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 16 Oct 2016 14:55:20 +0200 Subject: Refactor how the storage is used. This is the initial refactoring to improve how we deal with the storage. It does a couple of things: * Rename Sink::Storage to Sink::Storage::DataStore to free up the Sink::Storage namespace * Introduce a Sink::ResourceContext to have a single object that can be passed around containing everything that is necessary to operate on a resource. This is a lot better than the multiple separate parameters that we used to pass around all over the place, while still allowing for dependency injection for tests. * Tie storage access together using the new EntityStore that directly works with ApplicationDomainTypes. This gives us a central place where main storage, indexes and buffer adaptors are tied together, which will also give us a place to implement external indexes, such as a full-text index using xapian. * Use ApplicationDomainTypes as the default way to pass around entities. Instead of using various ways to pass around entities (buffers, buffer adaptors, ApplicationDomainTypes), only use a single way. The old approach was confusing, and was only done as: * an optimization; it really shouldn't be necessary, and otherwise I'm sure we can find better ways to optimize ApplicationDomainType itself. * a way to account for entities that have multiple buffers, a concept that I no longer deem relevant. While this commit does the bulk of the work to get there, the following commits will refactor more stuff to get things back to normal. 
--- common/queryrunner.cpp | 106 +++++++++++++++++++++++++++---------------------- 1 file changed, 58 insertions(+), 48 deletions(-) (limited to 'common/queryrunner.cpp') diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index f037cfc..e7963a3 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -28,11 +28,13 @@ #include "definitions.h" #include "domainadaptor.h" #include "asyncutils.h" -#include "entityreader.h" +#include "storage.h" +#include "datastorequery.h" SINK_DEBUG_AREA("queryrunner") using namespace Sink; +using namespace Sink::Storage; /* * This class wraps the actual query implementation. @@ -43,30 +45,28 @@ using namespace Sink; template class QueryWorker : public QObject { + typedef std::function &aggregateValues)> ResultCallback; // SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier, mId) - SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier) + SINK_DEBUG_COMPONENT(mResourceContext.resourceInstanceIdentifier) public: - QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, - const QueryRunnerBase::ResultTransformation &transformation); + QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); virtual ~QueryWorker(); + qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); QPair executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider); QPair executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize); private: - Storage::Transaction getTransaction(); std::function &)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider); QueryRunnerBase::ResultTransformation mResultTransformation; - 
DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; - QByteArray mResourceInstanceIdentifier; + ResourceContext mResourceContext; QByteArray mId; //Used for identification in debug output }; template -QueryRunner::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, - const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) - : QueryRunnerBase(), mResourceInstanceIdentifier(instanceIdentifier), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider), mBatchSize(query.limit) +QueryRunner::QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType) + : QueryRunnerBase(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess()), mResultProvider(new ResultProvider), mBatchSize(query.limit) { SinkTrace() << "Starting query"; if (query.limit && query.sortProperty.isEmpty()) { @@ -79,16 +79,17 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou SinkTrace() << "Running fetcher. 
Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; auto resultProvider = mResultProvider; if (query.synchronousQuery) { - QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); + QueryWorker worker(query, mResourceContext, bufferType, mResultTransformation); worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); resultProvider->initialResultSetComplete(parent); } else { auto resultTransformation = mResultTransformation; auto offset = mOffset[parentId]; auto batchSize = mBatchSize; + auto resourceContext = mResourceContext; //The lambda will be executed in a separate thread, so we're extra careful - async::run >([resultTransformation, offset, batchSize, query, bufferType, instanceIdentifier, factory, resultProvider, parent]() { - QueryWorker worker(query, instanceIdentifier, factory, bufferType, resultTransformation); + async::run >([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() { + QueryWorker worker(query, resourceContext, bufferType, resultTransformation); const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); return newRevisionAndReplayedEntities; }) @@ -115,8 +116,9 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting setQuery([=]() -> KAsync::Job { auto resultProvider = mResultProvider; + auto resourceContext = mResourceContext; return async::run >([=]() { - QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); + QueryWorker worker(query, resourceContext, bufferType, mResultTransformation); const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); return newRevisionAndReplayedEntities; }) @@ -158,11 +160,10 @@ typename 
Sink::ResultEmitter::Ptr QueryRunneremitter(); } - template -QueryWorker::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, +QueryWorker::QueryWorker(const Sink::Query &query, const Sink::ResourceContext &resourceContext, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) - : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mId(QUuid::createUuid().toByteArray()) + : QObject(), mResultTransformation(transformation), mResourceContext(resourceContext), mId(QUuid::createUuid().toByteArray()) { SinkTrace() << "Starting query worker"; } @@ -202,42 +203,47 @@ std::function +qint64 QueryWorker::replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback) +{ + SinkTrace() << "Skipping over " << offset << " results"; + resultSet.skip(offset); + int counter = 0; + while (!batchSize || (counter < batchSize)) { + const bool ret = + resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { + counter++; + auto adaptor = mResourceContext.adaptorFactory().createAdaptor(result.buffer.entity()); + Q_ASSERT(adaptor); + return callback(QSharedPointer::create(mResourceContext.instanceId(), result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); + }); + if (!ret) { + break; + } + }; + SinkTrace() << "Replayed " << counter << " results." 
+ << "Limit " << batchSize; + return counter; +} + template QPair QueryWorker::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) { QTime time; time.start(); - auto transaction = getTransaction(); + auto entityStore = EntityStore::Ptr::create(mResourceContext); - Sink::EntityReader reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); - auto revisionAndReplayedEntities = reader.executeIncrementalQuery(query, resultProvider.revision(), resultProviderCallback(query, resultProvider)); - SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); - return revisionAndReplayedEntities; -} + const qint64 baseRevision = resultProvider.revision() + 1; -template -Storage::Transaction QueryWorker::getTransaction() -{ - Sink::Storage::Transaction transaction; - { - Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); - if (!storage.exists()) { - //This is not an error if the resource wasn't started before - SinkLog() << "Store doesn't exist: " << mResourceInstanceIdentifier; - return Sink::Storage::Transaction(); - } - storage.setDefaultErrorHandler([this](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.store << error.message; }); - transaction = storage.createTransaction(Sink::Storage::ReadOnly); - } + auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, entityStore); + auto resultSet = preparedQuery->update(baseRevision); - //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. - //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). 
- while (!transaction.validateNamedDatabases()) { - Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); - transaction = storage.createTransaction(Sink::Storage::ReadOnly); - } - return transaction; + SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); + auto replayedEntities = replaySet(resultSet, 0, 0, resultProviderCallback(query, resultProvider)); + + SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); + return qMakePair(entityStore->maxRevision(), replayedEntities); } template @@ -258,12 +264,16 @@ QPair QueryWorker::executeInitialQuery( } } - auto transaction = getTransaction(); + auto entityStore = EntityStore::Ptr::create(mResourceContext); + + auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, entityStore); + auto resultSet = preparedQuery->execute(); + + SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); + auto replayedEntities = replaySet(resultSet, offset, batchsize, resultProviderCallback(query, resultProvider)); - Sink::EntityReader reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); - auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider)); SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); - return revisionAndReplayedEntities; + return qMakePair(entityStore->maxRevision(), replayedEntities); } template class QueryRunner; -- cgit v1.2.3