From be8dba1827ec54ec11d9a3ef143db9ad7f7f38df Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 26 Sep 2016 11:58:38 +0200 Subject: The threading reduction is working. --- common/datastorequery.cpp | 121 +++++++++++++++++++++++++++++----------------- 1 file changed, 76 insertions(+), 45 deletions(-) (limited to 'common/datastorequery.cpp') diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 95df1a0..7c0fdea 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -141,51 +141,78 @@ public: } }; -/* class Reduction : public FilterBase { */ -/* public: */ -/* typedef QSharedPointer Ptr; */ +class Reduce : public FilterBase { +public: + typedef QSharedPointer Ptr; + + QHash mAggregateValues; + QByteArray mReductionProperty; + QByteArray mSelectionProperty; + enum SelectionComparator { + Max + /* Min, */ + /* First */ + }; + SelectionComparator mSelectionComparator; -/* QHash aggregateValues; */ + Reduce(const QByteArray &reductionProperty, const QByteArray &selectionProperty, SelectionComparator comparator, FilterBase::Ptr source, DataStoreQuery *store) + : FilterBase(source, store), + mReductionProperty(reductionProperty), + mSelectionProperty(selectionProperty), + mSelectionComparator(comparator) + { -/* Reduction(FilterBase::Ptr source, DataStoreQuery *store) */ -/* : FilterBase(source, store) */ -/* { */ + } -/* } */ + virtual ~Reduce(){} -/* virtual ~Reduction(){} */ - -/* bool next(const std::function &callback) Q_DECL_OVERRIDE { */ -/* bool foundValue = false; */ -/* while(!foundValue && mSource->next([this, callback, &foundValue](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { */ -/* const auto operation = entityBuffer.operation(); */ -/* SinkTrace() << "Filter: " << uid << operation; */ -/* //Always accept removals. They can't match the filter since the data is gone. */ -/* if (operation == Sink::Operation_Removal) { */ -/* callback(uid, entityBuffer); */ -/* foundValue = true; */ -/* } else if (matchesFilter(uid, entityBuffer)) { */ -/* callback(uid, entityBuffer); */ -/* foundValue = true; */ -/* } */ -/* return false; */ -/* })) */ -/* {} */ -/* return foundValue; */ -/* } */ + static QByteArray getByteArray(const QVariant &value) + { + if (value.type() == QVariant::DateTime) { + return value.toDateTime().toString().toLatin1(); + } + if (value.isValid() && !value.toByteArray().isEmpty()) { + return value.toByteArray(); + } + return QByteArray(); + } -/* bool matchesFilter(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { */ -/* for (const auto &filterProperty : propertyFilter.keys()) { */ -/* const auto property = getProperty(entityBuffer.entity(), filterProperty); */ -/* const auto comparator = propertyFilter.value(filterProperty); */ -/* if (!comparator.matches(property)) { */ -/* SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; */ -/* return false; */ -/* } */ -/* } */ -/* return true; */ -/* } */ -/* }; */ + static bool compare(const QVariant &left, const QVariant &right, SelectionComparator comparator) + { + if (comparator == Max) { + return left > right; + } + return false; + } + + bool next(const std::function &callback) Q_DECL_OVERRIDE { + bool foundValue = false; + while(!foundValue && mSource->next([this, callback, &foundValue](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { + auto reductionValue = getProperty(entityBuffer.entity(), mReductionProperty); + if (!mAggregateValues.contains(getByteArray(reductionValue))) { + QVariant selectionResultValue; + QByteArray selectionResult; + auto results = indexLookup(mReductionProperty, reductionValue); + for (const auto r : results) { + readEntity(r, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { + auto selectionValue = getProperty(entityBuffer.entity(), mSelectionProperty); + if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) { + selectionResultValue = selectionValue; + selectionResult = uid; + } + }); + } + readEntity(selectionResult, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { + callback(Sink::Operation_Creation, uid, entityBuffer); + foundValue = true; + }); + } + return false; + })) + {} + return foundValue; + } +}; DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function getProperty) : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) @@ -230,6 +257,11 @@ QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArra return mGetProperty(entity, property); } +QVector DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value) +{ + return mTypeIndex.lookup(property, value, mTransaction); +} + /* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */ /* { */ /* const bool sortingRequired = !sortProperty.isEmpty(); */ @@ -347,11 +379,10 @@ void DataStoreQuery::setupQuery() /* baseSet = Sort::Ptr::create(baseSet, mQuery.sortProperty); */ /* } */ - /* if (mQuery.threadLeaderOnly) { */ - /* auto reduce = Reduce::Ptr::create(baseSet, this); */ - - /* baseSet = reduce; */ - /* } */ + if (mQuery.threadLeaderOnly) { + auto reduce = Reduce::Ptr::create("threadId", "date", Reduce::Max, baseSet, this); + baseSet = reduce; + } mCollector = Collector::Ptr::create(baseSet, this); } -- cgit v1.2.3