summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/commands.cpp2
-rw-r--r--common/commands.h1
-rw-r--r--common/commands/revisionreplayed.fbs7
-rw-r--r--common/facade.h3
-rw-r--r--common/genericresource.cpp22
-rw-r--r--common/genericresource.h1
-rw-r--r--common/listener.cpp34
-rw-r--r--common/listener.h8
-rw-r--r--common/resource.cpp5
-rw-r--r--common/resource.h1
-rw-r--r--common/resourceaccess.cpp10
-rw-r--r--common/resourceaccess.h8
13 files changed, 78 insertions, 25 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 25ea667..61019b3 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -51,6 +51,7 @@ generate_flatbuffers(
51 commands/revisionupdate 51 commands/revisionupdate
52 commands/synchronize 52 commands/synchronize
53 commands/notification 53 commands/notification
54 commands/revisionreplayed
54 domain/event 55 domain/event
55 domain/mail 56 domain/mail
56 entity 57 entity
diff --git a/common/commands.cpp b/common/commands.cpp
index 221c211..7a0ae23 100644
--- a/common/commands.cpp
+++ b/common/commands.cpp
@@ -57,6 +57,8 @@ QByteArray name(int commandId)
57 return "Notification"; 57 return "Notification";
58 case PingCommand: 58 case PingCommand:
59 return "Ping"; 59 return "Ping";
60 case RevisionReplayedCommand:
61 return "RevisionReplayed";
60 case CustomCommand: 62 case CustomCommand:
61 return "Custom"; 63 return "Custom";
62 }; 64 };
diff --git a/common/commands.h b/common/commands.h
index da2438a..c68ef90 100644
--- a/common/commands.h
+++ b/common/commands.h
@@ -46,6 +46,7 @@ enum CommandIds {
46 ShutdownCommand, 46 ShutdownCommand,
47 NotificationCommand, 47 NotificationCommand,
48 PingCommand, 48 PingCommand,
49 RevisionReplayedCommand,
49 CustomCommand = 0xffff 50 CustomCommand = 0xffff
50}; 51};
51 52
diff --git a/common/commands/revisionreplayed.fbs b/common/commands/revisionreplayed.fbs
new file mode 100644
index 0000000..e1b11e3
--- /dev/null
+++ b/common/commands/revisionreplayed.fbs
@@ -0,0 +1,7 @@
1namespace Akonadi2.Commands;
2
3table RevisionReplayed {
4 revision: ulong;
5}
6
7root_type RevisionReplayed;
diff --git a/common/facade.h b/common/facade.h
index 8b37579..a0971a1 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -251,9 +251,10 @@ public:
251 future.setFinished(); 251 future.setFinished();
252 return; 252 return;
253 } 253 }
254 load(query, resultProvider, oldRevision).template then<void, qint64>([&future](qint64 queriedRevision) { 254 load(query, resultProvider, oldRevision).template then<void, qint64>([&future, this](qint64 queriedRevision) {
255 //TODO set revision in result provider? 255 //TODO set revision in result provider?
256 //TODO update all existing results with new revision 256 //TODO update all existing results with new revision
257 mResourceAccess->sendRevisionReplayedCommand(queriedRevision);
257 future.setValue(queriedRevision); 258 future.setValue(queriedRevision);
258 future.setFinished(); 259 future.setFinished();
259 }).exec(); 260 }).exec();
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 2a0d6bd..313d99c 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -187,23 +187,6 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
187 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 187 QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
188 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 188 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
189 189
190 //We simply drop revisions with 100ms delay until we have better information from clients and writeback
191 //FIXME On startup, read the latest revision that is replayed to initialize. Then bump revision when change-replay and
192 //all clients have advanced to a later revision.
193 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, [this](qint64 revision) {
194 QTimer *dropRevisionTimer = new QTimer();
195 dropRevisionTimer->setInterval(100);
196 dropRevisionTimer->setSingleShot(true);
197 auto processor = QPointer<Processor>(mProcessor);
198 QObject::connect(dropRevisionTimer, &QTimer::timeout, dropRevisionTimer, [processor, revision, dropRevisionTimer]() {
199 if (processor) {
200 processor->setOldestUsedRevision(revision);
201 }
202 delete dropRevisionTimer;
203 });
204 dropRevisionTimer->start();
205 });
206
207 mCommitQueueTimer.setInterval(100); 190 mCommitQueueTimer.setInterval(100);
208 mCommitQueueTimer.setSingleShot(true); 191 mCommitQueueTimer.setSingleShot(true);
209 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); 192 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit);
@@ -283,4 +266,9 @@ KAsync::Job<void> GenericResource::processAllMessages()
283 }); 266 });
284} 267}
285 268
269void GenericResource::setLowerBoundRevision(qint64 revision)
270{
271 mProcessor->setOldestUsedRevision(revision);
272}
273
286#include "genericresource.moc" 274#include "genericresource.moc"
diff --git a/common/genericresource.h b/common/genericresource.h
index 2d171b6..052a9f5 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -43,6 +43,7 @@ public:
43 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; 43 virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE;
44 virtual KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE = 0; 44 virtual KAsync::Job<void> synchronizeWithSource() Q_DECL_OVERRIDE = 0;
45 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; 45 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE;
46 virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE;
46 47
47 int error() const; 48 int error() const;
48 49
diff --git a/common/listener.cpp b/common/listener.cpp
index 9773835..16da770 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -30,6 +30,7 @@
30#include "common/revisionupdate_generated.h" 30#include "common/revisionupdate_generated.h"
31#include "common/synchronize_generated.h" 31#include "common/synchronize_generated.h"
32#include "common/notification_generated.h" 32#include "common/notification_generated.h"
33#include "common/revisionreplayed_generated.h"
33 34
34#include <QLocalServer> 35#include <QLocalServer>
35#include <QLocalSocket> 36#include <QLocalSocket>
@@ -250,6 +251,21 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
250 case Akonadi2::Commands::PingCommand: 251 case Akonadi2::Commands::PingCommand:
251 Log() << QString("\tReceived ping command from %1").arg(client.name); 252 Log() << QString("\tReceived ping command from %1").arg(client.name);
252 break; 253 break;
254 case Akonadi2::Commands::RevisionReplayedCommand: {
255 Log() << QString("\tReceived revision replayed command from %1").arg(client.name);
256 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
257 if (Akonadi2::Commands::VerifyRevisionReplayedBuffer(verifier)) {
258 auto buffer = Akonadi2::Commands::GetRevisionReplayed(commandBuffer.constData());
259 client.currentRevision = buffer->revision();
260 } else {
261 Warning() << "received invalid command";
262 }
263 loadResource();
264 if (m_resource) {
265 m_resource->setLowerBoundRevision(lowerBoundRevision());
266 }
267 }
268 break;
253 default: 269 default:
254 if (commandId > Akonadi2::Commands::CustomCommand) { 270 if (commandId > Akonadi2::Commands::CustomCommand) {
255 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; 271 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId;
@@ -258,14 +274,28 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
258 m_resource->processCommand(commandId, commandBuffer); 274 m_resource->processCommand(commandId, commandBuffer);
259 } 275 }
260 } else { 276 } else {
261 Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; 277 ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId;
262 //TODO: handle error: we don't know wtf this command is
263 } 278 }
264 break; 279 break;
265 } 280 }
266 callback(); 281 callback();
267} 282}
268 283
284qint64 Listener::lowerBoundRevision()
285{
286 qint64 lowerBound = 0;
287 for (const Client &c : m_connections) {
288 if (c.currentRevision > 0) {
289 if (lowerBound == 0) {
290 lowerBound = c.currentRevision;
291 } else {
292 lowerBound = qMin(c.currentRevision, lowerBound);
293 }
294 }
295 }
296 return lowerBound;
297}
298
269void Listener::quit() 299void Listener::quit()
270{ 300{
271 //Broadcast shutdown notifications to open clients, so they don't try to restart the resource 301 //Broadcast shutdown notifications to open clients, so they don't try to restart the resource
diff --git a/common/listener.h b/common/listener.h
index 30807d7..8f89d23 100644
--- a/common/listener.h
+++ b/common/listener.h
@@ -37,19 +37,22 @@ class Client
37{ 37{
38public: 38public:
39 Client() 39 Client()
40 : socket(nullptr) 40 : socket(nullptr),
41 currentRevision(0)
41 { 42 {
42 } 43 }
43 44
44 Client(const QString &n, QLocalSocket *s) 45 Client(const QString &n, QLocalSocket *s)
45 : name(n), 46 : name(n),
46 socket(s) 47 socket(s),
48 currentRevision(0)
47 { 49 {
48 } 50 }
49 51
50 QString name; 52 QString name;
51 QPointer<QLocalSocket> socket; 53 QPointer<QLocalSocket> socket;
52 QByteArray commandBuffer; 54 QByteArray commandBuffer;
55 qint64 currentRevision;
53}; 56};
54 57
55class Listener : public QObject 58class Listener : public QObject
@@ -82,6 +85,7 @@ private:
82 void updateClientsWithRevision(qint64); 85 void updateClientsWithRevision(qint64);
83 void loadResource(); 86 void loadResource();
84 void readFromSocket(QLocalSocket *socket); 87 void readFromSocket(QLocalSocket *socket);
88 qint64 lowerBoundRevision();
85 89
86 QLocalServer *m_server; 90 QLocalServer *m_server;
87 QVector<Client> m_connections; 91 QVector<Client> m_connections;
diff --git a/common/resource.cpp b/common/resource.cpp
index 58ba82f..4541726 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -58,6 +58,11 @@ KAsync::Job<void> Resource::processAllMessages()
58 return KAsync::null<void>(); 58 return KAsync::null<void>();
59} 59}
60 60
61void Resource::setLowerBoundRevision(qint64 revision)
62{
63 Q_UNUSED(revision)
64}
65
61class ResourceFactory::Private 66class ResourceFactory::Private
62{ 67{
63public: 68public:
diff --git a/common/resource.h b/common/resource.h
index 1a97aeb..33805ca 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -39,6 +39,7 @@ public:
39 virtual void processCommand(int commandId, const QByteArray &data); 39 virtual void processCommand(int commandId, const QByteArray &data);
40 virtual KAsync::Job<void> synchronizeWithSource(); 40 virtual KAsync::Job<void> synchronizeWithSource();
41 virtual KAsync::Job<void> processAllMessages(); 41 virtual KAsync::Job<void> processAllMessages();
42 virtual void setLowerBoundRevision(qint64 revision);
42 43
43Q_SIGNALS: 44Q_SIGNALS:
44 void revisionUpdated(qint64); 45 void revisionUpdated(qint64);
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 84bc4ea..7f9306b 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -29,6 +29,7 @@
29#include "common/createentity_generated.h" 29#include "common/createentity_generated.h"
30#include "common/modifyentity_generated.h" 30#include "common/modifyentity_generated.h"
31#include "common/deleteentity_generated.h" 31#include "common/deleteentity_generated.h"
32#include "common/revisionreplayed_generated.h"
32#include "common/entitybuffer.h" 33#include "common/entitybuffer.h"
33#include "log.h" 34#include "log.h"
34 35
@@ -325,6 +326,15 @@ KAsync::Job<void> ResourceAccess::sendDeleteCommand(const QByteArray &uid, qint6
325 return sendCommand(Akonadi2::Commands::DeleteEntityCommand, fbb); 326 return sendCommand(Akonadi2::Commands::DeleteEntityCommand, fbb);
326} 327}
327 328
329KAsync::Job<void> ResourceAccess::sendRevisionReplayedCommand(qint64 revision)
330{
331 flatbuffers::FlatBufferBuilder fbb;
332 auto location = Akonadi2::Commands::CreateRevisionReplayed(fbb, revision);
333 Akonadi2::Commands::FinishRevisionReplayedBuffer(fbb, location);
334 open();
335 return sendCommand(Akonadi2::Commands::RevisionReplayedCommand, fbb);
336}
337
328void ResourceAccess::open() 338void ResourceAccess::open()
329{ 339{
330 if (d->socket && d->socket->isValid()) { 340 if (d->socket && d->socket->isValid()) {
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 1ff9ca6..8e27054 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -46,6 +46,7 @@ public:
46 virtual KAsync::Job<void> sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) { return KAsync::null<void>(); }; 46 virtual KAsync::Job<void> sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) { return KAsync::null<void>(); };
47 virtual KAsync::Job<void> sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) { return KAsync::null<void>(); }; 47 virtual KAsync::Job<void> sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) { return KAsync::null<void>(); };
48 virtual KAsync::Job<void> sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) { return KAsync::null<void>(); }; 48 virtual KAsync::Job<void> sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) { return KAsync::null<void>(); };
49 virtual KAsync::Job<void> sendRevisionReplayedCommand(qint64 revision) {return KAsync::null<void>(); };
49 50
50Q_SIGNALS: 51Q_SIGNALS:
51 void ready(bool isReady); 52 void ready(bool isReady);
@@ -69,9 +70,10 @@ public:
69 KAsync::Job<void> sendCommand(int commandId) Q_DECL_OVERRIDE; 70 KAsync::Job<void> sendCommand(int commandId) Q_DECL_OVERRIDE;
70 KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) Q_DECL_OVERRIDE; 71 KAsync::Job<void> sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) Q_DECL_OVERRIDE;
71 KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync) Q_DECL_OVERRIDE; 72 KAsync::Job<void> synchronizeResource(bool remoteSync, bool localSync) Q_DECL_OVERRIDE;
72 KAsync::Job<void> sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer); 73 KAsync::Job<void> sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) Q_DECL_OVERRIDE;
73 KAsync::Job<void> sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer); 74 KAsync::Job<void> sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) Q_DECL_OVERRIDE;
74 KAsync::Job<void> sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType); 75 KAsync::Job<void> sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) Q_DECL_OVERRIDE;
76 KAsync::Job<void> sendRevisionReplayedCommand(qint64 revision) Q_DECL_OVERRIDE;
75 /** 77 /**
76 * Tries to connect to server, and returns a connected socket on success. 78 * Tries to connect to server, and returns a connected socket on success.
77 */ 79 */