diff options
-rw-r--r-- | common/pipeline.cpp | 93 | ||||
-rw-r--r-- | common/storage/entitystore.cpp | 34 | ||||
-rw-r--r-- | common/storage/entitystore.h | 4 |
3 files changed, 68 insertions, 63 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 7f836c4..15ed5fc 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -195,6 +195,11 @@ struct CreateHelper { | |||
195 | } | 195 | } |
196 | }; | 196 | }; |
197 | 197 | ||
198 | static KAsync::Job<void> create(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &newEntity) | ||
199 | { | ||
200 | return TypeHelper<CreateHelper>{type}.operator()<KAsync::Job<void>, const ApplicationDomain::ApplicationDomainType&>(newEntity); | ||
201 | } | ||
202 | |||
198 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | 203 | KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) |
199 | { | 204 | { |
200 | d->transactionItemCount++; | 205 | d->transactionItemCount++; |
@@ -248,64 +253,52 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
248 | deletions = BufferUtils::fromVector(*modifyEntity->deletions()); | 253 | deletions = BufferUtils::fromVector(*modifyEntity->deletions()); |
249 | } | 254 | } |
250 | 255 | ||
251 | if (modifyEntity->targetResource()) { | 256 | const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); |
252 | auto isMove = modifyEntity->removeEntity(); | 257 | if (current.identifier().isEmpty()) { |
253 | auto targetResource = BufferUtils::extractBuffer(modifyEntity->targetResource()); | 258 | SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); |
254 | auto changeset = diff.changedProperties(); | 259 | return KAsync::error<qint64>(0); |
255 | const auto current = d->entityStore.readLatest(bufferType, diff.identifier()); | 260 | } |
256 | if (current.identifier().isEmpty()) { | ||
257 | SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); | ||
258 | return KAsync::error<qint64>(0); | ||
259 | } | ||
260 | 261 | ||
261 | auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryCopy<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); | 262 | auto newEntity = d->entityStore.applyDiff(bufferType, current, diff, deletions); |
262 | 263 | ||
263 | // Apply diff | 264 | bool isMove = false; |
264 | for (const auto &property : changeset) { | 265 | if (modifyEntity->targetResource()) { |
265 | const auto value = diff.getProperty(property); | 266 | isMove = modifyEntity->removeEntity(); |
266 | if (value.isValid()) { | 267 | newEntity.setResource(BufferUtils::extractBuffer(modifyEntity->targetResource())); |
267 | newEntity.setProperty(property, value); | 268 | } |
268 | } | ||
269 | } | ||
270 | 269 | ||
271 | // Remove deletions | 270 | foreach (const auto &processor, d->processors[bufferType]) { |
272 | for (const auto &property : deletions) { | 271 | processor->modifiedEntity(current, newEntity); |
273 | newEntity.setProperty(property, QVariant()); | 272 | } |
274 | } | ||
275 | newEntity.setResource(targetResource); | ||
276 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); | ||
277 | 273 | ||
278 | SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier() << targetResource; | 274 | //The entity is either being copied or moved |
279 | auto job = TypeHelper<CreateHelper>{bufferType}.operator()<KAsync::Job<void>, ApplicationDomain::ApplicationDomainType&>(newEntity); | 275 | if (newEntity.resourceInstanceIdentifier() != d->resourceContext.resourceInstanceIdentifier) { |
280 | job = job.then([this, current, isMove, targetResource, bufferType](const KAsync::Error &error) { | 276 | SinkTraceCtx(d->logCtx) << "Moving entity to new resource " << newEntity.identifier() << newEntity.resourceInstanceIdentifier(); |
281 | if (!error) { | 277 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); |
282 | SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; | 278 | return create(bufferType, newEntity) |
283 | if (isMove) { | 279 | .then([=](const KAsync::Error &error) { |
284 | flatbuffers::FlatBufferBuilder fbb; | 280 | if (!error) { |
285 | auto entityId = fbb.CreateString(current.identifier()); | 281 | SinkTraceCtx(d->logCtx) << "Move of " << current.identifier() << "was successfull"; |
286 | auto type = fbb.CreateString(bufferType); | 282 | if (isMove) { |
287 | auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); | 283 | flatbuffers::FlatBufferBuilder fbb; |
288 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); | 284 | auto entityId = fbb.CreateString(current.identifier()); |
289 | const auto data = BufferUtils::extractBuffer(fbb); | 285 | auto type = fbb.CreateString(bufferType); |
290 | deletedEntity(data, data.size()).exec(); | 286 | auto location = Sink::Commands::CreateDeleteEntity(fbb, current.revision(), entityId, type, true); |
287 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); | ||
288 | const auto data = BufferUtils::extractBuffer(fbb); | ||
289 | deletedEntity(data, data.size()).exec(); | ||
290 | } | ||
291 | } else { | ||
292 | SinkErrorCtx(d->logCtx) << "Failed to move entity " << newEntity.identifier() << " to resource " << newEntity.resourceInstanceIdentifier(); | ||
291 | } | 293 | } |
292 | } else { | 294 | }) |
293 | SinkErrorCtx(d->logCtx) << "Failed to move entity " << targetResource << " to resource " << current.identifier(); | 295 | .then([this] { |
294 | } | 296 | return d->entityStore.maxRevision(); |
295 | }); | 297 | }); |
296 | return job.then([this] { | ||
297 | return d->entityStore.maxRevision(); | ||
298 | }); | ||
299 | } | 298 | } |
300 | 299 | ||
301 | auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) { | ||
302 | foreach (const auto &processor, d->processors[bufferType]) { | ||
303 | processor->modifiedEntity(oldEntity, newEntity); | ||
304 | } | ||
305 | }; | ||
306 | |||
307 | d->revisionChanged = true; | 300 | d->revisionChanged = true; |
308 | if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) { | 301 | if (!d->entityStore.modify(bufferType, current, newEntity, replayToSource)) { |
309 | return KAsync::error<qint64>(0); | 302 | return KAsync::error<qint64>(0); |
310 | } | 303 | } |
311 | 304 | ||
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index 4afb407..3ef8784 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp | |||
@@ -209,22 +209,15 @@ bool EntityStore::add(const QByteArray &type, const ApplicationDomain::Applicati | |||
209 | return true; | 209 | return true; |
210 | } | 210 | } |
211 | 211 | ||
212 | bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &preprocess) | 212 | ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const |
213 | { | 213 | { |
214 | auto changeset = diff.changedProperties(); | ||
215 | const auto current = readLatest(type, diff.identifier()); | ||
216 | if (current.identifier().isEmpty()) { | ||
217 | SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); | ||
218 | return false; | ||
219 | } | ||
220 | |||
221 | auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); | 214 | auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); |
222 | 215 | ||
223 | SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity; | 216 | SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity; |
224 | 217 | ||
225 | // Apply diff | 218 | // Apply diff |
226 | //SinkTrace() << "Applying changed properties: " << changeset; | 219 | //SinkTrace() << "Applying changed properties: " << changeset; |
227 | for (const auto &property : changeset) { | 220 | for (const auto &property : diff.changedProperties()) { |
228 | const auto value = diff.getProperty(property); | 221 | const auto value = diff.getProperty(property); |
229 | if (value.isValid()) { | 222 | if (value.isValid()) { |
230 | //SinkTrace() << "Setting property: " << property; | 223 | //SinkTrace() << "Setting property: " << property; |
@@ -237,8 +230,25 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic | |||
237 | //SinkTrace() << "Removing property: " << property; | 230 | //SinkTrace() << "Removing property: " << property; |
238 | newEntity.setProperty(property, QVariant()); | 231 | newEntity.setProperty(property, QVariant()); |
239 | } | 232 | } |
233 | return newEntity; | ||
234 | } | ||
235 | |||
236 | bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource) | ||
237 | { | ||
238 | const auto current = readLatest(type, diff.identifier()); | ||
239 | if (current.identifier().isEmpty()) { | ||
240 | SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); | ||
241 | return false; | ||
242 | } | ||
243 | |||
244 | auto newEntity = applyDiff(type, current, diff, deletions); | ||
245 | return modify(type, current, newEntity, replayToSource); | ||
246 | } | ||
247 | |||
248 | bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource) | ||
249 | { | ||
250 | SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity; | ||
240 | 251 | ||
241 | preprocess(current, newEntity); | ||
242 | d->typeIndex(type).remove(current.identifier(), current, d->transaction); | 252 | d->typeIndex(type).remove(current.identifier(), current, d->transaction); |
243 | d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction); | 253 | d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction); |
244 | 254 | ||
@@ -250,7 +260,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic | |||
250 | flatbuffers::FlatBufferBuilder metadataFbb; | 260 | flatbuffers::FlatBufferBuilder metadataFbb; |
251 | { | 261 | { |
252 | //We add availableProperties to account for the properties that have been changed by the preprocessors | 262 | //We add availableProperties to account for the properties that have been changed by the preprocessors |
253 | auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newEntity.changedProperties()); | 263 | auto modifiedProperties = BufferUtils::toVector(metadataFbb, newEntity.changedProperties()); |
254 | auto metadataBuilder = MetadataBuilder(metadataFbb); | 264 | auto metadataBuilder = MetadataBuilder(metadataFbb); |
255 | metadataBuilder.add_revision(newRevision); | 265 | metadataBuilder.add_revision(newRevision); |
256 | metadataBuilder.add_operation(Operation_Modification); | 266 | metadataBuilder.add_operation(Operation_Modification); |
@@ -259,7 +269,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic | |||
259 | auto metadataBuffer = metadataBuilder.Finish(); | 269 | auto metadataBuffer = metadataBuilder.Finish(); |
260 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | 270 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
261 | } | 271 | } |
262 | SinkTraceCtx(d->logCtx) << "Changed properties: " << changeset + newEntity.changedProperties(); | 272 | SinkTraceCtx(d->logCtx) << "Changed properties: " << newEntity.changedProperties(); |
263 | 273 | ||
264 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); | 274 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); |
265 | 275 | ||
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h index 46410cd..ddb4ef9 100644 --- a/common/storage/entitystore.h +++ b/common/storage/entitystore.h | |||
@@ -44,9 +44,11 @@ public: | |||
44 | 44 | ||
45 | //Only the pipeline may call the following functions outside of tests | 45 | //Only the pipeline may call the following functions outside of tests |
46 | bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &); | 46 | bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &); |
47 | bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &); | 47 | bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource); |
48 | bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource); | ||
48 | bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &); | 49 | bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &); |
49 | bool cleanupRevisions(qint64 revision); | 50 | bool cleanupRevisions(qint64 revision); |
51 | ApplicationDomain::ApplicationDomainType applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const; | ||
50 | 52 | ||
51 | void startTransaction(Sink::Storage::DataStore::AccessMode); | 53 | void startTransaction(Sink::Storage::DataStore::AccessMode); |
52 | void commitTransaction(); | 54 | void commitTransaction(); |