summaryrefslogtreecommitdiffstats
path: root/synchronizer
diff options
context:
space:
mode:
authorAaron Seigo <aseigo@kde.org>2014-12-16 22:40:44 +0100
committerAaron Seigo <aseigo@kde.org>2014-12-16 22:40:44 +0100
commit77944384d24b5005d6b8648572a31a3ae84dd946 (patch)
tree8726831773b4182cb6177d6c72a723e08a6c15aa /synchronizer
parent66b21fd2e3c53e4a820e3343b192be7b043da110 (diff)
downloadsink-77944384d24b5005d6b8648572a31a3ae84dd946.tar.gz
sink-77944384d24b5005d6b8648572a31a3ae84dd946.zip
add pipelines (as a sketch only), message ids and message responses
Diffstat (limited to 'synchronizer')
-rw-r--r--synchronizer/listener.cpp70
-rw-r--r--synchronizer/listener.h5
2 files changed, 59 insertions, 16 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index 368dae5..18442e7 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -22,8 +22,11 @@
22#include "common/clientapi.h" 22#include "common/clientapi.h"
23#include "common/console.h" 23#include "common/console.h"
24#include "common/commands.h" 24#include "common/commands.h"
25#include "common/handshake_generated.h"
26#include "common/resource.h" 25#include "common/resource.h"
26
27// commands
28#include "common/commandcompletion_generated.h"
29#include "common/handshake_generated.h"
27#include "common/revisionupdate_generated.h" 30#include "common/revisionupdate_generated.h"
28 31
29#include <QLocalSocket> 32#include <QLocalSocket>
@@ -35,7 +38,9 @@ Listener::Listener(const QString &resourceName, QObject *parent)
35 m_revision(0), 38 m_revision(0),
36 m_resourceName(resourceName), 39 m_resourceName(resourceName),
37 m_resource(0), 40 m_resource(0),
38 m_clientBufferProcessesTimer(new QTimer(this)) 41 m_pipeline(new Akonadi2::Pipeline(resourceName)),
42 m_clientBufferProcessesTimer(new QTimer(this)),
43 m_messageId(0)
39{ 44{
40 connect(m_server, &QLocalServer::newConnection, 45 connect(m_server, &QLocalServer::newConnection,
41 this, &Listener::acceptConnection); 46 this, &Listener::acceptConnection);
@@ -64,6 +69,7 @@ Listener::Listener(const QString &resourceName, QObject *parent)
64 69
65Listener::~Listener() 70Listener::~Listener()
66{ 71{
72 delete m_pipeline;
67} 73}
68 74
69void Listener::setRevision(unsigned long long revision) 75void Listener::setRevision(unsigned long long revision)
@@ -183,43 +189,62 @@ void Listener::processClientBuffers()
183 189
184bool Listener::processClientBuffer(Client &client) 190bool Listener::processClientBuffer(Client &client)
185{ 191{
186 static const int headerSize = (sizeof(int) + sizeof(uint)); 192 static const int headerSize = Akonadi2::Commands::headerSize();
187 if (client.commandBuffer.size() < headerSize) { 193 if (client.commandBuffer.size() < headerSize) {
188 return false; 194 return false;
189 } 195 }
190 196
191 int commandId; 197 int commandId;
192 uint size; 198 uint messageId, size;
193 commandId = *(int*)client.commandBuffer.constData(); 199 messageId = *(uint*)client.commandBuffer.constData();
194 size = *(uint*)(client.commandBuffer.constData() + sizeof(uint)); 200 commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint));
201 size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint));
195 202
196 //TODO: reject messages above a certain size? 203 //TODO: reject messages above a certain size?
197 204
198 if (size <= uint(client.commandBuffer.size() - headerSize)) { 205 if (size <= uint(client.commandBuffer.size() - headerSize)) {
199 QByteArray data = client.commandBuffer.mid(headerSize, size); 206 client.commandBuffer.remove(0, headerSize);
200 client.commandBuffer.remove(0, headerSize + size);
201 207
202 switch (commandId) { 208 switch (commandId) {
203 case Akonadi2::Commands::HandshakeCommand: { 209 case Akonadi2::Commands::HandshakeCommand: {
204 auto buffer = Akonadi2::GetHandshake(data.constData()); 210 flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size);
205 Akonadi2::Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); 211 if (Akonadi2::VerifyHandshakeBuffer(verifier)) {
206 client.name = buffer->name()->c_str(); 212 auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData());
207 sendCurrentRevision(client); 213 client.name = buffer->name()->c_str();
214 sendCurrentRevision(client);
215 }
208 break; 216 break;
209 } 217 }
210 case Akonadi2::Commands::SynchronizeCommand: { 218 case Akonadi2::Commands::SynchronizeCommand: {
211 Akonadi2::Console::main()->log(QString(" Synchronize request from %1").arg(client.name)); 219 Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name));
212 loadResource(); 220 loadResource();
213 //TODO: on failure ... what?
214 if (m_resource) { 221 if (m_resource) {
215 m_resource->synchronizeWithSource(); 222 m_resource->synchronizeWithSource();
216 } 223 }
217 break; 224 break;
218 } 225 }
226 case Akonadi2::Commands::FetchEntityCommand:
227 case Akonadi2::Commands::DeleteEntityCommand:
228 case Akonadi2::Commands::ModifyEntityCommand:
229 case Akonadi2::Commands::CreateEntityCommand:
230 Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %1").arg(messageId).arg(commandId).arg(client.name));
231 m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline);
232 break;
219 default: 233 default:
234 if (commandId > Akonadi2::Commands::CustomCommand) {
235 loadResource();
236 if (m_resource) {
237 m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline);
238 }
239 } else {
240 //TODO: handle error: we don't know wtf this command is
241 }
220 break; 242 break;
221 } 243 }
222 244
245 //TODO: async commands == async sendCommandCompleted
246 sendCommandCompleted(client, messageId);
247 client.commandBuffer.remove(0, size);
223 return client.commandBuffer.size() >= headerSize; 248 return client.commandBuffer.size() >= headerSize;
224 } 249 }
225 250
@@ -234,7 +259,19 @@ void Listener::sendCurrentRevision(Client &client)
234 259
235 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); 260 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision);
236 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); 261 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command);
237 Akonadi2::Commands::write(client.socket, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); 262 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb);
263 m_fbb.Clear();
264}
265
266void Listener::sendCommandCompleted(Client &client, uint messageId)
267{
268 if (!client.socket || !client.socket->isValid()) {
269 return;
270 }
271
272 auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId);
273 Akonadi2::FinishCommandCompletionBuffer(m_fbb, command);
274 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb);
238 m_fbb.Clear(); 275 m_fbb.Clear();
239} 276}
240 277
@@ -248,7 +285,7 @@ void Listener::updateClientsWithRevision()
248 continue; 285 continue;
249 } 286 }
250 287
251 Akonadi2::Commands::write(client.socket, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); 288 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb);
252 } 289 }
253 m_fbb.Clear(); 290 m_fbb.Clear();
254} 291}
@@ -269,5 +306,6 @@ void Listener::loadResource()
269 } else { 306 } else {
270 Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); 307 Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName));
271 } 308 }
309 //TODO: on failure ... what?
272} 310}
273 311
diff --git a/synchronizer/listener.h b/synchronizer/listener.h
index 053fac3..b294277 100644
--- a/synchronizer/listener.h
+++ b/synchronizer/listener.h
@@ -25,6 +25,8 @@
25 25
26#include <flatbuffers/flatbuffers.h> 26#include <flatbuffers/flatbuffers.h>
27 27
28#include "common/pipeline.h"
29
28namespace Akonadi2 30namespace Akonadi2
29{ 31{
30 class Resource; 32 class Resource;
@@ -78,6 +80,7 @@ private Q_SLOTS:
78private: 80private:
79 bool processClientBuffer(Client &client); 81 bool processClientBuffer(Client &client);
80 void sendCurrentRevision(Client &client); 82 void sendCurrentRevision(Client &client);
83 void sendCommandCompleted(Client &client, uint messageId);
81 void updateClientsWithRevision(); 84 void updateClientsWithRevision();
82 void loadResource(); 85 void loadResource();
83 86
@@ -87,5 +90,7 @@ private:
87 flatbuffers::FlatBufferBuilder m_fbb; 90 flatbuffers::FlatBufferBuilder m_fbb;
88 const QString m_resourceName; 91 const QString m_resourceName;
89 Akonadi2::Resource *m_resource; 92 Akonadi2::Resource *m_resource;
93 Akonadi2::Pipeline *m_pipeline;
90 QTimer *m_clientBufferProcessesTimer; 94 QTimer *m_clientBufferProcessesTimer;
95 int m_messageId;
91}; 96};