From 43c4cd555e4a265d3e484dfeea0aa05da0977cd0 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 19 Oct 2015 15:01:02 +0200 Subject: Let clients tell the resource when they no longer require a revision. --- common/CMakeLists.txt | 1 + common/commands.cpp | 2 ++ common/commands.h | 1 + common/commands/revisionreplayed.fbs | 7 +++++++ common/facade.h | 3 ++- common/genericresource.cpp | 22 +++++----------------- common/genericresource.h | 1 + common/listener.cpp | 34 ++++++++++++++++++++++++++++++++-- common/listener.h | 8 ++++++-- common/resource.cpp | 5 +++++ common/resource.h | 1 + common/resourceaccess.cpp | 10 ++++++++++ common/resourceaccess.h | 8 +++++--- 13 files changed, 78 insertions(+), 25 deletions(-) create mode 100644 common/commands/revisionreplayed.fbs (limited to 'common') diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 25ea667..61019b3 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -51,6 +51,7 @@ generate_flatbuffers( commands/revisionupdate commands/synchronize commands/notification + commands/revisionreplayed domain/event domain/mail entity diff --git a/common/commands.cpp b/common/commands.cpp index 221c211..7a0ae23 100644 --- a/common/commands.cpp +++ b/common/commands.cpp @@ -57,6 +57,8 @@ QByteArray name(int commandId) return "Notification"; case PingCommand: return "Ping"; + case RevisionReplayedCommand: + return "RevisionReplayed"; case CustomCommand: return "Custom"; }; diff --git a/common/commands.h b/common/commands.h index da2438a..c68ef90 100644 --- a/common/commands.h +++ b/common/commands.h @@ -46,6 +46,7 @@ enum CommandIds { ShutdownCommand, NotificationCommand, PingCommand, + RevisionReplayedCommand, CustomCommand = 0xffff }; diff --git a/common/commands/revisionreplayed.fbs b/common/commands/revisionreplayed.fbs new file mode 100644 index 0000000..e1b11e3 --- /dev/null +++ b/common/commands/revisionreplayed.fbs @@ -0,0 +1,7 @@ +namespace Akonadi2.Commands; + +table RevisionReplayed { + revision: ulong; +} + +root_type RevisionReplayed; diff --git a/common/facade.h b/common/facade.h index 8b37579..a0971a1 100644 --- a/common/facade.h +++ b/common/facade.h @@ -251,9 +251,10 @@ public: future.setFinished(); return; } - load(query, resultProvider, oldRevision).template then([&future](qint64 queriedRevision) { + load(query, resultProvider, oldRevision).template then([&future, this](qint64 queriedRevision) { //TODO set revision in result provider? //TODO update all existing results with new revision + mResourceAccess->sendRevisionReplayedCommand(queriedRevision); future.setValue(queriedRevision); future.setFinished(); }).exec(); diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 2a0d6bd..313d99c 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -187,23 +187,6 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c QObject::connect(mProcessor, &Processor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); - //We simply drop revisions with 100ms delay until we have better information from clients and writeback - //FIXME On startup, read the latest revision that is replayed to initialize. Then bump revision when change-replay and - //all clients have advanced to a later revision. - QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, [this](qint64 revision) { - QTimer *dropRevisionTimer = new QTimer(); - dropRevisionTimer->setInterval(100); - dropRevisionTimer->setSingleShot(true); - auto processor = QPointer(mProcessor); - QObject::connect(dropRevisionTimer, &QTimer::timeout, dropRevisionTimer, [processor, revision, dropRevisionTimer]() { - if (processor) { - processor->setOldestUsedRevision(revision); - } - delete dropRevisionTimer; - }); - dropRevisionTimer->start(); - }); - mCommitQueueTimer.setInterval(100); mCommitQueueTimer.setSingleShot(true); QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); @@ -283,4 +266,9 @@ KAsync::Job GenericResource::processAllMessages() }); } +void GenericResource::setLowerBoundRevision(qint64 revision) +{ + mProcessor->setOldestUsedRevision(revision); +} + #include "genericresource.moc" diff --git a/common/genericresource.h b/common/genericresource.h index 2d171b6..052a9f5 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -43,6 +43,7 @@ public: virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; virtual KAsync::Job synchronizeWithSource() Q_DECL_OVERRIDE = 0; virtual KAsync::Job processAllMessages() Q_DECL_OVERRIDE; + virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; int error() const; diff --git a/common/listener.cpp b/common/listener.cpp index 9773835..16da770 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -30,6 +30,7 @@ #include "common/revisionupdate_generated.h" #include "common/synchronize_generated.h" #include "common/notification_generated.h" +#include "common/revisionreplayed_generated.h" #include #include @@ -250,6 +251,21 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c case Akonadi2::Commands::PingCommand: Log() << QString("\tReceived ping command from %1").arg(client.name); break; + case Akonadi2::Commands::RevisionReplayedCommand: { + Log() << QString("\tReceived revision replayed command from %1").arg(client.name); + flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); + if (Akonadi2::Commands::VerifyRevisionReplayedBuffer(verifier)) { + auto buffer = Akonadi2::Commands::GetRevisionReplayed(commandBuffer.constData()); + client.currentRevision = buffer->revision(); + } else { + Warning() << "received invalid command"; + } + loadResource(); + if (m_resource) { + m_resource->setLowerBoundRevision(lowerBoundRevision()); + } + } + break; default: if (commandId > Akonadi2::Commands::CustomCommand) { Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; @@ -258,14 +274,28 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c m_resource->processCommand(commandId, commandBuffer); } } else { - Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; - //TODO: handle error: we don't know wtf this command is + ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; } break; } callback(); } +qint64 Listener::lowerBoundRevision() +{ + qint64 lowerBound = 0; + for (const Client &c : m_connections) { + if (c.currentRevision > 0) { + if (lowerBound == 0) { + lowerBound = c.currentRevision; + } else { + lowerBound = qMin(c.currentRevision, lowerBound); + } + } + } + return lowerBound; +} + void Listener::quit() { //Broadcast shutdown notifications to open clients, so they don't try to restart the resource diff --git a/common/listener.h b/common/listener.h index 30807d7..8f89d23 100644 --- a/common/listener.h +++ b/common/listener.h @@ -37,19 +37,22 @@ class Client { public: Client() - : socket(nullptr) + : socket(nullptr), + currentRevision(0) { } Client(const QString &n, QLocalSocket *s) : name(n), - socket(s) + socket(s), + currentRevision(0) { } QString name; QPointer socket; QByteArray commandBuffer; + qint64 currentRevision; }; class Listener : public QObject @@ -82,6 +85,7 @@ private: void updateClientsWithRevision(qint64); void loadResource(); void readFromSocket(QLocalSocket *socket); + qint64 lowerBoundRevision(); QLocalServer *m_server; QVector m_connections; diff --git a/common/resource.cpp b/common/resource.cpp index 58ba82f..4541726 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -58,6 +58,11 @@ KAsync::Job Resource::processAllMessages() return KAsync::null(); } +void Resource::setLowerBoundRevision(qint64 revision) +{ + Q_UNUSED(revision) +} + class ResourceFactory::Private { public: diff --git a/common/resource.h b/common/resource.h index 1a97aeb..33805ca 100644 --- a/common/resource.h +++ b/common/resource.h @@ -39,6 +39,7 @@ public: virtual void processCommand(int commandId, const QByteArray &data); virtual KAsync::Job synchronizeWithSource(); virtual KAsync::Job processAllMessages(); + virtual void setLowerBoundRevision(qint64 revision); Q_SIGNALS: void revisionUpdated(qint64); diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 84bc4ea..7f9306b 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -29,6 +29,7 @@ #include "common/createentity_generated.h" #include "common/modifyentity_generated.h" #include "common/deleteentity_generated.h" +#include "common/revisionreplayed_generated.h" #include "common/entitybuffer.h" #include "log.h" @@ -325,6 +326,15 @@ KAsync::Job ResourceAccess::sendDeleteCommand(const QByteArray &uid, qint6 return sendCommand(Akonadi2::Commands::DeleteEntityCommand, fbb); } +KAsync::Job ResourceAccess::sendRevisionReplayedCommand(qint64 revision) +{ + flatbuffers::FlatBufferBuilder fbb; + auto location = Akonadi2::Commands::CreateRevisionReplayed(fbb, revision); + Akonadi2::Commands::FinishRevisionReplayedBuffer(fbb, location); + open(); + return sendCommand(Akonadi2::Commands::RevisionReplayedCommand, fbb); +} + void ResourceAccess::open() { if (d->socket && d->socket->isValid()) { diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 1ff9ca6..8e27054 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -46,6 +46,7 @@ public: virtual KAsync::Job sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) { return KAsync::null(); }; virtual KAsync::Job sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) { return KAsync::null(); }; virtual KAsync::Job sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) { return KAsync::null(); }; + virtual KAsync::Job sendRevisionReplayedCommand(qint64 revision) {return KAsync::null(); }; Q_SIGNALS: void ready(bool isReady); @@ -69,9 +70,10 @@ public: KAsync::Job sendCommand(int commandId) Q_DECL_OVERRIDE; KAsync::Job sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) Q_DECL_OVERRIDE; KAsync::Job synchronizeResource(bool remoteSync, bool localSync) Q_DECL_OVERRIDE; - KAsync::Job sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer); - KAsync::Job sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer); - KAsync::Job sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType); + KAsync::Job sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) Q_DECL_OVERRIDE; + KAsync::Job sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) Q_DECL_OVERRIDE; + KAsync::Job sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) Q_DECL_OVERRIDE; + KAsync::Job sendRevisionReplayedCommand(qint64 revision) Q_DECL_OVERRIDE; /** * Tries to connect to server, and returns a connected socket on success. */ -- cgit v1.2.3