From f689ad1021a7805f6f8b6a81f534b4cb9ca91f51 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Tue, 6 Oct 2015 16:19:51 +0200 Subject: Change replay So far only includes modifications and additions, removals are not yet stored as separate revisions. --- common/entitystorage.cpp | 75 ++++++++++++++++++++++++++++++++++++--------- common/entitystorage.h | 38 +++++++++++++++++++---- common/facade.h | 4 +-- common/genericresource.cpp | 3 +- common/pipeline.cpp | 3 +- common/resourceaccess.h | 2 +- common/resultset.h | 12 ++++++-- common/storage_common.cpp | 19 +++--------- examples/client/main.cpp | 4 +++ tests/dummyresourcetest.cpp | 4 --- tests/genericfacadetest.cpp | 5 +++ tests/storagetest.cpp | 9 ++++++ 12 files changed, 131 insertions(+), 47 deletions(-) diff --git a/common/entitystorage.cpp b/common/entitystorage.cpp index 0eb2763..60d58ad 100644 --- a/common/entitystorage.cpp +++ b/common/entitystorage.cpp @@ -44,18 +44,21 @@ static void scan(const Akonadi2::Storage::Transaction &transaction, const QByteA }); } -void EntityStorageBase::readValue(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) +void EntityStorageBase::readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback) { + //This only works for a 1:1 mapping of resource to domain types. + //Not i.e. for tags that are stored as flags in each entity of an imap store. + //additional properties that don't have a 1:1 mapping (such as separately stored tags), + //could be added to the adaptor. + //TODO: resource implementations should be able to customize the retrieval function for non 1:1 entity-buffer mapping cases scan(transaction, key, [=](const QByteArray &key, const Akonadi2::Entity &entity) { const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); + Q_ASSERT(metadataBuffer); qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - //This only works for a 1:1 mapping of resource to domain types. - //Not i.e. for tags that are stored as flags in each entity of an imap store. - //additional properties that don't have a 1:1 mapping (such as separately stored tags), - //could be added to the adaptor. + auto operation = metadataBuffer->operation(); auto domainObject = create(key, revision, mDomainTypeAdaptorFactory->createAdaptor(entity)); - resultCallback(domainObject); + resultCallback(domainObject, operation); return false; }, mBufferType); } @@ -80,16 +83,22 @@ static ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, con return ResultSet(keys); } -ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision) +ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) { auto resultSetPtr = QSharedPointer::create(resultSet); //Read through the source values and return whatever matches the filter - std::function)> generator = [this, resultSetPtr, &transaction, filter](std::function callback) -> bool { + std::function)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function callback) -> bool { while (resultSetPtr->next()) { - readValue(transaction, resultSetPtr->id(), [this, filter, callback](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) { + //TODO. every read value is actually a revision that contains one of three operations. Reflect that so the result set can be updated appropriately. + //TODO while getting the initial set everything is adding + readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { if (filter(domainObject)) { - callback(domainObject); + if (initialQuery) { + callback(domainObject, Akonadi2::Operation_Creation); + } else { + callback(domainObject, operation); + } } }); } @@ -98,15 +107,51 @@ ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std:: return ResultSet(generator); } -ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision) +ResultSet EntityStorageBase::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters) { QSet appliedFilters; - ResultSet resultSet = queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); - const auto remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; + auto resultSet = queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); + remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; //We do a full scan if there were no indexes available to create the initial set. if (appliedFilters.isEmpty()) { - resultSet = fullScan(transaction, mBufferType); + //TODO this should be replaced by an index lookup as well + return fullScan(transaction, mBufferType); + } + return resultSet; +} + +ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision) +{ + QSet remainingFilters = query.propertyFilter.keys().toSet(); + ResultSet resultSet; + const bool initialQuery = (baseRevision == 0); + if (initialQuery) { + Trace() << "Initial result set update"; + resultSet = loadInitialResultSet(query, transaction, remainingFilters); + } else { + //TODO fallback in case the old revision is no longer available to clear + redo complete initial scan + Trace() << "Incremental result set update" << baseRevision << topRevision; + auto revisionCounter = QSharedPointer::create(baseRevision); + resultSet = ResultSet([revisionCounter, topRevision, &transaction, this]() -> QByteArray { + //Spit out the revision keys one by one. + while (*revisionCounter <= topRevision) { + const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); + const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); + Trace() << "Revision" << *revisionCounter << type << uid; + if (type != mBufferType) { + //Skip revision + *revisionCounter += 1; + continue; + } + const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); + *revisionCounter += 1; + return key; + } + //We're done + //FIXME make sure result set understands that this means we're done + return QByteArray(); + }); } auto filter = [remainingFilters, query, baseRevision, topRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { @@ -125,5 +170,5 @@ ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, Akonadi2 return true; }; - return filteredSet(resultSet, filter, transaction, baseRevision, topRevision); + return filteredSet(resultSet, filter, transaction, initialQuery); } diff --git a/common/entitystorage.h b/common/entitystorage.h index 9d928b8..f1d7f84 100644 --- a/common/entitystorage.h +++ b/common/entitystorage.h @@ -31,6 +31,8 @@ /** * Wraps storage, entity adaptor factory and indexes into one. + * + * TODO: customize with readEntity instead of adaptor factory */ class EntityStorageBase { @@ -46,14 +48,26 @@ protected: virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &) = 0; virtual ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction) = 0; - void readValue(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback); - ResultSet filteredSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision); + /** + * Loads a single entity by uid from storage. + * + * TODO: Resources should be able to customize this for cases where an entity is not the same as a single buffer. + */ + void readEntity(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function &resultCallback); ResultSet getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision, qint64 topRevision); protected: QByteArray mResourceInstanceIdentifier; QByteArray mBufferType; DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; +private: + /** + * Returns the initial result set that still needs to be filtered. + * + * To make this efficient indexes should be chosen that are as selective as possible. + */ + ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet &remainingFilters); + ResultSet filteredSet(const ResultSet &resultSet, const std::function &filter, const Akonadi2::Storage::Transaction &transaction, bool isInitialQuery); }; template @@ -95,13 +109,25 @@ public: auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); Log() << "Querying" << revisionRange.first << revisionRange.second; + //TODO fallback in case the old revision is no longer available to clear + redo complete initial scan + // auto resultSet = getResultSet(query, transaction, revisionRange.first, revisionRange.second); - while(resultSet.next([this, resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value) -> bool { - auto cloned = copy(*value); - resultProvider->add(cloned.template staticCast()); + while(resultSet.next([this, resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { + switch (operation) { + case Akonadi2::Operation_Creation: + Trace() << "Got creation"; + resultProvider->add(copy(*value).template staticCast()); + break; + case Akonadi2::Operation_Modification: + Trace() << "Got modification"; + resultProvider->add(copy(*value).template staticCast()); + break; + case Akonadi2::Operation_Removal: + Trace() << "Got removal"; + break; + } return true; })){}; - //TODO replay removals and modifications } }; diff --git a/common/facade.h b/common/facade.h index d53ec4a..dab1578 100644 --- a/common/facade.h +++ b/common/facade.h @@ -56,8 +56,8 @@ public: if (mLatestRevision == newRevision && mLatestRevision > 0) { return KAsync::null(); } - return queryFunction(mLatestRevision + 1, newRevision).then([this](qint64 revision) { - mLatestRevision = revision; + return queryFunction(mLatestRevision, newRevision).then([this](qint64 revision) { + mLatestRevision = revision + 1; }).then([](){}); } diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 4abcecd..acf84c4 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -204,10 +204,11 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt void GenericResource::processCommand(int commandId, const QByteArray &data) { static int modifications = 0; + const int batchSize = 100; mUserQueue.startTransaction(); enqueueCommand(mUserQueue, commandId, data); modifications++; - if (modifications >= 100) { + if (modifications >= batchSize) { mUserQueue.commit(); modifications = 0; mCommitQueueTimer.stop(); diff --git a/common/pipeline.cpp b/common/pipeline.cpp index c108540..6c75bde 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -220,6 +220,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) } } + //TODO use only readPropertyMapper and writePropertyMapper auto adaptorFactory = d->adaptorFactory.value(bufferType); if (!adaptorFactory) { Warning() << "no adaptor factory for type " << bufferType; @@ -255,6 +256,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) //Apply diff //FIXME only apply the properties that are available in the buffer + Trace() << "Applying changed properties: " << diff->availableProperties(); for (const auto &property : diff->availableProperties()) { newObject->setProperty(property, diff->getProperty(property)); } @@ -277,7 +279,6 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) flatbuffers::FlatBufferBuilder fbb; adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); - //TODO don't overwrite the old entry, but instead store a new revision d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(key, newRevision), QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize())); Akonadi2::Storage::setMaxRevision(d->transaction, newRevision); Akonadi2::Storage::recordRevision(d->transaction, newRevision, key, bufferType); diff --git a/common/resourceaccess.h b/common/resourceaccess.h index e6b9d91..1ff9ca6 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -49,7 +49,7 @@ public: Q_SIGNALS: void ready(bool isReady); - void revisionChanged(unsigned long long revision); + void revisionChanged(qint64 revision); public Q_SLOTS: virtual void open() = 0; diff --git a/common/resultset.h b/common/resultset.h index 1a19100..a888177 100644 --- a/common/resultset.h +++ b/common/resultset.h @@ -21,6 +21,7 @@ #include #include #include "domain/applicationdomaintype.h" +#include "metadata_generated.h" /* * An iterator to a result set. @@ -30,8 +31,13 @@ class ResultSet { public: + ResultSet() + : mIt(nullptr) + { + + } - ResultSet(const std::function)> &generator) + ResultSet(const std::function)> &generator) : mIt(nullptr), mValueGenerator(generator) { @@ -67,7 +73,7 @@ class ResultSet { return false; } - bool next(std::function callback) + bool next(std::function callback) { Q_ASSERT(mValueGenerator); return mValueGenerator(callback); @@ -107,6 +113,6 @@ class ResultSet { QVector::ConstIterator mIt; QByteArray mCurrentValue; std::function mGenerator; - std::function)> mValueGenerator; + std::function)> mValueGenerator; }; diff --git a/common/storage_common.cpp b/common/storage_common.cpp index dc02aec..512a13f 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp @@ -66,10 +66,7 @@ qint64 Storage::maxRevision(const Akonadi2::Storage::Transaction &transaction) r = revision.toLongLong(); return false; }, [](const Error &error){ - if (error.code != ErrorCodes::NotFound) { - //FIXME - // defaultErrorHandler()(error); - } + std::cout << "Coultn'd find uid for revision "; }); return r; } @@ -80,11 +77,8 @@ QByteArray Storage::getUidFromRevision(const Akonadi2::Storage::Transaction &tra transaction.openDatabase("revisions").scan(QByteArray::number(revision), [&](const QByteArray &, const QByteArray &value) -> bool { uid = value; return false; - }, [](const Error &error){ - if (error.code != ErrorCodes::NotFound) { - //FIXME - // defaultErrorHandler()(error); - } + }, [revision](const Error &error){ + std::cout << "Coultn'd find uid for revision " << revision; }); return uid; } @@ -95,11 +89,8 @@ QByteArray Storage::getTypeFromRevision(const Akonadi2::Storage::Transaction &tr transaction.openDatabase("revisionType").scan(QByteArray::number(revision), [&](const QByteArray &, const QByteArray &value) -> bool { type = value; return false; - }, [](const Error &error){ - if (error.code != ErrorCodes::NotFound) { - //FIXME - // defaultErrorHandler()(error); - } + }, [revision](const Error &error){ + std::cout << "Coultn'd find type for revision " << revision; }); return type; } diff --git a/examples/client/main.cpp b/examples/client/main.cpp index a0ca51b..ead9dd6 100644 --- a/examples/client/main.cpp +++ b/examples/client/main.cpp @@ -128,6 +128,7 @@ int main(int argc, char *argv[]) cliOptions.addPositionalArgument(QObject::tr("[resource]"), QObject::tr("A resource to connect to")); cliOptions.addOption(QCommandLineOption("clear")); + cliOptions.addOption(QCommandLineOption("debuglevel")); cliOptions.addHelpOption(); cliOptions.process(app); QStringList resources = cliOptions.positionalArguments(); @@ -143,6 +144,9 @@ int main(int argc, char *argv[]) } return 0; } + if (cliOptions.isSet("debuglevel")) { + Akonadi2::Log::setDebugOutputLevel(static_cast(cliOptions.value("debuglevel").toInt())); + } //Ensure resource is ready for (const auto &resource : resources) { diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index a28e071..caf808a 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp @@ -2,10 +2,6 @@ #include -#include "event_generated.h" -#include "entity_generated.h" -#include "metadata_generated.h" -#include "createentity_generated.h" #include "dummyresource/resourcefactory.h" #include "clientapi.h" #include "synclistresult.h" diff --git a/tests/genericfacadetest.cpp b/tests/genericfacadetest.cpp index 45ca54d..4c58b91 100644 --- a/tests/genericfacadetest.cpp +++ b/tests/genericfacadetest.cpp @@ -68,6 +68,11 @@ class GenericFacadeTest : public QObject Q_OBJECT private Q_SLOTS: + void init() + { + Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); + } + void testLoad() { Akonadi2::Query query; diff --git a/tests/storagetest.cpp b/tests/storagetest.cpp index 8d5ee00..8e841cb 100644 --- a/tests/storagetest.cpp +++ b/tests/storagetest.cpp @@ -381,6 +381,15 @@ private Q_SLOTS: QCOMPARE(result, QByteArray("value2")); } + + void testRecordRevision() + { + Akonadi2::Storage store(testDataPath, dbName, Akonadi2::Storage::ReadWrite); + auto transaction = store.createTransaction(Akonadi2::Storage::ReadWrite); + Akonadi2::Storage::recordRevision(transaction, 1, "uid", "type"); + QCOMPARE(Akonadi2::Storage::getTypeFromRevision(transaction, 1), QByteArray("type")); + QCOMPARE(Akonadi2::Storage::getUidFromRevision(transaction, 1), QByteArray("uid")); + } }; QTEST_MAIN(StorageTest) -- cgit v1.2.3