diff options
author | Aaron Seigo <aseigo@kde.org> | 2014-12-16 22:40:44 +0100 |
---|---|---|
committer | Aaron Seigo <aseigo@kde.org> | 2014-12-16 22:40:44 +0100 |
commit | 77944384d24b5005d6b8648572a31a3ae84dd946 (patch) | |
tree | 8726831773b4182cb6177d6c72a723e08a6c15aa /synchronizer | |
parent | 66b21fd2e3c53e4a820e3343b192be7b043da110 (diff) | |
download | sink-77944384d24b5005d6b8648572a31a3ae84dd946.tar.gz sink-77944384d24b5005d6b8648572a31a3ae84dd946.zip |
add pipelines (as a sketch only), message ids and message responses
Diffstat (limited to 'synchronizer')
-rw-r--r-- | synchronizer/listener.cpp | 70 | ||||
-rw-r--r-- | synchronizer/listener.h | 5 |
2 files changed, 59 insertions, 16 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 368dae5..18442e7 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -22,8 +22,11 @@ | |||
22 | #include "common/clientapi.h" | 22 | #include "common/clientapi.h" |
23 | #include "common/console.h" | 23 | #include "common/console.h" |
24 | #include "common/commands.h" | 24 | #include "common/commands.h" |
25 | #include "common/handshake_generated.h" | ||
26 | #include "common/resource.h" | 25 | #include "common/resource.h" |
26 | |||
27 | // commands | ||
28 | #include "common/commandcompletion_generated.h" | ||
29 | #include "common/handshake_generated.h" | ||
27 | #include "common/revisionupdate_generated.h" | 30 | #include "common/revisionupdate_generated.h" |
28 | 31 | ||
29 | #include <QLocalSocket> | 32 | #include <QLocalSocket> |
@@ -35,7 +38,9 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
35 | m_revision(0), | 38 | m_revision(0), |
36 | m_resourceName(resourceName), | 39 | m_resourceName(resourceName), |
37 | m_resource(0), | 40 | m_resource(0), |
38 | m_clientBufferProcessesTimer(new QTimer(this)) | 41 | m_pipeline(new Akonadi2::Pipeline(resourceName)), |
42 | m_clientBufferProcessesTimer(new QTimer(this)), | ||
43 | m_messageId(0) | ||
39 | { | 44 | { |
40 | connect(m_server, &QLocalServer::newConnection, | 45 | connect(m_server, &QLocalServer::newConnection, |
41 | this, &Listener::acceptConnection); | 46 | this, &Listener::acceptConnection); |
@@ -64,6 +69,7 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
64 | 69 | ||
65 | Listener::~Listener() | 70 | Listener::~Listener() |
66 | { | 71 | { |
72 | delete m_pipeline; | ||
67 | } | 73 | } |
68 | 74 | ||
69 | void Listener::setRevision(unsigned long long revision) | 75 | void Listener::setRevision(unsigned long long revision) |
@@ -183,43 +189,62 @@ void Listener::processClientBuffers() | |||
183 | 189 | ||
184 | bool Listener::processClientBuffer(Client &client) | 190 | bool Listener::processClientBuffer(Client &client) |
185 | { | 191 | { |
186 | static const int headerSize = (sizeof(int) + sizeof(uint)); | 192 | static const int headerSize = Akonadi2::Commands::headerSize(); |
187 | if (client.commandBuffer.size() < headerSize) { | 193 | if (client.commandBuffer.size() < headerSize) { |
188 | return false; | 194 | return false; |
189 | } | 195 | } |
190 | 196 | ||
191 | int commandId; | 197 | int commandId; |
192 | uint size; | 198 | uint messageId, size; |
193 | commandId = *(int*)client.commandBuffer.constData(); | 199 | messageId = *(uint*)client.commandBuffer.constData(); |
194 | size = *(uint*)(client.commandBuffer.constData() + sizeof(uint)); | 200 | commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); |
201 | size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); | ||
195 | 202 | ||
196 | //TODO: reject messages above a certain size? | 203 | //TODO: reject messages above a certain size? |
197 | 204 | ||
198 | if (size <= uint(client.commandBuffer.size() - headerSize)) { | 205 | if (size <= uint(client.commandBuffer.size() - headerSize)) { |
199 | QByteArray data = client.commandBuffer.mid(headerSize, size); | 206 | client.commandBuffer.remove(0, headerSize); |
200 | client.commandBuffer.remove(0, headerSize + size); | ||
201 | 207 | ||
202 | switch (commandId) { | 208 | switch (commandId) { |
203 | case Akonadi2::Commands::HandshakeCommand: { | 209 | case Akonadi2::Commands::HandshakeCommand: { |
204 | auto buffer = Akonadi2::GetHandshake(data.constData()); | 210 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); |
205 | Akonadi2::Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); | 211 | if (Akonadi2::VerifyHandshakeBuffer(verifier)) { |
206 | client.name = buffer->name()->c_str(); | 212 | auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); |
207 | sendCurrentRevision(client); | 213 | client.name = buffer->name()->c_str(); |
214 | sendCurrentRevision(client); | ||
215 | } | ||
208 | break; | 216 | break; |
209 | } | 217 | } |
210 | case Akonadi2::Commands::SynchronizeCommand: { | 218 | case Akonadi2::Commands::SynchronizeCommand: { |
211 | Akonadi2::Console::main()->log(QString(" Synchronize request from %1").arg(client.name)); | 219 | Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); |
212 | loadResource(); | 220 | loadResource(); |
213 | //TODO: on failure ... what? | ||
214 | if (m_resource) { | 221 | if (m_resource) { |
215 | m_resource->synchronizeWithSource(); | 222 | m_resource->synchronizeWithSource(); |
216 | } | 223 | } |
217 | break; | 224 | break; |
218 | } | 225 | } |
226 | case Akonadi2::Commands::FetchEntityCommand: | ||
227 | case Akonadi2::Commands::DeleteEntityCommand: | ||
228 | case Akonadi2::Commands::ModifyEntityCommand: | ||
229 | case Akonadi2::Commands::CreateEntityCommand: | ||
230 | Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %1").arg(messageId).arg(commandId).arg(client.name)); | ||
231 | m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); | ||
232 | break; | ||
219 | default: | 233 | default: |
234 | if (commandId > Akonadi2::Commands::CustomCommand) { | ||
235 | loadResource(); | ||
236 | if (m_resource) { | ||
237 | m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); | ||
238 | } | ||
239 | } else { | ||
240 | //TODO: handle error: we don't know wtf this command is | ||
241 | } | ||
220 | break; | 242 | break; |
221 | } | 243 | } |
222 | 244 | ||
245 | //TODO: async commands == async sendCommandCompleted | ||
246 | sendCommandCompleted(client, messageId); | ||
247 | client.commandBuffer.remove(0, size); | ||
223 | return client.commandBuffer.size() >= headerSize; | 248 | return client.commandBuffer.size() >= headerSize; |
224 | } | 249 | } |
225 | 250 | ||
@@ -234,7 +259,19 @@ void Listener::sendCurrentRevision(Client &client) | |||
234 | 259 | ||
235 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); | 260 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); |
236 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); | 261 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); |
237 | Akonadi2::Commands::write(client.socket, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); | 262 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); |
263 | m_fbb.Clear(); | ||
264 | } | ||
265 | |||
266 | void Listener::sendCommandCompleted(Client &client, uint messageId) | ||
267 | { | ||
268 | if (!client.socket || !client.socket->isValid()) { | ||
269 | return; | ||
270 | } | ||
271 | |||
272 | auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId); | ||
273 | Akonadi2::FinishCommandCompletionBuffer(m_fbb, command); | ||
274 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb); | ||
238 | m_fbb.Clear(); | 275 | m_fbb.Clear(); |
239 | } | 276 | } |
240 | 277 | ||
@@ -248,7 +285,7 @@ void Listener::updateClientsWithRevision() | |||
248 | continue; | 285 | continue; |
249 | } | 286 | } |
250 | 287 | ||
251 | Akonadi2::Commands::write(client.socket, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); | 288 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); |
252 | } | 289 | } |
253 | m_fbb.Clear(); | 290 | m_fbb.Clear(); |
254 | } | 291 | } |
@@ -269,5 +306,6 @@ void Listener::loadResource() | |||
269 | } else { | 306 | } else { |
270 | Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); | 307 | Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); |
271 | } | 308 | } |
309 | //TODO: on failure ... what? | ||
272 | } | 310 | } |
273 | 311 | ||
diff --git a/synchronizer/listener.h b/synchronizer/listener.h index 053fac3..b294277 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h | |||
@@ -25,6 +25,8 @@ | |||
25 | 25 | ||
26 | #include <flatbuffers/flatbuffers.h> | 26 | #include <flatbuffers/flatbuffers.h> |
27 | 27 | ||
28 | #include "common/pipeline.h" | ||
29 | |||
28 | namespace Akonadi2 | 30 | namespace Akonadi2 |
29 | { | 31 | { |
30 | class Resource; | 32 | class Resource; |
@@ -78,6 +80,7 @@ private Q_SLOTS: | |||
78 | private: | 80 | private: |
79 | bool processClientBuffer(Client &client); | 81 | bool processClientBuffer(Client &client); |
80 | void sendCurrentRevision(Client &client); | 82 | void sendCurrentRevision(Client &client); |
83 | void sendCommandCompleted(Client &client, uint messageId); | ||
81 | void updateClientsWithRevision(); | 84 | void updateClientsWithRevision(); |
82 | void loadResource(); | 85 | void loadResource(); |
83 | 86 | ||
@@ -87,5 +90,7 @@ private: | |||
87 | flatbuffers::FlatBufferBuilder m_fbb; | 90 | flatbuffers::FlatBufferBuilder m_fbb; |
88 | const QString m_resourceName; | 91 | const QString m_resourceName; |
89 | Akonadi2::Resource *m_resource; | 92 | Akonadi2::Resource *m_resource; |
93 | Akonadi2::Pipeline *m_pipeline; | ||
90 | QTimer *m_clientBufferProcessesTimer; | 94 | QTimer *m_clientBufferProcessesTimer; |
95 | int m_messageId; | ||
91 | }; | 96 | }; |