diff options
Diffstat (limited to 'common/resourcecontrol.cpp')
-rw-r--r-- | common/resourcecontrol.cpp | 34 |
1 files changed, 28 insertions, 6 deletions
diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp index 3568844..af98b8b 100644 --- a/common/resourcecontrol.cpp +++ b/common/resourcecontrol.cpp | |||
@@ -85,17 +85,39 @@ KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resou | |||
85 | SinkTrace() << "flushMessageQueue" << resourceIdentifier; | 85 | SinkTrace() << "flushMessageQueue" << resourceIdentifier; |
86 | return KAsync::value(resourceIdentifier) | 86 | return KAsync::value(resourceIdentifier) |
87 | .template each([](const QByteArray &resource) { | 87 | .template each([](const QByteArray &resource) { |
88 | SinkTrace() << "Flushing message queue " << resource; | 88 | return flushMessageQueue(resource); |
89 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); | ||
90 | resourceAccess->open(); | ||
91 | return resourceAccess->synchronizeResource(false, true) | ||
92 | .addToContext(resourceAccess); | ||
93 | }); | 89 | }); |
94 | } | 90 | } |
95 | 91 | ||
96 | KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArray &resourceIdentifier) | 92 | KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArray &resourceIdentifier) |
97 | { | 93 | { |
98 | return flushMessageQueue(QByteArrayList() << resourceIdentifier); | 94 | return flush(Flush::FlushUserQueue, resourceIdentifier).then(flush(Flush::FlushSynchronization, resourceIdentifier)); |
95 | } | ||
96 | |||
97 | KAsync::Job<void> ResourceControl::flush(Flush::FlushType type, const QByteArray &resourceIdentifier) | ||
98 | { | ||
99 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resourceIdentifier, ResourceConfig::getResourceType(resourceIdentifier)); | ||
100 | auto notifier = QSharedPointer<Sink::Notifier>::create(resourceAccess); | ||
101 | auto id = QUuid::createUuid().toByteArray(); | ||
102 | return KAsync::start<void>([=](KAsync::Future<void> &future) { | ||
103 | SinkTrace() << "Waiting for notification notification " << id; | ||
104 | notifier->registerHandler([&future, id](const Notification ¬ification) { | ||
105 | SinkTrace() << "Received notification " << notification.type << notification.id; | ||
106 | if (notification.id == id) { | ||
107 | SinkTrace() << "FlushComplete"; | ||
108 | if (notification.code) { | ||
109 | SinkWarning() << "Flush return an error"; | ||
110 | future.setError(-1, "Flush returned an error: " + notification.message); | ||
111 | } else { | ||
112 | future.setFinished(); | ||
113 | } | ||
114 | } | ||
115 | }); | ||
116 | resourceAccess->sendFlushCommand(type, id).onError([&future] (const KAsync::Error &error) { | ||
117 | SinkWarning() << "Failed to send command"; | ||
118 | future.setError(1, "Failed to send command: " + error.errorMessage); | ||
119 | }).exec(); | ||
120 | }); | ||
99 | } | 121 | } |
100 | 122 | ||
101 | KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArrayList &resourceIdentifier) | 123 | KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArrayList &resourceIdentifier) |