diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-09-19 18:55:21 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-09-19 18:55:21 +0200 |
commit | 4a14a6fade947aa830d3f21598a4a6ba7316b933 (patch) | |
tree | c6b340bf1c6284e5501d371f65b58e3a69391a26 | |
parent | 1deac558af4b1c9f04352ede7f8e172f11a70a6b (diff) | |
download | sink-4a14a6fade947aa830d3f21598a4a6ba7316b933.tar.gz sink-4a14a6fade947aa830d3f21598a4a6ba7316b933.zip |
Refactored the query part of the entity reader into DataStoreQuery.
DataStoreQuery now encapsulates the low-level query that operates
directly on the storage. It no longer has access to the resource
buffers, and is instantiated by the type implementation, so we can
specialize the query alogorithm per type, but not per resource.
This will allow us to implement the threading queries for the mailtype.
-rw-r--r-- | common/CMakeLists.txt | 1 | ||||
-rw-r--r-- | common/datastorequery.cpp | 246 | ||||
-rw-r--r-- | common/datastorequery.h | 59 | ||||
-rw-r--r-- | common/domain/event.cpp | 13 | ||||
-rw-r--r-- | common/domain/event.h | 2 | ||||
-rw-r--r-- | common/domain/folder.cpp | 11 | ||||
-rw-r--r-- | common/domain/folder.h | 2 | ||||
-rw-r--r-- | common/domain/mail.cpp | 13 | ||||
-rw-r--r-- | common/domain/mail.h | 2 | ||||
-rw-r--r-- | common/entitybuffer.cpp | 14 | ||||
-rw-r--r-- | common/entitybuffer.h | 6 | ||||
-rw-r--r-- | common/entityreader.cpp | 250 | ||||
-rw-r--r-- | common/entityreader.h | 11 | ||||
-rw-r--r-- | common/resultset.cpp | 6 | ||||
-rw-r--r-- | common/resultset.h | 7 |
15 files changed, 409 insertions, 234 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 5eb15ba..0fc8d81 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -75,6 +75,7 @@ set(command_SRCS | |||
75 | entityreader.cpp | 75 | entityreader.cpp |
76 | mailpreprocessor.cpp | 76 | mailpreprocessor.cpp |
77 | specialpurposepreprocessor.cpp | 77 | specialpurposepreprocessor.cpp |
78 | datastorequery.cpp | ||
78 | ${storage_SRCS}) | 79 | ${storage_SRCS}) |
79 | 80 | ||
80 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) | 81 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) |
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp new file mode 100644 index 0000000..3237c53 --- /dev/null +++ b/common/datastorequery.cpp | |||
@@ -0,0 +1,246 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | #include "datastorequery.h" | ||
20 | |||
21 | #include "log.h" | ||
22 | #include "entitybuffer.h" | ||
23 | #include "entity_generated.h" | ||
24 | |||
25 | using namespace Sink; | ||
26 | |||
27 | |||
28 | SINK_DEBUG_AREA("datastorequery") | ||
29 | |||
30 | DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty) | ||
31 | : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) | ||
32 | { | ||
33 | |||
34 | } | ||
35 | |||
36 | static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
37 | { | ||
38 | // TODO use a result set with an iterator, to read values on demand | ||
39 | SinkTrace() << "Looking for : " << bufferType; | ||
40 | //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. | ||
41 | QSet<QByteArray> keys; | ||
42 | Storage::mainDatabase(transaction, bufferType) | ||
43 | .scan(QByteArray(), | ||
44 | [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
45 | if (keys.contains(Sink::Storage::uidFromKey(key))) { | ||
46 | //Not something that should persist if the replay works, so we keep a message for now. | ||
47 | SinkTrace() << "Multiple revisions for key: " << key; | ||
48 | } | ||
49 | keys << Sink::Storage::uidFromKey(key); | ||
50 | return true; | ||
51 | }, | ||
52 | [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); | ||
53 | |||
54 | SinkTrace() << "Full scan retrieved " << keys.size() << " results."; | ||
55 | return ResultSet(keys.toList().toVector()); | ||
56 | } | ||
57 | |||
58 | ResultSet DataStoreQuery::loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) | ||
59 | { | ||
60 | if (!mQuery.ids.isEmpty()) { | ||
61 | return ResultSet(mQuery.ids.toVector()); | ||
62 | } | ||
63 | QSet<QByteArray> appliedFilters; | ||
64 | QByteArray appliedSorting; | ||
65 | |||
66 | auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction); | ||
67 | |||
68 | remainingFilters = mQuery.propertyFilter.keys().toSet() - appliedFilters; | ||
69 | if (appliedSorting.isEmpty()) { | ||
70 | remainingSorting = mQuery.sortProperty; | ||
71 | } | ||
72 | |||
73 | // We do a full scan if there were no indexes available to create the initial set. | ||
74 | if (appliedFilters.isEmpty()) { | ||
75 | // TODO this should be replaced by an index lookup as well | ||
76 | resultSet = fullScan(mTransaction, mType); | ||
77 | } | ||
78 | return resultSet; | ||
79 | } | ||
80 | |||
81 | ResultSet DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters) | ||
82 | { | ||
83 | const auto bufferType = mType; | ||
84 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
85 | remainingFilters = mQuery.propertyFilter.keys().toSet(); | ||
86 | return ResultSet([this, bufferType, revisionCounter]() -> QByteArray { | ||
87 | const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); | ||
88 | // Spit out the revision keys one by one. | ||
89 | while (*revisionCounter <= topRevision) { | ||
90 | const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); | ||
91 | const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); | ||
92 | // SinkTrace() << "Revision" << *revisionCounter << type << uid; | ||
93 | Q_ASSERT(!uid.isEmpty()); | ||
94 | Q_ASSERT(!type.isEmpty()); | ||
95 | if (type != bufferType) { | ||
96 | // Skip revision | ||
97 | *revisionCounter += 1; | ||
98 | continue; | ||
99 | } | ||
100 | const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); | ||
101 | *revisionCounter += 1; | ||
102 | return key; | ||
103 | } | ||
104 | SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; | ||
105 | // We're done | ||
106 | return QByteArray(); | ||
107 | }); | ||
108 | } | ||
109 | |||
110 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) | ||
111 | { | ||
112 | mDb.findLatest(key, | ||
113 | [=](const QByteArray &key, const QByteArray &value) -> bool { | ||
114 | resultCallback(Sink::Storage::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); | ||
115 | return false; | ||
116 | }, | ||
117 | [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); | ||
118 | } | ||
119 | |||
120 | QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property) | ||
121 | { | ||
122 | return mGetProperty(entity, property); | ||
123 | } | ||
124 | |||
125 | ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty) | ||
126 | { | ||
127 | const bool sortingRequired = !sortProperty.isEmpty(); | ||
128 | if (initialQuery && sortingRequired) { | ||
129 | SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; | ||
130 | // Sort the complete set by reading the sort property and filling into a sorted map | ||
131 | auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); | ||
132 | while (resultSet.next()) { | ||
133 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
134 | readEntity(resultSet.id(), | ||
135 | [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { | ||
136 | |||
137 | const auto operation = buffer.operation(); | ||
138 | |||
139 | // We're not interested in removals during the initial query | ||
140 | if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { | ||
141 | if (!sortProperty.isEmpty()) { | ||
142 | const auto sortValue = getProperty(buffer.entity(), sortProperty); | ||
143 | if (sortValue.type() == QVariant::DateTime) { | ||
144 | sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), uid); | ||
145 | } else { | ||
146 | sortedMap->insert(sortValue.toString().toLatin1(), uid); | ||
147 | } | ||
148 | } else { | ||
149 | sortedMap->insert(uid, uid); | ||
150 | } | ||
151 | } | ||
152 | }); | ||
153 | } | ||
154 | |||
155 | SinkTrace() << "Sorted " << sortedMap->size() << " values."; | ||
156 | auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); | ||
157 | ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter, initialQuery]( | ||
158 | std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool { | ||
159 | if (iterator->hasNext()) { | ||
160 | readEntity(iterator->next().value(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { | ||
161 | callback(uid, buffer, Sink::Operation_Creation); | ||
162 | }); | ||
163 | return true; | ||
164 | } | ||
165 | return false; | ||
166 | }; | ||
167 | |||
168 | auto skip = [iterator]() { | ||
169 | if (iterator->hasNext()) { | ||
170 | iterator->next(); | ||
171 | } | ||
172 | }; | ||
173 | return ResultSet(generator, skip); | ||
174 | } else { | ||
175 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | ||
176 | ResultSet::ValueGenerator generator = [this, resultSetPtr, filter, initialQuery](const ResultSet::Callback &callback) -> bool { | ||
177 | if (resultSetPtr->next()) { | ||
178 | SinkTrace() << "Reading the next value: " << resultSetPtr->id(); | ||
179 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
180 | readEntity(resultSetPtr->id(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { | ||
181 | const auto operation = buffer.operation(); | ||
182 | if (initialQuery) { | ||
183 | // We're not interested in removals during the initial query | ||
184 | if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { | ||
185 | // In the initial set every entity is new | ||
186 | callback(uid, buffer, Sink::Operation_Creation); | ||
187 | } | ||
188 | } else { | ||
189 | // Always remove removals, they probably don't match due to non-available properties | ||
190 | if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { | ||
191 | // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) | ||
192 | callback(uid, buffer, operation); | ||
193 | } | ||
194 | } | ||
195 | }); | ||
196 | return true; | ||
197 | } | ||
198 | return false; | ||
199 | }; | ||
200 | auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; | ||
201 | return ResultSet(generator, skip); | ||
202 | } | ||
203 | } | ||
204 | |||
205 | |||
206 | DataStoreQuery::FilterFunction DataStoreQuery::getFilter(const QSet<QByteArray> &remainingFilters) | ||
207 | { | ||
208 | auto query = mQuery; | ||
209 | return [this, remainingFilters, query](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool { | ||
210 | if (!query.ids.isEmpty()) { | ||
211 | if (!query.ids.contains(uid)) { | ||
212 | SinkTrace() << "Filter by uid: " << uid; | ||
213 | return false; | ||
214 | } | ||
215 | } | ||
216 | for (const auto &filterProperty : remainingFilters) { | ||
217 | const auto property = getProperty(entity.entity(), filterProperty); | ||
218 | const auto comparator = query.propertyFilter.value(filterProperty); | ||
219 | if (!comparator.matches(property)) { | ||
220 | SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; | ||
221 | return false; | ||
222 | } | ||
223 | } | ||
224 | return true; | ||
225 | }; | ||
226 | } | ||
227 | |||
228 | ResultSet DataStoreQuery::update(qint64 baseRevision) | ||
229 | { | ||
230 | SinkTrace() << "Executing query update"; | ||
231 | QSet<QByteArray> remainingFilters; | ||
232 | QByteArray remainingSorting; | ||
233 | auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters); | ||
234 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), false, remainingSorting); | ||
235 | return filteredSet; | ||
236 | } | ||
237 | |||
238 | ResultSet DataStoreQuery::execute() | ||
239 | { | ||
240 | SinkTrace() << "Executing query"; | ||
241 | QSet<QByteArray> remainingFilters; | ||
242 | QByteArray remainingSorting; | ||
243 | auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting); | ||
244 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), true, remainingSorting); | ||
245 | return filteredSet; | ||
246 | } | ||
diff --git a/common/datastorequery.h b/common/datastorequery.h new file mode 100644 index 0000000..cf9d9e2 --- /dev/null +++ b/common/datastorequery.h | |||
@@ -0,0 +1,59 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | #pragma once | ||
20 | |||
21 | #include "query.h" | ||
22 | #include "storage.h" | ||
23 | #include "resultset.h" | ||
24 | #include "typeindex.h" | ||
25 | #include "query.h" | ||
26 | #include "entitybuffer.h" | ||
27 | |||
28 | class DataStoreQuery { | ||
29 | public: | ||
30 | DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty); | ||
31 | ResultSet execute(); | ||
32 | ResultSet update(qint64 baseRevision); | ||
33 | |||
34 | private: | ||
35 | |||
36 | typedef std::function<bool(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> FilterFunction; | ||
37 | typedef std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entityBuffer)> BufferCallback; | ||
38 | |||
39 | QVariant getProperty(const Sink::Entity &entity, const QByteArray &property); | ||
40 | |||
41 | void readEntity(const QByteArray &key, const BufferCallback &resultCallback); | ||
42 | |||
43 | ResultSet loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting); | ||
44 | ResultSet loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters); | ||
45 | |||
46 | ResultSet filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty); | ||
47 | FilterFunction getFilter(const QSet<QByteArray> &remainingFilters); | ||
48 | |||
49 | Sink::Query mQuery; | ||
50 | Sink::Storage::Transaction &mTransaction; | ||
51 | const QByteArray mType; | ||
52 | TypeIndex &mTypeIndex; | ||
53 | Sink::Storage::NamedDatabase mDb; | ||
54 | std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> mGetProperty; | ||
55 | }; | ||
56 | |||
57 | |||
58 | |||
59 | |||
diff --git a/common/domain/event.cpp b/common/domain/event.cpp index 0909bf1..dfbcb61 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp | |||
@@ -32,6 +32,8 @@ | |||
32 | #include "../query.h" | 32 | #include "../query.h" |
33 | #include "../definitions.h" | 33 | #include "../definitions.h" |
34 | #include "../typeindex.h" | 34 | #include "../typeindex.h" |
35 | #include "entitybuffer.h" | ||
36 | #include "entity_generated.h" | ||
35 | 37 | ||
36 | #include "event_generated.h" | 38 | #include "event_generated.h" |
37 | 39 | ||
@@ -84,3 +86,14 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Event>::BufferBuilder> > T | |||
84 | propertyMapper->addMapping<Event::Attachment>(&BufferBuilder::add_attachment); | 86 | propertyMapper->addMapping<Event::Attachment>(&BufferBuilder::add_attachment); |
85 | return propertyMapper; | 87 | return propertyMapper; |
86 | } | 88 | } |
89 | |||
90 | DataStoreQuery TypeImplementation<Event>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) | ||
91 | { | ||
92 | |||
93 | auto mapper = initializeReadPropertyMapper(); | ||
94 | return DataStoreQuery(query, ApplicationDomain::getTypeName<Event>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { | ||
95 | |||
96 | const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local()); | ||
97 | return mapper->getProperty(property, localBuffer); | ||
98 | }); | ||
99 | } | ||
diff --git a/common/domain/event.h b/common/domain/event.h index 5315566..4ac572c 100644 --- a/common/domain/event.h +++ b/common/domain/event.h | |||
@@ -21,6 +21,7 @@ | |||
21 | #include "applicationdomaintype.h" | 21 | #include "applicationdomaintype.h" |
22 | 22 | ||
23 | #include "storage.h" | 23 | #include "storage.h" |
24 | #include "datastorequery.h" | ||
24 | 25 | ||
25 | class ResultSet; | 26 | class ResultSet; |
26 | class QByteArray; | 27 | class QByteArray; |
@@ -50,6 +51,7 @@ public: | |||
50 | typedef Sink::ApplicationDomain::Buffer::Event Buffer; | 51 | typedef Sink::ApplicationDomain::Buffer::Event Buffer; |
51 | typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; | 52 | typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; |
52 | static QSet<QByteArray> indexedProperties(); | 53 | static QSet<QByteArray> indexedProperties(); |
54 | static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); | ||
53 | /** | 55 | /** |
54 | * Returns the potential result set based on the indexes. | 56 | * Returns the potential result set based on the indexes. |
55 | * | 57 | * |
diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp index ddb0c10..6d487b1 100644 --- a/common/domain/folder.cpp +++ b/common/domain/folder.cpp | |||
@@ -32,6 +32,8 @@ | |||
32 | #include "../query.h" | 32 | #include "../query.h" |
33 | #include "../definitions.h" | 33 | #include "../definitions.h" |
34 | #include "../typeindex.h" | 34 | #include "../typeindex.h" |
35 | #include "entitybuffer.h" | ||
36 | #include "entity_generated.h" | ||
35 | 37 | ||
36 | #include "folder_generated.h" | 38 | #include "folder_generated.h" |
37 | 39 | ||
@@ -88,3 +90,12 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Folder>::BufferBuilder> > | |||
88 | propertyMapper->addMapping<Folder::SpecialPurpose>(&BufferBuilder::add_specialpurpose); | 90 | propertyMapper->addMapping<Folder::SpecialPurpose>(&BufferBuilder::add_specialpurpose); |
89 | return propertyMapper; | 91 | return propertyMapper; |
90 | } | 92 | } |
93 | |||
94 | DataStoreQuery TypeImplementation<Folder>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) | ||
95 | { | ||
96 | auto mapper = initializeReadPropertyMapper(); | ||
97 | return DataStoreQuery(query, ApplicationDomain::getTypeName<Folder>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { | ||
98 | const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local()); | ||
99 | return mapper->getProperty(property, localBuffer); | ||
100 | }); | ||
101 | } | ||
diff --git a/common/domain/folder.h b/common/domain/folder.h index 6e066e1..77edc8a 100644 --- a/common/domain/folder.h +++ b/common/domain/folder.h | |||
@@ -21,6 +21,7 @@ | |||
21 | #include "applicationdomaintype.h" | 21 | #include "applicationdomaintype.h" |
22 | 22 | ||
23 | #include "storage.h" | 23 | #include "storage.h" |
24 | #include "datastorequery.h" | ||
24 | 25 | ||
25 | class ResultSet; | 26 | class ResultSet; |
26 | class QByteArray; | 27 | class QByteArray; |
@@ -44,6 +45,7 @@ class TypeImplementation<Sink::ApplicationDomain::Folder> { | |||
44 | public: | 45 | public: |
45 | typedef Sink::ApplicationDomain::Buffer::Folder Buffer; | 46 | typedef Sink::ApplicationDomain::Buffer::Folder Buffer; |
46 | typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; | 47 | typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; |
48 | static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); | ||
47 | static QSet<QByteArray> indexedProperties(); | 49 | static QSet<QByteArray> indexedProperties(); |
48 | static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); | 50 | static ResultSet queryIndexes(const Sink::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); |
49 | static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); | 51 | static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); |
diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp index 13e1305..bb5ad58 100644 --- a/common/domain/mail.cpp +++ b/common/domain/mail.cpp | |||
@@ -32,6 +32,8 @@ | |||
32 | #include "../query.h" | 32 | #include "../query.h" |
33 | #include "../definitions.h" | 33 | #include "../definitions.h" |
34 | #include "../typeindex.h" | 34 | #include "../typeindex.h" |
35 | #include "entitybuffer.h" | ||
36 | #include "entity_generated.h" | ||
35 | 37 | ||
36 | #include "mail_generated.h" | 38 | #include "mail_generated.h" |
37 | 39 | ||
@@ -110,3 +112,14 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Mail>::BufferBuilder> > Ty | |||
110 | propertyMapper->addMapping<Mail::Sent>(&BufferBuilder::add_sent); | 112 | propertyMapper->addMapping<Mail::Sent>(&BufferBuilder::add_sent); |
111 | return propertyMapper; | 113 | return propertyMapper; |
112 | } | 114 | } |
115 | |||
116 | DataStoreQuery TypeImplementation<Mail>::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) | ||
117 | { | ||
118 | auto mapper = initializeReadPropertyMapper(); | ||
119 | return DataStoreQuery(query, ApplicationDomain::getTypeName<Mail>(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { | ||
120 | |||
121 | const auto localBuffer = Sink::EntityBuffer::readBuffer<Buffer>(entity.local()); | ||
122 | return mapper->getProperty(property, localBuffer); | ||
123 | }); | ||
124 | } | ||
125 | |||
diff --git a/common/domain/mail.h b/common/domain/mail.h index ff169dd..d6af9c5 100644 --- a/common/domain/mail.h +++ b/common/domain/mail.h | |||
@@ -21,6 +21,7 @@ | |||
21 | #include "applicationdomaintype.h" | 21 | #include "applicationdomaintype.h" |
22 | 22 | ||
23 | #include "storage.h" | 23 | #include "storage.h" |
24 | #include "datastorequery.h" | ||
24 | 25 | ||
25 | class ResultSet; | 26 | class ResultSet; |
26 | class QByteArray; | 27 | class QByteArray; |
@@ -44,6 +45,7 @@ class TypeImplementation<Sink::ApplicationDomain::Mail> { | |||
44 | public: | 45 | public: |
45 | typedef Sink::ApplicationDomain::Buffer::Mail Buffer; | 46 | typedef Sink::ApplicationDomain::Buffer::Mail Buffer; |
46 | typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; | 47 | typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; |
48 | static DataStoreQuery prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); | ||
47 | static QSet<QByteArray> indexedProperties(); | 49 | static QSet<QByteArray> indexedProperties(); |
48 | /** | 50 | /** |
49 | * Returns the potential result set based on the indexes. | 51 | * Returns the potential result set based on the indexes. |
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp index 950bc46..32583cc 100644 --- a/common/entitybuffer.cpp +++ b/common/entitybuffer.cpp | |||
@@ -26,7 +26,7 @@ bool EntityBuffer::isValid() const | |||
26 | return mEntity; | 26 | return mEntity; |
27 | } | 27 | } |
28 | 28 | ||
29 | const Sink::Entity &EntityBuffer::entity() | 29 | const Sink::Entity &EntityBuffer::entity() const |
30 | { | 30 | { |
31 | Q_ASSERT(mEntity); | 31 | Q_ASSERT(mEntity); |
32 | return *mEntity; | 32 | return *mEntity; |
@@ -84,3 +84,15 @@ void EntityBuffer::assembleEntityBuffer( | |||
84 | auto entity = Sink::CreateEntity(fbb, metadata, resource, local); | 84 | auto entity = Sink::CreateEntity(fbb, metadata, resource, local); |
85 | Sink::FinishEntityBuffer(fbb, entity); | 85 | Sink::FinishEntityBuffer(fbb, entity); |
86 | } | 86 | } |
87 | |||
88 | Sink::Operation EntityBuffer::operation() const | ||
89 | { | ||
90 | const auto metadataBuffer = readBuffer<Sink::Metadata>(mEntity->metadata()); | ||
91 | return metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | ||
92 | } | ||
93 | |||
94 | qint64 EntityBuffer::revision() const | ||
95 | { | ||
96 | const auto metadataBuffer = readBuffer<Sink::Metadata>(mEntity->metadata()); | ||
97 | return metadataBuffer ? metadataBuffer->revision() : -1; | ||
98 | } | ||
diff --git a/common/entitybuffer.h b/common/entitybuffer.h index 866a7d0..4162605 100644 --- a/common/entitybuffer.h +++ b/common/entitybuffer.h | |||
@@ -3,6 +3,7 @@ | |||
3 | #include "sink_export.h" | 3 | #include "sink_export.h" |
4 | #include <functional> | 4 | #include <functional> |
5 | #include <flatbuffers/flatbuffers.h> | 5 | #include <flatbuffers/flatbuffers.h> |
6 | #include "metadata_generated.h" | ||
6 | #include <QByteArray> | 7 | #include <QByteArray> |
7 | 8 | ||
8 | namespace Sink { | 9 | namespace Sink { |
@@ -16,9 +17,12 @@ public: | |||
16 | const uint8_t *resourceBuffer(); | 17 | const uint8_t *resourceBuffer(); |
17 | const uint8_t *metadataBuffer(); | 18 | const uint8_t *metadataBuffer(); |
18 | const uint8_t *localBuffer(); | 19 | const uint8_t *localBuffer(); |
19 | const Entity &entity(); | 20 | const Entity &entity() const; |
20 | bool isValid() const; | 21 | bool isValid() const; |
21 | 22 | ||
23 | Sink::Operation operation() const; | ||
24 | qint64 revision() const; | ||
25 | |||
22 | static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const uint8_t *, size_t size)> &handler); | 26 | static void extractResourceBuffer(void *dataValue, int dataSize, const std::function<void(const uint8_t *, size_t size)> &handler); |
23 | /* | 27 | /* |
24 | * TODO: Ideally we would be passing references to vectors in the same bufferbuilder, to avoid needlessly copying data. | 28 | * TODO: Ideally we would be passing references to vectors in the same bufferbuilder, to avoid needlessly copying data. |
diff --git a/common/entityreader.cpp b/common/entityreader.cpp index 01c25d2..faa154b 100644 --- a/common/entityreader.cpp +++ b/common/entityreader.cpp | |||
@@ -150,205 +150,41 @@ void EntityReader<DomainType>::query(const Sink::Query &query, const std::functi | |||
150 | }); | 150 | }); |
151 | } | 151 | } |
152 | 152 | ||
153 | template <class DomainType> | 153 | /* template <class DomainType> */ |
154 | void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, | 154 | /* void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, */ |
155 | const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) | 155 | /* const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback) */ |
156 | { | 156 | /* { */ |
157 | db.findLatest(key, | 157 | /* db.findLatest(key, */ |
158 | [=](const QByteArray &key, const QByteArray &value) -> bool { | 158 | /* [=](const QByteArray &key, const QByteArray &value) -> bool { */ |
159 | Sink::EntityBuffer buffer(value.data(), value.size()); | 159 | /* Sink::EntityBuffer buffer(value.data(), value.size()); */ |
160 | const Sink::Entity &entity = buffer.entity(); | 160 | /* const Sink::Entity &entity = buffer.entity(); */ |
161 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | 161 | /* const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); */ |
162 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | 162 | /* const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; */ |
163 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | 163 | /* const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; */ |
164 | auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); | 164 | /* auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); */ |
165 | Q_ASSERT(adaptor); | 165 | /* Q_ASSERT(adaptor); */ |
166 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); | 166 | /* resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); */ |
167 | return false; | 167 | /* return false; */ |
168 | }, | 168 | /* }, */ |
169 | [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); | 169 | /* [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); */ |
170 | } | 170 | /* } */ |
171 | |||
172 | static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
173 | { | ||
174 | // TODO use a result set with an iterator, to read values on demand | ||
175 | SinkTrace() << "Looking for : " << bufferType; | ||
176 | //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. | ||
177 | QSet<QByteArray> keys; | ||
178 | Storage::mainDatabase(transaction, bufferType) | ||
179 | .scan(QByteArray(), | ||
180 | [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
181 | if (keys.contains(Sink::Storage::uidFromKey(key))) { | ||
182 | //Not something that should persist if the replay works, so we keep a message for now. | ||
183 | SinkTrace() << "Multiple revisions for key: " << key; | ||
184 | } | ||
185 | keys << Sink::Storage::uidFromKey(key); | ||
186 | return true; | ||
187 | }, | ||
188 | [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); | ||
189 | |||
190 | SinkTrace() << "Full scan retrieved " << keys.size() << " results."; | ||
191 | return ResultSet(keys.toList().toVector()); | ||
192 | } | ||
193 | |||
194 | template <class DomainType> | ||
195 | ResultSet EntityReader<DomainType>::loadInitialResultSet(const Sink::Query &query, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) | ||
196 | { | ||
197 | if (!query.ids.isEmpty()) { | ||
198 | return ResultSet(query.ids.toVector()); | ||
199 | } | ||
200 | QSet<QByteArray> appliedFilters; | ||
201 | QByteArray appliedSorting; | ||
202 | auto resultSet = Sink::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, appliedSorting, mTransaction); | ||
203 | remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; | ||
204 | if (appliedSorting.isEmpty()) { | ||
205 | remainingSorting = query.sortProperty; | ||
206 | } | ||
207 | 171 | ||
208 | // We do a full scan if there were no indexes available to create the initial set. | ||
209 | if (appliedFilters.isEmpty()) { | ||
210 | // TODO this should be replaced by an index lookup as well | ||
211 | resultSet = fullScan(mTransaction, ApplicationDomain::getTypeName<DomainType>()); | ||
212 | } | ||
213 | return resultSet; | ||
214 | } | ||
215 | |||
216 | template <class DomainType> | ||
217 | ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet<QByteArray> &remainingFilters) | ||
218 | { | ||
219 | const auto bufferType = ApplicationDomain::getTypeName<DomainType>(); | ||
220 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
221 | remainingFilters = query.propertyFilter.keys().toSet(); | ||
222 | return ResultSet([this, bufferType, revisionCounter]() -> QByteArray { | ||
223 | const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); | ||
224 | // Spit out the revision keys one by one. | ||
225 | while (*revisionCounter <= topRevision) { | ||
226 | const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); | ||
227 | const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); | ||
228 | // SinkTrace() << "Revision" << *revisionCounter << type << uid; | ||
229 | Q_ASSERT(!uid.isEmpty()); | ||
230 | Q_ASSERT(!type.isEmpty()); | ||
231 | if (type != bufferType) { | ||
232 | // Skip revision | ||
233 | *revisionCounter += 1; | ||
234 | continue; | ||
235 | } | ||
236 | const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); | ||
237 | *revisionCounter += 1; | ||
238 | return key; | ||
239 | } | ||
240 | SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; | ||
241 | // We're done | ||
242 | return QByteArray(); | ||
243 | }); | ||
244 | } | ||
245 | |||
246 | template <class DomainType> | ||
247 | ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, | ||
248 | const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) | ||
249 | { | ||
250 | const bool sortingRequired = !sortProperty.isEmpty(); | ||
251 | if (initialQuery && sortingRequired) { | ||
252 | SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; | ||
253 | // Sort the complete set by reading the sort property and filling into a sorted map | ||
254 | auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); | ||
255 | while (resultSet.next()) { | ||
256 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
257 | readEntity(db, resultSet.id(), | ||
258 | [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { | ||
259 | // We're not interested in removals during the initial query | ||
260 | if ((operation != Sink::Operation_Removal) && filter(domainObject)) { | ||
261 | if (!sortProperty.isEmpty()) { | ||
262 | const auto sortValue = domainObject->getProperty(sortProperty); | ||
263 | if (sortValue.type() == QVariant::DateTime) { | ||
264 | sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier()); | ||
265 | } else { | ||
266 | sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier()); | ||
267 | } | ||
268 | } else { | ||
269 | sortedMap->insert(domainObject->identifier(), domainObject->identifier()); | ||
270 | } | ||
271 | } | ||
272 | }); | ||
273 | } | ||
274 | |||
275 | SinkTrace() << "Sorted " << sortedMap->size() << " values."; | ||
276 | auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); | ||
277 | ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery]( | ||
278 | std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { | ||
279 | if (iterator->hasNext()) { | ||
280 | readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, | ||
281 | Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); }); | ||
282 | return true; | ||
283 | } | ||
284 | return false; | ||
285 | }; | ||
286 | 172 | ||
287 | auto skip = [iterator]() { | ||
288 | if (iterator->hasNext()) { | ||
289 | iterator->next(); | ||
290 | } | ||
291 | }; | ||
292 | return ResultSet(generator, skip); | ||
293 | } else { | ||
294 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | ||
295 | ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery]( | ||
296 | std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { | ||
297 | if (resultSetPtr->next()) { | ||
298 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
299 | readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { | ||
300 | if (initialQuery) { | ||
301 | // We're not interested in removals during the initial query | ||
302 | if ((operation != Sink::Operation_Removal) && filter(domainObject)) { | ||
303 | // In the initial set every entity is new | ||
304 | callback(domainObject, Sink::Operation_Creation); | ||
305 | } | ||
306 | } else { | ||
307 | // Always remove removals, they probably don't match due to non-available properties | ||
308 | if ((operation == Sink::Operation_Removal) || filter(domainObject)) { | ||
309 | // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) | ||
310 | callback(domainObject, operation); | ||
311 | } | ||
312 | } | ||
313 | }); | ||
314 | return true; | ||
315 | } | ||
316 | return false; | ||
317 | }; | ||
318 | auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; | ||
319 | return ResultSet(generator, skip); | ||
320 | } | ||
321 | } | ||
322 | 173 | ||
323 | template <class DomainType> | 174 | template <class DomainType> |
324 | QPair<qint64, qint64> EntityReader<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) | 175 | QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) |
325 | { | 176 | { |
326 | QTime time; | 177 | QTime time; |
327 | time.start(); | 178 | time.start(); |
328 | 179 | ||
329 | auto db = Storage::mainDatabase(mTransaction, ApplicationDomain::getTypeName<DomainType>()); | 180 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); |
181 | auto resultSet = preparedQuery.execute(); | ||
330 | 182 | ||
331 | QSet<QByteArray> remainingFilters; | ||
332 | QByteArray remainingSorting; | ||
333 | auto resultSet = baseSetRetriever(remainingFilters, remainingSorting); | ||
334 | SinkTrace() << "Base set retrieved. " << Log::TraceTime(time.elapsed()); | ||
335 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting); | ||
336 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 183 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
337 | auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback); | 184 | auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); |
338 | // SinkTrace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed()); | ||
339 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); | ||
340 | } | ||
341 | 185 | ||
342 | template <class DomainType> | ||
343 | QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) | ||
344 | { | ||
345 | QTime time; | ||
346 | time.start(); | ||
347 | auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { | ||
348 | return loadInitialResultSet(query, remainingFilters, remainingSorting); | ||
349 | }, true, offset, batchsize, callback); | ||
350 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | 186 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); |
351 | return revisionAndReplayedEntities; | 187 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); |
352 | } | 188 | } |
353 | 189 | ||
354 | template <class DomainType> | 190 | template <class DomainType> |
@@ -357,33 +193,15 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si | |||
357 | QTime time; | 193 | QTime time; |
358 | time.start(); | 194 | time.start(); |
359 | const qint64 baseRevision = lastRevision + 1; | 195 | const qint64 baseRevision = lastRevision + 1; |
360 | auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { | ||
361 | return loadIncrementalResultSet(baseRevision, query, remainingFilters); | ||
362 | }, false, 0, 0, callback); | ||
363 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | ||
364 | return revisionAndReplayedEntities; | ||
365 | } | ||
366 | 196 | ||
367 | template <class DomainType> | 197 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); |
368 | std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> | 198 | auto resultSet = preparedQuery.update(baseRevision); |
369 | EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query) | 199 | |
370 | { | 200 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
371 | return [this, remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | 201 | auto replayedEntities = replaySet(resultSet, 0, 0, callback); |
372 | if (!query.ids.isEmpty()) { | 202 | |
373 | if (!query.ids.contains(domainObject->identifier())) { | 203 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
374 | return false; | 204 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); |
375 | } | ||
376 | } | ||
377 | for (const auto &filterProperty : remainingFilters) { | ||
378 | const auto property = domainObject->getProperty(filterProperty); | ||
379 | const auto comparator = query.propertyFilter.value(filterProperty); | ||
380 | if (!comparator.matches(property)) { | ||
381 | SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; | ||
382 | return false; | ||
383 | } | ||
384 | } | ||
385 | return true; | ||
386 | }; | ||
387 | } | 205 | } |
388 | 206 | ||
389 | template <class DomainType> | 207 | template <class DomainType> |
@@ -394,9 +212,11 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int | |||
394 | int counter = 0; | 212 | int counter = 0; |
395 | while (!batchSize || (counter < batchSize)) { | 213 | while (!batchSize || (counter < batchSize)) { |
396 | const bool ret = | 214 | const bool ret = |
397 | resultSet.next([&counter, callback](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { | 215 | resultSet.next([this, &counter, callback](const QByteArray &uid, const Sink::EntityBuffer &value, Sink::Operation operation) -> bool { |
398 | counter++; | 216 | counter++; |
399 | return callback(value.staticCast<DomainType>(), operation); | 217 | auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(value.entity()); |
218 | Q_ASSERT(adaptor); | ||
219 | return callback(QSharedPointer<DomainType>::create(mResourceInstanceIdentifier, uid, value.revision(), adaptor), operation); | ||
400 | }); | 220 | }); |
401 | if (!ret) { | 221 | if (!ret) { |
402 | break; | 222 | break; |
diff --git a/common/entityreader.h b/common/entityreader.h index a479679..f216453 100644 --- a/common/entityreader.h +++ b/common/entityreader.h | |||
@@ -89,17 +89,6 @@ public: | |||
89 | private: | 89 | private: |
90 | qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); | 90 | qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); |
91 | 91 | ||
92 | void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, | ||
93 | const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); | ||
94 | |||
95 | ResultSet loadInitialResultSet(const Sink::Query &query, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting); | ||
96 | ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet<QByteArray> &remainingFilters); | ||
97 | |||
98 | ResultSet filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, | ||
99 | const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); | ||
100 | std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query); | ||
101 | QPair<qint64, qint64> load(const Sink::Query &query, const std::function<ResultSet(QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const ResultCallback &callback); | ||
102 | |||
103 | private: | 92 | private: |
104 | QByteArray mResourceInstanceIdentifier; | 93 | QByteArray mResourceInstanceIdentifier; |
105 | Sink::Storage::Transaction &mTransaction; | 94 | Sink::Storage::Transaction &mTransaction; |
diff --git a/common/resultset.cpp b/common/resultset.cpp index 293035b..51914e9 100644 --- a/common/resultset.cpp +++ b/common/resultset.cpp | |||
@@ -18,7 +18,7 @@ | |||
18 | */ | 18 | */ |
19 | #include "resultset.h" | 19 | #include "resultset.h" |
20 | 20 | ||
21 | #include "common/log.h" | 21 | #include "log.h" |
22 | 22 | ||
23 | ResultSet::ResultSet() : mIt(nullptr) | 23 | ResultSet::ResultSet() : mIt(nullptr) |
24 | { | 24 | { |
@@ -78,12 +78,12 @@ bool ResultSet::next() | |||
78 | return true; | 78 | return true; |
79 | } | 79 | } |
80 | } else { | 80 | } else { |
81 | next([](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation) { return false; }); | 81 | next([](const QByteArray &, const Sink::EntityBuffer &, Sink::Operation) { return false; }); |
82 | } | 82 | } |
83 | return false; | 83 | return false; |
84 | } | 84 | } |
85 | 85 | ||
86 | bool ResultSet::next(std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation)> callback) | 86 | bool ResultSet::next(const Callback &callback) |
87 | { | 87 | { |
88 | Q_ASSERT(mValueGenerator); | 88 | Q_ASSERT(mValueGenerator); |
89 | return mValueGenerator(callback); | 89 | return mValueGenerator(callback); |
diff --git a/common/resultset.h b/common/resultset.h index 88f7055..4e934fc 100644 --- a/common/resultset.h +++ b/common/resultset.h | |||
@@ -20,8 +20,8 @@ | |||
20 | 20 | ||
21 | #include <QVector> | 21 | #include <QVector> |
22 | #include <functional> | 22 | #include <functional> |
23 | #include "domain/applicationdomaintype.h" | ||
24 | #include "metadata_generated.h" | 23 | #include "metadata_generated.h" |
24 | #include "entitybuffer.h" | ||
25 | 25 | ||
26 | /* | 26 | /* |
27 | * An iterator to a result set. | 27 | * An iterator to a result set. |
@@ -31,7 +31,8 @@ | |||
31 | class ResultSet | 31 | class ResultSet |
32 | { | 32 | { |
33 | public: | 33 | public: |
34 | typedef std::function<bool(std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)>)> ValueGenerator; | 34 | typedef std::function<void(const QByteArray &uid, const Sink::EntityBuffer &, Sink::Operation)> Callback; |
35 | typedef std::function<bool(Callback)> ValueGenerator; | ||
35 | typedef std::function<QByteArray()> IdGenerator; | 36 | typedef std::function<QByteArray()> IdGenerator; |
36 | typedef std::function<void()> SkipValue; | 37 | typedef std::function<void()> SkipValue; |
37 | 38 | ||
@@ -42,7 +43,7 @@ public: | |||
42 | ResultSet(const ResultSet &other); | 43 | ResultSet(const ResultSet &other); |
43 | 44 | ||
44 | bool next(); | 45 | bool next(); |
45 | bool next(std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation)> callback); | 46 | bool next(const Callback &callback); |
46 | 47 | ||
47 | void skip(int number); | 48 | void skip(int number); |
48 | 49 | ||