From ebc5c48c03b6145e604da7c313b35321d0a71142 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 20 Sep 2016 17:18:21 +0200 Subject: A first draft of the threading algorithm. --- common/datastorequery.cpp | 55 ++++++++++++---- common/datastorequery.h | 20 ++++-- common/domain/applicationdomaintype.h | 2 + common/domain/event.cpp | 10 +-- common/domain/event.h | 8 +-- common/domain/folder.cpp | 9 +-- common/domain/folder.h | 3 +- common/domain/mail.cpp | 117 +++++++++++++++++++++++++++++----- common/domain/mail.fbs | 2 + common/domain/mail.h | 8 +-- common/entityreader.cpp | 25 +------- common/mailpreprocessor.cpp | 102 +++++++++++++++++++++-------- common/mailpreprocessor.h | 3 - common/query.h | 5 +- 14 files changed, 250 insertions(+), 119 deletions(-) (limited to 'common') diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 3237c53..cc070be 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -122,17 +122,17 @@ QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArra return mGetProperty(entity, property); } -ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty) +ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) { const bool sortingRequired = !sortProperty.isEmpty(); - if (initialQuery && sortingRequired) { + if (mInitialQuery && sortingRequired) { SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; // Sort the complete set by reading the sort property and filling into a sorted map auto sortedMap = QSharedPointer>::create(); while (resultSet.next()) { // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) readEntity(resultSet.id(), - [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { + [this, filter, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { const auto operation = buffer.operation(); @@ -154,10 +154,10 @@ ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFun SinkTrace() << "Sorted " << sortedMap->size() << " values."; auto iterator = QSharedPointer>::create(*sortedMap); - ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter, initialQuery]( + ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter]( std::function callback) -> bool { if (iterator->hasNext()) { - readEntity(iterator->next().value(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { + readEntity(iterator->next().value(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { callback(uid, buffer, Sink::Operation_Creation); }); return true; @@ -173,13 +173,13 @@ ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFun return ResultSet(generator, skip); } else { auto resultSetPtr = QSharedPointer::create(resultSet); - ResultSet::ValueGenerator generator = [this, resultSetPtr, filter, initialQuery](const ResultSet::Callback &callback) -> bool { + ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool { if (resultSetPtr->next()) { SinkTrace() << "Reading the next value: " << resultSetPtr->id(); // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(resultSetPtr->id(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { + readEntity(resultSetPtr->id(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) { const auto operation = buffer.operation(); - if (initialQuery) { + if (mInitialQuery) { // We're not interested in removals during the initial query if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { // In the initial set every entity is new @@ -225,22 +225,53 @@ DataStoreQuery::FilterFunction DataStoreQuery::getFilter(const QSet }; } +ResultSet DataStoreQuery::createFilteredSet(ResultSet &resultSet, const std::function &filter) +{ + auto resultSetPtr = QSharedPointer::create(resultSet); + ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool { + return resultSetPtr->next([=](const QByteArray &uid, const Sink::EntityBuffer &buffer, Sink::Operation operation) { + if (mInitialQuery) { + // We're not interested in removals during the initial query + if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { + // In the initial set every entity is new + callback(uid, buffer, Sink::Operation_Creation); + } + } else { + // Always remove removals, they probably don't match due to non-available properties + if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { + // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) + callback(uid, buffer, operation); + } + } + }); + }; + auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; + return ResultSet(generator, skip); +} + +ResultSet DataStoreQuery::postSortFilter(ResultSet &resultSet) +{ + return resultSet; +} + ResultSet DataStoreQuery::update(qint64 baseRevision) { SinkTrace() << "Executing query update"; + mInitialQuery = false; QSet remainingFilters; QByteArray remainingSorting; auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters); - auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), false, remainingSorting); - return filteredSet; + auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), remainingSorting); + return postSortFilter(filteredSet); } ResultSet DataStoreQuery::execute() { SinkTrace() << "Executing query"; + mInitialQuery = true; QSet remainingFilters; QByteArray remainingSorting; auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting); - auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), true, remainingSorting); - return filteredSet; + auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), remainingSorting); + return postSortFilter(filteredSet); } diff --git a/common/datastorequery.h b/common/datastorequery.h index cf9d9e2..7712ac7 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h @@ -27,24 +27,29 @@ class DataStoreQuery { public: + typedef QSharedPointer Ptr; + DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function getProperty); ResultSet execute(); ResultSet update(qint64 baseRevision); -private: +protected: typedef std::function FilterFunction; typedef std::function BufferCallback; - QVariant getProperty(const Sink::Entity &entity, const QByteArray &property); + virtual QVariant getProperty(const Sink::Entity &entity, const QByteArray &property); + + virtual void readEntity(const QByteArray &key, const BufferCallback &resultCallback); - void readEntity(const QByteArray &key, const BufferCallback &resultCallback); + virtual ResultSet loadInitialResultSet(QSet &remainingFilters, QByteArray &remainingSorting); + virtual ResultSet loadIncrementalResultSet(qint64 baseRevision, QSet &remainingFilters); - ResultSet loadInitialResultSet(QSet &remainingFilters, QByteArray &remainingSorting); - ResultSet loadIncrementalResultSet(qint64 baseRevision, QSet &remainingFilters); + virtual ResultSet filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty); + virtual ResultSet postSortFilter(ResultSet &resultSet); + virtual FilterFunction getFilter(const QSet &remainingFilters); - ResultSet filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty); - FilterFunction getFilter(const QSet &remainingFilters); + ResultSet createFilteredSet(ResultSet &resultSet, const std::function &); Sink::Query mQuery; Sink::Storage::Transaction &mTransaction; @@ -52,6 +57,7 @@ private: TypeIndex &mTypeIndex; Sink::Storage::NamedDatabase mDb; std::function mGetProperty; + bool mInitialQuery; }; diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h index 67f33c8..c853397 100644 --- a/common/domain/applicationdomaintype.h +++ b/common/domain/applicationdomaintype.h @@ -238,6 +238,8 @@ struct SINK_EXPORT Mail : public Entity { SINK_PROPERTY(bool, Draft, draft); SINK_PROPERTY(bool, Trash, trash); SINK_PROPERTY(bool, Sent, sent); + SINK_EXTRACTED_PROPERTY(QByteArray, MessageId, messageId); + SINK_EXTRACTED_PROPERTY(QByteArray, ParentMessageId, parentMessageId); }; /** diff --git a/common/domain/event.cpp b/common/domain/event.cpp index dfbcb61..118ffa3 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp @@ -52,11 +52,6 @@ static TypeIndex &getIndex() return *index; } -ResultSet TypeImplementation::queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction) -{ - return getIndex().query(query, appliedFilters, appliedSorting, transaction); -} - void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) { return getIndex().add(identifier, bufferAdaptor, transaction); @@ -87,11 +82,10 @@ QSharedPointer::BufferBuilder> > T return propertyMapper; } -DataStoreQuery TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) +DataStoreQuery::Ptr TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) { - auto mapper = initializeReadPropertyMapper(); - return DataStoreQuery(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { + return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); return mapper->getProperty(property, localBuffer); diff --git a/common/domain/event.h b/common/domain/event.h index 4ac572c..e1ca061 100644 --- a/common/domain/event.h +++ b/common/domain/event.h @@ -51,13 +51,7 @@ public: typedef Sink::ApplicationDomain::Buffer::Event Buffer; typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; static QSet indexedProperties(); - static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); - /** - * Returns the potential result set based on the indexes. - * - * An empty result set indicates that a full scan is required. - */ - static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); + static DataStoreQuery::Ptr prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp index 6d487b1..17d9f13 100644 --- a/common/domain/folder.cpp +++ b/common/domain/folder.cpp @@ -55,11 +55,6 @@ static TypeIndex &getIndex() return *index; } -ResultSet TypeImplementation::queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction) -{ - return getIndex().query(query, appliedFilters, appliedSorting, transaction); -} - void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) { SinkTrace() << "Indexing " << identifier; @@ -91,10 +86,10 @@ QSharedPointer::BufferBuilder> > return propertyMapper; } -DataStoreQuery TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) +DataStoreQuery::Ptr TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) { auto mapper = initializeReadPropertyMapper(); - return DataStoreQuery(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { + return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); return mapper->getProperty(property, localBuffer); }); diff --git a/common/domain/folder.h b/common/domain/folder.h index 77edc8a..ff87006 100644 --- a/common/domain/folder.h +++ b/common/domain/folder.h @@ -45,9 +45,8 @@ class TypeImplementation { public: typedef Sink::ApplicationDomain::Buffer::Folder Buffer; typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; - static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); + static DataStoreQuery::Ptr prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); static QSet indexedProperties(); - static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp index bb5ad58..859ebef 100644 --- a/common/domain/mail.cpp +++ b/common/domain/mail.cpp @@ -41,6 +41,7 @@ SINK_DEBUG_AREA("mail"); static QMutex sMutex; +using namespace Sink; using namespace Sink::ApplicationDomain; static TypeIndex &getIndex() @@ -49,26 +50,54 @@ static TypeIndex &getIndex() static TypeIndex *index = 0; if (!index) { index = new TypeIndex("mail"); - index->addProperty("uid"); - index->addProperty("sender"); - index->addProperty("senderName"); - index->addProperty("subject"); - index->addProperty("date"); - index->addProperty("folder"); - index->addPropertyWithSorting("folder", "date"); + index->addProperty(Mail::Uid::name); + index->addProperty(Mail::Sender::name); + index->addProperty(Mail::SenderName::name); + index->addProperty(Mail::Subject::name); + index->addProperty(Mail::Date::name); + index->addProperty(Mail::Folder::name); + index->addPropertyWithSorting(Mail::Folder::name, Mail::Date::name); + index->addProperty(Mail::MessageId::name); + index->addProperty(Mail::ParentMessageId::name); } return *index; } -ResultSet TypeImplementation::queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction) +static void updateThreadingIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) { - return getIndex().query(query, appliedFilters, appliedSorting, transaction); + auto messageId = bufferAdaptor.getProperty(Mail::MessageId::name).toByteArray(); + auto parentMessageId = bufferAdaptor.getProperty(Mail::ParentMessageId::name).toByteArray(); + + Index msgIdIndex("msgId", transaction); + Index msgIdThreadIdIndex("msgIdThreadId", transaction); + + //Add the message to the index + Q_ASSERT(msgIdIndex.lookup(messageId).isEmpty()); + msgIdIndex.add(messageId, identifier); + + //If parent is already available, add to thread of parent + QByteArray thread; + if (!parentMessageId.isEmpty() && !msgIdIndex.lookup(parentMessageId).isEmpty()) { + thread = msgIdThreadIdIndex.lookup(parentMessageId); + msgIdThreadIdIndex.add(messageId, thread); + } else { + thread = QUuid::createUuid().toByteArray(); + if (!parentMessageId.isEmpty()) { + //Register parent with thread for when it becomes available + msgIdThreadIdIndex.add(parentMessageId, thread); + } + } + Q_ASSERT(!thread.isEmpty()); + msgIdThreadIdIndex.add(messageId, thread); + + //Look for parentMessageId and resolve to local id if available } void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) { SinkTrace() << "Indexing " << identifier; getIndex().add(identifier, bufferAdaptor, transaction); + updateThreadingIndex(identifier, bufferAdaptor, transaction); } void TypeImplementation::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) @@ -91,6 +120,8 @@ QSharedPointer::Buffer> > TypeImplem propertyMapper->addMapping(&Buffer::draft); propertyMapper->addMapping(&Buffer::trash); propertyMapper->addMapping(&Buffer::sent); + propertyMapper->addMapping(&Buffer::messageId); + propertyMapper->addMapping(&Buffer::parentMessageId); return propertyMapper; } @@ -110,16 +141,72 @@ QSharedPointer::BufferBuilder> > Ty propertyMapper->addMapping(&BufferBuilder::add_draft); propertyMapper->addMapping(&BufferBuilder::add_trash); propertyMapper->addMapping(&BufferBuilder::add_sent); + propertyMapper->addMapping(&BufferBuilder::add_messageId); + propertyMapper->addMapping(&BufferBuilder::add_parentMessageId); return propertyMapper; } -DataStoreQuery TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) +class ThreadedDataStoreQuery : public DataStoreQuery +{ +public: + typedef QSharedPointer Ptr; + using DataStoreQuery::DataStoreQuery; + +protected: + ResultSet postSortFilter(ResultSet &resultSet) Q_DECL_OVERRIDE + { + auto query = mQuery; + if (query.threadLeaderOnly) { + auto rootCollection = QSharedPointer>::create(); + auto filter = [this, query, rootCollection](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool { + //TODO lookup thread + //if we got thread already in the result set compare dates and if newer replace + //else insert + + const auto messageId = getProperty(entity.entity(), ApplicationDomain::Mail::MessageId::name).toByteArray(); + + Index msgIdIndex("msgId", mTransaction); + Index msgIdThreadIdIndex("msgIdThreadId", mTransaction); + auto thread = msgIdThreadIdIndex.lookup(messageId); + SinkTrace() << "MsgId: " << messageId << " Thread: " << thread << getProperty(entity.entity(), ApplicationDomain::Mail::Date::name).toDateTime(); + + if (rootCollection->contains(thread)) { + auto date = rootCollection->value(thread); + //The mail we have in our result already is newer, so we can ignore this one + if (date > getProperty(entity.entity(), ApplicationDomain::Mail::Date::name).toDateTime()) { + return false; + } + qWarning() << "############################################################################"; + qWarning() << "Found a newer mail, remove the old one"; + qWarning() << "############################################################################"; + } + rootCollection->insert(thread, getProperty(entity.entity(), ApplicationDomain::Mail::Date::name).toDateTime()); + return true; + }; + return createFilteredSet(resultSet, filter); + } else { + return resultSet; + } + } +}; + +DataStoreQuery::Ptr TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) { - auto mapper = initializeReadPropertyMapper(); - return DataStoreQuery(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { + if (query.threadLeaderOnly) { + auto mapper = initializeReadPropertyMapper(); + return ThreadedDataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { + + const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); + return mapper->getProperty(property, localBuffer); + }); - const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); - return mapper->getProperty(property, localBuffer); - }); + } else { + auto mapper = initializeReadPropertyMapper(); + return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { + + const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); + return mapper->getProperty(property, localBuffer); + }); + } } diff --git a/common/domain/mail.fbs b/common/domain/mail.fbs index a0c0d82..f14e9f1 100644 --- a/common/domain/mail.fbs +++ b/common/domain/mail.fbs @@ -13,6 +13,8 @@ table Mail { draft:bool = false; trash:bool = false; sent:bool = false; + messageId:string; + parentMessageId:string; } root_type Mail; diff --git a/common/domain/mail.h b/common/domain/mail.h index d6af9c5..3b0e9da 100644 --- a/common/domain/mail.h +++ b/common/domain/mail.h @@ -45,14 +45,8 @@ class TypeImplementation { public: typedef Sink::ApplicationDomain::Buffer::Mail Buffer; typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; - static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); + static DataStoreQuery::Ptr prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); static QSet indexedProperties(); - /** - * Returns the potential result set based on the indexes. - * - * An empty result set indicates that a full scan is required. - */ - static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); diff --git a/common/entityreader.cpp b/common/entityreader.cpp index faa154b..d86f4a9 100644 --- a/common/entityreader.cpp +++ b/common/entityreader.cpp @@ -150,27 +150,6 @@ void EntityReader::query(const Sink::Query &query, const std::functi }); } -/* template */ -/* void EntityReader::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, */ -/* const std::function &resultCallback) */ -/* { */ -/* db.findLatest(key, */ -/* [=](const QByteArray &key, const QByteArray &value) -> bool { */ -/* Sink::EntityBuffer buffer(value.data(), value.size()); */ -/* const Sink::Entity &entity = buffer.entity(); */ -/* const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); */ -/* const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; */ -/* const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; */ -/* auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); */ -/* Q_ASSERT(adaptor); */ -/* resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); */ -/* return false; */ -/* }, */ -/* [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); */ -/* } */ - - - template QPair EntityReader::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function &callback) { @@ -178,7 +157,7 @@ QPair EntityReader::executeInitialQuery(const Sink:: time.start(); auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, mTransaction); - auto resultSet = preparedQuery.execute(); + auto resultSet = preparedQuery->execute(); SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); @@ -195,7 +174,7 @@ QPair EntityReader::executeIncrementalQuery(const Si const qint64 baseRevision = lastRevision + 1; auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, mTransaction); - auto resultSet = preparedQuery.update(baseRevision); + auto resultSet = preparedQuery->update(baseRevision); SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); auto replayedEntities = replaySet(resultSet, 0, 0, callback); diff --git a/common/mailpreprocessor.cpp b/common/mailpreprocessor.cpp index 2863ad4..0534338 100644 --- a/common/mailpreprocessor.cpp +++ b/common/mailpreprocessor.cpp @@ -36,48 +36,98 @@ QString MailPropertyExtractor::getFilePathFromMimeMessagePath(const QString &s) return s; } -void MailPropertyExtractor::updatedIndexedProperties(Sink::ApplicationDomain::Mail &mail) -{ - const auto mimeMessagePath = getFilePathFromMimeMessagePath(mail.getMimeMessagePath()); - if (mimeMessagePath.isNull()) { - SinkTrace() << "No mime message"; - return; - } - SinkTrace() << "Updating indexed properties " << mimeMessagePath; - QFile f(mimeMessagePath); - if (!f.open(QIODevice::ReadOnly)) { - SinkWarning() << "Failed to open the file: " << mimeMessagePath; - return; - } - if (!f.size()) { - SinkWarning() << "The file is empty."; - return; +struct MimeMessageReader { + MimeMessageReader(const QString &mimeMessagePath) + : f(mimeMessagePath), + mapped(0), + mappedSize(0) + { + if (mimeMessagePath.isNull()) { + SinkTrace() << "No mime message"; + return; + } + SinkTrace() << "Updating indexed properties " << mimeMessagePath; + if (!f.open(QIODevice::ReadOnly)) { + SinkWarning() << "Failed to open the file: " << mimeMessagePath; + return; + } + if (!f.size()) { + SinkWarning() << "The file is empty."; + return; + } + mappedSize = qMin((qint64)8000, f.size()); + mapped = f.map(0, mappedSize); + if (!mapped) { + SinkWarning() << "Failed to map the file: " << f.errorString(); + return; + } } - const auto mappedSize = qMin((qint64)8000, f.size()); - auto mapped = f.map(0, mappedSize); - if (!mapped) { - SinkWarning() << "Failed to map the file: " << f.errorString(); - return; + + KMime::Message::Ptr mimeMessage() + { + if (!mapped) { + return KMime::Message::Ptr(); + } + Q_ASSERT(mapped); + Q_ASSERT(mappedSize); + auto msg = KMime::Message::Ptr(new KMime::Message); + msg->setHead(KMime::CRLFtoLF(QByteArray::fromRawData(reinterpret_cast(mapped), mappedSize))); + msg->parse(); + return msg; } - KMime::Message *msg = new KMime::Message; - msg->setHead(KMime::CRLFtoLF(QByteArray::fromRawData(reinterpret_cast(mapped), mappedSize))); - msg->parse(); + QFile f; + uchar *mapped; + qint64 mappedSize; +}; +static void updatedIndexedProperties(Sink::ApplicationDomain::Mail &mail, KMime::Message::Ptr msg) +{ mail.setExtractedSubject(msg->subject(true)->asUnicodeString()); mail.setExtractedSender(msg->from(true)->asUnicodeString()); mail.setExtractedSenderName(msg->from(true)->asUnicodeString()); mail.setExtractedDate(msg->date(true)->dateTime()); + + //The rest should never change, unless we didn't have the headers available initially. + auto messageId = msg->messageID(true)->identifier(); + + //Ensure the mssageId is unique. + //If there already is one with the same id we'd have to assign a new message id, which probably doesn't make any sense. + + //The last is the parent + auto references = msg->references(true)->identifiers(); + + //The first is the parent + auto inReplyTo = msg->inReplyTo(true)->identifiers(); + QByteArray parentMessageId; + if (!references.isEmpty()) { + parentMessageId = references.last(); + //TODO we could use the rest of the references header to complete the ancestry in case we have missing parents. + } else { + if (!inReplyTo.isEmpty()) { + //According to RFC5256 we should ignore all but the first + parentMessageId = inReplyTo.first(); + } + } + + mail.setExtractedMessageId(messageId); + if (!parentMessageId.isEmpty()) { + mail.setExtractedParentMessageId(parentMessageId); + } } void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::Transaction &transaction) { - updatedIndexedProperties(mail); + MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(mail.getMimeMessagePath())); + auto msg = mimeMessageReader.mimeMessage(); + updatedIndexedProperties(mail, msg); } void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::Transaction &transaction) { - updatedIndexedProperties(newMail); + MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(newMail.getMimeMessagePath())); + auto msg = mimeMessageReader.mimeMessage(); + updatedIndexedProperties(newMail, msg); } diff --git a/common/mailpreprocessor.h b/common/mailpreprocessor.h index 473931c..b7cd0e7 100644 --- a/common/mailpreprocessor.h +++ b/common/mailpreprocessor.h @@ -28,9 +28,6 @@ public: virtual void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE; protected: virtual QString getFilePathFromMimeMessagePath(const QString &) const; - -private: - void updatedIndexedProperties(Sink::ApplicationDomain::Mail &mail); }; class SINK_EXPORT MimeMessageMover : public Sink::EntityPreprocessor diff --git a/common/query.h b/common/query.h index 68463cd..0808432 100644 --- a/common/query.h +++ b/common/query.h @@ -195,13 +195,13 @@ public: return *this; } - Query(const ApplicationDomain::Entity &value) : limit(0), liveQuery(false), synchronousQuery(false) + Query(const ApplicationDomain::Entity &value) : limit(0), liveQuery(false), synchronousQuery(false), threadLeaderOnly(false) { ids << value.identifier(); resources << value.resourceInstanceIdentifier(); } - Query(Flags flags = Flags()) : limit(0), liveQuery(false), synchronousQuery(false) + Query(Flags flags = Flags()) : limit(0), liveQuery(false), synchronousQuery(false), threadLeaderOnly(false) { } @@ -236,6 +236,7 @@ public: int limit; bool liveQuery; bool synchronousQuery; + bool threadLeaderOnly; }; } -- cgit v1.2.3