diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-10-20 16:46:59 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-10-20 16:55:20 +0200 |
commit | d6c3e998dbb61eedf37773082bf58ffbd11663b0 (patch) | |
tree | ded84435440d80257d0d79d17ad9647bd07f2151 | |
parent | dfb0495ead6cf25f1e9e1ed03a78d249ae2f4498 (diff) | |
download | sink-d6c3e998dbb61eedf37773082bf58ffbd11663b0.tar.gz sink-d6c3e998dbb61eedf37773082bf58ffbd11663b0.zip |
Draft of ChangeReplay
-rw-r--r-- | common/genericresource.cpp | 103 | ||||
-rw-r--r-- | common/genericresource.h | 6 | ||||
-rw-r--r-- | common/listener.cpp | 15 |
3 files changed, 122 insertions, 2 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 313d99c..8ae20ed 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -13,6 +13,84 @@ | |||
13 | using namespace Akonadi2; | 13 | using namespace Akonadi2; |
14 | 14 | ||
15 | /** | 15 | /** |
16 | * Replays changes from the storage one by one. | ||
17 | * | ||
18 | * Uses a local database to: | ||
19 | * * Remember what changes have been replayed already. | ||
20 | * * store a mapping of remote to local buffers | ||
21 | */ | ||
22 | class ChangeReplay : public QObject | ||
23 | { | ||
24 | Q_OBJECT | ||
25 | public: | ||
26 | |||
27 | typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction; | ||
28 | |||
29 | ChangeReplay(const QString &resourceName, const ReplayFunction replayFunction) | ||
30 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), | ||
31 | mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), | ||
32 | mReplayFunction(replayFunction) | ||
33 | { | ||
34 | |||
35 | } | ||
36 | |||
37 | qint64 getLastReplayedRevision() | ||
38 | { | ||
39 | qint64 lastReplayedRevision = 0; | ||
40 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); | ||
41 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
42 | lastReplayedRevision = value.toLongLong(); | ||
43 | return false; | ||
44 | }, [](const Storage::Error &) { | ||
45 | }); | ||
46 | return lastReplayedRevision; | ||
47 | } | ||
48 | |||
49 | Q_SIGNALS: | ||
50 | void changesReplayed(); | ||
51 | |||
52 | public Q_SLOTS: | ||
53 | void revisionChanged() | ||
54 | { | ||
55 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); | ||
56 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); | ||
57 | qint64 lastReplayedRevision = 0; | ||
58 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
59 | lastReplayedRevision = value.toLongLong(); | ||
60 | return false; | ||
61 | }, [](const Storage::Error &) { | ||
62 | }); | ||
63 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | ||
64 | |||
65 | if (lastReplayedRevision < topRevision) { | ||
66 | qint64 revision = lastReplayedRevision; | ||
67 | for (;revision <= topRevision; revision++) { | ||
68 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | ||
69 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | ||
70 | const auto key = Storage::assembleKey(uid, revision); | ||
71 | mainStoreTransaction.openDatabase(type + ".main").scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { | ||
72 | mReplayFunction(type, key, value).exec(); | ||
73 | //TODO make for loop async, and pass to async replay function together with type | ||
74 | Trace() << "Replaying " << key; | ||
75 | return false; | ||
76 | }, [key](const Storage::Error &) { | ||
77 | ErrorMsg() << "Failed to replay change " << key; | ||
78 | }); | ||
79 | } | ||
80 | revision--; | ||
81 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | ||
82 | replayStoreTransaction.commit(); | ||
83 | Trace() << "Replayed until " << revision; | ||
84 | } | ||
85 | } | ||
86 | |||
87 | private: | ||
88 | Akonadi2::Storage mChangeReplayStore; | ||
89 | Akonadi2::Storage mStorage; | ||
90 | ReplayFunction mReplayFunction; | ||
91 | }; | ||
92 | |||
93 | /** | ||
16 | * Drives the pipeline using the output from all command queues | 94 | * Drives the pipeline using the output from all command queues |
17 | */ | 95 | */ |
18 | class Processor : public QObject | 96 | class Processor : public QObject |
@@ -181,11 +259,19 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
181 | mSynchronizerQueue(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), | 259 | mSynchronizerQueue(Akonadi2::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), |
182 | mResourceInstanceIdentifier(resourceInstanceIdentifier), | 260 | mResourceInstanceIdentifier(resourceInstanceIdentifier), |
183 | mPipeline(pipeline ? pipeline : QSharedPointer<Akonadi2::Pipeline>::create(resourceInstanceIdentifier)), | 261 | mPipeline(pipeline ? pipeline : QSharedPointer<Akonadi2::Pipeline>::create(resourceInstanceIdentifier)), |
184 | mError(0) | 262 | mError(0), |
263 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | ||
185 | { | 264 | { |
186 | mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); | 265 | mProcessor = new Processor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); |
187 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 266 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
188 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 267 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
268 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [](const QByteArray &type, const QByteArray &key, const QByteArray &value) { | ||
269 | return KAsync::null<void>(); | ||
270 | }); | ||
271 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); | ||
272 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | ||
273 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); | ||
274 | mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); | ||
189 | 275 | ||
190 | mCommitQueueTimer.setInterval(100); | 276 | mCommitQueueTimer.setInterval(100); |
191 | mCommitQueueTimer.setSingleShot(true); | 277 | mCommitQueueTimer.setSingleShot(true); |
@@ -194,8 +280,15 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
194 | 280 | ||
195 | GenericResource::~GenericResource() | 281 | GenericResource::~GenericResource() |
196 | { | 282 | { |
283 | delete mProcessor; | ||
284 | delete mSourceChangeReplay; | ||
197 | } | 285 | } |
198 | 286 | ||
287 | // void GenericResource::revisionChanged() | ||
288 | // { | ||
289 | // //TODO replay revision | ||
290 | // } | ||
291 | |||
199 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) | 292 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) |
200 | { | 293 | { |
201 | Warning() << "Received error from Processor: " << errorCode << errorMessage; | 294 | Warning() << "Received error from Processor: " << errorCode << errorMessage; |
@@ -266,9 +359,15 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
266 | }); | 359 | }); |
267 | } | 360 | } |
268 | 361 | ||
362 | void GenericResource::updateLowerBoundRevision() | ||
363 | { | ||
364 | mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSourceChangeReplay->getLastReplayedRevision())); | ||
365 | } | ||
366 | |||
269 | void GenericResource::setLowerBoundRevision(qint64 revision) | 367 | void GenericResource::setLowerBoundRevision(qint64 revision) |
270 | { | 368 | { |
271 | mProcessor->setOldestUsedRevision(revision); | 369 | mClientLowerBoundRevision = revision; |
370 | updateLowerBoundRevision(); | ||
272 | } | 371 | } |
273 | 372 | ||
274 | #include "genericresource.moc" | 373 | #include "genericresource.moc" |
diff --git a/common/genericresource.h b/common/genericresource.h index 052a9f5..cfc6653 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -26,6 +26,7 @@ | |||
26 | #include <QTimer> | 26 | #include <QTimer> |
27 | 27 | ||
28 | class Processor; | 28 | class Processor; |
29 | class ChangeReplay; | ||
29 | 30 | ||
30 | namespace Akonadi2 | 31 | namespace Akonadi2 |
31 | { | 32 | { |
@@ -47,6 +48,9 @@ public: | |||
47 | 48 | ||
48 | int error() const; | 49 | int error() const; |
49 | 50 | ||
51 | private Q_SLOTS: | ||
52 | void updateLowerBoundRevision(); | ||
53 | |||
50 | protected: | 54 | protected: |
51 | void onProcessorError(int errorCode, const QString &errorMessage); | 55 | void onProcessorError(int errorCode, const QString &errorMessage); |
52 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); | 56 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); |
@@ -58,8 +62,10 @@ protected: | |||
58 | 62 | ||
59 | private: | 63 | private: |
60 | Processor *mProcessor; | 64 | Processor *mProcessor; |
65 | ChangeReplay *mSourceChangeReplay; | ||
61 | int mError; | 66 | int mError; |
62 | QTimer mCommitQueueTimer; | 67 | QTimer mCommitQueueTimer; |
68 | qint64 mClientLowerBoundRevision; | ||
63 | }; | 69 | }; |
64 | 70 | ||
65 | } | 71 | } |
diff --git a/common/listener.cpp b/common/listener.cpp index 16da770..6e6808a 100644 --- a/common/listener.cpp +++ b/common/listener.cpp | |||
@@ -114,6 +114,14 @@ void Listener::acceptConnection() | |||
114 | this, &Listener::clientDropped); | 114 | this, &Listener::clientDropped); |
115 | m_checkConnectionsTimer->stop(); | 115 | m_checkConnectionsTimer->stop(); |
116 | 116 | ||
117 | //If this is the first client, set the lower limit for revision cleanup | ||
118 | if (m_connections.size() == 1) { | ||
119 | loadResource(); | ||
120 | if (m_resource) { | ||
121 | m_resource->setLowerBoundRevision(0); | ||
122 | } | ||
123 | } | ||
124 | |||
117 | if (socket->bytesAvailable()) { | 125 | if (socket->bytesAvailable()) { |
118 | readFromSocket(socket); | 126 | readFromSocket(socket); |
119 | } | 127 | } |
@@ -146,6 +154,13 @@ void Listener::clientDropped() | |||
146 | 154 | ||
147 | void Listener::checkConnections() | 155 | void Listener::checkConnections() |
148 | { | 156 | { |
157 | //If this was the last client, disengage the lower limit for revision cleanup | ||
158 | if (m_connections.isEmpty()) { | ||
159 | loadResource(); | ||
160 | if (m_resource) { | ||
161 | m_resource->setLowerBoundRevision(std::numeric_limits<qint64>::max()); | ||
162 | } | ||
163 | } | ||
149 | m_checkConnectionsTimer->start(); | 164 | m_checkConnectionsTimer->start(); |
150 | } | 165 | } |
151 | 166 | ||