diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-07-07 22:23:49 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-07-07 22:23:49 +0200 |
commit | da2b049e248c1ad7efeb53685158a205335e4e36 (patch) | |
tree | 1e7e5e940e9b760b2108081b1d2f3879cebdb0ff /common/pipeline.cpp | |
parent | 9bcb822963fc96c94dbe7dcc4134dcd2dac454ff (diff) | |
download | sink-da2b049e248c1ad7efeb53685158a205335e4e36.tar.gz sink-da2b049e248c1ad7efeb53685158a205335e4e36.zip |
A new debug system.
Instead of a single #define as debug area the new system allows for an
identifier for each debug message with the structure component.area.
The component is a dot separated identifier of the runtime component,
such as the process or the plugin.
The area is the code component, and can be as such defined at
compiletime.
The idea of this system is that it becomes possible to i.e. look at the
output of all messages in the query subsystem of a specific resource
(something that happens in the client process, but in the
resource-specific subcomponent).
The new macros are supposed to be less likely to clash with other names,
hence the new names.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 71 |
1 files changed, 35 insertions, 36 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 034f913..f1a4a32 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -38,8 +38,7 @@ | |||
38 | #include "definitions.h" | 38 | #include "definitions.h" |
39 | #include "bufferutils.h" | 39 | #include "bufferutils.h" |
40 | 40 | ||
41 | #undef DEBUG_AREA | 41 | SINK_DEBUG_AREA("pipeline") |
42 | #define DEBUG_AREA "resource.pipeline" | ||
43 | 42 | ||
44 | namespace Sink { | 43 | namespace Sink { |
45 | 44 | ||
@@ -63,10 +62,10 @@ public: | |||
63 | 62 | ||
64 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 63 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
65 | { | 64 | { |
66 | Trace() << "Committing new revision: " << uid << newRevision; | 65 | SinkTrace() << "Committing new revision: " << uid << newRevision; |
67 | Storage::mainDatabase(transaction, bufferType) | 66 | Storage::mainDatabase(transaction, bufferType) |
68 | .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | 67 | .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), |
69 | [uid, newRevision](const Storage::Error &error) { Warning() << "Failed to write entity" << uid << newRevision; }); | 68 | [uid, newRevision](const Storage::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); |
70 | revisionChanged = true; | 69 | revisionChanged = true; |
71 | Storage::setMaxRevision(transaction, newRevision); | 70 | Storage::setMaxRevision(transaction, newRevision); |
72 | Storage::recordRevision(transaction, newRevision, uid, bufferType); | 71 | Storage::recordRevision(transaction, newRevision, uid, bufferType); |
@@ -107,11 +106,11 @@ void Pipeline::startTransaction() | |||
107 | if (d->transaction) { | 106 | if (d->transaction) { |
108 | return; | 107 | return; |
109 | } | 108 | } |
110 | Trace() << "Starting transaction."; | 109 | SinkTrace() << "Starting transaction."; |
111 | d->transactionTime.start(); | 110 | d->transactionTime.start(); |
112 | d->transactionItemCount = 0; | 111 | d->transactionItemCount = 0; |
113 | d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | 112 | d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { |
114 | Warning() << error.message; | 113 | SinkWarning() << error.message; |
115 | }); | 114 | }); |
116 | 115 | ||
117 | //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. | 116 | //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. |
@@ -119,9 +118,9 @@ void Pipeline::startTransaction() | |||
119 | //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync... | 118 | //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync... |
120 | if (d->storage.exists()) { | 119 | if (d->storage.exists()) { |
121 | while (!d->transaction.validateNamedDatabases()) { | 120 | while (!d->transaction.validateNamedDatabases()) { |
122 | Warning() << "Opened an invalid transaction!!!!!!"; | 121 | SinkWarning() << "Opened an invalid transaction!!!!!!"; |
123 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | 122 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { |
124 | Warning() << error.message; | 123 | SinkWarning() << error.message; |
125 | })); | 124 | })); |
126 | } | 125 | } |
127 | } | 126 | } |
@@ -141,7 +140,7 @@ void Pipeline::commit() | |||
141 | } | 140 | } |
142 | const auto revision = Storage::maxRevision(d->transaction); | 141 | const auto revision = Storage::maxRevision(d->transaction); |
143 | const auto elapsed = d->transactionTime.elapsed(); | 142 | const auto elapsed = d->transactionTime.elapsed(); |
144 | Log() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " | 143 | SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " |
145 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; | 144 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; |
146 | if (d->transaction) { | 145 | if (d->transaction) { |
147 | d->transaction.commit(); | 146 | d->transaction.commit(); |
@@ -170,7 +169,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
170 | { | 169 | { |
171 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 170 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
172 | if (!Commands::VerifyCreateEntityBuffer(verifyer)) { | 171 | if (!Commands::VerifyCreateEntityBuffer(verifyer)) { |
173 | Warning() << "invalid buffer, not a create entity buffer"; | 172 | SinkWarning() << "invalid buffer, not a create entity buffer"; |
174 | return KAsync::error<qint64>(0); | 173 | return KAsync::error<qint64>(0); |
175 | } | 174 | } |
176 | } | 175 | } |
@@ -182,7 +181,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
182 | if (createEntity->entityId()) { | 181 | if (createEntity->entityId()) { |
183 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); | 182 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); |
184 | if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { | 183 | if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { |
185 | ErrorMsg() << "An entity with this id already exists: " << key; | 184 | SinkError() << "An entity with this id already exists: " << key; |
186 | return KAsync::error<qint64>(0); | 185 | return KAsync::error<qint64>(0); |
187 | } | 186 | } |
188 | } | 187 | } |
@@ -190,25 +189,25 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
190 | if (key.isEmpty()) { | 189 | if (key.isEmpty()) { |
191 | key = Sink::Storage::generateUid(); | 190 | key = Sink::Storage::generateUid(); |
192 | } | 191 | } |
193 | Log() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 192 | SinkLog() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
194 | Q_ASSERT(!key.isEmpty()); | 193 | Q_ASSERT(!key.isEmpty()); |
195 | 194 | ||
196 | { | 195 | { |
197 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); | 196 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); |
198 | if (!VerifyEntityBuffer(verifyer)) { | 197 | if (!VerifyEntityBuffer(verifyer)) { |
199 | Warning() << "invalid buffer, not an entity buffer"; | 198 | SinkWarning() << "invalid buffer, not an entity buffer"; |
200 | return KAsync::error<qint64>(0); | 199 | return KAsync::error<qint64>(0); |
201 | } | 200 | } |
202 | } | 201 | } |
203 | auto entity = GetEntity(createEntity->delta()->Data()); | 202 | auto entity = GetEntity(createEntity->delta()->Data()); |
204 | if (!entity->resource()->size() && !entity->local()->size()) { | 203 | if (!entity->resource()->size() && !entity->local()->size()) { |
205 | Warning() << "No local and no resource buffer while trying to create entity."; | 204 | SinkWarning() << "No local and no resource buffer while trying to create entity."; |
206 | return KAsync::error<qint64>(0); | 205 | return KAsync::error<qint64>(0); |
207 | } | 206 | } |
208 | 207 | ||
209 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); | 208 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
210 | if (!adaptorFactory) { | 209 | if (!adaptorFactory) { |
211 | Warning() << "no adaptor factory for type " << bufferType; | 210 | SinkWarning() << "no adaptor factory for type " << bufferType; |
212 | return KAsync::error<qint64>(0); | 211 | return KAsync::error<qint64>(0); |
213 | } | 212 | } |
214 | 213 | ||
@@ -244,7 +243,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
244 | { | 243 | { |
245 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 244 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
246 | if (!Commands::VerifyModifyEntityBuffer(verifyer)) { | 245 | if (!Commands::VerifyModifyEntityBuffer(verifyer)) { |
247 | Warning() << "invalid buffer, not a modify entity buffer"; | 246 | SinkWarning() << "invalid buffer, not a modify entity buffer"; |
248 | return KAsync::error<qint64>(0); | 247 | return KAsync::error<qint64>(0); |
249 | } | 248 | } |
250 | } | 249 | } |
@@ -254,21 +253,21 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
254 | if (modifyEntity->modifiedProperties()) { | 253 | if (modifyEntity->modifiedProperties()) { |
255 | changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); | 254 | changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); |
256 | } else { | 255 | } else { |
257 | Warning() << "No changeset available"; | 256 | SinkWarning() << "No changeset available"; |
258 | } | 257 | } |
259 | const qint64 baseRevision = modifyEntity->revision(); | 258 | const qint64 baseRevision = modifyEntity->revision(); |
260 | const bool replayToSource = modifyEntity->replayToSource(); | 259 | const bool replayToSource = modifyEntity->replayToSource(); |
261 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); | 260 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); |
262 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); | 261 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); |
263 | Log() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 262 | SinkLog() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
264 | if (bufferType.isEmpty() || key.isEmpty()) { | 263 | if (bufferType.isEmpty() || key.isEmpty()) { |
265 | Warning() << "entity type or key " << bufferType << key; | 264 | SinkWarning() << "entity type or key " << bufferType << key; |
266 | return KAsync::error<qint64>(0); | 265 | return KAsync::error<qint64>(0); |
267 | } | 266 | } |
268 | { | 267 | { |
269 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); | 268 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); |
270 | if (!VerifyEntityBuffer(verifyer)) { | 269 | if (!VerifyEntityBuffer(verifyer)) { |
271 | Warning() << "invalid buffer, not an entity buffer"; | 270 | SinkWarning() << "invalid buffer, not an entity buffer"; |
272 | return KAsync::error<qint64>(0); | 271 | return KAsync::error<qint64>(0); |
273 | } | 272 | } |
274 | } | 273 | } |
@@ -276,7 +275,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
276 | // TODO use only readPropertyMapper and writePropertyMapper | 275 | // TODO use only readPropertyMapper and writePropertyMapper |
277 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); | 276 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
278 | if (!adaptorFactory) { | 277 | if (!adaptorFactory) { |
279 | Warning() << "no adaptor factory for type " << bufferType; | 278 | SinkWarning() << "no adaptor factory for type " << bufferType; |
280 | return KAsync::error<qint64>(0); | 279 | return KAsync::error<qint64>(0); |
281 | } | 280 | } |
282 | 281 | ||
@@ -290,16 +289,16 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
290 | [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 289 | [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { |
291 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 290 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
292 | if (!buffer.isValid()) { | 291 | if (!buffer.isValid()) { |
293 | Warning() << "Read invalid buffer from disk"; | 292 | SinkWarning() << "Read invalid buffer from disk"; |
294 | } else { | 293 | } else { |
295 | current = adaptorFactory->createAdaptor(buffer.entity()); | 294 | current = adaptorFactory->createAdaptor(buffer.entity()); |
296 | } | 295 | } |
297 | return false; | 296 | return false; |
298 | }, | 297 | }, |
299 | [baseRevision](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); | 298 | [baseRevision](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); |
300 | 299 | ||
301 | if (!current) { | 300 | if (!current) { |
302 | Warning() << "Failed to read local value " << key; | 301 | SinkWarning() << "Failed to read local value " << key; |
303 | return KAsync::error<qint64>(0); | 302 | return KAsync::error<qint64>(0); |
304 | } | 303 | } |
305 | 304 | ||
@@ -307,7 +306,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
307 | 306 | ||
308 | // Apply diff | 307 | // Apply diff |
309 | // FIXME only apply the properties that are available in the buffer | 308 | // FIXME only apply the properties that are available in the buffer |
310 | Trace() << "Applying changed properties: " << changeset; | 309 | SinkTrace() << "Applying changed properties: " << changeset; |
311 | for (const auto &property : changeset) { | 310 | for (const auto &property : changeset) { |
312 | const auto value = diff->getProperty(property); | 311 | const auto value = diff->getProperty(property); |
313 | if (value.isValid()) { | 312 | if (value.isValid()) { |
@@ -357,7 +356,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
357 | { | 356 | { |
358 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 357 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
359 | if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { | 358 | if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { |
360 | Warning() << "invalid buffer, not a delete entity buffer"; | 359 | SinkWarning() << "invalid buffer, not a delete entity buffer"; |
361 | return KAsync::error<qint64>(0); | 360 | return KAsync::error<qint64>(0); |
362 | } | 361 | } |
363 | } | 362 | } |
@@ -366,7 +365,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
366 | const bool replayToSource = deleteEntity->replayToSource(); | 365 | const bool replayToSource = deleteEntity->replayToSource(); |
367 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); | 366 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); |
368 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); | 367 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); |
369 | Log() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 368 | SinkLog() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
370 | 369 | ||
371 | bool found = false; | 370 | bool found = false; |
372 | bool alreadyRemoved = false; | 371 | bool alreadyRemoved = false; |
@@ -383,14 +382,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
383 | } | 382 | } |
384 | return false; | 383 | return false; |
385 | }, | 384 | }, |
386 | [](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message; }); | 385 | [](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); |
387 | 386 | ||
388 | if (!found) { | 387 | if (!found) { |
389 | Warning() << "Failed to find entity " << key; | 388 | SinkWarning() << "Failed to find entity " << key; |
390 | return KAsync::error<qint64>(0); | 389 | return KAsync::error<qint64>(0); |
391 | } | 390 | } |
392 | if (alreadyRemoved) { | 391 | if (alreadyRemoved) { |
393 | Warning() << "Entity is already removed " << key; | 392 | SinkWarning() << "Entity is already removed " << key; |
394 | return KAsync::error<qint64>(0); | 393 | return KAsync::error<qint64>(0); |
395 | } | 394 | } |
396 | 395 | ||
@@ -410,7 +409,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
410 | 409 | ||
411 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); | 410 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
412 | if (!adaptorFactory) { | 411 | if (!adaptorFactory) { |
413 | Warning() << "no adaptor factory for type " << bufferType; | 412 | SinkWarning() << "no adaptor factory for type " << bufferType; |
414 | return KAsync::error<qint64>(0); | 413 | return KAsync::error<qint64>(0); |
415 | } | 414 | } |
416 | 415 | ||
@@ -420,13 +419,13 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
420 | [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { | 419 | [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { |
421 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 420 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
422 | if (!buffer.isValid()) { | 421 | if (!buffer.isValid()) { |
423 | Warning() << "Read invalid buffer from disk"; | 422 | SinkWarning() << "Read invalid buffer from disk"; |
424 | } else { | 423 | } else { |
425 | current = adaptorFactory->createAdaptor(buffer.entity()); | 424 | current = adaptorFactory->createAdaptor(buffer.entity()); |
426 | } | 425 | } |
427 | return false; | 426 | return false; |
428 | }, | 427 | }, |
429 | [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); | 428 | [this](const Storage::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; }); |
430 | 429 | ||
431 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 430 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
432 | 431 | ||
@@ -442,13 +441,13 @@ void Pipeline::cleanupRevision(qint64 revision) | |||
442 | d->revisionChanged = true; | 441 | d->revisionChanged = true; |
443 | const auto uid = Storage::getUidFromRevision(d->transaction, revision); | 442 | const auto uid = Storage::getUidFromRevision(d->transaction, revision); |
444 | const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); | 443 | const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); |
445 | Trace() << "Cleaning up revision " << revision << uid << bufferType; | 444 | SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; |
446 | Storage::mainDatabase(d->transaction, bufferType) | 445 | Storage::mainDatabase(d->transaction, bufferType) |
447 | .scan(uid, | 446 | .scan(uid, |
448 | [&](const QByteArray &key, const QByteArray &data) -> bool { | 447 | [&](const QByteArray &key, const QByteArray &data) -> bool { |
449 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 448 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
450 | if (!buffer.isValid()) { | 449 | if (!buffer.isValid()) { |
451 | Warning() << "Read invalid buffer from disk"; | 450 | SinkWarning() << "Read invalid buffer from disk"; |
452 | } else { | 451 | } else { |
453 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); | 452 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); |
454 | const qint64 rev = metadata->revision(); | 453 | const qint64 rev = metadata->revision(); |
@@ -461,7 +460,7 @@ void Pipeline::cleanupRevision(qint64 revision) | |||
461 | 460 | ||
462 | return true; | 461 | return true; |
463 | }, | 462 | }, |
464 | [](const Storage::Error &error) { Warning() << "Error while reading: " << error.message; }, true); | 463 | [](const Storage::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); |
465 | Storage::setCleanedUpRevision(d->transaction, revision); | 464 | Storage::setCleanedUpRevision(d->transaction, revision); |
466 | } | 465 | } |
467 | 466 | ||