summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp169
1 files changed, 107 insertions, 62 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 887b6b3..019784e 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -175,13 +175,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
175 auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor}; 175 auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor};
176 o.setChangedProperties(o.availableProperties().toSet()); 176 o.setChangedProperties(o.availableProperties().toSet());
177 177
178 auto preprocess = [&, this](ApplicationDomain::ApplicationDomainType &newEntity) { 178 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(o, o.availableProperties());
179 foreach (const auto &processor, d->processors[bufferType]) { 179 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
180 processor->newEntity(newEntity); 180
181 } 181 foreach (const auto &processor, d->processors[bufferType]) {
182 }; 182 processor->newEntity(newEntity);
183 }
183 184
184 if (!d->entityStore.add(bufferType, o, replayToSource, preprocess)) { 185 if (!d->entityStore.add(bufferType, o, replayToSource)) {
185 return KAsync::error<qint64>(0); 186 return KAsync::error<qint64>(0);
186 } 187 }
187 188
@@ -195,6 +196,11 @@ struct CreateHelper {
195 } 196 }
196}; 197};
197 198
199static KAsync::Job<void> create(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &newEntity)
200{
201 return TypeHelper<CreateHelper>{type}.operator()<KAsync::Job<void>, const ApplicationDomain::ApplicationDomainType&>(newEntity);
202}
203
198KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) 204KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
199{ 205{
200 d->transactionItemCount++; 206 d->transactionItemCount++;
@@ -248,65 +254,71 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
248 deletions = BufferUtils::fromVector(*modifyEntity->deletions()); 254 deletions = BufferUtils::fromVector(*modifyEntity->deletions());
249 } 255 }
250 256
251 if (modifyEntity->targetResource()) { 257 const auto current = d->entityStore.readLatest(bufferType, diff.identifier());
252 auto isMove = modifyEntity->removeEntity(); 258 if (current.identifier().isEmpty()) {
253 auto targetResource = BufferUtils::extractBuffer(modifyEntity->targetResource()); 259 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
254 auto changeset = diff.changedProperties(); 260 return KAsync::error<qint64>(0);
255 const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); 261 }
256 if (current.identifier().isEmpty()) {
257 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
258 return KAsync::error<qint64>(0);
259 }
260 262
261 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryCopy<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); 263 auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions);
262 264
263 // Apply diff 265 bool isMove = false;
264 for (const auto &property : changeset) { 266 if (modifyEntity->targetResource()) {
265 const auto value = diff.getProperty(property); 267 isMove = modifyEntity->removeEntity();
266 if (value.isValid()) { 268 newEntity.setResource(BufferUtils::extractBuffer(modifyEntity->targetResource()));
267 newEntity.setProperty(property, value); 269 }
268 }
269 }
270 270
271 // Remove deletions 271 foreach (const auto &processor, d->processors[bufferType]) {
272 for (const auto &property : deletions) { 272 bool exitLoop = false;
273 newEntity.setProperty(property, QVariant()); 273 const auto result = processor->processModification(Preprocessor::Modification, current, newEntity);
274 switch (result.action) {
275 case Preprocessor::MoveToResource:
276 isMove = true;
277 exitLoop = true;
278 break;
279 case Preprocessor::CopyToResource:
280 isMove = true;
281 exitLoop = true;
282 break;
283 case Preprocessor::DropModification:
284 SinkTraceCtx(d->logCtx) << "Dropping modification";
285 return KAsync::error<qint64>(0);
286 default:
287 break;
274 } 288 }
275 newEntity.setResource(targetResource); 289 if (exitLoop) {
276 newEntity.setChangedProperties(newEntity.availableProperties().toSet()); 290 break;
291 }
292 }
277 293
278 SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; 294 //The entity is either being copied or moved
279 auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); 295 if (newEntity.resourceInstanceIdentifier() != d->resourceContext.resourceInstanceIdentifier) {
280 job = job.then([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { 296 SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier();
281 if (!error) { 297 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
282 SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; 298 return create(bufferType, newEntity)
283 if (isMove) { 299 .then([=](const KAsync::Error &error) {
284 startTransaction(); 300 if (!error) {
285 flatbuffers::FlatBufferBuilder fbb; 301 SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull";
286 auto entityId = fbb.CreateString(current.identifier()); 302 if (isMove) {
287 auto type = fbb.CreateString(bufferType); 303 flatbuffers::FlatBufferBuilder fbb;
288 auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); 304 auto entityId = fbb.CreateString(current.identifier());
289 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 305 auto type = fbb.CreateString(bufferType);
290 const auto data = BufferUtils::extractBuffer(fbb); 306 auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true);
291 deletedEntity(data, data.size()).exec(); 307 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
292 commit(); 308 const auto data = BufferUtils::extractBuffer(fbb);
309 deletedEntity(data, data.size()).exec();
310 }
311 } else {
312 SinkErrorCtx(d->logCtx) << "Failed to move entity " << newEntity.identifier() << " to resource " << newEntity.resourceInstanceIdentifier();
293 } 313 }
294 } else { 314 })
295 SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); 315 .then([this] {
296 } 316 return d->entityStore.maxRevision();
297 }); 317 });
298 job.exec();
299 return KAsync::value<qint64>(0);
300 } 318 }
301 319
302 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) {
303 foreach (const auto &processor, d->processors[bufferType]) {
304 processor->modifiedEntity(oldEntity, newEntity);
305 }
306 };
307
308 d->revisionChanged = true; 320 d->revisionChanged = true;
309 if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) { 321 if (!d->entityStore.modify(bufferType, current, newEntity, replayToSource)) {
310 return KAsync::error<qint64>(0); 322 return KAsync::error<qint64>(0);
311 } 323 }
312 324
@@ -331,14 +343,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
331 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 343 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
332 SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 344 SinkTraceCtx(d->logCtx) << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
333 345
334 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) { 346 const auto current = d->entityStore.readLatest(bufferType, key);
335 foreach (const auto &processor, d->processors[bufferType]) { 347
336 processor->deletedEntity(oldEntity); 348 foreach (const auto &processor, d->processors[bufferType]) {
337 } 349 processor->deletedEntity(current);
338 }; 350 }
339 351
340 d->revisionChanged = true; 352 d->revisionChanged = true;
341 if (!d->entityStore.remove(bufferType, key, replayToSource, preprocess)) { 353 if (!d->entityStore.remove(bufferType, current, replayToSource)) {
342 return KAsync::error<qint64>(0); 354 return KAsync::error<qint64>(0);
343 } 355 }
344 356
@@ -385,6 +397,39 @@ void Preprocessor::finalizeBatch()
385{ 397{
386} 398}
387 399
400void Preprocessor::newEntity(ApplicationDomain::ApplicationDomainType &newEntity)
401{
402
403}
404
405void Preprocessor::modifiedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity)
406{
407
408}
409
410void Preprocessor::deletedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity)
411{
412
413}
414
415Preprocessor::Result Preprocessor::processModification(Type type, const ApplicationDomain::ApplicationDomainType &current, ApplicationDomain::ApplicationDomainType &diff)
416{
417 switch(type) {
418 case Creation:
419 newEntity(diff);
420 return {NoAction};
421 case Modification:
422 modifiedEntity(current, diff);
423 return {NoAction};
424 case Deletion:
425 deletedEntity(current);
426 return {NoAction};
427 default:
428 break;
429 }
430 return {NoAction};
431}
432
388QByteArray Preprocessor::resourceInstanceIdentifier() const 433QByteArray Preprocessor::resourceInstanceIdentifier() const
389{ 434{
390 return d->resourceInstanceIdentifier; 435 return d->resourceInstanceIdentifier;