diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-01-18 10:51:34 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-01-18 10:51:34 +0100 |
commit | 47b4442c585a25b2e4b857f2d9e3ab371d942c19 (patch) | |
tree | 4167c6cb75e3ee8072452585f1b433fb637e3389 /common/resourceaccess.cpp | |
parent | aef2ebc45a30d3c3b15b630648e8b37a551ce1ef (diff) | |
download | sink-47b4442c585a25b2e4b857f2d9e3ab371d942c19.tar.gz sink-47b4442c585a25b2e4b857f2d9e3ab371d942c19.zip |
Use jobs to track progress of write commands.
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r-- | common/resourceaccess.cpp | 63 |
1 files changed, 42 insertions, 21 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 9fb0d4c..7b13101 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -133,41 +133,62 @@ void ResourceAccess::registerCallback(uint messageId, const std::function<void() | |||
133 | d->resultHandler.insert(messageId, callback); | 133 | d->resultHandler.insert(messageId, callback); |
134 | } | 134 | } |
135 | 135 | ||
136 | void ResourceAccess::sendCommand(int commandId, const std::function<void()> &callback) | 136 | Async::Job<void> ResourceAccess::sendCommand(int commandId) |
137 | { | 137 | { |
138 | if (isReady()) { | 138 | return Async::start<void>([this, commandId](Async::Future<void> &f) { |
139 | log(QString("Sending command %1").arg(commandId)); | 139 | if (isReady()) { |
140 | d->messageId++; | 140 | log(QString("Sending command %1").arg(commandId)); |
141 | d->messageId++; | ||
142 | registerCallback(d->messageId, [&f]() { f.setFinished(); }); | ||
143 | Commands::write(d->socket, d->messageId, commandId); | ||
144 | } else { | ||
145 | d->commandQueue << new QueuedCommand(commandId, [&f]() { f.setFinished(); }); | ||
146 | } | ||
147 | }); | ||
148 | } | ||
149 | |||
150 | struct JobFinisher { | ||
151 | bool finished; | ||
152 | std::function<void()> callback; | ||
153 | |||
154 | JobFinisher() : finished(false) {} | ||
155 | |||
156 | void setFinished() { | ||
157 | finished = true; | ||
141 | if (callback) { | 158 | if (callback) { |
142 | registerCallback(d->messageId, callback); | 159 | callback(); |
143 | } | 160 | } |
144 | Commands::write(d->socket, d->messageId, commandId); | ||
145 | } else { | ||
146 | d->commandQueue << new QueuedCommand(commandId, callback); | ||
147 | } | 161 | } |
148 | } | 162 | }; |
149 | 163 | ||
150 | void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback) | 164 | Async::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) |
151 | { | 165 | { |
166 | auto finisher = QSharedPointer<JobFinisher>::create(); | ||
167 | auto callback = [finisher] () { | ||
168 | finisher->setFinished(); | ||
169 | }; | ||
152 | if (isReady()) { | 170 | if (isReady()) { |
153 | log(QString("Sending command %1").arg(commandId)); | ||
154 | d->messageId++; | 171 | d->messageId++; |
155 | if (callback) { | 172 | log(QString("Sending command %1 with messageId %2").arg(commandId).arg(d->messageId)); |
156 | registerCallback(d->messageId, callback); | 173 | registerCallback(d->messageId, callback); |
157 | } | ||
158 | Commands::write(d->socket, d->messageId, commandId, fbb); | 174 | Commands::write(d->socket, d->messageId, commandId, fbb); |
159 | } else { | 175 | } else { |
160 | d->commandQueue << new QueuedCommand(commandId, fbb, callback); | 176 | d->commandQueue << new QueuedCommand(commandId, fbb, callback); |
161 | } | 177 | } |
178 | return Async::start<void>([this, finisher](Async::Future<void> &f) { | ||
179 | if (finisher->finished) { | ||
180 | f.setFinished(); | ||
181 | } else { | ||
182 | finisher->callback = [&f]() { | ||
183 | f.setFinished(); | ||
184 | }; | ||
185 | } | ||
186 | }); | ||
162 | } | 187 | } |
163 | 188 | ||
164 | Async::Job<void> ResourceAccess::synchronizeResource() | 189 | Async::Job<void> ResourceAccess::synchronizeResource() |
165 | { | 190 | { |
166 | return Async::start<void>([this](Async::Future<void> &f) { | 191 | return sendCommand(Commands::SynchronizeCommand); |
167 | sendCommand(Commands::SynchronizeCommand, [&f]() { | ||
168 | f.setFinished(); | ||
169 | }); | ||
170 | }); | ||
171 | } | 192 | } |
172 | 193 | ||
173 | void ResourceAccess::open() | 194 | void ResourceAccess::open() |
@@ -214,7 +235,7 @@ void ResourceAccess::connected() | |||
214 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); | 235 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); |
215 | for (QueuedCommand *command: d->commandQueue) { | 236 | for (QueuedCommand *command: d->commandQueue) { |
216 | d->messageId++; | 237 | d->messageId++; |
217 | log(QString("Sending command %1").arg(command->commandId)); | 238 | log(QString("Sending command %1 with messageId %2").arg(command->commandId).arg(d->messageId)); |
218 | if (command->callback) { | 239 | if (command->callback) { |
219 | registerCallback(d->messageId, command->callback); | 240 | registerCallback(d->messageId, command->callback); |
220 | } | 241 | } |
@@ -294,7 +315,7 @@ bool ResourceAccess::processMessageBuffer() | |||
294 | } | 315 | } |
295 | case Commands::CommandCompletion: { | 316 | case Commands::CommandCompletion: { |
296 | auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); | 317 | auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); |
297 | log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); | 318 | log(QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); |
298 | //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc | 319 | //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc |
299 | 320 | ||
300 | //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first | 321 | //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first |