/* * Copyright (C) 2016 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) version 3, or any * later version accepted by the membership of KDE e.V. (or its * successor approved by the membership of KDE e.V.), which shall * act as a proxy defined in Section 6 of version 3 of the license. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "entityreader.h" #include "resultset.h" #include "storage.h" #include "query.h" SINK_DEBUG_AREA("entityreader") using namespace Sink; QSharedPointer EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) { QSharedPointer current; db.findLatest(uid, [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); if (!buffer.isValid()) { SinkWarning() << "Read invalid buffer from disk"; } else { SinkTrace() << "Found value " << key; current = adaptorFactory.createAdaptor(buffer.entity()); retrievedRevision = Sink::Storage::revisionFromKey(key); } return false; }, [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); return current; } QSharedPointer EntityReaderUtils::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) { QSharedPointer current; db.scan(key, [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); if (!buffer.isValid()) { SinkWarning() << "Read invalid buffer from disk"; } else { current = adaptorFactory.createAdaptor(buffer.entity()); retrievedRevision = Sink::Storage::revisionFromKey(key); } return false; }, [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); return current; } QSharedPointer EntityReaderUtils::getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) { QSharedPointer current; qint64 latestRevision = 0; db.scan(uid, [¤t, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { auto foundRevision = Sink::Storage::revisionFromKey(key); if (foundRevision < revision && foundRevision > latestRevision) { latestRevision = foundRevision; } return true; }, [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); } template EntityReader::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) : mResourceInstanceIdentifier(resourceInstanceIdentifier), mTransaction(transaction), mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory(resourceType)), mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) { Q_ASSERT(!resourceType.isEmpty()); Q_ASSERT(mDomainTypeAdaptorFactoryPtr); } template EntityReader::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) : mResourceInstanceIdentifier(resourceInstanceIdentifier), mTransaction(transaction), mDomainTypeAdaptorFactory(domainTypeAdaptorFactory) { } template DomainType EntityReader::read(const QByteArray &identifier) const { auto typeName = ApplicationDomain::getTypeName(); auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); qint64 retrievedRevision = 0; auto bufferAdaptor = EntityReaderUtils::getLatest(mainDatabase, identifier, mDomainTypeAdaptorFactory, retrievedRevision); if (!bufferAdaptor) { return DomainType(); } return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); } template DomainType EntityReader::readFromKey(const QByteArray &key) const { auto typeName = ApplicationDomain::getTypeName(); auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); qint64 retrievedRevision = 0; auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); const auto identifier = Storage::uidFromKey(key); if (!bufferAdaptor) { return DomainType(); } return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); } template DomainType EntityReader::readPrevious(const QByteArray &uid, qint64 revision) const { auto typeName = ApplicationDomain::getTypeName(); auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); qint64 retrievedRevision = 0; auto bufferAdaptor = EntityReaderUtils::getPrevious(mainDatabase, uid, revision, mDomainTypeAdaptorFactory, retrievedRevision); if (!bufferAdaptor) { return DomainType(); } return DomainType(mResourceInstanceIdentifier, uid, retrievedRevision, bufferAdaptor); } template void EntityReader::query(const Sink::Query &query, const std::function &callback) { executeInitialQuery(query, 0, 0, [&callback](const typename DomainType::Ptr &value, Sink::Operation operation) -> bool { Q_ASSERT(operation == Sink::Operation_Creation); return callback(*value); }); } /* template */ /* void EntityReader::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, */ /* const std::function &resultCallback) */ /* { */ /* db.findLatest(key, */ /* [=](const QByteArray &key, const QByteArray &value) -> bool { */ /* Sink::EntityBuffer buffer(value.data(), value.size()); */ /* const Sink::Entity &entity = buffer.entity(); */ /* const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); */ /* const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; */ /* const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; */ /* auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(entity); */ /* Q_ASSERT(adaptor); */ /* resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); */ /* return false; */ /* }, */ /* [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); */ /* } */ template QPair EntityReader::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const std::function &callback) { QTime time; time.start(); auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, mTransaction); auto resultSet = preparedQuery.execute(); SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); } template QPair EntityReader::executeIncrementalQuery(const Sink::Query &query, qint64 lastRevision, const std::function &callback) { QTime time; time.start(); const qint64 baseRevision = lastRevision + 1; auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, mTransaction); auto resultSet = preparedQuery.update(baseRevision); SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); auto replayedEntities = replaySet(resultSet, 0, 0, callback); SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); } template qint64 EntityReader::replaySet(ResultSet &resultSet, int offset, int batchSize, const std::function &callback) { SinkTrace() << "Skipping over " << offset << " results"; resultSet.skip(offset); int counter = 0; while (!batchSize || (counter < batchSize)) { const bool ret = resultSet.next([this, &counter, callback](const QByteArray &uid, const Sink::EntityBuffer &value, Sink::Operation operation) -> bool { counter++; auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(value.entity()); Q_ASSERT(adaptor); return callback(QSharedPointer::create(mResourceInstanceIdentifier, uid, value.revision(), adaptor), operation); }); if (!ret) { break; } }; SinkTrace() << "Replayed " << counter << " results." << "Limit " << batchSize; return counter; } template class Sink::EntityReader; template class Sink::EntityReader; template class Sink::EntityReader;