From 4a14a6fade947aa830d3f21598a4a6ba7316b933 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 19 Sep 2016 18:55:21 +0200 Subject: Refactored the query part of the entity reader into DataStoreQuery. DataStoreQuery now encapsulates the low-level query that operates directly on the storage. It no longer has access to the resource buffers, and is instantiated by the type implementation, so we can specialize the query alogorithm per type, but not per resource. This will allow us to implement the threading queries for the mailtype. --- common/CMakeLists.txt | 1 + common/datastorequery.cpp | 246 +++++++++++++++++++++++++++++++++++++++++++++ common/datastorequery.h | 59 +++++++++++ common/domain/event.cpp | 13 +++ common/domain/event.h | 2 + common/domain/folder.cpp | 11 ++ common/domain/folder.h | 2 + common/domain/mail.cpp | 13 +++ common/domain/mail.h | 2 + common/entitybuffer.cpp | 14 ++- common/entitybuffer.h | 6 +- common/entityreader.cpp | 250 +++++++--------------------------------------- common/entityreader.h | 11 -- common/resultset.cpp | 6 +- common/resultset.h | 7 +- 15 files changed, 409 insertions(+), 234 deletions(-) create mode 100644 common/datastorequery.cpp create mode 100644 common/datastorequery.h diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 5eb15ba..0fc8d81 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -75,6 +75,7 @@ set(command_SRCS entityreader.cpp mailpreprocessor.cpp specialpurposepreprocessor.cpp + datastorequery.cpp ${storage_SRCS}) add_library(${PROJECT_NAME} SHARED ${command_SRCS}) diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp new file mode 100644 index 0000000..3237c53 --- /dev/null +++ b/common/datastorequery.cpp @@ -0,0 +1,246 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#include "datastorequery.h" + +#include "log.h" +#include "entitybuffer.h" +#include "entity_generated.h" + +using namespace Sink; + + +SINK_DEBUG_AREA("datastorequery") + +DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function getProperty) + : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) +{ + +} + +static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) +{ + // TODO use a result set with an iterator, to read values on demand + SinkTrace() << "Looking for : " << bufferType; + //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. + QSet keys; + Storage::mainDatabase(transaction, bufferType) + .scan(QByteArray(), + [&](const QByteArray &key, const QByteArray &value) -> bool { + if (keys.contains(Sink::Storage::uidFromKey(key))) { + //Not something that should persist if the replay works, so we keep a message for now. + SinkTrace() << "Multiple revisions for key: " << key; + } + keys << Sink::Storage::uidFromKey(key); + return true; + }, + [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); + + SinkTrace() << "Full scan retrieved " << keys.size() << " results."; + return ResultSet(keys.toList().toVector()); +} + +ResultSet DataStoreQuery::loadInitialResultSet(QSet &remainingFilters, QByteArray &remainingSorting) +{ + if (!mQuery.ids.isEmpty()) { + return ResultSet(mQuery.ids.toVector()); + } + QSet appliedFilters; + QByteArray appliedSorting; + + auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction); + + remainingFilters = mQuery.propertyFilter.keys().toSet() - appliedFilters; + if (appliedSorting.isEmpty()) { + remainingSorting = mQuery.sortProperty; + } + + // We do a full scan if there were no indexes available to create the initial set. + if (appliedFilters.isEmpty()) { + // TODO this should be replaced by an index lookup as well + resultSet = fullScan(mTransaction, mType); + } + return resultSet; +} + +ResultSet DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision, QSet &remainingFilters) +{ + const auto bufferType = mType; + auto revisionCounter = QSharedPointer::create(baseRevision); + remainingFilters = mQuery.propertyFilter.keys().toSet(); + return ResultSet([this, bufferType, revisionCounter]() -> QByteArray { + const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); + // Spit out the revision keys one by one. + while (*revisionCounter <= topRevision) { + const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); + const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); + // SinkTrace() << "Revision" << *revisionCounter << type << uid; + Q_ASSERT(!uid.isEmpty()); + Q_ASSERT(!type.isEmpty()); + if (type != bufferType) { + // Skip revision + *revisionCounter += 1; + continue; + } + const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); + *revisionCounter += 1; + return key; + } + SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; + // We're done + return QByteArray(); + }); +} + +void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) +{ + mDb.findLatest(key, + [=](const QByteArray &key, const QByteArray &value) -> bool { + resultCallback(Sink::Storage::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); + return false; + }, + [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); +} + +QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property) +{ + return mGetProperty(entity, property); +} + +ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty) +{ + const bool sortingRequired = !sortProperty.isEmpty(); + if (initialQuery && sortingRequired) { + SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; + // Sort the complete set by reading the sort property and filling into a sorted map + auto sortedMap = QSharedPointer>::create(); + while (resultSet.next()) { + // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) + readEntity(resultSet.id(), + [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { + + const auto operation = buffer.operation(); + + // We're not interested in removals during the initial query + if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { + if (!sortProperty.isEmpty()) { + const auto sortValue = getProperty(buffer.entity(), sortProperty); + if (sortValue.type() == QVariant::DateTime) { + sortedMap->insert(QByteArray::number(std::numeric_limits::max() - sortValue.toDateTime().toTime_t()), uid); + } else { + sortedMap->insert(sortValue.toString().toLatin1(), uid); + } + } else { + sortedMap->insert(uid, uid); + } + } + }); + } + + SinkTrace() << "Sorted " << sortedMap->size() << " values."; + auto iterator = QSharedPointer>::create(*sortedMap); + ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter, initialQuery]( + std::function callback) -> bool { + if (iterator->hasNext()) { + readEntity(iterator->next().value(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { + callback(uid, buffer, Sink::Operation_Creation); + }); + return true; + } + return false; + }; + + auto skip = [iterator]() { + if (iterator->hasNext()) { + iterator->next(); + } + }; + return ResultSet(generator, skip); + } else { + auto resultSetPtr = QSharedPointer::create(resultSet); + ResultSet::ValueGenerator generator = [this, resultSetPtr, filter, initialQuery](const ResultSet::Callback &callback) -> bool { + if (resultSetPtr->next()) { + SinkTrace() << "Reading the next value: " << resultSetPtr->id(); + // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) + readEntity(resultSetPtr->id(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { + const auto operation = buffer.operation(); + if (initialQuery) { + // We're not interested in removals during the initial query + if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { + // In the initial set every entity is new + callback(uid, buffer, Sink::Operation_Creation); + } + } else { + // Always remove removals, they probably don't match due to non-available properties + if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { + // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) + callback(uid, buffer, operation); + } + } + }); + return true; + } + return false; + }; + auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; + return ResultSet(generator, skip); + } +} + + +DataStoreQuery::FilterFunction DataStoreQuery::getFilter(const QSet &remainingFilters) +{ + auto query = mQuery; + return [this, remainingFilters, query](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool { + if (!query.ids.isEmpty()) { + if (!query.ids.contains(uid)) { + SinkTrace() << "Filter by uid: " << uid; + return false; + } + } + for (const auto &filterProperty : remainingFilters) { + const auto property = getProperty(entity.entity(), filterProperty); + const auto comparator = query.propertyFilter.value(filterProperty); + if (!comparator.matches(property)) { + SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; + return false; + } + } + return true; + }; +} + +ResultSet DataStoreQuery::update(qint64 baseRevision) +{ + SinkTrace() << "Executing query update"; + QSet remainingFilters; + QByteArray remainingSorting; + auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters); + auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), false, remainingSorting); + return filteredSet; +} + +ResultSet DataStoreQuery::execute() +{ + SinkTrace() << "Executing query"; + QSet remainingFilters; + QByteArray remainingSorting; + auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting); + auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), true, remainingSorting); + return filteredSet; +} diff --git a/common/datastorequery.h b/common/datastorequery.h new file mode 100644 index 0000000..cf9d9e2 --- /dev/null +++ b/common/datastorequery.h @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#pragma once + +#include "query.h" +#include "storage.h" +#include "resultset.h" +#include "typeindex.h" +#include "query.h" +#include "entitybuffer.h" + +class DataStoreQuery { +public: + DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function getProperty); + ResultSet execute(); + ResultSet update(qint64 baseRevision); + +private: + + typedef std::function FilterFunction; + typedef std::function BufferCallback; + + QVariant getProperty(const Sink::Entity &entity, const QByteArray &property); + + void readEntity(const QByteArray &key, const BufferCallback &resultCallback); + + ResultSet loadInitialResultSet(QSet &remainingFilters, QByteArray &remainingSorting); + ResultSet loadIncrementalResultSet(qint64 baseRevision, QSet &remainingFilters); + + ResultSet filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty); + FilterFunction getFilter(const QSet &remainingFilters); + + Sink::Query mQuery; + Sink::Storage::Transaction &mTransaction; + const QByteArray mType; + TypeIndex &mTypeIndex; + Sink::Storage::NamedDatabase mDb; + std::function mGetProperty; +}; + + + + diff --git a/common/domain/event.cpp b/common/domain/event.cpp index 0909bf1..dfbcb61 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp @@ -32,6 +32,8 @@ #include "../query.h" #include "../definitions.h" #include "../typeindex.h" +#include "entitybuffer.h" +#include "entity_generated.h" #include "event_generated.h" @@ -84,3 +86,14 @@ QSharedPointer::BufferBuilder> > T propertyMapper->addMapping(&BufferBuilder::add_attachment); return propertyMapper; } + +DataStoreQuery TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) +{ + + auto mapper = initializeReadPropertyMapper(); + return DataStoreQuery(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { + + const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); + return mapper->getProperty(property, localBuffer); + }); +} diff --git a/common/domain/event.h b/common/domain/event.h index 5315566..4ac572c 100644 --- a/common/domain/event.h +++ b/common/domain/event.h @@ -21,6 +21,7 @@ #include "applicationdomaintype.h" #include "storage.h" +#include "datastorequery.h" class ResultSet; class QByteArray; @@ -50,6 +51,7 @@ public: typedef Sink::ApplicationDomain::Buffer::Event Buffer; typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; static QSet indexedProperties(); + static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); /** * Returns the potential result set based on the indexes. * diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp index ddb0c10..6d487b1 100644 --- a/common/domain/folder.cpp +++ b/common/domain/folder.cpp @@ -32,6 +32,8 @@ #include "../query.h" #include "../definitions.h" #include "../typeindex.h" +#include "entitybuffer.h" +#include "entity_generated.h" #include "folder_generated.h" @@ -88,3 +90,12 @@ QSharedPointer::BufferBuilder> > propertyMapper->addMapping(&BufferBuilder::add_specialpurpose); return propertyMapper; } + +DataStoreQuery TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) +{ + auto mapper = initializeReadPropertyMapper(); + return DataStoreQuery(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { + const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); + return mapper->getProperty(property, localBuffer); + }); +} diff --git a/common/domain/folder.h b/common/domain/folder.h index 6e066e1..77edc8a 100644 --- a/common/domain/folder.h +++ b/common/domain/folder.h @@ -21,6 +21,7 @@ #include "applicationdomaintype.h" #include "storage.h" +#include "datastorequery.h" class ResultSet; class QByteArray; @@ -44,6 +45,7 @@ class TypeImplementation { public: typedef Sink::ApplicationDomain::Buffer::Folder Buffer; typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; + static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); static QSet indexedProperties(); static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp index 13e1305..bb5ad58 100644 --- a/common/domain/mail.cpp +++ b/common/domain/mail.cpp @@ -32,6 +32,8 @@ #include "../query.h" #include "../definitions.h" #include "../typeindex.h" +#include "entitybuffer.h" +#include "entity_generated.h" #include "mail_generated.h" @@ -110,3 +112,14 @@ QSharedPointer::BufferBuilder> > Ty propertyMapper->addMapping(&BufferBuilder::add_sent); return propertyMapper; } + +DataStoreQuery TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) +{ + auto mapper = initializeReadPropertyMapper(); + return DataStoreQuery(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { + + const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); + return mapper->getProperty(property, localBuffer); + }); +} + diff --git a/common/domain/mail.h b/common/domain/mail.h index ff169dd..d6af9c5 100644 --- a/common/domain/mail.h +++ b/common/domain/mail.h @@ -21,6 +21,7 @@ #include "applicationdomaintype.h" #include "storage.h" +#include "datastorequery.h" class ResultSet; class QByteArray; @@ -44,6 +45,7 @@ class TypeImplementation { public: typedef Sink::ApplicationDomain::Buffer::Mail Buffer; typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; + static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); static QSet indexedProperties(); /** * Returns the potential result set based on the indexes. diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp index 950bc46..32583cc 100644 --- a/common/entitybuffer.cpp +++ b/common/entitybuffer.cpp @@ -26,7 +26,7 @@ bool EntityBuffer::isValid() const return mEntity; } -const Sink::Entity &EntityBuffer::entity() +const Sink::Entity &EntityBuffer::entity() const { Q_ASSERT(mEntity); return *mEntity; @@ -84,3 +84,15 @@ void EntityBuffer::assembleEntityBuffer( auto entity = Sink::CreateEntity(fbb, metadata, resource, local); Sink::FinishEntityBuffer(fbb, entity); } + +Sink::Operation EntityBuffer::operation() const +{ + const auto metadataBuffer = readBuffer(mEntity->metadata()); + return metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; +} + +qint64 EntityBuffer::revision() const +{ + const auto metadataBuffer = readBuffer(mEntity->metadata()); + return metadataBuffer ? metadataBuffer->revision() : -1; +} diff --git a/common/entitybuffer.h b/common/entitybuffer.h index 866a7d0..4162605 100644 --- a/common/entitybuffer.h +++ b/common/entitybuffer.h @@ -3,6 +3,7 @@ #include "sink_export.h" #include #include +#include "metadata_generated.h" #include namespace Sink { @@ -16,9 +17,12 @@ public: const uint8_t *resourceBuffer(); const uint8_t *metadataBuffer(); const uint8_t *localBuffer(); - const Entity &entity(); + const Entity &entity() const; bool isValid() const; + Sink::Operation operation() const; + qint64 revision() const; + static void extractResourceBuffer(void *dataValue, int dataSize, const std::function &handler); /* * TODO: Ideally we would be passing references to vectors in the same bufferbuilder, to avoid needlessly copying data. diff --git a/common/entityreader.cpp b/common/entityreader.cpp index 01c25d2..faa154b 100644 --- a/common/entityreader.cpp +++ b/common/entityreader.cpp @@ -150,205 +150,41 @@ void EntityReader::query(const Sink::Query &query, const std::functi }); } -template -void EntityReader::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, - const std::function &resultCallback) -{ - db.findLatest(key, - [=](const QByteArray &key, const QByteArray &value) -> bool { - Sink::EntityBuffer buffer(value.data(), value.size()); - const Sink::Entity &entity = buffer.entity(); - const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; - auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); - Q_ASSERT(adaptor); - resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); - return false; - }, - [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); -} - -static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) -{ - // TODO use a result set with an iterator, to read values on demand - SinkTrace() << "Looking for : " << bufferType; - //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. - QSet keys; - Storage::mainDatabase(transaction, bufferType) - .scan(QByteArray(), - [&](const QByteArray &key, const QByteArray &value) -> bool { - if (keys.contains(Sink::Storage::uidFromKey(key))) { - //Not something that should persist if the replay works, so we keep a message for now. - SinkTrace() << "Multiple revisions for key: " << key; - } - keys << Sink::Storage::uidFromKey(key); - return true; - }, - [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); - - SinkTrace() << "Full scan retrieved " << keys.size() << " results."; - return ResultSet(keys.toList().toVector()); -} - -template -ResultSet EntityReader::loadInitialResultSet(const Sink::Query &query, QSet &remainingFilters, QByteArray &remainingSorting) -{ - if (!query.ids.isEmpty()) { - return ResultSet(query.ids.toVector()); - } - QSet appliedFilters; - QByteArray appliedSorting; - auto resultSet = Sink::ApplicationDomain::TypeImplementation::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, appliedSorting, mTransaction); - remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; - if (appliedSorting.isEmpty()) { - remainingSorting = query.sortProperty; - } +/* template */ +/* void EntityReader::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, */ +/* const std::function &resultCallback) */ +/* { */ +/* db.findLatest(key, */ +/* [=](const QByteArray &key, const QByteArray &value) -> bool { */ +/* Sink::EntityBuffer buffer(value.data(), value.size()); */ +/* const Sink::Entity &entity = buffer.entity(); */ +/* const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); */ +/* const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; */ +/* const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; */ +/* auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); */ +/* Q_ASSERT(adaptor); */ +/* resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); */ +/* return false; */ +/* }, */ +/* [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); */ +/* } */ - // We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - // TODO this should be replaced by an index lookup as well - resultSet = fullScan(mTransaction, ApplicationDomain::getTypeName()); - } - return resultSet; -} - -template -ResultSet EntityReader::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet &remainingFilters) -{ - const auto bufferType = ApplicationDomain::getTypeName(); - auto revisionCounter = QSharedPointer::create(baseRevision); - remainingFilters = query.propertyFilter.keys().toSet(); - return ResultSet([this, bufferType, revisionCounter]() -> QByteArray { - const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); - // Spit out the revision keys one by one. - while (*revisionCounter <= topRevision) { - const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); - const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); - // SinkTrace() << "Revision" << *revisionCounter << type << uid; - Q_ASSERT(!uid.isEmpty()); - Q_ASSERT(!type.isEmpty()); - if (type != bufferType) { - // Skip revision - *revisionCounter += 1; - continue; - } - const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); - *revisionCounter += 1; - return key; - } - SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; - // We're done - return QByteArray(); - }); -} - -template -ResultSet EntityReader::filterAndSortSet(ResultSet &resultSet, const std::function &filter, - const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) -{ - const bool sortingRequired = !sortProperty.isEmpty(); - if (initialQuery && sortingRequired) { - SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; - // Sort the complete set by reading the sort property and filling into a sorted map - auto sortedMap = QSharedPointer>::create(); - while (resultSet.next()) { - // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(db, resultSet.id(), - [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { - // We're not interested in removals during the initial query - if ((operation != Sink::Operation_Removal) && filter(domainObject)) { - if (!sortProperty.isEmpty()) { - const auto sortValue = domainObject->getProperty(sortProperty); - if (sortValue.type() == QVariant::DateTime) { - sortedMap->insert(QByteArray::number(std::numeric_limits::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier()); - } else { - sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier()); - } - } else { - sortedMap->insert(domainObject->identifier(), domainObject->identifier()); - } - } - }); - } - - SinkTrace() << "Sorted " << sortedMap->size() << " values."; - auto iterator = QSharedPointer>::create(*sortedMap); - ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery]( - std::function callback) -> bool { - if (iterator->hasNext()) { - readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, - Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); }); - return true; - } - return false; - }; - auto skip = [iterator]() { - if (iterator->hasNext()) { - iterator->next(); - } - }; - return ResultSet(generator, skip); - } else { - auto resultSetPtr = QSharedPointer::create(resultSet); - ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery]( - std::function callback) -> bool { - if (resultSetPtr->next()) { - // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { - if (initialQuery) { - // We're not interested in removals during the initial query - if ((operation != Sink::Operation_Removal) && filter(domainObject)) { - // In the initial set every entity is new - callback(domainObject, Sink::Operation_Creation); - } - } else { - // Always remove removals, they probably don't match due to non-available properties - if ((operation == Sink::Operation_Removal) || filter(domainObject)) { - // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) - callback(domainObject, operation); - } - } - }); - return true; - } - return false; - }; - auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; - return ResultSet(generator, skip); - } -} template -QPair EntityReader::load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const std::function &callback) +QPair EntityReader::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function &callback) { QTime time; time.start(); - auto db = Storage::mainDatabase(mTransaction, ApplicationDomain::getTypeName()); + auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, mTransaction); + auto resultSet = preparedQuery.execute(); - QSet remainingFilters; - QByteArray remainingSorting; - auto resultSet = baseSetRetriever(remainingFilters, remainingSorting); - SinkTrace() << "Base set retrieved. " << Log::TraceTime(time.elapsed()); - auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting); SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); - auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback); - // SinkTrace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed()); - return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); -} + auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); -template -QPair EntityReader::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function &callback) -{ - QTime time; - time.start(); - auto revisionAndReplayedEntities = load(query, [&](QSet &remainingFilters, QByteArray &remainingSorting) -> ResultSet { - return loadInitialResultSet(query, remainingFilters, remainingSorting); - }, true, offset, batchsize, callback); SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); - return revisionAndReplayedEntities; + return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); } template @@ -357,33 +193,15 @@ QPair EntityReader::executeIncrementalQuery(const Si QTime time; time.start(); const qint64 baseRevision = lastRevision + 1; - auto revisionAndReplayedEntities = load(query, [&](QSet &remainingFilters, QByteArray &remainingSorting) -> ResultSet { - return loadIncrementalResultSet(baseRevision, query, remainingFilters); - }, false, 0, 0, callback); - SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); - return revisionAndReplayedEntities; -} -template -std::function -EntityReader::getFilter(const QSet remainingFilters, const Sink::Query &query) -{ - return [this, remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { - if (!query.ids.isEmpty()) { - if (!query.ids.contains(domainObject->identifier())) { - return false; - } - } - for (const auto &filterProperty : remainingFilters) { - const auto property = domainObject->getProperty(filterProperty); - const auto comparator = query.propertyFilter.value(filterProperty); - if (!comparator.matches(property)) { - SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; - return false; - } - } - return true; - }; + auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, mTransaction); + auto resultSet = preparedQuery.update(baseRevision); + + SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); + auto replayedEntities = replaySet(resultSet, 0, 0, callback); + + SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); + return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); } template @@ -394,9 +212,11 @@ qint64 EntityReader::replaySet(ResultSet &resultSet, int offset, int int counter = 0; while (!batchSize || (counter < batchSize)) { const bool ret = - resultSet.next([&counter, callback](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { + resultSet.next([this, &counter, callback](const QByteArray &uid, const Sink::EntityBuffer &value, Sink::Operation operation) -> bool { counter++; - return callback(value.staticCast(), operation); + auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(value.entity()); + Q_ASSERT(adaptor); + return callback(QSharedPointer::create(mResourceInstanceIdentifier, uid, value.revision(), adaptor), operation); }); if (!ret) { break; diff --git a/common/entityreader.h b/common/entityreader.h index a479679..f216453 100644 --- a/common/entityreader.h +++ b/common/entityreader.h @@ -89,17 +89,6 @@ public: private: qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); - void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, - const std::function &resultCallback); - - ResultSet loadInitialResultSet(const Sink::Query &query, QSet &remainingFilters, QByteArray &remainingSorting); - ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet &remainingFilters); - - ResultSet filterAndSortSet(ResultSet &resultSet, const std::function &filter, - const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); - std::function getFilter(const QSet remainingFilters, const Sink::Query &query); - QPair load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const ResultCallback &callback); - private: QByteArray mResourceInstanceIdentifier; Sink::Storage::Transaction &mTransaction; diff --git a/common/resultset.cpp b/common/resultset.cpp index 293035b..51914e9 100644 --- a/common/resultset.cpp +++ b/common/resultset.cpp @@ -18,7 +18,7 @@ */ #include "resultset.h" -#include "common/log.h" +#include "log.h" ResultSet::ResultSet() : mIt(nullptr) { @@ -78,12 +78,12 @@ bool ResultSet::next() return true; } } else { - next([](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation) { return false; }); + next([](const QByteArray &, const Sink::EntityBuffer &, Sink::Operation) { return false; }); } return false; } -bool ResultSet::next(std::function callback) +bool ResultSet::next(const Callback &callback) { Q_ASSERT(mValueGenerator); return mValueGenerator(callback); diff --git a/common/resultset.h b/common/resultset.h index 88f7055..4e934fc 100644 --- a/common/resultset.h +++ b/common/resultset.h @@ -20,8 +20,8 @@ #include #include -#include "domain/applicationdomaintype.h" #include "metadata_generated.h" +#include "entitybuffer.h" /* * An iterator to a result set. @@ -31,7 +31,8 @@ class ResultSet { public: - typedef std::function)> ValueGenerator; + typedef std::function Callback; + typedef std::function ValueGenerator; typedef std::function IdGenerator; typedef std::function SkipValue; @@ -42,7 +43,7 @@ public: ResultSet(const ResultSet &other); bool next(); - bool next(std::function callback); + bool next(const Callback &callback); void skip(int number); -- cgit v1.2.3