summaryrefslogtreecommitdiffstats
path: root/common/datastorequery.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r--common/datastorequery.cpp246
1 files changed, 246 insertions, 0 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
new file mode 100644
index 0000000..3237c53
--- /dev/null
+++ b/common/datastorequery.cpp
@@ -0,0 +1,246 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19#include "datastorequery.h"
20
21#include "log.h"
22#include "entitybuffer.h"
23#include "entity_generated.h"
24
25using namespace Sink;
26
27
28SINK_DEBUG_AREA("datastorequery")
29
30DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty)
31 : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty)
32{
33
34}
35
36static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
37{
38 // TODO use a result set with an iterator, to read values on demand
39 SinkTrace() << "Looking for : " << bufferType;
40 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
41 QSet<QByteArray> keys;
42 Storage::mainDatabase(transaction, bufferType)
43 .scan(QByteArray(),
44 [&](const QByteArray &key, const QByteArray &value) -> bool {
45 if (keys.contains(Sink::Storage::uidFromKey(key))) {
46 //Not something that should persist if the replay works, so we keep a message for now.
47 SinkTrace() << "Multiple revisions for key: " << key;
48 }
49 keys << Sink::Storage::uidFromKey(key);
50 return true;
51 },
52 [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; });
53
54 SinkTrace() << "Full scan retrieved " << keys.size() << " results.";
55 return ResultSet(keys.toList().toVector());
56}
57
58ResultSet DataStoreQuery::loadInitialResultSet(QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting)
59{
60 if (!mQuery.ids.isEmpty()) {
61 return ResultSet(mQuery.ids.toVector());
62 }
63 QSet<QByteArray> appliedFilters;
64 QByteArray appliedSorting;
65
66 auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction);
67
68 remainingFilters = mQuery.propertyFilter.keys().toSet() - appliedFilters;
69 if (appliedSorting.isEmpty()) {
70 remainingSorting = mQuery.sortProperty;
71 }
72
73 // We do a full scan if there were no indexes available to create the initial set.
74 if (appliedFilters.isEmpty()) {
75 // TODO this should be replaced by an index lookup as well
76 resultSet = fullScan(mTransaction, mType);
77 }
78 return resultSet;
79}
80
81ResultSet DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision, QSet<QByteArray> &remainingFilters)
82{
83 const auto bufferType = mType;
84 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
85 remainingFilters = mQuery.propertyFilter.keys().toSet();
86 return ResultSet([this, bufferType, revisionCounter]() -> QByteArray {
87 const qint64 topRevision = Sink::Storage::maxRevision(mTransaction);
88 // Spit out the revision keys one by one.
89 while (*revisionCounter <= topRevision) {
90 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
91 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
92 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
93 Q_ASSERT(!uid.isEmpty());
94 Q_ASSERT(!type.isEmpty());
95 if (type != bufferType) {
96 // Skip revision
97 *revisionCounter += 1;
98 continue;
99 }
100 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
101 *revisionCounter += 1;
102 return key;
103 }
104 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter;
105 // We're done
106 return QByteArray();
107 });
108}
109
110void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback)
111{
112 mDb.findLatest(key,
113 [=](const QByteArray &key, const QByteArray &value) -> bool {
114 resultCallback(Sink::Storage::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size()));
115 return false;
116 },
117 [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; });
118}
119
120QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property)
121{
122 return mGetProperty(entity, property);
123}
124
125ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, bool initialQuery, const QByteArray &sortProperty)
126{
127 const bool sortingRequired = !sortProperty.isEmpty();
128 if (initialQuery && sortingRequired) {
129 SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty;
130 // Sort the complete set by reading the sort property and filling into a sorted map
131 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
132 while (resultSet.next()) {
133 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
134 readEntity(resultSet.id(),
135 [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
136
137 const auto operation = buffer.operation();
138
139 // We're not interested in removals during the initial query
140 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) {
141 if (!sortProperty.isEmpty()) {
142 const auto sortValue = getProperty(buffer.entity(), sortProperty);
143 if (sortValue.type() == QVariant::DateTime) {
144 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), uid);
145 } else {
146 sortedMap->insert(sortValue.toString().toLatin1(), uid);
147 }
148 } else {
149 sortedMap->insert(uid, uid);
150 }
151 }
152 });
153 }
154
155 SinkTrace() << "Sorted " << sortedMap->size() << " values.";
156 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
157 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, filter, initialQuery](
158 std::function<void(const QByteArray &uid, const Sink::EntityBuffer &entity, Sink::Operation)> callback) -> bool {
159 if (iterator->hasNext()) {
160 readEntity(iterator->next().value(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
161 callback(uid, buffer, Sink::Operation_Creation);
162 });
163 return true;
164 }
165 return false;
166 };
167
168 auto skip = [iterator]() {
169 if (iterator->hasNext()) {
170 iterator->next();
171 }
172 };
173 return ResultSet(generator, skip);
174 } else {
175 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
176 ResultSet::ValueGenerator generator = [this, resultSetPtr, filter, initialQuery](const ResultSet::Callback &callback) -> bool {
177 if (resultSetPtr->next()) {
178 SinkTrace() << "Reading the next value: " << resultSetPtr->id();
179 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
180 readEntity(resultSetPtr->id(), [this, filter, callback, initialQuery](const QByteArray &uid, const Sink::EntityBuffer &buffer) {
181 const auto operation = buffer.operation();
182 if (initialQuery) {
183 // We're not interested in removals during the initial query
184 if ((operation != Sink::Operation_Removal) && filter(uid, buffer)) {
185 // In the initial set every entity is new
186 callback(uid, buffer, Sink::Operation_Creation);
187 }
188 } else {
189 // Always remove removals, they probably don't match due to non-available properties
190 if ((operation == Sink::Operation_Removal) || filter(uid, buffer)) {
191 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
192 callback(uid, buffer, operation);
193 }
194 }
195 });
196 return true;
197 }
198 return false;
199 };
200 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
201 return ResultSet(generator, skip);
202 }
203}
204
205
206DataStoreQuery::FilterFunction DataStoreQuery::getFilter(const QSet<QByteArray> &remainingFilters)
207{
208 auto query = mQuery;
209 return [this, remainingFilters, query](const QByteArray &uid, const Sink::EntityBuffer &entity) -> bool {
210 if (!query.ids.isEmpty()) {
211 if (!query.ids.contains(uid)) {
212 SinkTrace() << "Filter by uid: " << uid;
213 return false;
214 }
215 }
216 for (const auto &filterProperty : remainingFilters) {
217 const auto property = getProperty(entity.entity(), filterProperty);
218 const auto comparator = query.propertyFilter.value(filterProperty);
219 if (!comparator.matches(property)) {
220 SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
221 return false;
222 }
223 }
224 return true;
225 };
226}
227
228ResultSet DataStoreQuery::update(qint64 baseRevision)
229{
230 SinkTrace() << "Executing query update";
231 QSet<QByteArray> remainingFilters;
232 QByteArray remainingSorting;
233 auto resultSet = loadIncrementalResultSet(baseRevision, remainingFilters);
234 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), false, remainingSorting);
235 return filteredSet;
236}
237
238ResultSet DataStoreQuery::execute()
239{
240 SinkTrace() << "Executing query";
241 QSet<QByteArray> remainingFilters;
242 QByteArray remainingSorting;
243 auto resultSet = loadInitialResultSet(remainingFilters, remainingSorting);
244 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters), true, remainingSorting);
245 return filteredSet;
246}