summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/datastorequery.cpp55
-rw-r--r--common/datastorequery.h20
-rw-r--r--common/domain/applicationdomaintype.h2
-rw-r--r--common/domain/event.cpp10
-rw-r--r--common/domain/event.h8
-rw-r--r--common/domain/folder.cpp9
-rw-r--r--common/domain/folder.h3
-rw-r--r--common/domain/mail.cpp117
-rw-r--r--common/domain/mail.fbs2
-rw-r--r--common/domain/mail.h8
-rw-r--r--common/entityreader.cpp25
-rw-r--r--common/mailpreprocessor.cpp102
-rw-r--r--common/mailpreprocessor.h3
-rw-r--r--common/query.h5
14 files changed, 250 insertions, 119 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 3237c53..cc070be 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -122,17 +122,17 @@ QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArra
122 return mGetProperty(entity, property); 122 return mGetProperty(entity, property);
123} 123}
124 124
125ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty) 125ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty)
126{ 126{
127 const bool sortingRequired = !sortProperty.isEmpty(); 127 const bool sortingRequired = !sortProperty.isEmpty();
128 if (initialQuery && sortingRequired) { 128 if (mInitialQuery && sortingRequired) {
129 SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; 129 SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty;
130 // Sort the complete set by reading the sort property and filling into a sorted map 130 // Sort the complete set by reading the sort property and filling into a sorted map
131 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); 131 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
132 while (resultSet.next()) { 132 while (resultSet.next()) {
133 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) 133 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
134 readEntity(resultSet.id(), 134 readEntity(resultSet.id(),
135 [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { 135 [this, filter, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
136 136
137 const auto operation = buffer.operation(); 137 const auto operation = buffer.operation();
138 138
@@ -154,10 +154,10 @@ ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFun
154 154
155 SinkTrace() << "Sorted " << sortedMap->size() << " values."; 155 SinkTrace() << "Sorted " << sortedMap->size() << " values.";
156 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); 156 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
157 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter, initialQuery]( 157 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter](
158 std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool { 158 std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool {
159 if (iterator->hasNext()) { 159 if (iterator->hasNext()) {
160 readEntity(iterator->next().value(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { 160 readEntity(iterator->next().value(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
161 callback(uid, buffer, Sink::Operation_Creation); 161 callback(uid, buffer, Sink::Operation_Creation);
162 }); 162 });
163 return true; 163 return true;
@@ -173,13 +173,13 @@ ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFun
173 return ResultSet(generator, skip); 173 return ResultSet(generator, skip);
174 } else { 174 } else {
175 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); 175 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
176 ResultSet::ValueGenerator generator = [this, resultSetPtr, filter, initialQuery](const ResultSet::Callback &callback) -> bool { 176 ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool {
177 if (resultSetPtr->next()) { 177 if (resultSetPtr->next()) {
178 SinkTrace() << "Reading the next value: " << resultSetPtr->id(); 178 SinkTrace() << "Reading the next value: " << resultSetPtr->id();
179 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) 179 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
180 readEntity(resultSetPtr->id(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { 180 readEntity(resultSetPtr->id(), [this, filter, callback](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
181 const auto operation = buffer.operation(); 181 const auto operation = buffer.operation();
182 if (initialQuery) { 182 if (mInitialQuery) {
183 // We're not interested in removals during the initial query 183 // We're not interested in removals during the initial query
184 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { 184 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) {
185 // In the initial set every entity is new 185 // In the initial set every entity is new
@@ -225,22 +225,53 @@ DataStoreQuery::FilterFunction DataStoreQuery::getFilter(const QSet<QByteArray>
225 }; 225 };
226} 226}
227 227
228ResultSet DataStoreQuery::createFilteredSet(ResultSet &resultSet, const std::function<bool(const QByteArray &, const Sink::EntityBuffer &buffer)> &filter)
229{
230 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
231 ResultSet::ValueGenerator generator = [this, resultSetPtr, filter](const ResultSet::Callback &callback) -> bool {
232 return resultSetPtr->next([=](const QByteArray &uid, const Sink::EntityBuffer &buffer, Sink::Operation operation) {
233 if (mInitialQuery) {
234 // We're not interested in removals during the initial query
235 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) {
236 // In the initial set every entity is new
237 callback(uid, buffer, Sink::Operation_Creation);
238 }
239 } else {
240 // Always remove removals, they probably don't match due to non-available properties
241 if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) {
242 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
243 callback(uid, buffer, operation);
244 }
245 }
246 });
247 };
248 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
249 return ResultSet(generator, skip);
250}
251
252ResultSet DataStoreQuery::postSortFilter(ResultSet &resultSet)
253{
254 return resultSet;
255}
256
228ResultSet DataStoreQuery::update(qint64 baseRevision) 257ResultSet DataStoreQuery::update(qint64 baseRevision)
229{ 258{
230 SinkTrace() << "Executing query update"; 259 SinkTrace() << "Executing query update";
260 mInitialQuery = false;
231 QSet<QByteArray> remainingFilters; 261 QSet<QByteArray> remainingFilters;
232 QByteArray remainingSorting; 262 QByteArray remainingSorting;
233 auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters); 263 auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters);
234 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), false, remainingSorting); 264 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), remainingSorting);
235 return filteredSet; 265 return postSortFilter(filteredSet);
236} 266}
237 267
238ResultSet DataStoreQuery::execute() 268ResultSet DataStoreQuery::execute()
239{ 269{
240 SinkTrace() << "Executing query"; 270 SinkTrace() << "Executing query";
271 mInitialQuery = true;
241 QSet<QByteArray> remainingFilters; 272 QSet<QByteArray> remainingFilters;
242 QByteArray remainingSorting; 273 QByteArray remainingSorting;
243 auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting); 274 auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting);
244 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), true, remainingSorting); 275 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), remainingSorting);
245 return filteredSet; 276 return postSortFilter(filteredSet);
246} 277}
diff --git a/common/datastorequery.h b/common/datastorequery.h
index cf9d9e2..7712ac7 100644
--- a/common/datastorequery.h
+++ b/common/datastorequery.h
@@ -27,24 +27,29 @@
27 27
28class DataStoreQuery { 28class DataStoreQuery {
29public: 29public:
30 typedef QSharedPointer<DataStoreQuery> Ptr;
31
30 DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty); 32 DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty);
31 ResultSet execute(); 33 ResultSet execute();
32 ResultSet update(qint64 baseRevision); 34 ResultSet update(qint64 baseRevision);
33 35
34private: 36protected:
35 37
36 typedef std::function<bool(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> FilterFunction; 38 typedef std::function<bool(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> FilterFunction;
37 typedef std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> BufferCallback; 39 typedef std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> BufferCallback;
38 40
39 QVariant getProperty(const Sink::Entity &entity, const QByteArray &property); 41 virtual QVariant getProperty(const Sink::Entity &entity, const QByteArray &property);
42
43 virtual void readEntity(const QByteArray &key, const BufferCallback &resultCallback);
40 44
41 void readEntity(const QByteArray &key, const BufferCallback &resultCallback); 45 virtual ResultSet loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting);
46 virtual ResultSet loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters);
42 47
43 ResultSet loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting); 48 virtual ResultSet filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty);
44 ResultSet loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters); 49 virtual ResultSet postSortFilter(ResultSet &resultSet);
50 virtual FilterFunction getFilter(const QSet<QByteArray> &remainingFilters);
45 51
46 ResultSet filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty); 52 ResultSet createFilteredSet(ResultSet &resultSet, const std::function<bool(const QByteArray &, const Sink::EntityBuffer &buffer)> &);
47 FilterFunction getFilter(const QSet<QByteArray> &remainingFilters);
48 53
49 Sink::Query mQuery; 54 Sink::Query mQuery;
50 Sink::Storage::Transaction &mTransaction; 55 Sink::Storage::Transaction &mTransaction;
@@ -52,6 +57,7 @@ private:
52 TypeIndex &mTypeIndex; 57 TypeIndex &mTypeIndex;
53 Sink::Storage::NamedDatabase mDb; 58 Sink::Storage::NamedDatabase mDb;
54 std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> mGetProperty; 59 std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> mGetProperty;
60 bool mInitialQuery;
55}; 61};
56 62
57 63
diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h
index 67f33c8..c853397 100644
--- a/common/domain/applicationdomaintype.h
+++ b/common/domain/applicationdomaintype.h
@@ -238,6 +238,8 @@ struct SINK_EXPORT Mail : public Entity {
238 SINK_PROPERTY(bool, Draft, draft); 238 SINK_PROPERTY(bool, Draft, draft);
239 SINK_PROPERTY(bool, Trash, trash); 239 SINK_PROPERTY(bool, Trash, trash);
240 SINK_PROPERTY(bool, Sent, sent); 240 SINK_PROPERTY(bool, Sent, sent);
241 SINK_EXTRACTED_PROPERTY(QByteArray, MessageId, messageId);
242 SINK_EXTRACTED_PROPERTY(QByteArray, ParentMessageId, parentMessageId);
241}; 243};
242 244
243/** 245/**
diff --git a/common/domain/event.cpp b/common/domain/event.cpp
index dfbcb61..118ffa3 100644
--- a/common/domain/event.cpp
+++ b/common/domain/event.cpp
@@ -52,11 +52,6 @@ static TypeIndex &getIndex()
52 return *index; 52 return *index;
53} 53}
54 54
55ResultSet TypeImplementation<Event>::queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction)
56{
57 return getIndex().query(query, appliedFilters, appliedSorting, transaction);
58}
59
60void TypeImplementation<Event>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) 55void TypeImplementation<Event>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction)
61{ 56{
62 return getIndex().add(identifier, bufferAdaptor, transaction); 57 return getIndex().add(identifier, bufferAdaptor, transaction);
@@ -87,11 +82,10 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Event>::BufferBuilder> > T
87 return propertyMapper; 82 return propertyMapper;
88} 83}
89 84
90DataStoreQuery TypeImplementation<Event>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) 85DataStoreQuery::Ptr TypeImplementation<Event>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction)
91{ 86{
92
93 auto mapper = initializeReadPropertyMapper(); 87 auto mapper = initializeReadPropertyMapper();
94 return DataStoreQuery(query, ApplicationDomain::getTypeName<Event>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { 88 return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName<Event>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) {
95 89
96 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local()); 90 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local());
97 return mapper->getProperty(property, localBuffer); 91 return mapper->getProperty(property, localBuffer);
diff --git a/common/domain/event.h b/common/domain/event.h
index 4ac572c..e1ca061 100644
--- a/common/domain/event.h
+++ b/common/domain/event.h
@@ -51,13 +51,7 @@ public:
51 typedef Sink::ApplicationDomain::Buffer::Event Buffer; 51 typedef Sink::ApplicationDomain::Buffer::Event Buffer;
52 typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; 52 typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder;
53 static QSet<QByteArray> indexedProperties(); 53 static QSet<QByteArray> indexedProperties();
54 static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); 54 static DataStoreQuery::Ptr prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction);
55 /**
56 * Returns the potential result set based on the indexes.
57 *
58 * An empty result set indicates that a full scan is required.
59 */
60 static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction);
61 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 55 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction);
62 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 56 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction);
63 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); 57 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp
index 6d487b1..17d9f13 100644
--- a/common/domain/folder.cpp
+++ b/common/domain/folder.cpp
@@ -55,11 +55,6 @@ static TypeIndex &getIndex()
55 return *index; 55 return *index;
56} 56}
57 57
58ResultSet TypeImplementation<Folder>::queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction)
59{
60 return getIndex().query(query, appliedFilters, appliedSorting, transaction);
61}
62
63void TypeImplementation<Folder>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) 58void TypeImplementation<Folder>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction)
64{ 59{
65 SinkTrace() << "Indexing " << identifier; 60 SinkTrace() << "Indexing " << identifier;
@@ -91,10 +86,10 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Folder>::BufferBuilder> >
91 return propertyMapper; 86 return propertyMapper;
92} 87}
93 88
94DataStoreQuery TypeImplementation<Folder>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) 89DataStoreQuery::Ptr TypeImplementation<Folder>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction)
95{ 90{
96 auto mapper = initializeReadPropertyMapper(); 91 auto mapper = initializeReadPropertyMapper();
97 return DataStoreQuery(query, ApplicationDomain::getTypeName<Folder>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { 92 return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName<Folder>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) {
98 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local()); 93 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local());
99 return mapper->getProperty(property, localBuffer); 94 return mapper->getProperty(property, localBuffer);
100 }); 95 });
diff --git a/common/domain/folder.h b/common/domain/folder.h
index 77edc8a..ff87006 100644
--- a/common/domain/folder.h
+++ b/common/domain/folder.h
@@ -45,9 +45,8 @@ class TypeImplementation<Sink::ApplicationDomain::Folder> {
45public: 45public:
46 typedef Sink::ApplicationDomain::Buffer::Folder Buffer; 46 typedef Sink::ApplicationDomain::Buffer::Folder Buffer;
47 typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; 47 typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder;
48 static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); 48 static DataStoreQuery::Ptr prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction);
49 static QSet<QByteArray> indexedProperties(); 49 static QSet<QByteArray> indexedProperties();
50 static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction);
51 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 50 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction);
52 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 51 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction);
53 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); 52 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp
index bb5ad58..859ebef 100644
--- a/common/domain/mail.cpp
+++ b/common/domain/mail.cpp
@@ -41,6 +41,7 @@ SINK_DEBUG_AREA("mail");
41 41
42static QMutex sMutex; 42static QMutex sMutex;
43 43
44using namespace Sink;
44using namespace Sink::ApplicationDomain; 45using namespace Sink::ApplicationDomain;
45 46
46static TypeIndex &getIndex() 47static TypeIndex &getIndex()
@@ -49,26 +50,54 @@ static TypeIndex &getIndex()
49 static TypeIndex *index = 0; 50 static TypeIndex *index = 0;
50 if (!index) { 51 if (!index) {
51 index = new TypeIndex("mail"); 52 index = new TypeIndex("mail");
52 index->addProperty<QByteArray>("uid"); 53 index->addProperty<QByteArray>(Mail::Uid::name);
53 index->addProperty<QByteArray>("sender"); 54 index->addProperty<QByteArray>(Mail::Sender::name);
54 index->addProperty<QByteArray>("senderName"); 55 index->addProperty<QByteArray>(Mail::SenderName::name);
55 index->addProperty<QString>("subject"); 56 index->addProperty<QString>(Mail::Subject::name);
56 index->addProperty<QDateTime>("date"); 57 index->addProperty<QDateTime>(Mail::Date::name);
57 index->addProperty<QByteArray>("folder"); 58 index->addProperty<QByteArray>(Mail::Folder::name);
58 index->addPropertyWithSorting<QByteArray, QDateTime>("folder", "date"); 59 index->addPropertyWithSorting<QByteArray, QDateTime>(Mail::Folder::name, Mail::Date::name);
60 index->addProperty<QByteArray>(Mail::MessageId::name);
61 index->addProperty<QByteArray>(Mail::ParentMessageId::name);
59 } 62 }
60 return *index; 63 return *index;
61} 64}
62 65
63ResultSet TypeImplementation<Mail>::queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction) 66static void updateThreadingIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction)
64{ 67{
65 return getIndex().query(query, appliedFilters, appliedSorting, transaction); 68 auto messageId = bufferAdaptor.getProperty(Mail::MessageId::name).toByteArray();
69 auto parentMessageId = bufferAdaptor.getProperty(Mail::ParentMessageId::name).toByteArray();
70
71 Index msgIdIndex("msgId", transaction);
72 Index msgIdThreadIdIndex("msgIdThreadId", transaction);
73
74 //Add the message to the index
75 Q_ASSERT(msgIdIndex.lookup(messageId).isEmpty());
76 msgIdIndex.add(messageId, identifier);
77
78 //If parent is already available, add to thread of parent
79 QByteArray thread;
80 if (!parentMessageId.isEmpty() && !msgIdIndex.lookup(parentMessageId).isEmpty()) {
81 thread = msgIdThreadIdIndex.lookup(parentMessageId);
82 msgIdThreadIdIndex.add(messageId, thread);
83 } else {
84 thread = QUuid::createUuid().toByteArray();
85 if (!parentMessageId.isEmpty()) {
86 //Register parent with thread for when it becomes available
87 msgIdThreadIdIndex.add(parentMessageId, thread);
88 }
89 }
90 Q_ASSERT(!thread.isEmpty());
91 msgIdThreadIdIndex.add(messageId, thread);
92
93 //Look for parentMessageId and resolve to local id if available
66} 94}
67 95
68void TypeImplementation<Mail>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) 96void TypeImplementation<Mail>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction)
69{ 97{
70 SinkTrace() << "Indexing " << identifier; 98 SinkTrace() << "Indexing " << identifier;
71 getIndex().add(identifier, bufferAdaptor, transaction); 99 getIndex().add(identifier, bufferAdaptor, transaction);
100 updateThreadingIndex(identifier, bufferAdaptor, transaction);
72} 101}
73 102
74void TypeImplementation<Mail>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) 103void TypeImplementation<Mail>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction)
@@ -91,6 +120,8 @@ QSharedPointer<ReadPropertyMapper<TypeImplementation<Mail>::Buffer> > TypeImplem
91 propertyMapper->addMapping<Mail::Draft, Buffer>(&Buffer::draft); 120 propertyMapper->addMapping<Mail::Draft, Buffer>(&Buffer::draft);
92 propertyMapper->addMapping<Mail::Trash, Buffer>(&Buffer::trash); 121 propertyMapper->addMapping<Mail::Trash, Buffer>(&Buffer::trash);
93 propertyMapper->addMapping<Mail::Sent, Buffer>(&Buffer::sent); 122 propertyMapper->addMapping<Mail::Sent, Buffer>(&Buffer::sent);
123 propertyMapper->addMapping<Mail::MessageId, Buffer>(&Buffer::messageId);
124 propertyMapper->addMapping<Mail::ParentMessageId, Buffer>(&Buffer::parentMessageId);
94 return propertyMapper; 125 return propertyMapper;
95} 126}
96 127
@@ -110,16 +141,72 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Mail>::BufferBuilder> > Ty
110 propertyMapper->addMapping<Mail::Draft>(&BufferBuilder::add_draft); 141 propertyMapper->addMapping<Mail::Draft>(&BufferBuilder::add_draft);
111 propertyMapper->addMapping<Mail::Trash>(&BufferBuilder::add_trash); 142 propertyMapper->addMapping<Mail::Trash>(&BufferBuilder::add_trash);
112 propertyMapper->addMapping<Mail::Sent>(&BufferBuilder::add_sent); 143 propertyMapper->addMapping<Mail::Sent>(&BufferBuilder::add_sent);
144 propertyMapper->addMapping<Mail::MessageId>(&BufferBuilder::add_messageId);
145 propertyMapper->addMapping<Mail::ParentMessageId>(&BufferBuilder::add_parentMessageId);
113 return propertyMapper; 146 return propertyMapper;
114} 147}
115 148
116DataStoreQuery TypeImplementation<Mail>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) 149class ThreadedDataStoreQuery : public DataStoreQuery
150{
151public:
152 typedef QSharedPointer<ThreadedDataStoreQuery> Ptr;
153 using DataStoreQuery::DataStoreQuery;
154
155protected:
156 ResultSet postSortFilter(ResultSet &resultSet) Q_DECL_OVERRIDE
157 {
158 auto query = mQuery;
159 if (query.threadLeaderOnly) {
160 auto rootCollection = QSharedPointer<QMap<QByteArray, QDateTime>>::create();
161 auto filter = [this, query, rootCollection](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool {
162 //TODO lookup thread
163 //if we got thread already in the result set compare dates and if newer replace
164 //else insert
165
166 const auto messageId = getProperty(entity.entity(), ApplicationDomain::Mail::MessageId::name).toByteArray();
167
168 Index msgIdIndex("msgId", mTransaction);
169 Index msgIdThreadIdIndex("msgIdThreadId", mTransaction);
170 auto thread = msgIdThreadIdIndex.lookup(messageId);
171 SinkTrace() << "MsgId: " << messageId << " Thread: " << thread << getProperty(entity.entity(), ApplicationDomain::Mail::Date::name).toDateTime();
172
173 if (rootCollection->contains(thread)) {
174 auto date = rootCollection->value(thread);
175 //The mail we have in our result already is newer, so we can ignore this one
176 if (date > getProperty(entity.entity(), ApplicationDomain::Mail::Date::name).toDateTime()) {
177 return false;
178 }
179 qWarning() << "############################################################################";
180 qWarning() << "Found a newer mail, remove the old one";
181 qWarning() << "############################################################################";
182 }
183 rootCollection->insert(thread, getProperty(entity.entity(), ApplicationDomain::Mail::Date::name).toDateTime());
184 return true;
185 };
186 return createFilteredSet(resultSet, filter);
187 } else {
188 return resultSet;
189 }
190 }
191};
192
193DataStoreQuery::Ptr TypeImplementation<Mail>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction)
117{ 194{
118 auto mapper = initializeReadPropertyMapper(); 195 if (query.threadLeaderOnly) {
119 return DataStoreQuery(query, ApplicationDomain::getTypeName<Mail>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { 196 auto mapper = initializeReadPropertyMapper();
197 return ThreadedDataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName<Mail>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) {
198
199 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local());
200 return mapper->getProperty(property, localBuffer);
201 });
120 202
121 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local()); 203 } else {
122 return mapper->getProperty(property, localBuffer); 204 auto mapper = initializeReadPropertyMapper();
123 }); 205 return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName<Mail>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) {
206
207 const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local());
208 return mapper->getProperty(property, localBuffer);
209 });
210 }
124} 211}
125 212
diff --git a/common/domain/mail.fbs b/common/domain/mail.fbs
index a0c0d82..f14e9f1 100644
--- a/common/domain/mail.fbs
+++ b/common/domain/mail.fbs
@@ -13,6 +13,8 @@ table Mail {
13 draft:bool = false; 13 draft:bool = false;
14 trash:bool = false; 14 trash:bool = false;
15 sent:bool = false; 15 sent:bool = false;
16 messageId:string;
17 parentMessageId:string;
16} 18}
17 19
18root_type Mail; 20root_type Mail;
diff --git a/common/domain/mail.h b/common/domain/mail.h
index d6af9c5..3b0e9da 100644
--- a/common/domain/mail.h
+++ b/common/domain/mail.h
@@ -45,14 +45,8 @@ class TypeImplementation<Sink::ApplicationDomain::Mail> {
45public: 45public:
46 typedef Sink::ApplicationDomain::Buffer::Mail Buffer; 46 typedef Sink::ApplicationDomain::Buffer::Mail Buffer;
47 typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; 47 typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder;
48 static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); 48 static DataStoreQuery::Ptr prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction);
49 static QSet<QByteArray> indexedProperties(); 49 static QSet<QByteArray> indexedProperties();
50 /**
51 * Returns the potential result set based on the indexes.
52 *
53 * An empty result set indicates that a full scan is required.
54 */
55 static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction);
56 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 50 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction);
57 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); 51 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction);
58 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); 52 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
diff --git a/common/entityreader.cpp b/common/entityreader.cpp
index faa154b..d86f4a9 100644
--- a/common/entityreader.cpp
+++ b/common/entityreader.cpp
@@ -150,27 +150,6 @@ void EntityReader<DomainType>::query(const Sink::Query &query, const std::functi
150 }); 150 });
151} 151}
152 152
153/* template <class DomainType> */
154/* void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, */
155/* const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) */
156/* { */
157/* db.findLatest(key, */
158/* [=](const QByteArray &key, const QByteArray &value) -> bool { */
159/* Sink::EntityBuffer buffer(value.data(), value.size()); */
160/* const Sink::Entity &entity = buffer.entity(); */
161/* const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); */
162/* const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; */
163/* const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; */
164/* auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); */
165/* Q_ASSERT(adaptor); */
166/* resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); */
167/* return false; */
168/* }, */
169/* [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); */
170/* } */
171
172
173
174template <class DomainType> 153template <class DomainType>
175QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) 154QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
176{ 155{
@@ -178,7 +157,7 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::
178 time.start(); 157 time.start();
179 158
180 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); 159 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction);
181 auto resultSet = preparedQuery.execute(); 160 auto resultSet = preparedQuery->execute();
182 161
183 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 162 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
184 auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); 163 auto replayedEntities = replaySet(resultSet, offset, batchsize, callback);
@@ -195,7 +174,7 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si
195 const qint64 baseRevision = lastRevision + 1; 174 const qint64 baseRevision = lastRevision + 1;
196 175
197 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); 176 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction);
198 auto resultSet = preparedQuery.update(baseRevision); 177 auto resultSet = preparedQuery->update(baseRevision);
199 178
200 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 179 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
201 auto replayedEntities = replaySet(resultSet, 0, 0, callback); 180 auto replayedEntities = replaySet(resultSet, 0, 0, callback);
diff --git a/common/mailpreprocessor.cpp b/common/mailpreprocessor.cpp
index 2863ad4..0534338 100644
--- a/common/mailpreprocessor.cpp
+++ b/common/mailpreprocessor.cpp
@@ -36,48 +36,98 @@ QString MailPropertyExtractor::getFilePathFromMimeMessagePath(const QString &s)
36 return s; 36 return s;
37} 37}
38 38
39void MailPropertyExtractor::updatedIndexedProperties(Sink::ApplicationDomain::Mail &mail) 39struct MimeMessageReader {
40{ 40 MimeMessageReader(const QString &mimeMessagePath)
41 const auto mimeMessagePath = getFilePathFromMimeMessagePath(mail.getMimeMessagePath()); 41 : f(mimeMessagePath),
42 if (mimeMessagePath.isNull()) { 42 mapped(0),
43 SinkTrace() << "No mime message"; 43 mappedSize(0)
44 return; 44 {
45 } 45 if (mimeMessagePath.isNull()) {
46 SinkTrace() << "Updating indexed properties " << mimeMessagePath; 46 SinkTrace() << "No mime message";
47 QFile f(mimeMessagePath); 47 return;
48 if (!f.open(QIODevice::ReadOnly)) { 48 }
49 SinkWarning() << "Failed to open the file: " << mimeMessagePath; 49 SinkTrace() << "Updating indexed properties " << mimeMessagePath;
50 return; 50 if (!f.open(QIODevice::ReadOnly)) {
51 } 51 SinkWarning() << "Failed to open the file: " << mimeMessagePath;
52 if (!f.size()) { 52 return;
53 SinkWarning() << "The file is empty."; 53 }
54 return; 54 if (!f.size()) {
55 SinkWarning() << "The file is empty.";
56 return;
57 }
58 mappedSize = qMin((qint64)8000, f.size());
59 mapped = f.map(0, mappedSize);
60 if (!mapped) {
61 SinkWarning() << "Failed to map the file: " << f.errorString();
62 return;
63 }
55 } 64 }
56 const auto mappedSize = qMin((qint64)8000, f.size()); 65
57 auto mapped = f.map(0, mappedSize); 66 KMime::Message::Ptr mimeMessage()
58 if (!mapped) { 67 {
59 SinkWarning() << "Failed to map the file: " << f.errorString(); 68 if (!mapped) {
60 return; 69 return KMime::Message::Ptr();
70 }
71 Q_ASSERT(mapped);
72 Q_ASSERT(mappedSize);
73 auto msg = KMime::Message::Ptr(new KMime::Message);
74 msg->setHead(KMime::CRLFtoLF(QByteArray::fromRawData(reinterpret_cast<const char*>(mapped), mappedSize)));
75 msg->parse();
76 return msg;
61 } 77 }
62 78
63 KMime::Message *msg = new KMime::Message; 79 QFile f;
64 msg->setHead(KMime::CRLFtoLF(QByteArray::fromRawData(reinterpret_cast<const char*>(mapped), mappedSize))); 80 uchar *mapped;
65 msg->parse(); 81 qint64 mappedSize;
82};
66 83
84static void updatedIndexedProperties(Sink::ApplicationDomain::Mail &mail, KMime::Message::Ptr msg)
85{
67 mail.setExtractedSubject(msg->subject(true)->asUnicodeString()); 86 mail.setExtractedSubject(msg->subject(true)->asUnicodeString());
68 mail.setExtractedSender(msg->from(true)->asUnicodeString()); 87 mail.setExtractedSender(msg->from(true)->asUnicodeString());
69 mail.setExtractedSenderName(msg->from(true)->asUnicodeString()); 88 mail.setExtractedSenderName(msg->from(true)->asUnicodeString());
70 mail.setExtractedDate(msg->date(true)->dateTime()); 89 mail.setExtractedDate(msg->date(true)->dateTime());
90
91 //The rest should never change, unless we didn't have the headers available initially.
92 auto messageId = msg->messageID(true)->identifier();
93
94 //Ensure the mssageId is unique.
95 //If there already is one with the same id we'd have to assign a new message id, which probably doesn't make any sense.
96
97 //The last is the parent
98 auto references = msg->references(true)->identifiers();
99
100 //The first is the parent
101 auto inReplyTo = msg->inReplyTo(true)->identifiers();
102 QByteArray parentMessageId;
103 if (!references.isEmpty()) {
104 parentMessageId = references.last();
105 //TODO we could use the rest of the references header to complete the ancestry in case we have missing parents.
106 } else {
107 if (!inReplyTo.isEmpty()) {
108 //According to RFC5256 we should ignore all but the first
109 parentMessageId = inReplyTo.first();
110 }
111 }
112
113 mail.setExtractedMessageId(messageId);
114 if (!parentMessageId.isEmpty()) {
115 mail.setExtractedParentMessageId(parentMessageId);
116 }
71} 117}
72 118
73void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::Transaction &transaction) 119void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::Transaction &transaction)
74{ 120{
75 updatedIndexedProperties(mail); 121 MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(mail.getMimeMessagePath()));
122 auto msg = mimeMessageReader.mimeMessage();
123 updatedIndexedProperties(mail, msg);
76} 124}
77 125
78void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::Transaction &transaction) 126void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::Transaction &transaction)
79{ 127{
80 updatedIndexedProperties(newMail); 128 MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(newMail.getMimeMessagePath()));
129 auto msg = mimeMessageReader.mimeMessage();
130 updatedIndexedProperties(newMail, msg);
81} 131}
82 132
83 133
diff --git a/common/mailpreprocessor.h b/common/mailpreprocessor.h
index 473931c..b7cd0e7 100644
--- a/common/mailpreprocessor.h
+++ b/common/mailpreprocessor.h
@@ -28,9 +28,6 @@ public:
28 virtual void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE; 28 virtual void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE;
29protected: 29protected:
30 virtual QString getFilePathFromMimeMessagePath(const QString &) const; 30 virtual QString getFilePathFromMimeMessagePath(const QString &) const;
31
32private:
33 void updatedIndexedProperties(Sink::ApplicationDomain::Mail &mail);
34}; 31};
35 32
36class SINK_EXPORT MimeMessageMover : public Sink::EntityPreprocessor<Sink::ApplicationDomain::Mail> 33class SINK_EXPORT MimeMessageMover : public Sink::EntityPreprocessor<Sink::ApplicationDomain::Mail>
diff --git a/common/query.h b/common/query.h
index 68463cd..0808432 100644
--- a/common/query.h
+++ b/common/query.h
@@ -195,13 +195,13 @@ public:
195 return *this; 195 return *this;
196 } 196 }
197 197
198 Query(const ApplicationDomain::Entity &value) : limit(0), liveQuery(false), synchronousQuery(false) 198 Query(const ApplicationDomain::Entity &value) : limit(0), liveQuery(false), synchronousQuery(false), threadLeaderOnly(false)
199 { 199 {
200 ids << value.identifier(); 200 ids << value.identifier();
201 resources << value.resourceInstanceIdentifier(); 201 resources << value.resourceInstanceIdentifier();
202 } 202 }
203 203
204 Query(Flags flags = Flags()) : limit(0), liveQuery(false), synchronousQuery(false) 204 Query(Flags flags = Flags()) : limit(0), liveQuery(false), synchronousQuery(false), threadLeaderOnly(false)
205 { 205 {
206 } 206 }
207 207
@@ -236,6 +236,7 @@ public:
236 int limit; 236 int limit;
237 bool liveQuery; 237 bool liveQuery;
238 bool synchronousQuery; 238 bool synchronousQuery;
239 bool threadLeaderOnly;
239}; 240};
240} 241}
241 242