diff options
author | Aaron Seigo <aseigo@kde.org> | 2014-12-16 22:40:44 +0100 |
---|---|---|
committer | Aaron Seigo <aseigo@kde.org> | 2014-12-16 22:40:44 +0100 |
commit | 77944384d24b5005d6b8648572a31a3ae84dd946 (patch) | |
tree | 8726831773b4182cb6177d6c72a723e08a6c15aa /common | |
parent | 66b21fd2e3c53e4a820e3343b192be7b043da110 (diff) | |
download | sink-77944384d24b5005d6b8648572a31a3ae84dd946.tar.gz sink-77944384d24b5005d6b8648572a31a3ae84dd946.zip |
add pipelines (as a sketch only), message ids and message responses
Diffstat (limited to 'common')
-rw-r--r-- | common/CMakeLists.txt | 2 | ||||
-rw-r--r-- | common/commands.cpp | 15 | ||||
-rw-r--r-- | common/commands.h | 9 | ||||
-rw-r--r-- | common/commands/commandcompletion.fbs | 6 | ||||
-rw-r--r-- | common/pipeline.cpp | 75 | ||||
-rw-r--r-- | common/pipeline.h | 47 | ||||
-rw-r--r-- | common/resource.cpp | 4 | ||||
-rw-r--r-- | common/resource.h | 8 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 35 |
9 files changed, 176 insertions, 25 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 7de4aa9..001dab5 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -1,5 +1,6 @@ | |||
1 | project(akonadi2common) | 1 | project(akonadi2common) |
2 | generate_flatbuffers( | 2 | generate_flatbuffers( |
3 | commands/commandcompletion | ||
3 | commands/createentity | 4 | commands/createentity |
4 | commands/deleteentity | 5 | commands/deleteentity |
5 | commands/fetchentity | 6 | commands/fetchentity |
@@ -20,6 +21,7 @@ set(command_SRCS | |||
20 | clientapi.cpp | 21 | clientapi.cpp |
21 | commands.cpp | 22 | commands.cpp |
22 | console.cpp | 23 | console.cpp |
24 | pipeline.cpp | ||
23 | resource.cpp | 25 | resource.cpp |
24 | resourceaccess.cpp | 26 | resourceaccess.cpp |
25 | storage_common.cpp | 27 | storage_common.cpp |
diff --git a/common/commands.cpp b/common/commands.cpp index ecbbfdb..1dfeabe 100644 --- a/common/commands.cpp +++ b/common/commands.cpp | |||
@@ -28,17 +28,23 @@ namespace Akonadi2 | |||
28 | namespace Commands | 28 | namespace Commands |
29 | { | 29 | { |
30 | 30 | ||
31 | void write(QIODevice *device, int commandId) | 31 | int headerSize() |
32 | { | 32 | { |
33 | write(device, commandId, 0, 0); | 33 | return sizeof(int) + (sizeof(uint) * 2); |
34 | } | 34 | } |
35 | 35 | ||
36 | void write(QIODevice *device, int commandId, const char *buffer, uint size) | 36 | void write(QIODevice *device, int messageId, int commandId) |
37 | { | ||
38 | write(device, messageId, commandId, 0, 0); | ||
39 | } | ||
40 | |||
41 | void write(QIODevice *device, int messageId, int commandId, const char *buffer, uint size) | ||
37 | { | 42 | { |
38 | if (size > 0 && !buffer) { | 43 | if (size > 0 && !buffer) { |
39 | size = 0; | 44 | size = 0; |
40 | } | 45 | } |
41 | 46 | ||
47 | device->write((const char*)&messageId, sizeof(int)); | ||
42 | device->write((const char*)&commandId, sizeof(int)); | 48 | device->write((const char*)&commandId, sizeof(int)); |
43 | device->write((const char*)&size, sizeof(uint)); | 49 | device->write((const char*)&size, sizeof(uint)); |
44 | if (buffer) { | 50 | if (buffer) { |
@@ -46,9 +52,10 @@ void write(QIODevice *device, int commandId, const char *buffer, uint size) | |||
46 | } | 52 | } |
47 | } | 53 | } |
48 | 54 | ||
49 | void write(QIODevice *device, int commandId, flatbuffers::FlatBufferBuilder &fbb) | 55 | void write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb) |
50 | { | 56 | { |
51 | const int dataSize = fbb.GetSize(); | 57 | const int dataSize = fbb.GetSize(); |
58 | device->write((const char*)&messageId, sizeof(int)); | ||
52 | device->write((const char*)&commandId, sizeof(int)); | 59 | device->write((const char*)&commandId, sizeof(int)); |
53 | device->write((const char*)&dataSize, sizeof(int)); | 60 | device->write((const char*)&dataSize, sizeof(int)); |
54 | device->write((const char*)fbb.GetBufferPointer(), dataSize); | 61 | device->write((const char*)fbb.GetBufferPointer(), dataSize); |
diff --git a/common/commands.h b/common/commands.h index 874db73..c63bb47 100644 --- a/common/commands.h +++ b/common/commands.h | |||
@@ -33,8 +33,8 @@ namespace Commands | |||
33 | 33 | ||
34 | enum CommandIds { | 34 | enum CommandIds { |
35 | UnknownCommand = 0, | 35 | UnknownCommand = 0, |
36 | HandshakeCommand, | ||
37 | CommandCompletion, | 36 | CommandCompletion, |
37 | HandshakeCommand, | ||
38 | RevisionUpdateCommand, | 38 | RevisionUpdateCommand, |
39 | SynchronizeCommand, | 39 | SynchronizeCommand, |
40 | FetchEntityCommand, | 40 | FetchEntityCommand, |
@@ -45,9 +45,10 @@ enum CommandIds { | |||
45 | CustomCommand = 0xffff | 45 | CustomCommand = 0xffff |
46 | }; | 46 | }; |
47 | 47 | ||
48 | void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId); | 48 | int AKONADI2COMMON_EXPORT headerSize(); |
49 | void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId, const char *buffer, uint size); | 49 | void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId); |
50 | void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId, flatbuffers::FlatBufferBuilder &fbb); | 50 | void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId, const char *buffer, uint size); |
51 | void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb); | ||
51 | 52 | ||
52 | } | 53 | } |
53 | 54 | ||
diff --git a/common/commands/commandcompletion.fbs b/common/commands/commandcompletion.fbs index 9583108..5330b4f 100644 --- a/common/commands/commandcompletion.fbs +++ b/common/commands/commandcompletion.fbs | |||
@@ -1,9 +1,9 @@ | |||
1 | namespace Akonadi2; | 1 | namespace Akonadi2; |
2 | 2 | ||
3 | table CommandCompletion { | 3 | table CommandCompletion { |
4 | id: ulong | 4 | id: ulong; |
5 | success: bool | 5 | success: bool = true; |
6 | log: ulong = 0 | 6 | log: ulong = 0; |
7 | } | 7 | } |
8 | 8 | ||
9 | root_type CommandCompletion; | 9 | root_type CommandCompletion; |
diff --git a/common/pipeline.cpp b/common/pipeline.cpp new file mode 100644 index 0000000..41def7c --- /dev/null +++ b/common/pipeline.cpp | |||
@@ -0,0 +1,75 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | |||
21 | #include "pipeline.h" | ||
22 | |||
23 | #include <QStandardPaths> | ||
24 | |||
25 | namespace Akonadi2 | ||
26 | { | ||
27 | |||
28 | class Pipeline::Private | ||
29 | { | ||
30 | public: | ||
31 | Private(const QString &storageName) | ||
32 | : storage(QStandardPaths::writableLocation(QStandardPaths::QStandardPaths::GenericDataLocation) + "/akonadi2", storageName, Akonadi2::Storage::ReadWrite) | ||
33 | { | ||
34 | |||
35 | } | ||
36 | |||
37 | Akonadi2::Storage storage; | ||
38 | }; | ||
39 | |||
40 | Pipeline::Pipeline(const QString &storageName) | ||
41 | : d(new Private(storageName)) | ||
42 | { | ||
43 | } | ||
44 | |||
45 | Pipeline::~Pipeline() | ||
46 | { | ||
47 | } | ||
48 | |||
49 | Storage &Pipeline::storage() | ||
50 | { | ||
51 | return d->storage; | ||
52 | } | ||
53 | |||
54 | void Pipeline::null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | ||
55 | { | ||
56 | d->storage.write(key, keySize, buffer, bufferSize); | ||
57 | } | ||
58 | |||
59 | void Pipeline::newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | ||
60 | { | ||
61 | d->storage.write(key, keySize, buffer, bufferSize); | ||
62 | } | ||
63 | |||
64 | void Pipeline::modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | ||
65 | { | ||
66 | d->storage.write(key, keySize, buffer, bufferSize); | ||
67 | } | ||
68 | |||
69 | void Pipeline::deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize) | ||
70 | { | ||
71 | d->storage.write(key, keySize, buffer, bufferSize); | ||
72 | } | ||
73 | |||
74 | } // namespace Akonadi2 | ||
75 | |||
diff --git a/common/pipeline.h b/common/pipeline.h new file mode 100644 index 0000000..635e630 --- /dev/null +++ b/common/pipeline.h | |||
@@ -0,0 +1,47 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | |||
21 | #pragma once | ||
22 | |||
23 | #include <akonadi2common_export.h> | ||
24 | #include <storage.h> | ||
25 | |||
26 | namespace Akonadi2 | ||
27 | { | ||
28 | |||
29 | class AKONADI2COMMON_EXPORT Pipeline | ||
30 | { | ||
31 | public: | ||
32 | Pipeline(const QString &storagePath); | ||
33 | ~Pipeline(); | ||
34 | |||
35 | Storage &storage(); | ||
36 | void null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | ||
37 | void newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | ||
38 | void modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | ||
39 | void deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize); | ||
40 | |||
41 | private: | ||
42 | class Private; | ||
43 | Private * const d; | ||
44 | }; | ||
45 | |||
46 | } // namespace Akonadi2 | ||
47 | |||
diff --git a/common/resource.cpp b/common/resource.cpp index 7120d1c..11a03ca 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -39,6 +39,10 @@ Resource::~Resource() | |||
39 | //delete d; | 39 | //delete d; |
40 | } | 40 | } |
41 | 41 | ||
42 | void Resource::processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline) | ||
43 | { | ||
44 | } | ||
45 | |||
42 | void Resource::synchronizeWithSource() | 46 | void Resource::synchronizeWithSource() |
43 | { | 47 | { |
44 | } | 48 | } |
diff --git a/common/resource.h b/common/resource.h index 2ecff03..53c0bc1 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -18,18 +18,22 @@ | |||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | 18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. |
19 | */ | 19 | */ |
20 | 20 | ||
21 | #include "clientapi.h" | 21 | #include <clientapi.h> |
22 | #include <akonadi2common_export.h> | ||
23 | #include <pipeline.h> | ||
22 | 24 | ||
23 | namespace Akonadi2 | 25 | namespace Akonadi2 |
24 | { | 26 | { |
25 | 27 | ||
26 | class Resource | 28 | class AKONADI2COMMON_EXPORT Resource |
27 | { | 29 | { |
28 | public: | 30 | public: |
29 | //TODO: configuration | 31 | //TODO: configuration |
30 | Resource(); | 32 | Resource(); |
31 | virtual ~Resource(); | 33 | virtual ~Resource(); |
32 | 34 | ||
35 | //TODO: this will need to be async | ||
36 | virtual void processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline); | ||
33 | virtual void synchronizeWithSource(); | 37 | virtual void synchronizeWithSource(); |
34 | 38 | ||
35 | private: | 39 | private: |
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index a4f3c94..2f7d207 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -22,6 +22,7 @@ | |||
22 | 22 | ||
23 | #include "common/console.h" | 23 | #include "common/console.h" |
24 | #include "common/commands.h" | 24 | #include "common/commands.h" |
25 | #include "common/commandcompletion_generated.h" | ||
25 | #include "common/handshake_generated.h" | 26 | #include "common/handshake_generated.h" |
26 | #include "common/revisionupdate_generated.h" | 27 | #include "common/revisionupdate_generated.h" |
27 | 28 | ||
@@ -54,10 +55,10 @@ public: | |||
54 | delete[] m_buffer; | 55 | delete[] m_buffer; |
55 | } | 56 | } |
56 | 57 | ||
57 | void write(QIODevice *device) | 58 | void write(QIODevice *device, uint messageId) |
58 | { | 59 | { |
59 | Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); | 60 | Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); |
60 | Commands::write(device, m_commandId, m_buffer, m_bufferSize); | 61 | Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize); |
61 | } | 62 | } |
62 | 63 | ||
63 | private: | 64 | private: |
@@ -80,13 +81,15 @@ public: | |||
80 | QByteArray partialMessageBuffer; | 81 | QByteArray partialMessageBuffer; |
81 | flatbuffers::FlatBufferBuilder fbb; | 82 | flatbuffers::FlatBufferBuilder fbb; |
82 | QVector<QueuedCommand *> commandQueue; | 83 | QVector<QueuedCommand *> commandQueue; |
84 | uint messageId; | ||
83 | }; | 85 | }; |
84 | 86 | ||
85 | ResourceAccess::Private::Private(const QString &name, ResourceAccess *q) | 87 | ResourceAccess::Private::Private(const QString &name, ResourceAccess *q) |
86 | : resourceName(name), | 88 | : resourceName(name), |
87 | socket(new QLocalSocket(q)), | 89 | socket(new QLocalSocket(q)), |
88 | tryOpenTimer(new QTimer(q)), | 90 | tryOpenTimer(new QTimer(q)), |
89 | startingProcess(false) | 91 | startingProcess(false), |
92 | messageId(0) | ||
90 | { | 93 | { |
91 | } | 94 | } |
92 | 95 | ||
@@ -129,7 +132,7 @@ void ResourceAccess::sendCommand(int commandId) | |||
129 | { | 132 | { |
130 | if (isReady()) { | 133 | if (isReady()) { |
131 | log(QString("Sending command %1").arg(commandId)); | 134 | log(QString("Sending command %1").arg(commandId)); |
132 | Commands::write(d->socket, commandId); | 135 | Commands::write(d->socket, ++d->messageId, commandId); |
133 | } else { | 136 | } else { |
134 | d->commandQueue << new QueuedCommand(commandId); | 137 | d->commandQueue << new QueuedCommand(commandId); |
135 | } | 138 | } |
@@ -139,7 +142,7 @@ void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder & | |||
139 | { | 142 | { |
140 | if (isReady()) { | 143 | if (isReady()) { |
141 | log(QString("Sending command %1").arg(commandId)); | 144 | log(QString("Sending command %1").arg(commandId)); |
142 | Commands::write(d->socket, commandId, fbb); | 145 | Commands::write(d->socket, ++d->messageId, commandId, fbb); |
143 | } else { | 146 | } else { |
144 | d->commandQueue << new QueuedCommand(commandId, fbb); | 147 | d->commandQueue << new QueuedCommand(commandId, fbb); |
145 | } | 148 | } |
@@ -180,7 +183,7 @@ void ResourceAccess::connected() | |||
180 | auto name = d->fbb.CreateString(QString::number((long long)this).toLatin1()); | 183 | auto name = d->fbb.CreateString(QString::number((long long)this).toLatin1()); |
181 | auto command = Akonadi2::CreateHandshake(d->fbb, name); | 184 | auto command = Akonadi2::CreateHandshake(d->fbb, name); |
182 | Akonadi2::FinishHandshakeBuffer(d->fbb, command); | 185 | Akonadi2::FinishHandshakeBuffer(d->fbb, command); |
183 | Commands::write(d->socket, Commands::HandshakeCommand, d->fbb); | 186 | Commands::write(d->socket, ++d->messageId, Commands::HandshakeCommand, d->fbb); |
184 | d->fbb.Clear(); | 187 | d->fbb.Clear(); |
185 | } | 188 | } |
186 | 189 | ||
@@ -188,7 +191,7 @@ void ResourceAccess::connected() | |||
188 | //TODO: serialize instead of blast them all through the socket? | 191 | //TODO: serialize instead of blast them all through the socket? |
189 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); | 192 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); |
190 | for (QueuedCommand *command: d->commandQueue) { | 193 | for (QueuedCommand *command: d->commandQueue) { |
191 | command->write(d->socket); | 194 | command->write(d->socket, ++d->messageId); |
192 | delete command; | 195 | delete command; |
193 | } | 196 | } |
194 | d->commandQueue.clear(); | 197 | d->commandQueue.clear(); |
@@ -239,25 +242,33 @@ void ResourceAccess::readResourceMessage() | |||
239 | 242 | ||
240 | bool ResourceAccess::processMessageBuffer() | 243 | bool ResourceAccess::processMessageBuffer() |
241 | { | 244 | { |
242 | static const int headerSize = (sizeof(int) * 2); | 245 | static const int headerSize = Commands::headerSize(); |
243 | if (d->partialMessageBuffer.size() < headerSize) { | 246 | if (d->partialMessageBuffer.size() < headerSize) { |
244 | return false; | 247 | return false; |
245 | } | 248 | } |
246 | 249 | ||
247 | const int commandId = *(int*)d->partialMessageBuffer.constData(); | 250 | //messageId is unused, so commented out |
248 | const int size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int)); | 251 | //const uint messageId = *(int*)(d->partialMessageBuffer.constData()); |
252 | const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); | ||
253 | const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); | ||
249 | 254 | ||
250 | if (size > d->partialMessageBuffer.size() - headerSize) { | 255 | if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) { |
251 | return false; | 256 | return false; |
252 | } | 257 | } |
253 | 258 | ||
254 | switch (commandId) { | 259 | switch (commandId) { |
255 | case Commands::RevisionUpdateCommand: { | 260 | case Commands::RevisionUpdateCommand: { |
256 | auto buffer = Akonadi2::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); | 261 | auto buffer = GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); |
257 | log(QString("Revision updated to: %1").arg(buffer->revision())); | 262 | log(QString("Revision updated to: %1").arg(buffer->revision())); |
258 | emit revisionChanged(buffer->revision()); | 263 | emit revisionChanged(buffer->revision()); |
259 | break; | 264 | break; |
260 | } | 265 | } |
266 | case Commands::CommandCompletion: { | ||
267 | auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); | ||
268 | log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); | ||
269 | //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc | ||
270 | break; | ||
271 | } | ||
261 | default: | 272 | default: |
262 | break; | 273 | break; |
263 | } | 274 | } |