summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/genericresource.cpp2
-rw-r--r--common/genericresource.h2
-rw-r--r--common/resource.cpp3
-rw-r--r--common/resource.h2
-rw-r--r--synchronizer/listener.cpp19
-rw-r--r--synchronizer/listener.h2
-rw-r--r--tests/dummyresourcebenchmark.cpp2
-rw-r--r--tests/dummyresourcetest.cpp4
8 files changed, 18 insertions, 18 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 5dfa9b5..ae06ef4 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -194,7 +194,7 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt
194 mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); 194 mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize());
195} 195}
196 196
197void GenericResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) 197void GenericResource::processCommand(int commandId, const QByteArray &data, Akonadi2::Pipeline *pipeline)
198{ 198{
199 //TODO instead of copying the command including the full entity first into the command queue, we could directly 199 //TODO instead of copying the command including the full entity first into the command queue, we could directly
200 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). 200 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay).
diff --git a/common/genericresource.h b/common/genericresource.h
index c44989e..e9d5d59 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -37,7 +37,7 @@ public:
37 GenericResource(const QByteArray &resourceInstanceIdentifier); 37 GenericResource(const QByteArray &resourceInstanceIdentifier);
38 virtual ~GenericResource(); 38 virtual ~GenericResource();
39 39
40 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) Q_DECL_OVERRIDE; 40 virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline) Q_DECL_OVERRIDE;
41 virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; 41 virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0;
42 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; 42 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE;
43 43
diff --git a/common/resource.cpp b/common/resource.cpp
index bd69afd..40ad04c 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -44,11 +44,10 @@ void Resource::configurePipeline(Pipeline *pipeline)
44 44
45} 45}
46 46
47void Resource::processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) 47void Resource::processCommand(int commandId, const QByteArray &data, Pipeline *pipeline)
48{ 48{
49 Q_UNUSED(commandId) 49 Q_UNUSED(commandId)
50 Q_UNUSED(data) 50 Q_UNUSED(data)
51 Q_UNUSED(size)
52 Q_UNUSED(pipeline) 51 Q_UNUSED(pipeline)
53 pipeline->null(); 52 pipeline->null();
54} 53}
diff --git a/common/resource.h b/common/resource.h
index ebbc2e1..009050e 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -36,7 +36,7 @@ public:
36 Resource(); 36 Resource();
37 virtual ~Resource(); 37 virtual ~Resource();
38 38
39 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); 39 virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline);
40 virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline); 40 virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline);
41 virtual KAsync::Job<void> processAllMessages(); 41 virtual KAsync::Job<void> processAllMessages();
42 42
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index 1553f7d..2559664 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -203,13 +203,13 @@ void Listener::processClientBuffers()
203 } 203 }
204} 204}
205 205
206void Listener::processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback) 206void Listener::processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback)
207{ 207{
208 switch (commandId) { 208 switch (commandId) {
209 case Akonadi2::Commands::HandshakeCommand: { 209 case Akonadi2::Commands::HandshakeCommand: {
210 flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); 210 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
211 if (Akonadi2::VerifyHandshakeBuffer(verifier)) { 211 if (Akonadi2::VerifyHandshakeBuffer(verifier)) {
212 auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); 212 auto buffer = Akonadi2::GetHandshake(commandBuffer.constData());
213 client.name = buffer->name()->c_str(); 213 client.name = buffer->name()->c_str();
214 sendCurrentRevision(client); 214 sendCurrentRevision(client);
215 } else { 215 } else {
@@ -218,9 +218,9 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin
218 break; 218 break;
219 } 219 }
220 case Akonadi2::Commands::SynchronizeCommand: { 220 case Akonadi2::Commands::SynchronizeCommand: {
221 flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); 221 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
222 if (Akonadi2::VerifySynchronizeBuffer(verifier)) { 222 if (Akonadi2::VerifySynchronizeBuffer(verifier)) {
223 auto buffer = Akonadi2::GetSynchronize(client.commandBuffer.constData()); 223 auto buffer = Akonadi2::GetSynchronize(commandBuffer.constData());
224 Log() << QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name); 224 Log() << QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name);
225 loadResource(); 225 loadResource();
226 if (!m_resource) { 226 if (!m_resource) {
@@ -250,7 +250,7 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin
250 Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name; 250 Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name;
251 loadResource(); 251 loadResource();
252 if (m_resource) { 252 if (m_resource) {
253 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); 253 m_resource->processCommand(commandId, commandBuffer, m_pipeline);
254 } 254 }
255 break; 255 break;
256 case Akonadi2::Commands::ShutdownCommand: 256 case Akonadi2::Commands::ShutdownCommand:
@@ -262,7 +262,7 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin
262 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; 262 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId;
263 loadResource(); 263 loadResource();
264 if (m_resource) { 264 if (m_resource) {
265 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); 265 m_resource->processCommand(commandId, commandBuffer, m_pipeline);
266 } 266 }
267 } else { 267 } else {
268 Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; 268 Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId;
@@ -307,7 +307,9 @@ bool Listener::processClientBuffer(Client &client)
307 307
308 auto socket = QPointer<QLocalSocket>(client.socket); 308 auto socket = QPointer<QLocalSocket>(client.socket);
309 auto clientName = client.name; 309 auto clientName = client.name;
310 processCommand(commandId, messageId, client, size, [this, messageId, commandId, socket, clientName]() { 310 const QByteArray commandBuffer = client.commandBuffer.left(size);
311 client.commandBuffer.remove(0, size);
312 processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName]() {
311 Log() << QString("\tCompleted command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Akonadi2::Commands::name(commandId))).arg(clientName); 313 Log() << QString("\tCompleted command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Akonadi2::Commands::name(commandId))).arg(clientName);
312 if (socket) { 314 if (socket) {
313 sendCommandCompleted(socket.data(), messageId); 315 sendCommandCompleted(socket.data(), messageId);
@@ -315,7 +317,6 @@ bool Listener::processClientBuffer(Client &client)
315 Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); 317 Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName);
316 } 318 }
317 }); 319 });
318 client.commandBuffer.remove(0, size);
319 320
320 return client.commandBuffer.size() >= headerSize; 321 return client.commandBuffer.size() >= headerSize;
321 } 322 }
diff --git a/synchronizer/listener.h b/synchronizer/listener.h
index e03c310..560f052 100644
--- a/synchronizer/listener.h
+++ b/synchronizer/listener.h
@@ -79,7 +79,7 @@ private Q_SLOTS:
79 void quit(); 79 void quit();
80 80
81private: 81private:
82 void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback); 82 void processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback);
83 bool processClientBuffer(Client &client); 83 bool processClientBuffer(Client &client);
84 void sendCurrentRevision(Client &client); 84 void sendCurrentRevision(Client &client);
85 void sendCommandCompleted(QLocalSocket *socket, uint messageId); 85 void sendCommandCompleted(QLocalSocket *socket, uint messageId);
diff --git a/tests/dummyresourcebenchmark.cpp b/tests/dummyresourcebenchmark.cpp
index fd2cb01..7d40779 100644
--- a/tests/dummyresourcebenchmark.cpp
+++ b/tests/dummyresourcebenchmark.cpp
@@ -133,7 +133,7 @@ private Q_SLOTS:
133 const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); 133 const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
134 134
135 for (int i = 0; i < num; i++) { 135 for (int i = 0; i < num; i++) {
136 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); 136 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline);
137 } 137 }
138 auto appendTime = time.elapsed(); 138 auto appendTime = time.elapsed();
139 139
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp
index 7499d62..36812c1 100644
--- a/tests/dummyresourcetest.cpp
+++ b/tests/dummyresourcetest.cpp
@@ -89,8 +89,8 @@ private Q_SLOTS:
89 QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated())); 89 QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated()));
90 DummyResource resource("org.kde.dummy.instance1"); 90 DummyResource resource("org.kde.dummy.instance1");
91 resource.configurePipeline(&pipeline); 91 resource.configurePipeline(&pipeline);
92 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); 92 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline);
93 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); 93 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline);
94 94
95 QVERIFY(revisionSpy.isValid()); 95 QVERIFY(revisionSpy.isValid());
96 QTRY_COMPARE(revisionSpy.count(), 2); 96 QTRY_COMPARE(revisionSpy.count(), 2);