diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-16 14:55:20 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-21 09:02:21 +0200 |
commit | 237b9ae4113e7a9f489632296941becb71afdb45 (patch) | |
tree | 01cde58f495944f01cad9d282391d4efd2897141 /common/queryrunner.cpp | |
parent | 95d11bf0be98a4e3c08502fe23417b800233ce14 (diff) | |
download | sink-237b9ae4113e7a9f489632296941becb71afdb45.tar.gz sink-237b9ae4113e7a9f489632296941becb71afdb45.zip |
Refactor how the storage is used.
This is the initial refactoring to improve how we deal with the storage.
It does a couple of things:
* Rename Sink::Storage to Sink::Storage::DataStore to free up the
Sink::Storage namespace
* Introduce a Sink::ResourceContext to have a single object that can be
passed around containing everything that is necessary to operate on a
resource. This is a lot better than the multiple separate parameters
that we used to pass around all over the place, while still allowing
for dependency injection for tests.
* Tie storage access together using the new EntityStore that directly
works with ApplicationDomainTypes. This gives us a central place where
main storage, indexes and buffer adaptors are tied together, which
will also give us a place to implement external indexes, such as a
fulltextindex using xapian.
* Use ApplicationDomainTypes as the default way to pass around entities.
Instead of using various ways to pass around entities (buffers,
buffer adaptors, ApplicationDomainTypes), only use a single way.
The old approach was confusing, and was only done as:
* optimization; really shouldn't be necessary and otherwise I'm sure
we can find better ways to optimize ApplicationDomainType itself.
* a way to account for entities that have multiple buffers, a concept
that I no longer deem relevant.
While this commit does the bulk of the work to get there, the following
commits will refactor more stuff to get things back to normal.
Diffstat (limited to 'common/queryrunner.cpp')
-rw-r--r-- | common/queryrunner.cpp | 106 |
1 files changed, 58 insertions, 48 deletions
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index f037cfc..e7963a3 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -28,11 +28,13 @@ | |||
28 | #include "definitions.h" | 28 | #include "definitions.h" |
29 | #include "domainadaptor.h" | 29 | #include "domainadaptor.h" |
30 | #include "asyncutils.h" | 30 | #include "asyncutils.h" |
31 | #include "entityreader.h" | 31 | #include "storage.h" |
32 | #include "datastorequery.h" | ||
32 | 33 | ||
33 | SINK_DEBUG_AREA("queryrunner") | 34 | SINK_DEBUG_AREA("queryrunner") |
34 | 35 | ||
35 | using namespace Sink; | 36 | using namespace Sink; |
37 | using namespace Sink::Storage; | ||
36 | 38 | ||
37 | /* | 39 | /* |
38 | * This class wraps the actual query implementation. | 40 | * This class wraps the actual query implementation. |
@@ -43,30 +45,28 @@ using namespace Sink; | |||
43 | template <typename DomainType> | 45 | template <typename DomainType> |
44 | class QueryWorker : public QObject | 46 | class QueryWorker : public QObject |
45 | { | 47 | { |
48 | typedef std::function<bool(const typename DomainType::Ptr &domainObject, Sink::Operation operation, const QMap<QByteArray, QVariant> &aggregateValues)> ResultCallback; | ||
46 | // SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier, mId) | 49 | // SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier, mId) |
47 | SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier) | 50 | SINK_DEBUG_COMPONENT(mResourceContext.resourceInstanceIdentifier) |
48 | public: | 51 | public: |
49 | QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, | 52 | QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); |
50 | const QueryRunnerBase::ResultTransformation &transformation); | ||
51 | virtual ~QueryWorker(); | 53 | virtual ~QueryWorker(); |
52 | 54 | ||
55 | qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); | ||
53 | QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 56 | QPair<qint64, qint64> executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); |
54 | QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); | 57 | QPair<qint64, qint64> executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, int offset, int batchsize); |
55 | 58 | ||
56 | private: | 59 | private: |
57 | Storage::Transaction getTransaction(); | ||
58 | std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap<QByteArray, QVariant> &)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | 60 | std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap<QByteArray, QVariant> &)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); |
59 | 61 | ||
60 | QueryRunnerBase::ResultTransformation mResultTransformation; | 62 | QueryRunnerBase::ResultTransformation mResultTransformation; |
61 | DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; | 63 | ResourceContext mResourceContext; |
62 | QByteArray mResourceInstanceIdentifier; | ||
63 | QByteArray mId; //Used for identification in debug output | 64 | QByteArray mId; //Used for identification in debug output |
64 | }; | 65 | }; |
65 | 66 | ||
66 | template <class DomainType> | 67 | template <class DomainType> |
67 | QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, | 68 | QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType) |
68 | const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) | 69 | : QueryRunnerBase(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess()), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit) |
69 | : QueryRunnerBase(), mResourceInstanceIdentifier(instanceIdentifier), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit) | ||
70 | { | 70 | { |
71 | SinkTrace() << "Starting query"; | 71 | SinkTrace() << "Starting query"; |
72 | if (query.limit && query.sortProperty.isEmpty()) { | 72 | if (query.limit && query.sortProperty.isEmpty()) { |
@@ -79,16 +79,17 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
79 | SinkTrace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; | 79 | SinkTrace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; |
80 | auto resultProvider = mResultProvider; | 80 | auto resultProvider = mResultProvider; |
81 | if (query.synchronousQuery) { | 81 | if (query.synchronousQuery) { |
82 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); | 82 | QueryWorker<DomainType> worker(query, mResourceContext, bufferType, mResultTransformation); |
83 | worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); | 83 | worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); |
84 | resultProvider->initialResultSetComplete(parent); | 84 | resultProvider->initialResultSetComplete(parent); |
85 | } else { | 85 | } else { |
86 | auto resultTransformation = mResultTransformation; | 86 | auto resultTransformation = mResultTransformation; |
87 | auto offset = mOffset[parentId]; | 87 | auto offset = mOffset[parentId]; |
88 | auto batchSize = mBatchSize; | 88 | auto batchSize = mBatchSize; |
89 | auto resourceContext = mResourceContext; | ||
89 | //The lambda will be executed in a separate thread, so we're extra careful | 90 | //The lambda will be executed in a separate thread, so we're extra careful |
90 | async::run<QPair<qint64, qint64> >([resultTransformation, offset, batchSize, query, bufferType, instanceIdentifier, factory, resultProvider, parent]() { | 91 | async::run<QPair<qint64, qint64> >([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() { |
91 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, resultTransformation); | 92 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, resultTransformation); |
92 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); | 93 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); |
93 | return newRevisionAndReplayedEntities; | 94 | return newRevisionAndReplayedEntities; |
94 | }) | 95 | }) |
@@ -115,8 +116,9 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
115 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | 116 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting |
116 | setQuery([=]() -> KAsync::Job<void> { | 117 | setQuery([=]() -> KAsync::Job<void> { |
117 | auto resultProvider = mResultProvider; | 118 | auto resultProvider = mResultProvider; |
119 | auto resourceContext = mResourceContext; | ||
118 | return async::run<QPair<qint64, qint64> >([=]() { | 120 | return async::run<QPair<qint64, qint64> >([=]() { |
119 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); | 121 | QueryWorker<DomainType> worker(query, resourceContext, bufferType, mResultTransformation); |
120 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); | 122 | const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); |
121 | return newRevisionAndReplayedEntities; | 123 | return newRevisionAndReplayedEntities; |
122 | }) | 124 | }) |
@@ -158,11 +160,10 @@ typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainTy | |||
158 | return mResultProvider->emitter(); | 160 | return mResultProvider->emitter(); |
159 | } | 161 | } |
160 | 162 | ||
161 | |||
162 | template <class DomainType> | 163 | template <class DomainType> |
163 | QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, | 164 | QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const Sink::ResourceContext &resourceContext, |
164 | const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) | 165 | const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) |
165 | : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mId(QUuid::createUuid().toByteArray()) | 166 | : QObject(), mResultTransformation(transformation), mResourceContext(resourceContext), mId(QUuid::createUuid().toByteArray()) |
166 | { | 167 | { |
167 | SinkTrace() << "Starting query worker"; | 168 | SinkTrace() << "Starting query worker"; |
168 | } | 169 | } |
@@ -203,41 +204,46 @@ std::function<bool(const typename DomainType::Ptr &, Sink::Operation, const QMap | |||
203 | } | 204 | } |
204 | 205 | ||
205 | template <class DomainType> | 206 | template <class DomainType> |
207 | qint64 QueryWorker<DomainType>::replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback) | ||
208 | { | ||
209 | SinkTrace() << "Skipping over " << offset << " results"; | ||
210 | resultSet.skip(offset); | ||
211 | int counter = 0; | ||
212 | while (!batchSize || (counter < batchSize)) { | ||
213 | const bool ret = | ||
214 | resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { | ||
215 | counter++; | ||
216 | auto adaptor = mResourceContext.adaptorFactory<DomainType>().createAdaptor(result.buffer.entity()); | ||
217 | Q_ASSERT(adaptor); | ||
218 | return callback(QSharedPointer<DomainType>::create(mResourceContext.instanceId(), result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); | ||
219 | }); | ||
220 | if (!ret) { | ||
221 | break; | ||
222 | } | ||
223 | }; | ||
224 | SinkTrace() << "Replayed " << counter << " results." | ||
225 | << "Limit " << batchSize; | ||
226 | return counter; | ||
227 | } | ||
228 | |||
229 | template <class DomainType> | ||
206 | QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | 230 | QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) |
207 | { | 231 | { |
208 | QTime time; | 232 | QTime time; |
209 | time.start(); | 233 | time.start(); |
210 | 234 | ||
211 | auto transaction = getTransaction(); | 235 | auto entityStore = EntityStore::Ptr::create(mResourceContext); |
212 | 236 | ||
213 | Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); | 237 | const qint64 baseRevision = resultProvider.revision() + 1; |
214 | auto revisionAndReplayedEntities = reader.executeIncrementalQuery(query, resultProvider.revision(), resultProviderCallback(query, resultProvider)); | ||
215 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); | ||
216 | return revisionAndReplayedEntities; | ||
217 | } | ||
218 | 238 | ||
219 | template <class DomainType> | 239 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, entityStore); |
220 | Storage::Transaction QueryWorker<DomainType>::getTransaction() | 240 | auto resultSet = preparedQuery->update(baseRevision); |
221 | { | ||
222 | Sink::Storage::Transaction transaction; | ||
223 | { | ||
224 | Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); | ||
225 | if (!storage.exists()) { | ||
226 | //This is not an error if the resource wasn't started before | ||
227 | SinkLog() << "Store doesn't exist: " << mResourceInstanceIdentifier; | ||
228 | return Sink::Storage::Transaction(); | ||
229 | } | ||
230 | storage.setDefaultErrorHandler([this](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.store << error.message; }); | ||
231 | transaction = storage.createTransaction(Sink::Storage::ReadOnly); | ||
232 | } | ||
233 | 241 | ||
234 | //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. | 242 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
235 | //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). | 243 | auto replayedEntities = replaySet(resultSet, 0, 0, resultProviderCallback(query, resultProvider)); |
236 | while (!transaction.validateNamedDatabases()) { | 244 | |
237 | Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); | 245 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
238 | transaction = storage.createTransaction(Sink::Storage::ReadOnly); | 246 | return qMakePair(entityStore->maxRevision(), replayedEntities); |
239 | } | ||
240 | return transaction; | ||
241 | } | 247 | } |
242 | 248 | ||
243 | template <class DomainType> | 249 | template <class DomainType> |
@@ -258,12 +264,16 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery( | |||
258 | } | 264 | } |
259 | } | 265 | } |
260 | 266 | ||
261 | auto transaction = getTransaction(); | 267 | auto entityStore = EntityStore::Ptr::create(mResourceContext); |
268 | |||
269 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, entityStore); | ||
270 | auto resultSet = preparedQuery->execute(); | ||
271 | |||
272 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | ||
273 | auto replayedEntities = replaySet(resultSet, offset, batchsize, resultProviderCallback(query, resultProvider)); | ||
262 | 274 | ||
263 | Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); | ||
264 | auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider)); | ||
265 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | 275 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); |
266 | return revisionAndReplayedEntities; | 276 | return qMakePair(entityStore->maxRevision(), replayedEntities); |
267 | } | 277 | } |
268 | 278 | ||
269 | template class QueryRunner<Sink::ApplicationDomain::Folder>; | 279 | template class QueryRunner<Sink::ApplicationDomain::Folder>; |