diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 90 |
1 files changed, 45 insertions, 45 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index c6d5297..f1a4a32 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -38,8 +38,7 @@ | |||
38 | #include "definitions.h" | 38 | #include "definitions.h" |
39 | #include "bufferutils.h" | 39 | #include "bufferutils.h" |
40 | 40 | ||
41 | #undef DEBUG_AREA | 41 | SINK_DEBUG_AREA("pipeline") |
42 | #define DEBUG_AREA "resource.pipeline" | ||
43 | 42 | ||
44 | namespace Sink { | 43 | namespace Sink { |
45 | 44 | ||
@@ -52,7 +51,7 @@ public: | |||
52 | 51 | ||
53 | Storage storage; | 52 | Storage storage; |
54 | Storage::Transaction transaction; | 53 | Storage::Transaction transaction; |
55 | QHash<QString, QVector<Preprocessor *>> processors; | 54 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; |
56 | bool revisionChanged; | 55 | bool revisionChanged; |
57 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | 56 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); |
58 | QTime transactionTime; | 57 | QTime transactionTime; |
@@ -63,10 +62,10 @@ public: | |||
63 | 62 | ||
64 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 63 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
65 | { | 64 | { |
66 | Trace() << "Committing new revision: " << uid << newRevision; | 65 | SinkTrace() << "Committing new revision: " << uid << newRevision; |
67 | Storage::mainDatabase(transaction, bufferType) | 66 | Storage::mainDatabase(transaction, bufferType) |
68 | .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | 67 | .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), |
69 | [uid, newRevision](const Storage::Error &error) { Warning() << "Failed to write entity" << uid << newRevision; }); | 68 | [uid, newRevision](const Storage::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); |
70 | revisionChanged = true; | 69 | revisionChanged = true; |
71 | Storage::setMaxRevision(transaction, newRevision); | 70 | Storage::setMaxRevision(transaction, newRevision); |
72 | Storage::recordRevision(transaction, newRevision, uid, bufferType); | 71 | Storage::recordRevision(transaction, newRevision, uid, bufferType); |
@@ -79,15 +78,17 @@ Pipeline::Pipeline(const QString &resourceName, QObject *parent) : QObject(paren | |||
79 | 78 | ||
80 | Pipeline::~Pipeline() | 79 | Pipeline::~Pipeline() |
81 | { | 80 | { |
82 | delete d; | 81 | d->transaction = Storage::Transaction(); |
83 | } | 82 | } |
84 | 83 | ||
85 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) | 84 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) |
86 | { | 85 | { |
86 | auto &list = d->processors[entityType]; | ||
87 | list.clear(); | ||
87 | for (auto p : processors) { | 88 | for (auto p : processors) { |
88 | p->setup(d->resourceType, d->resourceInstanceIdentifier, this); | 89 | p->setup(d->resourceType, d->resourceInstanceIdentifier, this); |
90 | list.append(QSharedPointer<Preprocessor>(p)); | ||
89 | } | 91 | } |
90 | d->processors[entityType] = processors; | ||
91 | } | 92 | } |
92 | 93 | ||
93 | void Pipeline::setResourceType(const QByteArray &resourceType) | 94 | void Pipeline::setResourceType(const QByteArray &resourceType) |
@@ -105,21 +106,21 @@ void Pipeline::startTransaction() | |||
105 | if (d->transaction) { | 106 | if (d->transaction) { |
106 | return; | 107 | return; |
107 | } | 108 | } |
108 | Trace() << "Starting transaction."; | 109 | SinkTrace() << "Starting transaction."; |
109 | d->transactionTime.start(); | 110 | d->transactionTime.start(); |
110 | d->transactionItemCount = 0; | 111 | d->transactionItemCount = 0; |
111 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | 112 | d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { |
112 | Warning() << error.message; | 113 | SinkWarning() << error.message; |
113 | })); | 114 | }); |
114 | 115 | ||
115 | //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. | 116 | //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. |
116 | //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). | 117 | //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). |
117 | //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync... | 118 | //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync... |
118 | if (d->storage.exists()) { | 119 | if (d->storage.exists()) { |
119 | while (!d->transaction.validateNamedDatabases()) { | 120 | while (!d->transaction.validateNamedDatabases()) { |
120 | Warning() << "Opened an invalid transaction!!!!!!"; | 121 | SinkWarning() << "Opened an invalid transaction!!!!!!"; |
121 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | 122 | d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { |
122 | Warning() << error.message; | 123 | SinkWarning() << error.message; |
123 | })); | 124 | })); |
124 | } | 125 | } |
125 | } | 126 | } |
@@ -139,7 +140,7 @@ void Pipeline::commit() | |||
139 | } | 140 | } |
140 | const auto revision = Storage::maxRevision(d->transaction); | 141 | const auto revision = Storage::maxRevision(d->transaction); |
141 | const auto elapsed = d->transactionTime.elapsed(); | 142 | const auto elapsed = d->transactionTime.elapsed(); |
142 | Log() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " | 143 | SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " |
143 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; | 144 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; |
144 | if (d->transaction) { | 145 | if (d->transaction) { |
145 | d->transaction.commit(); | 146 | d->transaction.commit(); |
@@ -168,7 +169,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
168 | { | 169 | { |
169 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 170 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
170 | if (!Commands::VerifyCreateEntityBuffer(verifyer)) { | 171 | if (!Commands::VerifyCreateEntityBuffer(verifyer)) { |
171 | Warning() << "invalid buffer, not a create entity buffer"; | 172 | SinkWarning() << "invalid buffer, not a create entity buffer"; |
172 | return KAsync::error<qint64>(0); | 173 | return KAsync::error<qint64>(0); |
173 | } | 174 | } |
174 | } | 175 | } |
@@ -180,7 +181,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
180 | if (createEntity->entityId()) { | 181 | if (createEntity->entityId()) { |
181 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); | 182 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); |
182 | if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { | 183 | if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { |
183 | ErrorMsg() << "An entity with this id already exists: " << key; | 184 | SinkError() << "An entity with this id already exists: " << key; |
184 | return KAsync::error<qint64>(0); | 185 | return KAsync::error<qint64>(0); |
185 | } | 186 | } |
186 | } | 187 | } |
@@ -188,31 +189,31 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
188 | if (key.isEmpty()) { | 189 | if (key.isEmpty()) { |
189 | key = Sink::Storage::generateUid(); | 190 | key = Sink::Storage::generateUid(); |
190 | } | 191 | } |
191 | Log() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 192 | SinkLog() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
192 | Q_ASSERT(!key.isEmpty()); | 193 | Q_ASSERT(!key.isEmpty()); |
193 | 194 | ||
194 | { | 195 | { |
195 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); | 196 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); |
196 | if (!VerifyEntityBuffer(verifyer)) { | 197 | if (!VerifyEntityBuffer(verifyer)) { |
197 | Warning() << "invalid buffer, not an entity buffer"; | 198 | SinkWarning() << "invalid buffer, not an entity buffer"; |
198 | return KAsync::error<qint64>(0); | 199 | return KAsync::error<qint64>(0); |
199 | } | 200 | } |
200 | } | 201 | } |
201 | auto entity = GetEntity(createEntity->delta()->Data()); | 202 | auto entity = GetEntity(createEntity->delta()->Data()); |
202 | if (!entity->resource()->size() && !entity->local()->size()) { | 203 | if (!entity->resource()->size() && !entity->local()->size()) { |
203 | Warning() << "No local and no resource buffer while trying to create entity."; | 204 | SinkWarning() << "No local and no resource buffer while trying to create entity."; |
204 | return KAsync::error<qint64>(0); | 205 | return KAsync::error<qint64>(0); |
205 | } | 206 | } |
206 | 207 | ||
207 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); | 208 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
208 | if (!adaptorFactory) { | 209 | if (!adaptorFactory) { |
209 | Warning() << "no adaptor factory for type " << bufferType; | 210 | SinkWarning() << "no adaptor factory for type " << bufferType; |
210 | return KAsync::error<qint64>(0); | 211 | return KAsync::error<qint64>(0); |
211 | } | 212 | } |
212 | 213 | ||
213 | auto adaptor = adaptorFactory->createAdaptor(*entity); | 214 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
214 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); | 215 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); |
215 | for (auto processor : d->processors[bufferType]) { | 216 | foreach (const auto &processor, d->processors[bufferType]) { |
216 | processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); | 217 | processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); |
217 | } | 218 | } |
218 | //The maxRevision may have changed meanwhile if the entity created sub-entities | 219 | //The maxRevision may have changed meanwhile if the entity created sub-entities |
@@ -242,7 +243,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
242 | { | 243 | { |
243 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 244 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
244 | if (!Commands::VerifyModifyEntityBuffer(verifyer)) { | 245 | if (!Commands::VerifyModifyEntityBuffer(verifyer)) { |
245 | Warning() << "invalid buffer, not a modify entity buffer"; | 246 | SinkWarning() << "invalid buffer, not a modify entity buffer"; |
246 | return KAsync::error<qint64>(0); | 247 | return KAsync::error<qint64>(0); |
247 | } | 248 | } |
248 | } | 249 | } |
@@ -252,21 +253,21 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
252 | if (modifyEntity->modifiedProperties()) { | 253 | if (modifyEntity->modifiedProperties()) { |
253 | changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); | 254 | changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); |
254 | } else { | 255 | } else { |
255 | Warning() << "No changeset available"; | 256 | SinkWarning() << "No changeset available"; |
256 | } | 257 | } |
257 | const qint64 baseRevision = modifyEntity->revision(); | 258 | const qint64 baseRevision = modifyEntity->revision(); |
258 | const bool replayToSource = modifyEntity->replayToSource(); | 259 | const bool replayToSource = modifyEntity->replayToSource(); |
259 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); | 260 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); |
260 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); | 261 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); |
261 | Log() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 262 | SinkLog() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
262 | if (bufferType.isEmpty() || key.isEmpty()) { | 263 | if (bufferType.isEmpty() || key.isEmpty()) { |
263 | Warning() << "entity type or key " << bufferType << key; | 264 | SinkWarning() << "entity type or key " << bufferType << key; |
264 | return KAsync::error<qint64>(0); | 265 | return KAsync::error<qint64>(0); |
265 | } | 266 | } |
266 | { | 267 | { |
267 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); | 268 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); |
268 | if (!VerifyEntityBuffer(verifyer)) { | 269 | if (!VerifyEntityBuffer(verifyer)) { |
269 | Warning() << "invalid buffer, not an entity buffer"; | 270 | SinkWarning() << "invalid buffer, not an entity buffer"; |
270 | return KAsync::error<qint64>(0); | 271 | return KAsync::error<qint64>(0); |
271 | } | 272 | } |
272 | } | 273 | } |
@@ -274,7 +275,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
274 | // TODO use only readPropertyMapper and writePropertyMapper | 275 | // TODO use only readPropertyMapper and writePropertyMapper |
275 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); | 276 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
276 | if (!adaptorFactory) { | 277 | if (!adaptorFactory) { |
277 | Warning() << "no adaptor factory for type " << bufferType; | 278 | SinkWarning() << "no adaptor factory for type " << bufferType; |
278 | return KAsync::error<qint64>(0); | 279 | return KAsync::error<qint64>(0); |
279 | } | 280 | } |
280 | 281 | ||
@@ -288,16 +289,16 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
288 | [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 289 | [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { |
289 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 290 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
290 | if (!buffer.isValid()) { | 291 | if (!buffer.isValid()) { |
291 | Warning() << "Read invalid buffer from disk"; | 292 | SinkWarning() << "Read invalid buffer from disk"; |
292 | } else { | 293 | } else { |
293 | current = adaptorFactory->createAdaptor(buffer.entity()); | 294 | current = adaptorFactory->createAdaptor(buffer.entity()); |
294 | } | 295 | } |
295 | return false; | 296 | return false; |
296 | }, | 297 | }, |
297 | [baseRevision](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); | 298 | [baseRevision](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); |
298 | 299 | ||
299 | if (!current) { | 300 | if (!current) { |
300 | Warning() << "Failed to read local value " << key; | 301 | SinkWarning() << "Failed to read local value " << key; |
301 | return KAsync::error<qint64>(0); | 302 | return KAsync::error<qint64>(0); |
302 | } | 303 | } |
303 | 304 | ||
@@ -305,7 +306,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
305 | 306 | ||
306 | // Apply diff | 307 | // Apply diff |
307 | // FIXME only apply the properties that are available in the buffer | 308 | // FIXME only apply the properties that are available in the buffer |
308 | Trace() << "Applying changed properties: " << changeset; | 309 | SinkTrace() << "Applying changed properties: " << changeset; |
309 | for (const auto &property : changeset) { | 310 | for (const auto &property : changeset) { |
310 | const auto value = diff->getProperty(property); | 311 | const auto value = diff->getProperty(property); |
311 | if (value.isValid()) { | 312 | if (value.isValid()) { |
@@ -321,7 +322,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
321 | } | 322 | } |
322 | 323 | ||
323 | newAdaptor->resetChangedProperties(); | 324 | newAdaptor->resetChangedProperties(); |
324 | for (auto processor : d->processors[bufferType]) { | 325 | foreach (const auto &processor, d->processors[bufferType]) { |
325 | processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); | 326 | processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); |
326 | } | 327 | } |
327 | //The maxRevision may have changed meanwhile if the entity created sub-entities | 328 | //The maxRevision may have changed meanwhile if the entity created sub-entities |
@@ -355,7 +356,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
355 | { | 356 | { |
356 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | 357 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); |
357 | if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { | 358 | if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { |
358 | Warning() << "invalid buffer, not a delete entity buffer"; | 359 | SinkWarning() << "invalid buffer, not a delete entity buffer"; |
359 | return KAsync::error<qint64>(0); | 360 | return KAsync::error<qint64>(0); |
360 | } | 361 | } |
361 | } | 362 | } |
@@ -364,7 +365,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
364 | const bool replayToSource = deleteEntity->replayToSource(); | 365 | const bool replayToSource = deleteEntity->replayToSource(); |
365 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); | 366 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); |
366 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); | 367 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); |
367 | Log() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 368 | SinkLog() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
368 | 369 | ||
369 | bool found = false; | 370 | bool found = false; |
370 | bool alreadyRemoved = false; | 371 | bool alreadyRemoved = false; |
@@ -381,14 +382,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
381 | } | 382 | } |
382 | return false; | 383 | return false; |
383 | }, | 384 | }, |
384 | [](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message; }); | 385 | [](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); |
385 | 386 | ||
386 | if (!found) { | 387 | if (!found) { |
387 | Warning() << "Failed to find entity " << key; | 388 | SinkWarning() << "Failed to find entity " << key; |
388 | return KAsync::error<qint64>(0); | 389 | return KAsync::error<qint64>(0); |
389 | } | 390 | } |
390 | if (alreadyRemoved) { | 391 | if (alreadyRemoved) { |
391 | Warning() << "Entity is already removed " << key; | 392 | SinkWarning() << "Entity is already removed " << key; |
392 | return KAsync::error<qint64>(0); | 393 | return KAsync::error<qint64>(0); |
393 | } | 394 | } |
394 | 395 | ||
@@ -408,7 +409,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
408 | 409 | ||
409 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); | 410 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); |
410 | if (!adaptorFactory) { | 411 | if (!adaptorFactory) { |
411 | Warning() << "no adaptor factory for type " << bufferType; | 412 | SinkWarning() << "no adaptor factory for type " << bufferType; |
412 | return KAsync::error<qint64>(0); | 413 | return KAsync::error<qint64>(0); |
413 | } | 414 | } |
414 | 415 | ||
@@ -418,17 +419,17 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
418 | [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { | 419 | [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { |
419 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 420 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
420 | if (!buffer.isValid()) { | 421 | if (!buffer.isValid()) { |
421 | Warning() << "Read invalid buffer from disk"; | 422 | SinkWarning() << "Read invalid buffer from disk"; |
422 | } else { | 423 | } else { |
423 | current = adaptorFactory->createAdaptor(buffer.entity()); | 424 | current = adaptorFactory->createAdaptor(buffer.entity()); |
424 | } | 425 | } |
425 | return false; | 426 | return false; |
426 | }, | 427 | }, |
427 | [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); | 428 | [this](const Storage::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; }); |
428 | 429 | ||
429 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 430 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
430 | 431 | ||
431 | for (auto processor : d->processors[bufferType]) { | 432 | foreach (const auto &processor, d->processors[bufferType]) { |
432 | processor->deletedEntity(key, newRevision, *current, d->transaction); | 433 | processor->deletedEntity(key, newRevision, *current, d->transaction); |
433 | } | 434 | } |
434 | 435 | ||
@@ -440,13 +441,13 @@ void Pipeline::cleanupRevision(qint64 revision) | |||
440 | d->revisionChanged = true; | 441 | d->revisionChanged = true; |
441 | const auto uid = Storage::getUidFromRevision(d->transaction, revision); | 442 | const auto uid = Storage::getUidFromRevision(d->transaction, revision); |
442 | const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); | 443 | const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); |
443 | Trace() << "Cleaning up revision " << revision << uid << bufferType; | 444 | SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; |
444 | Storage::mainDatabase(d->transaction, bufferType) | 445 | Storage::mainDatabase(d->transaction, bufferType) |
445 | .scan(uid, | 446 | .scan(uid, |
446 | [&](const QByteArray &key, const QByteArray &data) -> bool { | 447 | [&](const QByteArray &key, const QByteArray &data) -> bool { |
447 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 448 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
448 | if (!buffer.isValid()) { | 449 | if (!buffer.isValid()) { |
449 | Warning() << "Read invalid buffer from disk"; | 450 | SinkWarning() << "Read invalid buffer from disk"; |
450 | } else { | 451 | } else { |
451 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); | 452 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); |
452 | const qint64 rev = metadata->revision(); | 453 | const qint64 rev = metadata->revision(); |
@@ -459,7 +460,7 @@ void Pipeline::cleanupRevision(qint64 revision) | |||
459 | 460 | ||
460 | return true; | 461 | return true; |
461 | }, | 462 | }, |
462 | [](const Storage::Error &error) { Warning() << "Error while reading: " << error.message; }, true); | 463 | [](const Storage::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); |
463 | Storage::setCleanedUpRevision(d->transaction, revision); | 464 | Storage::setCleanedUpRevision(d->transaction, revision); |
464 | } | 465 | } |
465 | 466 | ||
@@ -481,7 +482,6 @@ Preprocessor::Preprocessor() : d(new Preprocessor::Private) | |||
481 | 482 | ||
482 | Preprocessor::~Preprocessor() | 483 | Preprocessor::~Preprocessor() |
483 | { | 484 | { |
484 | delete d; | ||
485 | } | 485 | } |
486 | 486 | ||
487 | void Preprocessor::setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *pipeline) | 487 | void Preprocessor::setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *pipeline) |