From ebc5c48c03b6145e604da7c313b35321d0a71142 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 20 Sep 2016 17:18:21 +0200 Subject: A first draft of the threading algorithm. --- common/domain/applicationdomaintype.h | 2 + common/domain/event.cpp | 10 +-- common/domain/event.h | 8 +-- common/domain/folder.cpp | 9 +-- common/domain/folder.h | 3 +- common/domain/mail.cpp | 117 +++++++++++++++++++++++++++++----- common/domain/mail.fbs | 2 + common/domain/mail.h | 8 +-- 8 files changed, 113 insertions(+), 46 deletions(-) (limited to 'common/domain') diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h index 67f33c8..c853397 100644 --- a/common/domain/applicationdomaintype.h +++ b/common/domain/applicationdomaintype.h @@ -238,6 +238,8 @@ struct SINK_EXPORT Mail : public Entity { SINK_PROPERTY(bool, Draft, draft); SINK_PROPERTY(bool, Trash, trash); SINK_PROPERTY(bool, Sent, sent); + SINK_EXTRACTED_PROPERTY(QByteArray, MessageId, messageId); + SINK_EXTRACTED_PROPERTY(QByteArray, ParentMessageId, parentMessageId); }; /** diff --git a/common/domain/event.cpp b/common/domain/event.cpp index dfbcb61..118ffa3 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp @@ -52,11 +52,6 @@ static TypeIndex &getIndex() return *index; } -ResultSet TypeImplementation::queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction) -{ - return getIndex().query(query, appliedFilters, appliedSorting, transaction); -} - void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) { return getIndex().add(identifier, bufferAdaptor, transaction); @@ -87,11 +82,10 @@ QSharedPointer::BufferBuilder> > T return propertyMapper; } -DataStoreQuery TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) +DataStoreQuery::Ptr TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) { - auto mapper = initializeReadPropertyMapper(); - return DataStoreQuery(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { + return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); return mapper->getProperty(property, localBuffer); diff --git a/common/domain/event.h b/common/domain/event.h index 4ac572c..e1ca061 100644 --- a/common/domain/event.h +++ b/common/domain/event.h @@ -51,13 +51,7 @@ public: typedef Sink::ApplicationDomain::Buffer::Event Buffer; typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; static QSet indexedProperties(); - static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); - /** - * Returns the potential result set based on the indexes. - * - * An empty result set indicates that a full scan is required. - */ - static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); + static DataStoreQuery::Ptr prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp index 6d487b1..17d9f13 100644 --- a/common/domain/folder.cpp +++ b/common/domain/folder.cpp @@ -55,11 +55,6 @@ static TypeIndex &getIndex() return *index; } -ResultSet TypeImplementation::queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction) -{ - return getIndex().query(query, appliedFilters, appliedSorting, transaction); -} - void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) { SinkTrace() << "Indexing " << identifier; @@ -91,10 +86,10 @@ QSharedPointer::BufferBuilder> > return propertyMapper; } -DataStoreQuery TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) +DataStoreQuery::Ptr TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) { auto mapper = initializeReadPropertyMapper(); - return DataStoreQuery(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { + return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); return mapper->getProperty(property, localBuffer); }); diff --git a/common/domain/folder.h b/common/domain/folder.h index 77edc8a..ff87006 100644 --- a/common/domain/folder.h +++ b/common/domain/folder.h @@ -45,9 +45,8 @@ class TypeImplementation { public: typedef Sink::ApplicationDomain::Buffer::Folder Buffer; typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; - static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); + static DataStoreQuery::Ptr prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); static QSet indexedProperties(); - static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp index bb5ad58..859ebef 100644 --- a/common/domain/mail.cpp +++ b/common/domain/mail.cpp @@ -41,6 +41,7 @@ SINK_DEBUG_AREA("mail"); static QMutex sMutex; +using namespace Sink; using namespace Sink::ApplicationDomain; static TypeIndex &getIndex() @@ -49,26 +50,54 @@ static TypeIndex &getIndex() static TypeIndex *index = 0; if (!index) { index = new TypeIndex("mail"); - index->addProperty("uid"); - index->addProperty("sender"); - index->addProperty("senderName"); - index->addProperty("subject"); - index->addProperty("date"); - index->addProperty("folder"); - index->addPropertyWithSorting("folder", "date"); + index->addProperty(Mail::Uid::name); + index->addProperty(Mail::Sender::name); + index->addProperty(Mail::SenderName::name); + index->addProperty(Mail::Subject::name); + index->addProperty(Mail::Date::name); + index->addProperty(Mail::Folder::name); + index->addPropertyWithSorting(Mail::Folder::name, Mail::Date::name); + index->addProperty(Mail::MessageId::name); + index->addProperty(Mail::ParentMessageId::name); } return *index; } -ResultSet TypeImplementation::queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction) +static void updateThreadingIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) { - return getIndex().query(query, appliedFilters, appliedSorting, transaction); + auto messageId = bufferAdaptor.getProperty(Mail::MessageId::name).toByteArray(); + auto parentMessageId = bufferAdaptor.getProperty(Mail::ParentMessageId::name).toByteArray(); + + Index msgIdIndex("msgId", transaction); + Index msgIdThreadIdIndex("msgIdThreadId", transaction); + + //Add the message to the index + Q_ASSERT(msgIdIndex.lookup(messageId).isEmpty()); + msgIdIndex.add(messageId, identifier); + + //If parent is already available, add to thread of parent + QByteArray thread; + if (!parentMessageId.isEmpty() && !msgIdIndex.lookup(parentMessageId).isEmpty()) { + thread = msgIdThreadIdIndex.lookup(parentMessageId); + msgIdThreadIdIndex.add(messageId, thread); + } else { + thread = QUuid::createUuid().toByteArray(); + if (!parentMessageId.isEmpty()) { + //Register parent with thread for when it becomes available + msgIdThreadIdIndex.add(parentMessageId, thread); + } + } + Q_ASSERT(!thread.isEmpty()); + msgIdThreadIdIndex.add(messageId, thread); + + //Look for parentMessageId and resolve to local id if available } void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) { SinkTrace() << "Indexing " << identifier; getIndex().add(identifier, bufferAdaptor, transaction); + updateThreadingIndex(identifier, bufferAdaptor, transaction); } void TypeImplementation::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) @@ -91,6 +120,8 @@ QSharedPointer::Buffer> > TypeImplem propertyMapper->addMapping(&Buffer::draft); propertyMapper->addMapping(&Buffer::trash); propertyMapper->addMapping(&Buffer::sent); + propertyMapper->addMapping(&Buffer::messageId); + propertyMapper->addMapping(&Buffer::parentMessageId); return propertyMapper; } @@ -110,16 +141,72 @@ QSharedPointer::BufferBuilder> > Ty propertyMapper->addMapping(&BufferBuilder::add_draft); propertyMapper->addMapping(&BufferBuilder::add_trash); propertyMapper->addMapping(&BufferBuilder::add_sent); + propertyMapper->addMapping(&BufferBuilder::add_messageId); + propertyMapper->addMapping(&BufferBuilder::add_parentMessageId); return propertyMapper; } -DataStoreQuery TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) +class ThreadedDataStoreQuery : public DataStoreQuery +{ +public: + typedef QSharedPointer Ptr; + using DataStoreQuery::DataStoreQuery; + +protected: + ResultSet postSortFilter(ResultSet &resultSet) Q_DECL_OVERRIDE + { + auto query = mQuery; + if (query.threadLeaderOnly) { + auto rootCollection = QSharedPointer>::create(); + auto filter = [this, query, rootCollection](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool { + //TODO lookup thread + //if we got thread already in the result set compare dates and if newer replace + //else insert + + const auto messageId = getProperty(entity.entity(), ApplicationDomain::Mail::MessageId::name).toByteArray(); + + Index msgIdIndex("msgId", mTransaction); + Index msgIdThreadIdIndex("msgIdThreadId", mTransaction); + auto thread = msgIdThreadIdIndex.lookup(messageId); + SinkTrace() << "MsgId: " << messageId << " Thread: " << thread << getProperty(entity.entity(), ApplicationDomain::Mail::Date::name).toDateTime(); + + if (rootCollection->contains(thread)) { + auto date = rootCollection->value(thread); + //The mail we have in our result already is newer, so we can ignore this one + if (date > getProperty(entity.entity(), ApplicationDomain::Mail::Date::name).toDateTime()) { + return false; + } + qWarning() << "############################################################################"; + qWarning() << "Found a newer mail, remove the old one"; + qWarning() << "############################################################################"; + } + rootCollection->insert(thread, getProperty(entity.entity(), ApplicationDomain::Mail::Date::name).toDateTime()); + return true; + }; + return createFilteredSet(resultSet, filter); + } else { + return resultSet; + } + } +}; + +DataStoreQuery::Ptr TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) { - auto mapper = initializeReadPropertyMapper(); - return DataStoreQuery(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { + if (query.threadLeaderOnly) { + auto mapper = initializeReadPropertyMapper(); + return ThreadedDataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { + + const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); + return mapper->getProperty(property, localBuffer); + }); - const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); - return mapper->getProperty(property, localBuffer); - }); + } else { + auto mapper = initializeReadPropertyMapper(); + return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { + + const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); + return mapper->getProperty(property, localBuffer); + }); + } } diff --git a/common/domain/mail.fbs b/common/domain/mail.fbs index a0c0d82..f14e9f1 100644 --- a/common/domain/mail.fbs +++ b/common/domain/mail.fbs @@ -13,6 +13,8 @@ table Mail { draft:bool = false; trash:bool = false; sent:bool = false; + messageId:string; + parentMessageId:string; } root_type Mail; diff --git a/common/domain/mail.h b/common/domain/mail.h index d6af9c5..3b0e9da 100644 --- a/common/domain/mail.h +++ b/common/domain/mail.h @@ -45,14 +45,8 @@ class TypeImplementation { public: typedef Sink::ApplicationDomain::Buffer::Mail Buffer; typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; - static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); + static DataStoreQuery::Ptr prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); static QSet indexedProperties(); - /** - * Returns the potential result set based on the indexes. - * - * An empty result set indicates that a full scan is required. - */ - static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); -- cgit v1.2.3