diff options
Diffstat (limited to 'common/storage/entitystore.cpp')
-rw-r--r-- | common/storage/entitystore.cpp | 127 |
1 files changed, 64 insertions, 63 deletions
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index 909f1b2..b7309ab 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp | |||
@@ -31,8 +31,7 @@ | |||
31 | #include "bufferutils.h" | 31 | #include "bufferutils.h" |
32 | #include "entity_generated.h" | 32 | #include "entity_generated.h" |
33 | #include "applicationdomaintype_p.h" | 33 | #include "applicationdomaintype_p.h" |
34 | 34 | #include "typeimplementations.h" | |
35 | #include "domaintypes.h" | ||
36 | 35 | ||
37 | using namespace Sink; | 36 | using namespace Sink; |
38 | using namespace Sink::Storage; | 37 | using namespace Sink::Storage; |
@@ -168,19 +167,15 @@ void EntityStore::copyBlobs(ApplicationDomain::ApplicationDomainType &entity, qi | |||
168 | } | 167 | } |
169 | } | 168 | } |
170 | 169 | ||
171 | bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess) | 170 | bool EntityStore::add(const QByteArray &type, ApplicationDomain::ApplicationDomainType entity, bool replayToSource) |
172 | { | 171 | { |
173 | if (entity_.identifier().isEmpty()) { | 172 | if (entity.identifier().isEmpty()) { |
174 | SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier"; | 173 | SinkWarningCtx(d->logCtx) << "Can't write entity with an empty identifier"; |
175 | return false; | 174 | return false; |
176 | } | 175 | } |
177 | 176 | ||
178 | auto entity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(entity_, entity_.availableProperties()); | ||
179 | entity.setChangedProperties(entity.availableProperties().toSet()); | ||
180 | |||
181 | SinkTraceCtx(d->logCtx) << "New entity " << entity; | 177 | SinkTraceCtx(d->logCtx) << "New entity " << entity; |
182 | 178 | ||
183 | preprocess(entity); | ||
184 | d->typeIndex(type).add(entity.identifier(), entity, d->transaction); | 179 | d->typeIndex(type).add(entity.identifier(), entity, d->transaction); |
185 | 180 | ||
186 | //The maxRevision may have changed meanwhile if the entity created sub-entities | 181 | //The maxRevision may have changed meanwhile if the entity created sub-entities |
@@ -205,26 +200,20 @@ bool EntityStore::add(const QByteArray &type, const ApplicationDomain::Applicati | |||
205 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; }); | 200 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; }); |
206 | DataStore::setMaxRevision(d->transaction, newRevision); | 201 | DataStore::setMaxRevision(d->transaction, newRevision); |
207 | DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); | 202 | DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); |
203 | DataStore::recordUid(d->transaction, entity.identifier()); | ||
208 | SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision; | 204 | SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision; |
209 | return true; | 205 | return true; |
210 | } | 206 | } |
211 | 207 | ||
212 | bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &preprocess) | 208 | ApplicationDomain::ApplicationDomainType EntityStore::applyDiff(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions) const |
213 | { | 209 | { |
214 | auto changeset = diff.changedProperties(); | ||
215 | const auto current = readLatest(type, diff.identifier()); | ||
216 | if (current.identifier().isEmpty()) { | ||
217 | SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); | ||
218 | return false; | ||
219 | } | ||
220 | |||
221 | auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); | 210 | auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties()); |
222 | 211 | ||
223 | SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity; | 212 | SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity; |
224 | 213 | ||
225 | // Apply diff | 214 | // Apply diff |
226 | //SinkTrace() << "Applying changed properties: " << changeset; | 215 | //SinkTrace() << "Applying changed properties: " << changeset; |
227 | for (const auto &property : changeset) { | 216 | for (const auto &property : diff.changedProperties()) { |
228 | const auto value = diff.getProperty(property); | 217 | const auto value = diff.getProperty(property); |
229 | if (value.isValid()) { | 218 | if (value.isValid()) { |
230 | //SinkTrace() << "Setting property: " << property; | 219 | //SinkTrace() << "Setting property: " << property; |
@@ -237,8 +226,25 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic | |||
237 | //SinkTrace() << "Removing property: " << property; | 226 | //SinkTrace() << "Removing property: " << property; |
238 | newEntity.setProperty(property, QVariant()); | 227 | newEntity.setProperty(property, QVariant()); |
239 | } | 228 | } |
229 | return newEntity; | ||
230 | } | ||
231 | |||
232 | bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource) | ||
233 | { | ||
234 | const auto current = readLatest(type, diff.identifier()); | ||
235 | if (current.identifier().isEmpty()) { | ||
236 | SinkWarningCtx(d->logCtx) << "Failed to read current version: " << diff.identifier(); | ||
237 | return false; | ||
238 | } | ||
239 | |||
240 | auto newEntity = applyDiff(type, current, diff, deletions); | ||
241 | return modify(type, current, newEntity, replayToSource); | ||
242 | } | ||
243 | |||
244 | bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType ¤t, ApplicationDomain::ApplicationDomainType newEntity, bool replayToSource) | ||
245 | { | ||
246 | SinkTraceCtx(d->logCtx) << "Modified entity: " << newEntity; | ||
240 | 247 | ||
241 | preprocess(current, newEntity); | ||
242 | d->typeIndex(type).remove(current.identifier(), current, d->transaction); | 248 | d->typeIndex(type).remove(current.identifier(), current, d->transaction); |
243 | d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction); | 249 | d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction); |
244 | 250 | ||
@@ -250,7 +256,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic | |||
250 | flatbuffers::FlatBufferBuilder metadataFbb; | 256 | flatbuffers::FlatBufferBuilder metadataFbb; |
251 | { | 257 | { |
252 | //We add availableProperties to account for the properties that have been changed by the preprocessors | 258 | //We add availableProperties to account for the properties that have been changed by the preprocessors |
253 | auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newEntity.changedProperties()); | 259 | auto modifiedProperties = BufferUtils::toVector(metadataFbb, newEntity.changedProperties()); |
254 | auto metadataBuilder = MetadataBuilder(metadataFbb); | 260 | auto metadataBuilder = MetadataBuilder(metadataFbb); |
255 | metadataBuilder.add_revision(newRevision); | 261 | metadataBuilder.add_revision(newRevision); |
256 | metadataBuilder.add_operation(Operation_Modification); | 262 | metadataBuilder.add_operation(Operation_Modification); |
@@ -259,7 +265,7 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic | |||
259 | auto metadataBuffer = metadataBuilder.Finish(); | 265 | auto metadataBuffer = metadataBuilder.Finish(); |
260 | FinishMetadataBuffer(metadataFbb, metadataBuffer); | 266 | FinishMetadataBuffer(metadataFbb, metadataBuffer); |
261 | } | 267 | } |
262 | SinkTraceCtx(d->logCtx) << "Changed properties: " << changeset + newEntity.changedProperties(); | 268 | SinkTraceCtx(d->logCtx) << "Changed properties: " << newEntity.changedProperties(); |
263 | 269 | ||
264 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); | 270 | newEntity.setChangedProperties(newEntity.availableProperties().toSet()); |
265 | 271 | ||
@@ -275,36 +281,14 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::Applic | |||
275 | return true; | 281 | return true; |
276 | } | 282 | } |
277 | 283 | ||
278 | bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &preprocess) | 284 | bool EntityStore::remove(const QByteArray &type, const Sink::ApplicationDomain::ApplicationDomainType ¤t, bool replayToSource) |
279 | { | 285 | { |
280 | bool found = false; | 286 | const auto uid = current.identifier(); |
281 | bool alreadyRemoved = false; | 287 | if (!exists(type, uid)) { |
282 | DataStore::mainDatabase(d->transaction, type) | ||
283 | .findLatest(uid, | ||
284 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | ||
285 | auto entity = GetEntity(data.data()); | ||
286 | if (entity && entity->metadata()) { | ||
287 | auto metadata = GetMetadata(entity->metadata()->Data()); | ||
288 | found = true; | ||
289 | if (metadata->operation() == Operation_Removal) { | ||
290 | alreadyRemoved = true; | ||
291 | } | ||
292 | } | ||
293 | return false; | ||
294 | }, | ||
295 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); | ||
296 | |||
297 | if (!found) { | ||
298 | SinkWarningCtx(d->logCtx) << "Remove: Failed to find entity " << uid; | ||
299 | return false; | ||
300 | } | ||
301 | if (alreadyRemoved) { | ||
302 | SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid; | 288 | SinkWarningCtx(d->logCtx) << "Remove: Entity is already removed " << uid; |
303 | return false; | 289 | return false; |
304 | } | 290 | } |
305 | 291 | ||
306 | const auto current = readLatest(type, uid); | ||
307 | preprocess(current); | ||
308 | d->typeIndex(type).remove(current.identifier(), current, d->transaction); | 292 | d->typeIndex(type).remove(current.identifier(), current, d->transaction); |
309 | 293 | ||
310 | SinkTraceCtx(d->logCtx) << "Removed entity " << current; | 294 | SinkTraceCtx(d->logCtx) << "Removed entity " << current; |
@@ -328,6 +312,7 @@ bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool rep | |||
328 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; }); | 312 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; }); |
329 | DataStore::setMaxRevision(d->transaction, newRevision); | 313 | DataStore::setMaxRevision(d->transaction, newRevision); |
330 | DataStore::recordRevision(d->transaction, newRevision, uid, type); | 314 | DataStore::recordRevision(d->transaction, newRevision, uid, type); |
315 | DataStore::removeUid(d->transaction, uid); | ||
331 | return true; | 316 | return true; |
332 | } | 317 | } |
333 | 318 | ||
@@ -521,15 +506,9 @@ ApplicationDomain::ApplicationDomainType EntityStore::readEntity(const QByteArra | |||
521 | 506 | ||
522 | void EntityStore::readAll(const QByteArray &type, const std::function<void(const ApplicationDomain::ApplicationDomainType &entity)> &callback) | 507 | void EntityStore::readAll(const QByteArray &type, const std::function<void(const ApplicationDomain::ApplicationDomainType &entity)> &callback) |
523 | { | 508 | { |
524 | auto db = DataStore::mainDatabase(d->getTransaction(), type); | 509 | readAllUids(type, [&] (const QByteArray &uid) { |
525 | db.scan("", | 510 | readLatest(type, uid, callback); |
526 | [=](const QByteArray &key, const QByteArray &value) -> bool { | 511 | }); |
527 | auto uid = DataStore::uidFromKey(key); | ||
528 | auto buffer = Sink::EntityBuffer{value.data(), value.size()}; | ||
529 | callback(d->createApplicationDomainType(type, uid, DataStore::maxRevision(d->getTransaction()), buffer)); | ||
530 | return true; | ||
531 | }, | ||
532 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during query: " << error.message; }); | ||
533 | } | 512 | } |
534 | 513 | ||
535 | void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function<void(const QByteArray &key)> &callback) | 514 | void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function<void(const QByteArray &key)> &callback) |
@@ -588,15 +567,7 @@ ApplicationDomain::ApplicationDomainType EntityStore::readPrevious(const QByteAr | |||
588 | 567 | ||
589 | void EntityStore::readAllUids(const QByteArray &type, const std::function<void(const QByteArray &uid)> callback) | 568 | void EntityStore::readAllUids(const QByteArray &type, const std::function<void(const QByteArray &uid)> callback) |
590 | { | 569 | { |
591 | //TODO use uid index instead | 570 | DataStore::getUids(d->getTransaction(), callback); |
592 | //FIXME we currently report each uid for every revision with the same uid | ||
593 | auto db = DataStore::mainDatabase(d->getTransaction(), type); | ||
594 | db.scan("", | ||
595 | [callback](const QByteArray &key, const QByteArray &) -> bool { | ||
596 | callback(Sink::Storage::DataStore::uidFromKey(key)); | ||
597 | return true; | ||
598 | }, | ||
599 | [&](const Sink::Storage::DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; }); | ||
600 | } | 571 | } |
601 | 572 | ||
602 | bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) | 573 | bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) |
@@ -604,6 +575,36 @@ bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) | |||
604 | return DataStore::mainDatabase(d->getTransaction(), type).contains(uid); | 575 | return DataStore::mainDatabase(d->getTransaction(), type).contains(uid); |
605 | } | 576 | } |
606 | 577 | ||
578 | bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) | ||
579 | { | ||
580 | bool found = false; | ||
581 | bool alreadyRemoved = false; | ||
582 | DataStore::mainDatabase(d->transaction, type) | ||
583 | .findLatest(uid, | ||
584 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | ||
585 | auto entity = GetEntity(data.data()); | ||
586 | if (entity && entity->metadata()) { | ||
587 | auto metadata = GetMetadata(entity->metadata()->Data()); | ||
588 | found = true; | ||
589 | if (metadata->operation() == Operation_Removal) { | ||
590 | alreadyRemoved = true; | ||
591 | } | ||
592 | } | ||
593 | return false; | ||
594 | }, | ||
595 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); | ||
596 | if (!found) { | ||
597 | SinkTraceCtx(d->logCtx) << "Remove: Failed to find entity " << uid; | ||
598 | return false; | ||
599 | } | ||
600 | if (alreadyRemoved) { | ||
601 | SinkTraceCtx(d->logCtx) << "Remove: Entity is already removed " << uid; | ||
602 | return false; | ||
603 | } | ||
604 | return true; | ||
605 | } | ||
606 | |||
607 | |||
607 | qint64 EntityStore::maxRevision() | 608 | qint64 EntityStore::maxRevision() |
608 | { | 609 | { |
609 | if (!d->exists()) { | 610 | if (!d->exists()) { |