summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
authorAaron Seigo <aseigo@kde.org>2014-12-16 22:40:44 +0100
committerAaron Seigo <aseigo@kde.org>2014-12-16 22:40:44 +0100
commit77944384d24b5005d6b8648572a31a3ae84dd946 (patch)
tree8726831773b4182cb6177d6c72a723e08a6c15aa /common
parent66b21fd2e3c53e4a820e3343b192be7b043da110 (diff)
downloadsink-77944384d24b5005d6b8648572a31a3ae84dd946.tar.gz
sink-77944384d24b5005d6b8648572a31a3ae84dd946.zip
add pipelines (as a sketch only), message ids and message responses
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt2
-rw-r--r--common/commands.cpp15
-rw-r--r--common/commands.h9
-rw-r--r--common/commands/commandcompletion.fbs6
-rw-r--r--common/pipeline.cpp75
-rw-r--r--common/pipeline.h47
-rw-r--r--common/resource.cpp4
-rw-r--r--common/resource.h8
-rw-r--r--common/resourceaccess.cpp35
9 files changed, 176 insertions, 25 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 7de4aa9..001dab5 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -1,5 +1,6 @@
1project(akonadi2common) 1project(akonadi2common)
2generate_flatbuffers( 2generate_flatbuffers(
3 commands/commandcompletion
3 commands/createentity 4 commands/createentity
4 commands/deleteentity 5 commands/deleteentity
5 commands/fetchentity 6 commands/fetchentity
@@ -20,6 +21,7 @@ set(command_SRCS
20 clientapi.cpp 21 clientapi.cpp
21 commands.cpp 22 commands.cpp
22 console.cpp 23 console.cpp
24 pipeline.cpp
23 resource.cpp 25 resource.cpp
24 resourceaccess.cpp 26 resourceaccess.cpp
25 storage_common.cpp 27 storage_common.cpp
diff --git a/common/commands.cpp b/common/commands.cpp
index ecbbfdb..1dfeabe 100644
--- a/common/commands.cpp
+++ b/common/commands.cpp
@@ -28,17 +28,23 @@ namespace Akonadi2
28namespace Commands 28namespace Commands
29{ 29{
30 30
31void write(QIODevice *device, int commandId) 31int headerSize()
32{ 32{
33 write(device, commandId, 0, 0); 33 return sizeof(int) + (sizeof(uint) * 2);
34} 34}
35 35
36void write(QIODevice *device, int commandId, const char *buffer, uint size) 36void write(QIODevice *device, int messageId, int commandId)
37{
38 write(device, messageId, commandId, 0, 0);
39}
40
41void write(QIODevice *device, int messageId, int commandId, const char *buffer, uint size)
37{ 42{
38 if (size > 0 && !buffer) { 43 if (size > 0 && !buffer) {
39 size = 0; 44 size = 0;
40 } 45 }
41 46
47 device->write((const char*)&messageId, sizeof(int));
42 device->write((const char*)&commandId, sizeof(int)); 48 device->write((const char*)&commandId, sizeof(int));
43 device->write((const char*)&size, sizeof(uint)); 49 device->write((const char*)&size, sizeof(uint));
44 if (buffer) { 50 if (buffer) {
@@ -46,9 +52,10 @@ void write(QIODevice *device, int commandId, const char *buffer, uint size)
46 } 52 }
47} 53}
48 54
49void write(QIODevice *device, int commandId, flatbuffers::FlatBufferBuilder &fbb) 55void write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb)
50{ 56{
51 const int dataSize = fbb.GetSize(); 57 const int dataSize = fbb.GetSize();
58 device->write((const char*)&messageId, sizeof(int));
52 device->write((const char*)&commandId, sizeof(int)); 59 device->write((const char*)&commandId, sizeof(int));
53 device->write((const char*)&dataSize, sizeof(int)); 60 device->write((const char*)&dataSize, sizeof(int));
54 device->write((const char*)fbb.GetBufferPointer(), dataSize); 61 device->write((const char*)fbb.GetBufferPointer(), dataSize);
diff --git a/common/commands.h b/common/commands.h
index 874db73..c63bb47 100644
--- a/common/commands.h
+++ b/common/commands.h
@@ -33,8 +33,8 @@ namespace Commands
33 33
34enum CommandIds { 34enum CommandIds {
35 UnknownCommand = 0, 35 UnknownCommand = 0,
36 HandshakeCommand,
37 CommandCompletion, 36 CommandCompletion,
37 HandshakeCommand,
38 RevisionUpdateCommand, 38 RevisionUpdateCommand,
39 SynchronizeCommand, 39 SynchronizeCommand,
40 FetchEntityCommand, 40 FetchEntityCommand,
@@ -45,9 +45,10 @@ enum CommandIds {
45 CustomCommand = 0xffff 45 CustomCommand = 0xffff
46}; 46};
47 47
48void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId); 48int AKONADI2COMMON_EXPORT headerSize();
49void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId, const char *buffer, uint size); 49void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId);
50void AKONADI2COMMON_EXPORT write(QIODevice *device, int commandId, flatbuffers::FlatBufferBuilder &fbb); 50void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId, const char *buffer, uint size);
51void AKONADI2COMMON_EXPORT write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb);
51 52
52} 53}
53 54
diff --git a/common/commands/commandcompletion.fbs b/common/commands/commandcompletion.fbs
index 9583108..5330b4f 100644
--- a/common/commands/commandcompletion.fbs
+++ b/common/commands/commandcompletion.fbs
@@ -1,9 +1,9 @@
1namespace Akonadi2; 1namespace Akonadi2;
2 2
3table CommandCompletion { 3table CommandCompletion {
4 id: ulong 4 id: ulong;
5 success: bool 5 success: bool = true;
6 log: ulong = 0 6 log: ulong = 0;
7} 7}
8 8
9root_type CommandCompletion; 9root_type CommandCompletion;
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
new file mode 100644
index 0000000..41def7c
--- /dev/null
+++ b/common/pipeline.cpp
@@ -0,0 +1,75 @@
1/*
2 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21#include "pipeline.h"
22
23#include <QStandardPaths>
24
25namespace Akonadi2
26{
27
28class Pipeline::Private
29{
30public:
31 Private(const QString &storageName)
32 : storage(QStandardPaths::writableLocation(QStandardPaths::QStandardPaths::GenericDataLocation) + "/akonadi2", storageName, Akonadi2::Storage::ReadWrite)
33 {
34
35 }
36
37 Akonadi2::Storage storage;
38};
39
40Pipeline::Pipeline(const QString &storageName)
41 : d(new Private(storageName))
42{
43}
44
45Pipeline::~Pipeline()
46{
47}
48
49Storage &Pipeline::storage()
50{
51 return d->storage;
52}
53
54void Pipeline::null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize)
55{
56 d->storage.write(key, keySize, buffer, bufferSize);
57}
58
59void Pipeline::newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize)
60{
61 d->storage.write(key, keySize, buffer, bufferSize);
62}
63
64void Pipeline::modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize)
65{
66 d->storage.write(key, keySize, buffer, bufferSize);
67}
68
69void Pipeline::deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize)
70{
71 d->storage.write(key, keySize, buffer, bufferSize);
72}
73
74} // namespace Akonadi2
75
diff --git a/common/pipeline.h b/common/pipeline.h
new file mode 100644
index 0000000..635e630
--- /dev/null
+++ b/common/pipeline.h
@@ -0,0 +1,47 @@
1/*
2 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include <akonadi2common_export.h>
24#include <storage.h>
25
26namespace Akonadi2
27{
28
29class AKONADI2COMMON_EXPORT Pipeline
30{
31public:
32 Pipeline(const QString &storagePath);
33 ~Pipeline();
34
35 Storage &storage();
36 void null(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize);
37 void newEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize);
38 void modifiedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize);
39 void deletedEntity(uint messageId, const char *key, size_t keySize, const char *buffer, size_t bufferSize);
40
41private:
42 class Private;
43 Private * const d;
44};
45
46} // namespace Akonadi2
47
diff --git a/common/resource.cpp b/common/resource.cpp
index 7120d1c..11a03ca 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -39,6 +39,10 @@ Resource::~Resource()
39 //delete d; 39 //delete d;
40} 40}
41 41
42void Resource::processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline)
43{
44}
45
42void Resource::synchronizeWithSource() 46void Resource::synchronizeWithSource()
43{ 47{
44} 48}
diff --git a/common/resource.h b/common/resource.h
index 2ecff03..53c0bc1 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -18,18 +18,22 @@
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */ 19 */
20 20
21#include "clientapi.h" 21#include <clientapi.h>
22#include <akonadi2common_export.h>
23#include <pipeline.h>
22 24
23namespace Akonadi2 25namespace Akonadi2
24{ 26{
25 27
26class Resource 28class AKONADI2COMMON_EXPORT Resource
27{ 29{
28public: 30public:
29 //TODO: configuration 31 //TODO: configuration
30 Resource(); 32 Resource();
31 virtual ~Resource(); 33 virtual ~Resource();
32 34
35 //TODO: this will need to be async
36 virtual void processCommand(uint messageId, int commandId, const QByteArray &data, uint size, Pipeline *pipeline);
33 virtual void synchronizeWithSource(); 37 virtual void synchronizeWithSource();
34 38
35private: 39private:
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index a4f3c94..2f7d207 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -22,6 +22,7 @@
22 22
23#include "common/console.h" 23#include "common/console.h"
24#include "common/commands.h" 24#include "common/commands.h"
25#include "common/commandcompletion_generated.h"
25#include "common/handshake_generated.h" 26#include "common/handshake_generated.h"
26#include "common/revisionupdate_generated.h" 27#include "common/revisionupdate_generated.h"
27 28
@@ -54,10 +55,10 @@ public:
54 delete[] m_buffer; 55 delete[] m_buffer;
55 } 56 }
56 57
57 void write(QIODevice *device) 58 void write(QIODevice *device, uint messageId)
58 { 59 {
59 Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); 60 Console::main()->log(QString("\tSending queued command %1").arg(m_commandId));
60 Commands::write(device, m_commandId, m_buffer, m_bufferSize); 61 Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize);
61 } 62 }
62 63
63private: 64private:
@@ -80,13 +81,15 @@ public:
80 QByteArray partialMessageBuffer; 81 QByteArray partialMessageBuffer;
81 flatbuffers::FlatBufferBuilder fbb; 82 flatbuffers::FlatBufferBuilder fbb;
82 QVector<QueuedCommand *> commandQueue; 83 QVector<QueuedCommand *> commandQueue;
84 uint messageId;
83}; 85};
84 86
85ResourceAccess::Private::Private(const QString &name, ResourceAccess *q) 87ResourceAccess::Private::Private(const QString &name, ResourceAccess *q)
86 : resourceName(name), 88 : resourceName(name),
87 socket(new QLocalSocket(q)), 89 socket(new QLocalSocket(q)),
88 tryOpenTimer(new QTimer(q)), 90 tryOpenTimer(new QTimer(q)),
89 startingProcess(false) 91 startingProcess(false),
92 messageId(0)
90{ 93{
91} 94}
92 95
@@ -129,7 +132,7 @@ void ResourceAccess::sendCommand(int commandId)
129{ 132{
130 if (isReady()) { 133 if (isReady()) {
131 log(QString("Sending command %1").arg(commandId)); 134 log(QString("Sending command %1").arg(commandId));
132 Commands::write(d->socket, commandId); 135 Commands::write(d->socket, ++d->messageId, commandId);
133 } else { 136 } else {
134 d->commandQueue << new QueuedCommand(commandId); 137 d->commandQueue << new QueuedCommand(commandId);
135 } 138 }
@@ -139,7 +142,7 @@ void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &
139{ 142{
140 if (isReady()) { 143 if (isReady()) {
141 log(QString("Sending command %1").arg(commandId)); 144 log(QString("Sending command %1").arg(commandId));
142 Commands::write(d->socket, commandId, fbb); 145 Commands::write(d->socket, ++d->messageId, commandId, fbb);
143 } else { 146 } else {
144 d->commandQueue << new QueuedCommand(commandId, fbb); 147 d->commandQueue << new QueuedCommand(commandId, fbb);
145 } 148 }
@@ -180,7 +183,7 @@ void ResourceAccess::connected()
180 auto name = d->fbb.CreateString(QString::number((long long)this).toLatin1()); 183 auto name = d->fbb.CreateString(QString::number((long long)this).toLatin1());
181 auto command = Akonadi2::CreateHandshake(d->fbb, name); 184 auto command = Akonadi2::CreateHandshake(d->fbb, name);
182 Akonadi2::FinishHandshakeBuffer(d->fbb, command); 185 Akonadi2::FinishHandshakeBuffer(d->fbb, command);
183 Commands::write(d->socket, Commands::HandshakeCommand, d->fbb); 186 Commands::write(d->socket, ++d->messageId, Commands::HandshakeCommand, d->fbb);
184 d->fbb.Clear(); 187 d->fbb.Clear();
185 } 188 }
186 189
@@ -188,7 +191,7 @@ void ResourceAccess::connected()
188 //TODO: serialize instead of blast them all through the socket? 191 //TODO: serialize instead of blast them all through the socket?
189 log(QString("We have %1 queued commands").arg(d->commandQueue.size())); 192 log(QString("We have %1 queued commands").arg(d->commandQueue.size()));
190 for (QueuedCommand *command: d->commandQueue) { 193 for (QueuedCommand *command: d->commandQueue) {
191 command->write(d->socket); 194 command->write(d->socket, ++d->messageId);
192 delete command; 195 delete command;
193 } 196 }
194 d->commandQueue.clear(); 197 d->commandQueue.clear();
@@ -239,25 +242,33 @@ void ResourceAccess::readResourceMessage()
239 242
240bool ResourceAccess::processMessageBuffer() 243bool ResourceAccess::processMessageBuffer()
241{ 244{
242 static const int headerSize = (sizeof(int) * 2); 245 static const int headerSize = Commands::headerSize();
243 if (d->partialMessageBuffer.size() < headerSize) { 246 if (d->partialMessageBuffer.size() < headerSize) {
244 return false; 247 return false;
245 } 248 }
246 249
247 const int commandId = *(int*)d->partialMessageBuffer.constData(); 250 //messageId is unused, so commented out
248 const int size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int)); 251 //const uint messageId = *(int*)(d->partialMessageBuffer.constData());
252 const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint));
253 const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint));
249 254
250 if (size > d->partialMessageBuffer.size() - headerSize) { 255 if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) {
251 return false; 256 return false;
252 } 257 }
253 258
254 switch (commandId) { 259 switch (commandId) {
255 case Commands::RevisionUpdateCommand: { 260 case Commands::RevisionUpdateCommand: {
256 auto buffer = Akonadi2::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); 261 auto buffer = GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize);
257 log(QString("Revision updated to: %1").arg(buffer->revision())); 262 log(QString("Revision updated to: %1").arg(buffer->revision()));
258 emit revisionChanged(buffer->revision()); 263 emit revisionChanged(buffer->revision());
259 break; 264 break;
260 } 265 }
266 case Commands::CommandCompletion: {
267 auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
268 log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"));
269 //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc
270 break;
271 }
261 default: 272 default:
262 break; 273 break;
263 } 274 }