diff options
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 22 |
1 files changed, 1 insertions, 21 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 80e59c9..38da6bf 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -59,26 +59,6 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q | |||
59 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 59 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
60 | { | 60 | { |
61 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); | 61 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); |
62 | mProcessor->setFlushCommand([this](void const *command, size_t size) { | ||
63 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | ||
64 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { | ||
65 | auto buffer = Sink::Commands::GetFlush(command); | ||
66 | const auto flushType = buffer->type(); | ||
67 | const auto flushId = BufferUtils::extractBuffer(buffer->id()); | ||
68 | if (flushType == Sink::Flush::FlushReplayQueue) { | ||
69 | SinkTrace() << "Flushing synchronizer "; | ||
70 | mSynchronizer->flush(flushType, flushId); | ||
71 | } else { | ||
72 | SinkTrace() << "Emitting flush completion" << flushId; | ||
73 | Sink::Notification n; | ||
74 | n.type = Sink::Notification::FlushCompletion; | ||
75 | n.id = flushId; | ||
76 | emit notify(n); | ||
77 | } | ||
78 | return KAsync::null<void>(); | ||
79 | } | ||
80 | return KAsync::error<void>(-1, "Invalid flush command."); | ||
81 | }); | ||
82 | QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 62 | QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
83 | QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); | 63 | QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); |
84 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 64 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
@@ -126,7 +106,7 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync | |||
126 | Q_ASSERT(ret); | 106 | Q_ASSERT(ret); |
127 | } | 107 | } |
128 | 108 | ||
129 | mProcessor->setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); | 109 | mProcessor->setSynchronizer(synchronizer); |
130 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); | 110 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); |
131 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 111 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
132 | QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); | 112 | QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); |