summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-02-01 15:36:42 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-02-01 15:36:42 +0100
commitb1571c2be7342a0f7474e6a94d9c55230241fa1c (patch)
tree2fd90359cfb5faba9d07490a480a9e210dba2097 /common/pipeline.cpp
parent19998fce1c67fc99ccf9f30530275925927b88f3 (diff)
downloadsink-b1571c2be7342a0f7474e6a94d9c55230241fa1c.tar.gz
sink-b1571c2be7342a0f7474e6a94d9c55230241fa1c.zip
Avoid unnecessary namespace usage
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp106
1 files changed, 53 insertions, 53 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index f861ab6..0a71c8d 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -86,7 +86,7 @@ void Pipeline::startTransaction()
86 if (d->transaction) { 86 if (d->transaction) {
87 return; 87 return;
88 } 88 }
89 d->transaction = std::move(storage().createTransaction(Sink::Storage::ReadWrite)); 89 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite));
90} 90}
91 91
92void Pipeline::commit() 92void Pipeline::commit()
@@ -96,7 +96,7 @@ void Pipeline::commit()
96 // for (auto processor : d->processors[bufferType]) { 96 // for (auto processor : d->processors[bufferType]) {
97 // processor->finalize(); 97 // processor->finalize();
98 // } 98 // }
99 const auto revision = Sink::Storage::maxRevision(d->transaction); 99 const auto revision = Storage::maxRevision(d->transaction);
100 Trace() << "Committing " << revision; 100 Trace() << "Committing " << revision;
101 if (d->transaction) { 101 if (d->transaction) {
102 d->transaction.commit(); 102 d->transaction.commit();
@@ -120,14 +120,14 @@ Storage &Pipeline::storage() const
120 120
121void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 121void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
122{ 122{
123 Storage::mainDatabase(d->transaction, bufferType).write(Sink::Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), 123 Storage::mainDatabase(d->transaction, bufferType).write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
124 [](const Sink::Storage::Error &error) { 124 [](const Storage::Error &error) {
125 Warning() << "Failed to write entity"; 125 Warning() << "Failed to write entity";
126 } 126 }
127 ); 127 );
128 d->revisionChanged = true; 128 d->revisionChanged = true;
129 Sink::Storage::setMaxRevision(d->transaction, newRevision); 129 Storage::setMaxRevision(d->transaction, newRevision);
130 Sink::Storage::recordRevision(d->transaction, newRevision, uid, bufferType); 130 Storage::recordRevision(d->transaction, newRevision, uid, bufferType);
131} 131}
132 132
133KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) 133KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
@@ -136,23 +136,23 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
136 136
137 { 137 {
138 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 138 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
139 if (!Sink::Commands::VerifyCreateEntityBuffer(verifyer)) { 139 if (!Commands::VerifyCreateEntityBuffer(verifyer)) {
140 Warning() << "invalid buffer, not a create entity buffer"; 140 Warning() << "invalid buffer, not a create entity buffer";
141 return KAsync::error<qint64>(0); 141 return KAsync::error<qint64>(0);
142 } 142 }
143 } 143 }
144 auto createEntity = Sink::Commands::GetCreateEntity(command); 144 auto createEntity = Commands::GetCreateEntity(command);
145 145
146 const bool replayToSource = createEntity->replayToSource(); 146 const bool replayToSource = createEntity->replayToSource();
147 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); 147 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size());
148 { 148 {
149 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 149 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
150 if (!Sink::VerifyEntityBuffer(verifyer)) { 150 if (!VerifyEntityBuffer(verifyer)) {
151 Warning() << "invalid buffer, not an entity buffer"; 151 Warning() << "invalid buffer, not an entity buffer";
152 return KAsync::error<qint64>(0); 152 return KAsync::error<qint64>(0);
153 } 153 }
154 } 154 }
155 auto entity = Sink::GetEntity(createEntity->delta()->Data()); 155 auto entity = GetEntity(createEntity->delta()->Data());
156 if (!entity->resource()->size() && !entity->local()->size()) { 156 if (!entity->resource()->size() && !entity->local()->size()) {
157 Warning() << "No local and no resource buffer while trying to create entity."; 157 Warning() << "No local and no resource buffer while trying to create entity.";
158 return KAsync::error<qint64>(0); 158 return KAsync::error<qint64>(0);
@@ -171,16 +171,16 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
171 key = QUuid::createUuid().toString().toUtf8(); 171 key = QUuid::createUuid().toString().toUtf8();
172 } 172 }
173 Q_ASSERT(!key.isEmpty()); 173 Q_ASSERT(!key.isEmpty());
174 const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; 174 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
175 175
176 //Add metadata buffer 176 //Add metadata buffer
177 flatbuffers::FlatBufferBuilder metadataFbb; 177 flatbuffers::FlatBufferBuilder metadataFbb;
178 auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); 178 auto metadataBuilder = MetadataBuilder(metadataFbb);
179 metadataBuilder.add_revision(newRevision); 179 metadataBuilder.add_revision(newRevision);
180 metadataBuilder.add_operation(Sink::Operation_Creation); 180 metadataBuilder.add_operation(Operation_Creation);
181 metadataBuilder.add_replayToSource(replayToSource); 181 metadataBuilder.add_replayToSource(replayToSource);
182 auto metadataBuffer = metadataBuilder.Finish(); 182 auto metadataBuffer = metadataBuilder.Finish();
183 Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); 183 FinishMetadataBuffer(metadataFbb, metadataBuffer);
184 184
185 flatbuffers::FlatBufferBuilder fbb; 185 flatbuffers::FlatBufferBuilder fbb;
186 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); 186 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
@@ -194,14 +194,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
194 } 194 }
195 195
196 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 196 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
197 Storage::mainDatabase(d->transaction, bufferType).scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { 197 Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool {
198 auto entity = Sink::GetEntity(value); 198 auto entity = GetEntity(value);
199 auto adaptor = adaptorFactory->createAdaptor(*entity); 199 auto adaptor = adaptorFactory->createAdaptor(*entity);
200 for (auto processor : d->processors[bufferType]) { 200 for (auto processor : d->processors[bufferType]) {
201 processor->newEntity(key, newRevision, *adaptor, d->transaction); 201 processor->newEntity(key, newRevision, *adaptor, d->transaction);
202 } 202 }
203 return false; 203 return false;
204 }, [this](const Sink::Storage::Error &error) { 204 }, [this](const Storage::Error &error) {
205 ErrorMsg() << "Failed to find value in pipeline: " << error.message; 205 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
206 }); 206 });
207 return KAsync::start<qint64>([newRevision](){ 207 return KAsync::start<qint64>([newRevision](){
@@ -213,16 +213,16 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
213{ 213{
214 Trace() << "Pipeline: Modified Entity"; 214 Trace() << "Pipeline: Modified Entity";
215 215
216 const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; 216 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
217 217
218 { 218 {
219 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 219 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
220 if (!Sink::Commands::VerifyModifyEntityBuffer(verifyer)) { 220 if (!Commands::VerifyModifyEntityBuffer(verifyer)) {
221 Warning() << "invalid buffer, not a modify entity buffer"; 221 Warning() << "invalid buffer, not a modify entity buffer";
222 return KAsync::error<qint64>(0); 222 return KAsync::error<qint64>(0);
223 } 223 }
224 } 224 }
225 auto modifyEntity = Sink::Commands::GetModifyEntity(command); 225 auto modifyEntity = Commands::GetModifyEntity(command);
226 Q_ASSERT(modifyEntity); 226 Q_ASSERT(modifyEntity);
227 227
228 const qint64 baseRevision = modifyEntity->revision(); 228 const qint64 baseRevision = modifyEntity->revision();
@@ -236,7 +236,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
236 } 236 }
237 { 237 {
238 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); 238 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
239 if (!Sink::VerifyEntityBuffer(verifyer)) { 239 if (!VerifyEntityBuffer(verifyer)) {
240 Warning() << "invalid buffer, not an entity buffer"; 240 Warning() << "invalid buffer, not an entity buffer";
241 return KAsync::error<qint64>(0); 241 return KAsync::error<qint64>(0);
242 } 242 }
@@ -249,13 +249,13 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
249 return KAsync::error<qint64>(0); 249 return KAsync::error<qint64>(0);
250 } 250 }
251 251
252 auto diffEntity = Sink::GetEntity(modifyEntity->delta()->Data()); 252 auto diffEntity = GetEntity(modifyEntity->delta()->Data());
253 Q_ASSERT(diffEntity); 253 Q_ASSERT(diffEntity);
254 auto diff = adaptorFactory->createAdaptor(*diffEntity); 254 auto diff = adaptorFactory->createAdaptor(*diffEntity);
255 255
256 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 256 QSharedPointer<ApplicationDomain::BufferAdaptor> current;
257 Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 257 Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
258 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 258 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
259 if (!buffer.isValid()) { 259 if (!buffer.isValid()) {
260 Warning() << "Read invalid buffer from disk"; 260 Warning() << "Read invalid buffer from disk";
261 } else { 261 } else {
@@ -273,8 +273,8 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
273 } 273 }
274 274
275 //resource and uid don't matter at this point 275 //resource and uid don't matter at this point
276 const Sink::ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); 276 const ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current);
277 auto newObject = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<Sink::ApplicationDomain::ApplicationDomainType>(existingObject); 277 auto newObject = ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(existingObject);
278 278
279 //Apply diff 279 //Apply diff
280 //FIXME only apply the properties that are available in the buffer 280 //FIXME only apply the properties that are available in the buffer
@@ -299,29 +299,29 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
299 299
300 //Add metadata buffer 300 //Add metadata buffer
301 flatbuffers::FlatBufferBuilder metadataFbb; 301 flatbuffers::FlatBufferBuilder metadataFbb;
302 auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); 302 auto metadataBuilder = MetadataBuilder(metadataFbb);
303 metadataBuilder.add_revision(newRevision); 303 metadataBuilder.add_revision(newRevision);
304 metadataBuilder.add_operation(Sink::Operation_Modification); 304 metadataBuilder.add_operation(Operation_Modification);
305 metadataBuilder.add_replayToSource(replayToSource); 305 metadataBuilder.add_replayToSource(replayToSource);
306 auto metadataBuffer = metadataBuilder.Finish(); 306 auto metadataBuffer = metadataBuilder.Finish();
307 Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); 307 FinishMetadataBuffer(metadataFbb, metadataBuffer);
308 308
309 flatbuffers::FlatBufferBuilder fbb; 309 flatbuffers::FlatBufferBuilder fbb;
310 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 310 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
311 311
312 storeNewRevision(newRevision, fbb, bufferType, key); 312 storeNewRevision(newRevision, fbb, bufferType, key);
313 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; 313 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
314 Storage::mainDatabase(d->transaction, bufferType).scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { 314 Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool {
315 if (value.isEmpty()) { 315 if (value.isEmpty()) {
316 ErrorMsg() << "Read buffer is empty."; 316 ErrorMsg() << "Read buffer is empty.";
317 } 317 }
318 auto entity = Sink::GetEntity(value.data()); 318 auto entity = GetEntity(value.data());
319 auto newEntity = adaptorFactory->createAdaptor(*entity); 319 auto newEntity = adaptorFactory->createAdaptor(*entity);
320 for (auto processor : d->processors[bufferType]) { 320 for (auto processor : d->processors[bufferType]) {
321 processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); 321 processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction);
322 } 322 }
323 return false; 323 return false;
324 }, [this](const Sink::Storage::Error &error) { 324 }, [this](const Storage::Error &error) {
325 ErrorMsg() << "Failed to find value in pipeline: " << error.message; 325 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
326 }); 326 });
327 return KAsync::start<qint64>([newRevision](){ 327 return KAsync::start<qint64>([newRevision](){
@@ -335,12 +335,12 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
335 335
336 { 336 {
337 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 337 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
338 if (!Sink::Commands::VerifyDeleteEntityBuffer(verifyer)) { 338 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) {
339 Warning() << "invalid buffer, not a delete entity buffer"; 339 Warning() << "invalid buffer, not a delete entity buffer";
340 return KAsync::error<qint64>(0); 340 return KAsync::error<qint64>(0);
341 } 341 }
342 } 342 }
343 auto deleteEntity = Sink::Commands::GetDeleteEntity(command); 343 auto deleteEntity = Commands::GetDeleteEntity(command);
344 344
345 const bool replayToSource = deleteEntity->replayToSource(); 345 const bool replayToSource = deleteEntity->replayToSource();
346 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 346 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
@@ -349,11 +349,11 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
349 bool found = false; 349 bool found = false;
350 bool alreadyRemoved = false; 350 bool alreadyRemoved = false;
351 Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { 351 Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
352 auto entity = Sink::GetEntity(data.data()); 352 auto entity = GetEntity(data.data());
353 if (entity && entity->metadata()) { 353 if (entity && entity->metadata()) {
354 auto metadata = Sink::GetMetadata(entity->metadata()->Data()); 354 auto metadata = GetMetadata(entity->metadata()->Data());
355 found = true; 355 found = true;
356 if (metadata->operation() == Sink::Operation_Removal) { 356 if (metadata->operation() == Operation_Removal) {
357 alreadyRemoved = true; 357 alreadyRemoved = true;
358 } 358 }
359 359
@@ -373,16 +373,16 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
373 return KAsync::error<qint64>(0); 373 return KAsync::error<qint64>(0);
374 } 374 }
375 375
376 const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; 376 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
377 377
378 //Add metadata buffer 378 //Add metadata buffer
379 flatbuffers::FlatBufferBuilder metadataFbb; 379 flatbuffers::FlatBufferBuilder metadataFbb;
380 auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); 380 auto metadataBuilder = MetadataBuilder(metadataFbb);
381 metadataBuilder.add_revision(newRevision); 381 metadataBuilder.add_revision(newRevision);
382 metadataBuilder.add_operation(Sink::Operation_Removal); 382 metadataBuilder.add_operation(Operation_Removal);
383 metadataBuilder.add_replayToSource(replayToSource); 383 metadataBuilder.add_replayToSource(replayToSource);
384 auto metadataBuffer = metadataBuilder.Finish(); 384 auto metadataBuffer = metadataBuilder.Finish();
385 Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); 385 FinishMetadataBuffer(metadataFbb, metadataBuffer);
386 386
387 flatbuffers::FlatBufferBuilder fbb; 387 flatbuffers::FlatBufferBuilder fbb;
388 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); 388 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
@@ -393,16 +393,16 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
393 return KAsync::error<qint64>(0); 393 return KAsync::error<qint64>(0);
394 } 394 }
395 395
396 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 396 QSharedPointer<ApplicationDomain::BufferAdaptor> current;
397 Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool { 397 Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
398 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 398 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
399 if (!buffer.isValid()) { 399 if (!buffer.isValid()) {
400 Warning() << "Read invalid buffer from disk"; 400 Warning() << "Read invalid buffer from disk";
401 } else { 401 } else {
402 current = adaptorFactory->createAdaptor(buffer.entity()); 402 current = adaptorFactory->createAdaptor(buffer.entity());
403 } 403 }
404 return false; 404 return false;
405 }, [this](const Sink::Storage::Error &error) { 405 }, [this](const Storage::Error &error) {
406 ErrorMsg() << "Failed to find value in pipeline: " << error.message; 406 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
407 }); 407 });
408 408
@@ -420,33 +420,33 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
420 420
421void Pipeline::cleanupRevision(qint64 revision) 421void Pipeline::cleanupRevision(qint64 revision)
422{ 422{
423 const auto uid = Sink::Storage::getUidFromRevision(d->transaction, revision); 423 const auto uid = Storage::getUidFromRevision(d->transaction, revision);
424 const auto bufferType = Sink::Storage::getTypeFromRevision(d->transaction, revision); 424 const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision);
425 Trace() << "Cleaning up revision " << revision << uid << bufferType; 425 Trace() << "Cleaning up revision " << revision << uid << bufferType;
426 Storage::mainDatabase(d->transaction, bufferType).scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { 426 Storage::mainDatabase(d->transaction, bufferType).scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool {
427 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 427 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
428 if (!buffer.isValid()) { 428 if (!buffer.isValid()) {
429 Warning() << "Read invalid buffer from disk"; 429 Warning() << "Read invalid buffer from disk";
430 } else { 430 } else {
431 const auto metadata = flatbuffers::GetRoot<Sink::Metadata>(buffer.metadataBuffer()); 431 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
432 const qint64 rev = metadata->revision(); 432 const qint64 rev = metadata->revision();
433 //Remove old revisions, and the current if the entity has already been removed 433 //Remove old revisions, and the current if the entity has already been removed
434 if (rev < revision || metadata->operation() == Sink::Operation_Removal) { 434 if (rev < revision || metadata->operation() == Operation_Removal) {
435 Sink::Storage::removeRevision(d->transaction, rev); 435 Storage::removeRevision(d->transaction, rev);
436 Storage::mainDatabase(d->transaction, bufferType).remove(key); 436 Storage::mainDatabase(d->transaction, bufferType).remove(key);
437 } 437 }
438 } 438 }
439 439
440 return true; 440 return true;
441 }, [](const Sink::Storage::Error &error) { 441 }, [](const Storage::Error &error) {
442 Warning() << "Error while reading: " << error.message; 442 Warning() << "Error while reading: " << error.message;
443 }, true); 443 }, true);
444 Sink::Storage::setCleanedUpRevision(d->transaction, revision); 444 Storage::setCleanedUpRevision(d->transaction, revision);
445} 445}
446 446
447qint64 Pipeline::cleanedUpRevision() 447qint64 Pipeline::cleanedUpRevision()
448{ 448{
449 return Sink::Storage::cleanedUpRevision(d->transaction); 449 return Storage::cleanedUpRevision(d->transaction);
450} 450}
451 451
452Preprocessor::Preprocessor() 452Preprocessor::Preprocessor()