summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/genericresource.cpp103
-rw-r--r--common/genericresource.h6
-rw-r--r--common/listener.cpp15
3 files changed, 122 insertions, 2 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 313d99c..8ae20ed 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -13,6 +13,84 @@
13using namespace Akonadi2; 13using namespace Akonadi2;
14 14
15/** 15/**
16 * Replays changes from the storage one by one.
17 *
18 * Uses a local database to:
19 * * Remember what changes have been replayed already.
20 * * store a mapping of remote to local buffers
21 */
22class ChangeReplay : public QObject
23{
24 Q_OBJECT
25public:
26
27 typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction;
28
29 ChangeReplay(const QString &resourceName, const ReplayFunction replayFunction)
30 : mStorage(storageLocation(), resourceName, Storage::ReadOnly),
31 mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite),
32 mReplayFunction(replayFunction)
33 {
34
35 }
36
37 qint64 getLastReplayedRevision()
38 {
39 qint64 lastReplayedRevision = 0;
40 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly);
41 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
42 lastReplayedRevision = value.toLongLong();
43 return false;
44 }, [](const Storage::Error &) {
45 });
46 return lastReplayedRevision;
47 }
48
49Q_SIGNALS:
50 void changesReplayed();
51
52public Q_SLOTS:
53 void revisionChanged()
54 {
55 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly);
56 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite);
57 qint64 lastReplayedRevision = 0;
58 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool {
59 lastReplayedRevision = value.toLongLong();
60 return false;
61 }, [](const Storage::Error &) {
62 });
63 const qint64 topRevision = Storage::maxRevision(mainStoreTransaction);
64
65 if (lastReplayedRevision < topRevision) {
66 qint64 revision = lastReplayedRevision;
67 for (;revision <= topRevision; revision++) {
68 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
69 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision);
70 const auto key = Storage::assembleKey(uid, revision);
71 mainStoreTransaction.openDatabase(type + ".main").scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool {
72 mReplayFunction(type, key, value).exec();
73 //TODO make for loop async, and pass to async replay function together with type
74 Trace() << "Replaying " << key;
75 return false;
76 }, [key](const Storage::Error &) {
77 ErrorMsg() << "Failed to replay change " << key;
78 });
79 }
80 revision--;
81 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
82 replayStoreTransaction.commit();
83 Trace() << "Replayed until " << revision;
84 }
85 }
86
87private:
88 Akonadi2::Storage mChangeReplayStore;
89 Akonadi2::Storage mStorage;
90 ReplayFunction mReplayFunction;
91};
92
93/**
16 * Drives the pipeline using the output from all command queues 94 * Drives the pipeline using the output from all command queues
17 */ 95 */
18class Processor : public QObject 96class Processor : public QObject
@@ -181,11 +259,19 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
181 mSynchronizerQueue(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), 259 mSynchronizerQueue(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"),
182 mResourceInstanceIdentifier(resourceInstanceIdentifier), 260 mResourceInstanceIdentifier(resourceInstanceIdentifier),
183 mPipeline(pipeline ? pipeline : QSharedPointer<Akonadi2::Pipeline>::create(resourceInstanceIdentifier)), 261 mPipeline(pipeline ? pipeline : QSharedPointer<Akonadi2::Pipeline>::create(resourceInstanceIdentifier)),
184 mError(0) 262 mError(0),
263 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
185{ 264{
186 mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); 265 mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
187 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 266 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
188 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 267 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
268 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [](const QByteArray &type, const QByteArray &key, const QByteArray &value) {
269 return KAsync::null<void>();
270 });
271 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged);
272 QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
273 mClientLowerBoundRevision = mPipeline->cleanedUpRevision();
274 mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision());
189 275
190 mCommitQueueTimer.setInterval(100); 276 mCommitQueueTimer.setInterval(100);
191 mCommitQueueTimer.setSingleShot(true); 277 mCommitQueueTimer.setSingleShot(true);
@@ -194,8 +280,15 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
194 280
195GenericResource::~GenericResource() 281GenericResource::~GenericResource()
196{ 282{
283 delete mProcessor;
284 delete mSourceChangeReplay;
197} 285}
198 286
287// void GenericResource::revisionChanged()
288// {
289// //TODO replay revision
290// }
291
199void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) 292void GenericResource::onProcessorError(int errorCode, const QString &errorMessage)
200{ 293{
201 Warning() << "Received error from Processor: " << errorCode << errorMessage; 294 Warning() << "Received error from Processor: " << errorCode << errorMessage;
@@ -266,9 +359,15 @@ KAsync::Job<void> GenericResource::processAllMessages()
266 }); 359 });
267} 360}
268 361
362void GenericResource::updateLowerBoundRevision()
363{
364 mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSourceChangeReplay->getLastReplayedRevision()));
365}
366
269void GenericResource::setLowerBoundRevision(qint64 revision) 367void GenericResource::setLowerBoundRevision(qint64 revision)
270{ 368{
271 mProcessor->setOldestUsedRevision(revision); 369 mClientLowerBoundRevision = revision;
370 updateLowerBoundRevision();
272} 371}
273 372
274#include "genericresource.moc" 373#include "genericresource.moc"
diff --git a/common/genericresource.h b/common/genericresource.h
index 052a9f5..cfc6653 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -26,6 +26,7 @@
26#include <QTimer> 26#include <QTimer>
27 27
28class Processor; 28class Processor;
29class ChangeReplay;
29 30
30namespace Akonadi2 31namespace Akonadi2
31{ 32{
@@ -47,6 +48,9 @@ public:
47 48
48 int error() const; 49 int error() const;
49 50
51private Q_SLOTS:
52 void updateLowerBoundRevision();
53
50protected: 54protected:
51 void onProcessorError(int errorCode, const QString &errorMessage); 55 void onProcessorError(int errorCode, const QString &errorMessage);
52 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); 56 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data);
@@ -58,8 +62,10 @@ protected:
58 62
59private: 63private:
60 Processor *mProcessor; 64 Processor *mProcessor;
65 ChangeReplay *mSourceChangeReplay;
61 int mError; 66 int mError;
62 QTimer mCommitQueueTimer; 67 QTimer mCommitQueueTimer;
68 qint64 mClientLowerBoundRevision;
63}; 69};
64 70
65} 71}
diff --git a/common/listener.cpp b/common/listener.cpp
index 16da770..6e6808a 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -114,6 +114,14 @@ void Listener::acceptConnection()
114 this, &Listener::clientDropped); 114 this, &Listener::clientDropped);
115 m_checkConnectionsTimer->stop(); 115 m_checkConnectionsTimer->stop();
116 116
117 //If this is the first client, set the lower limit for revision cleanup
118 if (m_connections.size() == 1) {
119 loadResource();
120 if (m_resource) {
121 m_resource->setLowerBoundRevision(0);
122 }
123 }
124
117 if (socket->bytesAvailable()) { 125 if (socket->bytesAvailable()) {
118 readFromSocket(socket); 126 readFromSocket(socket);
119 } 127 }
@@ -146,6 +154,13 @@ void Listener::clientDropped()
146 154
147void Listener::checkConnections() 155void Listener::checkConnections()
148{ 156{
157 //If this was the last client, disengage the lower limit for revision cleanup
158 if (m_connections.isEmpty()) {
159 loadResource();
160 if (m_resource) {
161 m_resource->setLowerBoundRevision(std::numeric_limits<qint64>::max());
162 }
163 }
149 m_checkConnectionsTimer->start(); 164 m_checkConnectionsTimer->start();
150} 165}
151 166