diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-25 08:27:06 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-25 09:23:55 +0100 |
commit | 22af1ed535b4afc8db3804e72bc5adb1a1b28d60 (patch) | |
tree | 07665f41d5b40d658e95a64bb76020f1fd9d088e /common/synchronizer.cpp | |
parent | 64d7d7bdd1edb2bcc305ca007784d0708cf7ef3c (diff) | |
download | sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.tar.gz sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.zip |
Added the flush command.
Instead of trying to actually flush queues, we send a special command
through the same queues as the other commands and can thus guarantee
that the respective commands have been processed without blocking
anything.
Diffstat (limited to 'common/synchronizer.cpp')
-rw-r--r-- | common/synchronizer.cpp | 23 |
1 files changed, 23 insertions, 0 deletions
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 5bde597..f7dd816 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -27,6 +27,8 @@ | |||
27 | #include "createentity_generated.h" | 27 | #include "createentity_generated.h" |
28 | #include "modifyentity_generated.h" | 28 | #include "modifyentity_generated.h" |
29 | #include "deleteentity_generated.h" | 29 | #include "deleteentity_generated.h" |
30 | #include "flush_generated.h" | ||
31 | #include "notification_generated.h" | ||
30 | 32 | ||
31 | SINK_DEBUG_AREA("synchronizer") | 33 | SINK_DEBUG_AREA("synchronizer") |
32 | 34 | ||
@@ -263,6 +265,13 @@ KAsync::Job<void> Synchronizer::synchronize(const Sink::QueryBase &query) | |||
263 | return processSyncQueue(); | 265 | return processSyncQueue(); |
264 | } | 266 | } |
265 | 267 | ||
268 | void Synchronizer::flush(int commandId, const QByteArray &flushId) | ||
269 | { | ||
270 | SinkTrace() << "Flushing the synchronization queue"; | ||
271 | mSyncRequestQueue << Synchronizer::SyncRequest{Synchronizer::SyncRequest::Flush, commandId, flushId}; | ||
272 | processSyncQueue().exec(); | ||
273 | } | ||
274 | |||
266 | KAsync::Job<void> Synchronizer::processSyncQueue() | 275 | KAsync::Job<void> Synchronizer::processSyncQueue() |
267 | { | 276 | { |
268 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { | 277 | if (mSyncRequestQueue.isEmpty() || mSyncInProgress) { |
@@ -279,6 +288,20 @@ KAsync::Job<void> Synchronizer::processSyncQueue() | |||
279 | //Commit after every request, so implementations only have to commit more if they add a lot of data. | 288 | //Commit after every request, so implementations only have to commit more if they add a lot of data. |
280 | commit(); | 289 | commit(); |
281 | }); | 290 | }); |
291 | } else if (request.requestType == Synchronizer::SyncRequest::Flush) { | ||
292 | if (request.flushType == Flush::FlushReplayQueue) { | ||
293 | SinkTrace() << "Emitting flush completion."; | ||
294 | Sink::Notification n; | ||
295 | n.type = Sink::Notification::FlushCompletion; | ||
296 | n.id = request.flushId; | ||
297 | emit notify(n); | ||
298 | } else { | ||
299 | flatbuffers::FlatBufferBuilder fbb; | ||
300 | auto flushId = fbb.CreateString(request.flushId); | ||
301 | auto location = Sink::Commands::CreateFlush(fbb, flushId, static_cast<int>(Sink::Flush::FlushSynchronization)); | ||
302 | Sink::Commands::FinishFlushBuffer(fbb, location); | ||
303 | enqueueCommand(Sink::Commands::FlushCommand, BufferUtils::extractBuffer(fbb)); | ||
304 | } | ||
282 | } else { | 305 | } else { |
283 | job = replayNextRevision(); | 306 | job = replayNextRevision(); |
284 | } | 307 | } |