diff options
-rw-r--r-- | common/query.h | 15 | ||||
-rw-r--r-- | common/queryrunner.cpp | 31 | ||||
-rw-r--r-- | common/store.cpp | 49 | ||||
-rw-r--r-- | common/store.h | 6 |
4 files changed, 83 insertions, 18 deletions
diff --git a/common/query.h b/common/query.h index 717ed75..d2c0e2e 100644 --- a/common/query.h +++ b/common/query.h | |||
@@ -36,7 +36,9 @@ public: | |||
36 | enum Flag | 36 | enum Flag |
37 | { | 37 | { |
38 | /** Leave the query running and continuously update the result set. */ | 38 | /** Leave the query running and continuously update the result set. */ |
39 | LiveQuery | 39 | LiveQuery, |
40 | /** Run the query synchronously. */ | ||
41 | SynchronousQuery | ||
40 | }; | 42 | }; |
41 | Q_DECLARE_FLAGS(Flags, Flag) | 43 | Q_DECLARE_FLAGS(Flags, Flag) |
42 | 44 | ||
@@ -139,7 +141,9 @@ public: | |||
139 | 141 | ||
140 | static Query IdentityFilter(const ApplicationDomain::Entity &entity) | 142 | static Query IdentityFilter(const ApplicationDomain::Entity &entity) |
141 | { | 143 | { |
142 | return IdentityFilter(entity.identifier()); | 144 | auto query = IdentityFilter(entity.identifier()); |
145 | query.resources << entity.resourceInstanceIdentifier(); | ||
146 | return query; | ||
143 | } | 147 | } |
144 | 148 | ||
145 | static Query RequestedProperties(const QByteArrayList &properties) | 149 | static Query RequestedProperties(const QByteArrayList &properties) |
@@ -191,13 +195,13 @@ public: | |||
191 | return *this; | 195 | return *this; |
192 | } | 196 | } |
193 | 197 | ||
194 | Query(const ApplicationDomain::Entity &value) : limit(0) | 198 | Query(const ApplicationDomain::Entity &value) : limit(0), liveQuery(false), synchronousQuery(false) |
195 | { | 199 | { |
196 | ids << value.identifier(); | 200 | ids << value.identifier(); |
197 | resources << value.resourceInstanceIdentifier(); | 201 | resources << value.resourceInstanceIdentifier(); |
198 | } | 202 | } |
199 | 203 | ||
200 | Query(Flags flags = Flags()) : limit(0) | 204 | Query(Flags flags = Flags()) : limit(0), liveQuery(false), synchronousQuery(false) |
201 | { | 205 | { |
202 | } | 206 | } |
203 | 207 | ||
@@ -230,8 +234,9 @@ public: | |||
230 | QByteArrayList requestedProperties; | 234 | QByteArrayList requestedProperties; |
231 | QByteArray parentProperty; | 235 | QByteArray parentProperty; |
232 | QByteArray sortProperty; | 236 | QByteArray sortProperty; |
233 | bool liveQuery; | ||
234 | int limit; | 237 | int limit; |
238 | bool liveQuery; | ||
239 | bool synchronousQuery; | ||
235 | }; | 240 | }; |
236 | } | 241 | } |
237 | 242 | ||
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index c6a6b86..f2a7753 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp | |||
@@ -78,23 +78,31 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou | |||
78 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); | 78 | const QByteArray parentId = parent ? parent->identifier() : QByteArray(); |
79 | Trace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; | 79 | Trace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; |
80 | auto resultProvider = mResultProvider; | 80 | auto resultProvider = mResultProvider; |
81 | async::run<QPair<qint64, qint64> >([=]() { | 81 | if (query.synchronousQuery) { |
82 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); | 82 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); |
83 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); | 83 | worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); |
84 | return newRevisionAndReplayedEntities; | 84 | resultProvider->initialResultSetComplete(parent); |
85 | }) | 85 | } else { |
86 | .template then<void, QPair<qint64, qint64>>([=](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { | 86 | async::run<QPair<qint64, qint64> >([=]() { |
87 | mOffset[parentId] += newRevisionAndReplayedEntities.second; | 87 | QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); |
88 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | 88 | const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); |
89 | if (query.liveQuery) { | 89 | return newRevisionAndReplayedEntities; |
90 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); | ||
91 | } | ||
92 | }) | 90 | }) |
93 | .exec(); | 91 | .template then<void, QPair<qint64, qint64>>([=](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { |
92 | mOffset[parentId] += newRevisionAndReplayedEntities.second; | ||
93 | // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. | ||
94 | if (query.liveQuery) { | ||
95 | mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first); | ||
96 | } | ||
97 | resultProvider->initialResultSetComplete(parent); | ||
98 | }) | ||
99 | .exec(); | ||
100 | } | ||
94 | }); | 101 | }); |
95 | 102 | ||
96 | // In case of a live query we keep the runner for as long alive as the result provider exists | 103 | // In case of a live query we keep the runner for as long alive as the result provider exists |
97 | if (query.liveQuery) { | 104 | if (query.liveQuery) { |
105 | Q_ASSERT(!query.synchronousQuery); | ||
98 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | 106 | // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting |
99 | setQuery([=]() -> KAsync::Job<void> { | 107 | setQuery([=]() -> KAsync::Job<void> { |
100 | auto resultProvider = mResultProvider; | 108 | auto resultProvider = mResultProvider; |
@@ -233,7 +241,6 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery( | |||
233 | Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); | 241 | Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); |
234 | auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider)); | 242 | auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider)); |
235 | Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | 243 | Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); |
236 | resultProvider.initialResultSetComplete(parent); | ||
237 | return revisionAndReplayedEntities; | 244 | return revisionAndReplayedEntities; |
238 | } | 245 | } |
239 | 246 | ||
diff --git a/common/store.cpp b/common/store.cpp index 92915c4..1162a18 100644 --- a/common/store.cpp +++ b/common/store.cpp | |||
@@ -286,6 +286,51 @@ KAsync::Job<QList<typename DomainType::Ptr>> Store::fetch(const Sink::Query &que | |||
286 | }); | 286 | }); |
287 | } | 287 | } |
288 | 288 | ||
289 | template <class DomainType> | ||
290 | DomainType Store::readOne(const Sink::Query &query) | ||
291 | { | ||
292 | const auto list = read<DomainType>(query); | ||
293 | if (!list.isEmpty()) { | ||
294 | return list.first(); | ||
295 | } | ||
296 | return DomainType(); | ||
297 | } | ||
298 | |||
299 | template <class DomainType> | ||
300 | QList<DomainType> Store::read(const Sink::Query &q) | ||
301 | { | ||
302 | auto query = q; | ||
303 | query.synchronousQuery = true; | ||
304 | query.liveQuery = false; | ||
305 | QList<DomainType> list; | ||
306 | auto resources = getResources(query.resources, query.accounts, ApplicationDomain::getTypeName<DomainType>()); | ||
307 | auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create(); | ||
308 | aggregatingEmitter->onAdded([&list](const typename DomainType::Ptr &value){ | ||
309 | Trace() << "Found value: " << value->identifier(); | ||
310 | list << *value; | ||
311 | }); | ||
312 | for (const auto resourceInstanceIdentifier : resources.keys()) { | ||
313 | const auto resourceType = resources.value(resourceInstanceIdentifier); | ||
314 | Trace() << "Looking for " << resourceType << resourceInstanceIdentifier; | ||
315 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceType, resourceInstanceIdentifier); | ||
316 | if (facade) { | ||
317 | Trace() << "Trying to fetch from resource " << resourceInstanceIdentifier; | ||
318 | auto result = facade->load(query); | ||
319 | if (result.second) { | ||
320 | aggregatingEmitter->addEmitter(result.second); | ||
321 | } else { | ||
322 | Warning() << "Null emitter for resource " << resourceInstanceIdentifier; | ||
323 | } | ||
324 | result.first.exec(); | ||
325 | aggregatingEmitter->fetch(typename DomainType::Ptr()); | ||
326 | } else { | ||
327 | Trace() << "Couldn't find a facade for " << resourceInstanceIdentifier; | ||
328 | // Ignore the error and carry on | ||
329 | } | ||
330 | } | ||
331 | return list; | ||
332 | } | ||
333 | |||
289 | #define REGISTER_TYPE(T) \ | 334 | #define REGISTER_TYPE(T) \ |
290 | template KAsync::Job<void> Store::remove<T>(const T &domainObject); \ | 335 | template KAsync::Job<void> Store::remove<T>(const T &domainObject); \ |
291 | template KAsync::Job<void> Store::create<T>(const T &domainObject); \ | 336 | template KAsync::Job<void> Store::create<T>(const T &domainObject); \ |
@@ -293,7 +338,9 @@ KAsync::Job<QList<typename DomainType::Ptr>> Store::fetch(const Sink::Query &que | |||
293 | template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(Query query); \ | 338 | template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(Query query); \ |
294 | template KAsync::Job<T> Store::fetchOne<T>(const Query &); \ | 339 | template KAsync::Job<T> Store::fetchOne<T>(const Query &); \ |
295 | template KAsync::Job<QList<T::Ptr>> Store::fetchAll<T>(const Query &); \ | 340 | template KAsync::Job<QList<T::Ptr>> Store::fetchAll<T>(const Query &); \ |
296 | template KAsync::Job<QList<T::Ptr>> Store::fetch<T>(const Query &, int); | 341 | template KAsync::Job<QList<T::Ptr>> Store::fetch<T>(const Query &, int); \ |
342 | template T Store::readOne<T>(const Query &); \ | ||
343 | template QList<T> Store::read<T>(const Query &); | ||
297 | 344 | ||
298 | REGISTER_TYPE(ApplicationDomain::Event); | 345 | REGISTER_TYPE(ApplicationDomain::Event); |
299 | REGISTER_TYPE(ApplicationDomain::Mail); | 346 | REGISTER_TYPE(ApplicationDomain::Mail); |
diff --git a/common/store.h b/common/store.h index aed3be4..571ffff 100644 --- a/common/store.h +++ b/common/store.h | |||
@@ -101,5 +101,11 @@ KAsync::Job<QList<typename DomainType::Ptr>> SINK_EXPORT fetchAll(const Sink::Qu | |||
101 | 101 | ||
102 | template <class DomainType> | 102 | template <class DomainType> |
103 | KAsync::Job<QList<typename DomainType::Ptr>> SINK_EXPORT fetch(const Sink::Query &query, int minimumAmount = 0); | 103 | KAsync::Job<QList<typename DomainType::Ptr>> SINK_EXPORT fetch(const Sink::Query &query, int minimumAmount = 0); |
104 | |||
105 | template <class DomainType> | ||
106 | DomainType SINK_EXPORT readOne(const Sink::Query &query); | ||
107 | |||
108 | template <class DomainType> | ||
109 | QList<DomainType> SINK_EXPORT read(const Sink::Query &query); | ||
104 | } | 110 | } |
105 | } | 111 | } |