summaryrefslogtreecommitdiffstats
path: root/synchronizer
diff options
context:
space:
mode:
authorAaron Seigo <aseigo@kde.org>2014-12-06 02:33:51 +0100
committerAaron Seigo <aseigo@kde.org>2014-12-06 02:33:51 +0100
commit5f40ace47be289c74ad95948c75ed86676158639 (patch)
treeb3c50012da8c7404398050a17724342866a9e83e /synchronizer
parent6b8994edba4fcc0663838b90f66fe2ad0f9e134a (diff)
downloadsink-5f40ace47be289c74ad95948c75ed86676158639.tar.gz
sink-5f40ace47be289c74ad95948c75ed86676158639.zip
resource -> synchronizer
the resource will be the plugin that interacts with the source and store
Diffstat (limited to 'synchronizer')
-rw-r--r--synchronizer/CMakeLists.txt13
-rw-r--r--synchronizer/listener.cpp192
-rw-r--r--synchronizer/listener.h63
-rw-r--r--synchronizer/main.cpp25
4 files changed, 293 insertions, 0 deletions
diff --git a/synchronizer/CMakeLists.txt b/synchronizer/CMakeLists.txt
new file mode 100644
index 0000000..92cb465
--- /dev/null
+++ b/synchronizer/CMakeLists.txt
@@ -0,0 +1,13 @@
1project(akonadinext_synchronizer)
2
3include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
4
5set(akonadinextsynchronizer_SRCS
6 main.cpp
7 listener.cpp
8)
9
10add_executable(${PROJECT_NAME} ${akonadinextsynchronizer_SRCS})
11target_link_libraries(${PROJECT_NAME} akonadinextcommon)
12qt5_use_modules(${PROJECT_NAME} Widgets Network)
13install(TARGETS ${PROJECT_NAME} DESTINATION bin)
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
new file mode 100644
index 0000000..23a5a70
--- /dev/null
+++ b/synchronizer/listener.cpp
@@ -0,0 +1,192 @@
1#include "listener.h"
2
3#include "common/console.h"
4#include "common/commands.h"
5#include "common/handshake_generated.h"
6#include "common/revisionupdate_generated.h"
7
8#include <QLocalSocket>
9#include <QTimer>
10
11Listener::Listener(const QString &resource, QObject *parent)
12 : QObject(parent),
13 m_server(new QLocalServer(this)),
14 m_revision(0)
15{
16 connect(m_server, &QLocalServer::newConnection,
17 this, &Listener::acceptConnection);
18 Console::main()->log(QString("Trying to open %1").arg(resource));
19 if (!m_server->listen(resource)) {
20 // FIXME: multiple starts need to be handled here
21 m_server->removeServer(resource);
22 if (!m_server->listen(resource)) {
23 Console::main()->log("Utter failure to start server");
24 exit(-1);
25 }
26 }
27
28 if (m_server->isListening()) {
29 Console::main()->log(QString("Listening on %1").arg(m_server->serverName()));
30 }
31
32 QTimer::singleShot(2000, this, SLOT(checkConnections()));
33}
34
35Listener::~Listener()
36{
37}
38
39void Listener::setRevision(unsigned long long revision)
40{
41 if (m_revision != revision) {
42 m_revision = revision;
43 updateClientsWithRevision();
44 }
45}
46
47unsigned long long Listener::revision() const
48{
49 return m_revision;
50}
51
52void Listener::closeAllConnections()
53{
54 //TODO: close all client connectionsin m_connections
55 for (Client &client: m_connections) {
56 if (client.socket) {
57 client.socket->close();
58 delete client.socket;
59 client.socket = 0;
60 }
61 }
62}
63
64void Listener::acceptConnection()
65{
66 Console::main()->log(QString("Accepting connection"));
67 QLocalSocket *socket = m_server->nextPendingConnection();
68
69 if (!socket) {
70 return;
71 }
72
73 Console::main()->log("Got a connection");
74 Client client("Unknown Client" /*fixme: actual names!*/, socket);
75 connect(socket, &QIODevice::readyRead,
76 this, &Listener::readFromSocket);
77 m_connections << client;
78 connect(socket, &QLocalSocket::disconnected,
79 this, &Listener::clientDropped);
80
81}
82
83void Listener::clientDropped()
84{
85 QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender());
86 if (!socket) {
87 return;
88 }
89
90 Console::main()->log("Dropping connection...");
91 QMutableVectorIterator<Client> it(m_connections);
92 while (it.hasNext()) {
93 const Client &client = it.next();
94 if (client.socket == socket) {
95 Console::main()->log(QString(" dropped... %1").arg(client.name));
96 it.remove();
97 break;
98 }
99 }
100
101 checkConnections();
102}
103
104void Listener::checkConnections()
105{
106 if (m_connections.isEmpty()) {
107 m_server->close();
108 emit noClients();
109 }
110}
111
112void Listener::readFromSocket()
113{
114 QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender());
115 if (!socket) {
116 return;
117 }
118
119 Console::main()->log("Reading from socket...");
120 for (Client &client: m_connections) {
121 if (client.socket == socket) {
122 Console::main()->log(QString(" Client: %1").arg(client.name));
123 client.commandBuffer += socket->readAll();
124 // FIXME: schedule these rather than process them all at once
125 // right now this can lead to starvation of clients due to
126 // one overly active client
127 while (processClientBuffer(client)) {}
128 break;
129 }
130 }
131}
132
133bool Listener::processClientBuffer(Client &client)
134{
135 static const int headerSize = (sizeof(int) * 2);
136 Console::main()->log(QString("processing %1").arg(client.commandBuffer.size()));
137 if (client.commandBuffer.size() < headerSize) {
138 return false;
139 }
140
141 int commandId, size;
142 commandId = *(int*)client.commandBuffer.constData();
143 size = *(int*)(client.commandBuffer.constData() + sizeof(int));
144
145 if (size <= client.commandBuffer.size() - headerSize) {
146 QByteArray data = client.commandBuffer.mid(headerSize, size);
147 client.commandBuffer.remove(0, headerSize + size);
148
149 switch (commandId) {
150 case Commands::HandshakeCommand: {
151 auto buffer = Akonadi::GetHandshake(data.constData());
152 Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str()));
153 sendCurrentRevision(client);
154 break;
155 }
156 default:
157 // client.hasSentCommand = true;
158 break;
159 }
160
161 return client.commandBuffer.size() >= headerSize;
162 } else {
163 return false;
164 }
165}
166
167void Listener::sendCurrentRevision(Client &client)
168{
169 if (!client.socket || !client.socket->isValid()) {
170 return;
171 }
172
173 auto command = Akonadi::CreateRevisionUpdate(m_fbb, m_revision);
174 Akonadi::FinishRevisionUpdateBuffer(m_fbb, command);
175 Commands::write(client.socket, Commands::RevisionUpdateCommand, m_fbb);
176 m_fbb.Clear();
177}
178
179void Listener::updateClientsWithRevision()
180{
181 auto command = Akonadi::CreateRevisionUpdate(m_fbb, m_revision);
182 Akonadi::FinishRevisionUpdateBuffer(m_fbb, command);
183
184 for (const Client &client: m_connections) {
185 if (!client.socket || !client.socket->isValid()) {
186 continue;
187 }
188
189 Commands::write(client.socket, Commands::RevisionUpdateCommand, m_fbb);
190 }
191 m_fbb.Clear();
192}
diff --git a/synchronizer/listener.h b/synchronizer/listener.h
new file mode 100644
index 0000000..dcc3818
--- /dev/null
+++ b/synchronizer/listener.h
@@ -0,0 +1,63 @@
1#pragma once
2
3#include <QLocalServer>
4#include <QLocalSocket>
5#include <QList>
6#include <QObject>
7
8#include <flatbuffers/flatbuffers.h>
9
10class Client
11{
12public:
13 Client()
14 : socket(nullptr),
15 hasSentCommand(false)
16 {
17 }
18
19 Client(const QString &n, QLocalSocket *s)
20 : name(n),
21 socket(s),
22 hasSentCommand(false)
23 {
24 }
25
26 QString name;
27 QLocalSocket *socket;
28 QByteArray commandBuffer;
29 bool hasSentCommand;
30};
31
32class Listener : public QObject
33{
34 Q_OBJECT
35
36public:
37 Listener(const QString &resourceName, QObject *parent = 0);
38 ~Listener();
39
40 void setRevision(unsigned long long revision);
41 unsigned long long revision() const;
42
43Q_SIGNALS:
44 void noClients();
45
46public Q_SLOTS:
47 void closeAllConnections();
48
49private Q_SLOTS:
50 void acceptConnection();
51 void clientDropped();
52 void checkConnections();
53 void readFromSocket();
54
55private:
56 bool processClientBuffer(Client &client);
57 void sendCurrentRevision(Client &client);
58 void updateClientsWithRevision();
59 QLocalServer *m_server;
60 QVector<Client> m_connections;
61 unsigned long long m_revision;
62 flatbuffers::FlatBufferBuilder m_fbb;
63};
diff --git a/synchronizer/main.cpp b/synchronizer/main.cpp
new file mode 100644
index 0000000..91c0a9a
--- /dev/null
+++ b/synchronizer/main.cpp
@@ -0,0 +1,25 @@
1
2#include <QApplication>
3
4#include "common/console.h"
5#include "listener.h"
6
7int main(int argc, char *argv[])
8{
9 QApplication app(argc, argv);
10
11 new Console(QString("Resource: %1").arg(argv[1]));
12 if (argc < 2) {
13 Console::main()->log("Not enough args");
14 return app.exec();
15 }
16
17 Listener *listener = new Listener(argv[1]);
18
19 QObject::connect(&app, &QCoreApplication::aboutToQuit,
20 listener, &Listener::closeAllConnections);
21 QObject::connect(listener, &Listener::noClients,
22 &app, &QCoreApplication::quit);
23
24 return app.exec();
25} \ No newline at end of file