From bb70bdcd0eaf72ffc304536267a66c5de5eaf2e9 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 14 Jun 2016 13:26:21 +0200 Subject: Synchronous API --- common/query.h | 15 ++++++++++----- common/queryrunner.cpp | 31 +++++++++++++++++++------------ common/store.cpp | 49 ++++++++++++++++++++++++++++++++++++++++++++++++- common/store.h | 6 ++++++ 4 files changed, 83 insertions(+), 18 deletions(-) (limited to 'common') diff --git a/common/query.h b/common/query.h index 717ed75..d2c0e2e 100644 --- a/common/query.h +++ b/common/query.h @@ -36,7 +36,9 @@ public: enum Flag { /** Leave the query running and continuously update the result set. */ - LiveQuery + LiveQuery, + /** Run the query synchronously. */ + SynchronousQuery }; Q_DECLARE_FLAGS(Flags, Flag) @@ -139,7 +141,9 @@ public: static Query IdentityFilter(const ApplicationDomain::Entity &entity) { - return IdentityFilter(entity.identifier()); + auto query = IdentityFilter(entity.identifier()); + query.resources << entity.resourceInstanceIdentifier(); + return query; } static Query RequestedProperties(const QByteArrayList &properties) @@ -191,13 +195,13 @@ public: return *this; } - Query(const ApplicationDomain::Entity &value) : limit(0) + Query(const ApplicationDomain::Entity &value) : limit(0), liveQuery(false), synchronousQuery(false) { ids << value.identifier(); resources << value.resourceInstanceIdentifier(); } - Query(Flags flags = Flags()) : limit(0) + Query(Flags flags = Flags()) : limit(0), liveQuery(false), synchronousQuery(false) { } @@ -230,8 +234,9 @@ public: QByteArrayList requestedProperties; QByteArray parentProperty; QByteArray sortProperty; - bool liveQuery; int limit; + bool liveQuery; + bool synchronousQuery; }; } diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index c6a6b86..f2a7753 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -78,23 +78,31 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou const QByteArray parentId = parent ? parent->identifier() : QByteArray(); Trace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; auto resultProvider = mResultProvider; - async::run >([=]() { + if (query.synchronousQuery) { QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); - const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); - return newRevisionAndReplayedEntities; - }) - .template then>([=](const QPair &newRevisionAndReplayedEntities) { - mOffset[parentId] += newRevisionAndReplayedEntities.second; - // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. - if (query.liveQuery) { - mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); - } + worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); + resultProvider->initialResultSetComplete(parent); + } else { + async::run >([=]() { + QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); + const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); + return newRevisionAndReplayedEntities; }) - .exec(); + .template then>([=](const QPair &newRevisionAndReplayedEntities) { + mOffset[parentId] += newRevisionAndReplayedEntities.second; + // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. + if (query.liveQuery) { + mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); + } + resultProvider->initialResultSetComplete(parent); + }) + .exec(); + } }); // In case of a live query we keep the runner for as long alive as the result provider exists if (query.liveQuery) { + Q_ASSERT(!query.synchronousQuery); // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting setQuery([=]() -> KAsync::Job { auto resultProvider = mResultProvider; @@ -233,7 +241,6 @@ QPair QueryWorker::executeInitialQuery( Sink::EntityReader reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider)); Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); - resultProvider.initialResultSetComplete(parent); return revisionAndReplayedEntities; } diff --git a/common/store.cpp b/common/store.cpp index 92915c4..1162a18 100644 --- a/common/store.cpp +++ b/common/store.cpp @@ -286,6 +286,51 @@ KAsync::Job> Store::fetch(const Sink::Query &que }); } +template +DomainType Store::readOne(const Sink::Query &query) +{ + const auto list = read(query); + if (!list.isEmpty()) { + return list.first(); + } + return DomainType(); +} + +template +QList Store::read(const Sink::Query &q) +{ + auto query = q; + query.synchronousQuery = true; + query.liveQuery = false; + QList list; + auto resources = getResources(query.resources, query.accounts, ApplicationDomain::getTypeName()); + auto aggregatingEmitter = AggregatingResultEmitter::Ptr::create(); + aggregatingEmitter->onAdded([&list](const typename DomainType::Ptr &value){ + Trace() << "Found value: " << value->identifier(); + list << *value; + }); + for (const auto resourceInstanceIdentifier : resources.keys()) { + const auto resourceType = resources.value(resourceInstanceIdentifier); + Trace() << "Looking for " << resourceType << resourceInstanceIdentifier; + auto facade = FacadeFactory::instance().getFacade(resourceType, resourceInstanceIdentifier); + if (facade) { + Trace() << "Trying to fetch from resource " << resourceInstanceIdentifier; + auto result = facade->load(query); + if (result.second) { + aggregatingEmitter->addEmitter(result.second); + } else { + Warning() << "Null emitter for resource " << resourceInstanceIdentifier; + } + result.first.exec(); + aggregatingEmitter->fetch(typename DomainType::Ptr()); + } else { + Trace() << "Couldn't find a facade for " << resourceInstanceIdentifier; + // Ignore the error and carry on + } + } + return list; +} + #define REGISTER_TYPE(T) \ template KAsync::Job Store::remove(const T &domainObject); \ template KAsync::Job Store::create(const T &domainObject); \ @@ -293,7 +338,9 @@ KAsync::Job> Store::fetch(const Sink::Query &que template QSharedPointer Store::loadModel(Query query); \ template KAsync::Job Store::fetchOne(const Query &); \ template KAsync::Job> Store::fetchAll(const Query &); \ - template KAsync::Job> Store::fetch(const Query &, int); + template KAsync::Job> Store::fetch(const Query &, int); \ + template T Store::readOne(const Query &); \ + template QList Store::read(const Query &); REGISTER_TYPE(ApplicationDomain::Event); REGISTER_TYPE(ApplicationDomain::Mail); diff --git a/common/store.h b/common/store.h index aed3be4..571ffff 100644 --- a/common/store.h +++ b/common/store.h @@ -101,5 +101,11 @@ KAsync::Job> SINK_EXPORT fetchAll(const Sink::Qu template KAsync::Job> SINK_EXPORT fetch(const Sink::Query &query, int minimumAmount = 0); + +template +DomainType SINK_EXPORT readOne(const Sink::Query &query); + +template +QList SINK_EXPORT read(const Sink::Query &query); } } -- cgit v1.2.3