From 3e7b8fe8b8cca75b546c8cac2c09ce231861f21b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 28 Nov 2016 23:04:59 +0100 Subject: Used the CommandProcessor as central place for all command processing. --- common/genericresource.cpp | 167 +++------------------------------------------ 1 file changed, 8 insertions(+), 159 deletions(-) (limited to 'common/genericresource.cpp') diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 38da6bf..82112b3 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -19,32 +19,13 @@ */ #include "genericresource.h" -#include "entitybuffer.h" #include "pipeline.h" -#include "queuedcommand_generated.h" -#include "createentity_generated.h" -#include "modifyentity_generated.h" -#include "deleteentity_generated.h" -#include "inspection_generated.h" -#include "notification_generated.h" -#include "flush_generated.h" #include "domainadaptor.h" -#include "commands.h" -#include "index.h" #include "log.h" -#include "definitions.h" -#include "bufferutils.h" -#include "adaptorfactoryregistry.h" #include "synchronizer.h" #include "commandprocessor.h" - -#include -#include -#include - -static int sBatchSize = 100; -// This interval directly affects the roundtrip time of single commands -static int sCommitInterval = 10; +#include "definitions.h" +#include "storage.h" using namespace Sink; using namespace Sink::Storage; @@ -52,20 +33,14 @@ using namespace Sink::Storage; GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer &pipeline ) : Sink::Resource(), mResourceContext(resourceContext), - mUserQueue(Sink::storageLocation(), resourceContext.instanceId() + ".userqueue"), - mSynchronizerQueue(Sink::storageLocation(), resourceContext.instanceId() + ".synchronizerqueue"), mPipeline(pipeline ? pipeline : QSharedPointer::create(resourceContext)), + mProcessor(QSharedPointer::create(mPipeline.data(), resourceContext.instanceId())), mError(0), mClientLowerBoundRevision(std::numeric_limits::max()) { - mProcessor = std::unique_ptr(new CommandProcessor(mPipeline.data(), QList() << &mUserQueue << &mSynchronizerQueue)); - QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); - QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); + QObject::connect(mProcessor.data(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); + QObject::connect(mProcessor.data(), &CommandProcessor::notify, this, &GenericResource::notify); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); - - mCommitQueueTimer.setInterval(sCommitInterval); - mCommitQueueTimer.setSingleShot(true); - QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); } GenericResource::~GenericResource() @@ -80,32 +55,6 @@ void GenericResource::setupPreprocessors(const QByteArray &type, const QVector &synchronizer) { mSynchronizer = synchronizer; - mSynchronizer->setup([this](int commandId, const QByteArray &data) { - enqueueCommand(mSynchronizerQueue, commandId, data); - }, mSynchronizerQueue); - { - auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() { - Sink::Notification n; - n.id = "changereplay"; - n.type = Sink::Notification::Status; - n.message = "Replaying changes."; - n.code = Sink::ApplicationDomain::BusyStatus; - emit notify(n); - }); - Q_ASSERT(ret); - } - { - auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() { - Sink::Notification n; - n.id = "changereplay"; - n.type = Sink::Notification::Status; - n.message = "All changes have been replayed."; - n.code = Sink::ApplicationDomain::ConnectedStatus; - emit notify(n); - }); - Q_ASSERT(ret); - } - mProcessor->setSynchronizer(synchronizer); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); @@ -146,119 +95,19 @@ int GenericResource::error() const return mError; } -void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) -{ - flatbuffers::FlatBufferBuilder fbb; - auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); - auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData); - Sink::FinishQueuedCommandBuffer(fbb, buffer); - mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); -} - void GenericResource::processCommand(int commandId, const QByteArray &data) { - if (commandId == Commands::FlushCommand) { - processFlushCommand(data); - return; - } - static int modifications = 0; - mUserQueue.startTransaction(); - enqueueCommand(mUserQueue, commandId, data); - modifications++; - if (modifications >= sBatchSize) { - mUserQueue.commit(); - modifications = 0; - mCommitQueueTimer.stop(); - } else { - mCommitQueueTimer.start(); - } -} - -void GenericResource::processFlushCommand(const QByteArray &data) -{ - flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); - if (Sink::Commands::VerifyFlushBuffer(verifier)) { - auto buffer = Sink::Commands::GetFlush(data.constData()); - const auto flushType = buffer->type(); - const auto flushId = BufferUtils::extractBuffer(buffer->id()); - if (flushType == Sink::Flush::FlushSynchronization) { - mSynchronizer->flush(flushType, flushId); - } else { - mUserQueue.startTransaction(); - enqueueCommand(mUserQueue, Commands::FlushCommand, data); - mUserQueue.commit(); - } - } - + mProcessor->processCommand(commandId, data); } KAsync::Job GenericResource::synchronizeWithSource(const Sink::QueryBase &query) { - return KAsync::start([this, query] { - - Sink::Notification n; - n.id = "sync"; - n.type = Sink::Notification::Status; - n.message = "Synchronization has started."; - n.code = Sink::ApplicationDomain::BusyStatus; - emit notify(n); - - SinkLog() << " Synchronizing"; - return mSynchronizer->synchronize(query) - .then([this](const KAsync::Error &error) { - if (!error) { - SinkLog() << "Done Synchronizing"; - Sink::Notification n; - n.id = "sync"; - n.type = Sink::Notification::Status; - n.message = "Synchronization has ended."; - n.code = Sink::ApplicationDomain::ConnectedStatus; - emit notify(n); - return KAsync::null(); - } - return KAsync::error(error); - }); - }); + return mSynchronizer->synchronize(query); } -static void waitForDrained(KAsync::Future &f, MessageQueue &queue) -{ - if (queue.isEmpty()) { - f.setFinished(); - } else { - QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); }); - } -}; - KAsync::Job GenericResource::processAllMessages() { - // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. - // TODO: report errors while processing sync? - // TODO JOBAPI: A helper that waits for n events and then continues? - return KAsync::start([this](KAsync::Future &f) { - if (mCommitQueueTimer.isActive()) { - auto context = new QObject; - QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { - delete context; - f.setFinished(); - }); - } else { - f.setFinished(); - } - }) - .then([this](KAsync::Future &f) { waitForDrained(f, mSynchronizerQueue); }) - .then([this](KAsync::Future &f) { waitForDrained(f, mUserQueue); }) - .then([this](KAsync::Future &f) { - if (mSynchronizer->allChangesReplayed()) { - f.setFinished(); - } else { - auto context = new QObject; - QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { - delete context; - f.setFinished(); - }); - } - }); + return mProcessor->processAllMessages(); } void GenericResource::updateLowerBoundRevision() -- cgit v1.2.3