From be8dba1827ec54ec11d9a3ef143db9ad7f7f38df Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 26 Sep 2016 11:58:38 +0200 Subject: The threading reduction is working. --- common/datastorequery.cpp | 121 +++++++++++++++++++++------------- common/datastorequery.h | 7 ++ common/domain/applicationdomaintype.h | 8 +++ common/domain/mail.cpp | 80 ++++++++++++---------- common/modelresult.cpp | 6 ++ common/typeindex.cpp | 68 +++++++++++++++++++ common/typeindex.h | 39 +++++++++++ 7 files changed, 251 insertions(+), 78 deletions(-) (limited to 'common') diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 95df1a0..7c0fdea 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -141,51 +141,78 @@ public: } }; -/* class Reduction : public FilterBase { */ -/* public: */ -/* typedef QSharedPointer Ptr; */ +class Reduce : public FilterBase { +public: + typedef QSharedPointer Ptr; + + QHash mAggregateValues; + QByteArray mReductionProperty; + QByteArray mSelectionProperty; + enum SelectionComparator { + Max + /* Min, */ + /* First */ + }; + SelectionComparator mSelectionComparator; -/* QHash aggregateValues; */ + Reduce(const QByteArray &reductionProperty, const QByteArray &selectionProperty, SelectionComparator comparator, FilterBase::Ptr source, DataStoreQuery *store) + : FilterBase(source, store), + mReductionProperty(reductionProperty), + mSelectionProperty(selectionProperty), + mSelectionComparator(comparator) + { -/* Reduction(FilterBase::Ptr source, DataStoreQuery *store) */ -/* : FilterBase(source, store) */ -/* { */ + } -/* } */ + virtual ~Reduce(){} -/* virtual ~Reduction(){} */ - -/* bool next(const std::function &callback) Q_DECL_OVERRIDE { */ -/* bool foundValue = false; */ -/* while(!foundValue && mSource->next([this, callback, &foundValue](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { */ -/* const auto operation = entityBuffer.operation(); */ -/* SinkTrace() << "Filter: " << uid << operation; */ -/* //Always accept removals. They can't match the filter since the data is gone. */ -/* if (operation == Sink::Operation_Removal) { */ -/* callback(uid, entityBuffer); */ -/* foundValue = true; */ -/* } else if (matchesFilter(uid, entityBuffer)) { */ -/* callback(uid, entityBuffer); */ -/* foundValue = true; */ -/* } */ -/* return false; */ -/* })) */ -/* {} */ -/* return foundValue; */ -/* } */ + static QByteArray getByteArray(const QVariant &value) + { + if (value.type() == QVariant::DateTime) { + return value.toDateTime().toString().toLatin1(); + } + if (value.isValid() && !value.toByteArray().isEmpty()) { + return value.toByteArray(); + } + return QByteArray(); + } -/* bool matchesFilter(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { */ -/* for (const auto &filterProperty : propertyFilter.keys()) { */ -/* const auto property = getProperty(entityBuffer.entity(), filterProperty); */ -/* const auto comparator = propertyFilter.value(filterProperty); */ -/* if (!comparator.matches(property)) { */ -/* SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; */ -/* return false; */ -/* } */ -/* } */ -/* return true; */ -/* } */ -/* }; */ + static bool compare(const QVariant &left, const QVariant &right, SelectionComparator comparator) + { + if (comparator == Max) { + return left > right; + } + return false; + } + + bool next(const std::function &callback) Q_DECL_OVERRIDE { + bool foundValue = false; + while(!foundValue && mSource->next([this, callback, &foundValue](Sink::Operation operation, const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { + auto reductionValue = getProperty(entityBuffer.entity(), mReductionProperty); + if (!mAggregateValues.contains(getByteArray(reductionValue))) { + QVariant selectionResultValue; + QByteArray selectionResult; + auto results = indexLookup(mReductionProperty, reductionValue); + for (const auto r : results) { + readEntity(r, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { + auto selectionValue = getProperty(entityBuffer.entity(), mSelectionProperty); + if (!selectionResultValue.isValid() || compare(selectionValue, selectionResultValue, mSelectionComparator)) { + selectionResultValue = selectionValue; + selectionResult = uid; + } + }); + } + readEntity(selectionResult, [&, this](const QByteArray &uid, const Sink::EntityBuffer &entityBuffer) { + callback(Sink::Operation_Creation, uid, entityBuffer); + foundValue = true; + }); + } + return false; + })) + {} + return foundValue; + } +}; DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function getProperty) : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) @@ -230,6 +257,11 @@ QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArra return mGetProperty(entity, property); } +QVector DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value) +{ + return mTypeIndex.lookup(property, value, mTransaction); +} + /* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */ /* { */ /* const bool sortingRequired = !sortProperty.isEmpty(); */ @@ -347,11 +379,10 @@ void DataStoreQuery::setupQuery() /* baseSet = Sort::Ptr::create(baseSet, mQuery.sortProperty); */ /* } */ - /* if (mQuery.threadLeaderOnly) { */ - /* auto reduce = Reduce::Ptr::create(baseSet, this); */ - - /* baseSet = reduce; */ - /* } */ + if (mQuery.threadLeaderOnly) { + auto reduce = Reduce::Ptr::create("threadId", "date", Reduce::Max, baseSet, this); + baseSet = reduce; + } mCollector = Collector::Ptr::create(baseSet, this); } diff --git a/common/datastorequery.h b/common/datastorequery.h index c9f6a3a..ea61780 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h @@ -44,6 +44,7 @@ protected: typedef std::function BufferCallback; virtual QVariant getProperty(const Sink::Entity &entity, const QByteArray &property); + QVector indexLookup(const QByteArray &property, const QVariant &value); virtual void readEntity(const QByteArray &key, const BufferCallback &resultCallback); @@ -100,6 +101,12 @@ public: return mDatastore->getProperty(entity, property); } + QVector indexLookup(const QByteArray &property, const QVariant &value) + { + Q_ASSERT(mDatastore); + return mDatastore->indexLookup(property, value); + } + virtual void skip() { mSource->skip(); }; //Returns true for as long as a result is available diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h index c853397..c824251 100644 --- a/common/domain/applicationdomaintype.h +++ b/common/domain/applicationdomaintype.h @@ -79,6 +79,12 @@ void set##NAME(const QByteArray &value) { setProperty(NAME::name, QVariant::fromValue(value)); } \ QByteArray get##NAME() const { return getProperty(NAME::name).value(); } \ +#define SINK_INDEX_PROPERTY(TYPE, NAME, LOWERCASENAME) \ + struct NAME { \ + static constexpr const char *name = #LOWERCASENAME; \ + typedef TYPE Type; \ + }; \ + namespace Sink { namespace ApplicationDomain { @@ -240,6 +246,7 @@ struct SINK_EXPORT Mail : public Entity { SINK_PROPERTY(bool, Sent, sent); SINK_EXTRACTED_PROPERTY(QByteArray, MessageId, messageId); SINK_EXTRACTED_PROPERTY(QByteArray, ParentMessageId, parentMessageId); + SINK_INDEX_PROPERTY(QByteArray, ThreadId, threadId); }; /** @@ -378,6 +385,7 @@ class SINK_EXPORT TypeImplementation; #undef SINK_EXTRACTED_PROPERTY #undef SINK_BLOB_PROPERTY #undef SINK_REFERENCE_PROPERTY +#undef SINK_INDEX_PROPERTY Q_DECLARE_METATYPE(Sink::ApplicationDomain::ApplicationDomainType) Q_DECLARE_METATYPE(Sink::ApplicationDomain::ApplicationDomainType::Ptr) diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp index 483a2f2..2b6eb84 100644 --- a/common/domain/mail.cpp +++ b/common/domain/mail.cpp @@ -60,6 +60,10 @@ static TypeIndex &getIndex() index->addPropertyWithSorting(Mail::Folder::name, Mail::Date::name); index->addProperty(Mail::MessageId::name); index->addProperty(Mail::ParentMessageId::name); + + index->addProperty(); + index->addSecondaryProperty(); + index->addSecondaryProperty(); } return *index; } @@ -120,42 +124,44 @@ static QString stripOffPrefixes(const QString &subject) static void updateThreadingIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) { - auto messageId = bufferAdaptor.getProperty(Mail::MessageId::name).toByteArray(); - auto parentMessageId = bufferAdaptor.getProperty(Mail::ParentMessageId::name).toByteArray(); - auto subject = bufferAdaptor.getProperty(Mail::Subject::name).toString(); + auto messageId = bufferAdaptor.getProperty(Mail::MessageId::name); + auto parentMessageId = bufferAdaptor.getProperty(Mail::ParentMessageId::name); + auto subject = bufferAdaptor.getProperty(Mail::Subject::name); - Index msgIdIndex("msgId", transaction); - Index msgIdThreadIdIndex("msgIdThreadId", transaction); - Index subjectThreadIdIndex("subjectThreadId", transaction); + auto normalizedSubject = stripOffPrefixes(subject.toString()).toUtf8(); - //Add the message to the index - Q_ASSERT(msgIdIndex.lookup(messageId).isEmpty()); - msgIdIndex.add(messageId, identifier); + QVector thread; - auto normalizedSubject = stripOffPrefixes(subject).toUtf8(); + //a child already registered our thread. + thread = getIndex().secondaryLookup(messageId, transaction); - QByteArray thread; //If parent is already available, add to thread of parent - if (!parentMessageId.isEmpty() && !msgIdIndex.lookup(parentMessageId).isEmpty()) { - thread = msgIdThreadIdIndex.lookup(parentMessageId); - msgIdThreadIdIndex.add(messageId, thread); - subjectThreadIdIndex.add(normalizedSubject, thread); - } else { + if (thread.isEmpty() && parentMessageId.isValid()) { + thread = getIndex().secondaryLookup(parentMessageId, transaction); + SinkTrace() << "Found parent"; + } + if (thread.isEmpty()) { //Try to lookup the thread by subject: - thread = subjectThreadIdIndex.lookup(normalizedSubject); - if (!thread.isEmpty()) { - msgIdThreadIdIndex.add(messageId, thread); + thread = getIndex().secondaryLookup(normalizedSubject, transaction); + if (thread.isEmpty()) { + SinkTrace() << "Created a new thread "; + thread << QUuid::createUuid().toByteArray(); } else { - thread = QUuid::createUuid().toByteArray(); - subjectThreadIdIndex.add(normalizedSubject, thread); - if (!parentMessageId.isEmpty()) { - //Register parent with thread for when it becomes available - msgIdThreadIdIndex.add(parentMessageId, thread); - } } } - Q_ASSERT(!thread.isEmpty()); - msgIdThreadIdIndex.add(messageId, thread); + + //We should have found the thread by now + if (!thread.isEmpty()) { + if (parentMessageId.isValid()) { + //Register parent with thread for when it becomes available + getIndex().index(parentMessageId, thread.first(), transaction); + } + getIndex().index(messageId, thread.first(), transaction); + getIndex().index(thread.first(), messageId, transaction); + getIndex().index(normalizedSubject, thread.first(), transaction); + } else { + SinkWarning() << "Couldn't find a thread for: " << messageId; + } } void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) @@ -214,13 +220,21 @@ QSharedPointer::BufferBuilder> > Ty DataStoreQuery::Ptr TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) { - - - auto mapper = initializeReadPropertyMapper(); - return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { - + auto mapper = initializeReadPropertyMapper(); + return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper, &transaction](const Sink::Entity &entity, const QByteArray &property) -> QVariant { + if (property == Mail::ThreadId::name) { const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); + Q_ASSERT(localBuffer); + auto messageId = mapper->getProperty(Mail::MessageId::name, localBuffer); + //This is an index property that we have too lookup + auto thread = getIndex().secondaryLookup(messageId, transaction); + Q_ASSERT(!thread.isEmpty()); + return thread.first(); + } else { + const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); + Q_ASSERT(localBuffer); return mapper->getProperty(property, localBuffer); - }); + } + }); } diff --git a/common/modelresult.cpp b/common/modelresult.cpp index d13bba9..add84aa 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -289,6 +289,12 @@ template void ModelResult::modify(const Ptr &value) { auto childId = qHash(*value); + if (!mEntities.contains(childId)) { + //Happens because the DatabaseQuery emits modifiations also if the item used to be filtered. + SinkTrace() << "Tried to modify a value that is not yet part of the model"; + add(value); + return; + } auto id = parentId(value); // Ignore updates we get before the initial fetch is done if (!mEntityChildrenFetched.contains(id)) { diff --git a/common/typeindex.cpp b/common/typeindex.cpp index 1b04966..f537493 100644 --- a/common/typeindex.cpp +++ b/common/typeindex.cpp @@ -168,3 +168,71 @@ QVector TypeIndex::query(const Sink::Query &query, QSet SinkTrace() << "No matching index"; return keys; } + +QVector TypeIndex::lookup(const QByteArray &property, const QVariant &value, Sink::Storage::Transaction &transaction) +{ + SinkTrace() << "Index lookup on property: " << property << mSecondaryProperties.keys() << mProperties; + if (mProperties.contains(property)) { + QVector keys; + Index index(indexName(property), transaction); + const auto lookupKey = getByteArray(value); + index.lookup( + lookupKey, [&](const QByteArray &value) { keys << value; }, [property](const Index::Error &error) { SinkWarning() << "Error in index: " << error.message << property; }); + SinkTrace() << "Index lookup on " << property << " found " << keys.size() << " keys."; + return keys; + } else if (mSecondaryProperties.contains(property)) { + //Lookups on secondary indexes first lookup the key, and then lookup the results again to resolve to entity id's + QVector keys; + auto resultProperty = mSecondaryProperties.value(property); + + QVector secondaryKeys; + Index index(indexName(property + resultProperty), transaction); + const auto lookupKey = getByteArray(value); + index.lookup( + lookupKey, [&](const QByteArray &value) { secondaryKeys << value; }, [property](const Index::Error &error) { SinkWarning() << "Error in index: " << error.message << property; }); + SinkTrace() << "Looked up secondary keys: " << secondaryKeys; + for (const auto &secondary : secondaryKeys) { + keys += lookup(resultProperty, secondary, transaction); + } + return keys; + } else { + SinkWarning() << "Tried to lookup " << property << " but couldn't find value"; + } + return QVector(); +} + +template <> +void TypeIndex::index(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::Transaction &transaction) +{ + Index(indexName(leftName + rightName), transaction).add(getByteArray(leftValue), getByteArray(rightValue)); +} + +template <> +void TypeIndex::index(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::Transaction &transaction) +{ + Index(indexName(leftName + rightName), transaction).add(getByteArray(leftValue), getByteArray(rightValue)); +} + +template <> +QVector TypeIndex::secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value, Sink::Storage::Transaction &transaction) +{ + QVector keys; + Index index(indexName(leftName + rightName), transaction); + const auto lookupKey = getByteArray(value); + index.lookup( + lookupKey, [&](const QByteArray &value) { keys << value; }, [=](const Index::Error &error) { SinkWarning() << "Error in index: " << error.message << value; }); + + return keys; +} + +template <> +QVector TypeIndex::secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value, Sink::Storage::Transaction &transaction) +{ + QVector keys; + Index index(indexName(leftName + rightName), transaction); + const auto lookupKey = getByteArray(value); + index.lookup( + lookupKey, [&](const QByteArray &value) { keys << value; }, [=](const Index::Error &error) { SinkWarning() << "Error in index: " << error.message << value; }); + + return keys; +} diff --git a/common/typeindex.h b/common/typeindex.h index f5a32b9..7266f02 100644 --- a/common/typeindex.h +++ b/common/typeindex.h @@ -34,16 +34,55 @@ public: template void addPropertyWithSorting(const QByteArray &property, const QByteArray &sortProperty); + template + void addProperty() + { + addProperty(T::name); + } + + template + void addPropertyWithSorting() + { + addPropertyWithSorting(T::name); + } + + template + void addSecondaryProperty() + { + mSecondaryProperties.insert(Left::name, Right::name); + } void add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); void remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); QVector query(const Sink::Query &query, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); + QVector lookup(const QByteArray &property, const QVariant &value, Sink::Storage::Transaction &transaction); + + template + QVector secondaryLookup(const QVariant &value, Sink::Storage::Transaction &transaction) + { + return secondaryLookup(Left::name, Right::name, value, transaction); + } + + template + QVector secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value, Sink::Storage::Transaction &transaction); + + template + void index(const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::Transaction &transaction) + { + index(Left::name, Right::name, leftValue, rightValue, transaction); + } + + template + void index(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::Transaction &transaction); + private: QByteArray indexName(const QByteArray &property, const QByteArray &sortProperty = QByteArray()) const; QByteArray mType; QByteArrayList mProperties; QMap mSortedProperties; + // + QMap mSecondaryProperties; QHash> mIndexer; QHash> mSortIndexer; }; -- cgit v1.2.3