diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 93 |
1 files changed, 43 insertions, 50 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 7f836c4..15ed5fc 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -195,6 +195,11 @@ struct CreateHelper { | |||
195 | } | 195 | } |
196 | }; | 196 | }; |
197 | 197 | ||
198 | static KAsync::Job<void> create(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &newEntity) | ||
199 | { | ||
200 | return TypeHelper<CreateHelper>{type}.operator()<KAsync::Job<void>, const ApplicationDomain::ApplicationDomainType&>(newEntity); | ||
201 | } | ||
202 | |||
198 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | 203 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
199 | { | 204 | { |
200 | d->transactionItemCount++; | 205 | d->transactionItemCount++; |
@@ -248,64 +253,52 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
248 | deletions = BufferUtils::fromVector(*modifyEntity->deletions()); | 253 | deletions = BufferUtils::fromVector(*modifyEntity->deletions()); |
249 | } | 254 | } |
250 | 255 | ||
251 | if (modifyEntity->targetResource()) { | 256 | const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); |
252 | auto isMove = modifyEntity->removeEntity(); | 257 | if (current.identifier().isEmpty()) { |
253 | auto targetResource = BufferUtils::extractBuffer(modifyEntity->targetResource()); | 258 | SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); |
254 | auto changeset = diff.changedProperties(); | 259 | return KAsync::error<qint64>(0); |
255 | const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); | 260 | } |
256 | if (current.identifier().isEmpty()) { | ||
257 | SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); | ||
258 | return KAsync::error<qint64>(0); | ||
259 | } | ||
260 | 261 | ||
261 | auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryCopy<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); | 262 | auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions); |
262 | 263 | ||
263 | // Apply diff | 264 | bool isMove = false; |
264 | for (const auto &property : changeset) { | 265 | if (modifyEntity->targetResource()) { |
265 | const auto value = diff.getProperty(property); | 266 | isMove = modifyEntity->removeEntity(); |
266 | if (value.isValid()) { | 267 | newEntity.setResource(BufferUtils::extractBuffer(modifyEntity->targetResource())); |
267 | newEntity.setProperty(property, value); | 268 | } |
268 | } | ||
269 | } | ||
270 | 269 | ||
271 | // Remove deletions | 270 | foreach (const auto &processor, d->processors[bufferType]) { |
272 | for (const auto &property : deletions) { | 271 | processor->modifiedEntity(current, newEntity); |
273 | newEntity.setProperty(property, QVariant()); | 272 | } |
274 | } | ||
275 | newEntity.setResource(targetResource); | ||
276 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); | ||
277 | 273 | ||
278 | SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; | 274 | //The entity is either being copied or moved |
279 | auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); | 275 | if (newEntity.resourceInstanceIdentifier() != d->resourceContext.resourceInstanceIdentifier) { |
280 | job = job.then([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { | 276 | SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier(); |
281 | if (!error) { | 277 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); |
282 | SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; | 278 | return create(bufferType, newEntity) |
283 | if (isMove) { | 279 | .then([=](const KAsync::Error &error) { |
284 | flatbuffers::FlatBufferBuilder fbb; | 280 | if (!error) { |
285 | auto entityId = fbb.CreateString(current.identifier()); | 281 | SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; |
286 | auto type = fbb.CreateString(bufferType); | 282 | if (isMove) { |
287 | auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); | 283 | flatbuffers::FlatBufferBuilder fbb; |
288 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); | 284 | auto entityId = fbb.CreateString(current.identifier()); |
289 | const auto data = BufferUtils::extractBuffer(fbb); | 285 | auto type = fbb.CreateString(bufferType); |
290 | deletedEntity(data, data.size()).exec(); | 286 | auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); |
287 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); | ||
288 | const auto data = BufferUtils::extractBuffer(fbb); | ||
289 | deletedEntity(data, data.size()).exec(); | ||
290 | } | ||
291 | } else { | ||
292 | SinkErrorCtx(d->logCtx) << "Failed to move entity " << newEntity.identifier() << " to resource " << newEntity.resourceInstanceIdentifier(); | ||
291 | } | 293 | } |
292 | } else { | 294 | }) |
293 | SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); | 295 | .then([this] { |
294 | } | 296 | return d->entityStore.maxRevision(); |
295 | }); | 297 | }); |
296 | return job.then([this] { | ||
297 | return d->entityStore.maxRevision(); | ||
298 | }); | ||
299 | } | 298 | } |
300 | 299 | ||
301 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) { | ||
302 | foreach (const auto &processor, d->processors[bufferType]) { | ||
303 | processor->modifiedEntity(oldEntity, newEntity); | ||
304 | } | ||
305 | }; | ||
306 | |||
307 | d->revisionChanged = true; | 300 | d->revisionChanged = true; |
308 | if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) { | 301 | if (!d->entityStore.modify(bufferType, current, newEntity, replayToSource)) { |
309 | return KAsync::error<qint64>(0); | 302 | return KAsync::error<qint64>(0); |
310 | } | 303 | } |
311 | 304 | ||