summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/genericresource.cpp17
-rw-r--r--common/genericresource.h9
-rw-r--r--common/listener.cpp8
-rw-r--r--common/listener.h1
-rw-r--r--common/resource.cpp15
-rw-r--r--common/resource.h8
6 files changed, 20 insertions, 38 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 99d1aaa..4dd73b3 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -152,26 +152,21 @@ private:
152}; 152};
153 153
154 154
155GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier) 155GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline)
156 : Akonadi2::Resource(), 156 : Akonadi2::Resource(),
157 mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceInstanceIdentifier + ".userqueue"), 157 mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceInstanceIdentifier + ".userqueue"),
158 mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceInstanceIdentifier + ".synchronizerqueue"), 158 mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceInstanceIdentifier + ".synchronizerqueue"),
159 mResourceInstanceIdentifier(resourceInstanceIdentifier), 159 mResourceInstanceIdentifier(resourceInstanceIdentifier),
160 mPipeline(pipeline ? pipeline : QSharedPointer<Akonadi2::Pipeline>::create(resourceInstanceIdentifier)),
160 mError(0) 161 mError(0)
161{ 162{
163 mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
164 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
165 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
162} 166}
163 167
164GenericResource::~GenericResource() 168GenericResource::~GenericResource()
165{ 169{
166
167}
168
169void GenericResource::configurePipeline(Akonadi2::Pipeline *pipeline)
170{
171 //TODO figure out lifetime of the processor
172 mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
173 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
174 QObject::connect(pipeline, &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
175} 170}
176 171
177void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) 172void GenericResource::onProcessorError(int errorCode, const QString &errorMessage)
@@ -195,7 +190,7 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt
195 mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); 190 mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize());
196} 191}
197 192
198void GenericResource::processCommand(int commandId, const QByteArray &data, Akonadi2::Pipeline *pipeline) 193void GenericResource::processCommand(int commandId, const QByteArray &data)
199{ 194{
200 //TODO instead of copying the command including the full entity first into the command queue, we could directly 195 //TODO instead of copying the command including the full entity first into the command queue, we could directly
201 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). 196 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay).
diff --git a/common/genericresource.h b/common/genericresource.h
index e9d5d59..4a285ea 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -22,6 +22,7 @@
22#include <akonadi2common_export.h> 22#include <akonadi2common_export.h>
23#include <resource.h> 23#include <resource.h>
24#include <messagequeue.h> 24#include <messagequeue.h>
25#include <flatbuffers/flatbuffers.h>
25 26
26class Processor; 27class Processor;
27 28
@@ -34,14 +35,13 @@ namespace Akonadi2
34class AKONADI2COMMON_EXPORT GenericResource : public Resource 35class AKONADI2COMMON_EXPORT GenericResource : public Resource
35{ 36{
36public: 37public:
37 GenericResource(const QByteArray &resourceInstanceIdentifier); 38 GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline = QSharedPointer<Pipeline>());
38 virtual ~GenericResource(); 39 virtual ~GenericResource();
39 40
40 virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline) Q_DECL_OVERRIDE; 41 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE;
41 virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; 42 virtual KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE = 0;
42 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; 43 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE;
43 44
44 virtual void configurePipeline(Pipeline *pipeline) Q_DECL_OVERRIDE;
45 int error() const; 45 int error() const;
46 46
47protected: 47protected:
@@ -51,6 +51,7 @@ protected:
51 MessageQueue mUserQueue; 51 MessageQueue mUserQueue;
52 MessageQueue mSynchronizerQueue; 52 MessageQueue mSynchronizerQueue;
53 QByteArray mResourceInstanceIdentifier; 53 QByteArray mResourceInstanceIdentifier;
54 QSharedPointer<Pipeline> mPipeline;
54 55
55private: 56private:
56 Processor *mProcessor; 57 Processor *mProcessor;
diff --git a/common/listener.cpp b/common/listener.cpp
index 2e2e98e..8ec9b3e 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -41,7 +41,6 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent
41 m_resourceName(Akonadi2::Store::resourceName(resourceInstanceIdentifier)), 41 m_resourceName(Akonadi2::Store::resourceName(resourceInstanceIdentifier)),
42 m_resourceInstanceIdentifier(resourceInstanceIdentifier), 42 m_resourceInstanceIdentifier(resourceInstanceIdentifier),
43 m_resource(0), 43 m_resource(0),
44 m_pipeline(new Akonadi2::Pipeline(resourceInstanceIdentifier, parent)),
45 m_clientBufferProcessesTimer(new QTimer(this)), 44 m_clientBufferProcessesTimer(new QTimer(this)),
46 m_messageId(0) 45 m_messageId(0)
47{ 46{
@@ -226,7 +225,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
226 } 225 }
227 auto job = KAsync::null<void>(); 226 auto job = KAsync::null<void>();
228 if (buffer->sourceSync()) { 227 if (buffer->sourceSync()) {
229 job = m_resource->synchronizeWithSource(m_pipeline); 228 job = m_resource->synchronizeWithSource();
230 } 229 }
231 if (buffer->localSync()) { 230 if (buffer->localSync()) {
232 job = job.then<void>(m_resource->processAllMessages()); 231 job = job.then<void>(m_resource->processAllMessages());
@@ -247,7 +246,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
247 Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name; 246 Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name;
248 loadResource(); 247 loadResource();
249 if (m_resource) { 248 if (m_resource) {
250 m_resource->processCommand(commandId, commandBuffer, m_pipeline); 249 m_resource->processCommand(commandId, commandBuffer);
251 } 250 }
252 break; 251 break;
253 case Akonadi2::Commands::ShutdownCommand: 252 case Akonadi2::Commands::ShutdownCommand:
@@ -261,7 +260,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
261 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; 260 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId;
262 loadResource(); 261 loadResource();
263 if (m_resource) { 262 if (m_resource) {
264 m_resource->processCommand(commandId, commandBuffer, m_pipeline); 263 m_resource->processCommand(commandId, commandBuffer);
265 } 264 }
266 } else { 265 } else {
267 Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; 266 Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId;
@@ -367,7 +366,6 @@ void Listener::loadResource()
367 m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier); 366 m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier);
368 Log() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); 367 Log() << QString("Resource factory: %1").arg((qlonglong)resourceFactory);
369 Log() << QString("\tResource: %1").arg((qlonglong)m_resource); 368 Log() << QString("\tResource: %1").arg((qlonglong)m_resource);
370 m_resource->configurePipeline(m_pipeline);
371 connect(m_resource, &Akonadi2::Resource::revisionUpdated, 369 connect(m_resource, &Akonadi2::Resource::revisionUpdated,
372 this, &Listener::refreshRevision); 370 this, &Listener::refreshRevision);
373 } else { 371 } else {
diff --git a/common/listener.h b/common/listener.h
index 649c3ed..0d19823 100644
--- a/common/listener.h
+++ b/common/listener.h
@@ -92,7 +92,6 @@ private:
92 const QByteArray m_resourceName; 92 const QByteArray m_resourceName;
93 const QByteArray m_resourceInstanceIdentifier; 93 const QByteArray m_resourceInstanceIdentifier;
94 Akonadi2::Resource *m_resource; 94 Akonadi2::Resource *m_resource;
95 Akonadi2::Pipeline *m_pipeline;
96 QTimer *m_clientBufferProcessesTimer; 95 QTimer *m_clientBufferProcessesTimer;
97 QTimer *m_checkConnectionsTimer; 96 QTimer *m_checkConnectionsTimer;
98 int m_messageId; 97 int m_messageId;
diff --git a/common/resource.cpp b/common/resource.cpp
index 68a237c..2a86df5 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -40,24 +40,15 @@ Resource::~Resource()
40 //delete d; 40 //delete d;
41} 41}
42 42
43void Resource::configurePipeline(Pipeline *pipeline) 43void Resource::processCommand(int commandId, const QByteArray &data)
44{
45
46}
47
48void Resource::processCommand(int commandId, const QByteArray &data, Pipeline *pipeline)
49{ 44{
50 Q_UNUSED(commandId) 45 Q_UNUSED(commandId)
51 Q_UNUSED(data) 46 Q_UNUSED(data)
52 Q_UNUSED(pipeline)
53 pipeline->null();
54} 47}
55 48
56KAsync::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline) 49KAsync::Job<void> Resource::synchronizeWithSource()
57{ 50{
58 return KAsync::start<void>([pipeline](KAsync::Future<void> &f) { 51 return KAsync::null<void>();
59 pipeline->null();
60 });
61} 52}
62 53
63KAsync::Job<void> Resource::processAllMessages() 54KAsync::Job<void> Resource::processAllMessages()
diff --git a/common/resource.h b/common/resource.h
index 9f657f7..a51e12d 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -20,12 +20,12 @@
20 20
21#include <akonadi2common_export.h> 21#include <akonadi2common_export.h>
22#include <clientapi.h> 22#include <clientapi.h>
23#include <pipeline.h>
24 23
25#include <Async/Async> 24#include <Async/Async>
26 25
27namespace Akonadi2 26namespace Akonadi2
28{ 27{
28class Pipeline;
29 29
30/** 30/**
31 * Resource interface 31 * Resource interface
@@ -37,12 +37,10 @@ public:
37 Resource(); 37 Resource();
38 virtual ~Resource(); 38 virtual ~Resource();
39 39
40 virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline); 40 virtual void processCommand(int commandId, const QByteArray &data);
41 virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline); 41 virtual KAsync::Job<void> synchronizeWithSource();
42 virtual KAsync::Job<void> processAllMessages(); 42 virtual KAsync::Job<void> processAllMessages();
43 43
44 virtual void configurePipeline(Pipeline *pipeline);
45
46Q_SIGNALS: 44Q_SIGNALS:
47 void revisionUpdated(qint64); 45 void revisionUpdated(qint64);
48 46