summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/genericresource.cpp234
-rw-r--r--common/genericresource.h59
3 files changed, 294 insertions, 0 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index ce237c5..c18d98a 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -33,6 +33,7 @@ set(command_SRCS
33 pipeline.cpp 33 pipeline.cpp
34 domainadaptor.cpp 34 domainadaptor.cpp
35 resource.cpp 35 resource.cpp
36 genericresource.cpp
36 resourceaccess.cpp 37 resourceaccess.cpp
37 storage_common.cpp 38 storage_common.cpp
38 threadboundary.cpp 39 threadboundary.cpp
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
new file mode 100644
index 0000000..0b71500
--- /dev/null
+++ b/common/genericresource.cpp
@@ -0,0 +1,234 @@
1#include "genericresource.h"
2
3#include "facade.h"
4#include "entitybuffer.h"
5#include "pipeline.h"
6#include "queuedcommand_generated.h"
7#include "createentity_generated.h"
8#include "domainadaptor.h"
9#include "commands.h"
10#include "clientapi.h"
11#include "index.h"
12#include "log.h"
13#include <assert.h>
14
15using namespace Akonadi2;
16
17/**
18 * Drives the pipeline using the output from all command queues
19 */
20class Processor : public QObject
21{
22 Q_OBJECT
23public:
24 Processor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues)
25 : QObject(),
26 mPipeline(pipeline),
27 mCommandQueues(commandQueues),
28 mProcessingLock(false)
29 {
30 for (auto queue : mCommandQueues) {
31 const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process);
32 Q_UNUSED(ret);
33 }
34 }
35
36signals:
37 void error(int errorCode, const QString &errorMessage);
38
39private:
40 bool messagesToProcessAvailable()
41 {
42 for (auto queue : mCommandQueues) {
43 if (!queue->isEmpty()) {
44 return true;
45 }
46 }
47 return false;
48 }
49
50private slots:
51 void process()
52 {
53 if (mProcessingLock) {
54 return;
55 }
56 mProcessingLock = true;
57 auto job = processPipeline().then<void>([this]() {
58 mProcessingLock = false;
59 if (messagesToProcessAvailable()) {
60 process();
61 }
62 }).exec();
63 }
64
65 Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand)
66 {
67 Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId());
68 //Throw command into appropriate pipeline
69 switch (queuedCommand->commandId()) {
70 case Akonadi2::Commands::DeleteEntityCommand:
71 //mPipeline->removedEntity
72 return Async::null<void>();
73 case Akonadi2::Commands::ModifyEntityCommand:
74 //mPipeline->modifiedEntity
75 return Async::null<void>();
76 case Akonadi2::Commands::CreateEntityCommand:
77 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
78 default:
79 return Async::error<void>(-1, "Unhandled command");
80 }
81 return Async::null<void>();
82 }
83
84 //Process all messages of this queue
85 Async::Job<void> processQueue(MessageQueue *queue)
86 {
87 //TODO use something like:
88 //Async::foreach("pass iterator here").each("process value here").join();
89 //Async::foreach("pass iterator here").parallel("process value here").join();
90 return Async::dowhile(
91 [this, queue](Async::Future<bool> &future) {
92 if (queue->isEmpty()) {
93 future.setValue(false);
94 future.setFinished();
95 return;
96 }
97 queue->dequeue(
98 [this, &future](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
99 auto callback = [messageQueueCallback, &future](bool success) {
100 messageQueueCallback(success);
101 future.setValue(!success);
102 future.setFinished();
103 };
104
105 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
106 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
107 Warning() << "invalid buffer";
108 callback(false);
109 return;
110 }
111 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr);
112 Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId());
113 //TODO JOBAPI: job lifetime management
114 //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete
115 //themselves once done. In other cases we'd like jobs that only live as long as their handle though.
116 //FIXME this job is stack allocated and thus simply dies....
117 processQueuedCommand(queuedCommand).then<void>(
118 [callback]() {
119 callback(true);
120 },
121 [callback](int errorCode, QString errorMessage) {
122 Warning() << "Error while processing queue command: " << errorMessage;
123 callback(false);
124 }
125 ).exec();
126 },
127 [&future](const MessageQueue::Error &error) {
128 Warning() << "Error while getting message from messagequeue: " << error.message;
129 future.setValue(false);
130 future.setFinished();
131 }
132 );
133 }
134 );
135 }
136
137 Async::Job<void> processPipeline()
138 {
139 //Go through all message queues
140 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues);
141 return Async::dowhile(
142 [it]() { return it->hasNext(); },
143 [it, this](Async::Future<void> &future) {
144 auto queue = it->next();
145 processQueue(queue).then<void>([&future]() {
146 Trace() << "Queue processed";
147 future.setFinished();
148 }).exec();
149 }
150 );
151 }
152
153private:
154 Akonadi2::Pipeline *mPipeline;
155 //Ordered by priority
156 QList<MessageQueue*> mCommandQueues;
157 bool mProcessingLock;
158};
159
160
161GenericResource::GenericResource(const QByteArray &resourceIdentifier)
162 : Akonadi2::Resource(),
163 mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde." + resourceIdentifier + ".userqueue"),
164 mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde." + resourceIdentifier + ".synchronizerqueue"),
165 mError(0)
166{
167}
168
169GenericResource::~GenericResource()
170{
171
172}
173
174void GenericResource::configurePipeline(Akonadi2::Pipeline *pipeline)
175{
176 //TODO figure out lifetime of the processor
177 mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
178 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
179}
180
181void GenericResource::onProcessorError(int errorCode, const QString &errorMessage)
182{
183 Warning() << "Received error from Processor: " << errorCode << errorMessage;
184 mError = errorCode;
185}
186
187int GenericResource::error() const
188{
189 return mError;
190}
191
192void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data)
193{
194 //TODO get rid of m_fbb member variable
195 m_fbb.Clear();
196 auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size());
197 auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData);
198 Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer);
199 mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize());
200}
201
202void GenericResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline)
203{
204 //TODO instead of copying the command including the full entity first into the command queue, we could directly
205 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay).
206 //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire).
207 enqueueCommand(mUserQueue, commandId, data);
208}
209
210Async::Job<void> GenericResource::processAllMessages()
211{
212 //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed.
213 //TODO: report errors while processing sync?
214 //TODO JOBAPI: A helper that waits for n events and then continues?
215 return Async::start<void>([this](Async::Future<void> &f) {
216 if (mSynchronizerQueue.isEmpty()) {
217 f.setFinished();
218 } else {
219 QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() {
220 f.setFinished();
221 });
222 }
223 }).then<void>([this](Async::Future<void> &f) {
224 if (mUserQueue.isEmpty()) {
225 f.setFinished();
226 } else {
227 QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() {
228 f.setFinished();
229 });
230 }
231 });
232}
233
234#include "genericresource.moc"
diff --git a/common/genericresource.h b/common/genericresource.h
new file mode 100644
index 0000000..36fa567
--- /dev/null
+++ b/common/genericresource.h
@@ -0,0 +1,59 @@
1/*
2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#pragma once
21
22#include <akonadi2common_export.h>
23#include <resource.h>
24#include <messagequeue.h>
25
26class Processor;
27
28namespace Akonadi2
29{
30
31/**
32 * Generic Resource implementation.
33 */
34class AKONADI2COMMON_EXPORT GenericResource : public Resource
35{
36public:
37 GenericResource(const QByteArray &resourceIdentifier);
38 virtual ~GenericResource();
39
40 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) Q_DECL_OVERRIDE;
41 virtual Async::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0;
42 virtual Async::Job<void> processAllMessages() Q_DECL_OVERRIDE;
43
44 virtual void configurePipeline(Pipeline *pipeline) Q_DECL_OVERRIDE;
45 int error() const;
46
47protected:
48 void onProcessorError(int errorCode, const QString &errorMessage);
49 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data);
50 flatbuffers::FlatBufferBuilder m_fbb;
51 MessageQueue mUserQueue;
52 MessageQueue mSynchronizerQueue;
53
54private:
55 Processor *mProcessor;
56 int mError;
57};
58
59}