diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-07-28 21:02:58 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-07-28 21:02:58 +0200 |
commit | 07572b25af45c41a82eb8ddfdecf18e58958788b (patch) | |
tree | 2ecb98a9fbe7c7c0244d7511e543beff8df68c3a | |
parent | dd86c15b48f33c120c510327569fb1cc3ffa3d45 (diff) | |
download | sink-07572b25af45c41a82eb8ddfdecf18e58958788b.tar.gz sink-07572b25af45c41a82eb8ddfdecf18e58958788b.zip |
Forward revision updates through resource
-rw-r--r-- | common/genericresource.cpp | 1 | ||||
-rw-r--r-- | common/listener.cpp | 24 | ||||
-rw-r--r-- | common/listener.h | 5 | ||||
-rw-r--r-- | common/pipeline.cpp | 2 | ||||
-rw-r--r-- | common/pipeline.h | 2 | ||||
-rw-r--r-- | common/resource.cpp | 3 | ||||
-rw-r--r-- | common/resource.h | 6 | ||||
-rw-r--r-- | tests/genericresourcetest.cpp | 2 |
8 files changed, 19 insertions, 26 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index ae06ef4..99d1aaa 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -171,6 +171,7 @@ void GenericResource::configurePipeline(Akonadi2::Pipeline *pipeline) | |||
171 | //TODO figure out lifetime of the processor | 171 | //TODO figure out lifetime of the processor |
172 | mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | 172 | mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); |
173 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 173 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
174 | QObject::connect(pipeline, &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | ||
174 | } | 175 | } |
175 | 176 | ||
176 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) | 177 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) |
diff --git a/common/listener.cpp b/common/listener.cpp index 4316c63..2e2e98e 100644 --- a/common/listener.cpp +++ b/common/listener.cpp | |||
@@ -45,8 +45,6 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent | |||
45 | m_clientBufferProcessesTimer(new QTimer(this)), | 45 | m_clientBufferProcessesTimer(new QTimer(this)), |
46 | m_messageId(0) | 46 | m_messageId(0) |
47 | { | 47 | { |
48 | connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated, | ||
49 | this, &Listener::refreshRevision); | ||
50 | connect(m_server, &QLocalServer::newConnection, | 48 | connect(m_server, &QLocalServer::newConnection, |
51 | this, &Listener::acceptConnection); | 49 | this, &Listener::acceptConnection); |
52 | Trace() << "Trying to open " << m_resourceInstanceIdentifier; | 50 | Trace() << "Trying to open " << m_resourceInstanceIdentifier; |
@@ -325,18 +323,6 @@ bool Listener::processClientBuffer(Client &client) | |||
325 | return false; | 323 | return false; |
326 | } | 324 | } |
327 | 325 | ||
328 | void Listener::sendCurrentRevision(Client &client) | ||
329 | { | ||
330 | if (!client.socket || !client.socket->isValid()) { | ||
331 | return; | ||
332 | } | ||
333 | |||
334 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision()); | ||
335 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); | ||
336 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); | ||
337 | m_fbb.Clear(); | ||
338 | } | ||
339 | |||
340 | void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId) | 326 | void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId) |
341 | { | 327 | { |
342 | if (!socket || !socket->isValid()) { | 328 | if (!socket || !socket->isValid()) { |
@@ -349,15 +335,15 @@ void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId) | |||
349 | m_fbb.Clear(); | 335 | m_fbb.Clear(); |
350 | } | 336 | } |
351 | 337 | ||
352 | void Listener::refreshRevision() | 338 | void Listener::refreshRevision(qint64 revision) |
353 | { | 339 | { |
354 | updateClientsWithRevision(); | 340 | updateClientsWithRevision(revision); |
355 | } | 341 | } |
356 | 342 | ||
357 | void Listener::updateClientsWithRevision() | 343 | void Listener::updateClientsWithRevision(qint64 revision) |
358 | { | 344 | { |
359 | //FIXME don't send revision updates for revisions that are still being processed. | 345 | //FIXME don't send revision updates for revisions that are still being processed. |
360 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision()); | 346 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, revision); |
361 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); | 347 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); |
362 | 348 | ||
363 | for (const Client &client: m_connections) { | 349 | for (const Client &client: m_connections) { |
@@ -382,6 +368,8 @@ void Listener::loadResource() | |||
382 | Log() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); | 368 | Log() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); |
383 | Log() << QString("\tResource: %1").arg((qlonglong)m_resource); | 369 | Log() << QString("\tResource: %1").arg((qlonglong)m_resource); |
384 | m_resource->configurePipeline(m_pipeline); | 370 | m_resource->configurePipeline(m_pipeline); |
371 | connect(m_resource, &Akonadi2::Resource::revisionUpdated, | ||
372 | this, &Listener::refreshRevision); | ||
385 | } else { | 373 | } else { |
386 | ErrorMsg() << "Failed to load resource " << m_resourceName; | 374 | ErrorMsg() << "Failed to load resource " << m_resourceName; |
387 | } | 375 | } |
diff --git a/common/listener.h b/common/listener.h index 560f052..649c3ed 100644 --- a/common/listener.h +++ b/common/listener.h | |||
@@ -75,15 +75,14 @@ private Q_SLOTS: | |||
75 | void checkConnections(); | 75 | void checkConnections(); |
76 | void onDataAvailable(); | 76 | void onDataAvailable(); |
77 | void processClientBuffers(); | 77 | void processClientBuffers(); |
78 | void refreshRevision(); | 78 | void refreshRevision(qint64); |
79 | void quit(); | 79 | void quit(); |
80 | 80 | ||
81 | private: | 81 | private: |
82 | void processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback); | 82 | void processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback); |
83 | bool processClientBuffer(Client &client); | 83 | bool processClientBuffer(Client &client); |
84 | void sendCurrentRevision(Client &client); | ||
85 | void sendCommandCompleted(QLocalSocket *socket, uint messageId); | 84 | void sendCommandCompleted(QLocalSocket *socket, uint messageId); |
86 | void updateClientsWithRevision(); | 85 | void updateClientsWithRevision(qint64); |
87 | void loadResource(); | 86 | void loadResource(); |
88 | void readFromSocket(QLocalSocket *socket); | 87 | void readFromSocket(QLocalSocket *socket); |
89 | 88 | ||
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 31c9a52..0ebe2f3 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -321,7 +321,7 @@ void Pipeline::pipelineCompleted(PipelineState state) | |||
321 | 321 | ||
322 | if (state.type() != NullPipeline) { | 322 | if (state.type() != NullPipeline) { |
323 | //TODO what revision is finalized? | 323 | //TODO what revision is finalized? |
324 | emit revisionUpdated(); | 324 | emit revisionUpdated(storage().maxRevision()); |
325 | } | 325 | } |
326 | scheduleStep(); | 326 | scheduleStep(); |
327 | if (d->activePipelines.isEmpty()) { | 327 | if (d->activePipelines.isEmpty()) { |
diff --git a/common/pipeline.h b/common/pipeline.h index a6696ec..9c3e7a1 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -61,7 +61,7 @@ public: | |||
61 | KAsync::Job<void> deletedEntity(void const *command, size_t size); | 61 | KAsync::Job<void> deletedEntity(void const *command, size_t size); |
62 | 62 | ||
63 | Q_SIGNALS: | 63 | Q_SIGNALS: |
64 | void revisionUpdated(); | 64 | void revisionUpdated(qint64); |
65 | void pipelinesDrained(); | 65 | void pipelinesDrained(); |
66 | 66 | ||
67 | private Q_SLOTS: | 67 | private Q_SLOTS: |
diff --git a/common/resource.cpp b/common/resource.cpp index 40ad04c..68a237c 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -29,7 +29,8 @@ namespace Akonadi2 | |||
29 | { | 29 | { |
30 | 30 | ||
31 | Resource::Resource() | 31 | Resource::Resource() |
32 | : d(0) | 32 | : QObject(), |
33 | d(0) | ||
33 | { | 34 | { |
34 | 35 | ||
35 | } | 36 | } |
diff --git a/common/resource.h b/common/resource.h index 009050e..9f657f7 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -30,8 +30,9 @@ namespace Akonadi2 | |||
30 | /** | 30 | /** |
31 | * Resource interface | 31 | * Resource interface |
32 | */ | 32 | */ |
33 | class AKONADI2COMMON_EXPORT Resource | 33 | class AKONADI2COMMON_EXPORT Resource : public QObject |
34 | { | 34 | { |
35 | Q_OBJECT | ||
35 | public: | 36 | public: |
36 | Resource(); | 37 | Resource(); |
37 | virtual ~Resource(); | 38 | virtual ~Resource(); |
@@ -42,6 +43,9 @@ public: | |||
42 | 43 | ||
43 | virtual void configurePipeline(Pipeline *pipeline); | 44 | virtual void configurePipeline(Pipeline *pipeline); |
44 | 45 | ||
46 | Q_SIGNALS: | ||
47 | void revisionUpdated(qint64); | ||
48 | |||
45 | private: | 49 | private: |
46 | class Private; | 50 | class Private; |
47 | Private * const d; | 51 | Private * const d; |
diff --git a/tests/genericresourcetest.cpp b/tests/genericresourcetest.cpp index 57fa458..0b9a5c1 100644 --- a/tests/genericresourcetest.cpp +++ b/tests/genericresourcetest.cpp | |||
@@ -90,7 +90,7 @@ private Q_SLOTS: | |||
90 | 90 | ||
91 | //Actual test | 91 | //Actual test |
92 | Akonadi2::Pipeline pipeline("org.kde.test.instance1"); | 92 | Akonadi2::Pipeline pipeline("org.kde.test.instance1"); |
93 | QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated())); | 93 | QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated(qint64))); |
94 | TestResource resource("org.kde.test.instance1"); | 94 | TestResource resource("org.kde.test.instance1"); |
95 | resource.configurePipeline(&pipeline); | 95 | resource.configurePipeline(&pipeline); |
96 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); | 96 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); |