From 7eb4227cfcda1cbcf066c37d5e6679ef350d518c Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 12 Oct 2015 13:14:41 +0200 Subject: Cleanup revisions with a delay --- common/genericresource.cpp | 39 ++++++++++++++++++++++++++++++++++++--- common/pipeline.cpp | 7 ++++++- common/pipeline.h | 5 +++++ common/storage.h | 3 +++ common/storage_common.cpp | 19 +++++++++++++++++++ tests/dummyresourcetest.cpp | 2 +- 6 files changed, 70 insertions(+), 5 deletions(-) diff --git a/common/genericresource.cpp b/common/genericresource.cpp index acf84c4..2a0d6bd 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -25,12 +25,23 @@ public: mCommandQueues(commandQueues), mProcessingLock(false) { + mPipeline->startTransaction(); + //FIXME Should be initialized to the current value of the change replay queue + mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); + mPipeline->commit(); + for (auto queue : mCommandQueues) { const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); Q_UNUSED(ret); } } + void setOldestUsedRevision(qint64 revision) + { + mLowerBoundRevision = revision; + } + + signals: void error(int errorCode, const QString &errorMessage); @@ -113,9 +124,6 @@ private slots: return KAsync::start([this, data](KAsync::Future &future) { processQueuedCommand(data).then([&future, this](qint64 createdRevision) { Trace() << "Created revision " << createdRevision; - //We don't have a writeback yet, so we cleanup revisions immediately - //TODO: only cleanup once writeback is done - mPipeline->cleanupRevision(createdRevision); future.setFinished(); }).exec(); }); @@ -137,6 +145,12 @@ private slots: KAsync::Job processPipeline() { + mPipeline->startTransaction(); + for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { + mPipeline->cleanupRevision(revision); + } + mPipeline->commit(); + //Go through all message queues auto it = QSharedPointer >::create(mCommandQueues); return KAsync::dowhile( @@ -156,6 +170,8 @@ private: //Ordered by priority QList mCommandQueues; bool mProcessingLock; + //The lowest revision we no longer need + qint64 mLowerBoundRevision; }; @@ -171,6 +187,23 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); + //We simply drop revisions with 100ms delay until we have better information from clients and writeback + //FIXME On startup, read the latest revision that is replayed to initialize. Then bump revision when change-replay and + //all clients have advanced to a later revision. + QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, [this](qint64 revision) { + QTimer *dropRevisionTimer = new QTimer(); + dropRevisionTimer->setInterval(100); + dropRevisionTimer->setSingleShot(true); + auto processor = QPointer(mProcessor); + QObject::connect(dropRevisionTimer, &QTimer::timeout, dropRevisionTimer, [processor, revision, dropRevisionTimer]() { + if (processor) { + processor->setOldestUsedRevision(revision); + } + delete dropRevisionTimer; + }); + dropRevisionTimer->start(); + }); + mCommitQueueTimer.setInterval(100); mCommitQueueTimer.setSingleShot(true); QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); diff --git a/common/pipeline.cpp b/common/pipeline.cpp index b05cb2f..9816129 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -313,7 +313,6 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) const QByteArray bufferType = QByteArray(reinterpret_cast(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); const QByteArray key = QByteArray(reinterpret_cast(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); - const qint64 baseRevision = deleteEntity->revision(); const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; @@ -365,6 +364,12 @@ void Pipeline::cleanupRevision(qint64 revision) }, [](const Akonadi2::Storage::Error &error) { Warning() << "Error while reading: " << error.message; }, true); + Akonadi2::Storage::setCleanedUpRevision(d->transaction, revision); +} + +qint64 Pipeline::cleanedUpRevision() +{ + return Akonadi2::Storage::cleanedUpRevision(d->transaction); } void Pipeline::pipelineStepped(const PipelineState &state) diff --git a/common/pipeline.h b/common/pipeline.h index f4e8ae0..837c18a 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -68,6 +68,11 @@ public: */ void cleanupRevision(qint64 revision); + /* + * Returns the latest cleaned up revision. + */ + qint64 cleanedUpRevision(); + Q_SIGNALS: void revisionUpdated(qint64); void pipelinesDrained(); diff --git a/common/storage.h b/common/storage.h index 9459f04..aa0be4c 100644 --- a/common/storage.h +++ b/common/storage.h @@ -168,6 +168,9 @@ public: static qint64 maxRevision(const Akonadi2::Storage::Transaction &); static void setMaxRevision(Akonadi2::Storage::Transaction &, qint64 revision); + static qint64 cleanedUpRevision(const Akonadi2::Storage::Transaction &); + static void setCleanedUpRevision(Akonadi2::Storage::Transaction &, qint64 revision); + static QByteArray getUidFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision); static QByteArray getTypeFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision); static void recordRevision(Akonadi2::Storage::Transaction &, qint64 revision, const QByteArray &uid, const QByteArray &type); diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 2c23f97..1dbc178 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp @@ -73,6 +73,25 @@ qint64 Storage::maxRevision(const Akonadi2::Storage::Transaction &transaction) return r; } +void Storage::setCleanedUpRevision(Akonadi2::Storage::Transaction &transaction, qint64 revision) +{ + transaction.openDatabase().write("__internal_cleanedUpRevision", QByteArray::number(revision)); +} + +qint64 Storage::cleanedUpRevision(const Akonadi2::Storage::Transaction &transaction) +{ + qint64 r = 0; + transaction.openDatabase().scan("__internal_cleanedUpRevision", [&](const QByteArray &, const QByteArray &revision) -> bool { + r = revision.toLongLong(); + return false; + }, [](const Error &error){ + if (error.code != Akonadi2::Storage::NotFound) { + std::cout << "Coultn'd find the maximum revision" << std::endl; + } + }); + return r; +} + QByteArray Storage::getUidFromRevision(const Akonadi2::Storage::Transaction &transaction, qint64 revision) { QByteArray uid; diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index fe3acc9..93a6a9c 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp @@ -221,7 +221,7 @@ private Q_SLOTS: { async::SyncListResult result(Akonadi2::Store::load(query)); result.exec(); - QCOMPARE(result.size(), 0); + QTRY_COMPARE(result.size(), 0); } } -- cgit v1.2.3