Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--  common/pipeline.cpp  247
1 file changed, 119 insertions, 128 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 35e582b..65a2f5b 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -40,21 +40,18 @@
 #undef DEBUG_AREA
 #define DEBUG_AREA "resource.pipeline"
 
-namespace Sink
-{
+namespace Sink {
 
 class Pipeline::Private
 {
 public:
-    Private(const QString &resourceName)
-        : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite),
-          revisionChanged(false)
+    Private(const QString &resourceName) : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), revisionChanged(false)
     {
     }
 
     Storage storage;
     Storage::Transaction transaction;
-    QHash<QString, QVector<Preprocessor *> > processors;
+    QHash<QString, QVector<Preprocessor *>> processors;
     QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory;
     bool revisionChanged;
     void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
@@ -64,20 +61,16 @@ public:
 
 void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
 {
-    Storage::mainDatabase(transaction, bufferType).write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
-        [uid, newRevision](const Storage::Error &error) {
-            Warning() << "Failed to write entity" << uid << newRevision;
-        }
-    );
+    Storage::mainDatabase(transaction, bufferType)
+        .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
+            [uid, newRevision](const Storage::Error &error) { Warning() << "Failed to write entity" << uid << newRevision; });
     revisionChanged = true;
     Storage::setMaxRevision(transaction, newRevision);
     Storage::recordRevision(transaction, newRevision, uid, bufferType);
 }
 
 
-Pipeline::Pipeline(const QString &resourceName, QObject *parent)
-    : QObject(parent),
-      d(new Private(resourceName))
+Pipeline::Pipeline(const QString &resourceName, QObject *parent) : QObject(parent), d(new Private(resourceName))
 {
 }
 
@@ -98,8 +91,8 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac
 
 void Pipeline::startTransaction()
 {
-    //TODO call for all types
-    //But avoid doing it during cleanup
+    // TODO call for all types
+    // But avoid doing it during cleanup
     // for (auto processor : d->processors[bufferType]) {
     //     processor->startBatch();
     // }
@@ -114,14 +107,15 @@ void Pipeline::startTransaction()
 
 void Pipeline::commit()
 {
-    //TODO call for all types
-    //But avoid doing it during cleanup
+    // TODO call for all types
+    // But avoid doing it during cleanup
     // for (auto processor : d->processors[bufferType]) {
     //     processor->finalize();
     // }
     const auto revision = Storage::maxRevision(d->transaction);
     const auto elapsed = d->transactionTime.elapsed();
-    Trace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " << (double)elapsed/(double)qMax(d->transactionItemCount, 1) << "[ms/item]";
+    Trace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
+            << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]";
     if (d->transaction) {
         d->transaction.commit();
     }
@@ -157,7 +151,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
     auto createEntity = Commands::GetCreateEntity(command);
 
     const bool replayToSource = createEntity->replayToSource();
-    const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size());
+    const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(createEntity->domainType()->Data()), createEntity->domainType()->size());
     {
         flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
         if (!VerifyEntityBuffer(verifyer)) {
@@ -173,7 +167,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
 
     QByteArray key;
     if (createEntity->entityId()) {
-        key = QByteArray(reinterpret_cast<char const*>(createEntity->entityId()->Data()), createEntity->entityId()->size());
+        key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size());
         if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) {
             ErrorMsg() << "An entity with this id already exists: " << key;
             return KAsync::error<qint64>(0);
@@ -186,7 +180,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
     Q_ASSERT(!key.isEmpty());
     const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
 
-    //Add metadata buffer
+    // Add metadata buffer
     flatbuffers::FlatBufferBuilder metadataFbb;
     auto metadataBuilder = MetadataBuilder(metadataFbb);
     metadataBuilder.add_revision(newRevision);
@@ -196,7 +190,8 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
     FinishMetadataBuffer(metadataFbb, metadataBuffer);
 
     flatbuffers::FlatBufferBuilder fbb;
-    EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
+    EntityBuffer::assembleEntityBuffer(
+        fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
 
     d->storeNewRevision(newRevision, fbb, bufferType, key);
 
@@ -207,20 +202,19 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
     }
 
     Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
-    Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool {
-        auto entity = GetEntity(value);
-        Q_ASSERT(entity->resource() || entity->local());
-        auto adaptor = adaptorFactory->createAdaptor(*entity);
-        for (auto processor : d->processors[bufferType]) {
-            processor->newEntity(key, newRevision, *adaptor, d->transaction);
-        }
-        return false;
-    }, [this](const Storage::Error &error) {
-        ErrorMsg() << "Failed to find value in pipeline: " << error.message;
-    });
-    return KAsync::start<qint64>([newRevision](){
-        return newRevision;
-    });
+    Storage::mainDatabase(d->transaction, bufferType)
+        .scan(Storage::assembleKey(key, newRevision),
+            [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool {
+                auto entity = GetEntity(value);
+                Q_ASSERT(entity->resource() || entity->local());
+                auto adaptor = adaptorFactory->createAdaptor(*entity);
+                for (auto processor : d->processors[bufferType]) {
+                    processor->newEntity(key, newRevision, *adaptor, d->transaction);
+                }
+                return false;
+            },
+            [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; });
+    return KAsync::start<qint64>([newRevision]() { return newRevision; });
 }
 
 KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
@@ -242,9 +236,9 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
 
     const qint64 baseRevision = modifyEntity->revision();
     const bool replayToSource = modifyEntity->replayToSource();
-    //TODO rename modifyEntity->domainType to bufferType
-    const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
-    const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
+    // TODO rename modifyEntity->domainType to bufferType
+    const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
+    const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
     if (bufferType.isEmpty() || key.isEmpty()) {
         Warning() << "entity type or key " << bufferType << key;
         return KAsync::error<qint64>(0);
@@ -257,7 +251,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
         }
     }
 
