diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-28 19:43:23 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-28 20:42:32 +0100 |
commit | 7fdcc36a1a352bb869020ade8a8aa697c3e8b80c (patch) | |
tree | 041d2ea430698bbea66a7e848621b49900e09357 | |
parent | 938554f267193b652478fc12343819fa45d76034 (diff) | |
download | sink-7fdcc36a1a352bb869020ade8a8aa697c3e8b80c.tar.gz sink-7fdcc36a1a352bb869020ade8a8aa697c3e8b80c.zip |
Moved the flush command to the command processor.
-rw-r--r-- | common/commandprocessor.cpp | 39 | ||||
-rw-r--r-- | common/commandprocessor.h | 6 | ||||
-rw-r--r-- | common/genericresource.cpp | 22 |
3 files changed, 31 insertions, 36 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index 4ff352b..57fe524 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp | |||
@@ -22,9 +22,11 @@ | |||
22 | #include "commands.h" | 22 | #include "commands.h" |
23 | #include "messagequeue.h" | 23 | #include "messagequeue.h" |
24 | #include "queuedcommand_generated.h" | 24 | #include "queuedcommand_generated.h" |
25 | #include "flush_generated.h" | ||
25 | #include "inspector.h" | 26 | #include "inspector.h" |
26 | #include "synchronizer.h" | 27 | #include "synchronizer.h" |
27 | #include "pipeline.h" | 28 | #include "pipeline.h" |
29 | #include "bufferutils.h" | ||
28 | 30 | ||
29 | static int sBatchSize = 100; | 31 | static int sBatchSize = 100; |
30 | 32 | ||
@@ -43,11 +45,6 @@ void CommandProcessor::setOldestUsedRevision(qint64 revision) | |||
43 | mLowerBoundRevision = revision; | 45 | mLowerBoundRevision = revision; |
44 | } | 46 | } |
45 | 47 | ||
46 | void CommandProcessor::setFlushCommand(const FlushFunction &f) | ||
47 | { | ||
48 | mFlush = f; | ||
49 | } | ||
50 | |||
51 | bool CommandProcessor::messagesToProcessAvailable() | 48 | bool CommandProcessor::messagesToProcessAvailable() |
52 | { | 49 | { |
53 | for (auto queue : mCommandQueues) { | 50 | for (auto queue : mCommandQueues) { |
@@ -91,12 +88,8 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCom | |||
91 | return mInspector->processCommand(data, size) | 88 | return mInspector->processCommand(data, size) |
92 | .syncThen<qint64>([]() { return -1; }); | 89 | .syncThen<qint64>([]() { return -1; }); |
93 | case Sink::Commands::FlushCommand: | 90 | case Sink::Commands::FlushCommand: |
94 | if (mFlush) { | 91 | return flush(data, size) |
95 | return mFlush(data, size) | 92 | .syncThen<qint64>([]() { return -1; }); |
96 | .syncThen<qint64>([]() { return -1; }); | ||
97 | } else { | ||
98 | return KAsync::error<qint64>(-1, "Missing inspection command."); | ||
99 | } | ||
100 | default: | 93 | default: |
101 | return KAsync::error<qint64>(-1, "Unhandled command"); | 94 | return KAsync::error<qint64>(-1, "Unhandled command"); |
102 | } | 95 | } |
@@ -194,5 +187,29 @@ void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synch | |||
194 | { | 187 | { |
195 | mSynchronizer = synchronizer; | 188 | mSynchronizer = synchronizer; |
196 | QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); | 189 | QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); |
190 | setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); | ||
191 | } | ||
192 | |||
193 | KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size) | ||
194 | { | ||
195 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | ||
196 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { | ||
197 | auto buffer = Sink::Commands::GetFlush(command); | ||
198 | const auto flushType = buffer->type(); | ||
199 | const auto flushId = BufferUtils::extractBuffer(buffer->id()); | ||
200 | if (flushType == Sink::Flush::FlushReplayQueue) { | ||
201 | SinkTrace() << "Flushing synchronizer "; | ||
202 | Q_ASSERT(mSynchronizer); | ||
203 | mSynchronizer->flush(flushType, flushId); | ||
204 | } else { | ||
205 | SinkTrace() << "Emitting flush completion" << flushId; | ||
206 | Sink::Notification n; | ||
207 | n.type = Sink::Notification::FlushCompletion; | ||
208 | n.id = flushId; | ||
209 | emit notify(n); | ||
210 | } | ||
211 | return KAsync::null<void>(); | ||
212 | } | ||
213 | return KAsync::error<void>(-1, "Invalid flush command."); | ||
197 | } | 214 | } |
198 | 215 | ||
diff --git a/common/commandprocessor.h b/common/commandprocessor.h index 75ae37a..d00cf43 100644 --- a/common/commandprocessor.h +++ b/common/commandprocessor.h | |||
@@ -42,7 +42,6 @@ namespace Sink { | |||
42 | class CommandProcessor : public QObject | 42 | class CommandProcessor : public QObject |
43 | { | 43 | { |
44 | Q_OBJECT | 44 | Q_OBJECT |
45 | typedef std::function<KAsync::Job<void>(void const *, size_t)> FlushFunction; | ||
46 | SINK_DEBUG_AREA("commandprocessor") | 45 | SINK_DEBUG_AREA("commandprocessor") |
47 | 46 | ||
48 | public: | 47 | public: |
@@ -50,8 +49,6 @@ public: | |||
50 | 49 | ||
51 | void setOldestUsedRevision(qint64 revision); | 50 | void setOldestUsedRevision(qint64 revision); |
52 | 51 | ||
53 | void setFlushCommand(const FlushFunction &f); | ||
54 | |||
55 | void setInspector(const QSharedPointer<Inspector> &inspector); | 52 | void setInspector(const QSharedPointer<Inspector> &inspector); |
56 | void setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); | 53 | void setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); |
57 | 54 | ||
@@ -71,13 +68,14 @@ private slots: | |||
71 | KAsync::Job<void> processPipeline(); | 68 | KAsync::Job<void> processPipeline(); |
72 | 69 | ||
73 | private: | 70 | private: |
71 | KAsync::Job<void> flush(void const *command, size_t size); | ||
72 | |||
74 | Sink::Pipeline *mPipeline; | 73 | Sink::Pipeline *mPipeline; |
75 | // Ordered by priority | 74 | // Ordered by priority |
76 | QList<MessageQueue *> mCommandQueues; | 75 | QList<MessageQueue *> mCommandQueues; |
77 | bool mProcessingLock; | 76 | bool mProcessingLock; |
78 | // The lowest revision we no longer need | 77 | // The lowest revision we no longer need |
79 | qint64 mLowerBoundRevision; | 78 | qint64 mLowerBoundRevision; |
80 | FlushFunction mFlush; | ||
81 | QSharedPointer<Synchronizer> mSynchronizer; | 79 | QSharedPointer<Synchronizer> mSynchronizer; |
82 | QSharedPointer<Inspector> mInspector; | 80 | QSharedPointer<Inspector> mInspector; |
83 | }; | 81 | }; |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 80e59c9..38da6bf 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -59,26 +59,6 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q | |||
59 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 59 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
60 | { | 60 | { |
61 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); | 61 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); |
62 | mProcessor->setFlushCommand([this](void const *command, size_t size) { | ||
63 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | ||
64 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { | ||
65 | auto buffer = Sink::Commands::GetFlush(command); | ||
66 | const auto flushType = buffer->type(); | ||
67 | const auto flushId = BufferUtils::extractBuffer(buffer->id()); | ||
68 | if (flushType == Sink::Flush::FlushReplayQueue) { | ||
69 | SinkTrace() << "Flushing synchronizer "; | ||
70 | mSynchronizer->flush(flushType, flushId); | ||
71 | } else { | ||
72 | SinkTrace() << "Emitting flush completion" << flushId; | ||
73 | Sink::Notification n; | ||
74 | n.type = Sink::Notification::FlushCompletion; | ||
75 | n.id = flushId; | ||
76 | emit notify(n); | ||
77 | } | ||
78 | return KAsync::null<void>(); | ||
79 | } | ||
80 | return KAsync::error<void>(-1, "Invalid flush command."); | ||
81 | }); | ||
82 | QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 62 | QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
83 | QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); | 63 | QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); |
84 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 64 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
@@ -126,7 +106,7 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync | |||
126 | Q_ASSERT(ret); | 106 | Q_ASSERT(ret); |
127 | } | 107 | } |
128 | 108 | ||
129 | mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); | 109 | mProcessor->setSynchronizer(synchronizer); |
130 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); | 110 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); |
131 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 111 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
132 | QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); | 112 | QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); |