summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-03-27 16:45:48 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-03-31 11:11:08 +0200
commit7642433a4fafd2ccf7e8b43c81ec18282f544a3b (patch)
tree067d1faa86061eabe24444e59118e9c1567848ba
parentc9aeb8896ae578515c217b9a08988156b4d62f1e (diff)
downloadsink-7642433a4fafd2ccf7e8b43c81ec18282f544a3b.tar.gz
sink-7642433a4fafd2ccf7e8b43c81ec18282f544a3b.zip
Shutdown notification to achieve a clean shutdown.
Otherwise the client always restarts the resource because of the lost connection. We currently require this in tests to be able to delete the db, but eventually we likely want a "disable akonadi" function that shuts resources down, and keeps clients from restarting them (e.g. via configuration).
-rw-r--r--common/commands.h1
-rw-r--r--common/commands/notification.fbs9
-rw-r--r--common/resourceaccess.cpp14
-rw-r--r--synchronizer/listener.cpp32
-rw-r--r--synchronizer/listener.h5
5 files changed, 50 insertions, 11 deletions
diff --git a/common/commands.h b/common/commands.h
index 26729dc..0007ffc 100644
--- a/common/commands.h
+++ b/common/commands.h
@@ -43,6 +43,7 @@ enum CommandIds {
43 CreateEntityCommand, 43 CreateEntityCommand,
44 SearchSourceCommand, // need a buffer definition for this, but relies on Query API 44 SearchSourceCommand, // need a buffer definition for this, but relies on Query API
45 ShutdownCommand, 45 ShutdownCommand,
46 NotificationCommand,
46 CustomCommand = 0xffff 47 CustomCommand = 0xffff
47}; 48};
48 49
diff --git a/common/commands/notification.fbs b/common/commands/notification.fbs
new file mode 100644
index 0000000..6684472
--- /dev/null
+++ b/common/commands/notification.fbs
@@ -0,0 +1,9 @@
1namespace Akonadi2;
2
3enum NotificationType : byte { Shutdown = 1, Status, Warning, Progress }
4
5table Notification {
6 type: NotificationType = Status;
7}
8
9root_type Notification;
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 59cbece..20e1e8c 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -25,6 +25,7 @@
25#include "common/handshake_generated.h" 25#include "common/handshake_generated.h"
26#include "common/revisionupdate_generated.h" 26#include "common/revisionupdate_generated.h"
27#include "common/synchronize_generated.h" 27#include "common/synchronize_generated.h"
28#include "common/notification_generated.h"
28#include "log.h" 29#include "log.h"
29 30
30#include <QCoreApplication> 31#include <QCoreApplication>
@@ -325,6 +326,19 @@ bool ResourceAccess::processMessageBuffer()
325 QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id())); 326 QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id()));
326 break; 327 break;
327 } 328 }
329 case Commands::NotificationCommand: {
330 auto buffer = GetNotification(d->partialMessageBuffer.constData() + headerSize);
331 switch (buffer->type()) {
332 case Akonadi2::NotificationType::NotificationType_Shutdown:
333 Log() << "Received shutdown notification.";
334 close();
335 break;
336 default:
337 Warning() << "Received unknown notification: " << buffer->type();
338 break;
339 }
340 break;
341 }
328 default: 342 default:
329 break; 343 break;
330 } 344 }
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index 2bc6be0..5165111 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -30,6 +30,7 @@
30#include "common/handshake_generated.h" 30#include "common/handshake_generated.h"
31#include "common/revisionupdate_generated.h" 31#include "common/revisionupdate_generated.h"
32#include "common/synchronize_generated.h" 32#include "common/synchronize_generated.h"
33#include "common/notification_generated.h"
33 34
34#include <QLocalSocket> 35#include <QLocalSocket>
35#include <QTimer> 36#include <QTimer>
@@ -67,8 +68,7 @@ Listener::Listener(const QString &resourceName, QObject *parent)
67 connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() { 68 connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() {
68 if (m_connections.isEmpty()) { 69 if (m_connections.isEmpty()) {
69 Log() << QString("No connections, shutting down."); 70 Log() << QString("No connections, shutting down.");
70 m_server->close(); 71 quit();
71 emit noClients();
72 } 72 }
73 }); 73 });
74 74
@@ -250,10 +250,8 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin
250 break; 250 break;
251 case Akonadi2::Commands::ShutdownCommand: 251 case Akonadi2::Commands::ShutdownCommand:
252 Log() << QString("\tReceived shutdown command from %1").arg(client.name); 252 Log() << QString("\tReceived shutdown command from %1").arg(client.name);
253 callback(); 253 QTimer::singleShot(0, this, &Listener::quit);
254 m_server->close(); 254 break;
255 emit noClients();
256 return;
257 default: 255 default:
258 if (commandId > Akonadi2::Commands::CustomCommand) { 256 if (commandId > Akonadi2::Commands::CustomCommand) {
259 loadResource(); 257 loadResource();
@@ -268,6 +266,22 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin
268 callback(); 266 callback();
269} 267}
270 268
269void Listener::quit()
270{
271 //Broadcast shutdown notifications to open clients, so they don't try to restart the resource
272 auto command = Akonadi2::CreateNotification(m_fbb, Akonadi2::NotificationType::NotificationType_Shutdown);
273 Akonadi2::FinishNotificationBuffer(m_fbb, command);
274 for (Client &client : m_connections) {
275 if (client.socket && client.socket->isOpen()) {
276 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::NotificationCommand, m_fbb);
277 }
278 }
279 m_fbb.Clear();
280
281 m_server->close();
282 emit noClients();
283}
284
271bool Listener::processClientBuffer(Client &client) 285bool Listener::processClientBuffer(Client &client)
272{ 286{
273 static const int headerSize = Akonadi2::Commands::headerSize(); 287 static const int headerSize = Akonadi2::Commands::headerSize();
@@ -314,15 +328,15 @@ void Listener::sendCurrentRevision(Client &client)
314 m_fbb.Clear(); 328 m_fbb.Clear();
315} 329}
316 330
317void Listener::sendCommandCompleted(Client &client, uint messageId) 331void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId)
318{ 332{
319 if (!client.socket || !client.socket->isValid()) { 333 if (!socket || !socket->isValid()) {
320 return; 334 return;
321 } 335 }
322 336
323 auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId); 337 auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId);
324 Akonadi2::FinishCommandCompletionBuffer(m_fbb, command); 338 Akonadi2::FinishCommandCompletionBuffer(m_fbb, command);
325 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb); 339 Akonadi2::Commands::write(socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb);
326 m_fbb.Clear(); 340 m_fbb.Clear();
327} 341}
328 342
diff --git a/synchronizer/listener.h b/synchronizer/listener.h
index ee73766..8dad3a4 100644
--- a/synchronizer/listener.h
+++ b/synchronizer/listener.h
@@ -49,7 +49,7 @@ public:
49 } 49 }
50 50
51 QString name; 51 QString name;
52 QLocalSocket *socket; 52 QPointer<QLocalSocket> socket;
53 QByteArray commandBuffer; 53 QByteArray commandBuffer;
54}; 54};
55 55
@@ -74,12 +74,13 @@ private Q_SLOTS:
74 void readFromSocket(); 74 void readFromSocket();
75 void processClientBuffers(); 75 void processClientBuffers();
76 void refreshRevision(); 76 void refreshRevision();
77 void quit();
77 78
78private: 79private:
79 void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback); 80 void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback);
80 bool processClientBuffer(Client &client); 81 bool processClientBuffer(Client &client);
81 void sendCurrentRevision(Client &client); 82 void sendCurrentRevision(Client &client);
82 void sendCommandCompleted(Client &client, uint messageId); 83 void sendCommandCompleted(QLocalSocket *socket, uint messageId);
83 void updateClientsWithRevision(); 84 void updateClientsWithRevision();
84 void loadResource(); 85 void loadResource();
85 86