author      Christian Mollekopf <chrigi_1@fastmail.fm>    2015-08-11 23:49:22 +0200
committer   Christian Mollekopf <chrigi_1@fastmail.fm>    2015-08-11 23:49:22 +0200
commit      5e590a4729b20751d27cb9e9f8af8b512d93c6ed (patch)
tree        0e4a37c409b8ab47e1e58e5971119907c8c8798e /common/pipeline.cpp
parent      a9dc9ed667f06fa1828773d1bb8671ec2731dce5 (diff)
download    sink-5e590a4729b20751d27cb9e9f8af8b512d93c6ed.tar.gz
            sink-5e590a4729b20751d27cb9e9f8af8b512d93c6ed.zip
Ported pipeline to new API
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--   common/pipeline.cpp   22
1 file changed, 13 insertions(+), 9 deletions(-)
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 0ebe2f3..0231631 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -143,8 +143,10 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
     flatbuffers::FlatBufferBuilder fbb;
     EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
 
-    storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
-    storage().setMaxRevision(newRevision);
+    auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite);
+    transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
+    Akonadi2::Storage::setMaxRevision(transaction, newRevision);
+    transaction.commit();
     Log() << "Pipeline: wrote entity: " << key << newRevision;
 
     return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
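The hunk above replaces the direct storage().write() and storage().setMaxRevision() calls with an explicit read-write transaction that is committed at the end. A minimal sketch of that write pattern follows, using only the calls visible in the diff; the helper name, the Storage reference parameter, the qint64 revision type, and the header path are assumptions made for illustration, not part of the commit.

```cpp
// Hypothetical helper illustrating the transactional write path from the hunk above.
// createTransaction(), write(), setMaxRevision() and commit() come from the diff;
// everything else in this sketch (name, parameter types, include path) is assumed.
#include <QByteArray>
#include <flatbuffers/flatbuffers.h>
#include "common/storage.h" // header path assumed

static void storeEntity(Akonadi2::Storage &storage, const QByteArray &key,
                        flatbuffers::FlatBufferBuilder &fbb, qint64 newRevision)
{
    auto transaction = storage.createTransaction(Akonadi2::Storage::ReadWrite);
    // fromRawData() wraps the flatbuffer without copying, so fbb must stay alive
    // until the transaction is done with it.
    transaction.write(key, QByteArray::fromRawData(
        reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
    // The revision counter is now bumped through the same transaction,
    // and the change only takes effect on the explicit commit.
    Akonadi2::Storage::setMaxRevision(transaction, newRevision);
    transaction.commit();
}
```

The same createTransaction/write/setMaxRevision sequence reappears in the modifiedEntity and deletedEntity hunks below.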
@@ -198,7 +200,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     auto diff = adaptorFactory->createAdaptor(*diffEntity);
 
     QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current;
-    storage().scan(QByteArray::fromRawData(key.data(), key.size()), [&current, adaptorFactory](const QByteArray &data) -> bool {
+    storage().createTransaction(Akonadi2::Storage::ReadOnly).scan(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
         Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
         if (!buffer.isValid()) {
             Warning() << "Read invalid buffer from disk";
@@ -243,8 +245,9 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
 
     //TODO don't overwrite the old entry, but instead store a new revision
-    storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
-    storage().setMaxRevision(newRevision);
+    auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite);
+    transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
+    Akonadi2::Storage::setMaxRevision(transaction, newRevision);
 
     return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
         PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() {
@@ -274,8 +277,9 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
     const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
 
     //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
-    storage().remove(key.data(), key.size());
-    storage().setMaxRevision(newRevision);
+    auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite);
+    transaction.remove(key);
+    Akonadi2::Storage::setMaxRevision(transaction, newRevision);
     Log() << "Pipeline: deleted entity: "<< newRevision;
 
     return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
@@ -414,8 +418,8 @@ void PipelineState::step()
     //TODO skip step if already processed
     //FIXME error handling if no result is found
     auto preprocessor = d->filterIt.next();
-    d->pipeline->storage().scan(d->key, [this, preprocessor](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool {
-        auto entity = Akonadi2::GetEntity(dataValue);
+    d->pipeline->storage().createTransaction(Akonadi2::Storage::ReadOnly).scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool {
+        auto entity = Akonadi2::GetEntity(value);
         preprocessor->process(*this, *entity);
         return false;
     }, [this](const Akonadi2::Storage::Error &error) {
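The read side follows the same pattern: scans now run inside a ReadOnly transaction, and the callback receives the key and value as QByteArray instead of raw pointers and sizes. Below is a hedged sketch of that lookup pattern, assembled only from the calls visible in the hunks above; the helper name, parameters, header paths, and the use of qWarning() in place of the project's Warning() helper are assumptions for illustration.

```cpp
// Hypothetical helper illustrating the new read path. The ReadOnly transaction, the
// scan() call taking a (key, value) callback plus an error handler, and the
// EntityBuffer validity check are taken from the diff; the rest is assumed.
#include <QByteArray>
#include <QDebug>
#include "common/storage.h"      // header path assumed
#include "common/entitybuffer.h" // header path assumed

static void inspectEntity(Akonadi2::Storage &storage, const QByteArray &key)
{
    storage.createTransaction(Akonadi2::Storage::ReadOnly).scan(key,
        // First parameter is the matched key (unused here), second is the stored value.
        [](const QByteArray &, const QByteArray &data) -> bool {
            Akonadi2::EntityBuffer buffer(data.data(), data.size());
            if (!buffer.isValid()) {
                qWarning() << "Read invalid buffer from disk";
            }
            // Returning false stops the scan, mirroring the pipeline code.
            return false;
        },
        [&key](const Akonadi2::Storage::Error &) {
            qWarning() << "Failed to read entity for key" << key;
        });
}
```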