diff options
Diffstat (limited to 'dummyresource/resourcefactory.cpp')
-rw-r--r-- | dummyresource/resourcefactory.cpp | 123 |
1 files changed, 109 insertions, 14 deletions
diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index e4f7e41..e14aa01 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp | |||
@@ -23,18 +23,20 @@ | |||
23 | #include "pipeline.h" | 23 | #include "pipeline.h" |
24 | #include "dummycalendar_generated.h" | 24 | #include "dummycalendar_generated.h" |
25 | #include "metadata_generated.h" | 25 | #include "metadata_generated.h" |
26 | #include "queuedcommand_generated.h" | ||
26 | #include "domainadaptor.h" | 27 | #include "domainadaptor.h" |
28 | #include "commands.h" | ||
29 | #include "clientapi.h" | ||
27 | #include <QUuid> | 30 | #include <QUuid> |
28 | 31 | ||
29 | /* | 32 | /* |
30 | * Figure out how to implement various classes of processors: | 33 | * Figure out how to implement various classes of processors: |
31 | * * read-only (index and such) => domain adapter | 34 | * * read-only (index and such) => extractor function, probably using domain adaptor |
32 | * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?) | 35 | * * filter => provide means to move entity elsewhere, and also reflect change in source (I guess?) |
33 | * * flag extractors? => like read-only? Or write to local portion of buffer? | 36 | * * flag extractors? => like read-only? Or write to local portion of buffer? |
34 | * ** $ISSPAM should become part of domain object and is written to the local part of the mail. | 37 | * ** $ISSPAM should become part of domain object and is written to the local part of the mail. |
35 | * ** => value could be calculated by the server directly | 38 | * ** => value could be calculated by the server directly |
36 | */ | 39 | */ |
37 | // template <typename DomainType> | ||
38 | class SimpleProcessor : public Akonadi2::Preprocessor | 40 | class SimpleProcessor : public Akonadi2::Preprocessor |
39 | { | 41 | { |
40 | public: | 42 | public: |
@@ -96,21 +98,108 @@ QMap<QString, QString> populate() | |||
96 | 98 | ||
97 | static QMap<QString, QString> s_dataSource = populate(); | 99 | static QMap<QString, QString> s_dataSource = populate(); |
98 | 100 | ||
101 | //Drives the pipeline using the output from all command queues | ||
102 | class Processor : public QObject | ||
103 | { | ||
104 | Q_OBJECT | ||
105 | public: | ||
106 | Processor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues) | ||
107 | : QObject(), | ||
108 | mPipeline(pipeline), | ||
109 | mCommandQueues(commandQueues), | ||
110 | mProcessingLock(false) | ||
111 | { | ||
112 | for (auto queue : mCommandQueues) { | ||
113 | bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); | ||
114 | Q_ASSERT(ret); | ||
115 | } | ||
116 | } | ||
117 | |||
118 | private slots: | ||
119 | void process() | ||
120 | { | ||
121 | if (mProcessingLock) { | ||
122 | return; | ||
123 | } | ||
124 | mProcessingLock = true; | ||
125 | //Empty queue after queue | ||
126 | //FIXME the for and while loops should be async, otherwise we process all messages in parallel | ||
127 | for (auto queue : mCommandQueues) { | ||
128 | qDebug() << "processing queue"; | ||
129 | bool processedMessage = false; | ||
130 | while (processedMessage) { | ||
131 | qDebug() << "process"; | ||
132 | processedMessage = false; | ||
133 | queue->dequeue([this, &processedMessage](void *ptr, int size, std::function<void(bool success)> messageQueueCallback) { | ||
134 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(ptr), size); | ||
135 | if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { | ||
136 | qWarning() << "invalid buffer"; | ||
137 | processedMessage = false; | ||
138 | messageQueueCallback(false); | ||
139 | return; | ||
140 | } | ||
141 | auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); | ||
142 | qDebug() << "Dequeued: " << queuedCommand->commandId(); | ||
143 | //Throw command into appropriate pipeline | ||
144 | switch (queuedCommand->commandId()) { | ||
145 | case Akonadi2::Commands::DeleteEntityCommand: | ||
146 | //mPipeline->removedEntity | ||
147 | break; | ||
148 | case Akonadi2::Commands::ModifyEntityCommand: | ||
149 | //mPipeline->modifiedEntity | ||
150 | break; | ||
151 | case Akonadi2::Commands::CreateEntityCommand: { | ||
152 | //TODO job lifetime management | ||
153 | auto job = mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<void>([&messageQueueCallback](Async::Future<void> future) { | ||
154 | messageQueueCallback(true); | ||
155 | }); | ||
156 | job.exec(); | ||
157 | } | ||
158 | break; | ||
159 | default: | ||
160 | //Unhandled command | ||
161 | qWarning() << "Unhandled command"; | ||
162 | messageQueueCallback(true); | ||
163 | break; | ||
164 | } | ||
165 | processedMessage = true; | ||
166 | }, | ||
167 | [&processedMessage](const MessageQueue::Error &error) { | ||
168 | processedMessage = false; | ||
169 | }); | ||
170 | } | ||
171 | } | ||
172 | mProcessingLock = false; | ||
173 | } | ||
174 | |||
175 | private: | ||
176 | Akonadi2::Pipeline *mPipeline; | ||
177 | //Ordered by priority | ||
178 | QList<MessageQueue*> mCommandQueues; | ||
179 | bool mProcessingLock; | ||
180 | }; | ||
181 | |||
99 | DummyResource::DummyResource() | 182 | DummyResource::DummyResource() |
100 | : Akonadi2::Resource() | 183 | : Akonadi2::Resource(), |
184 | mUserQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.userqueue"), | ||
185 | mSynchronizerQueue(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/akonadi2/storage", "org.kde.dummy.synchronizerqueue") | ||
101 | { | 186 | { |
102 | } | 187 | } |
103 | 188 | ||
104 | void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) | 189 | void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline) |
105 | { | 190 | { |
106 | auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create(); | 191 | auto eventFactory = QSharedPointer<DummyEventAdaptorFactory>::create(); |
192 | //FIXME we should setup for each resource entity type, not for each domain type | ||
193 | //i.e. If a resource stores tags as part of each message it needs to update the tag index | ||
107 | //TODO setup preprocessors for each domain type and pipeline type allowing full customization | 194 | //TODO setup preprocessors for each domain type and pipeline type allowing full customization |
108 | //Eventually the order should be self configuring, for now it's hardcoded. | 195 | //Eventually the order should be self configuring, for now it's hardcoded. |
109 | auto eventIndexer = new SimpleProcessor([eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { | 196 | auto eventIndexer = new SimpleProcessor([eventFactory](const Akonadi2::PipelineState &state, const Akonadi2::Entity &entity) { |
110 | auto adaptor = eventFactory->createAdaptor(entity); | 197 | auto adaptor = eventFactory->createAdaptor(entity); |
111 | qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); | 198 | qDebug() << "Summary preprocessor: " << adaptor->getProperty("summary").toString(); |
112 | }); | 199 | }); |
113 | pipeline->setPreprocessors<Akonadi2::Domain::Event>(Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); | 200 | //event is the entitytype and not the domain type |
201 | pipeline->setPreprocessors("event", Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); | ||
202 | mProcessor = new Processor(pipeline, QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | ||
114 | } | 203 | } |
115 | 204 | ||
116 | void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) | 205 | void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &rid, std::function<void(void *keyValue, int keySize, void *dataValue, int dataSize)> callback) |
@@ -139,6 +228,7 @@ void findByRemoteId(QSharedPointer<Akonadi2::Storage> storage, const QString &ri | |||
139 | 228 | ||
140 | Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) | 229 | Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) |
141 | { | 230 | { |
231 | qDebug() << "synchronizeWithSource"; | ||
142 | return Async::start<void>([this, pipeline](Async::Future<void> &f) { | 232 | return Async::start<void>([this, pipeline](Async::Future<void> &f) { |
143 | //TODO use a read-only transaction during the complete sync to sync against a defined revision | 233 | //TODO use a read-only transaction during the complete sync to sync against a defined revision |
144 | auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); | 234 | auto storage = QSharedPointer<Akonadi2::Storage>::create(Akonadi2::Store::storageLocation(), "org.kde.dummy"); |
@@ -171,7 +261,9 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli | |||
171 | DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); | 261 | DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); |
172 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. | 262 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. |
173 | const auto key = QUuid::createUuid().toString().toUtf8(); | 263 | const auto key = QUuid::createUuid().toString().toUtf8(); |
174 | pipeline->newEntity<Akonadi2::Domain::Event>(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); | 264 | //TODO Create queuedcommand and push into synchronizerQueue |
265 | //* create message in store directly, add command to messagequeue waiting for processing. | ||
266 | // pipeline->newEntity<Akonadi2::Domain::Event>(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); | ||
175 | } else { //modification | 267 | } else { //modification |
176 | //TODO diff and create modification if necessary | 268 | //TODO diff and create modification if necessary |
177 | } | 269 | } |
@@ -183,16 +275,18 @@ Async::Job<void> DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli | |||
183 | 275 | ||
184 | void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) | 276 | void DummyResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) |
185 | { | 277 | { |
186 | Q_UNUSED(commandId) | 278 | qDebug() << "processCommand"; |
187 | Q_UNUSED(data) | 279 | //TODO instead of copying the command including the full entity first into the command queue, we could directly |
188 | Q_UNUSED(size) | 280 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). |
189 | //TODO reallly process the commands :) | 281 | //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). |
190 | auto builder = DummyCalendar::DummyEventBuilder(m_fbb); | ||
191 | builder .add_summary(m_fbb.CreateString("summary summary!")); | ||
192 | auto buffer = builder.Finish(); | ||
193 | DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); | ||
194 | pipeline->newEntity<Akonadi2::Domain::Event>("fakekey", m_fbb.GetBufferPointer(), m_fbb.GetSize()); | ||
195 | m_fbb.Clear(); | 282 | m_fbb.Clear(); |
283 | auto commandData = m_fbb.CreateVector(reinterpret_cast<uint8_t const *>(data.data()), data.size()); | ||
284 | auto builder = Akonadi2::QueuedCommandBuilder(m_fbb); | ||
285 | builder.add_commandId(commandId); | ||
286 | builder.add_command(commandData); | ||
287 | auto buffer = builder.Finish(); | ||
288 | Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); | ||
289 | mUserQueue.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); | ||
196 | } | 290 | } |
197 | 291 | ||
198 | DummyResourceFactory::DummyResourceFactory(QObject *parent) | 292 | DummyResourceFactory::DummyResourceFactory(QObject *parent) |
@@ -211,3 +305,4 @@ void DummyResourceFactory::registerFacades(Akonadi2::FacadeFactory &factory) | |||
211 | factory.registerFacade<Akonadi2::Domain::Event, DummyResourceFacade>(PLUGIN_NAME); | 305 | factory.registerFacade<Akonadi2::Domain::Event, DummyResourceFacade>(PLUGIN_NAME); |
212 | } | 306 | } |
213 | 307 | ||
308 | #include "resourcefactory.moc" | ||