summaryrefslogtreecommitdiffstats
path: root/common/resourceaccess.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2014-12-28 14:44:50 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2014-12-28 14:44:50 +0100
commit9b2257d680a5e4fa2fda8cf3302f25054a06710e (patch)
tree9abaf141018eb83d26ce07f5bd0e9436003ce732 /common/resourceaccess.cpp
parentc83c2ef64b5a1e4b1dc0102df36687caebb96ff0 (diff)
downloadsink-9b2257d680a5e4fa2fda8cf3302f25054a06710e.tar.gz
sink-9b2257d680a5e4fa2fda8cf3302f25054a06710e.zip
Buffers wrapped into entity buffer, async command progress tracking.
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r--common/resourceaccess.cpp96
1 files changed, 58 insertions, 38 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 1706ac4..31b9e79 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -37,38 +37,36 @@ namespace Akonadi2
37class QueuedCommand 37class QueuedCommand
38{ 38{
39public: 39public:
40 QueuedCommand(int commandId) 40 QueuedCommand(int commandId, const std::function<void()> &callback)
41 : m_commandId(commandId), 41 : commandId(commandId),
42 m_bufferSize(0), 42 bufferSize(0),
43 m_buffer(0) 43 buffer(0),
44 callback(callback)
44 {} 45 {}
45 46
46 QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) 47 QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback)
47 : m_commandId(commandId), 48 : commandId(commandId),
48 m_bufferSize(fbb.GetSize()), 49 bufferSize(fbb.GetSize()),
49 m_buffer(new char[m_bufferSize]) 50 buffer(new char[bufferSize]),
51 callback(callback)
50 { 52 {
51 memcpy(m_buffer, fbb.GetBufferPointer(), m_bufferSize); 53 memcpy(buffer, fbb.GetBufferPointer(), bufferSize);
52 } 54 }
53 55
54 ~QueuedCommand() 56 ~QueuedCommand()
55 { 57 {
56 delete[] m_buffer; 58 delete[] buffer;
57 }
58
59 void write(QIODevice *device, uint messageId)
60 {
61 // Console::main()->log(QString("\tSending queued command %1").arg(m_commandId));
62 Commands::write(device, messageId, m_commandId, m_buffer, m_bufferSize);
63 } 59 }
64 60
65private: 61private:
66 QueuedCommand(const QueuedCommand &other); 62 QueuedCommand(const QueuedCommand &other);
67 QueuedCommand &operator=(const QueuedCommand &rhs); 63 QueuedCommand &operator=(const QueuedCommand &rhs);
68 64
69 const int m_commandId; 65public:
70 const uint m_bufferSize; 66 const int commandId;
71 char *m_buffer; 67 const uint bufferSize;
68 char *buffer;
69 std::function<void()> callback;
72}; 70};
73 71
74class ResourceAccess::Private 72class ResourceAccess::Private
@@ -82,7 +80,7 @@ public:
82 QByteArray partialMessageBuffer; 80 QByteArray partialMessageBuffer;
83 flatbuffers::FlatBufferBuilder fbb; 81 flatbuffers::FlatBufferBuilder fbb;
84 QVector<QueuedCommand *> commandQueue; 82 QVector<QueuedCommand *> commandQueue;
85 QVector<std::function<void()> > synchronizeResultHandler; 83 QMultiMap<uint, std::function<void()> > resultHandler;
86 uint messageId; 84 uint messageId;
87}; 85};
88 86
@@ -130,31 +128,42 @@ bool ResourceAccess::isReady() const
130 return d->socket->isValid(); 128 return d->socket->isValid();
131} 129}
132 130
133void ResourceAccess::sendCommand(int commandId) 131void ResourceAccess::registerCallback(uint messageId, const std::function<void()> &callback)
132{
133 d->resultHandler.insert(messageId, callback);
134}
135
136void ResourceAccess::sendCommand(int commandId, const std::function<void()> &callback)
134{ 137{
135 if (isReady()) { 138 if (isReady()) {
136 log(QString("Sending command %1").arg(commandId)); 139 log(QString("Sending command %1").arg(commandId));
137 Commands::write(d->socket, ++d->messageId, commandId); 140 d->messageId++;
141 if (callback) {
142 registerCallback(d->messageId, callback);
143 }
144 Commands::write(d->socket, d->messageId, commandId);
138 } else { 145 } else {
139 d->commandQueue << new QueuedCommand(commandId); 146 d->commandQueue << new QueuedCommand(commandId, callback);
140 } 147 }
141} 148}
142 149
143void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) 150void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb, const std::function<void()> &callback)
144{ 151{
145 if (isReady()) { 152 if (isReady()) {
146 log(QString("Sending command %1").arg(commandId)); 153 log(QString("Sending command %1").arg(commandId));
147 Commands::write(d->socket, ++d->messageId, commandId, fbb); 154 d->messageId++;
155 if (callback) {
156 registerCallback(d->messageId, callback);
157 }
158 Commands::write(d->socket, d->messageId, commandId, fbb);
148 } else { 159 } else {
149 d->commandQueue << new QueuedCommand(commandId, fbb); 160 d->commandQueue << new QueuedCommand(commandId, fbb, callback);
150 } 161 }
151} 162}
152 163
153void ResourceAccess::synchronizeResource(const std::function<void()> &resultHandler) 164void ResourceAccess::synchronizeResource(const std::function<void()> &resultHandler)
154{ 165{
155 sendCommand(Commands::SynchronizeCommand); 166 sendCommand(Commands::SynchronizeCommand, resultHandler);
156 //TODO: this should be implemented as a job, so we don't need to store the result handler as member
157 d->synchronizeResultHandler << resultHandler;
158} 167}
159 168
160void ResourceAccess::open() 169void ResourceAccess::open()
@@ -200,7 +209,12 @@ void ResourceAccess::connected()
200 //TODO: serialize instead of blast them all through the socket? 209 //TODO: serialize instead of blast them all through the socket?
201 log(QString("We have %1 queued commands").arg(d->commandQueue.size())); 210 log(QString("We have %1 queued commands").arg(d->commandQueue.size()));
202 for (QueuedCommand *command: d->commandQueue) { 211 for (QueuedCommand *command: d->commandQueue) {
203 command->write(d->socket, ++d->messageId); 212 d->messageId++;
213 log(QString("Sending command %1").arg(command->commandId));
214 if (command->callback) {
215 registerCallback(d->messageId, command->callback);
216 }
217 Commands::write(d->socket, d->messageId, command->commandId, command->buffer, command->bufferSize);
204 delete command; 218 delete command;
205 } 219 }
206 d->commandQueue.clear(); 220 d->commandQueue.clear();
@@ -234,6 +248,8 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
234 if (!d->tryOpenTimer->isActive()) { 248 if (!d->tryOpenTimer->isActive()) {
235 d->tryOpenTimer->start(); 249 d->tryOpenTimer->start();
236 } 250 }
251 } else {
252 qWarning() << "Failed to start resource";
237 } 253 }
238} 254}
239 255
@@ -256,8 +272,7 @@ bool ResourceAccess::processMessageBuffer()
256 return false; 272 return false;
257 } 273 }
258 274
259 //messageId is unused, so commented out 275 const uint messageId = *(int*)(d->partialMessageBuffer.constData());
260 //const uint messageId = *(int*)(d->partialMessageBuffer.constData());
261 const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); 276 const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint));
262 const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); 277 const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint));
263 278
@@ -271,18 +286,15 @@ bool ResourceAccess::processMessageBuffer()
271 log(QString("Revision updated to: %1").arg(buffer->revision())); 286 log(QString("Revision updated to: %1").arg(buffer->revision()));
272 emit revisionChanged(buffer->revision()); 287 emit revisionChanged(buffer->revision());
273 288
274 //FIXME: The result handler should be called on completion of the synchronize command, and not upon arbitrary revision updates.
275 for(auto handler : d->synchronizeResultHandler) {
276 //FIXME: we should associate the handler with a buffer->id() to avoid prematurely triggering the result handler from a delayed synchronized response (this is relevant for on-demand syncing).
277 handler();
278 }
279 d->synchronizeResultHandler.clear();
280 break; 289 break;
281 } 290 }
282 case Commands::CommandCompletion: { 291 case Commands::CommandCompletion: {
283 auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); 292 auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
284 log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); 293 log(QString("Command %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"));
285 //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc 294 //TODO: if a queued command, get it out of the queue ... pass on completion ot the relevant objects .. etc
295
296 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
297 QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection, QGenericReturnArgument(), Q_ARG(int, buffer->id()));
286 break; 298 break;
287 } 299 }
288 default: 300 default:
@@ -293,6 +305,14 @@ bool ResourceAccess::processMessageBuffer()
293 return d->partialMessageBuffer.size() >= headerSize; 305 return d->partialMessageBuffer.size() >= headerSize;
294} 306}
295 307
308void ResourceAccess::callCallbacks(int id)
309{
310 for(auto handler : d->resultHandler.values(id)) {
311 handler();
312 }
313 d->resultHandler.remove(id);
314}
315
296void ResourceAccess::log(const QString &message) 316void ResourceAccess::log(const QString &message)
297{ 317{
298 qDebug() << d->resourceName + ": " + message; 318 qDebug() << d->resourceName + ": " + message;