From 22af1ed535b4afc8db3804e72bc5adb1a1b28d60 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 25 Nov 2016 08:27:06 +0100 Subject: Added the flush command. Instead of trying to actually flush queues, we send a special command through the same queues as the other commands and can thus guarantee that the respective commands have been processed without blocking anything. --- common/synchronizer.cpp | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) (limited to 'common/synchronizer.cpp') diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 5bde597..f7dd816 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -27,6 +27,8 @@ #include "createentity_generated.h" #include "modifyentity_generated.h" #include "deleteentity_generated.h" +#include "flush_generated.h" +#include "notification_generated.h" SINK_DEBUG_AREA("synchronizer") @@ -263,6 +265,13 @@ KAsync::Job Synchronizer::synchronize(const Sink::QueryBase &query) return processSyncQueue(); } +void Synchronizer::flush(int commandId, const QByteArray &flushId) +{ + SinkTrace() << "Flushing the synchronization queue"; + mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; + processSyncQueue().exec(); +} + KAsync::Job Synchronizer::processSyncQueue() { if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { @@ -279,6 +288,20 @@ KAsync::Job Synchronizer::processSyncQueue() //Commit after every request, so implementations only have to commit more if they add a lot of data. commit(); }); + } else if (request.requestType == Synchronizer::SyncRequest::Flush) { + if (request.flushType == Flush::FlushReplayQueue) { + SinkTrace() << "Emitting flush completion."; + Sink::Notification n; + n.type = Sink::Notification::FlushCompletion; + n.id = request.flushId; + emit notify(n); + } else { + flatbuffers::FlatBufferBuilder fbb; + auto flushId = fbb.CreateString(request.flushId); + auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast(Sink::Flush::FlushSynchronization)); + Sink::Commands::FinishFlushBuffer(fbb, location); + enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); + } } else { job = replayNextRevision(); } -- cgit v1.2.3