diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 169 |
1 files changed, 107 insertions, 62 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 887b6b3..019784e 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -175,13 +175,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
175 | auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor}; | 175 | auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor}; |
176 | o.setChangedProperties(o.availableProperties().toSet()); | 176 | o.setChangedProperties(o.availableProperties().toSet()); |
177 | 177 | ||
178 | auto preprocess = [&, this](ApplicationDomain::ApplicationDomainType &newEntity) { | 178 | auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(o, o.availableProperties()); |
179 | foreach (const auto &processor, d->processors[bufferType]) { | 179 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); |
180 | processor->newEntity(newEntity); | 180 | |
181 | } | 181 | foreach (const auto &processor, d->processors[bufferType]) { |
182 | }; | 182 | processor->newEntity(newEntity); |
183 | } | ||
183 | 184 | ||
184 | if (!d->entityStore.add(bufferType, o, replayToSource, preprocess)) { | 185 | if (!d->entityStore.add(bufferType, o, replayToSource)) { |
185 | return KAsync::error<qint64>(0); | 186 | return KAsync::error<qint64>(0); |
186 | } | 187 | } |
187 | 188 | ||
@@ -195,6 +196,11 @@ struct CreateHelper { | |||
195 | } | 196 | } |
196 | }; | 197 | }; |
197 | 198 | ||
199 | static KAsync::Job<void> create(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &newEntity) | ||
200 | { | ||
201 | return TypeHelper<CreateHelper>{type}.operator()<KAsync::Job<void>, const ApplicationDomain::ApplicationDomainType&>(newEntity); | ||
202 | } | ||
203 | |||
198 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | 204 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
199 | { | 205 | { |
200 | d->transactionItemCount++; | 206 | d->transactionItemCount++; |
@@ -248,65 +254,71 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
248 | deletions = BufferUtils::fromVector(*modifyEntity->deletions()); | 254 | deletions = BufferUtils::fromVector(*modifyEntity->deletions()); |
249 | } | 255 | } |
250 | 256 | ||
251 | if (modifyEntity->targetResource()) { | 257 | const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); |
252 | auto isMove = modifyEntity->removeEntity(); | 258 | if (current.identifier().isEmpty()) { |
253 | auto targetResource = BufferUtils::extractBuffer(modifyEntity->targetResource()); | 259 | SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); |
254 | auto changeset = diff.changedProperties(); | 260 | return KAsync::error<qint64>(0); |
255 | const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); | 261 | } |
256 | if (current.identifier().isEmpty()) { | ||
257 | SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); | ||
258 | return KAsync::error<qint64>(0); | ||
259 | } | ||
260 | 262 | ||
261 | auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryCopy<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); | 263 | auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions); |
262 | 264 | ||
263 | // Apply diff | 265 | bool isMove = false; |
264 | for (const auto &property : changeset) { | 266 | if (modifyEntity->targetResource()) { |
265 | const auto value = diff.getProperty(property); | 267 | isMove = modifyEntity->removeEntity(); |
266 | if (value.isValid()) { | 268 | newEntity.setResource(BufferUtils::extractBuffer(modifyEntity->targetResource())); |
267 | newEntity.setProperty(property, value); | 269 | } |
268 | } | ||
269 | } | ||
270 | 270 | ||
271 | // Remove deletions | 271 | foreach (const auto &processor, d->processors[bufferType]) { |
272 | for (const auto &property : deletions) { | 272 | bool exitLoop = false; |
273 | newEntity.setProperty(property, QVariant()); | 273 | const auto result = processor->processModification(Preprocessor::Modification, current, newEntity); |
274 | switch (result.action) { | ||
275 | case Preprocessor::MoveToResource: | ||
276 | isMove = true; | ||
277 | exitLoop = true; | ||
278 | break; | ||
279 | case Preprocessor::CopyToResource: | ||
280 | isMove = true; | ||
281 | exitLoop = true; | ||
282 | break; | ||
283 | case Preprocessor::DropModification: | ||
284 | SinkTraceCtx(d->logCtx) << "Dropping modification"; | ||
285 | return KAsync::error<qint64>(0); | ||
286 | default: | ||
287 | break; | ||
274 | } | 288 | } |
275 | newEntity.setResource(targetResource); | 289 | if (exitLoop) { |
276 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); | 290 | break; |
291 | } | ||
292 | } | ||
277 | 293 | ||
278 | SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; | 294 | //The entity is either being copied or moved |
279 | auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); | 295 | if (newEntity.resourceInstanceIdentifier() != d->resourceContext.resourceInstanceIdentifier) { |
280 | job = job.then([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { | 296 | SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier(); |
281 | if (!error) { | 297 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); |
282 | SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; | 298 | return create(bufferType, newEntity) |
283 | if (isMove) { | 299 | .then([=](const KAsync::Error &error) { |
284 | startTransaction(); | 300 | if (!error) { |
285 | flatbuffers::FlatBufferBuilder fbb; | 301 | SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; |
286 | auto entityId = fbb.CreateString(current.identifier()); | 302 | if (isMove) { |
287 | auto type = fbb.CreateString(bufferType); | 303 | flatbuffers::FlatBufferBuilder fbb; |
288 | auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); | 304 | auto entityId = fbb.CreateString(current.identifier()); |
289 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); | 305 | auto type = fbb.CreateString(bufferType); |
290 | const auto data = BufferUtils::extractBuffer(fbb); | 306 | auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); |
291 | deletedEntity(data, data.size()).exec(); | 307 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); |
292 | commit(); | 308 | const auto data = BufferUtils::extractBuffer(fbb); |
309 | deletedEntity(data, data.size()).exec(); | ||
310 | } | ||
311 | } else { | ||
312 | SinkErrorCtx(d->logCtx) << "Failed to move entity " << newEntity.identifier() << " to resource " << newEntity.resourceInstanceIdentifier(); | ||
293 | } | 313 | } |
294 | } else { | 314 | }) |
295 | SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); | 315 | .then([this] { |
296 | } | 316 | return d->entityStore.maxRevision(); |
297 | }); | 317 | }); |
298 | job.exec(); | ||
299 | return KAsync::value<qint64>(0); | ||
300 | } | 318 | } |
301 | 319 | ||
302 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) { | ||
303 | foreach (const auto &processor, d->processors[bufferType]) { | ||
304 | processor->modifiedEntity(oldEntity, newEntity); | ||
305 | } | ||
306 | }; | ||
307 | |||
308 | d->revisionChanged = true; | 320 | d->revisionChanged = true; |
309 | if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) { | 321 | if (!d->entityStore.modify(bufferType, current, newEntity, replayToSource)) { |
310 | return KAsync::error<qint64>(0); | 322 | return KAsync::error<qint64>(0); |
311 | } | 323 | } |
312 | 324 | ||
@@ -331,14 +343,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
331 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); | 343 | const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); |
332 | SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 344 | SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
333 | 345 | ||
334 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { | 346 | const auto current = d->entityStore.readLatest(bufferType, key); |
335 | foreach (const auto &processor, d->processors[bufferType]) { | 347 | |
336 | processor->deletedEntity(oldEntity); | 348 | foreach (const auto &processor, d->processors[bufferType]) { |
337 | } | 349 | processor->deletedEntity(current); |
338 | }; | 350 | } |
339 | 351 | ||
340 | d->revisionChanged = true; | 352 | d->revisionChanged = true; |
341 | if (!d->entityStore.remove(bufferType, key, replayToSource, preprocess)) { | 353 | if (!d->entityStore.remove(bufferType, current, replayToSource)) { |
342 | return KAsync::error<qint64>(0); | 354 | return KAsync::error<qint64>(0); |
343 | } | 355 | } |
344 | 356 | ||
@@ -385,6 +397,39 @@ void Preprocessor::finalizeBatch() | |||
385 | { | 397 | { |
386 | } | 398 | } |
387 | 399 | ||
400 | void Preprocessor::newEntity(ApplicationDomain::ApplicationDomainType &newEntity) | ||
401 | { | ||
402 | |||
403 | } | ||
404 | |||
405 | void Preprocessor::modifiedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) | ||
406 | { | ||
407 | |||
408 | } | ||
409 | |||
410 | void Preprocessor::deletedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity) | ||
411 | { | ||
412 | |||
413 | } | ||
414 | |||
415 | Preprocessor::Result Preprocessor::processModification(Type type, const ApplicationDomain::ApplicationDomainType ¤t, ApplicationDomain::ApplicationDomainType &diff) | ||
416 | { | ||
417 | switch(type) { | ||
418 | case Creation: | ||
419 | newEntity(diff); | ||
420 | return {NoAction}; | ||
421 | case Modification: | ||
422 | modifiedEntity(current, diff); | ||
423 | return {NoAction}; | ||
424 | case Deletion: | ||
425 | deletedEntity(current); | ||
426 | return {NoAction}; | ||
427 | default: | ||
428 | break; | ||
429 | } | ||
430 | return {NoAction}; | ||
431 | } | ||
432 | |||
388 | QByteArray Preprocessor::resourceInstanceIdentifier() const | 433 | QByteArray Preprocessor::resourceInstanceIdentifier() const |
389 | { | 434 | { |
390 | return d->resourceInstanceIdentifier; | 435 | return d->resourceInstanceIdentifier; |