summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp22
1 files changed, 13 insertions, 9 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 0ebe2f3..0231631 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -143,8 +143,10 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
143 flatbuffers::FlatBufferBuilder fbb; 143 flatbuffers::FlatBufferBuilder fbb;
144 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); 144 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
145 145
146 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); 146 auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite);
147 storage().setMaxRevision(newRevision); 147 transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
148 Akonadi2::Storage::setMaxRevision(transaction, newRevision);
149 transaction.commit();
148 Log() << "Pipeline: wrote entity: " << key << newRevision; 150 Log() << "Pipeline: wrote entity: " << key << newRevision;
149 151
150 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { 152 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
@@ -198,7 +200,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
198 auto diff = adaptorFactory->createAdaptor(*diffEntity); 200 auto diff = adaptorFactory->createAdaptor(*diffEntity);
199 201
200 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; 202 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
201 storage().scan(QByteArray::fromRawData(key.data(), key.size()), [&current, adaptorFactory](const QByteArray &data) -> bool { 203 storage().createTransaction(Akonadi2::Storage::ReadOnly).scan(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
202 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 204 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
203 if (!buffer.isValid()) { 205 if (!buffer.isValid()) {
204 Warning() << "Read invalid buffer from disk"; 206 Warning() << "Read invalid buffer from disk";
@@ -243,8 +245,9 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
243 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 245 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
244 246
245 //TODO don't overwrite the old entry, but instead store a new revision 247 //TODO don't overwrite the old entry, but instead store a new revision
246 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); 248 auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite);
247 storage().setMaxRevision(newRevision); 249 transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
250 Akonadi2::Storage::setMaxRevision(transaction, newRevision);
248 251
249 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { 252 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
250 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() { 253 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() {
@@ -274,8 +277,9 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
274 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 277 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
275 278
276 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted 279 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
277 storage().remove(key.data(), key.size()); 280 auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite);
278 storage().setMaxRevision(newRevision); 281 transaction.remove(key);
282 Akonadi2::Storage::setMaxRevision(transaction, newRevision);
279 Log() << "Pipeline: deleted entity: "<< newRevision; 283 Log() << "Pipeline: deleted entity: "<< newRevision;
280 284
281 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { 285 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
@@ -414,8 +418,8 @@ void PipelineState::step()
414 //TODO skip step if already processed 418 //TODO skip step if already processed
415 //FIXME error handling if no result is found 419 //FIXME error handling if no result is found
416 auto preprocessor = d->filterIt.next(); 420 auto preprocessor = d->filterIt.next();
417 d->pipeline->storage().scan(d->key, [this, preprocessor](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { 421 d->pipeline->storage().createTransaction(Akonadi2::Storage::ReadOnly).scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool {
418 auto entity = Akonadi2::GetEntity(dataValue); 422 auto entity = Akonadi2::GetEntity(value);
419 preprocessor->process(*this, *entity); 423 preprocessor->process(*this, *entity);
420 return false; 424 return false;
421 }, [this](const Akonadi2::Storage::Error &error) { 425 }, [this](const Akonadi2::Storage::Error &error) {