From 4d9746c828558c9f872e0aed52442863affb25d5 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 3 Mar 2016 09:01:05 +0100 Subject: Fromatted the whole codebase with clang-format. clang-format -i */**{.cpp,.h} --- common/queryrunner.cpp | 292 +++++++++++++++++++++++++------------------------ 1 file changed, 147 insertions(+), 145 deletions(-) (limited to 'common/queryrunner.cpp') diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 5ac1344..c150159 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -39,11 +39,12 @@ using namespace Sink; * This is a worker object that can be moved to a thread to execute the query. * The only interaction point is the ResultProvider, which handles the threadsafe reporting of the result. */ -template +template class QueryWorker : public QObject { public: - QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); + QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, + const QueryRunnerBase::ResultTransformation &transformation); virtual ~QueryWorker(); qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider); @@ -52,14 +53,17 @@ public: private: void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface &resultProvider, const QList &properties, int offset, int batchSize); - void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback); + void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, + const std::function &resultCallback); ResultSet loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet &remainingFilters, QByteArray &remainingSorting); ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet &remainingFilters); - ResultSet filterAndSortSet(ResultSet &resultSet, const std::function &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); + ResultSet filterAndSortSet(ResultSet &resultSet, const std::function &filter, + const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); std::function getFilter(const QSet remainingFilters, const Sink::Query &query); - qint64 load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, Sink::ResultProviderInterface &resultProvider, bool initialQuery, int offset, int batchSize); + qint64 load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, + Sink::ResultProviderInterface &resultProvider, bool initialQuery, int offset, int batchSize); private: QueryRunnerBase::ResultTransformation mResultTransformation; @@ -70,176 +74,171 @@ private: }; -template -QueryRunner::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) - : QueryRunnerBase(), - mResourceAccess(resourceAccess), - mResultProvider(new ResultProvider), - mOffset(0), - mBatchSize(query.limit) +template +QueryRunner::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, + const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) + : QueryRunnerBase(), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider), mOffset(0), mBatchSize(query.limit) { Trace() << "Starting query"; if (query.limit && query.sortProperty.isEmpty()) { Warning() << "A limited query without sorting is typically a bad idea."; } - //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. + // We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { Trace() << "Running fetcher. Offset: " << mOffset << " Batchsize: " << mBatchSize; auto resultProvider = mResultProvider; async::run([=]() -> qint64 { - QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); - const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize); - return newRevision; - }) + QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); + const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize); + return newRevision; + }) .template then([query, this](qint64 newRevision) { mOffset += mBatchSize; - //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. + // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. if (query.liveQuery) { mResourceAccess->sendRevisionReplayedCommand(newRevision); } - }).exec(); + }) + .exec(); }); // In case of a live query we keep the runner for as long alive as the result provider exists if (query.liveQuery) { - //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting - setQuery([=] () -> KAsync::Job { + // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting + setQuery([=]() -> KAsync::Job { auto resultProvider = mResultProvider; return async::run([=]() -> qint64 { - QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); - const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); - return newRevision; - }) + QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); + const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); + return newRevision; + }) .template then([query, this](qint64 newRevision) { - //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. + // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. mResourceAccess->sendRevisionReplayedCommand(newRevision); }); }); - //Ensure the connection is open, if it wasn't already opened - //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates + // Ensure the connection is open, if it wasn't already opened + // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates mResourceAccess->open(); QObject::connect(mResourceAccess.data(), &Sink::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged); } } -template +template QueryRunner::~QueryRunner() { Trace() << "Stopped query"; } -template +template void QueryRunner::setResultTransformation(const ResultTransformation &transformation) { mResultTransformation = transformation; } -template +template typename Sink::ResultEmitter::Ptr QueryRunner::emitter() { return mResultProvider->emitter(); } - static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) { - //TODO use a result set with an iterator, to read values on demand + // TODO use a result set with an iterator, to read values on demand QVector keys; - Storage::mainDatabase(transaction, bufferType).scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { - //Skip internals - if (Sink::Storage::isInternalKey(key)) { - return true; - } - keys << Sink::Storage::uidFromKey(key); - return true; - }, - [](const Sink::Storage::Error &error) { - Warning() << "Error during query: " << error.message; - }); + Storage::mainDatabase(transaction, bufferType) + .scan(QByteArray(), + [&](const QByteArray &key, const QByteArray &value) -> bool { + // Skip internals + if (Sink::Storage::isInternalKey(key)) { + return true; + } + keys << Sink::Storage::uidFromKey(key); + return true; + }, + [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; }); Trace() << "Full scan retrieved " << keys.size() << " results."; return ResultSet(keys); } -template -QueryWorker::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) - : QObject(), - mResultTransformation(transformation), - mDomainTypeAdaptorFactory(factory), - mResourceInstanceIdentifier(instanceIdentifier), - mBufferType(bufferType), - mQuery(query) +template +QueryWorker::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, + const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) + : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mBufferType(bufferType), mQuery(query) { Trace() << "Starting query worker"; } -template +template QueryWorker::~QueryWorker() { Trace() << "Stopped query worker"; } -template +template void QueryWorker::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface &resultProvider, const QList &properties, int offset, int batchSize) { Trace() << "Skipping over " << offset << " results"; resultSet.skip(offset); int counter; for (counter = 0; !batchSize || (counter < batchSize); counter++) { - const bool ret = resultSet.next([this, &resultProvider, &counter, &properties, batchSize](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { - //FIXME allow maildir resource to set the mimeMessage property - auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value, properties).template staticCast(); - if (mResultTransformation) { - mResultTransformation(*valueCopy); - } - switch (operation) { - case Sink::Operation_Creation: - // Trace() << "Got creation"; - resultProvider.add(valueCopy); - break; - case Sink::Operation_Modification: - // Trace() << "Got modification"; - resultProvider.modify(valueCopy); - break; - case Sink::Operation_Removal: - // Trace() << "Got removal"; - resultProvider.remove(valueCopy); - break; - } - return true; - }); + const bool ret = + resultSet.next([this, &resultProvider, &counter, &properties, batchSize](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { + // FIXME allow maildir resource to set the mimeMessage property + auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value, properties).template staticCast(); + if (mResultTransformation) { + mResultTransformation(*valueCopy); + } + switch (operation) { + case Sink::Operation_Creation: + // Trace() << "Got creation"; + resultProvider.add(valueCopy); + break; + case Sink::Operation_Modification: + // Trace() << "Got modification"; + resultProvider.modify(valueCopy); + break; + case Sink::Operation_Removal: + // Trace() << "Got removal"; + resultProvider.remove(valueCopy); + break; + } + return true; + }); if (!ret) { break; } }; - Trace() << "Replayed " << counter << " results." << "Limit " << batchSize; + Trace() << "Replayed " << counter << " results." + << "Limit " << batchSize; } -template -void QueryWorker::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback) +template +void QueryWorker::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, + const std::function &resultCallback) { - //This only works for a 1:1 mapping of resource to domain types. - //Not i.e. for tags that are stored as flags in each entity of an imap store. - //additional properties that don't have a 1:1 mapping (such as separately stored tags), - //could be added to the adaptor. - db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { - Sink::EntityBuffer buffer(value.data(), value.size()); - const Sink::Entity &entity = buffer.entity(); - const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; - auto adaptor = mDomainTypeAdaptorFactory->createAdaptor(entity); - resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); - return false; - }, - [](const Sink::Storage::Error &error) { - Warning() << "Error during query: " << error.message; - }); + // This only works for a 1:1 mapping of resource to domain types. + // Not i.e. for tags that are stored as flags in each entity of an imap store. + // additional properties that don't have a 1:1 mapping (such as separately stored tags), + // could be added to the adaptor. + db.findLatest(key, + [=](const QByteArray &key, const QByteArray &value) -> bool { + Sink::EntityBuffer buffer(value.data(), value.size()); + const Sink::Entity &entity = buffer.entity(); + const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); + const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; + auto adaptor = mDomainTypeAdaptorFactory->createAdaptor(entity); + resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); + return false; + }, + [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; }); } -template +template ResultSet QueryWorker::loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet &remainingFilters, QByteArray &remainingSorting) { if (!query.ids.isEmpty()) { @@ -253,15 +252,15 @@ ResultSet QueryWorker::loadInitialResultSet(const Sink::Query &query remainingSorting = query.sortProperty; } - //We do a full scan if there were no indexes available to create the initial set. + // We do a full scan if there were no indexes available to create the initial set. if (appliedFilters.isEmpty()) { - //TODO this should be replaced by an index lookup as well + // TODO this should be replaced by an index lookup as well resultSet = fullScan(transaction, mBufferType); } return resultSet; } -template +template ResultSet QueryWorker::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet &remainingFilters) { const auto bufferType = mBufferType; @@ -269,13 +268,13 @@ ResultSet QueryWorker::loadIncrementalResultSet(qint64 baseRevision, remainingFilters = query.propertyFilter.keys().toSet(); return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { const qint64 topRevision = Sink::Storage::maxRevision(transaction); - //Spit out the revision keys one by one. + // Spit out the revision keys one by one. while (*revisionCounter <= topRevision) { const auto uid = Sink::Storage::getUidFromRevision(transaction, *revisionCounter); const auto type = Sink::Storage::getTypeFromRevision(transaction, *revisionCounter); // Trace() << "Revision" << *revisionCounter << type << uid; if (type != bufferType) { - //Skip revision + // Skip revision *revisionCounter += 1; continue; } @@ -284,45 +283,47 @@ ResultSet QueryWorker::loadIncrementalResultSet(qint64 baseRevision, return key; } Trace() << "Finished reading incremental result set:" << *revisionCounter; - //We're done + // We're done return QByteArray(); }); } -template -ResultSet QueryWorker::filterAndSortSet(ResultSet &resultSet, const std::function &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) +template +ResultSet QueryWorker::filterAndSortSet(ResultSet &resultSet, const std::function &filter, + const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) { const bool sortingRequired = !sortProperty.isEmpty(); if (initialQuery && sortingRequired) { Trace() << "Sorting the resultset in memory according to property: " << sortProperty; - //Sort the complete set by reading the sort property and filling into a sorted map + // Sort the complete set by reading the sort property and filling into a sorted map auto sortedMap = QSharedPointer>::create(); while (resultSet.next()) { - //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(db, resultSet.id(), [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { - //We're not interested in removals during the initial query - if ((operation != Sink::Operation_Removal) && filter(domainObject)) { - if (!sortProperty.isEmpty()) { - const auto sortValue = domainObject->getProperty(sortProperty); - if (sortValue.type() == QVariant::DateTime) { - sortedMap->insert(QByteArray::number(std::numeric_limits::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier()); + // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) + readEntity(db, resultSet.id(), + [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { + // We're not interested in removals during the initial query + if ((operation != Sink::Operation_Removal) && filter(domainObject)) { + if (!sortProperty.isEmpty()) { + const auto sortValue = domainObject->getProperty(sortProperty); + if (sortValue.type() == QVariant::DateTime) { + sortedMap->insert(QByteArray::number(std::numeric_limits::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier()); + } else { + sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier()); + } } else { - sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier()); + sortedMap->insert(domainObject->identifier(), domainObject->identifier()); } - } else { - sortedMap->insert(domainObject->identifier(), domainObject->identifier()); } - } - }); + }); } Trace() << "Sorted " << sortedMap->size() << " values."; - auto iterator = QSharedPointer >::create(*sortedMap); - ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery](std::function callback) -> bool { + auto iterator = QSharedPointer>::create(*sortedMap); + ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery]( + std::function callback) -> bool { if (iterator->hasNext()) { - readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { - callback(domainObject, Sink::Operation_Creation); - }); + readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, + Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); }); return true; } return false; @@ -336,19 +337,21 @@ ResultSet QueryWorker::filterAndSortSet(ResultSet &resultSet, const return ResultSet(generator, skip); } else { auto resultSetPtr = QSharedPointer::create(resultSet); - ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](std::function callback) -> bool { + ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery]( + std::function callback) -> bool { if (resultSetPtr->next()) { - //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) + // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { if (initialQuery) { - //We're not interested in removals during the initial query + // We're not interested in removals during the initial query if ((operation != Sink::Operation_Removal) && filter(domainObject)) { - //In the initial set every entity is new + // In the initial set every entity is new callback(domainObject, Sink::Operation_Creation); - } } else { - //Always remove removals, they probably don't match due to non-available properties + } + } else { + // Always remove removals, they probably don't match due to non-available properties if ((operation == Sink::Operation_Removal) || filter(domainObject)) { - //TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) + // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) callback(domainObject, operation); } } @@ -357,15 +360,14 @@ ResultSet QueryWorker::filterAndSortSet(ResultSet &resultSet, const } return false; }; - auto skip = [resultSetPtr]() { - resultSetPtr->skip(1); - }; + auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; return ResultSet(generator, skip); } } -template -std::function QueryWorker::getFilter(const QSet remainingFilters, const Sink::Query &query) +template +std::function +QueryWorker::getFilter(const QSet remainingFilters, const Sink::Query &query) { return [remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { if (!query.ids.isEmpty()) { @@ -376,7 +378,7 @@ std::functiongetProperty(filterProperty); if (property.isValid()) { - //TODO implement other comparison operators than equality + // TODO implement other comparison operators than equality if (property != query.propertyFilter.value(filterProperty)) { Trace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << query.propertyFilter.value(filterProperty); return false; @@ -389,16 +391,15 @@ std::function -qint64 QueryWorker::load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, Sink::ResultProviderInterface &resultProvider, bool initialQuery, int offset, int batchSize) +template +qint64 QueryWorker::load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, + Sink::ResultProviderInterface &resultProvider, bool initialQuery, int offset, int batchSize) { QTime time; time.start(); Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); - storage.setDefaultErrorHandler([](const Sink::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); + storage.setDefaultErrorHandler([](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.store << error.message; }); auto transaction = storage.createTransaction(Sink::Storage::ReadOnly); auto db = Storage::mainDatabase(transaction, mBufferType); @@ -414,7 +415,7 @@ qint64 QueryWorker::load(const Sink::Query &query, const std::functi return Sink::Storage::maxRevision(transaction); } -template +template qint64 QueryWorker::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) { QTime time; @@ -429,8 +430,9 @@ qint64 QueryWorker::executeIncrementalQuery(const Sink::Query &query return revision; } -template -qint64 QueryWorker::executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize) +template +qint64 QueryWorker::executeInitialQuery( + const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize) { QTime time; time.start(); -- cgit v1.2.3