From 1c7e8fd482bb67a5487449948488bd286a3504c1 Mon Sep 17 00:00:00 2001 From: Aaron Seigo Date: Wed, 17 Dec 2014 08:27:31 +0100 Subject: a basically-working Pipeline implementation still a skeleton rather than a full body with flesh and blood, but it is getting there! --- synchronizer/listener.cpp | 23 ++++++++++++++++------- synchronizer/listener.h | 1 + 2 files changed, 17 insertions(+), 7 deletions(-) (limited to 'synchronizer') diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 18442e7..328d4d6 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp @@ -38,10 +38,12 @@ Listener::Listener(const QString &resourceName, QObject *parent) m_revision(0), m_resourceName(resourceName), m_resource(0), - m_pipeline(new Akonadi2::Pipeline(resourceName)), + m_pipeline(new Akonadi2::Pipeline(resourceName, parent)), m_clientBufferProcessesTimer(new QTimer(this)), m_messageId(0) { + connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated, + this, &Listener::refreshRevision); connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); @@ -69,7 +71,6 @@ Listener::Listener(const QString &resourceName, QObject *parent) Listener::~Listener() { - delete m_pipeline; } void Listener::setRevision(unsigned long long revision) @@ -219,7 +220,7 @@ bool Listener::processClientBuffer(Client &client) Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); loadResource(); if (m_resource) { - m_resource->synchronizeWithSource(); + m_resource->synchronizeWithSource(m_pipeline); } break; } @@ -227,14 +228,14 @@ bool Listener::processClientBuffer(Client &client) case Akonadi2::Commands::DeleteEntityCommand: case Akonadi2::Commands::ModifyEntityCommand: case Akonadi2::Commands::CreateEntityCommand: - Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %1").arg(messageId).arg(commandId).arg(client.name)); - m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); + Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); + m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); break; default: if (commandId > Akonadi2::Commands::CustomCommand) { loadResource(); if (m_resource) { - m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); + m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); } } else { //TODO: handle error: we don't know wtf this command is @@ -243,8 +244,9 @@ bool Listener::processClientBuffer(Client &client) } //TODO: async commands == async sendCommandCompleted - sendCommandCompleted(client, messageId); + Akonadi2::Console::main()->log(QString("\tCompleted command id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); client.commandBuffer.remove(0, size); + sendCommandCompleted(client, messageId); return client.commandBuffer.size() >= headerSize; } @@ -275,6 +277,13 @@ void Listener::sendCommandCompleted(Client &client, uint messageId) m_fbb.Clear(); } +void Listener::refreshRevision() +{ + //TODO this should be coming out of m_pipeline->storage() + ++m_revision; + updateClientsWithRevision(); +} + void Listener::updateClientsWithRevision() { auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); diff --git a/synchronizer/listener.h b/synchronizer/listener.h index b294277..357ae37 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h @@ -76,6 +76,7 @@ private Q_SLOTS: void checkConnections(); void readFromSocket(); void processClientBuffers(); + void refreshRevision(); private: bool processClientBuffer(Client &client); -- cgit v1.2.3