From 3fc8ce958fc244e64a3a3a92f3b1440aae04133b Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 25 Jan 2015 11:23:08 +0100 Subject: A way to ensure all messages have been processed. As queries become reactive this should become less important. We can then just wait until all results become available. For tests it is in either case useful though. --- synchronizer/listener.cpp | 41 +++++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) (limited to 'synchronizer') diff --git a/synchronizer/listener.cpp b/synchronizer/listener.cpp index a84623d..6098856 100644 --- a/synchronizer/listener.cpp +++ b/synchronizer/listener.cpp @@ -28,6 +28,7 @@ #include "common/commandcompletion_generated.h" #include "common/handshake_generated.h" #include "common/revisionupdate_generated.h" +#include "common/synchronize_generated.h" #include #include @@ -189,18 +190,38 @@ void Listener::processCommand(int commandId, uint messageId, Client &client, uin break; } case Akonadi2::Commands::SynchronizeCommand: { - log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); - loadResource(); - if (m_resource) { - qDebug() << "synchronizing"; - m_resource->synchronizeWithSource(m_pipeline).then([callback](Async::Future &f){ - //FIXME: the command is complete once all pipelines have been drained, track progress and async emit result - callback(); - f.setFinished(); - }).exec(); + flatbuffers::Verifier verifier((const uint8_t *)client.commandBuffer.constData(), size); + if (Akonadi2::VerifySynchronizeBuffer(verifier)) { + auto buffer = Akonadi2::GetSynchronize(client.commandBuffer.constData()); + log(QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name)); + loadResource(); + if (!m_resource) { + qWarning() << "No resource loaded"; + break; + } + //TODO a more elegant composition of jobs should be possible + if (buffer->sourceSync()) { + bool localSync = buffer->localSync(); + m_resource->synchronizeWithSource(m_pipeline).then([callback, localSync, this](Async::Future &f){ + if (localSync) { + m_resource->processAllMessages().then([callback](Async::Future &f){ + callback(); + f.setFinished(); + }).exec(); + } else { + callback(); + f.setFinished(); + } + }).exec(); + } else if (buffer->localSync()) { + m_resource->processAllMessages().then([callback](Async::Future &f){ + callback(); + f.setFinished(); + }).exec(); + } return; } else { - qWarning() << "No resource loaded"; + qWarning() << "received invalid command"; } break; } -- cgit v1.2.3