diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-28 19:43:23 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-28 20:42:32 +0100 |
commit | 7fdcc36a1a352bb869020ade8a8aa697c3e8b80c (patch) | |
tree | 041d2ea430698bbea66a7e848621b49900e09357 /common/commandprocessor.cpp | |
parent | 938554f267193b652478fc12343819fa45d76034 (diff) | |
download | sink-7fdcc36a1a352bb869020ade8a8aa697c3e8b80c.tar.gz sink-7fdcc36a1a352bb869020ade8a8aa697c3e8b80c.zip |
Moved the flush command to the command processor.
Diffstat (limited to 'common/commandprocessor.cpp')
-rw-r--r-- | common/commandprocessor.cpp | 39 |
1 files changed, 28 insertions, 11 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index 4ff352b..57fe524 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp | |||
@@ -22,9 +22,11 @@ | |||
22 | #include "commands.h" | 22 | #include "commands.h" |
23 | #include "messagequeue.h" | 23 | #include "messagequeue.h" |
24 | #include "queuedcommand_generated.h" | 24 | #include "queuedcommand_generated.h" |
25 | #include "flush_generated.h" | ||
25 | #include "inspector.h" | 26 | #include "inspector.h" |
26 | #include "synchronizer.h" | 27 | #include "synchronizer.h" |
27 | #include "pipeline.h" | 28 | #include "pipeline.h" |
29 | #include "bufferutils.h" | ||
28 | 30 | ||
29 | static int sBatchSize = 100; | 31 | static int sBatchSize = 100; |
30 | 32 | ||
@@ -43,11 +45,6 @@ void CommandProcessor::setOldestUsedRevision(qint64 revision) | |||
43 | mLowerBoundRevision = revision; | 45 | mLowerBoundRevision = revision; |
44 | } | 46 | } |
45 | 47 | ||
46 | void CommandProcessor::setFlushCommand(const FlushFunction &f) | ||
47 | { | ||
48 | mFlush = f; | ||
49 | } | ||
50 | |||
51 | bool CommandProcessor::messagesToProcessAvailable() | 48 | bool CommandProcessor::messagesToProcessAvailable() |
52 | { | 49 | { |
53 | for (auto queue : mCommandQueues) { | 50 | for (auto queue : mCommandQueues) { |
@@ -91,12 +88,8 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCom | |||
91 | return mInspector->processCommand(data, size) | 88 | return mInspector->processCommand(data, size) |
92 | .syncThen<qint64>([]() { return -1; }); | 89 | .syncThen<qint64>([]() { return -1; }); |
93 | case Sink::Commands::FlushCommand: | 90 | case Sink::Commands::FlushCommand: |
94 | if (mFlush) { | 91 | return flush(data, size) |
95 | return mFlush(data, size) | 92 | .syncThen<qint64>([]() { return -1; }); |
96 | .syncThen<qint64>([]() { return -1; }); | ||
97 | } else { | ||
98 | return KAsync::error<qint64>(-1, "Missing inspection command."); | ||
99 | } | ||
100 | default: | 93 | default: |
101 | return KAsync::error<qint64>(-1, "Unhandled command"); | 94 | return KAsync::error<qint64>(-1, "Unhandled command"); |
102 | } | 95 | } |
@@ -194,5 +187,29 @@ void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synch | |||
194 | { | 187 | { |
195 | mSynchronizer = synchronizer; | 188 | mSynchronizer = synchronizer; |
196 | QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); | 189 | QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); |
190 | setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); | ||
191 | } | ||
192 | |||
193 | KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size) | ||
194 | { | ||
195 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | ||
196 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { | ||
197 | auto buffer = Sink::Commands::GetFlush(command); | ||
198 | const auto flushType = buffer->type(); | ||
199 | const auto flushId = BufferUtils::extractBuffer(buffer->id()); | ||
200 | if (flushType == Sink::Flush::FlushReplayQueue) { | ||
201 | SinkTrace() << "Flushing synchronizer "; | ||
202 | Q_ASSERT(mSynchronizer); | ||
203 | mSynchronizer->flush(flushType, flushId); | ||
204 | } else { | ||
205 | SinkTrace() << "Emitting flush completion" << flushId; | ||
206 | Sink::Notification n; | ||
207 | n.type = Sink::Notification::FlushCompletion; | ||
208 | n.id = flushId; | ||
209 | emit notify(n); | ||
210 | } | ||
211 | return KAsync::null<void>(); | ||
212 | } | ||
213 | return KAsync::error<void>(-1, "Invalid flush command."); | ||
197 | } | 214 | } |
198 | 215 | ||