summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp64
1 files changed, 39 insertions, 25 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 8ef6187..33e5d5c 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -143,7 +143,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
143 auto createEntity = Akonadi2::Commands::GetCreateEntity(command); 143 auto createEntity = Akonadi2::Commands::GetCreateEntity(command);
144 144
145 //TODO rename createEntitiy->domainType to bufferType 145 //TODO rename createEntitiy->domainType to bufferType
146 const QString entityType = QString::fromUtf8(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); 146 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size());
147 { 147 {
148 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 148 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
149 if (!Akonadi2::VerifyEntityBuffer(verifyer)) { 149 if (!Akonadi2::VerifyEntityBuffer(verifyer)) {
@@ -152,6 +152,10 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
152 } 152 }
153 } 153 }
154 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); 154 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data());
155 if (!entity->resource()->size() && !entity->local()->size()) {
156 Warning() << "No local and no resource buffer while trying to create entity.";
157 return KAsync::error<void>();
158 }
155 159
156 //Add metadata buffer 160 //Add metadata buffer
157 flatbuffers::FlatBufferBuilder metadataFbb; 161 flatbuffers::FlatBufferBuilder metadataFbb;
@@ -165,14 +169,14 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
165 flatbuffers::FlatBufferBuilder fbb; 169 flatbuffers::FlatBufferBuilder fbb;
166 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); 170 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
167 171
168 d->transaction.openDatabase().write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 172 d->transaction.openDatabase(bufferType + ".main").write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
169 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 173 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
170 Log() << "Pipeline: wrote entity: " << key << newRevision; 174 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
171 175
172 return KAsync::start<void>([this, key, entityType, newRevision](KAsync::Future<void> &future) { 176 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
173 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], newRevision, [&future]() { 177 PipelineState state(this, NewPipeline, key, d->newPipeline[bufferType], newRevision, [&future]() {
174 future.setFinished(); 178 future.setFinished();
175 }); 179 }, bufferType);
176 d->activePipelines << state; 180 d->activePipelines << state;
177 state.step(); 181 state.step();
178 }); 182 });
@@ -195,10 +199,10 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
195 Q_ASSERT(modifyEntity); 199 Q_ASSERT(modifyEntity);
196 200
197 //TODO rename modifyEntity->domainType to bufferType 201 //TODO rename modifyEntity->domainType to bufferType
198 const QByteArray entityType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); 202 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
199 const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); 203 const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
200 if (entityType.isEmpty() || key.isEmpty()) { 204 if (bufferType.isEmpty() || key.isEmpty()) {
201 Warning() << "entity type or key " << entityType << key; 205 Warning() << "entity type or key " << bufferType << key;
202 return KAsync::error<void>(); 206 return KAsync::error<void>();
203 } 207 }
204 { 208 {
@@ -209,9 +213,9 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
209 } 213 }
210 } 214 }
211 215
212 auto adaptorFactory = d->adaptorFactory.value(entityType); 216 auto adaptorFactory = d->adaptorFactory.value(bufferType);
213 if (!adaptorFactory) { 217 if (!adaptorFactory) {
214 Warning() << "no adaptor factory for type " << entityType; 218 Warning() << "no adaptor factory for type " << bufferType;
215 return KAsync::error<void>(); 219 return KAsync::error<void>();
216 } 220 }
217 221
@@ -220,7 +224,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
220 auto diff = adaptorFactory->createAdaptor(*diffEntity); 224 auto diff = adaptorFactory->createAdaptor(*diffEntity);
221 225
222 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; 226 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
223 storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase().scan(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 227 storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
224 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 228 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
225 if (!buffer.isValid()) { 229 if (!buffer.isValid()) {
226 Warning() << "Read invalid buffer from disk"; 230 Warning() << "Read invalid buffer from disk";
@@ -228,6 +232,9 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
228 current = adaptorFactory->createAdaptor(buffer.entity()); 232 current = adaptorFactory->createAdaptor(buffer.entity());
229 } 233 }
230 return false; 234 return false;
235 },
236 [](const Storage::Error &error) {
237 Warning() << "Failed to read value from storage: " << error.message;
231 }); 238 });
232 //TODO error handler 239 //TODO error handler
233 240
@@ -265,13 +272,13 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
265 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 272 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
266 273
267 //TODO don't overwrite the old entry, but instead store a new revision 274 //TODO don't overwrite the old entry, but instead store a new revision
268 d->transaction.openDatabase().write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 275 d->transaction.openDatabase(bufferType + ".main").write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
269 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 276 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
270 277
271 return KAsync::start<void>([this, key, entityType, newRevision](KAsync::Future<void> &future) { 278 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
272 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], newRevision, [&future]() { 279 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[bufferType], newRevision, [&future]() {
273 future.setFinished(); 280 future.setFinished();
274 }); 281 }, bufferType);
275 d->activePipelines << state; 282 d->activePipelines << state;
276 state.step(); 283 state.step();
277 }); 284 });
@@ -292,18 +299,18 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
292 } 299 }
293 auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command); 300 auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command);
294 301
295 const QByteArray entityType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 302 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
296 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 303 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
297 304
298 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted 305 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
299 d->transaction.openDatabase().remove(key); 306 d->transaction.openDatabase(bufferType + ".main").remove(key);
300 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 307 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
301 Log() << "Pipeline: deleted entity: "<< newRevision; 308 Log() << "Pipeline: deleted entity: "<< newRevision;
302 309
303 return KAsync::start<void>([this, key, entityType, newRevision](KAsync::Future<void> &future) { 310 return KAsync::start<void>([this, key, bufferType, newRevision](KAsync::Future<void> &future) {
304 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], newRevision, [&future](){ 311 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future](){
305 future.setFinished(); 312 future.setFinished();
306 }); 313 }, bufferType);
307 d->activePipelines << state; 314 d->activePipelines << state;
308 state.step(); 315 state.step();
309 }); 316 });
@@ -354,14 +361,15 @@ void Pipeline::pipelineCompleted(PipelineState state)
354class PipelineState::Private : public QSharedData 361class PipelineState::Private : public QSharedData
355{ 362{
356public: 363public:
357 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c, qint64 r) 364 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c, qint64 r, const QByteArray &b)
358 : pipeline(p), 365 : pipeline(p),
359 type(t), 366 type(t),
360 key(k), 367 key(k),
361 filterIt(filters), 368 filterIt(filters),
362 idle(true), 369 idle(true),
363 callback(c), 370 callback(c),
364 revision(r) 371 revision(r),
372 bufferType(b)
365 {} 373 {}
366 374
367 Private() 375 Private()
@@ -378,6 +386,7 @@ public:
378 bool idle; 386 bool idle;
379 std::function<void()> callback; 387 std::function<void()> callback;
380 qint64 revision; 388 qint64 revision;
389 QByteArray bufferType;
381}; 390};
382 391
383PipelineState::PipelineState() 392PipelineState::PipelineState()
@@ -386,8 +395,8 @@ PipelineState::PipelineState()
386 395
387} 396}
388 397
389PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback) 398PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, qint64 revision, const std::function<void()> &callback, const QByteArray &bufferType)
390 : d(new Private(pipeline, type, key, filters, callback, revision)) 399 : d(new Private(pipeline, type, key, filters, callback, revision, bufferType))
391{ 400{
392} 401}
393 402
@@ -431,6 +440,11 @@ qint64 PipelineState::revision() const
431 return d->revision; 440 return d->revision;
432} 441}
433 442
443QByteArray PipelineState::bufferType() const
444{
445 return d->bufferType;
446}
447
434void PipelineState::step() 448void PipelineState::step()
435{ 449{
436 if (!d->pipeline) { 450 if (!d->pipeline) {