From 22af1ed535b4afc8db3804e72bc5adb1a1b28d60 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 25 Nov 2016 08:27:06 +0100 Subject: Added the flush command. Instead of trying to actually flush queues, we send a special command through the same queues as the other commands and can thus guarantee that the respective commands have been processed without blocking anything. --- common/genericresource.cpp | 57 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) (limited to 'common/genericresource.cpp') diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 7c4d4ea..7b83957 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -27,6 +27,7 @@ #include "deleteentity_generated.h" #include "inspection_generated.h" #include "notification_generated.h" +#include "flush_generated.h" #include "domainadaptor.h" #include "commands.h" #include "index.h" @@ -54,6 +55,7 @@ class CommandProcessor : public QObject { Q_OBJECT typedef std::function(void const *, size_t)> InspectionFunction; + typedef std::function(void const *, size_t)> FlushFunction; SINK_DEBUG_AREA("commandprocessor") public: @@ -75,6 +77,11 @@ public: mInspect = f; } + void setFlushCommand(const FlushFunction &f) + { + mFlush = f; + } + signals: void error(int errorCode, const QString &errorMessage); @@ -124,6 +131,13 @@ private slots: } else { return KAsync::error(-1, "Missing inspection command."); } + case Sink::Commands::FlushCommand: + if (mFlush) { + return mFlush(queuedCommand->command()->Data(), queuedCommand->command()->size()) + .syncThen([]() { return -1; }); + } else { + return KAsync::error(-1, "Missing inspection command."); + } default: return KAsync::error(-1, "Unhandled command"); } @@ -219,6 +233,7 @@ private: // The lowest revision we no longer need qint64 mLowerBoundRevision; InspectionFunction mInspect; + FlushFunction mFlush; }; GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer &pipeline ) @@ -266,6 +281,26 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q } return KAsync::error(-1, "Invalid inspection command."); }); + mProcessor->setFlushCommand([this](void const *command, size_t size) { + flatbuffers::Verifier verifier((const uint8_t *)command, size); + if (Sink::Commands::VerifyFlushBuffer(verifier)) { + auto buffer = Sink::Commands::GetFlush(command); + const auto flushType = buffer->type(); + const auto flushId = BufferUtils::extractBuffer(buffer->id()); + if (flushType == Sink::Flush::FlushReplayQueue) { + SinkTrace() << "Flushing synchronizer "; + mSynchronizer->flush(flushType, flushId); + } else { + SinkTrace() << "Emitting flush completion" << flushId; + Sink::Notification n; + n.type = Sink::Notification::FlushCompletion; + n.id = flushId; + emit notify(n); + } + return KAsync::null(); + } + return KAsync::error(-1, "Invalid flush command."); + }); { auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); Q_ASSERT(ret); @@ -371,6 +406,10 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt void GenericResource::processCommand(int commandId, const QByteArray &data) { + if (commandId == Commands::FlushCommand) { + processFlushCommand(data); + return; + } static int modifications = 0; mUserQueue.startTransaction(); enqueueCommand(mUserQueue, commandId, data); @@ -384,6 +423,24 @@ void GenericResource::processCommand(int commandId, const QByteArray &data) } } +void GenericResource::processFlushCommand(const QByteArray &data) +{ + flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); + if (Sink::Commands::VerifyFlushBuffer(verifier)) { + auto buffer = Sink::Commands::GetFlush(data.constData()); + const auto flushType = buffer->type(); + const auto flushId = BufferUtils::extractBuffer(buffer->id()); + if (flushType == Sink::Flush::FlushSynchronization) { + mSynchronizer->flush(flushType, flushId); + } else { + mUserQueue.startTransaction(); + enqueueCommand(mUserQueue, Commands::FlushCommand, data); + mUserQueue.commit(); + } + } + +} + KAsync::Job GenericResource::synchronizeWithSource(const Sink::QueryBase &query) { return KAsync::start([this, query] { -- cgit v1.2.3