From 6ef0a29d8e468de50c9dcf260db45957d028a083 Mon Sep 17 00:00:00 2001 From: Minijackson Date: Tue, 21 Aug 2018 12:03:40 +0200 Subject: Separate UIDs and revisions --- common/storage/entitystore.cpp | 106 ++++++++++++++++++++++++----------------- common/storage/key.cpp | 5 ++ common/storage/key.h | 1 + 3 files changed, 68 insertions(+), 44 deletions(-) (limited to 'common/storage') diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index bb0967a..6231479 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp @@ -38,8 +38,9 @@ using namespace Sink::Storage; static QMap baseDbs() { - return {{"revisionType", 0}, - {"revisions", 0}, + return {{"revisionType", Storage::IntegerKeys}, + {"revisions", Storage::IntegerKeys}, + {"uidsToRevisions", Storage::AllowDuplicates | Storage::IntegerValues}, {"uids", 0}, {"default", 0}, {"__flagtable", 0}}; @@ -242,12 +243,13 @@ bool EntityStore::add(const QByteArray &type, ApplicationDomainType entity, bool const auto key = Key(identifier, newRevision); DataStore::mainDatabase(d->transaction, type) - .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), + .write(newRevision, BufferUtils::extractBuffer(fbb), [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; }); + DataStore::setMaxRevision(d->transaction, newRevision); DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); DataStore::recordUid(d->transaction, entity.identifier(), type); - SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision; + SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision << "key:" << key.toInternalByteArray(); return true; } @@ -319,8 +321,9 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomainType &cu const auto key = Key(identifier, newRevision); DataStore::mainDatabase(d->transaction, type) - .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), + .write(newRevision, BufferUtils::extractBuffer(fbb), [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; }); + DataStore::setMaxRevision(d->transaction, newRevision); DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; @@ -356,8 +359,9 @@ bool EntityStore::remove(const QByteArray &type, const ApplicationDomainType &cu const auto key = Key(identifier, newRevision); DataStore::mainDatabase(d->transaction, type) - .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), + .write(newRevision, BufferUtils::extractBuffer(fbb), [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; }); + DataStore::setMaxRevision(d->transaction, newRevision); DataStore::recordRevision(d->transaction, newRevision, uid, type); DataStore::removeUid(d->transaction, uid, type); @@ -375,30 +379,33 @@ void EntityStore::cleanupEntityRevisionsUntil(qint64 revision) } SinkTraceCtx(d->logCtx) << "Cleaning up revision " << revision << uid << bufferType; const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); - DataStore::mainDatabase(d->transaction, bufferType) - .scan(internalUid, - [&](const QByteArray &key, const QByteArray &data) -> bool { - EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk"; - } else { - const auto metadata = flatbuffers::GetRoot(buffer.metadataBuffer()); - const qint64 rev = metadata->revision(); - const auto isRemoval = metadata->operation() == Operation_Removal; - // Remove old revisions, and the current if the entity has already been removed - if (rev < revision || isRemoval) { - DataStore::removeRevision(d->transaction, rev); - DataStore::mainDatabase(d->transaction, bufferType).remove(key); - } - //Don't cleanup more than specified - if (rev >= revision) { - return false; - } - } - return true; - }, - [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true); + // Remove old revisions + const auto revisionsToRemove = DataStore::getRevisionsUntilFromUid(d->transaction, uid, revision); + + for (const auto &revisionToRemove : revisionsToRemove) { + DataStore::removeRevision(d->transaction, revisionToRemove); + DataStore::mainDatabase(d->transaction, bufferType).remove(revisionToRemove); + } + + // And remove the specified revision only if marked for removal + DataStore::mainDatabase(d->transaction, bufferType).scan(revision, [&](size_t, const QByteArray &data) { + EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk"; + return false; + } + + const auto metadata = flatbuffers::GetRoot(buffer.metadataBuffer()); + const qint64 rev = metadata->revision(); + if (metadata->operation() == Operation_Removal) { + DataStore::removeRevision(d->transaction, revision); + DataStore::mainDatabase(d->transaction, bufferType).remove(revision); + } + + return false; + }); + DataStore::setCleanedUpRevision(d->transaction, revision); } @@ -437,13 +444,23 @@ QVector EntityStore::fullScan(const QByteArray &type) QSet keys; DataStore::mainDatabase(d->getTransaction(), type) .scan(QByteArray(), - [&](const QByteArray &key, const QByteArray &value) -> bool { - const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier(); - if (keys.contains(uid)) { + [&](const QByteArray &key, const QByteArray &data) -> bool { + EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk"; + return true; + } + + size_t revision = *reinterpret_cast(key.constData()); + + const auto metadata = flatbuffers::GetRoot(buffer.metadataBuffer()); + const QByteArray uid = DataStore::getUidFromRevision(d->getTransaction(), revision); + const auto identifier = Sink::Storage::Identifier::fromDisplayByteArray(uid); + if (keys.contains(identifier)) { //Not something that should persist if the replay works, so we keep a message for now. SinkTraceCtx(d->logCtx) << "Multiple revisions for key: " << key; } - keys << uid; + keys << identifier; return true; }, [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during fullScan query: " << error.message; }); @@ -492,12 +509,12 @@ void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property void EntityStore::readLatest(const QByteArray &type, const Identifier &id, const std::function callback) { Q_ASSERT(d); - const auto internalKey = id.toInternalByteArray(); + const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), id.toDisplayByteArray()); auto db = DataStore::mainDatabase(d->getTransaction(), type); - db.findLatest(internalKey, - [=](const QByteArray &key, const QByteArray &value) { - const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier().toDisplayByteArray(); - callback(uid, Sink::EntityBuffer(value.data(), value.size())); + db.scan(revision, + [=](size_t, const QByteArray &value) { + callback(id.toDisplayByteArray(), Sink::EntityBuffer(value.data(), value.size())); + return false; }, [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readLatest query: " << error.message << id; }); } @@ -546,9 +563,9 @@ void EntityStore::readEntity(const QByteArray &type, const QByteArray &displayKe { const auto key = Key::fromDisplayByteArray(displayKey); auto db = DataStore::mainDatabase(d->getTransaction(), type); - db.scan(key.toInternalByteArray(), - [=](const QByteArray &key, const QByteArray &value) -> bool { - const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier().toDisplayByteArray(); + db.scan(key.revision().toSizeT(), + [=](size_t rev, const QByteArray &value) -> bool { + const auto uid = DataStore::getUidFromRevision(d->transaction, rev); callback(uid, Sink::EntityBuffer(value.data(), value.size())); return false; }, @@ -652,10 +669,10 @@ bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) { bool found = false; bool alreadyRemoved = false; - const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); + const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), uid); DataStore::mainDatabase(d->transaction, type) - .findLatest(internalUid, - [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) { + .scan(revision, + [&found, &alreadyRemoved](size_t, const QByteArray &data) { auto entity = GetEntity(data.data()); if (entity && entity->metadata()) { auto metadata = GetMetadata(entity->metadata()->Data()); @@ -664,6 +681,7 @@ bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) alreadyRemoved = true; } } + return true; }, [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); if (!found) { diff --git a/common/storage/key.cpp b/common/storage/key.cpp index 2327061..a6567ea 100644 --- a/common/storage/key.cpp +++ b/common/storage/key.cpp @@ -155,6 +155,11 @@ qint64 Revision::toQint64() const return rev; } +size_t Revision::toSizeT() const +{ + return rev; +} + bool Revision::isValidInternal(const QByteArray &bytes) { if (bytes.size() != Revision::INTERNAL_REPR_SIZE) { diff --git a/common/storage/key.h b/common/storage/key.h index acd81cf..da90ddd 100644 --- a/common/storage/key.h +++ b/common/storage/key.h @@ -75,6 +75,7 @@ public: QByteArray toDisplayByteArray() const; static Revision fromDisplayByteArray(const QByteArray &bytes); qint64 toQint64() const; + size_t toSizeT() const; static bool isValidInternal(const QByteArray &); static bool isValidDisplay(const QByteArray &); -- cgit v1.2.3