From 46313049ac01a3007ef60bdc937442945355a38d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Nicole?= Date: Wed, 22 Aug 2018 14:16:59 +0200 Subject: Separate UIDs and Revisions in main databases Summary: - Change revision type from `qint64` to `size_t` for LMDB in a couple of places (LMDB supports `unsigned int` or `size_t` which are `long unsigned int` on my machine) - Better support for database flags (duplicate, integer keys, integer values for now but is extensible) - Main databases' keys are now revisions - Some databases switched to integer keys databases: - Main databases - the revision to uid mapping database - the revision to entity type mapping database - Refactor the entity type's `typeDatabases` method (if in the future we need to change the main databases' flags again) - New uid to revision mapping database (`uidsToRevisions`): - Stores all revisions (not uid to latest revision) because we need it for cleaning old revisions - Flags are: duplicates + integer values (so findLatest finds the latest revision for the given uid) ~~Problems to fix before merging:~~ All Fixed! 
- ~~Sometimes Sink can't read what has just been written to the database (maybe because of transaction race conditions)~~ - ~~Most of the time, this results in Sink not being able to find the uid for a given revision by reading the `revisions` database~~ - ~~`pipelinetest`'s `testModifyWithConflict` fails because the local changes are overridden~~ ~~The first problem prevents me from running benchmarks~~ Reviewers: cmollekopf Tags: #sink Differential Revision: https://phabricator.kde.org/D14974 --- common/storage/entitystore.cpp | 160 ++++++++++++++++++++--------------------- common/storage/key.cpp | 5 ++ common/storage/key.h | 5 +- 3 files changed, 87 insertions(+), 83 deletions(-) (limited to 'common/storage') diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp index 276ee6a..454e25a 100644 --- a/common/storage/entitystore.cpp +++ b/common/storage/entitystore.cpp @@ -38,8 +38,9 @@ using namespace Sink::Storage; static QMap baseDbs() { - return {{"revisionType", 0}, - {"revisions", 0}, + return {{"revisionType", Storage::IntegerKeys}, + {"revisions", Storage::IntegerKeys}, + {"uidsToRevisions", Storage::AllowDuplicates | Storage::IntegerValues}, {"uids", 0}, {"default", 0}, {"__flagtable", 0}}; @@ -242,12 +243,13 @@ bool EntityStore::add(const QByteArray &type, ApplicationDomainType entity, bool const auto key = Key(identifier, newRevision); DataStore::mainDatabase(d->transaction, type) - .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), + .write(newRevision, BufferUtils::extractBuffer(fbb), [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << entity.identifier() << newRevision; }); + DataStore::setMaxRevision(d->transaction, newRevision); DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type); DataStore::recordUid(d->transaction, entity.identifier(), type); - SinkTraceCtx(d->logCtx) << "Wrote entity: " << entity.identifier() << type << newRevision; + 
SinkTraceCtx(d->logCtx) << "Wrote entity: " << key << "of type:" << type; return true; } @@ -319,8 +321,9 @@ bool EntityStore::modify(const QByteArray &type, const ApplicationDomainType &cu const auto key = Key(identifier, newRevision); DataStore::mainDatabase(d->transaction, type) - .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), + .write(newRevision, BufferUtils::extractBuffer(fbb), [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << newEntity.identifier() << newRevision; }); + DataStore::setMaxRevision(d->transaction, newRevision); DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type); SinkTraceCtx(d->logCtx) << "Wrote modified entity: " << newEntity.identifier() << type << newRevision; @@ -356,8 +359,9 @@ bool EntityStore::remove(const QByteArray &type, const ApplicationDomainType &cu const auto key = Key(identifier, newRevision); DataStore::mainDatabase(d->transaction, type) - .write(key.toInternalByteArray(), BufferUtils::extractBuffer(fbb), + .write(newRevision, BufferUtils::extractBuffer(fbb), [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to write entity" << uid << newRevision; }); + DataStore::setMaxRevision(d->transaction, newRevision); DataStore::recordRevision(d->transaction, newRevision, uid, type); DataStore::removeUid(d->transaction, uid, type); @@ -375,30 +379,33 @@ void EntityStore::cleanupEntityRevisionsUntil(qint64 revision) } SinkTraceCtx(d->logCtx) << "Cleaning up revision " << revision << uid << bufferType; const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); - DataStore::mainDatabase(d->transaction, bufferType) - .scan(internalUid, - [&](const QByteArray &key, const QByteArray &data) -> bool { - EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk"; - } else { - const auto metadata = 
flatbuffers::GetRoot(buffer.metadataBuffer()); - const qint64 rev = metadata->revision(); - const auto isRemoval = metadata->operation() == Operation_Removal; - // Remove old revisions, and the current if the entity has already been removed - if (rev < revision || isRemoval) { - DataStore::removeRevision(d->transaction, rev); - DataStore::mainDatabase(d->transaction, bufferType).remove(key); - } - //Don't cleanup more than specified - if (rev >= revision) { - return false; - } - } - return true; - }, - [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true); + // Remove old revisions + const auto revisionsToRemove = DataStore::getRevisionsUntilFromUid(d->transaction, uid, revision); + + for (const auto &revisionToRemove : revisionsToRemove) { + DataStore::removeRevision(d->transaction, revisionToRemove); + DataStore::mainDatabase(d->transaction, bufferType).remove(revisionToRemove); + } + + // And remove the specified revision only if marked for removal + DataStore::mainDatabase(d->transaction, bufferType).scan(revision, [&](size_t, const QByteArray &data) { + EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + SinkWarningCtx(d->logCtx) << "Read invalid buffer from disk"; + return false; + } + + const auto metadata = flatbuffers::GetRoot(buffer.metadataBuffer()); + const qint64 rev = metadata->revision(); + if (metadata->operation() == Operation_Removal) { + DataStore::removeRevision(d->transaction, revision); + DataStore::mainDatabase(d->transaction, bufferType).remove(revision); + } + + return false; + }); + DataStore::setCleanedUpRevision(d->transaction, revision); } @@ -433,20 +440,12 @@ QVector EntityStore::fullScan(const QByteArray &type) SinkTraceCtx(d->logCtx) << "Database is not existing: " << type; return {}; } - //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. 
+ QSet keys; - DataStore::mainDatabase(d->getTransaction(), type) - .scan(QByteArray(), - [&](const QByteArray &key, const QByteArray &value) -> bool { - const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier(); - if (keys.contains(uid)) { - //Not something that should persist if the replay works, so we keep a message for now. - SinkTraceCtx(d->logCtx) << "Multiple revisions for uid: " << Sink::Storage::Key::fromInternalByteArray(key) << ". This is normal if changereplay has not completed yet."; - } - keys << uid; - return true; - }, - [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during fullScan query: " << error.message; }); + + DataStore::getUids(type, d->getTransaction(), [&keys] (const QByteArray &uid) { + keys << Identifier::fromDisplayByteArray(uid); + }); SinkTraceCtx(d->logCtx) << "Full scan retrieved " << keys.size() << " results."; return keys.toList().toVector(); @@ -492,12 +491,12 @@ void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property void EntityStore::readLatest(const QByteArray &type, const Identifier &id, const std::function callback) { Q_ASSERT(d); - const auto internalKey = id.toInternalByteArray(); + const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), id.toDisplayByteArray()); auto db = DataStore::mainDatabase(d->getTransaction(), type); - db.findLatest(internalKey, - [=](const QByteArray &key, const QByteArray &value) { - const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier().toDisplayByteArray(); - callback(uid, Sink::EntityBuffer(value.data(), value.size())); + db.scan(revision, + [=](size_t, const QByteArray &value) { + callback(id.toDisplayByteArray(), Sink::EntityBuffer(value.data(), value.size())); + return false; }, [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error during readLatest query: " << error.message << id; }); } @@ -546,9 +545,9 @@ void EntityStore::readEntity(const QByteArray 
&type, const QByteArray &displayKe { const auto key = Key::fromDisplayByteArray(displayKey); auto db = DataStore::mainDatabase(d->getTransaction(), type); - db.scan(key.toInternalByteArray(), - [=](const QByteArray &key, const QByteArray &value) -> bool { - const auto uid = Sink::Storage::Key::fromInternalByteArray(key).identifier().toDisplayByteArray(); + db.scan(key.revision().toSizeT(), + [=](size_t rev, const QByteArray &value) -> bool { + const auto uid = DataStore::getUidFromRevision(d->transaction, rev); callback(uid, Sink::EntityBuffer(value.data(), value.size())); return false; }, @@ -604,18 +603,8 @@ void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedT void EntityStore::readPrevious(const QByteArray &type, const Identifier &id, qint64 revision, const std::function callback) { - auto db = DataStore::mainDatabase(d->getTransaction(), type); - qint64 latestRevision = 0; - const auto internalUid = id.toInternalByteArray(); - db.scan(internalUid, - [&latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { - const auto foundRevision = Key::fromInternalByteArray(key).revision().toQint64(); - if (foundRevision < revision && foundRevision > latestRevision) { - latestRevision = foundRevision; - } - return true; - }, - [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read current value from storage: " << error.message; }, true); + const auto previousRevisions = DataStore::getRevisionsUntilFromUid(d->getTransaction(), id.toDisplayByteArray(), revision); + const size_t latestRevision = previousRevisions[previousRevisions.size() - 1]; const auto key = Key(id, latestRevision); readEntity(type, key.toDisplayByteArray(), callback); } @@ -641,21 +630,20 @@ void EntityStore::readAllUids(const QByteArray &type, const std::functiongetTransaction(), callback); } -bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) +bool EntityStore::contains(const QByteArray & /* type */, const 
QByteArray &uid) { Q_ASSERT(!uid.isEmpty()); - const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); - return DataStore::mainDatabase(d->getTransaction(), type).contains(internalUid); + return !DataStore::getRevisionsFromUid(d->getTransaction(), uid).isEmpty(); } bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) { bool found = false; bool alreadyRemoved = false; - const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); + const size_t revision = DataStore::getLatestRevisionFromUid(d->getTransaction(), uid); DataStore::mainDatabase(d->transaction, type) - .findLatest(internalUid, - [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) { + .scan(revision, + [&found, &alreadyRemoved](size_t, const QByteArray &data) { auto entity = GetEntity(data.data()); if (entity && entity->metadata()) { auto metadata = GetMetadata(entity->metadata()->Data()); @@ -664,6 +652,7 @@ bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) alreadyRemoved = true; } } + return true; }, [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Failed to read old revision from storage: " << error.message; }); if (!found) { @@ -677,23 +666,32 @@ bool EntityStore::exists(const QByteArray &type, const QByteArray &uid) return true; } -void EntityStore::readRevisions(const QByteArray &type, const QByteArray &uid, qint64 startingRevision, const std::function callback) +void EntityStore::readRevisions(const QByteArray &type, const QByteArray &uid, qint64 startingRevision, + const std::function callback) { Q_ASSERT(d); Q_ASSERT(!uid.isEmpty()); - const auto internalUid = Identifier::fromDisplayByteArray(uid).toInternalByteArray(); - DataStore::mainDatabase(d->transaction, type) - .scan(internalUid, - [&](const QByteArray &key, const QByteArray &value) -> bool { - const auto parsedKey = Key::fromInternalByteArray(key); - const auto revision = parsedKey.revision().toQint64(); 
- if (revision >= startingRevision) { - callback(parsedKey.identifier().toDisplayByteArray(), revision, Sink::EntityBuffer(value.data(), value.size())); - } - return true; - }, - [&](const DataStore::Error &error) { SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; }, true); + const auto revisions = DataStore::getRevisionsFromUid(d->transaction, uid); + + const auto db = DataStore::mainDatabase(d->transaction, type); + + for (const auto revision : revisions) { + if (revision < startingRevision) { + continue; + } + + db.scan(revision, + [&](size_t rev, const QByteArray &value) { + Q_ASSERT(rev == revision); + callback(uid, revision, Sink::EntityBuffer(value.data(), value.size())); + return false; + }, + [&](const DataStore::Error &error) { + SinkWarningCtx(d->logCtx) << "Error while reading: " << error.message; + }, + true); + } } qint64 EntityStore::maxRevision() diff --git a/common/storage/key.cpp b/common/storage/key.cpp index 2327061..a6567ea 100644 --- a/common/storage/key.cpp +++ b/common/storage/key.cpp @@ -155,6 +155,11 @@ qint64 Revision::toQint64() const return rev; } +size_t Revision::toSizeT() const +{ + return rev; +} + bool Revision::isValidInternal(const QByteArray &bytes) { if (bytes.size() != Revision::INTERNAL_REPR_SIZE) { diff --git a/common/storage/key.h b/common/storage/key.h index baabe38..da90ddd 100644 --- a/common/storage/key.h +++ b/common/storage/key.h @@ -67,7 +67,7 @@ public: static const constexpr size_t INTERNAL_REPR_SIZE = 19; static const constexpr size_t DISPLAY_REPR_SIZE = 19; - Revision(qint64 rev) : rev(rev) {} + Revision(size_t rev) : rev(rev) {} QByteArray toInternalByteArray() const; static Revision fromInternalByteArray(const QByteArray &bytes); @@ -75,6 +75,7 @@ public: QByteArray toDisplayByteArray() const; static Revision fromDisplayByteArray(const QByteArray &bytes); qint64 toQint64() const; + size_t toSizeT() const; static bool isValidInternal(const QByteArray &); static bool isValidDisplay(const 
QByteArray &); @@ -84,7 +85,7 @@ public: bool operator!=(const Revision &other) const; private: - qint64 rev; + size_t rev; }; class Key -- cgit v1.2.3