summaryrefslogtreecommitdiffstats
path: root/synchronizer/listener.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-07-28 20:49:08 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-07-28 20:49:08 +0200
commitdd86c15b48f33c120c510327569fb1cc3ffa3d45 (patch)
tree22db98990fbfe189ff40f34befa58ffb08287d9a /synchronizer/listener.cpp
parente22776a57bd12621358ad7cd98dac3261f2a70db (diff)
downloadsink-dd86c15b48f33c120c510327569fb1cc3ffa3d45.tar.gz
sink-dd86c15b48f33c120c510327569fb1cc3ffa3d45.zip
Moved listener to common
So we can use it in tests as well.
Diffstat (limited to 'synchronizer/listener.cpp')
-rw-r--r--synchronizer/listener.cpp391
1 files changed, 0 insertions, 391 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
deleted file mode 100644
index 4316c63..0000000
--- a/synchronizer/listener.cpp
+++ /dev/null
@@ -1,391 +0,0 @@
1/*
2 * Copyright (C) 2014 Aaron Seigo <aseigo@kde.org>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20#include "listener.h"
21
22#include "common/clientapi.h"
23#include "common/commands.h"
24#include "common/resource.h"
25#include "common/log.h"
26
27// commands
28#include "common/commandcompletion_generated.h"
29#include "common/handshake_generated.h"
30#include "common/revisionupdate_generated.h"
31#include "common/synchronize_generated.h"
32#include "common/notification_generated.h"
33
34#include <QLocalSocket>
35#include <QTimer>
36#include <QLockFile>
37
38Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent)
39 : QObject(parent),
40 m_server(new QLocalServer(this)),
41 m_resourceName(Akonadi2::Store::resourceName(resourceInstanceIdentifier)),
42 m_resourceInstanceIdentifier(resourceInstanceIdentifier),
43 m_resource(0),
44 m_pipeline(new Akonadi2::Pipeline(resourceInstanceIdentifier, parent)),
45 m_clientBufferProcessesTimer(new QTimer(this)),
46 m_messageId(0)
47{
48 connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated,
49 this, &Listener::refreshRevision);
50 connect(m_server, &QLocalServer::newConnection,
51 this, &Listener::acceptConnection);
52 Trace() << "Trying to open " << m_resourceInstanceIdentifier;
53
54 m_lockfile = new QLockFile(m_resourceInstanceIdentifier + ".lock");
55 m_lockfile->setStaleLockTime(0);
56 if (!m_lockfile->tryLock(0)) {
57 Warning() << "Failed to acquire exclusive lock on socket.";
58 exit(-1);
59 }
60
61 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) {
62 // FIXME: multiple starts need to be handled here
63 m_server->removeServer(m_resourceInstanceIdentifier);
64 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) {
65 Warning() << "Utter failure to start server";
66 exit(-1);
67 }
68 }
69
70 if (m_server->isListening()) {
71 Log() << QString("Listening on %1").arg(m_server->serverName());
72 }
73
74 m_checkConnectionsTimer = new QTimer;
75 m_checkConnectionsTimer->setSingleShot(true);
76 m_checkConnectionsTimer->setInterval(1000);
77 connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() {
78 if (m_connections.isEmpty()) {
79 Log() << QString("No connections, shutting down.");
80 quit();
81 }
82 });
83
84 //TODO: experiment with different timeouts
85 // or even just drop down to invoking the method queued? => invoke queued unless we need throttling
86 m_clientBufferProcessesTimer->setInterval(0);
87 m_clientBufferProcessesTimer->setSingleShot(true);
88 connect(m_clientBufferProcessesTimer, &QTimer::timeout,
89 this, &Listener::processClientBuffers);
90}
91
92Listener::~Listener()
93{
94}
95
96void Listener::closeAllConnections()
97{
98 for (Client &client: m_connections) {
99 if (client.socket) {
100 disconnect(client.socket, 0, this, 0);
101 client.socket->close();
102 delete client.socket;
103 client.socket = 0;
104 }
105 }
106
107 m_connections.clear();
108}
109
110void Listener::acceptConnection()
111{
112 Trace() << "Accepting connection";
113 QLocalSocket *socket = m_server->nextPendingConnection();
114
115 if (!socket) {
116 return;
117 }
118
119 Log() << "Got a connection";
120 m_connections << Client("Unknown Client", socket);
121 connect(socket, &QIODevice::readyRead,
122 this, &Listener::onDataAvailable);
123 connect(socket, &QLocalSocket::disconnected,
124 this, &Listener::clientDropped);
125 m_checkConnectionsTimer->stop();
126
127 if (socket->bytesAvailable()) {
128 readFromSocket(socket);
129 }
130}
131
132void Listener::clientDropped()
133{
134 QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender());
135 if (!socket) {
136 return;
137 }
138
139 bool dropped = false;
140 QMutableVectorIterator<Client> it(m_connections);
141 while (it.hasNext()) {
142 const Client &client = it.next();
143 if (client.socket == socket) {
144 dropped = true;
145 Log() << QString("Dropped connection: %1").arg(client.name) << socket;
146 it.remove();
147 break;
148 }
149 }
150 if (!dropped) {
151 Warning() << "Failed to find connection for disconnected socket: " << socket;
152 }
153
154 checkConnections();
155}
156
157void Listener::checkConnections()
158{
159 m_checkConnectionsTimer->start();
160}
161
162void Listener::onDataAvailable()
163{
164 QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender());
165 if (!socket) {
166 return;
167 }
168 readFromSocket(socket);
169}
170
171void Listener::readFromSocket(QLocalSocket *socket)
172{
173 Trace() << "Reading from socket...";
174 for (Client &client: m_connections) {
175 if (client.socket == socket) {
176 client.commandBuffer += socket->readAll();
177 if (!m_clientBufferProcessesTimer->isActive()) {
178 m_clientBufferProcessesTimer->start();
179 }
180 break;
181 }
182 }
183}
184
185void Listener::processClientBuffers()
186{
187 //TODO: we should not process all clients, but iterate async over them and process
188 // one command from each in turn to ensure all clients get fair handling of
189 // commands?
190 bool again = false;
191 for (Client &client: m_connections) {
192 if (!client.socket || !client.socket->isValid() || client.commandBuffer.isEmpty()) {
193 continue;
194 }
195
196 if (processClientBuffer(client)) {
197 again = true;
198 }
199 }
200
201 if (again) {
202 m_clientBufferProcessesTimer->start();
203 }
204}
205
206void Listener::processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback)
207{
208 switch (commandId) {
209 case Akonadi2::Commands::HandshakeCommand: {
210 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
211 if (Akonadi2::VerifyHandshakeBuffer(verifier)) {
212 auto buffer = Akonadi2::GetHandshake(commandBuffer.constData());
213 client.name = buffer->name()->c_str();
214 } else {
215 Warning() << "received invalid command";
216 }
217 break;
218 }
219 case Akonadi2::Commands::SynchronizeCommand: {
220 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
221 if (Akonadi2::VerifySynchronizeBuffer(verifier)) {
222 auto buffer = Akonadi2::GetSynchronize(commandBuffer.constData());
223 Log() << QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name);
224 loadResource();
225 if (!m_resource) {
226 Warning() << "No resource loaded";
227 break;
228 }
229 auto job = KAsync::null<void>();
230 if (buffer->sourceSync()) {
231 job = m_resource->synchronizeWithSource(m_pipeline);
232 }
233 if (buffer->localSync()) {
234 job = job.then<void>(m_resource->processAllMessages());
235 }
236 job.then<void>([callback]() {
237 callback();
238 }).exec();
239 return;
240 } else {
241 Warning() << "received invalid command";
242 }
243 break;
244 }
245 case Akonadi2::Commands::FetchEntityCommand:
246 case Akonadi2::Commands::DeleteEntityCommand:
247 case Akonadi2::Commands::ModifyEntityCommand:
248 case Akonadi2::Commands::CreateEntityCommand:
249 Log() << "\tCommand id " << messageId << " of type \"" << Akonadi2::Commands::name(commandId) << "\" from " << client.name;
250 loadResource();
251 if (m_resource) {
252 m_resource->processCommand(commandId, commandBuffer, m_pipeline);
253 }
254 break;
255 case Akonadi2::Commands::ShutdownCommand:
256 Log() << QString("\tReceived shutdown command from %1").arg(client.name);
257 //Immediately reject new connections
258 m_server->close();
259 QTimer::singleShot(0, this, &Listener::quit);
260 break;
261 default:
262 if (commandId > Akonadi2::Commands::CustomCommand) {
263 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId;
264 loadResource();
265 if (m_resource) {
266 m_resource->processCommand(commandId, commandBuffer, m_pipeline);
267 }
268 } else {
269 Warning() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId;
270 //TODO: handle error: we don't know wtf this command is
271 }
272 break;
273 }
274 callback();
275}
276
277void Listener::quit()
278{
279 //Broadcast shutdown notifications to open clients, so they don't try to restart the resource
280 auto command = Akonadi2::CreateNotification(m_fbb, Akonadi2::NotificationType::NotificationType_Shutdown);
281 Akonadi2::FinishNotificationBuffer(m_fbb, command);
282 for (Client &client : m_connections) {
283 if (client.socket && client.socket->isOpen()) {
284 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::NotificationCommand, m_fbb);
285 }
286 }
287 m_fbb.Clear();
288
289 //Connections will be cleaned up later
290 emit noClients();
291}
292
293bool Listener::processClientBuffer(Client &client)
294{
295 static const int headerSize = Akonadi2::Commands::headerSize();
296 if (client.commandBuffer.size() < headerSize) {
297 return false;
298 }
299
300 const uint messageId = *(uint*)client.commandBuffer.constData();
301 const int commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint));
302 const uint size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint));
303
304 //TODO: reject messages above a certain size?
305
306 if (size <= uint(client.commandBuffer.size() - headerSize)) {
307 client.commandBuffer.remove(0, headerSize);
308
309 auto socket = QPointer<QLocalSocket>(client.socket);
310 auto clientName = client.name;
311 const QByteArray commandBuffer = client.commandBuffer.left(size);
312 client.commandBuffer.remove(0, size);
313 processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName]() {
314 Log() << QString("\tCompleted command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Akonadi2::Commands::name(commandId))).arg(clientName);
315 if (socket) {
316 sendCommandCompleted(socket.data(), messageId);
317 } else {
318 Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName);
319 }
320 });
321
322 return client.commandBuffer.size() >= headerSize;
323 }
324
325 return false;
326}
327
328void Listener::sendCurrentRevision(Client &client)
329{
330 if (!client.socket || !client.socket->isValid()) {
331 return;
332 }
333
334 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision());
335 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command);
336 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb);
337 m_fbb.Clear();
338}
339
340void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId)
341{
342 if (!socket || !socket->isValid()) {
343 return;
344 }
345
346 auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId);
347 Akonadi2::FinishCommandCompletionBuffer(m_fbb, command);
348 Akonadi2::Commands::write(socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb);
349 m_fbb.Clear();
350}
351
352void Listener::refreshRevision()
353{
354 updateClientsWithRevision();
355}
356
357void Listener::updateClientsWithRevision()
358{
359 //FIXME don't send revision updates for revisions that are still being processed.
360 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision());
361 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command);
362
363 for (const Client &client: m_connections) {
364 if (!client.socket || !client.socket->isValid()) {
365 continue;
366 }
367
368 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb);
369 }
370 m_fbb.Clear();
371}
372
373void Listener::loadResource()
374{
375 if (m_resource) {
376 return;
377 }
378
379 Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName);
380 if (resourceFactory) {
381 m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier);
382 Log() << QString("Resource factory: %1").arg((qlonglong)resourceFactory);
383 Log() << QString("\tResource: %1").arg((qlonglong)m_resource);
384 m_resource->configurePipeline(m_pipeline);
385 } else {
386 ErrorMsg() << "Failed to load resource " << m_resourceName;
387 }
388 //TODO: on failure ... what?
389 //Enter broken state?
390}
391