diff options
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r-- | common/synchronizer.cpp | 23 |
1 files changed, 23 insertions, 0 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 5bde597..f7dd816 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -27,6 +27,8 @@ | |||
27 | #include "createentity_generated.h" | 27 | #include "createentity_generated.h" |
28 | #include "modifyentity_generated.h" | 28 | #include "modifyentity_generated.h" |
29 | #include "deleteentity_generated.h" | 29 | #include "deleteentity_generated.h" |
30 | #include "flush_generated.h" | ||
31 | #include "notification_generated.h" | ||
30 | 32 | ||
31 | SINK_DEBUG_AREA("synchronizer") | 33 | SINK_DEBUG_AREA("synchronizer") |
32 | 34 | ||
@@ -263,6 +265,13 @@ KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query) | |||
263 | return processSyncQueue(); | 265 | return processSyncQueue(); |
264 | } | 266 | } |
265 | 267 | ||
268 | void Synchronizer::flush(int commandId, const QByteArray &flushId) | ||
269 | { | ||
270 | SinkTrace() << "Flushing the synchronization queue"; | ||
271 | mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; | ||
272 | processSyncQueue().exec(); | ||
273 | } | ||
274 | |||
266 | KAsync::Job<void> Synchronizer::processSyncQueue() | 275 | KAsync::Job<void> Synchronizer::processSyncQueue() |
267 | { | 276 | { |
268 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { | 277 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { |
@@ -279,6 +288,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
279 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | 288 | //Commit after every request, so implementations only have to commit more if they add a lot of data. |
280 | commit(); | 289 | commit(); |
281 | }); | 290 | }); |
291 | } else if (request.requestType == Synchronizer::SyncRequest::Flush) { | ||
292 | if (request.flushType == Flush::FlushReplayQueue) { | ||
293 | SinkTrace() << "Emitting flush completion."; | ||
294 | Sink::Notification n; | ||
295 | n.type = Sink::Notification::FlushCompletion; | ||
296 | n.id = request.flushId; | ||
297 | emit notify(n); | ||
298 | } else { | ||
299 | flatbuffers::FlatBufferBuilder fbb; | ||
300 | auto flushId = fbb.CreateString(request.flushId); | ||
301 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); | ||
302 | Sink::Commands::FinishFlushBuffer(fbb, location); | ||
303 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | ||
304 | } | ||
282 | } else { | 305 | } else { |
283 | job = replayNextRevision(); | 306 | job = replayNextRevision(); |
284 | } | 307 | } |