diff options
-rw-r--r-- | common/resourceaccess.cpp | 191 | ||||
-rw-r--r-- | common/resourceaccess.h | 13 |
2 files changed, 149 insertions, 55 deletions
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 2b58545..a4f3c94 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -26,33 +26,88 @@ | |||
26 | #include "common/revisionupdate_generated.h" | 26 | #include "common/revisionupdate_generated.h" |
27 | 27 | ||
28 | #include <QDebug> | 28 | #include <QDebug> |
29 | #include <QDir> | ||
29 | #include <QProcess> | 30 | #include <QProcess> |
30 | 31 | ||
31 | namespace Akonadi2 | 32 | namespace Akonadi2 |
32 | { | 33 | { |
33 | 34 | ||
35 | class QueuedCommand | ||
36 | { | ||
37 | public: | ||
38 | QueuedCommand(int commandId) | ||
39 | : m_commandId(commandId), | ||
40 | m_bufferSize(0), | ||
41 | m_buffer(0) | ||
42 | {} | ||
43 | |||
44 | QueuedCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) | ||
45 | : m_commandId(commandId), | ||
46 | m_bufferSize(fbb.GetSize()), | ||
47 | m_buffer(new char[m_bufferSize]) | ||
48 | { | ||
49 | memcpy(m_buffer, fbb.GetBufferPointer(), m_bufferSize); | ||
50 | } | ||
51 | |||
52 | ~QueuedCommand() | ||
53 | { | ||
54 | delete[] m_buffer; | ||
55 | } | ||
56 | |||
57 | void write(QIODevice *device) | ||
58 | { | ||
59 | Console::main()->log(QString("\tSending queued command %1").arg(m_commandId)); | ||
60 | Commands::write(device, m_commandId, m_buffer, m_bufferSize); | ||
61 | } | ||
62 | |||
63 | private: | ||
64 | QueuedCommand(const QueuedCommand &other); | ||
65 | QueuedCommand &operator=(const QueuedCommand &rhs); | ||
66 | |||
67 | const int m_commandId; | ||
68 | const uint m_bufferSize; | ||
69 | char *m_buffer; | ||
70 | }; | ||
71 | |||
72 | class ResourceAccess::Private | ||
73 | { | ||
74 | public: | ||
75 | Private(const QString &name, ResourceAccess *ra); | ||
76 | QString resourceName; | ||
77 | QLocalSocket *socket; | ||
78 | QTimer *tryOpenTimer; | ||
79 | bool startingProcess; | ||
80 | QByteArray partialMessageBuffer; | ||
81 | flatbuffers::FlatBufferBuilder fbb; | ||
82 | QVector<QueuedCommand *> commandQueue; | ||
83 | }; | ||
84 | |||
85 | ResourceAccess::Private::Private(const QString &name, ResourceAccess *q) | ||
86 | : resourceName(name), | ||
87 | socket(new QLocalSocket(q)), | ||
88 | tryOpenTimer(new QTimer(q)), | ||
89 | startingProcess(false) | ||
90 | { | ||
91 | } | ||
92 | |||
34 | ResourceAccess::ResourceAccess(const QString &resourceName, QObject *parent) | 93 | ResourceAccess::ResourceAccess(const QString &resourceName, QObject *parent) |
35 | : QObject(parent), | 94 | : QObject(parent), |
36 | m_resourceName(resourceName), | 95 | d(new Private(resourceName, this)) |
37 | m_socket(new QLocalSocket(this)), | ||
38 | m_tryOpenTimer(new QTimer(this)), | ||
39 | m_startingProcess(false) | ||
40 | { | 96 | { |
41 | m_tryOpenTimer->setInterval(50); | 97 | d->tryOpenTimer->setInterval(50); |
42 | m_tryOpenTimer->setSingleShot(true); | 98 | d->tryOpenTimer->setSingleShot(true); |
43 | connect(m_tryOpenTimer, &QTimer::timeout, | 99 | connect(d->tryOpenTimer, &QTimer::timeout, |
44 | this, &ResourceAccess::open); | 100 | this, &ResourceAccess::open); |
45 | 101 | ||
46 | log("Starting access"); | 102 | log("Starting access"); |
47 | connect(m_socket, &QLocalSocket::connected, | 103 | connect(d->socket, &QLocalSocket::connected, |
48 | this, &ResourceAccess::connected); | 104 | this, &ResourceAccess::connected); |
49 | connect(m_socket, &QLocalSocket::disconnected, | 105 | connect(d->socket, &QLocalSocket::disconnected, |
50 | this, &ResourceAccess::disconnected); | 106 | this, &ResourceAccess::disconnected); |
51 | connect(m_socket, SIGNAL(error(QLocalSocket::LocalSocketError)), | 107 | connect(d->socket, SIGNAL(error(QLocalSocket::LocalSocketError)), |
52 | this, SLOT(connectionError(QLocalSocket::LocalSocketError))); | 108 | this, SLOT(connectionError(QLocalSocket::LocalSocketError))); |
53 | connect(m_socket, &QIODevice::readyRead, | 109 | connect(d->socket, &QIODevice::readyRead, |
54 | this, &ResourceAccess::readResourceMessage); | 110 | this, &ResourceAccess::readResourceMessage); |
55 | |||
56 | } | 111 | } |
57 | 112 | ||
58 | ResourceAccess::~ResourceAccess() | 113 | ResourceAccess::~ResourceAccess() |
@@ -62,53 +117,89 @@ ResourceAccess::~ResourceAccess() | |||
62 | 117 | ||
63 | QString ResourceAccess::resourceName() const | 118 | QString ResourceAccess::resourceName() const |
64 | { | 119 | { |
65 | return m_resourceName; | 120 | return d->resourceName; |
66 | } | 121 | } |
67 | 122 | ||
68 | bool ResourceAccess::isReady() const | 123 | bool ResourceAccess::isReady() const |
69 | { | 124 | { |
70 | return m_socket->isValid(); | 125 | return d->socket->isValid(); |
126 | } | ||
127 | |||
128 | void ResourceAccess::sendCommand(int commandId) | ||
129 | { | ||
130 | if (isReady()) { | ||
131 | log(QString("Sending command %1").arg(commandId)); | ||
132 | Commands::write(d->socket, commandId); | ||
133 | } else { | ||
134 | d->commandQueue << new QueuedCommand(commandId); | ||
135 | } | ||
136 | } | ||
137 | |||
138 | void ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) | ||
139 | { | ||
140 | if (isReady()) { | ||
141 | log(QString("Sending command %1").arg(commandId)); | ||
142 | Commands::write(d->socket, commandId, fbb); | ||
143 | } else { | ||
144 | d->commandQueue << new QueuedCommand(commandId, fbb); | ||
145 | } | ||
71 | } | 146 | } |
72 | 147 | ||
73 | void ResourceAccess::open() | 148 | void ResourceAccess::open() |
74 | { | 149 | { |
75 | if (m_socket->isValid()) { | 150 | if (d->socket->isValid()) { |
76 | log("Socket valid, so aborting the open"); | 151 | log("Socket valid, so not opening again"); |
77 | return; | 152 | return; |
78 | } | 153 | } |
79 | 154 | ||
80 | m_socket->setServerName(m_resourceName); | 155 | //TODO: if we try and try and the process does not pick up |
81 | log(QString("Opening %1").arg(m_socket->serverName())); | 156 | // we should probably try to start the process again |
157 | d->socket->setServerName(d->resourceName); | ||
158 | log(QString("Opening %1").arg(d->socket->serverName())); | ||
82 | //FIXME: race between starting the exec and opening the socket? | 159 | //FIXME: race between starting the exec and opening the socket? |
83 | m_socket->open(); | 160 | d->socket->open(); |
84 | } | 161 | } |
85 | 162 | ||
86 | void ResourceAccess::close() | 163 | void ResourceAccess::close() |
87 | { | 164 | { |
88 | log(QString("Closing %1").arg(m_socket->fullServerName())); | 165 | log(QString("Closing %1").arg(d->socket->fullServerName())); |
89 | m_socket->close(); | 166 | d->socket->close(); |
90 | } | 167 | } |
91 | 168 | ||
92 | void ResourceAccess::connected() | 169 | void ResourceAccess::connected() |
93 | { | 170 | { |
94 | m_startingProcess = false; | 171 | d->startingProcess = false; |
95 | log(QString("Connected: ").arg(m_socket->fullServerName())); | 172 | |
173 | if (!isReady()) { | ||
174 | return; | ||
175 | } | ||
176 | |||
177 | log(QString("Connected: %1").arg(d->socket->fullServerName())); | ||
96 | 178 | ||
97 | { | 179 | { |
98 | auto name = m_fbb.CreateString(QString::number((long long)this).toLatin1()); | 180 | auto name = d->fbb.CreateString(QString::number((long long)this).toLatin1()); |
99 | auto command = Akonadi2::CreateHandshake(m_fbb, name); | 181 | auto command = Akonadi2::CreateHandshake(d->fbb, name); |
100 | Akonadi2::FinishHandshakeBuffer(m_fbb, command); | 182 | Akonadi2::FinishHandshakeBuffer(d->fbb, command); |
101 | Commands::write(m_socket, Commands::HandshakeCommand, m_fbb); | 183 | Commands::write(d->socket, Commands::HandshakeCommand, d->fbb); |
102 | m_fbb.Clear(); | 184 | d->fbb.Clear(); |
185 | } | ||
186 | |||
187 | //TODO: should confirm the commands made it with a response? | ||
188 | //TODO: serialize instead of blast them all through the socket? | ||
189 | log(QString("We have %1 queued commands").arg(d->commandQueue.size())); | ||
190 | for (QueuedCommand *command: d->commandQueue) { | ||
191 | command->write(d->socket); | ||
192 | delete command; | ||
103 | } | 193 | } |
194 | d->commandQueue.clear(); | ||
104 | 195 | ||
105 | emit ready(true); | 196 | emit ready(true); |
106 | } | 197 | } |
107 | 198 | ||
108 | void ResourceAccess::disconnected() | 199 | void ResourceAccess::disconnected() |
109 | { | 200 | { |
110 | m_socket->close(); | 201 | d->socket->close(); |
111 | log(QString("Disconnected from %1").arg(m_socket->fullServerName())); | 202 | log(QString("Disconnected from %1").arg(d->socket->fullServerName())); |
112 | emit ready(false); | 203 | emit ready(false); |
113 | open(); | 204 | open(); |
114 | } | 205 | } |
@@ -116,29 +207,31 @@ void ResourceAccess::disconnected() | |||
116 | void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) | 207 | void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) |
117 | { | 208 | { |
118 | log(QString("Connection error: %2").arg(error)); | 209 | log(QString("Connection error: %2").arg(error)); |
119 | if (m_startingProcess) { | 210 | if (d->startingProcess) { |
120 | if (!m_tryOpenTimer->isActive()) { | 211 | if (!d->tryOpenTimer->isActive()) { |
121 | m_tryOpenTimer->start(); | 212 | d->tryOpenTimer->start(); |
122 | } | 213 | } |
123 | return; | 214 | return; |
124 | } | 215 | } |
125 | 216 | ||
126 | m_startingProcess = true; | 217 | d->startingProcess = true; |
127 | log(QString("Attempting to start resource ") + m_resourceName); | 218 | log(QString("Attempting to start resource ") + d->resourceName); |
128 | QStringList args; | 219 | QStringList args; |
129 | args << m_resourceName; | 220 | args << d->resourceName; |
130 | if (QProcess::startDetached("akonadi2_synchronizer", args)) { | 221 | if (QProcess::startDetached("akonadi2_synchronizer", args, QDir::homePath())) { |
131 | m_socket->open(); | 222 | if (!d->tryOpenTimer->isActive()) { |
223 | d->tryOpenTimer->start(); | ||
224 | } | ||
132 | } | 225 | } |
133 | } | 226 | } |
134 | 227 | ||
135 | void ResourceAccess::readResourceMessage() | 228 | void ResourceAccess::readResourceMessage() |
136 | { | 229 | { |
137 | if (!m_socket->isValid()) { | 230 | if (!d->socket->isValid()) { |
138 | return; | 231 | return; |
139 | } | 232 | } |
140 | 233 | ||
141 | m_partialMessageBuffer += m_socket->readAll(); | 234 | d->partialMessageBuffer += d->socket->readAll(); |
142 | 235 | ||
143 | // should be scheduled rather than processed all at once | 236 | // should be scheduled rather than processed all at once |
144 | while (processMessageBuffer()) {} | 237 | while (processMessageBuffer()) {} |
@@ -147,20 +240,20 @@ void ResourceAccess::readResourceMessage() | |||
147 | bool ResourceAccess::processMessageBuffer() | 240 | bool ResourceAccess::processMessageBuffer() |
148 | { | 241 | { |
149 | static const int headerSize = (sizeof(int) * 2); | 242 | static const int headerSize = (sizeof(int) * 2); |
150 | if (m_partialMessageBuffer.size() < headerSize) { | 243 | if (d->partialMessageBuffer.size() < headerSize) { |
151 | return false; | 244 | return false; |
152 | } | 245 | } |
153 | 246 | ||
154 | const int commandId = *(int*)m_partialMessageBuffer.constData(); | 247 | const int commandId = *(int*)d->partialMessageBuffer.constData(); |
155 | const int size = *(int*)(m_partialMessageBuffer.constData() + sizeof(int)); | 248 | const int size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int)); |
156 | 249 | ||
157 | if (size > m_partialMessageBuffer.size() - headerSize) { | 250 | if (size > d->partialMessageBuffer.size() - headerSize) { |
158 | return false; | 251 | return false; |
159 | } | 252 | } |
160 | 253 | ||
161 | switch (commandId) { | 254 | switch (commandId) { |
162 | case Commands::RevisionUpdateCommand: { | 255 | case Commands::RevisionUpdateCommand: { |
163 | auto buffer = Akonadi2::GetRevisionUpdate(m_partialMessageBuffer.constData() + headerSize); | 256 | auto buffer = Akonadi2::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); |
164 | log(QString("Revision updated to: %1").arg(buffer->revision())); | 257 | log(QString("Revision updated to: %1").arg(buffer->revision())); |
165 | emit revisionChanged(buffer->revision()); | 258 | emit revisionChanged(buffer->revision()); |
166 | break; | 259 | break; |
@@ -169,13 +262,13 @@ bool ResourceAccess::processMessageBuffer() | |||
169 | break; | 262 | break; |
170 | } | 263 | } |
171 | 264 | ||
172 | m_partialMessageBuffer.remove(0, headerSize + size); | 265 | d->partialMessageBuffer.remove(0, headerSize + size); |
173 | return m_partialMessageBuffer.size() >= headerSize; | 266 | return d->partialMessageBuffer.size() >= headerSize; |
174 | } | 267 | } |
175 | 268 | ||
176 | void ResourceAccess::log(const QString &message) | 269 | void ResourceAccess::log(const QString &message) |
177 | { | 270 | { |
178 | Console::main()->log(m_resourceName + ": " + message); | 271 | Console::main()->log(d->resourceName + ": " + message); |
179 | } | 272 | } |
180 | 273 | ||
181 | } | 274 | } |
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index f381af1..3a35af6 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -40,6 +40,9 @@ public: | |||
40 | QString resourceName() const; | 40 | QString resourceName() const; |
41 | bool isReady() const; | 41 | bool isReady() const; |
42 | 42 | ||
43 | void sendCommand(int commandId); | ||
44 | void sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb); | ||
45 | |||
43 | public Q_SLOTS: | 46 | public Q_SLOTS: |
44 | void open(); | 47 | void open(); |
45 | void close(); | 48 | void close(); |
@@ -49,6 +52,7 @@ Q_SIGNALS: | |||
49 | void revisionChanged(unsigned long long revision); | 52 | void revisionChanged(unsigned long long revision); |
50 | 53 | ||
51 | private Q_SLOTS: | 54 | private Q_SLOTS: |
55 | //TODO: move these to the Private class | ||
52 | void connected(); | 56 | void connected(); |
53 | void disconnected(); | 57 | void disconnected(); |
54 | void connectionError(QLocalSocket::LocalSocketError error); | 58 | void connectionError(QLocalSocket::LocalSocketError error); |
@@ -57,12 +61,9 @@ private Q_SLOTS: | |||
57 | 61 | ||
58 | private: | 62 | private: |
59 | void log(const QString &message); | 63 | void log(const QString &message); |
60 | QString m_resourceName; | 64 | |
61 | QLocalSocket *m_socket; | 65 | class Private; |
62 | QTimer *m_tryOpenTimer; | 66 | Private * const d; |
63 | bool m_startingProcess; | ||
64 | QByteArray m_partialMessageBuffer; | ||
65 | flatbuffers::FlatBufferBuilder m_fbb; | ||
66 | }; | 67 | }; |
67 | 68 | ||
68 | } | 69 | } |