diff options
-rw-r--r-- | common/resourceaccess.cpp | 154 | ||||
-rw-r--r-- | common/resourceaccess.h | 3 |
2 files changed, 88 insertions, 69 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 52cd61a..c6d701d 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -65,11 +65,10 @@ class ResourceAccess::Private | |||
65 | { | 65 | { |
66 | public: | 66 | public: |
67 | Private(const QByteArray &name, ResourceAccess *ra); | 67 | Private(const QByteArray &name, ResourceAccess *ra); |
68 | Async::Job<void> tryToConnect(); | ||
69 | Async::Job<void> initializeSocket(); | ||
68 | QByteArray resourceName; | 70 | QByteArray resourceName; |
69 | QLocalSocket *socket; | 71 | QSharedPointer<QLocalSocket> socket; |
70 | QTimer *tryOpenTimer; | ||
71 | bool startingProcess; | ||
72 | bool openingConnection; | ||
73 | QByteArray partialMessageBuffer; | 72 | QByteArray partialMessageBuffer; |
74 | flatbuffers::FlatBufferBuilder fbb; | 73 | flatbuffers::FlatBufferBuilder fbb; |
75 | QVector<QSharedPointer<QueuedCommand>> commandQueue; | 74 | QVector<QSharedPointer<QueuedCommand>> commandQueue; |
@@ -80,32 +79,84 @@ public: | |||
80 | 79 | ||
81 | ResourceAccess::Private::Private(const QByteArray &name, ResourceAccess *q) | 80 | ResourceAccess::Private::Private(const QByteArray &name, ResourceAccess *q) |
82 | : resourceName(name), | 81 | : resourceName(name), |
83 | socket(new QLocalSocket(q)), | ||
84 | tryOpenTimer(new QTimer(q)), | ||
85 | startingProcess(false), | ||
86 | openingConnection(false), | ||
87 | messageId(0) | 82 | messageId(0) |
88 | { | 83 | { |
89 | } | 84 | } |
90 | 85 | ||
86 | //Connects to server and returns connected socket on success | ||
87 | static Async::Job<QSharedPointer<QLocalSocket> > connectToServer(const QByteArray &identifier) | ||
88 | { | ||
89 | auto s = QSharedPointer<QLocalSocket>::create(); | ||
90 | return Async::start<QSharedPointer<QLocalSocket> >([identifier, s](Async::Future<QSharedPointer<QLocalSocket> > &future) { | ||
91 | s->setServerName(identifier); | ||
92 | auto context = new QObject; | ||
93 | QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context]() { | ||
94 | Q_ASSERT(s); | ||
95 | delete context; | ||
96 | future.setValue(s); | ||
97 | future.setFinished(); | ||
98 | }); | ||
99 | QObject::connect(s.data(), static_cast<void(QLocalSocket::*)(QLocalSocket::LocalSocketError)>(&QLocalSocket::error), context, [&future, context](QLocalSocket::LocalSocketError) { | ||
100 | delete context; | ||
101 | future.setError(-1, "Failed to connect to server."); | ||
102 | }); | ||
103 | s->open(); | ||
104 | }); | ||
105 | } | ||
106 | |||
107 | Async::Job<void> ResourceAccess::Private::tryToConnect() | ||
108 | { | ||
109 | return Async::dowhile([this]() -> bool { | ||
110 | //TODO abort after N retries? | ||
111 | return !socket; | ||
112 | }, | ||
113 | [this](Async::Future<void> &future) { | ||
114 | Trace() << "Loop"; | ||
115 | Async::wait(50) | ||
116 | .then(connectToServer(resourceName)) | ||
117 | .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { | ||
118 | Q_ASSERT(s); | ||
119 | socket = s; | ||
120 | future.setFinished(); | ||
121 | }, | ||
122 | [&future](int errorCode, const QString &errorString) { | ||
123 | future.setFinished(); | ||
124 | }).exec(); | ||
125 | }); | ||
126 | } | ||
127 | |||
128 | Async::Job<void> ResourceAccess::Private::initializeSocket() | ||
129 | { | ||
130 | return Async::start<void>([this](Async::Future<void> &future) { | ||
131 | Trace() << "Trying to connect"; | ||
132 | connectToServer(resourceName).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { | ||
133 | Trace() << "Connected to resource, without having to start it."; | ||
134 | Q_ASSERT(s); | ||
135 | socket = s; | ||
136 | future.setFinished(); | ||
137 | }, | ||
138 | [this, &future](int errorCode, const QString &errorString) { | ||
139 | Trace() << "Failed to connect, starting resource"; | ||
140 | //We failed to connect, so let's start the resource | ||
141 | QStringList args; | ||
142 | args << resourceName; | ||
143 | if (QProcess::startDetached("akonadi2_synchronizer", args, QDir::homePath())) { | ||
144 | tryToConnect() | ||
145 | .then<void>([&future]() { | ||
146 | future.setFinished(); | ||
147 | }).exec(); | ||
148 | } else { | ||
149 | Warning() << "Failed to start resource"; | ||
150 | } | ||
151 | }).exec(); | ||
152 | }); | ||
153 | } | ||
154 | |||
91 | ResourceAccess::ResourceAccess(const QByteArray &resourceName, QObject *parent) | 155 | ResourceAccess::ResourceAccess(const QByteArray &resourceName, QObject *parent) |
92 | : QObject(parent), | 156 | : QObject(parent), |
93 | d(new Private(resourceName, this)) | 157 | d(new Private(resourceName, this)) |
94 | { | 158 | { |
95 | d->tryOpenTimer->setInterval(50); | ||
96 | d->tryOpenTimer->setSingleShot(true); | ||
97 | connect(d->tryOpenTimer, &QTimer::timeout, | ||
98 | this, &ResourceAccess::open); | ||
99 | |||
100 | log("Starting access"); | 159 | log("Starting access"); |
101 | connect(d->socket, &QLocalSocket::connected, | ||
102 | this, &ResourceAccess::connected); | ||
103 | connect(d->socket, &QLocalSocket::disconnected, | ||
104 | this, &ResourceAccess::disconnected); | ||
105 | connect(d->socket, SIGNAL(error(QLocalSocket::LocalSocketError)), | ||
106 | this, SLOT(connectionError(QLocalSocket::LocalSocketError))); | ||
107 | connect(d->socket, &QIODevice::readyRead, | ||
108 | this, &ResourceAccess::readResourceMessage); | ||
109 | } | 160 | } |
110 | 161 | ||
111 | ResourceAccess::~ResourceAccess() | 162 | ResourceAccess::~ResourceAccess() |
@@ -120,7 +171,7 @@ QByteArray ResourceAccess::resourceName() const | |||
120 | 171 | ||
121 | bool ResourceAccess::isReady() const | 172 | bool ResourceAccess::isReady() const |
122 | { | 173 | { |
123 | return d->socket->isValid(); | 174 | return (d->socket && d->socket->isValid()); |
124 | } | 175 | } |
125 | 176 | ||
126 | void ResourceAccess::registerCallback(uint messageId, const std::function<void(int error, const QString &errorMessage)> &callback) | 177 | void ResourceAccess::registerCallback(uint messageId, const std::function<void(int error, const QString &errorMessage)> &callback) |
@@ -174,18 +225,20 @@ Async::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool local | |||
174 | 225 | ||
175 | void ResourceAccess::open() | 226 | void ResourceAccess::open() |
176 | { | 227 | { |
177 | if (d->socket->isValid()) { | 228 | if (d->socket && d->socket->isValid()) { |
178 | log("Socket valid, so not opening again"); | 229 | log("Socket valid, so not opening again"); |
179 | return; | 230 | return; |
180 | } | 231 | } |
181 | d->openingConnection = true; | 232 | d->initializeSocket().then<void>([this]() { |
182 | 233 | Trace() << "Socket is initialized"; | |
183 | //TODO: if we try and try and the process does not pick up | 234 | QObject::connect(d->socket.data(), &QLocalSocket::disconnected, |
184 | // we should probably try to start the process again | 235 | this, &ResourceAccess::disconnected); |
185 | d->socket->setServerName(d->resourceName); | 236 | QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), |
186 | log(QString("Opening %1").arg(d->socket->serverName())); | 237 | this, SLOT(connectionError(QLocalSocket::LocalSocketError))); |
187 | //FIXME: race between starting the exec and opening the socket? | 238 | QObject::connect(d->socket.data(), &QIODevice::readyRead, |
188 | d->socket->open(); | 239 | this, &ResourceAccess::readResourceMessage); |
240 | connected(); | ||
241 | }).exec(); | ||
189 | } | 242 | } |
190 | 243 | ||
191 | void ResourceAccess::close() | 244 | void ResourceAccess::close() |
@@ -209,7 +262,7 @@ void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command) | |||
209 | } | 262 | } |
210 | //Keep track of the command until we're sure it arrived | 263 | //Keep track of the command until we're sure it arrived |
211 | d->pendingCommands.insert(d->messageId, command); | 264 | d->pendingCommands.insert(d->messageId, command); |
212 | Commands::write(d->socket, d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); | 265 | Commands::write(d->socket.data(), d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); |
213 | } | 266 | } |
214 | 267 | ||
215 | void ResourceAccess::processCommandQueue() | 268 | void ResourceAccess::processCommandQueue() |
@@ -224,9 +277,6 @@ void ResourceAccess::processCommandQueue() | |||
224 | 277 | ||
225 | void ResourceAccess::connected() | 278 | void ResourceAccess::connected() |
226 | { | 279 | { |
227 | d->startingProcess = false; | ||
228 | d->openingConnection = false; | ||
229 | |||
230 | if (!isReady()) { | 280 | if (!isReady()) { |
231 | return; | 281 | return; |
232 | } | 282 | } |
@@ -237,7 +287,7 @@ void ResourceAccess::connected() | |||
237 | auto name = d->fbb.CreateString(QString::number(QCoreApplication::applicationPid()).toLatin1()); | 287 | auto name = d->fbb.CreateString(QString::number(QCoreApplication::applicationPid()).toLatin1()); |
238 | auto command = Akonadi2::CreateHandshake(d->fbb, name); | 288 | auto command = Akonadi2::CreateHandshake(d->fbb, name); |
239 | Akonadi2::FinishHandshakeBuffer(d->fbb, command); | 289 | Akonadi2::FinishHandshakeBuffer(d->fbb, command); |
240 | Commands::write(d->socket, ++d->messageId, Commands::HandshakeCommand, d->fbb); | 290 | Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, d->fbb); |
241 | d->fbb.Clear(); | 291 | d->fbb.Clear(); |
242 | } | 292 | } |
243 | 293 | ||
@@ -255,21 +305,6 @@ void ResourceAccess::disconnected() | |||
255 | 305 | ||
256 | void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) | 306 | void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) |
257 | { | 307 | { |
258 | //We tried to connect to the server, but the socket is not yet available. | ||
259 | //We're trying to connect but failed, start the resource and retry. | ||
260 | //Don't automatically restart on later disconnects. | ||
261 | if (d->openingConnection && error == QLocalSocket::LocalSocketError::ServerNotFoundError) { | ||
262 | startResourceAndConnect(); | ||
263 | return; | ||
264 | } | ||
265 | //Retry to connect to the server while starting the process | ||
266 | if (d->startingProcess) { | ||
267 | if (!d->tryOpenTimer->isActive()) { | ||
268 | d->tryOpenTimer->start(); | ||
269 | } | ||
270 | return; | ||
271 | } | ||
272 | |||
273 | if (error == QLocalSocket::PeerClosedError) { | 308 | if (error == QLocalSocket::PeerClosedError) { |
274 | Log() << "The resource closed the connection."; | 309 | Log() << "The resource closed the connection."; |
275 | } else { | 310 | } else { |
@@ -283,24 +318,9 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) | |||
283 | d->resultHandler.clear(); | 318 | d->resultHandler.clear(); |
284 | } | 319 | } |
285 | 320 | ||
286 | void ResourceAccess::startResourceAndConnect() | ||
287 | { | ||
288 | d->startingProcess = true; | ||
289 | Log() << "Attempting to start resource " + d->resourceName; | ||
290 | QStringList args; | ||
291 | args << d->resourceName; | ||
292 | if (QProcess::startDetached("akonadi2_synchronizer", args, QDir::homePath())) { | ||
293 | if (!d->tryOpenTimer->isActive()) { | ||
294 | d->tryOpenTimer->start(); | ||
295 | } | ||
296 | } else { | ||
297 | qWarning() << "Failed to start resource"; | ||
298 | } | ||
299 | } | ||
300 | |||
301 | void ResourceAccess::readResourceMessage() | 321 | void ResourceAccess::readResourceMessage() |
302 | { | 322 | { |
303 | if (!d->socket->isValid()) { | 323 | if (!d->socket || !d->socket->isValid()) { |
304 | return; | 324 | return; |
305 | } | 325 | } |
306 | 326 | ||
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 4c9d9d2..dc7640d 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -58,7 +58,6 @@ Q_SIGNALS: | |||
58 | 58 | ||
59 | private Q_SLOTS: | 59 | private Q_SLOTS: |
60 | //TODO: move these to the Private class | 60 | //TODO: move these to the Private class |
61 | void connected(); | ||
62 | void disconnected(); | 61 | void disconnected(); |
63 | void connectionError(QLocalSocket::LocalSocketError error); | 62 | void connectionError(QLocalSocket::LocalSocketError error); |
64 | void readResourceMessage(); | 63 | void readResourceMessage(); |
@@ -66,9 +65,9 @@ private Q_SLOTS: | |||
66 | void callCallbacks(int id); | 65 | void callCallbacks(int id); |
67 | 66 | ||
68 | private: | 67 | private: |
68 | void connected(); | ||
69 | void log(const QString &message); | 69 | void log(const QString &message); |
70 | void registerCallback(uint messageId, const std::function<void(int error, const QString &)> &callback); | 70 | void registerCallback(uint messageId, const std::function<void(int error, const QString &)> &callback); |
71 | void startResourceAndConnect(); | ||
72 | 71 | ||
73 | void sendCommand(const QSharedPointer<QueuedCommand> &command); | 72 | void sendCommand(const QSharedPointer<QueuedCommand> &command); |
74 | void processCommandQueue(); | 73 | void processCommandQueue(); |