-    //TODO use only readPropertyMapper and writePropertyMapper
+    // TODO use only readPropertyMapper and writePropertyMapper
     auto adaptorFactory = d->adaptorFactory.value(bufferType);
     if (!adaptorFactory) {
         Warning() << "no adaptor factory for type " << bufferType;
@@ -269,30 +263,30 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
     auto diff = adaptorFactory->createAdaptor(*diffEntity);
 
     QSharedPointer<ApplicationDomain::BufferAdaptor> current;
-    Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
-        EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
-        if (!buffer.isValid()) {
-            Warning() << "Read invalid buffer from disk";
-        } else {
-            current = adaptorFactory->createAdaptor(buffer.entity());
-        }
-        return false;
-    },
-    [baseRevision](const Storage::Error &error) {
-        Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision;
-    });
+    Storage::mainDatabase(d->transaction, bufferType)
+        .findLatest(key,
+            [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
+                EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
+                if (!buffer.isValid()) {
+                    Warning() << "Read invalid buffer from disk";
+                } else {
+                    current = adaptorFactory->createAdaptor(buffer.entity());
+                }
+                return false;
+            },
+            [baseRevision](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; });
 
     if (!current) {
         Warning() << "Failed to read local value " << key;
         return KAsync::error<qint64>(0);
     }
 
-    //resource and uid don't matter at this point
+    // resource and uid don't matter at this point
     const ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current);
     auto newObject = ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(existingObject);
 
-    //Apply diff
-    //FIXME only apply the properties that are available in the buffer
+    // Apply diff
+    // FIXME only apply the properties that are available in the buffer
     Trace() << "Applying changed properties: " << diff->availableProperties();
     QSet<QByteArray> changeset;
     for (const auto &property : diff->availableProperties()) {
@@ -302,17 +296,17 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
             newObject->setProperty(property, value);
         }
     }
-    //Altough we only set some properties, we want all to be serialized
+    // Altough we only set some properties, we want all to be serialized
     newObject->setChangedProperties(changeset);
 
-    //Remove deletions
+    // Remove deletions
     if (modifyEntity->deletions()) {
         for (const auto &property : *modifyEntity->deletions()) {
             newObject->setProperty(BufferUtils::extractBuffer(property), QVariant());
         }
     }
 
-    //Add metadata buffer
+    // Add metadata buffer
     flatbuffers::FlatBufferBuilder metadataFbb;
     auto metadataBuilder = MetadataBuilder(metadataFbb);
     metadataBuilder.add_revision(newRevision);
@@ -326,22 +320,21 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
 
     d->storeNewRevision(newRevision, fbb, bufferType, key);
     Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
-    Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool {
-        if (value.isEmpty()) {
-            ErrorMsg() << "Read buffer is empty.";
-        }
-        auto entity = GetEntity(value.data());
-        auto newEntity = adaptorFactory->createAdaptor(*entity);
-        for (auto processor : d->processors[bufferType]) {
-            processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction);
-        }
-        return false;
-    }, [this](const Storage::Error &error) {
-        ErrorMsg() << "Failed to find value in pipeline: " << error.message;
-    });
-    return KAsync::start<qint64>([newRevision](){
-        return newRevision;
-    });
+    Storage::mainDatabase(d->transaction, bufferType)
+        .scan(Storage::assembleKey(key, newRevision),
+            [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool {
+                if (value.isEmpty()) {
+                    ErrorMsg() << "Read buffer is empty.";
+                }
+                auto entity = GetEntity(value.data());
+                auto newEntity = adaptorFactory->createAdaptor(*entity);
+                for (auto processor : d->processors[bufferType]) {
+                    processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction);
+                }
+                return false;
+            },
+            [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; });
+    return KAsync::start<qint64>([newRevision]() { return newRevision; });
 }
 
 KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
@@ -359,26 +352,25 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
     auto deleteEntity = Commands::GetDeleteEntity(command);
 
     const bool replayToSource = deleteEntity->replayToSource();
