From 9b2257d680a5e4fa2fda8cf3302f25054a06710e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 28 Dec 2014 14:44:50 +0100 Subject: Buffers wrapped into entity buffer, async command progress tracking. --- synchronizer/listener.cpp | 155 ++++++++++++++++++++++++---------------------- synchronizer/listener.h | 6 +- 2 files changed, 82 insertions(+), 79 deletions(-) (limited to 'synchronizer') diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 328d4d6..8e94213 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp @@ -35,7 +35,6 @@ Listener::Listener(const QString &resourceName, QObject *parent) : QObject(parent), m_server(new QLocalServer(this)), - m_revision(0), m_resourceName(resourceName), m_resource(0), m_pipeline(new Akonadi2::Pipeline(resourceName, parent)), @@ -46,18 +45,18 @@ Listener::Listener(const QString &resourceName, QObject *parent) this, &Listener::refreshRevision); connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); - Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); + log(QString("Trying to open %1").arg(resourceName)); if (!m_server->listen(resourceName)) { // FIXME: multiple starts need to be handled here m_server->removeServer(resourceName); if (!m_server->listen(resourceName)) { - Akonadi2::Console::main()->log("Utter failure to start server"); + log("Utter failure to start server"); exit(-1); } } if (m_server->isListening()) { - Akonadi2::Console::main()->log(QString("Listening on %1").arg(m_server->serverName())); + log(QString("Listening on %1").arg(m_server->serverName())); } //TODO: experiment with different timeouts @@ -73,19 +72,6 @@ Listener::~Listener() { } -void Listener::setRevision(unsigned long long revision) -{ - if (m_revision != revision) { - m_revision = revision; - updateClientsWithRevision(); - } -} - -unsigned long long Listener::revision() const -{ - return m_revision; -} - void Listener::closeAllConnections() { for (Client &client: m_connections) { @@ -101,14 +87,14 @@ void Listener::closeAllConnections() void Listener::acceptConnection() { - Akonadi2::Console::main()->log(QString("Accepting connection")); + log(QString("Accepting connection")); QLocalSocket *socket = m_server->nextPendingConnection(); if (!socket) { return; } - Akonadi2::Console::main()->log("Got a connection"); + log("Got a connection"); Client client("Unknown Client", socket); connect(socket, &QIODevice::readyRead, this, &Listener::readFromSocket); @@ -125,12 +111,12 @@ void Listener::clientDropped() return; } - Akonadi2::Console::main()->log("Dropping connection..."); + log("Dropping connection..."); QMutableVectorIterator it(m_connections); while (it.hasNext()) { const Client &client = it.next(); if (client.socket == socket) { - Akonadi2::Console::main()->log(QString(" dropped... %1").arg(client.name)); + log(QString(" dropped... %1").arg(client.name)); it.remove(); break; } @@ -154,7 +140,7 @@ void Listener::readFromSocket() return; } - Akonadi2::Console::main()->log("Reading from socket..."); + log("Reading from socket..."); for (Client &client: m_connections) { if (client.socket == socket) { client.commandBuffer += socket->readAll(); @@ -188,6 +174,57 @@ void Listener::processClientBuffers() } } +void Listener::processCommand(int commandId, uint messageId, Client &client, uint size, const std::function &callback) +{ + switch (commandId) { + case Akonadi2::Commands::HandshakeCommand: { + flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); + if (Akonadi2::VerifyHandshakeBuffer(verifier)) { + auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); + client.name = buffer->name()->c_str(); + sendCurrentRevision(client); + } else { + qWarning() << "received invalid command"; + } + break; + } + case Akonadi2::Commands::SynchronizeCommand: { + log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); + loadResource(); + if (m_resource) { + qDebug() << "synchronizing"; + m_resource->synchronizeWithSource(m_pipeline).then([callback](Async::Future &f){ + //FIXME: the command is complete once all pipelines have been drained, track progress and async emit result + callback(); + f.setFinished(); + }).exec(); + return; + } else { + qWarning() << "No resource loaded"; + } + break; + } + case Akonadi2::Commands::FetchEntityCommand: + case Akonadi2::Commands::DeleteEntityCommand: + case Akonadi2::Commands::ModifyEntityCommand: + case Akonadi2::Commands::CreateEntityCommand: + log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); + m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); + break; + default: + if (commandId > Akonadi2::Commands::CustomCommand) { + loadResource(); + if (m_resource) { + m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); + } + } else { + //TODO: handle error: we don't know wtf this command is + } + break; + } + callback(); +} + bool Listener::processClientBuffer(Client &client) { static const int headerSize = Akonadi2::Commands::headerSize(); @@ -195,58 +232,22 @@ bool Listener::processClientBuffer(Client &client) return false; } - int commandId; - uint messageId, size; - messageId = *(uint*)client.commandBuffer.constData(); - commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); - size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); + const uint messageId = *(uint*)client.commandBuffer.constData(); + const int commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); + const uint size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); //TODO: reject messages above a certain size? if (size <= uint(client.commandBuffer.size() - headerSize)) { client.commandBuffer.remove(0, headerSize); - switch (commandId) { - case Akonadi2::Commands::HandshakeCommand: { - flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); - if (Akonadi2::VerifyHandshakeBuffer(verifier)) { - auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); - client.name = buffer->name()->c_str(); - sendCurrentRevision(client); - } - break; - } - case Akonadi2::Commands::SynchronizeCommand: { - Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); - loadResource(); - if (m_resource) { - m_resource->synchronizeWithSource(m_pipeline); - } - break; - } - case Akonadi2::Commands::FetchEntityCommand: - case Akonadi2::Commands::DeleteEntityCommand: - case Akonadi2::Commands::ModifyEntityCommand: - case Akonadi2::Commands::CreateEntityCommand: - Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); - m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); - break; - default: - if (commandId > Akonadi2::Commands::CustomCommand) { - loadResource(); - if (m_resource) { - m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); - } - } else { - //TODO: handle error: we don't know wtf this command is - } - break; - } - - //TODO: async commands == async sendCommandCompleted - Akonadi2::Console::main()->log(QString("\tCompleted command id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); + processCommand(commandId, messageId, client, size, [this, messageId, commandId, &client]() { + log(QString("\tCompleted command messageid %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); + //FIXME, client needs to become a shared pointer and not a reference, or we have to search through m_connections everytime. + sendCommandCompleted(client, messageId); + }); client.commandBuffer.remove(0, size); - sendCommandCompleted(client, messageId); + return client.commandBuffer.size() >= headerSize; } @@ -259,7 +260,7 @@ void Listener::sendCurrentRevision(Client &client) return; } - auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); + auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision()); Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); m_fbb.Clear(); @@ -279,14 +280,12 @@ void Listener::sendCommandCompleted(Client &client, uint messageId) void Listener::refreshRevision() { - //TODO this should be coming out of m_pipeline->storage() - ++m_revision; updateClientsWithRevision(); } void Listener::updateClientsWithRevision() { - auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); + auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision()); Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); for (const Client &client: m_connections) { @@ -308,13 +307,19 @@ void Listener::loadResource() Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName); if (resourceFactory) { m_resource = resourceFactory->createResource(); - Akonadi2::Console::main()->log(QString("Resource factory: %1").arg((qlonglong)resourceFactory)); - Akonadi2::Console::main()->log(QString("\tResource: %1").arg((qlonglong)m_resource)); + log(QString("Resource factory: %1").arg((qlonglong)resourceFactory)); + log(QString("\tResource: %1").arg((qlonglong)m_resource)); //TODO: this doesn't really list all the facades .. fix - Akonadi2::Console::main()->log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade(m_resourceName)->type())); + log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade(m_resourceName)->type())); } else { - Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); + log(QString("Failed to load resource %1").arg(m_resourceName)); } //TODO: on failure ... what? + //Enter broken state? +} + +void Listener::log(const QString &message) +{ + Akonadi2::Console::main()->log("Listener: " + message); } diff --git a/synchronizer/listener.h b/synchronizer/listener.h index 357ae37..4c35191 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h @@ -61,9 +61,6 @@ public: Listener(const QString &resourceName, QObject *parent = 0); ~Listener(); - void setRevision(unsigned long long revision); - unsigned long long revision() const; - Q_SIGNALS: void noClients(); @@ -79,15 +76,16 @@ private Q_SLOTS: void refreshRevision(); private: + void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function &callback); bool processClientBuffer(Client &client); void sendCurrentRevision(Client &client); void sendCommandCompleted(Client &client, uint messageId); void updateClientsWithRevision(); void loadResource(); + void log(const QString &); QLocalServer *m_server; QVector m_connections; - unsigned long long m_revision; flatbuffers::FlatBufferBuilder m_fbb; const QString m_resourceName; Akonadi2::Resource *m_resource; -- cgit v1.2.3