From da1c86b80f230c3a2023f97c0048020a12e38de4 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 18 Oct 2016 12:57:52 +0200 Subject: Removed EntityReader --- common/CMakeLists.txt | 1 - common/datastorequery.cpp | 12 +- common/datastorequery.h | 4 +- common/entityreader.cpp | 207 ---------------------------------- common/entityreader.h | 95 ---------------- common/queryrunner.cpp | 8 +- common/specialpurposepreprocessor.cpp | 1 - common/synchronizer.cpp | 20 ++-- 8 files changed, 22 insertions(+), 326 deletions(-) delete mode 100644 common/entityreader.cpp delete mode 100644 common/entityreader.h diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index fd8e898..e329d93 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -71,7 +71,6 @@ set(command_SRCS synchronizer.cpp remoteidmap.cpp sourcewriteback.cpp - entityreader.cpp mailpreprocessor.cpp specialpurposepreprocessor.cpp datastorequery.cpp diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index f56ac91..9f20ba9 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -298,7 +298,7 @@ public: } }; -DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, EntityStore::Ptr store) +DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, EntityStore &store) : mQuery(query), mType(type), mStore(store) { setupQuery(); @@ -306,12 +306,12 @@ DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) { - mStore->readLatest(mType, key, resultCallback); + mStore.readLatest(mType, key, resultCallback); } QVector DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value) { - return mStore->indexLookup(mType, property, value); + return mStore.indexLookup(mType, property, value); } /* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */ @@ -443,13 +443,13 @@ void DataStoreQuery::setupQuery() } else { QSet appliedFilters; - auto resultSet = mStore->indexLookup(mType, mQuery, appliedFilters, appliedSorting); + auto resultSet = mStore.indexLookup(mType, mQuery, appliedFilters, appliedSorting); remainingFilters = remainingFilters - appliedFilters; // We do a full scan if there were no indexes available to create the initial set. if (appliedFilters.isEmpty()) { // TODO this should be replaced by an index lookup on the uid index - mSource = Source::Ptr::create(mStore->fullScan(mType), this); + mSource = Source::Ptr::create(mStore.fullScan(mType), this); } else { mSource = Source::Ptr::create(resultSet, this); } @@ -492,7 +492,7 @@ QVector DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision { auto revisionCounter = QSharedPointer::create(baseRevision); QVector changedKeys; - mStore->readRevisions(baseRevision, mType, [&](const QByteArray &key) { + mStore.readRevisions(baseRevision, mType, [&](const QByteArray &key) { changedKeys << key; }); SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; diff --git a/common/datastorequery.h b/common/datastorequery.h index 92235fd..25fee66 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h @@ -35,7 +35,7 @@ class DataStoreQuery { public: typedef QSharedPointer Ptr; - DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::EntityStore::Ptr store); + DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::EntityStore &store); ResultSet execute(); ResultSet update(qint64 baseRevision); @@ -60,7 +60,7 @@ private: QSharedPointer mCollector; QSharedPointer mSource; - QSharedPointer mStore; + Sink::Storage::EntityStore &mStore; SINK_DEBUG_COMPONENT(mType) }; diff --git a/common/entityreader.cpp b/common/entityreader.cpp deleted file mode 100644 index c49d1f7..0000000 --- a/common/entityreader.cpp +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Copyright (C) 2016 Christian Mollekopf - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) version 3, or any - * later version accepted by the membership of KDE e.V. (or its - * successor approved by the membership of KDE e.V.), which shall - * act as a proxy defined in Section 6 of version 3 of the license. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library. If not, see . - */ -#include "entityreader.h" - -#include "resultset.h" -#include "storage.h" -#include "query.h" -#include "datastorequery.h" - -SINK_DEBUG_AREA("entityreader") - -using namespace Sink; - -/* QSharedPointer EntityReaderUtils::getLatest(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */ -/* { */ -/* QSharedPointer current; */ -/* db.findLatest(uid, */ -/* [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */ -/* Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); */ -/* if (!buffer.isValid()) { */ -/* SinkWarning() << "Read invalid buffer from disk"; */ -/* } else { */ -/* SinkTrace() << "Found value " << key; */ -/* current = adaptorFactory.createAdaptor(buffer.entity()); */ -/* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */ -/* } */ -/* return false; */ -/* }, */ -/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */ -/* return current; */ -/* } */ - -/* QSharedPointer EntityReaderUtils::get(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */ -/* { */ -/* QSharedPointer current; */ -/* db.scan(key, */ -/* [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */ -/* Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); */ -/* if (!buffer.isValid()) { */ -/* SinkWarning() << "Read invalid buffer from disk"; */ -/* } else { */ -/* current = adaptorFactory.createAdaptor(buffer.entity()); */ -/* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */ -/* } */ -/* return false; */ -/* }, */ -/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */ -/* return current; */ -/* } */ - -/* QSharedPointer EntityReaderUtils::getPrevious(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */ -/* { */ -/* QSharedPointer current; */ -/* qint64 latestRevision = 0; */ -/* db.scan(uid, */ -/* [¤t, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { */ -/* auto foundRevision = Sink::Storage::DataStore::revisionFromKey(key); */ -/* if (foundRevision < revision && foundRevision > latestRevision) { */ -/* latestRevision = foundRevision; */ -/* } */ -/* return true; */ -/* }, */ -/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); */ -/* return get(db, Sink::Storage::DataStore::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); */ -/* } */ - -/* template */ -/* EntityReader::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */ -/* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */ -/* mTransaction(transaction), */ -/* mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory(resourceType)), */ -/* mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) */ -/* { */ -/* Q_ASSERT(!resourceType.isEmpty()); */ -/* Q_ASSERT(mDomainTypeAdaptorFactoryPtr); */ -/* } */ - -/* template */ -/* EntityReader::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */ -/* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */ -/* mTransaction(transaction), */ -/* mDomainTypeAdaptorFactory(domainTypeAdaptorFactory) */ -/* { */ - -/* } */ - -template -EntityReader::EntityReader(Storage::EntityStore &entityStore) - : mEntityStore(entityStore) -{ - -} - -template -DomainType EntityReader::read(const QByteArray &identifier) const -{ - auto typeName = ApplicationDomain::getTypeName(); - return mEntityStore.readLatest(identifier); -} - -template -DomainType EntityReader::readFromKey(const QByteArray &key) const -{ - /* auto typeName = ApplicationDomain::getTypeName(); */ - /* auto mainDatabase = Storage::DataStore::mainDatabase(mTransaction, typeName); */ - /* qint64 retrievedRevision = 0; */ - /* auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); */ - /* const auto identifier = Storage::DataStore::uidFromKey(key); */ - /* if (!bufferAdaptor) { */ - /* return DomainType(); */ - /* } */ - /* return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); */ - return mEntityStore.readEntity(key); -} - -template -DomainType EntityReader::readPrevious(const QByteArray &uid, qint64 revision) const -{ - return mEntityStore.readPrevious(uid, revision); -} - -template -void EntityReader::query(const Sink::Query &query, const std::function &callback) -{ - executeInitialQuery(query, 0, 0, - [&callback](const typename DomainType::Ptr &value, Sink::Operation operation, const QMap &) -> bool { - Q_ASSERT(operation == Sink::Operation_Creation); - return callback(*value); - }); -} - -template -QPair EntityReader::executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const ResultCallback &callback) -{ - QTime time; - time.start(); - - auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){})); - auto resultSet = preparedQuery->execute(); - - SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); - auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); - - SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); - return qMakePair(mEntityStore.maxRevision(), replayedEntities); -} - -template -QPair EntityReader::executeIncrementalQuery(const Sink::Query &query, qint64 lastRevision, const ResultCallback &callback) -{ - QTime time; - time.start(); - const qint64 baseRevision = lastRevision + 1; - - auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){})); - auto resultSet = preparedQuery->update(baseRevision); - - SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); - auto replayedEntities = replaySet(resultSet, 0, 0, callback); - - SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); - return qMakePair(mEntityStore.maxRevision(), replayedEntities); -} - -template -qint64 EntityReader::replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback) -{ - SinkTrace() << "Skipping over " << offset << " results"; - resultSet.skip(offset); - int counter = 0; - /* while (!batchSize || (counter < batchSize)) { */ - /* const bool ret = */ - /* resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { */ - /* counter++; */ - /* auto adaptor = mResourceContext.adaptorFactory().createAdaptor(result.buffer.entity()); */ - /* Q_ASSERT(adaptor); */ - /* return callback(QSharedPointer::create(mResourceContext, result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); */ - /* }); */ - /* if (!ret) { */ - /* break; */ - /* } */ - /* }; */ - SinkTrace() << "Replayed " << counter << " results." - << "Limit " << batchSize; - return counter; -} - -template class Sink::EntityReader; -template class Sink::EntityReader; -template class Sink::EntityReader; diff --git a/common/entityreader.h b/common/entityreader.h deleted file mode 100644 index a641106..0000000 --- a/common/entityreader.h +++ /dev/null @@ -1,95 +0,0 @@ - -/* - * Copyright (C) 2016 Christian Mollekopf - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2.1 of the License, or (at your option) version 3, or any - * later version accepted by the membership of KDE e.V. (or its - * successor approved by the membership of KDE e.V.), which shall - * act as a proxy defined in Section 6 of version 3 of the license. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library. If not, see . - */ -#pragma once - -#include "sink_export.h" -#include - -#include "storage.h" -#include "resultprovider.h" -#include "adaptorfactoryregistry.h" - -namespace Sink { - -namespace EntityReaderUtils { - SINK_EXPORT QSharedPointer getLatest(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); - SINK_EXPORT QSharedPointer get(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); - SINK_EXPORT QSharedPointer getPrevious(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); -}; - -/** - * A synchronous interface to read entities from the storage. - * - * All callbacks will be called before the end of the function. - * The caller must ensure passed in references remain valid for the lifetime of the object. - * - * This class is meant to be instantiated temporarily during reads on the stack. - * - * Note that all objects returned in callbacks are only valid during the execution of the callback and may start pointing into invalid memory if shallow-copied. - */ -template -class SINK_EXPORT EntityReader -{ - typedef std::function &aggregateValues)> ResultCallback; - -public: - EntityReader(Storage::EntityStore &store); - - /** - * Reads the latest revision of an entity identified by @param uid - */ - DomainType read(const QByteArray &uid) const; - - /** - * Reads the revision of the entity identified by @param key (uid + revision) - */ - DomainType readFromKey(const QByteArray &key) const; - - /** - * Reads the (revision - 1) of an entity identified by @param uid - */ - DomainType readPrevious(const QByteArray &uid, qint64 revision) const; - - /** - * Reads all entities that match @param query. - */ - void query(const Sink::Query &query, const std::function &callback); - - /** - * Returns all entities that match @param query. - * - * @param offset and @param batchsize can be used to return paginated results. - */ - QPair executeInitialQuery(const Sink::Query &query, int offset, int batchsize, const ResultCallback &callback); - - /** - * Returns all changed entities that match @param query starting from @param lastRevision - */ - QPair executeIncrementalQuery(const Sink::Query &query, qint64 lastRevision, const ResultCallback &callback); - -private: - qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); - -private: - Sink::Storage::EntityStore &mEntityStore; -}; - -} diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 67e83ea..bc4b5fc 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -206,7 +206,7 @@ QPair QueryWorker::executeIncrementalQuery(const Sin time.start(); const qint64 baseRevision = resultProvider.revision() + 1; - auto entityStore = EntityStore::Ptr::create(mResourceContext); + auto entityStore = EntityStore{mResourceContext}; auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName(), entityStore}; auto resultSet = preparedQuery.update(baseRevision); SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); @@ -215,7 +215,7 @@ QPair QueryWorker::executeIncrementalQuery(const Sin }); SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); - return qMakePair(entityStore->maxRevision(), replayedEntities); + return qMakePair(entityStore.maxRevision(), replayedEntities); } template @@ -236,7 +236,7 @@ QPair QueryWorker::executeInitialQuery( } } - auto entityStore = EntityStore::Ptr::create(mResourceContext); + auto entityStore = EntityStore{mResourceContext}; auto preparedQuery = DataStoreQuery{query, ApplicationDomain::getTypeName(), entityStore}; auto resultSet = preparedQuery.execute(); @@ -246,7 +246,7 @@ QPair QueryWorker::executeInitialQuery( }); SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); - return qMakePair(entityStore->maxRevision(), replayedEntities); + return qMakePair(entityStore.maxRevision(), replayedEntities); } template class QueryRunner; diff --git a/common/specialpurposepreprocessor.cpp b/common/specialpurposepreprocessor.cpp index a6ee373..0fd8e34 100644 --- a/common/specialpurposepreprocessor.cpp +++ b/common/specialpurposepreprocessor.cpp @@ -1,5 +1,4 @@ #include "specialpurposepreprocessor.h" -#include "entityreader.h" #include "query.h" #include "applicationdomaintype.h" diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 9bba9cf..206cf5e 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -23,7 +23,7 @@ #include "commands.h" #include "bufferutils.h" #include "remoteidmap.h" -#include "entityreader.h" +#include "datastorequery.h" #include "createentity_generated.h" #include "modifyentity_generated.h" #include "deleteentity_generated.h" @@ -200,15 +200,15 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray query.filter(it.key(), it.value()); } bool merge = false; - Storage::EntityStore store(mResourceContext); - Sink::EntityReader reader(store); - reader.query(query, - [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{ - merge = true; - SinkTrace() << "Merging local entity with remote entity: " << o.identifier() << remoteId; - syncStore().recordRemoteId(bufferType, o.identifier(), remoteId); - return false; - }); + Storage::EntityStore store{mResourceContext}; + DataStoreQuery dataStoreQuery{query, ApplicationDomain::getTypeName(), store}; + auto resultSet = dataStoreQuery.execute(); + resultSet.replaySet(0, 1, [this, &merge, bufferType, remoteId](const ResultSet::Result &r) { + merge = true; + SinkTrace() << "Merging local entity with remote entity: " << r.entity.identifier() << remoteId; + syncStore().recordRemoteId(bufferType, r.entity.identifier(), remoteId); + }); + if (!merge) { SinkTrace() << "Found a new entity: " << remoteId; createEntity(sinkId, bufferType, entity); -- cgit v1.2.3