From 555c373a0c4dfe386dcd2c88ae9548d95e307409 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 9 Jun 2016 17:27:29 +0200 Subject: Moved query logic to EntityReader to make it reusable in the resource. --- common/CMakeLists.txt | 1 + common/entityreader.cpp | 412 ++++++++++++++++++++++++++++++++++++++++++++++++ common/entityreader.h | 110 +++++++++++++ common/entitystore.cpp | 52 ------ common/entitystore.h | 37 +---- common/query.h | 40 +++-- common/queryrunner.cpp | 317 +++++-------------------------------- common/synchronizer.cpp | 2 +- 8 files changed, 594 insertions(+), 377 deletions(-) create mode 100644 common/entityreader.cpp create mode 100644 common/entityreader.h (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 3c6a083..752e4e4 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -72,6 +72,7 @@ set(command_SRCS entitystore.cpp remoteidmap.cpp sourcewriteback.cpp + entityreader.cpp ${storage_SRCS}) add_library(${PROJECT_NAME} SHARED ${command_SRCS}) diff --git a/common/entityreader.cpp b/common/entityreader.cpp new file mode 100644 index 0000000..b29b2a3 --- /dev/null +++ b/common/entityreader.cpp @@ -0,0 +1,412 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#include "entityreader.h" + +#include "resultset.h" +#include "storage.h" +#include "query.h" + +using namespace Sink; + +QSharedPointer EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) +{ + QSharedPointer current; + db.findLatest(uid, + [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { + Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + Warning() << "Read invalid buffer from disk"; + } else { + Trace() << "Found value " << key; + current = adaptorFactory.createAdaptor(buffer.entity()); + retrievedRevision = Sink::Storage::revisionFromKey(key); + } + return false; + }, + [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); + return current; +} + +QSharedPointer EntityReaderUtils::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) +{ + QSharedPointer current; + db.scan(key, + [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { + Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + Warning() << "Read invalid buffer from disk"; + } else { + current = adaptorFactory.createAdaptor(buffer.entity()); + retrievedRevision = Sink::Storage::revisionFromKey(key); + } + return false; + }, + [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); + return current; +} + +QSharedPointer EntityReaderUtils::getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) +{ + QSharedPointer current; + qint64 latestRevision = 0; + db.scan(uid, + [¤t, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { + auto foundRevision = Sink::Storage::revisionFromKey(key); + if (foundRevision < revision && foundRevision > latestRevision) { + latestRevision = foundRevision; + } + return true; + }, + [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }, true); + return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); +} + +template +EntityReader::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) + : mResourceInstanceIdentifier(resourceInstanceIdentifier), + mTransaction(transaction), + mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory(resourceType)), + mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) +{ + +} + +template +EntityReader::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) + : mResourceInstanceIdentifier(resourceInstanceIdentifier), + mTransaction(transaction), + mDomainTypeAdaptorFactory(domainTypeAdaptorFactory) +{ + +} + +template +DomainType EntityReader::read(const QByteArray &identifier) const +{ + auto typeName = ApplicationDomain::getTypeName(); + auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); + qint64 retrievedRevision = 0; + auto bufferAdaptor = EntityReaderUtils::getLatest(mainDatabase, identifier, mDomainTypeAdaptorFactory, retrievedRevision); + if (!bufferAdaptor) { + return DomainType(); + } + return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); +} + +template +DomainType EntityReader::readFromKey(const QByteArray &key) const +{ + auto typeName = ApplicationDomain::getTypeName(); + auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); + qint64 retrievedRevision = 0; + auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); + const auto identifier = Storage::uidFromKey(key); + if (!bufferAdaptor) { + return DomainType(); + } + return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); +} + +template +DomainType EntityReader::readPrevious(const QByteArray &uid, qint64 revision) const +{ + auto typeName = ApplicationDomain::getTypeName(); + auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); + qint64 retrievedRevision = 0; + auto bufferAdaptor = EntityReaderUtils::getPrevious(mainDatabase, uid, revision, mDomainTypeAdaptorFactory, retrievedRevision); + if (!bufferAdaptor) { + return DomainType(); + } + return DomainType(mResourceInstanceIdentifier, uid, retrievedRevision, bufferAdaptor); +} + +template +void EntityReader::query(const Sink::Query &query, const std::function &callback) +{ + executeInitialQuery(query, 0, 0, + [&callback](const typename DomainType::Ptr &value, Sink::Operation operation) -> bool { + Q_ASSERT(operation == Sink::Operation_Creation); + return callback(*value); + }); +} + +template +void EntityReader::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, + const std::function &resultCallback) +{ + db.findLatest(key, + [=](const QByteArray &key, const QByteArray &value) -> bool { + Sink::EntityBuffer buffer(value.data(), value.size()); + const Sink::Entity &entity = buffer.entity(); + const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); + const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; + auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); + resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); + return false; + }, + [&](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message << key; }); +} + +static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) +{ + // TODO use a result set with an iterator, to read values on demand + Trace() << "Looking for : " << bufferType; + //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. + QSet keys; + Storage::mainDatabase(transaction, bufferType) + .scan(QByteArray(), + [&](const QByteArray &key, const QByteArray &value) -> bool { + if (keys.contains(Sink::Storage::uidFromKey(key))) { + //Not something that should persist if the replay works, so we keep a message for now. + Trace() << "Multiple revisions for key: " << key; + } + keys << Sink::Storage::uidFromKey(key); + return true; + }, + [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; }); + + Trace() << "Full scan retrieved " << keys.size() << " results."; + return ResultSet(keys.toList().toVector()); +} + +template +ResultSet EntityReader::loadInitialResultSet(const Sink::Query &query, QSet &remainingFilters, QByteArray &remainingSorting) +{ + if (!query.ids.isEmpty()) { + return ResultSet(query.ids.toVector()); + } + QSet appliedFilters; + QByteArray appliedSorting; + auto resultSet = Sink::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, appliedSorting, mTransaction); + remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + if (appliedSorting.isEmpty()) { + remainingSorting = query.sortProperty; + } + + // We do a full scan if there were no indexes available to create the initial set. + if (appliedFilters.isEmpty()) { + // TODO this should be replaced by an index lookup as well + resultSet = fullScan(mTransaction, ApplicationDomain::getTypeName()); + } + return resultSet; +} + +template +ResultSet EntityReader::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet &remainingFilters) +{ + const auto bufferType = ApplicationDomain::getTypeName(); + auto revisionCounter = QSharedPointer::create(baseRevision); + remainingFilters = query.propertyFilter.keys().toSet(); + return ResultSet([this, bufferType, revisionCounter]() -> QByteArray { + const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); + // Spit out the revision keys one by one. + while (*revisionCounter <= topRevision) { + const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); + const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); + // Trace() << "Revision" << *revisionCounter << type << uid; + Q_ASSERT(!uid.isEmpty()); + Q_ASSERT(!type.isEmpty()); + if (type != bufferType) { + // Skip revision + *revisionCounter += 1; + continue; + } + const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); + *revisionCounter += 1; + return key; + } + Trace() << "Finished reading incremental result set:" << *revisionCounter; + // We're done + return QByteArray(); + }); +} + +template +ResultSet EntityReader::filterAndSortSet(ResultSet &resultSet, const std::function &filter, + const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) +{ + const bool sortingRequired = !sortProperty.isEmpty(); + if (initialQuery && sortingRequired) { + Trace() << "Sorting the resultset in memory according to property: " << sortProperty; + // Sort the complete set by reading the sort property and filling into a sorted map + auto sortedMap = QSharedPointer>::create(); + while (resultSet.next()) { + // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) + readEntity(db, resultSet.id(), + [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { + // We're not interested in removals during the initial query + if ((operation != Sink::Operation_Removal) && filter(domainObject)) { + if (!sortProperty.isEmpty()) { + const auto sortValue = domainObject->getProperty(sortProperty); + if (sortValue.type() == QVariant::DateTime) { + sortedMap->insert(QByteArray::number(std::numeric_limits::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier()); + } else { + sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier()); + } + } else { + sortedMap->insert(domainObject->identifier(), domainObject->identifier()); + } + } + }); + } + + Trace() << "Sorted " << sortedMap->size() << " values."; + auto iterator = QSharedPointer>::create(*sortedMap); + ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery]( + std::function callback) -> bool { + if (iterator->hasNext()) { + readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, + Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); }); + return true; + } + return false; + }; + + auto skip = [iterator]() { + if (iterator->hasNext()) { + iterator->next(); + } + }; + return ResultSet(generator, skip); + } else { + auto resultSetPtr = QSharedPointer::create(resultSet); + ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery]( + std::function callback) -> bool { + if (resultSetPtr->next()) { + // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) + readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { + if (initialQuery) { + // We're not interested in removals during the initial query + if ((operation != Sink::Operation_Removal) && filter(domainObject)) { + // In the initial set every entity is new + callback(domainObject, Sink::Operation_Creation); + } + } else { + // Always remove removals, they probably don't match due to non-available properties + if ((operation == Sink::Operation_Removal) || filter(domainObject)) { + // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) + callback(domainObject, operation); + } + } + }); + return true; + } + return false; + }; + auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; + return ResultSet(generator, skip); + } +} + +template +QPair EntityReader::load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const std::function &callback) +{ + QTime time; + time.start(); + + auto db = Storage::mainDatabase(mTransaction, ApplicationDomain::getTypeName()); + + QSet remainingFilters; + QByteArray remainingSorting; + auto resultSet = baseSetRetriever(remainingFilters, remainingSorting); + Trace() << "Base set retrieved. " << Log::TraceTime(time.elapsed()); + auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting); + Trace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); + auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback); + // Trace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed()); + return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); +} + +template +QPair EntityReader::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function &callback) +{ + QTime time; + time.start(); + auto revisionAndReplayedEntities = load(query, [&](QSet &remainingFilters, QByteArray &remainingSorting) -> ResultSet { + return loadInitialResultSet(query, remainingFilters, remainingSorting); + }, true, offset, batchsize, callback); + Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); + return revisionAndReplayedEntities; +} + +template +QPair EntityReader::executeIncrementalQuery(const Sink::Query &query, qint64 lastRevision, const std::function &callback) +{ + QTime time; + time.start(); + const qint64 baseRevision = lastRevision + 1; + auto revisionAndReplayedEntities = load(query, [&](QSet &remainingFilters, QByteArray &remainingSorting) -> ResultSet { + return loadIncrementalResultSet(baseRevision, query, remainingFilters); + }, false, 0, 0, callback); + Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); + return revisionAndReplayedEntities; +} + +template +std::function +EntityReader::getFilter(const QSet remainingFilters, const Sink::Query &query) +{ + return [this, remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { + if (!query.ids.isEmpty()) { + if (!query.ids.contains(domainObject->identifier())) { + return false; + } + } + for (const auto &filterProperty : remainingFilters) { + const auto property = domainObject->getProperty(filterProperty); + if (property.isValid()) { + const auto comparator = query.propertyFilter.value(filterProperty); + if (!comparator.matches(property)) { + Trace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; + return false; + } + } else { + Warning() << "Ignored property filter because value is invalid: " << filterProperty; + } + } + return true; + }; +} + +template +qint64 EntityReader::replaySet(ResultSet &resultSet, int offset, int batchSize, const std::function &callback) +{ + Trace() << "Skipping over " << offset << " results"; + resultSet.skip(offset); + int counter = 0; + while (!batchSize || (counter < batchSize)) { + const bool ret = + resultSet.next([&counter, callback](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { + counter++; + return callback(value.staticCast(), operation); + }); + if (!ret) { + break; + } + }; + Trace() << "Replayed " << counter << " results." + << "Limit " << batchSize; + return counter; +} + +template class Sink::EntityReader; +template class Sink::EntityReader; +template class Sink::EntityReader; diff --git a/common/entityreader.h b/common/entityreader.h new file mode 100644 index 0000000..a479679 --- /dev/null +++ b/common/entityreader.h @@ -0,0 +1,110 @@ + +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#pragma once + +#include "sink_export.h" +#include + +#include "storage.h" +#include "resultprovider.h" +#include "adaptorfactoryregistry.h" + +namespace Sink { + +namespace EntityReaderUtils { + SINK_EXPORT QSharedPointer getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); + SINK_EXPORT QSharedPointer get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); + SINK_EXPORT QSharedPointer getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); +}; + +/** + * A synchronous interface to read entities from the storage. + * + * All callbacks will be called before the end of the function. + * The caller must ensure passed in references remain valid for the lifetime of the object. + * + * This class is meaent to be instantiated temporarily during reads on the stack. + * + * Note that all objects returned in callbacks are only valid during the execution of the callback and may start pointing into invalid memory if shallow-copied. + */ +template +class SINK_EXPORT EntityReader +{ + typedef std::function ResultCallback; + +public: + EntityReader(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); + EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction); + + /** + * Reads the latest revision of an entity identified by @param uid + */ + DomainType read(const QByteArray &uid) const; + + /** + * Reads the revision of the entity identified by @param key (uid + revision) + */ + DomainType readFromKey(const QByteArray &key) const; + + /** + * Reads the (revision - 1) of an entity identified by @param uid + */ + DomainType readPrevious(const QByteArray &uid, qint64 revision) const; + + /** + * Reads all entities that match @param query. + */ + void query(const Sink::Query &query, const std::function &callback); + + /** + * Returns all entities that match @param query. + * + * @param offset and @param batchsize can be used to return paginated results. + */ + QPair executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const ResultCallback &callback); + + /** + * Returns all changed entities that match @param query starting from @param lastRevision + */ + QPair executeIncrementalQuery(const Sink::Query &query, qint64 lastRevision, const ResultCallback &callback); + +private: + qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); + + void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, + const std::function &resultCallback); + + ResultSet loadInitialResultSet(const Sink::Query &query, QSet &remainingFilters, QByteArray &remainingSorting); + ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet &remainingFilters); + + ResultSet filterAndSortSet(ResultSet &resultSet, const std::function &filter, + const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); + std::function getFilter(const QSet remainingFilters, const Sink::Query &query); + QPair load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const ResultCallback &callback); + +private: + QByteArray mResourceInstanceIdentifier; + Sink::Storage::Transaction &mTransaction; + std::shared_ptr mDomainTypeAdaptorFactoryPtr; + DomainTypeAdaptorFactoryInterface &mDomainTypeAdaptorFactory; +}; + +} diff --git a/common/entitystore.cpp b/common/entitystore.cpp index 5296d53..5fb213d 100644 --- a/common/entitystore.cpp +++ b/common/entitystore.cpp @@ -28,55 +28,3 @@ EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resou } -QSharedPointer EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) -{ - QSharedPointer current; - db.findLatest(uid, - [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { - Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - Warning() << "Read invalid buffer from disk"; - } else { - Trace() << "Found value " << key; - current = adaptorFactory.createAdaptor(buffer.entity()); - retrievedRevision = Sink::Storage::revisionFromKey(key); - } - return false; - }, - [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); - return current; -} - -QSharedPointer EntityStore::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) -{ - QSharedPointer current; - db.scan(key, - [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { - Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - Warning() << "Read invalid buffer from disk"; - } else { - current = adaptorFactory.createAdaptor(buffer.entity()); - retrievedRevision = Sink::Storage::revisionFromKey(key); - } - return false; - }, - [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); - return current; -} - -QSharedPointer EntityStore::getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) -{ - QSharedPointer current; - qint64 latestRevision = 0; - db.scan(uid, - [¤t, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { - auto foundRevision = Sink::Storage::revisionFromKey(key); - if (foundRevision < revision && foundRevision > latestRevision) { - latestRevision = foundRevision; - } - return true; - }, - [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }, true); - return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); -} diff --git a/common/entitystore.h b/common/entitystore.h index 24f43b1..b795b26 100644 --- a/common/entitystore.h +++ b/common/entitystore.h @@ -24,6 +24,7 @@ #include "storage.h" #include "adaptorfactoryregistry.h" +#include "entityreader.h" namespace Sink { @@ -35,48 +36,24 @@ public: template T read(const QByteArray &identifier) const { - auto typeName = ApplicationDomain::getTypeName(); - auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); - qint64 retrievedRevision = 0; - auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType), retrievedRevision); - if (!bufferAdaptor) { - return T(); - } - return T(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); + EntityReader reader(mResourceType, mResourceInstanceIdentifier, mTransaction); + return reader.read(identifier); } template T readFromKey(const QByteArray &key) const { - auto typeName = ApplicationDomain::getTypeName(); - auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); - qint64 retrievedRevision = 0; - auto bufferAdaptor = get(mainDatabase, key, *Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType), retrievedRevision); - const auto identifier = Storage::uidFromKey(key); - if (!bufferAdaptor) { - return T(); - } - return T(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); + EntityReader reader(mResourceType, mResourceInstanceIdentifier, mTransaction); + return reader.readFromKey(key); } template T readPrevious(const QByteArray &uid, qint64 revision) const { - auto typeName = ApplicationDomain::getTypeName(); - auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); - qint64 retrievedRevision = 0; - auto bufferAdaptor = getPrevious(mainDatabase, uid, revision, *Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType), retrievedRevision); - if (!bufferAdaptor) { - return T(); - } - return T(mResourceInstanceIdentifier, uid, retrievedRevision, bufferAdaptor); + EntityReader reader(mResourceType, mResourceInstanceIdentifier, mTransaction); + return reader.readPrevious(uid, revision); } - - - static QSharedPointer getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); - static QSharedPointer get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); - static QSharedPointer getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); private: QByteArray mResourceType; QByteArray mResourceInstanceIdentifier; diff --git a/common/query.h b/common/query.h index 8a7edf6..717ed75 100644 --- a/common/query.h +++ b/common/query.h @@ -40,6 +40,23 @@ public: }; Q_DECLARE_FLAGS(Flags, Flag) + struct Comparator { + enum Comparators { + Invalid, + Equals, + Contains + }; + + Comparator(); + Comparator(const QVariant &v); + Comparator(const QVariant &v, Comparators c); + bool matches(const QVariant &v) const; + + QVariant value; + Comparators comparator; + }; + + static Query PropertyFilter(const QByteArray &key, const QVariant &value) { Query query; @@ -160,6 +177,13 @@ public: return *this; } + template + Query &filter(const Comparator &comparator) + { + propertyFilter.insert(T::name, comparator); + return *this; + } + template Query &filter(const ApplicationDomain::Entity &value) { @@ -199,22 +223,6 @@ public: return lhs; } - struct Comparator { - enum Comparators { - Invalid, - Equals, - Contains - }; - - Comparator(); - Comparator(const QVariant &v); - Comparator(const QVariant &v, Comparators c); - bool matches(const QVariant &v) const; - - QVariant value; - Comparators comparator; - }; - QByteArrayList resources; QByteArrayList accounts; QByteArrayList ids; diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index cb8157e..c6a6b86 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -27,6 +27,7 @@ #include "definitions.h" #include "domainadaptor.h" #include "asyncutils.h" +#include "entityreader.h" #undef DEBUG_AREA #define DEBUG_AREA "client.queryrunner" @@ -51,27 +52,13 @@ public: QPair executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize); private: - qint64 replaySet(ResultSet &resultSet, Sink::ResultProviderInterface &resultProvider, const QList &properties, int offset, int batchSize); + Storage::Transaction getTransaction(); + std::function resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider); - void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, - const std::function &resultCallback); - - ResultSet loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet &remainingFilters, QByteArray &remainingSorting); - ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet &remainingFilters); - - ResultSet filterAndSortSet(ResultSet &resultSet, const std::function &filter, - const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); - std::function getFilter(const QSet remainingFilters, const Sink::Query &query); - QPair load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, - Sink::ResultProviderInterface &resultProvider, bool initialQuery, int offset, int batchSize); - -private: QueryRunnerBase::ResultTransformation mResultTransformation; DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; QByteArray mResourceInstanceIdentifier; - QByteArray mBufferType; QByteArray mId; //Used for identification in debug output - Sink::Query mQuery; }; #undef Trace @@ -147,35 +134,13 @@ typename Sink::ResultEmitter::Ptr QueryRunner keys; - Storage::mainDatabase(transaction, bufferType) - .scan(QByteArray(), - [&](const QByteArray &key, const QByteArray &value) -> bool { - if (keys.contains(Sink::Storage::uidFromKey(key))) { - //Not something that should persist if the replay works, so we keep a message for now. - Trace() << "Multiple revisions for key: " << key; - } - keys << Sink::Storage::uidFromKey(key); - return true; - }, - [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; }); - - Trace() << "Full scan retrieved " << keys.size() << " results."; - return ResultSet(keys.toList().toVector()); -} - #undef Trace #define Trace() Trace_area("client.queryrunner." + mId) template QueryWorker::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) - : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mBufferType(bufferType), mId(QUuid::createUuid().toByteArray()), mQuery(query) + : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mId(QUuid::createUuid().toByteArray()) { Trace() << "Starting query worker"; } @@ -187,228 +152,48 @@ QueryWorker::~QueryWorker() } template -qint64 QueryWorker::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface &resultProvider, const QList &properties, int offset, int batchSize) +std::function QueryWorker::resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) { - Trace() << "Skipping over " << offset << " results"; - resultSet.skip(offset); - int counter = 0; - while (!batchSize || (counter < batchSize)) { - const bool ret = - resultSet.next([this, &resultProvider, &counter, &properties, batchSize](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { - counter++; - auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value, properties).template staticCast(); - if (mResultTransformation) { - mResultTransformation(*valueCopy); - } - switch (operation) { - case Sink::Operation_Creation: - // Trace() << "Got creation"; - resultProvider.add(valueCopy); - break; - case Sink::Operation_Modification: - // Trace() << "Got modification"; - resultProvider.modify(valueCopy); - break; - case Sink::Operation_Removal: - // Trace() << "Got removal"; - resultProvider.remove(valueCopy); - break; - } - return true; - }); - if (!ret) { - break; + return [this, &query, &resultProvider](const typename DomainType::Ptr &domainObject, Sink::Operation operation) -> bool { + auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*domainObject, query.requestedProperties).template staticCast(); + if (mResultTransformation) { + mResultTransformation(*valueCopy); } - }; - Trace() << "Replayed " << counter << " results." - << "Limit " << batchSize; - return counter; -} - -template -void QueryWorker::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, - const std::function &resultCallback) -{ - // This only works for a 1:1 mapping of resource to domain types. - // Not i.e. for tags that are stored as flags in each entity of an imap store. - // additional properties that don't have a 1:1 mapping (such as separately stored tags), - // could be added to the adaptor. - db.findLatest(key, - [=](const QByteArray &key, const QByteArray &value) -> bool { - Sink::EntityBuffer buffer(value.data(), value.size()); - const Sink::Entity &entity = buffer.entity(); - const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; - auto adaptor = mDomainTypeAdaptorFactory->createAdaptor(entity); - resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); - return false; - }, - [&](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message << key; }); -} - -template -ResultSet QueryWorker::loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet &remainingFilters, QByteArray &remainingSorting) -{ - if (!query.ids.isEmpty()) { - return ResultSet(query.ids.toVector()); - } - QSet appliedFilters; - QByteArray appliedSorting; - auto resultSet = Sink::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, appliedSorting, transaction); - remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; - if (appliedSorting.isEmpty()) { - remainingSorting = query.sortProperty; - } - - // We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - // TODO this should be replaced by an index lookup as well - resultSet = fullScan(transaction, mBufferType); - } - return resultSet; -} - -template -ResultSet QueryWorker::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet &remainingFilters) -{ - const auto bufferType = mBufferType; - auto revisionCounter = QSharedPointer::create(baseRevision); - remainingFilters = query.propertyFilter.keys().toSet(); - return ResultSet([this, bufferType, revisionCounter, &transaction]() -> QByteArray { - const qint64 topRevision = Sink::Storage::maxRevision(transaction); - // Spit out the revision keys one by one. - while (*revisionCounter <= topRevision) { - const auto uid = Sink::Storage::getUidFromRevision(transaction, *revisionCounter); - const auto type = Sink::Storage::getTypeFromRevision(transaction, *revisionCounter); - // Trace() << "Revision" << *revisionCounter << type << uid; - Q_ASSERT(!uid.isEmpty()); - Q_ASSERT(!type.isEmpty()); - if (type != bufferType) { - // Skip revision - *revisionCounter += 1; - continue; - } - const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); - *revisionCounter += 1; - return key; + switch (operation) { + case Sink::Operation_Creation: + // Trace() << "Got creation"; + resultProvider.add(valueCopy); + break; + case Sink::Operation_Modification: + // Trace() << "Got modification"; + resultProvider.modify(valueCopy); + break; + case Sink::Operation_Removal: + // Trace() << "Got removal"; + resultProvider.remove(valueCopy); + break; } - Trace() << "Finished reading incremental result set:" << *revisionCounter; - // We're done - return QByteArray(); - }); + return true; + }; } template -ResultSet QueryWorker::filterAndSortSet(ResultSet &resultSet, const std::function &filter, - const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) +QPair QueryWorker::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) { - const bool sortingRequired = !sortProperty.isEmpty(); - if (initialQuery && sortingRequired) { - Trace() << "Sorting the resultset in memory according to property: " << sortProperty; - // Sort the complete set by reading the sort property and filling into a sorted map - auto sortedMap = QSharedPointer>::create(); - while (resultSet.next()) { - // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(db, resultSet.id(), - [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { - // We're not interested in removals during the initial query - if ((operation != Sink::Operation_Removal) && filter(domainObject)) { - if (!sortProperty.isEmpty()) { - const auto sortValue = domainObject->getProperty(sortProperty); - if (sortValue.type() == QVariant::DateTime) { - sortedMap->insert(QByteArray::number(std::numeric_limits::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier()); - } else { - sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier()); - } - } else { - sortedMap->insert(domainObject->identifier(), domainObject->identifier()); - } - } - }); - } + QTime time; + time.start(); - Trace() << "Sorted " << sortedMap->size() << " values."; - auto iterator = QSharedPointer>::create(*sortedMap); - ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery]( - std::function callback) -> bool { - if (iterator->hasNext()) { - readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, - Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); }); - return true; - } - return false; - }; - - auto skip = [iterator]() { - if (iterator->hasNext()) { - iterator->next(); - } - }; - return ResultSet(generator, skip); - } else { - auto resultSetPtr = QSharedPointer::create(resultSet); - ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery]( - std::function callback) -> bool { - if (resultSetPtr->next()) { - // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { - if (initialQuery) { - // We're not interested in removals during the initial query - if ((operation != Sink::Operation_Removal) && filter(domainObject)) { - // In the initial set every entity is new - callback(domainObject, Sink::Operation_Creation); - } - } else { - // Always remove removals, they probably don't match due to non-available properties - if ((operation == Sink::Operation_Removal) || filter(domainObject)) { - // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) - callback(domainObject, operation); - } - } - }); - return true; - } - return false; - }; - auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; - return ResultSet(generator, skip); - } -} + auto transaction = getTransaction(); -template -std::function -QueryWorker::getFilter(const QSet remainingFilters, const Sink::Query &query) -{ - return [this, remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - if (!query.ids.isEmpty()) { - if (!query.ids.contains(domainObject->identifier())) { - return false; - } - } - for (const auto &filterProperty : remainingFilters) { - const auto property = domainObject->getProperty(filterProperty); - if (property.isValid()) { - const auto comparator = query.propertyFilter.value(filterProperty); - if (!comparator.matches(property)) { - Trace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; - return false; - } - } else { - Warning() << "Ignored property filter because value is invalid: " << filterProperty; - } - } - return true; - }; + Sink::EntityReader reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); + auto revisionAndReplayedEntities = reader.executeIncrementalQuery(query, resultProvider.revision(), resultProviderCallback(query, resultProvider)); + Trace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); + return revisionAndReplayedEntities; } template -QPair QueryWorker::load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, - Sink::ResultProviderInterface &resultProvider, bool initialQuery, int offset, int batchSize) +Storage::Transaction QueryWorker::getTransaction() { - QTime time; - time.start(); - Sink::Storage::Transaction transaction; { Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); @@ -422,33 +207,7 @@ QPair QueryWorker::load(const Sink::Query &query, co Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); transaction = storage.createTransaction(Sink::Storage::ReadOnly); } - auto db = Storage::mainDatabase(transaction, mBufferType); - - QSet remainingFilters; - QByteArray remainingSorting; - auto resultSet = baseSetRetriever(transaction, remainingFilters, remainingSorting); - Trace() << "Base set retrieved. " << Log::TraceTime(time.elapsed()); - auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting); - Trace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); - auto replayedEntities = replaySet(filteredSet, resultProvider, query.requestedProperties, offset, batchSize); - Trace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed()); - resultProvider.setRevision(Sink::Storage::maxRevision(transaction)); - return qMakePair(Sink::Storage::maxRevision(transaction), replayedEntities); -} - -template -QPair QueryWorker::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) -{ - QTime time; - time.start(); - - const qint64 baseRevision = resultProvider.revision() + 1; - Trace() << "Running incremental query " << baseRevision; - auto revisionAndReplayedEntities = load(query, [&](Sink::Storage::Transaction &transaction, QSet &remainingFilters, QByteArray &remainingSorting) -> ResultSet { - return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); - }, resultProvider, false, 0, 0); - Trace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); - return revisionAndReplayedEntities; + return transaction; } template @@ -468,9 +227,11 @@ QPair QueryWorker::executeInitialQuery( modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(QVariant())); } } - auto revisionAndReplayedEntities = load(modifiedQuery, [&](Sink::Storage::Transaction &transaction, QSet &remainingFilters, QByteArray &remainingSorting) -> ResultSet { - return loadInitialResultSet(modifiedQuery, transaction, remainingFilters, remainingSorting); - }, resultProvider, true, offset, batchsize); + + auto transaction = getTransaction(); + + Sink::EntityReader reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); + auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider)); Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); resultProvider.initialResultSetComplete(parent); return revisionAndReplayedEntities; diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 1bac5d9..0314997 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -145,7 +145,7 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); } else { // modification qint64 retrievedRevision = 0; - if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) { + if (auto current = EntityReaderUtils::getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) { bool changed = false; for (const auto &property : entity.changedProperties()) { if (entity.getProperty(property) != current->getProperty(property)) { -- cgit v1.2.3