From 3e7b8fe8b8cca75b546c8cac2c09ce231861f21b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 28 Nov 2016 23:04:59 +0100 Subject: Used the CommandProcessor as central place for all command processing. --- common/commandprocessor.cpp | 210 ++++++++++++++++++++++++++++- common/commandprocessor.h | 19 ++- common/genericresource.cpp | 167 ++--------------------- common/genericresource.h | 13 +- common/listener.cpp | 40 +----- common/resource.cpp | 10 -- common/resource.h | 10 -- common/synchronizer.h | 2 +- examples/dummyresource/resourcefactory.cpp | 19 +-- examples/dummyresource/resourcefactory.h | 2 - tests/testimplementations.h | 5 - 11 files changed, 248 insertions(+), 249 deletions(-) diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index 57fe524..bdff905 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp @@ -19,25 +19,171 @@ */ #include "commandprocessor.h" +#include + #include "commands.h" #include "messagequeue.h" -#include "queuedcommand_generated.h" #include "flush_generated.h" #include "inspector.h" #include "synchronizer.h" #include "pipeline.h" #include "bufferutils.h" +#include "definitions.h" +#include "storage.h" + +#include "queuedcommand_generated.h" +#include "revisionreplayed_generated.h" +#include "synchronize_generated.h" static int sBatchSize = 100; +// This interval directly affects the roundtrip time of single commands +static int sCommitInterval = 10; + using namespace Sink; +using namespace Sink::Storage; -CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, QList commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false), mLowerBoundRevision(0) +CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId) + : QObject(), + mPipeline(pipeline), + mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), + mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), + mCommandQueues(QList() << &mUserQueue << &mSynchronizerQueue), mProcessingLock(false), mLowerBoundRevision(0) { for (auto queue : mCommandQueues) { const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); Q_UNUSED(ret); } + + mCommitQueueTimer.setInterval(sCommitInterval); + mCommitQueueTimer.setSingleShot(true); + QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); +} + +static void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) +{ + flatbuffers::FlatBufferBuilder fbb; + auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); + auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData); + Sink::FinishQueuedCommandBuffer(fbb, buffer); + mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); +} + +void CommandProcessor::processCommand(int commandId, const QByteArray &data) +{ + switch (commandId) { + case Commands::FlushCommand: + processFlushCommand(data); + break; + case Commands::SynchronizeCommand: + processSynchronizeCommand(data); + break; + // case Commands::RevisionReplayedCommand: + // processRevisionReplayedCommand(data); + // break; + default: { + static int modifications = 0; + mUserQueue.startTransaction(); + enqueueCommand(mUserQueue, commandId, data); + modifications++; + if (modifications >= sBatchSize) { + mUserQueue.commit(); + modifications = 0; + mCommitQueueTimer.stop(); + } else { + mCommitQueueTimer.start(); + } + } + }; +} + +void CommandProcessor::processFlushCommand(const QByteArray &data) +{ + flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); + if (Sink::Commands::VerifyFlushBuffer(verifier)) { + auto buffer = Sink::Commands::GetFlush(data.constData()); + const auto flushType = buffer->type(); + const auto flushId = BufferUtils::extractBuffer(buffer->id()); + if (flushType == Sink::Flush::FlushSynchronization) { + mSynchronizer->flush(flushType, flushId); + } else { + mUserQueue.startTransaction(); + enqueueCommand(mUserQueue, Commands::FlushCommand, data); + mUserQueue.commit(); + } + } + +} + +void CommandProcessor::processSynchronizeCommand(const QByteArray &data) +{ + flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); + if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { + auto buffer = Sink::Commands::GetSynchronize(data.constData()); + auto timer = QSharedPointer::create(); + timer->start(); + auto job = KAsync::null(); + Sink::QueryBase query; + if (buffer->query()) { + auto data = QByteArray::fromStdString(buffer->query()->str()); + QDataStream stream(&data, QIODevice::ReadOnly); + stream >> query; + } + job = synchronizeWithSource(query); + job.then([timer](const KAsync::Error &error) { + if (error) { + SinkWarning() << "Sync failed: " << error.errorMessage; + return KAsync::error(error); + } else { + SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); + return KAsync::null(); + } + }) + .exec(); + return; + } else { + SinkWarning() << "received invalid command"; + } +} + +// void CommandProcessor::processRevisionReplayedCommand(const QByteArray &data) +// { +// flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); +// if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { +// auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); +// client.currentRevision = buffer->revision(); +// } else { +// SinkWarning() << "received invalid command"; +// } +// loadResource().setLowerBoundRevision(lowerBoundRevision()); +// } + +KAsync::Job CommandProcessor::synchronizeWithSource(const Sink::QueryBase &query) +{ + return KAsync::start([this, query] { + Sink::Notification n; + n.id = "sync"; + n.type = Sink::Notification::Status; + n.message = "Synchronization has started."; + n.code = Sink::ApplicationDomain::BusyStatus; + emit notify(n); + + SinkLog() << " Synchronizing"; + return mSynchronizer->synchronize(query) + .then([this](const KAsync::Error &error) { + if (!error) { + SinkLog() << "Done Synchronizing"; + Sink::Notification n; + n.id = "sync"; + n.type = Sink::Notification::Status; + n.message = "Synchronization has ended."; + n.code = Sink::ApplicationDomain::ConnectedStatus; + emit notify(n); + return KAsync::null(); + } + return KAsync::error(error); + }); + }); } void CommandProcessor::setOldestUsedRevision(qint64 revision) @@ -186,6 +332,27 @@ void CommandProcessor::setInspector(const QSharedPointer &inspector) void CommandProcessor::setSynchronizer(const QSharedPointer &synchronizer) { mSynchronizer = synchronizer; + mSynchronizer->setup([this](int commandId, const QByteArray &data) { + enqueueCommand(mSynchronizerQueue, commandId, data); + }, mSynchronizerQueue); + + QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() { + Sink::Notification n; + n.id = "changereplay"; + n.type = Sink::Notification::Status; + n.message = "Replaying changes."; + n.code = Sink::ApplicationDomain::BusyStatus; + emit notify(n); + }); + QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() { + Sink::Notification n; + n.id = "changereplay"; + n.type = Sink::Notification::Status; + n.message = "All changes have been replayed."; + n.code = Sink::ApplicationDomain::ConnectedStatus; + emit notify(n); + }); + QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); } @@ -213,3 +380,42 @@ KAsync::Job CommandProcessor::flush(void const *command, size_t size) return KAsync::error(-1, "Invalid flush command."); } +static void waitForDrained(KAsync::Future &f, MessageQueue &queue) +{ + if (queue.isEmpty()) { + f.setFinished(); + } else { + QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); }); + } +}; + +KAsync::Job CommandProcessor::processAllMessages() +{ + // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. + // TODO: report errors while processing sync? + // TODO JOBAPI: A helper that waits for n events and then continues? + return KAsync::start([this](KAsync::Future &f) { + if (mCommitQueueTimer.isActive()) { + auto context = new QObject; + QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { + delete context; + f.setFinished(); + }); + } else { + f.setFinished(); + } + }) + .then([this](KAsync::Future &f) { waitForDrained(f, mSynchronizerQueue); }) + .then([this](KAsync::Future &f) { waitForDrained(f, mUserQueue); }) + .then([this](KAsync::Future &f) { + if (mSynchronizer->allChangesReplayed()) { + f.setFinished(); + } else { + auto context = new QObject; + QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { + delete context; + f.setFinished(); + }); + } + }); +} diff --git a/common/commandprocessor.h b/common/commandprocessor.h index d00cf43..a807f46 100644 --- a/common/commandprocessor.h +++ b/common/commandprocessor.h @@ -22,19 +22,20 @@ #include "sink_export.h" #include +#include #include #include #include "log.h" #include "notification.h" - -class MessageQueue; +#include "messagequeue.h" namespace Sink { class Pipeline; class Inspector; class Synchronizer; class QueuedCommand; + class QueryBase; /** * Drives the pipeline using the output from all command queues @@ -45,13 +46,17 @@ class CommandProcessor : public QObject SINK_DEBUG_AREA("commandprocessor") public: - CommandProcessor(Sink::Pipeline *pipeline, QList commandQueues); + CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId); void setOldestUsedRevision(qint64 revision); void setInspector(const QSharedPointer &inspector); void setSynchronizer(const QSharedPointer &synchronizer); + void processCommand(int commandId, const QByteArray &data); + + KAsync::Job processAllMessages(); + signals: void notify(Notification); void error(int errorCode, const QString &errorMessage); @@ -68,9 +73,16 @@ private slots: KAsync::Job processPipeline(); private: + void processFlushCommand(const QByteArray &data); + void processSynchronizeCommand(const QByteArray &data); + // void processRevisionReplayedCommand(const QByteArray &data); + KAsync::Job flush(void const *command, size_t size); + KAsync::Job synchronizeWithSource(const Sink::QueryBase &query); Sink::Pipeline *mPipeline; + MessageQueue mUserQueue; + MessageQueue mSynchronizerQueue; // Ordered by priority QList mCommandQueues; bool mProcessingLock; @@ -78,6 +90,7 @@ private: qint64 mLowerBoundRevision; QSharedPointer mSynchronizer; QSharedPointer mInspector; + QTimer mCommitQueueTimer; }; }; diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 38da6bf..82112b3 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -19,32 +19,13 @@ */ #include "genericresource.h" -#include "entitybuffer.h" #include "pipeline.h" -#include "queuedcommand_generated.h" -#include "createentity_generated.h" -#include "modifyentity_generated.h" -#include "deleteentity_generated.h" -#include "inspection_generated.h" -#include "notification_generated.h" -#include "flush_generated.h" #include "domainadaptor.h" -#include "commands.h" -#include "index.h" #include "log.h" -#include "definitions.h" -#include "bufferutils.h" -#include "adaptorfactoryregistry.h" #include "synchronizer.h" #include "commandprocessor.h" - -#include -#include -#include - -static int sBatchSize = 100; -// This interval directly affects the roundtrip time of single commands -static int sCommitInterval = 10; +#include "definitions.h" +#include "storage.h" using namespace Sink; using namespace Sink::Storage; @@ -52,20 +33,14 @@ using namespace Sink::Storage; GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer &pipeline ) : Sink::Resource(), mResourceContext(resourceContext), - mUserQueue(Sink::storageLocation(), resourceContext.instanceId() + ".userqueue"), - mSynchronizerQueue(Sink::storageLocation(), resourceContext.instanceId() + ".synchronizerqueue"), mPipeline(pipeline ? pipeline : QSharedPointer::create(resourceContext)), + mProcessor(QSharedPointer::create(mPipeline.data(), resourceContext.instanceId())), mError(0), mClientLowerBoundRevision(std::numeric_limits::max()) { - mProcessor = std::unique_ptr(new CommandProcessor(mPipeline.data(), QList() << &mUserQueue << &mSynchronizerQueue)); - QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); - QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); + QObject::connect(mProcessor.data(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); + QObject::connect(mProcessor.data(), &CommandProcessor::notify, this, &GenericResource::notify); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); - - mCommitQueueTimer.setInterval(sCommitInterval); - mCommitQueueTimer.setSingleShot(true); - QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); } GenericResource::~GenericResource() @@ -80,32 +55,6 @@ void GenericResource::setupPreprocessors(const QByteArray &type, const QVector &synchronizer) { mSynchronizer = synchronizer; - mSynchronizer->setup([this](int commandId, const QByteArray &data) { - enqueueCommand(mSynchronizerQueue, commandId, data); - }, mSynchronizerQueue); - { - auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() { - Sink::Notification n; - n.id = "changereplay"; - n.type = Sink::Notification::Status; - n.message = "Replaying changes."; - n.code = Sink::ApplicationDomain::BusyStatus; - emit notify(n); - }); - Q_ASSERT(ret); - } - { - auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() { - Sink::Notification n; - n.id = "changereplay"; - n.type = Sink::Notification::Status; - n.message = "All changes have been replayed."; - n.code = Sink::ApplicationDomain::ConnectedStatus; - emit notify(n); - }); - Q_ASSERT(ret); - } - mProcessor->setSynchronizer(synchronizer); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); @@ -146,119 +95,19 @@ int GenericResource::error() const return mError; } -void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) -{ - flatbuffers::FlatBufferBuilder fbb; - auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); - auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData); - Sink::FinishQueuedCommandBuffer(fbb, buffer); - mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); -} - void GenericResource::processCommand(int commandId, const QByteArray &data) { - if (commandId == Commands::FlushCommand) { - processFlushCommand(data); - return; - } - static int modifications = 0; - mUserQueue.startTransaction(); - enqueueCommand(mUserQueue, commandId, data); - modifications++; - if (modifications >= sBatchSize) { - mUserQueue.commit(); - modifications = 0; - mCommitQueueTimer.stop(); - } else { - mCommitQueueTimer.start(); - } -} - -void GenericResource::processFlushCommand(const QByteArray &data) -{ - flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); - if (Sink::Commands::VerifyFlushBuffer(verifier)) { - auto buffer = Sink::Commands::GetFlush(data.constData()); - const auto flushType = buffer->type(); - const auto flushId = BufferUtils::extractBuffer(buffer->id()); - if (flushType == Sink::Flush::FlushSynchronization) { - mSynchronizer->flush(flushType, flushId); - } else { - mUserQueue.startTransaction(); - enqueueCommand(mUserQueue, Commands::FlushCommand, data); - mUserQueue.commit(); - } - } - + mProcessor->processCommand(commandId, data); } KAsync::Job GenericResource::synchronizeWithSource(const Sink::QueryBase &query) { - return KAsync::start([this, query] { - - Sink::Notification n; - n.id = "sync"; - n.type = Sink::Notification::Status; - n.message = "Synchronization has started."; - n.code = Sink::ApplicationDomain::BusyStatus; - emit notify(n); - - SinkLog() << " Synchronizing"; - return mSynchronizer->synchronize(query) - .then([this](const KAsync::Error &error) { - if (!error) { - SinkLog() << "Done Synchronizing"; - Sink::Notification n; - n.id = "sync"; - n.type = Sink::Notification::Status; - n.message = "Synchronization has ended."; - n.code = Sink::ApplicationDomain::ConnectedStatus; - emit notify(n); - return KAsync::null(); - } - return KAsync::error(error); - }); - }); + return mSynchronizer->synchronize(query); } -static void waitForDrained(KAsync::Future &f, MessageQueue &queue) -{ - if (queue.isEmpty()) { - f.setFinished(); - } else { - QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); }); - } -}; - KAsync::Job GenericResource::processAllMessages() { - // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. - // TODO: report errors while processing sync? - // TODO JOBAPI: A helper that waits for n events and then continues? - return KAsync::start([this](KAsync::Future &f) { - if (mCommitQueueTimer.isActive()) { - auto context = new QObject; - QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { - delete context; - f.setFinished(); - }); - } else { - f.setFinished(); - } - }) - .then([this](KAsync::Future &f) { waitForDrained(f, mSynchronizerQueue); }) - .then([this](KAsync::Future &f) { waitForDrained(f, mUserQueue); }) - .then([this](KAsync::Future &f) { - if (mSynchronizer->allChangesReplayed()) { - f.setFinished(); - } else { - auto context = new QObject; - QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { - delete context; - f.setFinished(); - }); - } - }); + return mProcessor->processAllMessages(); } void GenericResource::updateLowerBoundRevision() diff --git a/common/genericresource.h b/common/genericresource.h index 0bc47da..cc73f50 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -47,9 +47,6 @@ public: virtual ~GenericResource(); virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; - virtual void processFlushCommand(const QByteArray &data); - virtual KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE; - virtual KAsync::Job processAllMessages() Q_DECL_OVERRIDE; virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; int error() const; @@ -57,6 +54,9 @@ public: static void removeFromDisk(const QByteArray &instanceIdentifier); static qint64 diskUsage(const QByteArray &instanceIdentifier); + KAsync::Job synchronizeWithSource(const Sink::QueryBase &query); + KAsync::Job processAllMessages(); + private slots: void updateLowerBoundRevision(); @@ -69,15 +69,12 @@ protected: void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); ResourceContext mResourceContext; - MessageQueue mUserQueue; - MessageQueue mSynchronizerQueue; - QSharedPointer mPipeline; private: - std::unique_ptr mProcessor; + QSharedPointer mPipeline; + QSharedPointer mProcessor; QSharedPointer mSynchronizer; int mError; - QTimer mCommitQueueTimer; qint64 mClientLowerBoundRevision; }; diff --git a/common/listener.cpp b/common/listener.cpp index 9e80c45..d3ef0f1 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -30,15 +30,12 @@ #include "common/commandcompletion_generated.h" #include "common/handshake_generated.h" #include "common/revisionupdate_generated.h" -#include "common/synchronize_generated.h" #include "common/notification_generated.h" #include "common/revisionreplayed_generated.h" #include #include #include -#include -#include Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent) : QObject(parent), @@ -235,39 +232,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c } break; } - case Sink::Commands::SynchronizeCommand: { - flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); - if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { - auto buffer = Sink::Commands::GetSynchronize(commandBuffer.constData()); - SinkTrace() << QString("Synchronize request (id %1) from %2").arg(messageId).arg(client.name); - auto timer = QSharedPointer::create(); - timer->start(); - auto job = KAsync::null(); - Sink::QueryBase query; - if (buffer->query()) { - auto data = QByteArray::fromStdString(buffer->query()->str()); - QDataStream stream(&data, QIODevice::ReadOnly); - stream >> query; - } - job = loadResource().synchronizeWithSource(query); - job.then([callback, timer](const KAsync::Error &error) { - if (error) { - SinkWarning() << "Sync failed: " << error.errorMessage; - callback(false); - return KAsync::error(error); - } else { - SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); - callback(true); - return KAsync::null(); - } - }) - .exec(); - return; - } else { - SinkWarning() << "received invalid command"; - } - break; - } + case Sink::Commands::SynchronizeCommand: case Sink::Commands::InspectionCommand: case Sink::Commands::DeleteEntityCommand: case Sink::Commands::ModifyEntityCommand: @@ -293,7 +258,8 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c SinkWarning() << "received invalid command"; } loadResource().setLowerBoundRevision(lowerBoundRevision()); - } break; + } + break; case Sink::Commands::RemoveFromDiskCommand: { SinkLog() << QString("Received a remove from disk command from %1").arg(client.name); //Close the resource to ensure no transactions are open diff --git a/common/resource.cpp b/common/resource.cpp index f81f094..533a132 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -46,16 +46,6 @@ void Resource::processCommand(int commandId, const QByteArray &data) Q_UNUSED(data) } -KAsync::Job Resource::synchronizeWithSource(const Sink::QueryBase &query) -{ - return KAsync::null(); -} - -KAsync::Job Resource::processAllMessages() -{ - return KAsync::null(); -} - void Resource::setLowerBoundRevision(qint64 revision) { Q_UNUSED(revision) diff --git a/common/resource.h b/common/resource.h index 3cc326c..7789c53 100644 --- a/common/resource.h +++ b/common/resource.h @@ -42,16 +42,6 @@ public: virtual void processCommand(int commandId, const QByteArray &data); - /** - * Execute synchronization with the source. - */ - virtual KAsync::Job synchronizeWithSource(const Sink::QueryBase &); - - /** - * Process all internal messages, thus ensuring the store is up to date and no pending modifications are left. - */ - virtual KAsync::Job processAllMessages(); - /** * Set the lowest revision that is still referenced by external clients. */ diff --git a/common/synchronizer.h b/common/synchronizer.h index 99d4877..ae597bd 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h @@ -66,7 +66,7 @@ public slots: protected: ///Base implementation calls the replay$Type calls - virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; + KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; protected: diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index 8e81c79..3565d57 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp @@ -113,6 +113,13 @@ class DummySynchronizer : public Sink::Synchronizer { KAsync::Job synchronizeWithSource(const Sink::QueryBase &) Q_DECL_OVERRIDE { SinkLog() << " Synchronizing with the source"; + SinkTrace() << "Synchronize with source and sending a notification about it"; + Sink::Notification n; + n.id = "connected"; + n.type = Sink::Notification::Status; + n.message = "We're connected"; + n.code = Sink::ApplicationDomain::ConnectedStatus; + emit notify(n); return KAsync::syncStart([this]() { synchronize(ENTITY_TYPE_EVENT, DummyStore::instance().events(), [this](const QByteArray &ridBuffer, const QMap &data) { return createEvent(ridBuffer, data); @@ -174,18 +181,6 @@ DummyResource::~DummyResource() } -KAsync::Job DummyResource::synchronizeWithSource(const Sink::QueryBase &query) -{ - SinkTrace() << "Synchronize with source and sending a notification about it"; - Sink::Notification n; - n.id = "connected"; - n.type = Sink::Notification::Status; - n.message = "We're connected"; - n.code = Sink::ApplicationDomain::ConnectedStatus; - emit notify(n); - return GenericResource::synchronizeWithSource(query); -} - DummyResourceFactory::DummyResourceFactory(QObject *parent) : Sink::ResourceFactory(parent) { diff --git a/examples/dummyresource/resourcefactory.h b/examples/dummyresource/resourcefactory.h index 2eb7558..bc35d82 100644 --- a/examples/dummyresource/resourcefactory.h +++ b/examples/dummyresource/resourcefactory.h @@ -31,8 +31,6 @@ class DummyResource : public Sink::GenericResource public: DummyResource(const Sink::ResourceContext &resourceContext, const QSharedPointer &pipeline = QSharedPointer()); virtual ~DummyResource(); - - KAsync::Job synchronizeWithSource(const Sink::QueryBase &) Q_DECL_OVERRIDE; }; class DummyResourceFactory : public Sink::ResourceFactory diff --git a/tests/testimplementations.h b/tests/testimplementations.h index 6fe08f7..d1a912a 100644 --- a/tests/testimplementations.h +++ b/tests/testimplementations.h @@ -110,11 +110,6 @@ public: TestResource(const Sink::ResourceContext &resourceContext, QSharedPointer pipeline) : Sink::GenericResource(resourceContext, pipeline) { } - - KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE - { - return KAsync::null(); - } }; template -- cgit v1.2.3