diff options
Diffstat (limited to 'synchronizer/listener.cpp')
-rw-r--r-- | synchronizer/listener.cpp | 41 |
1 files changed, 31 insertions, 10 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index a84623d..6098856 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -28,6 +28,7 @@ | |||
28 | #include "common/commandcompletion_generated.h" | 28 | #include "common/commandcompletion_generated.h" |
29 | #include "common/handshake_generated.h" | 29 | #include "common/handshake_generated.h" |
30 | #include "common/revisionupdate_generated.h" | 30 | #include "common/revisionupdate_generated.h" |
31 | #include "common/synchronize_generated.h" | ||
31 | 32 | ||
32 | #include <QLocalSocket> | 33 | #include <QLocalSocket> |
33 | #include <QTimer> | 34 | #include <QTimer> |
@@ -189,18 +190,38 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
189 | break; | 190 | break; |
190 | } | 191 | } |
191 | case Akonadi2::Commands::SynchronizeCommand: { | 192 | case Akonadi2::Commands::SynchronizeCommand: { |
192 | log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); | 193 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); |
193 | loadResource(); | 194 | if (Akonadi2::VerifySynchronizeBuffer(verifier)) { |
194 | if (m_resource) { | 195 | auto buffer = Akonadi2::GetSynchronize(client.commandBuffer.constData()); |
195 | qDebug() << "synchronizing"; | 196 | log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); |
196 | m_resource->synchronizeWithSource(m_pipeline).then<void>([callback](Async::Future<void> &f){ | 197 | loadResource(); |
197 | //FIXME: the command is complete once all pipelines have been drained, track progress and async emit result | 198 | if (!m_resource) { |
198 | callback(); | 199 | qWarning() << "No resource loaded"; |
199 | f.setFinished(); | 200 | break; |
200 | }).exec(); | 201 | } |
202 | //TODO a more elegant composition of jobs should be possible | ||
203 | if (buffer->sourceSync()) { | ||
204 | bool localSync = buffer->localSync(); | ||
205 | m_resource->synchronizeWithSource(m_pipeline).then<void>([callback, localSync, this](Async::Future<void> &f){ | ||
206 | if (localSync) { | ||
207 | m_resource->processAllMessages().then<void>([callback](Async::Future<void> &f){ | ||
208 | callback(); | ||
209 | f.setFinished(); | ||
210 | }).exec(); | ||
211 | } else { | ||
212 | callback(); | ||
213 | f.setFinished(); | ||
214 | } | ||
215 | }).exec(); | ||
216 | } else if (buffer->localSync()) { | ||
217 | m_resource->processAllMessages().then<void>([callback](Async::Future<void> &f){ | ||
218 | callback(); | ||
219 | f.setFinished(); | ||
220 | }).exec(); | ||
221 | } | ||
201 | return; | 222 | return; |
202 | } else { | 223 | } else { |
203 | qWarning() << "No resource loaded"; | 224 | qWarning() << "received invalid command"; |
204 | } | 225 | } |
205 | break; | 226 | break; |
206 | } | 227 | } |