diff options
author | Aaron Seigo <aseigo@kde.org> | 2014-12-16 12:14:38 +0100 |
---|---|---|
committer | Aaron Seigo <aseigo@kde.org> | 2014-12-16 12:14:38 +0100 |
commit | 8ae840ac13d9e2161b411fbceae281a725fa8b1f (patch) | |
tree | 743fbaf8ba404db97995b336db27228ce7f6eff4 /synchronizer | |
parent | ce9498597c789453db5f3b770f0f21fc119f404f (diff) | |
download | sink-8ae840ac13d9e2161b411fbceae281a725fa8b1f.tar.gz sink-8ae840ac13d9e2161b411fbceae281a725fa8b1f.zip |
load the resource on first command .. it LIIIIVES!
Diffstat (limited to 'synchronizer')
-rw-r--r-- | synchronizer/listener.cpp | 66 | ||||
-rw-r--r-- | synchronizer/listener.h | 16 |
2 files changed, 61 insertions, 21 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index 1d20c4d..ef9738e 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -19,26 +19,30 @@ | |||
19 | 19 | ||
20 | #include "listener.h" | 20 | #include "listener.h" |
21 | 21 | ||
22 | #include "common/clientapi.h" | ||
22 | #include "common/console.h" | 23 | #include "common/console.h" |
23 | #include "common/commands.h" | 24 | #include "common/commands.h" |
24 | #include "common/handshake_generated.h" | 25 | #include "common/handshake_generated.h" |
26 | #include "common/resource.h" | ||
25 | #include "common/revisionupdate_generated.h" | 27 | #include "common/revisionupdate_generated.h" |
26 | 28 | ||
27 | #include <QLocalSocket> | 29 | #include <QLocalSocket> |
28 | #include <QTimer> | 30 | #include <QTimer> |
29 | 31 | ||
30 | Listener::Listener(const QString &resource, QObject *parent) | 32 | Listener::Listener(const QString &resourceName, QObject *parent) |
31 | : QObject(parent), | 33 | : QObject(parent), |
32 | m_server(new QLocalServer(this)), | 34 | m_server(new QLocalServer(this)), |
33 | m_revision(0) | 35 | m_revision(0), |
36 | m_resourceName(resourceName), | ||
37 | m_resource(0) | ||
34 | { | 38 | { |
35 | connect(m_server, &QLocalServer::newConnection, | 39 | connect(m_server, &QLocalServer::newConnection, |
36 | this, &Listener::acceptConnection); | 40 | this, &Listener::acceptConnection); |
37 | Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resource)); | 41 | Akonadi2::Console::main()->log(QString("Trying to open %1").arg(resourceName)); |
38 | if (!m_server->listen(resource)) { | 42 | if (!m_server->listen(resourceName)) { |
39 | // FIXME: multiple starts need to be handled here | 43 | // FIXME: multiple starts need to be handled here |
40 | m_server->removeServer(resource); | 44 | m_server->removeServer(resourceName); |
41 | if (!m_server->listen(resource)) { | 45 | if (!m_server->listen(resourceName)) { |
42 | Akonadi2::Console::main()->log("Utter failure to start server"); | 46 | Akonadi2::Console::main()->log("Utter failure to start server"); |
43 | exit(-1); | 47 | exit(-1); |
44 | } | 48 | } |
@@ -70,7 +74,6 @@ unsigned long long Listener::revision() const | |||
70 | 74 | ||
71 | void Listener::closeAllConnections() | 75 | void Listener::closeAllConnections() |
72 | { | 76 | { |
73 | //TODO: close all client connectionsin m_connections | ||
74 | for (Client &client: m_connections) { | 77 | for (Client &client: m_connections) { |
75 | if (client.socket) { | 78 | if (client.socket) { |
76 | client.socket->close(); | 79 | client.socket->close(); |
@@ -90,7 +93,7 @@ void Listener::acceptConnection() | |||
90 | } | 93 | } |
91 | 94 | ||
92 | Akonadi2::Console::main()->log("Got a connection"); | 95 | Akonadi2::Console::main()->log("Got a connection"); |
93 | Client client("Unknown Client" /*fixme: actual names!*/, socket); | 96 | Client client("Unknown Client", socket); |
94 | connect(socket, &QIODevice::readyRead, | 97 | connect(socket, &QIODevice::readyRead, |
95 | this, &Listener::readFromSocket); | 98 | this, &Listener::readFromSocket); |
96 | m_connections << client; | 99 | m_connections << client; |
@@ -137,6 +140,7 @@ void Listener::readFromSocket() | |||
137 | 140 | ||
138 | Akonadi2::Console::main()->log("Reading from socket..."); | 141 | Akonadi2::Console::main()->log("Reading from socket..."); |
139 | for (Client &client: m_connections) { | 142 | for (Client &client: m_connections) { |
143 | Akonadi2::Console::main()->log(QString("Checking %1 %2").arg((qlonglong)client.socket).arg((qlonglong)socket)); | ||
140 | if (client.socket == socket) { | 144 | if (client.socket == socket) { |
141 | Akonadi2::Console::main()->log(QString(" Client: %1").arg(client.name)); | 145 | Akonadi2::Console::main()->log(QString(" Client: %1").arg(client.name)); |
142 | client.commandBuffer += socket->readAll(); | 146 | client.commandBuffer += socket->readAll(); |
@@ -151,17 +155,19 @@ void Listener::readFromSocket() | |||
151 | 155 | ||
152 | bool Listener::processClientBuffer(Client &client) | 156 | bool Listener::processClientBuffer(Client &client) |
153 | { | 157 | { |
154 | static const int headerSize = (sizeof(int) * 2); | 158 | static const int headerSize = (sizeof(int) + sizeof(uint)); |
155 | Akonadi2::Console::main()->log(QString("processing %1").arg(client.commandBuffer.size())); | ||
156 | if (client.commandBuffer.size() < headerSize) { | 159 | if (client.commandBuffer.size() < headerSize) { |
157 | return false; | 160 | return false; |
158 | } | 161 | } |
159 | 162 | ||
160 | int commandId, size; | 163 | int commandId; |
164 | uint size; | ||
161 | commandId = *(int*)client.commandBuffer.constData(); | 165 | commandId = *(int*)client.commandBuffer.constData(); |
162 | size = *(int*)(client.commandBuffer.constData() + sizeof(int)); | 166 | size = *(uint*)(client.commandBuffer.constData() + sizeof(uint)); |
163 | 167 | ||
164 | if (size <= client.commandBuffer.size() - headerSize) { | 168 | //TODO: reject messages above a certain size? |
169 | |||
170 | if (size <= uint(client.commandBuffer.size() - headerSize)) { | ||
165 | QByteArray data = client.commandBuffer.mid(headerSize, size); | 171 | QByteArray data = client.commandBuffer.mid(headerSize, size); |
166 | client.commandBuffer.remove(0, headerSize + size); | 172 | client.commandBuffer.remove(0, headerSize + size); |
167 | 173 | ||
@@ -169,18 +175,27 @@ bool Listener::processClientBuffer(Client &client) | |||
169 | case Akonadi2::Commands::HandshakeCommand: { | 175 | case Akonadi2::Commands::HandshakeCommand: { |
170 | auto buffer = Akonadi2::GetHandshake(data.constData()); | 176 | auto buffer = Akonadi2::GetHandshake(data.constData()); |
171 | Akonadi2::Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); | 177 | Akonadi2::Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); |
178 | client.name = buffer->name()->c_str(); | ||
172 | sendCurrentRevision(client); | 179 | sendCurrentRevision(client); |
173 | break; | 180 | break; |
174 | } | 181 | } |
182 | case Akonadi2::Commands::SynchronizeCommand: { | ||
183 | Akonadi2::Console::main()->log(QString(" Synchronize request from %1").arg(client.name)); | ||
184 | loadResource(); | ||
185 | //TODO: on failure ... what? | ||
186 | if (m_resource) { | ||
187 | m_resource->synchronizeWithSource(); | ||
188 | } | ||
189 | break; | ||
190 | } | ||
175 | default: | 191 | default: |
176 | // client.hasSentCommand = true; | ||
177 | break; | 192 | break; |
178 | } | 193 | } |
179 | 194 | ||
180 | return client.commandBuffer.size() >= headerSize; | 195 | return client.commandBuffer.size() >= headerSize; |
181 | } else { | ||
182 | return false; | ||
183 | } | 196 | } |
197 | |||
198 | return false; | ||
184 | } | 199 | } |
185 | 200 | ||
186 | void Listener::sendCurrentRevision(Client &client) | 201 | void Listener::sendCurrentRevision(Client &client) |
@@ -209,3 +224,22 @@ void Listener::updateClientsWithRevision() | |||
209 | } | 224 | } |
210 | m_fbb.Clear(); | 225 | m_fbb.Clear(); |
211 | } | 226 | } |
227 | |||
228 | void Listener::loadResource() | ||
229 | { | ||
230 | if (m_resource) { | ||
231 | return; | ||
232 | } | ||
233 | |||
234 | Akonadi2::ResourceFactory *resourceFactory = Akonadi2::ResourceFactory::load(m_resourceName); | ||
235 | if (resourceFactory) { | ||
236 | m_resource = resourceFactory->createResource(); | ||
237 | Akonadi2::Console::main()->log(QString("Resource factory: %1").arg((qlonglong)resourceFactory)); | ||
238 | Akonadi2::Console::main()->log(QString("\tResource: %1").arg((qlonglong)m_resource)); | ||
239 | //TODO: this doesn't really list all the facades .. fix | ||
240 | Akonadi2::Console::main()->log(QString("\tFacades: %1").arg(Akonadi2::FacadeFactory::instance().getFacade<Akonadi2::Domain::Event>(m_resourceName)->type())); | ||
241 | } else { | ||
242 | Akonadi2::Console::main()->log(QString("Failed to load resource %1").arg(m_resourceName)); | ||
243 | } | ||
244 | } | ||
245 | |||
diff --git a/synchronizer/listener.h b/synchronizer/listener.h index b20da7f..5c725df 100644 --- a/synchronizer/listener.h +++ b/synchronizer/listener.h | |||
@@ -26,26 +26,28 @@ | |||
26 | 26 | ||
27 | #include <flatbuffers/flatbuffers.h> | 27 | #include <flatbuffers/flatbuffers.h> |
28 | 28 | ||
29 | namespace Akonadi2 | ||
30 | { | ||
31 | class Resource; | ||
32 | } | ||
33 | |||
29 | class Client | 34 | class Client |
30 | { | 35 | { |
31 | public: | 36 | public: |
32 | Client() | 37 | Client() |
33 | : socket(nullptr), | 38 | : socket(nullptr) |
34 | hasSentCommand(false) | ||
35 | { | 39 | { |
36 | } | 40 | } |
37 | 41 | ||
38 | Client(const QString &n, QLocalSocket *s) | 42 | Client(const QString &n, QLocalSocket *s) |
39 | : name(n), | 43 | : name(n), |
40 | socket(s), | 44 | socket(s) |
41 | hasSentCommand(false) | ||
42 | { | 45 | { |
43 | } | 46 | } |
44 | 47 | ||
45 | QString name; | 48 | QString name; |
46 | QLocalSocket *socket; | 49 | QLocalSocket *socket; |
47 | QByteArray commandBuffer; | 50 | QByteArray commandBuffer; |
48 | bool hasSentCommand; | ||
49 | }; | 51 | }; |
50 | 52 | ||
51 | class Listener : public QObject | 53 | class Listener : public QObject |
@@ -75,8 +77,12 @@ private: | |||
75 | bool processClientBuffer(Client &client); | 77 | bool processClientBuffer(Client &client); |
76 | void sendCurrentRevision(Client &client); | 78 | void sendCurrentRevision(Client &client); |
77 | void updateClientsWithRevision(); | 79 | void updateClientsWithRevision(); |
80 | void loadResource(); | ||
81 | |||
78 | QLocalServer *m_server; | 82 | QLocalServer *m_server; |
79 | QVector<Client> m_connections; | 83 | QVector<Client> m_connections; |
80 | unsigned long long m_revision; | 84 | unsigned long long m_revision; |
81 | flatbuffers::FlatBufferBuilder m_fbb; | 85 | flatbuffers::FlatBufferBuilder m_fbb; |
86 | const QString m_resourceName; | ||
87 | Akonadi2::Resource *m_resource; | ||
82 | }; | 88 | }; |