summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 19:43:23 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 20:42:32 +0100
commit7fdcc36a1a352bb869020ade8a8aa697c3e8b80c (patch)
tree041d2ea430698bbea66a7e848621b49900e09357 /common/genericresource.cpp
parent938554f267193b652478fc12343819fa45d76034 (diff)
downloadsink-7fdcc36a1a352bb869020ade8a8aa697c3e8b80c.tar.gz
sink-7fdcc36a1a352bb869020ade8a8aa697c3e8b80c.zip
Moved the flush command to the command processor.
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp22
1 files changed, 1 insertions, 21 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 80e59c9..38da6bf 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -59,26 +59,6 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q
59 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 59 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
60{ 60{
61 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); 61 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue));
62 mProcessor->setFlushCommand([this](void const *command, size_t size) {
63 flatbuffers::Verifier verifier((const uint8_t *)command, size);
64 if (Sink::Commands::VerifyFlushBuffer(verifier)) {
65 auto buffer = Sink::Commands::GetFlush(command);
66 const auto flushType = buffer->type();
67 const auto flushId = BufferUtils::extractBuffer(buffer->id());
68 if (flushType == Sink::Flush::FlushReplayQueue) {
69 SinkTrace() << "Flushing synchronizer ";
70 mSynchronizer->flush(flushType, flushId);
71 } else {
72 SinkTrace() << "Emitting flush completion" << flushId;
73 Sink::Notification n;
74 n.type = Sink::Notification::FlushCompletion;
75 n.id = flushId;
76 emit notify(n);
77 }
78 return KAsync::null<void>();
79 }
80 return KAsync::error<void>(-1, "Invalid flush command.");
81 });
82 QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 62 QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
83 QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); 63 QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify);
84 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 64 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
@@ -126,7 +106,7 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync
126 Q_ASSERT(ret); 106 Q_ASSERT(ret);
127 } 107 }
128 108
129 mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); 109 mProcessor->setSynchronizer(synchronizer);
130 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); 110 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
131 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 111 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
132 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); 112 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection);