diff options
Diffstat (limited to 'examples/dummyresource/resourcefactory.cpp')
-rw-r--r-- | examples/dummyresource/resourcefactory.cpp | 428 |
1 files changed, 428 insertions, 0 deletions
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp new file mode 100644 index 0000000..d5765e2 --- /dev/null +++ b/examples/dummyresource/resourcefactory.cpp | |||
@@ -0,0 +1,428 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | |||
20 | #include "resourcefactory.h" | ||
21 | #include "facade.h" | ||
22 | #include "entitybuffer.h" | ||
23 | #include "pipeline.h" | ||
24 | #include "dummycalendar_generated.h" | ||
25 | #include "metadata_generated.h" | ||
26 | #include "queuedcommand_generated.h" | ||
27 | #include "createentity_generated.h" | ||
28 | #include "domainadaptor.h" | ||
29 | #include "commands.h" | ||
30 | #include "clientapi.h" | ||
31 | #include "index.h" | ||
32 | #include "log.h" | ||
33 | #include <QUuid> | ||
34 | #include <assert.h> | ||
35 | |||
36 | |||
37 | /* | ||
38 | * Figure out how to implement various classes of processors: | ||
39 | * * read-only (index and such) => extractor function, probably using domain adaptor | ||
40 | * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?) | ||
41 | * * flag extractors? => like read-only? Or write to local portion of buffer? | ||
42 | * ** $ISSPAM should become part of domain object and is written to the local part of the mail. | ||
43 | * ** => value could be calculated by the server directly | ||
44 | */ | ||
45 | class SimpleProcessor : public Akonadi2::Preprocessor | ||
46 | { | ||
47 | public: | ||
48 | SimpleProcessor(const QString &id, const std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e)> &f) | ||
49 | : Akonadi2::Preprocessor(), | ||
50 | mFunction(f), | ||
51 | mId(id) | ||
52 | { | ||
53 | } | ||
54 | |||
55 | void process(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e) Q_DECL_OVERRIDE | ||
56 | { | ||
57 | mFunction(state, e); | ||
58 | processingCompleted(state); | ||
59 | } | ||
60 | |||
61 | QString id() const | ||
62 | { | ||
63 | return mId; | ||
64 | } | ||
65 | |||
66 | protected: | ||
67 | std::function<void(const Akonadi2::PipelineState &state, const Akonadi2::Entity &e)> mFunction; | ||
68 | QString mId; | ||
69 | }; | ||
70 | |||
71 | |||
72 | |||
73 | static std::string createEvent() | ||
74 | { | ||
75 | static const size_t attachmentSize = 1024*2; // 2KB | ||
76 | static uint8_t rawData[attachmentSize]; | ||
77 | static flatbuffers::FlatBufferBuilder fbb; | ||
78 | fbb.Clear(); | ||
79 | { | ||
80 | uint8_t *rawDataPtr = Q_NULLPTR; | ||
81 | auto summary = fbb.CreateString("summary"); | ||
82 | auto data = fbb.CreateUninitializedVector<uint8_t>(attachmentSize, &rawDataPtr); | ||
83 | DummyCalendar::DummyEventBuilder eventBuilder(fbb); | ||
84 | eventBuilder.add_summary(summary); | ||
85 | eventBuilder.add_attachment(data); | ||
86 | auto eventLocation = eventBuilder.Finish(); | ||
87 | DummyCalendar::FinishDummyEventBuffer(fbb, eventLocation); | ||
88 | memcpy((void*)rawDataPtr, rawData, attachmentSize); | ||
89 | } | ||
90 | |||
91 | return std::string(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); | ||
92 | } | ||
93 | |||
94 | QMap<QString, QString> populate() | ||
95 | { | ||
96 | QMap<QString, QString> content; | ||
97 | for (int i = 0; i < 2; i++) { | ||
98 | auto event = createEvent(); | ||
99 | content.insert(QString("key%1").arg(i), QString::fromStdString(event)); | ||
100 | } | ||
101 | return content; | ||
102 | } | ||
103 | |||
104 | static QMap<QString, QString> s_dataSource = populate(); | ||
105 | |||
106 | //Drives the pipeline using the output from all command queues | ||
107 | class Processor : public QObject | ||
108 | { | ||
109 | Q_OBJECT | ||
110 | public: | ||
111 | Processor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues) | ||
112 | : QObject(), | ||
113 | mPipeline(pipeline), | ||
114 | mCommandQueues(commandQueues), | ||
115 | mProcessingLock(false) | ||
116 | { | ||
117 | for (auto queue : mCommandQueues) { | ||
118 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); | ||
119 | Q_UNUSED(ret); | ||
120 | } | ||
121 | } | ||
122 | |||
123 | signals: | ||
124 | void error(int errorCode, const QString &errorMessage); | ||
125 | |||
126 | private: | ||
127 | bool messagesToProcessAvailable() | ||
128 | { | ||
129 | for (auto queue : mCommandQueues) { | ||
130 | if (!queue->isEmpty()) { | ||
131 | return true; | ||
132 | } | ||
133 | } | ||
134 | return false; | ||
135 | } | ||
136 | |||
137 | private slots: | ||
138 | void process() | ||
139 | { | ||
140 | if (mProcessingLock) { | ||
141 | return; | ||
142 | } | ||
143 | mProcessingLock = true; | ||
144 | auto job = processPipeline().then<void>([this]() { | ||
145 | mProcessingLock = false; | ||
146 | if (messagesToProcessAvailable()) { | ||
147 | process(); | ||
148 | } | ||
149 | }).exec(); | ||
150 | } | ||
151 | |||
152 | Async::Job<void> processQueuedCommand(const Akonadi2::QueuedCommand *queuedCommand) | ||
153 | { | ||
154 | Log() << "Processing command: " << Akonadi2::Commands::name(queuedCommand->commandId()); | ||
155 | //Throw command into appropriate pipeline | ||
156 | switch (queuedCommand->commandId()) { | ||
157 | case Akonadi2::Commands::DeleteEntityCommand: | ||
158 | //mPipeline->removedEntity | ||
159 | return Async::null<void>(); | ||
160 | case Akonadi2::Commands::ModifyEntityCommand: | ||
161 | //mPipeline->modifiedEntity | ||
162 | return Async::null<void>(); | ||
163 | case Akonadi2::Commands::CreateEntityCommand: | ||
164 | return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); | ||
165 | default: | ||
166 | return Async::error<void>(-1, "Unhandled command"); | ||
167 | } | ||
168 | return Async::null<void>(); | ||
169 | } | ||
170 | |||
171 | //Process all messages of this queue | ||
172 | Async::Job<void> processQueue(MessageQueue *queue) | ||
173 | { | ||
174 | //TODO use something like: | ||
175 | //Async::foreach("pass iterator here").each("process value here").join(); | ||
176 | //Async::foreach("pass iterator here").parallel("process value here").join(); | ||
177 | return Async::dowhile( | ||
178 | [this, queue](Async::Future<bool> &future) { | ||
179 | if (queue->isEmpty()) { | ||
180 | future.setValue(false); | ||
181 | future.setFinished(); | ||
182 | return; | ||
183 | } | ||
184 | queue->dequeue( | ||
185 | [this, &future](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { | ||
186 | auto callback = [messageQueueCallback, &future](bool success) { | ||
187 | messageQueueCallback(success); | ||
188 | future.setValue(!success); | ||
189 | future.setFinished(); | ||
190 | }; | ||
191 | |||
192 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); | ||
193 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | ||
194 | Warning() << "invalid buffer"; | ||
195 | callback(false); | ||
196 | return; | ||
197 | } | ||
198 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); | ||
199 | Trace() << "Dequeued Command: " << Akonadi2::Commands::name(queuedCommand->commandId()); | ||
200 | //TODO JOBAPI: job lifetime management | ||
201 | //Right now we're just leaking jobs. In this case we'd like jobs that are heap allocated and delete | ||
202 | //themselves once done. In other cases we'd like jobs that only live as long as their handle though. | ||
203 | //FIXME this job is stack allocated and thus simply dies.... | ||
204 | processQueuedCommand(queuedCommand).then<void>( | ||
205 | [callback]() { | ||
206 | callback(true); | ||
207 | }, | ||
208 | [callback](int errorCode, QString errorMessage) { | ||
209 | Warning() << "Error while processing queue command: " << errorMessage; | ||
210 | callback(false); | ||
211 | } | ||
212 | ).exec(); | ||
213 | }, | ||
214 | [&future](const MessageQueue::Error &error) { | ||
215 | Warning() << "Error while getting message from messagequeue: " << error.message; | ||
216 | future.setValue(false); | ||
217 | future.setFinished(); | ||
218 | } | ||
219 | ); | ||
220 | } | ||
221 | ); | ||
222 | } | ||
223 | |||
224 | Async::Job<void> processPipeline() | ||
225 | { | ||
226 | //Go through all message queues | ||
227 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); | ||
228 | return Async::dowhile( | ||
229 | [it]() { return it->hasNext(); }, | ||
230 | [it, this](Async::Future<void> &future) { | ||
231 | auto queue = it->next(); | ||
232 | processQueue(queue).then<void>([&future]() { | ||
233 | Trace() << "Queue processed"; | ||
234 | future.setFinished(); | ||
235 | }).exec(); | ||
236 | } | ||
237 | ); | ||
238 | } | ||
239 | |||
240 | private: | ||
241 | Akonadi2::Pipeline *mPipeline; | ||
242 | //Ordered by priority | ||
243 | QList<MessageQueue*> mCommandQueues; | ||
244 | bool mProcessingLock; | ||
245 | }; | ||
246 | |||
247 | DummyResource::DummyResource() | ||
248 | : Akonadi2::Resource(), | ||
249 | mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), | ||
250 | mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue"), | ||
251 | mError(0) | ||
252 | { | ||
253 | } | ||
254 | |||
255 | void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) | ||
256 | { | ||
257 | auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create(); | ||
258 | //FIXME we should setup for each resource entity type, not for each domain type | ||
259 | //i.e. If a resource stores tags as part of each message it needs to update the tag index | ||
260 | //TODO setup preprocessors for each resource entity type and pipeline type allowing full customization | ||
261 | //Eventually the order should be self configuring, for now it's hardcoded. | ||
262 | auto eventIndexer = new SimpleProcessor("summaryprocessor", [eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { | ||
263 | auto adaptor = eventFactory->createAdaptor(entity); | ||
264 | // Log() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); | ||
265 | }); | ||
266 | |||
267 | auto uidIndexer = new SimpleProcessor("uidIndexer", [eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { | ||
268 | static Index uidIndex(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.index.uid", Akonadi2::Storage::ReadWrite); | ||
269 | |||
270 | //TODO: Benchmark if this is performance wise acceptable, or if we have to access the buffer directly | ||
271 | auto adaptor = eventFactory->createAdaptor(entity); | ||
272 | const auto uid = adaptor->getProperty("uid"); | ||
273 | if (uid.isValid()) { | ||
274 | uidIndex.add(uid.toByteArray(), state.key()); | ||
275 | } | ||
276 | }); | ||
277 | |||
278 | //event is the entitytype and not the domain type | ||
279 | pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer << uidIndexer); | ||
280 | mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | ||
281 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | ||
282 | } | ||
283 | |||
284 | void DummyResource::onProcessorError(int errorCode, const QString &errorMessage) | ||
285 | { | ||
286 | Warning() << "Received error from Processor: " << errorCode << errorMessage; | ||
287 | mError = errorCode; | ||
288 | } | ||
289 | |||
290 | int DummyResource::error() const | ||
291 | { | ||
292 | return mError; | ||
293 | } | ||
294 | |||
295 | void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) | ||
296 | { | ||
297 | //TODO lookup in rid index instead of doing a full scan | ||
298 | const std::string ridString = rid.toStdString(); | ||
299 | storage->scan("", [&](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { | ||
300 | if (Akonadi2::Storage::isInternalKey(keyValue, keySize)) { | ||
301 | return true; | ||
302 | } | ||
303 | |||
304 | Akonadi2::EntityBuffer::extractResourceBuffer(dataValue, dataSize, [&](const uint8_t *buffer, size_t size) { | ||
305 | flatbuffers::Verifier verifier(buffer, size); | ||
306 | if (DummyCalendar::VerifyDummyEventBuffer(verifier)) { | ||
307 | DummyCalendar::DummyEvent const *resourceBuffer = DummyCalendar::GetDummyEvent(buffer); | ||
308 | if (resourceBuffer && resourceBuffer->remoteId()) { | ||
309 | if (std::string(resourceBuffer->remoteId()->c_str(), resourceBuffer->remoteId()->size()) == ridString) { | ||
310 | callback(keyValue, keySize, dataValue, dataSize); | ||
311 | } | ||
312 | } | ||
313 | } | ||
314 | }); | ||
315 | return true; | ||
316 | }); | ||
317 | } | ||
318 | |||
319 | void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) | ||
320 | { | ||
321 | m_fbb.Clear(); | ||
322 | auto commandData = Akonadi2::EntityBuffer::appendAsVector(m_fbb, data.constData(), data.size()); | ||
323 | auto buffer = Akonadi2::CreateQueuedCommand(m_fbb, commandId, commandData); | ||
324 | Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); | ||
325 | mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); | ||
326 | } | ||
327 | |||
328 | Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) | ||
329 | { | ||
330 | return Async::start<void>([this, pipeline](Async::Future<void> &f) { | ||
331 | //TODO use a read-only transaction during the complete sync to sync against a defined revision | ||
332 | auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); | ||
333 | for (auto it = s_dataSource.constBegin(); it != s_dataSource.constEnd(); it++) { | ||
334 | bool isNew = true; | ||
335 | if (storage->exists()) { | ||
336 | findByRemoteId(storage, it.key(), [&](void *keyValue, int keySize, void *dataValue, int dataSize) { | ||
337 | isNew = false; | ||
338 | }); | ||
339 | } | ||
340 | if (isNew) { | ||
341 | m_fbb.Clear(); | ||
342 | |||
343 | const QByteArray data = it.value().toUtf8(); | ||
344 | auto eventBuffer = DummyCalendar::GetDummyEvent(data.data()); | ||
345 | |||
346 | //Map the source format to the buffer format (which happens to be an exact copy here) | ||
347 | auto summary = m_fbb.CreateString(eventBuffer->summary()->c_str()); | ||
348 | auto rid = m_fbb.CreateString(it.key().toStdString().c_str()); | ||
349 | auto description = m_fbb.CreateString(it.key().toStdString().c_str()); | ||
350 | static uint8_t rawData[100]; | ||
351 | auto attachment = Akonadi2::EntityBuffer::appendAsVector(m_fbb, rawData, 100); | ||
352 | |||
353 | auto builder = DummyCalendar::DummyEventBuilder(m_fbb); | ||
354 | builder.add_summary(summary); | ||
355 | builder.add_remoteId(rid); | ||
356 | builder.add_description(description); | ||
357 | builder.add_attachment(attachment); | ||
358 | auto buffer = builder.Finish(); | ||
359 | DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); | ||
360 | flatbuffers::FlatBufferBuilder entityFbb; | ||
361 | Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, m_fbb.GetBufferPointer(), m_fbb.GetSize(), 0, 0); | ||
362 | |||
363 | flatbuffers::FlatBufferBuilder fbb; | ||
364 | //This is the resource type and not the domain type | ||
365 | auto type = fbb.CreateString("event"); | ||
366 | auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); | ||
367 | auto location = Akonadi2::Commands::CreateCreateEntity(fbb, type, delta); | ||
368 | Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); | ||
369 | |||
370 | enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); | ||
371 | } else { //modification | ||
372 | //TODO diff and create modification if necessary | ||
373 | } | ||
374 | } | ||
375 | //TODO find items to remove | ||
376 | f.setFinished(); | ||
377 | }); | ||
378 | } | ||
379 | |||
380 | Async::Job<void> DummyResource::processAllMessages() | ||
381 | { | ||
382 | //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. | ||
383 | //TODO: report errors while processing sync? | ||
384 | //TODO JOBAPI: A helper that waits for n events and then continues? | ||
385 | return Async::start<void>([this](Async::Future<void> &f) { | ||
386 | if (mSynchronizerQueue.isEmpty()) { | ||
387 | f.setFinished(); | ||
388 | } else { | ||
389 | QObject::connect(&mSynchronizerQueue, &MessageQueue::drained, [&f]() { | ||
390 | f.setFinished(); | ||
391 | }); | ||
392 | } | ||
393 | }).then<void>([this](Async::Future<void> &f) { | ||
394 | if (mUserQueue.isEmpty()) { | ||
395 | f.setFinished(); | ||
396 | } else { | ||
397 | QObject::connect(&mUserQueue, &MessageQueue::drained, [&f]() { | ||
398 | f.setFinished(); | ||
399 | }); | ||
400 | } | ||
401 | }); | ||
402 | } | ||
403 | |||
404 | void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) | ||
405 | { | ||
406 | //TODO instead of copying the command including the full entity first into the command queue, we could directly | ||
407 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). | ||
408 | //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). | ||
409 | enqueueCommand(mUserQueue, commandId, data); | ||
410 | } | ||
411 | |||
412 | DummyResourceFactory::DummyResourceFactory(QObject *parent) | ||
413 | : Akonadi2::ResourceFactory(parent) | ||
414 | { | ||
415 | |||
416 | } | ||
417 | |||
418 | Akonadi2::Resource *DummyResourceFactory::createResource() | ||
419 | { | ||
420 | return new DummyResource(); | ||
421 | } | ||
422 | |||
423 | void DummyResourceFactory::registerFacades(Akonadi2::FacadeFactory &factory) | ||
424 | { | ||
425 | factory.registerFacade<Akonadi2::ApplicationDomain::Event, DummyResourceFacade>(PLUGIN_NAME); | ||
426 | } | ||
427 | |||
428 | #include "resourcefactory.moc" | ||