summaryrefslogtreecommitdiffstats
path: root/common/listener.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-25 08:27:06 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-25 09:23:55 +0100
commit22af1ed535b4afc8db3804e72bc5adb1a1b28d60 (patch)
tree07665f41d5b40d658e95a64bb76020f1fd9d088e /common/listener.cpp
parent64d7d7bdd1edb2bcc305ca007784d0708cf7ef3c (diff)
downloadsink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.tar.gz
sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.zip
Added the flush command.
Instead of trying to actually flush queues, we send a special command through the same queues as the other commands and can thus guarantee that the respective commands have been processed without blocking anything.
Diffstat (limited to 'common/listener.cpp')
-rw-r--r--common/listener.cpp19
1 files changed, 7 insertions, 12 deletions
diff --git a/common/listener.cpp b/common/listener.cpp
index c3c6bc2..2ab0333 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -33,7 +33,6 @@
33#include "common/synchronize_generated.h" 33#include "common/synchronize_generated.h"
34#include "common/notification_generated.h" 34#include "common/notification_generated.h"
35#include "common/revisionreplayed_generated.h" 35#include "common/revisionreplayed_generated.h"
36#include "common/inspection_generated.h"
37 36
38#include <QLocalServer> 37#include <QLocalServer>
39#include <QLocalSocket> 38#include <QLocalSocket>
@@ -244,18 +243,13 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
244 auto timer = QSharedPointer<QTime>::create(); 243 auto timer = QSharedPointer<QTime>::create();
245 timer->start(); 244 timer->start();
246 auto job = KAsync::null<void>(); 245 auto job = KAsync::null<void>();
247 if (buffer->sourceSync()) { 246 Sink::QueryBase query;
248 Sink::QueryBase query; 247 if (buffer->query()) {
249 if (buffer->query()) { 248 auto data = QByteArray::fromStdString(buffer->query()->str());
250 auto data = QByteArray::fromStdString(buffer->query()->str()); 249 QDataStream stream(&data, QIODevice::ReadOnly);
251 QDataStream stream(&data, QIODevice::ReadOnly); 250 stream >> query;
252 stream >> query;
253 }
254 job = loadResource().synchronizeWithSource(query);
255 }
256 if (buffer->localSync()) {
257 job = job.then<void>(loadResource().processAllMessages());
258 } 251 }
252 job = loadResource().synchronizeWithSource(query);
259 job.then<void>([callback, timer](const KAsync::Error &error) { 253 job.then<void>([callback, timer](const KAsync::Error &error) {
260 if (error) { 254 if (error) {
261 SinkWarning() << "Sync failed: " << error.errorMessage; 255 SinkWarning() << "Sync failed: " << error.errorMessage;
@@ -279,6 +273,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
279 case Sink::Commands::DeleteEntityCommand: 273 case Sink::Commands::DeleteEntityCommand:
280 case Sink::Commands::ModifyEntityCommand: 274 case Sink::Commands::ModifyEntityCommand:
281 case Sink::Commands::CreateEntityCommand: 275 case Sink::Commands::CreateEntityCommand:
276 case Sink::Commands::FlushCommand:
282 SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; 277 SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name;
283 loadResource().processCommand(commandId, commandBuffer); 278 loadResource().processCommand(commandId, commandBuffer);
284 break; 279 break;