diff options
Diffstat (limited to 'common/listener.cpp')
-rw-r--r-- | common/listener.cpp | 391 |
1 files changed, 391 insertions, 0 deletions
diff --git a/common/listener.cpp b/common/listener.cpp new file mode 100644 index 0000000..4316c63 --- /dev/null +++ b/common/listener.cpp | |||
@@ -0,0 +1,391 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | |||
20 | #include "listener.h" | ||
21 | |||
22 | #include "common/clientapi.h" | ||
23 | #include "common/commands.h" | ||
24 | #include "common/resource.h" | ||
25 | #include "common/log.h" | ||
26 | |||
27 | // commands | ||
28 | #include "common/commandcompletion_generated.h" | ||
29 | #include "common/handshake_generated.h" | ||
30 | #include "common/revisionupdate_generated.h" | ||
31 | #include "common/synchronize_generated.h" | ||
32 | #include "common/notification_generated.h" | ||
33 | |||
34 | #include <QLocalSocket> | ||
35 | #include <QTimer> | ||
36 | #include <QLockFile> | ||
37 | |||
38 | Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent) | ||
39 | : QObject(parent), | ||
40 | m_server(new QLocalServer(this)), | ||
41 | m_resourceName(Akonadi2::Store::resourceName(resourceInstanceIdentifier)), | ||
42 | m_resourceInstanceIdentifier(resourceInstanceIdentifier), | ||
43 | m_resource(0), | ||
44 | m_pipeline(new Akonadi2::Pipeline(resourceInstanceIdentifier, parent)), | ||
45 | m_clientBufferProcessesTimer(new QTimer(this)), | ||
46 | m_messageId(0) | ||
47 | { | ||
48 | connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated, | ||
49 | this, &Listener::refreshRevision); | ||
50 | connect(m_server, &QLocalServer::newConnection, | ||
51 | this, &Listener::acceptConnection); | ||
52 | Trace() << "Trying to open " << m_resourceInstanceIdentifier; | ||
53 | |||
54 | m_lockfile = new QLockFile(m_resourceInstanceIdentifier + ".lock"); | ||
55 | m_lockfile->setStaleLockTime(0); | ||
56 | if (!m_lockfile->tryLock(0)) { | ||
57 | Warning() << "Failed to acquire exclusive lock on socket."; | ||
58 | exit(-1); | ||
59 | } | ||
60 | |||
61 | if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { | ||
62 | // FIXME: multiple starts need to be handled here | ||
63 | m_server->removeServer(m_resourceInstanceIdentifier); | ||
64 | if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { | ||
65 | Warning() << "Utter failure to start server"; | ||
66 | exit(-1); | ||
67 | } | ||
68 | } | ||
69 | |||
70 | if (m_server->isListening()) { | ||
71 | Log() << QString("Listening on %1").arg(m_server->serverName()); | ||
72 | } | ||
73 | |||
74 | m_checkConnectionsTimer = new QTimer; | ||
75 | m_checkConnectionsTimer->setSingleShot(true); | ||
76 | m_checkConnectionsTimer->setInterval(1000); | ||
77 | connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() { | ||
78 | if (m_connections.isEmpty()) { | ||
79 | Log() << QString("No connections, shutting down."); | ||
80 | quit(); | ||
81 | } | ||
82 | }); | ||
83 | |||
84 | //TODO: experiment with different timeouts | ||
85 | // or even just drop down to invoking the method queued? => invoke queued unless we need throttling | ||
86 | m_clientBufferProcessesTimer->setInterval(0); | ||
87 | m_clientBufferProcessesTimer->setSingleShot(true); | ||
88 | connect(m_clientBufferProcessesTimer, &QTimer::timeout, | ||
89 | this, &Listener::processClientBuffers); | ||
90 | } | ||
91 | |||
92 | Listener::~Listener() | ||
93 | { | ||
94 | } | ||
95 | |||
96 | void Listener::closeAllConnections() | ||
97 | { | ||
98 | for (Client &client: m_connections) { | ||
99 | if (client.socket) { | ||
100 | disconnect(client.socket, 0, this, 0); | ||
101 | client.socket->close(); | ||
102 | delete client.socket; | ||
103 | client.socket = 0; | ||
104 | } | ||
105 | } | ||
106 | |||
107 | m_connections.clear(); | ||
108 | } | ||
109 | |||
110 | void Listener::acceptConnection() | ||
111 | { | ||
112 | Trace() << "Accepting connection"; | ||
113 | QLocalSocket *socket = m_server->nextPendingConnection(); | ||
114 | |||
115 | if (!socket) { | ||
116 | return; | ||
117 | } | ||
118 | |||
119 | Log() << "Got a connection"; | ||
120 | m_connections << Client("Unknown Client", socket); | ||
121 | connect(socket, &QIODevice::readyRead, | ||
122 | this, &Listener::onDataAvailable); | ||
123 | connect(socket, &QLocalSocket::disconnected, | ||
124 | this, &Listener::clientDropped); | ||
125 | m_checkConnectionsTimer->stop(); | ||
126 | |||
127 | if (socket->bytesAvailable()) { | ||
128 | readFromSocket(socket); | ||
129 | } | ||
130 | } | ||
131 | |||
132 | void Listener::clientDropped() | ||
133 | { | ||
134 | QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender()); | ||
135 | if (!socket) { | ||
136 | return; | ||
137 | } | ||
138 | |||
139 | bool dropped = false; | ||
140 | QMutableVectorIterator<Client> it(m_connections); | ||
141 | while (it.hasNext()) { | ||
142 | const Client &client = it.next(); | ||
143 | if (client.socket == socket) { | ||
144 | dropped = true; | ||
145 | Log() << QString("Dropped connection: %1").arg(client.name) << socket; | ||
146 | it.remove(); | ||
147 | break; | ||
148 | } | ||
149 | } | ||
150 | if (!dropped) { | ||
151 | Warning() << "Failed to find connection for disconnected socket: " << socket; | ||
152 | } | ||
153 | |||
154 | checkConnections(); | ||
155 | } | ||
156 | |||
157 | void Listener::checkConnections() | ||
158 | { | ||
159 | m_checkConnectionsTimer->start(); | ||
160 | } | ||
161 | |||
162 | void Listener::onDataAvailable() | ||
163 | { | ||
164 | QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender()); | ||
165 | if (!socket) { | ||
166 | return; | ||
167 | } | ||
168 | readFromSocket(socket); | ||
169 | } | ||
170 | |||
171 | void Listener::readFromSocket(QLocalSocket *socket) | ||
172 | { | ||
173 | Trace() << "Reading from socket..."; | ||
174 | for (Client &client: m_connections) { | ||
175 | if (client.socket == socket) { | ||
176 | client.commandBuffer += socket->readAll(); | ||
177 | if (!m_clientBufferProcessesTimer->isActive()) { | ||
178 | m_clientBufferProcessesTimer->start(); | ||
179 | } | ||
180 | break; | ||
181 | } | ||
182 | } | ||
183 | } | ||
184 | |||
185 | void Listener::processClientBuffers() | ||
186 | { | ||
187 | //TODO: we should not process all clients, but iterate async over them and process | ||
188 | // one command from each in turn to ensure all clients get fair handling of | ||
189 | // commands? | ||
190 | bool again = false; | ||
191 | for (Client &client: m_connections) { | ||
192 | if (!client.socket || !client.socket->isValid() || client.commandBuffer.isEmpty()) { | ||
193 | continue; | ||
194 | } | ||
195 | |||
196 | if (processClientBuffer(client)) { | ||
197 | again = true; | ||
198 | } | ||
199 | } | ||
200 | |||
201 | if (again) { | ||
202 | m_clientBufferProcessesTimer->start(); | ||
203 | } | ||
204 | } | ||
205 | |||
206 | void Listener::processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback) | ||
207 | { | ||
208 | switch (commandId) { | ||
209 | case Akonadi2::Commands::HandshakeCommand: { | ||
210 | flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); | ||
211 | if (Akonadi2::VerifyHandshakeBuffer(verifier)) { | ||
212 | auto buffer = Akonadi2::GetHandshake(commandBuffer.constData()); | ||
213 | client.name = buffer->name()->c_str(); | ||
214 | } else { | ||
215 | Warning() << "received invalid command"; | ||
216 | } | ||
217 | break; | ||
218 | } | ||
219 | case Akonadi2::Commands::SynchronizeCommand: { | ||
220 | flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); | ||
221 | if (Akonadi2::VerifySynchronizeBuffer(verifier)) { | ||
222 | auto buffer = Akonadi2::GetSynchronize(commandBuffer.constData()); | ||
223 | Log() << QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name); | ||
224 | loadResource(); | ||
225 | if (!m_resource) { | ||
226 | Warning() << "No resource loaded"; | ||
227 | break; | ||
228 | } | ||
229 | auto job = KAsync::null<void>(); | ||
230 | if (buffer->sourceSync()) { | ||
231 | job = m_resource->synchronizeWithSource(m_pipeline); | ||
232 | } | ||
233 | if (buffer->localSync()) { | ||
234 | job = job.then<void>(m_resource->processAllMessages()); | ||
235 | } | ||
236 | job.then<void>([callback]() { | ||
237 | callback(); | ||
238 | }).exec(); | ||
239 | return; | ||
240 | } else { | ||
241 | Warning() << "received invalid command"; | ||
242 | } | ||
243 | break; | ||
244 | } | ||
245 | case Akonadi2::Commands::FetchEntityCommand: | ||
246 | case Akonadi2::Commands::DeleteEntityCommand: | ||
247 | case Akonadi2::Commands::ModifyEntityCommand: | ||
248 | case Akonadi2::Commands::CreateEntityCommand: | ||
249 | Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name; | ||
250 | loadResource(); | ||
251 | if (m_resource) { | ||
252 | m_resource->processCommand(commandId, commandBuffer, m_pipeline); | ||
253 | } | ||
254 | break; | ||
255 | case Akonadi2::Commands::ShutdownCommand: | ||
256 | Log() << QString("\tReceived shutdown command from %1").arg(client.name); | ||
257 | //Immediately reject new connections | ||
258 | m_server->close(); | ||
259 | QTimer::singleShot(0, this, &Listener::quit); | ||
260 | break; | ||
261 | default: | ||
262 | if (commandId > Akonadi2::Commands::CustomCommand) { | ||
263 | Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; | ||
264 | loadResource(); | ||
265 | if (m_resource) { | ||
266 | m_resource->processCommand(commandId, commandBuffer, m_pipeline); | ||
267 | } | ||
268 | } else { | ||
269 | Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; | ||
270 | //TODO: handle error: we don't know wtf this command is | ||
271 | } | ||
272 | break; | ||
273 | } | ||
274 | callback(); | ||
275 | } | ||
276 | |||
277 | void Listener::quit() | ||
278 | { | ||
279 | //Broadcast shutdown notifications to open clients, so they don't try to restart the resource | ||
280 | auto command = Akonadi2::CreateNotification(m_fbb, Akonadi2::NotificationType::NotificationType_Shutdown); | ||
281 | Akonadi2::FinishNotificationBuffer(m_fbb, command); | ||
282 | for (Client &client : m_connections) { | ||
283 | if (client.socket && client.socket->isOpen()) { | ||
284 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::NotificationCommand, m_fbb); | ||
285 | } | ||
286 | } | ||
287 | m_fbb.Clear(); | ||
288 | |||
289 | //Connections will be cleaned up later | ||
290 | emit noClients(); | ||
291 | } | ||
292 | |||
293 | bool Listener::processClientBuffer(Client &client) | ||
294 | { | ||
295 | static const int headerSize = Akonadi2::Commands::headerSize(); | ||
296 | if (client.commandBuffer.size() < headerSize) { | ||
297 | return false; | ||
298 | } | ||
299 | |||
300 | const uint messageId = *(uint*)client.commandBuffer.constData(); | ||
301 | const int commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); | ||
302 | const uint size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); | ||
303 | |||
304 | //TODO: reject messages above a certain size? | ||
305 | |||
306 | if (size <= uint(client.commandBuffer.size() - headerSize)) { | ||
307 | client.commandBuffer.remove(0, headerSize); | ||
308 | |||
309 | auto socket = QPointer<QLocalSocket>(client.socket); | ||
310 | auto clientName = client.name; | ||
311 | const QByteArray commandBuffer = client.commandBuffer.left(size); | ||
312 | client.commandBuffer.remove(0, size); | ||
313 | processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName]() { | ||
314 | Log() << QString("\tCompleted command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Akonadi2::Commands::name(commandId))).arg(clientName); | ||
315 | if (socket) { | ||
316 | sendCommandCompleted(socket.data(), messageId); | ||
317 | } else { | ||
318 | Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); | ||
319 | } | ||
320 | }); | ||
321 | |||
322 | return client.commandBuffer.size() >= headerSize; | ||
323 | } | ||
324 | |||
325 | return false; | ||
326 | } | ||
327 | |||
328 | void Listener::sendCurrentRevision(Client &client) | ||
329 | { | ||
330 | if (!client.socket || !client.socket->isValid()) { | ||
331 | return; | ||
332 | } | ||
333 | |||
334 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision()); | ||
335 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); | ||
336 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); | ||
337 | m_fbb.Clear(); | ||
338 | } | ||
339 | |||
340 | void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId) | ||
341 | { | ||
342 | if (!socket || !socket->isValid()) { | ||
343 | return; | ||
344 | } | ||
345 | |||
346 | auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId); | ||
347 | Akonadi2::FinishCommandCompletionBuffer(m_fbb, command); | ||
348 | Akonadi2::Commands::write(socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb); | ||
349 | m_fbb.Clear(); | ||
350 | } | ||
351 | |||
352 | void Listener::refreshRevision() | ||
353 | { | ||
354 | updateClientsWithRevision(); | ||
355 | } | ||
356 | |||
357 | void Listener::updateClientsWithRevision() | ||
358 | { | ||
359 | //FIXME don't send revision updates for revisions that are still being processed. | ||
360 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision()); | ||
361 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); | ||
362 | |||
363 | for (const Client &client: m_connections) { | ||
364 | if (!client.socket || !client.socket->isValid()) { | ||
365 | continue; | ||
366 | } | ||
367 | |||
368 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); | ||
369 | } | ||
370 | m_fbb.Clear(); | ||
371 | } | ||
372 | |||
373 | void Listener::loadResource() | ||
374 | { | ||
375 | if (m_resource) { | ||
376 | return; | ||
377 | } | ||
378 | |||
379 | Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName); | ||
380 | if (resourceFactory) { | ||
381 | m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier); | ||
382 | Log() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); | ||
383 | Log() << QString("\tResource: %1").arg((qlonglong)m_resource); | ||
384 | m_resource->configurePipeline(m_pipeline); | ||
385 | } else { | ||
386 | ErrorMsg() << "Failed to load resource " << m_resourceName; | ||
387 | } | ||
388 | //TODO: on failure ... what? | ||
389 | //Enter broken state? | ||
390 | } | ||
391 | |||