summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp48
1 files changed, 34 insertions, 14 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 207cc5e..27b9deb 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -47,6 +47,7 @@ public:
47 } 47 }
48 48
49 Storage storage; 49 Storage storage;
50 Storage::Transaction transaction;
50 QHash<QString, QVector<Preprocessor *> > nullPipeline; 51 QHash<QString, QVector<Preprocessor *> > nullPipeline;
51 QHash<QString, QVector<Preprocessor *> > newPipeline; 52 QHash<QString, QVector<Preprocessor *> > newPipeline;
52 QHash<QString, QVector<Preprocessor *> > modifiedPipeline; 53 QHash<QString, QVector<Preprocessor *> > modifiedPipeline;
@@ -89,6 +90,27 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac
89 d->adaptorFactory.insert(entityType, factory); 90 d->adaptorFactory.insert(entityType, factory);
90} 91}
91 92
93void Pipeline::startTransaction()
94{
95 if (d->transaction) {
96 return;
97 }
98 d->transaction = std::move(storage().createTransaction(Akonadi2::Storage::ReadWrite));
99}
100
101void Pipeline::commit()
102{
103 if (d->transaction) {
104 d->transaction.commit();
105 }
106 d->transaction = Storage::Transaction();
107}
108
109Storage::Transaction &Pipeline::transaction()
110{
111 return d->transaction;
112}
113
92Storage &Pipeline::storage() const 114Storage &Pipeline::storage() const
93{ 115{
94 return d->storage; 116 return d->storage;
@@ -109,7 +131,7 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
109 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. 131 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
110 const auto key = QUuid::createUuid().toString().toUtf8(); 132 const auto key = QUuid::createUuid().toString().toUtf8();
111 133
112 const qint64 newRevision = storage().maxRevision() + 1; 134 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;
113 135
114 { 136 {
115 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 137 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
@@ -143,10 +165,8 @@ KAsync::Job<void> Pipeline::newEntity(void const *command, size_t size)
143 flatbuffers::FlatBufferBuilder fbb; 165 flatbuffers::FlatBufferBuilder fbb;
144 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); 166 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
145 167
146 auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite); 168 d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
147 transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 169 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
148 Akonadi2::Storage::setMaxRevision(transaction, newRevision);
149 transaction.commit();
150 Log() << "Pipeline: wrote entity: " << key << newRevision; 170 Log() << "Pipeline: wrote entity: " << key << newRevision;
151 171
152 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { 172 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
@@ -162,7 +182,7 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
162{ 182{
163 Log() << "Pipeline: Modified Entity"; 183 Log() << "Pipeline: Modified Entity";
164 184
165 const qint64 newRevision = storage().maxRevision() + 1; 185 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;
166 186
167 { 187 {
168 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 188 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
@@ -245,9 +265,8 @@ KAsync::Job<void> Pipeline::modifiedEntity(void const *command, size_t size)
245 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 265 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
246 266
247 //TODO don't overwrite the old entry, but instead store a new revision 267 //TODO don't overwrite the old entry, but instead store a new revision
248 auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite); 268 d->transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()));
249 transaction.write(key, QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 269 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
250 Akonadi2::Storage::setMaxRevision(transaction, newRevision);
251 270
252 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { 271 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
253 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() { 272 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [&future]() {
@@ -262,7 +281,7 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
262{ 281{
263 Log() << "Pipeline: Deleted Entity"; 282 Log() << "Pipeline: Deleted Entity";
264 283
265 const qint64 newRevision = storage().maxRevision() + 1; 284 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;
266 285
267 { 286 {
268 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 287 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
@@ -277,9 +296,8 @@ KAsync::Job<void> Pipeline::deletedEntity(void const *command, size_t size)
277 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 296 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
278 297
279 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted 298 //TODO instead of deleting the entry, a new revision should be created that marks the entity as deleted
280 auto transaction = storage().createTransaction(Akonadi2::Storage::ReadWrite); 299 d->transaction.remove(key);
281 transaction.remove(key); 300 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision);
282 Akonadi2::Storage::setMaxRevision(transaction, newRevision);
283 Log() << "Pipeline: deleted entity: "<< newRevision; 301 Log() << "Pipeline: deleted entity: "<< newRevision;
284 302
285 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) { 303 return KAsync::start<void>([this, key, entityType](KAsync::Future<void> &future) {
@@ -418,7 +436,9 @@ void PipelineState::step()
418 //TODO skip step if already processed 436 //TODO skip step if already processed
419 //FIXME error handling if no result is found 437 //FIXME error handling if no result is found
420 auto preprocessor = d->filterIt.next(); 438 auto preprocessor = d->filterIt.next();
421 d->pipeline->storage().createTransaction(Akonadi2::Storage::ReadOnly).scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool { 439 //FIXME this read should not be necessary
440 //Perhaps simply use entity that is initially stored and synchronously process all filters. (Making the first filter somewhat redundant)
441 d->pipeline->transaction().scan(d->key, [this, preprocessor](const QByteArray &key, const QByteArray &value) -> bool {
422 auto entity = Akonadi2::GetEntity(value); 442 auto entity = Akonadi2::GetEntity(value);
423 preprocessor->process(*this, *entity); 443 preprocessor->process(*this, *entity);
424 return false; 444 return false;