summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/commands.h1
-rw-r--r--common/commands/notification.fbs9
-rw-r--r--common/resourceaccess.cpp14
-rw-r--r--synchronizer/listener.cpp32
-rw-r--r--synchronizer/listener.h5
5 files changed, 50 insertions, 11 deletions
diff --git a/common/commands.h b/common/commands.h
index 26729dc..0007ffc 100644
--- a/common/commands.h
+++ b/common/commands.h
@@ -43,6 +43,7 @@ enum CommandIds {
43 CreateEntityCommand, 43 CreateEntityCommand,
44 SearchSourceCommand, // need a buffer definition for this, but relies on Query API 44 SearchSourceCommand, // need a buffer definition for this, but relies on Query API
45 ShutdownCommand, 45 ShutdownCommand,
46 NotificationCommand,
46 CustomCommand = 0xffff 47 CustomCommand = 0xffff
47}; 48};
48 49
diff --git a/common/commands/notification.fbs b/common/commands/notification.fbs
new file mode 100644
index 0000000..6684472
--- /dev/null
+++ b/common/commands/notification.fbs
@@ -0,0 +1,9 @@
1namespace Akonadi2;
2
3enum NotificationType : byte { Shutdown = 1, Status, Warning, Progress }
4
5table Notification {
6 type: NotificationType = Status;
7}
8
9root_type Notification;
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 59cbece..20e1e8c 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -25,6 +25,7 @@
25#include "common/handshake_generated.h" 25#include "common/handshake_generated.h"
26#include "common/revisionupdate_generated.h" 26#include "common/revisionupdate_generated.h"
27#include "common/synchronize_generated.h" 27#include "common/synchronize_generated.h"
28#include "common/notification_generated.h"
28#include "log.h" 29#include "log.h"
29 30
30#include <QCoreApplication> 31#include <QCoreApplication>
@@ -325,6 +326,19 @@ bool ResourceAccess::processMessageBuffer()
325 QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id())); 326 QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id()));
326 break; 327 break;
327 } 328 }
329 case Commands::NotificationCommand: {
330 auto buffer = GetNotification(d->partialMessageBuffer.constData() + headerSize);
331 switch (buffer->type()) {
332 case Akonadi2::NotificationType::NotificationType_Shutdown:
333 Log() << "Received shutdown notification.";
334 close();
335 break;
336 default:
337 Warning() << "Received unknown notification: " << buffer->type();
338 break;
339 }
340 break;
341 }
328 default: 342 default:
329 break; 343 break;
330 } 344 }
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index 2bc6be0..5165111 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -30,6 +30,7 @@
30#include "common/handshake_generated.h" 30#include "common/handshake_generated.h"
31#include "common/revisionupdate_generated.h" 31#include "common/revisionupdate_generated.h"
32#include "common/synchronize_generated.h" 32#include "common/synchronize_generated.h"
33#include "common/notification_generated.h"
33 34
34#include <QLocalSocket> 35#include <QLocalSocket>
35#include <QTimer> 36#include <QTimer>
@@ -67,8 +68,7 @@ Listener::Listener(const QString &resourceName, QObject *parent)
67 connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() { 68 connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() {
68 if (m_connections.isEmpty()) { 69 if (m_connections.isEmpty()) {
69 Log() << QString("No connections, shutting down."); 70 Log() << QString("No connections, shutting down.");
70 m_server->close(); 71 quit();
71 emit noClients();
72 } 72 }
73 }); 73 });
74 74
@@ -250,10 +250,8 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin
250 break; 250 break;
251 case Akonadi2::Commands::ShutdownCommand: 251 case Akonadi2::Commands::ShutdownCommand:
252 Log() << QString("\tReceived shutdown command from %1").arg(client.name); 252 Log() << QString("\tReceived shutdown command from %1").arg(client.name);
253 callback(); 253 QTimer::singleShot(0, this, &Listener::quit);
254 m_server->close(); 254 break;
255 emit noClients();
256 return;
257 default: 255 default:
258 if (commandId > Akonadi2::Commands::CustomCommand) { 256 if (commandId > Akonadi2::Commands::CustomCommand) {
259 loadResource(); 257 loadResource();
@@ -268,6 +266,22 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin
268 callback(); 266 callback();
269} 267}
270 268
269void Listener::quit()
270{
271 //Broadcast shutdown notifications to open clients, so they don't try to restart the resource
272 auto command = Akonadi2::CreateNotification(m_fbb, Akonadi2::NotificationType::NotificationType_Shutdown);
273 Akonadi2::FinishNotificationBuffer(m_fbb, command);
274 for (Client &client : m_connections) {
275 if (client.socket && client.socket->isOpen()) {
276 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::NotificationCommand, m_fbb);
277 }
278 }
279 m_fbb.Clear();
280
281 m_server->close();
282 emit noClients();
283}
284
271bool Listener::processClientBuffer(Client &client) 285bool Listener::processClientBuffer(Client &client)
272{ 286{
273 static const int headerSize = Akonadi2::Commands::headerSize(); 287 static const int headerSize = Akonadi2::Commands::headerSize();
@@ -314,15 +328,15 @@ void Listener::sendCurrentRevision(Client &client)
314 m_fbb.Clear(); 328 m_fbb.Clear();
315} 329}
316 330
317void Listener::sendCommandCompleted(Client &client, uint messageId) 331void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId)
318{ 332{
319 if (!client.socket || !client.socket->isValid()) { 333 if (!socket || !socket->isValid()) {
320 return; 334 return;
321 } 335 }
322 336
323 auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId); 337 auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId);
324 Akonadi2::FinishCommandCompletionBuffer(m_fbb, command); 338 Akonadi2::FinishCommandCompletionBuffer(m_fbb, command);
325 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb); 339 Akonadi2::Commands::write(socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb);
326 m_fbb.Clear(); 340 m_fbb.Clear();
327} 341}
328 342
diff --git a/synchronizer/listener.h b/synchronizer/listener.h
index ee73766..8dad3a4 100644
--- a/synchronizer/listener.h
+++ b/synchronizer/listener.h
@@ -49,7 +49,7 @@ public:
49 } 49 }
50 50
51 QString name; 51 QString name;
52 QLocalSocket *socket; 52 QPointer<QLocalSocket> socket;
53 QByteArray commandBuffer; 53 QByteArray commandBuffer;
54}; 54};
55 55
@@ -74,12 +74,13 @@ private Q_SLOTS:
74 void readFromSocket(); 74 void readFromSocket();
75 void processClientBuffers(); 75 void processClientBuffers();
76 void refreshRevision(); 76 void refreshRevision();
77 void quit();
77 78
78private: 79private:
79 void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback); 80 void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback);
80 bool processClientBuffer(Client &client); 81 bool processClientBuffer(Client &client);
81 void sendCurrentRevision(Client &client); 82 void sendCurrentRevision(Client &client);
82 void sendCommandCompleted(Client &client, uint messageId); 83 void sendCommandCompleted(QLocalSocket *socket, uint messageId);
83 void updateClientsWithRevision(); 84 void updateClientsWithRevision();
84 void loadResource(); 85 void loadResource();
85 86