diff options
-rw-r--r-- | common/genericresource.cpp | 39 | ||||
-rw-r--r-- | common/pipeline.cpp | 7 | ||||
-rw-r--r-- | common/pipeline.h | 5 | ||||
-rw-r--r-- | common/storage.h | 3 | ||||
-rw-r--r-- | common/storage_common.cpp | 19 | ||||
-rw-r--r-- | tests/dummyresourcetest.cpp | 2 |
6 files changed, 70 insertions, 5 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index acf84c4..2a0d6bd 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -25,12 +25,23 @@ public: | |||
25 | mCommandQueues(commandQueues), | 25 | mCommandQueues(commandQueues), |
26 | mProcessingLock(false) | 26 | mProcessingLock(false) |
27 | { | 27 | { |
28 | mPipeline->startTransaction(); | ||
29 | //FIXME Should be initialized to the current value of the change replay queue | ||
30 | mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); | ||
31 | mPipeline->commit(); | ||
32 | |||
28 | for (auto queue : mCommandQueues) { | 33 | for (auto queue : mCommandQueues) { |
29 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); | 34 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &Processor::process); |
30 | Q_UNUSED(ret); | 35 | Q_UNUSED(ret); |
31 | } | 36 | } |
32 | } | 37 | } |
33 | 38 | ||
39 | void setOldestUsedRevision(qint64 revision) | ||
40 | { | ||
41 | mLowerBoundRevision = revision; | ||
42 | } | ||
43 | |||
44 | |||
34 | signals: | 45 | signals: |
35 | void error(int errorCode, const QString &errorMessage); | 46 | void error(int errorCode, const QString &errorMessage); |
36 | 47 | ||
@@ -113,9 +124,6 @@ private slots: | |||
113 | return KAsync::start<void>([this, data](KAsync::Future<void> &future) { | 124 | return KAsync::start<void>([this, data](KAsync::Future<void> &future) { |
114 | processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) { | 125 | processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) { |
115 | Trace() << "Created revision " << createdRevision; | 126 | Trace() << "Created revision " << createdRevision; |
116 | //We don't have a writeback yet, so we cleanup revisions immediately | ||
117 | //TODO: only cleanup once writeback is done | ||
118 | mPipeline->cleanupRevision(createdRevision); | ||
119 | future.setFinished(); | 127 | future.setFinished(); |
120 | }).exec(); | 128 | }).exec(); |
121 | }); | 129 | }); |
@@ -137,6 +145,12 @@ private slots: | |||
137 | 145 | ||
138 | KAsync::Job<void> processPipeline() | 146 | KAsync::Job<void> processPipeline() |
139 | { | 147 | { |
148 | mPipeline->startTransaction(); | ||
149 | for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { | ||
150 | mPipeline->cleanupRevision(revision); | ||
151 | } | ||
152 | mPipeline->commit(); | ||
153 | |||
140 | //Go through all message queues | 154 | //Go through all message queues |
141 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); | 155 | auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); |
142 | return KAsync::dowhile( | 156 | return KAsync::dowhile( |
@@ -156,6 +170,8 @@ private: | |||
156 | //Ordered by priority | 170 | //Ordered by priority |
157 | QList<MessageQueue*> mCommandQueues; | 171 | QList<MessageQueue*> mCommandQueues; |
158 | bool mProcessingLock; | 172 | bool mProcessingLock; |
173 | //The lowest revision we no longer need | ||
174 | qint64 mLowerBoundRevision; | ||
159 | }; | 175 | }; |
160 | 176 | ||
161 | 177 | ||
@@ -171,6 +187,23 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
171 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 187 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
172 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 188 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
173 | 189 | ||
190 | //We simply drop revisions with 100ms delay until we have better information from clients and writeback | ||
191 | //FIXME On startup, read the latest revision that is replayed to initialize. Then bump revision when change-replay and | ||
192 | //all clients have advanced to a later revision. | ||
193 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, [this](qint64 revision) { | ||
194 | QTimer *dropRevisionTimer = new QTimer(); | ||
195 | dropRevisionTimer->setInterval(100); | ||
196 | dropRevisionTimer->setSingleShot(true); | ||
197 | auto processor = QPointer<Processor>(mProcessor); | ||
198 | QObject::connect(dropRevisionTimer, &QTimer::timeout, dropRevisionTimer, [processor, revision, dropRevisionTimer]() { | ||
199 | if (processor) { | ||
200 | processor->setOldestUsedRevision(revision); | ||
201 | } | ||
202 | delete dropRevisionTimer; | ||
203 | }); | ||
204 | dropRevisionTimer->start(); | ||
205 | }); | ||
206 | |||
174 | mCommitQueueTimer.setInterval(100); | 207 | mCommitQueueTimer.setInterval(100); |
175 | mCommitQueueTimer.setSingleShot(true); | 208 | mCommitQueueTimer.setSingleShot(true); |
176 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); | 209 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index b05cb2f..9816129 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -313,7 +313,6 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
313 | 313 | ||
314 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); | 314 | const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); |
315 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); | 315 | const QByteArray key = QByteArray(reinterpret_cast<char const*>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); |
316 | const qint64 baseRevision = deleteEntity->revision(); | ||
317 | 316 | ||
318 | const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; | 317 | const qint64 newRevision = Akonadi2::Storage::maxRevision(d->transaction) + 1; |
319 | 318 | ||
@@ -365,6 +364,12 @@ void Pipeline::cleanupRevision(qint64 revision) | |||
365 | }, [](const Akonadi2::Storage::Error &error) { | 364 | }, [](const Akonadi2::Storage::Error &error) { |
366 | Warning() << "Error while reading: " << error.message; | 365 | Warning() << "Error while reading: " << error.message; |
367 | }, true); | 366 | }, true); |
367 | Akonadi2::Storage::setCleanedUpRevision(d->transaction, revision); | ||
368 | } | ||
369 | |||
370 | qint64 Pipeline::cleanedUpRevision() | ||
371 | { | ||
372 | return Akonadi2::Storage::cleanedUpRevision(d->transaction); | ||
368 | } | 373 | } |
369 | 374 | ||
370 | void Pipeline::pipelineStepped(const PipelineState &state) | 375 | void Pipeline::pipelineStepped(const PipelineState &state) |
diff --git a/common/pipeline.h b/common/pipeline.h index f4e8ae0..837c18a 100644 --- a/common/pipeline.h +++ b/common/pipeline.h | |||
@@ -68,6 +68,11 @@ public: | |||
68 | */ | 68 | */ |
69 | void cleanupRevision(qint64 revision); | 69 | void cleanupRevision(qint64 revision); |
70 | 70 | ||
71 | /* | ||
72 | * Returns the latest cleaned up revision. | ||
73 | */ | ||
74 | qint64 cleanedUpRevision(); | ||
75 | |||
71 | Q_SIGNALS: | 76 | Q_SIGNALS: |
72 | void revisionUpdated(qint64); | 77 | void revisionUpdated(qint64); |
73 | void pipelinesDrained(); | 78 | void pipelinesDrained(); |
diff --git a/common/storage.h b/common/storage.h index 9459f04..aa0be4c 100644 --- a/common/storage.h +++ b/common/storage.h | |||
@@ -168,6 +168,9 @@ public: | |||
168 | static qint64 maxRevision(const Akonadi2::Storage::Transaction &); | 168 | static qint64 maxRevision(const Akonadi2::Storage::Transaction &); |
169 | static void setMaxRevision(Akonadi2::Storage::Transaction &, qint64 revision); | 169 | static void setMaxRevision(Akonadi2::Storage::Transaction &, qint64 revision); |
170 | 170 | ||
171 | static qint64 cleanedUpRevision(const Akonadi2::Storage::Transaction &); | ||
172 | static void setCleanedUpRevision(Akonadi2::Storage::Transaction &, qint64 revision); | ||
173 | |||
171 | static QByteArray getUidFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision); | 174 | static QByteArray getUidFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision); |
172 | static QByteArray getTypeFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision); | 175 | static QByteArray getTypeFromRevision(const Akonadi2::Storage::Transaction &, qint64 revision); |
173 | static void recordRevision(Akonadi2::Storage::Transaction &, qint64 revision, const QByteArray &uid, const QByteArray &type); | 176 | static void recordRevision(Akonadi2::Storage::Transaction &, qint64 revision, const QByteArray &uid, const QByteArray &type); |
diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 2c23f97..1dbc178 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp | |||
@@ -73,6 +73,25 @@ qint64 Storage::maxRevision(const Akonadi2::Storage::Transaction &transaction) | |||
73 | return r; | 73 | return r; |
74 | } | 74 | } |
75 | 75 | ||
76 | void Storage::setCleanedUpRevision(Akonadi2::Storage::Transaction &transaction, qint64 revision) | ||
77 | { | ||
78 | transaction.openDatabase().write("__internal_cleanedUpRevision", QByteArray::number(revision)); | ||
79 | } | ||
80 | |||
81 | qint64 Storage::cleanedUpRevision(const Akonadi2::Storage::Transaction &transaction) | ||
82 | { | ||
83 | qint64 r = 0; | ||
84 | transaction.openDatabase().scan("__internal_cleanedUpRevision", [&](const QByteArray &, const QByteArray &revision) -> bool { | ||
85 | r = revision.toLongLong(); | ||
86 | return false; | ||
87 | }, [](const Error &error){ | ||
88 | if (error.code != Akonadi2::Storage::NotFound) { | ||
89 | std::cout << "Coultn'd find the maximum revision" << std::endl; | ||
90 | } | ||
91 | }); | ||
92 | return r; | ||
93 | } | ||
94 | |||
76 | QByteArray Storage::getUidFromRevision(const Akonadi2::Storage::Transaction &transaction, qint64 revision) | 95 | QByteArray Storage::getUidFromRevision(const Akonadi2::Storage::Transaction &transaction, qint64 revision) |
77 | { | 96 | { |
78 | QByteArray uid; | 97 | QByteArray uid; |
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index fe3acc9..93a6a9c 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp | |||
@@ -221,7 +221,7 @@ private Q_SLOTS: | |||
221 | { | 221 | { |
222 | async::SyncListResult<Akonadi2::ApplicationDomain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::ApplicationDomain::Event>(query)); | 222 | async::SyncListResult<Akonadi2::ApplicationDomain::Event::Ptr> result(Akonadi2::Store::load<Akonadi2::ApplicationDomain::Event>(query)); |
223 | result.exec(); | 223 | result.exec(); |
224 | QCOMPARE(result.size(), 0); | 224 | QTRY_COMPARE(result.size(), 0); |
225 | } | 225 | } |
226 | } | 226 | } |
227 | 227 | ||