From 07572b25af45c41a82eb8ddfdecf18e58958788b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 28 Jul 2015 21:02:58 +0200 Subject: Forward revision updates through resource --- common/genericresource.cpp | 1 + common/listener.cpp | 24 ++++++------------------ common/listener.h | 5 ++--- common/pipeline.cpp | 2 +- common/pipeline.h | 2 +- common/resource.cpp | 3 ++- common/resource.h | 6 +++++- tests/genericresourcetest.cpp | 2 +- 8 files changed, 19 insertions(+), 26 deletions(-) diff --git a/common/genericresource.cpp b/common/genericresource.cpp index ae06ef4..99d1aaa 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -171,6 +171,7 @@ void GenericResource::configurePipeline(Akonadi2::Pipeline *pipeline) //TODO figure out lifetime of the processor mProcessor = new Processor(pipeline, QList() << &mUserQueue << &mSynchronizerQueue); QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); + QObject::connect(pipeline, &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); } void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) diff --git a/common/listener.cpp b/common/listener.cpp index 4316c63..2e2e98e 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -45,8 +45,6 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent m_clientBufferProcessesTimer(new QTimer(this)), m_messageId(0) { - connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated, - this, &Listener::refreshRevision); connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); Trace() << "Trying to open " << m_resourceInstanceIdentifier; @@ -325,18 +323,6 @@ bool Listener::processClientBuffer(Client &client) return false; } -void Listener::sendCurrentRevision(Client &client) -{ - if (!client.socket || !client.socket->isValid()) { - return; - } - - auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision()); - Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); - Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); - m_fbb.Clear(); -} - void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId) { if (!socket || !socket->isValid()) { @@ -349,15 +335,15 @@ void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId) m_fbb.Clear(); } -void Listener::refreshRevision() +void Listener::refreshRevision(qint64 revision) { - updateClientsWithRevision(); + updateClientsWithRevision(revision); } -void Listener::updateClientsWithRevision() +void Listener::updateClientsWithRevision(qint64 revision) { //FIXME don't send revision updates for revisions that are still being processed. - auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision()); + auto command = Akonadi2::CreateRevisionUpdate(m_fbb, revision); Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); for (const Client &client: m_connections) { @@ -382,6 +368,8 @@ void Listener::loadResource() Log() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); Log() << QString("\tResource: %1").arg((qlonglong)m_resource); m_resource->configurePipeline(m_pipeline); + connect(m_resource, &Akonadi2::Resource::revisionUpdated, + this, &Listener::refreshRevision); } else { ErrorMsg() << "Failed to load resource " << m_resourceName; } diff --git a/common/listener.h b/common/listener.h index 560f052..649c3ed 100644 --- a/common/listener.h +++ b/common/listener.h @@ -75,15 +75,14 @@ private Q_SLOTS: void checkConnections(); void onDataAvailable(); void processClientBuffers(); - void refreshRevision(); + void refreshRevision(qint64); void quit(); private: void processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function &callback); bool processClientBuffer(Client &client); - void sendCurrentRevision(Client &client); void sendCommandCompleted(QLocalSocket *socket, uint messageId); - void updateClientsWithRevision(); + void updateClientsWithRevision(qint64); void loadResource(); void readFromSocket(QLocalSocket *socket); diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 31c9a52..0ebe2f3 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -321,7 +321,7 @@ void Pipeline::pipelineCompleted(PipelineState state) if (state.type() != NullPipeline) { //TODO what revision is finalized? - emit revisionUpdated(); + emit revisionUpdated(storage().maxRevision()); } scheduleStep(); if (d->activePipelines.isEmpty()) { diff --git a/common/pipeline.h b/common/pipeline.h index a6696ec..9c3e7a1 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -61,7 +61,7 @@ public: KAsync::Job deletedEntity(void const *command, size_t size); Q_SIGNALS: - void revisionUpdated(); + void revisionUpdated(qint64); void pipelinesDrained(); private Q_SLOTS: diff --git a/common/resource.cpp b/common/resource.cpp index 40ad04c..68a237c 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -29,7 +29,8 @@ namespace Akonadi2 { Resource::Resource() - : d(0) + : QObject(), + d(0) { } diff --git a/common/resource.h b/common/resource.h index 009050e..9f657f7 100644 --- a/common/resource.h +++ b/common/resource.h @@ -30,8 +30,9 @@ namespace Akonadi2 /** * Resource interface */ -class AKONADI2COMMON_EXPORT Resource +class AKONADI2COMMON_EXPORT Resource : public QObject { + Q_OBJECT public: Resource(); virtual ~Resource(); @@ -42,6 +43,9 @@ public: virtual void configurePipeline(Pipeline *pipeline); +Q_SIGNALS: + void revisionUpdated(qint64); + private: class Private; Private * const d; diff --git a/tests/genericresourcetest.cpp b/tests/genericresourcetest.cpp index 57fa458..0b9a5c1 100644 --- a/tests/genericresourcetest.cpp +++ b/tests/genericresourcetest.cpp @@ -90,7 +90,7 @@ private Q_SLOTS: //Actual test Akonadi2::Pipeline pipeline("org.kde.test.instance1"); - QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated())); + QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated(qint64))); TestResource resource("org.kde.test.instance1"); resource.configurePipeline(&pipeline); resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); -- cgit v1.2.3