summaryrefslogtreecommitdiffstats
path: root/common/entityreader.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-06-09 17:27:29 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-06-09 17:27:29 +0200
commit555c373a0c4dfe386dcd2c88ae9548d95e307409 (patch)
tree57dd13d7e6d6b5c5929f15f4fafc19881deb84c0 /common/entityreader.cpp
parent32a6f2ad3b66431c157e00ba5b1fb40c05e3c407 (diff)
downloadsink-555c373a0c4dfe386dcd2c88ae9548d95e307409.tar.gz
sink-555c373a0c4dfe386dcd2c88ae9548d95e307409.zip
Moved query logic to EntityReader to make it reusable in the resource.
Diffstat (limited to 'common/entityreader.cpp')
-rw-r--r--common/entityreader.cpp412
1 files changed, 412 insertions, 0 deletions
diff --git a/common/entityreader.cpp b/common/entityreader.cpp
new file mode 100644
index 0000000..b29b2a3
--- /dev/null
+++ b/common/entityreader.cpp
@@ -0,0 +1,412 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "entityreader.h"
21
22#include "resultset.h"
23#include "storage.h"
24#include "query.h"
25
26using namespace Sink;
27
28QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision)
29{
30 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
31 db.findLatest(uid,
32 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool {
33 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
34 if (!buffer.isValid()) {
35 Warning() << "Read invalid buffer from disk";
36 } else {
37 Trace() << "Found value " << key;
38 current = adaptorFactory.createAdaptor(buffer.entity());
39 retrievedRevision = Sink::Storage::revisionFromKey(key);
40 }
41 return false;
42 },
43 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
44 return current;
45}
46
47QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision)
48{
49 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
50 db.scan(key,
51 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool {
52 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
53 if (!buffer.isValid()) {
54 Warning() << "Read invalid buffer from disk";
55 } else {
56 current = adaptorFactory.createAdaptor(buffer.entity());
57 retrievedRevision = Sink::Storage::revisionFromKey(key);
58 }
59 return false;
60 },
61 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; });
62 return current;
63}
64
65QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision)
66{
67 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
68 qint64 latestRevision = 0;
69 db.scan(uid,
70 [&current, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool {
71 auto foundRevision = Sink::Storage::revisionFromKey(key);
72 if (foundRevision < revision && foundRevision > latestRevision) {
73 latestRevision = foundRevision;
74 }
75 return true;
76 },
77 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }, true);
78 return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision);
79}
80
81template <class DomainType>
82EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction)
83 : mResourceInstanceIdentifier(resourceInstanceIdentifier),
84 mTransaction(transaction),
85 mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory<DomainType>(resourceType)),
86 mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr)
87{
88
89}
90
91template <class DomainType>
92EntityReader<DomainType>::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction)
93 : mResourceInstanceIdentifier(resourceInstanceIdentifier),
94 mTransaction(transaction),
95 mDomainTypeAdaptorFactory(domainTypeAdaptorFactory)
96{
97
98}
99
100template <class DomainType>
101DomainType EntityReader<DomainType>::read(const QByteArray &identifier) const
102{
103 auto typeName = ApplicationDomain::getTypeName<DomainType>();
104 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName);
105 qint64 retrievedRevision = 0;
106 auto bufferAdaptor = EntityReaderUtils::getLatest(mainDatabase, identifier, mDomainTypeAdaptorFactory, retrievedRevision);
107 if (!bufferAdaptor) {
108 return DomainType();
109 }
110 return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor);
111}
112
113template <class DomainType>
114DomainType EntityReader<DomainType>::readFromKey(const QByteArray &key) const
115{
116 auto typeName = ApplicationDomain::getTypeName<DomainType>();
117 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName);
118 qint64 retrievedRevision = 0;
119 auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision);
120 const auto identifier = Storage::uidFromKey(key);
121 if (!bufferAdaptor) {
122 return DomainType();
123 }
124 return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor);
125}
126
127template <class DomainType>
128DomainType EntityReader<DomainType>::readPrevious(const QByteArray &uid, qint64 revision) const
129{
130 auto typeName = ApplicationDomain::getTypeName<DomainType>();
131 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName);
132 qint64 retrievedRevision = 0;
133 auto bufferAdaptor = EntityReaderUtils::getPrevious(mainDatabase, uid, revision, mDomainTypeAdaptorFactory, retrievedRevision);
134 if (!bufferAdaptor) {
135 return DomainType();
136 }
137 return DomainType(mResourceInstanceIdentifier, uid, retrievedRevision, bufferAdaptor);
138}
139
140template <class DomainType>
141void EntityReader<DomainType>::query(const Sink::Query &query, const std::function<bool(const DomainType &)> &callback)
142{
143 executeInitialQuery(query, 0, 0,
144 [&callback](const typename DomainType::Ptr &value, Sink::Operation operation) -> bool {
145 Q_ASSERT(operation == Sink::Operation_Creation);
146 return callback(*value);
147 });
148}
149
150template <class DomainType>
151void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key,
152 const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback)
153{
154 db.findLatest(key,
155 [=](const QByteArray &key, const QByteArray &value) -> bool {
156 Sink::EntityBuffer buffer(value.data(), value.size());
157 const Sink::Entity &entity = buffer.entity();
158 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
159 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
160 const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation;
161 auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity);
162 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation);
163 return false;
164 },
165 [&](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message << key; });
166}
167
168static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
169{
170 // TODO use a result set with an iterator, to read values on demand
171 Trace() << "Looking for : " << bufferType;
172 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
173 QSet<QByteArray> keys;
174 Storage::mainDatabase(transaction, bufferType)
175 .scan(QByteArray(),
176 [&](const QByteArray &key, const QByteArray &value) -> bool {
177 if (keys.contains(Sink::Storage::uidFromKey(key))) {
178 //Not something that should persist if the replay works, so we keep a message for now.
179 Trace() << "Multiple revisions for key: " << key;
180 }
181 keys << Sink::Storage::uidFromKey(key);
182 return true;
183 },
184 [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; });
185
186 Trace() << "Full scan retrieved " << keys.size() << " results.";
187 return ResultSet(keys.toList().toVector());
188}
189
190template <class DomainType>
191ResultSet EntityReader<DomainType>::loadInitialResultSet(const Sink::Query &query, QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting)
192{
193 if (!query.ids.isEmpty()) {
194 return ResultSet(query.ids.toVector());
195 }
196 QSet<QByteArray> appliedFilters;
197 QByteArray appliedSorting;
198 auto resultSet = Sink::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, appliedSorting, mTransaction);
199 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
200 if (appliedSorting.isEmpty()) {
201 remainingSorting = query.sortProperty;
202 }
203
204 // We do a full scan if there were no indexes available to create the initial set.
205 if (appliedFilters.isEmpty()) {
206 // TODO this should be replaced by an index lookup as well
207 resultSet = fullScan(mTransaction, ApplicationDomain::getTypeName<DomainType>());
208 }
209 return resultSet;
210}
211
212template <class DomainType>
213ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, QSet<QByteArray> &remainingFilters)
214{
215 const auto bufferType = ApplicationDomain::getTypeName<DomainType>();
216 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
217 remainingFilters = query.propertyFilter.keys().toSet();
218 return ResultSet([this, bufferType, revisionCounter]() -> QByteArray {
219 const qint64 topRevision = Sink::Storage::maxRevision(mTransaction);
220 // Spit out the revision keys one by one.
221 while (*revisionCounter <= topRevision) {
222 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
223 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
224 // Trace() << "Revision" << *revisionCounter << type << uid;
225 Q_ASSERT(!uid.isEmpty());
226 Q_ASSERT(!type.isEmpty());
227 if (type != bufferType) {
228 // Skip revision
229 *revisionCounter += 1;
230 continue;
231 }
232 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
233 *revisionCounter += 1;
234 return key;
235 }
236 Trace() << "Finished reading incremental result set:" << *revisionCounter;
237 // We're done
238 return QByteArray();
239 });
240}
241
242template <class DomainType>
243ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter,
244 const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty)
245{
246 const bool sortingRequired = !sortProperty.isEmpty();
247 if (initialQuery && sortingRequired) {
248 Trace() << "Sorting the resultset in memory according to property: " << sortProperty;
249 // Sort the complete set by reading the sort property and filling into a sorted map
250 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
251 while (resultSet.next()) {
252 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
253 readEntity(db, resultSet.id(),
254 [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
255 // We're not interested in removals during the initial query
256 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
257 if (!sortProperty.isEmpty()) {
258 const auto sortValue = domainObject->getProperty(sortProperty);
259 if (sortValue.type() == QVariant::DateTime) {
260 sortedMap->insert(QByteArray::number(std::numeric_limits<unsigned int>::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier());
261 } else {
262 sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier());
263 }
264 } else {
265 sortedMap->insert(domainObject->identifier(), domainObject->identifier());
266 }
267 }
268 });
269 }
270
271 Trace() << "Sorted " << sortedMap->size() << " values.";
272 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
273 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery](
274 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
275 if (iterator->hasNext()) {
276 readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject,
277 Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); });
278 return true;
279 }
280 return false;
281 };
282
283 auto skip = [iterator]() {
284 if (iterator->hasNext()) {
285 iterator->next();
286 }
287 };
288 return ResultSet(generator, skip);
289 } else {
290 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
291 ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](
292 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
293 if (resultSetPtr->next()) {
294 // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
295 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) {
296 if (initialQuery) {
297 // We're not interested in removals during the initial query
298 if ((operation != Sink::Operation_Removal) && filter(domainObject)) {
299 // In the initial set every entity is new
300 callback(domainObject, Sink::Operation_Creation);
301 }
302 } else {
303 // Always remove removals, they probably don't match due to non-available properties
304 if ((operation == Sink::Operation_Removal) || filter(domainObject)) {
305 // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results)
306 callback(domainObject, operation);
307 }
308 }
309 });
310 return true;
311 }
312 return false;
313 };
314 auto skip = [resultSetPtr]() { resultSetPtr->skip(1); };
315 return ResultSet(generator, skip);
316 }
317}
318
319template <class DomainType>
320QPair<qint64, qint64> EntityReader<DomainType>::load(const Sink::Query &query, const std::function<ResultSet(QSet<QByteArray> &, QByteArray &)> &baseSetRetriever, bool initialQuery, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
321{
322 QTime time;
323 time.start();
324
325 auto db = Storage::mainDatabase(mTransaction, ApplicationDomain::getTypeName<DomainType>());
326
327 QSet<QByteArray> remainingFilters;
328 QByteArray remainingSorting;
329 auto resultSet = baseSetRetriever(remainingFilters, remainingSorting);
330 Trace() << "Base set retrieved. " << Log::TraceTime(time.elapsed());
331 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting);
332 Trace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
333 auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback);
334 // Trace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed());
335 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
336}
337
338template <class DomainType>
339QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
340{
341 QTime time;
342 time.start();
343 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
344 return loadInitialResultSet(query, remainingFilters, remainingSorting);
345 }, true, offset, batchsize, callback);
346 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed());
347 return revisionAndReplayedEntities;
348}
349
350template <class DomainType>
351QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Sink::Query &query, qint64 lastRevision, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
352{
353 QTime time;
354 time.start();
355 const qint64 baseRevision = lastRevision + 1;
356 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
357 return loadIncrementalResultSet(baseRevision, query, remainingFilters);
358 }, false, 0, 0, callback);
359 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed());
360 return revisionAndReplayedEntities;
361}
362
363template <class DomainType>
364std::function<bool(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)>
365EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Sink::Query &query)
366{
367 return [this, remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
368 if (!query.ids.isEmpty()) {
369 if (!query.ids.contains(domainObject->identifier())) {
370 return false;
371 }
372 }
373 for (const auto &filterProperty : remainingFilters) {
374 const auto property = domainObject->getProperty(filterProperty);
375 if (property.isValid()) {
376 const auto comparator = query.propertyFilter.value(filterProperty);
377 if (!comparator.matches(property)) {
378 Trace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
379 return false;
380 }
381 } else {
382 Warning() << "Ignored property filter because value is invalid: " << filterProperty;
383 }
384 }
385 return true;
386 };
387}
388
389template <class DomainType>
390qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
391{
392 Trace() << "Skipping over " << offset << " results";
393 resultSet.skip(offset);
394 int counter = 0;
395 while (!batchSize || (counter < batchSize)) {
396 const bool ret =
397 resultSet.next([&counter, callback](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool {
398 counter++;
399 return callback(value.staticCast<DomainType>(), operation);
400 });
401 if (!ret) {
402 break;
403 }
404 };
405 Trace() << "Replayed " << counter << " results."
406 << "Limit " << batchSize;
407 return counter;
408}
409
410template class Sink::EntityReader<Sink::ApplicationDomain::Folder>;
411template class Sink::EntityReader<Sink::ApplicationDomain::Mail>;
412template class Sink::EntityReader<Sink::ApplicationDomain::Event>;