summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-25 08:27:06 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-25 09:23:55 +0100
commit22af1ed535b4afc8db3804e72bc5adb1a1b28d60 (patch)
tree07665f41d5b40d658e95a64bb76020f1fd9d088e /common/synchronizer.cpp
parent64d7d7bdd1edb2bcc305ca007784d0708cf7ef3c (diff)
downloadsink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.tar.gz
sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.zip
Added the flush command.
Instead of trying to actually flush queues, we send a special command through the same queues as the other commands and can thus guarantee that the respective commands have been processed without blocking anything.
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp23
1 files changed, 23 insertions, 0 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 5bde597..f7dd816 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -27,6 +27,8 @@
27#include "createentity_generated.h" 27#include "createentity_generated.h"
28#include "modifyentity_generated.h" 28#include "modifyentity_generated.h"
29#include "deleteentity_generated.h" 29#include "deleteentity_generated.h"
30#include "flush_generated.h"
31#include "notification_generated.h"
30 32
31SINK_DEBUG_AREA("synchronizer") 33SINK_DEBUG_AREA("synchronizer")
32 34
@@ -263,6 +265,13 @@ KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query)
263 return processSyncQueue(); 265 return processSyncQueue();
264} 266}
265 267
268void Synchronizer::flush(int commandId, const QByteArray &flushId)
269{
270 SinkTrace() << "Flushing the synchronization queue";
271 mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId};
272 processSyncQueue().exec();
273}
274
266KAsync::Job<void> Synchronizer::processSyncQueue() 275KAsync::Job<void> Synchronizer::processSyncQueue()
267{ 276{
268 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { 277 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) {
@@ -279,6 +288,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
279 //Commit after every request, so implementations only have to commit more if they add a lot of data. 288 //Commit after every request, so implementations only have to commit more if they add a lot of data.
280 commit(); 289 commit();
281 }); 290 });
291 } else if (request.requestType == Synchronizer::SyncRequest::Flush) {
292 if (request.flushType == Flush::FlushReplayQueue) {
293 SinkTrace() << "Emitting flush completion.";
294 Sink::Notification n;
295 n.type = Sink::Notification::FlushCompletion;
296 n.id = request.flushId;
297 emit notify(n);
298 } else {
299 flatbuffers::FlatBufferBuilder fbb;
300 auto flushId = fbb.CreateString(request.flushId);
301 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
302 Sink::Commands::FinishFlushBuffer(fbb, location);
303 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
304 }
282 } else { 305 } else {
283 job = replayNextRevision(); 306 job = replayNextRevision();
284 } 307 }