summaryrefslogtreecommitdiffstats
path: root/common/datastorequery.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/datastorequery.cpp')
-rw-r--r--common/datastorequery.cpp62
1 files changed, 12 insertions, 50 deletions
diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp
index 7b7d3a3..d4a83b1 100644
--- a/common/datastorequery.cpp
+++ b/common/datastorequery.cpp
@@ -28,6 +28,7 @@
28#include "event.h" 28#include "event.h"
29 29
30using namespace Sink; 30using namespace Sink;
31using namespace Sink::Storage;
31 32
32 33
33SINK_DEBUG_AREA("datastorequery") 34SINK_DEBUG_AREA("datastorequery")
@@ -299,42 +300,18 @@ public:
299 } 300 }
300}; 301};
301 302
302DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty) 303DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, EntityStore::Ptr store, TypeIndex &typeIndex, std::function<QVariant(const Sink::Entity &entity, const QByteArray &property)> getProperty)
303 : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) 304 : mQuery(query), mType(type), mTypeIndex(typeIndex), mGetProperty(getProperty), mStore(store)
304{ 305{
305 setupQuery(); 306 setupQuery();
306} 307}
307 308
308static inline QVector<QByteArray> fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
309{
310 // TODO use a result set with an iterator, to read values on demand
311 SinkTrace() << "Looking for : " << bufferType;
312 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
313 QSet<QByteArray> keys;
314 Storage::mainDatabase(transaction, bufferType)
315 .scan(QByteArray(),
316 [&](const QByteArray &key, const QByteArray &value) -> bool {
317 if (keys.contains(Sink::Storage::uidFromKey(key))) {
318 //Not something that should persist if the replay works, so we keep a message for now.
319 SinkTrace() << "Multiple revisions for key: " << key;
320 }
321 keys << Sink::Storage::uidFromKey(key);
322 return true;
323 },
324 [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; });
325
326 SinkTrace() << "Full scan retrieved " << keys.size() << " results.";
327 return keys.toList().toVector();
328}
329
330void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) 309void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback)
331{ 310{
332 mDb.findLatest(key, 311 mStore->readLatest(mType, key, [=](const QByteArray &key, const Sink::EntityBuffer &buffer) {
333 [=](const QByteArray &key, const QByteArray &value) -> bool { 312 resultCallback(DataStore::uidFromKey(key), buffer);
334 resultCallback(Sink::Storage::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size()));
335 return false; 313 return false;
336 }, 314 });
337 [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; });
338} 315}
339 316
340QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property) 317QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property)
@@ -344,7 +321,7 @@ QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArra
344 321
345QVector<QByteArray> DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value) 322QVector<QByteArray> DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value)
346{ 323{
347 return mTypeIndex.lookup(property, value, mTransaction); 324 return mStore->indexLookup(mType, property, value);
348} 325}
349 326
350/* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */ 327/* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */
@@ -444,7 +421,7 @@ QSharedPointer<DataStoreQuery> prepareQuery(const QByteArray &type, Args && ...
444QByteArrayList DataStoreQuery::executeSubquery(const Query &subquery) 421QByteArrayList DataStoreQuery::executeSubquery(const Query &subquery)
445{ 422{
446 Q_ASSERT(!subquery.type.isEmpty()); 423 Q_ASSERT(!subquery.type.isEmpty());
447 auto sub = prepareQuery(subquery.type, subquery, mTransaction); 424 auto sub = prepareQuery(subquery.type, subquery, mStore);
448 auto result = sub->execute(); 425 auto result = sub->execute();
449 QByteArrayList ids; 426 QByteArrayList ids;
450 while (result.next([&ids](const ResultSet::Result &result) { 427 while (result.next([&ids](const ResultSet::Result &result) {
@@ -476,13 +453,13 @@ void DataStoreQuery::setupQuery()
476 } else { 453 } else {
477 QSet<QByteArray> appliedFilters; 454 QSet<QByteArray> appliedFilters;
478 455
479 auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction); 456 auto resultSet = mStore->indexLookup(mType, mQuery, appliedFilters, appliedSorting);
480 remainingFilters = remainingFilters - appliedFilters; 457 remainingFilters = remainingFilters - appliedFilters;
481 458
482 // We do a full scan if there were no indexes available to create the initial set. 459 // We do a full scan if there were no indexes available to create the initial set.
483 if (appliedFilters.isEmpty()) { 460 if (appliedFilters.isEmpty()) {
484 // TODO this should be replaced by an index lookup on the uid index 461 // TODO this should be replaced by an index lookup on the uid index
485 mSource = Source::Ptr::create(fullScan(mTransaction, mType), this); 462 mSource = Source::Ptr::create(mStore->fullScan(mType), this);
486 } else { 463 } else {
487 mSource = Source::Ptr::create(resultSet, this); 464 mSource = Source::Ptr::create(resultSet, this);
488 } 465 }
@@ -523,26 +500,11 @@ void DataStoreQuery::setupQuery()
523 500
524QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision) 501QVector<QByteArray> DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision)
525{ 502{
526 const auto bufferType = mType;
527 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); 503 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
528 QVector<QByteArray> changedKeys; 504 QVector<QByteArray> changedKeys;
529 const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); 505 mStore->readRevisions(baseRevision, mType, [&](const QByteArray &key) {
530 // Spit out the revision keys one by one.
531 while (*revisionCounter <= topRevision) {
532 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
533 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
534 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
535 Q_ASSERT(!uid.isEmpty());
536 Q_ASSERT(!type.isEmpty());
537 if (type != bufferType) {
538 // Skip revision
539 *revisionCounter += 1;
540 continue;
541 }
542 const auto key = Sink::Storage::assembleKey(uid, *revisionCounter);
543 *revisionCounter += 1;
544 changedKeys << key; 506 changedKeys << key;
545 } 507 });
546 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; 508 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter;
547 return changedKeys; 509 return changedKeys;
548} 510}