diff options
-rw-r--r-- | common/commands.h | 1 | ||||
-rw-r--r-- | common/commands/notification.fbs | 9 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 14 | ||||
-rw-r--r-- | synchronizer/listener.cpp | 32 | ||||
-rw-r--r-- | synchronizer/listener.h | 5 |
5 files changed, 50 insertions, 11 deletions
diff --git a/common/commands.h b/common/commands.h index 26729dc..0007ffc 100644 --- a/common/commands.h +++ b/common/commands.h | |||
@@ -43,6 +43,7 @@ enum CommandIds { | |||
43 | CreateEntityCommand, | 43 | CreateEntityCommand, |
44 | SearchSourceCommand, // need a buffer definition for this, but relies on Query API | 44 | SearchSourceCommand, // need a buffer definition for this, but relies on Query API |
45 | ShutdownCommand, | 45 | ShutdownCommand, |
46 | NotificationCommand, | ||
46 | CustomCommand = 0xffff | 47 | CustomCommand = 0xffff |
47 | }; | 48 | }; |
48 | 49 | ||
diff --git a/common/commands/notification.fbs b/common/commands/notification.fbs new file mode 100644 index 0000000..6684472 --- /dev/null +++ b/common/commands/notification.fbs | |||
@@ -0,0 +1,9 @@ | |||
1 | namespace Akonadi2; | ||
2 | |||
3 | enum NotificationType : byte { Shutdown = 1, Status, Warning, Progress } | ||
4 | |||
5 | table Notification { | ||
6 | type: NotificationType = Status; | ||
7 | } | ||
8 | |||
9 | root_type Notification; | ||
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 59cbece..20e1e8c 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -25,6 +25,7 @@ | |||
25 | #include "common/handshake_generated.h" | 25 | #include "common/handshake_generated.h" |
26 | #include "common/revisionupdate_generated.h" | 26 | #include "common/revisionupdate_generated.h" |
27 | #include "common/synchronize_generated.h" | 27 | #include "common/synchronize_generated.h" |
28 | #include "common/notification_generated.h" | ||
28 | #include "log.h" | 29 | #include "log.h" |
29 | 30 | ||
30 | #include <QCoreApplication> | 31 | #include <QCoreApplication> |
@@ -325,6 +326,19 @@ bool ResourceAccess::processMessageBuffer() | |||
325 | QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id())); | 326 | QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id())); |
326 | break; | 327 | break; |
327 | } | 328 | } |
329 | case Commands::NotificationCommand: { | ||
330 | auto buffer = GetNotification(d->partialMessageBuffer.constData() + headerSize); | ||
331 | switch (buffer->type()) { | ||
332 | case Akonadi2::NotificationType::NotificationType_Shutdown: | ||
333 | Log() << "Received shutdown notification."; | ||
334 | close(); | ||
335 | break; | ||
336 | default: | ||
337 | Warning() << "Received unknown notification: " << buffer->type(); | ||
338 | break; | ||
339 | } | ||
340 | break; | ||
341 | } | ||
328 | default: | 342 | default: |
329 | break; | 343 | break; |
330 | } | 344 | } |
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 2bc6be0..5165111 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -30,6 +30,7 @@ | |||
30 | #include "common/handshake_generated.h" | 30 | #include "common/handshake_generated.h" |
31 | #include "common/revisionupdate_generated.h" | 31 | #include "common/revisionupdate_generated.h" |
32 | #include "common/synchronize_generated.h" | 32 | #include "common/synchronize_generated.h" |
33 | #include "common/notification_generated.h" | ||
33 | 34 | ||
34 | #include <QLocalSocket> | 35 | #include <QLocalSocket> |
35 | #include <QTimer> | 36 | #include <QTimer> |
@@ -67,8 +68,7 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
67 | connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() { | 68 | connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() { |
68 | if (m_connections.isEmpty()) { | 69 | if (m_connections.isEmpty()) { |
69 | Log() << QString("No connections, shutting down."); | 70 | Log() << QString("No connections, shutting down."); |
70 | m_server->close(); | 71 | quit(); |
71 | emit noClients(); | ||
72 | } | 72 | } |
73 | }); | 73 | }); |
74 | 74 | ||
@@ -250,10 +250,8 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
250 | break; | 250 | break; |
251 | case Akonadi2::Commands::ShutdownCommand: | 251 | case Akonadi2::Commands::ShutdownCommand: |
252 | Log() << QString("\tReceived shutdown command from %1").arg(client.name); | 252 | Log() << QString("\tReceived shutdown command from %1").arg(client.name); |
253 | callback(); | 253 | QTimer::singleShot(0, this, &Listener::quit); |
254 | m_server->close(); | 254 | break; |
255 | emit noClients(); | ||
256 | return; | ||
257 | default: | 255 | default: |
258 | if (commandId > Akonadi2::Commands::CustomCommand) { | 256 | if (commandId > Akonadi2::Commands::CustomCommand) { |
259 | loadResource(); | 257 | loadResource(); |
@@ -268,6 +266,22 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
268 | callback(); | 266 | callback(); |
269 | } | 267 | } |
270 | 268 | ||
269 | void Listener::quit() | ||
270 | { | ||
271 | //Broadcast shutdown notifications to open clients, so they don't try to restart the resource | ||
272 | auto command = Akonadi2::CreateNotification(m_fbb, Akonadi2::NotificationType::NotificationType_Shutdown); | ||
273 | Akonadi2::FinishNotificationBuffer(m_fbb, command); | ||
274 | for (Client &client : m_connections) { | ||
275 | if (client.socket && client.socket->isOpen()) { | ||
276 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::NotificationCommand, m_fbb); | ||
277 | } | ||
278 | } | ||
279 | m_fbb.Clear(); | ||
280 | |||
281 | m_server->close(); | ||
282 | emit noClients(); | ||
283 | } | ||
284 | |||
271 | bool Listener::processClientBuffer(Client &client) | 285 | bool Listener::processClientBuffer(Client &client) |
272 | { | 286 | { |
273 | static const int headerSize = Akonadi2::Commands::headerSize(); | 287 | static const int headerSize = Akonadi2::Commands::headerSize(); |
@@ -314,15 +328,15 @@ void Listener::sendCurrentRevision(Client &client) | |||
314 | m_fbb.Clear(); | 328 | m_fbb.Clear(); |
315 | } | 329 | } |
316 | 330 | ||
317 | void Listener::sendCommandCompleted(Client &client, uint messageId) | 331 | void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId) |
318 | { | 332 | { |
319 | if (!client.socket || !client.socket->isValid()) { | 333 | if (!socket || !socket->isValid()) { |
320 | return; | 334 | return; |
321 | } | 335 | } |
322 | 336 | ||
323 | auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId); | 337 | auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId); |
324 | Akonadi2::FinishCommandCompletionBuffer(m_fbb, command); | 338 | Akonadi2::FinishCommandCompletionBuffer(m_fbb, command); |
325 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb); | 339 | Akonadi2::Commands::write(socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb); |
326 | m_fbb.Clear(); | 340 | m_fbb.Clear(); |
327 | } | 341 | } |
328 | 342 | ||
diff --git a/synchronizer/listener.h b/synchronizer/listener.h index ee73766..8dad3a4 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h | |||
@@ -49,7 +49,7 @@ public: | |||
49 | } | 49 | } |
50 | 50 | ||
51 | QString name; | 51 | QString name; |
52 | QLocalSocket *socket; | 52 | QPointer<QLocalSocket> socket; |
53 | QByteArray commandBuffer; | 53 | QByteArray commandBuffer; |
54 | }; | 54 | }; |
55 | 55 | ||
@@ -74,12 +74,13 @@ private Q_SLOTS: | |||
74 | void readFromSocket(); | 74 | void readFromSocket(); |
75 | void processClientBuffers(); | 75 | void processClientBuffers(); |
76 | void refreshRevision(); | 76 | void refreshRevision(); |
77 | void quit(); | ||
77 | 78 | ||
78 | private: | 79 | private: |
79 | void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback); | 80 | void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback); |
80 | bool processClientBuffer(Client &client); | 81 | bool processClientBuffer(Client &client); |
81 | void sendCurrentRevision(Client &client); | 82 | void sendCurrentRevision(Client &client); |
82 | void sendCommandCompleted(Client &client, uint messageId); | 83 | void sendCommandCompleted(QLocalSocket *socket, uint messageId); |
83 | void updateClientsWithRevision(); | 84 | void updateClientsWithRevision(); |
84 | void loadResource(); | 85 | void loadResource(); |
85 | 86 | ||