summaryrefslogtreecommitdiffstats
path: root/synchronizer
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-25 11:23:08 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-25 11:23:08 +0100
commit3fc8ce958fc244e64a3a3a92f3b1440aae04133b (patch)
tree4ba2b3ca3ee6a17e7f3e7ce67d6ca934626cad7a /synchronizer
parent9b744da32e64d8a6cd342faba8fc3232884d60f2 (diff)
downloadsink-3fc8ce958fc244e64a3a3a92f3b1440aae04133b.tar.gz
sink-3fc8ce958fc244e64a3a3a92f3b1440aae04133b.zip
A way to ensure all messages have been processed.
As queries become reactive this should become less important. We can then just wait until all results become available. For tests it is in either case useful though.
Diffstat (limited to 'synchronizer')
-rw-r--r--synchronizer/listener.cpp41
1 files changed, 31 insertions, 10 deletions
diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp
index a84623d..6098856 100644
--- a/synchronizer/listener.cpp
+++ b/synchronizer/listener.cpp
@@ -28,6 +28,7 @@
28#include "common/commandcompletion_generated.h" 28#include "common/commandcompletion_generated.h"
29#include "common/handshake_generated.h" 29#include "common/handshake_generated.h"
30#include "common/revisionupdate_generated.h" 30#include "common/revisionupdate_generated.h"
31#include "common/synchronize_generated.h"
31 32
32#include <QLocalSocket> 33#include <QLocalSocket>
33#include <QTimer> 34#include <QTimer>
@@ -189,18 +190,38 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin
189 break; 190 break;
190 } 191 }
191 case Akonadi2::Commands::SynchronizeCommand: { 192 case Akonadi2::Commands::SynchronizeCommand: {
192 log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); 193 flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size);
193 loadResource(); 194 if (Akonadi2::VerifySynchronizeBuffer(verifier)) {
194 if (m_resource) { 195 auto buffer = Akonadi2::GetSynchronize(client.commandBuffer.constData());
195 qDebug() << "synchronizing"; 196 log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name));
196 m_resource->synchronizeWithSource(m_pipeline).then<void>([callback](Async::Future<void> &f){ 197 loadResource();
197 //FIXME: the command is complete once all pipelines have been drained, track progress and async emit result 198 if (!m_resource) {
198 callback(); 199 qWarning() << "No resource loaded";
199 f.setFinished(); 200 break;
200 }).exec(); 201 }
202 //TODO a more elegant composition of jobs should be possible
203 if (buffer->sourceSync()) {
204 bool localSync = buffer->localSync();
205 m_resource->synchronizeWithSource(m_pipeline).then<void>([callback, localSync, this](Async::Future<void> &f){
206 if (localSync) {
207 m_resource->processAllMessages().then<void>([callback](Async::Future<void> &f){
208 callback();
209 f.setFinished();
210 }).exec();
211 } else {
212 callback();
213 f.setFinished();
214 }
215 }).exec();
216 } else if (buffer->localSync()) {
217 m_resource->processAllMessages().then<void>([callback](Async::Future<void> &f){
218 callback();
219 f.setFinished();
220 }).exec();
221 }
201 return; 222 return;
202 } else { 223 } else {
203 qWarning() << "No resource loaded"; 224 qWarning() << "received invalid command";
204 } 225 }
205 break; 226 break;
206 } 227 }