diff options
-rw-r--r-- | client/resourceaccess.cpp | 45 | ||||
-rw-r--r-- | client/resourceaccess.h | 4 | ||||
-rw-r--r-- | common/CMakeLists.txt | 3 | ||||
-rw-r--r-- | common/commands.h | 1 | ||||
-rw-r--r-- | common/commands/revisionupdate.fbs | 7 | ||||
-rw-r--r-- | resource/listener.cpp | 72 | ||||
-rw-r--r-- | resource/listener.h | 16 |
7 files changed, 136 insertions, 12 deletions
diff --git a/client/resourceaccess.cpp b/client/resourceaccess.cpp index 4042219..e73c773 100644 --- a/client/resourceaccess.cpp +++ b/client/resourceaccess.cpp | |||
@@ -3,6 +3,7 @@ | |||
3 | #include "common/console.h" | 3 | #include "common/console.h" |
4 | #include "common/commands.h" | 4 | #include "common/commands.h" |
5 | #include "common/handshake_generated.h" | 5 | #include "common/handshake_generated.h" |
6 | #include "common/revisionupdate_generated.h" | ||
6 | 7 | ||
7 | #include <QDebug> | 8 | #include <QDebug> |
8 | #include <QProcess> | 9 | #include <QProcess> |
@@ -26,6 +27,9 @@ ResourceAccess::ResourceAccess(const QString &resourceName, QObject *parent) | |||
26 | this, &ResourceAccess::disconnected); | 27 | this, &ResourceAccess::disconnected); |
27 | connect(m_socket, SIGNAL(error(QLocalSocket::LocalSocketError)), | 28 | connect(m_socket, SIGNAL(error(QLocalSocket::LocalSocketError)), |
28 | this, SLOT(connectionError(QLocalSocket::LocalSocketError))); | 29 | this, SLOT(connectionError(QLocalSocket::LocalSocketError))); |
30 | connect(m_socket, &QIODevice::readyRead, | ||
31 | this, &ResourceAccess::readResourceMessage); | ||
32 | |||
29 | } | 33 | } |
30 | 34 | ||
31 | ResourceAccess::~ResourceAccess() | 35 | ResourceAccess::~ResourceAccess() |
@@ -109,3 +113,44 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) | |||
109 | } | 113 | } |
110 | } | 114 | } |
111 | 115 | ||
116 | void ResourceAccess::readResourceMessage() | ||
117 | { | ||
118 | if (!m_socket->isValid()) { | ||
119 | return; | ||
120 | } | ||
121 | |||
122 | m_partialMessageBuffer += m_socket->readAll(); | ||
123 | |||
124 | // should be scheduled rather than processed all at once | ||
125 | while (processMessageBuffer()) {} | ||
126 | } | ||
127 | |||
128 | bool ResourceAccess::processMessageBuffer() | ||
129 | { | ||
130 | static const int headerSize = (sizeof(int) * 2); | ||
131 | Console::main()->log(QString("processing %1").arg(m_partialMessageBuffer.size())); | ||
132 | if (m_partialMessageBuffer.size() < headerSize) { | ||
133 | return false; | ||
134 | } | ||
135 | |||
136 | const int commandId = *(int*)m_partialMessageBuffer.constData(); | ||
137 | const int size = *(int*)(m_partialMessageBuffer.constData() + sizeof(int)); | ||
138 | |||
139 | if (size > m_partialMessageBuffer.size() - headerSize) { | ||
140 | return false; | ||
141 | } | ||
142 | |||
143 | switch (commandId) { | ||
144 | case Commands::RevisionUpdateCommand: { | ||
145 | auto buffer = Toynadi::GetRevisionUpdate(m_partialMessageBuffer.constData() + headerSize); | ||
146 | Console::main()->log(QString(" Revision updated to: %1").arg(buffer->revision())); | ||
147 | emit revisionChanged(buffer->revision()); | ||
148 | break; | ||
149 | } | ||
150 | default: | ||
151 | break; | ||
152 | } | ||
153 | |||
154 | m_partialMessageBuffer.remove(0, headerSize + size); | ||
155 | return m_partialMessageBuffer.size() >= headerSize; | ||
156 | } | ||
diff --git a/client/resourceaccess.h b/client/resourceaccess.h index 53b46c5..09df614 100644 --- a/client/resourceaccess.h +++ b/client/resourceaccess.h | |||
@@ -21,15 +21,19 @@ public Q_SLOTS: | |||
21 | 21 | ||
22 | Q_SIGNALS: | 22 | Q_SIGNALS: |
23 | void ready(bool isReady); | 23 | void ready(bool isReady); |
24 | void revisionChanged(unsigned long long revision); | ||
24 | 25 | ||
25 | private Q_SLOTS: | 26 | private Q_SLOTS: |
26 | void connected(); | 27 | void connected(); |
27 | void disconnected(); | 28 | void disconnected(); |
28 | void connectionError(QLocalSocket::LocalSocketError error); | 29 | void connectionError(QLocalSocket::LocalSocketError error); |
30 | void readResourceMessage(); | ||
31 | bool processMessageBuffer(); | ||
29 | 32 | ||
30 | private: | 33 | private: |
31 | QString m_resourceName; | 34 | QString m_resourceName; |
32 | QLocalSocket *m_socket; | 35 | QLocalSocket *m_socket; |
33 | QTimer *m_tryOpenTimer; | 36 | QTimer *m_tryOpenTimer; |
34 | bool m_startingProcess; | 37 | bool m_startingProcess; |
38 | QByteArray m_partialMessageBuffer; | ||
35 | }; | 39 | }; |
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 32e10f6..a0d3ac9 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -1,5 +1,6 @@ | |||
1 | project(toynadicommon) | 1 | project(toynadicommon) |
2 | generate_flatbuffers(commands/handshake) | 2 | generate_flatbuffers(commands/handshake |
3 | commands/revisionupdate) | ||
3 | 4 | ||
4 | set(command_SRCS | 5 | set(command_SRCS |
5 | console.cpp | 6 | console.cpp |
diff --git a/common/commands.h b/common/commands.h index 74174ee..68a7c1e 100644 --- a/common/commands.h +++ b/common/commands.h | |||
@@ -6,6 +6,7 @@ namespace Commands | |||
6 | enum CommandIds { | 6 | enum CommandIds { |
7 | UnknownCommand = 0, | 7 | UnknownCommand = 0, |
8 | HandshakeCommand, | 8 | HandshakeCommand, |
9 | RevisionUpdateCommand, | ||
9 | CustomCommand = 0xffff | 10 | CustomCommand = 0xffff |
10 | }; | 11 | }; |
11 | 12 | ||
diff --git a/common/commands/revisionupdate.fbs b/common/commands/revisionupdate.fbs new file mode 100644 index 0000000..d95f1b5 --- /dev/null +++ b/common/commands/revisionupdate.fbs | |||
@@ -0,0 +1,7 @@ | |||
1 | namespace Toynadi; | ||
2 | |||
3 | table RevisionUpdate { | ||
4 | revision: ulong; | ||
5 | } | ||
6 | |||
7 | root_type RevisionUpdate; \ No newline at end of file | ||
diff --git a/resource/listener.cpp b/resource/listener.cpp index 8a571e9..5dd8cc3 100644 --- a/resource/listener.cpp +++ b/resource/listener.cpp | |||
@@ -3,13 +3,15 @@ | |||
3 | #include "common/console.h" | 3 | #include "common/console.h" |
4 | #include "common/commands.h" | 4 | #include "common/commands.h" |
5 | #include "common/handshake_generated.h" | 5 | #include "common/handshake_generated.h" |
6 | #include "common/revisionupdate_generated.h" | ||
6 | 7 | ||
7 | #include <QLocalSocket> | 8 | #include <QLocalSocket> |
8 | #include <QTimer> | 9 | #include <QTimer> |
9 | 10 | ||
10 | Listener::Listener(const QString &resource, QObject *parent) | 11 | Listener::Listener(const QString &resource, QObject *parent) |
11 | : QObject(parent), | 12 | : QObject(parent), |
12 | m_server(new QLocalServer(this)) | 13 | m_server(new QLocalServer(this)), |
14 | m_revision(0) | ||
13 | { | 15 | { |
14 | connect(m_server, &QLocalServer::newConnection, | 16 | connect(m_server, &QLocalServer::newConnection, |
15 | this, &Listener::acceptConnection); | 17 | this, &Listener::acceptConnection); |
@@ -34,6 +36,19 @@ Listener::~Listener() | |||
34 | { | 36 | { |
35 | } | 37 | } |
36 | 38 | ||
39 | void Listener::setRevision(unsigned long long revision) | ||
40 | { | ||
41 | if (m_revision != revision) { | ||
42 | m_revision = revision; | ||
43 | updateClientsWithRevision(); | ||
44 | } | ||
45 | } | ||
46 | |||
47 | unsigned long long Listener::revision() const | ||
48 | { | ||
49 | return m_revision; | ||
50 | } | ||
51 | |||
37 | void Listener::closeAllConnections() | 52 | void Listener::closeAllConnections() |
38 | { | 53 | { |
39 | //TODO: close all client connectionsin m_connections | 54 | //TODO: close all client connectionsin m_connections |
@@ -73,7 +88,7 @@ void Listener::clientDropped() | |||
73 | } | 88 | } |
74 | 89 | ||
75 | Console::main()->log("Dropping connection..."); | 90 | Console::main()->log("Dropping connection..."); |
76 | QMutableListIterator<Client> it(m_connections); | 91 | QMutableVectorIterator<Client> it(m_connections); |
77 | while (it.hasNext()) { | 92 | while (it.hasNext()) { |
78 | const Client &client = it.next(); | 93 | const Client &client = it.next(); |
79 | if (client.socket == socket) { | 94 | if (client.socket == socket) { |
@@ -102,24 +117,25 @@ void Listener::readFromSocket() | |||
102 | } | 117 | } |
103 | 118 | ||
104 | Console::main()->log("Reading from socket..."); | 119 | Console::main()->log("Reading from socket..."); |
105 | QMutableListIterator<Client> it(m_connections); | 120 | for (Client &client: m_connections) { |
106 | while (it.hasNext()) { | ||
107 | Client &client = it.next(); | ||
108 | if (client.socket == socket) { | 121 | if (client.socket == socket) { |
109 | Console::main()->log(QString(" Client: %1").arg(client.name)); | 122 | Console::main()->log(QString(" Client: %1").arg(client.name)); |
110 | client.commandBuffer += socket->readAll(); | 123 | client.commandBuffer += socket->readAll(); |
111 | processClientBuffer(client); | 124 | // FIXME: schedule these rather than process them all at once |
125 | // right now this can lead to starvation of clients due to | ||
126 | // one overly active client | ||
127 | while (processClientBuffer(client)) {} | ||
112 | break; | 128 | break; |
113 | } | 129 | } |
114 | } | 130 | } |
115 | } | 131 | } |
116 | 132 | ||
117 | void Listener::processClientBuffer(Client &client) | 133 | bool Listener::processClientBuffer(Client &client) |
118 | { | 134 | { |
119 | static const int headerSize = (sizeof(int) * 2); | 135 | static const int headerSize = (sizeof(int) * 2); |
120 | Console::main()->log(QString("processing %1").arg(client.commandBuffer.size())); | 136 | Console::main()->log(QString("processing %1").arg(client.commandBuffer.size())); |
121 | if (client.commandBuffer.size() < headerSize) { | 137 | if (client.commandBuffer.size() < headerSize) { |
122 | return; | 138 | return false; |
123 | } | 139 | } |
124 | 140 | ||
125 | int commandId, size; | 141 | int commandId, size; |
@@ -134,12 +150,50 @@ void Listener::processClientBuffer(Client &client) | |||
134 | case Commands::HandshakeCommand: { | 150 | case Commands::HandshakeCommand: { |
135 | auto buffer = Toynadi::GetHandshake(data.constData()); | 151 | auto buffer = Toynadi::GetHandshake(data.constData()); |
136 | Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); | 152 | Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); |
137 | //TODO: reply? | 153 | sendCurrentRevision(client); |
138 | break; | 154 | break; |
139 | } | 155 | } |
140 | default: | 156 | default: |
141 | // client.hasSentCommand = true; | 157 | // client.hasSentCommand = true; |
142 | break; | 158 | break; |
143 | } | 159 | } |
160 | |||
161 | return client.commandBuffer.size() >= headerSize; | ||
162 | } else { | ||
163 | return false; | ||
164 | } | ||
165 | } | ||
166 | |||
167 | void Listener::sendCurrentRevision(Client &client) | ||
168 | { | ||
169 | if (!client.socket || !client.socket->isValid()) { | ||
170 | return; | ||
171 | } | ||
172 | |||
173 | flatbuffers::FlatBufferBuilder fbb; | ||
174 | auto command = Toynadi::CreateRevisionUpdate(fbb, m_revision); | ||
175 | Toynadi::FinishRevisionUpdateBuffer(fbb, command); | ||
176 | const int commandId = Commands::RevisionUpdateCommand; | ||
177 | const int dataSize = fbb.GetSize(); | ||
178 | client.socket->write((const char*)&commandId, sizeof(int)); | ||
179 | client.socket->write((const char*)&dataSize, sizeof(int)); | ||
180 | client.socket->write((const char*)fbb.GetBufferPointer(), dataSize); | ||
181 | } | ||
182 | |||
183 | void Listener::updateClientsWithRevision() | ||
184 | { | ||
185 | flatbuffers::FlatBufferBuilder fbb; | ||
186 | auto command = Toynadi::CreateRevisionUpdate(fbb, m_revision); | ||
187 | Toynadi::FinishRevisionUpdateBuffer(fbb, command); | ||
188 | const int commandId = Commands::RevisionUpdateCommand; | ||
189 | const int dataSize = fbb.GetSize(); | ||
190 | |||
191 | for (const Client &client: m_connections) { | ||
192 | if (!client.socket || !client.socket->isValid()) { | ||
193 | continue; | ||
194 | } | ||
195 | client.socket->write((const char*)&commandId, sizeof(int)); | ||
196 | client.socket->write((const char*)&dataSize, sizeof(int)); | ||
197 | client.socket->write((const char*)fbb.GetBufferPointer(), dataSize); | ||
144 | } | 198 | } |
145 | } | 199 | } |
diff --git a/resource/listener.h b/resource/listener.h index bfafe5f..0d8f388 100644 --- a/resource/listener.h +++ b/resource/listener.h | |||
@@ -8,6 +8,12 @@ | |||
8 | class Client | 8 | class Client |
9 | { | 9 | { |
10 | public: | 10 | public: |
11 | Client() | ||
12 | : socket(nullptr), | ||
13 | hasSentCommand(false) | ||
14 | { | ||
15 | } | ||
16 | |||
11 | Client(const QString &n, QLocalSocket *s) | 17 | Client(const QString &n, QLocalSocket *s) |
12 | : name(n), | 18 | : name(n), |
13 | socket(s), | 19 | socket(s), |
@@ -29,6 +35,9 @@ public: | |||
29 | Listener(const QString &resourceName, QObject *parent = 0); | 35 | Listener(const QString &resourceName, QObject *parent = 0); |
30 | ~Listener(); | 36 | ~Listener(); |
31 | 37 | ||
38 | void setRevision(unsigned long long revision); | ||
39 | unsigned long long revision() const; | ||
40 | |||
32 | Q_SIGNALS: | 41 | Q_SIGNALS: |
33 | void noClients(); | 42 | void noClients(); |
34 | 43 | ||
@@ -42,7 +51,10 @@ private Q_SLOTS: | |||
42 | void readFromSocket(); | 51 | void readFromSocket(); |
43 | 52 | ||
44 | private: | 53 | private: |
45 | void processClientBuffer(Client &client); | 54 | bool processClientBuffer(Client &client); |
55 | void sendCurrentRevision(Client &client); | ||
56 | void updateClientsWithRevision(); | ||
46 | QLocalServer *m_server; | 57 | QLocalServer *m_server; |
47 | QList<Client> m_connections; | 58 | QVector<Client> m_connections; |
59 | unsigned long long m_revision; | ||
48 | }; \ No newline at end of file | 60 | }; \ No newline at end of file |