summaryrefslogtreecommitdiffstats
path: root/common/synchronizer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r--common/synchronizer.cpp23
1 files changed, 23 insertions, 0 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 5bde597..f7dd816 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -27,6 +27,8 @@
27#include "createentity_generated.h" 27#include "createentity_generated.h"
28#include "modifyentity_generated.h" 28#include "modifyentity_generated.h"
29#include "deleteentity_generated.h" 29#include "deleteentity_generated.h"
30#include "flush_generated.h"
31#include "notification_generated.h"
30 32
31SINK_DEBUG_AREA("synchronizer") 33SINK_DEBUG_AREA("synchronizer")
32 34
@@ -263,6 +265,13 @@ KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query)
263 return processSyncQueue(); 265 return processSyncQueue();
264} 266}
265 267
268void Synchronizer::flush(int commandId, const QByteArray &flushId)
269{
270 SinkTrace() << "Flushing the synchronization queue";
271 mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId};
272 processSyncQueue().exec();
273}
274
266KAsync::Job<void> Synchronizer::processSyncQueue() 275KAsync::Job<void> Synchronizer::processSyncQueue()
267{ 276{
268 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { 277 if (mSyncRequestQueue.isEmpty() || mSyncInProgress) {
@@ -279,6 +288,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue()
279 //Commit after every request, so implementations only have to commit more if they add a lot of data. 288 //Commit after every request, so implementations only have to commit more if they add a lot of data.
280 commit(); 289 commit();
281 }); 290 });
291 } else if (request.requestType == Synchronizer::SyncRequest::Flush) {
292 if (request.flushType == Flush::FlushReplayQueue) {
293 SinkTrace() << "Emitting flush completion.";
294 Sink::Notification n;
295 n.type = Sink::Notification::FlushCompletion;
296 n.id = request.flushId;
297 emit notify(n);
298 } else {
299 flatbuffers::FlatBufferBuilder fbb;
300 auto flushId = fbb.CreateString(request.flushId);
301 auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization));
302 Sink::Commands::FinishFlushBuffer(fbb, location);
303 enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb));
304 }
282 } else { 305 } else {
283 job = replayNextRevision(); 306 job = replayNextRevision();
284 } 307 }