diff options
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r-- | common/datastorequery.cpp | 62 |
1 files changed, 12 insertions, 50 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 7b7d3a3..d4a83b1 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp | |||
@@ -28,6 +28,7 @@ | |||
28 | #include "event.h" | 28 | #include "event.h" |
29 | 29 | ||
30 | using namespace Sink; | 30 | using namespace Sink; |
31 | using namespace Sink::Storage; | ||
31 | 32 | ||
32 | 33 | ||
33 | SINK_DEBUG_AREA("datastorequery") | 34 | SINK_DEBUG_AREA("datastorequery") |
@@ -299,42 +300,18 @@ public: | |||
299 | } | 300 | } |
300 | }; | 301 | }; |
301 | 302 | ||
302 | DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty) | 303 | DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, EntityStore::Ptr store, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty) |
303 | : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) | 304 | : mQuery(query), mType(type), mTypeIndex(typeIndex), mGetProperty(getProperty), mStore(store) |
304 | { | 305 | { |
305 | setupQuery(); | 306 | setupQuery(); |
306 | } | 307 | } |
307 | 308 | ||
308 | static inline QVector<QByteArray> fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
309 | { | ||
310 | // TODO use a result set with an iterator, to read values on demand | ||
311 | SinkTrace() << "Looking for : " << bufferType; | ||
312 | //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. | ||
313 | QSet<QByteArray> keys; | ||
314 | Storage::mainDatabase(transaction, bufferType) | ||
315 | .scan(QByteArray(), | ||
316 | [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
317 | if (keys.contains(Sink::Storage::uidFromKey(key))) { | ||
318 | //Not something that should persist if the replay works, so we keep a message for now. | ||
319 | SinkTrace() << "Multiple revisions for key: " << key; | ||
320 | } | ||
321 | keys << Sink::Storage::uidFromKey(key); | ||
322 | return true; | ||
323 | }, | ||
324 | [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); | ||
325 | |||
326 | SinkTrace() << "Full scan retrieved " << keys.size() << " results."; | ||
327 | return keys.toList().toVector(); | ||
328 | } | ||
329 | |||
330 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) | 309 | void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) |
331 | { | 310 | { |
332 | mDb.findLatest(key, | 311 | mStore->readLatest(mType, key, [=](const QByteArray &key, const Sink::EntityBuffer &buffer) { |
333 | [=](const QByteArray &key, const QByteArray &value) -> bool { | 312 | resultCallback(DataStore::uidFromKey(key), buffer); |
334 | resultCallback(Sink::Storage::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); | ||
335 | return false; | 313 | return false; |
336 | }, | 314 | }); |
337 | [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); | ||
338 | } | 315 | } |
339 | 316 | ||
340 | QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property) | 317 | QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property) |
@@ -344,7 +321,7 @@ QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArra | |||
344 | 321 | ||
345 | QVector<QByteArray> DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value) | 322 | QVector<QByteArray> DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value) |
346 | { | 323 | { |
347 | return mTypeIndex.lookup(property, value, mTransaction); | 324 | return mStore->indexLookup(mType, property, value); |
348 | } | 325 | } |
349 | 326 | ||
350 | /* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */ | 327 | /* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */ |
@@ -444,7 +421,7 @@ QSharedPointer<DataStoreQuery> prepareQuery(const QByteArray &type, Args && ... | |||
444 | QByteArrayList DataStoreQuery::executeSubquery(const Query &subquery) | 421 | QByteArrayList DataStoreQuery::executeSubquery(const Query &subquery) |
445 | { | 422 | { |
446 | Q_ASSERT(!subquery.type.isEmpty()); | 423 | Q_ASSERT(!subquery.type.isEmpty()); |
447 | auto sub = prepareQuery(subquery.type, subquery, mTransaction); | 424 | auto sub = prepareQuery(subquery.type, subquery, mStore); |
448 | auto result = sub->execute(); | 425 | auto result = sub->execute(); |
449 | QByteArrayList ids; | 426 | QByteArrayList ids; |
450 | while (result.next([&ids](const ResultSet::Result &result) { | 427 | while (result.next([&ids](const ResultSet::Result &result) { |
@@ -476,13 +453,13 @@ void DataStoreQuery::setupQuery() | |||
476 | } else { | 453 | } else { |
477 | QSet<QByteArray> appliedFilters; | 454 | QSet<QByteArray> appliedFilters; |
478 | 455 | ||
479 | auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction); | 456 | auto resultSet = mStore->indexLookup(mType, mQuery, appliedFilters, appliedSorting); |
480 | remainingFilters = remainingFilters - appliedFilters; | 457 | remainingFilters = remainingFilters - appliedFilters; |
481 | 458 | ||
482 | // We do a full scan if there were no indexes available to create the initial set. | 459 | // We do a full scan if there were no indexes available to create the initial set. |
483 | if (appliedFilters.isEmpty()) { | 460 | if (appliedFilters.isEmpty()) { |
484 | // TODO this should be replaced by an index lookup on the uid index | 461 | // TODO this should be replaced by an index lookup on the uid index |
485 | mSource = Source::Ptr::create(fullScan(mTransaction, mType), this); | 462 | mSource = Source::Ptr::create(mStore->fullScan(mType), this); |
486 | } else { | 463 | } else { |
487 | mSource = Source::Ptr::create(resultSet, this); | 464 | mSource = Source::Ptr::create(resultSet, this); |
488 | } | 465 | } |
@@ -523,26 +500,11 @@ void DataStoreQuery::setupQuery() | |||
523 | 500 | ||
524 | QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision) | 501 | QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision) |
525 | { | 502 | { |
526 | const auto bufferType = mType; | ||
527 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | 503 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); |
528 | QVector<QByteArray> changedKeys; | 504 | QVector<QByteArray> changedKeys; |
529 | const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); | 505 | mStore->readRevisions(baseRevision, mType, [&](const QByteArray &key) { |
530 | // Spit out the revision keys one by one. | ||
531 | while (*revisionCounter <= topRevision) { | ||
532 | const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); | ||
533 | const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); | ||
534 | // SinkTrace() << "Revision" << *revisionCounter << type << uid; | ||
535 | Q_ASSERT(!uid.isEmpty()); | ||
536 | Q_ASSERT(!type.isEmpty()); | ||
537 | if (type != bufferType) { | ||
538 | // Skip revision | ||
539 | *revisionCounter += 1; | ||
540 | continue; | ||
541 | } | ||
542 | const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); | ||
543 | *revisionCounter += 1; | ||
544 | changedKeys << key; | 506 | changedKeys << key; |
545 | } | 507 | }); |
546 | SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; | 508 | SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; |
547 | return changedKeys; | 509 | return changedKeys; |
548 | } | 510 | } |