diff options
-rw-r--r-- | CMakeLists.txt | 3 | ||||
-rw-r--r-- | common/CMakeLists.txt | 2 | ||||
-rw-r--r-- | common/commands.cpp | 15 | ||||
-rw-r--r-- | common/commands.h | 9 | ||||
-rw-r--r-- | common/commands/commandcompletion.fbs | 6 | ||||
-rw-r--r-- | common/pipeline.cpp | 75 | ||||
-rw-r--r-- | common/pipeline.h | 47 | ||||
-rw-r--r-- | common/resource.cpp | 4 | ||||
-rw-r--r-- | common/resource.h | 8 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 35 | ||||
-rw-r--r-- | synchronizer/listener.cpp | 70 | ||||
-rw-r--r-- | synchronizer/listener.h | 5 |
12 files changed, 237 insertions, 42 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 4096fa4..8d33281 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt | |||
@@ -36,7 +36,8 @@ endfunction(generate_flatbuffers) | |||
36 | 36 | ||
37 | set(CMAKE_AUTOMOC ON) | 37 | set(CMAKE_AUTOMOC ON) |
38 | add_definitions("-Wall -std=c++0x") | 38 | add_definitions("-Wall -std=c++0x") |
39 | include_directories(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR} ${CMAKE_BINARY_DIR}/common ${FLATBUFFERS_INCLUDE_DIR}) | 39 | include_directories(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR} ${FLATBUFFERS_INCLUDE_DIR} ${CMAKE_BINARY_DIR}/common) |
40 | include_directories(SYSTEM ${CMAKE_SOURCE_DIR}/common) | ||
40 | 41 | ||
41 | configure_file(hawd.conf hawd.conf) | 42 | configure_file(hawd.conf hawd.conf) |
42 | 43 | ||
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 7de4aa9..001dab5 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -1,5 +1,6 @@ | |||
1 | project(akonadi2common) | 1 | project(akonadi2common) |
2 | generate_flatbuffers( | 2 | generate_flatbuffers( |
3 | commands/commandcompletion | ||
3 | commands/createentity | 4 | commands/createentity |
4 | commands/deleteentity | 5 | commands/deleteentity |
5 | commands/fetchentity | 6 | commands/fetchentity |
@@ -20,6 +21,7 @@ set(command_SRCS | |||
20 | clientapi.cpp | 21 | clientapi.cpp |
21 | commands.cpp | 22 | commands.cpp |
22 | console.cpp | 23 | console.cpp |
24 | pipeline.cpp | ||
23 | resource.cpp | 25 | resource.cpp |
24 | resourceaccess.cpp | 26 | resourceaccess.cpp |
25 | storage_common.cpp | 27 | storage_common.cpp |
diff --git a/common/commands.cpp b/common/commands.cpp index ecbbfdb..1dfeabe 100644 --- a/common/commands.cpp +++ b/common/commands.cpp | |||
@@ -28,17 +28,23 @@ namespace Akonadi2 | |||
28 | namespace Commands | 28 | namespace Commands |
29 | { | 29 | { |
30 | 30 | ||
31 | void write(QIODevice *device, int commandId) | 31 | int headerSize() |
32 | { | 32 | { |
33 | write(device, commandId, 0, 0); | 33 | return sizeof(int) + (sizeof(uint) * 2); |
34 | } | 34 | } |
35 | 35 | ||
36 | void write(QIODevice *device, int commandId, const char *buffer, uint size) | 36 | void write(QIODevice *device, int messageId, int commandId) |
37 | { | ||
38 | write(device, messageId, commandId, 0, 0); | ||
39 | } | ||
40 | |||
41 | void write(QIODevice *device, int messageId, int commandId, const char *buffer, uint size) | ||
37 | { | 42 | { |
38 | if (size > 0 && !buffer) { | 43 | if (size > 0 && !buffer) { |
39 | size = 0; | 44 | size = 0; |
40 | } | 45 | } |
41 | 46 | ||
47 | device->write((const char*)&messageId, sizeof(int)); | ||
42 | device->write((const char*)&commandId, sizeof(int)); | 48 | device->write((const char*)&commandId, sizeof(int)); |
43 | device->write((const char*)&size, sizeof(uint)); | 49 | device->write((const char*)&size, sizeof(uint)); |
44 | if (buffer) { | 50 | if (buffer) { |
@@ -46,9 +52,10 @@ void write(QIODevice *device, int commandId, const char *buffer, uint size) | |||
46 | } | 52 | } |
47 | } | 53 | } |
48 | 54 | ||
49 | void write(QIODevice *device, int commandId, flatbuffers::FlatBufferBuilder &fbb) | 55 | void write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb) |
50 | { | 56 | { |
51 | const int dataSize = fbb.GetSize(); | 57 | const int dataSize = fbb.GetSize(); |
58 | device->write((const char*)&messageId, sizeof(int)); | ||
52 | device->write((const char*)&commandId, sizeof(int)); | 59 | device->write((const char*)&commandId, sizeof(int)); |
53 | device->write((const char*)&dataSize, sizeof(int)); | 60 | device->write((const char*)&dataSize, sizeof(int)); |
54 | device->write((const char*)fbb.GetBufferPointer(), dataSize); | 61 | device->write((const char*)fbb.GetBufferPointer(), dataSize); |
diff --git a/common/commands.h b/common/commands.h index 874db73..c63bb47 100644 --- a/common/commands.h +++ b/common/commands.h | |||
@@ -33,8 +33,8 @@ namespace Commands | |||
33 | 33 | ||
34 | enum CommandIds { | 34 | enum CommandIds { |
35 | UnknownCommand = 0, | 35 | UnknownCommand = 0, |
36 | HandshakeCommand, | ||
37 | CommandCompletion, | 36 | CommandCompletion, |
37 | HandshakeCommand, | ||
38 | RevisionUpdateCommand, | 38 | RevisionUpdateCommand, |
39 | SynchronizeCommand, | 39 | SynchronizeCommand, |
40 | FetchEntityCommand, | 40 | FetchEntityCommand, |
@@ -45,9 +45,10 @@ enum CommandIds { | |||
45 | CustomCommand = 0xffff | 45 | CustomCommand = 0xffff |
46 | }; | 46 | }; |
47 | 47 | ||
48 | void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId); | 48 | int AKONADI2COMMON_EXPORT headerSize(); |
49 | void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId, const char *buffer, uint size); | 49 | void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId); |
50 | void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId, flatbuffers::FlatBufferBuilder &fbb); | 50 | void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId, const char *buffer, uint size); |
51 | void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb); | ||
51 | 52 | ||
52 | } | 53 | } |
53 | 54 | ||
diff --git a/common/commands/commandcompletion.fbs b/common/commands/commandcompletion.fbs index 9583108..5330b4f 100644 --- a/common/commands/commandcompletion.fbs +++ b/common/commands/commandcompletion.fbs | |||
@@ -1,9 +1,9 @@ | |||
1 | namespace Akonadi2; | 1 | namespace Akonadi2; |
2 | 2 | ||
3 | table CommandCompletion { | 3 | table CommandCompletion { |
4 | id: ulong | 4 | id: ulong; |
5 | success: bool | 5 | success: bool = true; |
6 | log: ulong = 0 | 6 | log: ulong = 0; |
7 | } | 7 | } |
8 | 8 | ||
9 | root_type CommandCompletion; | 9 | root_type CommandCompletion; |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp new file mode 100644 index 0000000..41def7c --- /dev/null +++ b/common/pipeline.cpp | |||
@@ -0,0 +1,75 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | |||
21 | #include "pipeline.h" | ||
22 | |||
23 | #include <QStandardPaths> | ||
24 | |||
25 | namespace Akonadi2 | ||
26 | { | ||
27 | |||
28 | class Pipeline::Private | ||
29 | { | ||
30 | public: | ||
31 | Private(const QString &storageName) | ||
32 | : storage(QStandardPaths::writableLocation(QStandardPaths::QStandardPaths::GenericDataLocation) + "/akonadi2", storageName, Akonadi2::Storage::ReadWrite) | ||
33 | { | ||
34 | |||
35 | } | ||
36 | |||
37 | Akonadi2::Storage storage; | ||
38 | }; | ||
39 | |||
40 | Pipeline::Pipeline(const QString &storageName) | ||
41 | : d(new Private(storageName)) | ||
42 | { | ||
43 | } | ||
44 | |||
45 | Pipeline::~Pipeline() | ||
46 | { | ||
47 | } | ||
48 | |||
49 | Storage &Pipeline::storage() | ||
50 | { | ||
51 | return d->storage; | ||
52 | } | ||
53 | |||
54 | void Pipeline::null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | ||
55 | { | ||
56 | d->storage.write(key, keySize, buffer, bufferSize); | ||
57 | } | ||
58 | |||
59 | void Pipeline::newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | ||
60 | { | ||
61 | d->storage.write(key, keySize, buffer, bufferSize); | ||
62 | } | ||
63 | |||
64 | void Pipeline::modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | ||
65 | { | ||
66 | d->storage.write(key, keySize, buffer, bufferSize); | ||
67 | } | ||
68 | |||
69 | void Pipeline::deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | ||
70 | { | ||
71 | d->storage.write(key, keySize, buffer, bufferSize); | ||
72 | } | ||
73 | |||
74 | } // namespace Akonadi2 | ||
75 | |||
diff --git a/common/pipeline.h b/common/pipeline.h new file mode 100644 index 0000000..635e630 --- /dev/null +++ b/common/pipeline.h | |||
@@ -0,0 +1,47 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | |||
21 | #pragma once | ||
22 | |||
23 | #include <akonadi2common_export.h> | ||
24 | #include <storage.h> | ||
25 | |||
26 | namespace Akonadi2 | ||
27 | { | ||
28 | |||
29 | class AKONADI2COMMON_EXPORT Pipeline | ||
30 | { | ||
31 | public: | ||
32 | Pipeline(const QString &storagePath); | ||
33 | ~Pipeline(); | ||
34 | |||
35 | Storage &storage(); | ||
36 | void null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | ||
37 | void newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | ||
38 | void modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | ||
39 | void deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | ||
40 | |||
41 | private: | ||
42 | class Private; | ||
43 | Private * const d; | ||
44 | }; | ||
45 | |||
46 | } // namespace Akonadi2 | ||
47 | |||
diff --git a/common/resource.cpp b/common/resource.cpp index 7120d1c..11a03ca 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -39,6 +39,10 @@ Resource::~Resource() | |||
39 | //delete d; | 39 | //delete d; |
40 | } | 40 | } |
41 | 41 | ||
42 | void Resource::processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline) | ||
43 | { | ||
44 | } | ||
45 | |||
42 | void Resource::synchronizeWithSource() | 46 | void Resource::synchronizeWithSource() |
43 | { | 47 | { |
44 | } | 48 | } |
diff --git a/common/resource.h b/common/resource.h index 2ecff03..53c0bc1 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -18,18 +18,22 @@ | |||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | 18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. |
19 | */ | 19 | */ |
20 | 20 | ||
21 | #include "clientapi.h" | 21 | #include <clientapi.h> |
22 | #include <akonadi2common_export.h> | ||
23 | #include <pipeline.h> | ||
22 | 24 | ||
23 | namespace Akonadi2 | 25 | namespace Akonadi2 |
24 | { | 26 | { |
25 | 27 | ||
26 | class Resource | 28 | class AKONADI2COMMON_EXPORT Resource |
27 | { | 29 | { |
28 | public: | 30 | public: |
29 | //TODO: configuration | 31 | //TODO: configuration |
30 | Resource(); | 32 | Resource(); |
31 | virtual ~Resource(); | 33 | virtual ~Resource(); |
32 | 34 | ||
35 | //TODO: this will need to be async | ||
36 | virtual void processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline); | ||
33 | virtual void synchronizeWithSource(); | 37 | virtual void synchronizeWithSource(); |
34 | 38 | ||
35 | private: | 39 | private: |
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index a4f3c94..2f7d207 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -22,6 +22,7 @@ | |||
22 | 22 | ||
23 | #include "common/console.h" | 23 | #include "common/console.h" |
24 | #include "common/commands.h" | 24 | #include "common/commands.h" |
25 | #include "common/commandcompletion_generated.h" | ||
25 | #include "common/handshake_generated.h" | 26 | #include "common/handshake_generated.h" |
26 | #include "common/revisionupdate_generated.h" | 27 | #include "common/revisionupdate_generated.h" |
27 | 28 | ||
@@ -54,10 +55,10 @@ public: | |||
54 | delete[] m_buffer; | 55 | delete[] m_buffer; |
55 | } | 56 | } |
56 | 57 | ||
57 | void write(QIODevice *device) | 58 | void write(QIODevice *device, uint messageId) |
58 | { | 59 | { |
59 | Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); | 60 | Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); |
60 | Commands::write(device, m_commandId, m_buffer, m_bufferSize); | 61 | Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize); |
61 | } | 62 | } |
62 | 63 | ||
63 | private: | 64 | private: |
@@ -80,13 +81,15 @@ public: | |||
80 | QByteArray partialMessageBuffer; | 81 | QByteArray partialMessageBuffer; |
81 | flatbuffers::FlatBufferBuilder fbb; | 82 | flatbuffers::FlatBufferBuilder fbb; |
82 | QVector<QueuedCommand *> commandQueue; | 83 | QVector<QueuedCommand *> commandQueue; |
84 | uint messageId; | ||
83 | }; | 85 | }; |
84 | 86 | ||
85 | ResourceAccess::Private::Private(const QString &name, ResourceAccess *q) | 87 | ResourceAccess::Private::Private(const QString &name, ResourceAccess *q) |
86 | : resourceName(name), | 88 | : resourceName(name), |
87 | socket(new QLocalSocket(q)), | 89 | socket(new QLocalSocket(q)), |
88 | tryOpenTimer(new QTimer(q)), | 90 | tryOpenTimer(new QTimer(q)), |
89 | startingProcess(false) | 91 | startingProcess(false), |
92 | messageId(0) | ||
90 | { | 93 | { |
91 | } | 94 | } |
92 | 95 | ||
@@ -129,7 +132,7 @@ void ResourceAccess::sendCommand(int commandId) | |||
129 | { | 132 | { |
130 | if (isReady()) { | 133 | if (isReady()) { |
131 | log(QString("Sending command %1").arg(commandId)); | 134 | log(QString("Sending command %1").arg(commandId)); |
132 | Commands::write(d->socket, commandId); | 135 | Commands::write(d->socket, ++d->messageId, commandId); |
133 | } else { | 136 | } else { |
134 | d->commandQueue << new QueuedCommand(commandId); | 137 | d->commandQueue << new QueuedCommand(commandId); |
135 | } | 138 | } |
@@ -139,7 +142,7 @@ void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder & | |||
139 | { | 142 | { |
140 | if (isReady()) { | 143 | if (isReady()) { |
141 | log(QString("Sending command %1").arg(commandId)); | 144 | log(QString("Sending command %1").arg(commandId)); |
142 | Commands::write(d->socket, commandId, fbb); | 145 | Commands::write(d->socket, ++d->messageId, commandId, fbb); |
143 | } else { | 146 | } else { |
144 | d->commandQueue << new QueuedCommand(commandId, fbb); | 147 | d->commandQueue << new QueuedCommand(commandId, fbb); |
145 | } | 148 | } |
@@ -180,7 +183,7 @@ void ResourceAccess::connected() | |||
180 | auto name = d->fbb.CreateString(QString::number((long long)this).toLatin1()); | 183 | auto name = d->fbb.CreateString(QString::number((long long)this).toLatin1()); |
181 | auto command = Akonadi2::CreateHandshake(d->fbb, name); | 184 | auto command = Akonadi2::CreateHandshake(d->fbb, name); |
182 | Akonadi2::FinishHandshakeBuffer(d->fbb, command); | 185 | Akonadi2::FinishHandshakeBuffer(d->fbb, command); |
183 | Commands::write(d->socket, Commands::HandshakeCommand, d->fbb); | 186 | Commands::write(d->socket, ++d->messageId, Commands::HandshakeCommand, d->fbb); |
184 | d->fbb.Clear(); | 187 | d->fbb.Clear(); |
185 | } | 188 | } |
186 | 189 | ||
@@ -188,7 +191,7 @@ void ResourceAccess::connected() | |||
188 | //TODO: serialize instead of blast them all through the socket? | 191 | //TODO: serialize instead of blast them all through the socket? |
189 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); | 192 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); |
190 | for (QueuedCommand *command: d->commandQueue) { | 193 | for (QueuedCommand *command: d->commandQueue) { |
191 | command->write(d->socket); | 194 | command->write(d->socket, ++d->messageId); |
192 | delete command; | 195 | delete command; |
193 | } | 196 | } |
194 | d->commandQueue.clear(); | 197 | d->commandQueue.clear(); |
@@ -239,25 +242,33 @@ void ResourceAccess::readResourceMessage() | |||
239 | 242 | ||
240 | bool ResourceAccess::processMessageBuffer() | 243 | bool ResourceAccess::processMessageBuffer() |
241 | { | 244 | { |
242 | static const int headerSize = (sizeof(int) * 2); | 245 | static const int headerSize = Commands::headerSize(); |
243 | if (d->partialMessageBuffer.size() < headerSize) { | 246 | if (d->partialMessageBuffer.size() < headerSize) { |
244 | return false; | 247 | return false; |
245 | } | 248 | } |
246 | 249 | ||
247 | const int commandId = *(int*)d->partialMessageBuffer.constData(); | 250 | //messageId is unused, so commented out |
248 | const int size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int)); | 251 | //const uint messageId = *(int*)(d->partialMessageBuffer.constData()); |
252 | const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); | ||
253 | const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); | ||
249 | 254 | ||
250 | if (size > d->partialMessageBuffer.size() - headerSize) { | 255 | if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) { |
251 | return false; | 256 | return false; |
252 | } | 257 | } |
253 | 258 | ||
254 | switch (commandId) { | 259 | switch (commandId) { |
255 | case Commands::RevisionUpdateCommand: { | 260 | case Commands::RevisionUpdateCommand: { |
256 | auto buffer = Akonadi2::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); | 261 | auto buffer = GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); |
257 | log(QString("Revision updated to: %1").arg(buffer->revision())); | 262 | log(QString("Revision updated to: %1").arg(buffer->revision())); |
258 | emit revisionChanged(buffer->revision()); | 263 | emit revisionChanged(buffer->revision()); |
259 | break; | 264 | break; |
260 | } | 265 | } |
266 | case Commands::CommandCompletion: { | ||
267 | auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); | ||
268 | log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); | ||
269 | //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc | ||
270 | break; | ||
271 | } | ||
261 | default: | 272 | default: |
262 | break; | 273 | break; |
263 | } | 274 | } |
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 368dae5..18442e7 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -22,8 +22,11 @@ | |||
22 | #include "common/clientapi.h" | 22 | #include "common/clientapi.h" |
23 | #include "common/console.h" | 23 | #include "common/console.h" |
24 | #include "common/commands.h" | 24 | #include "common/commands.h" |
25 | #include "common/handshake_generated.h" | ||
26 | #include "common/resource.h" | 25 | #include "common/resource.h" |
26 | |||
27 | // commands | ||
28 | #include "common/commandcompletion_generated.h" | ||
29 | #include "common/handshake_generated.h" | ||
27 | #include "common/revisionupdate_generated.h" | 30 | #include "common/revisionupdate_generated.h" |
28 | 31 | ||
29 | #include <QLocalSocket> | 32 | #include <QLocalSocket> |
@@ -35,7 +38,9 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
35 | m_revision(0), | 38 | m_revision(0), |
36 | m_resourceName(resourceName), | 39 | m_resourceName(resourceName), |
37 | m_resource(0), | 40 | m_resource(0), |
38 | m_clientBufferProcessesTimer(new QTimer(this)) | 41 | m_pipeline(new Akonadi2::Pipeline(resourceName)), |
42 | m_clientBufferProcessesTimer(new QTimer(this)), | ||
43 | m_messageId(0) | ||
39 | { | 44 | { |
40 | connect(m_server, &QLocalServer::newConnection, | 45 | connect(m_server, &QLocalServer::newConnection, |
41 | this, &Listener::acceptConnection); | 46 | this, &Listener::acceptConnection); |
@@ -64,6 +69,7 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
64 | 69 | ||
65 | Listener::~Listener() | 70 | Listener::~Listener() |
66 | { | 71 | { |
72 | delete m_pipeline; | ||
67 | } | 73 | } |
68 | 74 | ||
69 | void Listener::setRevision(unsigned long long revision) | 75 | void Listener::setRevision(unsigned long long revision) |
@@ -183,43 +189,62 @@ void Listener::processClientBuffers() | |||
183 | 189 | ||
184 | bool Listener::processClientBuffer(Client &client) | 190 | bool Listener::processClientBuffer(Client &client) |
185 | { | 191 | { |
186 | static const int headerSize = (sizeof(int) + sizeof(uint)); | 192 | static const int headerSize = Akonadi2::Commands::headerSize(); |
187 | if (client.commandBuffer.size() < headerSize) { | 193 | if (client.commandBuffer.size() < headerSize) { |
188 | return false; | 194 | return false; |
189 | } | 195 | } |
190 | 196 | ||
191 | int commandId; | 197 | int commandId; |
192 | uint size; | 198 | uint messageId, size; |
193 | commandId = *(int*)client.commandBuffer.constData(); | 199 | messageId = *(uint*)client.commandBuffer.constData(); |
194 | size = *(uint*)(client.commandBuffer.constData() + sizeof(uint)); | 200 | commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); |
201 | size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); | ||
195 | 202 | ||
196 | //TODO: reject messages above a certain size? | 203 | //TODO: reject messages above a certain size? |
197 | 204 | ||
198 | if (size <= uint(client.commandBuffer.size() - headerSize)) { | 205 | if (size <= uint(client.commandBuffer.size() - headerSize)) { |
199 | QByteArray data = client.commandBuffer.mid(headerSize, size); | 206 | client.commandBuffer.remove(0, headerSize); |
200 | client.commandBuffer.remove(0, headerSize + size); | ||
201 | 207 | ||
202 | switch (commandId) { | 208 | switch (commandId) { |
203 | case Akonadi2::Commands::HandshakeCommand: { | 209 | case Akonadi2::Commands::HandshakeCommand: { |
204 | auto buffer = Akonadi2::GetHandshake(data.constData()); | 210 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); |
205 | Akonadi2::Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); | 211 | if (Akonadi2::VerifyHandshakeBuffer(verifier)) { |
206 | client.name = buffer->name()->c_str(); | 212 | auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); |
207 | sendCurrentRevision(client); | 213 | client.name = buffer->name()->c_str(); |
214 | sendCurrentRevision(client); | ||
215 | } | ||
208 | break; | 216 | break; |
209 | } | 217 | } |
210 | case Akonadi2::Commands::SynchronizeCommand: { | 218 | case Akonadi2::Commands::SynchronizeCommand: { |
211 | Akonadi2::Console::main()->log(QString(" Synchronize request from %1").arg(client.name)); | 219 | Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); |
212 | loadResource(); | 220 | loadResource(); |
213 | //TODO: on failure ... what? | ||
214 | if (m_resource) { | 221 | if (m_resource) { |
215 | m_resource->synchronizeWithSource(); | 222 | m_resource->synchronizeWithSource(); |
216 | } | 223 | } |
217 | break; | 224 | break; |
218 | } | 225 | } |
226 | case Akonadi2::Commands::FetchEntityCommand: | ||
227 | case Akonadi2::Commands::DeleteEntityCommand: | ||
228 | case Akonadi2::Commands::ModifyEntityCommand: | ||
229 | case Akonadi2::Commands::CreateEntityCommand: | ||
230 | Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %1").arg(messageId).arg(commandId).arg(client.name)); | ||
231 | m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); | ||
232 | break; | ||
219 | default: | 233 | default: |
234 | if (commandId > Akonadi2::Commands::CustomCommand) { | ||
235 | loadResource(); | ||
236 | if (m_resource) { | ||
237 | m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); | ||
238 | } | ||
239 | } else { | ||
240 | //TODO: handle error: we don't know wtf this command is | ||
241 | } | ||
220 | break; | 242 | break; |
221 | } | 243 | } |
222 | 244 | ||
245 | //TODO: async commands == async sendCommandCompleted | ||
246 | sendCommandCompleted(client, messageId); | ||
247 | client.commandBuffer.remove(0, size); | ||
223 | return client.commandBuffer.size() >= headerSize; | 248 | return client.commandBuffer.size() >= headerSize; |
224 | } | 249 | } |
225 | 250 | ||
@@ -234,7 +259,19 @@ void Listener::sendCurrentRevision(Client &client) | |||
234 | 259 | ||
235 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); | 260 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); |
236 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); | 261 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); |
237 | Akonadi2::Commands::write(client.socket, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); | 262 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); |
263 | m_fbb.Clear(); | ||
264 | } | ||
265 | |||
266 | void Listener::sendCommandCompleted(Client &client, uint messageId) | ||
267 | { | ||
268 | if (!client.socket || !client.socket->isValid()) { | ||
269 | return; | ||
270 | } | ||
271 | |||
272 | auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId); | ||
273 | Akonadi2::FinishCommandCompletionBuffer(m_fbb, command); | ||
274 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb); | ||
238 | m_fbb.Clear(); | 275 | m_fbb.Clear(); |
239 | } | 276 | } |
240 | 277 | ||
@@ -248,7 +285,7 @@ void Listener::updateClientsWithRevision() | |||
248 | continue; | 285 | continue; |
249 | } | 286 | } |
250 | 287 | ||
251 | Akonadi2::Commands::write(client.socket, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); | 288 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); |
252 | } | 289 | } |
253 | m_fbb.Clear(); | 290 | m_fbb.Clear(); |
254 | } | 291 | } |
@@ -269,5 +306,6 @@ void Listener::loadResource() | |||
269 | } else { | 306 | } else { |
270 | Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); | 307 | Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); |
271 | } | 308 | } |
309 | //TODO: on failure ... what? | ||
272 | } | 310 | } |
273 | 311 | ||
diff --git a/synchronizer/listener.h b/synchronizer/listener.h index 053fac3..b294277 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h | |||
@@ -25,6 +25,8 @@ | |||
25 | 25 | ||
26 | #include <flatbuffers/flatbuffers.h> | 26 | #include <flatbuffers/flatbuffers.h> |
27 | 27 | ||
28 | #include "common/pipeline.h" | ||
29 | |||
28 | namespace Akonadi2 | 30 | namespace Akonadi2 |
29 | { | 31 | { |
30 | class Resource; | 32 | class Resource; |
@@ -78,6 +80,7 @@ private Q_SLOTS: | |||
78 | private: | 80 | private: |
79 | bool processClientBuffer(Client &client); | 81 | bool processClientBuffer(Client &client); |
80 | void sendCurrentRevision(Client &client); | 82 | void sendCurrentRevision(Client &client); |
83 | void sendCommandCompleted(Client &client, uint messageId); | ||
81 | void updateClientsWithRevision(); | 84 | void updateClientsWithRevision(); |
82 | void loadResource(); | 85 | void loadResource(); |
83 | 86 | ||
@@ -87,5 +90,7 @@ private: | |||
87 | flatbuffers::FlatBufferBuilder m_fbb; | 90 | flatbuffers::FlatBufferBuilder m_fbb; |
88 | const QString m_resourceName; | 91 | const QString m_resourceName; |
89 | Akonadi2::Resource *m_resource; | 92 | Akonadi2::Resource *m_resource; |
93 | Akonadi2::Pipeline *m_pipeline; | ||
90 | QTimer *m_clientBufferProcessesTimer; | 94 | QTimer *m_clientBufferProcessesTimer; |
95 | int m_messageId; | ||
91 | }; | 96 | }; |