summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAaron Seigo <aseigo@kde.org>2014-12-16 22:40:44 +0100
committerAaron Seigo <aseigo@kde.org>2014-12-16 22:40:44 +0100
commit77944384d24b5005d6b8648572a31a3ae84dd946 (patch)
tree8726831773b4182cb6177d6c72a723e08a6c15aa
parent66b21fd2e3c53e4a820e3343b192be7b043da110 (diff)
downloadsink-77944384d24b5005d6b8648572a31a3ae84dd946.tar.gz
sink-77944384d24b5005d6b8648572a31a3ae84dd946.zip
add pipelines (as a sketch only), message ids and message responses
-rw-r--r--CMakeLists.txt3
-rw-r--r--common/CMakeLists.txt2
-rw-r--r--common/commands.cpp15
-rw-r--r--common/commands.h9
-rw-r--r--common/commands/commandcompletion.fbs6
-rw-r--r--common/pipeline.cpp75
-rw-r--r--common/pipeline.h47
-rw-r--r--common/resource.cpp4
-rw-r--r--common/resource.h8
-rw-r--r--common/resourceaccess.cpp35
-rw-r--r--synchronizer/listener.cpp70
-rw-r--r--synchronizer/listener.h5
12 files changed, 237 insertions, 42 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4096fa4..8d33281 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -36,7 +36,8 @@ endfunction(generate_flatbuffers)
36 36
37set(CMAKE_AUTOMOC ON) 37set(CMAKE_AUTOMOC ON)
38add_definitions("-Wall -std=c++0x") 38add_definitions("-Wall -std=c++0x")
39include_directories(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR} ${CMAKE_BINARY_DIR}/common ${FLATBUFFERS_INCLUDE_DIR}) 39include_directories(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR} ${FLATBUFFERS_INCLUDE_DIR} ${CMAKE_BINARY_DIR}/common)
40include_directories(SYSTEM ${CMAKE_SOURCE_DIR}/common)
40 41
41configure_file(hawd.conf hawd.conf) 42configure_file(hawd.conf hawd.conf)
42 43
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 7de4aa9..001dab5 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -1,5 +1,6 @@
1project(akonadi2common) 1project(akonadi2common)
2generate_flatbuffers( 2generate_flatbuffers(
3 commands/commandcompletion
3 commands/createentity 4 commands/createentity
4 commands/deleteentity 5 commands/deleteentity
5 commands/fetchentity 6 commands/fetchentity
@@ -20,6 +21,7 @@ set(command_SRCS
20 clientapi.cpp 21 clientapi.cpp
21 commands.cpp 22 commands.cpp
22 console.cpp 23 console.cpp
24 pipeline.cpp
23 resource.cpp 25 resource.cpp
24 resourceaccess.cpp 26 resourceaccess.cpp
25 storage_common.cpp 27 storage_common.cpp
diff --git a/common/commands.cpp b/common/commands.cpp
index ecbbfdb..1dfeabe 100644
--- a/common/commands.cpp
+++ b/common/commands.cpp
@@ -28,17 +28,23 @@ namespace Akonadi2
28namespace Commands 28namespace Commands
29{ 29{
30 30
31void write(QIODevice *device, int commandId) 31int headerSize()
32{ 32{
33 write(device, commandId, 0, 0); 33 return sizeof(int) + (sizeof(uint) * 2);
34} 34}
35 35
36void write(QIODevice *device, int commandId, const char *buffer, uint size) 36void write(QIODevice *device, int messageId, int commandId)
37{
38 write(device, messageId, commandId, 0, 0);
39}
40
41void write(QIODevice *device, int messageId, int commandId, const char *buffer, uint size)
37{ 42{
38 if (size > 0 && !buffer) { 43 if (size > 0 && !buffer) {
39 size = 0; 44 size = 0;
40 } 45 }
41 46
47 device->write((const char*)&messageId, sizeof(int));
42 device->write((const char*)&commandId, sizeof(int)); 48 device->write((const char*)&commandId, sizeof(int));
43 device->write((const char*)&size, sizeof(uint)); 49 device->write((const char*)&size, sizeof(uint));
44 if (buffer) { 50 if (buffer) {
@@ -46,9 +52,10 @@ void write(QIODevice *device, int commandId, const char *buffer, uint size)
46 } 52 }
47} 53}
48 54
49void write(QIODevice *device, int commandId, flatbuffers::FlatBufferBuilder &fbb) 55void write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb)
50{ 56{
51 const int dataSize = fbb.GetSize(); 57 const int dataSize = fbb.GetSize();
58 device->write((const char*)&messageId, sizeof(int));
52 device->write((const char*)&commandId, sizeof(int)); 59 device->write((const char*)&commandId, sizeof(int));
53 device->write((const char*)&dataSize, sizeof(int)); 60 device->write((const char*)&dataSize, sizeof(int));
54 device->write((const char*)fbb.GetBufferPointer(), dataSize); 61 device->write((const char*)fbb.GetBufferPointer(), dataSize);
diff --git a/common/commands.h b/common/commands.h
index 874db73..c63bb47 100644
--- a/common/commands.h
+++ b/common/commands.h
@@ -33,8 +33,8 @@ namespace Commands
33 33
34enum CommandIds { 34enum CommandIds {
35 UnknownCommand = 0, 35 UnknownCommand = 0,
36 HandshakeCommand,
37 CommandCompletion, 36 CommandCompletion,
37 HandshakeCommand,
38 RevisionUpdateCommand, 38 RevisionUpdateCommand,
39 SynchronizeCommand, 39 SynchronizeCommand,
40 FetchEntityCommand, 40 FetchEntityCommand,
@@ -45,9 +45,10 @@ enum CommandIds {
45 CustomCommand = 0xffff 45 CustomCommand = 0xffff
46}; 46};
47 47
48void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId); 48int AKONADI2COMMON_EXPORT headerSize();
49void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId, const char *buffer, uint size); 49void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId);
50void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId, flatbuffers::FlatBufferBuilder &fbb); 50void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId, const char *buffer, uint size);
51void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb);
51 52
52} 53}
53 54
diff --git a/common/commands/commandcompletion.fbs b/common/commands/commandcompletion.fbs
index 9583108..5330b4f 100644
--- a/common/commands/commandcompletion.fbs
+++ b/common/commands/commandcompletion.fbs
@@ -1,9 +1,9 @@
1namespace Akonadi2; 1namespace Akonadi2;
2 2
3table CommandCompletion { 3table CommandCompletion {
4 id: ulong 4 id: ulong;
5 success: bool 5 success: bool = true;
6 log: ulong = 0 6 log: ulong = 0;
7} 7}
8 8
9root_type CommandCompletion; 9root_type CommandCompletion;
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
new file mode 100644
index 0000000..41def7c
--- /dev/null
+++ b/common/pipeline.cpp
@@ -0,0 +1,75 @@
1/*
2 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21#include "pipeline.h"
22
23#include <QStandardPaths>
24
25namespace Akonadi2
26{
27
28class Pipeline::Private
29{
30public:
31 Private(const QString &storageName)
32 : storage(QStandardPaths::writableLocation(QStandardPaths::QStandardPaths::GenericDataLocation) + "/akonadi2", storageName, Akonadi2::Storage::ReadWrite)
33 {
34
35 }
36
37 Akonadi2::Storage storage;
38};
39
40Pipeline::Pipeline(const QString &storageName)
41 : d(new Private(storageName))
42{
43}
44
45Pipeline::~Pipeline()
46{
47}
48
49Storage &Pipeline::storage()
50{
51 return d->storage;
52}
53
54void Pipeline::null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize)
55{
56 d->storage.write(key, keySize, buffer, bufferSize);
57}
58
59void Pipeline::newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize)
60{
61 d->storage.write(key, keySize, buffer, bufferSize);
62}
63
64void Pipeline::modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize)
65{
66 d->storage.write(key, keySize, buffer, bufferSize);
67}
68
69void Pipeline::deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize)
70{
71 d->storage.write(key, keySize, buffer, bufferSize);
72}
73
74} // namespace Akonadi2
75
diff --git a/common/pipeline.h b/common/pipeline.h
new file mode 100644
index 0000000..635e630
--- /dev/null
+++ b/common/pipeline.h
@@ -0,0 +1,47 @@
1/*
2 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include <akonadi2common_export.h>
24#include <storage.h>
25
26namespace Akonadi2
27{
28
29class AKONADI2COMMON_EXPORT Pipeline
30{
31public:
32 Pipeline(const QString &storagePath);
33 ~Pipeline();
34
35 Storage &storage();
36 void null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize);
37 void newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize);
38 void modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize);
39 void deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize);
40
41private:
42 class Private;
43 Private * const d;
44};
45
46} // namespace Akonadi2
47
diff --git a/common/resource.cpp b/common/resource.cpp
index 7120d1c..11a03ca 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -39,6 +39,10 @@ Resource::~Resource()
39 //delete d; 39 //delete d;
40} 40}
41 41
42void Resource::processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline)
43{
44}
45
42void Resource::synchronizeWithSource() 46void Resource::synchronizeWithSource()
43{ 47{
44} 48}
diff --git a/common/resource.h b/common/resource.h
index 2ecff03..53c0bc1 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -18,18 +18,22 @@
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */ 19 */
20 20
21#include "clientapi.h" 21#include <clientapi.h>
22#include <akonadi2common_export.h>
23#include <pipeline.h>
22 24
23namespace Akonadi2 25namespace Akonadi2
24{ 26{
25 27
26class Resource 28class AKONADI2COMMON_EXPORT Resource
27{ 29{
28public: 30public:
29 //TODO: configuration 31 //TODO: configuration
30 Resource(); 32 Resource();
31 virtual ~Resource(); 33 virtual ~Resource();
32 34
35 //TODO: this will need to be async
36 virtual void processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline);
33 virtual void synchronizeWithSource(); 37 virtual void synchronizeWithSource();
34 38
35private: 39private:
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index a4f3c94..2f7d207 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -22,6 +22,7 @@
22 22
23#include "common/console.h" 23#include "common/console.h"
24#include "common/commands.h" 24#include "common/commands.h"
25#include "common/commandcompletion_generated.h"
25#include "common/handshake_generated.h" 26#include "common/handshake_generated.h"
26#include "common/revisionupdate_generated.h" 27#include "common/revisionupdate_generated.h"
27 28
@@ -54,10 +55,10 @@ public:
54 delete[] m_buffer; 55 delete[] m_buffer;
55 } 56 }
56 57
57 void write(QIODevice *device) 58 void write(QIODevice *device, uint messageId)
58 { 59 {
59 Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); 60 Console::main()->log(QString("\tSending queued command %1").arg(m_commandId));
60 Commands::write(device, m_commandId, m_buffer, m_bufferSize); 61 Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize);
61 } 62 }
62 63
63private: 64private:
@@ -80,13 +81,15 @@ public:
80 QByteArray partialMessageBuffer; 81 QByteArray partialMessageBuffer;
81 flatbuffers::FlatBufferBuilder fbb; 82 flatbuffers::FlatBufferBuilder fbb;
82 QVector<QueuedCommand *> commandQueue; 83 QVector<QueuedCommand *> commandQueue;
84 uint messageId;
83}; 85};
84 86
85ResourceAccess::Private::Private(const QString &name, ResourceAccess *q) 87ResourceAccess::Private::Private(const QString &name, ResourceAccess *q)
86 : resourceName(name), 88 : resourceName(name),
87 socket(new QLocalSocket(q)), 89 socket(new QLocalSocket(q)),
88 tryOpenTimer(new QTimer(q)), 90 tryOpenTimer(new QTimer(q)),
89 startingProcess(false) 91 startingProcess(false),
92 messageId(0)
90{ 93{
91} 94}
92 95
@@ -129,7 +132,7 @@ void ResourceAccess::sendCommand(int commandId)
129{ 132{
130 if (isReady()) { 133 if (isReady()) {
131 log(QString("Sending command %1").arg(commandId)); 134 log(QString("Sending command %1").arg(commandId));
132 Commands::write(d->socket, commandId); 135 Commands::write(d->socket, ++d->messageId, commandId);
133 } else { 136 } else {
134 d->commandQueue << new QueuedCommand(commandId); 137 d->commandQueue << new QueuedCommand(commandId);
135 } 138 }
@@ -139,7 +142,7 @@ void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &
139{ 142{
140 if (isReady()) { 143 if (isReady()) {
141 log(QString("Sending command %1").arg(commandId)); 144 log(QString("Sending command %1").arg(commandId));
142 Commands::write(d->socket, commandId, fbb); 145 Commands::write(d->socket, ++d->messageId, commandId, fbb);
143 } else { 146 } else {
144 d->commandQueue << new QueuedCommand(commandId, fbb); 147 d->commandQueue << new QueuedCommand(commandId, fbb);
145 } 148 }
@@ -180,7 +183,7 @@ void ResourceAccess::connected()
180 auto name = d->fbb.CreateString(QString::number((long long)this).toLatin1()); 183 auto name = d->fbb.CreateString(QString::number((long long)this).toLatin1());
181 auto command = Akonadi2::CreateHandshake(d->fbb, name); 184 auto command = Akonadi2::CreateHandshake(d->fbb, name);
182 Akonadi2::FinishHandshakeBuffer(d->fbb, command); 185 Akonadi2::FinishHandshakeBuffer(d->fbb, command);
183 Commands::write(d->socket, Commands::HandshakeCommand, d->fbb); 186 Commands::write(d->socket, ++d->messageId, Commands::HandshakeCommand, d->fbb);
184 d->fbb.Clear(); 187 d->fbb.Clear();
185 } 188 }
186 189
@@ -188,7 +191,7 @@ void ResourceAccess::connected()
188 //TODO: serialize instead of blast them all through the socket? 191 //TODO: serialize instead of blast them all through the socket?
189 log(QString("We have %1 queued commands").arg(d->commandQueue.size())); 192 log(QString("We have %1 queued commands").arg(d->commandQueue.size()));
190 for (QueuedCommand *command: d->commandQueue) { 193 for (QueuedCommand *command: d->commandQueue) {
191 command->write(d->socket); 194 command->write(d->socket, ++d->messageId);
192 delete command; 195 delete command;
193 } 196 }
194 d->commandQueue.clear(); 197 d->commandQueue.clear();
@@ -239,25 +242,33 @@ void ResourceAccess::readResourceMessage()
239 242
240bool ResourceAccess::processMessageBuffer() 243bool ResourceAccess::processMessageBuffer()
241{ 244{
242 static const int headerSize = (sizeof(int) * 2); 245 static const int headerSize = Commands::headerSize();
243 if (d->partialMessageBuffer.size() < headerSize) { 246 if (d->partialMessageBuffer.size() < headerSize) {
244 return false; 247 return false;
245 } 248 }
246 249
247 const int commandId = *(int*)d->partialMessageBuffer.constData(); 250 //messageId is unused, so commented out
248 const int size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int)); 251 //const uint messageId = *(int*)(d->partialMessageBuffer.constData());
252 const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint));
253 const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint));
249 254
250 if (size > d->partialMessageBuffer.size() - headerSize) { 255 if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) {
251 return false; 256 return false;
252 } 257 }
253 258
254 switch (commandId) { 259 switch (commandId) {
255 case Commands::RevisionUpdateCommand: { 260 case Commands::RevisionUpdateCommand: {
256 auto buffer = Akonadi2::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); 261 auto buffer = GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize);
257 log(QString("Revision updated to: %1").arg(buffer->revision())); 262 log(QString("Revision updated to: %1").arg(buffer->revision()));
258 emit revisionChanged(buffer->revision()); 263 emit revisionChanged(buffer->revision());
259 break; 264 break;
260 } 265 }
266 case Commands::CommandCompletion: {
267 auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
268 log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"));
269 //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc
270 break;
271 }
261 default: 272 default:
262 break; 273 break;
263 } 274 }
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index 368dae5..18442e7 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -22,8 +22,11 @@
22#include "common/clientapi.h" 22#include "common/clientapi.h"
23#include "common/console.h" 23#include "common/console.h"
24#include "common/commands.h" 24#include "common/commands.h"
25#include "common/handshake_generated.h"
26#include "common/resource.h" 25#include "common/resource.h"
26
27// commands
28#include "common/commandcompletion_generated.h"
29#include "common/handshake_generated.h"
27#include "common/revisionupdate_generated.h" 30#include "common/revisionupdate_generated.h"
28 31
29#include <QLocalSocket> 32#include <QLocalSocket>
@@ -35,7 +38,9 @@ Listener::Listener(const QString &resourceName, QObject *parent)
35 m_revision(0), 38 m_revision(0),
36 m_resourceName(resourceName), 39 m_resourceName(resourceName),
37 m_resource(0), 40 m_resource(0),
38 m_clientBufferProcessesTimer(new QTimer(this)) 41 m_pipeline(new Akonadi2::Pipeline(resourceName)),
42 m_clientBufferProcessesTimer(new QTimer(this)),
43 m_messageId(0)
39{ 44{
40 connect(m_server, &QLocalServer::newConnection, 45 connect(m_server, &QLocalServer::newConnection,
41 this, &Listener::acceptConnection); 46 this, &Listener::acceptConnection);
@@ -64,6 +69,7 @@ Listener::Listener(const QString &resourceName, QObject *parent)
64 69
65Listener::~Listener() 70Listener::~Listener()
66{ 71{
72 delete m_pipeline;
67} 73}
68 74
69void Listener::setRevision(unsigned long long revision) 75void Listener::setRevision(unsigned long long revision)
@@ -183,43 +189,62 @@ void Listener::processClientBuffers()
183 189
184bool Listener::processClientBuffer(Client &client) 190bool Listener::processClientBuffer(Client &client)
185{ 191{
186 static const int headerSize = (sizeof(int) + sizeof(uint)); 192 static const int headerSize = Akonadi2::Commands::headerSize();
187 if (client.commandBuffer.size() < headerSize) { 193 if (client.commandBuffer.size() < headerSize) {
188 return false; 194 return false;
189 } 195 }
190 196
191 int commandId; 197 int commandId;
192 uint size; 198 uint messageId, size;
193 commandId = *(int*)client.commandBuffer.constData(); 199 messageId = *(uint*)client.commandBuffer.constData();
194 size = *(uint*)(client.commandBuffer.constData() + sizeof(uint)); 200 commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint));
201 size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint));
195 202
196 //TODO: reject messages above a certain size? 203 //TODO: reject messages above a certain size?
197 204
198 if (size <= uint(client.commandBuffer.size() - headerSize)) { 205 if (size <= uint(client.commandBuffer.size() - headerSize)) {
199 QByteArray data = client.commandBuffer.mid(headerSize, size); 206 client.commandBuffer.remove(0, headerSize);
200 client.commandBuffer.remove(0, headerSize + size);
201 207
202 switch (commandId) { 208 switch (commandId) {
203 case Akonadi2::Commands::HandshakeCommand: { 209 case Akonadi2::Commands::HandshakeCommand: {
204 auto buffer = Akonadi2::GetHandshake(data.constData()); 210 flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size);
205 Akonadi2::Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); 211 if (Akonadi2::VerifyHandshakeBuffer(verifier)) {
206 client.name = buffer->name()->c_str(); 212 auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData());
207 sendCurrentRevision(client); 213 client.name = buffer->name()->c_str();
214 sendCurrentRevision(client);
215 }
208 break; 216 break;
209 } 217 }
210 case Akonadi2::Commands::SynchronizeCommand: { 218 case Akonadi2::Commands::SynchronizeCommand: {
211 Akonadi2::Console::main()->log(QString(" Synchronize request from %1").arg(client.name)); 219 Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name));
212 loadResource(); 220 loadResource();
213 //TODO: on failure ... what?
214 if (m_resource) { 221 if (m_resource) {
215 m_resource->synchronizeWithSource(); 222 m_resource->synchronizeWithSource();
216 } 223 }
217 break; 224 break;
218 } 225 }
226 case Akonadi2::Commands::FetchEntityCommand:
227 case Akonadi2::Commands::DeleteEntityCommand:
228 case Akonadi2::Commands::ModifyEntityCommand:
229 case Akonadi2::Commands::CreateEntityCommand:
230 Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %1").arg(messageId).arg(commandId).arg(client.name));
231 m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline);
232 break;
219 default: 233 default:
234 if (commandId > Akonadi2::Commands::CustomCommand) {
235 loadResource();
236 if (m_resource) {
237 m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline);
238 }
239 } else {
240 //TODO: handle error: we don't know wtf this command is
241 }
220 break; 242 break;
221 } 243 }
222 244
245 //TODO: async commands == async sendCommandCompleted
246 sendCommandCompleted(client, messageId);
247 client.commandBuffer.remove(0, size);
223 return client.commandBuffer.size() >= headerSize; 248 return client.commandBuffer.size() >= headerSize;
224 } 249 }
225 250
@@ -234,7 +259,19 @@ void Listener::sendCurrentRevision(Client &client)
234 259
235 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); 260 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision);
236 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); 261 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command);
237 Akonadi2::Commands::write(client.socket, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); 262 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb);
263 m_fbb.Clear();
264}
265
266void Listener::sendCommandCompleted(Client &client, uint messageId)
267{
268 if (!client.socket || !client.socket->isValid()) {
269 return;
270 }
271
272 auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId);
273 Akonadi2::FinishCommandCompletionBuffer(m_fbb, command);
274 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb);
238 m_fbb.Clear(); 275 m_fbb.Clear();
239} 276}
240 277
@@ -248,7 +285,7 @@ void Listener::updateClientsWithRevision()
248 continue; 285 continue;
249 } 286 }
250 287
251 Akonadi2::Commands::write(client.socket, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); 288 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb);
252 } 289 }
253 m_fbb.Clear(); 290 m_fbb.Clear();
254} 291}
@@ -269,5 +306,6 @@ void Listener::loadResource()
269 } else { 306 } else {
270 Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); 307 Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName));
271 } 308 }
309 //TODO: on failure ... what?
272} 310}
273 311
diff --git a/synchronizer/listener.h b/synchronizer/listener.h
index 053fac3..b294277 100644
--- a/synchronizer/listener.h
+++ b/synchronizer/listener.h
@@ -25,6 +25,8 @@
25 25
26#include <flatbuffers/flatbuffers.h> 26#include <flatbuffers/flatbuffers.h>
27 27
28#include "common/pipeline.h"
29
28namespace Akonadi2 30namespace Akonadi2
29{ 31{
30 class Resource; 32 class Resource;
@@ -78,6 +80,7 @@ private Q_SLOTS:
78private: 80private:
79 bool processClientBuffer(Client &client); 81 bool processClientBuffer(Client &client);
80 void sendCurrentRevision(Client &client); 82 void sendCurrentRevision(Client &client);
83 void sendCommandCompleted(Client &client, uint messageId);
81 void updateClientsWithRevision(); 84 void updateClientsWithRevision();
82 void loadResource(); 85 void loadResource();
83 86
@@ -87,5 +90,7 @@ private:
87 flatbuffers::FlatBufferBuilder m_fbb; 90 flatbuffers::FlatBufferBuilder m_fbb;
88 const QString m_resourceName; 91 const QString m_resourceName;
89 Akonadi2::Resource *m_resource; 92 Akonadi2::Resource *m_resource;
93 Akonadi2::Pipeline *m_pipeline;
90 QTimer *m_clientBufferProcessesTimer; 94 QTimer *m_clientBufferProcessesTimer;
95 int m_messageId;
91}; 96};