summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-07-28 21:02:58 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-07-28 21:02:58 +0200
commit07572b25af45c41a82eb8ddfdecf18e58958788b (patch)
tree2ecb98a9fbe7c7c0244d7511e543beff8df68c3a
parentdd86c15b48f33c120c510327569fb1cc3ffa3d45 (diff)
downloadsink-07572b25af45c41a82eb8ddfdecf18e58958788b.tar.gz
sink-07572b25af45c41a82eb8ddfdecf18e58958788b.zip
Forward revision updates through resource
-rw-r--r--common/genericresource.cpp1
-rw-r--r--common/listener.cpp24
-rw-r--r--common/listener.h5
-rw-r--r--common/pipeline.cpp2
-rw-r--r--common/pipeline.h2
-rw-r--r--common/resource.cpp3
-rw-r--r--common/resource.h6
-rw-r--r--tests/genericresourcetest.cpp2
8 files changed, 19 insertions, 26 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index ae06ef4..99d1aaa 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -171,6 +171,7 @@ void GenericResource::configurePipeline(Akonadi2::Pipeline *pipeline)
171 //TODO figure out lifetime of the processor 171 //TODO figure out lifetime of the processor
172 mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); 172 mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
173 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 173 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
174 QObject::connect(pipeline, &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
174} 175}
175 176
176void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) 177void GenericResource::onProcessorError(int errorCode, const QString &errorMessage)
diff --git a/common/listener.cpp b/common/listener.cpp
index 4316c63..2e2e98e 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -45,8 +45,6 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent
45 m_clientBufferProcessesTimer(new QTimer(this)), 45 m_clientBufferProcessesTimer(new QTimer(this)),
46 m_messageId(0) 46 m_messageId(0)
47{ 47{
48 connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated,
49 this, &Listener::refreshRevision);
50 connect(m_server, &QLocalServer::newConnection, 48 connect(m_server, &QLocalServer::newConnection,
51 this, &Listener::acceptConnection); 49 this, &Listener::acceptConnection);
52 Trace() << "Trying to open " << m_resourceInstanceIdentifier; 50 Trace() << "Trying to open " << m_resourceInstanceIdentifier;
@@ -325,18 +323,6 @@ bool Listener::processClientBuffer(Client &client)
325 return false; 323 return false;
326} 324}
327 325
328void Listener::sendCurrentRevision(Client &client)
329{
330 if (!client.socket || !client.socket->isValid()) {
331 return;
332 }
333
334 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision());
335 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command);
336 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb);
337 m_fbb.Clear();
338}
339
340void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId) 326void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId)
341{ 327{
342 if (!socket || !socket->isValid()) { 328 if (!socket || !socket->isValid()) {
@@ -349,15 +335,15 @@ void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId)
349 m_fbb.Clear(); 335 m_fbb.Clear();
350} 336}
351 337
352void Listener::refreshRevision() 338void Listener::refreshRevision(qint64 revision)
353{ 339{
354 updateClientsWithRevision(); 340 updateClientsWithRevision(revision);
355} 341}
356 342
357void Listener::updateClientsWithRevision() 343void Listener::updateClientsWithRevision(qint64 revision)
358{ 344{
359 //FIXME don't send revision updates for revisions that are still being processed. 345 //FIXME don't send revision updates for revisions that are still being processed.
360 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision()); 346 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, revision);
361 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); 347 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command);
362 348
363 for (const Client &client: m_connections) { 349 for (const Client &client: m_connections) {
@@ -382,6 +368,8 @@ void Listener::loadResource()
382 Log() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); 368 Log() << QString("Resource factory: %1").arg((qlonglong)resourceFactory);
383 Log() << QString("\tResource: %1").arg((qlonglong)m_resource); 369 Log() << QString("\tResource: %1").arg((qlonglong)m_resource);
384 m_resource->configurePipeline(m_pipeline); 370 m_resource->configurePipeline(m_pipeline);
371 connect(m_resource, &Akonadi2::Resource::revisionUpdated,
372 this, &Listener::refreshRevision);
385 } else { 373 } else {
386 ErrorMsg() << "Failed to load resource " << m_resourceName; 374 ErrorMsg() << "Failed to load resource " << m_resourceName;
387 } 375 }
diff --git a/common/listener.h b/common/listener.h
index 560f052..649c3ed 100644
--- a/common/listener.h
+++ b/common/listener.h
@@ -75,15 +75,14 @@ private Q_SLOTS:
75 void checkConnections(); 75 void checkConnections();
76 void onDataAvailable(); 76 void onDataAvailable();
77 void processClientBuffers(); 77 void processClientBuffers();
78 void refreshRevision(); 78 void refreshRevision(qint64);
79 void quit(); 79 void quit();
80 80
81private: 81private:
82 void processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback); 82 void processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback);
83 bool processClientBuffer(Client &client); 83 bool processClientBuffer(Client &client);
84 void sendCurrentRevision(Client &client);
85 void sendCommandCompleted(QLocalSocket *socket, uint messageId); 84 void sendCommandCompleted(QLocalSocket *socket, uint messageId);
86 void updateClientsWithRevision(); 85 void updateClientsWithRevision(qint64);
87 void loadResource(); 86 void loadResource();
88 void readFromSocket(QLocalSocket *socket); 87 void readFromSocket(QLocalSocket *socket);
89 88
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 31c9a52..0ebe2f3 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -321,7 +321,7 @@ void Pipeline::pipelineCompleted(PipelineState state)
321 321
322 if (state.type() != NullPipeline) { 322 if (state.type() != NullPipeline) {
323 //TODO what revision is finalized? 323 //TODO what revision is finalized?
324 emit revisionUpdated(); 324 emit revisionUpdated(storage().maxRevision());
325 } 325 }
326 scheduleStep(); 326 scheduleStep();
327 if (d->activePipelines.isEmpty()) { 327 if (d->activePipelines.isEmpty()) {
diff --git a/common/pipeline.h b/common/pipeline.h
index a6696ec..9c3e7a1 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -61,7 +61,7 @@ public:
61 KAsync::Job<void> deletedEntity(void const *command, size_t size); 61 KAsync::Job<void> deletedEntity(void const *command, size_t size);
62 62
63Q_SIGNALS: 63Q_SIGNALS:
64 void revisionUpdated(); 64 void revisionUpdated(qint64);
65 void pipelinesDrained(); 65 void pipelinesDrained();
66 66
67private Q_SLOTS: 67private Q_SLOTS:
diff --git a/common/resource.cpp b/common/resource.cpp
index 40ad04c..68a237c 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -29,7 +29,8 @@ namespace Akonadi2
29{ 29{
30 30
31Resource::Resource() 31Resource::Resource()
32 : d(0) 32 : QObject(),
33 d(0)
33{ 34{
34 35
35} 36}
diff --git a/common/resource.h b/common/resource.h
index 009050e..9f657f7 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -30,8 +30,9 @@ namespace Akonadi2
30/** 30/**
31 * Resource interface 31 * Resource interface
32 */ 32 */
33class AKONADI2COMMON_EXPORT Resource 33class AKONADI2COMMON_EXPORT Resource : public QObject
34{ 34{
35 Q_OBJECT
35public: 36public:
36 Resource(); 37 Resource();
37 virtual ~Resource(); 38 virtual ~Resource();
@@ -42,6 +43,9 @@ public:
42 43
43 virtual void configurePipeline(Pipeline *pipeline); 44 virtual void configurePipeline(Pipeline *pipeline);
44 45
46Q_SIGNALS:
47 void revisionUpdated(qint64);
48
45private: 49private:
46 class Private; 50 class Private;
47 Private * const d; 51 Private * const d;
diff --git a/tests/genericresourcetest.cpp b/tests/genericresourcetest.cpp
index 57fa458..0b9a5c1 100644
--- a/tests/genericresourcetest.cpp
+++ b/tests/genericresourcetest.cpp
@@ -90,7 +90,7 @@ private Q_SLOTS:
90 90
91 //Actual test 91 //Actual test
92 Akonadi2::Pipeline pipeline("org.kde.test.instance1"); 92 Akonadi2::Pipeline pipeline("org.kde.test.instance1");
93 QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated())); 93 QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated(qint64)));
94 TestResource resource("org.kde.test.instance1"); 94 TestResource resource("org.kde.test.instance1");
95 resource.configurePipeline(&pipeline); 95 resource.configurePipeline(&pipeline);
96 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); 96 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline);