author     Christian Mollekopf <chrigi_1@fastmail.fm>   2015-08-19 14:05:05 +0200
committer  Christian Mollekopf <chrigi_1@fastmail.fm>   2015-08-19 14:05:05 +0200
commit     67bb6035b6333fe0d6d8566b5962f83c5870185f (patch)
tree       39f2fdbeb4ad814cbe0066f1df627b56328f5fe1 /common/pipeline.cpp
parent     b6502ce1137b3ef7af8a908a9fa5d8fbeed6ed32 (diff)
Transactions in the pipeline
Diffstat (limited to 'common/pipeline.cpp')
 -rw-r--r--  common/pipeline.cpp  48
 1 file changed, 34 insertions(+), 14 deletions(-)
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 207cc5e..27b9deb 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -47,6 +47,7 @@ public:
     }

     Storage storage;
+    Storage::Transaction transaction;
     QHash<QString, QVector<Preprocessor *> > nullPipeline;
     QHash<QString, QVector<Preprocessor *> > newPipeline;
     QHash<QString, QVector<Preprocessor *> > modifiedPipeline;
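The Private struct now owns a long-lived Storage::Transaction next to the Storage handle, so one transaction can span a whole batch of commands. For that to work, the transaction type must be a movable handle with a testable "null" state; that is what checks like if (d->transaction) and the reset via Storage::Transaction() further down rely on. A minimal sketch of such a handle, purely illustrative and not the actual Akonadi2::Storage API:

    // Illustrative sketch of a movable, bool-testable transaction handle.
    class Transaction
    {
    public:
        Transaction() = default;                // null handle, tests as false
        Transaction(Transaction &&other) : mTxn(other.mTxn) { other.mTxn = nullptr; }
        Transaction &operator=(Transaction &&other)
        {
            mTxn = other.mTxn;
            other.mTxn = nullptr;               // moved-from handle becomes null
            return *this;
        }
        explicit operator bool() const { return mTxn != nullptr; }

    private:
        void *mTxn = nullptr;                   // e.g. an MDB_txn* for an LMDB backend
    };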
@@ -89,6 +90,27 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac
     d->adaptorFactory.insert(entityType, factory);
 }

+void Pipeline::startTransaction()
+{
+    if (d->transaction) {
+        return;
+    }
+    d->transaction = std::move(storage().createTransaction(Akonadi2::Storage::ReadWrite));
+}
+
+void Pipeline::commit()
+{
+    if (d->transaction) {
+        d->transaction.commit();
+    }
+    d->transaction = Storage::Transaction();
+}
+
+Storage::Transaction &Pipeline::transaction()
+{
+    return d->transaction;
+}
+
 Storage &Pipeline::storage() const
 {
     return d->storage;
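Taken together, these three methods give the caller explicit control over transaction scope: startTransaction() is idempotent, commit() flushes and resets the handle, and transaction() hands the open transaction to the rest of the pipeline. A hypothetical driver loop showing the intended batching (the command queue is invented for illustration, and execution of the returned KAsync jobs is elided):

    // Hypothetical usage sketch: batch several commands into one transaction.
    void processBatch(Pipeline &pipeline, const QVector<QByteArray> &commands)
    {
        pipeline.startTransaction();       // idempotent: no-op if already open
        for (const auto &command : commands) {
            auto job = pipeline.newEntity(command.constData(), command.size());
            // ... execute job: it runs the preprocessor pipeline for this entity
        }
        pipeline.commit();                 // one commit covers every write above
    }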
@@ -109,7 +131,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
     //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
     const auto key = QUuid::createUuid().toString().toUtf8();

-    const qint64 newRevision = storage().maxRevision() + 1;
+    const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;

     {
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
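The revision counter is now read through the pipeline's own transaction instead of a separate storage handle, so a batch sees its own pending revision bumps and numbers entities consecutively. A sketch of what such a transaction-scoped helper might look like; the "__maxRevision" key is an assumption, not the actual storage layout:

    // Sketch of a transaction-scoped revision read; the key name is hypothetical.
    static qint64 maxRevision(Akonadi2::Storage::Transaction &transaction)
    {
        qint64 revision = 0;
        transaction.scan("__maxRevision", [&revision](const QByteArray &, const QByteArray &value) -> bool {
            revision = value.toLongLong();
            return false;   // single key; stop after the first match
        });
        return revision;
    }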
@@ -143,10 +165,8 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
     flatbuffers::FlatBufferBuilder fbb;
     EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());

-    auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite);
-    transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
-    Akonadi2::Storage::setMaxRevision(transaction, newRevision);
-    transaction.commit();
+    d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
+    Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
     Log() << "Pipeline: wrote entity: " << key << newRevision;

     return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
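The per-entity createTransaction()/commit() pair disappears: the write lands in the shared transaction and durability is deferred to Pipeline::commit(). One subtlety worth keeping in mind is that QByteArray::fromRawData() wraps the FlatBuffers buffer without copying, so the QByteArray is only a view that must not outlive fbb. That is safe here as long as write() copies the value into storage before returning (an LMDB put does), as this small standalone illustration shows:

    #include <QByteArray>

    void fromRawDataLifetime()
    {
        QByteArray view;
        {
            const char buffer[] = "entity bytes";
            view = QByteArray::fromRawData(buffer, sizeof(buffer) - 1); // no copy, just a view
            Q_ASSERT(view == "entity bytes");   // safe: buffer is still in scope
        }
        // From here on, view dangles: fromRawData never took ownership of buffer.
    }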
@@ -162,7 +182,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
 {
     Log() << "Pipeline: Modified Entity";

-    const qint64 newRevision = storage().maxRevision() + 1;
+    const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;

     {
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
@@ -245,9 +265,8 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
     adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());

     //TODO don't overwrite the old entry, but instead store a new revision
-    auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite);
-    transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
-    Akonadi2::Storage::setMaxRevision(transaction, newRevision);
+    d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
+    Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);

     return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
         PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() {
@@ -262,7 +281,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
 {
     Log() << "Pipeline: Deleted Entity";

-    const qint64 newRevision = storage().maxRevision() + 1;
+    const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;

     {
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
@@ -277,9 +296,8 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
     const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());

     //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
-    auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite);
-    transaction.remove(key);
-    Akonadi2::Storage::setMaxRevision(transaction, newRevision);
+    d->transaction.remove(key);
+    Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
     Log() << "Pipeline: deleted entity: "<< newRevision;

     return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
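The TODO above points at the eventual design: rather than destroying the only copy, a delete would append a new revision that marks the entity as removed, keeping history readable. A rough sketch of that direction; the key scheme and tombstone marker are invented and not part of this commit:

    // Hypothetical revisioned delete; key scheme and marker are invented.
    const QByteArray revisionKey = key + '@' + QByteArray::number(newRevision);
    d->transaction.write(revisionKey, QByteArray("__deleted__")); // tombstone record
    Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
    // Earlier revisions of the key stay readable; readers interpret the
    // tombstone as "entity gone as of newRevision".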
@@ -418,7 +436,9 @@ void PipelineState::step()
     //TODO skip step if already processed
     //FIXME error handling if no result is found
     auto preprocessor = d->filterIt.next();
-    d->pipeline->storage().createTransaction(Akonadi2::Storage::ReadOnly).scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool {
+    //FIXME this read should not be necessary
+    //Perhaps simply use entity that is initially stored and synchronously process all filters. (Making the first filter somewhat redundant)
+    d->pipeline->transaction().scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool {
         auto entity = Akonadi2::GetEntity(value);
         preprocessor->process(*this, *entity);
         return false;
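Scanning through the pipeline's open read-write transaction is what makes the freshly written, still-uncommitted entity visible to the preprocessors. Assuming LMDB-style snapshot isolation for Akonadi2::Storage, a fresh ReadOnly transaction, as the old code opened, would only see the last committed state:

    // Assuming LMDB-like snapshot isolation for Akonadi2::Storage:
    pipeline.startTransaction();
    pipeline.transaction().write("key1", "value1");      // pending, not yet committed

    // Same transaction: reads its own uncommitted write.
    pipeline.transaction().scan("key1", [](const QByteArray &, const QByteArray &value) -> bool {
        Q_ASSERT(value == "value1");
        return false;
    });

    // A separate ReadOnly transaction opened now would snapshot the last
    // committed state and not find "key1" until pipeline.commit() runs.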