summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2017-04-11 15:16:26 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2017-04-11 15:16:26 +0200
commitf6c3c144e60611d2da7ba7aa5b115affe92a57a4 (patch)
tree4e4b01382d7c2893d4d1d14489506e3b0066fce9 /common/pipeline.cpp
parentadb11fd81404b9ab3b01975ed93babe12a22dee4 (diff)
downloadsink-f6c3c144e60611d2da7ba7aa5b115affe92a57a4.tar.gz
sink-f6c3c144e60611d2da7ba7aa5b115affe92a57a4.zip
Move the preprocssing back out of entitystore into the pipeline.
This is where this really belongs, only the indexing is part of storage. This is necessary so preprocessors can move entities as well.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp93
1 files changed, 43 insertions, 50 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 7f836c4..15ed5fc 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -195,6 +195,11 @@ struct CreateHelper {
195 } 195 }
196}; 196};
197 197
198static KAsync::Job<void> create(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &newEntity)
199{
200 return TypeHelper<CreateHelper>{type}.operator()<KAsync::Job<void>, const ApplicationDomain::ApplicationDomainType&>(newEntity);
201}
202
198KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) 203KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
199{ 204{
200 d->transactionItemCount++; 205 d->transactionItemCount++;
@@ -248,64 +253,52 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
248 deletions = BufferUtils::fromVector(*modifyEntity->deletions()); 253 deletions = BufferUtils::fromVector(*modifyEntity->deletions());
249 } 254 }
250 255
251 if (modifyEntity->targetResource()) { 256 const auto current = d->entityStore.readLatest(bufferType, diff.identifier());
252 auto isMove = modifyEntity->removeEntity(); 257 if (current.identifier().isEmpty()) {
253 auto targetResource = BufferUtils::extractBuffer(modifyEntity->targetResource()); 258 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
254 auto changeset = diff.changedProperties(); 259 return KAsync::error<qint64>(0);
255 const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); 260 }
256 if (current.identifier().isEmpty()) {
257 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
258 return KAsync::error<qint64>(0);
259 }
260 261
261 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryCopy<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); 262 auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions);
262 263
263 // Apply diff 264 bool isMove = false;
264 for (const auto &property : changeset) { 265 if (modifyEntity->targetResource()) {
265 const auto value = diff.getProperty(property); 266 isMove = modifyEntity->removeEntity();
266 if (value.isValid()) { 267 newEntity.setResource(BufferUtils::extractBuffer(modifyEntity->targetResource()));
267 newEntity.setProperty(property, value); 268 }
268 }
269 }
270 269
271 // Remove deletions 270 foreach (const auto &processor, d->processors[bufferType]) {
272 for (const auto &property : deletions) { 271 processor->modifiedEntity(current, newEntity);
273 newEntity.setProperty(property, QVariant()); 272 }
274 }
275 newEntity.setResource(targetResource);
276 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
277 273
278 SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; 274 //The entity is either being copied or moved
279 auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); 275 if (newEntity.resourceInstanceIdentifier() != d->resourceContext.resourceInstanceIdentifier) {
280 job = job.then([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { 276 SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier();
281 if (!error) { 277 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
282 SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; 278 return create(bufferType, newEntity)
283 if (isMove) { 279 .then([=](const KAsync::Error &error) {
284 flatbuffers::FlatBufferBuilder fbb; 280 if (!error) {
285 auto entityId = fbb.CreateString(current.identifier()); 281 SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull";
286 auto type = fbb.CreateString(bufferType); 282 if (isMove) {
287 auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); 283 flatbuffers::FlatBufferBuilder fbb;
288 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 284 auto entityId = fbb.CreateString(current.identifier());
289 const auto data = BufferUtils::extractBuffer(fbb); 285 auto type = fbb.CreateString(bufferType);
290 deletedEntity(data, data.size()).exec(); 286 auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true);
287 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
288 const auto data = BufferUtils::extractBuffer(fbb);
289 deletedEntity(data, data.size()).exec();
290 }
291 } else {
292 SinkErrorCtx(d->logCtx) << "Failed to move entity " << newEntity.identifier() << " to resource " << newEntity.resourceInstanceIdentifier();
291 } 293 }
292 } else { 294 })
293 SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); 295 .then([this] {
294 } 296 return d->entityStore.maxRevision();
295 }); 297 });
296 return job.then([this] {
297 return d->entityStore.maxRevision();
298 });
299 } 298 }
300 299
301 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) {
302 foreach (const auto &processor, d->processors[bufferType]) {
303 processor->modifiedEntity(oldEntity, newEntity);
304 }
305 };
306
307 d->revisionChanged = true; 300 d->revisionChanged = true;
308 if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) { 301 if (!d->entityStore.modify(bufferType, current, newEntity, replayToSource)) {
309 return KAsync::error<qint64>(0); 302 return KAsync::error<qint64>(0);
310 } 303 }
311 304