summaryrefslogtreecommitdiffstats
path: root/dummyresource/resourcefactory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'dummyresource/resourcefactory.cpp')
-rw-r--r--dummyresource/resourcefactory.cpp123
1 files changed, 109 insertions, 14 deletions
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp
index e4f7e41..e14aa01 100644
--- a/dummyresource/resourcefactory.cpp
+++ b/dummyresource/resourcefactory.cpp
@@ -23,18 +23,20 @@
23#include "pipeline.h" 23#include "pipeline.h"
24#include "dummycalendar_generated.h" 24#include "dummycalendar_generated.h"
25#include "metadata_generated.h" 25#include "metadata_generated.h"
26#include "queuedcommand_generated.h"
26#include "domainadaptor.h" 27#include "domainadaptor.h"
28#include "commands.h"
29#include "clientapi.h"
27#include <QUuid> 30#include <QUuid>
28 31
29/* 32/*
30 * Figure out how to implement various classes of processors: 33 * Figure out how to implement various classes of processors:
31 * * read-only (index and such) => domain adapter 34 * * read-only (index and such) => extractor function, probably using domain adaptor
32 * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?) 35 * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?)
33 * * flag extractors? => like read-only? Or write to local portion of buffer? 36 * * flag extractors? => like read-only? Or write to local portion of buffer?
34 * ** $ISSPAM should become part of domain object and is written to the local part of the mail. 37 * ** $ISSPAM should become part of domain object and is written to the local part of the mail.
35 * ** => value could be calculated by the server directly 38 * ** => value could be calculated by the server directly
36 */ 39 */
37// template <typename DomainType>
38class SimpleProcessor : public Akonadi2::Preprocessor 40class SimpleProcessor : public Akonadi2::Preprocessor
39{ 41{
40public: 42public:
@@ -96,21 +98,108 @@ QMap<QString, QString> populate()
96 98
97static QMap<QString, QString> s_dataSource = populate(); 99static QMap<QString, QString> s_dataSource = populate();
98 100
101//Drives the pipeline using the output from all command queues
102class Processor : public QObject
103{
104 Q_OBJECT
105public:
106 Processor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues)
107 : QObject(),
108 mPipeline(pipeline),
109 mCommandQueues(commandQueues),
110 mProcessingLock(false)
111 {
112 for (auto queue : mCommandQueues) {
113 bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process);
114 Q_ASSERT(ret);
115 }
116 }
117
118private slots:
119 void process()
120 {
121 if (mProcessingLock) {
122 return;
123 }
124 mProcessingLock = true;
125 //Empty queue after queue
126 //FIXME the for and while loops should be async, otherwise we process all messages in parallel
127 for (auto queue : mCommandQueues) {
128 qDebug() << "processing queue";
129 bool processedMessage = false;
130 while (processedMessage) {
131 qDebug() << "process";
132 processedMessage = false;
133 queue->dequeue([this, &processedMessage](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) {
134 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size);
135 if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) {
136 qWarning() << "invalid buffer";
137 processedMessage = false;
138 messageQueueCallback(false);
139 return;
140 }
141 auto queuedCommand = Akonadi2::GetQueuedCommand(ptr);
142 qDebug() << "Dequeued: " << queuedCommand->commandId();
143 //Throw command into appropriate pipeline
144 switch (queuedCommand->commandId()) {
145 case Akonadi2::Commands::DeleteEntityCommand:
146 //mPipeline->removedEntity
147 break;
148 case Akonadi2::Commands::ModifyEntityCommand:
149 //mPipeline->modifiedEntity
150 break;
151 case Akonadi2::Commands::CreateEntityCommand: {
152 //TODO job lifetime management
153 auto job = mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([&messageQueueCallback](Async::Future<void> future) {
154 messageQueueCallback(true);
155 });
156 job.exec();
157 }
158 break;
159 default:
160 //Unhandled command
161 qWarning() << "Unhandled command";
162 messageQueueCallback(true);
163 break;
164 }
165 processedMessage = true;
166 },
167 [&processedMessage](const MessageQueue::Error &error) {
168 processedMessage = false;
169 });
170 }
171 }
172 mProcessingLock = false;
173 }
174
175private:
176 Akonadi2::Pipeline *mPipeline;
177 //Ordered by priority
178 QList<MessageQueue*> mCommandQueues;
179 bool mProcessingLock;
180};
181
99DummyResource::DummyResource() 182DummyResource::DummyResource()
100 : Akonadi2::Resource() 183 : Akonadi2::Resource(),
184 mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"),
185 mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue")
101{ 186{
102} 187}
103 188
104void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) 189void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline)
105{ 190{
106 auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create(); 191 auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create();
192 //FIXME we should setup for each resource entity type, not for each domain type
193 //i.e. If a resource stores tags as part of each message it needs to update the tag index
107 //TODO setup preprocessors for each domain type and pipeline type allowing full customization 194 //TODO setup preprocessors for each domain type and pipeline type allowing full customization
108 //Eventually the order should be self configuring, for now it's hardcoded. 195 //Eventually the order should be self configuring, for now it's hardcoded.
109 auto eventIndexer = new SimpleProcessor([eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { 196 auto eventIndexer = new SimpleProcessor([eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) {
110 auto adaptor = eventFactory->createAdaptor(entity); 197 auto adaptor = eventFactory->createAdaptor(entity);
111 qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); 198 qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString();
112 }); 199 });
113 pipeline->setPreprocessors<Akonadi2::Domain::Event>(Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); 200 //event is the entitytype and not the domain type
201 pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer);
202 mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
114} 203}
115 204
116void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) 205void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback)
@@ -139,6 +228,7 @@ void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &ri
139 228
140Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) 229Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline)
141{ 230{
231 qDebug() << "synchronizeWithSource";
142 return Async::start<void>([this, pipeline](Async::Future<void> &f) { 232 return Async::start<void>([this, pipeline](Async::Future<void> &f) {
143 //TODO use a read-only transaction during the complete sync to sync against a defined revision 233 //TODO use a read-only transaction during the complete sync to sync against a defined revision
144 auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); 234 auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy");
@@ -171,7 +261,9 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli
171 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); 261 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer);
172 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. 262 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
173 const auto key = QUuid::createUuid().toString().toUtf8(); 263 const auto key = QUuid::createUuid().toString().toUtf8();
174 pipeline->newEntity<Akonadi2::Domain::Event>(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); 264 //TODO Create queuedcommand and push into synchronizerQueue
265 //* create message in store directly, add command to messagequeue waiting for processing.
266 // pipeline->newEntity<Akonadi2::Domain::Event>(key, m_fbb.GetBufferPointer(), m_fbb.GetSize());
175 } else { //modification 267 } else { //modification
176 //TODO diff and create modification if necessary 268 //TODO diff and create modification if necessary
177 } 269 }
@@ -183,16 +275,18 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli
183 275
184void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) 276void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline)
185{ 277{
186 Q_UNUSED(commandId) 278 qDebug() << "processCommand";
187 Q_UNUSED(data) 279 //TODO instead of copying the command including the full entity first into the command queue, we could directly
188 Q_UNUSED(size) 280 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay).
189 //TODO reallly process the commands :) 281 //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire).
190 auto builder = DummyCalendar::DummyEventBuilder(m_fbb);
191 builder .add_summary(m_fbb.CreateString("summary summary!"));
192 auto buffer = builder.Finish();
193 DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer);
194 pipeline->newEntity<Akonadi2::Domain::Event>("fakekey", m_fbb.GetBufferPointer(), m_fbb.GetSize());
195 m_fbb.Clear(); 282 m_fbb.Clear();
283 auto commandData = m_fbb.CreateVector(reinterpret_cast<uint8_t const *>(data.data()), data.size());
284 auto builder = Akonadi2::QueuedCommandBuilder(m_fbb);
285 builder.add_commandId(commandId);
286 builder.add_command(commandData);
287 auto buffer = builder.Finish();
288 Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer);
289 mUserQueue.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize());
196} 290}
197 291
198DummyResourceFactory::DummyResourceFactory(QObject *parent) 292DummyResourceFactory::DummyResourceFactory(QObject *parent)
@@ -211,3 +305,4 @@ void DummyResourceFactory::registerFacades(Akonadi2::FacadeFactory &factory)
211 factory.registerFacade<Akonadi2::Domain::Event, DummyResourceFacade>(PLUGIN_NAME); 305 factory.registerFacade<Akonadi2::Domain::Event, DummyResourceFacade>(PLUGIN_NAME);
212} 306}
213 307
308#include "resourcefactory.moc"