From b187b95672fe0d8b16ba80bedd9022f1cda3a051 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 20 Jul 2015 16:15:49 +0200 Subject: Pass command around as QByteArray Simpler api, GenericResource didn't honor size anyways, and we copy the command for now to avoid sideeffects of data coming in in the meantime (although that should generally work since data is always appended). --- common/genericresource.cpp | 2 +- common/genericresource.h | 2 +- common/resource.cpp | 3 +-- common/resource.h | 2 +- synchronizer/listener.cpp | 19 ++++++++++--------- synchronizer/listener.h | 2 +- tests/dummyresourcebenchmark.cpp | 2 +- tests/dummyresourcetest.cpp | 4 ++-- 8 files changed, 18 insertions(+), 18 deletions(-) diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 5dfa9b5..ae06ef4 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -194,7 +194,7 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); } -void GenericResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) +void GenericResource::processCommand(int commandId, const QByteArray &data, Akonadi2::Pipeline *pipeline) { //TODO instead of copying the command including the full entity first into the command queue, we could directly //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). diff --git a/common/genericresource.h b/common/genericresource.h index c44989e..e9d5d59 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -37,7 +37,7 @@ public: GenericResource(const QByteArray &resourceInstanceIdentifier); virtual ~GenericResource(); - virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) Q_DECL_OVERRIDE; + virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline) Q_DECL_OVERRIDE; virtual KAsync::Job synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; virtual KAsync::Job processAllMessages() Q_DECL_OVERRIDE; diff --git a/common/resource.cpp b/common/resource.cpp index bd69afd..40ad04c 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -44,11 +44,10 @@ void Resource::configurePipeline(Pipeline *pipeline) } -void Resource::processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) +void Resource::processCommand(int commandId, const QByteArray &data, Pipeline *pipeline) { Q_UNUSED(commandId) Q_UNUSED(data) - Q_UNUSED(size) Q_UNUSED(pipeline) pipeline->null(); } diff --git a/common/resource.h b/common/resource.h index ebbc2e1..009050e 100644 --- a/common/resource.h +++ b/common/resource.h @@ -36,7 +36,7 @@ public: Resource(); virtual ~Resource(); - virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); + virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline); virtual KAsync::Job synchronizeWithSource(Pipeline *pipeline); virtual KAsync::Job processAllMessages(); diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 1553f7d..2559664 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp @@ -203,13 +203,13 @@ void Listener::processClientBuffers() } } -void Listener::processCommand(int commandId, uint messageId, Client &client, uint size, const std::function &callback) +void Listener::processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function &callback) { switch (commandId) { case Akonadi2::Commands::HandshakeCommand: { - flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); + flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); if (Akonadi2::VerifyHandshakeBuffer(verifier)) { - auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); + auto buffer = Akonadi2::GetHandshake(commandBuffer.constData()); client.name = buffer->name()->c_str(); sendCurrentRevision(client); } else { @@ -218,9 +218,9 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin break; } case Akonadi2::Commands::SynchronizeCommand: { - flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); + flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); if (Akonadi2::VerifySynchronizeBuffer(verifier)) { - auto buffer = Akonadi2::GetSynchronize(client.commandBuffer.constData()); + auto buffer = Akonadi2::GetSynchronize(commandBuffer.constData()); Log() << QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name); loadResource(); if (!m_resource) { @@ -250,7 +250,7 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name; loadResource(); if (m_resource) { - m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); + m_resource->processCommand(commandId, commandBuffer, m_pipeline); } break; case Akonadi2::Commands::ShutdownCommand: @@ -262,7 +262,7 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; loadResource(); if (m_resource) { - m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); + m_resource->processCommand(commandId, commandBuffer, m_pipeline); } } else { Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; @@ -307,7 +307,9 @@ bool Listener::processClientBuffer(Client &client) auto socket = QPointer(client.socket); auto clientName = client.name; - processCommand(commandId, messageId, client, size, [this, messageId, commandId, socket, clientName]() { + const QByteArray commandBuffer = client.commandBuffer.left(size); + client.commandBuffer.remove(0, size); + processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName]() { Log() << QString("\tCompleted command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Akonadi2::Commands::name(commandId))).arg(clientName); if (socket) { sendCommandCompleted(socket.data(), messageId); @@ -315,7 +317,6 @@ bool Listener::processClientBuffer(Client &client) Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); } }); - client.commandBuffer.remove(0, size); return client.commandBuffer.size() >= headerSize; } diff --git a/synchronizer/listener.h b/synchronizer/listener.h index e03c310..560f052 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h @@ -79,7 +79,7 @@ private Q_SLOTS: void quit(); private: - void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function &callback); + void processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function &callback); bool processClientBuffer(Client &client); void sendCurrentRevision(Client &client); void sendCommandCompleted(QLocalSocket *socket, uint messageId); diff --git a/tests/dummyresourcebenchmark.cpp b/tests/dummyresourcebenchmark.cpp index fd2cb01..7d40779 100644 --- a/tests/dummyresourcebenchmark.cpp +++ b/tests/dummyresourcebenchmark.cpp @@ -133,7 +133,7 @@ private Q_SLOTS: const QByteArray command(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); for (int i = 0; i < num; i++) { - resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); + resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); } auto appendTime = time.elapsed(); diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index 7499d62..36812c1 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp @@ -89,8 +89,8 @@ private Q_SLOTS: QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated())); DummyResource resource("org.kde.dummy.instance1"); resource.configurePipeline(&pipeline); - resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); - resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); + resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); + resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); QVERIFY(revisionSpy.isValid()); QTRY_COMPARE(revisionSpy.count(), 2); -- cgit v1.2.3