From 5e590a4729b20751d27cb9e9f8af8b512d93c6ed Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 11 Aug 2015 23:49:22 +0200 Subject: Ported pipeline to new API --- common/pipeline.cpp | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) (limited to 'common') diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 0ebe2f3..0231631 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -143,8 +143,10 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) flatbuffers::FlatBufferBuilder fbb; EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); - storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); - storage().setMaxRevision(newRevision); + auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite); + transaction.write(key, QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); + Akonadi2::Storage::setMaxRevision(transaction, newRevision); + transaction.commit(); Log() << "Pipeline: wrote entity: " << key << newRevision; return KAsync::start([this, key, entityType](KAsync::Future &future) { @@ -198,7 +200,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) auto diff = adaptorFactory->createAdaptor(*diffEntity); QSharedPointer current; - storage().scan(QByteArray::fromRawData(key.data(), key.size()), [¤t, adaptorFactory](const QByteArray &data) -> bool { + storage().createTransaction(Akonadi2::Storage::ReadOnly).scan(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { Akonadi2::EntityBuffer buffer(const_cast(data.data()), data.size()); if (!buffer.isValid()) { Warning() << "Read invalid buffer from disk"; @@ -243,8 +245,9 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); //TODO don't overwrite the old entry, but instead store a new revision - storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); - storage().setMaxRevision(newRevision); + auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite); + transaction.write(key, QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); + Akonadi2::Storage::setMaxRevision(transaction, newRevision); return KAsync::start([this, key, entityType](KAsync::Future &future) { PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() { @@ -274,8 +277,9 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) const QByteArray key = QByteArray(reinterpret_cast(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted - storage().remove(key.data(), key.size()); - storage().setMaxRevision(newRevision); + auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite); + transaction.remove(key); + Akonadi2::Storage::setMaxRevision(transaction, newRevision); Log() << "Pipeline: deleted entity: "<< newRevision; return KAsync::start([this, key, entityType](KAsync::Future &future) { @@ -414,8 +418,8 @@ void PipelineState::step() //TODO skip step if already processed //FIXME error handling if no result is found auto preprocessor = d->filterIt.next(); - d->pipeline->storage().scan(d->key, [this, preprocessor](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { - auto entity = Akonadi2::GetEntity(dataValue); + d->pipeline->storage().createTransaction(Akonadi2::Storage::ReadOnly).scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool { + auto entity = Akonadi2::GetEntity(value); preprocessor->process(*this, *entity); return false; }, [this](const Akonadi2::Storage::Error &error) { -- cgit v1.2.3