diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-07-11 11:55:29 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-07-11 11:55:29 +0200 |
commit | 3a3118e768e1447dc7524328e84b8d7faef81fe1 (patch) | |
tree | af5582170ed6164fffc9365f34b17bf449c0db40 /common/resourcefacade.cpp | |
parent | f9379318d801df204cc50385c5eca1f28e91755e (diff) | |
parent | ce2fd2666f084eebe443598f6f3740a02913091e (diff) | |
download | sink-3a3118e768e1447dc7524328e84b8d7faef81fe1.tar.gz sink-3a3118e768e1447dc7524328e84b8d7faef81fe1.zip |
Merge branch 'feature/notifications' into develop
Diffstat (limited to 'common/resourcefacade.cpp')
-rw-r--r-- | common/resourcefacade.cpp | 278 |
1 files changed, 200 insertions, 78 deletions
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index 3901f43..583d6ec 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp | |||
@@ -22,23 +22,19 @@ | |||
22 | #include "query.h" | 22 | #include "query.h" |
23 | #include "definitions.h" | 23 | #include "definitions.h" |
24 | #include "storage.h" | 24 | #include "storage.h" |
25 | #include "store.h" | ||
26 | #include "resourceaccess.h" | ||
25 | #include <QDir> | 27 | #include <QDir> |
26 | 28 | ||
27 | template <typename DomainType> | 29 | using namespace Sink; |
28 | ConfigNotifier LocalStorageFacade<DomainType>::sConfigNotifier; | ||
29 | 30 | ||
30 | template <typename DomainType> | 31 | SINK_DEBUG_AREA("ResourceFacade") |
31 | LocalStorageFacade<DomainType>::LocalStorageFacade(const QByteArray &identifier) : Sink::StoreFacade<DomainType>(), mConfigStore(identifier), mResourceInstanceIdentifier(identifier) | ||
32 | { | ||
33 | } | ||
34 | 32 | ||
35 | template <typename DomainType> | 33 | template<typename DomainType> |
36 | LocalStorageFacade<DomainType>::~LocalStorageFacade() | 34 | ConfigNotifier LocalStorageFacade<DomainType>::sConfigNotifier; |
37 | { | ||
38 | } | ||
39 | 35 | ||
40 | template <typename DomainType> | 36 | template <typename DomainType> |
41 | typename DomainType::Ptr LocalStorageFacade<DomainType>::readFromConfig(ConfigStore &configStore, const QByteArray &id, const QByteArray &type) | 37 | static typename DomainType::Ptr readFromConfig(ConfigStore &configStore, const QByteArray &id, const QByteArray &type) |
42 | { | 38 | { |
43 | auto object = DomainType::Ptr::create(id); | 39 | auto object = DomainType::Ptr::create(id); |
44 | object->setProperty("type", type); | 40 | object->setProperty("type", type); |
@@ -49,10 +45,127 @@ typename DomainType::Ptr LocalStorageFacade<DomainType>::readFromConfig(ConfigSt | |||
49 | return object; | 45 | return object; |
50 | } | 46 | } |
51 | 47 | ||
48 | static bool matchesFilter(const QHash<QByteArray, Query::Comparator> &filter, const QMap<QByteArray, QVariant> &properties) | ||
49 | { | ||
50 | for (const auto &filterProperty : filter.keys()) { | ||
51 | if (filterProperty == "type") { | ||
52 | continue; | ||
53 | } | ||
54 | if (!filter.value(filterProperty).matches(properties.value(filterProperty))) { | ||
55 | return false; | ||
56 | } | ||
57 | } | ||
58 | return true; | ||
59 | } | ||
60 | |||
61 | template<typename DomainType> | ||
62 | LocalStorageQueryRunner<DomainType>::LocalStorageQueryRunner(const Query &query, const QByteArray &identifier, ConfigNotifier &configNotifier) | ||
63 | : mResultProvider(new ResultProvider<typename DomainType::Ptr>), mConfigStore(identifier), mGuard(new QObject) | ||
64 | { | ||
65 | QObject *guard = new QObject; | ||
66 | mResultProvider->setFetcher([this, query, guard, &configNotifier](const QSharedPointer<DomainType> &) { | ||
67 | const auto entries = mConfigStore.getEntries(); | ||
68 | for (const auto &res : entries.keys()) { | ||
69 | const auto type = entries.value(res); | ||
70 | |||
71 | if (query.propertyFilter.contains("type") && query.propertyFilter.value("type").value.toByteArray() != type) { | ||
72 | SinkTrace() << "Skipping due to type."; | ||
73 | continue; | ||
74 | } | ||
75 | if (!query.ids.isEmpty() && !query.ids.contains(res)) { | ||
76 | continue; | ||
77 | } | ||
78 | const auto configurationValues = mConfigStore.get(res); | ||
79 | if (!matchesFilter(query.propertyFilter, configurationValues)){ | ||
80 | SinkTrace() << "Skipping due to filter."; | ||
81 | continue; | ||
82 | } | ||
83 | SinkTrace() << "Found match " << res; | ||
84 | auto entity = readFromConfig<DomainType>(mConfigStore, res, type); | ||
85 | updateStatus(*entity); | ||
86 | mResultProvider->add(entity); | ||
87 | } | ||
88 | if (query.liveQuery) { | ||
89 | { | ||
90 | auto ret = QObject::connect(&configNotifier, &ConfigNotifier::added, guard, [this](const ApplicationDomain::ApplicationDomainType::Ptr &entry) { | ||
91 | auto entity = entry.staticCast<DomainType>(); | ||
92 | updateStatus(*entity); | ||
93 | mResultProvider->add(entity); | ||
94 | }); | ||
95 | Q_ASSERT(ret); | ||
96 | } | ||
97 | { | ||
98 | auto ret = QObject::connect(&configNotifier, &ConfigNotifier::modified, guard, [this](const ApplicationDomain::ApplicationDomainType::Ptr &entry) { | ||
99 | auto entity = entry.staticCast<DomainType>(); | ||
100 | updateStatus(*entity); | ||
101 | mResultProvider->modify(entity); | ||
102 | }); | ||
103 | Q_ASSERT(ret); | ||
104 | } | ||
105 | { | ||
106 | auto ret = QObject::connect(&configNotifier, &ConfigNotifier::removed, guard, [this](const ApplicationDomain::ApplicationDomainType::Ptr &entry) { | ||
107 | mResultProvider->remove(entry.staticCast<DomainType>()); | ||
108 | }); | ||
109 | Q_ASSERT(ret); | ||
110 | } | ||
111 | } | ||
112 | // TODO initialResultSetComplete should be implicit | ||
113 | mResultProvider->initialResultSetComplete(typename DomainType::Ptr()); | ||
114 | mResultProvider->complete(); | ||
115 | }); | ||
116 | mResultProvider->onDone([=]() { delete guard; delete this; }); | ||
117 | } | ||
118 | |||
119 | template<typename DomainType> | ||
120 | QObject *LocalStorageQueryRunner<DomainType>::guard() const | ||
121 | { | ||
122 | return mGuard.get(); | ||
123 | } | ||
124 | |||
125 | template<typename DomainType> | ||
126 | void LocalStorageQueryRunner<DomainType>::updateStatus(DomainType &entity) | ||
127 | { | ||
128 | if (mStatusUpdater) { | ||
129 | mStatusUpdater(entity); | ||
130 | } | ||
131 | } | ||
132 | |||
133 | template<typename DomainType> | ||
134 | void LocalStorageQueryRunner<DomainType>::setStatusUpdater(const std::function<void(DomainType &)> &updater) | ||
135 | { | ||
136 | mStatusUpdater = updater; | ||
137 | } | ||
138 | |||
139 | template<typename DomainType> | ||
140 | void LocalStorageQueryRunner<DomainType>::statusChanged(const QByteArray &identifier) | ||
141 | { | ||
142 | SinkTrace() << "Status changed " << identifier; | ||
143 | auto entity = readFromConfig<DomainType>(mConfigStore, identifier, ApplicationDomain::getTypeName<DomainType>()); | ||
144 | updateStatus(*entity); | ||
145 | mResultProvider->modify(entity); | ||
146 | } | ||
147 | |||
148 | template<typename DomainType> | ||
149 | typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr LocalStorageQueryRunner<DomainType>::emitter() | ||
150 | { | ||
151 | return mResultProvider->emitter(); | ||
152 | } | ||
153 | |||
154 | |||
155 | template <typename DomainType> | ||
156 | LocalStorageFacade<DomainType>::LocalStorageFacade(const QByteArray &identifier) : StoreFacade<DomainType>(), mIdentifier(identifier), mConfigStore(identifier) | ||
157 | { | ||
158 | } | ||
159 | |||
160 | template <typename DomainType> | ||
161 | LocalStorageFacade<DomainType>::~LocalStorageFacade() | ||
162 | { | ||
163 | } | ||
164 | |||
52 | template <typename DomainType> | 165 | template <typename DomainType> |
53 | typename DomainType::Ptr LocalStorageFacade<DomainType>::readFromConfig(const QByteArray &id, const QByteArray &type) | 166 | typename DomainType::Ptr LocalStorageFacade<DomainType>::readFromConfig(const QByteArray &id, const QByteArray &type) |
54 | { | 167 | { |
55 | return readFromConfig(mConfigStore, id, type); | 168 | return ::readFromConfig<DomainType>(mConfigStore, id, type); |
56 | } | 169 | } |
57 | 170 | ||
58 | template <typename DomainType> | 171 | template <typename DomainType> |
@@ -84,7 +197,7 @@ KAsync::Job<void> LocalStorageFacade<DomainType>::modify(const DomainType &domai | |||
84 | return KAsync::start<void>([domainObject, this]() { | 197 | return KAsync::start<void>([domainObject, this]() { |
85 | const QByteArray identifier = domainObject.identifier(); | 198 | const QByteArray identifier = domainObject.identifier(); |
86 | if (identifier.isEmpty()) { | 199 | if (identifier.isEmpty()) { |
87 | Warning() << "We need an \"identifier\" property to identify the entity to configure."; | 200 | SinkWarning() << "We need an \"identifier\" property to identify the entity to configure."; |
88 | return; | 201 | return; |
89 | } | 202 | } |
90 | auto changedProperties = domainObject.changedProperties(); | 203 | auto changedProperties = domainObject.changedProperties(); |
@@ -110,77 +223,22 @@ KAsync::Job<void> LocalStorageFacade<DomainType>::remove(const DomainType &domai | |||
110 | return KAsync::start<void>([domainObject, this]() { | 223 | return KAsync::start<void>([domainObject, this]() { |
111 | const QByteArray identifier = domainObject.identifier(); | 224 | const QByteArray identifier = domainObject.identifier(); |
112 | if (identifier.isEmpty()) { | 225 | if (identifier.isEmpty()) { |
113 | Warning() << "We need an \"identifier\" property to identify the entity to configure"; | 226 | SinkWarning() << "We need an \"identifier\" property to identify the entity to configure"; |
114 | return; | 227 | return; |
115 | } | 228 | } |
116 | Trace() << "Removing: " << identifier; | 229 | SinkTrace() << "Removing: " << identifier; |
117 | mConfigStore.remove(identifier); | 230 | mConfigStore.remove(identifier); |
118 | sConfigNotifier.remove(QSharedPointer<DomainType>::create(domainObject)); | 231 | sConfigNotifier.remove(QSharedPointer<DomainType>::create(domainObject)); |
119 | }); | 232 | }); |
120 | } | 233 | } |
121 | 234 | ||
122 | static bool matchesFilter(const QHash<QByteArray, Sink::Query::Comparator> &filter, const QMap<QByteArray, QVariant> &properties) | ||
123 | { | ||
124 | for (const auto &filterProperty : filter.keys()) { | ||
125 | if (filterProperty == "type") { | ||
126 | continue; | ||
127 | } | ||
128 | if (!filter.value(filterProperty).matches(properties.value(filterProperty))) { | ||
129 | return false; | ||
130 | } | ||
131 | } | ||
132 | return true; | ||
133 | } | ||
134 | |||
135 | template <typename DomainType> | 235 | template <typename DomainType> |
136 | QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr> LocalStorageFacade<DomainType>::load(const Sink::Query &query) | 236 | QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> LocalStorageFacade<DomainType>::load(const Query &query) |
137 | { | 237 | { |
138 | QObject *guard = new QObject; | 238 | auto runner = new LocalStorageQueryRunner<DomainType>(query, mIdentifier, sConfigNotifier); |
139 | auto resultProvider = new Sink::ResultProvider<typename DomainType::Ptr>(); | 239 | return qMakePair(KAsync::null<void>(), runner->emitter()); |
140 | auto emitter = resultProvider->emitter(); | ||
141 | auto identifier = mResourceInstanceIdentifier; | ||
142 | resultProvider->setFetcher([identifier, query, guard, resultProvider](const QSharedPointer<DomainType> &) { | ||
143 | ConfigStore mConfigStore(identifier); | ||
144 | const auto entries = mConfigStore.getEntries(); | ||
145 | for (const auto &res : entries.keys()) { | ||
146 | const auto type = entries.value(res); | ||
147 | |||
148 | if (query.propertyFilter.contains("type") && query.propertyFilter.value("type").value.toByteArray() != type) { | ||
149 | Trace() << "Skipping due to type."; | ||
150 | continue; | ||
151 | } | ||
152 | if (!query.ids.isEmpty() && !query.ids.contains(res)) { | ||
153 | continue; | ||
154 | } | ||
155 | const auto configurationValues = mConfigStore.get(res); | ||
156 | if (!matchesFilter(query.propertyFilter, configurationValues)){ | ||
157 | Trace() << "Skipping due to filter."; | ||
158 | continue; | ||
159 | } | ||
160 | Trace() << "Found match " << res; | ||
161 | resultProvider->add(readFromConfig(mConfigStore, res, type)); | ||
162 | } | ||
163 | if (query.liveQuery) { | ||
164 | QObject::connect(&sConfigNotifier, &ConfigNotifier::modified, guard, [resultProvider](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &entry) { | ||
165 | resultProvider->modify(entry.staticCast<DomainType>()); | ||
166 | }); | ||
167 | QObject::connect(&sConfigNotifier, &ConfigNotifier::added, guard, [resultProvider](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &entry) { | ||
168 | resultProvider->add(entry.staticCast<DomainType>()); | ||
169 | }); | ||
170 | QObject::connect(&sConfigNotifier, &ConfigNotifier::removed, guard,[resultProvider](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &entry) { | ||
171 | resultProvider->remove(entry.staticCast<DomainType>()); | ||
172 | }); | ||
173 | } | ||
174 | // TODO initialResultSetComplete should be implicit | ||
175 | resultProvider->initialResultSetComplete(typename DomainType::Ptr()); | ||
176 | resultProvider->complete(); | ||
177 | }); | ||
178 | resultProvider->onDone([=]() { delete resultProvider; delete guard; }); | ||
179 | |||
180 | return qMakePair(KAsync::null<void>(), emitter); | ||
181 | } | 240 | } |
182 | 241 | ||
183 | |||
184 | ResourceFacade::ResourceFacade() : LocalStorageFacade<Sink::ApplicationDomain::SinkResource>("resources") | 242 | ResourceFacade::ResourceFacade() : LocalStorageFacade<Sink::ApplicationDomain::SinkResource>("resources") |
185 | { | 243 | { |
186 | } | 244 | } |
@@ -192,13 +250,28 @@ ResourceFacade::~ResourceFacade() | |||
192 | KAsync::Job<void> ResourceFacade::remove(const Sink::ApplicationDomain::SinkResource &resource) | 250 | KAsync::Job<void> ResourceFacade::remove(const Sink::ApplicationDomain::SinkResource &resource) |
193 | { | 251 | { |
194 | const auto identifier = resource.identifier(); | 252 | const auto identifier = resource.identifier(); |
195 | return LocalStorageFacade<Sink::ApplicationDomain::SinkResource>::remove(resource).then<void>([identifier]() { | 253 | return Sink::Store::removeDataFromDisk(identifier).then(LocalStorageFacade<Sink::ApplicationDomain::SinkResource>::remove(resource)); |
196 | // TODO shutdown resource, or use the resource process with a --remove option to cleanup (so we can take advantage of the file locking) | 254 | } |
197 | QDir dir(Sink::storageLocation()); | 255 | |
198 | for (const auto &folder : dir.entryList(QStringList() << identifier + "*")) { | 256 | QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename ApplicationDomain::SinkResource::Ptr>::Ptr> ResourceFacade::load(const Sink::Query &query) |
199 | Sink::Storage(Sink::storageLocation(), folder, Sink::Storage::ReadWrite).removeFromDisk(); | 257 | { |
258 | auto runner = new LocalStorageQueryRunner<ApplicationDomain::SinkResource>(query, mIdentifier, sConfigNotifier); | ||
259 | auto monitoredResources = QSharedPointer<QSet<QByteArray>>::create(); | ||
260 | runner->setStatusUpdater([runner, monitoredResources](ApplicationDomain::SinkResource &resource) { | ||
261 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource.identifier(), ResourceConfig::getResourceType(resource.identifier())); | ||
262 | if (!monitoredResources->contains(resource.identifier())) { | ||
263 | auto ret = QObject::connect(resourceAccess.data(), &ResourceAccess::notification, runner->guard(), [resource, runner, resourceAccess](const Notification ¬ification) { | ||
264 | SinkTrace() << "Received notification in facade: " << notification.type; | ||
265 | if (notification.type == Notification::Status) { | ||
266 | runner->statusChanged(resource.identifier()); | ||
267 | } | ||
268 | }); | ||
269 | Q_ASSERT(ret); | ||
270 | monitoredResources->insert(resource.identifier()); | ||
200 | } | 271 | } |
272 | resource.setStatusStatus(resourceAccess->getResourceStatus()); | ||
201 | }); | 273 | }); |
274 | return qMakePair(KAsync::null<void>(), runner->emitter()); | ||
202 | } | 275 | } |
203 | 276 | ||
204 | 277 | ||
@@ -210,6 +283,55 @@ AccountFacade::~AccountFacade() | |||
210 | { | 283 | { |
211 | } | 284 | } |
212 | 285 | ||
286 | QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename ApplicationDomain::SinkAccount::Ptr>::Ptr> AccountFacade::load(const Sink::Query &query) | ||
287 | { | ||
288 | auto runner = new LocalStorageQueryRunner<ApplicationDomain::SinkAccount>(query, mIdentifier, sConfigNotifier); | ||
289 | auto monitoredResources = QSharedPointer<QSet<QByteArray>>::create(); | ||
290 | runner->setStatusUpdater([runner, monitoredResources](ApplicationDomain::SinkAccount &account) { | ||
291 | Query query; | ||
292 | query.filter<ApplicationDomain::SinkResource::Account>(account.identifier()); | ||
293 | const auto resources = Store::read<ApplicationDomain::SinkResource>(query); | ||
294 | SinkTrace() << "Found resource belonging to the account " << account.identifier() << " : " << resources; | ||
295 | auto accountIdentifier = account.identifier(); | ||
296 | ApplicationDomain::Status status = ApplicationDomain::ConnectedStatus; | ||
297 | for (const auto &resource : resources) { | ||
298 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource.identifier(), ResourceConfig::getResourceType(resource.identifier())); | ||
299 | if (!monitoredResources->contains(resource.identifier())) { | ||
300 | auto ret = QObject::connect(resourceAccess.data(), &ResourceAccess::notification, runner->guard(), [resource, runner, resourceAccess, accountIdentifier](const Notification ¬ification) { | ||
301 | SinkTrace() << "Received notification in facade: " << notification.type; | ||
302 | if (notification.type == Notification::Status) { | ||
303 | runner->statusChanged(accountIdentifier); | ||
304 | } | ||
305 | }); | ||
306 | Q_ASSERT(ret); | ||
307 | monitoredResources->insert(resource.identifier()); | ||
308 | } | ||
309 | |||
310 | //Figure out overall status | ||
311 | auto s = resourceAccess->getResourceStatus(); | ||
312 | switch (s) { | ||
313 | case ApplicationDomain::ErrorStatus: | ||
314 | status = ApplicationDomain::ErrorStatus; | ||
315 | break; | ||
316 | case ApplicationDomain::OfflineStatus: | ||
317 | if (status == ApplicationDomain::ConnectedStatus) { | ||
318 | status = ApplicationDomain::OfflineStatus; | ||
319 | } | ||
320 | break; | ||
321 | case ApplicationDomain::ConnectedStatus: | ||
322 | break; | ||
323 | case ApplicationDomain::BusyStatus: | ||
324 | if (status != ApplicationDomain::ErrorStatus) { | ||
325 | status = ApplicationDomain::BusyStatus; | ||
326 | } | ||
327 | break; | ||
328 | } | ||
329 | } | ||
330 | account.setStatusStatus(status); | ||
331 | }); | ||
332 | return qMakePair(KAsync::null<void>(), runner->emitter()); | ||
333 | } | ||
334 | |||
213 | IdentityFacade::IdentityFacade() : LocalStorageFacade<Sink::ApplicationDomain::Identity>("identities") | 335 | IdentityFacade::IdentityFacade() : LocalStorageFacade<Sink::ApplicationDomain::Identity>("identities") |
214 | { | 336 | { |
215 | } | 337 | } |