diff options
author | Aaron Seigo <aseigo@kde.org> | 2014-12-06 02:33:51 +0100 |
---|---|---|
committer | Aaron Seigo <aseigo@kde.org> | 2014-12-06 02:33:51 +0100 |
commit | 5f40ace47be289c74ad95948c75ed86676158639 (patch) | |
tree | b3c50012da8c7404398050a17724342866a9e83e /synchronizer | |
parent | 6b8994edba4fcc0663838b90f66fe2ad0f9e134a (diff) | |
download | sink-5f40ace47be289c74ad95948c75ed86676158639.tar.gz sink-5f40ace47be289c74ad95948c75ed86676158639.zip |
resource -> synchronizer
the resource will be the plugin that interacts with the source and store
Diffstat (limited to 'synchronizer')
-rw-r--r-- | synchronizer/CMakeLists.txt | 13 | ||||
-rw-r--r-- | synchronizer/listener.cpp | 192 | ||||
-rw-r--r-- | synchronizer/listener.h | 63 | ||||
-rw-r--r-- | synchronizer/main.cpp | 25 |
4 files changed, 293 insertions, 0 deletions
diff --git a/synchronizer/CMakeLists.txt b/synchronizer/CMakeLists.txt new file mode 100644 index 0000000..92cb465 --- /dev/null +++ b/synchronizer/CMakeLists.txt | |||
@@ -0,0 +1,13 @@ | |||
1 | project(akonadinext_synchronizer) | ||
2 | |||
3 | include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) | ||
4 | |||
5 | set(akonadinextsynchronizer_SRCS | ||
6 | main.cpp | ||
7 | listener.cpp | ||
8 | ) | ||
9 | |||
10 | add_executable(${PROJECT_NAME} ${akonadinextsynchronizer_SRCS}) | ||
11 | target_link_libraries(${PROJECT_NAME} akonadinextcommon) | ||
12 | qt5_use_modules(${PROJECT_NAME} Widgets Network) | ||
13 | install(TARGETS ${PROJECT_NAME} DESTINATION bin) | ||
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp new file mode 100644 index 0000000..23a5a70 --- /dev/null +++ b/synchronizer/listener.cpp | |||
@@ -0,0 +1,192 @@ | |||
1 | #include "listener.h" | ||
2 | |||
3 | #include "common/console.h" | ||
4 | #include "common/commands.h" | ||
5 | #include "common/handshake_generated.h" | ||
6 | #include "common/revisionupdate_generated.h" | ||
7 | |||
8 | #include <QLocalSocket> | ||
9 | #include <QTimer> | ||
10 | |||
11 | Listener::Listener(const QString &resource, QObject *parent) | ||
12 | : QObject(parent), | ||
13 | m_server(new QLocalServer(this)), | ||
14 | m_revision(0) | ||
15 | { | ||
16 | connect(m_server, &QLocalServer::newConnection, | ||
17 | this, &Listener::acceptConnection); | ||
18 | Console::main()->log(QString("Trying to open %1").arg(resource)); | ||
19 | if (!m_server->listen(resource)) { | ||
20 | // FIXME: multiple starts need to be handled here | ||
21 | m_server->removeServer(resource); | ||
22 | if (!m_server->listen(resource)) { | ||
23 | Console::main()->log("Utter failure to start server"); | ||
24 | exit(-1); | ||
25 | } | ||
26 | } | ||
27 | |||
28 | if (m_server->isListening()) { | ||
29 | Console::main()->log(QString("Listening on %1").arg(m_server->serverName())); | ||
30 | } | ||
31 | |||
32 | QTimer::singleShot(2000, this, SLOT(checkConnections())); | ||
33 | } | ||
34 | |||
35 | Listener::~Listener() | ||
36 | { | ||
37 | } | ||
38 | |||
39 | void Listener::setRevision(unsigned long long revision) | ||
40 | { | ||
41 | if (m_revision != revision) { | ||
42 | m_revision = revision; | ||
43 | updateClientsWithRevision(); | ||
44 | } | ||
45 | } | ||
46 | |||
47 | unsigned long long Listener::revision() const | ||
48 | { | ||
49 | return m_revision; | ||
50 | } | ||
51 | |||
52 | void Listener::closeAllConnections() | ||
53 | { | ||
54 | //TODO: close all client connectionsin m_connections | ||
55 | for (Client &client: m_connections) { | ||
56 | if (client.socket) { | ||
57 | client.socket->close(); | ||
58 | delete client.socket; | ||
59 | client.socket = 0; | ||
60 | } | ||
61 | } | ||
62 | } | ||
63 | |||
64 | void Listener::acceptConnection() | ||
65 | { | ||
66 | Console::main()->log(QString("Accepting connection")); | ||
67 | QLocalSocket *socket = m_server->nextPendingConnection(); | ||
68 | |||
69 | if (!socket) { | ||
70 | return; | ||
71 | } | ||
72 | |||
73 | Console::main()->log("Got a connection"); | ||
74 | Client client("Unknown Client" /*fixme: actual names!*/, socket); | ||
75 | connect(socket, &QIODevice::readyRead, | ||
76 | this, &Listener::readFromSocket); | ||
77 | m_connections << client; | ||
78 | connect(socket, &QLocalSocket::disconnected, | ||
79 | this, &Listener::clientDropped); | ||
80 | |||
81 | } | ||
82 | |||
83 | void Listener::clientDropped() | ||
84 | { | ||
85 | QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender()); | ||
86 | if (!socket) { | ||
87 | return; | ||
88 | } | ||
89 | |||
90 | Console::main()->log("Dropping connection..."); | ||
91 | QMutableVectorIterator<Client> it(m_connections); | ||
92 | while (it.hasNext()) { | ||
93 | const Client &client = it.next(); | ||
94 | if (client.socket == socket) { | ||
95 | Console::main()->log(QString(" dropped... %1").arg(client.name)); | ||
96 | it.remove(); | ||
97 | break; | ||
98 | } | ||
99 | } | ||
100 | |||
101 | checkConnections(); | ||
102 | } | ||
103 | |||
104 | void Listener::checkConnections() | ||
105 | { | ||
106 | if (m_connections.isEmpty()) { | ||
107 | m_server->close(); | ||
108 | emit noClients(); | ||
109 | } | ||
110 | } | ||
111 | |||
112 | void Listener::readFromSocket() | ||
113 | { | ||
114 | QLocalSocket *socket = qobject_cast<QLocalSocket *>(sender()); | ||
115 | if (!socket) { | ||
116 | return; | ||
117 | } | ||
118 | |||
119 | Console::main()->log("Reading from socket..."); | ||
120 | for (Client &client: m_connections) { | ||
121 | if (client.socket == socket) { | ||
122 | Console::main()->log(QString(" Client: %1").arg(client.name)); | ||
123 | client.commandBuffer += socket->readAll(); | ||
124 | // FIXME: schedule these rather than process them all at once | ||
125 | // right now this can lead to starvation of clients due to | ||
126 | // one overly active client | ||
127 | while (processClientBuffer(client)) {} | ||
128 | break; | ||
129 | } | ||
130 | } | ||
131 | } | ||
132 | |||
133 | bool Listener::processClientBuffer(Client &client) | ||
134 | { | ||
135 | static const int headerSize = (sizeof(int) * 2); | ||
136 | Console::main()->log(QString("processing %1").arg(client.commandBuffer.size())); | ||
137 | if (client.commandBuffer.size() < headerSize) { | ||
138 | return false; | ||
139 | } | ||
140 | |||
141 | int commandId, size; | ||
142 | commandId = *(int*)client.commandBuffer.constData(); | ||
143 | size = *(int*)(client.commandBuffer.constData() + sizeof(int)); | ||
144 | |||
145 | if (size <= client.commandBuffer.size() - headerSize) { | ||
146 | QByteArray data = client.commandBuffer.mid(headerSize, size); | ||
147 | client.commandBuffer.remove(0, headerSize + size); | ||
148 | |||
149 | switch (commandId) { | ||
150 | case Commands::HandshakeCommand: { | ||
151 | auto buffer = Akonadi::GetHandshake(data.constData()); | ||
152 | Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); | ||
153 | sendCurrentRevision(client); | ||
154 | break; | ||
155 | } | ||
156 | default: | ||
157 | // client.hasSentCommand = true; | ||
158 | break; | ||
159 | } | ||
160 | |||
161 | return client.commandBuffer.size() >= headerSize; | ||
162 | } else { | ||
163 | return false; | ||
164 | } | ||
165 | } | ||
166 | |||
167 | void Listener::sendCurrentRevision(Client &client) | ||
168 | { | ||
169 | if (!client.socket || !client.socket->isValid()) { | ||
170 | return; | ||
171 | } | ||
172 | |||
173 | auto command = Akonadi::CreateRevisionUpdate(m_fbb, m_revision); | ||
174 | Akonadi::FinishRevisionUpdateBuffer(m_fbb, command); | ||
175 | Commands::write(client.socket, Commands::RevisionUpdateCommand, m_fbb); | ||
176 | m_fbb.Clear(); | ||
177 | } | ||
178 | |||
179 | void Listener::updateClientsWithRevision() | ||
180 | { | ||
181 | auto command = Akonadi::CreateRevisionUpdate(m_fbb, m_revision); | ||
182 | Akonadi::FinishRevisionUpdateBuffer(m_fbb, command); | ||
183 | |||
184 | for (const Client &client: m_connections) { | ||
185 | if (!client.socket || !client.socket->isValid()) { | ||
186 | continue; | ||
187 | } | ||
188 | |||
189 | Commands::write(client.socket, Commands::RevisionUpdateCommand, m_fbb); | ||
190 | } | ||
191 | m_fbb.Clear(); | ||
192 | } | ||
diff --git a/synchronizer/listener.h b/synchronizer/listener.h new file mode 100644 index 0000000..dcc3818 --- /dev/null +++ b/synchronizer/listener.h | |||
@@ -0,0 +1,63 @@ | |||
1 | #pragma once | ||
2 | |||
3 | #include <QLocalServer> | ||
4 | #include <QLocalSocket> | ||
5 | #include <QList> | ||
6 | #include <QObject> | ||
7 | |||
8 | #include <flatbuffers/flatbuffers.h> | ||
9 | |||
10 | class Client | ||
11 | { | ||
12 | public: | ||
13 | Client() | ||
14 | : socket(nullptr), | ||
15 | hasSentCommand(false) | ||
16 | { | ||
17 | } | ||
18 | |||
19 | Client(const QString &n, QLocalSocket *s) | ||
20 | : name(n), | ||
21 | socket(s), | ||
22 | hasSentCommand(false) | ||
23 | { | ||
24 | } | ||
25 | |||
26 | QString name; | ||
27 | QLocalSocket *socket; | ||
28 | QByteArray commandBuffer; | ||
29 | bool hasSentCommand; | ||
30 | }; | ||
31 | |||
32 | class Listener : public QObject | ||
33 | { | ||
34 | Q_OBJECT | ||
35 | |||
36 | public: | ||
37 | Listener(const QString &resourceName, QObject *parent = 0); | ||
38 | ~Listener(); | ||
39 | |||
40 | void setRevision(unsigned long long revision); | ||
41 | unsigned long long revision() const; | ||
42 | |||
43 | Q_SIGNALS: | ||
44 | void noClients(); | ||
45 | |||
46 | public Q_SLOTS: | ||
47 | void closeAllConnections(); | ||
48 | |||
49 | private Q_SLOTS: | ||
50 | void acceptConnection(); | ||
51 | void clientDropped(); | ||
52 | void checkConnections(); | ||
53 | void readFromSocket(); | ||
54 | |||
55 | private: | ||
56 | bool processClientBuffer(Client &client); | ||
57 | void sendCurrentRevision(Client &client); | ||
58 | void updateClientsWithRevision(); | ||
59 | QLocalServer *m_server; | ||
60 | QVector<Client> m_connections; | ||
61 | unsigned long long m_revision; | ||
62 | flatbuffers::FlatBufferBuilder m_fbb; | ||
63 | }; | ||
diff --git a/synchronizer/main.cpp b/synchronizer/main.cpp new file mode 100644 index 0000000..91c0a9a --- /dev/null +++ b/synchronizer/main.cpp | |||
@@ -0,0 +1,25 @@ | |||
1 | |||
2 | #include <QApplication> | ||
3 | |||
4 | #include "common/console.h" | ||
5 | #include "listener.h" | ||
6 | |||
7 | int main(int argc, char *argv[]) | ||
8 | { | ||
9 | QApplication app(argc, argv); | ||
10 | |||
11 | new Console(QString("Resource: %1").arg(argv[1])); | ||
12 | if (argc < 2) { | ||
13 | Console::main()->log("Not enough args"); | ||
14 | return app.exec(); | ||
15 | } | ||
16 | |||
17 | Listener *listener = new Listener(argv[1]); | ||
18 | |||
19 | QObject::connect(&app, &QCoreApplication::aboutToQuit, | ||
20 | listener, &Listener::closeAllConnections); | ||
21 | QObject::connect(listener, &Listener::noClients, | ||
22 | &app, &QCoreApplication::quit); | ||
23 | |||
24 | return app.exec(); | ||
25 | } \ No newline at end of file | ||