summaryrefslogtreecommitdiffstats
path: root/common/resourceaccess.cpp
diff options
context:
space:
mode:
authorAaron Seigo <aseigo@kde.org>2014-12-16 12:13:06 +0100
committerAaron Seigo <aseigo@kde.org>2014-12-16 12:13:06 +0100
commitabdbd0e8d9a761b7906da2999d4fd58f2771c202 (patch)
tree135855e4a3d853573da93b2a43e2b730ab6b9f6a /common/resourceaccess.cpp
parente2ba730949837633490cefbb55165c81f2ffeb96 (diff)
downloadsink-abdbd0e8d9a761b7906da2999d4fd58f2771c202.tar.gz
sink-abdbd0e8d9a761b7906da2999d4fd58f2771c202.zip
use a dptr, API for sending commands, queue commands until connected
Diffstat (limited to 'common/resourceaccess.cpp')
-rw-r--r--common/resourceaccess.cpp191
1 files changed, 142 insertions, 49 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 2b58545..a4f3c94 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -26,33 +26,88 @@
26#include "common/revisionupdate_generated.h" 26#include "common/revisionupdate_generated.h"
27 27
28#include <QDebug> 28#include <QDebug>
29#include <QDir>
29#include <QProcess> 30#include <QProcess>
30 31
31namespace Akonadi2 32namespace Akonadi2
32{ 33{
33 34
35class QueuedCommand
36{
37public:
38 QueuedCommand(int commandId)
39 : m_commandId(commandId),
40 m_bufferSize(0),
41 m_buffer(0)
42 {}
43
44 QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb)
45 : m_commandId(commandId),
46 m_bufferSize(fbb.GetSize()),
47 m_buffer(new char[m_bufferSize])
48 {
49 memcpy(m_buffer, fbb.GetBufferPointer(), m_bufferSize);
50 }
51
52 ~QueuedCommand()
53 {
54 delete[] m_buffer;
55 }
56
57 void write(QIODevice *device)
58 {
59 Console::main()->log(QString("\tSending queued command %1").arg(m_commandId));
60 Commands::write(device, m_commandId, m_buffer, m_bufferSize);
61 }
62
63private:
64 QueuedCommand(const QueuedCommand &other);
65 QueuedCommand &operator=(const QueuedCommand &rhs);
66
67 const int m_commandId;
68 const uint m_bufferSize;
69 char *m_buffer;
70};
71
72class ResourceAccess::Private
73{
74public:
75 Private(const QString &name, ResourceAccess *ra);
76 QString resourceName;
77 QLocalSocket *socket;
78 QTimer *tryOpenTimer;
79 bool startingProcess;
80 QByteArray partialMessageBuffer;
81 flatbuffers::FlatBufferBuilder fbb;
82 QVector<QueuedCommand *> commandQueue;
83};
84
85ResourceAccess::Private::Private(const QString &name, ResourceAccess *q)
86 : resourceName(name),
87 socket(new QLocalSocket(q)),
88 tryOpenTimer(new QTimer(q)),
89 startingProcess(false)
90{
91}
92
34ResourceAccess::ResourceAccess(const QString &resourceName, QObject *parent) 93ResourceAccess::ResourceAccess(const QString &resourceName, QObject *parent)
35 : QObject(parent), 94 : QObject(parent),
36 m_resourceName(resourceName), 95 d(new Private(resourceName, this))
37 m_socket(new QLocalSocket(this)),
38 m_tryOpenTimer(new QTimer(this)),
39 m_startingProcess(false)
40{ 96{
41 m_tryOpenTimer->setInterval(50); 97 d->tryOpenTimer->setInterval(50);
42 m_tryOpenTimer->setSingleShot(true); 98 d->tryOpenTimer->setSingleShot(true);
43 connect(m_tryOpenTimer, &QTimer::timeout, 99 connect(d->tryOpenTimer, &QTimer::timeout,
44 this, &ResourceAccess::open); 100 this, &ResourceAccess::open);
45 101
46 log("Starting access"); 102 log("Starting access");
47 connect(m_socket, &QLocalSocket::connected, 103 connect(d->socket, &QLocalSocket::connected,
48 this, &ResourceAccess::connected); 104 this, &ResourceAccess::connected);
49 connect(m_socket, &QLocalSocket::disconnected, 105 connect(d->socket, &QLocalSocket::disconnected,
50 this, &ResourceAccess::disconnected); 106 this, &ResourceAccess::disconnected);
51 connect(m_socket, SIGNAL(error(QLocalSocket::LocalSocketError)), 107 connect(d->socket, SIGNAL(error(QLocalSocket::LocalSocketError)),
52 this, SLOT(connectionError(QLocalSocket::LocalSocketError))); 108 this, SLOT(connectionError(QLocalSocket::LocalSocketError)));
53 connect(m_socket, &QIODevice::readyRead, 109 connect(d->socket, &QIODevice::readyRead,
54 this, &ResourceAccess::readResourceMessage); 110 this, &ResourceAccess::readResourceMessage);
55
56} 111}
57 112
58ResourceAccess::~ResourceAccess() 113ResourceAccess::~ResourceAccess()
@@ -62,53 +117,89 @@ ResourceAccess::~ResourceAccess()
62 117
63QString ResourceAccess::resourceName() const 118QString ResourceAccess::resourceName() const
64{ 119{
65 return m_resourceName; 120 return d->resourceName;
66} 121}
67 122
68bool ResourceAccess::isReady() const 123bool ResourceAccess::isReady() const
69{ 124{
70 return m_socket->isValid(); 125 return d->socket->isValid();
126}
127
128void ResourceAccess::sendCommand(int commandId)
129{
130 if (isReady()) {
131 log(QString("Sending command %1").arg(commandId));
132 Commands::write(d->socket, commandId);
133 } else {
134 d->commandQueue << new QueuedCommand(commandId);
135 }
136}
137
138void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb)
139{
140 if (isReady()) {
141 log(QString("Sending command %1").arg(commandId));
142 Commands::write(d->socket, commandId, fbb);
143 } else {
144 d->commandQueue << new QueuedCommand(commandId, fbb);
145 }
71} 146}
72 147
73void ResourceAccess::open() 148void ResourceAccess::open()
74{ 149{
75 if (m_socket->isValid()) { 150 if (d->socket->isValid()) {
76 log("Socket valid, so aborting the open"); 151 log("Socket valid, so not opening again");
77 return; 152 return;
78 } 153 }
79 154
80 m_socket->setServerName(m_resourceName); 155 //TODO: if we try and try and the process does not pick up
81 log(QString("Opening %1").arg(m_socket->serverName())); 156 // we should probably try to start the process again
157 d->socket->setServerName(d->resourceName);
158 log(QString("Opening %1").arg(d->socket->serverName()));
82 //FIXME: race between starting the exec and opening the socket? 159 //FIXME: race between starting the exec and opening the socket?
83 m_socket->open(); 160 d->socket->open();
84} 161}
85 162
86void ResourceAccess::close() 163void ResourceAccess::close()
87{ 164{
88 log(QString("Closing %1").arg(m_socket->fullServerName())); 165 log(QString("Closing %1").arg(d->socket->fullServerName()));
89 m_socket->close(); 166 d->socket->close();
90} 167}
91 168
92void ResourceAccess::connected() 169void ResourceAccess::connected()
93{ 170{
94 m_startingProcess = false; 171 d->startingProcess = false;
95 log(QString("Connected: ").arg(m_socket->fullServerName())); 172
173 if (!isReady()) {
174 return;
175 }
176
177 log(QString("Connected: %1").arg(d->socket->fullServerName()));
96 178
97 { 179 {
98 auto name = m_fbb.CreateString(QString::number((long long)this).toLatin1()); 180 auto name = d->fbb.CreateString(QString::number((long long)this).toLatin1());
99 auto command = Akonadi2::CreateHandshake(m_fbb, name); 181 auto command = Akonadi2::CreateHandshake(d->fbb, name);
100 Akonadi2::FinishHandshakeBuffer(m_fbb, command); 182 Akonadi2::FinishHandshakeBuffer(d->fbb, command);
101 Commands::write(m_socket, Commands::HandshakeCommand, m_fbb); 183 Commands::write(d->socket, Commands::HandshakeCommand, d->fbb);
102 m_fbb.Clear(); 184 d->fbb.Clear();
185 }
186
187 //TODO: should confirm the commands made it with a response?
188 //TODO: serialize instead of blast them all through the socket?
189 log(QString("We have %1 queued commands").arg(d->commandQueue.size()));
190 for (QueuedCommand *command: d->commandQueue) {
191 command->write(d->socket);
192 delete command;
103 } 193 }
194 d->commandQueue.clear();
104 195
105 emit ready(true); 196 emit ready(true);
106} 197}
107 198
108void ResourceAccess::disconnected() 199void ResourceAccess::disconnected()
109{ 200{
110 m_socket->close(); 201 d->socket->close();
111 log(QString("Disconnected from %1").arg(m_socket->fullServerName())); 202 log(QString("Disconnected from %1").arg(d->socket->fullServerName()));
112 emit ready(false); 203 emit ready(false);
113 open(); 204 open();
114} 205}
@@ -116,29 +207,31 @@ void ResourceAccess::disconnected()
116void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) 207void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
117{ 208{
118 log(QString("Connection error: %2").arg(error)); 209 log(QString("Connection error: %2").arg(error));
119 if (m_startingProcess) { 210 if (d->startingProcess) {
120 if (!m_tryOpenTimer->isActive()) { 211 if (!d->tryOpenTimer->isActive()) {
121 m_tryOpenTimer->start(); 212 d->tryOpenTimer->start();
122 } 213 }
123 return; 214 return;
124 } 215 }
125 216
126 m_startingProcess = true; 217 d->startingProcess = true;
127 log(QString("Attempting to start resource ") + m_resourceName); 218 log(QString("Attempting to start resource ") + d->resourceName);
128 QStringList args; 219 QStringList args;
129 args << m_resourceName; 220 args << d->resourceName;
130 if (QProcess::startDetached("akonadi2_synchronizer", args)) { 221 if (QProcess::startDetached("akonadi2_synchronizer", args, QDir::homePath())) {
131 m_socket->open(); 222 if (!d->tryOpenTimer->isActive()) {
223 d->tryOpenTimer->start();
224 }
132 } 225 }
133} 226}
134 227
135void ResourceAccess::readResourceMessage() 228void ResourceAccess::readResourceMessage()
136{ 229{
137 if (!m_socket->isValid()) { 230 if (!d->socket->isValid()) {
138 return; 231 return;
139 } 232 }
140 233
141 m_partialMessageBuffer += m_socket->readAll(); 234 d->partialMessageBuffer += d->socket->readAll();
142 235
143 // should be scheduled rather than processed all at once 236 // should be scheduled rather than processed all at once
144 while (processMessageBuffer()) {} 237 while (processMessageBuffer()) {}
@@ -147,20 +240,20 @@ void ResourceAccess::readResourceMessage()
147bool ResourceAccess::processMessageBuffer() 240bool ResourceAccess::processMessageBuffer()
148{ 241{
149 static const int headerSize = (sizeof(int) * 2); 242 static const int headerSize = (sizeof(int) * 2);
150 if (m_partialMessageBuffer.size() < headerSize) { 243 if (d->partialMessageBuffer.size() < headerSize) {
151 return false; 244 return false;
152 } 245 }
153 246
154 const int commandId = *(int*)m_partialMessageBuffer.constData(); 247 const int commandId = *(int*)d->partialMessageBuffer.constData();
155 const int size = *(int*)(m_partialMessageBuffer.constData() + sizeof(int)); 248 const int size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int));
156 249
157 if (size > m_partialMessageBuffer.size() - headerSize) { 250 if (size > d->partialMessageBuffer.size() - headerSize) {
158 return false; 251 return false;
159 } 252 }
160 253
161 switch (commandId) { 254 switch (commandId) {
162 case Commands::RevisionUpdateCommand: { 255 case Commands::RevisionUpdateCommand: {
163 auto buffer = Akonadi2::GetRevisionUpdate(m_partialMessageBuffer.constData() + headerSize); 256 auto buffer = Akonadi2::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize);
164 log(QString("Revision updated to: %1").arg(buffer->revision())); 257 log(QString("Revision updated to: %1").arg(buffer->revision()));
165 emit revisionChanged(buffer->revision()); 258 emit revisionChanged(buffer->revision());
166 break; 259 break;
@@ -169,13 +262,13 @@ bool ResourceAccess::processMessageBuffer()
169 break; 262 break;
170 } 263 }
171 264
172 m_partialMessageBuffer.remove(0, headerSize + size); 265 d->partialMessageBuffer.remove(0, headerSize + size);
173 return m_partialMessageBuffer.size() >= headerSize; 266 return d->partialMessageBuffer.size() >= headerSize;
174} 267}
175 268
176void ResourceAccess::log(const QString &message) 269void ResourceAccess::log(const QString &message)
177{ 270{
178 Console::main()->log(m_resourceName + ": " + message); 271 Console::main()->log(d->resourceName + ": " + message);
179} 272}
180 273
181} 274}