diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-04-11 15:16:26 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2017-04-11 15:16:26 +0200 |
commit | f6c3c144e60611d2da7ba7aa5b115affe92a57a4 (patch) | |
tree | 4e4b01382d7c2893d4d1d14489506e3b0066fce9 /common/pipeline.cpp | |
parent | adb11fd81404b9ab3b01975ed93babe12a22dee4 (diff) | |
download | sink-f6c3c144e60611d2da7ba7aa5b115affe92a57a4.tar.gz sink-f6c3c144e60611d2da7ba7aa5b115affe92a57a4.zip |
Move the preprocssing back out of entitystore into the pipeline.
This is where this really belongs, only the indexing is part of storage.
This is necessary so preprocessors can move entities as well.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 93 |
1 files changed, 43 insertions, 50 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 7f836c4..15ed5fc 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -195,6 +195,11 @@ struct CreateHelper { | |||
195 | } | 195 | } |
196 | }; | 196 | }; |
197 | 197 | ||
198 | static KAsync::Job<void> create(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &newEntity) | ||
199 | { | ||
200 | return TypeHelper<CreateHelper>{type}.operator()<KAsync::Job<void>, const ApplicationDomain::ApplicationDomainType&>(newEntity); | ||
201 | } | ||
202 | |||
198 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | 203 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
199 | { | 204 | { |
200 | d->transactionItemCount++; | 205 | d->transactionItemCount++; |
@@ -248,64 +253,52 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
248 | deletions = BufferUtils::fromVector(*modifyEntity->deletions()); | 253 | deletions = BufferUtils::fromVector(*modifyEntity->deletions()); |
249 | } | 254 | } |
250 | 255 | ||
251 | if (modifyEntity->targetResource()) { | 256 | const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); |
252 | auto isMove = modifyEntity->removeEntity(); | 257 | if (current.identifier().isEmpty()) { |
253 | auto targetResource = BufferUtils::extractBuffer(modifyEntity->targetResource()); | 258 | SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); |
254 | auto changeset = diff.changedProperties(); | 259 | return KAsync::error<qint64>(0); |
255 | const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); | 260 | } |
256 | if (current.identifier().isEmpty()) { | ||
257 | SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); | ||
258 | return KAsync::error<qint64>(0); | ||
259 | } | ||
260 | 261 | ||
261 | auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryCopy<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); | 262 | auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions); |
262 | 263 | ||
263 | // Apply diff | 264 | bool isMove = false; |
264 | for (const auto &property : changeset) { | 265 | if (modifyEntity->targetResource()) { |
265 | const auto value = diff.getProperty(property); | 266 | isMove = modifyEntity->removeEntity(); |
266 | if (value.isValid()) { | 267 | newEntity.setResource(BufferUtils::extractBuffer(modifyEntity->targetResource())); |
267 | newEntity.setProperty(property, value); | 268 | } |
268 | } | ||
269 | } | ||
270 | 269 | ||
271 | // Remove deletions | 270 | foreach (const auto &processor, d->processors[bufferType]) { |
272 | for (const auto &property : deletions) { | 271 | processor->modifiedEntity(current, newEntity); |
273 | newEntity.setProperty(property, QVariant()); | 272 | } |
274 | } | ||
275 | newEntity.setResource(targetResource); | ||
276 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); | ||
277 | 273 | ||
278 | SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; | 274 | //The entity is either being copied or moved |
279 | auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); | 275 | if (newEntity.resourceInstanceIdentifier() != d->resourceContext.resourceInstanceIdentifier) { |
280 | job = job.then([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { | 276 | SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier(); |
281 | if (!error) { | 277 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); |
282 | SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; | 278 | return create(bufferType, newEntity) |
283 | if (isMove) { | 279 | .then([=](const KAsync::Error &error) { |
284 | flatbuffers::FlatBufferBuilder fbb; | 280 | if (!error) { |
285 | auto entityId = fbb.CreateString(current.identifier()); | 281 | SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; |
286 | auto type = fbb.CreateString(bufferType); | 282 | if (isMove) { |
287 | auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); | 283 | flatbuffers::FlatBufferBuilder fbb; |
288 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); | 284 | auto entityId = fbb.CreateString(current.identifier()); |
289 | const auto data = BufferUtils::extractBuffer(fbb); | 285 | auto type = fbb.CreateString(bufferType); |
290 | deletedEntity(data, data.size()).exec(); | 286 | auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); |
287 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); | ||
288 | const auto data = BufferUtils::extractBuffer(fbb); | ||
289 | deletedEntity(data, data.size()).exec(); | ||
290 | } | ||
291 | } else { | ||
292 | SinkErrorCtx(d->logCtx) << "Failed to move entity " << newEntity.identifier() << " to resource " << newEntity.resourceInstanceIdentifier(); | ||
291 | } | 293 | } |
292 | } else { | 294 | }) |
293 | SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); | 295 | .then([this] { |
294 | } | 296 | return d->entityStore.maxRevision(); |
295 | }); | 297 | }); |
296 | return job.then([this] { | ||
297 | return d->entityStore.maxRevision(); | ||
298 | }); | ||
299 | } | 298 | } |
300 | 299 | ||
301 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) { | ||
302 | foreach (const auto &processor, d->processors[bufferType]) { | ||
303 | processor->modifiedEntity(oldEntity, newEntity); | ||
304 | } | ||
305 | }; | ||
306 | |||
307 | d->revisionChanged = true; | 300 | d->revisionChanged = true; |
308 | if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) { | 301 | if (!d->entityStore.modify(bufferType, current, newEntity, replayToSource)) { |
309 | return KAsync::error<qint64>(0); | 302 | return KAsync::error<qint64>(0); |
310 | } | 303 | } |
311 | 304 | ||