diff options
Diffstat (limited to 'common/store.cpp')
-rw-r--r-- | common/store.cpp | 160 |
1 files changed, 56 insertions, 104 deletions
diff --git a/common/store.cpp b/common/store.cpp index 554f540..ac1124a 100644 --- a/common/store.cpp +++ b/common/store.cpp | |||
@@ -54,57 +54,6 @@ QString Store::getTemporaryFilePath() | |||
54 | return Sink::temporaryFileLocation() + "/" + QUuid::createUuid().toString(); | 54 | return Sink::temporaryFileLocation() + "/" + QUuid::createUuid().toString(); |
55 | } | 55 | } |
56 | 56 | ||
57 | /* | ||
58 | * Returns a map of resource instance identifiers and resource type | ||
59 | */ | ||
60 | static QMap<QByteArray, QByteArray> getResources(const Sink::Query::Filter &query, const QByteArray &type, const Sink::Log::Context &ctx) | ||
61 | { | ||
62 | const QList<QByteArray> resourceFilter = query.ids; | ||
63 | |||
64 | |||
65 | const auto filterResource = [&](const QByteArray &res) { | ||
66 | const auto configuration = ResourceConfig::getConfiguration(res); | ||
67 | for (const auto &filterProperty : query.propertyFilter.keys()) { | ||
68 | const auto filter = query.propertyFilter.value(filterProperty); | ||
69 | if (!filter.matches(configuration.value(filterProperty))) { | ||
70 | return true; | ||
71 | } | ||
72 | } | ||
73 | return false; | ||
74 | }; | ||
75 | |||
76 | QMap<QByteArray, QByteArray> resources; | ||
77 | // Return the global resource (signified by an empty name) for types that don't belong to a specific resource | ||
78 | if (ApplicationDomain::isGlobalType(type)) { | ||
79 | resources.insert("", ""); | ||
80 | return resources; | ||
81 | } | ||
82 | const auto configuredResources = ResourceConfig::getResources(); | ||
83 | if (resourceFilter.isEmpty()) { | ||
84 | for (const auto &res : configuredResources.keys()) { | ||
85 | const auto type = configuredResources.value(res); | ||
86 | if (filterResource(res)) { | ||
87 | continue; | ||
88 | } | ||
89 | // TODO filter by entity type | ||
90 | resources.insert(res, type); | ||
91 | } | ||
92 | } else { | ||
93 | for (const auto &res : resourceFilter) { | ||
94 | if (configuredResources.contains(res)) { | ||
95 | if (filterResource(res)) { | ||
96 | continue; | ||
97 | } | ||
98 | resources.insert(res, configuredResources.value(res)); | ||
99 | } else { | ||
100 | SinkWarningCtx(ctx) << "Resource is not existing: " << res; | ||
101 | } | ||
102 | } | ||
103 | } | ||
104 | SinkTraceCtx(ctx) << "Found resources: " << resources; | ||
105 | return resources; | ||
106 | } | ||
107 | |||
108 | 57 | ||
109 | template <class DomainType> | 58 | template <class DomainType> |
110 | KAsync::Job<void> queryResource(const QByteArray resourceType, const QByteArray &resourceInstanceIdentifier, const Query &query, typename AggregatingResultEmitter<typename DomainType::Ptr>::Ptr aggregatingEmitter, const Sink::Log::Context &ctx_) | 59 | KAsync::Job<void> queryResource(const QByteArray resourceType, const QByteArray &resourceInstanceIdentifier, const Query &query, typename AggregatingResultEmitter<typename DomainType::Ptr>::Ptr aggregatingEmitter, const Sink::Log::Context &ctx_) |
@@ -128,31 +77,26 @@ KAsync::Job<void> queryResource(const QByteArray resourceType, const QByteArray | |||
128 | } | 77 | } |
129 | 78 | ||
130 | template <class DomainType> | 79 | template <class DomainType> |
131 | QSharedPointer<QAbstractItemModel> Store::loadModel(Query query) | 80 | QPair<typename AggregatingResultEmitter<typename DomainType::Ptr>::Ptr, typename ResultEmitter<typename ApplicationDomain::SinkResource::Ptr>::Ptr> getEmitter(Query query, const Log::Context &ctx) |
132 | { | 81 | { |
133 | Log::Context ctx{query.id()}; | ||
134 | query.setType(ApplicationDomain::getTypeName<DomainType>()); | 82 | query.setType(ApplicationDomain::getTypeName<DomainType>()); |
135 | SinkTraceCtx(ctx) << "Loading model: " << query; | 83 | SinkTraceCtx(ctx) << "Query: " << query; |
136 | auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>>::create(query, query.requestedProperties, ctx); | ||
137 | |||
138 | //* Client defines lifetime of model | ||
139 | //* The model lifetime defines the duration of live-queries | ||
140 | //* The facade needs to life for the duration of any calls being made (assuming we get rid of any internal callbacks | ||
141 | //* The emitter needs to live or the duration of query (respectively, the model) | ||
142 | //* The result provider needs to live for as long as results are provided (until the last thread exits). | ||
143 | 84 | ||
144 | // Query all resources and aggregate results | 85 | // Query all resources and aggregate results |
145 | auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>(), ctx); | ||
146 | auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create(); | 86 | auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create(); |
147 | model->setEmitter(aggregatingEmitter); | 87 | if (ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName<DomainType>())) { |
148 | 88 | //For global types we don't need to query for the resources first. | |
149 | if (query.liveQuery() && query.getResourceFilter().ids.isEmpty() && !ApplicationDomain::isGlobalType(ApplicationDomain::getTypeName<DomainType>())) { | 89 | queryResource<DomainType>("", "", query, aggregatingEmitter, ctx).exec(); |
150 | SinkTraceCtx(ctx) << "Listening for new resources"; | 90 | } else { |
151 | auto resourceCtx = ctx.subContext("resourceQuery"); | 91 | auto resourceCtx = ctx.subContext("resourceQuery"); |
152 | auto facade = FacadeFactory::instance().getFacade<ApplicationDomain::SinkResource>(); | 92 | auto facade = FacadeFactory::instance().getFacade<ApplicationDomain::SinkResource>(); |
153 | Q_ASSERT(facade); | 93 | Q_ASSERT(facade); |
154 | Sink::Query resourceQuery; | 94 | Sink::Query resourceQuery; |
155 | query.setFlags(Query::LiveQuery); | 95 | if (query.liveQuery()) { |
96 | SinkTraceCtx(ctx) << "Listening for new resources"; | ||
97 | resourceQuery.setFlags(Query::LiveQuery); | ||
98 | } | ||
99 | resourceQuery.setFilter(query.getResourceFilter()); | ||
156 | auto result = facade->load(resourceQuery, resourceCtx); | 100 | auto result = facade->load(resourceQuery, resourceCtx); |
157 | auto emitter = result.second; | 101 | auto emitter = result.second; |
158 | emitter->onAdded([query, aggregatingEmitter, resourceCtx](const ApplicationDomain::SinkResource::Ptr &resource) { | 102 | emitter->onAdded([query, aggregatingEmitter, resourceCtx](const ApplicationDomain::SinkResource::Ptr &resource) { |
@@ -169,18 +113,36 @@ QSharedPointer<QAbstractItemModel> Store::loadModel(Query query) | |||
169 | }); | 113 | }); |
170 | emitter->onComplete([query, aggregatingEmitter, resourceCtx]() { | 114 | emitter->onComplete([query, aggregatingEmitter, resourceCtx]() { |
171 | SinkTraceCtx(resourceCtx) << "Resource query complete"; | 115 | SinkTraceCtx(resourceCtx) << "Resource query complete"; |
172 | |||
173 | }); | 116 | }); |
174 | model->setProperty("resourceEmitter", QVariant::fromValue(emitter)); | 117 | |
175 | result.first.exec(); | 118 | return qMakePair(aggregatingEmitter, emitter); |
176 | } | 119 | } |
120 | return qMakePair(aggregatingEmitter, ResultEmitter<typename ApplicationDomain::SinkResource::Ptr>::Ptr{}); | ||
121 | } | ||
177 | 122 | ||
178 | KAsync::value(resources.keys()) | 123 | template <class DomainType> |
179 | .template each([query, aggregatingEmitter, resources, ctx](const QByteArray &resourceInstanceIdentifier) { | 124 | QSharedPointer<QAbstractItemModel> Store::loadModel(const Query &query) |
180 | const auto resourceType = resources.value(resourceInstanceIdentifier); | 125 | { |
181 | return queryResource<DomainType>(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter, ctx); | 126 | Log::Context ctx{query.id()}; |
182 | }) | 127 | auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>>::create(query, query.requestedProperties, ctx); |
183 | .exec(); | 128 | |
129 | //* Client defines lifetime of model | ||
130 | //* The model lifetime defines the duration of live-queries | ||
131 | //* The facade needs to life for the duration of any calls being made (assuming we get rid of any internal callbacks | ||
132 | //* The emitter needs to live or the duration of query (respectively, the model) | ||
133 | //* The result provider needs to live for as long as results are provided (until the last thread exits). | ||
134 | |||
135 | auto result = getEmitter<DomainType>(query, ctx); | ||
136 | model->setEmitter(result.first); | ||
137 | |||
138 | //Keep the emitter alive | ||
139 | if (auto resourceEmitter = result.second) { | ||
140 | model->setProperty("resourceEmitter", QVariant::fromValue(resourceEmitter)); //TODO only neceesary for live queries | ||
141 | resourceEmitter->fetch(ApplicationDomain::SinkResource::Ptr()); | ||
142 | } | ||
143 | |||
144 | |||
145 | //Automatically populate the top-level | ||
184 | model->fetchMore(QModelIndex()); | 146 | model->fetchMore(QModelIndex()); |
185 | 147 | ||
186 | return model; | 148 | return model; |
@@ -297,11 +259,11 @@ KAsync::Job<void> Store::synchronize(const Sink::Query &query) | |||
297 | 259 | ||
298 | KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope) | 260 | KAsync::Job<void> Store::synchronize(const Sink::SyncScope &scope) |
299 | { | 261 | { |
300 | auto resources = getResources(scope.getResourceFilter(), {}, {}).keys(); | 262 | Sink::Query query; |
301 | SinkLog() << "Synchronize" << resources; | 263 | query.setFilter(scope.getResourceFilter()); |
302 | return KAsync::value(resources) | 264 | return fetchAll<ApplicationDomain::SinkResource>(query) |
303 | .template each([scope](const QByteArray &resource) { | 265 | .template each([scope](const ApplicationDomain::SinkResource::Ptr &resource) -> KAsync::Job<void> { |
304 | return synchronize(resource, scope); | 266 | return synchronize(resource->identifier(), scope); |
305 | }); | 267 | }); |
306 | } | 268 | } |
307 | 269 | ||
@@ -371,36 +333,26 @@ DomainType Store::readOne(const Sink::Query &query) | |||
371 | } | 333 | } |
372 | 334 | ||
373 | template <class DomainType> | 335 | template <class DomainType> |
374 | QList<DomainType> Store::read(const Sink::Query &q) | 336 | QList<DomainType> Store::read(const Sink::Query &query_) |
375 | { | 337 | { |
376 | Log::Context ctx{q.id()}; | 338 | auto query = query_; |
377 | auto query = q; | ||
378 | query.setFlags(Query::SynchronousQuery); | 339 | query.setFlags(Query::SynchronousQuery); |
340 | |||
341 | Log::Context ctx{query.id()}; | ||
342 | |||
379 | QList<DomainType> list; | 343 | QList<DomainType> list; |
380 | auto resources = getResources(query.getResourceFilter(), ApplicationDomain::getTypeName<DomainType>(), ctx); | 344 | |
381 | auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create(); | 345 | auto result = getEmitter<DomainType>(query, ctx); |
346 | auto aggregatingEmitter = result.first; | ||
382 | aggregatingEmitter->onAdded([&list, ctx](const typename DomainType::Ptr &value){ | 347 | aggregatingEmitter->onAdded([&list, ctx](const typename DomainType::Ptr &value){ |
383 | SinkTraceCtx(ctx) << "Found value: " << value->identifier(); | 348 | SinkTraceCtx(ctx) << "Found value: " << value->identifier(); |
384 | list << *value; | 349 | list << *value; |
385 | }); | 350 | }); |
386 | for (const auto &resourceInstanceIdentifier : resources.keys()) { | 351 | |
387 | const auto resourceType = resources.value(resourceInstanceIdentifier); | 352 | if (auto resourceEmitter = result.second) { |
388 | SinkTraceCtx(ctx) << "Querying resource: " << resourceType << resourceInstanceIdentifier; | 353 | resourceEmitter->fetch(ApplicationDomain::SinkResource::Ptr()); |
389 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceType, resourceInstanceIdentifier); | ||
390 | if (facade) { | ||
391 | SinkTraceCtx(ctx) << "Trying to fetch from resource " << resourceInstanceIdentifier; | ||
392 | auto result = facade->load(query, ctx); | ||
393 | if (result.second) { | ||
394 | aggregatingEmitter->addEmitter(result.second); | ||
395 | } else { | ||
396 | SinkWarningCtx(ctx) << "Null emitter for resource " << resourceInstanceIdentifier; | ||
397 | } | ||
398 | result.first.exec(); | ||
399 | } else { | ||
400 | SinkTraceCtx(ctx) << "Couldn't find a facade for " << resourceInstanceIdentifier; | ||
401 | // Ignore the error and carry on | ||
402 | } | ||
403 | } | 354 | } |
355 | |||
404 | aggregatingEmitter->fetch(typename DomainType::Ptr()); | 356 | aggregatingEmitter->fetch(typename DomainType::Ptr()); |
405 | return list; | 357 | return list; |
406 | } | 358 | } |
@@ -411,7 +363,7 @@ QList<DomainType> Store::read(const Sink::Query &q) | |||
411 | template KAsync::Job<void> Store::modify<T>(const T &domainObject); \ | 363 | template KAsync::Job<void> Store::modify<T>(const T &domainObject); \ |
412 | template KAsync::Job<void> Store::move<T>(const T &domainObject, const QByteArray &newResource); \ | 364 | template KAsync::Job<void> Store::move<T>(const T &domainObject, const QByteArray &newResource); \ |
413 | template KAsync::Job<void> Store::copy<T>(const T &domainObject, const QByteArray &newResource); \ | 365 | template KAsync::Job<void> Store::copy<T>(const T &domainObject, const QByteArray &newResource); \ |
414 | template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(Query query); \ | 366 | template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(const Query &query); \ |
415 | template KAsync::Job<T> Store::fetchOne<T>(const Query &); \ | 367 | template KAsync::Job<T> Store::fetchOne<T>(const Query &); \ |
416 | template KAsync::Job<QList<T::Ptr>> Store::fetchAll<T>(const Query &); \ | 368 | template KAsync::Job<QList<T::Ptr>> Store::fetchAll<T>(const Query &); \ |
417 | template KAsync::Job<QList<T::Ptr>> Store::fetch<T>(const Query &, int); \ | 369 | template KAsync::Job<QList<T::Ptr>> Store::fetch<T>(const Query &, int); \ |