summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp71
1 files changed, 35 insertions, 36 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 034f913..f1a4a32 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -38,8 +38,7 @@
38#include "definitions.h" 38#include "definitions.h"
39#include "bufferutils.h" 39#include "bufferutils.h"
40 40
41#undef DEBUG_AREA 41SINK_DEBUG_AREA("pipeline")
42#define DEBUG_AREA "resource.pipeline"
43 42
44namespace Sink { 43namespace Sink {
45 44
@@ -63,10 +62,10 @@ public:
63 62
64void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 63void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
65{ 64{
66 Trace() << "Committing new revision: " << uid << newRevision; 65 SinkTrace() << "Committing new revision: " << uid << newRevision;
67 Storage::mainDatabase(transaction, bufferType) 66 Storage::mainDatabase(transaction, bufferType)
68 .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), 67 .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
69 [uid, newRevision](const Storage::Error &error) { Warning() << "Failed to write entity" << uid << newRevision; }); 68 [uid, newRevision](const Storage::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; });
70 revisionChanged = true; 69 revisionChanged = true;
71 Storage::setMaxRevision(transaction, newRevision); 70 Storage::setMaxRevision(transaction, newRevision);
72 Storage::recordRevision(transaction, newRevision, uid, bufferType); 71 Storage::recordRevision(transaction, newRevision, uid, bufferType);
@@ -107,11 +106,11 @@ void Pipeline::startTransaction()
107 if (d->transaction) { 106 if (d->transaction) {
108 return; 107 return;
109 } 108 }
110 Trace() << "Starting transaction."; 109 SinkTrace() << "Starting transaction.";
111 d->transactionTime.start(); 110 d->transactionTime.start();
112 d->transactionItemCount = 0; 111 d->transactionItemCount = 0;
113 d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 112 d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
114 Warning() << error.message; 113 SinkWarning() << error.message;
115 }); 114 });
116 115
117 //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. 116 //FIXME this is a temporary measure to recover from a failure to open the named databases correctly.
@@ -119,9 +118,9 @@ void Pipeline::startTransaction()
119 //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync... 118 //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync...
120 if (d->storage.exists()) { 119 if (d->storage.exists()) {
121 while (!d->transaction.validateNamedDatabases()) { 120 while (!d->transaction.validateNamedDatabases()) {
122 Warning() << "Opened an invalid transaction!!!!!!"; 121 SinkWarning() << "Opened an invalid transaction!!!!!!";
123 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 122 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
124 Warning() << error.message; 123 SinkWarning() << error.message;
125 })); 124 }));
126 } 125 }
127 } 126 }
@@ -141,7 +140,7 @@ void Pipeline::commit()
141 } 140 }
142 const auto revision = Storage::maxRevision(d->transaction); 141 const auto revision = Storage::maxRevision(d->transaction);
143 const auto elapsed = d->transactionTime.elapsed(); 142 const auto elapsed = d->transactionTime.elapsed();
144 Log() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " 143 SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
145 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; 144 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]";
146 if (d->transaction) { 145 if (d->transaction) {
147 d->transaction.commit(); 146 d->transaction.commit();
@@ -170,7 +169,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
170 { 169 {
171 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 170 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
172 if (!Commands::VerifyCreateEntityBuffer(verifyer)) { 171 if (!Commands::VerifyCreateEntityBuffer(verifyer)) {
173 Warning() << "invalid buffer, not a create entity buffer"; 172 SinkWarning() << "invalid buffer, not a create entity buffer";
174 return KAsync::error<qint64>(0); 173 return KAsync::error<qint64>(0);
175 } 174 }
176 } 175 }
@@ -182,7 +181,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
182 if (createEntity->entityId()) { 181 if (createEntity->entityId()) {
183 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); 182 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size());
184 if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { 183 if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) {
185 ErrorMsg() << "An entity with this id already exists: " << key; 184 SinkError() << "An entity with this id already exists: " << key;
186 return KAsync::error<qint64>(0); 185 return KAsync::error<qint64>(0);
187 } 186 }
188 } 187 }
@@ -190,25 +189,25 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
190 if (key.isEmpty()) { 189 if (key.isEmpty()) {
191 key = Sink::Storage::generateUid(); 190 key = Sink::Storage::generateUid();
192 } 191 }
193 Log() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 192 SinkLog() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
194 Q_ASSERT(!key.isEmpty()); 193 Q_ASSERT(!key.isEmpty());
195 194
196 { 195 {
197 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 196 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
198 if (!VerifyEntityBuffer(verifyer)) { 197 if (!VerifyEntityBuffer(verifyer)) {
199 Warning() << "invalid buffer, not an entity buffer"; 198 SinkWarning() << "invalid buffer, not an entity buffer";
200 return KAsync::error<qint64>(0); 199 return KAsync::error<qint64>(0);
201 } 200 }
202 } 201 }
203 auto entity = GetEntity(createEntity->delta()->Data()); 202 auto entity = GetEntity(createEntity->delta()->Data());
204 if (!entity->resource()->size() && !entity->local()->size()) { 203 if (!entity->resource()->size() && !entity->local()->size()) {
205 Warning() << "No local and no resource buffer while trying to create entity."; 204 SinkWarning() << "No local and no resource buffer while trying to create entity.";
206 return KAsync::error<qint64>(0); 205 return KAsync::error<qint64>(0);
207 } 206 }
208 207
209 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 208 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
210 if (!adaptorFactory) { 209 if (!adaptorFactory) {
211 Warning() << "no adaptor factory for type " << bufferType; 210 SinkWarning() << "no adaptor factory for type " << bufferType;
212 return KAsync::error<qint64>(0); 211 return KAsync::error<qint64>(0);
213 } 212 }
214 213
@@ -244,7 +243,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
244 { 243 {
245 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 244 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
246 if (!Commands::VerifyModifyEntityBuffer(verifyer)) { 245 if (!Commands::VerifyModifyEntityBuffer(verifyer)) {
247 Warning() << "invalid buffer, not a modify entity buffer"; 246 SinkWarning() << "invalid buffer, not a modify entity buffer";
248 return KAsync::error<qint64>(0); 247 return KAsync::error<qint64>(0);
249 } 248 }
250 } 249 }
@@ -254,21 +253,21 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
254 if (modifyEntity->modifiedProperties()) { 253 if (modifyEntity->modifiedProperties()) {
255 changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); 254 changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties());
256 } else { 255 } else {
257 Warning() << "No changeset available"; 256 SinkWarning() << "No changeset available";
258 } 257 }
259 const qint64 baseRevision = modifyEntity->revision(); 258 const qint64 baseRevision = modifyEntity->revision();
260 const bool replayToSource = modifyEntity->replayToSource(); 259 const bool replayToSource = modifyEntity->replayToSource();
261 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); 260 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
262 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); 261 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
263 Log() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 262 SinkLog() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
264 if (bufferType.isEmpty() || key.isEmpty()) { 263 if (bufferType.isEmpty() || key.isEmpty()) {
265 Warning() << "entity type or key " << bufferType << key; 264 SinkWarning() << "entity type or key " << bufferType << key;
266 return KAsync::error<qint64>(0); 265 return KAsync::error<qint64>(0);
267 } 266 }
268 { 267 {
269 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); 268 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
270 if (!VerifyEntityBuffer(verifyer)) { 269 if (!VerifyEntityBuffer(verifyer)) {
271 Warning() << "invalid buffer, not an entity buffer"; 270 SinkWarning() << "invalid buffer, not an entity buffer";
272 return KAsync::error<qint64>(0); 271 return KAsync::error<qint64>(0);
273 } 272 }
274 } 273 }
@@ -276,7 +275,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
276 // TODO use only readPropertyMapper and writePropertyMapper 275 // TODO use only readPropertyMapper and writePropertyMapper
277 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 276 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
278 if (!adaptorFactory) { 277 if (!adaptorFactory) {
279 Warning() << "no adaptor factory for type " << bufferType; 278 SinkWarning() << "no adaptor factory for type " << bufferType;
280 return KAsync::error<qint64>(0); 279 return KAsync::error<qint64>(0);
281 } 280 }
282 281
@@ -290,16 +289,16 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
290 [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 289 [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
291 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 290 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
292 if (!buffer.isValid()) { 291 if (!buffer.isValid()) {
293 Warning() << "Read invalid buffer from disk"; 292 SinkWarning() << "Read invalid buffer from disk";
294 } else { 293 } else {
295 current = adaptorFactory->createAdaptor(buffer.entity()); 294 current = adaptorFactory->createAdaptor(buffer.entity());
296 } 295 }
297 return false; 296 return false;
298 }, 297 },
299 [baseRevision](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); 298 [baseRevision](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; });
300 299
301 if (!current) { 300 if (!current) {
302 Warning() << "Failed to read local value " << key; 301 SinkWarning() << "Failed to read local value " << key;
303 return KAsync::error<qint64>(0); 302 return KAsync::error<qint64>(0);
304 } 303 }
305 304
@@ -307,7 +306,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
307 306
308 // Apply diff 307 // Apply diff
309 // FIXME only apply the properties that are available in the buffer 308 // FIXME only apply the properties that are available in the buffer
310 Trace() << "Applying changed properties: " << changeset; 309 SinkTrace() << "Applying changed properties: " << changeset;
311 for (const auto &property : changeset) { 310 for (const auto &property : changeset) {
312 const auto value = diff->getProperty(property); 311 const auto value = diff->getProperty(property);
313 if (value.isValid()) { 312 if (value.isValid()) {
@@ -357,7 +356,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
357 { 356 {
358 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 357 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
359 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { 358 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) {
360 Warning() << "invalid buffer, not a delete entity buffer"; 359 SinkWarning() << "invalid buffer, not a delete entity buffer";
361 return KAsync::error<qint64>(0); 360 return KAsync::error<qint64>(0);
362 } 361 }
363 } 362 }
@@ -366,7 +365,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
366 const bool replayToSource = deleteEntity->replayToSource(); 365 const bool replayToSource = deleteEntity->replayToSource();
367 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 366 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
368 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 367 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
369 Log() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 368 SinkLog() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
370 369
371 bool found = false; 370 bool found = false;
372 bool alreadyRemoved = false; 371 bool alreadyRemoved = false;
@@ -383,14 +382,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
383 } 382 }
384 return false; 383 return false;
385 }, 384 },
386 [](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message; }); 385 [](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; });
387 386
388 if (!found) { 387 if (!found) {
389 Warning() << "Failed to find entity " << key; 388 SinkWarning() << "Failed to find entity " << key;
390 return KAsync::error<qint64>(0); 389 return KAsync::error<qint64>(0);
391 } 390 }
392 if (alreadyRemoved) { 391 if (alreadyRemoved) {
393 Warning() << "Entity is already removed " << key; 392 SinkWarning() << "Entity is already removed " << key;
394 return KAsync::error<qint64>(0); 393 return KAsync::error<qint64>(0);
395 } 394 }
396 395
@@ -410,7 +409,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
410 409
411 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 410 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
412 if (!adaptorFactory) { 411 if (!adaptorFactory) {
413 Warning() << "no adaptor factory for type " << bufferType; 412 SinkWarning() << "no adaptor factory for type " << bufferType;
414 return KAsync::error<qint64>(0); 413 return KAsync::error<qint64>(0);
415 } 414 }
416 415
@@ -420,13 +419,13 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
420 [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool { 419 [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
421 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 420 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
422 if (!buffer.isValid()) { 421 if (!buffer.isValid()) {
423 Warning() << "Read invalid buffer from disk"; 422 SinkWarning() << "Read invalid buffer from disk";
424 } else { 423 } else {
425 current = adaptorFactory->createAdaptor(buffer.entity()); 424 current = adaptorFactory->createAdaptor(buffer.entity());
426 } 425 }
427 return false; 426 return false;
428 }, 427 },
429 [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); 428 [this](const Storage::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; });
430 429
431 d->storeNewRevision(newRevision, fbb, bufferType, key); 430 d->storeNewRevision(newRevision, fbb, bufferType, key);
432 431
@@ -442,13 +441,13 @@ void Pipeline::cleanupRevision(qint64 revision)
442 d->revisionChanged = true; 441 d->revisionChanged = true;
443 const auto uid = Storage::getUidFromRevision(d->transaction, revision); 442 const auto uid = Storage::getUidFromRevision(d->transaction, revision);
444 const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); 443 const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision);
445 Trace() << "Cleaning up revision " << revision << uid << bufferType; 444 SinkTrace() << "Cleaning up revision " << revision << uid << bufferType;
446 Storage::mainDatabase(d->transaction, bufferType) 445 Storage::mainDatabase(d->transaction, bufferType)
447 .scan(uid, 446 .scan(uid,
448 [&](const QByteArray &key, const QByteArray &data) -> bool { 447 [&](const QByteArray &key, const QByteArray &data) -> bool {
449 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 448 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
450 if (!buffer.isValid()) { 449 if (!buffer.isValid()) {
451 Warning() << "Read invalid buffer from disk"; 450 SinkWarning() << "Read invalid buffer from disk";
452 } else { 451 } else {
453 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); 452 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
454 const qint64 rev = metadata->revision(); 453 const qint64 rev = metadata->revision();
@@ -461,7 +460,7 @@ void Pipeline::cleanupRevision(qint64 revision)
461 460
462 return true; 461 return true;
463 }, 462 },
464 [](const Storage::Error &error) { Warning() << "Error while reading: " << error.message; }, true); 463 [](const Storage::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true);
465 Storage::setCleanedUpRevision(d->transaction, revision); 464 Storage::setCleanedUpRevision(d->transaction, revision);
466} 465}
467 466