summaryrefslogtreecommitdiffstats
path: root/common/storage/entitystore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/storage/entitystore.cpp')
-rw-r--r--common/storage/entitystore.cpp160
1 files changed, 79 insertions, 81 deletions
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index 276ee6a..454e25a 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -38,8 +38,9 @@ using namespace Sink::Storage;
38 38
39static QMap<QByteArray, int> baseDbs() 39static QMap<QByteArray, int> baseDbs()
40{ 40{
41 return {{"revisionType", 0}, 41 return {{"revisionType", Storage::IntegerKeys},
42 {"revisions", 0}, 42 {"revisions", Storage::IntegerKeys},
43 {"uidsToRevisions", Storage::AllowDuplicates | Storage::IntegerValues},
43 {"uids", 0}, 44 {"uids", 0},
44 {"default", 0}, 45 {"default", 0},
45 {"__flagtable", 0}}; 46 {"__flagtable", 0}};
@@ -242,12 +243,13 @@ bool EntityStore::add(const QByteArray &type, ApplicationDomainType entity, bool
242 const auto key = Key(identifier, newRevision); 243 const auto key = Key(identifier, newRevision);
243 244
244 DataStore::mainDatabase(d->transaction, type) 245 DataStore::mainDatabase(d->transaction, type)
245 .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), 246 .write(newRevision, BufferUtils::extractBuffer(fbb),
246 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; }); 247 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; });
248
247 DataStore::setMaxRevision(d->transaction, newRevision); 249 DataStore::setMaxRevision(d->transaction, newRevision);
248 DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); 250 DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type);
249 DataStore::recordUid(d->transaction, entity.identifier(), type); 251 DataStore::recordUid(d->transaction, entity.identifier(), type);
250 SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision; 252 SinkTraceCtx(d->logCtx) << "Wrote entity: " << key << "of type:" << type;
251 return true; 253 return true;
252} 254}
253 255
@@ -319,8 +321,9 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomainType &cu
319 const auto key = Key(identifier, newRevision); 321 const auto key = Key(identifier, newRevision);
320 322
321 DataStore::mainDatabase(d->transaction, type) 323 DataStore::mainDatabase(d->transaction, type)
322 .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), 324 .write(newRevision, BufferUtils::extractBuffer(fbb),
323 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; }); 325 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; });
326
324 DataStore::setMaxRevision(d->transaction, newRevision); 327 DataStore::setMaxRevision(d->transaction, newRevision);
325 DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); 328 DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type);
326 SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; 329 SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision;
@@ -356,8 +359,9 @@ bool EntityStore::remove(const QByteArray &type, const ApplicationDomainType &cu
356 const auto key = Key(identifier, newRevision); 359 const auto key = Key(identifier, newRevision);
357 360
358 DataStore::mainDatabase(d->transaction, type) 361 DataStore::mainDatabase(d->transaction, type)
359 .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), 362 .write(newRevision, BufferUtils::extractBuffer(fbb),
360 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; }); 363 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; });
364
361 DataStore::setMaxRevision(d->transaction, newRevision); 365 DataStore::setMaxRevision(d->transaction, newRevision);
362 DataStore::recordRevision(d->transaction, newRevision, uid, type); 366 DataStore::recordRevision(d->transaction, newRevision, uid, type);
363 DataStore::removeUid(d->transaction, uid, type); 367 DataStore::removeUid(d->transaction, uid, type);
@@ -375,30 +379,33 @@ void EntityStore::cleanupEntityRevisionsUntil(qint64 revision)
375 } 379 }
376 SinkTraceCtx(d->logCtx) << "Cleaning up revision " << revision << uid << bufferType; 380 SinkTraceCtx(d->logCtx) << "Cleaning up revision " << revision << uid << bufferType;
377 const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); 381 const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray();
378 DataStore::mainDatabase(d->transaction, bufferType)
379 .scan(internalUid,
380 [&](const QByteArray &key, const QByteArray &data) -> bool {
381 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
382 if (!buffer.isValid()) {
383 SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk";
384 } else {
385 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
386 const qint64 rev = metadata->revision();
387 const auto isRemoval = metadata->operation() == Operation_Removal;
388 // Remove old revisions, and the current if the entity has already been removed
389 if (rev < revision || isRemoval) {
390 DataStore::removeRevision(d->transaction, rev);
391 DataStore::mainDatabase(d->transaction, bufferType).remove(key);
392 }
393 //Don't cleanup more than specified
394 if (rev >= revision) {
395 return false;
396 }
397 }
398 382
399 return true; 383 // Remove old revisions
400 }, 384 const auto revisionsToRemove = DataStore::getRevisionsUntilFromUid(d->transaction, uid, revision);
401 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true); 385
386 for (const auto &revisionToRemove : revisionsToRemove) {
387 DataStore::removeRevision(d->transaction, revisionToRemove);
388 DataStore::mainDatabase(d->transaction, bufferType).remove(revisionToRemove);
389 }
390
391 // And remove the specified revision only if marked for removal
392 DataStore::mainDatabase(d->transaction, bufferType).scan(revision, [&](size_t, const QByteArray &data) {
393 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
394 if (!buffer.isValid()) {
395 SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk";
396 return false;
397 }
398
399 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
400 const qint64 rev = metadata->revision();
401 if (metadata->operation() == Operation_Removal) {
402 DataStore::removeRevision(d->transaction, revision);
403 DataStore::mainDatabase(d->transaction, bufferType).remove(revision);
404 }
405
406 return false;
407 });
408
402 DataStore::setCleanedUpRevision(d->transaction, revision); 409 DataStore::setCleanedUpRevision(d->transaction, revision);
403} 410}
404 411
@@ -433,20 +440,12 @@ QVector<Identifier> EntityStore::fullScan(const QByteArray &type)
433 SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; 440 SinkTraceCtx(d->logCtx) << "Database is not existing: " << type;
434 return {}; 441 return {};
435 } 442 }
436 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. 443
437 QSet<Identifier> keys; 444 QSet<Identifier> keys;
438 DataStore::mainDatabase(d->getTransaction(), type) 445
439 .scan(QByteArray(), 446 DataStore::getUids(type, d->getTransaction(), [&keys] (const QByteArray &uid) {
440 [&](const QByteArray &key, const QByteArray &value) -> bool { 447 keys << Identifier::fromDisplayByteArray(uid);
441 const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier(); 448 });
442 if (keys.contains(uid)) {
443 //Not something that should persist if the replay works, so we keep a message for now.
444 SinkTraceCtx(d->logCtx) << "Multiple revisions for uid: " << Sink::Storage::Key::fromInternalByteArray(key) << ". This is normal if changereplay has not completed yet.";
445 }
446 keys << uid;
447 return true;
448 },
449 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during fullScan query: " << error.message; });
450 449
451 SinkTraceCtx(d->logCtx) << "Full scan retrieved " << keys.size() << " results."; 450 SinkTraceCtx(d->logCtx) << "Full scan retrieved " << keys.size() << " results.";
452 return keys.toList().toVector(); 451 return keys.toList().toVector();
@@ -492,12 +491,12 @@ void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property
492void EntityStore::readLatest(const QByteArray &type, const Identifier &id, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback) 491void EntityStore::readLatest(const QByteArray &type, const Identifier &id, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback)
493{ 492{
494 Q_ASSERT(d); 493 Q_ASSERT(d);
495 const auto internalKey = id.toInternalByteArray(); 494 const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), id.toDisplayByteArray());
496 auto db = DataStore::mainDatabase(d->getTransaction(), type); 495 auto db = DataStore::mainDatabase(d->getTransaction(), type);
497 db.findLatest(internalKey, 496 db.scan(revision,
498 [=](const QByteArray &key, const QByteArray &value) { 497 [=](size_t, const QByteArray &value) {
499 const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier().toDisplayByteArray(); 498 callback(id.toDisplayByteArray(), Sink::EntityBuffer(value.data(), value.size()));
500 callback(uid, Sink::EntityBuffer(value.data(), value.size())); 499 return false;
501 }, 500 },
502 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readLatest query: " << error.message << id; }); 501 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readLatest query: " << error.message << id; });
503} 502}
@@ -546,9 +545,9 @@ void EntityStore::readEntity(const QByteArray &type, const QByteArray &displayKe
546{ 545{
547 const auto key = Key::fromDisplayByteArray(displayKey); 546 const auto key = Key::fromDisplayByteArray(displayKey);
548 auto db = DataStore::mainDatabase(d->getTransaction(), type); 547 auto db = DataStore::mainDatabase(d->getTransaction(), type);
549 db.scan(key.toInternalByteArray(), 548 db.scan(key.revision().toSizeT(),
550 [=](const QByteArray &key, const QByteArray &value) -> bool { 549 [=](size_t rev, const QByteArray &value) -> bool {
551 const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier().toDisplayByteArray(); 550 const auto uid = DataStore::getUidFromRevision(d->transaction, rev);
552 callback(uid, Sink::EntityBuffer(value.data(), value.size())); 551 callback(uid, Sink::EntityBuffer(value.data(), value.size()));
553 return false; 552 return false;
554 }, 553 },
@@ -604,18 +603,8 @@ void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedT
604 603
605void EntityStore::readPrevious(const QByteArray &type, const Identifier &id, qint64 revision, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback) 604void EntityStore::readPrevious(const QByteArray &type, const Identifier &id, qint64 revision, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback)
606{ 605{
607 auto db = DataStore::mainDatabase(d->getTransaction(), type); 606 const auto previousRevisions = DataStore::getRevisionsUntilFromUid(d->getTransaction(), id.toDisplayByteArray(), revision);
608 qint64 latestRevision = 0; 607 const size_t latestRevision = previousRevisions[previousRevisions.size() - 1];
609 const auto internalUid = id.toInternalByteArray();
610 db.scan(internalUid,
611 [&latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool {
612 const auto foundRevision = Key::fromInternalByteArray(key).revision().toQint64();
613 if (foundRevision < revision && foundRevision > latestRevision) {
614 latestRevision = foundRevision;
615 }
616 return true;
617 },
618 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; }, true);
619 const auto key = Key(id, latestRevision); 608 const auto key = Key(id, latestRevision);
620 readEntity(type, key.toDisplayByteArray(), callback); 609 readEntity(type, key.toDisplayByteArray(), callback);
621} 610}
@@ -641,21 +630,20 @@ void EntityStore::readAllUids(const QByteArray &type, const std::function<void(c
641 DataStore::getUids(type, d->getTransaction(), callback); 630 DataStore::getUids(type, d->getTransaction(), callback);
642} 631}
643 632
644bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) 633bool EntityStore::contains(const QByteArray & /* type */, const QByteArray &uid)
645{ 634{
646 Q_ASSERT(!uid.isEmpty()); 635 Q_ASSERT(!uid.isEmpty());
647 const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); 636 return !DataStore::getRevisionsFromUid(d->getTransaction(), uid).isEmpty();
648 return DataStore::mainDatabase(d->getTransaction(), type).contains(internalUid);
649} 637}
650 638
651bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) 639bool EntityStore::exists(const QByteArray &type, const QByteArray &uid)
652{ 640{
653 bool found = false; 641 bool found = false;
654 bool alreadyRemoved = false; 642 bool alreadyRemoved = false;
655 const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); 643 const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), uid);
656 DataStore::mainDatabase(d->transaction, type) 644 DataStore::mainDatabase(d->transaction, type)
657 .findLatest(internalUid, 645 .scan(revision,
658 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) { 646 [&found, &alreadyRemoved](size_t, const QByteArray &data) {
659 auto entity = GetEntity(data.data()); 647 auto entity = GetEntity(data.data());
660 if (entity && entity->metadata()) { 648 if (entity && entity->metadata()) {
661 auto metadata = GetMetadata(entity->metadata()->Data()); 649 auto metadata = GetMetadata(entity->metadata()->Data());
@@ -664,6 +652,7 @@ bool EntityStore::exists(const QByteArray &type, const QByteArray &uid)
664 alreadyRemoved = true; 652 alreadyRemoved = true;
665 } 653 }
666 } 654 }
655 return true;
667 }, 656 },
668 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); 657 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; });
669 if (!found) { 658 if (!found) {
@@ -677,23 +666,32 @@ bool EntityStore::exists(const QByteArray &type, const QByteArray &uid)
677 return true; 666 return true;
678} 667}
679 668
680void EntityStore::readRevisions(const QByteArray &type, const QByteArray &uid, qint64 startingRevision, const std::function<void(const QByteArray &uid, qint64 revision, const EntityBuffer &entity)> callback) 669void EntityStore::readRevisions(const QByteArray &type, const QByteArray &uid, qint64 startingRevision,
670 const std::function<void(const QByteArray &uid, qint64 revision, const EntityBuffer &entity)> callback)
681{ 671{
682 Q_ASSERT(d); 672 Q_ASSERT(d);
683 Q_ASSERT(!uid.isEmpty()); 673 Q_ASSERT(!uid.isEmpty());
684 const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray();
685 DataStore::mainDatabase(d->transaction, type)
686 .scan(internalUid,
687 [&](const QByteArray &key, const QByteArray &value) -> bool {
688 const auto parsedKey = Key::fromInternalByteArray(key);
689 const auto revision = parsedKey.revision().toQint64();
690 if (revision >= startingRevision) {
691 callback(parsedKey.identifier().toDisplayByteArray(), revision, Sink::EntityBuffer(value.data(), value.size()));
692 }
693 return true;
694 },
695 [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true);
696 674
675 const auto revisions = DataStore::getRevisionsFromUid(d->transaction, uid);
676
677 const auto db = DataStore::mainDatabase(d->transaction, type);
678
679 for (const auto revision : revisions) {
680 if (revision < startingRevision) {
681 continue;
682 }
683
684 db.scan(revision,
685 [&](size_t rev, const QByteArray &value) {
686 Q_ASSERT(rev == revision);
687 callback(uid, revision, Sink::EntityBuffer(value.data(), value.size()));
688 return false;
689 },
690 [&](const DataStore::Error &error) {
691 SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message;
692 },
693 true);
694 }
697} 695}
698 696
699qint64 EntityStore::maxRevision() 697qint64 EntityStore::maxRevision()