diff options
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r-- | common/resourceaccess.cpp | 253 |
1 files changed, 119 insertions, 134 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 0716ae2..c8c8189 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -53,27 +53,20 @@ static void queuedInvoke(const std::function<void()> &f, QObject *context = 0) | |||
53 | { | 53 | { |
54 | auto timer = QSharedPointer<QTimer>::create(); | 54 | auto timer = QSharedPointer<QTimer>::create(); |
55 | timer->setSingleShot(true); | 55 | timer->setSingleShot(true); |
56 | QObject::connect(timer.data(), &QTimer::timeout, context, [f, timer]() { | 56 | QObject::connect(timer.data(), &QTimer::timeout, context, [f, timer]() { f(); }); |
57 | f(); | ||
58 | }); | ||
59 | timer->start(0); | 57 | timer->start(0); |
60 | } | 58 | } |
61 | 59 | ||
62 | namespace Sink | 60 | namespace Sink { |
63 | { | ||
64 | 61 | ||
65 | struct QueuedCommand | 62 | struct QueuedCommand |
66 | { | 63 | { |
67 | public: | 64 | public: |
68 | QueuedCommand(int commandId, const std::function<void(int, const QString &)> &callback) | 65 | QueuedCommand(int commandId, const std::function<void(int, const QString &)> &callback) : commandId(commandId), callback(callback) |
69 | : commandId(commandId), | 66 | { |
70 | callback(callback) | 67 | } |
71 | {} | 68 | |
72 | 69 | QueuedCommand(int commandId, const QByteArray &b, const std::function<void(int, const QString &)> &callback) : commandId(commandId), buffer(b), callback(callback) | |
73 | QueuedCommand(int commandId, const QByteArray &b, const std::function<void(int, const QString &)> &callback) | ||
74 | : commandId(commandId), | ||
75 | buffer(b), | ||
76 | callback(callback) | ||
77 | { | 70 | { |
78 | } | 71 | } |
79 | 72 | ||
@@ -102,17 +95,14 @@ public: | |||
102 | QByteArray partialMessageBuffer; | 95 | QByteArray partialMessageBuffer; |
103 | QVector<QSharedPointer<QueuedCommand>> commandQueue; | 96 | QVector<QSharedPointer<QueuedCommand>> commandQueue; |
104 | QMap<uint, QSharedPointer<QueuedCommand>> pendingCommands; | 97 | QMap<uint, QSharedPointer<QueuedCommand>> pendingCommands; |
105 | QMultiMap<uint, std::function<void(int error, const QString &errorMessage)> > resultHandler; | 98 | QMultiMap<uint, std::function<void(int error, const QString &errorMessage)>> resultHandler; |
106 | QSet<uint> completeCommands; | 99 | QSet<uint> completeCommands; |
107 | uint messageId; | 100 | uint messageId; |
108 | bool openingSocket; | 101 | bool openingSocket; |
109 | }; | 102 | }; |
110 | 103 | ||
111 | ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q) | 104 | ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q) |
112 | : resourceName(name), | 105 | : resourceName(name), resourceInstanceIdentifier(instanceIdentifier), messageId(0), openingSocket(false) |
113 | resourceInstanceIdentifier(instanceIdentifier), | ||
114 | messageId(0), | ||
115 | openingSocket(false) | ||
116 | { | 106 | { |
117 | } | 107 | } |
118 | 108 | ||
@@ -124,7 +114,7 @@ void ResourceAccess::Private::abortPendingOperations() | |||
124 | } | 114 | } |
125 | auto handlers = resultHandler.values(); | 115 | auto handlers = resultHandler.values(); |
126 | resultHandler.clear(); | 116 | resultHandler.clear(); |
127 | for(auto handler : handlers) { | 117 | for (auto handler : handlers) { |
128 | handler(1, "The resource closed unexpectedly"); | 118 | handler(1, "The resource closed unexpectedly"); |
129 | } | 119 | } |
130 | } | 120 | } |
@@ -132,20 +122,20 @@ void ResourceAccess::Private::abortPendingOperations() | |||
132 | void ResourceAccess::Private::callCallbacks() | 122 | void ResourceAccess::Private::callCallbacks() |
133 | { | 123 | { |
134 | for (auto id : completeCommands) { | 124 | for (auto id : completeCommands) { |
135 | //We remove the callbacks first because the handler can kill resourceaccess directly | 125 | // We remove the callbacks first because the handler can kill resourceaccess directly |
136 | const auto callbacks = resultHandler.values(id); | 126 | const auto callbacks = resultHandler.values(id); |
137 | resultHandler.remove(id); | 127 | resultHandler.remove(id); |
138 | for(auto handler : callbacks) { | 128 | for (auto handler : callbacks) { |
139 | handler(0, QString()); | 129 | handler(0, QString()); |
140 | } | 130 | } |
141 | } | 131 | } |
142 | } | 132 | } |
143 | 133 | ||
144 | //Connects to server and returns connected socket on success | 134 | // Connects to server and returns connected socket on success |
145 | KAsync::Job<QSharedPointer<QLocalSocket> > ResourceAccess::connectToServer(const QByteArray &identifier) | 135 | KAsync::Job<QSharedPointer<QLocalSocket>> ResourceAccess::connectToServer(const QByteArray &identifier) |
146 | { | 136 | { |
147 | auto s = QSharedPointer<QLocalSocket>::create(); | 137 | auto s = QSharedPointer<QLocalSocket>::create(); |
148 | return KAsync::start<QSharedPointer<QLocalSocket> >([identifier, s](KAsync::Future<QSharedPointer<QLocalSocket> > &future) { | 138 | return KAsync::start<QSharedPointer<QLocalSocket>>([identifier, s](KAsync::Future<QSharedPointer<QLocalSocket>> &future) { |
149 | s->setServerName(identifier); | 139 | s->setServerName(identifier); |
150 | auto context = new QObject; | 140 | auto context = new QObject; |
151 | QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context]() { | 141 | QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context]() { |
@@ -154,7 +144,7 @@ KAsync::Job<QSharedPointer<QLocalSocket> > ResourceAccess::connectToServer(const | |||
154 | future.setValue(s); | 144 | future.setValue(s); |
155 | future.setFinished(); | 145 | future.setFinished(); |
156 | }); | 146 | }); |
157 | QObject::connect(s.data(), static_cast<void(QLocalSocket::*)(QLocalSocket::LocalSocketError)>(&QLocalSocket::error), context, [&future, context](QLocalSocket::LocalSocketError) { | 147 | QObject::connect(s.data(), static_cast<void (QLocalSocket::*)(QLocalSocket::LocalSocketError)>(&QLocalSocket::error), context, [&future, context](QLocalSocket::LocalSocketError) { |
158 | delete context; | 148 | delete context; |
159 | future.setError(-1, "Failed to connect to server."); | 149 | future.setError(-1, "Failed to connect to server."); |
160 | }); | 150 | }); |
@@ -164,66 +154,67 @@ KAsync::Job<QSharedPointer<QLocalSocket> > ResourceAccess::connectToServer(const | |||
164 | 154 | ||
165 | KAsync::Job<void> ResourceAccess::Private::tryToConnect() | 155 | KAsync::Job<void> ResourceAccess::Private::tryToConnect() |
166 | { | 156 | { |
167 | //We may have a socket from the last connection leftover | 157 | // We may have a socket from the last connection leftover |
168 | socket.reset(); | 158 | socket.reset(); |
169 | auto counter = QSharedPointer<int>::create(0); | 159 | auto counter = QSharedPointer<int>::create(0); |
170 | return KAsync::dowhile([this]() -> bool { | 160 | return KAsync::dowhile([this]() -> bool { return !socket; }, |
171 | return !socket; | 161 | [this, counter](KAsync::Future<void> &future) { |
172 | }, | 162 | TracePrivate() << "Loop"; |
173 | [this, counter](KAsync::Future<void> &future) { | 163 | connectToServer(resourceInstanceIdentifier) |
174 | TracePrivate() << "Loop"; | 164 | .then<void, QSharedPointer<QLocalSocket>>( |
175 | connectToServer(resourceInstanceIdentifier) | 165 | [this, &future](const QSharedPointer<QLocalSocket> &s) { |
176 | .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { | 166 | Q_ASSERT(s); |
177 | Q_ASSERT(s); | 167 | socket = s; |
178 | socket = s; | 168 | future.setFinished(); |
179 | future.setFinished(); | 169 | }, |
180 | }, [&future, counter, this](int errorCode, const QString &errorString) { | 170 | [&future, counter, this](int errorCode, const QString &errorString) { |
181 | static int waitTime = 10; | 171 | static int waitTime = 10; |
182 | static int timeout = 500; | 172 | static int timeout = 500; |
183 | static int maxRetries = timeout / waitTime; | 173 | static int maxRetries = timeout / waitTime; |
184 | if (*counter > maxRetries) { | 174 | if (*counter > maxRetries) { |
185 | TracePrivate() << "Giving up"; | 175 | TracePrivate() << "Giving up"; |
186 | future.setError(-1, "Failed to connect to socket"); | 176 | future.setError(-1, "Failed to connect to socket"); |
187 | } else { | 177 | } else { |
188 | KAsync::wait(waitTime).then<void>([&future]() { | 178 | KAsync::wait(waitTime).then<void>([&future]() { future.setFinished(); }).exec(); |
189 | future.setFinished(); | 179 | } |
190 | }).exec(); | 180 | *counter = *counter + 1; |
191 | } | 181 | }) |
192 | *counter = *counter + 1; | 182 | .exec(); |
193 | }) | 183 | }); |
194 | .exec(); | ||
195 | }); | ||
196 | } | 184 | } |
197 | 185 | ||
198 | KAsync::Job<void> ResourceAccess::Private::initializeSocket() | 186 | KAsync::Job<void> ResourceAccess::Private::initializeSocket() |
199 | { | 187 | { |
200 | return KAsync::start<void>([this](KAsync::Future<void> &future) { | 188 | return KAsync::start<void>([this](KAsync::Future<void> &future) { |
201 | TracePrivate() << "Trying to connect"; | 189 | TracePrivate() << "Trying to connect"; |
202 | connectToServer(resourceInstanceIdentifier).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { | 190 | connectToServer(resourceInstanceIdentifier) |
203 | TracePrivate() << "Connected to resource, without having to start it."; | 191 | .then<void, QSharedPointer<QLocalSocket>>( |
204 | Q_ASSERT(s); | 192 | [this, &future](const QSharedPointer<QLocalSocket> &s) { |
205 | socket = s; | 193 | TracePrivate() << "Connected to resource, without having to start it."; |
206 | future.setFinished(); | 194 | Q_ASSERT(s); |
207 | }, | 195 | socket = s; |
208 | [this, &future](int errorCode, const QString &errorString) { | ||
209 | TracePrivate() << "Failed to connect, starting resource"; | ||
210 | //We failed to connect, so let's start the resource | ||
211 | QStringList args; | ||
212 | args << resourceInstanceIdentifier; | ||
213 | qint64 pid = 0; | ||
214 | if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { | ||
215 | TracePrivate() << "Started resource " << pid; | ||
216 | tryToConnect() | ||
217 | .then<void>([&future]() { | ||
218 | future.setFinished(); | 196 | future.setFinished(); |
219 | }, [this, &future](int errorCode, const QString &errorString) { | 197 | }, |
220 | Warning() << "Failed to connect to started resource"; | 198 | [this, &future](int errorCode, const QString &errorString) { |
221 | future.setError(errorCode, errorString); | 199 | TracePrivate() << "Failed to connect, starting resource"; |
222 | }).exec(); | 200 | // We failed to connect, so let's start the resource |
223 | } else { | 201 | QStringList args; |
224 | Warning() << "Failed to start resource"; | 202 | args << resourceInstanceIdentifier; |
225 | } | 203 | qint64 pid = 0; |
226 | }).exec(); | 204 | if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { |
205 | TracePrivate() << "Started resource " << pid; | ||
206 | tryToConnect() | ||
207 | .then<void>([&future]() { future.setFinished(); }, | ||
208 | [this, &future](int errorCode, const QString &errorString) { | ||
209 | Warning() << "Failed to connect to started resource"; | ||
210 | future.setError(errorCode, errorString); | ||
211 | }) | ||
212 | .exec(); | ||
213 | } else { | ||
214 | Warning() << "Failed to start resource"; | ||
215 | } | ||
216 | }) | ||
217 | .exec(); | ||
227 | }); | 218 | }); |
228 | } | 219 | } |
229 | 220 | ||
@@ -235,8 +226,7 @@ static QByteArray getResourceName(const QByteArray &instanceIdentifier) | |||
235 | } | 226 | } |
236 | 227 | ||
237 | ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier) | 228 | ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier) |
238 | : ResourceAccessInterface(), | 229 | : ResourceAccessInterface(), d(new Private(getResourceName(resourceInstanceIdentifier), resourceInstanceIdentifier, this)) |
239 | d(new Private(getResourceName(resourceInstanceIdentifier), resourceInstanceIdentifier, this)) | ||
240 | { | 230 | { |
241 | Log() << "Starting access"; | 231 | Log() << "Starting access"; |
242 | } | 232 | } |
@@ -280,10 +270,10 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId) | |||
280 | }); | 270 | }); |
281 | } | 271 | } |
282 | 272 | ||
283 | KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) | 273 | KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) |
284 | { | 274 | { |
285 | //The flatbuffer is transient, but we want to store it until the job is executed | 275 | // The flatbuffer is transient, but we want to store it until the job is executed |
286 | QByteArray buffer(reinterpret_cast<const char*>(fbb.GetBufferPointer()), fbb.GetSize()); | 276 | QByteArray buffer(reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); |
287 | return KAsync::start<void>([commandId, buffer, this](KAsync::Future<void> &f) { | 277 | return KAsync::start<void>([commandId, buffer, this](KAsync::Future<void> &f) { |
288 | auto callback = [&f](int error, const QString &errorMessage) { | 278 | auto callback = [&f](int error, const QString &errorMessage) { |
289 | if (error) { | 279 | if (error) { |
@@ -313,7 +303,7 @@ KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool loca | |||
313 | KAsync::Job<void> ResourceAccess::sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) | 303 | KAsync::Job<void> ResourceAccess::sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) |
314 | { | 304 | { |
315 | flatbuffers::FlatBufferBuilder fbb; | 305 | flatbuffers::FlatBufferBuilder fbb; |
316 | //This is the resource buffer type and not the domain type | 306 | // This is the resource buffer type and not the domain type |
317 | auto type = fbb.CreateString(resourceBufferType.constData()); | 307 | auto type = fbb.CreateString(resourceBufferType.constData()); |
318 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); | 308 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); |
319 | auto location = Sink::Commands::CreateCreateEntity(fbb, 0, type, delta); | 309 | auto location = Sink::Commands::CreateCreateEntity(fbb, 0, type, delta); |
@@ -322,13 +312,14 @@ KAsync::Job<void> ResourceAccess::sendCreateCommand(const QByteArray &resourceBu | |||
322 | return sendCommand(Sink::Commands::CreateEntityCommand, fbb); | 312 | return sendCommand(Sink::Commands::CreateEntityCommand, fbb); |
323 | } | 313 | } |
324 | 314 | ||
325 | KAsync::Job<void> ResourceAccess::sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) | 315 | KAsync::Job<void> |
316 | ResourceAccess::sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) | ||
326 | { | 317 | { |
327 | flatbuffers::FlatBufferBuilder fbb; | 318 | flatbuffers::FlatBufferBuilder fbb; |
328 | auto entityId = fbb.CreateString(uid.constData()); | 319 | auto entityId = fbb.CreateString(uid.constData()); |
329 | //This is the resource buffer type and not the domain type | 320 | // This is the resource buffer type and not the domain type |
330 | auto type = fbb.CreateString(resourceBufferType.constData()); | 321 | auto type = fbb.CreateString(resourceBufferType.constData()); |
331 | //FIXME | 322 | // FIXME |
332 | auto deletions = 0; | 323 | auto deletions = 0; |
333 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); | 324 | auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); |
334 | auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta); | 325 | auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta); |
@@ -341,7 +332,7 @@ KAsync::Job<void> ResourceAccess::sendDeleteCommand(const QByteArray &uid, qint6 | |||
341 | { | 332 | { |
342 | flatbuffers::FlatBufferBuilder fbb; | 333 | flatbuffers::FlatBufferBuilder fbb; |
343 | auto entityId = fbb.CreateString(uid.constData()); | 334 | auto entityId = fbb.CreateString(uid.constData()); |
344 | //This is the resource buffer type and not the domain type | 335 | // This is the resource buffer type and not the domain type |
345 | auto type = fbb.CreateString(resourceBufferType.constData()); | 336 | auto type = fbb.CreateString(resourceBufferType.constData()); |
346 | auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type); | 337 | auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type); |
347 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); | 338 | Sink::Commands::FinishDeleteEntityBuffer(fbb, location); |
@@ -358,7 +349,8 @@ KAsync::Job<void> ResourceAccess::sendRevisionReplayedCommand(qint64 revision) | |||
358 | return sendCommand(Sink::Commands::RevisionReplayedCommand, fbb); | 349 | return sendCommand(Sink::Commands::RevisionReplayedCommand, fbb); |
359 | } | 350 | } |
360 | 351 | ||
361 | KAsync::Job<void> ResourceAccess::sendInspectionCommand(const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) | 352 | KAsync::Job<void> |
353 | ResourceAccess::sendInspectionCommand(const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) | ||
362 | { | 354 | { |
363 | flatbuffers::FlatBufferBuilder fbb; | 355 | flatbuffers::FlatBufferBuilder fbb; |
364 | auto id = fbb.CreateString(inspectionId.toStdString()); | 356 | auto id = fbb.CreateString(inspectionId.toStdString()); |
@@ -371,7 +363,7 @@ KAsync::Job<void> ResourceAccess::sendInspectionCommand(const QByteArray &inspec | |||
371 | s << expectedValue; | 363 | s << expectedValue; |
372 | 364 | ||
373 | auto expected = fbb.CreateString(array.toStdString()); | 365 | auto expected = fbb.CreateString(array.toStdString()); |
374 | auto location = Sink::Commands::CreateInspection (fbb, id, 0, entity, domain, prop, expected); | 366 | auto location = Sink::Commands::CreateInspection(fbb, id, 0, entity, domain, prop, expected); |
375 | Sink::Commands::FinishInspectionBuffer(fbb, location); | 367 | Sink::Commands::FinishInspectionBuffer(fbb, location); |
376 | open(); | 368 | open(); |
377 | return sendCommand(Sink::Commands::InspectionCommand, fbb); | 369 | return sendCommand(Sink::Commands::InspectionCommand, fbb); |
@@ -389,21 +381,21 @@ void ResourceAccess::open() | |||
389 | auto time = QSharedPointer<QTime>::create(); | 381 | auto time = QSharedPointer<QTime>::create(); |
390 | time->start(); | 382 | time->start(); |
391 | d->openingSocket = true; | 383 | d->openingSocket = true; |
392 | d->initializeSocket().then<void>([this, time]() { | 384 | d->initializeSocket() |
393 | Trace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); | 385 | .then<void>( |
394 | d->openingSocket = false; | 386 | [this, time]() { |
395 | QObject::connect(d->socket.data(), &QLocalSocket::disconnected, | 387 | Trace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); |
396 | this, &ResourceAccess::disconnected); | 388 | d->openingSocket = false; |
397 | QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), | 389 | QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); |
398 | this, SLOT(connectionError(QLocalSocket::LocalSocketError))); | 390 | QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); |
399 | QObject::connect(d->socket.data(), &QIODevice::readyRead, | 391 | QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage); |
400 | this, &ResourceAccess::readResourceMessage); | 392 | connected(); |
401 | connected(); | 393 | }, |
402 | }, | 394 | [this](int error, const QString &errorString) { |
403 | [this](int error, const QString &errorString) { | 395 | d->openingSocket = false; |
404 | d->openingSocket = false; | 396 | Warning() << "Failed to initialize socket " << errorString; |
405 | Warning() << "Failed to initialize socket " << errorString; | 397 | }) |
406 | }).exec(); | 398 | .exec(); |
407 | } | 399 | } |
408 | 400 | ||
409 | void ResourceAccess::close() | 401 | void ResourceAccess::close() |
@@ -417,7 +409,7 @@ void ResourceAccess::close() | |||
417 | void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command) | 409 | void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command) |
418 | { | 410 | { |
419 | Q_ASSERT(isReady()); | 411 | Q_ASSERT(isReady()); |
420 | //TODO: we should have a timeout for commands | 412 | // TODO: we should have a timeout for commands |
421 | d->messageId++; | 413 | d->messageId++; |
422 | const auto messageId = d->messageId; | 414 | const auto messageId = d->messageId; |
423 | Log() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId); | 415 | Log() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId); |
@@ -427,17 +419,17 @@ void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command) | |||
427 | d->pendingCommands.remove(messageId); | 419 | d->pendingCommands.remove(messageId); |
428 | command->callback(errorCode, errorMessage); | 420 | command->callback(errorCode, errorMessage); |
429 | }); | 421 | }); |
430 | //Keep track of the command until we're sure it arrived | 422 | // Keep track of the command until we're sure it arrived |
431 | d->pendingCommands.insert(d->messageId, command); | 423 | d->pendingCommands.insert(d->messageId, command); |
432 | Commands::write(d->socket.data(), d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); | 424 | Commands::write(d->socket.data(), d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); |
433 | } | 425 | } |
434 | 426 | ||
435 | void ResourceAccess::processCommandQueue() | 427 | void ResourceAccess::processCommandQueue() |
436 | { | 428 | { |
437 | //TODO: serialize instead of blast them all through the socket? | 429 | // TODO: serialize instead of blast them all through the socket? |
438 | Trace() << "We have " << d->commandQueue.size() << " queued commands"; | 430 | Trace() << "We have " << d->commandQueue.size() << " queued commands"; |
439 | Trace() << "Pending commands: " << d->pendingCommands.size(); | 431 | Trace() << "Pending commands: " << d->pendingCommands.size(); |
440 | for (auto command: d->commandQueue) { | 432 | for (auto command : d->commandQueue) { |
441 | sendCommand(command); | 433 | sendCommand(command); |
442 | } | 434 | } |
443 | d->commandQueue.clear(); | 435 | d->commandQueue.clear(); |
@@ -446,7 +438,7 @@ void ResourceAccess::processCommandQueue() | |||
446 | void ResourceAccess::processPendingCommandQueue() | 438 | void ResourceAccess::processPendingCommandQueue() |
447 | { | 439 | { |
448 | Trace() << "We have " << d->pendingCommands.size() << " pending commands"; | 440 | Trace() << "We have " << d->pendingCommands.size() << " pending commands"; |
449 | for (auto command: d->pendingCommands) { | 441 | for (auto command : d->pendingCommands) { |
450 | Trace() << "Reenquing command " << command->commandId; | 442 | Trace() << "Reenquing command " << command->commandId; |
451 | d->commandQueue << command; | 443 | d->commandQueue << command; |
452 | } | 444 | } |
@@ -471,9 +463,9 @@ void ResourceAccess::connected() | |||
471 | Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb); | 463 | Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb); |
472 | } | 464 | } |
473 | 465 | ||
474 | //Reenqueue pending commands, we failed to send them | 466 | // Reenqueue pending commands, we failed to send them |
475 | processPendingCommandQueue(); | 467 | processPendingCommandQueue(); |
476 | //Send queued commands | 468 | // Send queued commands |
477 | processCommandQueue(); | 469 | processCommandQueue(); |
478 | 470 | ||
479 | emit ready(true); | 471 | emit ready(true); |
@@ -510,7 +502,8 @@ void ResourceAccess::readResourceMessage() | |||
510 | d->partialMessageBuffer += d->socket->readAll(); | 502 | d->partialMessageBuffer += d->socket->readAll(); |
511 | 503 | ||
512 | // should be scheduled rather than processed all at once | 504 | // should be scheduled rather than processed all at once |
513 | while (processMessageBuffer()) {} | 505 | while (processMessageBuffer()) { |
506 | } | ||
514 | } | 507 | } |
515 | 508 | ||
516 | bool ResourceAccess::processMessageBuffer() | 509 | bool ResourceAccess::processMessageBuffer() |
@@ -521,9 +514,9 @@ bool ResourceAccess::processMessageBuffer() | |||
521 | return false; | 514 | return false; |
522 | } | 515 | } |
523 | 516 | ||
524 | //const uint messageId = *(int*)(d->partialMessageBuffer.constData()); | 517 | // const uint messageId = *(int*)(d->partialMessageBuffer.constData()); |
525 | const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); | 518 | const int commandId = *(int *)(d->partialMessageBuffer.constData() + sizeof(uint)); |
526 | const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); | 519 | const uint size = *(int *)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); |
527 | 520 | ||
528 | if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) { | 521 | if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) { |
529 | Warning() << "command too small"; | 522 | Warning() << "command too small"; |
@@ -546,10 +539,8 @@ bool ResourceAccess::processMessageBuffer() | |||
546 | Log() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"); | 539 | Log() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"); |
547 | 540 | ||
548 | d->completeCommands << buffer->id(); | 541 | d->completeCommands << buffer->id(); |
549 | //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first | 542 | // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first |
550 | queuedInvoke([=]() { | 543 | queuedInvoke([=]() { d->callCallbacks(); }, this); |
551 | d->callCallbacks(); | ||
552 | }, this); | ||
553 | break; | 544 | break; |
554 | } | 545 | } |
555 | case Commands::NotificationCommand: { | 546 | case Commands::NotificationCommand: { |
@@ -563,21 +554,18 @@ bool ResourceAccess::processMessageBuffer() | |||
563 | Log() << "Received inspection notification."; | 554 | Log() << "Received inspection notification."; |
564 | Notification n; | 555 | Notification n; |
565 | if (buffer->identifier()) { | 556 | if (buffer->identifier()) { |
566 | //Don't use fromRawData, the buffer is gone once we invoke emit notification | 557 | // Don't use fromRawData, the buffer is gone once we invoke emit notification |
567 | n.id = BufferUtils::extractBufferCopy(buffer->identifier()); | 558 | n.id = BufferUtils::extractBufferCopy(buffer->identifier()); |
568 | } | 559 | } |
569 | if (buffer->message()) { | 560 | if (buffer->message()) { |
570 | //Don't use fromRawData, the buffer is gone once we invoke emit notification | 561 | // Don't use fromRawData, the buffer is gone once we invoke emit notification |
571 | n.message = BufferUtils::extractBufferCopy(buffer->message()); | 562 | n.message = BufferUtils::extractBufferCopy(buffer->message()); |
572 | } | 563 | } |
573 | n.type = buffer->type(); | 564 | n.type = buffer->type(); |
574 | n.code = buffer->code(); | 565 | n.code = buffer->code(); |
575 | //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first | 566 | // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first |
576 | queuedInvoke([=]() { | 567 | queuedInvoke([=]() { emit notification(n); }, this); |
577 | emit notification(n); | 568 | } break; |
578 | }, this); | ||
579 | } | ||
580 | break; | ||
581 | case Sink::Commands::NotificationType::NotificationType_Status: | 569 | case Sink::Commands::NotificationType::NotificationType_Status: |
582 | case Sink::Commands::NotificationType::NotificationType_Warning: | 570 | case Sink::Commands::NotificationType::NotificationType_Warning: |
583 | case Sink::Commands::NotificationType::NotificationType_Progress: | 571 | case Sink::Commands::NotificationType::NotificationType_Progress: |
@@ -608,7 +596,7 @@ ResourceAccessFactory &ResourceAccessFactory::instance() | |||
608 | Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &instanceIdentifier) | 596 | Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &instanceIdentifier) |
609 | { | 597 | { |
610 | if (!mCache.contains(instanceIdentifier)) { | 598 | if (!mCache.contains(instanceIdentifier)) { |
611 | //Reuse the pointer if something else kept the resourceaccess alive | 599 | // Reuse the pointer if something else kept the resourceaccess alive |
612 | if (mWeakCache.contains(instanceIdentifier)) { | 600 | if (mWeakCache.contains(instanceIdentifier)) { |
613 | auto sharedPointer = mWeakCache.value(instanceIdentifier).toStrongRef(); | 601 | auto sharedPointer = mWeakCache.value(instanceIdentifier).toStrongRef(); |
614 | if (sharedPointer) { | 602 | if (sharedPointer) { |
@@ -616,7 +604,7 @@ Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &ins | |||
616 | } | 604 | } |
617 | } | 605 | } |
618 | if (!mCache.contains(instanceIdentifier)) { | 606 | if (!mCache.contains(instanceIdentifier)) { |
619 | //Create a new instance if necessary | 607 | // Create a new instance if necessary |
620 | auto sharedPointer = Sink::ResourceAccess::Ptr::create(instanceIdentifier); | 608 | auto sharedPointer = Sink::ResourceAccess::Ptr::create(instanceIdentifier); |
621 | QObject::connect(sharedPointer.data(), &Sink::ResourceAccess::ready, sharedPointer.data(), [this, instanceIdentifier](bool ready) { | 609 | QObject::connect(sharedPointer.data(), &Sink::ResourceAccess::ready, sharedPointer.data(), [this, instanceIdentifier](bool ready) { |
622 | if (!ready) { | 610 | if (!ready) { |
@@ -629,10 +617,8 @@ Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &ins | |||
629 | } | 617 | } |
630 | if (!mTimer.contains(instanceIdentifier)) { | 618 | if (!mTimer.contains(instanceIdentifier)) { |
631 | auto timer = new QTimer; | 619 | auto timer = new QTimer; |
632 | //Drop connection after 3 seconds (which is a random value) | 620 | // Drop connection after 3 seconds (which is a random value) |
633 | QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() { | 621 | QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() { mCache.remove(instanceIdentifier); }); |
634 | mCache.remove(instanceIdentifier); | ||
635 | }); | ||
636 | timer->setInterval(3000); | 622 | timer->setInterval(3000); |
637 | mTimer.insert(instanceIdentifier, timer); | 623 | mTimer.insert(instanceIdentifier, timer); |
638 | } | 624 | } |
@@ -640,7 +626,6 @@ Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &ins | |||
640 | timer->start(); | 626 | timer->start(); |
641 | return mCache.value(instanceIdentifier); | 627 | return mCache.value(instanceIdentifier); |
642 | } | 628 | } |
643 | |||
644 | } | 629 | } |
645 | 630 | ||
646 | #pragma clang diagnostic push | 631 | #pragma clang diagnostic push |