diff options
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r-- | common/datastorequery.cpp | 246 |
1 files changed, 246 insertions, 0 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp new file mode 100644 index 0000000..3237c53 --- /dev/null +++ b/common/datastorequery.cpp | |||
@@ -0,0 +1,246 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | #include "datastorequery.h" | ||
20 | |||
21 | #include "log.h" | ||
22 | #include "entitybuffer.h" | ||
23 | #include "entity_generated.h" | ||
24 | |||
25 | using namespace Sink; | ||
26 | |||
27 | |||
28 | SINK_DEBUG_AREA("datastorequery") | ||
29 | |||
30 | DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty) | ||
31 | : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) | ||
32 | { | ||
33 | |||
34 | } | ||
35 | |||
36 | static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
37 | { | ||
38 | // TODO use a result set with an iterator, to read values on demand | ||
39 | SinkTrace() << "Looking for : " << bufferType; | ||
40 | //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. | ||
41 | QSet<QByteArray> keys; | ||
42 | Storage::mainDatabase(transaction, bufferType) | ||
43 | .scan(QByteArray(), | ||
44 | [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
45 | if (keys.contains(Sink::Storage::uidFromKey(key))) { | ||
46 | //Not something that should persist if the replay works, so we keep a message for now. | ||
47 | SinkTrace() << "Multiple revisions for key: " << key; | ||
48 | } | ||
49 | keys << Sink::Storage::uidFromKey(key); | ||
50 | return true; | ||
51 | }, | ||
52 | [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); | ||
53 | |||
54 | SinkTrace() << "Full scan retrieved " << keys.size() << " results."; | ||
55 | return ResultSet(keys.toList().toVector()); | ||
56 | } | ||
57 | |||
58 | ResultSet DataStoreQuery::loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) | ||
59 | { | ||
60 | if (!mQuery.ids.isEmpty()) { | ||
61 | return ResultSet(mQuery.ids.toVector()); | ||
62 | } | ||
63 | QSet<QByteArray> appliedFilters; | ||
64 | QByteArray appliedSorting; | ||
65 | |||
66 | auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction); | ||
67 | |||
68 | remainingFilters = mQuery.propertyFilter.keys().toSet() - appliedFilters; | ||
69 | if (appliedSorting.isEmpty()) { | ||
70 | remainingSorting = mQuery.sortProperty; | ||
71 | } | ||
72 | |||
73 | // We do a full scan if there were no indexes available to create the initial set. | ||
74 | if (appliedFilters.isEmpty()) { | ||
75 | // TODO this should be replaced by an index lookup as well | ||
76 | resultSet = fullScan(mTransaction, mType); | ||
77 | } | ||
78 | return resultSet; | ||
79 | } | ||
80 | |||
81 | ResultSet DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters) | ||
82 | { | ||
83 | const auto bufferType = mType; | ||
84 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
85 | remainingFilters = mQuery.propertyFilter.keys().toSet(); | ||
86 | return ResultSet([this, bufferType, revisionCounter]() -> QByteArray { | ||
87 | const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); | ||
88 | // Spit out the revision keys one by one. | ||
89 | while (*revisionCounter <= topRevision) { | ||
90 | const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); | ||
91 | const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); | ||
92 | // SinkTrace() << "Revision" << *revisionCounter << type << uid; | ||
93 | Q_ASSERT(!uid.isEmpty()); | ||
94 | Q_ASSERT(!type.isEmpty()); | ||
95 | if (type != bufferType) { | ||
96 | // Skip revision | ||
97 | *revisionCounter += 1; | ||
98 | continue; | ||
99 | } | ||
100 | const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); | ||
101 | *revisionCounter += 1; | ||
102 | return key; | ||
103 | } | ||
104 | SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; | ||
105 | // We're done | ||
106 | return QByteArray(); | ||
107 | }); | ||
108 | } | ||
109 | |||
110 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) | ||
111 | { | ||
112 | mDb.findLatest(key, | ||
113 | [=](const QByteArray &key, const QByteArray &value) -> bool { | ||
114 | resultCallback(Sink::Storage::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); | ||
115 | return false; | ||
116 | }, | ||
117 | [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); | ||
118 | } | ||
119 | |||
120 | QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property) | ||
121 | { | ||
122 | return mGetProperty(entity, property); | ||
123 | } | ||
124 | |||
125 | ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty) | ||
126 | { | ||
127 | const bool sortingRequired = !sortProperty.isEmpty(); | ||
128 | if (initialQuery && sortingRequired) { | ||
129 | SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty; | ||
130 | // Sort the complete set by reading the sort property and filling into a sorted map | ||
131 | auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); | ||
132 | while (resultSet.next()) { | ||
133 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
134 | readEntity(resultSet.id(), | ||
135 | [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) { | ||
136 | |||
137 | const auto operation = buffer.operation(); | ||
138 | |||
139 | // We're not interested in removals during the initial query | ||
140 | if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { | ||
141 | if (!sortProperty.isEmpty()) { | ||
142 | const auto sortValue = getProperty(buffer.entity(), sortProperty); | ||
143 | if (sortValue.type() == QVariant::DateTime) { | ||
144 | sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), uid); | ||
145 | } else { | ||
146 | sortedMap->insert(sortValue.toString().toLatin1(), uid); | ||
147 | } | ||
148 | } else { | ||
149 | sortedMap->insert(uid, uid); | ||
150 | } | ||
151 | } | ||
152 | }); | ||
153 | } | ||
154 | |||
155 | SinkTrace() << "Sorted " << sortedMap->size() << " values."; | ||
156 | auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); | ||
157 | ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter, initialQuery]( | ||
158 | std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool { | ||
159 | if (iterator->hasNext()) { | ||
160 | readEntity(iterator->next().value(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { | ||
161 | callback(uid, buffer, Sink::Operation_Creation); | ||
162 | }); | ||
163 | return true; | ||
164 | } | ||
165 | return false; | ||
166 | }; | ||
167 | |||
168 | auto skip = [iterator]() { | ||
169 | if (iterator->hasNext()) { | ||
170 | iterator->next(); | ||
171 | } | ||
172 | }; | ||
173 | return ResultSet(generator, skip); | ||
174 | } else { | ||
175 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | ||
176 | ResultSet::ValueGenerator generator = [this, resultSetPtr, filter, initialQuery](const ResultSet::Callback &callback) -> bool { | ||
177 | if (resultSetPtr->next()) { | ||
178 | SinkTrace() << "Reading the next value: " << resultSetPtr->id(); | ||
179 | // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
180 | readEntity(resultSetPtr->id(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) { | ||
181 | const auto operation = buffer.operation(); | ||
182 | if (initialQuery) { | ||
183 | // We're not interested in removals during the initial query | ||
184 | if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) { | ||
185 | // In the initial set every entity is new | ||
186 | callback(uid, buffer, Sink::Operation_Creation); | ||
187 | } | ||
188 | } else { | ||
189 | // Always remove removals, they probably don't match due to non-available properties | ||
190 | if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) { | ||
191 | // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) | ||
192 | callback(uid, buffer, operation); | ||
193 | } | ||
194 | } | ||
195 | }); | ||
196 | return true; | ||
197 | } | ||
198 | return false; | ||
199 | }; | ||
200 | auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; | ||
201 | return ResultSet(generator, skip); | ||
202 | } | ||
203 | } | ||
204 | |||
205 | |||
206 | DataStoreQuery::FilterFunction DataStoreQuery::getFilter(const QSet<QByteArray> &remainingFilters) | ||
207 | { | ||
208 | auto query = mQuery; | ||
209 | return [this, remainingFilters, query](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool { | ||
210 | if (!query.ids.isEmpty()) { | ||
211 | if (!query.ids.contains(uid)) { | ||
212 | SinkTrace() << "Filter by uid: " << uid; | ||
213 | return false; | ||
214 | } | ||
215 | } | ||
216 | for (const auto &filterProperty : remainingFilters) { | ||
217 | const auto property = getProperty(entity.entity(), filterProperty); | ||
218 | const auto comparator = query.propertyFilter.value(filterProperty); | ||
219 | if (!comparator.matches(property)) { | ||
220 | SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; | ||
221 | return false; | ||
222 | } | ||
223 | } | ||
224 | return true; | ||
225 | }; | ||
226 | } | ||
227 | |||
228 | ResultSet DataStoreQuery::update(qint64 baseRevision) | ||
229 | { | ||
230 | SinkTrace() << "Executing query update"; | ||
231 | QSet<QByteArray> remainingFilters; | ||
232 | QByteArray remainingSorting; | ||
233 | auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters); | ||
234 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), false, remainingSorting); | ||
235 | return filteredSet; | ||
236 | } | ||
237 | |||
238 | ResultSet DataStoreQuery::execute() | ||
239 | { | ||
240 | SinkTrace() << "Executing query"; | ||
241 | QSet<QByteArray> remainingFilters; | ||
242 | QByteArray remainingSorting; | ||
243 | auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting); | ||
244 | auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), true, remainingSorting); | ||
245 | return filteredSet; | ||
246 | } | ||