diff options
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r-- | common/resourceaccess.cpp | 96 |
1 files changed, 58 insertions, 38 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 1706ac4..31b9e79 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -37,38 +37,36 @@ namespace Akonadi2 | |||
37 | class QueuedCommand | 37 | class QueuedCommand |
38 | { | 38 | { |
39 | public: | 39 | public: |
40 | QueuedCommand(int commandId) | 40 | QueuedCommand(int commandId, const std::function<void()> &callback) |
41 | : m_commandId(commandId), | 41 | : commandId(commandId), |
42 | m_bufferSize(0), | 42 | bufferSize(0), |
43 | m_buffer(0) | 43 | buffer(0), |
44 | callback(callback) | ||
44 | {} | 45 | {} |
45 | 46 | ||
46 | QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) | 47 | QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback) |
47 | : m_commandId(commandId), | 48 | : commandId(commandId), |
48 | m_bufferSize(fbb.GetSize()), | 49 | bufferSize(fbb.GetSize()), |
49 | m_buffer(new char[m_bufferSize]) | 50 | buffer(new char[bufferSize]), |
51 | callback(callback) | ||
50 | { | 52 | { |
51 | memcpy(m_buffer, fbb.GetBufferPointer(), m_bufferSize); | 53 | memcpy(buffer, fbb.GetBufferPointer(), bufferSize); |
52 | } | 54 | } |
53 | 55 | ||
54 | ~QueuedCommand() | 56 | ~QueuedCommand() |
55 | { | 57 | { |
56 | delete[] m_buffer; | 58 | delete[] buffer; |
57 | } | ||
58 | |||
59 | void write(QIODevice *device, uint messageId) | ||
60 | { | ||
61 | // Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); | ||
62 | Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize); | ||
63 | } | 59 | } |
64 | 60 | ||
65 | private: | 61 | private: |
66 | QueuedCommand(const QueuedCommand &other); | 62 | QueuedCommand(const QueuedCommand &other); |
67 | QueuedCommand &operator=(const QueuedCommand &rhs); | 63 | QueuedCommand &operator=(const QueuedCommand &rhs); |
68 | 64 | ||
69 | const int m_commandId; | 65 | public: |
70 | const uint m_bufferSize; | 66 | const int commandId; |
71 | char *m_buffer; | 67 | const uint bufferSize; |
68 | char *buffer; | ||
69 | std::function<void()> callback; | ||
72 | }; | 70 | }; |
73 | 71 | ||
74 | class ResourceAccess::Private | 72 | class ResourceAccess::Private |
@@ -82,7 +80,7 @@ public: | |||
82 | QByteArray partialMessageBuffer; | 80 | QByteArray partialMessageBuffer; |
83 | flatbuffers::FlatBufferBuilder fbb; | 81 | flatbuffers::FlatBufferBuilder fbb; |
84 | QVector<QueuedCommand *> commandQueue; | 82 | QVector<QueuedCommand *> commandQueue; |
85 | QVector<std::function<void()> > synchronizeResultHandler; | 83 | QMultiMap<uint, std::function<void()> > resultHandler; |
86 | uint messageId; | 84 | uint messageId; |
87 | }; | 85 | }; |
88 | 86 | ||
@@ -130,31 +128,42 @@ bool ResourceAccess::isReady() const | |||
130 | return d->socket->isValid(); | 128 | return d->socket->isValid(); |
131 | } | 129 | } |
132 | 130 | ||
133 | void ResourceAccess::sendCommand(int commandId) | 131 | void ResourceAccess::registerCallback(uint messageId, const std::function<void()> &callback) |
132 | { | ||
133 | d->resultHandler.insert(messageId, callback); | ||
134 | } | ||
135 | |||
136 | void ResourceAccess::sendCommand(int commandId, const std::function<void()> &callback) | ||
134 | { | 137 | { |
135 | if (isReady()) { | 138 | if (isReady()) { |
136 | log(QString("Sending command %1").arg(commandId)); | 139 | log(QString("Sending command %1").arg(commandId)); |
137 | Commands::write(d->socket, ++d->messageId, commandId); | 140 | d->messageId++; |
141 | if (callback) { | ||
142 | registerCallback(d->messageId, callback); | ||
143 | } | ||
144 | Commands::write(d->socket, d->messageId, commandId); | ||
138 | } else { | 145 | } else { |
139 | d->commandQueue << new QueuedCommand(commandId); | 146 | d->commandQueue << new QueuedCommand(commandId, callback); |
140 | } | 147 | } |
141 | } | 148 | } |
142 | 149 | ||
143 | void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) | 150 | void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback) |
144 | { | 151 | { |
145 | if (isReady()) { | 152 | if (isReady()) { |
146 | log(QString("Sending command %1").arg(commandId)); | 153 | log(QString("Sending command %1").arg(commandId)); |
147 | Commands::write(d->socket, ++d->messageId, commandId, fbb); | 154 | d->messageId++; |
155 | if (callback) { | ||
156 | registerCallback(d->messageId, callback); | ||
157 | } | ||
158 | Commands::write(d->socket, d->messageId, commandId, fbb); | ||
148 | } else { | 159 | } else { |
149 | d->commandQueue << new QueuedCommand(commandId, fbb); | 160 | d->commandQueue << new QueuedCommand(commandId, fbb, callback); |
150 | } | 161 | } |
151 | } | 162 | } |
152 | 163 | ||
153 | void ResourceAccess::synchronizeResource(const std::function<void()> &resultHandler) | 164 | void ResourceAccess::synchronizeResource(const std::function<void()> &resultHandler) |
154 | { | 165 | { |
155 | sendCommand(Commands::SynchronizeCommand); | 166 | sendCommand(Commands::SynchronizeCommand, resultHandler); |
156 | //TODO: this should be implemented as a job, so we don't need to store the result handler as member | ||
157 | d->synchronizeResultHandler << resultHandler; | ||
158 | } | 167 | } |
159 | 168 | ||
160 | void ResourceAccess::open() | 169 | void ResourceAccess::open() |
@@ -200,7 +209,12 @@ void ResourceAccess::connected() | |||
200 | //TODO: serialize instead of blast them all through the socket? | 209 | //TODO: serialize instead of blast them all through the socket? |
201 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); | 210 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); |
202 | for (QueuedCommand *command: d->commandQueue) { | 211 | for (QueuedCommand *command: d->commandQueue) { |
203 | command->write(d->socket, ++d->messageId); | 212 | d->messageId++; |
213 | log(QString("Sending command %1").arg(command->commandId)); | ||
214 | if (command->callback) { | ||
215 | registerCallback(d->messageId, command->callback); | ||
216 | } | ||
217 | Commands::write(d->socket, d->messageId, command->commandId, command->buffer, command->bufferSize); | ||
204 | delete command; | 218 | delete command; |
205 | } | 219 | } |
206 | d->commandQueue.clear(); | 220 | d->commandQueue.clear(); |
@@ -234,6 +248,8 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) | |||
234 | if (!d->tryOpenTimer->isActive()) { | 248 | if (!d->tryOpenTimer->isActive()) { |
235 | d->tryOpenTimer->start(); | 249 | d->tryOpenTimer->start(); |
236 | } | 250 | } |
251 | } else { | ||
252 | qWarning() << "Failed to start resource"; | ||
237 | } | 253 | } |
238 | } | 254 | } |
239 | 255 | ||
@@ -256,8 +272,7 @@ bool ResourceAccess::processMessageBuffer() | |||
256 | return false; | 272 | return false; |
257 | } | 273 | } |
258 | 274 | ||
259 | //messageId is unused, so commented out | 275 | const uint messageId = *(int*)(d->partialMessageBuffer.constData()); |
260 | //const uint messageId = *(int*)(d->partialMessageBuffer.constData()); | ||
261 | const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); | 276 | const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); |
262 | const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); | 277 | const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); |
263 | 278 | ||
@@ -271,18 +286,15 @@ bool ResourceAccess::processMessageBuffer() | |||
271 | log(QString("Revision updated to: %1").arg(buffer->revision())); | 286 | log(QString("Revision updated to: %1").arg(buffer->revision())); |
272 | emit revisionChanged(buffer->revision()); | 287 | emit revisionChanged(buffer->revision()); |
273 | 288 | ||
274 | //FIXME: The result handler should be called on completion of the synchronize command, and not upon arbitrary revision updates. | ||
275 | for(auto handler : d->synchronizeResultHandler) { | ||
276 | //FIXME: we should associate the handler with a buffer->id() to avoid prematurely triggering the result handler from a delayed synchronized response (this is relevant for on-demand syncing). | ||
277 | handler(); | ||
278 | } | ||
279 | d->synchronizeResultHandler.clear(); | ||
280 | break; | 289 | break; |
281 | } | 290 | } |
282 | case Commands::CommandCompletion: { | 291 | case Commands::CommandCompletion: { |
283 | auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); | 292 | auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); |
284 | log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); | 293 | log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); |
285 | //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc | 294 | //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc |
295 | |||
296 | //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first | ||
297 | QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id())); | ||
286 | break; | 298 | break; |
287 | } | 299 | } |
288 | default: | 300 | default: |
@@ -293,6 +305,14 @@ bool ResourceAccess::processMessageBuffer() | |||
293 | return d->partialMessageBuffer.size() >= headerSize; | 305 | return d->partialMessageBuffer.size() >= headerSize; |
294 | } | 306 | } |
295 | 307 | ||
308 | void ResourceAccess::callCallbacks(int id) | ||
309 | { | ||
310 | for(auto handler : d->resultHandler.values(id)) { | ||
311 | handler(); | ||
312 | } | ||
313 | d->resultHandler.remove(id); | ||
314 | } | ||
315 | |||
296 | void ResourceAccess::log(const QString &message) | 316 | void ResourceAccess::log(const QString &message) |
297 | { | 317 | { |
298 | qDebug() << d->resourceName + ": " + message; | 318 | qDebug() << d->resourceName + ": " + message; |