summaryrefslogtreecommitdiffstats
path: root/synchronizer/listener.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'synchronizer/listener.cpp')
-rw-r--r--synchronizer/listener.cpp192
1 files changed, 192 insertions, 0 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
new file mode 100644
index 0000000..23a5a70
--- /dev/null
+++ b/synchronizer/listener.cpp
@@ -0,0 +1,192 @@
1#include "listener.h"
2
3#include "common/console.h"
4#include "common/commands.h"
5#include "common/handshake_generated.h"
6#include "common/revisionupdate_generated.h"
7
8#include <QLocalSocket>
9#include <QTimer>
10
11Listener::Listener(const QString &resource, QObject *parent)
12 : QObject(parent),
13 m_server(new QLocalServer(this)),
14 m_revision(0)
15{
16 connect(m_server, &QLocalServer::newConnection,
17 this, &Listener::acceptConnection);
18 Console::main()->log(QString("Trying to open %1").arg(resource));
19 if (!m_server->listen(resource)) {
20 // FIXME: multiple starts need to be handled here
21 m_server->removeServer(resource);
22 if (!m_server->listen(resource)) {
23 Console::main()->log("Utter failure to start server");
24 exit(-1);
25 }
26 }
27
28 if (m_server->isListening()) {
29 Console::main()->log(QString("Listening on %1").arg(m_server->serverName()));
30 }
31
32 QTimer::singleShot(2000, this, SLOT(checkConnections()));
33}
34
35Listener::~Listener()
36{
37}
38
39void Listener::setRevision(unsigned long long revision)
40{
41 if (m_revision != revision) {
42 m_revision = revision;
43 updateClientsWithRevision();
44 }
45}
46
47unsigned long long Listener::revision() const
48{
49 return m_revision;
50}
51
52void Listener::closeAllConnections()
53{
54 //TODO: close all client connectionsin m_connections
55 for (Client &client: m_connections) {
56 if (client.socket) {
57 client.socket->close();
58 delete client.socket;
59 client.socket = 0;
60 }
61 }
62}
63
64void Listener::acceptConnection()
65{
66 Console::main()->log(QString("Accepting connection"));
67 QLocalSocket *socket = m_server->nextPendingConnection();
68
69 if (!socket) {
70 return;
71 }
72
73 Console::main()->log("Got a connection");
74 Client client("Unknown Client" /*fixme: actual names!*/, socket);
75 connect(socket, &QIODevice::readyRead,
76 this, &Listener::readFromSocket);
77 m_connections << client;
78 connect(socket, &QLocalSocket::disconnected,
79 this, &Listener::clientDropped);
80
81}
82
83void Listener::clientDropped()
84{
85 QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender());
86 if (!socket) {
87 return;
88 }
89
90 Console::main()->log("Dropping connection...");
91 QMutableVectorIterator<Client> it(m_connections);
92 while (it.hasNext()) {
93 const Client &client = it.next();
94 if (client.socket == socket) {
95 Console::main()->log(QString(" dropped... %1").arg(client.name));
96 it.remove();
97 break;
98 }
99 }
100
101 checkConnections();
102}
103
104void Listener::checkConnections()
105{
106 if (m_connections.isEmpty()) {
107 m_server->close();
108 emit noClients();
109 }
110}
111
112void Listener::readFromSocket()
113{
114 QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender());
115 if (!socket) {
116 return;
117 }
118
119 Console::main()->log("Reading from socket...");
120 for (Client &client: m_connections) {
121 if (client.socket == socket) {
122 Console::main()->log(QString(" Client: %1").arg(client.name));
123 client.commandBuffer += socket->readAll();
124 // FIXME: schedule these rather than process them all at once
125 // right now this can lead to starvation of clients due to
126 // one overly active client
127 while (processClientBuffer(client)) {}
128 break;
129 }
130 }
131}
132
133bool Listener::processClientBuffer(Client &client)
134{
135 static const int headerSize = (sizeof(int) * 2);
136 Console::main()->log(QString("processing %1").arg(client.commandBuffer.size()));
137 if (client.commandBuffer.size() < headerSize) {
138 return false;
139 }
140
141 int commandId, size;
142 commandId = *(int*)client.commandBuffer.constData();
143 size = *(int*)(client.commandBuffer.constData() + sizeof(int));
144
145 if (size <= client.commandBuffer.size() - headerSize) {
146 QByteArray data = client.commandBuffer.mid(headerSize, size);
147 client.commandBuffer.remove(0, headerSize + size);
148
149 switch (commandId) {
150 case Commands::HandshakeCommand: {
151 auto buffer = Akonadi::GetHandshake(data.constData());
152 Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str()));
153 sendCurrentRevision(client);
154 break;
155 }
156 default:
157 // client.hasSentCommand = true;
158 break;
159 }
160
161 return client.commandBuffer.size() >= headerSize;
162 } else {
163 return false;
164 }
165}
166
167void Listener::sendCurrentRevision(Client &client)
168{
169 if (!client.socket || !client.socket->isValid()) {
170 return;
171 }
172
173 auto command = Akonadi::CreateRevisionUpdate(m_fbb, m_revision);
174 Akonadi::FinishRevisionUpdateBuffer(m_fbb, command);
175 Commands::write(client.socket, Commands::RevisionUpdateCommand, m_fbb);
176 m_fbb.Clear();
177}
178
179void Listener::updateClientsWithRevision()
180{
181 auto command = Akonadi::CreateRevisionUpdate(m_fbb, m_revision);
182 Akonadi::FinishRevisionUpdateBuffer(m_fbb, command);
183
184 for (const Client &client: m_connections) {
185 if (!client.socket || !client.socket->isValid()) {
186 continue;
187 }
188
189 Commands::write(client.socket, Commands::RevisionUpdateCommand, m_fbb);
190 }
191 m_fbb.Clear();
192}