From 39b3b6c7ff99f18e8719b28d748ec63adf76808d Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 10 Apr 2017 16:11:45 +0200 Subject: Don't continue processing the pipeline until we have appended the message. Otherwise the processor might think its done before it actually is. --- common/pipeline.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'common/pipeline.cpp') diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 887b6b3..7f836c4 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -281,7 +281,6 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) if (!error) { SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; if (isMove) { - startTransaction(); flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(current.identifier()); auto type = fbb.CreateString(bufferType); @@ -289,14 +288,14 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) Sink::Commands::FinishDeleteEntityBuffer(fbb, location); const auto data = BufferUtils::extractBuffer(fbb); deletedEntity(data, data.size()).exec(); - commit(); } } else { SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); } }); - job.exec(); - return KAsync::value(0); + return job.then([this] { + return d->entityStore.maxRevision(); + }); } auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) { -- cgit v1.2.3