summaryrefslogtreecommitdiffstats
path: root/synchronizer
diff options
context:
space:
mode:
authorAaron Seigo <aseigo@kde.org>2014-12-16 12:14:38 +0100
committerAaron Seigo <aseigo@kde.org>2014-12-16 12:14:38 +0100
commit8ae840ac13d9e2161b411fbceae281a725fa8b1f (patch)
tree743fbaf8ba404db97995b336db27228ce7f6eff4 /synchronizer
parentce9498597c789453db5f3b770f0f21fc119f404f (diff)
downloadsink-8ae840ac13d9e2161b411fbceae281a725fa8b1f.tar.gz
sink-8ae840ac13d9e2161b411fbceae281a725fa8b1f.zip
load the resource on first command .. it LIIIIVES!
Diffstat (limited to 'synchronizer')
-rw-r--r--synchronizer/listener.cpp66
-rw-r--r--synchronizer/listener.h16
2 files changed, 61 insertions, 21 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index 1d20c4d..ef9738e 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -19,26 +19,30 @@
19 19
20#include "listener.h" 20#include "listener.h"
21 21
22#include "common/clientapi.h"
22#include "common/console.h" 23#include "common/console.h"
23#include "common/commands.h" 24#include "common/commands.h"
24#include "common/handshake_generated.h" 25#include "common/handshake_generated.h"
26#include "common/resource.h"
25#include "common/revisionupdate_generated.h" 27#include "common/revisionupdate_generated.h"
26 28
27#include <QLocalSocket> 29#include <QLocalSocket>
28#include <QTimer> 30#include <QTimer>
29 31
30Listener::Listener(const QString &resource, QObject *parent) 32Listener::Listener(const QString &resourceName, QObject *parent)
31 : QObject(parent), 33 : QObject(parent),
32 m_server(new QLocalServer(this)), 34 m_server(new QLocalServer(this)),
33 m_revision(0) 35 m_revision(0),
36 m_resourceName(resourceName),
37 m_resource(0)
34{ 38{
35 connect(m_server, &QLocalServer::newConnection, 39 connect(m_server, &QLocalServer::newConnection,
36 this, &Listener::acceptConnection); 40 this, &Listener::acceptConnection);
37 Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resource)); 41 Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName));
38 if (!m_server->listen(resource)) { 42 if (!m_server->listen(resourceName)) {
39 // FIXME: multiple starts need to be handled here 43 // FIXME: multiple starts need to be handled here
40 m_server->removeServer(resource); 44 m_server->removeServer(resourceName);
41 if (!m_server->listen(resource)) { 45 if (!m_server->listen(resourceName)) {
42 Akonadi2::Console::main()->log("Utter failure to start server"); 46 Akonadi2::Console::main()->log("Utter failure to start server");
43 exit(-1); 47 exit(-1);
44 } 48 }
@@ -70,7 +74,6 @@ unsigned long long Listener::revision() const
70 74
71void Listener::closeAllConnections() 75void Listener::closeAllConnections()
72{ 76{
73 //TODO: close all client connectionsin m_connections
74 for (Client &client: m_connections) { 77 for (Client &client: m_connections) {
75 if (client.socket) { 78 if (client.socket) {
76 client.socket->close(); 79 client.socket->close();
@@ -90,7 +93,7 @@ void Listener::acceptConnection()
90 } 93 }
91 94
92 Akonadi2::Console::main()->log("Got a connection"); 95 Akonadi2::Console::main()->log("Got a connection");
93 Client client("Unknown Client" /*fixme: actual names!*/, socket); 96 Client client("Unknown Client", socket);
94 connect(socket, &QIODevice::readyRead, 97 connect(socket, &QIODevice::readyRead,
95 this, &Listener::readFromSocket); 98 this, &Listener::readFromSocket);
96 m_connections << client; 99 m_connections << client;
@@ -137,6 +140,7 @@ void Listener::readFromSocket()
137 140
138 Akonadi2::Console::main()->log("Reading from socket..."); 141 Akonadi2::Console::main()->log("Reading from socket...");
139 for (Client &client: m_connections) { 142 for (Client &client: m_connections) {
143 Akonadi2::Console::main()->log(QString("Checking %1 %2").arg((qlonglong)client.socket).arg((qlonglong)socket));
140 if (client.socket == socket) { 144 if (client.socket == socket) {
141 Akonadi2::Console::main()->log(QString(" Client: %1").arg(client.name)); 145 Akonadi2::Console::main()->log(QString(" Client: %1").arg(client.name));
142 client.commandBuffer += socket->readAll(); 146 client.commandBuffer += socket->readAll();
@@ -151,17 +155,19 @@ void Listener::readFromSocket()
151 155
152bool Listener::processClientBuffer(Client &client) 156bool Listener::processClientBuffer(Client &client)
153{ 157{
154 static const int headerSize = (sizeof(int) * 2); 158 static const int headerSize = (sizeof(int) + sizeof(uint));
155 Akonadi2::Console::main()->log(QString("processing %1").arg(client.commandBuffer.size()));
156 if (client.commandBuffer.size() < headerSize) { 159 if (client.commandBuffer.size() < headerSize) {
157 return false; 160 return false;
158 } 161 }
159 162
160 int commandId, size; 163 int commandId;
164 uint size;
161 commandId = *(int*)client.commandBuffer.constData(); 165 commandId = *(int*)client.commandBuffer.constData();
162 size = *(int*)(client.commandBuffer.constData() + sizeof(int)); 166 size = *(uint*)(client.commandBuffer.constData() + sizeof(uint));
163 167
164 if (size <= client.commandBuffer.size() - headerSize) { 168 //TODO: reject messages above a certain size?
169
170 if (size <= uint(client.commandBuffer.size() - headerSize)) {
165 QByteArray data = client.commandBuffer.mid(headerSize, size); 171 QByteArray data = client.commandBuffer.mid(headerSize, size);
166 client.commandBuffer.remove(0, headerSize + size); 172 client.commandBuffer.remove(0, headerSize + size);
167 173
@@ -169,18 +175,27 @@ bool Listener::processClientBuffer(Client &client)
169 case Akonadi2::Commands::HandshakeCommand: { 175 case Akonadi2::Commands::HandshakeCommand: {
170 auto buffer = Akonadi2::GetHandshake(data.constData()); 176 auto buffer = Akonadi2::GetHandshake(data.constData());
171 Akonadi2::Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); 177 Akonadi2::Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str()));
178 client.name = buffer->name()->c_str();
172 sendCurrentRevision(client); 179 sendCurrentRevision(client);
173 break; 180 break;
174 } 181 }
182 case Akonadi2::Commands::SynchronizeCommand: {
183 Akonadi2::Console::main()->log(QString(" Synchronize request from %1").arg(client.name));
184 loadResource();
185 //TODO: on failure ... what?
186 if (m_resource) {
187 m_resource->synchronizeWithSource();
188 }
189 break;
190 }
175 default: 191 default:
176 // client.hasSentCommand = true;
177 break; 192 break;
178 } 193 }
179 194
180 return client.commandBuffer.size() >= headerSize; 195 return client.commandBuffer.size() >= headerSize;
181 } else {
182 return false;
183 } 196 }
197
198 return false;
184} 199}
185 200
186void Listener::sendCurrentRevision(Client &client) 201void Listener::sendCurrentRevision(Client &client)
@@ -209,3 +224,22 @@ void Listener::updateClientsWithRevision()
209 } 224 }
210 m_fbb.Clear(); 225 m_fbb.Clear();
211} 226}
227
228void Listener::loadResource()
229{
230 if (m_resource) {
231 return;
232 }
233
234 Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName);
235 if (resourceFactory) {
236 m_resource = resourceFactory->createResource();
237 Akonadi2::Console::main()->log(QString("Resource factory: %1").arg((qlonglong)resourceFactory));
238 Akonadi2::Console::main()->log(QString("\tResource: %1").arg((qlonglong)m_resource));
239 //TODO: this doesn't really list all the facades .. fix
240 Akonadi2::Console::main()->log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type()));
241 } else {
242 Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName));
243 }
244}
245
diff --git a/synchronizer/listener.h b/synchronizer/listener.h
index b20da7f..5c725df 100644
--- a/synchronizer/listener.h
+++ b/synchronizer/listener.h
@@ -26,26 +26,28 @@
26 26
27#include <flatbuffers/flatbuffers.h> 27#include <flatbuffers/flatbuffers.h>
28 28
29namespace Akonadi2
30{
31 class Resource;
32}
33
29class Client 34class Client
30{ 35{
31public: 36public:
32 Client() 37 Client()
33 : socket(nullptr), 38 : socket(nullptr)
34 hasSentCommand(false)
35 { 39 {
36 } 40 }
37 41
38 Client(const QString &n, QLocalSocket *s) 42 Client(const QString &n, QLocalSocket *s)
39 : name(n), 43 : name(n),
40 socket(s), 44 socket(s)
41 hasSentCommand(false)
42 { 45 {
43 } 46 }
44 47
45 QString name; 48 QString name;
46 QLocalSocket *socket; 49 QLocalSocket *socket;
47 QByteArray commandBuffer; 50 QByteArray commandBuffer;
48 bool hasSentCommand;
49}; 51};
50 52
51class Listener : public QObject 53class Listener : public QObject
@@ -75,8 +77,12 @@ private:
75 bool processClientBuffer(Client &client); 77 bool processClientBuffer(Client &client);
76 void sendCurrentRevision(Client &client); 78 void sendCurrentRevision(Client &client);
77 void updateClientsWithRevision(); 79 void updateClientsWithRevision();
80 void loadResource();
81
78 QLocalServer *m_server; 82 QLocalServer *m_server;
79 QVector<Client> m_connections; 83 QVector<Client> m_connections;
80 unsigned long long m_revision; 84 unsigned long long m_revision;
81 flatbuffers::FlatBufferBuilder m_fbb; 85 flatbuffers::FlatBufferBuilder m_fbb;
86 const QString m_resourceName;
87 Akonadi2::Resource *m_resource;
82}; 88};