summaryrefslogtreecommitdiffstats
path: root/common/datastorequery.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-09-19 18:55:21 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-09-19 18:55:21 +0200
commit4a14a6fade947aa830d3f21598a4a6ba7316b933 (patch)
treec6b340bf1c6284e5501d371f65b58e3a69391a26 /common/datastorequery.cpp
parent1deac558af4b1c9f04352ede7f8e172f11a70a6b (diff)
downloadsink-4a14a6fade947aa830d3f21598a4a6ba7316b933.tar.gz
sink-4a14a6fade947aa830d3f21598a4a6ba7316b933.zip
Refactored the query part of the entity reader into DataStoreQuery.
DataStoreQuery now encapsulates the low-level query that operates directly on the storage. It no longer has access to the resource buffers, and is instantiated by the type implementation, so we can specialize the query alogorithm per type, but not per resource. This will allow us to implement the threading queries for the mailtype.
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r--common/datastorequery.cpp246
1 files changed, 246 insertions, 0 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
new file mode 100644
index 0000000..3237c53
--- /dev/null
+++ b/common/datastorequery.cpp
@@ -0,0 +1,246 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19#include "datastorequery.h"
20
21#include "log.h"
22#include "entitybuffer.h"
23#include "entity_generated.h"
24
25using namespace Sink;
26
27
28SINK_DEBUG_AREA("datastorequery")
29
30DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty)
31 : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty)
32{
33
34}
35
36static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
37{
38 // TODO use a result set with an iterator, to read values on demand
39 SinkTrace() << "Looking for : " << bufferType;
40 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
41 QSet<QByteArray> keys;
42 Storage::mainDatabase(transaction, bufferType)
43 .scan(QByteArray(),
44 [&](const QByteArray &key, const QByteArray &value) -> bool {
45 if (keys.contains(Sink::Storage::uidFromKey(key))) {
46 //Not something that should persist if the replay works, so we keep a message for now.
47 SinkTrace() << "Multiple revisions for key: " << key;
48 }
49 keys << Sink::Storage::uidFromKey(key);
50 return true;
51 },
52 [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; });
53
54 SinkTrace() << "Full scan retrieved " << keys.size() << " results.";
55 return ResultSet(keys.toList().toVector());
56}
57
58ResultSet DataStoreQuery::loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting)
59{
60 if (!mQuery.ids.isEmpty()) {
61 return ResultSet(mQuery.ids.toVector());
62 }
63 QSet<QByteArray> appliedFilters;
64 QByteArray appliedSorting;
65
66 auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction);
67
68 remainingFilters = mQuery.propertyFilter.keys().toSet() - appliedFilters;
69 if (appliedSorting.isEmpty()) {
70 remainingSorting = mQuery.sortProperty;
71 }
72
73 // We do a full scan if there were no indexes available to create the initial set.
74 if (appliedFilters.isEmpty()) {
75 // TODO this should be replaced by an index lookup as well
76 resultSet = fullScan(mTransaction, mType);
77 }
78 return resultSet;
79}
80
81ResultSet DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters)
82{
83 const auto bufferType = mType;
84 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
85 remainingFilters = mQuery.propertyFilter.keys().toSet();
86 return ResultSet([this, bufferType, revisionCounter]() -> QByteArray {
87 const qint64 topRevision = Sink::Storage::maxRevision(mTransaction);
88 // Spit out the revision keys one by one.
89 while (*revisionCounter <= topRevision) {
90 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
91 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
92 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
93 Q_ASSERT(!uid.isEmpty());
94 Q_ASSERT(!type.isEmpty());
95 if (type != bufferType) {
96 // Skip revision
97 *revisionCounter += 1;
98 continue;
99 }
100 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
101 *revisionCounter += 1;
102 return key;
103 }
104 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter;
105 // We're done
106 return QByteArray();
107 });
108}
109
110void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback)
111{
112 mDb.findLatest(key,
113 [=](const QByteArray &key, const QByteArray &value) -> bool {
114 resultCallback(Sink::Storage::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size()));
115 return false;
116 },
117 [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; });
118}
119
120QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property)
121{
122 return mGetProperty(entity, property);
123}
124
125ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty)
126{
127 const bool sortingRequired = !sortProperty.isEmpty();
128 if (initialQuery && sortingRequired) {
129 SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty;
130 // Sort the complete set by reading the sort property and filling into a sorted map
131 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
132 while (resultSet.next()) {
133 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
134 readEntity(resultSet.id(),
135 [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
136
137 const auto operation = buffer.operation();
138
139 // We're not interested in removals during the initial query
140 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) {
141 if (!sortProperty.isEmpty()) {
142 const auto sortValue = getProperty(buffer.entity(), sortProperty);
143 if (sortValue.type() == QVariant::DateTime) {
144 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), uid);
145 } else {
146 sortedMap->insert(sortValue.toString().toLatin1(), uid);
147 }
148 } else {
149 sortedMap->insert(uid, uid);
150 }
151 }
152 });
153 }
154
155 SinkTrace() << "Sorted " << sortedMap->size() << " values.";
156 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
157 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter, initialQuery](
158 std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool {
159 if (iterator->hasNext()) {
160 readEntity(iterator->next().value(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
161 callback(uid, buffer, Sink::Operation_Creation);
162 });
163 return true;
164 }
165 return false;
166 };
167
168 auto skip = [iterator]() {
169 if (iterator->hasNext()) {
170 iterator->next();
171 }
172 };
173 return ResultSet(generator, skip);
174 } else {
175 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
176 ResultSet::ValueGenerator generator = [this, resultSetPtr, filter, initialQuery](const ResultSet::Callback &callback) -> bool {
177 if (resultSetPtr->next()) {
178 SinkTrace() << "Reading the next value: " << resultSetPtr->id();
179 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
180 readEntity(resultSetPtr->id(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
181 const auto operation = buffer.operation();
182 if (initialQuery) {
183 // We're not interested in removals during the initial query
184 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) {
185 // In the initial set every entity is new
186 callback(uid, buffer, Sink::Operation_Creation);
187 }
188 } else {
189 // Always remove removals, they probably don't match due to non-available properties
190 if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) {
191 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
192 callback(uid, buffer, operation);
193 }
194 }
195 });
196 return true;
197 }
198 return false;
199 };
200 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
201 return ResultSet(generator, skip);
202 }
203}
204
205
206DataStoreQuery::FilterFunction DataStoreQuery::getFilter(const QSet<QByteArray> &remainingFilters)
207{
208 auto query = mQuery;
209 return [this, remainingFilters, query](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool {
210 if (!query.ids.isEmpty()) {
211 if (!query.ids.contains(uid)) {
212 SinkTrace() << "Filter by uid: " << uid;
213 return false;
214 }
215 }
216 for (const auto &filterProperty : remainingFilters) {
217 const auto property = getProperty(entity.entity(), filterProperty);
218 const auto comparator = query.propertyFilter.value(filterProperty);
219 if (!comparator.matches(property)) {
220 SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
221 return false;
222 }
223 }
224 return true;
225 };
226}
227
228ResultSet DataStoreQuery::update(qint64 baseRevision)
229{
230 SinkTrace() << "Executing query update";
231 QSet<QByteArray> remainingFilters;
232 QByteArray remainingSorting;
233 auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters);
234 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), false, remainingSorting);
235 return filteredSet;
236}
237
238ResultSet DataStoreQuery::execute()
239{
240 SinkTrace() << "Executing query";
241 QSet<QByteArray> remainingFilters;
242 QByteArray remainingSorting;
243 auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting);
244 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), true, remainingSorting);
245 return filteredSet;
246}