From 1c7e8fd482bb67a5487449948488bd286a3504c1 Mon Sep 17 00:00:00 2001 From: Aaron Seigo Date: Wed, 17 Dec 2014 08:27:31 +0100 Subject: a basically-working Pipeline implementation still a skeleton rather than a full body with flesh and blood, but it is getting there! --- common/pipeline.cpp | 199 +++++++++++++++++++++++++++++++++++--- common/pipeline.h | 80 +++++++++++++-- common/resource.cpp | 10 +- common/resource.h | 7 +- dummyresource/resourcefactory.cpp | 25 ++++- dummyresource/resourcefactory.h | 8 +- synchronizer/listener.cpp | 23 +++-- synchronizer/listener.h | 1 + 8 files changed, 315 insertions(+), 38 deletions(-) diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 41def7c..5606c30 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -20,7 +20,9 @@ #include "pipeline.h" +#include #include +#include namespace Akonadi2 { @@ -28,47 +30,214 @@ namespace Akonadi2 class Pipeline::Private { public: - Private(const QString &storageName) - : storage(QStandardPaths::writableLocation(QStandardPaths::QStandardPaths::GenericDataLocation) + "/akonadi2", storageName, Akonadi2::Storage::ReadWrite) + Private(const QString &resourceName) + : storage(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceName), + stepScheduled(false) { - } - Akonadi2::Storage storage; + Storage storage; + QVector nullPipeline; + QVector newPipeline; + QVector modifiedPipeline; + QVector deletedPipeline; + QVector activePipelines; + bool stepScheduled; }; -Pipeline::Pipeline(const QString &storageName) - : d(new Private(storageName)) +Pipeline::Pipeline(const QString &resourceName, QObject *parent) + : QObject(parent), + d(new Private(resourceName)) { } Pipeline::~Pipeline() { + delete d; } -Storage &Pipeline::storage() +Storage &Pipeline::storage() const { return d->storage; } -void Pipeline::null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) +void Pipeline::null() +{ + //TODO: is there really any need for the null pipeline? if so, it should be doing something ;) + PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline); + d->activePipelines << state; + state.step(); +} + +void Pipeline::newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity) +{ + PipelineState state(this, NewPipeline, key, d->newPipeline); + d->activePipelines << state; + state.step(); +} + +void Pipeline::modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta) +{ + PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline); + d->activePipelines << state; + state.step(); +} + +void Pipeline::deletedEntity(const QByteArray &key) +{ + PipelineState state(this, DeletedPipeline, key, d->deletedPipeline); + d->activePipelines << state; + state.step(); +} + +void Pipeline::pipelineStepped(const PipelineState &state) +{ + scheduleStep(); +} + +void Pipeline::scheduleStep() +{ + if (!d->stepScheduled) { + d->stepScheduled = true; + QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); + } +} + +void Pipeline::stepPipelines() +{ + for (PipelineState &state: d->activePipelines) { + if (state.isIdle()) { + state.step(); + } + } + + d->stepScheduled = false; +} + +void Pipeline::pipelineCompleted(const PipelineState &state) +{ + //TODO finalize the datastore, inform clients of the new rev + const int index = d->activePipelines.indexOf(state); + if (index > -1) { + d->activePipelines.remove(index); + } + + if (state.type() != NullPipeline) { + emit revisionUpdated(); + } + scheduleStep(); +} + + +class PipelineState::Private : public QSharedData +{ +public: + Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector filters) + : pipeline(p), + type(t), + key(k), + filterIt(filters), + idle(true) + {} + + Private() + : pipeline(0), + filterIt(QVector()), + idle(true) + {} + + Pipeline *pipeline; + Pipeline::Type type; + QByteArray key; + QVectorIterator filterIt; + bool idle; +}; + +PipelineState::PipelineState() + : d(new Private()) +{ + +} + +PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters) + : d(new Private(pipeline, type, key, filters)) +{ +} + +PipelineState::PipelineState(const PipelineState &other) + : d(other.d) +{ +} + +PipelineState::~PipelineState() +{ +} + +PipelineState &PipelineState::operator=(const PipelineState &rhs) +{ + d = rhs.d; + return *this; +} + +bool PipelineState::operator==(const PipelineState &rhs) +{ + return d == rhs.d; +} + +bool PipelineState::isIdle() const +{ + return d->idle; +} + +QByteArray PipelineState::key() const +{ + return d->key; +} + +Pipeline::Type PipelineState::type() const +{ + return d->type; +} + +void PipelineState::step() +{ + if (!d->pipeline) { + return; + } + + d->idle = false; + if (d->filterIt.hasNext()) { + d->filterIt.next()->process(*this); + } else { + d->pipeline->pipelineCompleted(*this); + } +} + +void PipelineState::processingCompleted(PipelineFilter *filter) +{ + if (d->pipeline && filter == d->filterIt.peekPrevious()) { + d->idle = true; + d->pipeline->pipelineStepped(*this); + } +} + +PipelineFilter::PipelineFilter() + : d(0) { - d->storage.write(key, keySize, buffer, bufferSize); } -void Pipeline::newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) +PipelineFilter::~PipelineFilter() { - d->storage.write(key, keySize, buffer, bufferSize); } -void Pipeline::modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) +void PipelineFilter::process(PipelineState state) { - d->storage.write(key, keySize, buffer, bufferSize); + processingCompleted(state); } -void Pipeline::deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) +void PipelineFilter::processingCompleted(PipelineState state) { - d->storage.write(key, keySize, buffer, bufferSize); + state.processingCompleted(this); } } // namespace Akonadi2 diff --git a/common/pipeline.h b/common/pipeline.h index 635e630..6ad78df 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -20,23 +20,89 @@ #pragma once +#include + +#include +#include + #include #include namespace Akonadi2 { -class AKONADI2COMMON_EXPORT Pipeline +class PipelineState; +class PipelineFilter; + +class AKONADI2COMMON_EXPORT Pipeline : public QObject { + Q_OBJECT + public: - Pipeline(const QString &storagePath); + enum Type { NullPipeline, NewPipeline, ModifiedPipeline, DeletedPipeline }; + + Pipeline(const QString &storagePath, QObject *parent = 0); ~Pipeline(); - Storage &storage(); - void null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); - void newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); - void modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); - void deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); + Storage &storage() const; + + // domain objects needed here + void null(); + void newEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entity); + void modifiedEntity(const QByteArray &key, flatbuffers::FlatBufferBuilder &entityDelta); + void deletedEntity(const QByteArray &key); + +Q_SIGNALS: + void revisionUpdated(); + +private Q_SLOTS: + void stepPipelines(); + +private: + void pipelineStepped(const PipelineState &state); + void pipelineCompleted(const PipelineState &state); + void scheduleStep(); + + friend class PipelineState; + + class Private; + Private * const d; +}; + +class AKONADI2COMMON_EXPORT PipelineState +{ +public: + // domain objects? + PipelineState(); + PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters); + PipelineState(const PipelineState &other); + ~PipelineState(); + + PipelineState &operator=(const PipelineState &rhs); + bool operator==(const PipelineState &rhs); + + bool isIdle() const; + QByteArray key() const; + Pipeline::Type type() const; + + void step(); + void processingCompleted(PipelineFilter *filter); + +private: + class Private; + QExplicitlySharedDataPointer d; +}; + +class AKONADI2COMMON_EXPORT PipelineFilter +{ +public: + PipelineFilter(); + virtual ~PipelineFilter(); + + virtual void process(PipelineState state); + +protected: + void processingCompleted(PipelineState state); private: class Private; diff --git a/common/resource.cpp b/common/resource.cpp index 11a03ca..ae28485 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -39,12 +39,18 @@ Resource::~Resource() //delete d; } -void Resource::processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline) +void Resource::processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) { + Q_UNUSED(commandId) + Q_UNUSED(data) + Q_UNUSED(size) + Q_UNUSED(pipeline) + pipeline->null(); } -void Resource::synchronizeWithSource() +void Resource::synchronizeWithSource(Pipeline *pipeline) { + pipeline->null(); } class ResourceFactory::Private diff --git a/common/resource.h b/common/resource.h index 53c0bc1..0f65e1f 100644 --- a/common/resource.h +++ b/common/resource.h @@ -18,8 +18,8 @@ * License along with this library. If not, see . */ -#include #include +#include #include namespace Akonadi2 @@ -32,9 +32,8 @@ public: Resource(); virtual ~Resource(); - //TODO: this will need to be async - virtual void processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline); - virtual void synchronizeWithSource(); + virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); + virtual void synchronizeWithSource(Pipeline *pipeline); private: class Private; diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index 37bfdac..bd85b4f 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -19,6 +19,7 @@ #include "resourcefactory.h" #include "facade.h" +#include "dummycalendar_generated.h" DummyResource::DummyResource() : Akonadi2::Resource() @@ -26,9 +27,29 @@ DummyResource::DummyResource() } -void DummyResource::synchronizeWithSource() +void DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) { - // TODO populate the storage + // TODO actually populate the storage with new items + auto builder = DummyCalendar::DummyEventBuilder(m_fbb); + builder .add_summary(m_fbb.CreateString("summary summary!")); + auto buffer = builder.Finish(); + DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); + pipeline->newEntity("fakekey", m_fbb); + m_fbb.Clear(); +} + +void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) +{ + Q_UNUSED(commandId) + Q_UNUSED(data) + Q_UNUSED(size) + //TODO reallly process the commands :) + auto builder = DummyCalendar::DummyEventBuilder(m_fbb); + builder .add_summary(m_fbb.CreateString("summary summary!")); + auto buffer = builder.Finish(); + DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); + pipeline->newEntity("fakekey", m_fbb); + m_fbb.Clear(); } DummyResourceFactory::DummyResourceFactory(QObject *parent) diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h index 7c40084..807a654 100644 --- a/dummyresource/resourcefactory.h +++ b/dummyresource/resourcefactory.h @@ -21,6 +21,8 @@ #include "common/resource.h" +#include + //TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA #define PLUGIN_NAME "org.kde.dummy" @@ -28,7 +30,11 @@ class DummyResource : public Akonadi2::Resource { public: DummyResource(); - void synchronizeWithSource(); + void synchronizeWithSource(Akonadi2::Pipeline *pipeline); + void processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline); + +private: + flatbuffers::FlatBufferBuilder m_fbb; }; class DummyResourceFactory : public Akonadi2::ResourceFactory diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 18442e7..328d4d6 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp @@ -38,10 +38,12 @@ Listener::Listener(const QString &resourceName, QObject *parent) m_revision(0), m_resourceName(resourceName), m_resource(0), - m_pipeline(new Akonadi2::Pipeline(resourceName)), + m_pipeline(new Akonadi2::Pipeline(resourceName, parent)), m_clientBufferProcessesTimer(new QTimer(this)), m_messageId(0) { + connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated, + this, &Listener::refreshRevision); connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); @@ -69,7 +71,6 @@ Listener::Listener(const QString &resourceName, QObject *parent) Listener::~Listener() { - delete m_pipeline; } void Listener::setRevision(unsigned long long revision) @@ -219,7 +220,7 @@ bool Listener::processClientBuffer(Client &client) Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); loadResource(); if (m_resource) { - m_resource->synchronizeWithSource(); + m_resource->synchronizeWithSource(m_pipeline); } break; } @@ -227,14 +228,14 @@ bool Listener::processClientBuffer(Client &client) case Akonadi2::Commands::DeleteEntityCommand: case Akonadi2::Commands::ModifyEntityCommand: case Akonadi2::Commands::CreateEntityCommand: - Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %1").arg(messageId).arg(commandId).arg(client.name)); - m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); + Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); + m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); break; default: if (commandId > Akonadi2::Commands::CustomCommand) { loadResource(); if (m_resource) { - m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); + m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); } } else { //TODO: handle error: we don't know wtf this command is @@ -243,8 +244,9 @@ bool Listener::processClientBuffer(Client &client) } //TODO: async commands == async sendCommandCompleted - sendCommandCompleted(client, messageId); + Akonadi2::Console::main()->log(QString("\tCompleted command id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); client.commandBuffer.remove(0, size); + sendCommandCompleted(client, messageId); return client.commandBuffer.size() >= headerSize; } @@ -275,6 +277,13 @@ void Listener::sendCommandCompleted(Client &client, uint messageId) m_fbb.Clear(); } +void Listener::refreshRevision() +{ + //TODO this should be coming out of m_pipeline->storage() + ++m_revision; + updateClientsWithRevision(); +} + void Listener::updateClientsWithRevision() { auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); diff --git a/synchronizer/listener.h b/synchronizer/listener.h index b294277..357ae37 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h @@ -76,6 +76,7 @@ private Q_SLOTS: void checkConnections(); void readFromSocket(); void processClientBuffers(); + void refreshRevision(); private: bool processClientBuffer(Client &client); -- cgit v1.2.3