summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-01-20 19:07:07 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-01-20 19:07:07 +0100
commitbdb01c2c068df326f5a8328ed1492ab1bea388c5 (patch)
tree25c2ee1b29bc481b6914c244ed9ca194b1415d16 /common/pipeline.cpp
parent17e7ee40c9185c0505883853345fd6024c675b1a (diff)
downloadsink-bdb01c2c068df326f5a8328ed1492ab1bea388c5.tar.gz
sink-bdb01c2c068df326f5a8328ed1492ab1bea388c5.zip
Renamed Akonadi2 to Sink
(except for documentation).
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp114
1 files changed, 57 insertions, 57 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index a087def..401c26d 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -36,14 +36,14 @@
36#include "definitions.h" 36#include "definitions.h"
37#include "bufferutils.h" 37#include "bufferutils.h"
38 38
39namespace Akonadi2 39namespace Sink
40{ 40{
41 41
42class Pipeline::Private 42class Pipeline::Private
43{ 43{
44public: 44public:
45 Private(const QString &resourceName) 45 Private(const QString &resourceName)
46 : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite), 46 : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite),
47 revisionChanged(false) 47 revisionChanged(false)
48 { 48 {
49 } 49 }
@@ -86,7 +86,7 @@ void Pipeline::startTransaction()
86 if (d->transaction) { 86 if (d->transaction) {
87 return; 87 return;
88 } 88 }
89 d->transaction = std::move(storage().createTransaction(Akonadi2::Storage::ReadWrite)); 89 d->transaction = std::move(storage().createTransaction(Sink::Storage::ReadWrite));
90} 90}
91 91
92void Pipeline::commit() 92void Pipeline::commit()
@@ -96,7 +96,7 @@ void Pipeline::commit()
96 // for (auto processor : d->processors[bufferType]) { 96 // for (auto processor : d->processors[bufferType]) {
97 // processor->finalize(); 97 // processor->finalize();
98 // } 98 // }
99 const auto revision = Akonadi2::Storage::maxRevision(d->transaction); 99 const auto revision = Sink::Storage::maxRevision(d->transaction);
100 Trace() << "Committing " << revision; 100 Trace() << "Committing " << revision;
101 if (d->transaction) { 101 if (d->transaction) {
102 d->transaction.commit(); 102 d->transaction.commit();
@@ -120,14 +120,14 @@ Storage &Pipeline::storage() const
120 120
121void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 121void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
122{ 122{
123 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), 123 d->transaction.openDatabase(bufferType + ".main").write(Sink::Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
124 [](const Akonadi2::Storage::Error &error) { 124 [](const Sink::Storage::Error &error) {
125 Warning() << "Failed to write entity"; 125 Warning() << "Failed to write entity";
126 } 126 }
127 ); 127 );
128 d->revisionChanged = true; 128 d->revisionChanged = true;
129 Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); 129 Sink::Storage::setMaxRevision(d->transaction, newRevision);
130 Akonadi2::Storage::recordRevision(d->transaction, newRevision, uid, bufferType); 130 Sink::Storage::recordRevision(d->transaction, newRevision, uid, bufferType);
131} 131}
132 132
133KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) 133KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
@@ -136,23 +136,23 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
136 136
137 { 137 {
138 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 138 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
139 if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { 139 if (!Sink::Commands::VerifyCreateEntityBuffer(verifyer)) {
140 Warning() << "invalid buffer, not a create entity buffer"; 140 Warning() << "invalid buffer, not a create entity buffer";
141 return KAsync::error<qint64>(0); 141 return KAsync::error<qint64>(0);
142 } 142 }
143 } 143 }
144 auto createEntity = Akonadi2::Commands::GetCreateEntity(command); 144 auto createEntity = Sink::Commands::GetCreateEntity(command);
145 145
146 const bool replayToSource = createEntity->replayToSource(); 146 const bool replayToSource = createEntity->replayToSource();
147 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); 147 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size());
148 { 148 {
149 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 149 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
150 if (!Akonadi2::VerifyEntityBuffer(verifyer)) { 150 if (!Sink::VerifyEntityBuffer(verifyer)) {
151 Warning() << "invalid buffer, not an entity buffer"; 151 Warning() << "invalid buffer, not an entity buffer";
152 return KAsync::error<qint64>(0); 152 return KAsync::error<qint64>(0);
153 } 153 }
154 } 154 }
155 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); 155 auto entity = Sink::GetEntity(createEntity->delta()->Data());
156 if (!entity->resource()->size() && !entity->local()->size()) { 156 if (!entity->resource()->size() && !entity->local()->size()) {
157 Warning() << "No local and no resource buffer while trying to create entity."; 157 Warning() << "No local and no resource buffer while trying to create entity.";
158 return KAsync::error<qint64>(0); 158 return KAsync::error<qint64>(0);
@@ -171,16 +171,16 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
171 key = QUuid::createUuid().toString().toUtf8(); 171 key = QUuid::createUuid().toString().toUtf8();
172 } 172 }
173 Q_ASSERT(!key.isEmpty()); 173 Q_ASSERT(!key.isEmpty());
174 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; 174 const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1;
175 175
176 //Add metadata buffer 176 //Add metadata buffer
177 flatbuffers::FlatBufferBuilder metadataFbb; 177 flatbuffers::FlatBufferBuilder metadataFbb;
178 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); 178 auto metadataBuilder = Sink::MetadataBuilder(metadataFbb);
179 metadataBuilder.add_revision(newRevision); 179 metadataBuilder.add_revision(newRevision);
180 metadataBuilder.add_operation(Akonadi2::Operation_Creation); 180 metadataBuilder.add_operation(Sink::Operation_Creation);
181 metadataBuilder.add_replayToSource(replayToSource); 181 metadataBuilder.add_replayToSource(replayToSource);
182 auto metadataBuffer = metadataBuilder.Finish(); 182 auto metadataBuffer = metadataBuilder.Finish();
183 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); 183 Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer);
184 184
185 flatbuffers::FlatBufferBuilder fbb; 185 flatbuffers::FlatBufferBuilder fbb;
186 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); 186 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
@@ -194,14 +194,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
194 } 194 }
195 195
196 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 196 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
197 d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { 197 d->transaction.openDatabase(bufferType + ".main").scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool {
198 auto entity = Akonadi2::GetEntity(value); 198 auto entity = Sink::GetEntity(value);
199 auto adaptor = adaptorFactory->createAdaptor(*entity); 199 auto adaptor = adaptorFactory->createAdaptor(*entity);
200 for (auto processor : d->processors[bufferType]) { 200 for (auto processor : d->processors[bufferType]) {
201 processor->newEntity(key, newRevision, *adaptor, d->transaction); 201 processor->newEntity(key, newRevision, *adaptor, d->transaction);
202 } 202 }
203 return false; 203 return false;
204 }, [this](const Akonadi2::Storage::Error &error) { 204 }, [this](const Sink::Storage::Error &error) {
205 ErrorMsg() << "Failed to find value in pipeline: " << error.message; 205 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
206 }); 206 });
207 return KAsync::start<qint64>([newRevision](){ 207 return KAsync::start<qint64>([newRevision](){
@@ -213,16 +213,16 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
213{ 213{
214 Trace() << "Pipeline: Modified Entity"; 214 Trace() << "Pipeline: Modified Entity";
215 215
216 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; 216 const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1;
217 217
218 { 218 {
219 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 219 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
220 if (!Akonadi2::Commands::VerifyModifyEntityBuffer(verifyer)) { 220 if (!Sink::Commands::VerifyModifyEntityBuffer(verifyer)) {
221 Warning() << "invalid buffer, not a modify entity buffer"; 221 Warning() << "invalid buffer, not a modify entity buffer";
222 return KAsync::error<qint64>(0); 222 return KAsync::error<qint64>(0);
223 } 223 }
224 } 224 }
225 auto modifyEntity = Akonadi2::Commands::GetModifyEntity(command); 225 auto modifyEntity = Sink::Commands::GetModifyEntity(command);
226 Q_ASSERT(modifyEntity); 226 Q_ASSERT(modifyEntity);
227 227
228 const qint64 baseRevision = modifyEntity->revision(); 228 const qint64 baseRevision = modifyEntity->revision();
@@ -236,7 +236,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
236 } 236 }
237 { 237 {
238 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); 238 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
239 if (!Akonadi2::VerifyEntityBuffer(verifyer)) { 239 if (!Sink::VerifyEntityBuffer(verifyer)) {
240 Warning() << "invalid buffer, not an entity buffer"; 240 Warning() << "invalid buffer, not an entity buffer";
241 return KAsync::error<qint64>(0); 241 return KAsync::error<qint64>(0);
242 } 242 }
@@ -249,13 +249,13 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
249 return KAsync::error<qint64>(0); 249 return KAsync::error<qint64>(0);
250 } 250 }
251 251
252 auto diffEntity = Akonadi2::GetEntity(modifyEntity->delta()->Data()); 252 auto diffEntity = Sink::GetEntity(modifyEntity->delta()->Data());
253 Q_ASSERT(diffEntity); 253 Q_ASSERT(diffEntity);
254 auto diff = adaptorFactory->createAdaptor(*diffEntity); 254 auto diff = adaptorFactory->createAdaptor(*diffEntity);
255 255
256 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; 256 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
257 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 257 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
258 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 258 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
259 if (!buffer.isValid()) { 259 if (!buffer.isValid()) {
260 Warning() << "Read invalid buffer from disk"; 260 Warning() << "Read invalid buffer from disk";
261 } else { 261 } else {
@@ -273,8 +273,8 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
273 } 273 }
274 274
275 //resource and uid don't matter at this point 275 //resource and uid don't matter at this point
276 const Akonadi2::ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); 276 const Sink::ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current);
277 auto newObject = Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<Akonadi2::ApplicationDomain::ApplicationDomainType>(existingObject); 277 auto newObject = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<Sink::ApplicationDomain::ApplicationDomainType>(existingObject);
278 278
279 //Apply diff 279 //Apply diff
280 //FIXME only apply the properties that are available in the buffer 280 //FIXME only apply the properties that are available in the buffer
@@ -292,26 +292,26 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
292 292
293 //Add metadata buffer 293 //Add metadata buffer
294 flatbuffers::FlatBufferBuilder metadataFbb; 294 flatbuffers::FlatBufferBuilder metadataFbb;
295 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); 295 auto metadataBuilder = Sink::MetadataBuilder(metadataFbb);
296 metadataBuilder.add_revision(newRevision); 296 metadataBuilder.add_revision(newRevision);
297 metadataBuilder.add_operation(Akonadi2::Operation_Modification); 297 metadataBuilder.add_operation(Sink::Operation_Modification);
298 metadataBuilder.add_replayToSource(replayToSource); 298 metadataBuilder.add_replayToSource(replayToSource);
299 auto metadataBuffer = metadataBuilder.Finish(); 299 auto metadataBuffer = metadataBuilder.Finish();
300 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); 300 Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer);
301 301
302 flatbuffers::FlatBufferBuilder fbb; 302 flatbuffers::FlatBufferBuilder fbb;
303 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 303 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
304 304
305 storeNewRevision(newRevision, fbb, bufferType, key); 305 storeNewRevision(newRevision, fbb, bufferType, key);
306 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; 306 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
307 d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &, const QByteArray &value) -> bool { 307 d->transaction.openDatabase(bufferType + ".main").scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &, const QByteArray &value) -> bool {
308 auto entity = Akonadi2::GetEntity(value); 308 auto entity = Sink::GetEntity(value);
309 auto newEntity = adaptorFactory->createAdaptor(*entity); 309 auto newEntity = adaptorFactory->createAdaptor(*entity);
310 for (auto processor : d->processors[bufferType]) { 310 for (auto processor : d->processors[bufferType]) {
311 processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); 311 processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction);
312 } 312 }
313 return false; 313 return false;
314 }, [this](const Akonadi2::Storage::Error &error) { 314 }, [this](const Sink::Storage::Error &error) {
315 ErrorMsg() << "Failed to find value in pipeline: " << error.message; 315 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
316 }); 316 });
317 return KAsync::start<qint64>([newRevision](){ 317 return KAsync::start<qint64>([newRevision](){
@@ -325,12 +325,12 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
325 325
326 { 326 {
327 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 327 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
328 if (!Akonadi2::Commands::VerifyDeleteEntityBuffer(verifyer)) { 328 if (!Sink::Commands::VerifyDeleteEntityBuffer(verifyer)) {
329 Warning() << "invalid buffer, not a delete entity buffer"; 329 Warning() << "invalid buffer, not a delete entity buffer";
330 return KAsync::error<qint64>(0); 330 return KAsync::error<qint64>(0);
331 } 331 }
332 } 332 }
333 auto deleteEntity = Akonadi2::Commands::GetDeleteEntity(command); 333 auto deleteEntity = Sink::Commands::GetDeleteEntity(command);
334 334
335 const bool replayToSource = deleteEntity->replayToSource(); 335 const bool replayToSource = deleteEntity->replayToSource();
336 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 336 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
@@ -339,12 +339,12 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
339 bool found = false; 339 bool found = false;
340 bool alreadyRemoved = false; 340 bool alreadyRemoved = false;
341 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { 341 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
342 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 342 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
343 auto entity = Akonadi2::GetEntity(data.data()); 343 auto entity = Sink::GetEntity(data.data());
344 if (entity && entity->metadata()) { 344 if (entity && entity->metadata()) {
345 auto metadata = Akonadi2::GetMetadata(entity->metadata()->Data()); 345 auto metadata = Sink::GetMetadata(entity->metadata()->Data());
346 found = true; 346 found = true;
347 if (metadata->operation() == Akonadi2::Operation_Removal) { 347 if (metadata->operation() == Sink::Operation_Removal) {
348 alreadyRemoved = true; 348 alreadyRemoved = true;
349 } 349 }
350 350
@@ -364,16 +364,16 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
364 return KAsync::error<qint64>(0); 364 return KAsync::error<qint64>(0);
365 } 365 }
366 366
367 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; 367 const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1;
368 368
369 //Add metadata buffer 369 //Add metadata buffer
370 flatbuffers::FlatBufferBuilder metadataFbb; 370 flatbuffers::FlatBufferBuilder metadataFbb;
371 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); 371 auto metadataBuilder = Sink::MetadataBuilder(metadataFbb);
372 metadataBuilder.add_revision(newRevision); 372 metadataBuilder.add_revision(newRevision);
373 metadataBuilder.add_operation(Akonadi2::Operation_Removal); 373 metadataBuilder.add_operation(Sink::Operation_Removal);
374 metadataBuilder.add_replayToSource(replayToSource); 374 metadataBuilder.add_replayToSource(replayToSource);
375 auto metadataBuffer = metadataBuilder.Finish(); 375 auto metadataBuffer = metadataBuilder.Finish();
376 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); 376 Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer);
377 377
378 flatbuffers::FlatBufferBuilder fbb; 378 flatbuffers::FlatBufferBuilder fbb;
379 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); 379 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
@@ -384,16 +384,16 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
384 return KAsync::error<qint64>(0); 384 return KAsync::error<qint64>(0);
385 } 385 }
386 386
387 QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> current; 387 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
388 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool { 388 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
389 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 389 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
390 if (!buffer.isValid()) { 390 if (!buffer.isValid()) {
391 Warning() << "Read invalid buffer from disk"; 391 Warning() << "Read invalid buffer from disk";
392 } else { 392 } else {
393 current = adaptorFactory->createAdaptor(buffer.entity()); 393 current = adaptorFactory->createAdaptor(buffer.entity());
394 } 394 }
395 return false; 395 return false;
396 }, [this](const Akonadi2::Storage::Error &error) { 396 }, [this](const Sink::Storage::Error &error) {
397 ErrorMsg() << "Failed to find value in pipeline: " << error.message; 397 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
398 }); 398 });
399 399
@@ -411,33 +411,33 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
411 411
412void Pipeline::cleanupRevision(qint64 revision) 412void Pipeline::cleanupRevision(qint64 revision)
413{ 413{
414 const auto uid = Akonadi2::Storage::getUidFromRevision(d->transaction, revision); 414 const auto uid = Sink::Storage::getUidFromRevision(d->transaction, revision);
415 const auto bufferType = Akonadi2::Storage::getTypeFromRevision(d->transaction, revision); 415 const auto bufferType = Sink::Storage::getTypeFromRevision(d->transaction, revision);
416 Trace() << "Cleaning up revision " << revision << uid << bufferType; 416 Trace() << "Cleaning up revision " << revision << uid << bufferType;
417 d->transaction.openDatabase(bufferType + ".main").scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { 417 d->transaction.openDatabase(bufferType + ".main").scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool {
418 Akonadi2::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 418 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
419 if (!buffer.isValid()) { 419 if (!buffer.isValid()) {
420 Warning() << "Read invalid buffer from disk"; 420 Warning() << "Read invalid buffer from disk";
421 } else { 421 } else {
422 const auto metadata = flatbuffers::GetRoot<Akonadi2::Metadata>(buffer.metadataBuffer()); 422 const auto metadata = flatbuffers::GetRoot<Sink::Metadata>(buffer.metadataBuffer());
423 const qint64 rev = metadata->revision(); 423 const qint64 rev = metadata->revision();
424 //Remove old revisions, and the current if the entity has already been removed 424 //Remove old revisions, and the current if the entity has already been removed
425 if (rev < revision || metadata->operation() == Akonadi2::Operation_Removal) { 425 if (rev < revision || metadata->operation() == Sink::Operation_Removal) {
426 Akonadi2::Storage::removeRevision(d->transaction, rev); 426 Sink::Storage::removeRevision(d->transaction, rev);
427 d->transaction.openDatabase(bufferType + ".main").remove(key); 427 d->transaction.openDatabase(bufferType + ".main").remove(key);
428 } 428 }
429 } 429 }
430 430
431 return true; 431 return true;
432 }, [](const Akonadi2::Storage::Error &error) { 432 }, [](const Sink::Storage::Error &error) {
433 Warning() << "Error while reading: " << error.message; 433 Warning() << "Error while reading: " << error.message;
434 }, true); 434 }, true);
435 Akonadi2::Storage::setCleanedUpRevision(d->transaction, revision); 435 Sink::Storage::setCleanedUpRevision(d->transaction, revision);
436} 436}
437 437
438qint64 Pipeline::cleanedUpRevision() 438qint64 Pipeline::cleanedUpRevision()
439{ 439{
440 return Akonadi2::Storage::cleanedUpRevision(d->transaction); 440 return Sink::Storage::cleanedUpRevision(d->transaction);
441} 441}
442 442
443Preprocessor::Preprocessor() 443Preprocessor::Preprocessor()
@@ -457,5 +457,5 @@ void Preprocessor::finalize()
457{ 457{
458} 458}
459 459
460} // namespace Akonadi2 460} // namespace Sink
461 461