summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-07 22:23:49 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-07 22:23:49 +0200
commitda2b049e248c1ad7efeb53685158a205335e4e36 (patch)
tree1e7e5e940e9b760b2108081b1d2f3879cebdb0ff /common/pipeline.cpp
parent9bcb822963fc96c94dbe7dcc4134dcd2dac454ff (diff)
downloadsink-da2b049e248c1ad7efeb53685158a205335e4e36.tar.gz
sink-da2b049e248c1ad7efeb53685158a205335e4e36.zip
A new debug system.
Instead of a single #define as debug area the new system allows for an identifier for each debug message with the structure component.area. The component is a dot separated identifier of the runtime component, such as the process or the plugin. The area is the code component, and can be as such defined at compiletime. The idea of this system is that it becomes possible to i.e. look at the output of all messages in the query subsystem of a specific resource (something that happens in the client process, but in the resource-specific subcomponent). The new macros are supposed to be less likely to clash with other names, hence the new names.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp71
1 files changed, 35 insertions, 36 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 034f913..f1a4a32 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -38,8 +38,7 @@
38#include "definitions.h" 38#include "definitions.h"
39#include "bufferutils.h" 39#include "bufferutils.h"
40 40
41#undef DEBUG_AREA 41SINK_DEBUG_AREA("pipeline")
42#define DEBUG_AREA "resource.pipeline"
43 42
44namespace Sink { 43namespace Sink {
45 44
@@ -63,10 +62,10 @@ public:
63 62
64void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 63void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
65{ 64{
66 Trace() << "Committing new revision: " << uid << newRevision; 65 SinkTrace() << "Committing new revision: " << uid << newRevision;
67 Storage::mainDatabase(transaction, bufferType) 66 Storage::mainDatabase(transaction, bufferType)
68 .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), 67 .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
69 [uid, newRevision](const Storage::Error &error) { Warning() << "Failed to write entity" << uid << newRevision; }); 68 [uid, newRevision](const Storage::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; });
70 revisionChanged = true; 69 revisionChanged = true;
71 Storage::setMaxRevision(transaction, newRevision); 70 Storage::setMaxRevision(transaction, newRevision);
72 Storage::recordRevision(transaction, newRevision, uid, bufferType); 71 Storage::recordRevision(transaction, newRevision, uid, bufferType);
@@ -107,11 +106,11 @@ void Pipeline::startTransaction()
107 if (d->transaction) { 106 if (d->transaction) {
108 return; 107 return;
109 } 108 }
110 Trace() << "Starting transaction."; 109 SinkTrace() << "Starting transaction.";
111 d->transactionTime.start(); 110 d->transactionTime.start();
112 d->transactionItemCount = 0; 111 d->transactionItemCount = 0;
113 d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 112 d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
114 Warning() << error.message; 113 SinkWarning() << error.message;
115 }); 114 });
116 115
117 //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. 116 //FIXME this is a temporary measure to recover from a failure to open the named databases correctly.
@@ -119,9 +118,9 @@ void Pipeline::startTransaction()
119 //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync... 118 //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync...
120 if (d->storage.exists()) { 119 if (d->storage.exists()) {
121 while (!d->transaction.validateNamedDatabases()) { 120 while (!d->transaction.validateNamedDatabases()) {
122 Warning() << "Opened an invalid transaction!!!!!!"; 121 SinkWarning() << "Opened an invalid transaction!!!!!!";
123 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 122 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
124 Warning() << error.message; 123 SinkWarning() << error.message;
125 })); 124 }));
126 } 125 }
127 } 126 }
@@ -141,7 +140,7 @@ void Pipeline::commit()
141 } 140 }
142 const auto revision = Storage::maxRevision(d->transaction); 141 const auto revision = Storage::maxRevision(d->transaction);
143 const auto elapsed = d->transactionTime.elapsed(); 142 const auto elapsed = d->transactionTime.elapsed();
144 Log() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " 143 SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
145 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; 144 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]";
146 if (d->transaction) { 145 if (d->transaction) {
147 d->transaction.commit(); 146 d->transaction.commit();
@@ -170,7 +169,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
170 { 169 {
171 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 170 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
172 if (!Commands::VerifyCreateEntityBuffer(verifyer)) { 171 if (!Commands::VerifyCreateEntityBuffer(verifyer)) {
173 Warning() << "invalid buffer, not a create entity buffer"; 172 SinkWarning() << "invalid buffer, not a create entity buffer";
174 return KAsync::error<qint64>(0); 173 return KAsync::error<qint64>(0);
175 } 174 }
176 } 175 }
@@ -182,7 +181,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
182 if (createEntity->entityId()) { 181 if (createEntity->entityId()) {
183 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); 182 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size());
184 if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { 183 if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) {
185 ErrorMsg() << "An entity with this id already exists: " << key; 184 SinkError() << "An entity with this id already exists: " << key;
186 return KAsync::error<qint64>(0); 185 return KAsync::error<qint64>(0);
187 } 186 }
188 } 187 }
@@ -190,25 +189,25 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
190 if (key.isEmpty()) { 189 if (key.isEmpty()) {
191 key = Sink::Storage::generateUid(); 190 key = Sink::Storage::generateUid();
192 } 191 }
193 Log() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 192 SinkLog() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
194 Q_ASSERT(!key.isEmpty()); 193 Q_ASSERT(!key.isEmpty());
195 194
196 { 195 {
197 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 196 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
198 if (!VerifyEntityBuffer(verifyer)) { 197 if (!VerifyEntityBuffer(verifyer)) {
199 Warning() << "invalid buffer, not an entity buffer"; 198 SinkWarning() << "invalid buffer, not an entity buffer";
200 return KAsync::error<qint64>(0); 199 return KAsync::error<qint64>(0);
201 } 200 }
202 } 201 }
203 auto entity = GetEntity(createEntity->delta()->Data()); 202 auto entity = GetEntity(createEntity->delta()->Data());
204 if (!entity->resource()->size() && !entity->local()->size()) { 203 if (!entity->resource()->size() && !entity->local()->size()) {
205 Warning() << "No local and no resource buffer while trying to create entity."; 204 SinkWarning() << "No local and no resource buffer while trying to create entity.";
206 return KAsync::error<qint64>(0); 205 return KAsync::error<qint64>(0);
207 } 206 }
208 207
209 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 208 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
210 if (!adaptorFactory) { 209 if (!adaptorFactory) {
211 Warning() << "no adaptor factory for type " << bufferType; 210 SinkWarning() << "no adaptor factory for type " << bufferType;
212 return KAsync::error<qint64>(0); 211 return KAsync::error<qint64>(0);
213 } 212 }
214 213
@@ -244,7 +243,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
244 { 243 {
245 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 244 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
246 if (!Commands::VerifyModifyEntityBuffer(verifyer)) { 245 if (!Commands::VerifyModifyEntityBuffer(verifyer)) {
247 Warning() << "invalid buffer, not a modify entity buffer"; 246 SinkWarning() << "invalid buffer, not a modify entity buffer";
248 return KAsync::error<qint64>(0); 247 return KAsync::error<qint64>(0);
249 } 248 }
250 } 249 }
@@ -254,21 +253,21 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
254 if (modifyEntity->modifiedProperties()) { 253 if (modifyEntity->modifiedProperties()) {
255 changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); 254 changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties());
256 } else { 255 } else {
257 Warning() << "No changeset available"; 256 SinkWarning() << "No changeset available";
258 } 257 }
259 const qint64 baseRevision = modifyEntity->revision(); 258 const qint64 baseRevision = modifyEntity->revision();
260 const bool replayToSource = modifyEntity->replayToSource(); 259 const bool replayToSource = modifyEntity->replayToSource();
261 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); 260 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
262 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); 261 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
263 Log() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 262 SinkLog() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
264 if (bufferType.isEmpty() || key.isEmpty()) { 263 if (bufferType.isEmpty() || key.isEmpty()) {
265 Warning() << "entity type or key " << bufferType << key; 264 SinkWarning() << "entity type or key " << bufferType << key;
266 return KAsync::error<qint64>(0); 265 return KAsync::error<qint64>(0);
267 } 266 }
268 { 267 {
269 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); 268 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
270 if (!VerifyEntityBuffer(verifyer)) { 269 if (!VerifyEntityBuffer(verifyer)) {
271 Warning() << "invalid buffer, not an entity buffer"; 270 SinkWarning() << "invalid buffer, not an entity buffer";
272 return KAsync::error<qint64>(0); 271 return KAsync::error<qint64>(0);
273 } 272 }
274 } 273 }
@@ -276,7 +275,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
276 // TODO use only readPropertyMapper and writePropertyMapper 275 // TODO use only readPropertyMapper and writePropertyMapper
277 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 276 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
278 if (!adaptorFactory) { 277 if (!adaptorFactory) {
279 Warning() << "no adaptor factory for type " << bufferType; 278 SinkWarning() << "no adaptor factory for type " << bufferType;
280 return KAsync::error<qint64>(0); 279 return KAsync::error<qint64>(0);
281 } 280 }
282 281
@@ -290,16 +289,16 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
290 [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 289 [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
291 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 290 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
292 if (!buffer.isValid()) { 291 if (!buffer.isValid()) {
293 Warning() << "Read invalid buffer from disk"; 292 SinkWarning() << "Read invalid buffer from disk";
294 } else { 293 } else {
295 current = adaptorFactory->createAdaptor(buffer.entity()); 294 current = adaptorFactory->createAdaptor(buffer.entity());
296 } 295 }
297 return false; 296 return false;
298 }, 297 },
299 [baseRevision](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); 298 [baseRevision](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; });
300 299
301 if (!current) { 300 if (!current) {
302 Warning() << "Failed to read local value " << key; 301 SinkWarning() << "Failed to read local value " << key;
303 return KAsync::error<qint64>(0); 302 return KAsync::error<qint64>(0);
304 } 303 }
305 304
@@ -307,7 +306,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
307 306
308 // Apply diff 307 // Apply diff
309 // FIXME only apply the properties that are available in the buffer 308 // FIXME only apply the properties that are available in the buffer
310 Trace() << "Applying changed properties: " << changeset; 309 SinkTrace() << "Applying changed properties: " << changeset;
311 for (const auto &property : changeset) { 310 for (const auto &property : changeset) {
312 const auto value = diff->getProperty(property); 311 const auto value = diff->getProperty(property);
313 if (value.isValid()) { 312 if (value.isValid()) {
@@ -357,7 +356,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
357 { 356 {
358 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 357 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
359 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { 358 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) {
360 Warning() << "invalid buffer, not a delete entity buffer"; 359 SinkWarning() << "invalid buffer, not a delete entity buffer";
361 return KAsync::error<qint64>(0); 360 return KAsync::error<qint64>(0);
362 } 361 }
363 } 362 }
@@ -366,7 +365,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
366 const bool replayToSource = deleteEntity->replayToSource(); 365 const bool replayToSource = deleteEntity->replayToSource();
367 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 366 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
368 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 367 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
369 Log() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 368 SinkLog() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
370 369
371 bool found = false; 370 bool found = false;
372 bool alreadyRemoved = false; 371 bool alreadyRemoved = false;
@@ -383,14 +382,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
383 } 382 }
384 return false; 383 return false;
385 }, 384 },
386 [](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message; }); 385 [](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; });
387 386
388 if (!found) { 387 if (!found) {
389 Warning() << "Failed to find entity " << key; 388 SinkWarning() << "Failed to find entity " << key;
390 return KAsync::error<qint64>(0); 389 return KAsync::error<qint64>(0);
391 } 390 }
392 if (alreadyRemoved) { 391 if (alreadyRemoved) {
393 Warning() << "Entity is already removed " << key; 392 SinkWarning() << "Entity is already removed " << key;
394 return KAsync::error<qint64>(0); 393 return KAsync::error<qint64>(0);
395 } 394 }
396 395
@@ -410,7 +409,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
410 409
411 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 410 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
412 if (!adaptorFactory) { 411 if (!adaptorFactory) {
413 Warning() << "no adaptor factory for type " << bufferType; 412 SinkWarning() << "no adaptor factory for type " << bufferType;
414 return KAsync::error<qint64>(0); 413 return KAsync::error<qint64>(0);
415 } 414 }
416 415
@@ -420,13 +419,13 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
420 [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool { 419 [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
421 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 420 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
422 if (!buffer.isValid()) { 421 if (!buffer.isValid()) {
423 Warning() << "Read invalid buffer from disk"; 422 SinkWarning() << "Read invalid buffer from disk";
424 } else { 423 } else {
425 current = adaptorFactory->createAdaptor(buffer.entity()); 424 current = adaptorFactory->createAdaptor(buffer.entity());
426 } 425 }
427 return false; 426 return false;
428 }, 427 },
429 [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); 428 [this](const Storage::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; });
430 429
431 d->storeNewRevision(newRevision, fbb, bufferType, key); 430 d->storeNewRevision(newRevision, fbb, bufferType, key);
432 431
@@ -442,13 +441,13 @@ void Pipeline::cleanupRevision(qint64 revision)
442 d->revisionChanged = true; 441 d->revisionChanged = true;
443 const auto uid = Storage::getUidFromRevision(d->transaction, revision); 442 const auto uid = Storage::getUidFromRevision(d->transaction, revision);
444 const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); 443 const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision);
445 Trace() << "Cleaning up revision " << revision << uid << bufferType; 444 SinkTrace() << "Cleaning up revision " << revision << uid << bufferType;
446 Storage::mainDatabase(d->transaction, bufferType) 445 Storage::mainDatabase(d->transaction, bufferType)
447 .scan(uid, 446 .scan(uid,
448 [&](const QByteArray &key, const QByteArray &data) -> bool { 447 [&](const QByteArray &key, const QByteArray &data) -> bool {
449 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 448 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
450 if (!buffer.isValid()) { 449 if (!buffer.isValid()) {
451 Warning() << "Read invalid buffer from disk"; 450 SinkWarning() << "Read invalid buffer from disk";
452 } else { 451 } else {
453 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); 452 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
454 const qint64 rev = metadata->revision(); 453 const qint64 rev = metadata->revision();
@@ -461,7 +460,7 @@ void Pipeline::cleanupRevision(qint64 revision)
461 460
462 return true; 461 return true;
463 }, 462 },
464 [](const Storage::Error &error) { Warning() << "Error while reading: " << error.message; }, true); 463 [](const Storage::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true);
465 Storage::setCleanedUpRevision(d->transaction, revision); 464 Storage::setCleanedUpRevision(d->transaction, revision);
466} 465}
467 466