summaryrefslogtreecommitdiffstats
path: root/synchronizer
diff options
context:
space:
mode:
authorAaron Seigo <aseigo@kde.org>2014-12-17 08:27:31 +0100
committerAaron Seigo <aseigo@kde.org>2014-12-17 08:27:31 +0100
commit1c7e8fd482bb67a5487449948488bd286a3504c1 (patch)
tree38789d22037a0b2ed7550b60fd2280fa522c086d /synchronizer
parent7265d88245767960f1b551bb57f8f84942b898d2 (diff)
downloadsink-1c7e8fd482bb67a5487449948488bd286a3504c1.tar.gz
sink-1c7e8fd482bb67a5487449948488bd286a3504c1.zip
a basically-working Pipeline implementation
still a skeleton rather than a full body with flesh and blood, but it is getting there!
Diffstat (limited to 'synchronizer')
-rw-r--r--synchronizer/listener.cpp23
-rw-r--r--synchronizer/listener.h1
2 files changed, 17 insertions, 7 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index 18442e7..328d4d6 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -38,10 +38,12 @@ Listener::Listener(const QString &resourceName, QObject *parent)
38 m_revision(0), 38 m_revision(0),
39 m_resourceName(resourceName), 39 m_resourceName(resourceName),
40 m_resource(0), 40 m_resource(0),
41 m_pipeline(new Akonadi2::Pipeline(resourceName)), 41 m_pipeline(new Akonadi2::Pipeline(resourceName, parent)),
42 m_clientBufferProcessesTimer(new QTimer(this)), 42 m_clientBufferProcessesTimer(new QTimer(this)),
43 m_messageId(0) 43 m_messageId(0)
44{ 44{
45 connect(m_pipeline, &Akonadi2::Pipeline::revisionUpdated,
46 this, &Listener::refreshRevision);
45 connect(m_server, &QLocalServer::newConnection, 47 connect(m_server, &QLocalServer::newConnection,
46 this, &Listener::acceptConnection); 48 this, &Listener::acceptConnection);
47 Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); 49 Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName));
@@ -69,7 +71,6 @@ Listener::Listener(const QString &resourceName, QObject *parent)
69 71
70Listener::~Listener() 72Listener::~Listener()
71{ 73{
72 delete m_pipeline;
73} 74}
74 75
75void Listener::setRevision(unsigned long long revision) 76void Listener::setRevision(unsigned long long revision)
@@ -219,7 +220,7 @@ bool Listener::processClientBuffer(Client &client)
219 Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); 220 Akonadi2::Console::main()->log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name));
220 loadResource(); 221 loadResource();
221 if (m_resource) { 222 if (m_resource) {
222 m_resource->synchronizeWithSource(); 223 m_resource->synchronizeWithSource(m_pipeline);
223 } 224 }
224 break; 225 break;
225 } 226 }
@@ -227,14 +228,14 @@ bool Listener::processClientBuffer(Client &client)
227 case Akonadi2::Commands::DeleteEntityCommand: 228 case Akonadi2::Commands::DeleteEntityCommand:
228 case Akonadi2::Commands::ModifyEntityCommand: 229 case Akonadi2::Commands::ModifyEntityCommand:
229 case Akonadi2::Commands::CreateEntityCommand: 230 case Akonadi2::Commands::CreateEntityCommand:
230 Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %1").arg(messageId).arg(commandId).arg(client.name)); 231 Akonadi2::Console::main()->log(QString("\tCommand id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name));
231 m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); 232 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
232 break; 233 break;
233 default: 234 default:
234 if (commandId > Akonadi2::Commands::CustomCommand) { 235 if (commandId > Akonadi2::Commands::CustomCommand) {
235 loadResource(); 236 loadResource();
236 if (m_resource) { 237 if (m_resource) {
237 m_resource->processCommand(messageId, commandId, client.commandBuffer, size, m_pipeline); 238 m_resource->processCommand(commandId, client.commandBuffer, size, m_pipeline);
238 } 239 }
239 } else { 240 } else {
240 //TODO: handle error: we don't know wtf this command is 241 //TODO: handle error: we don't know wtf this command is
@@ -243,8 +244,9 @@ bool Listener::processClientBuffer(Client &client)
243 } 244 }
244 245
245 //TODO: async commands == async sendCommandCompleted 246 //TODO: async commands == async sendCommandCompleted
246 sendCommandCompleted(client, messageId); 247 Akonadi2::Console::main()->log(QString("\tCompleted command id %1 of type %2 from %3").arg(messageId).arg(commandId).arg(client.name));
247 client.commandBuffer.remove(0, size); 248 client.commandBuffer.remove(0, size);
249 sendCommandCompleted(client, messageId);
248 return client.commandBuffer.size() >= headerSize; 250 return client.commandBuffer.size() >= headerSize;
249 } 251 }
250 252
@@ -275,6 +277,13 @@ void Listener::sendCommandCompleted(Client &client, uint messageId)
275 m_fbb.Clear(); 277 m_fbb.Clear();
276} 278}
277 279
280void Listener::refreshRevision()
281{
282 //TODO this should be coming out of m_pipeline->storage()
283 ++m_revision;
284 updateClientsWithRevision();
285}
286
278void Listener::updateClientsWithRevision() 287void Listener::updateClientsWithRevision()
279{ 288{
280 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision); 289 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, m_revision);
diff --git a/synchronizer/listener.h b/synchronizer/listener.h
index b294277..357ae37 100644
--- a/synchronizer/listener.h
+++ b/synchronizer/listener.h
@@ -76,6 +76,7 @@ private Q_SLOTS:
76 void checkConnections(); 76 void checkConnections();
77 void readFromSocket(); 77 void readFromSocket();
78 void processClientBuffers(); 78 void processClientBuffers();
79 void refreshRevision();
79 80
80private: 81private:
81 bool processClientBuffer(Client &client); 82 bool processClientBuffer(Client &client);