diff options
Diffstat (limited to 'synchronizer/listener.cpp')
-rw-r--r-- | synchronizer/listener.cpp | 23 |
1 files changed, 16 insertions, 7 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 18442e7..328d4d6 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -38,10 +38,12 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
38 | m_revision(0), | 38 | m_revision(0), |
39 | m_resourceName(resourceName), | 39 | m_resourceName(resourceName), |
40 | m_resource(0), | 40 | m_resource(0), |
41 | m_pipeline(new Akonadi2::Pipeline(resourceName)), | 41 | m_pipeline(new Akonadi2::Pipeline(resourceName, parent)), |
42 | m_clientBufferProcessesTimer(new QTimer(this)), | 42 | m_clientBufferProcessesTimer(new QTimer(this)), |
43 | m_messageId(0) | 43 | m_messageId(0) |
44 | { | 44 | { |
45 | connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated, | ||
46 | this, &Listener::refreshRevision); | ||
45 | connect(m_server, &QLocalServer::newConnection, | 47 | connect(m_server, &QLocalServer::newConnection, |
46 | this, &Listener::acceptConnection); | 48 | this, &Listener::acceptConnection); |
47 | Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); | 49 | Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); |
@@ -69,7 +71,6 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
69 | 71 | ||
70 | Listener::~Listener() | 72 | Listener::~Listener() |
71 | { | 73 | { |
72 | delete m_pipeline; | ||
73 | } | 74 | } |
74 | 75 | ||
75 | void Listener::setRevision(unsigned long long revision) | 76 | void Listener::setRevision(unsigned long long revision) |
@@ -219,7 +220,7 @@ bool Listener::processClientBuffer(Client &client) | |||
219 | Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); | 220 | Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); |
220 | loadResource(); | 221 | loadResource(); |
221 | if (m_resource) { | 222 | if (m_resource) { |
222 | m_resource->synchronizeWithSource(); | 223 | m_resource->synchronizeWithSource(m_pipeline); |
223 | } | 224 | } |
224 | break; | 225 | break; |
225 | } | 226 | } |
@@ -227,14 +228,14 @@ bool Listener::processClientBuffer(Client &client) | |||
227 | case Akonadi2::Commands::DeleteEntityCommand: | 228 | case Akonadi2::Commands::DeleteEntityCommand: |
228 | case Akonadi2::Commands::ModifyEntityCommand: | 229 | case Akonadi2::Commands::ModifyEntityCommand: |
229 | case Akonadi2::Commands::CreateEntityCommand: | 230 | case Akonadi2::Commands::CreateEntityCommand: |
230 | Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %1").arg(messageId).arg(commandId).arg(client.name)); | 231 | Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); |
231 | m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); | 232 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); |
232 | break; | 233 | break; |
233 | default: | 234 | default: |
234 | if (commandId > Akonadi2::Commands::CustomCommand) { | 235 | if (commandId > Akonadi2::Commands::CustomCommand) { |
235 | loadResource(); | 236 | loadResource(); |
236 | if (m_resource) { | 237 | if (m_resource) { |
237 | m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); | 238 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); |
238 | } | 239 | } |
239 | } else { | 240 | } else { |
240 | //TODO: handle error: we don't know wtf this command is | 241 | //TODO: handle error: we don't know wtf this command is |
@@ -243,8 +244,9 @@ bool Listener::processClientBuffer(Client &client) | |||
243 | } | 244 | } |
244 | 245 | ||
245 | //TODO: async commands == async sendCommandCompleted | 246 | //TODO: async commands == async sendCommandCompleted |
246 | sendCommandCompleted(client, messageId); | 247 | Akonadi2::Console::main()->log(QString("\tCompleted command id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); |
247 | client.commandBuffer.remove(0, size); | 248 | client.commandBuffer.remove(0, size); |
249 | sendCommandCompleted(client, messageId); | ||
248 | return client.commandBuffer.size() >= headerSize; | 250 | return client.commandBuffer.size() >= headerSize; |
249 | } | 251 | } |
250 | 252 | ||
@@ -275,6 +277,13 @@ void Listener::sendCommandCompleted(Client &client, uint messageId) | |||
275 | m_fbb.Clear(); | 277 | m_fbb.Clear(); |
276 | } | 278 | } |
277 | 279 | ||
280 | void Listener::refreshRevision() | ||
281 | { | ||
282 | //TODO this should be coming out of m_pipeline->storage() | ||
283 | ++m_revision; | ||
284 | updateClientsWithRevision(); | ||
285 | } | ||
286 | |||
278 | void Listener::updateClientsWithRevision() | 287 | void Listener::updateClientsWithRevision() |
279 | { | 288 | { |
280 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); | 289 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); |