summaryrefslogtreecommitdiffstats
path: root/dummyresource
diff options
context:
space:
mode:
Diffstat (limited to 'dummyresource')
-rw-r--r--dummyresource/domainadaptor.cpp5
-rw-r--r--dummyresource/facade.cpp24
-rw-r--r--dummyresource/resourcefactory.cpp123
-rw-r--r--dummyresource/resourcefactory.h6
4 files changed, 142 insertions, 16 deletions
diff --git a/dummyresource/domainadaptor.cpp b/dummyresource/domainadaptor.cpp
index ae001cf..9bd3770 100644
--- a/dummyresource/domainadaptor.cpp
+++ b/dummyresource/domainadaptor.cpp
@@ -29,8 +29,8 @@ public:
29 29
30 void setProperty(const QString &key, const QVariant &value) 30 void setProperty(const QString &key, const QVariant &value)
31 { 31 {
32 if (mResourceMapper->mWriteAccessors.contains(key)) { 32 if (mResourceMapper && mResourceMapper->mWriteAccessors.contains(key)) {
33 // mResourceMapper.setProperty(key, value, mResourceBuffer); 33 // mResourceMapper->setProperty(key, value, mResourceBuffer);
34 } else { 34 } else {
35 // mLocalMapper.; 35 // mLocalMapper.;
36 } 36 }
@@ -69,6 +69,7 @@ DummyEventAdaptorFactory::DummyEventAdaptorFactory()
69 mResourceMapper->mReadAccessors.insert("summary", [](DummyEvent const *buffer) -> QVariant { 69 mResourceMapper->mReadAccessors.insert("summary", [](DummyEvent const *buffer) -> QVariant {
70 return QString::fromStdString(buffer->summary()->c_str()); 70 return QString::fromStdString(buffer->summary()->c_str());
71 }); 71 });
72 mLocalMapper = QSharedPointer<PropertyMapper<Akonadi2::Domain::Buffer::Event> >::create();
72 //TODO set accessors for all properties 73 //TODO set accessors for all properties
73 74
74} 75}
diff --git a/dummyresource/facade.cpp b/dummyresource/facade.cpp
index f754c7e..668fbbf 100644
--- a/dummyresource/facade.cpp
+++ b/dummyresource/facade.cpp
@@ -28,6 +28,7 @@
28#include "event_generated.h" 28#include "event_generated.h"
29#include "entity_generated.h" 29#include "entity_generated.h"
30#include "metadata_generated.h" 30#include "metadata_generated.h"
31#include "createentity_generated.h"
31#include "domainadaptor.h" 32#include "domainadaptor.h"
32#include <common/entitybuffer.h> 33#include <common/entitybuffer.h>
33 34
@@ -48,6 +49,29 @@ DummyResourceFacade::~DummyResourceFacade()
48void DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject) 49void DummyResourceFacade::create(const Akonadi2::Domain::Event &domainObject)
49{ 50{
50 //Create message buffer and send to resource 51 //Create message buffer and send to resource
52 flatbuffers::FlatBufferBuilder eventFbb;
53 eventFbb.Clear();
54 {
55 auto summary = eventFbb.CreateString("summary");
56 // auto data = fbb.CreateUninitializedVector<uint8_t>(attachmentSize);
57 DummyCalendar::DummyEventBuilder eventBuilder(eventFbb);
58 eventBuilder.add_summary(summary);
59 auto eventLocation = eventBuilder.Finish();
60 DummyCalendar::FinishDummyEventBuffer(eventFbb, eventLocation);
61 // memcpy((void*)DummyCalendar::GetDummyEvent(fbb.GetBufferPointer())->attachment()->Data(), rawData, attachmentSize);
62 }
63 flatbuffers::FlatBufferBuilder entityFbb;
64 Akonadi2::EntityBuffer::assembleEntityBuffer(entityFbb, 0, 0, eventFbb.GetBufferPointer(), eventFbb.GetSize(), 0, 0);
65
66 flatbuffers::FlatBufferBuilder fbb;
67 auto type = fbb.CreateString(Akonadi2::Domain::getTypeName<Akonadi2::Domain::Event>().toStdString().data());
68 auto delta = fbb.CreateVector<uint8_t>(entityFbb.GetBufferPointer(), entityFbb.GetSize());
69 Akonadi2::Commands::CreateEntityBuilder builder(fbb);
70 builder.add_domainType(type);
71 builder.add_delta(delta);
72 auto location = builder.Finish();
73 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location);
74 mResourceAccess->sendCommand(Akonadi2::Commands::CreateEntityCommand, fbb);
51} 75}
52 76
53void DummyResourceFacade::modify(const Akonadi2::Domain::Event &domainObject) 77void DummyResourceFacade::modify(const Akonadi2::Domain::Event &domainObject)
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index e4f7e41..e14aa01 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -23,18 +23,20 @@
23#include "pipeline.h" 23#include "pipeline.h"
24#include "dummycalendar_generated.h" 24#include "dummycalendar_generated.h"
25#include "metadata_generated.h" 25#include "metadata_generated.h"
26#include "queuedcommand_generated.h"
26#include "domainadaptor.h" 27#include "domainadaptor.h"
28#include "commands.h"
29#include "clientapi.h"
27#include <QUuid> 30#include <QUuid>
28 31
29/* 32/*
30 * Figure out how to implement various classes of processors: 33 * Figure out how to implement various classes of processors:
31 * * read-only (index and such) => domain adapter 34 * * read-only (index and such) => extractor function, probably using domain adaptor
32 * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?) 35 * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?)
33 * * flag extractors? => like read-only? Or write to local portion of buffer? 36 * * flag extractors? => like read-only? Or write to local portion of buffer?
34 * ** $ISSPAM should become part of domain object and is written to the local part of the mail. 37 * ** $ISSPAM should become part of domain object and is written to the local part of the mail.
35 * ** => value could be calculated by the server directly 38 * ** => value could be calculated by the server directly
36 */ 39 */
37// template <typename DomainType>
38class SimpleProcessor : public Akonadi2::Preprocessor 40class SimpleProcessor : public Akonadi2::Preprocessor
39{ 41{
40public: 42public:
@@ -96,21 +98,108 @@ QMap<QString, QString> populate()
96 98
97static QMap<QString, QString> s_dataSource = populate(); 99static QMap<QString, QString> s_dataSource = populate();
98 100
101//Drives the pipeline using the output from all command queues
102class Processor : public QObject
103{
104 Q_OBJECT
105public:
106 Processor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues)
107 : QObject(),
108 mPipeline(pipeline),
109 mCommandQueues(commandQueues),
110 mProcessingLock(false)
111 {
112 for (auto queue : mCommandQueues) {
113 bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process);
114 Q_ASSERT(ret);
115 }
116 }
117
118private slots:
119 void process()
120 {
121 if (mProcessingLock) {
122 return;
123 }
124 mProcessingLock = true;
125 //Empty queue after queue
126 //FIXME the for and while loops should be async, otherwise we process all messages in parallel
127 for (auto queue : mCommandQueues) {
128 qDebug() << "processing queue";
129 bool processedMessage = false;
130 while (processedMessage) {
131 qDebug() << "process";
132 processedMessage = false;
133 queue->dequeue([this, &processedMessage](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
134 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
135 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
136 qWarning() << "invalid buffer";
137 processedMessage = false;
138 messageQueueCallback(false);
139 return;
140 }
141 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr);
142 qDebug() << "Dequeued: " << queuedCommand->commandId();
143 //Throw command into appropriate pipeline
144 switch (queuedCommand->commandId()) {
145 case Akonadi2::Commands::DeleteEntityCommand:
146 //mPipeline->removedEntity
147 break;
148 case Akonadi2::Commands::ModifyEntityCommand:
149 //mPipeline->modifiedEntity
150 break;
151 case Akonadi2::Commands::CreateEntityCommand: {
152 //TODO job lifetime management
153 auto job = mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([&messageQueueCallback](Async::Future<void> future) {
154 messageQueueCallback(true);
155 });
156 job.exec();
157 }
158 break;
159 default:
160 //Unhandled command
161 qWarning() << "Unhandled command";
162 messageQueueCallback(true);
163 break;
164 }
165 processedMessage = true;
166 },
167 [&processedMessage](const MessageQueue::Error &error) {
168 processedMessage = false;
169 });
170 }
171 }
172 mProcessingLock = false;
173 }
174
175private:
176 Akonadi2::Pipeline *mPipeline;
177 //Ordered by priority
178 QList<MessageQueue*> mCommandQueues;
179 bool mProcessingLock;
180};
181
99DummyResource::DummyResource() 182DummyResource::DummyResource()
100 : Akonadi2::Resource() 183 : Akonadi2::Resource(),
184 mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"),
185 mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue")
101{ 186{
102} 187}
103 188
104void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) 189void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline)
105{ 190{
106 auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create(); 191 auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create();
192 //FIXME we should setup for each resource entity type, not for each domain type
193 //i.e. If a resource stores tags as part of each message it needs to update the tag index
107 //TODO setup preprocessors for each domain type and pipeline type allowing full customization 194 //TODO setup preprocessors for each domain type and pipeline type allowing full customization
108 //Eventually the order should be self configuring, for now it's hardcoded. 195 //Eventually the order should be self configuring, for now it's hardcoded.
109 auto eventIndexer = new SimpleProcessor([eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { 196 auto eventIndexer = new SimpleProcessor([eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) {
110 auto adaptor = eventFactory->createAdaptor(entity); 197 auto adaptor = eventFactory->createAdaptor(entity);
111 qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); 198 qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString();
112 }); 199 });
113 pipeline->setPreprocessors<Akonadi2::Domain::Event>(Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); 200 //event is the entitytype and not the domain type
201 pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer);
202 mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
114} 203}
115 204
116void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) 205void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback)
@@ -139,6 +228,7 @@ void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &ri
139 228
140Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) 229Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline)
141{ 230{
231 qDebug() << "synchronizeWithSource";
142 return Async::start<void>([this, pipeline](Async::Future<void> &f) { 232 return Async::start<void>([this, pipeline](Async::Future<void> &f) {
143 //TODO use a read-only transaction during the complete sync to sync against a defined revision 233 //TODO use a read-only transaction during the complete sync to sync against a defined revision
144 auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); 234 auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy");
@@ -171,7 +261,9 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli
171 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); 261 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer);
172 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. 262 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
173 const auto key = QUuid::createUuid().toString().toUtf8(); 263 const auto key = QUuid::createUuid().toString().toUtf8();
174 pipeline->newEntity<Akonadi2::Domain::Event>(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); 264 //TODO Create queuedcommand and push into synchronizerQueue
265 //* create message in store directly, add command to messagequeue waiting for processing.
266 // pipeline->newEntity<Akonadi2::Domain::Event>(key, m_fbb.GetBufferPointer(), m_fbb.GetSize());
175 } else { //modification 267 } else { //modification
176 //TODO diff and create modification if necessary 268 //TODO diff and create modification if necessary
177 } 269 }
@@ -183,16 +275,18 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli
183 275
184void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) 276void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline)
185{ 277{
186 Q_UNUSED(commandId) 278 qDebug() << "processCommand";
187 Q_UNUSED(data) 279 //TODO instead of copying the command including the full entity first into the command queue, we could directly
188 Q_UNUSED(size) 280 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay).
189 //TODO reallly process the commands :) 281 //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire).
190 auto builder = DummyCalendar::DummyEventBuilder(m_fbb);
191 builder .add_summary(m_fbb.CreateString("summary summary!"));
192 auto buffer = builder.Finish();
193 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer);
194 pipeline->newEntity<Akonadi2::Domain::Event>("fakekey", m_fbb.GetBufferPointer(), m_fbb.GetSize());
195 m_fbb.Clear(); 282 m_fbb.Clear();
283 auto commandData = m_fbb.CreateVector(reinterpret_cast<uint8_t const *>(data.data()), data.size());
284 auto builder = Akonadi2::QueuedCommandBuilder(m_fbb);
285 builder.add_commandId(commandId);
286 builder.add_command(commandData);
287 auto buffer = builder.Finish();
288 Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer);
289 mUserQueue.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize());
196} 290}
197 291
198DummyResourceFactory::DummyResourceFactory(QObject *parent) 292DummyResourceFactory::DummyResourceFactory(QObject *parent)
@@ -211,3 +305,4 @@ void DummyResourceFactory::registerFacades(Akonadi2::FacadeFactory &factory)
211 factory.registerFacade<Akonadi2::Domain::Event, DummyResourceFacade>(PLUGIN_NAME); 305 factory.registerFacade<Akonadi2::Domain::Event, DummyResourceFacade>(PLUGIN_NAME);
212} 306}
213 307
308#include "resourcefactory.moc"
diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h
index 427fcc6..682f25c 100644
--- a/dummyresource/resourcefactory.h
+++ b/dummyresource/resourcefactory.h
@@ -21,12 +21,15 @@
21 21
22#include "common/resource.h" 22#include "common/resource.h"
23#include "async/src/async.h" 23#include "async/src/async.h"
24#include "common/messagequeue.h"
24 25
25#include <flatbuffers/flatbuffers.h> 26#include <flatbuffers/flatbuffers.h>
26 27
27//TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA 28//TODO: a little ugly to have this in two places, once here and once in Q_PLUGIN_METADATA
28#define PLUGIN_NAME "org.kde.dummy" 29#define PLUGIN_NAME "org.kde.dummy"
29 30
31class Processor;
32
30class DummyResource : public Akonadi2::Resource 33class DummyResource : public Akonadi2::Resource
31{ 34{
32public: 35public:
@@ -37,6 +40,9 @@ public:
37 40
38private: 41private:
39 flatbuffers::FlatBufferBuilder m_fbb; 42 flatbuffers::FlatBufferBuilder m_fbb;
43 MessageQueue mUserQueue;
44 MessageQueue mSynchronizerQueue;
45 Processor *mProcessor;
40}; 46};
41 47
42class DummyResourceFactory : public Akonadi2::ResourceFactory 48class DummyResourceFactory : public Akonadi2::ResourceFactory