summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAaron Seigo <aseigo@kde.org>2014-12-01 11:54:37 +0100
committerAaron Seigo <aseigo@kde.org>2014-12-01 12:01:13 +0100
commit2f52bd2ed26cc151fa90ed9e06b1ea9990f9fc7c (patch)
treee125101f32ad78a679c63f9c08571d3b11cd691f
parentaf38dcdf3a836a1b94064a617acac387a2de6539 (diff)
downloadsink-2f52bd2ed26cc151fa90ed9e06b1ea9990f9fc7c.tar.gz
sink-2f52bd2ed26cc151fa90ed9e06b1ea9990f9fc7c.zip
send revision updates from resource to client
this includes an initial revision message on connect
-rw-r--r--client/resourceaccess.cpp45
-rw-r--r--client/resourceaccess.h4
-rw-r--r--common/CMakeLists.txt3
-rw-r--r--common/commands.h1
-rw-r--r--common/commands/revisionupdate.fbs7
-rw-r--r--resource/listener.cpp72
-rw-r--r--resource/listener.h16
7 files changed, 136 insertions, 12 deletions
diff --git a/client/resourceaccess.cpp b/client/resourceaccess.cpp
index 4042219..e73c773 100644
--- a/client/resourceaccess.cpp
+++ b/client/resourceaccess.cpp
@@ -3,6 +3,7 @@
3#include "common/console.h" 3#include "common/console.h"
4#include "common/commands.h" 4#include "common/commands.h"
5#include "common/handshake_generated.h" 5#include "common/handshake_generated.h"
6#include "common/revisionupdate_generated.h"
6 7
7#include <QDebug> 8#include <QDebug>
8#include <QProcess> 9#include <QProcess>
@@ -26,6 +27,9 @@ ResourceAccess::ResourceAccess(const QString &resourceName, QObject *parent)
26 this, &ResourceAccess::disconnected); 27 this, &ResourceAccess::disconnected);
27 connect(m_socket, SIGNAL(error(QLocalSocket::LocalSocketError)), 28 connect(m_socket, SIGNAL(error(QLocalSocket::LocalSocketError)),
28 this, SLOT(connectionError(QLocalSocket::LocalSocketError))); 29 this, SLOT(connectionError(QLocalSocket::LocalSocketError)));
30 connect(m_socket, &QIODevice::readyRead,
31 this, &ResourceAccess::readResourceMessage);
32
29} 33}
30 34
31ResourceAccess::~ResourceAccess() 35ResourceAccess::~ResourceAccess()
@@ -109,3 +113,44 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
109 } 113 }
110} 114}
111 115
116void ResourceAccess::readResourceMessage()
117{
118 if (!m_socket->isValid()) {
119 return;
120 }
121
122 m_partialMessageBuffer += m_socket->readAll();
123
124 // should be scheduled rather than processed all at once
125 while (processMessageBuffer()) {}
126}
127
128bool ResourceAccess::processMessageBuffer()
129{
130 static const int headerSize = (sizeof(int) * 2);
131 Console::main()->log(QString("processing %1").arg(m_partialMessageBuffer.size()));
132 if (m_partialMessageBuffer.size() < headerSize) {
133 return false;
134 }
135
136 const int commandId = *(int*)m_partialMessageBuffer.constData();
137 const int size = *(int*)(m_partialMessageBuffer.constData() + sizeof(int));
138
139 if (size > m_partialMessageBuffer.size() - headerSize) {
140 return false;
141 }
142
143 switch (commandId) {
144 case Commands::RevisionUpdateCommand: {
145 auto buffer = Toynadi::GetRevisionUpdate(m_partialMessageBuffer.constData() + headerSize);
146 Console::main()->log(QString(" Revision updated to: %1").arg(buffer->revision()));
147 emit revisionChanged(buffer->revision());
148 break;
149 }
150 default:
151 break;
152 }
153
154 m_partialMessageBuffer.remove(0, headerSize + size);
155 return m_partialMessageBuffer.size() >= headerSize;
156}
diff --git a/client/resourceaccess.h b/client/resourceaccess.h
index 53b46c5..09df614 100644
--- a/client/resourceaccess.h
+++ b/client/resourceaccess.h
@@ -21,15 +21,19 @@ public Q_SLOTS:
21 21
22Q_SIGNALS: 22Q_SIGNALS:
23 void ready(bool isReady); 23 void ready(bool isReady);
24 void revisionChanged(unsigned long long revision);
24 25
25private Q_SLOTS: 26private Q_SLOTS:
26 void connected(); 27 void connected();
27 void disconnected(); 28 void disconnected();
28 void connectionError(QLocalSocket::LocalSocketError error); 29 void connectionError(QLocalSocket::LocalSocketError error);
30 void readResourceMessage();
31 bool processMessageBuffer();
29 32
30private: 33private:
31 QString m_resourceName; 34 QString m_resourceName;
32 QLocalSocket *m_socket; 35 QLocalSocket *m_socket;
33 QTimer *m_tryOpenTimer; 36 QTimer *m_tryOpenTimer;
34 bool m_startingProcess; 37 bool m_startingProcess;
38 QByteArray m_partialMessageBuffer;
35}; 39};
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 32e10f6..a0d3ac9 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -1,5 +1,6 @@
1project(toynadicommon) 1project(toynadicommon)
2generate_flatbuffers(commands/handshake) 2generate_flatbuffers(commands/handshake
3 commands/revisionupdate)
3 4
4set(command_SRCS 5set(command_SRCS
5 console.cpp 6 console.cpp
diff --git a/common/commands.h b/common/commands.h
index 74174ee..68a7c1e 100644
--- a/common/commands.h
+++ b/common/commands.h
@@ -6,6 +6,7 @@ namespace Commands
6enum CommandIds { 6enum CommandIds {
7 UnknownCommand = 0, 7 UnknownCommand = 0,
8 HandshakeCommand, 8 HandshakeCommand,
9 RevisionUpdateCommand,
9 CustomCommand = 0xffff 10 CustomCommand = 0xffff
10}; 11};
11 12
diff --git a/common/commands/revisionupdate.fbs b/common/commands/revisionupdate.fbs
new file mode 100644
index 0000000..d95f1b5
--- /dev/null
+++ b/common/commands/revisionupdate.fbs
@@ -0,0 +1,7 @@
1namespace Toynadi;
2
3table RevisionUpdate {
4 revision: ulong;
5}
6
7root_type RevisionUpdate; \ No newline at end of file
diff --git a/resource/listener.cpp b/resource/listener.cpp
index 8a571e9..5dd8cc3 100644
--- a/resource/listener.cpp
+++ b/resource/listener.cpp
@@ -3,13 +3,15 @@
3#include "common/console.h" 3#include "common/console.h"
4#include "common/commands.h" 4#include "common/commands.h"
5#include "common/handshake_generated.h" 5#include "common/handshake_generated.h"
6#include "common/revisionupdate_generated.h"
6 7
7#include <QLocalSocket> 8#include <QLocalSocket>
8#include <QTimer> 9#include <QTimer>
9 10
10Listener::Listener(const QString &resource, QObject *parent) 11Listener::Listener(const QString &resource, QObject *parent)
11 : QObject(parent), 12 : QObject(parent),
12 m_server(new QLocalServer(this)) 13 m_server(new QLocalServer(this)),
14 m_revision(0)
13{ 15{
14 connect(m_server, &QLocalServer::newConnection, 16 connect(m_server, &QLocalServer::newConnection,
15 this, &Listener::acceptConnection); 17 this, &Listener::acceptConnection);
@@ -34,6 +36,19 @@ Listener::~Listener()
34{ 36{
35} 37}
36 38
39void Listener::setRevision(unsigned long long revision)
40{
41 if (m_revision != revision) {
42 m_revision = revision;
43 updateClientsWithRevision();
44 }
45}
46
47unsigned long long Listener::revision() const
48{
49 return m_revision;
50}
51
37void Listener::closeAllConnections() 52void Listener::closeAllConnections()
38{ 53{
39 //TODO: close all client connectionsin m_connections 54 //TODO: close all client connectionsin m_connections
@@ -73,7 +88,7 @@ void Listener::clientDropped()
73 } 88 }
74 89
75 Console::main()->log("Dropping connection..."); 90 Console::main()->log("Dropping connection...");
76 QMutableListIterator<Client> it(m_connections); 91 QMutableVectorIterator<Client> it(m_connections);
77 while (it.hasNext()) { 92 while (it.hasNext()) {
78 const Client &client = it.next(); 93 const Client &client = it.next();
79 if (client.socket == socket) { 94 if (client.socket == socket) {
@@ -102,24 +117,25 @@ void Listener::readFromSocket()
102 } 117 }
103 118
104 Console::main()->log("Reading from socket..."); 119 Console::main()->log("Reading from socket...");
105 QMutableListIterator<Client> it(m_connections); 120 for (Client &client: m_connections) {
106 while (it.hasNext()) {
107 Client &client = it.next();
108 if (client.socket == socket) { 121 if (client.socket == socket) {
109 Console::main()->log(QString(" Client: %1").arg(client.name)); 122 Console::main()->log(QString(" Client: %1").arg(client.name));
110 client.commandBuffer += socket->readAll(); 123 client.commandBuffer += socket->readAll();
111 processClientBuffer(client); 124 // FIXME: schedule these rather than process them all at once
125 // right now this can lead to starvation of clients due to
126 // one overly active client
127 while (processClientBuffer(client)) {}
112 break; 128 break;
113 } 129 }
114 } 130 }
115} 131}
116 132
117void Listener::processClientBuffer(Client &client) 133bool Listener::processClientBuffer(Client &client)
118{ 134{
119 static const int headerSize = (sizeof(int) * 2); 135 static const int headerSize = (sizeof(int) * 2);
120 Console::main()->log(QString("processing %1").arg(client.commandBuffer.size())); 136 Console::main()->log(QString("processing %1").arg(client.commandBuffer.size()));
121 if (client.commandBuffer.size() < headerSize) { 137 if (client.commandBuffer.size() < headerSize) {
122 return; 138 return false;
123 } 139 }
124 140
125 int commandId, size; 141 int commandId, size;
@@ -134,12 +150,50 @@ void Listener::processClientBuffer(Client &client)
134 case Commands::HandshakeCommand: { 150 case Commands::HandshakeCommand: {
135 auto buffer = Toynadi::GetHandshake(data.constData()); 151 auto buffer = Toynadi::GetHandshake(data.constData());
136 Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str())); 152 Console::main()->log(QString(" Handshake from %1").arg(buffer->name()->c_str()));
137 //TODO: reply? 153 sendCurrentRevision(client);
138 break; 154 break;
139 } 155 }
140 default: 156 default:
141 // client.hasSentCommand = true; 157 // client.hasSentCommand = true;
142 break; 158 break;
143 } 159 }
160
161 return client.commandBuffer.size() >= headerSize;
162 } else {
163 return false;
164 }
165}
166
167void Listener::sendCurrentRevision(Client &client)
168{
169 if (!client.socket || !client.socket->isValid()) {
170 return;
171 }
172
173 flatbuffers::FlatBufferBuilder fbb;
174 auto command = Toynadi::CreateRevisionUpdate(fbb, m_revision);
175 Toynadi::FinishRevisionUpdateBuffer(fbb, command);
176 const int commandId = Commands::RevisionUpdateCommand;
177 const int dataSize = fbb.GetSize();
178 client.socket->write((const char*)&commandId, sizeof(int));
179 client.socket->write((const char*)&dataSize, sizeof(int));
180 client.socket->write((const char*)fbb.GetBufferPointer(), dataSize);
181}
182
183void Listener::updateClientsWithRevision()
184{
185 flatbuffers::FlatBufferBuilder fbb;
186 auto command = Toynadi::CreateRevisionUpdate(fbb, m_revision);
187 Toynadi::FinishRevisionUpdateBuffer(fbb, command);
188 const int commandId = Commands::RevisionUpdateCommand;
189 const int dataSize = fbb.GetSize();
190
191 for (const Client &client: m_connections) {
192 if (!client.socket || !client.socket->isValid()) {
193 continue;
194 }
195 client.socket->write((const char*)&commandId, sizeof(int));
196 client.socket->write((const char*)&dataSize, sizeof(int));
197 client.socket->write((const char*)fbb.GetBufferPointer(), dataSize);
144 } 198 }
145} 199}
diff --git a/resource/listener.h b/resource/listener.h
index bfafe5f..0d8f388 100644
--- a/resource/listener.h
+++ b/resource/listener.h
@@ -8,6 +8,12 @@
8class Client 8class Client
9{ 9{
10public: 10public:
11 Client()
12 : socket(nullptr),
13 hasSentCommand(false)
14 {
15 }
16
11 Client(const QString &n, QLocalSocket *s) 17 Client(const QString &n, QLocalSocket *s)
12 : name(n), 18 : name(n),
13 socket(s), 19 socket(s),
@@ -29,6 +35,9 @@ public:
29 Listener(const QString &resourceName, QObject *parent = 0); 35 Listener(const QString &resourceName, QObject *parent = 0);
30 ~Listener(); 36 ~Listener();
31 37
38 void setRevision(unsigned long long revision);
39 unsigned long long revision() const;
40
32Q_SIGNALS: 41Q_SIGNALS:
33 void noClients(); 42 void noClients();
34 43
@@ -42,7 +51,10 @@ private Q_SLOTS:
42 void readFromSocket(); 51 void readFromSocket();
43 52
44private: 53private:
45 void processClientBuffer(Client &client); 54 bool processClientBuffer(Client &client);
55 void sendCurrentRevision(Client &client);
56 void updateClientsWithRevision();
46 QLocalServer *m_server; 57 QLocalServer *m_server;
47 QList<Client> m_connections; 58 QVector<Client> m_connections;
59 unsigned long long m_revision;
48}; \ No newline at end of file 60}; \ No newline at end of file