diff options
Diffstat (limited to 'synchronizer')
-rw-r--r-- | synchronizer/listener.cpp | 155 | ||||
-rw-r--r-- | synchronizer/listener.h | 6 |
2 files changed, 82 insertions, 79 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 328d4d6..8e94213 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -35,7 +35,6 @@ | |||
35 | Listener::Listener(const QString &resourceName, QObject *parent) | 35 | Listener::Listener(const QString &resourceName, QObject *parent) |
36 | : QObject(parent), | 36 | : QObject(parent), |
37 | m_server(new QLocalServer(this)), | 37 | m_server(new QLocalServer(this)), |
38 | m_revision(0), | ||
39 | m_resourceName(resourceName), | 38 | m_resourceName(resourceName), |
40 | m_resource(0), | 39 | m_resource(0), |
41 | m_pipeline(new Akonadi2::Pipeline(resourceName, parent)), | 40 | m_pipeline(new Akonadi2::Pipeline(resourceName, parent)), |
@@ -46,18 +45,18 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
46 | this, &Listener::refreshRevision); | 45 | this, &Listener::refreshRevision); |
47 | connect(m_server, &QLocalServer::newConnection, | 46 | connect(m_server, &QLocalServer::newConnection, |
48 | this, &Listener::acceptConnection); | 47 | this, &Listener::acceptConnection); |
49 | Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); | 48 | log(QString("Trying to open %1").arg(resourceName)); |
50 | if (!m_server->listen(resourceName)) { | 49 | if (!m_server->listen(resourceName)) { |
51 | // FIXME: multiple starts need to be handled here | 50 | // FIXME: multiple starts need to be handled here |
52 | m_server->removeServer(resourceName); | 51 | m_server->removeServer(resourceName); |
53 | if (!m_server->listen(resourceName)) { | 52 | if (!m_server->listen(resourceName)) { |
54 | Akonadi2::Console::main()->log("Utter failure to start server"); | 53 | log("Utter failure to start server"); |
55 | exit(-1); | 54 | exit(-1); |
56 | } | 55 | } |
57 | } | 56 | } |
58 | 57 | ||
59 | if (m_server->isListening()) { | 58 | if (m_server->isListening()) { |
60 | Akonadi2::Console::main()->log(QString("Listening on %1").arg(m_server->serverName())); | 59 | log(QString("Listening on %1").arg(m_server->serverName())); |
61 | } | 60 | } |
62 | 61 | ||
63 | //TODO: experiment with different timeouts | 62 | //TODO: experiment with different timeouts |
@@ -73,19 +72,6 @@ Listener::~Listener() | |||
73 | { | 72 | { |
74 | } | 73 | } |
75 | 74 | ||
76 | void Listener::setRevision(unsigned long long revision) | ||
77 | { | ||
78 | if (m_revision != revision) { | ||
79 | m_revision = revision; | ||
80 | updateClientsWithRevision(); | ||
81 | } | ||
82 | } | ||
83 | |||
84 | unsigned long long Listener::revision() const | ||
85 | { | ||
86 | return m_revision; | ||
87 | } | ||
88 | |||
89 | void Listener::closeAllConnections() | 75 | void Listener::closeAllConnections() |
90 | { | 76 | { |
91 | for (Client &client: m_connections) { | 77 | for (Client &client: m_connections) { |
@@ -101,14 +87,14 @@ void Listener::closeAllConnections() | |||
101 | 87 | ||
102 | void Listener::acceptConnection() | 88 | void Listener::acceptConnection() |
103 | { | 89 | { |
104 | Akonadi2::Console::main()->log(QString("Accepting connection")); | 90 | log(QString("Accepting connection")); |
105 | QLocalSocket *socket = m_server->nextPendingConnection(); | 91 | QLocalSocket *socket = m_server->nextPendingConnection(); |
106 | 92 | ||
107 | if (!socket) { | 93 | if (!socket) { |
108 | return; | 94 | return; |
109 | } | 95 | } |
110 | 96 | ||
111 | Akonadi2::Console::main()->log("Got a connection"); | 97 | log("Got a connection"); |
112 | Client client("Unknown Client", socket); | 98 | Client client("Unknown Client", socket); |
113 | connect(socket, &QIODevice::readyRead, | 99 | connect(socket, &QIODevice::readyRead, |
114 | this, &Listener::readFromSocket); | 100 | this, &Listener::readFromSocket); |
@@ -125,12 +111,12 @@ void Listener::clientDropped() | |||
125 | return; | 111 | return; |
126 | } | 112 | } |
127 | 113 | ||
128 | Akonadi2::Console::main()->log("Dropping connection..."); | 114 | log("Dropping connection..."); |
129 | QMutableVectorIterator<Client> it(m_connections); | 115 | QMutableVectorIterator<Client> it(m_connections); |
130 | while (it.hasNext()) { | 116 | while (it.hasNext()) { |
131 | const Client &client = it.next(); | 117 | const Client &client = it.next(); |
132 | if (client.socket == socket) { | 118 | if (client.socket == socket) { |
133 | Akonadi2::Console::main()->log(QString(" dropped... %1").arg(client.name)); | 119 | log(QString(" dropped... %1").arg(client.name)); |
134 | it.remove(); | 120 | it.remove(); |
135 | break; | 121 | break; |
136 | } | 122 | } |
@@ -154,7 +140,7 @@ void Listener::readFromSocket() | |||
154 | return; | 140 | return; |
155 | } | 141 | } |
156 | 142 | ||
157 | Akonadi2::Console::main()->log("Reading from socket..."); | 143 | log("Reading from socket..."); |
158 | for (Client &client: m_connections) { | 144 | for (Client &client: m_connections) { |
159 | if (client.socket == socket) { | 145 | if (client.socket == socket) { |
160 | client.commandBuffer += socket->readAll(); | 146 | client.commandBuffer += socket->readAll(); |
@@ -188,6 +174,57 @@ void Listener::processClientBuffers() | |||
188 | } | 174 | } |
189 | } | 175 | } |
190 | 176 | ||
177 | void Listener::processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback) | ||
178 | { | ||
179 | switch (commandId) { | ||
180 | case Akonadi2::Commands::HandshakeCommand: { | ||
181 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); | ||
182 | if (Akonadi2::VerifyHandshakeBuffer(verifier)) { | ||
183 | auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); | ||
184 | client.name = buffer->name()->c_str(); | ||
185 | sendCurrentRevision(client); | ||
186 | } else { | ||
187 | qWarning() << "received invalid command"; | ||
188 | } | ||
189 | break; | ||
190 | } | ||
191 | case Akonadi2::Commands::SynchronizeCommand: { | ||
192 | log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); | ||
193 | loadResource(); | ||
194 | if (m_resource) { | ||
195 | qDebug() << "synchronizing"; | ||
196 | m_resource->synchronizeWithSource(m_pipeline).then<void>([callback](Async::Future<void> &f){ | ||
197 | //FIXME: the command is complete once all pipelines have been drained, track progress and async emit result | ||
198 | callback(); | ||
199 | f.setFinished(); | ||
200 | }).exec(); | ||
201 | return; | ||
202 | } else { | ||
203 | qWarning() << "No resource loaded"; | ||
204 | } | ||
205 | break; | ||
206 | } | ||
207 | case Akonadi2::Commands::FetchEntityCommand: | ||
208 | case Akonadi2::Commands::DeleteEntityCommand: | ||
209 | case Akonadi2::Commands::ModifyEntityCommand: | ||
210 | case Akonadi2::Commands::CreateEntityCommand: | ||
211 | log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); | ||
212 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | ||
213 | break; | ||
214 | default: | ||
215 | if (commandId > Akonadi2::Commands::CustomCommand) { | ||
216 | loadResource(); | ||
217 | if (m_resource) { | ||
218 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | ||
219 | } | ||
220 | } else { | ||
221 | //TODO: handle error: we don't know wtf this command is | ||
222 | } | ||
223 | break; | ||
224 | } | ||
225 | callback(); | ||
226 | } | ||
227 | |||
191 | bool Listener::processClientBuffer(Client &client) | 228 | bool Listener::processClientBuffer(Client &client) |
192 | { | 229 | { |
193 | static const int headerSize = Akonadi2::Commands::headerSize(); | 230 | static const int headerSize = Akonadi2::Commands::headerSize(); |
@@ -195,58 +232,22 @@ bool Listener::processClientBuffer(Client &client) | |||
195 | return false; | 232 | return false; |
196 | } | 233 | } |
197 | 234 | ||
198 | int commandId; | 235 | const uint messageId = *(uint*)client.commandBuffer.constData(); |
199 | uint messageId, size; | 236 | const int commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); |
200 | messageId = *(uint*)client.commandBuffer.constData(); | 237 | const uint size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); |
201 | commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); | ||
202 | size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); | ||
203 | 238 | ||
204 | //TODO: reject messages above a certain size? | 239 | //TODO: reject messages above a certain size? |
205 | 240 | ||
206 | if (size <= uint(client.commandBuffer.size() - headerSize)) { | 241 | if (size <= uint(client.commandBuffer.size() - headerSize)) { |
207 | client.commandBuffer.remove(0, headerSize); | 242 | client.commandBuffer.remove(0, headerSize); |
208 | 243 | ||
209 | switch (commandId) { | 244 | processCommand(commandId, messageId, client, size, [this, messageId, commandId, &client]() { |
210 | case Akonadi2::Commands::HandshakeCommand: { | 245 | log(QString("\tCompleted command messageid %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); |
211 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); | 246 | //FIXME, client needs to become a shared pointer and not a reference, or we have to search through m_connections everytime. |
212 | if (Akonadi2::VerifyHandshakeBuffer(verifier)) { | 247 | sendCommandCompleted(client, messageId); |
213 | auto buffer = Akonadi2::GetHandshake(client.commandBuffer.constData()); | 248 | }); |
214 | client.name = buffer->name()->c_str(); | ||
215 | sendCurrentRevision(client); | ||
216 | } | ||
217 | break; | ||
218 | } | ||
219 | case Akonadi2::Commands::SynchronizeCommand: { | ||
220 | Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); | ||
221 | loadResource(); | ||
222 | if (m_resource) { | ||
223 | m_resource->synchronizeWithSource(m_pipeline); | ||
224 | } | ||
225 | break; | ||
226 | } | ||
227 | case Akonadi2::Commands::FetchEntityCommand: | ||
228 | case Akonadi2::Commands::DeleteEntityCommand: | ||
229 | case Akonadi2::Commands::ModifyEntityCommand: | ||
230 | case Akonadi2::Commands::CreateEntityCommand: | ||
231 | Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); | ||
232 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | ||
233 | break; | ||
234 | default: | ||
235 | if (commandId > Akonadi2::Commands::CustomCommand) { | ||
236 | loadResource(); | ||
237 | if (m_resource) { | ||
238 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); | ||
239 | } | ||
240 | } else { | ||
241 | //TODO: handle error: we don't know wtf this command is | ||
242 | } | ||
243 | break; | ||
244 | } | ||
245 | |||
246 | //TODO: async commands == async sendCommandCompleted | ||
247 | Akonadi2::Console::main()->log(QString("\tCompleted command id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); | ||
248 | client.commandBuffer.remove(0, size); | 249 | client.commandBuffer.remove(0, size); |
249 | sendCommandCompleted(client, messageId); | 250 | |
250 | return client.commandBuffer.size() >= headerSize; | 251 | return client.commandBuffer.size() >= headerSize; |
251 | } | 252 | } |
252 | 253 | ||
@@ -259,7 +260,7 @@ void Listener::sendCurrentRevision(Client &client) | |||
259 | return; | 260 | return; |
260 | } | 261 | } |
261 | 262 | ||
262 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); | 263 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision()); |
263 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); | 264 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); |
264 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); | 265 | Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::RevisionUpdateCommand, m_fbb); |
265 | m_fbb.Clear(); | 266 | m_fbb.Clear(); |
@@ -279,14 +280,12 @@ void Listener::sendCommandCompleted(Client &client, uint messageId) | |||
279 | 280 | ||
280 | void Listener::refreshRevision() | 281 | void Listener::refreshRevision() |
281 | { | 282 | { |
282 | //TODO this should be coming out of m_pipeline->storage() | ||
283 | ++m_revision; | ||
284 | updateClientsWithRevision(); | 283 | updateClientsWithRevision(); |
285 | } | 284 | } |
286 | 285 | ||
287 | void Listener::updateClientsWithRevision() | 286 | void Listener::updateClientsWithRevision() |
288 | { | 287 | { |
289 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); | 288 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_pipeline->storage().maxRevision()); |
290 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); | 289 | Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); |
291 | 290 | ||
292 | for (const Client &client: m_connections) { | 291 | for (const Client &client: m_connections) { |
@@ -308,13 +307,19 @@ void Listener::loadResource() | |||
308 | Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName); | 307 | Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName); |
309 | if (resourceFactory) { | 308 | if (resourceFactory) { |
310 | m_resource = resourceFactory->createResource(); | 309 | m_resource = resourceFactory->createResource(); |
311 | Akonadi2::Console::main()->log(QString("Resource factory: %1").arg((qlonglong)resourceFactory)); | 310 | log(QString("Resource factory: %1").arg((qlonglong)resourceFactory)); |
312 | Akonadi2::Console::main()->log(QString("\tResource: %1").arg((qlonglong)m_resource)); | 311 | log(QString("\tResource: %1").arg((qlonglong)m_resource)); |
313 | //TODO: this doesn't really list all the facades .. fix | 312 | //TODO: this doesn't really list all the facades .. fix |
314 | Akonadi2::Console::main()->log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type())); | 313 | log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type())); |
315 | } else { | 314 | } else { |
316 | Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); | 315 | log(QString("Failed to load resource %1").arg(m_resourceName)); |
317 | } | 316 | } |
318 | //TODO: on failure ... what? | 317 | //TODO: on failure ... what? |
318 | //Enter broken state? | ||
319 | } | ||
320 | |||
321 | void Listener::log(const QString &message) | ||
322 | { | ||
323 | Akonadi2::Console::main()->log("Listener: " + message); | ||
319 | } | 324 | } |
320 | 325 | ||
diff --git a/synchronizer/listener.h b/synchronizer/listener.h index 357ae37..4c35191 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h | |||
@@ -61,9 +61,6 @@ public: | |||
61 | Listener(const QString &resourceName, QObject *parent = 0); | 61 | Listener(const QString &resourceName, QObject *parent = 0); |
62 | ~Listener(); | 62 | ~Listener(); |
63 | 63 | ||
64 | void setRevision(unsigned long long revision); | ||
65 | unsigned long long revision() const; | ||
66 | |||
67 | Q_SIGNALS: | 64 | Q_SIGNALS: |
68 | void noClients(); | 65 | void noClients(); |
69 | 66 | ||
@@ -79,15 +76,16 @@ private Q_SLOTS: | |||
79 | void refreshRevision(); | 76 | void refreshRevision(); |
80 | 77 | ||
81 | private: | 78 | private: |
79 | void processCommand(int commandId, uint messageId, Client &client, uint size, const std::function<void()> &callback); | ||
82 | bool processClientBuffer(Client &client); | 80 | bool processClientBuffer(Client &client); |
83 | void sendCurrentRevision(Client &client); | 81 | void sendCurrentRevision(Client &client); |
84 | void sendCommandCompleted(Client &client, uint messageId); | 82 | void sendCommandCompleted(Client &client, uint messageId); |
85 | void updateClientsWithRevision(); | 83 | void updateClientsWithRevision(); |
86 | void loadResource(); | 84 | void loadResource(); |
85 | void log(const QString &); | ||
87 | 86 | ||
88 | QLocalServer *m_server; | 87 | QLocalServer *m_server; |
89 | QVector<Client> m_connections; | 88 | QVector<Client> m_connections; |
90 | unsigned long long m_revision; | ||
91 | flatbuffers::FlatBufferBuilder m_fbb; | 89 | flatbuffers::FlatBufferBuilder m_fbb; |
92 | const QString m_resourceName; | 90 | const QString m_resourceName; |
93 | Akonadi2::Resource *m_resource; | 91 | Akonadi2::Resource *m_resource; |