From 237b9ae4113e7a9f489632296941becb71afdb45 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 16 Oct 2016 14:55:20 +0200 Subject: Refactor how the storage is used. This is the initial refactoring to improve how we deal with the storage. It does a couple of things: * Rename Sink::Storage to Sink::Storage::DataStore to free up the Sink::Storage namespace * Introduce a Sink::ResourceContext to have a single object that can be passed around containing everything that is necessary to operate on a resource. This is a lot better than the multiple separate parameters that we used to pass around all over the place, while still allowing for dependency injection for tests. * Tie storage access together using the new EntityStore that directly works with ApplicationDomainTypes. This gives us a central place where main storage, indexes and buffer adaptors are tied together, which will also give us a place to implement external indexes, such as a fulltextindex using xapian. * Use ApplicationDomainTypes as the default way to pass around entities. Instead of using various ways to pass around entities (buffers, buffer adaptors, ApplicationDomainTypes), only use a single way. The old approach was confusing, and was only done as: * optimization; really shouldn't be necessary and otherwise I'm sure we can find better ways to optimize ApplicationDomainType itself. * a way to account for entities that have multiple buffers, a concept that I no longer deem relevant. While this commit does the bulk of the work to get there, the following commits will refactor more stuff to get things back to normal. --- common/storage/entitystore.cpp | 338 +++++++++++++++++++++++++++++++++++++++++ common/storage/entitystore.h | 109 +++++++++++++ 2 files changed, 447 insertions(+) create mode 100644 common/storage/entitystore.cpp create mode 100644 common/storage/entitystore.h (limited to 'common/storage') diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp new file mode 100644 index 0000000..9615eca --- /dev/null +++ b/common/storage/entitystore.cpp @@ -0,0 +1,338 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#include "entitystore.h" + +#include "entitybuffer.h" +#include "log.h" +#include "typeindex.h" +#include "definitions.h" +#include "resourcecontext.h" +#include "index.h" + +#include "mail.h" +#include "folder.h" +#include "event.h" + +using namespace Sink; +using namespace Sink::Storage; + +SINK_DEBUG_AREA("entitystore"); + +class EntityStore::Private { +public: + Private(const ResourceContext &context) : resourceContext(context) {} + + ResourceContext resourceContext; + DataStore::Transaction transaction; + QHash > indexByType; + + DataStore::Transaction &getTransaction() + { + if (transaction) { + return transaction; + } + + Sink::Storage::DataStore store(Sink::storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly); + transaction = store.createTransaction(DataStore::ReadOnly); + Q_ASSERT(transaction.validateNamedDatabases()); + return transaction; + } + + /* template */ + /* TypeIndex &typeIndex(const QByteArray &type) */ + /* { */ + /* if (indexByType.contains(type)) { */ + /* return *indexByType.value(type); */ + /* } */ + /* auto index = QSharedPointer::create(type); */ + /* ApplicationDomain::TypeImplementation::configureIndex(*index); */ + /* indexByType.insert(type, index); */ + /* return *index; */ + /* } */ + + TypeIndex &typeIndex(const QByteArray &type) + { + /* return applyType(type); */ + if (indexByType.contains(type)) { + return *indexByType.value(type); + } + auto index = QSharedPointer::create(type); + //TODO expand for all types + /* TypeHelper::configureIndex(*index); */ + // Try this: (T would i.e. become + // TypeHelper::T::configureIndex(*index); + if (type == ApplicationDomain::getTypeName()) { + ApplicationDomain::TypeImplementation::configureIndex(*index); + } else if (type == ApplicationDomain::getTypeName()) { + ApplicationDomain::TypeImplementation::configureIndex(*index); + } else if (type == ApplicationDomain::getTypeName()) { + ApplicationDomain::TypeImplementation::configureIndex(*index); + } else { + Q_ASSERT(false); + SinkError() << "Unkonwn type " << type; + } + indexByType.insert(type, index); + return *index; + } +}; + +EntityStore::EntityStore(const ResourceContext &context) + : d(new EntityStore::Private{context}) +{ + +} + +void EntityStore::startTransaction(Sink::Storage::DataStore::AccessMode accessMode) +{ + Sink::Storage::DataStore store(Sink::storageLocation(), d->resourceContext.instanceId(), accessMode); + d->transaction = store.createTransaction(accessMode); + Q_ASSERT(d->transaction.validateNamedDatabases()); +} + +void EntityStore::commitTransaction() +{ + d->transaction.commit(); + d->transaction = Storage::DataStore::Transaction(); +} + +void EntityStore::abortTransaction() +{ + d->transaction.abort(); + d->transaction = Storage::DataStore::Transaction(); +} + +QVector EntityStore::fullScan(const QByteArray &type) +{ + SinkTrace() << "Looking for : " << type; + //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. + QSet keys; + DataStore::mainDatabase(d->getTransaction(), type) + .scan(QByteArray(), + [&](const QByteArray &key, const QByteArray &value) -> bool { + const auto uid = DataStore::uidFromKey(key); + if (keys.contains(uid)) { + //Not something that should persist if the replay works, so we keep a message for now. + SinkTrace() << "Multiple revisions for key: " << key; + } + keys << uid; + return true; + }, + [](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message; }); + + SinkTrace() << "Full scan retrieved " << keys.size() << " results."; + return keys.toList().toVector(); +} + +QVector EntityStore::indexLookup(const QByteArray &type, const Query &query, QSet &appliedFilters, QByteArray &appliedSorting) +{ + return d->typeIndex(type).query(query, appliedFilters, appliedSorting, d->getTransaction()); +} + +QVector EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value) +{ + return d->typeIndex(type).lookup(property, value, d->getTransaction()); +} + +void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value, const std::function &callback) +{ + auto list = d->typeIndex(type).lookup(property, value, d->getTransaction()); + for (const auto &uid : list) { + callback(uid); + } + /* Index index(type + ".index." + property, d->transaction); */ + /* index.lookup(value, [&](const QByteArray &sinkId) { */ + /* callback(sinkId); */ + /* }, */ + /* [&](const Index::Error &error) { */ + /* SinkWarning() << "Error in index: " << error.message << property; */ + /* }); */ +} + +void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback) +{ + auto db = DataStore::mainDatabase(d->getTransaction(), type); + db.findLatest(uid, + [=](const QByteArray &key, const QByteArray &value) -> bool { + callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); + return false; + }, + [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message << uid; }); +} + +void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback) +{ + readLatest(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) { + auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity()); + callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor}); + }); +} + +ApplicationDomain::ApplicationDomainType EntityStore::readLatest(const QByteArray &type, const QByteArray &uid) +{ + ApplicationDomain::ApplicationDomainType dt; + readLatest(type, uid, [&](const ApplicationDomain::ApplicationDomainType &entity) { + dt = entity; + }); + return dt; +} + +void EntityStore::readEntity(const QByteArray &type, const QByteArray &key, const std::function callback) +{ + auto db = DataStore::mainDatabase(d->getTransaction(), type); + db.scan(key, + [=](const QByteArray &key, const QByteArray &value) -> bool { + callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); + return false; + }, + [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); +} + +void EntityStore::readEntity(const QByteArray &type, const QByteArray &uid, const std::function callback) +{ + readEntity(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) { + auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity()); + callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor}); + }); +} + +ApplicationDomain::ApplicationDomainType EntityStore::readEntity(const QByteArray &type, const QByteArray &uid) +{ + ApplicationDomain::ApplicationDomainType dt; + readEntity(type, uid, [&](const ApplicationDomain::ApplicationDomainType &entity) { + dt = entity; + }); + return dt; +} + + +void EntityStore::readAll(const QByteArray &type, const std::function &callback) +{ + auto db = DataStore::mainDatabase(d->getTransaction(), type); + db.scan("", + [=](const QByteArray &key, const QByteArray &value) -> bool { + auto uid = DataStore::uidFromKey(key); + auto buffer = Sink::EntityBuffer{value.data(), value.size()}; + auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity()); + callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor}); + return true; + }, + [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message; }); +} + +void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function &callback) +{ + qint64 revisionCounter = baseRevision; + const qint64 topRevision = DataStore::maxRevision(d->getTransaction()); + // Spit out the revision keys one by one. + while (revisionCounter <= topRevision) { + const auto uid = DataStore::getUidFromRevision(d->getTransaction(), revisionCounter); + const auto type = DataStore::getTypeFromRevision(d->getTransaction(), revisionCounter); + // SinkTrace() << "Revision" << *revisionCounter << type << uid; + Q_ASSERT(!uid.isEmpty()); + Q_ASSERT(!type.isEmpty()); + if (type != expectedType) { + // Skip revision + revisionCounter++; + continue; + } + const auto key = DataStore::assembleKey(uid, revisionCounter); + revisionCounter++; + callback(key); + } +} + +void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function callback) +{ + auto db = DataStore::mainDatabase(d->getTransaction(), type); + qint64 latestRevision = 0; + db.scan(uid, + [&latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { + const auto foundRevision = Sink::Storage::DataStore::revisionFromKey(key); + if (foundRevision < revision && foundRevision > latestRevision) { + latestRevision = foundRevision; + } + return true; + }, + [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); + return readEntity(type, Sink::Storage::DataStore::assembleKey(uid, latestRevision), callback); +} + +void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function callback) +{ + readPrevious(type, uid, revision, [&](const QByteArray &uid, const EntityBuffer &buffer) { + auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity()); + callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor}); + }); +} + +ApplicationDomain::ApplicationDomainType EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision) +{ + ApplicationDomain::ApplicationDomainType dt; + readPrevious(type, uid, revision, [&](const ApplicationDomain::ApplicationDomainType &entity) { + dt = entity; + }); + return dt; +} + +void EntityStore::readAllUids(const QByteArray &type, const std::function callback) +{ + //TODO use uid index instead + //FIXME we currently report each uid for every revision with the same uid + auto db = DataStore::mainDatabase(d->getTransaction(), type); + db.scan("", + [callback](const QByteArray &key, const QByteArray &) -> bool { + callback(Sink::Storage::DataStore::uidFromKey(key)); + return true; + }, + [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); +} + +bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) +{ + return DataStore::mainDatabase(d->getTransaction(), type).contains(uid); +} + +qint64 EntityStore::maxRevision() +{ + return DataStore::maxRevision(d->getTransaction()); +} + +/* DataStore::Transaction getTransaction() */ +/* { */ +/* Sink::Storage::DataStore::Transaction transaction; */ +/* { */ +/* Sink::Storage::DataStore storage(Sink::storageLocation(), mResourceInstanceIdentifier); */ +/* if (!storage.exists()) { */ +/* //This is not an error if the resource wasn't started before */ +/* SinkLog() << "Store doesn't exist: " << mResourceInstanceIdentifier; */ +/* return Sink::Storage::DataStore::Transaction(); */ +/* } */ +/* storage.setDefaultErrorHandler([this](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Error during query: " << error.store << error.message; }); */ +/* transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly); */ +/* } */ + +/* //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. */ +/* //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). */ +/* while (!transaction.validateNamedDatabases()) { */ +/* Sink::Storage::DataStore storage(Sink::storageLocation(), mResourceInstanceIdentifier); */ +/* transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly); */ +/* } */ +/* return transaction; */ +/* } */ diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h new file mode 100644 index 0000000..de29e87 --- /dev/null +++ b/common/storage/entitystore.h @@ -0,0 +1,109 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#pragma once + +#include "sink_export.h" + +#include +#include "domaintypeadaptorfactoryinterface.h" +#include "query.h" +#include "storage.h" +#include "resourcecontext.h" + +namespace Sink { +class EntityBuffer; +namespace Storage { + +class SINK_EXPORT EntityStore +{ +public: + typedef QSharedPointer Ptr; + EntityStore(const ResourceContext &resourceContext); + + void add(const ApplicationDomain::ApplicationDomainType &); + void modify(const ApplicationDomain::ApplicationDomainType &); + void remove(const ApplicationDomain::ApplicationDomainType &); + + void startTransaction(Sink::Storage::DataStore::AccessMode); + void commitTransaction(); + void abortTransaction(); + + QVector fullScan(const QByteArray &type); + QVector indexLookup(const QByteArray &type, const Query &query, QSet &appliedFilters, QByteArray &appliedSorting); + QVector indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value); + void indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value, const std::function &callback); + template + void indexLookup(const QVariant &value, const std::function &callback) { + return indexLookup(ApplicationDomain::getTypeName(), PropertyType::name, value, callback); + } + + void readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback); + void readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback); + + ApplicationDomain::ApplicationDomainType readLatest(const QByteArray &type, const QByteArray &uid); + + template + T readLatest(const QByteArray &uid) { + return T(readLatest(ApplicationDomain::getTypeName(), uid)); + } + + void readEntity(const QByteArray &type, const QByteArray &uid, const std::function callback); + void readEntity(const QByteArray &type, const QByteArray &uid, const std::function callback); + ApplicationDomain::ApplicationDomainType readEntity(const QByteArray &type, const QByteArray &key); + + template + T readEntity(const QByteArray &key) { + return T(readEntity(ApplicationDomain::getTypeName(), key)); + } + + + void readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function callback); + void readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function callback); + ApplicationDomain::ApplicationDomainType readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision); + + template + T readPrevious(const QByteArray &uid, qint64 revision) { + return T(readPrevious(ApplicationDomain::getTypeName(), uid, revision)); + } + + void readAllUids(const QByteArray &type, const std::function callback); + + void readAll(const QByteArray &type, const std::function &callback); + + template + void readAll(const std::function &callback) { + return readAll(ApplicationDomain::getTypeName(), [&](const ApplicationDomain::ApplicationDomainType &entity) { + callback(T(entity)); + }); + } + + void readRevisions(qint64 baseRevision, const QByteArray &type, const std::function &callback); + + bool contains(const QByteArray &type, const QByteArray &uid); + + qint64 maxRevision(); + +private: + class Private; + const QSharedPointer d; +}; + +} +} -- cgit v1.2.3