diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-25 08:27:06 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-25 09:23:55 +0100 |
commit | 22af1ed535b4afc8db3804e72bc5adb1a1b28d60 (patch) | |
tree | 07665f41d5b40d658e95a64bb76020f1fd9d088e /common/resourcecontrol.cpp | |
parent | 64d7d7bdd1edb2bcc305ca007784d0708cf7ef3c (diff) | |
download | sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.tar.gz sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.zip |
Added the flush command.
Instead of trying to actually flush queues, we send a special command
through the same queues as the other commands and can thus guarantee
that the respective commands have been processed without blocking
anything.
Diffstat (limited to 'common/resourcecontrol.cpp')
-rw-r--r-- | common/resourcecontrol.cpp | 34 |
1 files changed, 28 insertions, 6 deletions
diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp index 3568844..af98b8b 100644 --- a/common/resourcecontrol.cpp +++ b/common/resourcecontrol.cpp | |||
@@ -85,17 +85,39 @@ KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resou | |||
85 | SinkTrace() << "flushMessageQueue" << resourceIdentifier; | 85 | SinkTrace() << "flushMessageQueue" << resourceIdentifier; |
86 | return KAsync::value(resourceIdentifier) | 86 | return KAsync::value(resourceIdentifier) |
87 | .template each([](const QByteArray &resource) { | 87 | .template each([](const QByteArray &resource) { |
88 | SinkTrace() << "Flushing message queue " << resource; | 88 | return flushMessageQueue(resource); |
89 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); | ||
90 | resourceAccess->open(); | ||
91 | return resourceAccess->synchronizeResource(false, true) | ||
92 | .addToContext(resourceAccess); | ||
93 | }); | 89 | }); |
94 | } | 90 | } |
95 | 91 | ||
96 | KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArray &resourceIdentifier) | 92 | KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArray &resourceIdentifier) |
97 | { | 93 | { |
98 | return flushMessageQueue(QByteArrayList() << resourceIdentifier); | 94 | return flush(Flush::FlushUserQueue, resourceIdentifier).then(flush(Flush::FlushSynchronization, resourceIdentifier)); |
95 | } | ||
96 | |||
97 | KAsync::Job<void> ResourceControl::flush(Flush::FlushType type, const QByteArray &resourceIdentifier) | ||
98 | { | ||
99 | auto resourceAccess = ResourceAccessFactory::instance().getAccess(resourceIdentifier, ResourceConfig::getResourceType(resourceIdentifier)); | ||
100 | auto notifier = QSharedPointer<Sink::Notifier>::create(resourceAccess); | ||
101 | auto id = QUuid::createUuid().toByteArray(); | ||
102 | return KAsync::start<void>([=](KAsync::Future<void> &future) { | ||
103 | SinkTrace() << "Waiting for notification notification " << id; | ||
104 | notifier->registerHandler([&future, id](const Notification ¬ification) { | ||
105 | SinkTrace() << "Received notification " << notification.type << notification.id; | ||
106 | if (notification.id == id) { | ||
107 | SinkTrace() << "FlushComplete"; | ||
108 | if (notification.code) { | ||
109 | SinkWarning() << "Flush return an error"; | ||
110 | future.setError(-1, "Flush returned an error: " + notification.message); | ||
111 | } else { | ||
112 | future.setFinished(); | ||
113 | } | ||
114 | } | ||
115 | }); | ||
116 | resourceAccess->sendFlushCommand(type, id).onError([&future] (const KAsync::Error &error) { | ||
117 | SinkWarning() << "Failed to send command"; | ||
118 | future.setError(1, "Failed to send command: " + error.errorMessage); | ||
119 | }).exec(); | ||
120 | }); | ||
99 | } | 121 | } |
100 | 122 | ||
101 | KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArrayList &resourceIdentifier) | 123 | KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArrayList &resourceIdentifier) |