summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp93
1 files changed, 43 insertions, 50 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 7f836c4..15ed5fc 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -195,6 +195,11 @@ struct CreateHelper {
195 } 195 }
196}; 196};
197 197
198static KAsync::Job<void> create(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &newEntity)
199{
200 return TypeHelper<CreateHelper>{type}.operator()<KAsync::Job<void>, const ApplicationDomain::ApplicationDomainType&>(newEntity);
201}
202
198KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) 203KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
199{ 204{
200 d->transactionItemCount++; 205 d->transactionItemCount++;
@@ -248,64 +253,52 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
248 deletions = BufferUtils::fromVector(*modifyEntity->deletions()); 253 deletions = BufferUtils::fromVector(*modifyEntity->deletions());
249 } 254 }
250 255
251 if (modifyEntity->targetResource()) { 256 const auto current = d->entityStore.readLatest(bufferType, diff.identifier());
252 auto isMove = modifyEntity->removeEntity(); 257 if (current.identifier().isEmpty()) {
253 auto targetResource = BufferUtils::extractBuffer(modifyEntity->targetResource()); 258 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
254 auto changeset = diff.changedProperties(); 259 return KAsync::error<qint64>(0);
255 const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); 260 }
256 if (current.identifier().isEmpty()) {
257 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
258 return KAsync::error<qint64>(0);
259 }
260 261
261 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryCopy<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); 262 auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions);
262 263
263 // Apply diff 264 bool isMove = false;
264 for (const auto &property : changeset) { 265 if (modifyEntity->targetResource()) {
265 const auto value = diff.getProperty(property); 266 isMove = modifyEntity->removeEntity();
266 if (value.isValid()) { 267 newEntity.setResource(BufferUtils::extractBuffer(modifyEntity->targetResource()));
267 newEntity.setProperty(property, value); 268 }
268 }
269 }
270 269
271 // Remove deletions 270 foreach (const auto &processor, d->processors[bufferType]) {
272 for (const auto &property : deletions) { 271 processor->modifiedEntity(current, newEntity);
273 newEntity.setProperty(property, QVariant()); 272 }
274 }
275 newEntity.setResource(targetResource);
276 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
277 273
278 SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; 274 //The entity is either being copied or moved
279 auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); 275 if (newEntity.resourceInstanceIdentifier() != d->resourceContext.resourceInstanceIdentifier) {
280 job = job.then([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { 276 SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier();
281 if (!error) { 277 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
282 SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; 278 return create(bufferType, newEntity)
283 if (isMove) { 279 .then([=](const KAsync::Error &error) {
284 flatbuffers::FlatBufferBuilder fbb; 280 if (!error) {
285 auto entityId = fbb.CreateString(current.identifier()); 281 SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull";
286 auto type = fbb.CreateString(bufferType); 282 if (isMove) {
287 auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); 283 flatbuffers::FlatBufferBuilder fbb;
288 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 284 auto entityId = fbb.CreateString(current.identifier());
289 const auto data = BufferUtils::extractBuffer(fbb); 285 auto type = fbb.CreateString(bufferType);
290 deletedEntity(data, data.size()).exec(); 286 auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true);
287 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
288 const auto data = BufferUtils::extractBuffer(fbb);
289 deletedEntity(data, data.size()).exec();
290 }
291 } else {
292 SinkErrorCtx(d->logCtx) << "Failed to move entity " << newEntity.identifier() << " to resource " << newEntity.resourceInstanceIdentifier();
291 } 293 }
292 } else { 294 })
293 SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); 295 .then([this] {
294 } 296 return d->entityStore.maxRevision();
295 }); 297 });
296 return job.then([this] {
297 return d->entityStore.maxRevision();
298 });
299 } 298 }
300 299
301 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) {
302 foreach (const auto &processor, d->processors[bufferType]) {
303 processor->modifiedEntity(oldEntity, newEntity);
304 }
305 };
306
307 d->revisionChanged = true; 300 d->revisionChanged = true;
308 if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) { 301 if (!d->entityStore.modify(bufferType, current, newEntity, replayToSource)) {
309 return KAsync::error<qint64>(0); 302 return KAsync::error<qint64>(0);
310 } 303 }
311 304