summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-07-20 16:15:49 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-07-23 17:37:53 +0200
commitb187b95672fe0d8b16ba80bedd9022f1cda3a051 (patch)
treec86bf63c73eb91aca83a3d0c22be95b7a7515f0b
parent4a71e5b06506af6e3ab7a7715705e76b6d6e9bb7 (diff)
downloadsink-b187b95672fe0d8b16ba80bedd9022f1cda3a051.tar.gz
sink-b187b95672fe0d8b16ba80bedd9022f1cda3a051.zip
Pass command around as QByteArray
Simpler api, GenericResource didn't honor size anyways, and we copy the command for now to avoid sideeffects of data coming in in the meantime (although that should generally work since data is always appended).
-rw-r--r--common/genericresource.cpp2
-rw-r--r--common/genericresource.h2
-rw-r--r--common/resource.cpp3
-rw-r--r--common/resource.h2
-rw-r--r--synchronizer/listener.cpp19
-rw-r--r--synchronizer/listener.h2
-rw-r--r--tests/dummyresourcebenchmark.cpp2
-rw-r--r--tests/dummyresourcetest.cpp4
8 files changed, 18 insertions, 18 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 5dfa9b5..ae06ef4 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -194,7 +194,7 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt
194 mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); 194 mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize());
195} 195}
196 196
197void GenericResource::processCommand(int commandId, const QByteArray &data, uint size, Akonadi2::Pipeline *pipeline) 197void GenericResource::processCommand(int commandId, const QByteArray &data, Akonadi2::Pipeline *pipeline)
198{ 198{
199 //TODO instead of copying the command including the full entity first into the command queue, we could directly 199 //TODO instead of copying the command including the full entity first into the command queue, we could directly
200 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). 200 //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay).
diff --git a/common/genericresource.h b/common/genericresource.h
index c44989e..e9d5d59 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -37,7 +37,7 @@ public:
37 GenericResource(const QByteArray &resourceInstanceIdentifier); 37 GenericResource(const QByteArray &resourceInstanceIdentifier);
38 virtual ~GenericResource(); 38 virtual ~GenericResource();
39 39
40 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) Q_DECL_OVERRIDE; 40 virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline) Q_DECL_OVERRIDE;
41 virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0; 41 virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline) Q_DECL_OVERRIDE = 0;
42 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; 42 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE;
43 43
diff --git a/common/resource.cpp b/common/resource.cpp
index bd69afd..40ad04c 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -44,11 +44,10 @@ void Resource::configurePipeline(Pipeline *pipeline)
44 44
45} 45}
46 46
47void Resource::processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline) 47void Resource::processCommand(int commandId, const QByteArray &data, Pipeline *pipeline)
48{ 48{
49 Q_UNUSED(commandId) 49 Q_UNUSED(commandId)
50 Q_UNUSED(data) 50 Q_UNUSED(data)
51 Q_UNUSED(size)
52 Q_UNUSED(pipeline) 51 Q_UNUSED(pipeline)
53 pipeline->null(); 52 pipeline->null();
54} 53}
diff --git a/common/resource.h b/common/resource.h
index ebbc2e1..009050e 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -36,7 +36,7 @@ public:
36 Resource(); 36 Resource();
37 virtual ~Resource(); 37 virtual ~Resource();
38 38
39 virtual void processCommand(int commandId, const QByteArray &data, uint size, Pipeline *pipeline); 39 virtual void processCommand(int commandId, const QByteArray &data, Pipeline *pipeline);
40 virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline); 40 virtual KAsync::Job<void> synchronizeWithSource(Pipeline *pipeline);
41 virtual KAsync::Job<void> processAllMessages(); 41 virtual KAsync::Job<void> processAllMessages();
42 42
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index 1553f7d..2559664 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -203,13 +203,13 @@ void Listener::processClientBuffers()
203 } 203 }
204} 204}
205 205
206void Listener::processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback) 206void Listener::processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback)
207{ 207{
208 switch (commandId) { 208 switch (commandId) {
209 case Akonadi2::Commands::HandshakeCommand: { 209 case Akonadi2::Commands::HandshakeCommand: {
210 flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); 210 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
211 if (Akonadi2::VerifyHandshakeBuffer(verifier)) { 211 if (Akonadi2::VerifyHandshakeBuffer(verifier)) {
212 auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); 212 auto buffer = Akonadi2::GetHandshake(commandBuffer.constData());
213 client.name = buffer->name()->c_str(); 213 client.name = buffer->name()->c_str();
214 sendCurrentRevision(client); 214 sendCurrentRevision(client);
215 } else { 215 } else {
@@ -218,9 +218,9 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin
218 break; 218 break;
219 } 219 }
220 case Akonadi2::Commands::SynchronizeCommand: { 220 case Akonadi2::Commands::SynchronizeCommand: {
221 flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); 221 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
222 if (Akonadi2::VerifySynchronizeBuffer(verifier)) { 222 if (Akonadi2::VerifySynchronizeBuffer(verifier)) {
223 auto buffer = Akonadi2::GetSynchronize(client.commandBuffer.constData()); 223 auto buffer = Akonadi2::GetSynchronize(commandBuffer.constData());
224 Log() << QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name); 224 Log() << QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name);
225 loadResource(); 225 loadResource();
226 if (!m_resource) { 226 if (!m_resource) {
@@ -250,7 +250,7 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin
250 Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name; 250 Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name;
251 loadResource(); 251 loadResource();
252 if (m_resource) { 252 if (m_resource) {
253 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); 253 m_resource->processCommand(commandId, commandBuffer, m_pipeline);
254 } 254 }
255 break; 255 break;
256 case Akonadi2::Commands::ShutdownCommand: 256 case Akonadi2::Commands::ShutdownCommand:
@@ -262,7 +262,7 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin
262 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; 262 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId;
263 loadResource(); 263 loadResource();
264 if (m_resource) { 264 if (m_resource) {
265 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); 265 m_resource->processCommand(commandId, commandBuffer, m_pipeline);
266 } 266 }
267 } else { 267 } else {
268 Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; 268 Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId;
@@ -307,7 +307,9 @@ bool Listener::processClientBuffer(Client &client)
307 307
308 auto socket = QPointer<QLocalSocket>(client.socket); 308 auto socket = QPointer<QLocalSocket>(client.socket);
309 auto clientName = client.name; 309 auto clientName = client.name;
310 processCommand(commandId, messageId, client, size, [this, messageId, commandId, socket, clientName]() { 310 const QByteArray commandBuffer = client.commandBuffer.left(size);
311 client.commandBuffer.remove(0, size);
312 processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName]() {
311 Log() << QString("\tCompleted command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Akonadi2::Commands::name(commandId))).arg(clientName); 313 Log() << QString("\tCompleted command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Akonadi2::Commands::name(commandId))).arg(clientName);
312 if (socket) { 314 if (socket) {
313 sendCommandCompleted(socket.data(), messageId); 315 sendCommandCompleted(socket.data(), messageId);
@@ -315,7 +317,6 @@ bool Listener::processClientBuffer(Client &client)
315 Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); 317 Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName);
316 } 318 }
317 }); 319 });
318 client.commandBuffer.remove(0, size);
319 320
320 return client.commandBuffer.size() >= headerSize; 321 return client.commandBuffer.size() >= headerSize;
321 } 322 }
diff --git a/synchronizer/listener.h b/synchronizer/listener.h
index e03c310..560f052 100644
--- a/synchronizer/listener.h
+++ b/synchronizer/listener.h
@@ -79,7 +79,7 @@ private Q_SLOTS:
79 void quit(); 79 void quit();
80 80
81private: 81private:
82 void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback); 82 void processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback);
83 bool processClientBuffer(Client &client); 83 bool processClientBuffer(Client &client);
84 void sendCurrentRevision(Client &client); 84 void sendCurrentRevision(Client &client);
85 void sendCommandCompleted(QLocalSocket *socket, uint messageId); 85 void sendCommandCompleted(QLocalSocket *socket, uint messageId);
diff --git a/tests/dummyresourcebenchmark.cpp b/tests/dummyresourcebenchmark.cpp
index fd2cb01..7d40779 100644
--- a/tests/dummyresourcebenchmark.cpp
+++ b/tests/dummyresourcebenchmark.cpp
@@ -133,7 +133,7 @@ private Q_SLOTS:
133 const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); 133 const QByteArray command(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
134 134
135 for (int i = 0; i < num; i++) { 135 for (int i = 0; i < num; i++) {
136 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); 136 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline);
137 } 137 }
138 auto appendTime = time.elapsed(); 138 auto appendTime = time.elapsed();
139 139
diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp
index 7499d62..36812c1 100644
--- a/tests/dummyresourcetest.cpp
+++ b/tests/dummyresourcetest.cpp
@@ -89,8 +89,8 @@ private Q_SLOTS:
89 QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated())); 89 QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated()));
90 DummyResource resource("org.kde.dummy.instance1"); 90 DummyResource resource("org.kde.dummy.instance1");
91 resource.configurePipeline(&pipeline); 91 resource.configurePipeline(&pipeline);
92 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); 92 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline);
93 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); 93 resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, &pipeline);
94 94
95 QVERIFY(revisionSpy.isValid()); 95 QVERIFY(revisionSpy.isValid());
96 QTRY_COMPARE(revisionSpy.count(), 2); 96 QTRY_COMPARE(revisionSpy.count(), 2);