diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-03-27 16:45:48 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-03-31 11:11:08 +0200 |
commit | 7642433a4fafd2ccf7e8b43c81ec18282f544a3b (patch) | |
tree | 067d1faa86061eabe24444e59118e9c1567848ba | |
parent | c9aeb8896ae578515c217b9a08988156b4d62f1e (diff) | |
download | sink-7642433a4fafd2ccf7e8b43c81ec18282f544a3b.tar.gz sink-7642433a4fafd2ccf7e8b43c81ec18282f544a3b.zip |
Shutdown notification to achieve a clean shutdown.
Otherwise the client always restarts the resource because of the lost connection.
We currently require this in tests to be able to delete the db, but eventually
we likely want a "disable akonadi" function that shuts resources down,
and keeps clients from restarting them (e.g. via configuration).
-rw-r--r-- | common/commands.h | 1 | ||||
-rw-r--r-- | common/commands/notification.fbs | 9 | ||||
-rw-r--r-- | common/resourceaccess.cpp | 14 | ||||
-rw-r--r-- | synchronizer/listener.cpp | 32 | ||||
-rw-r--r-- | synchronizer/listener.h | 5 |
5 files changed, 50 insertions, 11 deletions
diff --git a/common/commands.h b/common/commands.h index 26729dc..0007ffc 100644 --- a/common/commands.h +++ b/common/commands.h | |||
@@ -43,6 +43,7 @@ enum CommandIds { | |||
43 | CreateEntityCommand, | 43 | CreateEntityCommand, |
44 | SearchSourceCommand, // need a buffer definition for this, but relies on Query API | 44 | SearchSourceCommand, // need a buffer definition for this, but relies on Query API |
45 | ShutdownCommand, | 45 | ShutdownCommand, |
46 | NotificationCommand, | ||
46 | CustomCommand = 0xffff | 47 | CustomCommand = 0xffff |
47 | }; | 48 | }; |
48 | 49 | ||
diff --git a/common/commands/notification.fbs b/common/commands/notification.fbs new file mode 100644 index 0000000..6684472 --- /dev/null +++ b/common/commands/notification.fbs | |||
@@ -0,0 +1,9 @@ | |||
1 | namespace Akonadi2; | ||
2 | |||
3 | enum NotificationType : byte { Shutdown = 1, Status, Warning, Progress } | ||
4 | |||
5 | table Notification { | ||
6 | type: NotificationType = Status; | ||
7 | } | ||
8 | |||
9 | root_type Notification; | ||
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 59cbece..20e1e8c 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -25,6 +25,7 @@ | |||
25 | #include "common/handshake_generated.h" | 25 | #include "common/handshake_generated.h" |
26 | #include "common/revisionupdate_generated.h" | 26 | #include "common/revisionupdate_generated.h" |
27 | #include "common/synchronize_generated.h" | 27 | #include "common/synchronize_generated.h" |
28 | #include "common/notification_generated.h" | ||
28 | #include "log.h" | 29 | #include "log.h" |
29 | 30 | ||
30 | #include <QCoreApplication> | 31 | #include <QCoreApplication> |
@@ -325,6 +326,19 @@ bool ResourceAccess::processMessageBuffer() | |||
325 | QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id())); | 326 | QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id())); |
326 | break; | 327 | break; |
327 | } | 328 | } |
329 | case Commands::NotificationCommand: { | ||
330 | auto buffer = GetNotification(d->partialMessageBuffer.constData() + headerSize); | ||
331 | switch (buffer->type()) { | ||
332 | case Akonadi2::NotificationType::NotificationType_Shutdown: | ||
333 | Log() << "Received shutdown notification."; | ||
334 | close(); | ||
335 | break; | ||
336 | default: | ||
337 | Warning() << "Received unknown notification: " << buffer->type(); | ||
338 | break; | ||
339 | } | ||
340 | break; | ||
341 | } | ||
328 | default: | 342 | default: |
329 | break; | 343 | break; |
330 | } | 344 | } |
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 2bc6be0..5165111 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -30,6 +30,7 @@ | |||
30 | #include "common/handshake_generated.h" | 30 | #include "common/handshake_generated.h" |
31 | #include "common/revisionupdate_generated.h" | 31 | #include "common/revisionupdate_generated.h" |
32 | #include "common/synchronize_generated.h" | 32 | #include "common/synchronize_generated.h" |
33 | #include "common/notification_generated.h" | ||
33 | 34 | ||
34 | #include <QLocalSocket> | 35 | #include <QLocalSocket> |
35 | #include <QTimer> | 36 | #include <QTimer> |
@@ -67,8 +68,7 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
67 | connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() { | 68 | connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() { |
68 | if (m_connections.isEmpty()) { | 69 | if (m_connections.isEmpty()) { |
69 | Log() << QString("No connections, shutting down."); | 70 | Log() << QString("No connections, shutting down."); |
70 | m_server->close(); | 71 | quit(); |
71 | emit noClients(); | ||
72 | } | 72 | } |
73 | }); | 73 | }); |
74 | 74 | ||
@@ -250,10 +250,8 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
250 | break; | 250 | break; |
251 | case Akonadi2::Commands::ShutdownCommand: | 251 | case Akonadi2::Commands::ShutdownCommand: |
252 | Log() << QString("\tReceived shutdown command from %1").arg(client.name); | 252 | Log() << QString("\tReceived shutdown command from %1").arg(client.name); |
253 | callback(); | 253 | QTimer::singleShot(0, this, &Listener::quit); |
254 | m_server->close(); | 254 | break; |
255 | emit noClients(); | ||
256 | return; | ||
257 | default: | 255 | default: |
258 | if (commandId > Akonadi2::Commands::CustomCommand) { | 256 | if (commandId > Akonadi2::Commands::CustomCommand) { |
259 | loadResource(); | 257 | loadResource(); |
@@ -268,6 +266,22 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
268 | callback(); | 266 | callback(); |
269 | } | 267 | } |
270 | 268 | ||
269 | void Listener::quit() | ||
270 | { | ||
271 | //Broadcast shutdown notifications to open clients, so they don't try to restart the resource | ||
272 | auto command = Akonadi2::CreateNotification(m_fbb, Akonadi2::NotificationType::NotificationType_Shutdown); | ||
273 | Akonadi2::FinishNotificationBuffer(m_fbb, command); | ||
274 | for (Client &client : m_connections) { | ||
275 | if (client.socket && client.socket->isOpen()) { | ||
276 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::NotificationCommand, m_fbb); | ||
277 | } | ||
278 | } | ||
279 | m_fbb.Clear(); | ||
280 | |||
281 | m_server->close(); | ||
282 | emit noClients(); | ||
283 | } | ||
284 | |||
271 | bool Listener::processClientBuffer(Client &client) | 285 | bool Listener::processClientBuffer(Client &client) |
272 | { | 286 | { |
273 | static const int headerSize = Akonadi2::Commands::headerSize(); | 287 | static const int headerSize = Akonadi2::Commands::headerSize(); |
@@ -314,15 +328,15 @@ void Listener::sendCurrentRevision(Client &client) | |||
314 | m_fbb.Clear(); | 328 | m_fbb.Clear(); |
315 | } | 329 | } |
316 | 330 | ||
317 | void Listener::sendCommandCompleted(Client &client, uint messageId) | 331 | void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId) |
318 | { | 332 | { |
319 | if (!client.socket || !client.socket->isValid()) { | 333 | if (!socket || !socket->isValid()) { |
320 | return; | 334 | return; |
321 | } | 335 | } |
322 | 336 | ||
323 | auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId); | 337 | auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId); |
324 | Akonadi2::FinishCommandCompletionBuffer(m_fbb, command); | 338 | Akonadi2::FinishCommandCompletionBuffer(m_fbb, command); |
325 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb); | 339 | Akonadi2::Commands::write(socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb); |
326 | m_fbb.Clear(); | 340 | m_fbb.Clear(); |
327 | } | 341 | } |
328 | 342 | ||
diff --git a/synchronizer/listener.h b/synchronizer/listener.h index ee73766..8dad3a4 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h | |||
@@ -49,7 +49,7 @@ public: | |||
49 | } | 49 | } |
50 | 50 | ||
51 | QString name; | 51 | QString name; |
52 | QLocalSocket *socket; | 52 | QPointer<QLocalSocket> socket; |
53 | QByteArray commandBuffer; | 53 | QByteArray commandBuffer; |
54 | }; | 54 | }; |
55 | 55 | ||
@@ -74,12 +74,13 @@ private Q_SLOTS: | |||
74 | void readFromSocket(); | 74 | void readFromSocket(); |
75 | void processClientBuffers(); | 75 | void processClientBuffers(); |
76 | void refreshRevision(); | 76 | void refreshRevision(); |
77 | void quit(); | ||
77 | 78 | ||
78 | private: | 79 | private: |
79 | void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback); | 80 | void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback); |
80 | bool processClientBuffer(Client &client); | 81 | bool processClientBuffer(Client &client); |
81 | void sendCurrentRevision(Client &client); | 82 | void sendCurrentRevision(Client &client); |
82 | void sendCommandCompleted(Client &client, uint messageId); | 83 | void sendCommandCompleted(QLocalSocket *socket, uint messageId); |
83 | void updateClientsWithRevision(); | 84 | void updateClientsWithRevision(); |
84 | void loadResource(); | 85 | void loadResource(); |
85 | 86 | ||