summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp119
1 files changed, 111 insertions, 8 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 21cf1c5..afb9e34 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -28,8 +28,11 @@
28#include "entity_generated.h" 28#include "entity_generated.h"
29#include "metadata_generated.h" 29#include "metadata_generated.h"
30#include "createentity_generated.h" 30#include "createentity_generated.h"
31#include "modifyentity_generated.h"
32#include "deleteentity_generated.h"
31#include "entitybuffer.h" 33#include "entitybuffer.h"
32#include "log.h" 34#include "log.h"
35#include "domain/applicationdomaintype.h"
33 36
34namespace Akonadi2 37namespace Akonadi2
35{ 38{
@@ -50,6 +53,7 @@ public:
50 QHash<QString, QVector<Preprocessor *> > deletedPipeline; 53 QHash<QString, QVector<Preprocessor *> > deletedPipeline;
51 QVector<PipelineState> activePipelines; 54 QVector<PipelineState> activePipelines;
52 bool stepScheduled; 55 bool stepScheduled;
56 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory;
53}; 57};
54 58
55Pipeline::Pipeline(const QString &resourceName, QObject *parent) 59Pipeline::Pipeline(const QString &resourceName, QObject *parent)
@@ -81,8 +85,10 @@ void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, co
81} 85}
82 86
83Storage &Pipeline::storage() const 87Storage &Pipeline::storage() const
88void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory)
84{ 89{
85 return d->storage; 90 return d->storage;
91 d->adaptorFactory.insert(entityType, factory);
86} 92}
87 93
88void Pipeline::null() 94void Pipeline::null()
@@ -147,18 +153,115 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
147 }); 153 });
148} 154}
149 155
150void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size) 156KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
151{ 157{
152 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [](){}); 158 Log() << "Pipeline: Modified Entity";
153 d->activePipelines << state; 159
154 state.step(); 160 const qint64 newRevision = storage().maxRevision() + 1;
161
162 {
163 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
164 if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) {
165 qWarning() << "invalid buffer, not a modify entity buffer";
166 return KAsync::error<void>();
167 }
168 }
169 auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command);
170
171 //TODO rename modifyEntity->domainType to bufferType
172 const QByteArray entityType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
173 const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
174 {
175 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
176 if (!Akonadi2::VerifyEntityBuffer(verifyer)) {
177 qWarning() << "invalid buffer, not an entity buffer";
178 return KAsync::error<void>();
179 }
180 }
181
182 auto adaptorFactory = d->adaptorFactory.value(entityType);
183 if (adaptorFactory) {
184 qWarning() << "no adaptor factory";
185 return KAsync::error<void>();
186 }
187
188 auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data());
189 auto diff = adaptorFactory->createAdaptor(*diffEntity);
190
191 Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr domainType;
192 storage().scan(QByteArray::fromRawData(key.data(), key.size()), [&domainType](const QByteArray &data) -> bool {
193 auto existingEntity = Akonadi2::GetEntity(data.data());
194 domainType = getDomainType(*existingEntity);
195 return false;
196 });
197 //TODO error handler
198
199 //Apply diff
200 //FIXME only apply the properties that are available in the buffer
201 for (const auto &property : diff->availableProperties()) {
202 domainType->setProperty(property, diff->getProperty(property));
203 }
204 //Remove deletions
205 for (const auto &property : *modifyEntity->deletions()) {
206 domainType->setProperty(QByteArray::fromRawData(property->data(), property->size()), QVariant());
207 }
208
209
210 //Add metadata buffer
211 flatbuffers::FlatBufferBuilder metadataFbb;
212 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
213 metadataBuilder.add_revision(newRevision);
214 metadataBuilder.add_processed(false);
215 auto metadataBuffer = metadataBuilder.Finish();
216 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
217
218
219 flatbuffers::FlatBufferBuilder fbb;
220 adaptorFactory->createBuffer(*domainType, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
221
222 //TODO don't overwrite the old entry, but instead store a new revision
223 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
224 storage().setMaxRevision(newRevision);
225
226 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
227 PipelineState state(this, ModifiedPipeline, key, d->newPipeline[entityType], [&future]() {
228 future.setFinished();
229 });
230 d->activePipelines << state;
231 state.step();
232 });
155} 233}
156 234
157void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key) 235KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
158{ 236{
159 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [](){}); 237 Log() << "Pipeline: Deleted Entity";
160 d->activePipelines << state; 238
161 state.step(); 239 const qint64 newRevision = storage().maxRevision() + 1;
240
241 {
242 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
243 if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) {
244 qWarning() << "invalid buffer, not a delete entity buffer";
245 return KAsync::error<void>();
246 }
247 }
248 auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command);
249
250 const QByteArray entityType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
251 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
252
253 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
254 storage().remove(key.data(), key.size());
255 storage().setMaxRevision(newRevision);
256 Log() << "Pipeline: deleted entity: "<< newRevision;
257
258 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
259 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [&future](){
260 future.setFinished();
261 });
262 d->activePipelines << state;
263 state.step();
264 });
162} 265}
163 266
164void Pipeline::pipelineStepped(const PipelineState &state) 267void Pipeline::pipelineStepped(const PipelineState &state)