From 0faf38f2ad9672fb46c77cae7317f44c72ebd10e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 15 Jan 2015 23:08:34 +0100 Subject: Async message queue processing. The Job/Future in Pipeline::newEntity for some reason crashes with async pipeline processing. --- dummyresource/resourcefactory.cpp | 85 ++++++++++++++++++++++++--------------- dummyresource/resourcefactory.h | 1 + 2 files changed, 54 insertions(+), 32 deletions(-) (limited to 'dummyresource') diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index e14aa01..b43e4a3 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -116,26 +116,40 @@ public: } private slots: + static void asyncWhile(const std::function)> &body, const std::function &completionHandler) { + body([body, completionHandler](bool complete) { + if (complete) { + completionHandler(); + } else { + asyncWhile(body, completionHandler); + } + }); + } + void process() { if (mProcessingLock) { return; } mProcessingLock = true; - //Empty queue after queue - //FIXME the for and while loops should be async, otherwise we process all messages in parallel - for (auto queue : mCommandQueues) { - qDebug() << "processing queue"; - bool processedMessage = false; - while (processedMessage) { - qDebug() << "process"; - processedMessage = false; - queue->dequeue([this, &processedMessage](void *ptr, int size, std::function messageQueueCallback) { + auto job = processPipeline().then([this](Async::Future &future) { + mProcessingLock = false; + future.setFinished(); + }).exec(); + } + + Async::Job processPipeline() + { + auto job = Async::start([this](Async::Future &future) { + //TODO process all queues in async for + auto queue = mCommandQueues.first(); + asyncWhile([&](std::function whileCallback) { + queue->dequeue([this, whileCallback](void *ptr, int size, std::function messageQueueCallback) { flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { qWarning() << "invalid buffer"; - processedMessage = false; messageQueueCallback(false); + whileCallback(true); return; } auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); @@ -150,26 +164,32 @@ private slots: break; case Akonadi2::Commands::CreateEntityCommand: { //TODO job lifetime management - auto job = mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then([&messageQueueCallback](Async::Future future) { + mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then([&messageQueueCallback, whileCallback](Async::Future &future) { messageQueueCallback(true); - }); - job.exec(); + whileCallback(false); + future.setFinished(); + }).exec(); } break; default: //Unhandled command qWarning() << "Unhandled command"; messageQueueCallback(true); + whileCallback(false); break; } - processedMessage = true; }, - [&processedMessage](const MessageQueue::Error &error) { - processedMessage = false; + [whileCallback](const MessageQueue::Error &error) { + qDebug() << "no more messages in queue"; + whileCallback(true); }); - } - } - mProcessingLock = false; + }, + [&future]() { //while complete + future.setFinished(); + //Call async-for completion handler + }); + }); + return job; } private: @@ -226,6 +246,18 @@ void findByRemoteId(QSharedPointer storage, const QString &ri }); } +void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) +{ + m_fbb.Clear(); + auto commandData = m_fbb.CreateVector(reinterpret_cast(data.data()), data.size()); + auto builder = Akonadi2::QueuedCommandBuilder(m_fbb); + builder.add_commandId(commandId); + builder.add_command(commandData); + auto buffer = builder.Finish(); + Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); + mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); +} + Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) { qDebug() << "synchronizeWithSource"; @@ -259,11 +291,7 @@ Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli builder.add_attachment(attachment); auto buffer = builder.Finish(); DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); - //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. - const auto key = QUuid::createUuid().toString().toUtf8(); - //TODO Create queuedcommand and push into synchronizerQueue - //* create message in store directly, add command to messagequeue waiting for processing. - // pipeline->newEntity(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); + enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast(m_fbb.GetBufferPointer()), m_fbb.GetSize())); } else { //modification //TODO diff and create modification if necessary } @@ -279,14 +307,7 @@ void DummyResource::processCommand(int commandId, const QByteArray &data, uint s //TODO instead of copying the command including the full entity first into the command queue, we could directly //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). - m_fbb.Clear(); - auto commandData = m_fbb.CreateVector(reinterpret_cast(data.data()), data.size()); - auto builder = Akonadi2::QueuedCommandBuilder(m_fbb); - builder.add_commandId(commandId); - builder.add_command(commandData); - auto buffer = builder.Finish(); - Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); - mUserQueue.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); + enqueueCommand(mUserQueue, commandId, data); } DummyResourceFactory::DummyResourceFactory(QObject *parent) diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h index 682f25c..6043fb6 100644 --- a/dummyresource/resourcefactory.h +++ b/dummyresource/resourcefactory.h @@ -39,6 +39,7 @@ public: void configurePipeline(Akonadi2::Pipeline *pipeline); private: + void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); flatbuffers::FlatBufferBuilder m_fbb; MessageQueue mUserQueue; MessageQueue mSynchronizerQueue; -- cgit v1.2.3