summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/commands/deleteentity.fbs3
-rw-r--r--common/commands/modifyentity.fbs2
-rw-r--r--common/genericresource.cpp7
-rw-r--r--common/pipeline.cpp119
-rw-r--r--common/pipeline.h6
-rw-r--r--examples/dummyresource/resourcefactory.cpp1
6 files changed, 121 insertions, 17 deletions
diff --git a/common/commands/deleteentity.fbs b/common/commands/deleteentity.fbs
index c9b7850..4f32b54 100644
--- a/common/commands/deleteentity.fbs
+++ b/common/commands/deleteentity.fbs
@@ -1,8 +1,9 @@
1namespace Akonadi2; 1namespace Akonadi2.Commands;
2 2
3table DeleteEntity { 3table DeleteEntity {
4 revision: ulong; 4 revision: ulong;
5 entityId: string; 5 entityId: string;
6 domainType: string;
6} 7}
7 8
8root_type DeleteEntity; 9root_type DeleteEntity;
diff --git a/common/commands/modifyentity.fbs b/common/commands/modifyentity.fbs
index d26051e..a59eb9b 100644
--- a/common/commands/modifyentity.fbs
+++ b/common/commands/modifyentity.fbs
@@ -1,4 +1,4 @@
1namespace Akonadi2; 1namespace Akonadi2.Commands;
2 2
3table ModifyEntity { 3table ModifyEntity {
4 revision: ulong; 4 revision: ulong;
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 139ae98..a500aed 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -9,7 +9,6 @@
9#include "clientapi.h" 9#include "clientapi.h"
10#include "index.h" 10#include "index.h"
11#include "log.h" 11#include "log.h"
12#include <assert.h>
13 12
14using namespace Akonadi2; 13using namespace Akonadi2;
15 14
@@ -67,11 +66,9 @@ private slots:
67 //Throw command into appropriate pipeline 66 //Throw command into appropriate pipeline
68 switch (queuedCommand->commandId()) { 67 switch (queuedCommand->commandId()) {
69 case Akonadi2::Commands::DeleteEntityCommand: 68 case Akonadi2::Commands::DeleteEntityCommand:
70 //mPipeline->removedEntity 69 return mPipeline->deletedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
71 return KAsync::null<void>();
72 case Akonadi2::Commands::ModifyEntityCommand: 70 case Akonadi2::Commands::ModifyEntityCommand:
73 //mPipeline->modifiedEntity 71 return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
74 return KAsync::null<void>();
75 case Akonadi2::Commands::CreateEntityCommand: 72 case Akonadi2::Commands::CreateEntityCommand:
76 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 73 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
77 default: 74 default:
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 21cf1c5..afb9e34 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -28,8 +28,11 @@
28#include "entity_generated.h" 28#include "entity_generated.h"
29#include "metadata_generated.h" 29#include "metadata_generated.h"
30#include "createentity_generated.h" 30#include "createentity_generated.h"
31#include "modifyentity_generated.h"
32#include "deleteentity_generated.h"
31#include "entitybuffer.h" 33#include "entitybuffer.h"
32#include "log.h" 34#include "log.h"
35#include "domain/applicationdomaintype.h"
33 36
34namespace Akonadi2 37namespace Akonadi2
35{ 38{
@@ -50,6 +53,7 @@ public:
50 QHash<QString, QVector<Preprocessor *> > deletedPipeline; 53 QHash<QString, QVector<Preprocessor *> > deletedPipeline;
51 QVector<PipelineState> activePipelines; 54 QVector<PipelineState> activePipelines;
52 bool stepScheduled; 55 bool stepScheduled;
56 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory;
53}; 57};
54 58
55Pipeline::Pipeline(const QString &resourceName, QObject *parent) 59Pipeline::Pipeline(const QString &resourceName, QObject *parent)
@@ -81,8 +85,10 @@ void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, co
81} 85}
82 86
83Storage &Pipeline::storage() const 87Storage &Pipeline::storage() const
88void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory)
84{ 89{
85 return d->storage; 90 return d->storage;
91 d->adaptorFactory.insert(entityType, factory);
86} 92}
87 93
88void Pipeline::null() 94void Pipeline::null()
@@ -147,18 +153,115 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
147 }); 153 });
148} 154}
149 155
150void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size) 156KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
151{ 157{
152 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [](){}); 158 Log() << "Pipeline: Modified Entity";
153 d->activePipelines << state; 159
154 state.step(); 160 const qint64 newRevision = storage().maxRevision() + 1;
161
162 {
163 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
164 if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) {
165 qWarning() << "invalid buffer, not a modify entity buffer";
166 return KAsync::error<void>();
167 }
168 }
169 auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command);
170
171 //TODO rename modifyEntity->domainType to bufferType
172 const QByteArray entityType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
173 const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
174 {
175 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
176 if (!Akonadi2::VerifyEntityBuffer(verifyer)) {
177 qWarning() << "invalid buffer, not an entity buffer";
178 return KAsync::error<void>();
179 }
180 }
181
182 auto adaptorFactory = d->adaptorFactory.value(entityType);
183 if (adaptorFactory) {
184 qWarning() << "no adaptor factory";
185 return KAsync::error<void>();
186 }
187
188 auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data());
189 auto diff = adaptorFactory->createAdaptor(*diffEntity);
190
191 Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr domainType;
192 storage().scan(QByteArray::fromRawData(key.data(), key.size()), [&domainType](const QByteArray &data) -> bool {
193 auto existingEntity = Akonadi2::GetEntity(data.data());
194 domainType = getDomainType(*existingEntity);
195 return false;
196 });
197 //TODO error handler
198
199 //Apply diff
200 //FIXME only apply the properties that are available in the buffer
201 for (const auto &property : diff->availableProperties()) {
202 domainType->setProperty(property, diff->getProperty(property));
203 }
204 //Remove deletions
205 for (const auto &property : *modifyEntity->deletions()) {
206 domainType->setProperty(QByteArray::fromRawData(property->data(), property->size()), QVariant());
207 }
208
209
210 //Add metadata buffer
211 flatbuffers::FlatBufferBuilder metadataFbb;
212 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
213 metadataBuilder.add_revision(newRevision);
214 metadataBuilder.add_processed(false);
215 auto metadataBuffer = metadataBuilder.Finish();
216 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
217
218
219 flatbuffers::FlatBufferBuilder fbb;
220 adaptorFactory->createBuffer(*domainType, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
221
222 //TODO don't overwrite the old entry, but instead store a new revision
223 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
224 storage().setMaxRevision(newRevision);
225
226 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
227 PipelineState state(this, ModifiedPipeline, key, d->newPipeline[entityType], [&future]() {
228 future.setFinished();
229 });
230 d->activePipelines << state;
231 state.step();
232 });
155} 233}
156 234
157void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key) 235KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
158{ 236{
159 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [](){}); 237 Log() << "Pipeline: Deleted Entity";
160 d->activePipelines << state; 238
161 state.step(); 239 const qint64 newRevision = storage().maxRevision() + 1;
240
241 {
242 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
243 if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) {
244 qWarning() << "invalid buffer, not a delete entity buffer";
245 return KAsync::error<void>();
246 }
247 }
248 auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command);
249
250 const QByteArray entityType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
251 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
252
253 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
254 storage().remove(key.data(), key.size());
255 storage().setMaxRevision(newRevision);
256 Log() << "Pipeline: deleted entity: "<< newRevision;
257
258 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
259 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [&future](){
260 future.setFinished();
261 });
262 d->activePipelines << state;
263 state.step();
264 });
162} 265}
163 266
164void Pipeline::pipelineStepped(const PipelineState &state) 267void Pipeline::pipelineStepped(const PipelineState &state)
diff --git a/common/pipeline.h b/common/pipeline.h
index 6df2d76..a6696ec 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -31,6 +31,7 @@
31#include <Async/Async> 31#include <Async/Async>
32 32
33#include "entity_generated.h" 33#include "entity_generated.h"
34#include "domainadaptor.h"
34 35
35namespace Akonadi2 36namespace Akonadi2
36{ 37{
@@ -53,10 +54,11 @@ public:
53 void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors); 54 void setPreprocessors(const QString &entityType, Type pipelineType, const QVector<Preprocessor *> &preprocessors);
54 55
55 void null(); 56 void null();
57 void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory);
56 58
57 KAsync::Job<void> newEntity(void const *command, size_t size); 59 KAsync::Job<void> newEntity(void const *command, size_t size);
58 void modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size); 60 KAsync::Job<void> modifiedEntity(void const *command, size_t size);
59 void deletedEntity(const QString &entityType, const QByteArray &key); 61 KAsync::Job<void> deletedEntity(void const *command, size_t size);
60 62
61Q_SIGNALS: 63Q_SIGNALS:
62 void revisionUpdated(); 64 void revisionUpdated();
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp
index 9e8a2fc..b9123d5 100644
--- a/examples/dummyresource/resourcefactory.cpp
+++ b/examples/dummyresource/resourcefactory.cpp
@@ -58,6 +58,7 @@ void DummyResource::configurePipeline(Akonadi2::Pipeline *pipeline)
58 }); 58 });
59 59
60 pipeline->setPreprocessors(ENTITY_TYPE_EVENT, Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer); 60 pipeline->setPreprocessors(ENTITY_TYPE_EVENT, Akonadi2::Pipeline::NewPipeline, QVector<Akonadi2::Preprocessor*>() << eventIndexer);
61 pipeline->setAdaptorFactory(ENTITY_TYPE_EVENT, eventFactory);
61 //TODO cleanup indexes during removal 62 //TODO cleanup indexes during removal
62 GenericResource::configurePipeline(pipeline); 63 GenericResource::configurePipeline(pipeline);
63} 64}