diff options
-rw-r--r-- | common/CMakeLists.txt | 1 | ||||
-rw-r--r-- | common/datastorequery.cpp | 246 | ||||
-rw-r--r-- | common/datastorequery.h | 59 | ||||
-rw-r--r-- | common/domain/event.cpp | 13 | ||||
-rw-r--r-- | common/domain/event.h | 2 | ||||
-rw-r--r-- | common/domain/folder.cpp | 11 | ||||
-rw-r--r-- | common/domain/folder.h | 2 | ||||
-rw-r--r-- | common/domain/mail.cpp | 13 | ||||
-rw-r--r-- | common/domain/mail.h | 2 | ||||
-rw-r--r-- | common/entitybuffer.cpp | 14 | ||||
-rw-r--r-- | common/entitybuffer.h | 6 | ||||
-rw-r--r-- | common/entityreader.cpp | 250 | ||||
-rw-r--r-- | common/entityreader.h | 11 | ||||
-rw-r--r-- | common/resultset.cpp | 6 | ||||
-rw-r--r-- | common/resultset.h | 7 |
15 files changed, 409 insertions, 234 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 5eb15ba..0fc8d81 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -75,6 +75,7 @@ set(command_SRCS | |||
75 | entityreader.cpp | 75 | entityreader.cpp |
76 | mailpreprocessor.cpp | 76 | mailpreprocessor.cpp |
77 | specialpurposepreprocessor.cpp | 77 | specialpurposepreprocessor.cpp |
78 | datastorequery.cpp | ||
78 | ${storage_SRCS}) | 79 | ${storage_SRCS}) |
79 | 80 | ||
80 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) | 81 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) |
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp new file mode 100644 index 0000000..3237c53 --- /dev/null +++ b/common/datastorequery.cpp | |||
@@ -0,0 +1,246 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | #include "datastorequery.h" | ||
20 | |||
21 | #include "log.h" | ||
22 | #include "entitybuffer.h" | ||
23 | #include "entity_generated.h" | ||
24 | |||
25 | using namespace Sink; | ||
26 | |||
27 | |||
28 | SINK_DEBUG_AREA("datastorequery") | ||
29 | |||
30 | DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty) | ||
31 | : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) | ||
32 | { | ||
33 | |||
34 | } | ||
35 | |||
36 | static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
37 | { | ||
38 | // TODO use a result set with an iterator, to read values on demand | ||
39 | SinkTrace() << "Looking for : " << bufferType; | ||
40 | //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. | ||
41 | QSet<QByteArray> keys; | ||
42 | Storage::mainDatabase(transaction, bufferType) | ||
43 | .scan(QByteArray(), | ||
44 | [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
45 | if (keys.contains(Sink::Storage::uidFromKey(key))) { | ||
46 | //Not something that should persist if the replay works, so we keep a message for now. | ||
47 | SinkTrace() << "Multiple revisions for key: " << key; | ||
48 | } | ||
49 | keys << Sink::Storage::uidFromKey(key); | ||
50 | return true; | ||
51 | }, | ||
52 | [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); | ||
53 | |||
54 | SinkTrace() << "Full scan retrieved " << keys.size() << " results."; | ||
55 | return ResultSet(keys.toList().toVector()); | ||
56 | } | ||
57 | |||
58 | ResultSet DataStoreQuery::loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) | ||
59 | { | ||
60 | if (!mQuery.ids.isEmpty()) { | ||
61 | return ResultSet(mQuery.ids.toVector()); | ||
62 | } | ||
63 | QSet<QByteArray> appliedFilters; | ||
64 | QByteArray appliedSorting; | ||
65 | |||
66 | auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction); | ||
67 | |||
68 | remainingFilters = mQuery.propertyFilter.keys().toSet() - appliedFilters; | ||
69 | if (appliedSorting.isEmpty()) { | ||
70 | remainingSorting = mQuery.sortProperty; | ||
71 | } | ||
72 | |||
73 | // We do a full scan if there were no indexes available to create the initial set. | ||
74 | if (appliedFilters.isEmpty()) { | ||
75 | // TODO this should be replaced by an index lookup as well | ||
76 | resultSet = fullScan(mTransaction, mType); | ||
77 | } | ||
78 | return resultSet; | ||
79 | } | ||
80 | |||
81 | ResultSet DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters) | ||
82 | { | ||
83 | const auto bufferType = mType; | ||
84 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
85 | remainingFilters = mQuery.propertyFilter.keys().toSet(); | ||
86 | return ResultSet([this, bufferType, revisionCounter]() -> QByteArray { | ||
87 | const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); | ||
88 | // Spit out the revision keys one by one. | ||
89 | while (*revisionCounter <= topRevision) { | ||
90 | const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); | ||
91 | const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); | ||
92 | // SinkTrace() << "Revision" << *revisionCounter << type << uid; | ||
93 | Q_ASSERT(!uid.isEmpty()); | ||
94 | Q_ASSERT(!type.isEmpty()); | ||
95 | if (type != bufferType) { | ||
96 | // Skip revision | ||
97 | *revisionCounter += 1; | ||
98 | continue; | ||
99 | } | ||
100 | const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); | ||
101 | *revisionCounter += 1; | ||
102 | return key; | ||
103 | } | ||
104 | SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; | ||
105 | // We're done | ||
106 | return QByteArray(); | ||
107 | }); | ||
108 | } | ||
109 | |||
110 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) | ||
111 | { | ||
112 | mDb.findLatest(key, | ||
113 | [=](const QByteArray &key, const QByteArray &value) -> bool { | ||
114 | resultCallback(Sink::Storage::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); | ||
115 | return false; | ||
116 | }, | ||
117 | [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); | ||
118 | } | ||
119 | |||
120 | QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property) | ||
121 | { | ||
122 | return mGetProperty(entity, property); | ||
123 | } | ||
124 | |||
125 | ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty) | ||
126 | { | ||
127 | const bool sortingRequired = !sortProperty.isEmpty(); | ||
128 | if (initialQuery && sortingRequired) { | ||
129 | SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; | ||
130 | // Sort the complete set by reading the sort property and filling into a sorted map | ||
131 | auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); | ||
132 | while (resultSet.next()) { | ||
133 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
134 | readEntity(resultSet.id(), | ||
135 | [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { | ||
136 | |||
137 | const auto operation = buffer.operation(); | ||
138 | |||
139 | // We're not interested in removals during the initial query | ||
140 | if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { | ||
141 | if (!sortProperty.isEmpty()) { | ||
142 | const auto sortValue = getProperty(buffer.entity(), sortProperty); | ||
143 | if (sortValue.type() == QVariant::DateTime) { | ||
144 | sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), uid); | ||
145 | } else { | ||
146 | sortedMap->insert(sortValue.toString().toLatin1(), uid); | ||
147 | } | ||
148 | } else { | ||
149 | sortedMap->insert(uid, uid); | ||
150 | } | ||
151 | } | ||
152 | }); | ||
153 | } | ||
154 | |||
155 | SinkTrace() << "Sorted " << sortedMap->size() << " values."; | ||
156 | auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); | ||
157 | ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter, initialQuery]( | ||
158 | std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool { | ||
159 | if (iterator->hasNext()) { | ||
160 | readEntity(iterator->next().value(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { | ||
161 | callback(uid, buffer, Sink::Operation_Creation); | ||
162 | }); | ||
163 | return true; | ||
164 | } | ||
165 | return false; | ||
166 | }; | ||
167 | |||
168 | auto skip = [iterator]() { | ||
169 | if (iterator->hasNext()) { | ||
170 | iterator->next(); | ||
171 | } | ||
172 | }; | ||
173 | return ResultSet(generator, skip); | ||
174 | } else { | ||
175 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | ||
176 | ResultSet::ValueGenerator generator = [this, resultSetPtr, filter, initialQuery](const ResultSet::Callback &callback) -> bool { | ||
177 | if (resultSetPtr->next()) { | ||
178 | SinkTrace() << "Reading the next value: " << resultSetPtr->id(); | ||
179 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
180 | readEntity(resultSetPtr->id(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { | ||
181 | const auto operation = buffer.operation(); | ||
182 | if (initialQuery) { | ||
183 | // We're not interested in removals during the initial query | ||
184 | if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { | ||
185 | // In the initial set every entity is new | ||
186 | callback(uid, buffer, Sink::Operation_Creation); | ||
187 | } | ||
188 | } else { | ||
189 | // Always remove removals, they probably don't match due to non-available properties | ||
190 | if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { | ||
191 | // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) | ||
192 | callback(uid, buffer, operation); | ||
193 | } | ||
194 | } | ||
195 | }); | ||
196 | return true; | ||
197 | } | ||
198 | return false; | ||
199 | }; | ||
200 | auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; | ||
201 | return ResultSet(generator, skip); | ||
202 | } | ||
203 | } | ||
204 | |||
205 | |||
206 | DataStoreQuery::FilterFunction DataStoreQuery::getFilter(const QSet<QByteArray> &remainingFilters) | ||
207 | { | ||
208 | auto query = mQuery; | ||
209 | return [this, remainingFilters, query](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool { | ||
210 | if (!query.ids.isEmpty()) { | ||
211 | if (!query.ids.contains(uid)) { | ||
212 | SinkTrace() << "Filter by uid: " << uid; | ||
213 | return false; | ||
214 | } | ||
215 | } | ||
216 | for (const auto &filterProperty : remainingFilters) { | ||
217 | const auto property = getProperty(entity.entity(), filterProperty); | ||
218 | const auto comparator = query.propertyFilter.value(filterProperty); | ||
219 | if (!comparator.matches(property)) { | ||
220 | SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; | ||
221 | return false; | ||
222 | } | ||
223 | } | ||
224 | return true; | ||
225 | }; | ||
226 | } | ||
227 | |||
228 | ResultSet DataStoreQuery::update(qint64 baseRevision) | ||
229 | { | ||
230 | SinkTrace() << "Executing query update"; | ||
231 | QSet<QByteArray> remainingFilters; | ||
232 | QByteArray remainingSorting; | ||
233 | auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters); | ||
234 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), false, remainingSorting); | ||
235 | return filteredSet; | ||
236 | } | ||
237 | |||
238 | ResultSet DataStoreQuery::execute() | ||
239 | { | ||
240 | SinkTrace() << "Executing query"; | ||
241 | QSet<QByteArray> remainingFilters; | ||
242 | QByteArray remainingSorting; | ||
243 | auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting); | ||
244 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), true, remainingSorting); | ||
245 | return filteredSet; | ||
246 | } | ||
diff --git a/common/datastorequery.h b/common/datastorequery.h new file mode 100644 index 0000000..cf9d9e2 --- /dev/null +++ b/common/datastorequery.h | |||
@@ -0,0 +1,59 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | #pragma once | ||
20 | |||
21 | #include "query.h" | ||
22 | #include "storage.h" | ||
23 | #include "resultset.h" | ||
24 | #include "typeindex.h" | ||
25 | #include "query.h" | ||
26 | #include "entitybuffer.h" | ||
27 | |||
28 | class DataStoreQuery { | ||
29 | public: | ||
30 | DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty); | ||
31 | ResultSet execute(); | ||
32 | ResultSet update(qint64 baseRevision); | ||
33 | |||
34 | private: | ||
35 | |||
36 | typedef std::function<bool(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> FilterFunction; | ||
37 | typedef std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> BufferCallback; | ||
38 | |||
39 | QVariant getProperty(const Sink::Entity &entity, const QByteArray &property); | ||
40 | |||
41 | void readEntity(const QByteArray &key, const BufferCallback &resultCallback); | ||
42 | |||
43 | ResultSet loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting); | ||
44 | ResultSet loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters); | ||
45 | |||
46 | ResultSet filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty); | ||
47 | FilterFunction getFilter(const QSet<QByteArray> &remainingFilters); | ||
48 | |||
49 | Sink::Query mQuery; | ||
50 | Sink::Storage::Transaction &mTransaction; | ||
51 | const QByteArray mType; | ||
52 | TypeIndex &mTypeIndex; | ||
53 | Sink::Storage::NamedDatabase mDb; | ||
54 | std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> mGetProperty; | ||
55 | }; | ||
56 | |||
57 | |||
58 | |||
59 | |||
diff --git a/common/domain/event.cpp b/common/domain/event.cpp index 0909bf1..dfbcb61 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp | |||
@@ -32,6 +32,8 @@ | |||
32 | #include "../query.h" | 32 | #include "../query.h" |
33 | #include "../definitions.h" | 33 | #include "../definitions.h" |
34 | #include "../typeindex.h" | 34 | #include "../typeindex.h" |
35 | #include "entitybuffer.h" | ||
36 | #include "entity_generated.h" | ||
35 | 37 | ||
36 | #include "event_generated.h" | 38 | #include "event_generated.h" |
37 | 39 | ||
@@ -84,3 +86,14 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Event>::BufferBuilder> > T | |||
84 | propertyMapper->addMapping<Event::Attachment>(&BufferBuilder::add_attachment); | 86 | propertyMapper->addMapping<Event::Attachment>(&BufferBuilder::add_attachment); |
85 | return propertyMapper; | 87 | return propertyMapper; |
86 | } | 88 | } |
89 | |||
90 | DataStoreQuery TypeImplementation<Event>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) | ||
91 | { | ||
92 | |||
93 | auto mapper = initializeReadPropertyMapper(); | ||
94 | return DataStoreQuery(query, ApplicationDomain::getTypeName<Event>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { | ||
95 | |||
96 | const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local()); | ||
97 | return mapper->getProperty(property, localBuffer); | ||
98 | }); | ||
99 | } | ||
diff --git a/common/domain/event.h b/common/domain/event.h index 5315566..4ac572c 100644 --- a/common/domain/event.h +++ b/common/domain/event.h | |||
@@ -21,6 +21,7 @@ | |||
21 | #include "applicationdomaintype.h" | 21 | #include "applicationdomaintype.h" |
22 | 22 | ||
23 | #include "storage.h" | 23 | #include "storage.h" |
24 | #include "datastorequery.h" | ||
24 | 25 | ||
25 | class ResultSet; | 26 | class ResultSet; |
26 | class QByteArray; | 27 | class QByteArray; |
@@ -50,6 +51,7 @@ public: | |||
50 | typedef Sink::ApplicationDomain::Buffer::Event Buffer; | 51 | typedef Sink::ApplicationDomain::Buffer::Event Buffer; |
51 | typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; | 52 | typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; |
52 | static QSet<QByteArray> indexedProperties(); | 53 | static QSet<QByteArray> indexedProperties(); |
54 | static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); | ||
53 | /** | 55 | /** |
54 | * Returns the potential result set based on the indexes. | 56 | * Returns the potential result set based on the indexes. |
55 | * | 57 | * |
diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp index ddb0c10..6d487b1 100644 --- a/common/domain/folder.cpp +++ b/common/domain/folder.cpp | |||
@@ -32,6 +32,8 @@ | |||
32 | #include "../query.h" | 32 | #include "../query.h" |
33 | #include "../definitions.h" | 33 | #include "../definitions.h" |
34 | #include "../typeindex.h" | 34 | #include "../typeindex.h" |
35 | #include "entitybuffer.h" | ||
36 | #include "entity_generated.h" | ||
35 | 37 | ||
36 | #include "folder_generated.h" | 38 | #include "folder_generated.h" |
37 | 39 | ||
@@ -88,3 +90,12 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Folder>::BufferBuilder> > | |||
88 | propertyMapper->addMapping<Folder::SpecialPurpose>(&BufferBuilder::add_specialpurpose); | 90 | propertyMapper->addMapping<Folder::SpecialPurpose>(&BufferBuilder::add_specialpurpose); |
89 | return propertyMapper; | 91 | return propertyMapper; |
90 | } | 92 | } |
93 | |||
94 | DataStoreQuery TypeImplementation<Folder>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) | ||
95 | { | ||
96 | auto mapper = initializeReadPropertyMapper(); | ||
97 | return DataStoreQuery(query, ApplicationDomain::getTypeName<Folder>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { | ||
98 | const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local()); | ||
99 | return mapper->getProperty(property, localBuffer); | ||
100 | }); | ||
101 | } | ||
diff --git a/common/domain/folder.h b/common/domain/folder.h index 6e066e1..77edc8a 100644 --- a/common/domain/folder.h +++ b/common/domain/folder.h | |||
@@ -21,6 +21,7 @@ | |||
21 | #include "applicationdomaintype.h" | 21 | #include "applicationdomaintype.h" |
22 | 22 | ||
23 | #include "storage.h" | 23 | #include "storage.h" |
24 | #include "datastorequery.h" | ||
24 | 25 | ||
25 | class ResultSet; | 26 | class ResultSet; |
26 | class QByteArray; | 27 | class QByteArray; |
@@ -44,6 +45,7 @@ class TypeImplementation<Sink::ApplicationDomain::Folder> { | |||
44 | public: | 45 | public: |
45 | typedef Sink::ApplicationDomain::Buffer::Folder Buffer; | 46 | typedef Sink::ApplicationDomain::Buffer::Folder Buffer; |
46 | typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; | 47 | typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; |
48 | static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); | ||
47 | static QSet<QByteArray> indexedProperties(); | 49 | static QSet<QByteArray> indexedProperties(); |
48 | static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); | 50 | static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); |
49 | static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); | 51 | static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); |
diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp index 13e1305..bb5ad58 100644 --- a/common/domain/mail.cpp +++ b/common/domain/mail.cpp | |||
@@ -32,6 +32,8 @@ | |||
32 | #include "../query.h" | 32 | #include "../query.h" |
33 | #include "../definitions.h" | 33 | #include "../definitions.h" |
34 | #include "../typeindex.h" | 34 | #include "../typeindex.h" |
35 | #include "entitybuffer.h" | ||
36 | #include "entity_generated.h" | ||
35 | 37 | ||
36 | #include "mail_generated.h" | 38 | #include "mail_generated.h" |
37 | 39 | ||
@@ -110,3 +112,14 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Mail>::BufferBuilder> > Ty | |||
110 | propertyMapper->addMapping<Mail::Sent>(&BufferBuilder::add_sent); | 112 | propertyMapper->addMapping<Mail::Sent>(&BufferBuilder::add_sent); |
111 | return propertyMapper; | 113 | return propertyMapper; |
112 | } | 114 | } |
115 | |||
116 | DataStoreQuery TypeImplementation<Mail>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) | ||
117 | { | ||
118 | auto mapper = initializeReadPropertyMapper(); | ||
119 | return DataStoreQuery(query, ApplicationDomain::getTypeName<Mail>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { | ||
120 | |||
121 | const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local()); | ||
122 | return mapper->getProperty(property, localBuffer); | ||
123 | }); | ||
124 | } | ||
125 | |||
diff --git a/common/domain/mail.h b/common/domain/mail.h index ff169dd..d6af9c5 100644 --- a/common/domain/mail.h +++ b/common/domain/mail.h | |||
@@ -21,6 +21,7 @@ | |||
21 | #include "applicationdomaintype.h" | 21 | #include "applicationdomaintype.h" |
22 | 22 | ||
23 | #include "storage.h" | 23 | #include "storage.h" |
24 | #include "datastorequery.h" | ||
24 | 25 | ||
25 | class ResultSet; | 26 | class ResultSet; |
26 | class QByteArray; | 27 | class QByteArray; |
@@ -44,6 +45,7 @@ class TypeImplementation<Sink::ApplicationDomain::Mail> { | |||
44 | public: | 45 | public: |
45 | typedef Sink::ApplicationDomain::Buffer::Mail Buffer; | 46 | typedef Sink::ApplicationDomain::Buffer::Mail Buffer; |
46 | typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; | 47 | typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; |
48 | static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); | ||
47 | static QSet<QByteArray> indexedProperties(); | 49 | static QSet<QByteArray> indexedProperties(); |
48 | /** | 50 | /** |
49 | * Returns the potential result set based on the indexes. | 51 | * Returns the potential result set based on the indexes. |
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp index 950bc46..32583cc 100644 --- a/common/entitybuffer.cpp +++ b/common/entitybuffer.cpp | |||
@@ -26,7 +26,7 @@ bool EntityBuffer::isValid() const | |||
26 | return mEntity; | 26 | return mEntity; |
27 | } | 27 | } |
28 | 28 | ||
29 | const Sink::Entity &EntityBuffer::entity() | 29 | const Sink::Entity &EntityBuffer::entity() const |
30 | { | 30 | { |
31 | Q_ASSERT(mEntity); | 31 | Q_ASSERT(mEntity); |
32 | return *mEntity; | 32 | return *mEntity; |
@@ -84,3 +84,15 @@ void EntityBuffer::assembleEntityBuffer( | |||
84 | auto entity = Sink::CreateEntity(fbb, metadata, resource, local); | 84 | auto entity = Sink::CreateEntity(fbb, metadata, resource, local); |
85 | Sink::FinishEntityBuffer(fbb, entity); | 85 | Sink::FinishEntityBuffer(fbb, entity); |
86 | } | 86 | } |
87 | |||
88 | Sink::Operation EntityBuffer::operation() const | ||
89 | { | ||
90 | const auto metadataBuffer = readBuffer<Sink::Metadata>(mEntity->metadata()); | ||
91 | return metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | ||
92 | } | ||
93 | |||
94 | qint64 EntityBuffer::revision() const | ||
95 | { | ||
96 | const auto metadataBuffer = readBuffer<Sink::Metadata>(mEntity->metadata()); | ||
97 | return metadataBuffer ? metadataBuffer->revision() : -1; | ||
98 | } | ||
diff --git a/common/entitybuffer.h b/common/entitybuffer.h index 866a7d0..4162605 100644 --- a/common/entitybuffer.h +++ b/common/entitybuffer.h | |||
@@ -3,6 +3,7 @@ | |||
3 | #include "sink_export.h" | 3 | #include "sink_export.h" |
4 | #include <functional> | 4 | #include <functional> |
5 | #include <flatbuffers/flatbuffers.h> | 5 | #include <flatbuffers/flatbuffers.h> |
6 | #include "metadata_generated.h" | ||
6 | #include <QByteArray> | 7 | #include <QByteArray> |
7 | 8 | ||
8 | namespace Sink { | 9 | namespace Sink { |
@@ -16,9 +17,12 @@ public: | |||
16 | const uint8_t *resourceBuffer(); | 17 | const uint8_t *resourceBuffer(); |
17 | const uint8_t *metadataBuffer(); | 18 | const uint8_t *metadataBuffer(); |
18 | const uint8_t *localBuffer(); | 19 | const uint8_t *localBuffer(); |
19 | const Entity &entity(); | 20 | const Entity &entity() const; |
20 | bool isValid() const; | 21 | bool isValid() const; |
21 | 22 | ||
23 | Sink::Operation operation() const; | ||
24 | qint64 revision() const; | ||
25 | |||
22 | static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const uint8_t *, size_t size)> &handler); | 26 | static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const uint8_t *, size_t size)> &handler); |
23 | /* | 27 | /* |
24 | * TODO: Ideally we would be passing references to vectors in the same bufferbuilder, to avoid needlessly copying data. | 28 | * TODO: Ideally we would be passing references to vectors in the same bufferbuilder, to avoid needlessly copying data. |
diff --git a/common/entityreader.cpp b/common/entityreader.cpp index 01c25d2..faa154b 100644 --- a/common/entityreader.cpp +++ b/common/entityreader.cpp | |||
@@ -150,205 +150,41 @@ void EntityReader<DomainType>::query(const Sink::Query &query, const std::functi | |||
150 | }); | 150 | }); |
151 | } | 151 | } |
152 | 152 | ||
153 | template <class DomainType> | 153 | /* template <class DomainType> */ |
154 | void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, | 154 | /* void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, */ |
155 | const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) | 155 | /* const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) */ |
156 | { | 156 | /* { */ |
157 | db.findLatest(key, | 157 | /* db.findLatest(key, */ |
158 | [=](const QByteArray &key, const QByteArray &value) -> bool { | 158 | /* [=](const QByteArray &key, const QByteArray &value) -> bool { */ |
159 | Sink::EntityBuffer buffer(value.data(), value.size()); | 159 | /* Sink::EntityBuffer buffer(value.data(), value.size()); */ |
160 | const Sink::Entity &entity = buffer.entity(); | 160 | /* const Sink::Entity &entity = buffer.entity(); */ |
161 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | 161 | /* const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); */ |
162 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | 162 | /* const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; */ |
163 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | 163 | /* const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; */ |
164 | auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); | 164 | /* auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); */ |
165 | Q_ASSERT(adaptor); | 165 | /* Q_ASSERT(adaptor); */ |
166 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); | 166 | /* resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); */ |
167 | return false; | 167 | /* return false; */ |
168 | }, | 168 | /* }, */ |
169 | [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); | 169 | /* [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); */ |
170 | } | 170 | /* } */ |
171 | |||
172 | static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
173 | { | ||
174 | // TODO use a result set with an iterator, to read values on demand | ||
175 | SinkTrace() << "Looking for : " << bufferType; | ||
176 | //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. | ||
177 | QSet<QByteArray> keys; | ||
178 | Storage::mainDatabase(transaction, bufferType) | ||
179 | .scan(QByteArray(), | ||
180 | [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
181 | if (keys.contains(Sink::Storage::uidFromKey(key))) { | ||
182 | //Not something that should persist if the replay works, so we keep a message for now. | ||
183 | SinkTrace() << "Multiple revisions for key: " << key; | ||
184 | } | ||
185 | keys << Sink::Storage::uidFromKey(key); | ||
186 | return true; | ||
187 | }, | ||
188 | [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); | ||
189 | |||
190 | SinkTrace() << "Full scan retrieved " << keys.size() << " results."; | ||
191 | return ResultSet(keys.toList().toVector()); | ||
192 | } | ||
193 | |||
194 | template <class DomainType> | ||
195 | ResultSet EntityReader<DomainType>::loadInitialResultSet(const Sink::Query &query, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) | ||
196 | { | ||
197 | if (!query.ids.isEmpty()) { | ||
198 | return ResultSet(query.ids.toVector()); | ||
199 | } | ||
200 | QSet<QByteArray> appliedFilters; | ||
201 | QByteArray appliedSorting; | ||
202 | auto resultSet = Sink::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, appliedSorting, mTransaction); | ||
203 | remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; | ||
204 | if (appliedSorting.isEmpty()) { | ||
205 | remainingSorting = query.sortProperty; | ||
206 | } | ||
207 | 171 | ||
208 | // We do a full scan if there were no indexes available to create the initial set. | ||
209 | if (appliedFilters.isEmpty()) { | ||
210 | // TODO this should be replaced by an index lookup as well | ||
211 | resultSet = fullScan(mTransaction, ApplicationDomain::getTypeName<DomainType>()); | ||
212 | } | ||
213 | return resultSet; | ||
214 | } | ||
215 | |||
216 | template <class DomainType> | ||
217 | ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet<QByteArray> &remainingFilters) | ||
218 | { | ||
219 | const auto bufferType = ApplicationDomain::getTypeName<DomainType>(); | ||
220 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
221 | remainingFilters = query.propertyFilter.keys().toSet(); | ||
222 | return ResultSet([this, bufferType, revisionCounter]() -> QByteArray { | ||
223 | const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); | ||
224 | // Spit out the revision keys one by one. | ||
225 | while (*revisionCounter <= topRevision) { | ||
226 | const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); | ||
227 | const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); | ||
228 | // SinkTrace() << "Revision" << *revisionCounter << type << uid; | ||
229 | Q_ASSERT(!uid.isEmpty()); | ||
230 | Q_ASSERT(!type.isEmpty()); | ||
231 | if (type != bufferType) { | ||
232 | // Skip revision | ||
233 | *revisionCounter += 1; | ||
234 | continue; | ||
235 | } | ||
236 | const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); | ||
237 | *revisionCounter += 1; | ||
238 | return key; | ||
239 | } | ||
240 | SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; | ||
241 | // We're done | ||
242 | return QByteArray(); | ||
243 | }); | ||
244 | } | ||
245 | |||
246 | template <class DomainType> | ||
247 | ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, | ||
248 | const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) | ||
249 | { | ||
250 | const bool sortingRequired = !sortProperty.isEmpty(); | ||
251 | if (initialQuery && sortingRequired) { | ||
252 | SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; | ||
253 | // Sort the complete set by reading the sort property and filling into a sorted map | ||
254 | auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); | ||
255 | while (resultSet.next()) { | ||
256 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
257 | readEntity(db, resultSet.id(), | ||
258 | [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { | ||
259 | // We're not interested in removals during the initial query | ||
260 | if ((operation != Sink::Operation_Removal) && filter(domainObject)) { | ||
261 | if (!sortProperty.isEmpty()) { | ||
262 | const auto sortValue = domainObject->getProperty(sortProperty); | ||
263 | if (sortValue.type() == QVariant::DateTime) { | ||
264 | sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier()); | ||
265 | } else { | ||
266 | sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier()); | ||
267 | } | ||
268 | } else { | ||
269 | sortedMap->insert(domainObject->identifier(), domainObject->identifier()); | ||
270 | } | ||
271 | } | ||
272 | }); | ||
273 | } | ||
274 | |||
275 | SinkTrace() << "Sorted " << sortedMap->size() << " values."; | ||
276 | auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); | ||
277 | ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery]( | ||
278 | std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { | ||
279 | if (iterator->hasNext()) { | ||
280 | readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, | ||
281 | Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); }); | ||
282 | return true; | ||
283 | } | ||
284 | return false; | ||
285 | }; | ||
286 | 172 | ||
287 | auto skip = [iterator]() { | ||
288 | if (iterator->hasNext()) { | ||
289 | iterator->next(); | ||
290 | } | ||
291 | }; | ||
292 | return ResultSet(generator, skip); | ||
293 | } else { | ||
294 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | ||
295 | ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery]( | ||
296 | std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { | ||
297 | if (resultSetPtr->next()) { | ||
298 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
299 | readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { | ||
300 | if (initialQuery) { | ||
301 | // We're not interested in removals during the initial query | ||
302 | if ((operation != Sink::Operation_Removal) && filter(domainObject)) { | ||
303 | // In the initial set every entity is new | ||
304 | callback(domainObject, Sink::Operation_Creation); | ||
305 | } | ||
306 | } else { | ||
307 | // Always remove removals, they probably don't match due to non-available properties | ||
308 | if ((operation == Sink::Operation_Removal) || filter(domainObject)) { | ||
309 | // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) | ||
310 | callback(domainObject, operation); | ||
311 | } | ||
312 | } | ||
313 | }); | ||
314 | return true; | ||
315 | } | ||
316 | return false; | ||
317 | }; | ||
318 | auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; | ||
319 | return ResultSet(generator, skip); | ||
320 | } | ||
321 | } | ||
322 | 173 | ||
323 | template <class DomainType> | 174 | template <class DomainType> |
324 | QPair<qint64, qint64> EntityReader<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) | 175 | QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) |
325 | { | 176 | { |
326 | QTime time; | 177 | QTime time; |
327 | time.start(); | 178 | time.start(); |
328 | 179 | ||
329 | auto db = Storage::mainDatabase(mTransaction, ApplicationDomain::getTypeName<DomainType>()); | 180 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); |
181 | auto resultSet = preparedQuery.execute(); | ||
330 | 182 | ||
331 | QSet<QByteArray> remainingFilters; | ||
332 | QByteArray remainingSorting; | ||
333 | auto resultSet = baseSetRetriever(remainingFilters, remainingSorting); | ||
334 | SinkTrace() << "Base set retrieved. " << Log::TraceTime(time.elapsed()); | ||
335 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting); | ||
336 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 183 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
337 | auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback); | 184 | auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); |
338 | // SinkTrace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed()); | ||
339 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); | ||
340 | } | ||
341 | 185 | ||
342 | template <class DomainType> | ||
343 | QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) | ||
344 | { | ||
345 | QTime time; | ||
346 | time.start(); | ||
347 | auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { | ||
348 | return loadInitialResultSet(query, remainingFilters, remainingSorting); | ||
349 | }, true, offset, batchsize, callback); | ||
350 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | 186 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); |
351 | return revisionAndReplayedEntities; | 187 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); |
352 | } | 188 | } |
353 | 189 | ||
354 | template <class DomainType> | 190 | template <class DomainType> |
@@ -357,33 +193,15 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si | |||
357 | QTime time; | 193 | QTime time; |
358 | time.start(); | 194 | time.start(); |
359 | const qint64 baseRevision = lastRevision + 1; | 195 | const qint64 baseRevision = lastRevision + 1; |
360 | auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { | ||
361 | return loadIncrementalResultSet(baseRevision, query, remainingFilters); | ||
362 | }, false, 0, 0, callback); | ||
363 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | ||
364 | return revisionAndReplayedEntities; | ||
365 | } | ||
366 | 196 | ||
367 | template <class DomainType> | 197 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); |
368 | std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> | 198 | auto resultSet = preparedQuery.update(baseRevision); |
369 | EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query) | 199 | |
370 | { | 200 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
371 | return [this, remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | 201 | auto replayedEntities = replaySet(resultSet, 0, 0, callback); |
372 | if (!query.ids.isEmpty()) { | 202 | |
373 | if (!query.ids.contains(domainObject->identifier())) { | 203 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
374 | return false; | 204 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); |
375 | } | ||
376 | } | ||
377 | for (const auto &filterProperty : remainingFilters) { | ||
378 | const auto property = domainObject->getProperty(filterProperty); | ||
379 | const auto comparator = query.propertyFilter.value(filterProperty); | ||
380 | if (!comparator.matches(property)) { | ||
381 | SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; | ||
382 | return false; | ||
383 | } | ||
384 | } | ||
385 | return true; | ||
386 | }; | ||
387 | } | 205 | } |
388 | 206 | ||
389 | template <class DomainType> | 207 | template <class DomainType> |
@@ -394,9 +212,11 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int | |||
394 | int counter = 0; | 212 | int counter = 0; |
395 | while (!batchSize || (counter < batchSize)) { | 213 | while (!batchSize || (counter < batchSize)) { |
396 | const bool ret = | 214 | const bool ret = |
397 | resultSet.next([&counter, callback](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { | 215 | resultSet.next([this, &counter, callback](const QByteArray &uid, const Sink::EntityBuffer &value, Sink::Operation operation) -> bool { |
398 | counter++; | 216 | counter++; |
399 | return callback(value.staticCast<DomainType>(), operation); | 217 | auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(value.entity()); |
218 | Q_ASSERT(adaptor); | ||
219 | return callback(QSharedPointer<DomainType>::create(mResourceInstanceIdentifier, uid, value.revision(), adaptor), operation); | ||
400 | }); | 220 | }); |
401 | if (!ret) { | 221 | if (!ret) { |
402 | break; | 222 | break; |
diff --git a/common/entityreader.h b/common/entityreader.h index a479679..f216453 100644 --- a/common/entityreader.h +++ b/common/entityreader.h | |||
@@ -89,17 +89,6 @@ public: | |||
89 | private: | 89 | private: |
90 | qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); | 90 | qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); |
91 | 91 | ||
92 | void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, | ||
93 | const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); | ||
94 | |||
95 | ResultSet loadInitialResultSet(const Sink::Query &query, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting); | ||
96 | ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet<QByteArray> &remainingFilters); | ||
97 | |||
98 | ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, | ||
99 | const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); | ||
100 | std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query); | ||
101 | QPair<qint64, qint64> load(const Sink::Query &query, const std::function<ResultSet(QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const ResultCallback &callback); | ||
102 | |||
103 | private: | 92 | private: |
104 | QByteArray mResourceInstanceIdentifier; | 93 | QByteArray mResourceInstanceIdentifier; |
105 | Sink::Storage::Transaction &mTransaction; | 94 | Sink::Storage::Transaction &mTransaction; |
diff --git a/common/resultset.cpp b/common/resultset.cpp index 293035b..51914e9 100644 --- a/common/resultset.cpp +++ b/common/resultset.cpp | |||
@@ -18,7 +18,7 @@ | |||
18 | */ | 18 | */ |
19 | #include "resultset.h" | 19 | #include "resultset.h" |
20 | 20 | ||
21 | #include "common/log.h" | 21 | #include "log.h" |
22 | 22 | ||
23 | ResultSet::ResultSet() : mIt(nullptr) | 23 | ResultSet::ResultSet() : mIt(nullptr) |
24 | { | 24 | { |
@@ -78,12 +78,12 @@ bool ResultSet::next() | |||
78 | return true; | 78 | return true; |
79 | } | 79 | } |
80 | } else { | 80 | } else { |
81 | next([](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation) { return false; }); | 81 | next([](const QByteArray &, const Sink::EntityBuffer &, Sink::Operation) { return false; }); |
82 | } | 82 | } |
83 | return false; | 83 | return false; |
84 | } | 84 | } |
85 | 85 | ||
86 | bool ResultSet::next(std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation)> callback) | 86 | bool ResultSet::next(const Callback &callback) |
87 | { | 87 | { |
88 | Q_ASSERT(mValueGenerator); | 88 | Q_ASSERT(mValueGenerator); |
89 | return mValueGenerator(callback); | 89 | return mValueGenerator(callback); |
diff --git a/common/resultset.h b/common/resultset.h index 88f7055..4e934fc 100644 --- a/common/resultset.h +++ b/common/resultset.h | |||
@@ -20,8 +20,8 @@ | |||
20 | 20 | ||
21 | #include <QVector> | 21 | #include <QVector> |
22 | #include <functional> | 22 | #include <functional> |
23 | #include "domain/applicationdomaintype.h" | ||
24 | #include "metadata_generated.h" | 23 | #include "metadata_generated.h" |
24 | #include "entitybuffer.h" | ||
25 | 25 | ||
26 | /* | 26 | /* |
27 | * An iterator to a result set. | 27 | * An iterator to a result set. |
@@ -31,7 +31,8 @@ | |||
31 | class ResultSet | 31 | class ResultSet |
32 | { | 32 | { |
33 | public: | 33 | public: |
34 | typedef std::function<bool(std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)>)> ValueGenerator; | 34 | typedef std::function<void(const QByteArray &uid, const Sink::EntityBuffer &, Sink::Operation)> Callback; |
35 | typedef std::function<bool(Callback)> ValueGenerator; | ||
35 | typedef std::function<QByteArray()> IdGenerator; | 36 | typedef std::function<QByteArray()> IdGenerator; |
36 | typedef std::function<void()> SkipValue; | 37 | typedef std::function<void()> SkipValue; |
37 | 38 | ||
@@ -42,7 +43,7 @@ public: | |||
42 | ResultSet(const ResultSet &other); | 43 | ResultSet(const ResultSet &other); |
43 | 44 | ||
44 | bool next(); | 45 | bool next(); |
45 | bool next(std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation)> callback); | 46 | bool next(const Callback &callback); |
46 | 47 | ||
47 | void skip(int number); | 48 | void skip(int number); |
48 | 49 | ||