summaryrefslogtreecommitdiffstats
path: root/common/resourcecontrol.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-25 08:27:06 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-25 09:23:55 +0100
commit22af1ed535b4afc8db3804e72bc5adb1a1b28d60 (patch)
tree07665f41d5b40d658e95a64bb76020f1fd9d088e /common/resourcecontrol.cpp
parent64d7d7bdd1edb2bcc305ca007784d0708cf7ef3c (diff)
downloadsink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.tar.gz
sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.zip
Added the flush command.
Instead of trying to actually flush queues, we send a special command through the same queues as the other commands and can thus guarantee that the respective commands have been processed without blocking anything.
Diffstat (limited to 'common/resourcecontrol.cpp')
-rw-r--r--common/resourcecontrol.cpp34
1 files changed, 28 insertions, 6 deletions
diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp
index 3568844..af98b8b 100644
--- a/common/resourcecontrol.cpp
+++ b/common/resourcecontrol.cpp
@@ -85,17 +85,39 @@ KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resou
85 SinkTrace() << "flushMessageQueue" << resourceIdentifier; 85 SinkTrace() << "flushMessageQueue" << resourceIdentifier;
86 return KAsync::value(resourceIdentifier) 86 return KAsync::value(resourceIdentifier)
87 .template each([](const QByteArray &resource) { 87 .template each([](const QByteArray &resource) {
88 SinkTrace() << "Flushing message queue " << resource; 88 return flushMessageQueue(resource);
89 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource));
90 resourceAccess->open();
91 return resourceAccess->synchronizeResource(false, true)
92 .addToContext(resourceAccess);
93 }); 89 });
94} 90}
95 91
96KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArray &resourceIdentifier) 92KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArray &resourceIdentifier)
97{ 93{
98 return flushMessageQueue(QByteArrayList() << resourceIdentifier); 94 return flush(Flush::FlushUserQueue, resourceIdentifier).then(flush(Flush::FlushSynchronization, resourceIdentifier));
95}
96
97KAsync::Job<void> ResourceControl::flush(Flush::FlushType type, const QByteArray &resourceIdentifier)
98{
99 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resourceIdentifier, ResourceConfig::getResourceType(resourceIdentifier));
100 auto notifier = QSharedPointer<Sink::Notifier>::create(resourceAccess);
101 auto id = QUuid::createUuid().toByteArray();
102 return KAsync::start<void>([=](KAsync::Future<void> &future) {
103 SinkTrace() << "Waiting for notification notification " << id;
104 notifier->registerHandler([&future, id](const Notification &notification) {
105 SinkTrace() << "Received notification " << notification.type << notification.id;
106 if (notification.id == id) {
107 SinkTrace() << "FlushComplete";
108 if (notification.code) {
109 SinkWarning() << "Flush return an error";
110 future.setError(-1, "Flush returned an error: " + notification.message);
111 } else {
112 future.setFinished();
113 }
114 }
115 });
116 resourceAccess->sendFlushCommand(type, id).onError([&future] (const KAsync::Error &error) {
117 SinkWarning() << "Failed to send command";
118 future.setError(1, "Failed to send command: " + error.errorMessage);
119 }).exec();
120 });
99} 121}
100 122
101KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArrayList &resourceIdentifier) 123KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArrayList &resourceIdentifier)