diff options
Diffstat (limited to 'synchronizer/listener.cpp')
-rw-r--r-- | synchronizer/listener.cpp | 192 |
1 files changed, 192 insertions, 0 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp new file mode 100644 index 0000000..23a5a70 --- /dev/null +++ b/synchronizer/listener.cpp | |||
@@ -0,0 +1,192 @@ | |||
1 | #include "listener.h" | ||
2 | |||
3 | #include "common/console.h" | ||
4 | #include "common/commands.h" | ||
5 | #include "common/handshake_generated.h" | ||
6 | #include "common/revisionupdate_generated.h" | ||
7 | |||
8 | #include <QLocalSocket> | ||
9 | #include <QTimer> | ||
10 | |||
11 | Listener::Listener(const QString &resource, QObject *parent) | ||
12 | : QObject(parent), | ||
13 | m_server(new QLocalServer(this)), | ||
14 | m_revision(0) | ||
15 | { | ||
16 | connect(m_server, &QLocalServer::newConnection, | ||
17 | this, &Listener::acceptConnection); | ||
18 | Console::main()->log(QString("Trying to open %1").arg(resource)); | ||
19 | if (!m_server->listen(resource)) { | ||
20 | // FIXME: multiple starts need to be handled here | ||
21 | m_server->removeServer(resource); | ||
22 | if (!m_server->listen(resource)) { | ||
23 | Console::main()->log("Utter failure to start server"); | ||
24 | exit(-1); | ||
25 | } | ||
26 | } | ||
27 | |||
28 | if (m_server->isListening()) { | ||
29 | Console::main()->log(QString("Listening on %1").arg(m_server->serverName())); | ||
30 | } | ||
31 | |||
32 | QTimer::singleShot(2000, this, SLOT(checkConnections())); | ||
33 | } | ||
34 | |||
35 | Listener::~Listener() | ||
36 | { | ||
37 | } | ||
38 | |||
39 | void Listener::setRevision(unsigned long long revision) | ||
40 | { | ||
41 | if (m_revision != revision) { | ||
42 | m_revision = revision; | ||
43 | updateClientsWithRevision(); | ||
44 | } | ||
45 | } | ||
46 | |||
47 | unsigned long long Listener::revision() const | ||
48 | { | ||
49 | return m_revision; | ||
50 | } | ||
51 | |||
52 | void Listener::closeAllConnections() | ||
53 | { | ||
54 | //TODO: close all client connectionsin m_connections | ||
55 | for (Client &client: m_connections) { | ||
56 | if (client.socket) { | ||
57 | client.socket->close(); | ||
58 | delete client.socket; | ||
59 | client.socket = 0; | ||
60 | } | ||
61 | } | ||
62 | } | ||
63 | |||
64 | void Listener::acceptConnection() | ||
65 | { | ||
66 | Console::main()->log(QString("Accepting connection")); | ||
67 | QLocalSocket *socket = m_server->nextPendingConnection(); | ||
68 | |||
69 | if (!socket) { | ||
70 | return; | ||
71 | } | ||
72 | |||
73 | Console::main()->log("Got a connection"); | ||
74 | Client client("Unknown Client" /*fixme: actual names!*/, socket); | ||
75 | connect(socket, &QIODevice::readyRead, | ||
76 | this, &Listener::readFromSocket); | ||
77 | m_connections << client; | ||
78 | connect(socket, &QLocalSocket::disconnected, | ||
79 | this, &Listener::clientDropped); | ||
80 | |||
81 | } | ||
82 | |||
83 | void Listener::clientDropped() | ||
84 | { | ||
85 | QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender()); | ||
86 | if (!socket) { | ||
87 | return; | ||
88 | } | ||
89 | |||
90 | Console::main()->log("Dropping connection..."); | ||
91 | QMutableVectorIterator<Client> it(m_connections); | ||
92 | while (it.hasNext()) { | ||
93 | const Client &client = it.next(); | ||
94 | if (client.socket == socket) { | ||
95 | Console::main()->log(QString(" dropped... %1").arg(client.name)); | ||
96 | it.remove(); | ||
97 | break; | ||
98 | } | ||
99 | } | ||
100 | |||
101 | checkConnections(); | ||
102 | } | ||
103 | |||
104 | void Listener::checkConnections() | ||
105 | { | ||
106 | if (m_connections.isEmpty()) { | ||
107 | m_server->close(); | ||
108 | emit noClients(); | ||
109 | } | ||
110 | } | ||
111 | |||
112 | void Listener::readFromSocket() | ||
113 | { | ||
114 | QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender()); | ||
115 | if (!socket) { | ||
116 | return; | ||
117 | } | ||
118 | |||
119 | Console::main()->log("Reading from socket..."); | ||
120 | for (Client &client: m_connections) { | ||
121 | if (client.socket == socket) { | ||
122 | Console::main()->log(QString(" Client: %1").arg(client.name)); | ||
123 | client.commandBuffer += socket->readAll(); | ||
124 | // FIXME: schedule these rather than process them all at once | ||
125 | // right now this can lead to starvation of clients due to | ||
126 | // one overly active client | ||
127 | while (processClientBuffer(client)) {} | ||
128 | break; | ||
129 | } | ||
130 | } | ||
131 | } | ||
132 | |||
133 | bool Listener::processClientBuffer(Client &client) | ||
134 | { | ||
135 | static const int headerSize = (sizeof(int) * 2); | ||
136 | Console::main()->log(QString("processing %1").arg(client.commandBuffer.size())); | ||
137 | if (client.commandBuffer.size() < headerSize) { | ||
138 | return false; | ||
139 | } | ||
140 | |||
141 | int commandId, size; | ||
142 | commandId = *(int*)client.commandBuffer.constData(); | ||
143 | size = *(int*)(client.commandBuffer.constData() + sizeof(int)); | ||
144 | |||
145 | if (size <= client.commandBuffer.size() - headerSize) { | ||
146 | QByteArray data = client.commandBuffer.mid(headerSize, size); | ||
147 | client.commandBuffer.remove(0, headerSize + size); | ||
148 | |||
149 | switch (commandId) { | ||
150 | case Commands::HandshakeCommand: { | ||
151 | auto buffer = Akonadi::GetHandshake(data.constData()); | ||
152 | Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); | ||
153 | sendCurrentRevision(client); | ||
154 | break; | ||
155 | } | ||
156 | default: | ||
157 | // client.hasSentCommand = true; | ||
158 | break; | ||
159 | } | ||
160 | |||
161 | return client.commandBuffer.size() >= headerSize; | ||
162 | } else { | ||
163 | return false; | ||
164 | } | ||
165 | } | ||
166 | |||
167 | void Listener::sendCurrentRevision(Client &client) | ||
168 | { | ||
169 | if (!client.socket || !client.socket->isValid()) { | ||
170 | return; | ||
171 | } | ||
172 | |||
173 | auto command = Akonadi::CreateRevisionUpdate(m_fbb, m_revision); | ||
174 | Akonadi::FinishRevisionUpdateBuffer(m_fbb, command); | ||
175 | Commands::write(client.socket, Commands::RevisionUpdateCommand, m_fbb); | ||
176 | m_fbb.Clear(); | ||
177 | } | ||
178 | |||
179 | void Listener::updateClientsWithRevision() | ||
180 | { | ||
181 | auto command = Akonadi::CreateRevisionUpdate(m_fbb, m_revision); | ||
182 | Akonadi::FinishRevisionUpdateBuffer(m_fbb, command); | ||
183 | |||
184 | for (const Client &client: m_connections) { | ||
185 | if (!client.socket || !client.socket->isValid()) { | ||
186 | continue; | ||
187 | } | ||
188 | |||
189 | Commands::write(client.socket, Commands::RevisionUpdateCommand, m_fbb); | ||
190 | } | ||
191 | m_fbb.Clear(); | ||
192 | } | ||