diff options
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r-- | common/resourceaccess.cpp | 62 |
1 files changed, 35 insertions, 27 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 7a343f9..9ecdab1 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -72,7 +72,8 @@ public: | |||
72 | bool openingConnection; | 72 | bool openingConnection; |
73 | QByteArray partialMessageBuffer; | 73 | QByteArray partialMessageBuffer; |
74 | flatbuffers::FlatBufferBuilder fbb; | 74 | flatbuffers::FlatBufferBuilder fbb; |
75 | QVector<QueuedCommand *> commandQueue; | 75 | QVector<QSharedPointer<QueuedCommand>> commandQueue; |
76 | QMap<uint, QSharedPointer<QueuedCommand>> pendingCommands; | ||
76 | QMultiMap<uint, std::function<void(int error, const QString &errorMessage)> > resultHandler; | 77 | QMultiMap<uint, std::function<void(int error, const QString &errorMessage)> > resultHandler; |
77 | uint messageId; | 78 | uint messageId; |
78 | }; | 79 | }; |
@@ -136,13 +137,9 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId) | |||
136 | } | 137 | } |
137 | f.setFinished(); | 138 | f.setFinished(); |
138 | }; | 139 | }; |
140 | d->commandQueue << QSharedPointer<QueuedCommand>::create(commandId, continuation); | ||
139 | if (isReady()) { | 141 | if (isReady()) { |
140 | log(QString("Sending command %1").arg(commandId)); | 142 | processCommandQueue(); |
141 | d->messageId++; | ||
142 | registerCallback(d->messageId, continuation); | ||
143 | Commands::write(d->socket, d->messageId, commandId); | ||
144 | } else { | ||
145 | d->commandQueue << new QueuedCommand(commandId, continuation); | ||
146 | } | 143 | } |
147 | }); | 144 | }); |
148 | } | 145 | } |
@@ -160,14 +157,9 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu | |||
160 | } | 157 | } |
161 | }; | 158 | }; |
162 | 159 | ||
160 | d->commandQueue << QSharedPointer<QueuedCommand>::create(commandId, buffer, callback); | ||
163 | if (isReady()) { | 161 | if (isReady()) { |
164 | //TODO: We probably always want to queue the command, so we can resend it in case something goes wrong | 162 | processCommandQueue(); |
165 | d->messageId++; | ||
166 | log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); | ||
167 | registerCallback(d->messageId, callback); | ||
168 | Commands::write(d->socket, d->messageId, commandId, buffer.constData(), buffer.size()); | ||
169 | } else { | ||
170 | d->commandQueue << new QueuedCommand(commandId, buffer, callback); | ||
171 | } | 163 | } |
172 | }); | 164 | }); |
173 | } | 165 | } |
@@ -202,6 +194,34 @@ void ResourceAccess::close() | |||
202 | d->socket->close(); | 194 | d->socket->close(); |
203 | } | 195 | } |
204 | 196 | ||
197 | void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command) | ||
198 | { | ||
199 | Q_ASSERT(isReady()); | ||
200 | //TODO: we should have a timeout for commands | ||
201 | d->messageId++; | ||
202 | const auto messageId = d->messageId; | ||
203 | log(QString("Sending command %1 with messageId %2").arg(command->commandId).arg(d->messageId)); | ||
204 | if (command->callback) { | ||
205 | registerCallback(d->messageId, [this, messageId, command](int number, QString foo) { | ||
206 | d->pendingCommands.remove(messageId); | ||
207 | command->callback(number, foo); | ||
208 | }); | ||
209 | } | ||
210 | //Keep track of the command until we're sure it arrived | ||
211 | d->pendingCommands.insert(d->messageId, command); | ||
212 | Commands::write(d->socket, d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); | ||
213 | } | ||
214 | |||
215 | void ResourceAccess::processCommandQueue() | ||
216 | { | ||
217 | //TODO: serialize instead of blast them all through the socket? | ||
218 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); | ||
219 | for (auto command: d->commandQueue) { | ||
220 | sendCommand(command); | ||
221 | } | ||
222 | d->commandQueue.clear(); | ||
223 | } | ||
224 | |||
205 | void ResourceAccess::connected() | 225 | void ResourceAccess::connected() |
206 | { | 226 | { |
207 | d->startingProcess = false; | 227 | d->startingProcess = false; |
@@ -221,19 +241,7 @@ void ResourceAccess::connected() | |||
221 | d->fbb.Clear(); | 241 | d->fbb.Clear(); |
222 | } | 242 | } |
223 | 243 | ||
224 | //TODO: should confirm the commands made it with a response? | 244 | processCommandQueue(); |
225 | //TODO: serialize instead of blast them all through the socket? | ||
226 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); | ||
227 | for (QueuedCommand *command: d->commandQueue) { | ||
228 | d->messageId++; | ||
229 | log(QString("Sending command %1 with messageId %2").arg(command->commandId).arg(d->messageId)); | ||
230 | if (command->callback) { | ||
231 | registerCallback(d->messageId, command->callback); | ||
232 | } | ||
233 | Commands::write(d->socket, d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); | ||
234 | delete command; | ||
235 | } | ||
236 | d->commandQueue.clear(); | ||
237 | 245 | ||
238 | emit ready(true); | 246 | emit ready(true); |
239 | } | 247 | } |