summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/pipeline.cpp93
-rw-r--r--common/storage/entitystore.cpp34
-rw-r--r--common/storage/entitystore.h4
3 files changed, 68 insertions, 63 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 7f836c4..15ed5fc 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -195,6 +195,11 @@ struct CreateHelper {
195 } 195 }
196}; 196};
197 197
198static KAsync::Job<void> create(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &newEntity)
199{
200 return TypeHelper<CreateHelper>{type}.operator()<KAsync::Job<void>, const ApplicationDomain::ApplicationDomainType&>(newEntity);
201}
202
198KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) 203KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
199{ 204{
200 d->transactionItemCount++; 205 d->transactionItemCount++;
@@ -248,64 +253,52 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
248 deletions = BufferUtils::fromVector(*modifyEntity->deletions()); 253 deletions = BufferUtils::fromVector(*modifyEntity->deletions());
249 } 254 }
250 255
251 if (modifyEntity->targetResource()) { 256 const auto current = d->entityStore.readLatest(bufferType, diff.identifier());
252 auto isMove = modifyEntity->removeEntity(); 257 if (current.identifier().isEmpty()) {
253 auto targetResource = BufferUtils::extractBuffer(modifyEntity->targetResource()); 258 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
254 auto changeset = diff.changedProperties(); 259 return KAsync::error<qint64>(0);
255 const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); 260 }
256 if (current.identifier().isEmpty()) {
257 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
258 return KAsync::error<qint64>(0);
259 }
260 261
261 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryCopy<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); 262 auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions);
262 263
263 // Apply diff 264 bool isMove = false;
264 for (const auto &property : changeset) { 265 if (modifyEntity->targetResource()) {
265 const auto value = diff.getProperty(property); 266 isMove = modifyEntity->removeEntity();
266 if (value.isValid()) { 267 newEntity.setResource(BufferUtils::extractBuffer(modifyEntity->targetResource()));
267 newEntity.setProperty(property, value); 268 }
268 }
269 }
270 269
271 // Remove deletions 270 foreach (const auto &processor, d->processors[bufferType]) {
272 for (const auto &property : deletions) { 271 processor->modifiedEntity(current, newEntity);
273 newEntity.setProperty(property, QVariant()); 272 }
274 }
275 newEntity.setResource(targetResource);
276 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
277 273
278 SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; 274 //The entity is either being copied or moved
279 auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); 275 if (newEntity.resourceInstanceIdentifier() != d->resourceContext.resourceInstanceIdentifier) {
280 job = job.then([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { 276 SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier();
281 if (!error) { 277 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
282 SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; 278 return create(bufferType, newEntity)
283 if (isMove) { 279 .then([=](const KAsync::Error &error) {
284 flatbuffers::FlatBufferBuilder fbb; 280 if (!error) {
285 auto entityId = fbb.CreateString(current.identifier()); 281 SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull";
286 auto type = fbb.CreateString(bufferType); 282 if (isMove) {
287 auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); 283 flatbuffers::FlatBufferBuilder fbb;
288 Sink::Commands::FinishDeleteEntityBuffer(fbb, location); 284 auto entityId = fbb.CreateString(current.identifier());
289 const auto data = BufferUtils::extractBuffer(fbb); 285 auto type = fbb.CreateString(bufferType);
290 deletedEntity(data, data.size()).exec(); 286 auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true);
287 Sink::Commands::FinishDeleteEntityBuffer(fbb, location);
288 const auto data = BufferUtils::extractBuffer(fbb);
289 deletedEntity(data, data.size()).exec();
290 }
291 } else {
292 SinkErrorCtx(d->logCtx) << "Failed to move entity " << newEntity.identifier() << " to resource " << newEntity.resourceInstanceIdentifier();
291 } 293 }
292 } else { 294 })
293 SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); 295 .then([this] {
294 } 296 return d->entityStore.maxRevision();
295 }); 297 });
296 return job.then([this] {
297 return d->entityStore.maxRevision();
298 });
299 } 298 }
300 299
301 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) {
302 foreach (const auto &processor, d->processors[bufferType]) {
303 processor->modifiedEntity(oldEntity, newEntity);
304 }
305 };
306
307 d->revisionChanged = true; 300 d->revisionChanged = true;
308 if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) { 301 if (!d->entityStore.modify(bufferType, current, newEntity, replayToSource)) {
309 return KAsync::error<qint64>(0); 302 return KAsync::error<qint64>(0);
310 } 303 }
311 304
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index 4afb407..3ef8784 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -209,22 +209,15 @@ bool EntityStore::add(const QByteArray &type, const ApplicationDomain::Applicati
209 return true; 209 return true;
210} 210}
211 211
212bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &preprocess) 212ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const
213{ 213{
214 auto changeset = diff.changedProperties();
215 const auto current = readLatest(type, diff.identifier());
216 if (current.identifier().isEmpty()) {
217 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
218 return false;
219 }
220
221 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); 214 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties());
222 215
223 SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity; 216 SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity;
224 217
225 // Apply diff 218 // Apply diff
226 //SinkTrace() << "Applying changed properties: " << changeset; 219 //SinkTrace() << "Applying changed properties: " << changeset;
227 for (const auto &property : changeset) { 220 for (const auto &property : diff.changedProperties()) {
228 const auto value = diff.getProperty(property); 221 const auto value = diff.getProperty(property);
229 if (value.isValid()) { 222 if (value.isValid()) {
230 //SinkTrace() << "Setting property: " << property; 223 //SinkTrace() << "Setting property: " << property;
@@ -237,8 +230,25 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic
237 //SinkTrace() << "Removing property: " << property; 230 //SinkTrace() << "Removing property: " << property;
238 newEntity.setProperty(property, QVariant()); 231 newEntity.setProperty(property, QVariant());
239 } 232 }
233 return newEntity;
234}
235
236bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource)
237{
238 const auto current = readLatest(type, diff.identifier());
239 if (current.identifier().isEmpty()) {
240 SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier();
241 return false;
242 }
243
244 auto newEntity = applyDiff(type, current, diff, deletions);
245 return modify(type, current, newEntity, replayToSource);
246}
247
248bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource)
249{
250 SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity;
240 251
241 preprocess(current, newEntity);
242 d->typeIndex(type).remove(current.identifier(), current, d->transaction); 252 d->typeIndex(type).remove(current.identifier(), current, d->transaction);
243 d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction); 253 d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction);
244 254
@@ -250,7 +260,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic
250 flatbuffers::FlatBufferBuilder metadataFbb; 260 flatbuffers::FlatBufferBuilder metadataFbb;
251 { 261 {
252 //We add availableProperties to account for the properties that have been changed by the preprocessors 262 //We add availableProperties to account for the properties that have been changed by the preprocessors
253 auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newEntity.changedProperties()); 263 auto modifiedProperties = BufferUtils::toVector(metadataFbb, newEntity.changedProperties());
254 auto metadataBuilder = MetadataBuilder(metadataFbb); 264 auto metadataBuilder = MetadataBuilder(metadataFbb);
255 metadataBuilder.add_revision(newRevision); 265 metadataBuilder.add_revision(newRevision);
256 metadataBuilder.add_operation(Operation_Modification); 266 metadataBuilder.add_operation(Operation_Modification);
@@ -259,7 +269,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic
259 auto metadataBuffer = metadataBuilder.Finish(); 269 auto metadataBuffer = metadataBuilder.Finish();
260 FinishMetadataBuffer(metadataFbb, metadataBuffer); 270 FinishMetadataBuffer(metadataFbb, metadataBuffer);
261 } 271 }
262 SinkTraceCtx(d->logCtx) << "Changed properties: " << changeset + newEntity.changedProperties(); 272 SinkTraceCtx(d->logCtx) << "Changed properties: " << newEntity.changedProperties();
263 273
264 newEntity.setChangedProperties(newEntity.availableProperties().toSet()); 274 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
265 275
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h
index 46410cd..ddb4ef9 100644
--- a/common/storage/entitystore.h
+++ b/common/storage/entitystore.h
@@ -44,9 +44,11 @@ public:
44 44
45 //Only the pipeline may call the following functions outside of tests 45 //Only the pipeline may call the following functions outside of tests
46 bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &); 46 bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &);
47 bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &); 47 bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource);
48 bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource);
48 bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &); 49 bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &);
49 bool cleanupRevisions(qint64 revision); 50 bool cleanupRevisions(qint64 revision);
51 ApplicationDomain::ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &current, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const;
50 52
51 void startTransaction(Sink::Storage::DataStore::AccessMode); 53 void startTransaction(Sink::Storage::DataStore::AccessMode);
52 void commitTransaction(); 54 void commitTransaction();