-    const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
-    const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
+    const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
+    const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
 
     bool found = false;
     bool alreadyRemoved = false;
-    Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
-        auto entity = GetEntity(data.data());
-        if (entity && entity->metadata()) {
-            auto metadata = GetMetadata(entity->metadata()->Data());
-            found = true;
-            if (metadata->operation() == Operation_Removal) {
-                alreadyRemoved = true;
-            }
-
-        }
-        return false;
-    },
-    [](const Storage::Error &error) {
-        Warning() << "Failed to read old revision from storage: " << error.message;
-    });
+    Storage::mainDatabase(d->transaction, bufferType)
+        .findLatest(key,
+            [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
+                auto entity = GetEntity(data.data());
+                if (entity && entity->metadata()) {
+                    auto metadata = GetMetadata(entity->metadata()->Data());
+                    found = true;
+                    if (metadata->operation() == Operation_Removal) {
+                        alreadyRemoved = true;
+                    }
+                }
+                return false;
+            },
+            [](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message; });
 
     if (!found) {
         Warning() << "Failed to find entity " << key;
@@ -391,7 +383,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
 
     const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
 
-    //Add metadata buffer
+    // Add metadata buffer
     flatbuffers::FlatBufferBuilder metadataFbb;
     auto metadataBuilder = MetadataBuilder(metadataFbb);
     metadataBuilder.add_revision(newRevision);
@@ -410,28 +402,27 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
     }
 
     QSharedPointer<ApplicationDomain::BufferAdaptor> current;
-    Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
-        EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
-        if (!buffer.isValid()) {
-            Warning() << "Read invalid buffer from disk";
-        } else {
-            current = adaptorFactory->createAdaptor(buffer.entity());
-        }
-        return false;
-    }, [this](const Storage::Error &error) {
-        ErrorMsg() << "Failed to find value in pipeline: " << error.message;
-    });
+    Storage::mainDatabase(d->transaction, bufferType)
+        .findLatest(key,
+            [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
+                EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
+                if (!buffer.isValid()) {
+                    Warning() << "Read invalid buffer from disk";
+                } else {
+                    current = adaptorFactory->createAdaptor(buffer.entity());
+                }
+                return false;
+            },
+            [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; });
 
     d->storeNewRevision(newRevision, fbb, bufferType, key);
-    Log() << "Pipeline: deleted entity: "<< newRevision;
+    Log() << "Pipeline: deleted entity: " << newRevision;
 
     for (auto processor : d->processors[bufferType]) {
         processor->deletedEntity(key, newRevision, *current, d->transaction);
     }
 
-    return KAsync::start<qint64>([newRevision](){
-        return newRevision;
-    });
+    return KAsync::start<qint64>([newRevision]() { return newRevision; });
 }
 
 void Pipeline::cleanupRevision(qint64 revision)
@@ -439,24 +430,25 @@ void Pipeline::cleanupRevision(qint64 revision)
     const auto uid = Storage::getUidFromRevision(d->transaction, revision);
     const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision);
     Trace() << "Cleaning up revision " << revision << uid << bufferType;
-    Storage::mainDatabase(d->transaction, bufferType).scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool {
-        EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
-        if (!buffer.isValid()) {
-            Warning() << "Read invalid buffer from disk";
-        } else {
-            const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
-            const qint64 rev = metadata->revision();
-            //Remove old revisions, and the current if the entity has already been removed
-            if (rev < revision || metadata->operation() == Operation_Removal) {
-                Storage::removeRevision(d->transaction, rev);
-                Storage::mainDatabase(d->transaction, bufferType).remove(key);
-            }
-        }
-
-        return true;
-    }, [](const Storage::Error &error) {
-        Warning() << "Error while reading: " << error.message;
-    }, true);
+    Storage::mainDatabase(d->transaction, bufferType)
+        .scan(uid,
+            [&](const QByteArray &key, const QByteArray &data) -> bool {
+                EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
+                if (!buffer.isValid()) {
+                    Warning() << "Read invalid buffer from disk";
+                } else {
+                    const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
+                    const qint64 rev = metadata->revision();
+                    // Remove old revisions, and the current if the entity has already been removed
+                    if (rev < revision || metadata->operation() == Operation_Removal) {
+                        Storage::removeRevision(d->transaction, rev);
+                        Storage::mainDatabase(d->transaction, bufferType).remove(key);
+                    }
+                }
+
+                return true;
+            },
+            [](const Storage::Error &error) { Warning() << "Error while reading: " << error.message; }, true);
     Storage::setCleanedUpRevision(d->transaction, revision);
 }
 
@@ -465,8 +457,7 @@ qint64 Pipeline::cleanedUpRevision()
     return Storage::cleanedUpRevision(d->transaction);
 }
 
-Preprocessor::Preprocessor()
-    : d(0)
+Preprocessor::Preprocessor() : d(0)
 {
 }
 