diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-04-12 22:47:50 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-04-12 22:47:50 +0200 |
commit | 4652a39fc6869fc5af46367c35027b2b53478268 (patch) | |
tree | a13626626ef5b057630c68f09e0a925ed13bb0f9 | |
parent | 5e5b1e77252ae1417b339a01bba3ed1fe5705629 (diff) | |
download | sink-4652a39fc6869fc5af46367c35027b2b53478268.tar.gz sink-4652a39fc6869fc5af46367c35027b2b53478268.zip |
Always queue commands in resourceaccess.
We want to keep the command until we know it arrived in the resource,
so we can resend it otherwise.
-rw-r--r-- | common/resourceaccess.cpp | 62 | ||||
-rw-r--r-- | common/resourceaccess.h | 5 |
2 files changed, 40 insertions, 27 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 7a343f9..9ecdab1 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -72,7 +72,8 @@ public: | |||
72 | bool openingConnection; | 72 | bool openingConnection; |
73 | QByteArray partialMessageBuffer; | 73 | QByteArray partialMessageBuffer; |
74 | flatbuffers::FlatBufferBuilder fbb; | 74 | flatbuffers::FlatBufferBuilder fbb; |
75 | QVector<QueuedCommand *> commandQueue; | 75 | QVector<QSharedPointer<QueuedCommand>> commandQueue; |
76 | QMap<uint, QSharedPointer<QueuedCommand>> pendingCommands; | ||
76 | QMultiMap<uint, std::function<void(int error, const QString &errorMessage)> > resultHandler; | 77 | QMultiMap<uint, std::function<void(int error, const QString &errorMessage)> > resultHandler; |
77 | uint messageId; | 78 | uint messageId; |
78 | }; | 79 | }; |
@@ -136,13 +137,9 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId) | |||
136 | } | 137 | } |
137 | f.setFinished(); | 138 | f.setFinished(); |
138 | }; | 139 | }; |
140 | d->commandQueue << QSharedPointer<QueuedCommand>::create(commandId, continuation); | ||
139 | if (isReady()) { | 141 | if (isReady()) { |
140 | log(QString("Sending command %1").arg(commandId)); | 142 | processCommandQueue(); |
141 | d->messageId++; | ||
142 | registerCallback(d->messageId, continuation); | ||
143 | Commands::write(d->socket, d->messageId, commandId); | ||
144 | } else { | ||
145 | d->commandQueue << new QueuedCommand(commandId, continuation); | ||
146 | } | 143 | } |
147 | }); | 144 | }); |
148 | } | 145 | } |
@@ -160,14 +157,9 @@ Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu | |||
160 | } | 157 | } |
161 | }; | 158 | }; |
162 | 159 | ||
160 | d->commandQueue << QSharedPointer<QueuedCommand>::create(commandId, buffer, callback); | ||
163 | if (isReady()) { | 161 | if (isReady()) { |
164 | //TODO: We probably always want to queue the command, so we can resend it in case something goes wrong | 162 | processCommandQueue(); |
165 | d->messageId++; | ||
166 | log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); | ||
167 | registerCallback(d->messageId, callback); | ||
168 | Commands::write(d->socket, d->messageId, commandId, buffer.constData(), buffer.size()); | ||
169 | } else { | ||
170 | d->commandQueue << new QueuedCommand(commandId, buffer, callback); | ||
171 | } | 163 | } |
172 | }); | 164 | }); |
173 | } | 165 | } |
@@ -202,6 +194,34 @@ void ResourceAccess::close() | |||
202 | d->socket->close(); | 194 | d->socket->close(); |
203 | } | 195 | } |
204 | 196 | ||
197 | void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command) | ||
198 | { | ||
199 | Q_ASSERT(isReady()); | ||
200 | //TODO: we should have a timeout for commands | ||
201 | d->messageId++; | ||
202 | const auto messageId = d->messageId; | ||
203 | log(QString("Sending command %1 with messageId %2").arg(command->commandId).arg(d->messageId)); | ||
204 | if (command->callback) { | ||
205 | registerCallback(d->messageId, [this, messageId, command](int number, QString foo) { | ||
206 | d->pendingCommands.remove(messageId); | ||
207 | command->callback(number, foo); | ||
208 | }); | ||
209 | } | ||
210 | //Keep track of the command until we're sure it arrived | ||
211 | d->pendingCommands.insert(d->messageId, command); | ||
212 | Commands::write(d->socket, d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); | ||
213 | } | ||
214 | |||
215 | void ResourceAccess::processCommandQueue() | ||
216 | { | ||
217 | //TODO: serialize instead of blast them all through the socket? | ||
218 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); | ||
219 | for (auto command: d->commandQueue) { | ||
220 | sendCommand(command); | ||
221 | } | ||
222 | d->commandQueue.clear(); | ||
223 | } | ||
224 | |||
205 | void ResourceAccess::connected() | 225 | void ResourceAccess::connected() |
206 | { | 226 | { |
207 | d->startingProcess = false; | 227 | d->startingProcess = false; |
@@ -221,19 +241,7 @@ void ResourceAccess::connected() | |||
221 | d->fbb.Clear(); | 241 | d->fbb.Clear(); |
222 | } | 242 | } |
223 | 243 | ||
224 | //TODO: should confirm the commands made it with a response? | 244 | processCommandQueue(); |
225 | //TODO: serialize instead of blast them all through the socket? | ||
226 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); | ||
227 | for (QueuedCommand *command: d->commandQueue) { | ||
228 | d->messageId++; | ||
229 | log(QString("Sending command %1 with messageId %2").arg(command->commandId).arg(d->messageId)); | ||
230 | if (command->callback) { | ||
231 | registerCallback(d->messageId, command->callback); | ||
232 | } | ||
233 | Commands::write(d->socket, d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); | ||
234 | delete command; | ||
235 | } | ||
236 | d->commandQueue.clear(); | ||
237 | 245 | ||
238 | emit ready(true); | 246 | emit ready(true); |
239 | } | 247 | } |
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 088bf36..4c9d9d2 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -30,6 +30,8 @@ | |||
30 | namespace Akonadi2 | 30 | namespace Akonadi2 |
31 | { | 31 | { |
32 | 32 | ||
33 | class QueuedCommand; | ||
34 | |||
33 | class ResourceAccess : public QObject | 35 | class ResourceAccess : public QObject |
34 | { | 36 | { |
35 | Q_OBJECT | 37 | Q_OBJECT |
@@ -68,6 +70,9 @@ private: | |||
68 | void registerCallback(uint messageId, const std::function<void(int error, const QString &)> &callback); | 70 | void registerCallback(uint messageId, const std::function<void(int error, const QString &)> &callback); |
69 | void startResourceAndConnect(); | 71 | void startResourceAndConnect(); |
70 | 72 | ||
73 | void sendCommand(const QSharedPointer<QueuedCommand> &command); | ||
74 | void processCommandQueue(); | ||
75 | |||
71 | class Private; | 76 | class Private; |
72 | Private * const d; | 77 | Private * const d; |
73 | }; | 78 | }; |