diff options
-rw-r--r-- | common/genericresource.cpp | 2 | ||||
-rw-r--r-- | common/genericresource.h | 2 | ||||
-rw-r--r-- | common/resource.cpp | 3 | ||||
-rw-r--r-- | common/resource.h | 2 | ||||
-rw-r--r-- | synchronizer/listener.cpp | 19 | ||||
-rw-r--r-- | synchronizer/listener.h | 2 | ||||
-rw-r--r-- | tests/dummyresourcebenchmark.cpp | 2 | ||||
-rw-r--r-- | tests/dummyresourcetest.cpp | 4 |
8 files changed, 18 insertions, 18 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 5dfa9b5..ae06ef4 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -194,7 +194,7 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt | |||
194 | mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); | 194 | mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); |
195 | } | 195 | } |
196 | 196 | ||
197 | void GenericResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) | 197 | void GenericResource::processCommand(int commandId, const QByteArray &data, Akonadi2::Pipeline *pipeline) |
198 | { | 198 | { |
199 | //TODO instead of copying the command including the full entity first into the command queue, we could directly | 199 | //TODO instead of copying the command including the full entity first into the command queue, we could directly |
200 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). | 200 | //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). |
diff --git a/common/genericresource.h b/common/genericresource.h index c44989e..e9d5d59 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -37,7 +37,7 @@ public: | |||
37 | GenericResource(const QByteArray &resourceInstanceIdentifier); | 37 | GenericResource(const QByteArray &resourceInstanceIdentifier); |
38 | virtual ~GenericResource(); | 38 | virtual ~GenericResource(); |
39 | 39 | ||
40 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) Q_DECL_OVERRIDE; | 40 | virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline) Q_DECL_OVERRIDE; |
41 | virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; | 41 | virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; |
42 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; | 42 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; |
43 | 43 | ||
diff --git a/common/resource.cpp b/common/resource.cpp index bd69afd..40ad04c 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -44,11 +44,10 @@ void Resource::configurePipeline(Pipeline *pipeline) | |||
44 | 44 | ||
45 | } | 45 | } |
46 | 46 | ||
47 | void Resource::processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) | 47 | void Resource::processCommand(int commandId, const QByteArray &data, Pipeline *pipeline) |
48 | { | 48 | { |
49 | Q_UNUSED(commandId) | 49 | Q_UNUSED(commandId) |
50 | Q_UNUSED(data) | 50 | Q_UNUSED(data) |
51 | Q_UNUSED(size) | ||
52 | Q_UNUSED(pipeline) | 51 | Q_UNUSED(pipeline) |
53 | pipeline->null(); | 52 | pipeline->null(); |
54 | } | 53 | } |
diff --git a/common/resource.h b/common/resource.h index ebbc2e1..009050e 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -36,7 +36,7 @@ public: | |||
36 | Resource(); | 36 | Resource(); |
37 | virtual ~Resource(); | 37 | virtual ~Resource(); |
38 | 38 | ||
39 | virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); | 39 | virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline); |
40 | virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline); | 40 | virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline); |
41 | virtual KAsync::Job<void> processAllMessages(); | 41 | virtual KAsync::Job<void> processAllMessages(); |
42 | 42 | ||
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 1553f7d..2559664 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -203,13 +203,13 @@ void Listener::processClientBuffers() | |||
203 | } | 203 | } |
204 | } | 204 | } |
205 | 205 | ||
206 | void Listener::processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback) | 206 | void Listener::processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback) |
207 | { | 207 | { |
208 | switch (commandId) { | 208 | switch (commandId) { |
209 | case Akonadi2::Commands::HandshakeCommand: { | 209 | case Akonadi2::Commands::HandshakeCommand: { |
210 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); | 210 | flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); |
211 | if (Akonadi2::VerifyHandshakeBuffer(verifier)) { | 211 | if (Akonadi2::VerifyHandshakeBuffer(verifier)) { |
212 | auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); | 212 | auto buffer = Akonadi2::GetHandshake(commandBuffer.constData()); |
213 | client.name = buffer->name()->c_str(); | 213 | client.name = buffer->name()->c_str(); |
214 | sendCurrentRevision(client); | 214 | sendCurrentRevision(client); |
215 | } else { | 215 | } else { |
@@ -218,9 +218,9 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
218 | break; | 218 | break; |
219 | } | 219 | } |
220 | case Akonadi2::Commands::SynchronizeCommand: { | 220 | case Akonadi2::Commands::SynchronizeCommand: { |
221 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); | 221 | flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); |
222 | if (Akonadi2::VerifySynchronizeBuffer(verifier)) { | 222 | if (Akonadi2::VerifySynchronizeBuffer(verifier)) { |
223 | auto buffer = Akonadi2::GetSynchronize(client.commandBuffer.constData()); | 223 | auto buffer = Akonadi2::GetSynchronize(commandBuffer.constData()); |
224 | Log() << QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name); | 224 | Log() << QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name); |
225 | loadResource(); | 225 | loadResource(); |
226 | if (!m_resource) { | 226 | if (!m_resource) { |
@@ -250,7 +250,7 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
250 | Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name; | 250 | Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name; |
251 | loadResource(); | 251 | loadResource(); |
252 | if (m_resource) { | 252 | if (m_resource) { |
253 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | 253 | m_resource->processCommand(commandId, commandBuffer, m_pipeline); |
254 | } | 254 | } |
255 | break; | 255 | break; |
256 | case Akonadi2::Commands::ShutdownCommand: | 256 | case Akonadi2::Commands::ShutdownCommand: |
@@ -262,7 +262,7 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
262 | Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; | 262 | Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; |
263 | loadResource(); | 263 | loadResource(); |
264 | if (m_resource) { | 264 | if (m_resource) { |
265 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | 265 | m_resource->processCommand(commandId, commandBuffer, m_pipeline); |
266 | } | 266 | } |
267 | } else { | 267 | } else { |
268 | Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; | 268 | Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; |
@@ -307,7 +307,9 @@ bool Listener::processClientBuffer(Client &client) | |||
307 | 307 | ||
308 | auto socket = QPointer<QLocalSocket>(client.socket); | 308 | auto socket = QPointer<QLocalSocket>(client.socket); |
309 | auto clientName = client.name; | 309 | auto clientName = client.name; |
310 | processCommand(commandId, messageId, client, size, [this, messageId, commandId, socket, clientName]() { | 310 | const QByteArray commandBuffer = client.commandBuffer.left(size); |
311 | client.commandBuffer.remove(0, size); | ||
312 | processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName]() { | ||
311 | Log() << QString("\tCompleted command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Akonadi2::Commands::name(commandId))).arg(clientName); | 313 | Log() << QString("\tCompleted command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Akonadi2::Commands::name(commandId))).arg(clientName); |
312 | if (socket) { | 314 | if (socket) { |
313 | sendCommandCompleted(socket.data(), messageId); | 315 | sendCommandCompleted(socket.data(), messageId); |
@@ -315,7 +317,6 @@ bool Listener::processClientBuffer(Client &client) | |||
315 | Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); | 317 | Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); |
316 | } | 318 | } |
317 | }); | 319 | }); |
318 | client.commandBuffer.remove(0, size); | ||
319 | 320 | ||
320 | return client.commandBuffer.size() >= headerSize; | 321 | return client.commandBuffer.size() >= headerSize; |
321 | } | 322 | } |
diff --git a/synchronizer/listener.h b/synchronizer/listener.h index e03c310..560f052 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h | |||
@@ -79,7 +79,7 @@ private Q_SLOTS: | |||
79 | void quit(); | 79 | void quit(); |
80 | 80 | ||
81 | private: | 81 | private: |
82 | void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback); | 82 | void processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback); |
83 | bool processClientBuffer(Client &client); | 83 | bool processClientBuffer(Client &client); |
84 | void sendCurrentRevision(Client &client); | 84 | void sendCurrentRevision(Client &client); |
85 | void sendCommandCompleted(QLocalSocket *socket, uint messageId); | 85 | void sendCommandCompleted(QLocalSocket *socket, uint messageId); |
diff --git a/tests/dummyresourcebenchmark.cpp b/tests/dummyresourcebenchmark.cpp index fd2cb01..7d40779 100644 --- a/tests/dummyresourcebenchmark.cpp +++ b/tests/dummyresourcebenchmark.cpp | |||
@@ -133,7 +133,7 @@ private Q_SLOTS: | |||
133 | const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); | 133 | const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); |
134 | 134 | ||
135 | for (int i = 0; i < num; i++) { | 135 | for (int i = 0; i < num; i++) { |
136 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); | 136 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); |
137 | } | 137 | } |
138 | auto appendTime = time.elapsed(); | 138 | auto appendTime = time.elapsed(); |
139 | 139 | ||
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index 7499d62..36812c1 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp | |||
@@ -89,8 +89,8 @@ private Q_SLOTS: | |||
89 | QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated())); | 89 | QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated())); |
90 | DummyResource resource("org.kde.dummy.instance1"); | 90 | DummyResource resource("org.kde.dummy.instance1"); |
91 | resource.configurePipeline(&pipeline); | 91 | resource.configurePipeline(&pipeline); |
92 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); | 92 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); |
93 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); | 93 | resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline); |
94 | 94 | ||
95 | QVERIFY(revisionSpy.isValid()); | 95 | QVERIFY(revisionSpy.isValid()); |
96 | QTRY_COMPARE(revisionSpy.count(), 2); | 96 | QTRY_COMPARE(revisionSpy.count(), 2); |