diff options
-rw-r--r-- | common/CMakeLists.txt | 1 | ||||
-rw-r--r-- | common/commands.cpp | 2 | ||||
-rw-r--r-- | common/commands.h | 1 | ||||
-rw-r--r-- | common/commands/revisionreplayed.fbs | 7 | ||||
-rw-r--r-- | common/facade.h | 3 | ||||
-rw-r--r-- | common/genericresource.cpp | 22 | ||||
-rw-r--r-- | common/genericresource.h | 1 | ||||
-rw-r--r-- | common/listener.cpp | 34 | ||||
-rw-r--r-- | common/listener.h | 8 | ||||
-rw-r--r-- | common/resource.cpp | 5 | ||||
-rw-r--r-- | common/resource.h | 1 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 10 | ||||
-rw-r--r-- | common/resourceaccess.h | 8 |
13 files changed, 78 insertions, 25 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 25ea667..61019b3 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -51,6 +51,7 @@ generate_flatbuffers( | |||
51 | commands/revisionupdate | 51 | commands/revisionupdate |
52 | commands/synchronize | 52 | commands/synchronize |
53 | commands/notification | 53 | commands/notification |
54 | commands/revisionreplayed | ||
54 | domain/event | 55 | domain/event |
55 | domain/mail | 56 | domain/mail |
56 | entity | 57 | entity |
diff --git a/common/commands.cpp b/common/commands.cpp index 221c211..7a0ae23 100644 --- a/common/commands.cpp +++ b/common/commands.cpp | |||
@@ -57,6 +57,8 @@ QByteArray name(int commandId) | |||
57 | return "Notification"; | 57 | return "Notification"; |
58 | case PingCommand: | 58 | case PingCommand: |
59 | return "Ping"; | 59 | return "Ping"; |
60 | case RevisionReplayedCommand: | ||
61 | return "RevisionReplayed"; | ||
60 | case CustomCommand: | 62 | case CustomCommand: |
61 | return "Custom"; | 63 | return "Custom"; |
62 | }; | 64 | }; |
diff --git a/common/commands.h b/common/commands.h index da2438a..c68ef90 100644 --- a/common/commands.h +++ b/common/commands.h | |||
@@ -46,6 +46,7 @@ enum CommandIds { | |||
46 | ShutdownCommand, | 46 | ShutdownCommand, |
47 | NotificationCommand, | 47 | NotificationCommand, |
48 | PingCommand, | 48 | PingCommand, |
49 | RevisionReplayedCommand, | ||
49 | CustomCommand = 0xffff | 50 | CustomCommand = 0xffff |
50 | }; | 51 | }; |
51 | 52 | ||
diff --git a/common/commands/revisionreplayed.fbs b/common/commands/revisionreplayed.fbs new file mode 100644 index 0000000..e1b11e3 --- /dev/null +++ b/common/commands/revisionreplayed.fbs | |||
@@ -0,0 +1,7 @@ | |||
1 | namespace Akonadi2.Commands; | ||
2 | |||
3 | table RevisionReplayed { | ||
4 | revision: ulong; | ||
5 | } | ||
6 | |||
7 | root_type RevisionReplayed; | ||
diff --git a/common/facade.h b/common/facade.h index 8b37579..a0971a1 100644 --- a/common/facade.h +++ b/common/facade.h | |||
@@ -251,9 +251,10 @@ public: | |||
251 | future.setFinished(); | 251 | future.setFinished(); |
252 | return; | 252 | return; |
253 | } | 253 | } |
254 | load(query, resultProvider, oldRevision).template then<void, qint64>([&future](qint64 queriedRevision) { | 254 | load(query, resultProvider, oldRevision).template then<void, qint64>([&future, this](qint64 queriedRevision) { |
255 | //TODO set revision in result provider? | 255 | //TODO set revision in result provider? |
256 | //TODO update all existing results with new revision | 256 | //TODO update all existing results with new revision |
257 | mResourceAccess->sendRevisionReplayedCommand(queriedRevision); | ||
257 | future.setValue(queriedRevision); | 258 | future.setValue(queriedRevision); |
258 | future.setFinished(); | 259 | future.setFinished(); |
259 | }).exec(); | 260 | }).exec(); |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 2a0d6bd..313d99c 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -187,23 +187,6 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
187 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 187 | QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
188 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 188 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
189 | 189 | ||
190 | //We simply drop revisions with 100ms delay until we have better information from clients and writeback | ||
191 | //FIXME On startup, read the latest revision that is replayed to initialize. Then bump revision when change-replay and | ||
192 | //all clients have advanced to a later revision. | ||
193 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, [this](qint64 revision) { | ||
194 | QTimer *dropRevisionTimer = new QTimer(); | ||
195 | dropRevisionTimer->setInterval(100); | ||
196 | dropRevisionTimer->setSingleShot(true); | ||
197 | auto processor = QPointer<Processor>(mProcessor); | ||
198 | QObject::connect(dropRevisionTimer, &QTimer::timeout, dropRevisionTimer, [processor, revision, dropRevisionTimer]() { | ||
199 | if (processor) { | ||
200 | processor->setOldestUsedRevision(revision); | ||
201 | } | ||
202 | delete dropRevisionTimer; | ||
203 | }); | ||
204 | dropRevisionTimer->start(); | ||
205 | }); | ||
206 | |||
207 | mCommitQueueTimer.setInterval(100); | 190 | mCommitQueueTimer.setInterval(100); |
208 | mCommitQueueTimer.setSingleShot(true); | 191 | mCommitQueueTimer.setSingleShot(true); |
209 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); | 192 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); |
@@ -283,4 +266,9 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
283 | }); | 266 | }); |
284 | } | 267 | } |
285 | 268 | ||
269 | void GenericResource::setLowerBoundRevision(qint64 revision) | ||
270 | { | ||
271 | mProcessor->setOldestUsedRevision(revision); | ||
272 | } | ||
273 | |||
286 | #include "genericresource.moc" | 274 | #include "genericresource.moc" |
diff --git a/common/genericresource.h b/common/genericresource.h index 2d171b6..052a9f5 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -43,6 +43,7 @@ public: | |||
43 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; | 43 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; |
44 | virtual KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE = 0; | 44 | virtual KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE = 0; |
45 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; | 45 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; |
46 | virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; | ||
46 | 47 | ||
47 | int error() const; | 48 | int error() const; |
48 | 49 | ||
diff --git a/common/listener.cpp b/common/listener.cpp index 9773835..16da770 100644 --- a/common/listener.cpp +++ b/common/listener.cpp | |||
@@ -30,6 +30,7 @@ | |||
30 | #include "common/revisionupdate_generated.h" | 30 | #include "common/revisionupdate_generated.h" |
31 | #include "common/synchronize_generated.h" | 31 | #include "common/synchronize_generated.h" |
32 | #include "common/notification_generated.h" | 32 | #include "common/notification_generated.h" |
33 | #include "common/revisionreplayed_generated.h" | ||
33 | 34 | ||
34 | #include <QLocalServer> | 35 | #include <QLocalServer> |
35 | #include <QLocalSocket> | 36 | #include <QLocalSocket> |
@@ -250,6 +251,21 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
250 | case Akonadi2::Commands::PingCommand: | 251 | case Akonadi2::Commands::PingCommand: |
251 | Log() << QString("\tReceived ping command from %1").arg(client.name); | 252 | Log() << QString("\tReceived ping command from %1").arg(client.name); |
252 | break; | 253 | break; |
254 | case Akonadi2::Commands::RevisionReplayedCommand: { | ||
255 | Log() << QString("\tReceived revision replayed command from %1").arg(client.name); | ||
256 | flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); | ||
257 | if (Akonadi2::Commands::VerifyRevisionReplayedBuffer(verifier)) { | ||
258 | auto buffer = Akonadi2::Commands::GetRevisionReplayed(commandBuffer.constData()); | ||
259 | client.currentRevision = buffer->revision(); | ||
260 | } else { | ||
261 | Warning() << "received invalid command"; | ||
262 | } | ||
263 | loadResource(); | ||
264 | if (m_resource) { | ||
265 | m_resource->setLowerBoundRevision(lowerBoundRevision()); | ||
266 | } | ||
267 | } | ||
268 | break; | ||
253 | default: | 269 | default: |
254 | if (commandId > Akonadi2::Commands::CustomCommand) { | 270 | if (commandId > Akonadi2::Commands::CustomCommand) { |
255 | Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; | 271 | Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; |
@@ -258,14 +274,28 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
258 | m_resource->processCommand(commandId, commandBuffer); | 274 | m_resource->processCommand(commandId, commandBuffer); |
259 | } | 275 | } |
260 | } else { | 276 | } else { |
261 | Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; | 277 | ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; |
262 | //TODO: handle error: we don't know wtf this command is | ||
263 | } | 278 | } |
264 | break; | 279 | break; |
265 | } | 280 | } |
266 | callback(); | 281 | callback(); |
267 | } | 282 | } |
268 | 283 | ||
284 | qint64 Listener::lowerBoundRevision() | ||
285 | { | ||
286 | qint64 lowerBound = 0; | ||
287 | for (const Client &c : m_connections) { | ||
288 | if (c.currentRevision > 0) { | ||
289 | if (lowerBound == 0) { | ||
290 | lowerBound = c.currentRevision; | ||
291 | } else { | ||
292 | lowerBound = qMin(c.currentRevision, lowerBound); | ||
293 | } | ||
294 | } | ||
295 | } | ||
296 | return lowerBound; | ||
297 | } | ||
298 | |||
269 | void Listener::quit() | 299 | void Listener::quit() |
270 | { | 300 | { |
271 | //Broadcast shutdown notifications to open clients, so they don't try to restart the resource | 301 | //Broadcast shutdown notifications to open clients, so they don't try to restart the resource |
diff --git a/common/listener.h b/common/listener.h index 30807d7..8f89d23 100644 --- a/common/listener.h +++ b/common/listener.h | |||
@@ -37,19 +37,22 @@ class Client | |||
37 | { | 37 | { |
38 | public: | 38 | public: |
39 | Client() | 39 | Client() |
40 | : socket(nullptr) | 40 | : socket(nullptr), |
41 | currentRevision(0) | ||
41 | { | 42 | { |
42 | } | 43 | } |
43 | 44 | ||
44 | Client(const QString &n, QLocalSocket *s) | 45 | Client(const QString &n, QLocalSocket *s) |
45 | : name(n), | 46 | : name(n), |
46 | socket(s) | 47 | socket(s), |
48 | currentRevision(0) | ||
47 | { | 49 | { |
48 | } | 50 | } |
49 | 51 | ||
50 | QString name; | 52 | QString name; |
51 | QPointer<QLocalSocket> socket; | 53 | QPointer<QLocalSocket> socket; |
52 | QByteArray commandBuffer; | 54 | QByteArray commandBuffer; |
55 | qint64 currentRevision; | ||
53 | }; | 56 | }; |
54 | 57 | ||
55 | class Listener : public QObject | 58 | class Listener : public QObject |
@@ -82,6 +85,7 @@ private: | |||
82 | void updateClientsWithRevision(qint64); | 85 | void updateClientsWithRevision(qint64); |
83 | void loadResource(); | 86 | void loadResource(); |
84 | void readFromSocket(QLocalSocket *socket); | 87 | void readFromSocket(QLocalSocket *socket); |
88 | qint64 lowerBoundRevision(); | ||
85 | 89 | ||
86 | QLocalServer *m_server; | 90 | QLocalServer *m_server; |
87 | QVector<Client> m_connections; | 91 | QVector<Client> m_connections; |
diff --git a/common/resource.cpp b/common/resource.cpp index 58ba82f..4541726 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -58,6 +58,11 @@ KAsync::Job<void> Resource::processAllMessages() | |||
58 | return KAsync::null<void>(); | 58 | return KAsync::null<void>(); |
59 | } | 59 | } |
60 | 60 | ||
61 | void Resource::setLowerBoundRevision(qint64 revision) | ||
62 | { | ||
63 | Q_UNUSED(revision) | ||
64 | } | ||
65 | |||
61 | class ResourceFactory::Private | 66 | class ResourceFactory::Private |
62 | { | 67 | { |
63 | public: | 68 | public: |
diff --git a/common/resource.h b/common/resource.h index 1a97aeb..33805ca 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -39,6 +39,7 @@ public: | |||
39 | virtual void processCommand(int commandId, const QByteArray &data); | 39 | virtual void processCommand(int commandId, const QByteArray &data); |
40 | virtual KAsync::Job<void> synchronizeWithSource(); | 40 | virtual KAsync::Job<void> synchronizeWithSource(); |
41 | virtual KAsync::Job<void> processAllMessages(); | 41 | virtual KAsync::Job<void> processAllMessages(); |
42 | virtual void setLowerBoundRevision(qint64 revision); | ||
42 | 43 | ||
43 | Q_SIGNALS: | 44 | Q_SIGNALS: |
44 | void revisionUpdated(qint64); | 45 | void revisionUpdated(qint64); |
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 84bc4ea..7f9306b 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -29,6 +29,7 @@ | |||
29 | #include "common/createentity_generated.h" | 29 | #include "common/createentity_generated.h" |
30 | #include "common/modifyentity_generated.h" | 30 | #include "common/modifyentity_generated.h" |
31 | #include "common/deleteentity_generated.h" | 31 | #include "common/deleteentity_generated.h" |
32 | #include "common/revisionreplayed_generated.h" | ||
32 | #include "common/entitybuffer.h" | 33 | #include "common/entitybuffer.h" |
33 | #include "log.h" | 34 | #include "log.h" |
34 | 35 | ||
@@ -325,6 +326,15 @@ KAsync::Job<void> ResourceAccess::sendDeleteCommand(const QByteArray &uid, qint6 | |||
325 | return sendCommand(Akonadi2::Commands::DeleteEntityCommand, fbb); | 326 | return sendCommand(Akonadi2::Commands::DeleteEntityCommand, fbb); |
326 | } | 327 | } |
327 | 328 | ||
329 | KAsync::Job<void> ResourceAccess::sendRevisionReplayedCommand(qint64 revision) | ||
330 | { | ||
331 | flatbuffers::FlatBufferBuilder fbb; | ||
332 | auto location = Akonadi2::Commands::CreateRevisionReplayed(fbb, revision); | ||
333 | Akonadi2::Commands::FinishRevisionReplayedBuffer(fbb, location); | ||
334 | open(); | ||
335 | return sendCommand(Akonadi2::Commands::RevisionReplayedCommand, fbb); | ||
336 | } | ||
337 | |||
328 | void ResourceAccess::open() | 338 | void ResourceAccess::open() |
329 | { | 339 | { |
330 | if (d->socket && d->socket->isValid()) { | 340 | if (d->socket && d->socket->isValid()) { |
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 1ff9ca6..8e27054 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -46,6 +46,7 @@ public: | |||
46 | virtual KAsync::Job<void> sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) { return KAsync::null<void>(); }; | 46 | virtual KAsync::Job<void> sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) { return KAsync::null<void>(); }; |
47 | virtual KAsync::Job<void> sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) { return KAsync::null<void>(); }; | 47 | virtual KAsync::Job<void> sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) { return KAsync::null<void>(); }; |
48 | virtual KAsync::Job<void> sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) { return KAsync::null<void>(); }; | 48 | virtual KAsync::Job<void> sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) { return KAsync::null<void>(); }; |
49 | virtual KAsync::Job<void> sendRevisionReplayedCommand(qint64 revision) {return KAsync::null<void>(); }; | ||
49 | 50 | ||
50 | Q_SIGNALS: | 51 | Q_SIGNALS: |
51 | void ready(bool isReady); | 52 | void ready(bool isReady); |
@@ -69,9 +70,10 @@ public: | |||
69 | KAsync::Job<void> sendCommand(int commandId) Q_DECL_OVERRIDE; | 70 | KAsync::Job<void> sendCommand(int commandId) Q_DECL_OVERRIDE; |
70 | KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) Q_DECL_OVERRIDE; | 71 | KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) Q_DECL_OVERRIDE; |
71 | KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync) Q_DECL_OVERRIDE; | 72 | KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync) Q_DECL_OVERRIDE; |
72 | KAsync::Job<void> sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer); | 73 | KAsync::Job<void> sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) Q_DECL_OVERRIDE; |
73 | KAsync::Job<void> sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer); | 74 | KAsync::Job<void> sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) Q_DECL_OVERRIDE; |
74 | KAsync::Job<void> sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType); | 75 | KAsync::Job<void> sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) Q_DECL_OVERRIDE; |
76 | KAsync::Job<void> sendRevisionReplayedCommand(qint64 revision) Q_DECL_OVERRIDE; | ||
75 | /** | 77 | /** |
76 | * Tries to connect to server, and returns a connected socket on success. | 78 | * Tries to connect to server, and returns a connected socket on success. |
77 | */ | 79 | */ |