diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-07-20 16:15:49 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-07-23 17:37:53 +0200 |
commit | b187b95672fe0d8b16ba80bedd9022f1cda3a051 (patch) | |
tree | c86bf63c73eb91aca83a3d0c22be95b7a7515f0b | |
parent | 4a71e5b06506af6e3ab7a7715705e76b6d6e9bb7 (diff) | |
download | sink-b187b95672fe0d8b16ba80bedd9022f1cda3a051.tar.gz sink-b187b95672fe0d8b16ba80bedd9022f1cda3a051.zip |
Pass command around as QByteArray
Simpler api, GenericResource didn't honor size anyways,
and we copy the command for now to avoid sideeffects of data coming in
in the meantime (although that should generally work since data is
always appended).
-rw-r--r-- | common/genericresource.cpp | 2 | ||||
-rw-r--r-- | common/genericresource.h | 2 | ||||
-rw-r--r-- | common/resource.cpp | 3 | ||||
-rw-r--r-- | common/resource.h | 2 | ||||
-rw-r--r-- | synchronizer/listener.cpp | 19 | ||||
-rw-r--r-- | synchronizer/listener.h | 2 | ||||
-rw-r--r-- | tests/dummyresourcebenchmark.cpp | 2 | ||||
-rw-r--r-- | tests/dummyresourcetest.cpp | 4 |
8 files changed, 18 insertions, 18 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 5dfa9b5..ae06ef4 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -194,7 +194,7 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt | |||
194 | mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); | 194 | mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); |
195 | } | 195 | } |
196 | 196 | ||
197 | void GenericResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) | 197 | void GenericResource::processCommand(int commandId, const QByteArray &data, Akonadi2::Pipeline *pipeline) |
198 | { | 198 | { |
199 | //TODO instead of copying the command including the full entity first into the command queue, we could directly | 199 | //TODO instead of copying the command including the full entity first into the command queue, we could directly |
200 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). | 200 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). |
diff --git a/common/genericresource.h b/common/genericresource.h index c44989e..e9d5d59 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -37,7 +37,7 @@ public: | |||
37 | GenericResource(const QByteArray &resourceInstanceIdentifier); | 37 | GenericResource(const QByteArray &resourceInstanceIdentifier); |
38 | virtual ~GenericResource(); | 38 | virtual ~GenericResource(); |
39 | 39 | ||
40 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) Q_DECL_OVERRIDE; | 40 | virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline) Q_DECL_OVERRIDE; |
41 | virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; | 41 | virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; |
42 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; | 42 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; |
43 | 43 | ||
diff --git a/common/resource.cpp b/common/resource.cpp index bd69afd..40ad04c 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -44,11 +44,10 @@ void Resource::configurePipeline(Pipeline *pipeline) | |||
44 | 44 | ||
45 | } | 45 | } |
46 | 46 | ||
47 | void Resource::processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) | 47 | void Resource::processCommand(int commandId, const QByteArray &data, Pipeline *pipeline) |
48 | { | 48 | { |
49 | Q_UNUSED(commandId) | 49 | Q_UNUSED(commandId) |
50 | Q_UNUSED(data) | 50 | Q_UNUSED(data) |
51 | Q_UNUSED(size) | ||
52 | Q_UNUSED(pipeline) | 51 | Q_UNUSED(pipeline) |
53 | pipeline->null(); | 52 | pipeline->null(); |
54 | } | 53 | } |
diff --git a/common/resource.h b/common/resource.h index ebbc2e1..009050e 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -36,7 +36,7 @@ public: | |||
36 | Resource(); | 36 | Resource(); |
37 | virtual ~Resource(); | 37 | virtual ~Resource(); |
38 | 38 | ||
39 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); | 39 | virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline); |
40 | virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline); | 40 | virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline); |
41 | virtual KAsync::Job<void> processAllMessages(); | 41 | virtual KAsync::Job<void> processAllMessages(); |
42 | 42 | ||
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 1553f7d..2559664 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -203,13 +203,13 @@ void Listener::processClientBuffers() | |||
203 | } | 203 | } |
204 | } | 204 | } |
205 | 205 | ||
206 | void Listener::processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback) | 206 | void Listener::processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback) |
207 | { | 207 | { |
208 | switch (commandId) { | 208 | switch (commandId) { |
209 | case Akonadi2::Commands::HandshakeCommand: { | 209 | case Akonadi2::Commands::HandshakeCommand: { |
210 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); | 210 | flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); |
211 | if (Akonadi2::VerifyHandshakeBuffer(verifier)) { | 211 | if (Akonadi2::VerifyHandshakeBuffer(verifier)) { |
212 | auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); | 212 | auto buffer = Akonadi2::GetHandshake(commandBuffer.constData()); |
213 | client.name = buffer->name()->c_str(); | 213 | client.name = buffer->name()->c_str(); |
214 | sendCurrentRevision(client); | 214 | sendCurrentRevision(client); |
215 | } else { | 215 | } else { |
@@ -218,9 +218,9 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
218 | break; | 218 | break; |
219 | } | 219 | } |
220 | case Akonadi2::Commands::SynchronizeCommand: { | 220 | case Akonadi2::Commands::SynchronizeCommand: { |
221 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); | 221 | flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); |
222 | if (Akonadi2::VerifySynchronizeBuffer(verifier)) { | 222 | if (Akonadi2::VerifySynchronizeBuffer(verifier)) { |
223 | auto buffer = Akonadi2::GetSynchronize(client.commandBuffer.constData()); | 223 | auto buffer = Akonadi2::GetSynchronize(commandBuffer.constData()); |
224 | Log() << QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name); | 224 | Log() << QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name); |
225 | loadResource(); | 225 | loadResource(); |
226 | if (!m_resource) { | 226 | if (!m_resource) { |
@@ -250,7 +250,7 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
250 | Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name; | 250 | Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name; |
251 | loadResource(); | 251 | loadResource(); |
252 | if (m_resource) { | 252 | if (m_resource) { |
253 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | 253 | m_resource->processCommand(commandId, commandBuffer, m_pipeline); |
254 | } | 254 | } |
255 | break; | 255 | break; |
256 | case Akonadi2::Commands::ShutdownCommand: | 256 | case Akonadi2::Commands::ShutdownCommand: |
@@ -262,7 +262,7 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
262 | Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; | 262 | Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; |
263 | loadResource(); | 263 | loadResource(); |
264 | if (m_resource) { | 264 | if (m_resource) { |
265 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | 265 | m_resource->processCommand(commandId, commandBuffer, m_pipeline); |
266 | } | 266 | } |
267 | } else { | 267 | } else { |
268 | Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; | 268 | Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; |
@@ -307,7 +307,9 @@ bool Listener::processClientBuffer(Client &client) | |||
307 | 307 | ||
308 | auto socket = QPointer<QLocalSocket>(client.socket); | 308 | auto socket = QPointer<QLocalSocket>(client.socket); |
309 | auto clientName = client.name; | 309 | auto clientName = client.name; |
310 | processCommand(commandId, messageId, client, size, [this, messageId, commandId, socket, clientName]() { | 310 | const QByteArray commandBuffer = client.commandBuffer.left(size); |
311 | client.commandBuffer.remove(0, size); | ||
312 | processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName]() { | ||
311 | Log() << QString("\tCompleted command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Akonadi2::Commands::name(commandId))).arg(clientName); | 313 | Log() << QString("\tCompleted command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Akonadi2::Commands::name(commandId))).arg(clientName); |
312 | if (socket) { | 314 | if (socket) { |
313 | sendCommandCompleted(socket.data(), messageId); | 315 | sendCommandCompleted(socket.data(), messageId); |
@@ -315,7 +317,6 @@ bool Listener::processClientBuffer(Client &client) | |||
315 | Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); | 317 | Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); |
316 | } | 318 | } |
317 | }); | 319 | }); |
318 | client.commandBuffer.remove(0, size); | ||
319 | 320 | ||
320 | return client.commandBuffer.size() >= headerSize; | 321 | return client.commandBuffer.size() >= headerSize; |
321 | } | 322 | } |
diff --git a/synchronizer/listener.h b/synchronizer/listener.h index e03c310..560f052 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h | |||
@@ -79,7 +79,7 @@ private Q_SLOTS: | |||
79 | void quit(); | 79 | void quit(); |
80 | 80 | ||
81 | private: | 81 | private: |
82 | void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback); | 82 | void processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback); |
83 | bool processClientBuffer(Client &client); | 83 | bool processClientBuffer(Client &client); |
84 | void sendCurrentRevision(Client &client); | 84 | void sendCurrentRevision(Client &client); |
85 | void sendCommandCompleted(QLocalSocket *socket, uint messageId); | 85 | void sendCommandCompleted(QLocalSocket *socket, uint messageId); |
diff --git a/tests/dummyresourcebenchmark.cpp b/tests/dummyresourcebenchmark.cpp index fd2cb01..7d40779 100644 --- a/tests/dummyresourcebenchmark.cpp +++ b/tests/dummyresourcebenchmark.cpp | |||
@@ -133,7 +133,7 @@ private Q_SLOTS: | |||
133 | const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); | 133 | const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); |
134 | 134 | ||
135 | for (int i = 0; i < num; i++) { | 135 | for (int i = 0; i < num; i++) { |
136 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); | 136 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); |
137 | } | 137 | } |
138 | auto appendTime = time.elapsed(); | 138 | auto appendTime = time.elapsed(); |
139 | 139 | ||
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index 7499d62..36812c1 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp | |||
@@ -89,8 +89,8 @@ private Q_SLOTS: | |||
89 | QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated())); | 89 | QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated())); |
90 | DummyResource resource("org.kde.dummy.instance1"); | 90 | DummyResource resource("org.kde.dummy.instance1"); |
91 | resource.configurePipeline(&pipeline); | 91 | resource.configurePipeline(&pipeline); |
92 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); | 92 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); |
93 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); | 93 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); |
94 | 94 | ||
95 | QVERIFY(revisionSpy.isValid()); | 95 | QVERIFY(revisionSpy.isValid()); |
96 | QTRY_COMPARE(revisionSpy.count(), 2); | 96 | QTRY_COMPARE(revisionSpy.count(), 2); |