diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 247 |
1 files changed, 119 insertions, 128 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 35e582b..65a2f5b 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -40,21 +40,18 @@ | |||
40 | #undef DEBUG_AREA | 40 | #undef DEBUG_AREA |
41 | #define DEBUG_AREA "resource.pipeline" | 41 | #define DEBUG_AREA "resource.pipeline" |
42 | 42 | ||
43 | namespace Sink | 43 | namespace Sink { |
44 | { | ||
45 | 44 | ||
46 | class Pipeline::Private | 45 | class Pipeline::Private |
47 | { | 46 | { |
48 | public: | 47 | public: |
49 | Private(const QString &resourceName) | 48 | Private(const QString &resourceName) : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), revisionChanged(false) |
50 | : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), | ||
51 | revisionChanged(false) | ||
52 | { | 49 | { |
53 | } | 50 | } |
54 | 51 | ||
55 | Storage storage; | 52 | Storage storage; |
56 | Storage::Transaction transaction; | 53 | Storage::Transaction transaction; |
57 | QHash<QString, QVector<Preprocessor *> > processors; | 54 | QHash<QString, QVector<Preprocessor *>> processors; |
58 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; | 55 | QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; |
59 | bool revisionChanged; | 56 | bool revisionChanged; |
60 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | 57 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); |
@@ -64,20 +61,16 @@ public: | |||
64 | 61 | ||
65 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 62 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
66 | { | 63 | { |
67 | Storage::mainDatabase(transaction, bufferType).write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | 64 | Storage::mainDatabase(transaction, bufferType) |
68 | [uid, newRevision](const Storage::Error &error) { | 65 | .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), |
69 | Warning() << "Failed to write entity" << uid << newRevision; | 66 | [uid, newRevision](const Storage::Error &error) { Warning() << "Failed to write entity" << uid << newRevision; }); |
70 | } | ||
71 | ); | ||
72 | revisionChanged = true; | 67 | revisionChanged = true; |
73 | Storage::setMaxRevision(transaction, newRevision); | 68 | Storage::setMaxRevision(transaction, newRevision); |
74 | Storage::recordRevision(transaction, newRevision, uid, bufferType); | 69 | Storage::recordRevision(transaction, newRevision, uid, bufferType); |
75 | } | 70 | } |
76 | 71 | ||
77 | 72 | ||
78 | Pipeline::Pipeline(const QString &resourceName, QObject *parent) | 73 | Pipeline::Pipeline(const QString &resourceName, QObject *parent) : QObject(parent), d(new Private(resourceName)) |
79 | : QObject(parent), | ||
80 | d(new Private(resourceName)) | ||
81 | { | 74 | { |
82 | } | 75 | } |
83 | 76 | ||
@@ -98,8 +91,8 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac | |||
98 | 91 | ||
99 | void Pipeline::startTransaction() | 92 | void Pipeline::startTransaction() |
100 | { | 93 | { |
101 | //TODO call for all types | 94 | // TODO call for all types |
102 | //But avoid doing it during cleanup | 95 | // But avoid doing it during cleanup |
103 | // for (auto processor : d->processors[bufferType]) { | 96 | // for (auto processor : d->processors[bufferType]) { |
104 | // processor->startBatch(); | 97 | // processor->startBatch(); |
105 | // } | 98 | // } |
@@ -114,14 +107,15 @@ void Pipeline::startTransaction() | |||
114 | 107 | ||
115 | void Pipeline::commit() | 108 | void Pipeline::commit() |
116 | { | 109 | { |
117 | //TODO call for all types | 110 | // TODO call for all types |
118 | //But avoid doing it during cleanup | 111 | // But avoid doing it during cleanup |
119 | // for (auto processor : d->processors[bufferType]) { | 112 | // for (auto processor : d->processors[bufferType]) { |
120 | // processor->finalize(); | 113 | // processor->finalize(); |
121 | // } | 114 | // } |
122 | const auto revision = Storage::maxRevision(d->transaction); | 115 | const auto revision = Storage::maxRevision(d->transaction); |
123 | const auto elapsed = d->transactionTime.elapsed(); | 116 | const auto elapsed = d->transactionTime.elapsed(); |
124 | Trace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " << (double)elapsed/(double)qMax(d->transactionItemCount, 1) << "[ms/item]"; | 117 | Trace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " |
118 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; | ||
125 | if (d->transaction) { | 119 | if (d->transaction) { |
126 | d->transaction.commit(); | 120 | d->transaction.commit(); |
127 | } | 121 | } |
@@ -157,7 +151,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
157 | auto createEntity = Commands::GetCreateEntity(command); | 151 | auto createEntity = Commands::GetCreateEntity(command); |
158 | 152 | ||
159 | const bool replayToSource = createEntity->replayToSource(); | 153 | const bool replayToSource = createEntity->replayToSource(); |
160 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); | 154 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(createEntity->domainType()->Data()), createEntity->domainType()->size()); |
161 | { | 155 | { |
162 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); | 156 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); |
163 | if (!VerifyEntityBuffer(verifyer)) { | 157 | if (!VerifyEntityBuffer(verifyer)) { |
@@ -173,7 +167,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
173 | 167 | ||
174 | QByteArray key; | 168 | QByteArray key; |
175 | if (createEntity->entityId()) { | 169 | if (createEntity->entityId()) { |
176 | key = QByteArray(reinterpret_cast<char const*>(createEntity->entityId()->Data()), createEntity->entityId()->size()); | 170 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); |
177 | if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { | 171 | if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { |
178 | ErrorMsg() << "An entity with this id already exists: " << key; | 172 | ErrorMsg() << "An entity with this id already exists: " << key; |
179 | return KAsync::error<qint64>(0); | 173 | return KAsync::error<qint64>(0); |
@@ -186,7 +180,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
186 | Q_ASSERT(!key.isEmpty()); | 180 | Q_ASSERT(!key.isEmpty()); |
187 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; | 181 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; |
188 | 182 | ||
189 | //Add metadata buffer | 183 | // Add metadata buffer |
190 | flatbuffers::FlatBufferBuilder metadataFbb; | 184 | flatbuffers::FlatBufferBuilder metadataFbb; |
191 | auto metadataBuilder = MetadataBuilder(metadataFbb); | 185 | auto metadataBuilder = MetadataBuilder(metadataFbb); |
192 | metadataBuilder.add_revision(newRevision); | 186 | metadataBuilder.add_revision(newRevision); |
@@ -196,7 +190,8 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
196 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | 190 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
197 | 191 | ||
198 | flatbuffers::FlatBufferBuilder fbb; | 192 | flatbuffers::FlatBufferBuilder fbb; |
199 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | 193 | EntityBuffer::assembleEntityBuffer( |
194 | fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | ||
200 | 195 | ||
201 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 196 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
202 | 197 | ||
@@ -207,20 +202,19 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
207 | } | 202 | } |
208 | 203 | ||
209 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; | 204 | Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; |
210 | Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { | 205 | Storage::mainDatabase(d->transaction, bufferType) |
211 | auto entity = GetEntity(value); | 206 | .scan(Storage::assembleKey(key, newRevision), |
212 | Q_ASSERT(entity->resource() || entity->local()); | 207 | [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { |
213 | auto adaptor = adaptorFactory->createAdaptor(*entity); | 208 | auto entity = GetEntity(value); |
214 | for (auto processor : d->processors[bufferType]) { | 209 | Q_ASSERT(entity->resource() || entity->local()); |
215 | processor->newEntity(key, newRevision, *adaptor, d->transaction); | 210 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
216 | } | 211 | for (auto processor : d->processors[bufferType]) { |
217 | return false; | 212 | processor->newEntity(key, newRevision, *adaptor, d->transaction); |
218 | }, [this](const Storage::Error &error) { | 213 | } |
219 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | 214 | return false; |
220 | }); | 215 | }, |
221 | return KAsync::start<qint64>([newRevision](){ | 216 | [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); |
222 | return newRevision; | 217 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); |
223 | }); | ||
224 | } | 218 | } |
225 | 219 | ||
226 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | 220 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
@@ -242,9 +236,9 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
242 | 236 | ||
243 | const qint64 baseRevision = modifyEntity->revision(); | 237 | const qint64 baseRevision = modifyEntity->revision(); |
244 | const bool replayToSource = modifyEntity->replayToSource(); | 238 | const bool replayToSource = modifyEntity->replayToSource(); |
245 | //TODO rename modifyEntity->domainType to bufferType | 239 | // TODO rename modifyEntity->domainType to bufferType |
246 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); | 240 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); |
247 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); | 241 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); |
248 | if (bufferType.isEmpty() || key.isEmpty()) { | 242 | if (bufferType.isEmpty() || key.isEmpty()) { |
249 | Warning() << "entity type or key " << bufferType << key; | 243 | Warning() << "entity type or key " << bufferType << key; |
250 | return KAsync::error<qint64>(0); | 244 | return KAsync::error<qint64>(0); |
@@ -257,7 +251,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
257 | } | 251 | } |
258 | } | 252 | } |
259 | 253 | ||
260 | //TODO use only readPropertyMapper and writePropertyMapper | 254 | // TODO use only readPropertyMapper and writePropertyMapper |
261 | auto adaptorFactory = d->adaptorFactory.value(bufferType); | 255 | auto adaptorFactory = d->adaptorFactory.value(bufferType); |
262 | if (!adaptorFactory) { | 256 | if (!adaptorFactory) { |
263 | Warning() << "no adaptor factory for type " << bufferType; | 257 | Warning() << "no adaptor factory for type " << bufferType; |
@@ -269,30 +263,30 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
269 | auto diff = adaptorFactory->createAdaptor(*diffEntity); | 263 | auto diff = adaptorFactory->createAdaptor(*diffEntity); |
270 | 264 | ||
271 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; | 265 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; |
272 | Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 266 | Storage::mainDatabase(d->transaction, bufferType) |
273 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 267 | .findLatest(key, |
274 | if (!buffer.isValid()) { | 268 | [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { |
275 | Warning() << "Read invalid buffer from disk"; | 269 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
276 | } else { | 270 | if (!buffer.isValid()) { |
277 | current = adaptorFactory->createAdaptor(buffer.entity()); | 271 | Warning() << "Read invalid buffer from disk"; |
278 | } | 272 | } else { |
279 | return false; | 273 | current = adaptorFactory->createAdaptor(buffer.entity()); |
280 | }, | 274 | } |
281 | [baseRevision](const Storage::Error &error) { | 275 | return false; |
282 | Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; | 276 | }, |
283 | }); | 277 | [baseRevision](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); |
284 | 278 | ||
285 | if (!current) { | 279 | if (!current) { |
286 | Warning() << "Failed to read local value " << key; | 280 | Warning() << "Failed to read local value " << key; |
287 | return KAsync::error<qint64>(0); | 281 | return KAsync::error<qint64>(0); |
288 | } | 282 | } |
289 | 283 | ||
290 | //resource and uid don't matter at this point | 284 | // resource and uid don't matter at this point |
291 | const ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); | 285 | const ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); |
292 | auto newObject = ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(existingObject); | 286 | auto newObject = ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(existingObject); |
293 | 287 | ||
294 | //Apply diff | 288 | // Apply diff |
295 | //FIXME only apply the properties that are available in the buffer | 289 | // FIXME only apply the properties that are available in the buffer |
296 | Trace() << "Applying changed properties: " << diff->availableProperties(); | 290 | Trace() << "Applying changed properties: " << diff->availableProperties(); |
297 | QSet<QByteArray> changeset; | 291 | QSet<QByteArray> changeset; |
298 | for (const auto &property : diff->availableProperties()) { | 292 | for (const auto &property : diff->availableProperties()) { |
@@ -302,17 +296,17 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
302 | newObject->setProperty(property, value); | 296 | newObject->setProperty(property, value); |
303 | } | 297 | } |
304 | } | 298 | } |
305 | //Altough we only set some properties, we want all to be serialized | 299 | // Altough we only set some properties, we want all to be serialized |
306 | newObject->setChangedProperties(changeset); | 300 | newObject->setChangedProperties(changeset); |
307 | 301 | ||
308 | //Remove deletions | 302 | // Remove deletions |
309 | if (modifyEntity->deletions()) { | 303 | if (modifyEntity->deletions()) { |
310 | for (const auto &property : *modifyEntity->deletions()) { | 304 | for (const auto &property : *modifyEntity->deletions()) { |
311 | newObject->setProperty(BufferUtils::extractBuffer(property), QVariant()); | 305 | newObject->setProperty(BufferUtils::extractBuffer(property), QVariant()); |
312 | } | 306 | } |
313 | } | 307 | } |
314 | 308 | ||
315 | //Add metadata buffer | 309 | // Add metadata buffer |
316 | flatbuffers::FlatBufferBuilder metadataFbb; | 310 | flatbuffers::FlatBufferBuilder metadataFbb; |
317 | auto metadataBuilder = MetadataBuilder(metadataFbb); | 311 | auto metadataBuilder = MetadataBuilder(metadataFbb); |
318 | metadataBuilder.add_revision(newRevision); | 312 | metadataBuilder.add_revision(newRevision); |
@@ -326,22 +320,21 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
326 | 320 | ||
327 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 321 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
328 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; | 322 | Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; |
329 | Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { | 323 | Storage::mainDatabase(d->transaction, bufferType) |
330 | if (value.isEmpty()) { | 324 | .scan(Storage::assembleKey(key, newRevision), |
331 | ErrorMsg() << "Read buffer is empty."; | 325 | [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { |
332 | } | 326 | if (value.isEmpty()) { |
333 | auto entity = GetEntity(value.data()); | 327 | ErrorMsg() << "Read buffer is empty."; |
334 | auto newEntity = adaptorFactory->createAdaptor(*entity); | 328 | } |
335 | for (auto processor : d->processors[bufferType]) { | 329 | auto entity = GetEntity(value.data()); |
336 | processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); | 330 | auto newEntity = adaptorFactory->createAdaptor(*entity); |
337 | } | 331 | for (auto processor : d->processors[bufferType]) { |
338 | return false; | 332 | processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); |
339 | }, [this](const Storage::Error &error) { | 333 | } |
340 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | 334 | return false; |
341 | }); | 335 | }, |
342 | return KAsync::start<qint64>([newRevision](){ | 336 | [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); |
343 | return newRevision; | 337 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); |
344 | }); | ||
345 | } | 338 | } |
346 | 339 | ||
347 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | 340 | KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) |
@@ -359,26 +352,25 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
359 | auto deleteEntity = Commands::GetDeleteEntity(command); | 352 | auto deleteEntity = Commands::GetDeleteEntity(command); |
360 | 353 | ||
361 | const bool replayToSource = deleteEntity->replayToSource(); | 354 | const bool replayToSource = deleteEntity->replayToSource(); |
362 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); | 355 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); |
363 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); | 356 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); |
364 | 357 | ||
365 | bool found = false; | 358 | bool found = false; |
366 | bool alreadyRemoved = false; | 359 | bool alreadyRemoved = false; |
367 | Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | 360 | Storage::mainDatabase(d->transaction, bufferType) |
368 | auto entity = GetEntity(data.data()); | 361 | .findLatest(key, |
369 | if (entity && entity->metadata()) { | 362 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { |
370 | auto metadata = GetMetadata(entity->metadata()->Data()); | 363 | auto entity = GetEntity(data.data()); |
371 | found = true; | 364 | if (entity && entity->metadata()) { |
372 | if (metadata->operation() == Operation_Removal) { | 365 | auto metadata = GetMetadata(entity->metadata()->Data()); |
373 | alreadyRemoved = true; | 366 | found = true; |
374 | } | 367 | if (metadata->operation() == Operation_Removal) { |
375 | 368 | alreadyRemoved = true; | |
376 | } | 369 | } |
377 | return false; | 370 | } |
378 | }, | 371 | return false; |
379 | [](const Storage::Error &error) { | 372 | }, |
380 | Warning() << "Failed to read old revision from storage: " << error.message; | 373 | [](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message; }); |
381 | }); | ||
382 | 374 | ||
383 | if (!found) { | 375 | if (!found) { |
384 | Warning() << "Failed to find entity " << key; | 376 | Warning() << "Failed to find entity " << key; |
@@ -391,7 +383,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
391 | 383 | ||
392 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; | 384 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; |
393 | 385 | ||
394 | //Add metadata buffer | 386 | // Add metadata buffer |
395 | flatbuffers::FlatBufferBuilder metadataFbb; | 387 | flatbuffers::FlatBufferBuilder metadataFbb; |
396 | auto metadataBuilder = MetadataBuilder(metadataFbb); | 388 | auto metadataBuilder = MetadataBuilder(metadataFbb); |
397 | metadataBuilder.add_revision(newRevision); | 389 | metadataBuilder.add_revision(newRevision); |
@@ -410,28 +402,27 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
410 | } | 402 | } |
411 | 403 | ||
412 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; | 404 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; |
413 | Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { | 405 | Storage::mainDatabase(d->transaction, bufferType) |
414 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 406 | .findLatest(key, |
415 | if (!buffer.isValid()) { | 407 | [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { |
416 | Warning() << "Read invalid buffer from disk"; | 408 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
417 | } else { | 409 | if (!buffer.isValid()) { |
418 | current = adaptorFactory->createAdaptor(buffer.entity()); | 410 | Warning() << "Read invalid buffer from disk"; |
419 | } | 411 | } else { |
420 | return false; | 412 | current = adaptorFactory->createAdaptor(buffer.entity()); |
421 | }, [this](const Storage::Error &error) { | 413 | } |
422 | ErrorMsg() << "Failed to find value in pipeline: " << error.message; | 414 | return false; |
423 | }); | 415 | }, |
416 | [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); | ||
424 | 417 | ||
425 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 418 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
426 | Log() << "Pipeline: deleted entity: "<< newRevision; | 419 | Log() << "Pipeline: deleted entity: " << newRevision; |
427 | 420 | ||
428 | for (auto processor : d->processors[bufferType]) { | 421 | for (auto processor : d->processors[bufferType]) { |
429 | processor->deletedEntity(key, newRevision, *current, d->transaction); | 422 | processor->deletedEntity(key, newRevision, *current, d->transaction); |
430 | } | 423 | } |
431 | 424 | ||
432 | return KAsync::start<qint64>([newRevision](){ | 425 | return KAsync::start<qint64>([newRevision]() { return newRevision; }); |
433 | return newRevision; | ||
434 | }); | ||
435 | } | 426 | } |
436 | 427 | ||
437 | void Pipeline::cleanupRevision(qint64 revision) | 428 | void Pipeline::cleanupRevision(qint64 revision) |
@@ -439,24 +430,25 @@ void Pipeline::cleanupRevision(qint64 revision) | |||
439 | const auto uid = Storage::getUidFromRevision(d->transaction, revision); | 430 | const auto uid = Storage::getUidFromRevision(d->transaction, revision); |
440 | const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); | 431 | const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); |
441 | Trace() << "Cleaning up revision " << revision << uid << bufferType; | 432 | Trace() << "Cleaning up revision " << revision << uid << bufferType; |
442 | Storage::mainDatabase(d->transaction, bufferType).scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { | 433 | Storage::mainDatabase(d->transaction, bufferType) |
443 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 434 | .scan(uid, |
444 | if (!buffer.isValid()) { | 435 | [&](const QByteArray &key, const QByteArray &data) -> bool { |
445 | Warning() << "Read invalid buffer from disk"; | 436 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
446 | } else { | 437 | if (!buffer.isValid()) { |
447 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); | 438 | Warning() << "Read invalid buffer from disk"; |
448 | const qint64 rev = metadata->revision(); | 439 | } else { |
449 | //Remove old revisions, and the current if the entity has already been removed | 440 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); |
450 | if (rev < revision || metadata->operation() == Operation_Removal) { | 441 | const qint64 rev = metadata->revision(); |
451 | Storage::removeRevision(d->transaction, rev); | 442 | // Remove old revisions, and the current if the entity has already been removed |
452 | Storage::mainDatabase(d->transaction, bufferType).remove(key); | 443 | if (rev < revision || metadata->operation() == Operation_Removal) { |
453 | } | 444 | Storage::removeRevision(d->transaction, rev); |
454 | } | 445 | Storage::mainDatabase(d->transaction, bufferType).remove(key); |
455 | 446 | } | |
456 | return true; | 447 | } |
457 | }, [](const Storage::Error &error) { | 448 | |
458 | Warning() << "Error while reading: " << error.message; | 449 | return true; |
459 | }, true); | 450 | }, |
451 | [](const Storage::Error &error) { Warning() << "Error while reading: " << error.message; }, true); | ||
460 | Storage::setCleanedUpRevision(d->transaction, revision); | 452 | Storage::setCleanedUpRevision(d->transaction, revision); |
461 | } | 453 | } |
462 | 454 | ||
@@ -465,8 +457,7 @@ qint64 Pipeline::cleanedUpRevision() | |||
465 | return Storage::cleanedUpRevision(d->transaction); | 457 | return Storage::cleanedUpRevision(d->transaction); |
466 | } | 458 | } |
467 | 459 | ||
468 | Preprocessor::Preprocessor() | 460 | Preprocessor::Preprocessor() : d(0) |
469 | : d(0) | ||
470 | { | 461 | { |
471 | } | 462 | } |
472 | 463 | ||