summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp90
1 files changed, 45 insertions, 45 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index c6d5297..f1a4a32 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -38,8 +38,7 @@
38#include "definitions.h" 38#include "definitions.h"
39#include "bufferutils.h" 39#include "bufferutils.h"
40 40
41#undef DEBUG_AREA 41SINK_DEBUG_AREA("pipeline")
42#define DEBUG_AREA "resource.pipeline"
43 42
44namespace Sink { 43namespace Sink {
45 44
@@ -52,7 +51,7 @@ public:
52 51
53 Storage storage; 52 Storage storage;
54 Storage::Transaction transaction; 53 Storage::Transaction transaction;
55 QHash<QString, QVector<Preprocessor *>> processors; 54 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors;
56 bool revisionChanged; 55 bool revisionChanged;
57 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); 56 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
58 QTime transactionTime; 57 QTime transactionTime;
@@ -63,10 +62,10 @@ public:
63 62
64void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 63void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
65{ 64{
66 Trace() << "Committing new revision: " << uid << newRevision; 65 SinkTrace() << "Committing new revision: " << uid << newRevision;
67 Storage::mainDatabase(transaction, bufferType) 66 Storage::mainDatabase(transaction, bufferType)
68 .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), 67 .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
69 [uid, newRevision](const Storage::Error &error) { Warning() << "Failed to write entity" << uid << newRevision; }); 68 [uid, newRevision](const Storage::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; });
70 revisionChanged = true; 69 revisionChanged = true;
71 Storage::setMaxRevision(transaction, newRevision); 70 Storage::setMaxRevision(transaction, newRevision);
72 Storage::recordRevision(transaction, newRevision, uid, bufferType); 71 Storage::recordRevision(transaction, newRevision, uid, bufferType);
@@ -79,15 +78,17 @@ Pipeline::Pipeline(const QString &resourceName, QObject *parent) : QObject(paren
79 78
80Pipeline::~Pipeline() 79Pipeline::~Pipeline()
81{ 80{
82 delete d; 81 d->transaction = Storage::Transaction();
83} 82}
84 83
85void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) 84void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors)
86{ 85{
86 auto &list = d->processors[entityType];
87 list.clear();
87 for (auto p : processors) { 88 for (auto p : processors) {
88 p->setup(d->resourceType, d->resourceInstanceIdentifier, this); 89 p->setup(d->resourceType, d->resourceInstanceIdentifier, this);
90 list.append(QSharedPointer<Preprocessor>(p));
89 } 91 }
90 d->processors[entityType] = processors;
91} 92}
92 93
93void Pipeline::setResourceType(const QByteArray &resourceType) 94void Pipeline::setResourceType(const QByteArray &resourceType)
@@ -105,21 +106,21 @@ void Pipeline::startTransaction()
105 if (d->transaction) { 106 if (d->transaction) {
106 return; 107 return;
107 } 108 }
108 Trace() << "Starting transaction."; 109 SinkTrace() << "Starting transaction.";
109 d->transactionTime.start(); 110 d->transactionTime.start();
110 d->transactionItemCount = 0; 111 d->transactionItemCount = 0;
111 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 112 d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
112 Warning() << error.message; 113 SinkWarning() << error.message;
113 })); 114 });
114 115
115 //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. 116 //FIXME this is a temporary measure to recover from a failure to open the named databases correctly.
116 //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). 117 //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already).
117 //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync... 118 //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync...
118 if (d->storage.exists()) { 119 if (d->storage.exists()) {
119 while (!d->transaction.validateNamedDatabases()) { 120 while (!d->transaction.validateNamedDatabases()) {
120 Warning() << "Opened an invalid transaction!!!!!!"; 121 SinkWarning() << "Opened an invalid transaction!!!!!!";
121 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 122 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
122 Warning() << error.message; 123 SinkWarning() << error.message;
123 })); 124 }));
124 } 125 }
125 } 126 }
@@ -139,7 +140,7 @@ void Pipeline::commit()
139 } 140 }
140 const auto revision = Storage::maxRevision(d->transaction); 141 const auto revision = Storage::maxRevision(d->transaction);
141 const auto elapsed = d->transactionTime.elapsed(); 142 const auto elapsed = d->transactionTime.elapsed();
142 Log() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " 143 SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
143 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; 144 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]";
144 if (d->transaction) { 145 if (d->transaction) {
145 d->transaction.commit(); 146 d->transaction.commit();
@@ -168,7 +169,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
168 { 169 {
169 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 170 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
170 if (!Commands::VerifyCreateEntityBuffer(verifyer)) { 171 if (!Commands::VerifyCreateEntityBuffer(verifyer)) {
171 Warning() << "invalid buffer, not a create entity buffer"; 172 SinkWarning() << "invalid buffer, not a create entity buffer";
172 return KAsync::error<qint64>(0); 173 return KAsync::error<qint64>(0);
173 } 174 }
174 } 175 }
@@ -180,7 +181,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
180 if (createEntity->entityId()) { 181 if (createEntity->entityId()) {
181 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); 182 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size());
182 if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { 183 if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) {
183 ErrorMsg() << "An entity with this id already exists: " << key; 184 SinkError() << "An entity with this id already exists: " << key;
184 return KAsync::error<qint64>(0); 185 return KAsync::error<qint64>(0);
185 } 186 }
186 } 187 }
@@ -188,31 +189,31 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
188 if (key.isEmpty()) { 189 if (key.isEmpty()) {
189 key = Sink::Storage::generateUid(); 190 key = Sink::Storage::generateUid();
190 } 191 }
191 Log() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 192 SinkLog() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
192 Q_ASSERT(!key.isEmpty()); 193 Q_ASSERT(!key.isEmpty());
193 194
194 { 195 {
195 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 196 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
196 if (!VerifyEntityBuffer(verifyer)) { 197 if (!VerifyEntityBuffer(verifyer)) {
197 Warning() << "invalid buffer, not an entity buffer"; 198 SinkWarning() << "invalid buffer, not an entity buffer";
198 return KAsync::error<qint64>(0); 199 return KAsync::error<qint64>(0);
199 } 200 }
200 } 201 }
201 auto entity = GetEntity(createEntity->delta()->Data()); 202 auto entity = GetEntity(createEntity->delta()->Data());
202 if (!entity->resource()->size() && !entity->local()->size()) { 203 if (!entity->resource()->size() && !entity->local()->size()) {
203 Warning() << "No local and no resource buffer while trying to create entity."; 204 SinkWarning() << "No local and no resource buffer while trying to create entity.";
204 return KAsync::error<qint64>(0); 205 return KAsync::error<qint64>(0);
205 } 206 }
206 207
207 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 208 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
208 if (!adaptorFactory) { 209 if (!adaptorFactory) {
209 Warning() << "no adaptor factory for type " << bufferType; 210 SinkWarning() << "no adaptor factory for type " << bufferType;
210 return KAsync::error<qint64>(0); 211 return KAsync::error<qint64>(0);
211 } 212 }
212 213
213 auto adaptor = adaptorFactory->createAdaptor(*entity); 214 auto adaptor = adaptorFactory->createAdaptor(*entity);
214 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); 215 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties());
215 for (auto processor : d->processors[bufferType]) { 216 foreach (const auto &processor, d->processors[bufferType]) {
216 processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); 217 processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction);
217 } 218 }
218 //The maxRevision may have changed meanwhile if the entity created sub-entities 219 //The maxRevision may have changed meanwhile if the entity created sub-entities
@@ -242,7 +243,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
242 { 243 {
243 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 244 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
244 if (!Commands::VerifyModifyEntityBuffer(verifyer)) { 245 if (!Commands::VerifyModifyEntityBuffer(verifyer)) {
245 Warning() << "invalid buffer, not a modify entity buffer"; 246 SinkWarning() << "invalid buffer, not a modify entity buffer";
246 return KAsync::error<qint64>(0); 247 return KAsync::error<qint64>(0);
247 } 248 }
248 } 249 }
@@ -252,21 +253,21 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
252 if (modifyEntity->modifiedProperties()) { 253 if (modifyEntity->modifiedProperties()) {
253 changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); 254 changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties());
254 } else { 255 } else {
255 Warning() << "No changeset available"; 256 SinkWarning() << "No changeset available";
256 } 257 }
257 const qint64 baseRevision = modifyEntity->revision(); 258 const qint64 baseRevision = modifyEntity->revision();
258 const bool replayToSource = modifyEntity->replayToSource(); 259 const bool replayToSource = modifyEntity->replayToSource();
259 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); 260 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
260 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); 261 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
261 Log() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 262 SinkLog() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
262 if (bufferType.isEmpty() || key.isEmpty()) { 263 if (bufferType.isEmpty() || key.isEmpty()) {
263 Warning() << "entity type or key " << bufferType << key; 264 SinkWarning() << "entity type or key " << bufferType << key;
264 return KAsync::error<qint64>(0); 265 return KAsync::error<qint64>(0);
265 } 266 }
266 { 267 {
267 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); 268 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
268 if (!VerifyEntityBuffer(verifyer)) { 269 if (!VerifyEntityBuffer(verifyer)) {
269 Warning() << "invalid buffer, not an entity buffer"; 270 SinkWarning() << "invalid buffer, not an entity buffer";
270 return KAsync::error<qint64>(0); 271 return KAsync::error<qint64>(0);
271 } 272 }
272 } 273 }
@@ -274,7 +275,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
274 // TODO use only readPropertyMapper and writePropertyMapper 275 // TODO use only readPropertyMapper and writePropertyMapper
275 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 276 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
276 if (!adaptorFactory) { 277 if (!adaptorFactory) {
277 Warning() << "no adaptor factory for type " << bufferType; 278 SinkWarning() << "no adaptor factory for type " << bufferType;
278 return KAsync::error<qint64>(0); 279 return KAsync::error<qint64>(0);
279 } 280 }
280 281
@@ -288,16 +289,16 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
288 [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 289 [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
289 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 290 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
290 if (!buffer.isValid()) { 291 if (!buffer.isValid()) {
291 Warning() << "Read invalid buffer from disk"; 292 SinkWarning() << "Read invalid buffer from disk";
292 } else { 293 } else {
293 current = adaptorFactory->createAdaptor(buffer.entity()); 294 current = adaptorFactory->createAdaptor(buffer.entity());
294 } 295 }
295 return false; 296 return false;
296 }, 297 },
297 [baseRevision](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); 298 [baseRevision](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; });
298 299
299 if (!current) { 300 if (!current) {
300 Warning() << "Failed to read local value " << key; 301 SinkWarning() << "Failed to read local value " << key;
301 return KAsync::error<qint64>(0); 302 return KAsync::error<qint64>(0);
302 } 303 }
303 304
@@ -305,7 +306,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
305 306
306 // Apply diff 307 // Apply diff
307 // FIXME only apply the properties that are available in the buffer 308 // FIXME only apply the properties that are available in the buffer
308 Trace() << "Applying changed properties: " << changeset; 309 SinkTrace() << "Applying changed properties: " << changeset;
309 for (const auto &property : changeset) { 310 for (const auto &property : changeset) {
310 const auto value = diff->getProperty(property); 311 const auto value = diff->getProperty(property);
311 if (value.isValid()) { 312 if (value.isValid()) {
@@ -321,7 +322,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
321 } 322 }
322 323
323 newAdaptor->resetChangedProperties(); 324 newAdaptor->resetChangedProperties();
324 for (auto processor : d->processors[bufferType]) { 325 foreach (const auto &processor, d->processors[bufferType]) {
325 processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); 326 processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction);
326 } 327 }
327 //The maxRevision may have changed meanwhile if the entity created sub-entities 328 //The maxRevision may have changed meanwhile if the entity created sub-entities
@@ -355,7 +356,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
355 { 356 {
356 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 357 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
357 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { 358 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) {
358 Warning() << "invalid buffer, not a delete entity buffer"; 359 SinkWarning() << "invalid buffer, not a delete entity buffer";
359 return KAsync::error<qint64>(0); 360 return KAsync::error<qint64>(0);
360 } 361 }
361 } 362 }
@@ -364,7 +365,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
364 const bool replayToSource = deleteEntity->replayToSource(); 365 const bool replayToSource = deleteEntity->replayToSource();
365 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 366 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
366 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 367 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
367 Log() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 368 SinkLog() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
368 369
369 bool found = false; 370 bool found = false;
370 bool alreadyRemoved = false; 371 bool alreadyRemoved = false;
@@ -381,14 +382,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
381 } 382 }
382 return false; 383 return false;
383 }, 384 },
384 [](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message; }); 385 [](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; });
385 386
386 if (!found) { 387 if (!found) {
387 Warning() << "Failed to find entity " << key; 388 SinkWarning() << "Failed to find entity " << key;
388 return KAsync::error<qint64>(0); 389 return KAsync::error<qint64>(0);
389 } 390 }
390 if (alreadyRemoved) { 391 if (alreadyRemoved) {
391 Warning() << "Entity is already removed " << key; 392 SinkWarning() << "Entity is already removed " << key;
392 return KAsync::error<qint64>(0); 393 return KAsync::error<qint64>(0);
393 } 394 }
394 395
@@ -408,7 +409,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
408 409
409 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 410 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
410 if (!adaptorFactory) { 411 if (!adaptorFactory) {
411 Warning() << "no adaptor factory for type " << bufferType; 412 SinkWarning() << "no adaptor factory for type " << bufferType;
412 return KAsync::error<qint64>(0); 413 return KAsync::error<qint64>(0);
413 } 414 }
414 415
@@ -418,17 +419,17 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
418 [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool { 419 [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
419 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 420 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
420 if (!buffer.isValid()) { 421 if (!buffer.isValid()) {
421 Warning() << "Read invalid buffer from disk"; 422 SinkWarning() << "Read invalid buffer from disk";
422 } else { 423 } else {
423 current = adaptorFactory->createAdaptor(buffer.entity()); 424 current = adaptorFactory->createAdaptor(buffer.entity());
424 } 425 }
425 return false; 426 return false;
426 }, 427 },
427 [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); 428 [this](const Storage::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; });
428 429
429 d->storeNewRevision(newRevision, fbb, bufferType, key); 430 d->storeNewRevision(newRevision, fbb, bufferType, key);
430 431
431 for (auto processor : d->processors[bufferType]) { 432 foreach (const auto &processor, d->processors[bufferType]) {
432 processor->deletedEntity(key, newRevision, *current, d->transaction); 433 processor->deletedEntity(key, newRevision, *current, d->transaction);
433 } 434 }
434 435
@@ -440,13 +441,13 @@ void Pipeline::cleanupRevision(qint64 revision)
440 d->revisionChanged = true; 441 d->revisionChanged = true;
441 const auto uid = Storage::getUidFromRevision(d->transaction, revision); 442 const auto uid = Storage::getUidFromRevision(d->transaction, revision);
442 const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); 443 const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision);
443 Trace() << "Cleaning up revision " << revision << uid << bufferType; 444 SinkTrace() << "Cleaning up revision " << revision << uid << bufferType;
444 Storage::mainDatabase(d->transaction, bufferType) 445 Storage::mainDatabase(d->transaction, bufferType)
445 .scan(uid, 446 .scan(uid,
446 [&](const QByteArray &key, const QByteArray &data) -> bool { 447 [&](const QByteArray &key, const QByteArray &data) -> bool {
447 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 448 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
448 if (!buffer.isValid()) { 449 if (!buffer.isValid()) {
449 Warning() << "Read invalid buffer from disk"; 450 SinkWarning() << "Read invalid buffer from disk";
450 } else { 451 } else {
451 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); 452 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
452 const qint64 rev = metadata->revision(); 453 const qint64 rev = metadata->revision();
@@ -459,7 +460,7 @@ void Pipeline::cleanupRevision(qint64 revision)
459 460
460 return true; 461 return true;
461 }, 462 },
462 [](const Storage::Error &error) { Warning() << "Error while reading: " << error.message; }, true); 463 [](const Storage::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true);
463 Storage::setCleanedUpRevision(d->transaction, revision); 464 Storage::setCleanedUpRevision(d->transaction, revision);
464} 465}
465 466
@@ -481,7 +482,6 @@ Preprocessor::Preprocessor() : d(new Preprocessor::Private)
481 482
482Preprocessor::~Preprocessor() 483Preprocessor::~Preprocessor()
483{ 484{
484 delete d;
485} 485}
486 486
487void Preprocessor::setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *pipeline) 487void Preprocessor::setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *pipeline)