diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-09-19 18:55:21 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-09-19 18:55:21 +0200 |
commit | 4a14a6fade947aa830d3f21598a4a6ba7316b933 (patch) | |
tree | c6b340bf1c6284e5501d371f65b58e3a69391a26 /common/datastorequery.cpp | |
parent | 1deac558af4b1c9f04352ede7f8e172f11a70a6b (diff) | |
download | sink-4a14a6fade947aa830d3f21598a4a6ba7316b933.tar.gz sink-4a14a6fade947aa830d3f21598a4a6ba7316b933.zip |
Refactored the query part of the entity reader into DataStoreQuery.
DataStoreQuery now encapsulates the low-level query that operates
directly on the storage. It no longer has access to the resource
buffers, and is instantiated by the type implementation, so we can
specialize the query alogorithm per type, but not per resource.
This will allow us to implement the threading queries for the mailtype.
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r-- | common/datastorequery.cpp | 246 |
1 files changed, 246 insertions, 0 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp new file mode 100644 index 0000000..3237c53 --- /dev/null +++ b/common/datastorequery.cpp | |||
@@ -0,0 +1,246 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | #include "datastorequery.h" | ||
20 | |||
21 | #include "log.h" | ||
22 | #include "entitybuffer.h" | ||
23 | #include "entity_generated.h" | ||
24 | |||
25 | using namespace Sink; | ||
26 | |||
27 | |||
28 | SINK_DEBUG_AREA("datastorequery") | ||
29 | |||
30 | DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty) | ||
31 | : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) | ||
32 | { | ||
33 | |||
34 | } | ||
35 | |||
36 | static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
37 | { | ||
38 | // TODO use a result set with an iterator, to read values on demand | ||
39 | SinkTrace() << "Looking for : " << bufferType; | ||
40 | //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. | ||
41 | QSet<QByteArray> keys; | ||
42 | Storage::mainDatabase(transaction, bufferType) | ||
43 | .scan(QByteArray(), | ||
44 | [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
45 | if (keys.contains(Sink::Storage::uidFromKey(key))) { | ||
46 | //Not something that should persist if the replay works, so we keep a message for now. | ||
47 | SinkTrace() << "Multiple revisions for key: " << key; | ||
48 | } | ||
49 | keys << Sink::Storage::uidFromKey(key); | ||
50 | return true; | ||
51 | }, | ||
52 | [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); | ||
53 | |||
54 | SinkTrace() << "Full scan retrieved " << keys.size() << " results."; | ||
55 | return ResultSet(keys.toList().toVector()); | ||
56 | } | ||
57 | |||
58 | ResultSet DataStoreQuery::loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) | ||
59 | { | ||
60 | if (!mQuery.ids.isEmpty()) { | ||
61 | return ResultSet(mQuery.ids.toVector()); | ||
62 | } | ||
63 | QSet<QByteArray> appliedFilters; | ||
64 | QByteArray appliedSorting; | ||
65 | |||
66 | auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction); | ||
67 | |||
68 | remainingFilters = mQuery.propertyFilter.keys().toSet() - appliedFilters; | ||
69 | if (appliedSorting.isEmpty()) { | ||
70 | remainingSorting = mQuery.sortProperty; | ||
71 | } | ||
72 | |||
73 | // We do a full scan if there were no indexes available to create the initial set. | ||
74 | if (appliedFilters.isEmpty()) { | ||
75 | // TODO this should be replaced by an index lookup as well | ||
76 | resultSet = fullScan(mTransaction, mType); | ||
77 | } | ||
78 | return resultSet; | ||
79 | } | ||
80 | |||
81 | ResultSet DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters) | ||
82 | { | ||
83 | const auto bufferType = mType; | ||
84 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
85 | remainingFilters = mQuery.propertyFilter.keys().toSet(); | ||
86 | return ResultSet([this, bufferType, revisionCounter]() -> QByteArray { | ||
87 | const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); | ||
88 | // Spit out the revision keys one by one. | ||
89 | while (*revisionCounter <= topRevision) { | ||
90 | const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); | ||
91 | const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); | ||
92 | // SinkTrace() << "Revision" << *revisionCounter << type << uid; | ||
93 | Q_ASSERT(!uid.isEmpty()); | ||
94 | Q_ASSERT(!type.isEmpty()); | ||
95 | if (type != bufferType) { | ||
96 | // Skip revision | ||
97 | *revisionCounter += 1; | ||
98 | continue; | ||
99 | } | ||
100 | const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); | ||
101 | *revisionCounter += 1; | ||
102 | return key; | ||
103 | } | ||
104 | SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; | ||
105 | // We're done | ||
106 | return QByteArray(); | ||
107 | }); | ||
108 | } | ||
109 | |||
110 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) | ||
111 | { | ||
112 | mDb.findLatest(key, | ||
113 | [=](const QByteArray &key, const QByteArray &value) -> bool { | ||
114 | resultCallback(Sink::Storage::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); | ||
115 | return false; | ||
116 | }, | ||
117 | [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); | ||
118 | } | ||
119 | |||
120 | QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property) | ||
121 | { | ||
122 | return mGetProperty(entity, property); | ||
123 | } | ||
124 | |||
125 | ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty) | ||
126 | { | ||
127 | const bool sortingRequired = !sortProperty.isEmpty(); | ||
128 | if (initialQuery && sortingRequired) { | ||
129 | SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; | ||
130 | // Sort the complete set by reading the sort property and filling into a sorted map | ||
131 | auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); | ||
132 | while (resultSet.next()) { | ||
133 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
134 | readEntity(resultSet.id(), | ||
135 | [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { | ||
136 | |||
137 | const auto operation = buffer.operation(); | ||
138 | |||
139 | // We're not interested in removals during the initial query | ||
140 | if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { | ||
141 | if (!sortProperty.isEmpty()) { | ||
142 | const auto sortValue = getProperty(buffer.entity(), sortProperty); | ||
143 | if (sortValue.type() == QVariant::DateTime) { | ||
144 | sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), uid); | ||
145 | } else { | ||
146 | sortedMap->insert(sortValue.toString().toLatin1(), uid); | ||
147 | } | ||
148 | } else { | ||
149 | sortedMap->insert(uid, uid); | ||
150 | } | ||
151 | } | ||
152 | }); | ||
153 | } | ||
154 | |||
155 | SinkTrace() << "Sorted " << sortedMap->size() << " values."; | ||
156 | auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); | ||
157 | ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter, initialQuery]( | ||
158 | std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool { | ||
159 | if (iterator->hasNext()) { | ||
160 | readEntity(iterator->next().value(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { | ||
161 | callback(uid, buffer, Sink::Operation_Creation); | ||
162 | }); | ||
163 | return true; | ||
164 | } | ||
165 | return false; | ||
166 | }; | ||
167 | |||
168 | auto skip = [iterator]() { | ||
169 | if (iterator->hasNext()) { | ||
170 | iterator->next(); | ||
171 | } | ||
172 | }; | ||
173 | return ResultSet(generator, skip); | ||
174 | } else { | ||
175 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | ||
176 | ResultSet::ValueGenerator generator = [this, resultSetPtr, filter, initialQuery](const ResultSet::Callback &callback) -> bool { | ||
177 | if (resultSetPtr->next()) { | ||
178 | SinkTrace() << "Reading the next value: " << resultSetPtr->id(); | ||
179 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
180 | readEntity(resultSetPtr->id(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { | ||
181 | const auto operation = buffer.operation(); | ||
182 | if (initialQuery) { | ||
183 | // We're not interested in removals during the initial query | ||
184 | if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { | ||
185 | // In the initial set every entity is new | ||
186 | callback(uid, buffer, Sink::Operation_Creation); | ||
187 | } | ||
188 | } else { | ||
189 | // Always remove removals, they probably don't match due to non-available properties | ||
190 | if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { | ||
191 | // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) | ||
192 | callback(uid, buffer, operation); | ||
193 | } | ||
194 | } | ||
195 | }); | ||
196 | return true; | ||
197 | } | ||
198 | return false; | ||
199 | }; | ||
200 | auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; | ||
201 | return ResultSet(generator, skip); | ||
202 | } | ||
203 | } | ||
204 | |||
205 | |||
206 | DataStoreQuery::FilterFunction DataStoreQuery::getFilter(const QSet<QByteArray> &remainingFilters) | ||
207 | { | ||
208 | auto query = mQuery; | ||
209 | return [this, remainingFilters, query](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool { | ||
210 | if (!query.ids.isEmpty()) { | ||
211 | if (!query.ids.contains(uid)) { | ||
212 | SinkTrace() << "Filter by uid: " << uid; | ||
213 | return false; | ||
214 | } | ||
215 | } | ||
216 | for (const auto &filterProperty : remainingFilters) { | ||
217 | const auto property = getProperty(entity.entity(), filterProperty); | ||
218 | const auto comparator = query.propertyFilter.value(filterProperty); | ||
219 | if (!comparator.matches(property)) { | ||
220 | SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; | ||
221 | return false; | ||
222 | } | ||
223 | } | ||
224 | return true; | ||
225 | }; | ||
226 | } | ||
227 | |||
228 | ResultSet DataStoreQuery::update(qint64 baseRevision) | ||
229 | { | ||
230 | SinkTrace() << "Executing query update"; | ||
231 | QSet<QByteArray> remainingFilters; | ||
232 | QByteArray remainingSorting; | ||
233 | auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters); | ||
234 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), false, remainingSorting); | ||
235 | return filteredSet; | ||
236 | } | ||
237 | |||
238 | ResultSet DataStoreQuery::execute() | ||
239 | { | ||
240 | SinkTrace() << "Executing query"; | ||
241 | QSet<QByteArray> remainingFilters; | ||
242 | QByteArray remainingSorting; | ||
243 | auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting); | ||
244 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), true, remainingSorting); | ||
245 | return filteredSet; | ||
246 | } | ||