From 129333371d28c06d85f75ca579ce17798e615e84 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 28 Oct 2015 16:39:16 +0100 Subject: Made pipeline preprocessing synchronous. Instead of having the asynchronous preprocessor concept with different pipelines for new/modify/delete we have a single pipeline with synchronous preprocessors that act upon new/modify/delete. This keeps the code simpler due to lack of asynchronity and keeps the new/modify/delete operations together (which at least for the indexing makes a lot of sense). Not supporting asynchronity is ok because the tasks done in preprocessing are not cpu intensive (if they were we had a problem since they are directly involved in the round-trip time), and the main cost comes from i/o, meaning we don't gain much by doing multithreading. Costly tasks (such as full-text indexing) should rather be implemented as post-processing, since that doesn't increase the round-trip time directly, and eventually consistent is typically good enough for that. --- common/domain/event.cpp | 6 +- common/domain/event.h | 2 +- common/domain/mail.cpp | 7 +- common/domain/mail.h | 2 +- common/genericresource.cpp | 10 +- common/genericresource.h | 1 + common/pipeline.cpp | 286 +++++++++++---------------------------------- common/pipeline.h | 90 +------------- 8 files changed, 87 insertions(+), 317 deletions(-) (limited to 'common') diff --git a/common/domain/event.cpp b/common/domain/event.cpp index 9759fc3..83a6906 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp @@ -50,11 +50,11 @@ ResultSet TypeImplementation::queryIndexes(const Akonadi2::Query &query, return ResultSet(keys); } -void TypeImplementation::index(const Event &type, Akonadi2::Storage::Transaction &transaction) +void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction) { - const auto uid = type.getProperty("uid"); + const auto uid = bufferAdaptor.getProperty("uid"); if (uid.isValid()) { - Index("event.index.uid", transaction).add(uid.toByteArray(), type.identifier()); + Index("event.index.uid", transaction).add(uid.toByteArray(), identifier); } } diff --git a/common/domain/event.h b/common/domain/event.h index f21cd34..e9ba52a 100644 --- a/common/domain/event.h +++ b/common/domain/event.h @@ -56,7 +56,7 @@ public: * An empty result set indicates that a full scan is required. */ static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction); - static void index(const Event &type, Akonadi2::Storage::Transaction &transaction); + static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); static QSharedPointer > initializeWritePropertyMapper(); }; diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp index d40dde9..ffe322e 100644 --- a/common/domain/mail.cpp +++ b/common/domain/mail.cpp @@ -50,12 +50,11 @@ ResultSet TypeImplementation::queryIndexes(const Akonadi2::Query &query, c return ResultSet(keys); } -void TypeImplementation::index(const Mail &type, Akonadi2::Storage::Transaction &transaction) +void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction) { - const auto uid = type.getProperty("uid"); + const auto uid = bufferAdaptor.getProperty("uid"); if (uid.isValid()) { - Index uidIndex("mail.index.uid", transaction); - uidIndex.add(uid.toByteArray(), type.identifier()); + Index("mail.index.uid", transaction).add(uid.toByteArray(), identifier); } } diff --git a/common/domain/mail.h b/common/domain/mail.h index b58ce44..38f1d03 100644 --- a/common/domain/mail.h +++ b/common/domain/mail.h @@ -51,7 +51,7 @@ public: * An empty result set indicates that a full scan is required. */ static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet &appliedFilters, Akonadi2::Storage::Transaction &transaction); - static void index(const Mail &type, Akonadi2::Storage::Transaction &transaction); + static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); static QSharedPointer > initializeWritePropertyMapper(); }; diff --git a/common/genericresource.cpp b/common/genericresource.cpp index dcae43d..ec68f33 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -285,10 +285,12 @@ GenericResource::~GenericResource() delete mSourceChangeReplay; } -// void GenericResource::revisionChanged() -// { -// //TODO replay revision -// } +void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) +{ + Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadWrite).removeFromDisk(); + Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); + Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); +} void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) { diff --git a/common/genericresource.h b/common/genericresource.h index cfc6653..33de0e7 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -48,6 +48,7 @@ public: int error() const; + static void removeFromDisk(const QByteArray &instanceIdentifier); private Q_SLOTS: void updateLowerBoundRevision(); diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 15d2401..de63288 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -1,5 +1,6 @@ /* * Copyright (C) 2014 Aaron Seigo + * Copyright (C) 2015 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -41,19 +42,13 @@ class Pipeline::Private { public: Private(const QString &resourceName) - : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite), - stepScheduled(false) + : storage(Akonadi2::storageLocation(), resourceName, Storage::ReadWrite) { } Storage storage; Storage::Transaction transaction; - QHash > nullPipeline; - QHash > newPipeline; - QHash > modifiedPipeline; - QHash > deletedPipeline; - QVector activePipelines; - bool stepScheduled; + QHash > processors; QHash adaptorFactory; }; @@ -68,21 +63,9 @@ Pipeline::~Pipeline() delete d; } -void Pipeline::setPreprocessors(const QString &entityType, Type pipelineType, const QVector &preprocessors) +void Pipeline::setPreprocessors(const QString &entityType, const QVector &processors) { - switch (pipelineType) { - case NewPipeline: - d->newPipeline[entityType] = preprocessors; - break; - case ModifiedPipeline: - d->modifiedPipeline[entityType] = preprocessors; - break; - case DeletedPipeline: - d->deletedPipeline[entityType] = preprocessors; - break; - default: - break; - }; + d->processors[entityType] = processors; } void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory) @@ -92,6 +75,11 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac void Pipeline::startTransaction() { + //TODO call for all types + //But avoid doing it during cleanup + // for (auto processor : d->processors[bufferType]) { + // processor->startBatch(); + // } if (d->transaction) { return; } @@ -100,7 +88,13 @@ void Pipeline::startTransaction() void Pipeline::commit() { + //TODO call for all types + //But avoid doing it during cleanup + // for (auto processor : d->processors[bufferType]) { + // processor->finalize(); + // } const auto revision = Akonadi2::Storage::maxRevision(d->transaction); + Trace() << "Committing " << revision; if (d->transaction) { d->transaction.commit(); } @@ -118,14 +112,6 @@ Storage &Pipeline::storage() const return d->storage; } -void Pipeline::null() -{ - //TODO: is there really any need for the null pipeline? if so, it should be doing something ;) - // PipelineState state(this, NullPipeline, QByteArray(), d->nullPipeline); - // d->activePipelines << state; - // state.step(); -} - void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) { d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()), @@ -181,15 +167,25 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) storeNewRevision(newRevision, fbb, bufferType, key); - Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; + auto adaptorFactory = d->adaptorFactory.value(bufferType); + if (!adaptorFactory) { + Warning() << "no adaptor factory for type " << bufferType; + return KAsync::error(0); + } - return KAsync::start([this, key, bufferType, newRevision](KAsync::Future &future) { - PipelineState state(this, NewPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->newPipeline[bufferType], newRevision, [&future, newRevision]() { - future.setValue(newRevision); - future.setFinished(); - }, bufferType); - d->activePipelines << state; - state.step(); + Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; + d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { + auto entity = Akonadi2::GetEntity(value); + auto adaptor = adaptorFactory->createAdaptor(*entity); + for (auto processor : d->processors[bufferType]) { + processor->newEntity(key, newRevision, *adaptor, d->transaction); + } + return false; + }, [this](const Akonadi2::Storage::Error &error) { + ErrorMsg() << "Failed to find value in pipeline: " << error.message; + }); + return KAsync::start([newRevision](){ + return newRevision; }); } @@ -287,14 +283,18 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) storeNewRevision(newRevision, fbb, bufferType, key); Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; - - return KAsync::start([this, key, bufferType, newRevision](KAsync::Future &future) { - PipelineState state(this, ModifiedPipeline, Akonadi2::Storage::assembleKey(key, newRevision), d->modifiedPipeline[bufferType], newRevision, [&future, newRevision]() { - future.setValue(newRevision); - future.setFinished(); - }, bufferType); - d->activePipelines << state; - state.step(); + d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &, const QByteArray &value) -> bool { + auto entity = Akonadi2::GetEntity(value); + auto newEntity = adaptorFactory->createAdaptor(*entity); + for (auto processor : d->processors[bufferType]) { + processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); + } + return false; + }, [this](const Akonadi2::Storage::Error &error) { + ErrorMsg() << "Failed to find value in pipeline: " << error.message; + }); + return KAsync::start([newRevision](){ + return newRevision; }); } @@ -331,13 +331,24 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) storeNewRevision(newRevision, fbb, bufferType, key); Log() << "Pipeline: deleted entity: "<< newRevision; - return KAsync::start([this, key, bufferType, newRevision](KAsync::Future &future) { - PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[bufferType], newRevision, [&future, newRevision](){ - future.setValue(newRevision); - future.setFinished(); - }, bufferType); - d->activePipelines << state; - state.step(); + auto adaptorFactory = d->adaptorFactory.value(bufferType); + if (!adaptorFactory) { + Warning() << "no adaptor factory for type " << bufferType; + return KAsync::error(0); + } + + // d->transaction.openDatabase(bufferType + ".main").scan(key, [this, bufferType, newRevision, adaptorFactory](const QByteArray &, const QByteArray &value) -> bool { + // auto entity = Akonadi2::GetEntity(value); + // auto newEntity = adaptorFactory->createAdaptor(*entity); + for (auto processor : d->processors[bufferType]) { + processor->deletedEntity(key, newRevision, Akonadi2::ApplicationDomain::BufferAdaptor(), d->transaction); + } + // return false; + // }, [this](const Akonadi2::Storage::Error &error) { + // ErrorMsg() << "Failed to find value in pipeline: " << error.message; + // }); + return KAsync::start([newRevision](){ + return newRevision; }); } @@ -372,164 +383,6 @@ qint64 Pipeline::cleanedUpRevision() return Akonadi2::Storage::cleanedUpRevision(d->transaction); } -void Pipeline::pipelineStepped(const PipelineState &state) -{ - scheduleStep(); -} - -void Pipeline::scheduleStep() -{ - if (!d->stepScheduled) { - d->stepScheduled = true; - QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); - } -} - -void Pipeline::stepPipelines() -{ - d->stepScheduled = false; - for (PipelineState &state: d->activePipelines) { - if (state.isIdle()) { - state.step(); - } - } -} - -void Pipeline::pipelineCompleted(PipelineState state) -{ - //TODO finalize the datastore, inform clients of the new rev - const int index = d->activePipelines.indexOf(state); - if (index > -1) { - d->activePipelines.remove(index); - } - state.callback(); - - scheduleStep(); - if (d->activePipelines.isEmpty()) { - emit pipelinesDrained(); - } -} - - -class PipelineState::Private : public QSharedData -{ -public: - Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector filters, const std::function &c, qint64 r, const QByteArray &b) - : pipeline(p), - type(t), - key(k), - filterIt(filters), - idle(true), - callback(c), - revision(r), - bufferType(b) - {} - - Private() - : pipeline(0), - filterIt(QVector()), - idle(true), - revision(-1) - {} - - Pipeline *pipeline; - Pipeline::Type type; - QByteArray key; - QVectorIterator filterIt; - bool idle; - std::function callback; - qint64 revision; - QByteArray bufferType; -}; - -PipelineState::PipelineState() - : d(new Private()) -{ - -} - -PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters, qint64 revision, const std::function &callback, const QByteArray &bufferType) - : d(new Private(pipeline, type, key, filters, callback, revision, bufferType)) -{ -} - -PipelineState::PipelineState(const PipelineState &other) - : d(other.d) -{ -} - -PipelineState::~PipelineState() -{ -} - -PipelineState &PipelineState::operator=(const PipelineState &rhs) -{ - d = rhs.d; - return *this; -} - -bool PipelineState::operator==(const PipelineState &rhs) -{ - return d == rhs.d; -} - -bool PipelineState::isIdle() const -{ - return d->idle; -} - -QByteArray PipelineState::key() const -{ - return d->key; -} - -Pipeline::Type PipelineState::type() const -{ - return d->type; -} - -qint64 PipelineState::revision() const -{ - return d->revision; -} - -QByteArray PipelineState::bufferType() const -{ - return d->bufferType; -} - -void PipelineState::step() -{ - if (!d->pipeline) { - Q_ASSERT(false); - return; - } - - d->idle = false; - if (d->filterIt.hasNext()) { - //TODO skip step if already processed - auto preprocessor = d->filterIt.next(); - preprocessor->process(*this, d->pipeline->transaction()); - } else { - //This object becomes invalid after this call - d->pipeline->pipelineCompleted(*this); - } -} - -void PipelineState::processingCompleted(Preprocessor *filter) -{ - if (d->pipeline && filter == d->filterIt.peekPrevious()) { - d->idle = true; - d->pipeline->pipelineStepped(*this); - } -} - -void PipelineState::callback() -{ - d->callback(); -} - - Preprocessor::Preprocessor() : d(0) { @@ -539,19 +392,12 @@ Preprocessor::~Preprocessor() { } -void Preprocessor::process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) -{ - processingCompleted(state); -} - -void Preprocessor::processingCompleted(PipelineState state) +void Preprocessor::startBatch() { - state.processingCompleted(this); } -QString Preprocessor::id() const +void Preprocessor::finalize() { - return QLatin1String("unknown processor"); } } // namespace Akonadi2 diff --git a/common/pipeline.h b/common/pipeline.h index c8d9ddc..f11d880 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -35,7 +35,6 @@ namespace Akonadi2 { -class PipelineState; class Preprocessor; class AKONADI2COMMON_EXPORT Pipeline : public QObject @@ -43,19 +42,16 @@ class AKONADI2COMMON_EXPORT Pipeline : public QObject Q_OBJECT public: - enum Type { NullPipeline, NewPipeline, ModifiedPipeline, DeletedPipeline }; - Pipeline(const QString &storagePath, QObject *parent = 0); ~Pipeline(); Storage &storage() const; - void setPreprocessors(const QString &entityType, Type pipelineType, const QVector &preprocessors); + void setPreprocessors(const QString &entityType, const QVector &preprocessors); void startTransaction(); void commit(); Storage::Transaction &transaction(); - void null(); void setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFactoryInterface::Ptr factory); KAsync::Job newEntity(void const *command, size_t size); @@ -75,104 +71,30 @@ public: Q_SIGNALS: void revisionUpdated(qint64); - void pipelinesDrained(); - -private Q_SLOTS: - void stepPipelines(); private: - void pipelineStepped(const PipelineState &state); - //Don't use a reference here (it would invalidate itself) - void pipelineCompleted(PipelineState state); - void scheduleStep(); void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); - friend class PipelineState; - class Private; Private * const d; }; -class AKONADI2COMMON_EXPORT PipelineState -{ -public: - PipelineState(); - PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector &filters, qint64 revision, const std::function &callback, const QByteArray &bufferType); - PipelineState(const PipelineState &other); - ~PipelineState(); - - PipelineState &operator=(const PipelineState &rhs); - bool operator==(const PipelineState &rhs); - - bool isIdle() const; - QByteArray key() const; - Pipeline::Type type() const; - qint64 revision() const; - QByteArray bufferType() const; - - void step(); - void processingCompleted(Preprocessor *filter); - - void callback(); - -private: - class Private; - QExplicitlySharedDataPointer d; -}; - class AKONADI2COMMON_EXPORT Preprocessor { public: Preprocessor(); virtual ~Preprocessor(); - virtual void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction); - //TODO to record progress - virtual QString id() const; - -protected: - void processingCompleted(PipelineState state); + virtual void startBatch(); + virtual void newEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0; + virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, const Akonadi2::ApplicationDomain::BufferAdaptor &newEntity, Akonadi2::Storage::Transaction &transaction) = 0; + virtual void deletedEntity(const QByteArray &key, qint64 revision, const Akonadi2::ApplicationDomain::BufferAdaptor &oldEntity, Akonadi2::Storage::Transaction &transaction) = 0; + virtual void finalize(); private: class Private; Private * const d; }; -/** - * A simple processor that takes a single function - */ -class AKONADI2COMMON_EXPORT SimpleProcessor : public Akonadi2::Preprocessor -{ -public: - SimpleProcessor(const QString &id, const std::function &f) - : Akonadi2::Preprocessor(), - mFunction(f), - mId(id) - { - } - - void process(const PipelineState &state, Akonadi2::Storage::Transaction &transaction) Q_DECL_OVERRIDE - { - transaction.openDatabase(state.bufferType() + ".main").scan(state.key(), [this, &state, &transaction](const QByteArray &key, const QByteArray &value) -> bool { - auto entity = Akonadi2::GetEntity(value); - mFunction(state, *entity, transaction); - processingCompleted(state); - return false; - }, [this, state](const Akonadi2::Storage::Error &error) { - ErrorMsg() << "Failed to find value in pipeline: " << error.message; - processingCompleted(state); - }); - } - - QString id() const Q_DECL_OVERRIDE - { - return mId; - } - -protected: - std::function mFunction; - QString mId; -}; - } // namespace Akonadi2 -- cgit v1.2.3 From 81859328bf30c2aeecdf3ee48e5939e0496552fd Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 28 Oct 2015 16:46:10 +0100 Subject: Reuse the existing transaction --- common/pipeline.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'common') diff --git a/common/pipeline.cpp b/common/pipeline.cpp index de63288..ae4cc3d 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -233,7 +233,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) auto diff = adaptorFactory->createAdaptor(*diffEntity); QSharedPointer current; - storage().createTransaction(Akonadi2::Storage::ReadOnly).openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { + d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { Akonadi2::EntityBuffer buffer(const_cast(data.data()), data.size()); if (!buffer.isValid()) { Warning() << "Read invalid buffer from disk"; -- cgit v1.2.3 From 043cd5c9e1c90ba04659b67000b974cf8c35f7ba Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 28 Oct 2015 17:21:47 +0100 Subject: Correctly execute modifications and removals ... also if there are intermediate revisions. --- common/pipeline.cpp | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) (limited to 'common') diff --git a/common/pipeline.cpp b/common/pipeline.cpp index ae4cc3d..0ce478b 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -233,7 +233,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) auto diff = adaptorFactory->createAdaptor(*diffEntity); QSharedPointer current; - d->transaction.openDatabase(bufferType + ".main").scan(Akonadi2::Storage::assembleKey(key, baseRevision), [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { + d->transaction.openDatabase(bufferType + ".main").findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { Akonadi2::EntityBuffer buffer(const_cast(data.data()), data.size()); if (!buffer.isValid()) { Warning() << "Read invalid buffer from disk"; @@ -328,25 +328,32 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) flatbuffers::FlatBufferBuilder fbb; EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); - storeNewRevision(newRevision, fbb, bufferType, key); - Log() << "Pipeline: deleted entity: "<< newRevision; - auto adaptorFactory = d->adaptorFactory.value(bufferType); if (!adaptorFactory) { Warning() << "no adaptor factory for type " << bufferType; return KAsync::error(0); } - // d->transaction.openDatabase(bufferType + ".main").scan(key, [this, bufferType, newRevision, adaptorFactory](const QByteArray &, const QByteArray &value) -> bool { - // auto entity = Akonadi2::GetEntity(value); - // auto newEntity = adaptorFactory->createAdaptor(*entity); - for (auto processor : d->processors[bufferType]) { - processor->deletedEntity(key, newRevision, Akonadi2::ApplicationDomain::BufferAdaptor(), d->transaction); + QSharedPointer current; + d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { + Akonadi2::EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + Warning() << "Read invalid buffer from disk"; + } else { + current = adaptorFactory->createAdaptor(buffer.entity()); } - // return false; - // }, [this](const Akonadi2::Storage::Error &error) { - // ErrorMsg() << "Failed to find value in pipeline: " << error.message; - // }); + return false; + }, [this](const Akonadi2::Storage::Error &error) { + ErrorMsg() << "Failed to find value in pipeline: " << error.message; + }); + + storeNewRevision(newRevision, fbb, bufferType, key); + Log() << "Pipeline: deleted entity: "<< newRevision; + + for (auto processor : d->processors[bufferType]) { + processor->deletedEntity(key, newRevision, *current, d->transaction); + } + return KAsync::start([newRevision](){ return newRevision; }); -- cgit v1.2.3 From bb1b238d6982abe1e640fbf424234b2c5389642b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Wed, 28 Oct 2015 18:28:23 +0100 Subject: Cleanup changereplay, and replay revision 1 as the first --- common/genericresource.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'common') diff --git a/common/genericresource.cpp b/common/genericresource.cpp index ec68f33..652154d 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -54,7 +54,7 @@ public Q_SLOTS: { auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); - qint64 lastReplayedRevision = 0; + qint64 lastReplayedRevision = 1; replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { lastReplayedRevision = value.toLongLong(); return false; @@ -290,6 +290,7 @@ void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier, Akonadi2::Storage::ReadWrite).removeFromDisk(); Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".userqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".synchronizerqueue", Akonadi2::Storage::ReadWrite).removeFromDisk(); + Akonadi2::Storage(Akonadi2::storageLocation(), instanceIdentifier + ".changereplay", Akonadi2::Storage::ReadWrite).removeFromDisk(); } void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) -- cgit v1.2.3