diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-25 08:27:06 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-25 09:23:55 +0100 |
commit | 22af1ed535b4afc8db3804e72bc5adb1a1b28d60 (patch) | |
tree | 07665f41d5b40d658e95a64bb76020f1fd9d088e /common/resourceaccess.cpp | |
parent | 64d7d7bdd1edb2bcc305ca007784d0708cf7ef3c (diff) | |
download | sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.tar.gz sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.zip |
Added the flush command.
Instead of trying to actually flush queues, we send a special command
through the same queues as the other commands and can thus guarantee
that the respective commands have been processed without blocking
anything.
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r-- | common/resourceaccess.cpp | 25 |
1 files changed, 13 insertions, 12 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 822b5cd..b46e8b2 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -31,6 +31,7 @@ | |||
31 | #include "common/deleteentity_generated.h" | 31 | #include "common/deleteentity_generated.h" |
32 | #include "common/revisionreplayed_generated.h" | 32 | #include "common/revisionreplayed_generated.h" |
33 | #include "common/inspection_generated.h" | 33 | #include "common/inspection_generated.h" |
34 | #include "common/flush_generated.h" | ||
34 | #include "common/entitybuffer.h" | 35 | #include "common/entitybuffer.h" |
35 | #include "common/bufferutils.h" | 36 | #include "common/bufferutils.h" |
36 | #include "common/test.h" | 37 | #include "common/test.h" |
@@ -291,16 +292,6 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu | |||
291 | }); | 292 | }); |
292 | } | 293 | } |
293 | 294 | ||
294 | KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) | ||
295 | { | ||
296 | SinkTrace() << "Sending synchronize command: " << sourceSync << localSync; | ||
297 | flatbuffers::FlatBufferBuilder fbb; | ||
298 | auto command = Sink::Commands::CreateSynchronize(fbb, sourceSync, localSync); | ||
299 | Sink::Commands::FinishSynchronizeBuffer(fbb, command); | ||
300 | open(); | ||
301 | return sendCommand(Commands::SynchronizeCommand, fbb); | ||
302 | } | ||
303 | |||
304 | KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &query) | 295 | KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &query) |
305 | { | 296 | { |
306 | flatbuffers::FlatBufferBuilder fbb; | 297 | flatbuffers::FlatBufferBuilder fbb; |
@@ -311,8 +302,6 @@ KAsync::Job<void> ResourceAccess::synchronizeResource(const Sink::QueryBase &que | |||
311 | } | 302 | } |
312 | auto q = fbb.CreateString(queryString.toStdString()); | 303 | auto q = fbb.CreateString(queryString.toStdString()); |
313 | auto builder = Sink::Commands::SynchronizeBuilder(fbb); | 304 | auto builder = Sink::Commands::SynchronizeBuilder(fbb); |
314 | builder.add_sourceSync(true); | ||
315 | builder.add_localSync(false); | ||
316 | builder.add_query(q); | 305 | builder.add_query(q); |
317 | Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish()); | 306 | Sink::Commands::FinishSynchronizeBuffer(fbb, builder.Finish()); |
318 | 307 | ||
@@ -390,6 +379,16 @@ ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &insp | |||
390 | return sendCommand(Sink::Commands::InspectionCommand, fbb); | 379 | return sendCommand(Sink::Commands::InspectionCommand, fbb); |
391 | } | 380 | } |
392 | 381 | ||
382 | KAsync::Job<void> ResourceAccess::sendFlushCommand(int flushType, const QByteArray &flushId) | ||
383 | { | ||
384 | flatbuffers::FlatBufferBuilder fbb; | ||
385 | auto id = fbb.CreateString(flushId.toStdString()); | ||
386 | auto location = Sink::Commands::CreateFlush(fbb, id, flushType); | ||
387 | Sink::Commands::FinishFlushBuffer(fbb, location); | ||
388 | open(); | ||
389 | return sendCommand(Sink::Commands::FlushCommand, fbb); | ||
390 | } | ||
391 | |||
393 | void ResourceAccess::open() | 392 | void ResourceAccess::open() |
394 | { | 393 | { |
395 | if (d->socket && d->socket->isValid()) { | 394 | if (d->socket && d->socket->isValid()) { |
@@ -613,6 +612,8 @@ bool ResourceAccess::processMessageBuffer() | |||
613 | [[clang::fallthrough]]; | 612 | [[clang::fallthrough]]; |
614 | case Sink::Notification::Warning: | 613 | case Sink::Notification::Warning: |
615 | [[clang::fallthrough]]; | 614 | [[clang::fallthrough]]; |
615 | case Sink::Notification::FlushCompletion: | ||
616 | [[clang::fallthrough]]; | ||
616 | case Sink::Notification::Progress: { | 617 | case Sink::Notification::Progress: { |
617 | auto n = getNotification(buffer); | 618 | auto n = getNotification(buffer); |
618 | SinkTrace() << "Received notification: " << n.type; | 619 | SinkTrace() << "Received notification: " << n.type; |