diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-10-12 13:14:41 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-10-12 13:15:05 +0200 |
commit | 7eb4227cfcda1cbcf066c37d5e6679ef350d518c (patch) | |
tree | b037abca2091bd92c123cd6fb6bfd6c46ac0bbfb /common/genericresource.cpp | |
parent | 2144ed85258e0e9d02d08cc4e5898dd34e776df6 (diff) | |
download | sink-7eb4227cfcda1cbcf066c37d5e6679ef350d518c.tar.gz sink-7eb4227cfcda1cbcf066c37d5e6679ef350d518c.zip |
Cleanup revisions with a delay
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 39 |
1 files changed, 36 insertions, 3 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index acf84c4..2a0d6bd 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -25,12 +25,23 @@ public: | |||
25 | mCommandQueues(commandQueues), | 25 | mCommandQueues(commandQueues), |
26 | mProcessingLock(false) | 26 | mProcessingLock(false) |
27 | { | 27 | { |
28 | mPipeline->startTransaction(); | ||
29 | //FIXME Should be initialized to the current value of the change replay queue | ||
30 | mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); | ||
31 | mPipeline->commit(); | ||
32 | |||
28 | for (auto queue : mCommandQueues) { | 33 | for (auto queue : mCommandQueues) { |
29 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); | 34 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); |
30 | Q_UNUSED(ret); | 35 | Q_UNUSED(ret); |
31 | } | 36 | } |
32 | } | 37 | } |
33 | 38 | ||
39 | void setOldestUsedRevision(qint64 revision) | ||
40 | { | ||
41 | mLowerBoundRevision = revision; | ||
42 | } | ||
43 | |||
44 | |||
34 | signals: | 45 | signals: |
35 | void error(int errorCode, const QString &errorMessage); | 46 | void error(int errorCode, const QString &errorMessage); |
36 | 47 | ||
@@ -113,9 +124,6 @@ private slots: | |||
113 | return KAsync::start<void>([this, data](KAsync::Future<void> &future) { | 124 | return KAsync::start<void>([this, data](KAsync::Future<void> &future) { |
114 | processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) { | 125 | processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) { |
115 | Trace() << "Created revision " << createdRevision; | 126 | Trace() << "Created revision " << createdRevision; |
116 | //We don't have a writeback yet, so we cleanup revisions immediately | ||
117 | //TODO: only cleanup once writeback is done | ||
118 | mPipeline->cleanupRevision(createdRevision); | ||
119 | future.setFinished(); | 127 | future.setFinished(); |
120 | }).exec(); | 128 | }).exec(); |
121 | }); | 129 | }); |
@@ -137,6 +145,12 @@ private slots: | |||
137 | 145 | ||
138 | KAsync::Job<void> processPipeline() | 146 | KAsync::Job<void> processPipeline() |
139 | { | 147 | { |
148 | mPipeline->startTransaction(); | ||
149 | for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { | ||
150 | mPipeline->cleanupRevision(revision); | ||
151 | } | ||
152 | mPipeline->commit(); | ||
153 | |||
140 | //Go through all message queues | 154 | //Go through all message queues |
141 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); | 155 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); |
142 | return KAsync::dowhile( | 156 | return KAsync::dowhile( |
@@ -156,6 +170,8 @@ private: | |||
156 | //Ordered by priority | 170 | //Ordered by priority |
157 | QList<MessageQueue*> mCommandQueues; | 171 | QList<MessageQueue*> mCommandQueues; |
158 | bool mProcessingLock; | 172 | bool mProcessingLock; |
173 | //The lowest revision we no longer need | ||
174 | qint64 mLowerBoundRevision; | ||
159 | }; | 175 | }; |
160 | 176 | ||
161 | 177 | ||
@@ -171,6 +187,23 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
171 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 187 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
172 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 188 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
173 | 189 | ||
190 | //We simply drop revisions with 100ms delay until we have better information from clients and writeback | ||
191 | //FIXME On startup, read the latest revision that is replayed to initialize. Then bump revision when change-replay and | ||
192 | //all clients have advanced to a later revision. | ||
193 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, [this](qint64 revision) { | ||
194 | QTimer *dropRevisionTimer = new QTimer(); | ||
195 | dropRevisionTimer->setInterval(100); | ||
196 | dropRevisionTimer->setSingleShot(true); | ||
197 | auto processor = QPointer<Processor>(mProcessor); | ||
198 | QObject::connect(dropRevisionTimer, &QTimer::timeout, dropRevisionTimer, [processor, revision, dropRevisionTimer]() { | ||
199 | if (processor) { | ||
200 | processor->setOldestUsedRevision(revision); | ||
201 | } | ||
202 | delete dropRevisionTimer; | ||
203 | }); | ||
204 | dropRevisionTimer->start(); | ||
205 | }); | ||
206 | |||
174 | mCommitQueueTimer.setInterval(100); | 207 | mCommitQueueTimer.setInterval(100); |
175 | mCommitQueueTimer.setSingleShot(true); | 208 | mCommitQueueTimer.setSingleShot(true); |
176 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); | 209 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); |