diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-04-10 16:11:45 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-04-10 16:11:45 +0200 |
commit | 39b3b6c7ff99f18e8719b28d748ec63adf76808d (patch) | |
tree | 7002af8378b35490046acec8be6ccfbaed1d7eee /common/pipeline.cpp | |
parent | 7890b7fcb4ffdfc570e306983787bc884bf0f62b (diff) | |
download | sink-39b3b6c7ff99f18e8719b28d748ec63adf76808d.tar.gz sink-39b3b6c7ff99f18e8719b28d748ec63adf76808d.zip |
Don't continue processing the pipeline until we have appended the
message.
Otherwise the processor might think its done before it actually is.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 7 |
1 files changed, 3 insertions, 4 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 887b6b3..7f836c4 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -281,7 +281,6 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
281 | if (!error) { | 281 | if (!error) { |
282 | SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; | 282 | SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; |
283 | if (isMove) { | 283 | if (isMove) { |
284 | startTransaction(); | ||
285 | flatbuffers::FlatBufferBuilder fbb; | 284 | flatbuffers::FlatBufferBuilder fbb; |
286 | auto entityId = fbb.CreateString(current.identifier()); | 285 | auto entityId = fbb.CreateString(current.identifier()); |
287 | auto type = fbb.CreateString(bufferType); | 286 | auto type = fbb.CreateString(bufferType); |
@@ -289,14 +288,14 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
289 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); | 288 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); |
290 | const auto data = BufferUtils::extractBuffer(fbb); | 289 | const auto data = BufferUtils::extractBuffer(fbb); |
291 | deletedEntity(data, data.size()).exec(); | 290 | deletedEntity(data, data.size()).exec(); |
292 | commit(); | ||
293 | } | 291 | } |
294 | } else { | 292 | } else { |
295 | SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); | 293 | SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); |
296 | } | 294 | } |
297 | }); | 295 | }); |
298 | job.exec(); | 296 | return job.then([this] { |
299 | return KAsync::value<qint64>(0); | 297 | return d->entityStore.maxRevision(); |
298 | }); | ||
300 | } | 299 | } |
301 | 300 | ||
302 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) { | 301 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) { |