summaryrefslogtreecommitdiffstats
path: root/common/commandprocessor.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 19:43:23 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 20:42:32 +0100
commit7fdcc36a1a352bb869020ade8a8aa697c3e8b80c (patch)
tree041d2ea430698bbea66a7e848621b49900e09357 /common/commandprocessor.cpp
parent938554f267193b652478fc12343819fa45d76034 (diff)
downloadsink-7fdcc36a1a352bb869020ade8a8aa697c3e8b80c.tar.gz
sink-7fdcc36a1a352bb869020ade8a8aa697c3e8b80c.zip
Moved the flush command to the command processor.
Diffstat (limited to 'common/commandprocessor.cpp')
-rw-r--r--common/commandprocessor.cpp39
1 files changed, 28 insertions, 11 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
index 4ff352b..57fe524 100644
--- a/common/commandprocessor.cpp
+++ b/common/commandprocessor.cpp
@@ -22,9 +22,11 @@
22#include "commands.h" 22#include "commands.h"
23#include "messagequeue.h" 23#include "messagequeue.h"
24#include "queuedcommand_generated.h" 24#include "queuedcommand_generated.h"
25#include "flush_generated.h"
25#include "inspector.h" 26#include "inspector.h"
26#include "synchronizer.h" 27#include "synchronizer.h"
27#include "pipeline.h" 28#include "pipeline.h"
29#include "bufferutils.h"
28 30
29static int sBatchSize = 100; 31static int sBatchSize = 100;
30 32
@@ -43,11 +45,6 @@ void CommandProcessor::setOldestUsedRevision(qint64 revision)
43 mLowerBoundRevision = revision; 45 mLowerBoundRevision = revision;
44} 46}
45 47
46void CommandProcessor::setFlushCommand(const FlushFunction &f)
47{
48 mFlush = f;
49}
50
51bool CommandProcessor::messagesToProcessAvailable() 48bool CommandProcessor::messagesToProcessAvailable()
52{ 49{
53 for (auto queue : mCommandQueues) { 50 for (auto queue : mCommandQueues) {
@@ -91,12 +88,8 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCom
91 return mInspector->processCommand(data, size) 88 return mInspector->processCommand(data, size)
92 .syncThen<qint64>([]() { return -1; }); 89 .syncThen<qint64>([]() { return -1; });
93 case Sink::Commands::FlushCommand: 90 case Sink::Commands::FlushCommand:
94 if (mFlush) { 91 return flush(data, size)
95 return mFlush(data, size) 92 .syncThen<qint64>([]() { return -1; });
96 .syncThen<qint64>([]() { return -1; });
97 } else {
98 return KAsync::error<qint64>(-1, "Missing inspection command.");
99 }
100 default: 93 default:
101 return KAsync::error<qint64>(-1, "Unhandled command"); 94 return KAsync::error<qint64>(-1, "Unhandled command");
102 } 95 }
@@ -194,5 +187,29 @@ void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synch
194{ 187{
195 mSynchronizer = synchronizer; 188 mSynchronizer = synchronizer;
196 QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); 189 QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify);
190 setOldestUsedRevision(mSynchronizer->getLastReplayedRevision());
191}
192
193KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size)
194{
195 flatbuffers::Verifier verifier((const uint8_t *)command, size);
196 if (Sink::Commands::VerifyFlushBuffer(verifier)) {
197 auto buffer = Sink::Commands::GetFlush(command);
198 const auto flushType = buffer->type();
199 const auto flushId = BufferUtils::extractBuffer(buffer->id());
200 if (flushType == Sink::Flush::FlushReplayQueue) {
201 SinkTrace() << "Flushing synchronizer ";
202 Q_ASSERT(mSynchronizer);
203 mSynchronizer->flush(flushType, flushId);
204 } else {
205 SinkTrace() << "Emitting flush completion" << flushId;
206 Sink::Notification n;
207 n.type = Sink::Notification::FlushCompletion;
208 n.id = flushId;
209 emit notify(n);
210 }
211 return KAsync::null<void>();
212 }
213 return KAsync::error<void>(-1, "Invalid flush command.");
197} 214}
198 215