summaryrefslogtreecommitdiffstats
path: root/examples/dummyresource/resourcefactory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'examples/dummyresource/resourcefactory.cpp')
-rw-r--r--examples/dummyresource/resourcefactory.cpp428
1 files changed, 428 insertions, 0 deletions
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp
new file mode 100644
index 0000000..d5765e2
--- /dev/null
+++ b/examples/dummyresource/resourcefactory.cpp
@@ -0,0 +1,428 @@
1/*
2 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20#include "resourcefactory.h"
21#include "facade.h"
22#include "entitybuffer.h"
23#include "pipeline.h"
24#include "dummycalendar_generated.h"
25#include "metadata_generated.h"
26#include "queuedcommand_generated.h"
27#include "createentity_generated.h"
28#include "domainadaptor.h"
29#include "commands.h"
30#include "clientapi.h"
31#include "index.h"
32#include "log.h"
33#include <QUuid>
34#include <assert.h>
35
36
37/*
38 * Figure out how to implement various classes of processors:
39 * * read-only (index and such) => extractor function, probably using domain adaptor
40 * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?)
41 * * flag extractors? => like read-only? Or write to local portion of buffer?
42 * ** $ISSPAM should become part of domain object and is written to the local part of the mail.
43 * ** => value could be calculated by the server directly
44 */
45class SimpleProcessor : public Akonadi2::Preprocessor
46{
47public:
48 SimpleProcessor(const QString &id, const std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e)> &f)
49 : Akonadi2::Preprocessor(),
50 mFunction(f),
51 mId(id)
52 {
53 }
54
55 void process(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e) Q_DECL_OVERRIDE
56 {
57 mFunction(state, e);
58 processingCompleted(state);
59 }
60
61 QString id() const
62 {
63 return mId;
64 }
65
66protected:
67 std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e)> mFunction;
68 QString mId;
69};
70
71
72
73static std::string createEvent()
74{
75 static const size_t attachmentSize = 1024*2; // 2KB
76 static uint8_t rawData[attachmentSize];
77 static flatbuffers::FlatBufferBuilder fbb;
78 fbb.Clear();
79 {
80 uint8_t *rawDataPtr = Q_NULLPTR;
81 auto summary = fbb.CreateString("summary");
82 auto data = fbb.CreateUninitializedVector<uint8_t>(attachmentSize, &rawDataPtr);
83 DummyCalendar::DummyEventBuilder eventBuilder(fbb);
84 eventBuilder.add_summary(summary);
85 eventBuilder.add_attachment(data);
86 auto eventLocation = eventBuilder.Finish();
87 DummyCalendar::FinishDummyEventBuffer(fbb, eventLocation);
88 memcpy((void*)rawDataPtr, rawData, attachmentSize);
89 }
90
91 return std::string(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
92}
93
94QMap<QString, QString> populate()
95{
96 QMap<QString, QString> content;
97 for (int i = 0; i < 2; i++) {
98 auto event = createEvent();
99 content.insert(QString("key%1").arg(i), QString::fromStdString(event));
100 }
101 return content;
102}
103
104static QMap<QString, QString> s_dataSource = populate();
105
106//Drives the pipeline using the output from all command queues
107class Processor : public QObject
108{
109 Q_OBJECT
110public:
111 Processor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues)
112 : QObject(),
113 mPipeline(pipeline),
114 mCommandQueues(commandQueues),
115 mProcessingLock(false)
116 {
117 for (auto queue : mCommandQueues) {
118 const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process);
119 Q_UNUSED(ret);
120 }
121 }
122
123signals:
124 void error(int errorCode, const QString &errorMessage);
125
126private:
127 bool messagesToProcessAvailable()
128 {
129 for (auto queue : mCommandQueues) {
130 if (!queue->isEmpty()) {
131 return true;
132 }
133 }
134 return false;
135 }
136
137private slots:
138 void process()
139 {
140 if (mProcessingLock) {
141 return;
142 }
143 mProcessingLock = true;
144 auto job = processPipeline().then<void>([this]() {
145 mProcessingLock = false;
146 if (messagesToProcessAvailable()) {
147 process();
148 }
149 }).exec();
150 }
151
152 Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand)
153 {
154 Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId());
155 //Throw command into appropriate pipeline
156 switch (queuedCommand->commandId()) {
157 case Akonadi2::Commands::DeleteEntityCommand:
158 //mPipeline->removedEntity
159 return Async::null<void>();
160 case Akonadi2::Commands::ModifyEntityCommand:
161 //mPipeline->modifiedEntity
162 return Async::null<void>();
163 case Akonadi2::Commands::CreateEntityCommand:
164 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
165 default:
166 return Async::error<void>(-1, "Unhandled command");
167 }
168 return Async::null<void>();
169 }
170
171 //Process all messages of this queue
172 Async::Job<void> processQueue(MessageQueue *queue)
173 {
174 //TODO use something like:
175 //Async::foreach("pass iterator here").each("process value here").join();
176 //Async::foreach("pass iterator here").parallel("process value here").join();
177 return Async::dowhile(
178 [this, queue](Async::Future<bool> &future) {
179 if (queue->isEmpty()) {
180 future.setValue(false);
181 future.setFinished();
182 return;
183 }
184 queue->dequeue(
185 [this, &future](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
186 auto callback = [messageQueueCallback, &future](bool success) {
187 messageQueueCallback(success);
188 future.setValue(!success);
189 future.setFinished();
190 };
191
192 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
193 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
194 Warning() << "invalid buffer";
195 callback(false);
196 return;
197 }
198 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr);
199 Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId());
200 //TODO JOBAPI: job lifetime management
201 //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete
202 //themselves once done. In other cases we'd like jobs that only live as long as their handle though.
203 //FIXME this job is stack allocated and thus simply dies....
204 processQueuedCommand(queuedCommand).then<void>(
205 [callback]() {
206 callback(true);
207 },
208 [callback](int errorCode, QString errorMessage) {
209 Warning() << "Error while processing queue command: " << errorMessage;
210 callback(false);
211 }
212 ).exec();
213 },
214 [&future](const MessageQueue::Error &error) {
215 Warning() << "Error while getting message from messagequeue: " << error.message;
216 future.setValue(false);
217 future.setFinished();
218 }
219 );
220 }
221 );
222 }
223
224 Async::Job<void> processPipeline()
225 {
226 //Go through all message queues
227 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues);
228 return Async::dowhile(
229 [it]() { return it->hasNext(); },
230 [it, this](Async::Future<void> &future) {
231 auto queue = it->next();
232 processQueue(queue).then<void>([&future]() {
233 Trace() << "Queue processed";
234 future.setFinished();
235 }).exec();
236 }
237 );
238 }
239
240private:
241 Akonadi2::Pipeline *mPipeline;
242 //Ordered by priority
243 QList<MessageQueue*> mCommandQueues;
244 bool mProcessingLock;
245};
246
247DummyResource::DummyResource()
248 : Akonadi2::Resource(),
249 mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"),
250 mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"),
251 mError(0)
252{
253}
254
255void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline)
256{
257 auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create();
258 //FIXME we should setup for each resource entity type, not for each domain type
259 //i.e. If a resource stores tags as part of each message it needs to update the tag index
260 //TODO setup preprocessors for each resource entity type and pipeline type allowing full customization
261 //Eventually the order should be self configuring, for now it's hardcoded.
262 auto eventIndexer = new SimpleProcessor("summaryprocessor", [eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) {
263 auto adaptor = eventFactory->createAdaptor(entity);
264 // Log() << "Summary preprocessor: " << adaptor->getProperty("summary").toString();
265 });
266
267 auto uidIndexer = new SimpleProcessor("uidIndexer", [eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) {
268 static Index uidIndex(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.index.uid", Akonadi2::Storage::ReadWrite);
269
270 //TODO: Benchmark if this is performance wise acceptable, or if we have to access the buffer directly
271 auto adaptor = eventFactory->createAdaptor(entity);
272 const auto uid = adaptor->getProperty("uid");
273 if (uid.isValid()) {
274 uidIndex.add(uid.toByteArray(), state.key());
275 }
276 });
277
278 //event is the entitytype and not the domain type
279 pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer << uidIndexer);
280 mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
281 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
282}
283
284void DummyResource::onProcessorError(int errorCode, const QString &errorMessage)
285{
286 Warning() << "Received error from Processor: " << errorCode << errorMessage;
287 mError = errorCode;
288}
289
290int DummyResource::error() const
291{
292 return mError;
293}
294
295void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback)
296{
297 //TODO lookup in rid index instead of doing a full scan
298 const std::string ridString = rid.toStdString();
299 storage->scan("", [&](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool {
300 if (Akonadi2::Storage::isInternalKey(keyValue, keySize)) {
301 return true;
302 }
303
304 Akonadi2::EntityBuffer::extractResourceBuffer(dataValue, dataSize, [&](const uint8_t *buffer, size_t size) {
305 flatbuffers::Verifier verifier(buffer, size);
306 if (DummyCalendar::VerifyDummyEventBuffer(verifier)) {
307 DummyCalendar::DummyEvent const *resourceBuffer = DummyCalendar::GetDummyEvent(buffer);
308 if (resourceBuffer && resourceBuffer->remoteId()) {
309 if (std::string(resourceBuffer->remoteId()->c_str(), resourceBuffer->remoteId()->size()) == ridString) {
310 callback(keyValue, keySize, dataValue, dataSize);
311 }
312 }
313 }
314 });
315 return true;
316 });
317}
318
319void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data)
320{
321 m_fbb.Clear();
322 auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size());
323 auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData);
324 Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer);
325 mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize());
326}
327
328Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline)
329{
330 return Async::start<void>([this, pipeline](Async::Future<void> &f) {
331 //TODO use a read-only transaction during the complete sync to sync against a defined revision
332 auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy");
333 for (auto it = s_dataSource.constBegin(); it != s_dataSource.constEnd(); it++) {
334 bool isNew = true;
335 if (storage->exists()) {
336 findByRemoteId(storage, it.key(), [&](void *keyValue, int keySize, void *dataValue, int dataSize) {
337 isNew = false;
338 });
339 }
340 if (isNew) {
341 m_fbb.Clear();
342
343 const QByteArray data = it.value().toUtf8();
344 auto eventBuffer = DummyCalendar::GetDummyEvent(data.data());
345
346 //Map the source format to the buffer format (which happens to be an exact copy here)
347 auto summary = m_fbb.CreateString(eventBuffer->summary()->c_str());
348 auto rid = m_fbb.CreateString(it.key().toStdString().c_str());
349 auto description = m_fbb.CreateString(it.key().toStdString().c_str());
350 static uint8_t rawData[100];
351 auto attachment = Akonadi2::EntityBuffer::appendAsVector(m_fbb, rawData, 100);
352
353 auto builder = DummyCalendar::DummyEventBuilder(m_fbb);
354 builder.add_summary(summary);
355 builder.add_remoteId(rid);
356 builder.add_description(description);
357 builder.add_attachment(attachment);
358 auto buffer = builder.Finish();
359 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer);
360 flatbuffers::FlatBufferBuilder entityFbb;
361 Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, m_fbb.GetBufferPointer(), m_fbb.GetSize(), 0, 0);
362
363 flatbuffers::FlatBufferBuilder fbb;
364 //This is the resource type and not the domain type
365 auto type = fbb.CreateString("event");
366 auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
367 auto location = Akonadi2::Commands::CreateCreateEntity(fbb, type, delta);
368 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location);
369
370 enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
371 } else { //modification
372 //TODO diff and create modification if necessary
373 }
374 }
375 //TODO find items to remove
376 f.setFinished();
377 });
378}
379
380Async::Job<void> DummyResource::processAllMessages()
381{
382 //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed.
383 //TODO: report errors while processing sync?
384 //TODO JOBAPI: A helper that waits for n events and then continues?
385 return Async::start<void>([this](Async::Future<void> &f) {
386 if (mSynchronizerQueue.isEmpty()) {
387 f.setFinished();
388 } else {
389 QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() {
390 f.setFinished();
391 });
392 }
393 }).then<void>([this](Async::Future<void> &f) {
394 if (mUserQueue.isEmpty()) {
395 f.setFinished();
396 } else {
397 QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() {
398 f.setFinished();
399 });
400 }
401 });
402}
403
404void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline)
405{
406 //TODO instead of copying the command including the full entity first into the command queue, we could directly
407 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay).
408 //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire).
409 enqueueCommand(mUserQueue, commandId, data);
410}
411
412DummyResourceFactory::DummyResourceFactory(QObject *parent)
413 : Akonadi2::ResourceFactory(parent)
414{
415
416}
417
418Akonadi2::Resource *DummyResourceFactory::createResource()
419{
420 return new DummyResource();
421}
422
423void DummyResourceFactory::registerFacades(Akonadi2::FacadeFactory &factory)
424{
425 factory.registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>(PLUGIN_NAME);
426}
427
428#include "resourcefactory.moc"