summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/query.h15
-rw-r--r--common/queryrunner.cpp31
-rw-r--r--common/store.cpp49
-rw-r--r--common/store.h6
4 files changed, 83 insertions, 18 deletions
diff --git a/common/query.h b/common/query.h
index 717ed75..d2c0e2e 100644
--- a/common/query.h
+++ b/common/query.h
@@ -36,7 +36,9 @@ public:
36 enum Flag 36 enum Flag
37 { 37 {
38 /** Leave the query running and continuously update the result set. */ 38 /** Leave the query running and continuously update the result set. */
39 LiveQuery 39 LiveQuery,
40 /** Run the query synchronously. */
41 SynchronousQuery
40 }; 42 };
41 Q_DECLARE_FLAGS(Flags, Flag) 43 Q_DECLARE_FLAGS(Flags, Flag)
42 44
@@ -139,7 +141,9 @@ public:
139 141
140 static Query IdentityFilter(const ApplicationDomain::Entity &entity) 142 static Query IdentityFilter(const ApplicationDomain::Entity &entity)
141 { 143 {
142 return IdentityFilter(entity.identifier()); 144 auto query = IdentityFilter(entity.identifier());
145 query.resources << entity.resourceInstanceIdentifier();
146 return query;
143 } 147 }
144 148
145 static Query RequestedProperties(const QByteArrayList &properties) 149 static Query RequestedProperties(const QByteArrayList &properties)
@@ -191,13 +195,13 @@ public:
191 return *this; 195 return *this;
192 } 196 }
193 197
194 Query(const ApplicationDomain::Entity &value) : limit(0) 198 Query(const ApplicationDomain::Entity &value) : limit(0), liveQuery(false), synchronousQuery(false)
195 { 199 {
196 ids << value.identifier(); 200 ids << value.identifier();
197 resources << value.resourceInstanceIdentifier(); 201 resources << value.resourceInstanceIdentifier();
198 } 202 }
199 203
200 Query(Flags flags = Flags()) : limit(0) 204 Query(Flags flags = Flags()) : limit(0), liveQuery(false), synchronousQuery(false)
201 { 205 {
202 } 206 }
203 207
@@ -230,8 +234,9 @@ public:
230 QByteArrayList requestedProperties; 234 QByteArrayList requestedProperties;
231 QByteArray parentProperty; 235 QByteArray parentProperty;
232 QByteArray sortProperty; 236 QByteArray sortProperty;
233 bool liveQuery;
234 int limit; 237 int limit;
238 bool liveQuery;
239 bool synchronousQuery;
235}; 240};
236} 241}
237 242
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index c6a6b86..f2a7753 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -78,23 +78,31 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
78 const QByteArray parentId = parent ? parent->identifier() : QByteArray(); 78 const QByteArray parentId = parent ? parent->identifier() : QByteArray();
79 Trace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; 79 Trace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize;
80 auto resultProvider = mResultProvider; 80 auto resultProvider = mResultProvider;
81 async::run<QPair<qint64, qint64> >([=]() { 81 if (query.synchronousQuery) {
82 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 82 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
83 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); 83 worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize);
84 return newRevisionAndReplayedEntities; 84 resultProvider->initialResultSetComplete(parent);
85 }) 85 } else {
86 .template then<void, QPair<qint64, qint64>>([=](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) { 86 async::run<QPair<qint64, qint64> >([=]() {
87 mOffset[parentId] += newRevisionAndReplayedEntities.second; 87 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
88 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 88 const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize);
89 if (query.liveQuery) { 89 return newRevisionAndReplayedEntities;
90 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first);
91 }
92 }) 90 })
93 .exec(); 91 .template then<void, QPair<qint64, qint64>>([=](const QPair<qint64, qint64> &newRevisionAndReplayedEntities) {
92 mOffset[parentId] += newRevisionAndReplayedEntities.second;
93 // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
94 if (query.liveQuery) {
95 mResourceAccess->sendRevisionReplayedCommand(newRevisionAndReplayedEntities.first);
96 }
97 resultProvider->initialResultSetComplete(parent);
98 })
99 .exec();
100 }
94 }); 101 });
95 102
96 // In case of a live query we keep the runner for as long alive as the result provider exists 103 // In case of a live query we keep the runner for as long alive as the result provider exists
97 if (query.liveQuery) { 104 if (query.liveQuery) {
105 Q_ASSERT(!query.synchronousQuery);
98 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 106 // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
99 setQuery([=]() -> KAsync::Job<void> { 107 setQuery([=]() -> KAsync::Job<void> {
100 auto resultProvider = mResultProvider; 108 auto resultProvider = mResultProvider;
@@ -233,7 +241,6 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
233 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); 241 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction);
234 auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider)); 242 auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider));
235 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 243 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed());
236 resultProvider.initialResultSetComplete(parent);
237 return revisionAndReplayedEntities; 244 return revisionAndReplayedEntities;
238} 245}
239 246
diff --git a/common/store.cpp b/common/store.cpp
index 92915c4..1162a18 100644
--- a/common/store.cpp
+++ b/common/store.cpp
@@ -286,6 +286,51 @@ KAsync::Job<QList<typename DomainType::Ptr>> Store::fetch(const Sink::Query &que
286 }); 286 });
287} 287}
288 288
289template <class DomainType>
290DomainType Store::readOne(const Sink::Query &query)
291{
292 const auto list = read<DomainType>(query);
293 if (!list.isEmpty()) {
294 return list.first();
295 }
296 return DomainType();
297}
298
299template <class DomainType>
300QList<DomainType> Store::read(const Sink::Query &q)
301{
302 auto query = q;
303 query.synchronousQuery = true;
304 query.liveQuery = false;
305 QList<DomainType> list;
306 auto resources = getResources(query.resources, query.accounts, ApplicationDomain::getTypeName<DomainType>());
307 auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create();
308 aggregatingEmitter->onAdded([&list](const typename DomainType::Ptr &value){
309 Trace() << "Found value: " << value->identifier();
310 list << *value;
311 });
312 for (const auto resourceInstanceIdentifier : resources.keys()) {
313 const auto resourceType = resources.value(resourceInstanceIdentifier);
314 Trace() << "Looking for " << resourceType << resourceInstanceIdentifier;
315 auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceType, resourceInstanceIdentifier);
316 if (facade) {
317 Trace() << "Trying to fetch from resource " << resourceInstanceIdentifier;
318 auto result = facade->load(query);
319 if (result.second) {
320 aggregatingEmitter->addEmitter(result.second);
321 } else {
322 Warning() << "Null emitter for resource " << resourceInstanceIdentifier;
323 }
324 result.first.exec();
325 aggregatingEmitter->fetch(typename DomainType::Ptr());
326 } else {
327 Trace() << "Couldn't find a facade for " << resourceInstanceIdentifier;
328 // Ignore the error and carry on
329 }
330 }
331 return list;
332}
333
289#define REGISTER_TYPE(T) \ 334#define REGISTER_TYPE(T) \
290 template KAsync::Job<void> Store::remove<T>(const T &domainObject); \ 335 template KAsync::Job<void> Store::remove<T>(const T &domainObject); \
291 template KAsync::Job<void> Store::create<T>(const T &domainObject); \ 336 template KAsync::Job<void> Store::create<T>(const T &domainObject); \
@@ -293,7 +338,9 @@ KAsync::Job<QList<typename DomainType::Ptr>> Store::fetch(const Sink::Query &que
293 template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(Query query); \ 338 template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(Query query); \
294 template KAsync::Job<T> Store::fetchOne<T>(const Query &); \ 339 template KAsync::Job<T> Store::fetchOne<T>(const Query &); \
295 template KAsync::Job<QList<T::Ptr>> Store::fetchAll<T>(const Query &); \ 340 template KAsync::Job<QList<T::Ptr>> Store::fetchAll<T>(const Query &); \
296 template KAsync::Job<QList<T::Ptr>> Store::fetch<T>(const Query &, int); 341 template KAsync::Job<QList<T::Ptr>> Store::fetch<T>(const Query &, int); \
342 template T Store::readOne<T>(const Query &); \
343 template QList<T> Store::read<T>(const Query &);
297 344
298REGISTER_TYPE(ApplicationDomain::Event); 345REGISTER_TYPE(ApplicationDomain::Event);
299REGISTER_TYPE(ApplicationDomain::Mail); 346REGISTER_TYPE(ApplicationDomain::Mail);
diff --git a/common/store.h b/common/store.h
index aed3be4..571ffff 100644
--- a/common/store.h
+++ b/common/store.h
@@ -101,5 +101,11 @@ KAsync::Job<QList<typename DomainType::Ptr>> SINK_EXPORT fetchAll(const Sink::Qu
101 101
102template <class DomainType> 102template <class DomainType>
103KAsync::Job<QList<typename DomainType::Ptr>> SINK_EXPORT fetch(const Sink::Query &query, int minimumAmount = 0); 103KAsync::Job<QList<typename DomainType::Ptr>> SINK_EXPORT fetch(const Sink::Query &query, int minimumAmount = 0);
104
105template <class DomainType>
106DomainType SINK_EXPORT readOne(const Sink::Query &query);
107
108template <class DomainType>
109QList<DomainType> SINK_EXPORT read(const Sink::Query &query);
104} 110}
105} 111}