summaryrefslogtreecommitdiffstats
path: root/synchronizer
diff options
context:
space:
mode:
Diffstat (limited to 'synchronizer')
-rw-r--r--synchronizer/listener.cpp155
-rw-r--r--synchronizer/listener.h6
2 files changed, 82 insertions, 79 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index 328d4d6..8e94213 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -35,7 +35,6 @@
35Listener::Listener(const QString &resourceName, QObject *parent) 35Listener::Listener(const QString &resourceName, QObject *parent)
36 : QObject(parent), 36 : QObject(parent),
37 m_server(new QLocalServer(this)), 37 m_server(new QLocalServer(this)),
38 m_revision(0),
39 m_resourceName(resourceName), 38 m_resourceName(resourceName),
40 m_resource(0), 39 m_resource(0),
41 m_pipeline(new Akonadi2::Pipeline(resourceName, parent)), 40 m_pipeline(new Akonadi2::Pipeline(resourceName, parent)),
@@ -46,18 +45,18 @@ Listener::Listener(const QString &resourceName, QObject *parent)
46 this, &Listener::refreshRevision); 45 this, &Listener::refreshRevision);
47 connect(m_server, &QLocalServer::newConnection, 46 connect(m_server, &QLocalServer::newConnection,
48 this, &Listener::acceptConnection); 47 this, &Listener::acceptConnection);
49 Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); 48 log(QString("Trying to open %1").arg(resourceName));
50 if (!m_server->listen(resourceName)) { 49 if (!m_server->listen(resourceName)) {
51 // FIXME: multiple starts need to be handled here 50 // FIXME: multiple starts need to be handled here
52 m_server->removeServer(resourceName); 51 m_server->removeServer(resourceName);
53 if (!m_server->listen(resourceName)) { 52 if (!m_server->listen(resourceName)) {
54 Akonadi2::Console::main()->log("Utter failure to start server"); 53 log("Utter failure to start server");
55 exit(-1); 54 exit(-1);
56 } 55 }
57 } 56 }
58 57
59 if (m_server->isListening()) { 58 if (m_server->isListening()) {
60 Akonadi2::Console::main()->log(QString("Listening on %1").arg(m_server->serverName())); 59 log(QString("Listening on %1").arg(m_server->serverName()));
61 } 60 }
62 61
63 //TODO: experiment with different timeouts 62 //TODO: experiment with different timeouts
@@ -73,19 +72,6 @@ Listener::~Listener()
73{ 72{
74} 73}
75 74
76void Listener::setRevision(unsigned long long revision)
77{
78 if (m_revision != revision) {
79 m_revision = revision;
80 updateClientsWithRevision();
81 }
82}
83
84unsigned long long Listener::revision() const
85{
86 return m_revision;
87}
88
89void Listener::closeAllConnections() 75void Listener::closeAllConnections()
90{ 76{
91 for (Client &client: m_connections) { 77 for (Client &client: m_connections) {
@@ -101,14 +87,14 @@ void Listener::closeAllConnections()
101 87
102void Listener::acceptConnection() 88void Listener::acceptConnection()
103{ 89{
104 Akonadi2::Console::main()->log(QString("Accepting connection")); 90 log(QString("Accepting connection"));
105 QLocalSocket *socket = m_server->nextPendingConnection(); 91 QLocalSocket *socket = m_server->nextPendingConnection();
106 92
107 if (!socket) { 93 if (!socket) {
108 return; 94 return;
109 } 95 }
110 96
111 Akonadi2::Console::main()->log("Got a connection"); 97 log("Got a connection");
112 Client client("Unknown Client", socket); 98 Client client("Unknown Client", socket);
113 connect(socket, &QIODevice::readyRead, 99 connect(socket, &QIODevice::readyRead,
114 this, &Listener::readFromSocket); 100 this, &Listener::readFromSocket);
@@ -125,12 +111,12 @@ void Listener::clientDropped()
125 return; 111 return;
126 } 112 }
127 113
128 Akonadi2::Console::main()->log("Dropping connection..."); 114 log("Dropping connection...");
129 QMutableVectorIterator<Client> it(m_connections); 115 QMutableVectorIterator<Client> it(m_connections);
130 while (it.hasNext()) { 116 while (it.hasNext()) {
131 const Client &client = it.next(); 117 const Client &client = it.next();
132 if (client.socket == socket) { 118 if (client.socket == socket) {
133 Akonadi2::Console::main()->log(QString(" dropped... %1").arg(client.name)); 119 log(QString(" dropped... %1").arg(client.name));
134 it.remove(); 120 it.remove();
135 break; 121 break;
136 } 122 }
@@ -154,7 +140,7 @@ void Listener::readFromSocket()
154 return; 140 return;
155 } 141 }
156 142
157 Akonadi2::Console::main()->log("Reading from socket..."); 143 log("Reading from socket...");
158 for (Client &client: m_connections) { 144 for (Client &client: m_connections) {
159 if (client.socket == socket) { 145 if (client.socket == socket) {
160 client.commandBuffer += socket->readAll(); 146 client.commandBuffer += socket->readAll();
@@ -188,6 +174,57 @@ void Listener::processClientBuffers()
188 } 174 }
189} 175}
190 176
177void Listener::processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback)
178{
179 switch (commandId) {
180 case Akonadi2::Commands::HandshakeCommand: {
181 flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size);
182 if (Akonadi2::VerifyHandshakeBuffer(verifier)) {
183 auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData());
184 client.name = buffer->name()->c_str();
185 sendCurrentRevision(client);
186 } else {
187 qWarning() << "received invalid command";
188 }
189 break;
190 }
191 case Akonadi2::Commands::SynchronizeCommand: {
192 log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name));
193 loadResource();
194 if (m_resource) {
195 qDebug() << "synchronizing";
196 m_resource->synchronizeWithSource(m_pipeline).then<void>([callback](Async::Future<void> &f){
197 //FIXME: the command is complete once all pipelines have been drained, track progress and async emit result
198 callback();
199 f.setFinished();
200 }).exec();
201 return;
202 } else {
203 qWarning() << "No resource loaded";
204 }
205 break;
206 }
207 case Akonadi2::Commands::FetchEntityCommand:
208 case Akonadi2::Commands::DeleteEntityCommand:
209 case Akonadi2::Commands::ModifyEntityCommand:
210 case Akonadi2::Commands::CreateEntityCommand:
211 log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name));
212 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
213 break;
214 default:
215 if (commandId > Akonadi2::Commands::CustomCommand) {
216 loadResource();
217 if (m_resource) {
218 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
219 }
220 } else {
221 //TODO: handle error: we don't know wtf this command is
222 }
223 break;
224 }
225 callback();
226}
227
191bool Listener::processClientBuffer(Client &client) 228bool Listener::processClientBuffer(Client &client)
192{ 229{
193 static const int headerSize = Akonadi2::Commands::headerSize(); 230 static const int headerSize = Akonadi2::Commands::headerSize();
@@ -195,58 +232,22 @@ bool Listener::processClientBuffer(Client &client)
195 return false; 232 return false;
196 } 233 }
197 234
198 int commandId; 235 const uint messageId = *(uint*)client.commandBuffer.constData();
199 uint messageId, size; 236 const int commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint));
200 messageId = *(uint*)client.commandBuffer.constData(); 237 const uint size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint));
201 commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint));
202 size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint));
203 238
204 //TODO: reject messages above a certain size? 239 //TODO: reject messages above a certain size?
205 240
206 if (size <= uint(client.commandBuffer.size() - headerSize)) { 241 if (size <= uint(client.commandBuffer.size() - headerSize)) {
207 client.commandBuffer.remove(0, headerSize); 242 client.commandBuffer.remove(0, headerSize);
208 243
209 switch (commandId) { 244 processCommand(commandId, messageId, client, size, [this, messageId, commandId, &client]() {
210 case Akonadi2::Commands::HandshakeCommand: { 245 log(QString("\tCompleted command messageid %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name));
211 flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); 246 //FIXME, client needs to become a shared pointer and not a reference, or we have to search through m_connections everytime.
212 if (Akonadi2::VerifyHandshakeBuffer(verifier)) { 247 sendCommandCompleted(client, messageId);
213 auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); 248 });
214 client.name = buffer->name()->c_str();
215 sendCurrentRevision(client);
216 }
217 break;
218 }
219 case Akonadi2::Commands::SynchronizeCommand: {
220 Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name));
221 loadResource();
222 if (m_resource) {
223 m_resource->synchronizeWithSource(m_pipeline);
224 }
225 break;
226 }
227 case Akonadi2::Commands::FetchEntityCommand:
228 case Akonadi2::Commands::DeleteEntityCommand:
229 case Akonadi2::Commands::ModifyEntityCommand:
230 case Akonadi2::Commands::CreateEntityCommand:
231 Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name));
232 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
233 break;
234 default:
235 if (commandId > Akonadi2::Commands::CustomCommand) {
236 loadResource();
237 if (m_resource) {
238 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
239 }
240 } else {
241 //TODO: handle error: we don't know wtf this command is
242 }
243 break;
244 }
245
246 //TODO: async commands == async sendCommandCompleted
247 Akonadi2::Console::main()->log(QString("\tCompleted command id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name));
248 client.commandBuffer.remove(0, size); 249 client.commandBuffer.remove(0, size);
249 sendCommandCompleted(client, messageId); 250
250 return client.commandBuffer.size() >= headerSize; 251 return client.commandBuffer.size() >= headerSize;
251 } 252 }
252 253
@@ -259,7 +260,7 @@ void Listener::sendCurrentRevision(Client &client)
259 return; 260 return;
260 } 261 }
261 262
262 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); 263 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision());
263 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); 264 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command);
264 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); 265 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb);
265 m_fbb.Clear(); 266 m_fbb.Clear();
@@ -279,14 +280,12 @@ void Listener::sendCommandCompleted(Client &client, uint messageId)
279 280
280void Listener::refreshRevision() 281void Listener::refreshRevision()
281{ 282{
282 //TODO this should be coming out of m_pipeline->storage()
283 ++m_revision;
284 updateClientsWithRevision(); 283 updateClientsWithRevision();
285} 284}
286 285
287void Listener::updateClientsWithRevision() 286void Listener::updateClientsWithRevision()
288{ 287{
289 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); 288 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision());
290 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); 289 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command);
291 290
292 for (const Client &client: m_connections) { 291 for (const Client &client: m_connections) {
@@ -308,13 +307,19 @@ void Listener::loadResource()
308 Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName); 307 Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName);
309 if (resourceFactory) { 308 if (resourceFactory) {
310 m_resource = resourceFactory->createResource(); 309 m_resource = resourceFactory->createResource();
311 Akonadi2::Console::main()->log(QString("Resource factory: %1").arg((qlonglong)resourceFactory)); 310 log(QString("Resource factory: %1").arg((qlonglong)resourceFactory));
312 Akonadi2::Console::main()->log(QString("\tResource: %1").arg((qlonglong)m_resource)); 311 log(QString("\tResource: %1").arg((qlonglong)m_resource));
313 //TODO: this doesn't really list all the facades .. fix 312 //TODO: this doesn't really list all the facades .. fix
314 Akonadi2::Console::main()->log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type())); 313 log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type()));
315 } else { 314 } else {
316 Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); 315 log(QString("Failed to load resource %1").arg(m_resourceName));
317 } 316 }
318 //TODO: on failure ... what? 317 //TODO: on failure ... what?
318 //Enter broken state?
319}
320
321void Listener::log(const QString &message)
322{
323 Akonadi2::Console::main()->log("Listener: " + message);
319} 324}
320 325
diff --git a/synchronizer/listener.h b/synchronizer/listener.h
index 357ae37..4c35191 100644
--- a/synchronizer/listener.h
+++ b/synchronizer/listener.h
@@ -61,9 +61,6 @@ public:
61 Listener(const QString &resourceName, QObject *parent = 0); 61 Listener(const QString &resourceName, QObject *parent = 0);
62 ~Listener(); 62 ~Listener();
63 63
64 void setRevision(unsigned long long revision);
65 unsigned long long revision() const;
66
67Q_SIGNALS: 64Q_SIGNALS:
68 void noClients(); 65 void noClients();
69 66
@@ -79,15 +76,16 @@ private Q_SLOTS:
79 void refreshRevision(); 76 void refreshRevision();
80 77
81private: 78private:
79 void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback);
82 bool processClientBuffer(Client &client); 80 bool processClientBuffer(Client &client);
83 void sendCurrentRevision(Client &client); 81 void sendCurrentRevision(Client &client);
84 void sendCommandCompleted(Client &client, uint messageId); 82 void sendCommandCompleted(Client &client, uint messageId);
85 void updateClientsWithRevision(); 83 void updateClientsWithRevision();
86 void loadResource(); 84 void loadResource();
85 void log(const QString &);
87 86
88 QLocalServer *m_server; 87 QLocalServer *m_server;
89 QVector<Client> m_connections; 88 QVector<Client> m_connections;
90 unsigned long long m_revision;
91 flatbuffers::FlatBufferBuilder m_fbb; 89 flatbuffers::FlatBufferBuilder m_fbb;
92 const QString m_resourceName; 90 const QString m_resourceName;
93 Akonadi2::Resource *m_resource; 91 Akonadi2::Resource *m_resource;