diff options
author | Aaron Seigo <aseigo@kde.org> | 2014-12-17 08:27:31 +0100 |
---|---|---|
committer | Aaron Seigo <aseigo@kde.org> | 2014-12-17 08:27:31 +0100 |
commit | 1c7e8fd482bb67a5487449948488bd286a3504c1 (patch) | |
tree | 38789d22037a0b2ed7550b60fd2280fa522c086d /synchronizer | |
parent | 7265d88245767960f1b551bb57f8f84942b898d2 (diff) | |
download | sink-1c7e8fd482bb67a5487449948488bd286a3504c1.tar.gz sink-1c7e8fd482bb67a5487449948488bd286a3504c1.zip |
a basically-working Pipeline implementation
still a skeleton rather than a full body with flesh and blood, but
it is getting there!
Diffstat (limited to 'synchronizer')
-rw-r--r-- | synchronizer/listener.cpp | 23 | ||||
-rw-r--r-- | synchronizer/listener.h | 1 |
2 files changed, 17 insertions, 7 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 18442e7..328d4d6 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -38,10 +38,12 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
38 | m_revision(0), | 38 | m_revision(0), |
39 | m_resourceName(resourceName), | 39 | m_resourceName(resourceName), |
40 | m_resource(0), | 40 | m_resource(0), |
41 | m_pipeline(new Akonadi2::Pipeline(resourceName)), | 41 | m_pipeline(new Akonadi2::Pipeline(resourceName, parent)), |
42 | m_clientBufferProcessesTimer(new QTimer(this)), | 42 | m_clientBufferProcessesTimer(new QTimer(this)), |
43 | m_messageId(0) | 43 | m_messageId(0) |
44 | { | 44 | { |
45 | connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated, | ||
46 | this, &Listener::refreshRevision); | ||
45 | connect(m_server, &QLocalServer::newConnection, | 47 | connect(m_server, &QLocalServer::newConnection, |
46 | this, &Listener::acceptConnection); | 48 | this, &Listener::acceptConnection); |
47 | Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); | 49 | Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); |
@@ -69,7 +71,6 @@ Listener::Listener(const QString &resourceName, QObject *parent) | |||
69 | 71 | ||
70 | Listener::~Listener() | 72 | Listener::~Listener() |
71 | { | 73 | { |
72 | delete m_pipeline; | ||
73 | } | 74 | } |
74 | 75 | ||
75 | void Listener::setRevision(unsigned long long revision) | 76 | void Listener::setRevision(unsigned long long revision) |
@@ -219,7 +220,7 @@ bool Listener::processClientBuffer(Client &client) | |||
219 | Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); | 220 | Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); |
220 | loadResource(); | 221 | loadResource(); |
221 | if (m_resource) { | 222 | if (m_resource) { |
222 | m_resource->synchronizeWithSource(); | 223 | m_resource->synchronizeWithSource(m_pipeline); |
223 | } | 224 | } |
224 | break; | 225 | break; |
225 | } | 226 | } |
@@ -227,14 +228,14 @@ bool Listener::processClientBuffer(Client &client) | |||
227 | case Akonadi2::Commands::DeleteEntityCommand: | 228 | case Akonadi2::Commands::DeleteEntityCommand: |
228 | case Akonadi2::Commands::ModifyEntityCommand: | 229 | case Akonadi2::Commands::ModifyEntityCommand: |
229 | case Akonadi2::Commands::CreateEntityCommand: | 230 | case Akonadi2::Commands::CreateEntityCommand: |
230 | Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %1").arg(messageId).arg(commandId).arg(client.name)); | 231 | Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); |
231 | m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); | 232 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); |
232 | break; | 233 | break; |
233 | default: | 234 | default: |
234 | if (commandId > Akonadi2::Commands::CustomCommand) { | 235 | if (commandId > Akonadi2::Commands::CustomCommand) { |
235 | loadResource(); | 236 | loadResource(); |
236 | if (m_resource) { | 237 | if (m_resource) { |
237 | m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); | 238 | m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline); |
238 | } | 239 | } |
239 | } else { | 240 | } else { |
240 | //TODO: handle error: we don't know wtf this command is | 241 | //TODO: handle error: we don't know wtf this command is |
@@ -243,8 +244,9 @@ bool Listener::processClientBuffer(Client &client) | |||
243 | } | 244 | } |
244 | 245 | ||
245 | //TODO: async commands == async sendCommandCompleted | 246 | //TODO: async commands == async sendCommandCompleted |
246 | sendCommandCompleted(client, messageId); | 247 | Akonadi2::Console::main()->log(QString("\tCompleted command id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name)); |
247 | client.commandBuffer.remove(0, size); | 248 | client.commandBuffer.remove(0, size); |
249 | sendCommandCompleted(client, messageId); | ||
248 | return client.commandBuffer.size() >= headerSize; | 250 | return client.commandBuffer.size() >= headerSize; |
249 | } | 251 | } |
250 | 252 | ||
@@ -275,6 +277,13 @@ void Listener::sendCommandCompleted(Client &client, uint messageId) | |||
275 | m_fbb.Clear(); | 277 | m_fbb.Clear(); |
276 | } | 278 | } |
277 | 279 | ||
280 | void Listener::refreshRevision() | ||
281 | { | ||
282 | //TODO this should be coming out of m_pipeline->storage() | ||
283 | ++m_revision; | ||
284 | updateClientsWithRevision(); | ||
285 | } | ||
286 | |||
278 | void Listener::updateClientsWithRevision() | 287 | void Listener::updateClientsWithRevision() |
279 | { | 288 | { |
280 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); | 289 | auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); |
diff --git a/synchronizer/listener.h b/synchronizer/listener.h index b294277..357ae37 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h | |||
@@ -76,6 +76,7 @@ private Q_SLOTS: | |||
76 | void checkConnections(); | 76 | void checkConnections(); |
77 | void readFromSocket(); | 77 | void readFromSocket(); |
78 | void processClientBuffers(); | 78 | void processClientBuffers(); |
79 | void refreshRevision(); | ||
79 | 80 | ||
80 | private: | 81 | private: |
81 | bool processClientBuffer(Client &client); | 82 | bool processClientBuffer(Client &client); |