From 7eb4227cfcda1cbcf066c37d5e6679ef350d518c Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 12 Oct 2015 13:14:41 +0200 Subject: Cleanup revisions with a delay --- common/genericresource.cpp | 39 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 36 insertions(+), 3 deletions(-) (limited to 'common/genericresource.cpp') diff --git a/common/genericresource.cpp b/common/genericresource.cpp index acf84c4..2a0d6bd 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -25,12 +25,23 @@ public: mCommandQueues(commandQueues), mProcessingLock(false) { + mPipeline->startTransaction(); + //FIXME Should be initialized to the current value of the change replay queue + mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); + mPipeline->commit(); + for (auto queue : mCommandQueues) { const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); Q_UNUSED(ret); } } + void setOldestUsedRevision(qint64 revision) + { + mLowerBoundRevision = revision; + } + + signals: void error(int errorCode, const QString &errorMessage); @@ -113,9 +124,6 @@ private slots: return KAsync::start([this, data](KAsync::Future &future) { processQueuedCommand(data).then([&future, this](qint64 createdRevision) { Trace() << "Created revision " << createdRevision; - //We don't have a writeback yet, so we cleanup revisions immediately - //TODO: only cleanup once writeback is done - mPipeline->cleanupRevision(createdRevision); future.setFinished(); }).exec(); }); @@ -137,6 +145,12 @@ private slots: KAsync::Job processPipeline() { + mPipeline->startTransaction(); + for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { + mPipeline->cleanupRevision(revision); + } + mPipeline->commit(); + //Go through all message queues auto it = QSharedPointer >::create(mCommandQueues); return KAsync::dowhile( @@ -156,6 +170,8 @@ private: //Ordered by priority QList mCommandQueues; bool mProcessingLock; + //The lowest revision we no longer need + qint64 mLowerBoundRevision; }; @@ -171,6 +187,23 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); + //We simply drop revisions with 100ms delay until we have better information from clients and writeback + //FIXME On startup, read the latest revision that is replayed to initialize. Then bump revision when change-replay and + //all clients have advanced to a later revision. + QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, [this](qint64 revision) { + QTimer *dropRevisionTimer = new QTimer(); + dropRevisionTimer->setInterval(100); + dropRevisionTimer->setSingleShot(true); + auto processor = QPointer(mProcessor); + QObject::connect(dropRevisionTimer, &QTimer::timeout, dropRevisionTimer, [processor, revision, dropRevisionTimer]() { + if (processor) { + processor->setOldestUsedRevision(revision); + } + delete dropRevisionTimer; + }); + dropRevisionTimer->start(); + }); + mCommitQueueTimer.setInterval(100); mCommitQueueTimer.setSingleShot(true); QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); -- cgit v1.2.3