summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/genericresource.cpp39
-rw-r--r--common/pipeline.cpp7
-rw-r--r--common/pipeline.h5
-rw-r--r--common/storage.h3
-rw-r--r--common/storage_common.cpp19
5 files changed, 69 insertions, 4 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index acf84c4..2a0d6bd 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -25,12 +25,23 @@ public:
25 mCommandQueues(commandQueues), 25 mCommandQueues(commandQueues),
26 mProcessingLock(false) 26 mProcessingLock(false)
27 { 27 {
28 mPipeline->startTransaction();
29 //FIXME Should be initialized to the current value of the change replay queue
30 mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction());
31 mPipeline->commit();
32
28 for (auto queue : mCommandQueues) { 33 for (auto queue : mCommandQueues) {
29 const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); 34 const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process);
30 Q_UNUSED(ret); 35 Q_UNUSED(ret);
31 } 36 }
32 } 37 }
33 38
39 void setOldestUsedRevision(qint64 revision)
40 {
41 mLowerBoundRevision = revision;
42 }
43
44
34signals: 45signals:
35 void error(int errorCode, const QString &errorMessage); 46 void error(int errorCode, const QString &errorMessage);
36 47
@@ -113,9 +124,6 @@ private slots:
113 return KAsync::start<void>([this, data](KAsync::Future<void> &future) { 124 return KAsync::start<void>([this, data](KAsync::Future<void> &future) {
114 processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) { 125 processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) {
115 Trace() << "Created revision " << createdRevision; 126 Trace() << "Created revision " << createdRevision;
116 //We don't have a writeback yet, so we cleanup revisions immediately
117 //TODO: only cleanup once writeback is done
118 mPipeline->cleanupRevision(createdRevision);
119 future.setFinished(); 127 future.setFinished();
120 }).exec(); 128 }).exec();
121 }); 129 });
@@ -137,6 +145,12 @@ private slots:
137 145
138 KAsync::Job<void> processPipeline() 146 KAsync::Job<void> processPipeline()
139 { 147 {
148 mPipeline->startTransaction();
149 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) {
150 mPipeline->cleanupRevision(revision);
151 }
152 mPipeline->commit();
153
140 //Go through all message queues 154 //Go through all message queues
141 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); 155 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues);
142 return KAsync::dowhile( 156 return KAsync::dowhile(
@@ -156,6 +170,8 @@ private:
156 //Ordered by priority 170 //Ordered by priority
157 QList<MessageQueue*> mCommandQueues; 171 QList<MessageQueue*> mCommandQueues;
158 bool mProcessingLock; 172 bool mProcessingLock;
173 //The lowest revision we no longer need
174 qint64 mLowerBoundRevision;
159}; 175};
160 176
161 177
@@ -171,6 +187,23 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
171 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 187 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
172 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 188 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
173 189
190 //We simply drop revisions with 100ms delay until we have better information from clients and writeback
191 //FIXME On startup, read the latest revision that is replayed to initialize. Then bump revision when change-replay and
192 //all clients have advanced to a later revision.
193 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, [this](qint64 revision) {
194 QTimer *dropRevisionTimer = new QTimer();
195 dropRevisionTimer->setInterval(100);
196 dropRevisionTimer->setSingleShot(true);
197 auto processor = QPointer<Processor>(mProcessor);
198 QObject::connect(dropRevisionTimer, &QTimer::timeout, dropRevisionTimer, [processor, revision, dropRevisionTimer]() {
199 if (processor) {
200 processor->setOldestUsedRevision(revision);
201 }
202 delete dropRevisionTimer;
203 });
204 dropRevisionTimer->start();
205 });
206
174 mCommitQueueTimer.setInterval(100); 207 mCommitQueueTimer.setInterval(100);
175 mCommitQueueTimer.setSingleShot(true); 208 mCommitQueueTimer.setSingleShot(true);
176 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); 209 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit);
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index b05cb2f..9816129 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -313,7 +313,6 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
313 313
314 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 314 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
315 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 315 const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
316 const qint64 baseRevision = deleteEntity->revision();
317 316
318 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; 317 const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1;
319 318
@@ -365,6 +364,12 @@ void Pipeline::cleanupRevision(qint64 revision)
365 }, [](const Akonadi2::Storage::Error &error) { 364 }, [](const Akonadi2::Storage::Error &error) {
366 Warning() << "Error while reading: " << error.message; 365 Warning() << "Error while reading: " << error.message;
367 }, true); 366 }, true);
367 Akonadi2::Storage::setCleanedUpRevision(d->transaction, revision);
368}
369
370qint64 Pipeline::cleanedUpRevision()
371{
372 return Akonadi2::Storage::cleanedUpRevision(d->transaction);
368} 373}
369 374
370void Pipeline::pipelineStepped(const PipelineState &state) 375void Pipeline::pipelineStepped(const PipelineState &state)
diff --git a/common/pipeline.h b/common/pipeline.h
index f4e8ae0..837c18a 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -68,6 +68,11 @@ public:
68 */ 68 */
69 void cleanupRevision(qint64 revision); 69 void cleanupRevision(qint64 revision);
70 70
71 /*
72 * Returns the latest cleaned up revision.
73 */
74 qint64 cleanedUpRevision();
75
71Q_SIGNALS: 76Q_SIGNALS:
72 void revisionUpdated(qint64); 77 void revisionUpdated(qint64);
73 void pipelinesDrained(); 78 void pipelinesDrained();
diff --git a/common/storage.h b/common/storage.h
index 9459f04..aa0be4c 100644
--- a/common/storage.h
+++ b/common/storage.h
@@ -168,6 +168,9 @@ public:
168 static qint64 maxRevision(const Akonadi2::Storage::Transaction &); 168 static qint64 maxRevision(const Akonadi2::Storage::Transaction &);
169 static void setMaxRevision(Akonadi2::Storage::Transaction &, qint64 revision); 169 static void setMaxRevision(Akonadi2::Storage::Transaction &, qint64 revision);
170 170
171 static qint64 cleanedUpRevision(const Akonadi2::Storage::Transaction &);
172 static void setCleanedUpRevision(Akonadi2::Storage::Transaction &, qint64 revision);
173
171 static QByteArray getUidFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision); 174 static QByteArray getUidFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision);
172 static QByteArray getTypeFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision); 175 static QByteArray getTypeFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision);
173 static void recordRevision(Akonadi2::Storage::Transaction &, qint64 revision, const QByteArray &uid, const QByteArray &type); 176 static void recordRevision(Akonadi2::Storage::Transaction &, qint64 revision, const QByteArray &uid, const QByteArray &type);
diff --git a/common/storage_common.cpp b/common/storage_common.cpp
index 2c23f97..1dbc178 100644
--- a/common/storage_common.cpp
+++ b/common/storage_common.cpp
@@ -73,6 +73,25 @@ qint64 Storage::maxRevision(const Akonadi2::Storage::Transaction &transaction)
73 return r; 73 return r;
74} 74}
75 75
76void Storage::setCleanedUpRevision(Akonadi2::Storage::Transaction &transaction, qint64 revision)
77{
78 transaction.openDatabase().write("__internal_cleanedUpRevision", QByteArray::number(revision));
79}
80
81qint64 Storage::cleanedUpRevision(const Akonadi2::Storage::Transaction &transaction)
82{
83 qint64 r = 0;
84 transaction.openDatabase().scan("__internal_cleanedUpRevision", [&](const QByteArray &, const QByteArray &revision) -> bool {
85 r = revision.toLongLong();
86 return false;
87 }, [](const Error &error){
88 if (error.code != Akonadi2::Storage::NotFound) {
89 std::cout << "Coultn'd find the maximum revision" << std::endl;
90 }
91 });
92 return r;
93}
94
76QByteArray Storage::getUidFromRevision(const Akonadi2::Storage::Transaction &transaction, qint64 revision) 95QByteArray Storage::getUidFromRevision(const Akonadi2::Storage::Transaction &transaction, qint64 revision)
77{ 96{
78 QByteArray uid; 97 QByteArray uid;