diff options
Diffstat (limited to 'synchronizer')
-rw-r--r-- | synchronizer/listener.cpp | 62 | ||||
-rw-r--r-- | synchronizer/listener.h | 1 | ||||
-rw-r--r-- | synchronizer/main.cpp | 14 |
3 files changed, 45 insertions, 32 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index d191bb8..2bc6be0 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -23,6 +23,7 @@ | |||
23 | #include "common/console.h" | 23 | #include "common/console.h" |
24 | #include "common/commands.h" | 24 | #include "common/commands.h" |
25 | #include "common/resource.h" | 25 | #include "common/resource.h" |
26 | #include "common/log.h" | ||
26 | 27 | ||
27 | // commands | 28 | // commands |
28 | #include "common/commandcompletion_generated.h" | 29 | #include "common/commandcompletion_generated.h" |
@@ -46,18 +47,18 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
46 | this, &Listener::refreshRevision); | 47 | this, &Listener::refreshRevision); |
47 | connect(m_server, &QLocalServer::newConnection, | 48 | connect(m_server, &QLocalServer::newConnection, |
48 | this, &Listener::acceptConnection); | 49 | this, &Listener::acceptConnection); |
49 | log(QString("Trying to open %1").arg(resourceName)); | 50 | Log() << QString("Trying to open %1").arg(resourceName); |
50 | if (!m_server->listen(resourceName)) { | 51 | if (!m_server->listen(resourceName)) { |
51 | // FIXME: multiple starts need to be handled here | 52 | // FIXME: multiple starts need to be handled here |
52 | m_server->removeServer(resourceName); | 53 | m_server->removeServer(resourceName); |
53 | if (!m_server->listen(resourceName)) { | 54 | if (!m_server->listen(resourceName)) { |
54 | log("Utter failure to start server"); | 55 | Warning() << "Utter failure to start server"; |
55 | exit(-1); | 56 | exit(-1); |
56 | } | 57 | } |
57 | } | 58 | } |
58 | 59 | ||
59 | if (m_server->isListening()) { | 60 | if (m_server->isListening()) { |
60 | log(QString("Listening on %1").arg(m_server->serverName())); | 61 | Log() << QString("Listening on %1").arg(m_server->serverName()); |
61 | } | 62 | } |
62 | 63 | ||
63 | m_checkConnectionsTimer = new QTimer; | 64 | m_checkConnectionsTimer = new QTimer; |
@@ -65,7 +66,7 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
65 | m_checkConnectionsTimer->setInterval(1000); | 66 | m_checkConnectionsTimer->setInterval(1000); |
66 | connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() { | 67 | connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() { |
67 | if (m_connections.isEmpty()) { | 68 | if (m_connections.isEmpty()) { |
68 | log(QString("No connections, shutting down.")); | 69 | Log() << QString("No connections, shutting down."); |
69 | m_server->close(); | 70 | m_server->close(); |
70 | emit noClients(); | 71 | emit noClients(); |
71 | } | 72 | } |
@@ -98,14 +99,14 @@ void Listener::closeAllConnections() | |||
98 | 99 | ||
99 | void Listener::acceptConnection() | 100 | void Listener::acceptConnection() |
100 | { | 101 | { |
101 | log(QString("Accepting connection")); | 102 | Log() << QString("Accepting connection"); |
102 | QLocalSocket *socket = m_server->nextPendingConnection(); | 103 | QLocalSocket *socket = m_server->nextPendingConnection(); |
103 | 104 | ||
104 | if (!socket) { | 105 | if (!socket) { |
105 | return; | 106 | return; |
106 | } | 107 | } |
107 | 108 | ||
108 | log("Got a connection"); | 109 | Log() << "Got a connection"; |
109 | Client client("Unknown Client", socket); | 110 | Client client("Unknown Client", socket); |
110 | connect(socket, &QIODevice::readyRead, | 111 | connect(socket, &QIODevice::readyRead, |
111 | this, &Listener::readFromSocket); | 112 | this, &Listener::readFromSocket); |
@@ -123,16 +124,20 @@ void Listener::clientDropped() | |||
123 | return; | 124 | return; |
124 | } | 125 | } |
125 | 126 | ||
126 | log("Dropping connection..."); | 127 | bool dropped = false; |
127 | QMutableVectorIterator<Client> it(m_connections); | 128 | QMutableVectorIterator<Client> it(m_connections); |
128 | while (it.hasNext()) { | 129 | while (it.hasNext()) { |
129 | const Client &client = it.next(); | 130 | const Client &client = it.next(); |
130 | if (client.socket == socket) { | 131 | if (client.socket == socket) { |
131 | log(QString(" dropped... %1").arg(client.name)); | 132 | dropped = true; |
133 | Log() << QString("Dropped connection: %1").arg(client.name) << socket; | ||
132 | it.remove(); | 134 | it.remove(); |
133 | break; | 135 | break; |
134 | } | 136 | } |
135 | } | 137 | } |
138 | if (!dropped) { | ||
139 | Warning() << "Failed to find connection for disconnected socket: " << socket; | ||
140 | } | ||
136 | 141 | ||
137 | checkConnections(); | 142 | checkConnections(); |
138 | } | 143 | } |
@@ -149,7 +154,7 @@ void Listener::readFromSocket() | |||
149 | return; | 154 | return; |
150 | } | 155 | } |
151 | 156 | ||
152 | log("Reading from socket..."); | 157 | Log() << "Reading from socket..."; |
153 | for (Client &client: m_connections) { | 158 | for (Client &client: m_connections) { |
154 | if (client.socket == socket) { | 159 | if (client.socket == socket) { |
155 | client.commandBuffer += socket->readAll(); | 160 | client.commandBuffer += socket->readAll(); |
@@ -193,7 +198,7 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
193 | client.name = buffer->name()->c_str(); | 198 | client.name = buffer->name()->c_str(); |
194 | sendCurrentRevision(client); | 199 | sendCurrentRevision(client); |
195 | } else { | 200 | } else { |
196 | qWarning() << "received invalid command"; | 201 | Warning() << "received invalid command"; |
197 | } | 202 | } |
198 | break; | 203 | break; |
199 | } | 204 | } |
@@ -201,10 +206,10 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
201 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); | 206 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); |
202 | if (Akonadi2::VerifySynchronizeBuffer(verifier)) { | 207 | if (Akonadi2::VerifySynchronizeBuffer(verifier)) { |
203 | auto buffer = Akonadi2::GetSynchronize(client.commandBuffer.constData()); | 208 | auto buffer = Akonadi2::GetSynchronize(client.commandBuffer.constData()); |
204 | log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); | 209 | Log() << QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name); |
205 | loadResource(); | 210 | loadResource(); |
206 | if (!m_resource) { | 211 | if (!m_resource) { |
207 | qWarning() << "No resource loaded"; | 212 | Warning() << "No resource loaded"; |
208 | break; | 213 | break; |
209 | } | 214 | } |
210 | //TODO a more elegant composition of jobs should be possible | 215 | //TODO a more elegant composition of jobs should be possible |
@@ -229,7 +234,7 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
229 | } | 234 | } |
230 | return; | 235 | return; |
231 | } else { | 236 | } else { |
232 | qWarning() << "received invalid command"; | 237 | Warning() << "received invalid command"; |
233 | } | 238 | } |
234 | break; | 239 | break; |
235 | } | 240 | } |
@@ -237,14 +242,14 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
237 | case Akonadi2::Commands::DeleteEntityCommand: | 242 | case Akonadi2::Commands::DeleteEntityCommand: |
238 | case Akonadi2::Commands::ModifyEntityCommand: | 243 | case Akonadi2::Commands::ModifyEntityCommand: |
239 | case Akonadi2::Commands::CreateEntityCommand: | 244 | case Akonadi2::Commands::CreateEntityCommand: |
240 | log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); | 245 | Log() << QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name); |
241 | loadResource(); | 246 | loadResource(); |
242 | if (m_resource) { | 247 | if (m_resource) { |
243 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | 248 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); |
244 | } | 249 | } |
245 | break; | 250 | break; |
246 | case Akonadi2::Commands::ShutdownCommand: | 251 | case Akonadi2::Commands::ShutdownCommand: |
247 | log(QString("\tReceived shutdown command from %1").arg(client.name)); | 252 | Log() << QString("\tReceived shutdown command from %1").arg(client.name); |
248 | callback(); | 253 | callback(); |
249 | m_server->close(); | 254 | m_server->close(); |
250 | emit noClients(); | 255 | emit noClients(); |
@@ -279,10 +284,15 @@ bool Listener::processClientBuffer(Client &client) | |||
279 | if (size <= uint(client.commandBuffer.size() - headerSize)) { | 284 | if (size <= uint(client.commandBuffer.size() - headerSize)) { |
280 | client.commandBuffer.remove(0, headerSize); | 285 | client.commandBuffer.remove(0, headerSize); |
281 | 286 | ||
282 | processCommand(commandId, messageId, client, size, [this, messageId, commandId, &client]() { | 287 | auto socket = QPointer<QLocalSocket>(client.socket); |
283 | log(QString("\tCompleted command messageid %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); | 288 | auto clientName = client.name; |
284 | //FIXME, client needs to become a shared pointer and not a reference, or we have to search through m_connections everytime. | 289 | processCommand(commandId, messageId, client, size, [this, messageId, commandId, socket, clientName]() { |
285 | sendCommandCompleted(client, messageId); | 290 | Log() << QString("\tCompleted command messageid %1 of type %2 from %3").arg(messageId).arg(commandId).arg(clientName); |
291 | if (socket) { | ||
292 | sendCommandCompleted(socket.data(), messageId); | ||
293 | } else { | ||
294 | Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); | ||
295 | } | ||
286 | }); | 296 | }); |
287 | client.commandBuffer.remove(0, size); | 297 | client.commandBuffer.remove(0, size); |
288 | 298 | ||
@@ -346,21 +356,15 @@ void Listener::loadResource() | |||
346 | Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName); | 356 | Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName); |
347 | if (resourceFactory) { | 357 | if (resourceFactory) { |
348 | m_resource = resourceFactory->createResource(); | 358 | m_resource = resourceFactory->createResource(); |
349 | log(QString("Resource factory: %1").arg((qlonglong)resourceFactory)); | 359 | Log() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); |
350 | log(QString("\tResource: %1").arg((qlonglong)m_resource)); | 360 | Log() << QString("\tResource: %1").arg((qlonglong)m_resource); |
351 | //TODO: this doesn't really list all the facades .. fix | 361 | //TODO: this doesn't really list all the facades .. fix |
352 | log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type())); | 362 | Log() << QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type()); |
353 | m_resource->configurePipeline(m_pipeline); | 363 | m_resource->configurePipeline(m_pipeline); |
354 | } else { | 364 | } else { |
355 | log(QString("Failed to load resource %1").arg(m_resourceName)); | 365 | Warning() << QString("Failed to load resource %1").arg(m_resourceName); |
356 | } | 366 | } |
357 | //TODO: on failure ... what? | 367 | //TODO: on failure ... what? |
358 | //Enter broken state? | 368 | //Enter broken state? |
359 | } | 369 | } |
360 | 370 | ||
361 | void Listener::log(const QString &message) | ||
362 | { | ||
363 | qDebug() << "Listener: " << message; | ||
364 | // Akonadi2::Console::main()->log("Listener: " + message); | ||
365 | } | ||
366 | |||
diff --git a/synchronizer/listener.h b/synchronizer/listener.h index f1241c7..ee73766 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h | |||
@@ -82,7 +82,6 @@ private: | |||
82 | void sendCommandCompleted(Client &client, uint messageId); | 82 | void sendCommandCompleted(Client &client, uint messageId); |
83 | void updateClientsWithRevision(); | 83 | void updateClientsWithRevision(); |
84 | void loadResource(); | 84 | void loadResource(); |
85 | void log(const QString &); | ||
86 | 85 | ||
87 | QLocalServer *m_server; | 86 | QLocalServer *m_server; |
88 | QVector<Client> m_connections; | 87 | QVector<Client> m_connections; |
diff --git a/synchronizer/main.cpp b/synchronizer/main.cpp index 86e1497..587f96a 100644 --- a/synchronizer/main.cpp +++ b/synchronizer/main.cpp | |||
@@ -19,15 +19,25 @@ | |||
19 | 19 | ||
20 | #include <QApplication> | 20 | #include <QApplication> |
21 | 21 | ||
22 | #include "common/console.h" | 22 | #include <signal.h> |
23 | |||
23 | #include "listener.h" | 24 | #include "listener.h" |
25 | #include "log.h" | ||
26 | |||
27 | void crashHandler(int sig) { | ||
28 | std::fprintf(stderr, "Error: signal %d\n", sig); | ||
29 | std::system("exec xterm -e gdb -p \"$PPID\""); | ||
30 | std::abort(); | ||
31 | } | ||
24 | 32 | ||
25 | int main(int argc, char *argv[]) | 33 | int main(int argc, char *argv[]) |
26 | { | 34 | { |
35 | //For crashes | ||
36 | signal(SIGSEGV, crashHandler); | ||
27 | QApplication app(argc, argv); | 37 | QApplication app(argc, argv); |
28 | 38 | ||
29 | if (argc < 2) { | 39 | if (argc < 2) { |
30 | qWarning() << "Not enough args passed, no resource loaded."; | 40 | Warning() << "Not enough args passed, no resource loaded."; |
31 | return app.exec(); | 41 | return app.exec(); |
32 | } | 42 | } |
33 | 43 | ||