summaryrefslogtreecommitdiffstats
path: root/synchronizer/listener.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'synchronizer/listener.cpp')
-rw-r--r--synchronizer/listener.cpp66
1 files changed, 50 insertions, 16 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index 1d20c4d..ef9738e 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -19,26 +19,30 @@
19 19
20#include "listener.h" 20#include "listener.h"
21 21
22#include "common/clientapi.h"
22#include "common/console.h" 23#include "common/console.h"
23#include "common/commands.h" 24#include "common/commands.h"
24#include "common/handshake_generated.h" 25#include "common/handshake_generated.h"
26#include "common/resource.h"
25#include "common/revisionupdate_generated.h" 27#include "common/revisionupdate_generated.h"
26 28
27#include <QLocalSocket> 29#include <QLocalSocket>
28#include <QTimer> 30#include <QTimer>
29 31
30Listener::Listener(const QString &resource, QObject *parent) 32Listener::Listener(const QString &resourceName, QObject *parent)
31 : QObject(parent), 33 : QObject(parent),
32 m_server(new QLocalServer(this)), 34 m_server(new QLocalServer(this)),
33 m_revision(0) 35 m_revision(0),
36 m_resourceName(resourceName),
37 m_resource(0)
34{ 38{
35 connect(m_server, &QLocalServer::newConnection, 39 connect(m_server, &QLocalServer::newConnection,
36 this, &Listener::acceptConnection); 40 this, &Listener::acceptConnection);
37 Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resource)); 41 Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName));
38 if (!m_server->listen(resource)) { 42 if (!m_server->listen(resourceName)) {
39 // FIXME: multiple starts need to be handled here 43 // FIXME: multiple starts need to be handled here
40 m_server->removeServer(resource); 44 m_server->removeServer(resourceName);
41 if (!m_server->listen(resource)) { 45 if (!m_server->listen(resourceName)) {
42 Akonadi2::Console::main()->log("Utter failure to start server"); 46 Akonadi2::Console::main()->log("Utter failure to start server");
43 exit(-1); 47 exit(-1);
44 } 48 }
@@ -70,7 +74,6 @@ unsigned long long Listener::revision() const
70 74
71void Listener::closeAllConnections() 75void Listener::closeAllConnections()
72{ 76{
73 //TODO: close all client connectionsin m_connections
74 for (Client &client: m_connections) { 77 for (Client &client: m_connections) {
75 if (client.socket) { 78 if (client.socket) {
76 client.socket->close(); 79 client.socket->close();
@@ -90,7 +93,7 @@ void Listener::acceptConnection()
90 } 93 }
91 94
92 Akonadi2::Console::main()->log("Got a connection"); 95 Akonadi2::Console::main()->log("Got a connection");
93 Client client("Unknown Client" /*fixme: actual names!*/, socket); 96 Client client("Unknown Client", socket);
94 connect(socket, &QIODevice::readyRead, 97 connect(socket, &QIODevice::readyRead,
95 this, &Listener::readFromSocket); 98 this, &Listener::readFromSocket);
96 m_connections << client; 99 m_connections << client;
@@ -137,6 +140,7 @@ void Listener::readFromSocket()
137 140
138 Akonadi2::Console::main()->log("Reading from socket..."); 141 Akonadi2::Console::main()->log("Reading from socket...");
139 for (Client &client: m_connections) { 142 for (Client &client: m_connections) {
143 Akonadi2::Console::main()->log(QString("Checking %1 %2").arg((qlonglong)client.socket).arg((qlonglong)socket));
140 if (client.socket == socket) { 144 if (client.socket == socket) {
141 Akonadi2::Console::main()->log(QString(" Client: %1").arg(client.name)); 145 Akonadi2::Console::main()->log(QString(" Client: %1").arg(client.name));
142 client.commandBuffer += socket->readAll(); 146 client.commandBuffer += socket->readAll();
@@ -151,17 +155,19 @@ void Listener::readFromSocket()
151 155
152bool Listener::processClientBuffer(Client &client) 156bool Listener::processClientBuffer(Client &client)
153{ 157{
154 static const int headerSize = (sizeof(int) * 2); 158 static const int headerSize = (sizeof(int) + sizeof(uint));
155 Akonadi2::Console::main()->log(QString("processing %1").arg(client.commandBuffer.size()));
156 if (client.commandBuffer.size() < headerSize) { 159 if (client.commandBuffer.size() < headerSize) {
157 return false; 160 return false;
158 } 161 }
159 162
160 int commandId, size; 163 int commandId;
164 uint size;
161 commandId = *(int*)client.commandBuffer.constData(); 165 commandId = *(int*)client.commandBuffer.constData();
162 size = *(int*)(client.commandBuffer.constData() + sizeof(int)); 166 size = *(uint*)(client.commandBuffer.constData() + sizeof(uint));
163 167
164 if (size <= client.commandBuffer.size() - headerSize) { 168 //TODO: reject messages above a certain size?
169
170 if (size <= uint(client.commandBuffer.size() - headerSize)) {
165 QByteArray data = client.commandBuffer.mid(headerSize, size); 171 QByteArray data = client.commandBuffer.mid(headerSize, size);
166 client.commandBuffer.remove(0, headerSize + size); 172 client.commandBuffer.remove(0, headerSize + size);
167 173
@@ -169,18 +175,27 @@ bool Listener::processClientBuffer(Client &client)
169 case Akonadi2::Commands::HandshakeCommand: { 175 case Akonadi2::Commands::HandshakeCommand: {
170 auto buffer = Akonadi2::GetHandshake(data.constData()); 176 auto buffer = Akonadi2::GetHandshake(data.constData());
171 Akonadi2::Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); 177 Akonadi2::Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str()));
178 client.name = buffer->name()->c_str();
172 sendCurrentRevision(client); 179 sendCurrentRevision(client);
173 break; 180 break;
174 } 181 }
182 case Akonadi2::Commands::SynchronizeCommand: {
183 Akonadi2::Console::main()->log(QString(" Synchronize request from %1").arg(client.name));
184 loadResource();
185 //TODO: on failure ... what?
186 if (m_resource) {
187 m_resource->synchronizeWithSource();
188 }
189 break;
190 }
175 default: 191 default:
176 // client.hasSentCommand = true;
177 break; 192 break;
178 } 193 }
179 194
180 return client.commandBuffer.size() >= headerSize; 195 return client.commandBuffer.size() >= headerSize;
181 } else {
182 return false;
183 } 196 }
197
198 return false;
184} 199}
185 200
186void Listener::sendCurrentRevision(Client &client) 201void Listener::sendCurrentRevision(Client &client)
@@ -209,3 +224,22 @@ void Listener::updateClientsWithRevision()
209 } 224 }
210 m_fbb.Clear(); 225 m_fbb.Clear();
211} 226}
227
228void Listener::loadResource()
229{
230 if (m_resource) {
231 return;
232 }
233
234 Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName);
235 if (resourceFactory) {
236 m_resource = resourceFactory->createResource();
237 Akonadi2::Console::main()->log(QString("Resource factory: %1").arg((qlonglong)resourceFactory));
238 Akonadi2::Console::main()->log(QString("\tResource: %1").arg((qlonglong)m_resource));
239 //TODO: this doesn't really list all the facades .. fix
240 Akonadi2::Console::main()->log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type()));
241 } else {
242 Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName));
243 }
244}
245