summaryrefslogtreecommitdiffstats
path: root/common/resourceaccess.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r--common/resourceaccess.cpp62
1 files changed, 35 insertions, 27 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 7a343f9..9ecdab1 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -72,7 +72,8 @@ public:
72 bool openingConnection; 72 bool openingConnection;
73 QByteArray partialMessageBuffer; 73 QByteArray partialMessageBuffer;
74 flatbuffers::FlatBufferBuilder fbb; 74 flatbuffers::FlatBufferBuilder fbb;
75 QVector<QueuedCommand *> commandQueue; 75 QVector<QSharedPointer<QueuedCommand>> commandQueue;
76 QMap<uint, QSharedPointer<QueuedCommand>> pendingCommands;
76 QMultiMap<uint, std::function<void(int error, const QString &errorMessage)> > resultHandler; 77 QMultiMap<uint, std::function<void(int error, const QString &errorMessage)> > resultHandler;
77 uint messageId; 78 uint messageId;
78}; 79};
@@ -136,13 +137,9 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId)
136 } 137 }
137 f.setFinished(); 138 f.setFinished();
138 }; 139 };
140 d->commandQueue << QSharedPointer<QueuedCommand>::create(commandId, continuation);
139 if (isReady()) { 141 if (isReady()) {
140 log(QString("Sending command %1").arg(commandId)); 142 processCommandQueue();
141 d->messageId++;
142 registerCallback(d->messageId, continuation);
143 Commands::write(d->socket, d->messageId, commandId);
144 } else {
145 d->commandQueue << new QueuedCommand(commandId, continuation);
146 } 143 }
147 }); 144 });
148} 145}
@@ -160,14 +157,9 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu
160 } 157 }
161 }; 158 };
162 159
160 d->commandQueue << QSharedPointer<QueuedCommand>::create(commandId, buffer, callback);
163 if (isReady()) { 161 if (isReady()) {
164 //TODO: We probably always want to queue the command, so we can resend it in case something goes wrong 162 processCommandQueue();
165 d->messageId++;
166 log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId));
167 registerCallback(d->messageId, callback);
168 Commands::write(d->socket, d->messageId, commandId, buffer.constData(), buffer.size());
169 } else {
170 d->commandQueue << new QueuedCommand(commandId, buffer, callback);
171 } 163 }
172 }); 164 });
173} 165}
@@ -202,6 +194,34 @@ void ResourceAccess::close()
202 d->socket->close(); 194 d->socket->close();
203} 195}
204 196
197void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command)
198{
199 Q_ASSERT(isReady());
200 //TODO: we should have a timeout for commands
201 d->messageId++;
202 const auto messageId = d->messageId;
203 log(QString("Sending command %1 with messageId %2").arg(command->commandId).arg(d->messageId));
204 if (command->callback) {
205 registerCallback(d->messageId, [this, messageId, command](int number, QString foo) {
206 d->pendingCommands.remove(messageId);
207 command->callback(number, foo);
208 });
209 }
210 //Keep track of the command until we're sure it arrived
211 d->pendingCommands.insert(d->messageId, command);
212 Commands::write(d->socket, d->messageId, command->commandId, command->buffer.constData(), command->buffer.size());
213}
214
215void ResourceAccess::processCommandQueue()
216{
217 //TODO: serialize instead of blast them all through the socket?
218 log(QString("We have %1 queued commands").arg(d->commandQueue.size()));
219 for (auto command: d->commandQueue) {
220 sendCommand(command);
221 }
222 d->commandQueue.clear();
223}
224
205void ResourceAccess::connected() 225void ResourceAccess::connected()
206{ 226{
207 d->startingProcess = false; 227 d->startingProcess = false;
@@ -221,19 +241,7 @@ void ResourceAccess::connected()
221 d->fbb.Clear(); 241 d->fbb.Clear();
222 } 242 }
223 243
224 //TODO: should confirm the commands made it with a response? 244 processCommandQueue();
225 //TODO: serialize instead of blast them all through the socket?
226 log(QString("We have %1 queued commands").arg(d->commandQueue.size()));
227 for (QueuedCommand *command: d->commandQueue) {
228 d->messageId++;
229 log(QString("Sending command %1 with messageId %2").arg(command->commandId).arg(d->messageId));
230 if (command->callback) {
231 registerCallback(d->messageId, command->callback);
232 }
233 Commands::write(d->socket, d->messageId, command->commandId, command->buffer.constData(), command->buffer.size());
234 delete command;
235 }
236 d->commandQueue.clear();
237 245
238 emit ready(true); 246 emit ready(true);
239} 247}