From 3e7b8fe8b8cca75b546c8cac2c09ce231861f21b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 28 Nov 2016 23:04:59 +0100 Subject: Used the CommandProcessor as central place for all command processing. --- common/commandprocessor.cpp | 210 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 208 insertions(+), 2 deletions(-) (limited to 'common/commandprocessor.cpp') diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index 57fe524..bdff905 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp @@ -19,25 +19,171 @@ */ #include "commandprocessor.h" +#include + #include "commands.h" #include "messagequeue.h" -#include "queuedcommand_generated.h" #include "flush_generated.h" #include "inspector.h" #include "synchronizer.h" #include "pipeline.h" #include "bufferutils.h" +#include "definitions.h" +#include "storage.h" + +#include "queuedcommand_generated.h" +#include "revisionreplayed_generated.h" +#include "synchronize_generated.h" static int sBatchSize = 100; +// This interval directly affects the roundtrip time of single commands +static int sCommitInterval = 10; + using namespace Sink; +using namespace Sink::Storage; -CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, QList commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false), mLowerBoundRevision(0) +CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId) + : QObject(), + mPipeline(pipeline), + mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), + mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), + mCommandQueues(QList() << &mUserQueue << &mSynchronizerQueue), mProcessingLock(false), mLowerBoundRevision(0) { for (auto queue : mCommandQueues) { const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); Q_UNUSED(ret); } + + mCommitQueueTimer.setInterval(sCommitInterval); + mCommitQueueTimer.setSingleShot(true); + QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); +} + +static void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) +{ + flatbuffers::FlatBufferBuilder fbb; + auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); + auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData); + Sink::FinishQueuedCommandBuffer(fbb, buffer); + mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); +} + +void CommandProcessor::processCommand(int commandId, const QByteArray &data) +{ + switch (commandId) { + case Commands::FlushCommand: + processFlushCommand(data); + break; + case Commands::SynchronizeCommand: + processSynchronizeCommand(data); + break; + // case Commands::RevisionReplayedCommand: + // processRevisionReplayedCommand(data); + // break; + default: { + static int modifications = 0; + mUserQueue.startTransaction(); + enqueueCommand(mUserQueue, commandId, data); + modifications++; + if (modifications >= sBatchSize) { + mUserQueue.commit(); + modifications = 0; + mCommitQueueTimer.stop(); + } else { + mCommitQueueTimer.start(); + } + } + }; +} + +void CommandProcessor::processFlushCommand(const QByteArray &data) +{ + flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); + if (Sink::Commands::VerifyFlushBuffer(verifier)) { + auto buffer = Sink::Commands::GetFlush(data.constData()); + const auto flushType = buffer->type(); + const auto flushId = BufferUtils::extractBuffer(buffer->id()); + if (flushType == Sink::Flush::FlushSynchronization) { + mSynchronizer->flush(flushType, flushId); + } else { + mUserQueue.startTransaction(); + enqueueCommand(mUserQueue, Commands::FlushCommand, data); + mUserQueue.commit(); + } + } + +} + +void CommandProcessor::processSynchronizeCommand(const QByteArray &data) +{ + flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); + if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { + auto buffer = Sink::Commands::GetSynchronize(data.constData()); + auto timer = QSharedPointer::create(); + timer->start(); + auto job = KAsync::null(); + Sink::QueryBase query; + if (buffer->query()) { + auto data = QByteArray::fromStdString(buffer->query()->str()); + QDataStream stream(&data, QIODevice::ReadOnly); + stream >> query; + } + job = synchronizeWithSource(query); + job.then([timer](const KAsync::Error &error) { + if (error) { + SinkWarning() << "Sync failed: " << error.errorMessage; + return KAsync::error(error); + } else { + SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); + return KAsync::null(); + } + }) + .exec(); + return; + } else { + SinkWarning() << "received invalid command"; + } +} + +// void CommandProcessor::processRevisionReplayedCommand(const QByteArray &data) +// { +// flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); +// if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { +// auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); +// client.currentRevision = buffer->revision(); +// } else { +// SinkWarning() << "received invalid command"; +// } +// loadResource().setLowerBoundRevision(lowerBoundRevision()); +// } + +KAsync::Job CommandProcessor::synchronizeWithSource(const Sink::QueryBase &query) +{ + return KAsync::start([this, query] { + Sink::Notification n; + n.id = "sync"; + n.type = Sink::Notification::Status; + n.message = "Synchronization has started."; + n.code = Sink::ApplicationDomain::BusyStatus; + emit notify(n); + + SinkLog() << " Synchronizing"; + return mSynchronizer->synchronize(query) + .then([this](const KAsync::Error &error) { + if (!error) { + SinkLog() << "Done Synchronizing"; + Sink::Notification n; + n.id = "sync"; + n.type = Sink::Notification::Status; + n.message = "Synchronization has ended."; + n.code = Sink::ApplicationDomain::ConnectedStatus; + emit notify(n); + return KAsync::null(); + } + return KAsync::error(error); + }); + }); } void CommandProcessor::setOldestUsedRevision(qint64 revision) @@ -186,6 +332,27 @@ void CommandProcessor::setInspector(const QSharedPointer &inspector) void CommandProcessor::setSynchronizer(const QSharedPointer &synchronizer) { mSynchronizer = synchronizer; + mSynchronizer->setup([this](int commandId, const QByteArray &data) { + enqueueCommand(mSynchronizerQueue, commandId, data); + }, mSynchronizerQueue); + + QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() { + Sink::Notification n; + n.id = "changereplay"; + n.type = Sink::Notification::Status; + n.message = "Replaying changes."; + n.code = Sink::ApplicationDomain::BusyStatus; + emit notify(n); + }); + QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() { + Sink::Notification n; + n.id = "changereplay"; + n.type = Sink::Notification::Status; + n.message = "All changes have been replayed."; + n.code = Sink::ApplicationDomain::ConnectedStatus; + emit notify(n); + }); + QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); } @@ -213,3 +380,42 @@ KAsync::Job CommandProcessor::flush(void const *command, size_t size) return KAsync::error(-1, "Invalid flush command."); } +static void waitForDrained(KAsync::Future &f, MessageQueue &queue) +{ + if (queue.isEmpty()) { + f.setFinished(); + } else { + QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); }); + } +}; + +KAsync::Job CommandProcessor::processAllMessages() +{ + // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. + // TODO: report errors while processing sync? + // TODO JOBAPI: A helper that waits for n events and then continues? + return KAsync::start([this](KAsync::Future &f) { + if (mCommitQueueTimer.isActive()) { + auto context = new QObject; + QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { + delete context; + f.setFinished(); + }); + } else { + f.setFinished(); + } + }) + .then([this](KAsync::Future &f) { waitForDrained(f, mSynchronizerQueue); }) + .then([this](KAsync::Future &f) { waitForDrained(f, mUserQueue); }) + .then([this](KAsync::Future &f) { + if (mSynchronizer->allChangesReplayed()) { + f.setFinished(); + } else { + auto context = new QObject; + QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { + delete context; + f.setFinished(); + }); + } + }); +} -- cgit v1.2.3