summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp39
1 files changed, 36 insertions, 3 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index acf84c4..2a0d6bd 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -25,12 +25,23 @@ public:
25 mCommandQueues(commandQueues), 25 mCommandQueues(commandQueues),
26 mProcessingLock(false) 26 mProcessingLock(false)
27 { 27 {
28 mPipeline->startTransaction();
29 //FIXME Should be initialized to the current value of the change replay queue
30 mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction());
31 mPipeline->commit();
32
28 for (auto queue : mCommandQueues) { 33 for (auto queue : mCommandQueues) {
29 const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); 34 const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process);
30 Q_UNUSED(ret); 35 Q_UNUSED(ret);
31 } 36 }
32 } 37 }
33 38
39 void setOldestUsedRevision(qint64 revision)
40 {
41 mLowerBoundRevision = revision;
42 }
43
44
34signals: 45signals:
35 void error(int errorCode, const QString &errorMessage); 46 void error(int errorCode, const QString &errorMessage);
36 47
@@ -113,9 +124,6 @@ private slots:
113 return KAsync::start<void>([this, data](KAsync::Future<void> &future) { 124 return KAsync::start<void>([this, data](KAsync::Future<void> &future) {
114 processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) { 125 processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) {
115 Trace() << "Created revision " << createdRevision; 126 Trace() << "Created revision " << createdRevision;
116 //We don't have a writeback yet, so we cleanup revisions immediately
117 //TODO: only cleanup once writeback is done
118 mPipeline->cleanupRevision(createdRevision);
119 future.setFinished(); 127 future.setFinished();
120 }).exec(); 128 }).exec();
121 }); 129 });
@@ -137,6 +145,12 @@ private slots:
137 145
138 KAsync::Job<void> processPipeline() 146 KAsync::Job<void> processPipeline()
139 { 147 {
148 mPipeline->startTransaction();
149 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) {
150 mPipeline->cleanupRevision(revision);
151 }
152 mPipeline->commit();
153
140 //Go through all message queues 154 //Go through all message queues
141 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); 155 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues);
142 return KAsync::dowhile( 156 return KAsync::dowhile(
@@ -156,6 +170,8 @@ private:
156 //Ordered by priority 170 //Ordered by priority
157 QList<MessageQueue*> mCommandQueues; 171 QList<MessageQueue*> mCommandQueues;
158 bool mProcessingLock; 172 bool mProcessingLock;
173 //The lowest revision we no longer need
174 qint64 mLowerBoundRevision;
159}; 175};
160 176
161 177
@@ -171,6 +187,23 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
171 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 187 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
172 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 188 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
173 189
190 //We simply drop revisions with 100ms delay until we have better information from clients and writeback
191 //FIXME On startup, read the latest revision that is replayed to initialize. Then bump revision when change-replay and
192 //all clients have advanced to a later revision.
193 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, [this](qint64 revision) {
194 QTimer *dropRevisionTimer = new QTimer();
195 dropRevisionTimer->setInterval(100);
196 dropRevisionTimer->setSingleShot(true);
197 auto processor = QPointer<Processor>(mProcessor);
198 QObject::connect(dropRevisionTimer, &QTimer::timeout, dropRevisionTimer, [processor, revision, dropRevisionTimer]() {
199 if (processor) {
200 processor->setOldestUsedRevision(revision);
201 }
202 delete dropRevisionTimer;
203 });
204 dropRevisionTimer->start();
205 });
206
174 mCommitQueueTimer.setInterval(100); 207 mCommitQueueTimer.setInterval(100);
175 mCommitQueueTimer.setSingleShot(true); 208 mCommitQueueTimer.setSingleShot(true);
176 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); 209 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit);