diff options
Diffstat (limited to 'common/storage/entitystore.cpp')
-rw-r--r-- | common/storage/entitystore.cpp | 160 |
1 files changed, 79 insertions, 81 deletions
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index 276ee6a..454e25a 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp | |||
@@ -38,8 +38,9 @@ using namespace Sink::Storage; | |||
38 | 38 | ||
39 | static QMap<QByteArray, int> baseDbs() | 39 | static QMap<QByteArray, int> baseDbs() |
40 | { | 40 | { |
41 | return {{"revisionType", 0}, | 41 | return {{"revisionType", Storage::IntegerKeys}, |
42 | {"revisions", 0}, | 42 | {"revisions", Storage::IntegerKeys}, |
43 | {"uidsToRevisions", Storage::AllowDuplicates | Storage::IntegerValues}, | ||
43 | {"uids", 0}, | 44 | {"uids", 0}, |
44 | {"default", 0}, | 45 | {"default", 0}, |
45 | {"__flagtable", 0}}; | 46 | {"__flagtable", 0}}; |
@@ -242,12 +243,13 @@ bool EntityStore::add(const QByteArray &type, ApplicationDomainType entity, bool | |||
242 | const auto key = Key(identifier, newRevision); | 243 | const auto key = Key(identifier, newRevision); |
243 | 244 | ||
244 | DataStore::mainDatabase(d->transaction, type) | 245 | DataStore::mainDatabase(d->transaction, type) |
245 | .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), | 246 | .write(newRevision, BufferUtils::extractBuffer(fbb), |
246 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; }); | 247 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; }); |
248 | |||
247 | DataStore::setMaxRevision(d->transaction, newRevision); | 249 | DataStore::setMaxRevision(d->transaction, newRevision); |
248 | DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); | 250 | DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); |
249 | DataStore::recordUid(d->transaction, entity.identifier(), type); | 251 | DataStore::recordUid(d->transaction, entity.identifier(), type); |
250 | SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision; | 252 | SinkTraceCtx(d->logCtx) << "Wrote entity: " << key << "of type:" << type; |
251 | return true; | 253 | return true; |
252 | } | 254 | } |
253 | 255 | ||
@@ -319,8 +321,9 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomainType &cu | |||
319 | const auto key = Key(identifier, newRevision); | 321 | const auto key = Key(identifier, newRevision); |
320 | 322 | ||
321 | DataStore::mainDatabase(d->transaction, type) | 323 | DataStore::mainDatabase(d->transaction, type) |
322 | .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), | 324 | .write(newRevision, BufferUtils::extractBuffer(fbb), |
323 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; }); | 325 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; }); |
326 | |||
324 | DataStore::setMaxRevision(d->transaction, newRevision); | 327 | DataStore::setMaxRevision(d->transaction, newRevision); |
325 | DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); | 328 | DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); |
326 | SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; | 329 | SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; |
@@ -356,8 +359,9 @@ bool EntityStore::remove(const QByteArray &type, const ApplicationDomainType &cu | |||
356 | const auto key = Key(identifier, newRevision); | 359 | const auto key = Key(identifier, newRevision); |
357 | 360 | ||
358 | DataStore::mainDatabase(d->transaction, type) | 361 | DataStore::mainDatabase(d->transaction, type) |
359 | .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), | 362 | .write(newRevision, BufferUtils::extractBuffer(fbb), |
360 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; }); | 363 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; }); |
364 | |||
361 | DataStore::setMaxRevision(d->transaction, newRevision); | 365 | DataStore::setMaxRevision(d->transaction, newRevision); |
362 | DataStore::recordRevision(d->transaction, newRevision, uid, type); | 366 | DataStore::recordRevision(d->transaction, newRevision, uid, type); |
363 | DataStore::removeUid(d->transaction, uid, type); | 367 | DataStore::removeUid(d->transaction, uid, type); |
@@ -375,30 +379,33 @@ void EntityStore::cleanupEntityRevisionsUntil(qint64 revision) | |||
375 | } | 379 | } |
376 | SinkTraceCtx(d->logCtx) << "Cleaning up revision " << revision << uid << bufferType; | 380 | SinkTraceCtx(d->logCtx) << "Cleaning up revision " << revision << uid << bufferType; |
377 | const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); | 381 | const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); |
378 | DataStore::mainDatabase(d->transaction, bufferType) | ||
379 | .scan(internalUid, | ||
380 | [&](const QByteArray &key, const QByteArray &data) -> bool { | ||
381 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
382 | if (!buffer.isValid()) { | ||
383 | SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk"; | ||
384 | } else { | ||
385 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); | ||
386 | const qint64 rev = metadata->revision(); | ||
387 | const auto isRemoval = metadata->operation() == Operation_Removal; | ||
388 | // Remove old revisions, and the current if the entity has already been removed | ||
389 | if (rev < revision || isRemoval) { | ||
390 | DataStore::removeRevision(d->transaction, rev); | ||
391 | DataStore::mainDatabase(d->transaction, bufferType).remove(key); | ||
392 | } | ||
393 | //Don't cleanup more than specified | ||
394 | if (rev >= revision) { | ||
395 | return false; | ||
396 | } | ||
397 | } | ||
398 | 382 | ||
399 | return true; | 383 | // Remove old revisions |
400 | }, | 384 | const auto revisionsToRemove = DataStore::getRevisionsUntilFromUid(d->transaction, uid, revision); |
401 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true); | 385 | |
386 | for (const auto &revisionToRemove : revisionsToRemove) { | ||
387 | DataStore::removeRevision(d->transaction, revisionToRemove); | ||
388 | DataStore::mainDatabase(d->transaction, bufferType).remove(revisionToRemove); | ||
389 | } | ||
390 | |||
391 | // And remove the specified revision only if marked for removal | ||
392 | DataStore::mainDatabase(d->transaction, bufferType).scan(revision, [&](size_t, const QByteArray &data) { | ||
393 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
394 | if (!buffer.isValid()) { | ||
395 | SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk"; | ||
396 | return false; | ||
397 | } | ||
398 | |||
399 | const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); | ||
400 | const qint64 rev = metadata->revision(); | ||
401 | if (metadata->operation() == Operation_Removal) { | ||
402 | DataStore::removeRevision(d->transaction, revision); | ||
403 | DataStore::mainDatabase(d->transaction, bufferType).remove(revision); | ||
404 | } | ||
405 | |||
406 | return false; | ||
407 | }); | ||
408 | |||
402 | DataStore::setCleanedUpRevision(d->transaction, revision); | 409 | DataStore::setCleanedUpRevision(d->transaction, revision); |
403 | } | 410 | } |
404 | 411 | ||
@@ -433,20 +440,12 @@ QVector<Identifier> EntityStore::fullScan(const QByteArray &type) | |||
433 | SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; | 440 | SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; |
434 | return {}; | 441 | return {}; |
435 | } | 442 | } |
436 | //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. | 443 | |
437 | QSet<Identifier> keys; | 444 | QSet<Identifier> keys; |
438 | DataStore::mainDatabase(d->getTransaction(), type) | 445 | |
439 | .scan(QByteArray(), | 446 | DataStore::getUids(type, d->getTransaction(), [&keys] (const QByteArray &uid) { |
440 | [&](const QByteArray &key, const QByteArray &value) -> bool { | 447 | keys << Identifier::fromDisplayByteArray(uid); |
441 | const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier(); | 448 | }); |
442 | if (keys.contains(uid)) { | ||
443 | //Not something that should persist if the replay works, so we keep a message for now. | ||
444 | SinkTraceCtx(d->logCtx) << "Multiple revisions for uid: " << Sink::Storage::Key::fromInternalByteArray(key) << ". This is normal if changereplay has not completed yet."; | ||
445 | } | ||
446 | keys << uid; | ||
447 | return true; | ||
448 | }, | ||
449 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during fullScan query: " << error.message; }); | ||
450 | 449 | ||
451 | SinkTraceCtx(d->logCtx) << "Full scan retrieved " << keys.size() << " results."; | 450 | SinkTraceCtx(d->logCtx) << "Full scan retrieved " << keys.size() << " results."; |
452 | return keys.toList().toVector(); | 451 | return keys.toList().toVector(); |
@@ -492,12 +491,12 @@ void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property | |||
492 | void EntityStore::readLatest(const QByteArray &type, const Identifier &id, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback) | 491 | void EntityStore::readLatest(const QByteArray &type, const Identifier &id, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback) |
493 | { | 492 | { |
494 | Q_ASSERT(d); | 493 | Q_ASSERT(d); |
495 | const auto internalKey = id.toInternalByteArray(); | 494 | const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), id.toDisplayByteArray()); |
496 | auto db = DataStore::mainDatabase(d->getTransaction(), type); | 495 | auto db = DataStore::mainDatabase(d->getTransaction(), type); |
497 | db.findLatest(internalKey, | 496 | db.scan(revision, |
498 | [=](const QByteArray &key, const QByteArray &value) { | 497 | [=](size_t, const QByteArray &value) { |
499 | const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier().toDisplayByteArray(); | 498 | callback(id.toDisplayByteArray(), Sink::EntityBuffer(value.data(), value.size())); |
500 | callback(uid, Sink::EntityBuffer(value.data(), value.size())); | 499 | return false; |
501 | }, | 500 | }, |
502 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readLatest query: " << error.message << id; }); | 501 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readLatest query: " << error.message << id; }); |
503 | } | 502 | } |
@@ -546,9 +545,9 @@ void EntityStore::readEntity(const QByteArray &type, const QByteArray &displayKe | |||
546 | { | 545 | { |
547 | const auto key = Key::fromDisplayByteArray(displayKey); | 546 | const auto key = Key::fromDisplayByteArray(displayKey); |
548 | auto db = DataStore::mainDatabase(d->getTransaction(), type); | 547 | auto db = DataStore::mainDatabase(d->getTransaction(), type); |
549 | db.scan(key.toInternalByteArray(), | 548 | db.scan(key.revision().toSizeT(), |
550 | [=](const QByteArray &key, const QByteArray &value) -> bool { | 549 | [=](size_t rev, const QByteArray &value) -> bool { |
551 | const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier().toDisplayByteArray(); | 550 | const auto uid = DataStore::getUidFromRevision(d->transaction, rev); |
552 | callback(uid, Sink::EntityBuffer(value.data(), value.size())); | 551 | callback(uid, Sink::EntityBuffer(value.data(), value.size())); |
553 | return false; | 552 | return false; |
554 | }, | 553 | }, |
@@ -604,18 +603,8 @@ void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedT | |||
604 | 603 | ||
605 | void EntityStore::readPrevious(const QByteArray &type, const Identifier &id, qint64 revision, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback) | 604 | void EntityStore::readPrevious(const QByteArray &type, const Identifier &id, qint64 revision, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback) |
606 | { | 605 | { |
607 | auto db = DataStore::mainDatabase(d->getTransaction(), type); | 606 | const auto previousRevisions = DataStore::getRevisionsUntilFromUid(d->getTransaction(), id.toDisplayByteArray(), revision); |
608 | qint64 latestRevision = 0; | 607 | const size_t latestRevision = previousRevisions[previousRevisions.size() - 1]; |
609 | const auto internalUid = id.toInternalByteArray(); | ||
610 | db.scan(internalUid, | ||
611 | [&latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { | ||
612 | const auto foundRevision = Key::fromInternalByteArray(key).revision().toQint64(); | ||
613 | if (foundRevision < revision && foundRevision > latestRevision) { | ||
614 | latestRevision = foundRevision; | ||
615 | } | ||
616 | return true; | ||
617 | }, | ||
618 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; }, true); | ||
619 | const auto key = Key(id, latestRevision); | 608 | const auto key = Key(id, latestRevision); |
620 | readEntity(type, key.toDisplayByteArray(), callback); | 609 | readEntity(type, key.toDisplayByteArray(), callback); |
621 | } | 610 | } |
@@ -641,21 +630,20 @@ void EntityStore::readAllUids(const QByteArray &type, const std::function<void(c | |||
641 | DataStore::getUids(type, d->getTransaction(), callback); | 630 | DataStore::getUids(type, d->getTransaction(), callback); |
642 | } | 631 | } |
643 | 632 | ||
644 | bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) | 633 | bool EntityStore::contains(const QByteArray & /* type */, const QByteArray &uid) |
645 | { | 634 | { |
646 | Q_ASSERT(!uid.isEmpty()); | 635 | Q_ASSERT(!uid.isEmpty()); |
647 | const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); | 636 | return !DataStore::getRevisionsFromUid(d->getTransaction(), uid).isEmpty(); |
648 | return DataStore::mainDatabase(d->getTransaction(), type).contains(internalUid); | ||
649 | } | 637 | } |
650 | 638 | ||
651 | bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) | 639 | bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) |
652 | { | 640 | { |
653 | bool found = false; | 641 | bool found = false; |
654 | bool alreadyRemoved = false; | 642 | bool alreadyRemoved = false; |
655 | const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); | 643 | const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), uid); |
656 | DataStore::mainDatabase(d->transaction, type) | 644 | DataStore::mainDatabase(d->transaction, type) |
657 | .findLatest(internalUid, | 645 | .scan(revision, |
658 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) { | 646 | [&found, &alreadyRemoved](size_t, const QByteArray &data) { |
659 | auto entity = GetEntity(data.data()); | 647 | auto entity = GetEntity(data.data()); |
660 | if (entity && entity->metadata()) { | 648 | if (entity && entity->metadata()) { |
661 | auto metadata = GetMetadata(entity->metadata()->Data()); | 649 | auto metadata = GetMetadata(entity->metadata()->Data()); |
@@ -664,6 +652,7 @@ bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) | |||
664 | alreadyRemoved = true; | 652 | alreadyRemoved = true; |
665 | } | 653 | } |
666 | } | 654 | } |
655 | return true; | ||
667 | }, | 656 | }, |
668 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); | 657 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); |
669 | if (!found) { | 658 | if (!found) { |
@@ -677,23 +666,32 @@ bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) | |||
677 | return true; | 666 | return true; |
678 | } | 667 | } |
679 | 668 | ||
680 | void EntityStore::readRevisions(const QByteArray &type, const QByteArray &uid, qint64 startingRevision, const std::function<void(const QByteArray &uid, qint64 revision, const EntityBuffer &entity)> callback) | 669 | void EntityStore::readRevisions(const QByteArray &type, const QByteArray &uid, qint64 startingRevision, |
670 | const std::function<void(const QByteArray &uid, qint64 revision, const EntityBuffer &entity)> callback) | ||
681 | { | 671 | { |
682 | Q_ASSERT(d); | 672 | Q_ASSERT(d); |
683 | Q_ASSERT(!uid.isEmpty()); | 673 | Q_ASSERT(!uid.isEmpty()); |
684 | const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); | ||
685 | DataStore::mainDatabase(d->transaction, type) | ||
686 | .scan(internalUid, | ||
687 | [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
688 | const auto parsedKey = Key::fromInternalByteArray(key); | ||
689 | const auto revision = parsedKey.revision().toQint64(); | ||
690 | if (revision >= startingRevision) { | ||
691 | callback(parsedKey.identifier().toDisplayByteArray(), revision, Sink::EntityBuffer(value.data(), value.size())); | ||
692 | } | ||
693 | return true; | ||
694 | }, | ||
695 | [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true); | ||
696 | 674 | ||
675 | const auto revisions = DataStore::getRevisionsFromUid(d->transaction, uid); | ||
676 | |||
677 | const auto db = DataStore::mainDatabase(d->transaction, type); | ||
678 | |||
679 | for (const auto revision : revisions) { | ||
680 | if (revision < startingRevision) { | ||
681 | continue; | ||
682 | } | ||
683 | |||
684 | db.scan(revision, | ||
685 | [&](size_t rev, const QByteArray &value) { | ||
686 | Q_ASSERT(rev == revision); | ||
687 | callback(uid, revision, Sink::EntityBuffer(value.data(), value.size())); | ||
688 | return false; | ||
689 | }, | ||
690 | [&](const DataStore::Error &error) { | ||
691 | SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; | ||
692 | }, | ||
693 | true); | ||
694 | } | ||
697 | } | 695 | } |
698 | 696 | ||
699 | qint64 EntityStore::maxRevision() | 697 | qint64 EntityStore::maxRevision() |