diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-07-27 02:26:47 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-09-15 16:14:19 +0200 |
commit | 26816c21f60450e461a5b6ef4ef740f6070ce278 (patch) | |
tree | 55e8aee03e094abf702438e6cd26233047345e70 /common/store.cpp | |
parent | 9a9bb39f7641a818434cafa0dae0c8aa47124c0b (diff) | |
download | sink-26816c21f60450e461a5b6ef4ef740f6070ce278.tar.gz sink-26816c21f60450e461a5b6ef4ef740f6070ce278.zip |
Ported to the kasync revamp
Diffstat (limited to 'common/store.cpp')
-rw-r--r-- | common/store.cpp | 68 |
1 files changed, 33 insertions, 35 deletions
diff --git a/common/store.cpp b/common/store.cpp index 07f41f8..c01d220 100644 --- a/common/store.cpp +++ b/common/store.cpp | |||
@@ -39,6 +39,8 @@ | |||
39 | SINK_DEBUG_AREA("store") | 39 | SINK_DEBUG_AREA("store") |
40 | 40 | ||
41 | Q_DECLARE_METATYPE(QSharedPointer<Sink::ResultEmitter<Sink::ApplicationDomain::SinkResource::Ptr>>) | 41 | Q_DECLARE_METATYPE(QSharedPointer<Sink::ResultEmitter<Sink::ApplicationDomain::SinkResource::Ptr>>) |
42 | Q_DECLARE_METATYPE(QSharedPointer<Sink::ResourceAccessInterface>); | ||
43 | Q_DECLARE_METATYPE(std::shared_ptr<void>); | ||
42 | 44 | ||
43 | namespace Sink { | 45 | namespace Sink { |
44 | 46 | ||
@@ -169,12 +171,10 @@ QSharedPointer<QAbstractItemModel> Store::loadModel(Query query) | |||
169 | result.first.exec(); | 171 | result.first.exec(); |
170 | } | 172 | } |
171 | 173 | ||
172 | KAsync::iterate(resources.keys()) | 174 | KAsync::value(resources.keys()) |
173 | .template each<void, QByteArray>([query, aggregatingEmitter, resources](const QByteArray &resourceInstanceIdentifier, KAsync::Future<void> &future) { | 175 | .template each([query, aggregatingEmitter, resources](const QByteArray &resourceInstanceIdentifier) { |
174 | const auto resourceType = resources.value(resourceInstanceIdentifier); | 176 | const auto resourceType = resources.value(resourceInstanceIdentifier); |
175 | queryResource<DomainType>(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter).template then<void>([&future]() { | 177 | return queryResource<DomainType>(resourceType, resourceInstanceIdentifier, query, aggregatingEmitter); |
176 | future.setFinished(); | ||
177 | }).exec(); | ||
178 | }) | 178 | }) |
179 | .exec(); | 179 | .exec(); |
180 | model->fetchMore(QModelIndex()); | 180 | model->fetchMore(QModelIndex()); |
@@ -201,7 +201,7 @@ KAsync::Job<void> Store::create(const DomainType &domainObject) | |||
201 | { | 201 | { |
202 | // Potentially move to separate thread as well | 202 | // Potentially move to separate thread as well |
203 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); | 203 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); |
204 | return facade->create(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to create"; }); | 204 | return facade->create(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to create"; }); |
205 | } | 205 | } |
206 | 206 | ||
207 | template <class DomainType> | 207 | template <class DomainType> |
@@ -209,7 +209,7 @@ KAsync::Job<void> Store::modify(const DomainType &domainObject) | |||
209 | { | 209 | { |
210 | // Potentially move to separate thread as well | 210 | // Potentially move to separate thread as well |
211 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); | 211 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); |
212 | return facade->modify(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to modify"; }); | 212 | return facade->modify(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to modify"; }); |
213 | } | 213 | } |
214 | 214 | ||
215 | template <class DomainType> | 215 | template <class DomainType> |
@@ -217,7 +217,7 @@ KAsync::Job<void> Store::remove(const DomainType &domainObject) | |||
217 | { | 217 | { |
218 | // Potentially move to separate thread as well | 218 | // Potentially move to separate thread as well |
219 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); | 219 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); |
220 | return facade->remove(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to remove"; }); | 220 | return facade->remove(domainObject).addToContext(std::shared_ptr<void>(facade)).onError([](const KAsync::Error &error) { SinkWarning() << "Failed to remove"; }); |
221 | } | 221 | } |
222 | 222 | ||
223 | KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) | 223 | KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) |
@@ -231,6 +231,7 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) | |||
231 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); | 231 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); |
232 | resourceAccess->open(); | 232 | resourceAccess->open(); |
233 | return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) | 233 | return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) |
234 | .addToContext(resourceAccess) | ||
234 | .then<void>([resourceAccess](KAsync::Future<void> &future) { | 235 | .then<void>([resourceAccess](KAsync::Future<void> &future) { |
235 | if (resourceAccess->isReady()) { | 236 | if (resourceAccess->isReady()) { |
236 | //Wait for the resource shutdown | 237 | //Wait for the resource shutdown |
@@ -243,8 +244,8 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) | |||
243 | future.setFinished(); | 244 | future.setFinished(); |
244 | } | 245 | } |
245 | }) | 246 | }) |
246 | .then<void>([resourceAccess, time]() { | 247 | .syncThen<void>([time]() { |
247 | SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); | 248 | SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); |
248 | }); | 249 | }); |
249 | } | 250 | } |
250 | 251 | ||
@@ -253,41 +254,38 @@ KAsync::Job<void> Store::synchronize(const Sink::Query &query) | |||
253 | SinkTrace() << "synchronize" << query.resources; | 254 | SinkTrace() << "synchronize" << query.resources; |
254 | auto resources = getResources(query.resources, query.accounts).keys(); | 255 | auto resources = getResources(query.resources, query.accounts).keys(); |
255 | //FIXME only necessary because each doesn't propagate errors | 256 | //FIXME only necessary because each doesn't propagate errors |
256 | auto error = new bool; | 257 | auto errorFlag = new bool; |
257 | return KAsync::iterate(resources) | 258 | return KAsync::value(resources) |
258 | .template each<void, QByteArray>([query, error](const QByteArray &resource, KAsync::Future<void> &future) { | 259 | .template each([query, errorFlag](const QByteArray &resource) { |
259 | SinkTrace() << "Synchronizing " << resource; | 260 | SinkTrace() << "Synchronizing " << resource; |
260 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); | 261 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); |
261 | resourceAccess->open(); | 262 | resourceAccess->open(); |
262 | resourceAccess->synchronizeResource(true, false).then<void>([resourceAccess, &future]() {SinkTrace() << "synced."; future.setFinished(); }, | 263 | return resourceAccess->synchronizeResource(true, false) |
263 | [&future, error](int errorCode, QString msg) { *error = true; SinkWarning() << "Error during sync."; future.setError(errorCode, msg); }).exec(); | 264 | .addToContext(resourceAccess) |
264 | }).then<void>([error](KAsync::Future<void> &future) { | 265 | .then<void>([errorFlag](const KAsync::Error &error) { |
265 | if (*error) { | 266 | if (error) { |
266 | future.setError(1, "Error during sync."); | 267 | *errorFlag = true; |
267 | } else { | 268 | SinkWarning() << "Error during sync."; |
268 | future.setFinished(); | 269 | return KAsync::error<void>(error); |
270 | } | ||
271 | SinkTrace() << "synced."; | ||
272 | return KAsync::null<void>(); | ||
273 | }); | ||
274 | }) | ||
275 | .then<void>([errorFlag]() { | ||
276 | if (*errorFlag) { | ||
277 | return KAsync::error<void>("Error during sync."); | ||
269 | } | 278 | } |
270 | delete error; | 279 | delete errorFlag; |
280 | return KAsync::null<void>(); | ||
271 | }); | 281 | }); |
272 | } | 282 | } |
273 | 283 | ||
274 | template <class DomainType> | 284 | template <class DomainType> |
275 | KAsync::Job<DomainType> Store::fetchOne(const Sink::Query &query) | 285 | KAsync::Job<DomainType> Store::fetchOne(const Sink::Query &query) |
276 | { | 286 | { |
277 | return KAsync::start<DomainType>([query](KAsync::Future<DomainType> &future) { | 287 | return fetch<DomainType>(query, 1).template then<DomainType, QList<typename DomainType::Ptr>>([](const QList<typename DomainType::Ptr> &list) { |
278 | // FIXME We could do this more elegantly if composed jobs would have the correct type (In that case we'd simply return the value from then continuation, and could avoid the | 288 | return KAsync::value(*list.first()); |
279 | // outer job entirely) | ||
280 | fetch<DomainType>(query, 1) | ||
281 | .template then<void, QList<typename DomainType::Ptr>>( | ||
282 | [&future](const QList<typename DomainType::Ptr> &list) { | ||
283 | future.setValue(*list.first()); | ||
284 | future.setFinished(); | ||
285 | }, | ||
286 | [&future](int errorCode, const QString &errorMessage) { | ||
287 | future.setError(errorCode, errorMessage); | ||
288 | future.setFinished(); | ||
289 | }) | ||
290 | .exec(); | ||
291 | }); | 289 | }); |
292 | } | 290 | } |
293 | 291 | ||