summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/resourceaccess.cpp154
-rw-r--r--common/resourceaccess.h3
2 files changed, 88 insertions, 69 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 52cd61a..c6d701d 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -65,11 +65,10 @@ class ResourceAccess::Private
65{ 65{
66public: 66public:
67 Private(const QByteArray &name, ResourceAccess *ra); 67 Private(const QByteArray &name, ResourceAccess *ra);
68 Async::Job<void> tryToConnect();
69 Async::Job<void> initializeSocket();
68 QByteArray resourceName; 70 QByteArray resourceName;
69 QLocalSocket *socket; 71 QSharedPointer<QLocalSocket> socket;
70 QTimer *tryOpenTimer;
71 bool startingProcess;
72 bool openingConnection;
73 QByteArray partialMessageBuffer; 72 QByteArray partialMessageBuffer;
74 flatbuffers::FlatBufferBuilder fbb; 73 flatbuffers::FlatBufferBuilder fbb;
75 QVector<QSharedPointer<QueuedCommand>> commandQueue; 74 QVector<QSharedPointer<QueuedCommand>> commandQueue;
@@ -80,32 +79,84 @@ public:
80 79
81ResourceAccess::Private::Private(const QByteArray &name, ResourceAccess *q) 80ResourceAccess::Private::Private(const QByteArray &name, ResourceAccess *q)
82 : resourceName(name), 81 : resourceName(name),
83 socket(new QLocalSocket(q)),
84 tryOpenTimer(new QTimer(q)),
85 startingProcess(false),
86 openingConnection(false),
87 messageId(0) 82 messageId(0)
88{ 83{
89} 84}
90 85
86//Connects to server and returns connected socket on success
87static Async::Job<QSharedPointer<QLocalSocket> > connectToServer(const QByteArray &identifier)
88{
89 auto s = QSharedPointer<QLocalSocket>::create();
90 return Async::start<QSharedPointer<QLocalSocket> >([identifier, s](Async::Future<QSharedPointer<QLocalSocket> > &future) {
91 s->setServerName(identifier);
92 auto context = new QObject;
93 QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context]() {
94 Q_ASSERT(s);
95 delete context;
96 future.setValue(s);
97 future.setFinished();
98 });
99 QObject::connect(s.data(), static_cast<void(QLocalSocket::*)(QLocalSocket::LocalSocketError)>(&QLocalSocket::error), context, [&future, context](QLocalSocket::LocalSocketError) {
100 delete context;
101 future.setError(-1, "Failed to connect to server.");
102 });
103 s->open();
104 });
105}
106
107Async::Job<void> ResourceAccess::Private::tryToConnect()
108{
109 return Async::dowhile([this]() -> bool {
110 //TODO abort after N retries?
111 return !socket;
112 },
113 [this](Async::Future<void> &future) {
114 Trace() << "Loop";
115 Async::wait(50)
116 .then(connectToServer(resourceName))
117 .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) {
118 Q_ASSERT(s);
119 socket = s;
120 future.setFinished();
121 },
122 [&future](int errorCode, const QString &errorString) {
123 future.setFinished();
124 }).exec();
125 });
126}
127
128Async::Job<void> ResourceAccess::Private::initializeSocket()
129{
130 return Async::start<void>([this](Async::Future<void> &future) {
131 Trace() << "Trying to connect";
132 connectToServer(resourceName).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) {
133 Trace() << "Connected to resource, without having to start it.";
134 Q_ASSERT(s);
135 socket = s;
136 future.setFinished();
137 },
138 [this, &future](int errorCode, const QString &errorString) {
139 Trace() << "Failed to connect, starting resource";
140 //We failed to connect, so let's start the resource
141 QStringList args;
142 args << resourceName;
143 if (QProcess::startDetached("akonadi2_synchronizer", args, QDir::homePath())) {
144 tryToConnect()
145 .then<void>([&future]() {
146 future.setFinished();
147 }).exec();
148 } else {
149 Warning() << "Failed to start resource";
150 }
151 }).exec();
152 });
153}
154
91ResourceAccess::ResourceAccess(const QByteArray &resourceName, QObject *parent) 155ResourceAccess::ResourceAccess(const QByteArray &resourceName, QObject *parent)
92 : QObject(parent), 156 : QObject(parent),
93 d(new Private(resourceName, this)) 157 d(new Private(resourceName, this))
94{ 158{
95 d->tryOpenTimer->setInterval(50);
96 d->tryOpenTimer->setSingleShot(true);
97 connect(d->tryOpenTimer, &QTimer::timeout,
98 this, &ResourceAccess::open);
99
100 log("Starting access"); 159 log("Starting access");
101 connect(d->socket, &QLocalSocket::connected,
102 this, &ResourceAccess::connected);
103 connect(d->socket, &QLocalSocket::disconnected,
104 this, &ResourceAccess::disconnected);
105 connect(d->socket, SIGNAL(error(QLocalSocket::LocalSocketError)),
106 this, SLOT(connectionError(QLocalSocket::LocalSocketError)));
107 connect(d->socket, &QIODevice::readyRead,
108 this, &ResourceAccess::readResourceMessage);
109} 160}
110 161
111ResourceAccess::~ResourceAccess() 162ResourceAccess::~ResourceAccess()
@@ -120,7 +171,7 @@ QByteArray ResourceAccess::resourceName() const
120 171
121bool ResourceAccess::isReady() const 172bool ResourceAccess::isReady() const
122{ 173{
123 return d->socket->isValid(); 174 return (d->socket && d->socket->isValid());
124} 175}
125 176
126void ResourceAccess::registerCallback(uint messageId, const std::function<void(int error, const QString &errorMessage)> &callback) 177void ResourceAccess::registerCallback(uint messageId, const std::function<void(int error, const QString &errorMessage)> &callback)
@@ -174,18 +225,20 @@ Async::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool local
174 225
175void ResourceAccess::open() 226void ResourceAccess::open()
176{ 227{
177 if (d->socket->isValid()) { 228 if (d->socket && d->socket->isValid()) {
178 log("Socket valid, so not opening again"); 229 log("Socket valid, so not opening again");
179 return; 230 return;
180 } 231 }
181 d->openingConnection = true; 232 d->initializeSocket().then<void>([this]() {
182 233 Trace() << "Socket is initialized";
183 //TODO: if we try and try and the process does not pick up 234 QObject::connect(d->socket.data(), &QLocalSocket::disconnected,
184 // we should probably try to start the process again 235 this, &ResourceAccess::disconnected);
185 d->socket->setServerName(d->resourceName); 236 QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)),
186 log(QString("Opening %1").arg(d->socket->serverName())); 237 this, SLOT(connectionError(QLocalSocket::LocalSocketError)));
187 //FIXME: race between starting the exec and opening the socket? 238 QObject::connect(d->socket.data(), &QIODevice::readyRead,
188 d->socket->open(); 239 this, &ResourceAccess::readResourceMessage);
240 connected();
241 }).exec();
189} 242}
190 243
191void ResourceAccess::close() 244void ResourceAccess::close()
@@ -209,7 +262,7 @@ void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command)
209 } 262 }
210 //Keep track of the command until we're sure it arrived 263 //Keep track of the command until we're sure it arrived
211 d->pendingCommands.insert(d->messageId, command); 264 d->pendingCommands.insert(d->messageId, command);
212 Commands::write(d->socket, d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); 265 Commands::write(d->socket.data(), d->messageId, command->commandId, command->buffer.constData(), command->buffer.size());
213} 266}
214 267
215void ResourceAccess::processCommandQueue() 268void ResourceAccess::processCommandQueue()
@@ -224,9 +277,6 @@ void ResourceAccess::processCommandQueue()
224 277
225void ResourceAccess::connected() 278void ResourceAccess::connected()
226{ 279{
227 d->startingProcess = false;
228 d->openingConnection = false;
229
230 if (!isReady()) { 280 if (!isReady()) {
231 return; 281 return;
232 } 282 }
@@ -237,7 +287,7 @@ void ResourceAccess::connected()
237 auto name = d->fbb.CreateString(QString::number(QCoreApplication::applicationPid()).toLatin1()); 287 auto name = d->fbb.CreateString(QString::number(QCoreApplication::applicationPid()).toLatin1());
238 auto command = Akonadi2::CreateHandshake(d->fbb, name); 288 auto command = Akonadi2::CreateHandshake(d->fbb, name);
239 Akonadi2::FinishHandshakeBuffer(d->fbb, command); 289 Akonadi2::FinishHandshakeBuffer(d->fbb, command);
240 Commands::write(d->socket, ++d->messageId, Commands::HandshakeCommand, d->fbb); 290 Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, d->fbb);
241 d->fbb.Clear(); 291 d->fbb.Clear();
242 } 292 }
243 293
@@ -255,21 +305,6 @@ void ResourceAccess::disconnected()
255 305
256void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) 306void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
257{ 307{
258 //We tried to connect to the server, but the socket is not yet available.
259 //We're trying to connect but failed, start the resource and retry.
260 //Don't automatically restart on later disconnects.
261 if (d->openingConnection && error == QLocalSocket::LocalSocketError::ServerNotFoundError) {
262 startResourceAndConnect();
263 return;
264 }
265 //Retry to connect to the server while starting the process
266 if (d->startingProcess) {
267 if (!d->tryOpenTimer->isActive()) {
268 d->tryOpenTimer->start();
269 }
270 return;
271 }
272
273 if (error == QLocalSocket::PeerClosedError) { 308 if (error == QLocalSocket::PeerClosedError) {
274 Log() << "The resource closed the connection."; 309 Log() << "The resource closed the connection.";
275 } else { 310 } else {
@@ -283,24 +318,9 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
283 d->resultHandler.clear(); 318 d->resultHandler.clear();
284} 319}
285 320
286void ResourceAccess::startResourceAndConnect()
287{
288 d->startingProcess = true;
289 Log() << "Attempting to start resource " + d->resourceName;
290 QStringList args;
291 args << d->resourceName;
292 if (QProcess::startDetached("akonadi2_synchronizer", args, QDir::homePath())) {
293 if (!d->tryOpenTimer->isActive()) {
294 d->tryOpenTimer->start();
295 }
296 } else {
297 qWarning() << "Failed to start resource";
298 }
299}
300
301void ResourceAccess::readResourceMessage() 321void ResourceAccess::readResourceMessage()
302{ 322{
303 if (!d->socket->isValid()) { 323 if (!d->socket || !d->socket->isValid()) {
304 return; 324 return;
305 } 325 }
306 326
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 4c9d9d2..dc7640d 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -58,7 +58,6 @@ Q_SIGNALS:
58 58
59private Q_SLOTS: 59private Q_SLOTS:
60 //TODO: move these to the Private class 60 //TODO: move these to the Private class
61 void connected();
62 void disconnected(); 61 void disconnected();
63 void connectionError(QLocalSocket::LocalSocketError error); 62 void connectionError(QLocalSocket::LocalSocketError error);
64 void readResourceMessage(); 63 void readResourceMessage();
@@ -66,9 +65,9 @@ private Q_SLOTS:
66 void callCallbacks(int id); 65 void callCallbacks(int id);
67 66
68private: 67private:
68 void connected();
69 void log(const QString &message); 69 void log(const QString &message);
70 void registerCallback(uint messageId, const std::function<void(int error, const QString &)> &callback); 70 void registerCallback(uint messageId, const std::function<void(int error, const QString &)> &callback);
71 void startResourceAndConnect();
72 71
73 void sendCommand(const QSharedPointer<QueuedCommand> &command); 72 void sendCommand(const QSharedPointer<QueuedCommand> &command);
74 void processCommandQueue(); 73 void processCommandQueue();