summaryrefslogtreecommitdiffstats
path: root/common/resourceaccess.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-25 08:27:06 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-25 09:23:55 +0100
commit22af1ed535b4afc8db3804e72bc5adb1a1b28d60 (patch)
tree07665f41d5b40d658e95a64bb76020f1fd9d088e /common/resourceaccess.cpp
parent64d7d7bdd1edb2bcc305ca007784d0708cf7ef3c (diff)
downloadsink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.tar.gz
sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.zip
Added the flush command.
Instead of trying to actually flush queues, we send a special command through the same queues as the other commands and can thus guarantee that the respective commands have been processed without blocking anything.
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r--common/resourceaccess.cpp25
1 files changed, 13 insertions, 12 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 822b5cd..b46e8b2 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -31,6 +31,7 @@
31#include "common/deleteentity_generated.h" 31#include "common/deleteentity_generated.h"
32#include "common/revisionreplayed_generated.h" 32#include "common/revisionreplayed_generated.h"
33#include "common/inspection_generated.h" 33#include "common/inspection_generated.h"
34#include "common/flush_generated.h"
34#include "common/entitybuffer.h" 35#include "common/entitybuffer.h"
35#include "common/bufferutils.h" 36#include "common/bufferutils.h"
36#include "common/test.h" 37#include "common/test.h"
@@ -291,16 +292,6 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu
291 }); 292 });
292} 293}
293 294
294KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync)
295{
296 SinkTrace() << "Sending synchronize command: " << sourceSync << localSync;
297 flatbuffers::FlatBufferBuilder fbb;
298 auto command = Sink::Commands::CreateSynchronize(fbb, sourceSync, localSync);
299 Sink::Commands::FinishSynchronizeBuffer(fbb, command);
300 open();
301 return sendCommand(Commands::SynchronizeCommand, fbb);
302}
303
304KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &query) 295KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &query)
305{ 296{
306 flatbuffers::FlatBufferBuilder fbb; 297 flatbuffers::FlatBufferBuilder fbb;
@@ -311,8 +302,6 @@ KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &que
311 } 302 }
312 auto q = fbb.CreateString(queryString.toStdString()); 303 auto q = fbb.CreateString(queryString.toStdString());
313 auto builder = Sink::Commands::SynchronizeBuilder(fbb); 304 auto builder = Sink::Commands::SynchronizeBuilder(fbb);
314 builder.add_sourceSync(true);
315 builder.add_localSync(false);
316 builder.add_query(q); 305 builder.add_query(q);
317 Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish()); 306 Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish());
318 307
@@ -390,6 +379,16 @@ ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &insp
390 return sendCommand(Sink::Commands::InspectionCommand, fbb); 379 return sendCommand(Sink::Commands::InspectionCommand, fbb);
391} 380}
392 381
382KAsync::Job<void> ResourceAccess::sendFlushCommand(int flushType, const QByteArray &flushId)
383{
384 flatbuffers::FlatBufferBuilder fbb;
385 auto id = fbb.CreateString(flushId.toStdString());
386 auto location = Sink::Commands::CreateFlush(fbb, id, flushType);
387 Sink::Commands::FinishFlushBuffer(fbb, location);
388 open();
389 return sendCommand(Sink::Commands::FlushCommand, fbb);
390}
391
393void ResourceAccess::open() 392void ResourceAccess::open()
394{ 393{
395 if (d->socket && d->socket->isValid()) { 394 if (d->socket && d->socket->isValid()) {
@@ -613,6 +612,8 @@ bool ResourceAccess::processMessageBuffer()
613 [[clang::fallthrough]]; 612 [[clang::fallthrough]];
614 case Sink::Notification::Warning: 613 case Sink::Notification::Warning:
615 [[clang::fallthrough]]; 614 [[clang::fallthrough]];
615 case Sink::Notification::FlushCompletion:
616 [[clang::fallthrough]];
616 case Sink::Notification::Progress: { 617 case Sink::Notification::Progress: {
617 auto n = getNotification(buffer); 618 auto n = getNotification(buffer);
618 SinkTrace() << "Received notification: " << n.type; 619 SinkTrace() << "Received notification: " << n.type;