summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-10-20 16:46:59 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-10-20 16:55:20 +0200
commitd6c3e998dbb61eedf37773082bf58ffbd11663b0 (patch)
treeded84435440d80257d0d79d17ad9647bd07f2151 /common/genericresource.cpp
parentdfb0495ead6cf25f1e9e1ed03a78d249ae2f4498 (diff)
downloadsink-d6c3e998dbb61eedf37773082bf58ffbd11663b0.tar.gz
sink-d6c3e998dbb61eedf37773082bf58ffbd11663b0.zip
Draft of ChangeReplay
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp103
1 files changed, 101 insertions, 2 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 313d99c..8ae20ed 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -13,6 +13,84 @@
13using namespace Akonadi2; 13using namespace Akonadi2;
14 14
15/** 15/**
16 * Replays changes from the storage one by one.
17 *
18 * Uses a local database to:
19 * * Remember what changes have been replayed already.
20 * * store a mapping of remote to local buffers
21 */
22class ChangeReplay : public QObject
23{
24 Q_OBJECT
25public:
26
27 typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction;
28
29 ChangeReplay(const QString &resourceName, const ReplayFunction replayFunction)
30 : mStorage(storageLocation(), resourceName, Storage::ReadOnly),
31 mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite),
32 mReplayFunction(replayFunction)
33 {
34
35 }
36
37 qint64 getLastReplayedRevision()
38 {
39 qint64 lastReplayedRevision = 0;
40 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly);
41 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
42 lastReplayedRevision = value.toLongLong();
43 return false;
44 }, [](const Storage::Error &) {
45 });
46 return lastReplayedRevision;
47 }
48
49Q_SIGNALS:
50 void changesReplayed();
51
52public Q_SLOTS:
53 void revisionChanged()
54 {
55 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly);
56 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite);
57 qint64 lastReplayedRevision = 0;
58 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
59 lastReplayedRevision = value.toLongLong();
60 return false;
61 }, [](const Storage::Error &) {
62 });
63 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction);
64
65 if (lastReplayedRevision < topRevision) {
66 qint64 revision = lastReplayedRevision;
67 for (;revision <= topRevision; revision++) {
68 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
69 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision);
70 const auto key = Storage::assembleKey(uid, revision);
71 mainStoreTransaction.openDatabase(type + ".main").scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool {
72 mReplayFunction(type, key, value).exec();
73 //TODO make for loop async, and pass to async replay function together with type
74 Trace() << "Replaying " << key;
75 return false;
76 }, [key](const Storage::Error &) {
77 ErrorMsg() << "Failed to replay change " << key;
78 });
79 }
80 revision--;
81 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
82 replayStoreTransaction.commit();
83 Trace() << "Replayed until " << revision;
84 }
85 }
86
87private:
88 Akonadi2::Storage mChangeReplayStore;
89 Akonadi2::Storage mStorage;
90 ReplayFunction mReplayFunction;
91};
92
93/**
16 * Drives the pipeline using the output from all command queues 94 * Drives the pipeline using the output from all command queues
17 */ 95 */
18class Processor : public QObject 96class Processor : public QObject
@@ -181,11 +259,19 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
181 mSynchronizerQueue(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), 259 mSynchronizerQueue(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"),
182 mResourceInstanceIdentifier(resourceInstanceIdentifier), 260 mResourceInstanceIdentifier(resourceInstanceIdentifier),
183 mPipeline(pipeline ? pipeline : QSharedPointer<Akonadi2::Pipeline>::create(resourceInstanceIdentifier)), 261 mPipeline(pipeline ? pipeline : QSharedPointer<Akonadi2::Pipeline>::create(resourceInstanceIdentifier)),
184 mError(0) 262 mError(0),
263 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
185{ 264{
186 mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); 265 mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
187 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 266 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
188 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 267 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
268 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [](const QByteArray &type, const QByteArray &key, const QByteArray &value) {
269 return KAsync::null<void>();
270 });
271 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged);
272 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
273 mClientLowerBoundRevision = mPipeline->cleanedUpRevision();
274 mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision());
189 275
190 mCommitQueueTimer.setInterval(100); 276 mCommitQueueTimer.setInterval(100);
191 mCommitQueueTimer.setSingleShot(true); 277 mCommitQueueTimer.setSingleShot(true);
@@ -194,8 +280,15 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
194 280
195GenericResource::~GenericResource() 281GenericResource::~GenericResource()
196{ 282{
283 delete mProcessor;
284 delete mSourceChangeReplay;
197} 285}
198 286
287// void GenericResource::revisionChanged()
288// {
289// //TODO replay revision
290// }
291
199void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) 292void GenericResource::onProcessorError(int errorCode, const QString &errorMessage)
200{ 293{
201 Warning() << "Received error from Processor: " << errorCode << errorMessage; 294 Warning() << "Received error from Processor: " << errorCode << errorMessage;
@@ -266,9 +359,15 @@ KAsync::Job<void> GenericResource::processAllMessages()
266 }); 359 });
267} 360}
268 361
362void GenericResource::updateLowerBoundRevision()
363{
364 mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSourceChangeReplay->getLastReplayedRevision()));
365}
366
269void GenericResource::setLowerBoundRevision(qint64 revision) 367void GenericResource::setLowerBoundRevision(qint64 revision)
270{ 368{
271 mProcessor->setOldestUsedRevision(revision); 369 mClientLowerBoundRevision = revision;
370 updateLowerBoundRevision();
272} 371}
273 372
274#include "genericresource.moc" 373#include "genericresource.moc"