diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/genericresource.cpp | 17 | ||||
-rw-r--r-- | common/genericresource.h | 9 | ||||
-rw-r--r-- | common/listener.cpp | 8 | ||||
-rw-r--r-- | common/listener.h | 1 | ||||
-rw-r--r-- | common/resource.cpp | 15 | ||||
-rw-r--r-- | common/resource.h | 8 |
6 files changed, 20 insertions, 38 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 99d1aaa..4dd73b3 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -152,26 +152,21 @@ private: | |||
152 | }; | 152 | }; |
153 | 153 | ||
154 | 154 | ||
155 | GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier) | 155 | GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) |
156 | : Akonadi2::Resource(), | 156 | : Akonadi2::Resource(), |
157 | mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceInstanceIdentifier + ".userqueue"), | 157 | mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceInstanceIdentifier + ".userqueue"), |
158 | mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceInstanceIdentifier + ".synchronizerqueue"), | 158 | mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", resourceInstanceIdentifier + ".synchronizerqueue"), |
159 | mResourceInstanceIdentifier(resourceInstanceIdentifier), | 159 | mResourceInstanceIdentifier(resourceInstanceIdentifier), |
160 | mPipeline(pipeline ? pipeline : QSharedPointer<Akonadi2::Pipeline>::create(resourceInstanceIdentifier)), | ||
160 | mError(0) | 161 | mError(0) |
161 | { | 162 | { |
163 | mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | ||
164 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | ||
165 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | ||
162 | } | 166 | } |
163 | 167 | ||
164 | GenericResource::~GenericResource() | 168 | GenericResource::~GenericResource() |
165 | { | 169 | { |
166 | |||
167 | } | ||
168 | |||
169 | void GenericResource::configurePipeline(Akonadi2::Pipeline *pipeline) | ||
170 | { | ||
171 | //TODO figure out lifetime of the processor | ||
172 | mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | ||
173 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | ||
174 | QObject::connect(pipeline, &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | ||
175 | } | 170 | } |
176 | 171 | ||
177 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) | 172 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) |
@@ -195,7 +190,7 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt | |||
195 | mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); | 190 | mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); |
196 | } | 191 | } |
197 | 192 | ||
198 | void GenericResource::processCommand(int commandId, const QByteArray &data, Akonadi2::Pipeline *pipeline) | 193 | void GenericResource::processCommand(int commandId, const QByteArray &data) |
199 | { | 194 | { |
200 | //TODO instead of copying the command including the full entity first into the command queue, we could directly | 195 | //TODO instead of copying the command including the full entity first into the command queue, we could directly |
201 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). | 196 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). |
diff --git a/common/genericresource.h b/common/genericresource.h index e9d5d59..4a285ea 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -22,6 +22,7 @@ | |||
22 | #include <akonadi2common_export.h> | 22 | #include <akonadi2common_export.h> |
23 | #include <resource.h> | 23 | #include <resource.h> |
24 | #include <messagequeue.h> | 24 | #include <messagequeue.h> |
25 | #include <flatbuffers/flatbuffers.h> | ||
25 | 26 | ||
26 | class Processor; | 27 | class Processor; |
27 | 28 | ||
@@ -34,14 +35,13 @@ namespace Akonadi2 | |||
34 | class AKONADI2COMMON_EXPORT GenericResource : public Resource | 35 | class AKONADI2COMMON_EXPORT GenericResource : public Resource |
35 | { | 36 | { |
36 | public: | 37 | public: |
37 | GenericResource(const QByteArray &resourceInstanceIdentifier); | 38 | GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline = QSharedPointer<Pipeline>()); |
38 | virtual ~GenericResource(); | 39 | virtual ~GenericResource(); |
39 | 40 | ||
40 | virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline) Q_DECL_OVERRIDE; | 41 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; |
41 | virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; | 42 | virtual KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE = 0; |
42 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; | 43 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; |
43 | 44 | ||
44 | virtual void configurePipeline(Pipeline *pipeline) Q_DECL_OVERRIDE; | ||
45 | int error() const; | 45 | int error() const; |
46 | 46 | ||
47 | protected: | 47 | protected: |
@@ -51,6 +51,7 @@ protected: | |||
51 | MessageQueue mUserQueue; | 51 | MessageQueue mUserQueue; |
52 | MessageQueue mSynchronizerQueue; | 52 | MessageQueue mSynchronizerQueue; |
53 | QByteArray mResourceInstanceIdentifier; | 53 | QByteArray mResourceInstanceIdentifier; |
54 | QSharedPointer<Pipeline> mPipeline; | ||
54 | 55 | ||
55 | private: | 56 | private: |
56 | Processor *mProcessor; | 57 | Processor *mProcessor; |
diff --git a/common/listener.cpp b/common/listener.cpp index 2e2e98e..8ec9b3e 100644 --- a/common/listener.cpp +++ b/common/listener.cpp | |||
@@ -41,7 +41,6 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent | |||
41 | m_resourceName(Akonadi2::Store::resourceName(resourceInstanceIdentifier)), | 41 | m_resourceName(Akonadi2::Store::resourceName(resourceInstanceIdentifier)), |
42 | m_resourceInstanceIdentifier(resourceInstanceIdentifier), | 42 | m_resourceInstanceIdentifier(resourceInstanceIdentifier), |
43 | m_resource(0), | 43 | m_resource(0), |
44 | m_pipeline(new Akonadi2::Pipeline(resourceInstanceIdentifier, parent)), | ||
45 | m_clientBufferProcessesTimer(new QTimer(this)), | 44 | m_clientBufferProcessesTimer(new QTimer(this)), |
46 | m_messageId(0) | 45 | m_messageId(0) |
47 | { | 46 | { |
@@ -226,7 +225,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
226 | } | 225 | } |
227 | auto job = KAsync::null<void>(); | 226 | auto job = KAsync::null<void>(); |
228 | if (buffer->sourceSync()) { | 227 | if (buffer->sourceSync()) { |
229 | job = m_resource->synchronizeWithSource(m_pipeline); | 228 | job = m_resource->synchronizeWithSource(); |
230 | } | 229 | } |
231 | if (buffer->localSync()) { | 230 | if (buffer->localSync()) { |
232 | job = job.then<void>(m_resource->processAllMessages()); | 231 | job = job.then<void>(m_resource->processAllMessages()); |
@@ -247,7 +246,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
247 | Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name; | 246 | Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name; |
248 | loadResource(); | 247 | loadResource(); |
249 | if (m_resource) { | 248 | if (m_resource) { |
250 | m_resource->processCommand(commandId, commandBuffer, m_pipeline); | 249 | m_resource->processCommand(commandId, commandBuffer); |
251 | } | 250 | } |
252 | break; | 251 | break; |
253 | case Akonadi2::Commands::ShutdownCommand: | 252 | case Akonadi2::Commands::ShutdownCommand: |
@@ -261,7 +260,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
261 | Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; | 260 | Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; |
262 | loadResource(); | 261 | loadResource(); |
263 | if (m_resource) { | 262 | if (m_resource) { |
264 | m_resource->processCommand(commandId, commandBuffer, m_pipeline); | 263 | m_resource->processCommand(commandId, commandBuffer); |
265 | } | 264 | } |
266 | } else { | 265 | } else { |
267 | Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; | 266 | Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; |
@@ -367,7 +366,6 @@ void Listener::loadResource() | |||
367 | m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier); | 366 | m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier); |
368 | Log() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); | 367 | Log() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); |
369 | Log() << QString("\tResource: %1").arg((qlonglong)m_resource); | 368 | Log() << QString("\tResource: %1").arg((qlonglong)m_resource); |
370 | m_resource->configurePipeline(m_pipeline); | ||
371 | connect(m_resource, &Akonadi2::Resource::revisionUpdated, | 369 | connect(m_resource, &Akonadi2::Resource::revisionUpdated, |
372 | this, &Listener::refreshRevision); | 370 | this, &Listener::refreshRevision); |
373 | } else { | 371 | } else { |
diff --git a/common/listener.h b/common/listener.h index 649c3ed..0d19823 100644 --- a/common/listener.h +++ b/common/listener.h | |||
@@ -92,7 +92,6 @@ private: | |||
92 | const QByteArray m_resourceName; | 92 | const QByteArray m_resourceName; |
93 | const QByteArray m_resourceInstanceIdentifier; | 93 | const QByteArray m_resourceInstanceIdentifier; |
94 | Akonadi2::Resource *m_resource; | 94 | Akonadi2::Resource *m_resource; |
95 | Akonadi2::Pipeline *m_pipeline; | ||
96 | QTimer *m_clientBufferProcessesTimer; | 95 | QTimer *m_clientBufferProcessesTimer; |
97 | QTimer *m_checkConnectionsTimer; | 96 | QTimer *m_checkConnectionsTimer; |
98 | int m_messageId; | 97 | int m_messageId; |
diff --git a/common/resource.cpp b/common/resource.cpp index 68a237c..2a86df5 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -40,24 +40,15 @@ Resource::~Resource() | |||
40 | //delete d; | 40 | //delete d; |
41 | } | 41 | } |
42 | 42 | ||
43 | void Resource::configurePipeline(Pipeline *pipeline) | 43 | void Resource::processCommand(int commandId, const QByteArray &data) |
44 | { | ||
45 | |||
46 | } | ||
47 | |||
48 | void Resource::processCommand(int commandId, const QByteArray &data, Pipeline *pipeline) | ||
49 | { | 44 | { |
50 | Q_UNUSED(commandId) | 45 | Q_UNUSED(commandId) |
51 | Q_UNUSED(data) | 46 | Q_UNUSED(data) |
52 | Q_UNUSED(pipeline) | ||
53 | pipeline->null(); | ||
54 | } | 47 | } |
55 | 48 | ||
56 | KAsync::Job<void> Resource::synchronizeWithSource(Pipeline *pipeline) | 49 | KAsync::Job<void> Resource::synchronizeWithSource() |
57 | { | 50 | { |
58 | return KAsync::start<void>([pipeline](KAsync::Future<void> &f) { | 51 | return KAsync::null<void>(); |
59 | pipeline->null(); | ||
60 | }); | ||
61 | } | 52 | } |
62 | 53 | ||
63 | KAsync::Job<void> Resource::processAllMessages() | 54 | KAsync::Job<void> Resource::processAllMessages() |
diff --git a/common/resource.h b/common/resource.h index 9f657f7..a51e12d 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -20,12 +20,12 @@ | |||
20 | 20 | ||
21 | #include <akonadi2common_export.h> | 21 | #include <akonadi2common_export.h> |
22 | #include <clientapi.h> | 22 | #include <clientapi.h> |
23 | #include <pipeline.h> | ||
24 | 23 | ||
25 | #include <Async/Async> | 24 | #include <Async/Async> |
26 | 25 | ||
27 | namespace Akonadi2 | 26 | namespace Akonadi2 |
28 | { | 27 | { |
28 | class Pipeline; | ||
29 | 29 | ||
30 | /** | 30 | /** |
31 | * Resource interface | 31 | * Resource interface |
@@ -37,12 +37,10 @@ public: | |||
37 | Resource(); | 37 | Resource(); |
38 | virtual ~Resource(); | 38 | virtual ~Resource(); |
39 | 39 | ||
40 | virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline); | 40 | virtual void processCommand(int commandId, const QByteArray &data); |
41 | virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline); | 41 | virtual KAsync::Job<void> synchronizeWithSource(); |
42 | virtual KAsync::Job<void> processAllMessages(); | 42 | virtual KAsync::Job<void> processAllMessages(); |
43 | 43 | ||
44 | virtual void configurePipeline(Pipeline *pipeline); | ||
45 | |||
46 | Q_SIGNALS: | 44 | Q_SIGNALS: |
47 | void revisionUpdated(qint64); | 45 | void revisionUpdated(qint64); |
48 | 46 | ||