diff options
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 103 |
1 files changed, 101 insertions, 2 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 313d99c..8ae20ed 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -13,6 +13,84 @@ | |||
13 | using namespace Akonadi2; | 13 | using namespace Akonadi2; |
14 | 14 | ||
15 | /** | 15 | /** |
16 | * Replays changes from the storage one by one. | ||
17 | * | ||
18 | * Uses a local database to: | ||
19 | * * Remember what changes have been replayed already. | ||
20 | * * store a mapping of remote to local buffers | ||
21 | */ | ||
22 | class ChangeReplay : public QObject | ||
23 | { | ||
24 | Q_OBJECT | ||
25 | public: | ||
26 | |||
27 | typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction; | ||
28 | |||
29 | ChangeReplay(const QString &resourceName, const ReplayFunction replayFunction) | ||
30 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), | ||
31 | mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), | ||
32 | mReplayFunction(replayFunction) | ||
33 | { | ||
34 | |||
35 | } | ||
36 | |||
37 | qint64 getLastReplayedRevision() | ||
38 | { | ||
39 | qint64 lastReplayedRevision = 0; | ||
40 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); | ||
41 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
42 | lastReplayedRevision = value.toLongLong(); | ||
43 | return false; | ||
44 | }, [](const Storage::Error &) { | ||
45 | }); | ||
46 | return lastReplayedRevision; | ||
47 | } | ||
48 | |||
49 | Q_SIGNALS: | ||
50 | void changesReplayed(); | ||
51 | |||
52 | public Q_SLOTS: | ||
53 | void revisionChanged() | ||
54 | { | ||
55 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); | ||
56 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); | ||
57 | qint64 lastReplayedRevision = 0; | ||
58 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
59 | lastReplayedRevision = value.toLongLong(); | ||
60 | return false; | ||
61 | }, [](const Storage::Error &) { | ||
62 | }); | ||
63 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | ||
64 | |||
65 | if (lastReplayedRevision < topRevision) { | ||
66 | qint64 revision = lastReplayedRevision; | ||
67 | for (;revision <= topRevision; revision++) { | ||
68 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | ||
69 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | ||
70 | const auto key = Storage::assembleKey(uid, revision); | ||
71 | mainStoreTransaction.openDatabase(type + ".main").scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { | ||
72 | mReplayFunction(type, key, value).exec(); | ||
73 | //TODO make for loop async, and pass to async replay function together with type | ||
74 | Trace() << "Replaying " << key; | ||
75 | return false; | ||
76 | }, [key](const Storage::Error &) { | ||
77 | ErrorMsg() << "Failed to replay change " << key; | ||
78 | }); | ||
79 | } | ||
80 | revision--; | ||
81 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | ||
82 | replayStoreTransaction.commit(); | ||
83 | Trace() << "Replayed until " << revision; | ||
84 | } | ||
85 | } | ||
86 | |||
87 | private: | ||
88 | Akonadi2::Storage mChangeReplayStore; | ||
89 | Akonadi2::Storage mStorage; | ||
90 | ReplayFunction mReplayFunction; | ||
91 | }; | ||
92 | |||
93 | /** | ||
16 | * Drives the pipeline using the output from all command queues | 94 | * Drives the pipeline using the output from all command queues |
17 | */ | 95 | */ |
18 | class Processor : public QObject | 96 | class Processor : public QObject |
@@ -181,11 +259,19 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
181 | mSynchronizerQueue(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), | 259 | mSynchronizerQueue(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), |
182 | mResourceInstanceIdentifier(resourceInstanceIdentifier), | 260 | mResourceInstanceIdentifier(resourceInstanceIdentifier), |
183 | mPipeline(pipeline ? pipeline : QSharedPointer<Akonadi2::Pipeline>::create(resourceInstanceIdentifier)), | 261 | mPipeline(pipeline ? pipeline : QSharedPointer<Akonadi2::Pipeline>::create(resourceInstanceIdentifier)), |
184 | mError(0) | 262 | mError(0), |
263 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | ||
185 | { | 264 | { |
186 | mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | 265 | mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); |
187 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 266 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
188 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 267 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
268 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [](const QByteArray &type, const QByteArray &key, const QByteArray &value) { | ||
269 | return KAsync::null<void>(); | ||
270 | }); | ||
271 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); | ||
272 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | ||
273 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); | ||
274 | mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); | ||
189 | 275 | ||
190 | mCommitQueueTimer.setInterval(100); | 276 | mCommitQueueTimer.setInterval(100); |
191 | mCommitQueueTimer.setSingleShot(true); | 277 | mCommitQueueTimer.setSingleShot(true); |
@@ -194,8 +280,15 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
194 | 280 | ||
195 | GenericResource::~GenericResource() | 281 | GenericResource::~GenericResource() |
196 | { | 282 | { |
283 | delete mProcessor; | ||
284 | delete mSourceChangeReplay; | ||
197 | } | 285 | } |
198 | 286 | ||
287 | // void GenericResource::revisionChanged() | ||
288 | // { | ||
289 | // //TODO replay revision | ||
290 | // } | ||
291 | |||
199 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) | 292 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) |
200 | { | 293 | { |
201 | Warning() << "Received error from Processor: " << errorCode << errorMessage; | 294 | Warning() << "Received error from Processor: " << errorCode << errorMessage; |
@@ -266,9 +359,15 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
266 | }); | 359 | }); |
267 | } | 360 | } |
268 | 361 | ||
362 | void GenericResource::updateLowerBoundRevision() | ||
363 | { | ||
364 | mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSourceChangeReplay->getLastReplayedRevision())); | ||
365 | } | ||
366 | |||
269 | void GenericResource::setLowerBoundRevision(qint64 revision) | 367 | void GenericResource::setLowerBoundRevision(qint64 revision) |
270 | { | 368 | { |
271 | mProcessor->setOldestUsedRevision(revision); | 369 | mClientLowerBoundRevision = revision; |
370 | updateLowerBoundRevision(); | ||
272 | } | 371 | } |
273 | 372 | ||
274 | #include "genericresource.moc" | 373 | #include "genericresource.moc" |