summaryrefslogtreecommitdiffstats
path: root/synchronizer/listener.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'synchronizer/listener.cpp')
-rw-r--r--synchronizer/listener.cpp23
1 files changed, 16 insertions, 7 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index 18442e7..328d4d6 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -38,10 +38,12 @@ Listener::Listener(const QString &resourceName, QObject *parent)
38 m_revision(0), 38 m_revision(0),
39 m_resourceName(resourceName), 39 m_resourceName(resourceName),
40 m_resource(0), 40 m_resource(0),
41 m_pipeline(new Akonadi2::Pipeline(resourceName)), 41 m_pipeline(new Akonadi2::Pipeline(resourceName, parent)),
42 m_clientBufferProcessesTimer(new QTimer(this)), 42 m_clientBufferProcessesTimer(new QTimer(this)),
43 m_messageId(0) 43 m_messageId(0)
44{ 44{
45 connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated,
46 this, &Listener::refreshRevision);
45 connect(m_server, &QLocalServer::newConnection, 47 connect(m_server, &QLocalServer::newConnection,
46 this, &Listener::acceptConnection); 48 this, &Listener::acceptConnection);
47 Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); 49 Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName));
@@ -69,7 +71,6 @@ Listener::Listener(const QString &resourceName, QObject *parent)
69 71
70Listener::~Listener() 72Listener::~Listener()
71{ 73{
72 delete m_pipeline;
73} 74}
74 75
75void Listener::setRevision(unsigned long long revision) 76void Listener::setRevision(unsigned long long revision)
@@ -219,7 +220,7 @@ bool Listener::processClientBuffer(Client &client)
219 Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); 220 Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name));
220 loadResource(); 221 loadResource();
221 if (m_resource) { 222 if (m_resource) {
222 m_resource->synchronizeWithSource(); 223 m_resource->synchronizeWithSource(m_pipeline);
223 } 224 }
224 break; 225 break;
225 } 226 }
@@ -227,14 +228,14 @@ bool Listener::processClientBuffer(Client &client)
227 case Akonadi2::Commands::DeleteEntityCommand: 228 case Akonadi2::Commands::DeleteEntityCommand:
228 case Akonadi2::Commands::ModifyEntityCommand: 229 case Akonadi2::Commands::ModifyEntityCommand:
229 case Akonadi2::Commands::CreateEntityCommand: 230 case Akonadi2::Commands::CreateEntityCommand:
230 Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %1").arg(messageId).arg(commandId).arg(client.name)); 231 Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name));
231 m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); 232 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
232 break; 233 break;
233 default: 234 default:
234 if (commandId > Akonadi2::Commands::CustomCommand) { 235 if (commandId > Akonadi2::Commands::CustomCommand) {
235 loadResource(); 236 loadResource();
236 if (m_resource) { 237 if (m_resource) {
237 m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); 238 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
238 } 239 }
239 } else { 240 } else {
240 //TODO: handle error: we don't know wtf this command is 241 //TODO: handle error: we don't know wtf this command is
@@ -243,8 +244,9 @@ bool Listener::processClientBuffer(Client &client)
243 } 244 }
244 245
245 //TODO: async commands == async sendCommandCompleted 246 //TODO: async commands == async sendCommandCompleted
246 sendCommandCompleted(client, messageId); 247 Akonadi2::Console::main()->log(QString("\tCompleted command id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name));
247 client.commandBuffer.remove(0, size); 248 client.commandBuffer.remove(0, size);
249 sendCommandCompleted(client, messageId);
248 return client.commandBuffer.size() >= headerSize; 250 return client.commandBuffer.size() >= headerSize;
249 } 251 }
250 252
@@ -275,6 +277,13 @@ void Listener::sendCommandCompleted(Client &client, uint messageId)
275 m_fbb.Clear(); 277 m_fbb.Clear();
276} 278}
277 279
280void Listener::refreshRevision()
281{
282 //TODO this should be coming out of m_pipeline->storage()
283 ++m_revision;
284 updateClientsWithRevision();
285}
286
278void Listener::updateClientsWithRevision() 287void Listener::updateClientsWithRevision()
279{ 288{
280 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); 289 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision);