diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-01-25 11:23:08 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-01-25 11:23:08 +0100 |
commit | 3fc8ce958fc244e64a3a3a92f3b1440aae04133b (patch) | |
tree | 4ba2b3ca3ee6a17e7f3e7ce67d6ca934626cad7a /synchronizer | |
parent | 9b744da32e64d8a6cd342faba8fc3232884d60f2 (diff) | |
download | sink-3fc8ce958fc244e64a3a3a92f3b1440aae04133b.tar.gz sink-3fc8ce958fc244e64a3a3a92f3b1440aae04133b.zip |
A way to ensure all messages have been processed.
As queries become reactive this should become less important. We can then just wait until
all results become available. For tests it is in either case useful though.
Diffstat (limited to 'synchronizer')
-rw-r--r-- | synchronizer/listener.cpp | 41 |
1 files changed, 31 insertions, 10 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index a84623d..6098856 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp | |||
@@ -28,6 +28,7 @@ | |||
28 | #include "common/commandcompletion_generated.h" | 28 | #include "common/commandcompletion_generated.h" |
29 | #include "common/handshake_generated.h" | 29 | #include "common/handshake_generated.h" |
30 | #include "common/revisionupdate_generated.h" | 30 | #include "common/revisionupdate_generated.h" |
31 | #include "common/synchronize_generated.h" | ||
31 | 32 | ||
32 | #include <QLocalSocket> | 33 | #include <QLocalSocket> |
33 | #include <QTimer> | 34 | #include <QTimer> |
@@ -189,18 +190,38 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin | |||
189 | break; | 190 | break; |
190 | } | 191 | } |
191 | case Akonadi2::Commands::SynchronizeCommand: { | 192 | case Akonadi2::Commands::SynchronizeCommand: { |
192 | log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); | 193 | flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); |
193 | loadResource(); | 194 | if (Akonadi2::VerifySynchronizeBuffer(verifier)) { |
194 | if (m_resource) { | 195 | auto buffer = Akonadi2::GetSynchronize(client.commandBuffer.constData()); |
195 | qDebug() << "synchronizing"; | 196 | log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); |
196 | m_resource->synchronizeWithSource(m_pipeline).then<void>([callback](Async::Future<void> &f){ | 197 | loadResource(); |
197 | //FIXME: the command is complete once all pipelines have been drained, track progress and async emit result | 198 | if (!m_resource) { |
198 | callback(); | 199 | qWarning() << "No resource loaded"; |
199 | f.setFinished(); | 200 | break; |
200 | }).exec(); | 201 | } |
202 | //TODO a more elegant composition of jobs should be possible | ||
203 | if (buffer->sourceSync()) { | ||
204 | bool localSync = buffer->localSync(); | ||
205 | m_resource->synchronizeWithSource(m_pipeline).then<void>([callback, localSync, this](Async::Future<void> &f){ | ||
206 | if (localSync) { | ||
207 | m_resource->processAllMessages().then<void>([callback](Async::Future<void> &f){ | ||
208 | callback(); | ||
209 | f.setFinished(); | ||
210 | }).exec(); | ||
211 | } else { | ||
212 | callback(); | ||
213 | f.setFinished(); | ||
214 | } | ||
215 | }).exec(); | ||
216 | } else if (buffer->localSync()) { | ||
217 | m_resource->processAllMessages().then<void>([callback](Async::Future<void> &f){ | ||
218 | callback(); | ||
219 | f.setFinished(); | ||
220 | }).exec(); | ||
221 | } | ||
201 | return; | 222 | return; |
202 | } else { | 223 | } else { |
203 | qWarning() << "No resource loaded"; | 224 | qWarning() << "received invalid command"; |
204 | } | 225 | } |
205 | break; | 226 | break; |
206 | } | 227 | } |