summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp234
1 files changed, 234 insertions, 0 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
new file mode 100644
index 0000000..0b71500
--- /dev/null
+++ b/common/genericresource.cpp
@@ -0,0 +1,234 @@
1#include "genericresource.h"
2
3#include "facade.h"
4#include "entitybuffer.h"
5#include "pipeline.h"
6#include "queuedcommand_generated.h"
7#include "createentity_generated.h"
8#include "domainadaptor.h"
9#include "commands.h"
10#include "clientapi.h"
11#include "index.h"
12#include "log.h"
13#include <assert.h>
14
15using namespace Akonadi2;
16
17/**
18 * Drives the pipeline using the output from all command queues
19 */
20class Processor : public QObject
21{
22 Q_OBJECT
23public:
24 Processor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues)
25 : QObject(),
26 mPipeline(pipeline),
27 mCommandQueues(commandQueues),
28 mProcessingLock(false)
29 {
30 for (auto queue : mCommandQueues) {
31 const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process);
32 Q_UNUSED(ret);
33 }
34 }
35
36signals:
37 void error(int errorCode, const QString &errorMessage);
38
39private:
40 bool messagesToProcessAvailable()
41 {
42 for (auto queue : mCommandQueues) {
43 if (!queue->isEmpty()) {
44 return true;
45 }
46 }
47 return false;
48 }
49
50private slots:
51 void process()
52 {
53 if (mProcessingLock) {
54 return;
55 }
56 mProcessingLock = true;
57 auto job = processPipeline().then<void>([this]() {
58 mProcessingLock = false;
59 if (messagesToProcessAvailable()) {
60 process();
61 }
62 }).exec();
63 }
64
65 Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand)
66 {
67 Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId());
68 //Throw command into appropriate pipeline
69 switch (queuedCommand->commandId()) {
70 case Akonadi2::Commands::DeleteEntityCommand:
71 //mPipeline->removedEntity
72 return Async::null<void>();
73 case Akonadi2::Commands::ModifyEntityCommand:
74 //mPipeline->modifiedEntity
75 return Async::null<void>();
76 case Akonadi2::Commands::CreateEntityCommand:
77 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
78 default:
79 return Async::error<void>(-1, "Unhandled command");
80 }
81 return Async::null<void>();
82 }
83
84 //Process all messages of this queue
85 Async::Job<void> processQueue(MessageQueue *queue)
86 {
87 //TODO use something like:
88 //Async::foreach("pass iterator here").each("process value here").join();
89 //Async::foreach("pass iterator here").parallel("process value here").join();
90 return Async::dowhile(
91 [this, queue](Async::Future<bool> &future) {
92 if (queue->isEmpty()) {
93 future.setValue(false);
94 future.setFinished();
95 return;
96 }
97 queue->dequeue(
98 [this, &future](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
99 auto callback = [messageQueueCallback, &future](bool success) {
100 messageQueueCallback(success);
101 future.setValue(!success);
102 future.setFinished();
103 };
104
105 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
106 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
107 Warning() << "invalid buffer";
108 callback(false);
109 return;
110 }
111 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr);
112 Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId());
113 //TODO JOBAPI: job lifetime management
114 //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete
115 //themselves once done. In other cases we'd like jobs that only live as long as their handle though.
116 //FIXME this job is stack allocated and thus simply dies....
117 processQueuedCommand(queuedCommand).then<void>(
118 [callback]() {
119 callback(true);
120 },
121 [callback](int errorCode, QString errorMessage) {
122 Warning() << "Error while processing queue command: " << errorMessage;
123 callback(false);
124 }
125 ).exec();
126 },
127 [&future](const MessageQueue::Error &error) {
128 Warning() << "Error while getting message from messagequeue: " << error.message;
129 future.setValue(false);
130 future.setFinished();
131 }
132 );
133 }
134 );
135 }
136
137 Async::Job<void> processPipeline()
138 {
139 //Go through all message queues
140 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues);
141 return Async::dowhile(
142 [it]() { return it->hasNext(); },
143 [it, this](Async::Future<void> &future) {
144 auto queue = it->next();
145 processQueue(queue).then<void>([&future]() {
146 Trace() << "Queue processed";
147 future.setFinished();
148 }).exec();
149 }
150 );
151 }
152
153private:
154 Akonadi2::Pipeline *mPipeline;
155 //Ordered by priority
156 QList<MessageQueue*> mCommandQueues;
157 bool mProcessingLock;
158};
159
160
161GenericResource::GenericResource(const QByteArray &resourceIdentifier)
162 : Akonadi2::Resource(),
163 mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde." + resourceIdentifier + ".userqueue"),
164 mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde." + resourceIdentifier + ".synchronizerqueue"),
165 mError(0)
166{
167}
168
169GenericResource::~GenericResource()
170{
171
172}
173
174void GenericResource::configurePipeline(Akonadi2::Pipeline *pipeline)
175{
176 //TODO figure out lifetime of the processor
177 mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
178 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
179}
180
181void GenericResource::onProcessorError(int errorCode, const QString &errorMessage)
182{
183 Warning() << "Received error from Processor: " << errorCode << errorMessage;
184 mError = errorCode;
185}
186
187int GenericResource::error() const
188{
189 return mError;
190}
191
192void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data)
193{
194 //TODO get rid of m_fbb member variable
195 m_fbb.Clear();
196 auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size());
197 auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData);
198 Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer);
199 mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize());
200}
201
202void GenericResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline)
203{
204 //TODO instead of copying the command including the full entity first into the command queue, we could directly
205 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay).
206 //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire).
207 enqueueCommand(mUserQueue, commandId, data);
208}
209
210Async::Job<void> GenericResource::processAllMessages()
211{
212 //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed.
213 //TODO: report errors while processing sync?
214 //TODO JOBAPI: A helper that waits for n events and then continues?
215 return Async::start<void>([this](Async::Future<void> &f) {
216 if (mSynchronizerQueue.isEmpty()) {
217 f.setFinished();
218 } else {
219 QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() {
220 f.setFinished();
221 });
222 }
223 }).then<void>([this](Async::Future<void> &f) {
224 if (mUserQueue.isEmpty()) {
225 f.setFinished();
226 } else {
227 QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() {
228 f.setFinished();
229 });
230 }
231 });
232}
233
234#include "genericresource.moc"