From 52ad48c8bd755a2fde249296d6017853538f478f Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 23 Sep 2016 01:35:13 +0200 Subject: A new query system --- common/datastorequery.cpp | 520 ++++++++++++++++++++++++++++---------------- common/datastorequery.h | 74 ++++++- common/domain/event.cpp | 1 + common/domain/event.h | 5 +- common/domain/folder.cpp | 1 + common/domain/folder.h | 4 +- common/domain/mail.cpp | 53 +---- common/domain/mail.h | 3 +- common/entityreader.cpp | 1 + common/mailpreprocessor.cpp | 8 +- common/modelresult.cpp | 12 +- common/query.cpp | 3 + common/typeindex.cpp | 10 +- common/typeindex.h | 2 +- 14 files changed, 435 insertions(+), 262 deletions(-) (limited to 'common') diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index cc070be..95df1a0 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -27,13 +27,173 @@ using namespace Sink; SINK_DEBUG_AREA("datastorequery") +class Source : public FilterBase { + public: + typedef QSharedPointer Ptr; + + QVector mIds; + QVector::ConstIterator mIt; + + Source (const QVector &ids, DataStoreQuery *store) + : FilterBase(store), + mIds(ids), + mIt(mIds.constBegin()) + { + + } + + virtual ~Source(){} + + virtual void skip() Q_DECL_OVERRIDE + { + if (mIt != mIds.constEnd()) { + mIt++; + } + }; + + void add(const QVector &ids) + { + mIds = ids; + mIt = mIds.constBegin(); + } + + bool next(const std::function &callback) Q_DECL_OVERRIDE + { + if (mIt == mIds.constEnd()) { + return false; + } + readEntity(*mIt, [callback](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { + callback(entityBuffer.operation(), uid, entityBuffer); + }); + mIt++; + return mIt != mIds.constEnd(); + } +}; + +class Collector : public FilterBase { +public: + typedef QSharedPointer Ptr; + + Collector(FilterBase::Ptr source, DataStoreQuery *store) + : FilterBase(source, store) + { + + } + virtual ~Collector(){} + + bool next(const std::function &callback) Q_DECL_OVERRIDE + { + return mSource->next(callback); + } +}; + +class Filter : public FilterBase { +public: + typedef QSharedPointer Ptr; + + QHash propertyFilter; + + Filter(FilterBase::Ptr source, DataStoreQuery *store) + : FilterBase(source, store) + { + + } + + virtual ~Filter(){} + + bool next(const std::function &callback) Q_DECL_OVERRIDE { + bool foundValue = false; + while(!foundValue && mSource->next([this, callback, &foundValue](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { + SinkTrace() << "Filter: " << uid << operation; + + //Always accept removals. They can't match the filter since the data is gone. + if (operation == Sink::Operation_Removal) { + SinkTrace() << "Removal: " << uid << operation; + callback(operation, uid, entityBuffer); + foundValue = true; + } else if (matchesFilter(uid, entityBuffer)) { + SinkTrace() << "Accepted: " << uid << operation; + callback(operation, uid, entityBuffer); + foundValue = true; + //TODO if something did not match the filter so far but does now, turn into an add operation. + } else { + SinkTrace() << "Rejected: " << uid << operation; + //TODO emit a removal if we had the uid in the result set and this is a modification. + //We don't know if this results in a removal from the dataset, so we emit a removal notification anyways + callback(Sink::Operation_Removal, uid, entityBuffer); + } + return false; + })) + {} + return foundValue; + } + + bool matchesFilter(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { + for (const auto &filterProperty : propertyFilter.keys()) { + const auto property = getProperty(entityBuffer.entity(), filterProperty); + const auto comparator = propertyFilter.value(filterProperty); + if (!comparator.matches(property)) { + SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; + return false; + } + } + return true; + } +}; + +/* class Reduction : public FilterBase { */ +/* public: */ +/* typedef QSharedPointer Ptr; */ + +/* QHash aggregateValues; */ + +/* Reduction(FilterBase::Ptr source, DataStoreQuery *store) */ +/* : FilterBase(source, store) */ +/* { */ + +/* } */ + +/* virtual ~Reduction(){} */ + +/* bool next(const std::function &callback) Q_DECL_OVERRIDE { */ +/* bool foundValue = false; */ +/* while(!foundValue && mSource->next([this, callback, &foundValue](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { */ +/* const auto operation = entityBuffer.operation(); */ +/* SinkTrace() << "Filter: " << uid << operation; */ +/* //Always accept removals. They can't match the filter since the data is gone. */ +/* if (operation == Sink::Operation_Removal) { */ +/* callback(uid, entityBuffer); */ +/* foundValue = true; */ +/* } else if (matchesFilter(uid, entityBuffer)) { */ +/* callback(uid, entityBuffer); */ +/* foundValue = true; */ +/* } */ +/* return false; */ +/* })) */ +/* {} */ +/* return foundValue; */ +/* } */ + +/* bool matchesFilter(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { */ +/* for (const auto &filterProperty : propertyFilter.keys()) { */ +/* const auto property = getProperty(entityBuffer.entity(), filterProperty); */ +/* const auto comparator = propertyFilter.value(filterProperty); */ +/* if (!comparator.matches(property)) { */ +/* SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; */ +/* return false; */ +/* } */ +/* } */ +/* return true; */ +/* } */ +/* }; */ + DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function getProperty) : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) { - + setupQuery(); } -static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) +static inline QVector fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) { // TODO use a result set with an iterator, to read values on demand SinkTrace() << "Looking for : " << bufferType; @@ -52,59 +212,7 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); SinkTrace() << "Full scan retrieved " << keys.size() << " results."; - return ResultSet(keys.toList().toVector()); -} - -ResultSet DataStoreQuery::loadInitialResultSet(QSet &remainingFilters, QByteArray &remainingSorting) -{ - if (!mQuery.ids.isEmpty()) { - return ResultSet(mQuery.ids.toVector()); - } - QSet appliedFilters; - QByteArray appliedSorting; - - auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction); - - remainingFilters = mQuery.propertyFilter.keys().toSet() - appliedFilters; - if (appliedSorting.isEmpty()) { - remainingSorting = mQuery.sortProperty; - } - - // We do a full scan if there were no indexes available to create the initial set. - if (appliedFilters.isEmpty()) { - // TODO this should be replaced by an index lookup as well - resultSet = fullScan(mTransaction, mType); - } - return resultSet; -} - -ResultSet DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision, QSet &remainingFilters) -{ - const auto bufferType = mType; - auto revisionCounter = QSharedPointer::create(baseRevision); - remainingFilters = mQuery.propertyFilter.keys().toSet(); - return ResultSet([this, bufferType, revisionCounter]() -> QByteArray { - const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); - // Spit out the revision keys one by one. - while (*revisionCounter <= topRevision) { - const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); - const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); - // SinkTrace() << "Revision" << *revisionCounter << type << uid; - Q_ASSERT(!uid.isEmpty()); - Q_ASSERT(!type.isEmpty()); - if (type != bufferType) { - // Skip revision - *revisionCounter += 1; - continue; - } - const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); - *revisionCounter += 1; - return key; - } - SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; - // We're done - return QByteArray(); - }); + return keys.toList().toVector(); } void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) @@ -122,156 +230,194 @@ QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArra return mGetProperty(entity, property); } -ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) -{ - const bool sortingRequired = !sortProperty.isEmpty(); - if (mInitialQuery && sortingRequired) { - SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; - // Sort the complete set by reading the sort property and filling into a sorted map - auto sortedMap = QSharedPointer>::create(); - while (resultSet.next()) { - // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(resultSet.id(), - [this, filter, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { - - const auto operation = buffer.operation(); - - // We're not interested in removals during the initial query - if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { - if (!sortProperty.isEmpty()) { - const auto sortValue = getProperty(buffer.entity(), sortProperty); - if (sortValue.type() == QVariant::DateTime) { - sortedMap->insert(QByteArray::number(std::numeric_limits::max() - sortValue.toDateTime().toTime_t()), uid); - } else { - sortedMap->insert(sortValue.toString().toLatin1(), uid); - } - } else { - sortedMap->insert(uid, uid); - } - } - }); - } +/* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */ +/* { */ +/* const bool sortingRequired = !sortProperty.isEmpty(); */ +/* if (mInitialQuery && sortingRequired) { */ +/* SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; */ +/* // Sort the complete set by reading the sort property and filling into a sorted map */ +/* auto sortedMap = QSharedPointer>::create(); */ +/* while (resultSet.next()) { */ +/* // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) */ +/* readEntity(resultSet.id(), */ +/* [this, filter, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */ - SinkTrace() << "Sorted " << sortedMap->size() << " values."; - auto iterator = QSharedPointer>::create(*sortedMap); - ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter]( - std::function callback) -> bool { - if (iterator->hasNext()) { - readEntity(iterator->next().value(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { - callback(uid, buffer, Sink::Operation_Creation); - }); - return true; - } - return false; - }; +/* const auto operation = buffer.operation(); */ - auto skip = [iterator]() { - if (iterator->hasNext()) { - iterator->next(); - } - }; - return ResultSet(generator, skip); - } else { - auto resultSetPtr = QSharedPointer::create(resultSet); - ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool { - if (resultSetPtr->next()) { - SinkTrace() << "Reading the next value: " << resultSetPtr->id(); - // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(resultSetPtr->id(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { - const auto operation = buffer.operation(); - if (mInitialQuery) { - // We're not interested in removals during the initial query - if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { - // In the initial set every entity is new - callback(uid, buffer, Sink::Operation_Creation); - } - } else { - // Always remove removals, they probably don't match due to non-available properties - if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { - // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) - callback(uid, buffer, operation); - } - } - }); - return true; - } - return false; - }; - auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; - return ResultSet(generator, skip); - } -} +/* // We're not interested in removals during the initial query */ +/* if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { */ +/* if (!sortProperty.isEmpty()) { */ +/* const auto sortValue = getProperty(buffer.entity(), sortProperty); */ +/* if (sortValue.type() == QVariant::DateTime) { */ +/* sortedMap->insert(QByteArray::number(std::numeric_limits::max() - sortValue.toDateTime().toTime_t()), uid); */ +/* } else { */ +/* sortedMap->insert(sortValue.toString().toLatin1(), uid); */ +/* } */ +/* } else { */ +/* sortedMap->insert(uid, uid); */ +/* } */ +/* } */ +/* }); */ +/* } */ +/* SinkTrace() << "Sorted " << sortedMap->size() << " values."; */ +/* auto iterator = QSharedPointer>::create(*sortedMap); */ +/* ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter]( */ +/* std::function callback) -> bool { */ +/* if (iterator->hasNext()) { */ +/* readEntity(iterator->next().value(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */ +/* callback(uid, buffer, Sink::Operation_Creation); */ +/* }); */ +/* return true; */ +/* } */ +/* return false; */ +/* }; */ -DataStoreQuery::FilterFunction DataStoreQuery::getFilter(const QSet &remainingFilters) +/* auto skip = [iterator]() { */ +/* if (iterator->hasNext()) { */ +/* iterator->next(); */ +/* } */ +/* }; */ +/* return ResultSet(generator, skip); */ +/* } else { */ +/* auto resultSetPtr = QSharedPointer::create(resultSet); */ +/* ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool { */ +/* if (resultSetPtr->next()) { */ +/* SinkTrace() << "Reading the next value: " << resultSetPtr->id(); */ +/* // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) */ +/* readEntity(resultSetPtr->id(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { */ +/* const auto operation = buffer.operation(); */ +/* if (mInitialQuery) { */ +/* // We're not interested in removals during the initial query */ +/* if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { */ +/* // In the initial set every entity is new */ +/* callback(uid, buffer, Sink::Operation_Creation); */ +/* } */ +/* } else { */ +/* // Always remove removals, they probably don't match due to non-available properties */ +/* if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { */ +/* // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) */ +/* callback(uid, buffer, operation); */ +/* } */ +/* } */ +/* }); */ +/* return true; */ +/* } */ +/* return false; */ +/* }; */ +/* auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; */ +/* return ResultSet(generator, skip); */ +/* } */ +/* } */ + +void DataStoreQuery::setupQuery() { - auto query = mQuery; - return [this, remainingFilters, query](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool { - if (!query.ids.isEmpty()) { - if (!query.ids.contains(uid)) { - SinkTrace() << "Filter by uid: " << uid; - return false; - } - } - for (const auto &filterProperty : remainingFilters) { - const auto property = getProperty(entity.entity(), filterProperty); - const auto comparator = query.propertyFilter.value(filterProperty); - if (!comparator.matches(property)) { - SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; - return false; - } + FilterBase::Ptr baseSet; + QSet remainingFilters; + QByteArray appliedSorting; + if (!mQuery.ids.isEmpty()) { + mSource = Source::Ptr::create(mQuery.ids.toVector(), this); + baseSet = mSource; + remainingFilters = mQuery.propertyFilter.keys().toSet(); + } else { + QSet appliedFilters; + + auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction); + remainingFilters = mQuery.propertyFilter.keys().toSet() - appliedFilters; + + // We do a full scan if there were no indexes available to create the initial set. + if (appliedFilters.isEmpty()) { + // TODO this should be replaced by an index lookup on the uid index + mSource = Source::Ptr::create(fullScan(mTransaction, mType), this); + } else { + mSource = Source::Ptr::create(resultSet, this); } - return true; - }; -} + baseSet = mSource; + } + if (!mQuery.propertyFilter.isEmpty()) { + auto filter = Filter::Ptr::create(baseSet, this); + filter->propertyFilter = mQuery.propertyFilter; + /* for (const auto &f : remainingFilters) { */ + /* filter->propertyFilter.insert(f, mQuery.propertyFilter.value(f)); */ + /* } */ + baseSet = filter; + } + /* if (appliedSorting.isEmpty() && !mQuery.sortProperty.isEmpty()) { */ + /* //Apply manual sorting */ + /* baseSet = Sort::Ptr::create(baseSet, mQuery.sortProperty); */ + /* } */ -ResultSet DataStoreQuery::createFilteredSet(ResultSet &resultSet, const std::function &filter) -{ - auto resultSetPtr = QSharedPointer::create(resultSet); - ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool { - return resultSetPtr->next([=](const QByteArray &uid, const Sink::EntityBuffer &buffer, Sink::Operation operation) { - if (mInitialQuery) { - // We're not interested in removals during the initial query - if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { - // In the initial set every entity is new - callback(uid, buffer, Sink::Operation_Creation); - } - } else { - // Always remove removals, they probably don't match due to non-available properties - if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { - // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) - callback(uid, buffer, operation); - } - } - }); - }; - auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; - return ResultSet(generator, skip); + /* if (mQuery.threadLeaderOnly) { */ + /* auto reduce = Reduce::Ptr::create(baseSet, this); */ + + /* baseSet = reduce; */ + /* } */ + + mCollector = Collector::Ptr::create(baseSet, this); } -ResultSet DataStoreQuery::postSortFilter(ResultSet &resultSet) +QVector DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision) { - return resultSet; + const auto bufferType = mType; + auto revisionCounter = QSharedPointer::create(baseRevision); + QVector changedKeys; + const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); + // Spit out the revision keys one by one. + while (*revisionCounter <= topRevision) { + const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); + const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); + // SinkTrace() << "Revision" << *revisionCounter << type << uid; + Q_ASSERT(!uid.isEmpty()); + Q_ASSERT(!type.isEmpty()); + if (type != bufferType) { + // Skip revision + *revisionCounter += 1; + continue; + } + const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); + *revisionCounter += 1; + changedKeys << key; + } + SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; + return changedKeys; } + ResultSet DataStoreQuery::update(qint64 baseRevision) { SinkTrace() << "Executing query update"; - mInitialQuery = false; - QSet remainingFilters; - QByteArray remainingSorting; - auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters); - auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), remainingSorting); - return postSortFilter(filteredSet); + auto incrementalResultSet = loadIncrementalResultSet(baseRevision); + SinkTrace() << "Changed: " << incrementalResultSet; + mSource->add(incrementalResultSet); + ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { + if (mCollector->next([callback](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &buffer) { + SinkTrace() << "Got incremental result: " << uid << operation; + callback(uid, buffer, operation); + })) + { + return true; + } + return false; + }; + return ResultSet(generator, [this]() { mCollector->skip(); }); } + ResultSet DataStoreQuery::execute() { SinkTrace() << "Executing query"; - mInitialQuery = true; - QSet remainingFilters; - QByteArray remainingSorting; - auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting); - auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), remainingSorting); - return postSortFilter(filteredSet); + + ResultSet::ValueGenerator generator = [this](const ResultSet::Callback &callback) -> bool { + if (mCollector->next([callback](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &buffer) { + if (operation != Sink::Operation_Removal) { + SinkTrace() << "Got initial result: " << uid << operation; + callback(uid, buffer, Sink::Operation_Creation); + } + })) + { + return true; + } + return false; + }; + return ResultSet(generator, [this]() { mCollector->skip(); }); } diff --git a/common/datastorequery.h b/common/datastorequery.h index 7712ac7..c9f6a3a 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h @@ -25,7 +25,12 @@ #include "query.h" #include "entitybuffer.h" + +class Source; +class FilterBase; + class DataStoreQuery { + friend class FilterBase; public: typedef QSharedPointer Ptr; @@ -42,14 +47,17 @@ protected: virtual void readEntity(const QByteArray &key, const BufferCallback &resultCallback); - virtual ResultSet loadInitialResultSet(QSet &remainingFilters, QByteArray &remainingSorting); - virtual ResultSet loadIncrementalResultSet(qint64 baseRevision, QSet &remainingFilters); + /* virtual ResultSet loadInitialResultSet(QSet &remainingFilters, QByteArray &remainingSorting); */ + /* virtual ResultSet loadIncrementalResultSet(qint64 baseRevision, QSet &remainingFilters); */ - virtual ResultSet filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty); - virtual ResultSet postSortFilter(ResultSet &resultSet); - virtual FilterFunction getFilter(const QSet &remainingFilters); + /* virtual ResultSet filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty); */ + /* virtual ResultSet postSortFilter(ResultSet &resultSet); */ + /* virtual FilterFunction getFilter(const QSet &remainingFilters); */ ResultSet createFilteredSet(ResultSet &resultSet, const std::function &); + QVector loadIncrementalResultSet(qint64 baseRevision); + + void setupQuery(); Sink::Query mQuery; Sink::Storage::Transaction &mTransaction; @@ -58,8 +66,64 @@ protected: Sink::Storage::NamedDatabase mDb; std::function mGetProperty; bool mInitialQuery; + QSharedPointer mCollector; + QSharedPointer mSource; +}; + + +class FilterBase { +public: + typedef QSharedPointer Ptr; + FilterBase(DataStoreQuery *store) + : mDatastore(store) + { + + } + + FilterBase(FilterBase::Ptr source, DataStoreQuery *store) + : mSource(source), + mDatastore(store) + { + } + + virtual ~FilterBase(){} + + void readEntity(const QByteArray &key, const std::function &callback) + { + Q_ASSERT(mDatastore); + mDatastore->readEntity(key, callback); + } + + QVariant getProperty(const Sink::Entity &entity, const QByteArray &property) + { + Q_ASSERT(mDatastore); + return mDatastore->getProperty(entity, property); + } + + virtual void skip() { mSource->skip(); }; + + //Returns true for as long as a result is available + virtual bool next(const std::function &callback) = 0; + + QSharedPointer mSource; + DataStoreQuery *mDatastore; }; +/* class Reduce { */ +/* QByteArray property; */ + +/* //Property - value, reduction result */ +/* QHash mReducedValue; */ +/* }; */ + +/* class Bloom { */ +/* QByteArray property; */ + +/* //Property - value, reduction result */ +/* QSet mPropertyValues; */ + +/* }; */ + diff --git a/common/domain/event.cpp b/common/domain/event.cpp index 118ffa3..f3abd62 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp @@ -33,6 +33,7 @@ #include "../definitions.h" #include "../typeindex.h" #include "entitybuffer.h" +#include "datastorequery.h" #include "entity_generated.h" #include "event_generated.h" diff --git a/common/domain/event.h b/common/domain/event.h index e1ca061..684b58e 100644 --- a/common/domain/event.h +++ b/common/domain/event.h @@ -21,7 +21,6 @@ #include "applicationdomaintype.h" #include "storage.h" -#include "datastorequery.h" class ResultSet; class QByteArray; @@ -31,6 +30,8 @@ class ReadPropertyMapper; template class WritePropertyMapper; +class DataStoreQuery; + namespace Sink { class Query; @@ -51,7 +52,7 @@ public: typedef Sink::ApplicationDomain::Buffer::Event Buffer; typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; static QSet indexedProperties(); - static DataStoreQuery::Ptr prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); + static QSharedPointer prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp index 17d9f13..824fa0b 100644 --- a/common/domain/folder.cpp +++ b/common/domain/folder.cpp @@ -33,6 +33,7 @@ #include "../definitions.h" #include "../typeindex.h" #include "entitybuffer.h" +#include "datastorequery.h" #include "entity_generated.h" #include "folder_generated.h" diff --git a/common/domain/folder.h b/common/domain/folder.h index ff87006..e4631de 100644 --- a/common/domain/folder.h +++ b/common/domain/folder.h @@ -21,10 +21,10 @@ #include "applicationdomaintype.h" #include "storage.h" -#include "datastorequery.h" class ResultSet; class QByteArray; +class DataStoreQuery; template class ReadPropertyMapper; @@ -45,7 +45,7 @@ class TypeImplementation { public: typedef Sink::ApplicationDomain::Buffer::Folder Buffer; typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; - static DataStoreQuery::Ptr prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); + static QSharedPointer prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); static QSet indexedProperties(); static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp index 0c737fa..483a2f2 100644 --- a/common/domain/mail.cpp +++ b/common/domain/mail.cpp @@ -33,6 +33,7 @@ #include "../definitions.h" #include "../typeindex.h" #include "entitybuffer.h" +#include "datastorequery.h" #include "entity_generated.h" #include "mail_generated.h" @@ -210,68 +211,16 @@ QSharedPointer::BufferBuilder> > Ty return propertyMapper; } -class ThreadedDataStoreQuery : public DataStoreQuery -{ -public: - typedef QSharedPointer Ptr; - using DataStoreQuery::DataStoreQuery; - -protected: - ResultSet postSortFilter(ResultSet &resultSet) Q_DECL_OVERRIDE - { - auto query = mQuery; - if (query.threadLeaderOnly) { - auto rootCollection = QSharedPointer>::create(); - auto filter = [this, query, rootCollection](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool { - //TODO lookup thread - //if we got thread already in the result set compare dates and if newer replace - //else insert - - const auto messageId = getProperty(entity.entity(), ApplicationDomain::Mail::MessageId::name).toByteArray(); - - Index msgIdIndex("msgId", mTransaction); - Index msgIdThreadIdIndex("msgIdThreadId", mTransaction); - auto thread = msgIdThreadIdIndex.lookup(messageId); - SinkTrace() << "MsgId: " << messageId << " Thread: " << thread << getProperty(entity.entity(), ApplicationDomain::Mail::Date::name).toDateTime(); - - if (rootCollection->contains(thread)) { - auto date = rootCollection->value(thread); - //The mail we have in our result already is newer, so we can ignore this one - //This is always true during the initial query if the set has been sorted by date. - if (date > getProperty(entity.entity(), ApplicationDomain::Mail::Date::name).toDateTime()) { - return false; - } - qWarning() << "############################################################################"; - qWarning() << "Found a newer mail, remove the old one"; - qWarning() << "############################################################################"; - } - rootCollection->insert(thread, getProperty(entity.entity(), ApplicationDomain::Mail::Date::name).toDateTime()); - return true; - }; - return createFilteredSet(resultSet, filter); - } else { - return resultSet; - } - } -}; DataStoreQuery::Ptr TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) { - if (query.threadLeaderOnly) { - auto mapper = initializeReadPropertyMapper(); - return ThreadedDataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { - const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); - return mapper->getProperty(property, localBuffer); - }); - } else { auto mapper = initializeReadPropertyMapper(); return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); return mapper->getProperty(property, localBuffer); }); - } } diff --git a/common/domain/mail.h b/common/domain/mail.h index 3b0e9da..ea3ef9e 100644 --- a/common/domain/mail.h +++ b/common/domain/mail.h @@ -25,6 +25,7 @@ class ResultSet; class QByteArray; +class DataStoreQuery; template class ReadPropertyMapper; @@ -45,7 +46,7 @@ class TypeImplementation { public: typedef Sink::ApplicationDomain::Buffer::Mail Buffer; typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; - static DataStoreQuery::Ptr prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); + static QSharedPointer prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); static QSet indexedProperties(); static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); diff --git a/common/entityreader.cpp b/common/entityreader.cpp index d86f4a9..bd973d0 100644 --- a/common/entityreader.cpp +++ b/common/entityreader.cpp @@ -22,6 +22,7 @@ #include "resultset.h" #include "storage.h" #include "query.h" +#include "datastorequery.h" SINK_DEBUG_AREA("entityreader") diff --git a/common/mailpreprocessor.cpp b/common/mailpreprocessor.cpp index 0534338..ec5748f 100644 --- a/common/mailpreprocessor.cpp +++ b/common/mailpreprocessor.cpp @@ -120,14 +120,18 @@ void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink: { MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(mail.getMimeMessagePath())); auto msg = mimeMessageReader.mimeMessage(); - updatedIndexedProperties(mail, msg); + if (msg) { + updatedIndexedProperties(mail, msg); + } } void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::Transaction &transaction) { MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(newMail.getMimeMessagePath())); auto msg = mimeMessageReader.mimeMessage(); - updatedIndexedProperties(newMail, msg); + if (msg) { + updatedIndexedProperties(newMail, msg); + } } diff --git a/common/modelresult.cpp b/common/modelresult.cpp index 56a39ee..d13bba9 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -188,7 +188,7 @@ void ModelResult::add(const Ptr &value) return; } auto parent = createIndexFromId(id); - // qDebug() << "Added entity " << childId << value->identifier() << id; + SinkTrace() << "Added entity " << childId << value->identifier() << id; const auto keys = mTree[id]; int index = 0; for (; index < keys.size(); index++) { @@ -200,13 +200,13 @@ void ModelResult::add(const Ptr &value) SinkWarning() << "Entity already in model " << value->identifier(); return; } - // qDebug() << "Inserting rows " << index << parent; + // SinkTrace() << "Inserting rows " << index << parent; beginInsertRows(parent, index, index); mEntities.insert(childId, value); mTree[id].insert(index, childId); mParents.insert(childId, id); endInsertRows(); - // qDebug() << "Inserted rows " << mTree[id].size(); + // SinkTrace() << "Inserted rows " << mTree[id].size(); } @@ -216,7 +216,7 @@ void ModelResult::remove(const Ptr &value) auto childId = qHash(*value); auto id = parentId(value); auto parent = createIndexFromId(id); - // qDebug() << "Removed entity" << childId; + SinkTrace() << "Removed entity" << childId; auto index = mTree[id].indexOf(childId); beginRemoveRows(parent, index, index); mEntities.remove(childId); @@ -259,6 +259,7 @@ void ModelResult::setEmitter(const typename Sink::ResultEmitter::Pt }); }); emitter->onModified([this](const Ptr &value) { + SinkTrace() << "Received modification: " << value->identifier(); threadBoundary.callInMainThread([this, value]() { modify(value); }); @@ -294,8 +295,9 @@ void ModelResult::modify(const Ptr &value) return; } auto parent = createIndexFromId(id); - // qDebug() << "Modified entity" << childId; + SinkTrace() << "Modified entity" << childId; auto i = mTree[id].indexOf(childId); + Q_ASSERT(i >= 0); mEntities.remove(childId); mEntities.insert(childId, value); // TODO check for change of parents diff --git a/common/query.cpp b/common/query.cpp index fd99367..3de80d8 100644 --- a/common/query.cpp +++ b/common/query.cpp @@ -51,6 +51,9 @@ bool Query::Comparator::matches(const QVariant &v) const switch(comparator) { case Equals: if (!v.isValid()) { + if (!value.isValid()) { + return true; + } return false; } return v == value; diff --git a/common/typeindex.cpp b/common/typeindex.cpp index b96c5b5..1b04966 100644 --- a/common/typeindex.cpp +++ b/common/typeindex.cpp @@ -87,7 +87,7 @@ template <> void TypeIndex::addProperty(const QByteArray &property) { auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) { - // SinkTrace() << "Indexing " << mType + ".index." + property << date.toString(); + //SinkTrace() << "Indexing " << mType + ".index." + property << getByteArray(value); Index(indexName(property), transaction).add(getByteArray(value), identifier); }; mIndexer.insert(property, indexer); @@ -138,7 +138,7 @@ void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDoma } } -ResultSet TypeIndex::query(const Sink::Query &query, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction) +QVector TypeIndex::query(const Sink::Query &query, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction) { QVector keys; for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { @@ -151,7 +151,7 @@ ResultSet TypeIndex::query(const Sink::Query &query, QSet &appliedFi appliedFilters << it.key(); appliedSorting = it.value(); SinkTrace() << "Index lookup on " << it.key() << it.value() << " found " << keys.size() << " keys."; - return ResultSet(keys); + return keys; } } for (const auto &property : mProperties) { @@ -162,9 +162,9 @@ ResultSet TypeIndex::query(const Sink::Query &query, QSet &appliedFi lookupKey, [&](const QByteArray &value) { keys << value; }, [property](const Index::Error &error) { SinkWarning() << "Error in index: " << error.message << property; }); appliedFilters << property; SinkTrace() << "Index lookup on " << property << " found " << keys.size() << " keys."; - return ResultSet(keys); + return keys; } } SinkTrace() << "No matching index"; - return ResultSet(keys); + return keys; } diff --git a/common/typeindex.h b/common/typeindex.h index a16179c..f5a32b9 100644 --- a/common/typeindex.h +++ b/common/typeindex.h @@ -37,7 +37,7 @@ public: void add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); void remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); - ResultSet query(const Sink::Query &query, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); + QVector query(const Sink::Query &query, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); private: QByteArray indexName(const QByteArray &property, const QByteArray &sortProperty = QByteArray()) const; -- cgit v1.2.3