diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-25 08:27:06 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-25 09:23:55 +0100 |
commit | 22af1ed535b4afc8db3804e72bc5adb1a1b28d60 (patch) | |
tree | 07665f41d5b40d658e95a64bb76020f1fd9d088e /common/listener.cpp | |
parent | 64d7d7bdd1edb2bcc305ca007784d0708cf7ef3c (diff) | |
download | sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.tar.gz sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.zip |
Added the flush command.
Instead of trying to actually flush queues, we send a special command
through the same queues as the other commands and can thus guarantee
that the respective commands have been processed without blocking
anything.
Diffstat (limited to 'common/listener.cpp')
-rw-r--r-- | common/listener.cpp | 19 |
1 files changed, 7 insertions, 12 deletions
diff --git a/common/listener.cpp b/common/listener.cpp index c3c6bc2..2ab0333 100644 --- a/common/listener.cpp +++ b/common/listener.cpp | |||
@@ -33,7 +33,6 @@ | |||
33 | #include "common/synchronize_generated.h" | 33 | #include "common/synchronize_generated.h" |
34 | #include "common/notification_generated.h" | 34 | #include "common/notification_generated.h" |
35 | #include "common/revisionreplayed_generated.h" | 35 | #include "common/revisionreplayed_generated.h" |
36 | #include "common/inspection_generated.h" | ||
37 | 36 | ||
38 | #include <QLocalServer> | 37 | #include <QLocalServer> |
39 | #include <QLocalSocket> | 38 | #include <QLocalSocket> |
@@ -244,18 +243,13 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
244 | auto timer = QSharedPointer<QTime>::create(); | 243 | auto timer = QSharedPointer<QTime>::create(); |
245 | timer->start(); | 244 | timer->start(); |
246 | auto job = KAsync::null<void>(); | 245 | auto job = KAsync::null<void>(); |
247 | if (buffer->sourceSync()) { | 246 | Sink::QueryBase query; |
248 | Sink::QueryBase query; | 247 | if (buffer->query()) { |
249 | if (buffer->query()) { | 248 | auto data = QByteArray::fromStdString(buffer->query()->str()); |
250 | auto data = QByteArray::fromStdString(buffer->query()->str()); | 249 | QDataStream stream(&data, QIODevice::ReadOnly); |
251 | QDataStream stream(&data, QIODevice::ReadOnly); | 250 | stream >> query; |
252 | stream >> query; | ||
253 | } | ||
254 | job = loadResource().synchronizeWithSource(query); | ||
255 | } | ||
256 | if (buffer->localSync()) { | ||
257 | job = job.then<void>(loadResource().processAllMessages()); | ||
258 | } | 251 | } |
252 | job = loadResource().synchronizeWithSource(query); | ||
259 | job.then<void>([callback, timer](const KAsync::Error &error) { | 253 | job.then<void>([callback, timer](const KAsync::Error &error) { |
260 | if (error) { | 254 | if (error) { |
261 | SinkWarning() << "Sync failed: " << error.errorMessage; | 255 | SinkWarning() << "Sync failed: " << error.errorMessage; |
@@ -279,6 +273,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
279 | case Sink::Commands::DeleteEntityCommand: | 273 | case Sink::Commands::DeleteEntityCommand: |
280 | case Sink::Commands::ModifyEntityCommand: | 274 | case Sink::Commands::ModifyEntityCommand: |
281 | case Sink::Commands::CreateEntityCommand: | 275 | case Sink::Commands::CreateEntityCommand: |
276 | case Sink::Commands::FlushCommand: | ||
282 | SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; | 277 | SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; |
283 | loadResource().processCommand(commandId, commandBuffer); | 278 | loadResource().processCommand(commandId, commandBuffer); |
284 | break; | 279 | break; |