summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/commandprocessor.cpp39
-rw-r--r--common/commandprocessor.h6
-rw-r--r--common/genericresource.cpp22
3 files changed, 31 insertions, 36 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
index 4ff352b..57fe524 100644
--- a/common/commandprocessor.cpp
+++ b/common/commandprocessor.cpp
@@ -22,9 +22,11 @@
22#include "commands.h" 22#include "commands.h"
23#include "messagequeue.h" 23#include "messagequeue.h"
24#include "queuedcommand_generated.h" 24#include "queuedcommand_generated.h"
25#include "flush_generated.h"
25#include "inspector.h" 26#include "inspector.h"
26#include "synchronizer.h" 27#include "synchronizer.h"
27#include "pipeline.h" 28#include "pipeline.h"
29#include "bufferutils.h"
28 30
29static int sBatchSize = 100; 31static int sBatchSize = 100;
30 32
@@ -43,11 +45,6 @@ void CommandProcessor::setOldestUsedRevision(qint64 revision)
43 mLowerBoundRevision = revision; 45 mLowerBoundRevision = revision;
44} 46}
45 47
46void CommandProcessor::setFlushCommand(const FlushFunction &f)
47{
48 mFlush = f;
49}
50
51bool CommandProcessor::messagesToProcessAvailable() 48bool CommandProcessor::messagesToProcessAvailable()
52{ 49{
53 for (auto queue : mCommandQueues) { 50 for (auto queue : mCommandQueues) {
@@ -91,12 +88,8 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCom
91 return mInspector->processCommand(data, size) 88 return mInspector->processCommand(data, size)
92 .syncThen<qint64>([]() { return -1; }); 89 .syncThen<qint64>([]() { return -1; });
93 case Sink::Commands::FlushCommand: 90 case Sink::Commands::FlushCommand:
94 if (mFlush) { 91 return flush(data, size)
95 return mFlush(data, size) 92 .syncThen<qint64>([]() { return -1; });
96 .syncThen<qint64>([]() { return -1; });
97 } else {
98 return KAsync::error<qint64>(-1, "Missing inspection command.");
99 }
100 default: 93 default:
101 return KAsync::error<qint64>(-1, "Unhandled command"); 94 return KAsync::error<qint64>(-1, "Unhandled command");
102 } 95 }
@@ -194,5 +187,29 @@ void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synch
194{ 187{
195 mSynchronizer = synchronizer; 188 mSynchronizer = synchronizer;
196 QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); 189 QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify);
190 setOldestUsedRevision(mSynchronizer->getLastReplayedRevision());
191}
192
193KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size)
194{
195 flatbuffers::Verifier verifier((const uint8_t *)command, size);
196 if (Sink::Commands::VerifyFlushBuffer(verifier)) {
197 auto buffer = Sink::Commands::GetFlush(command);
198 const auto flushType = buffer->type();
199 const auto flushId = BufferUtils::extractBuffer(buffer->id());
200 if (flushType == Sink::Flush::FlushReplayQueue) {
201 SinkTrace() << "Flushing synchronizer ";
202 Q_ASSERT(mSynchronizer);
203 mSynchronizer->flush(flushType, flushId);
204 } else {
205 SinkTrace() << "Emitting flush completion" << flushId;
206 Sink::Notification n;
207 n.type = Sink::Notification::FlushCompletion;
208 n.id = flushId;
209 emit notify(n);
210 }
211 return KAsync::null<void>();
212 }
213 return KAsync::error<void>(-1, "Invalid flush command.");
197} 214}
198 215
diff --git a/common/commandprocessor.h b/common/commandprocessor.h
index 75ae37a..d00cf43 100644
--- a/common/commandprocessor.h
+++ b/common/commandprocessor.h
@@ -42,7 +42,6 @@ namespace Sink {
42class CommandProcessor : public QObject 42class CommandProcessor : public QObject
43{ 43{
44 Q_OBJECT 44 Q_OBJECT
45 typedef std::function<KAsync::Job<void>(void const *, size_t)> FlushFunction;
46 SINK_DEBUG_AREA("commandprocessor") 45 SINK_DEBUG_AREA("commandprocessor")
47 46
48public: 47public:
@@ -50,8 +49,6 @@ public:
50 49
51 void setOldestUsedRevision(qint64 revision); 50 void setOldestUsedRevision(qint64 revision);
52 51
53 void setFlushCommand(const FlushFunction &f);
54
55 void setInspector(const QSharedPointer<Inspector> &inspector); 52 void setInspector(const QSharedPointer<Inspector> &inspector);
56 void setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); 53 void setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer);
57 54
@@ -71,13 +68,14 @@ private slots:
71 KAsync::Job<void> processPipeline(); 68 KAsync::Job<void> processPipeline();
72 69
73private: 70private:
71 KAsync::Job<void> flush(void const *command, size_t size);
72
74 Sink::Pipeline *mPipeline; 73 Sink::Pipeline *mPipeline;
75 // Ordered by priority 74 // Ordered by priority
76 QList<MessageQueue *> mCommandQueues; 75 QList<MessageQueue *> mCommandQueues;
77 bool mProcessingLock; 76 bool mProcessingLock;
78 // The lowest revision we no longer need 77 // The lowest revision we no longer need
79 qint64 mLowerBoundRevision; 78 qint64 mLowerBoundRevision;
80 FlushFunction mFlush;
81 QSharedPointer<Synchronizer> mSynchronizer; 79 QSharedPointer<Synchronizer> mSynchronizer;
82 QSharedPointer<Inspector> mInspector; 80 QSharedPointer<Inspector> mInspector;
83}; 81};
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 80e59c9..38da6bf 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -59,26 +59,6 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q
59 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 59 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
60{ 60{
61 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); 61 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue));
62 mProcessor->setFlushCommand([this](void const *command, size_t size) {
63 flatbuffers::Verifier verifier((const uint8_t *)command, size);
64 if (Sink::Commands::VerifyFlushBuffer(verifier)) {
65 auto buffer = Sink::Commands::GetFlush(command);
66 const auto flushType = buffer->type();
67 const auto flushId = BufferUtils::extractBuffer(buffer->id());
68 if (flushType == Sink::Flush::FlushReplayQueue) {
69 SinkTrace() << "Flushing synchronizer ";
70 mSynchronizer->flush(flushType, flushId);
71 } else {
72 SinkTrace() << "Emitting flush completion" << flushId;
73 Sink::Notification n;
74 n.type = Sink::Notification::FlushCompletion;
75 n.id = flushId;
76 emit notify(n);
77 }
78 return KAsync::null<void>();
79 }
80 return KAsync::error<void>(-1, "Invalid flush command.");
81 });
82 QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 62 QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
83 QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); 63 QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify);
84 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 64 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
@@ -126,7 +106,7 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync
126 Q_ASSERT(ret); 106 Q_ASSERT(ret);
127 } 107 }
128 108
129 mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); 109 mProcessor->setSynchronizer(synchronizer);
130 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); 110 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
131 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 111 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
132 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); 112 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection);