From 0faf38f2ad9672fb46c77cae7317f44c72ebd10e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 15 Jan 2015 23:08:34 +0100 Subject: Async message queue processing. The Job/Future in Pipeline::newEntity for some reason crashes with async pipeline processing. --- common/pipeline.cpp | 79 +++++++++++++++++++----------------- dummyresource/resourcefactory.cpp | 85 ++++++++++++++++++++++++--------------- dummyresource/resourcefactory.h | 1 + tests/dummyresourcetest.cpp | 19 +++++++-- 4 files changed, 113 insertions(+), 71 deletions(-) diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 9cc7450..339a39c 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -95,44 +95,49 @@ void Pipeline::null() Async::Job Pipeline::newEntity(void const *command, size_t size) { - qDebug() << "new entity"; - Async::start([&](Async::Future future) { - - //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. - const auto key = QUuid::createUuid().toString().toUtf8(); - - //TODO figure out if we already have created a revision for the message? - const qint64 newRevision = storage().maxRevision() + 1; - - auto createEntity = Akonadi2::Commands::GetCreateEntity(command); - //TODO rename createEntitiy->domainType to bufferType - const QString entityType = QString::fromUtf8(reinterpret_cast(createEntity->domainType()->Data()), createEntity->domainType()->size()); - auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); - // - // const QString entityType; - // auto entity = Akonadi2::GetEntity(0); - - //Add metadata buffer - flatbuffers::FlatBufferBuilder metadataFbb; - auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); - metadataBuilder.add_revision(newRevision); - metadataBuilder.add_processed(false); - auto metadataBuffer = metadataBuilder.Finish(); - Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); - //TODO we should reserve some space in metadata for in-place updates - - flatbuffers::FlatBufferBuilder fbb; - EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); - - storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); - storage().setMaxRevision(newRevision); + qDebug() << "new entity" << size; + //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. + const auto key = QUuid::createUuid().toString().toUtf8(); + + //TODO figure out if we already have created a revision for the message? + const qint64 newRevision = storage().maxRevision() + 1; + + { + flatbuffers::Verifier verifyer(reinterpret_cast(command), size); + if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { + qWarning() << "invalid buffer"; + return Async::null(); + } + } + + auto createEntity = Akonadi2::Commands::GetCreateEntity(command); + + //TODO rename createEntitiy->domainType to bufferType + const QString entityType = QString::fromUtf8(reinterpret_cast(createEntity->domainType()->Data()), createEntity->domainType()->size()); + auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); + + //Add metadata buffer + flatbuffers::FlatBufferBuilder metadataFbb; + auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); + metadataBuilder.add_revision(newRevision); + metadataBuilder.add_processed(false); + auto metadataBuffer = metadataBuilder.Finish(); + Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); + //TODO we should reserve some space in metadata for in-place updates + + flatbuffers::FlatBufferBuilder fbb; + EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); + + storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); + storage().setMaxRevision(newRevision); + + return Async::start([this, key, entityType](Async::Future &future) { PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { future.setFinished(); }); d->activePipelines << state; state.step(); - }); } @@ -157,10 +162,12 @@ void Pipeline::pipelineStepped(const PipelineState &state) void Pipeline::scheduleStep() { - if (!d->stepScheduled) { - d->stepScheduled = true; - QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); - } + // if (!d->stepScheduled) { + // d->stepScheduled = true; + // QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); + // } + //FIXME make async again. For some reason the job in newEntity crashes if pipeline processing is async. + stepPipelines(); } void Pipeline::stepPipelines() diff --git a/dummyresource/resourcefactory.cpp b/dummyresource/resourcefactory.cpp index e14aa01..b43e4a3 100644 --- a/dummyresource/resourcefactory.cpp +++ b/dummyresource/resourcefactory.cpp @@ -116,26 +116,40 @@ public: } private slots: + static void asyncWhile(const std::function)> &body, const std::function &completionHandler) { + body([body, completionHandler](bool complete) { + if (complete) { + completionHandler(); + } else { + asyncWhile(body, completionHandler); + } + }); + } + void process() { if (mProcessingLock) { return; } mProcessingLock = true; - //Empty queue after queue - //FIXME the for and while loops should be async, otherwise we process all messages in parallel - for (auto queue : mCommandQueues) { - qDebug() << "processing queue"; - bool processedMessage = false; - while (processedMessage) { - qDebug() << "process"; - processedMessage = false; - queue->dequeue([this, &processedMessage](void *ptr, int size, std::function messageQueueCallback) { + auto job = processPipeline().then([this](Async::Future &future) { + mProcessingLock = false; + future.setFinished(); + }).exec(); + } + + Async::Job processPipeline() + { + auto job = Async::start([this](Async::Future &future) { + //TODO process all queues in async for + auto queue = mCommandQueues.first(); + asyncWhile([&](std::function whileCallback) { + queue->dequeue([this, whileCallback](void *ptr, int size, std::function messageQueueCallback) { flatbuffers::Verifier verifyer(reinterpret_cast(ptr), size); if (!Akonadi2::VerifyQueuedCommandBuffer(verifyer)) { qWarning() << "invalid buffer"; - processedMessage = false; messageQueueCallback(false); + whileCallback(true); return; } auto queuedCommand = Akonadi2::GetQueuedCommand(ptr); @@ -150,26 +164,32 @@ private slots: break; case Akonadi2::Commands::CreateEntityCommand: { //TODO job lifetime management - auto job = mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then([&messageQueueCallback](Async::Future future) { + mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()).then([&messageQueueCallback, whileCallback](Async::Future &future) { messageQueueCallback(true); - }); - job.exec(); + whileCallback(false); + future.setFinished(); + }).exec(); } break; default: //Unhandled command qWarning() << "Unhandled command"; messageQueueCallback(true); + whileCallback(false); break; } - processedMessage = true; }, - [&processedMessage](const MessageQueue::Error &error) { - processedMessage = false; + [whileCallback](const MessageQueue::Error &error) { + qDebug() << "no more messages in queue"; + whileCallback(true); }); - } - } - mProcessingLock = false; + }, + [&future]() { //while complete + future.setFinished(); + //Call async-for completion handler + }); + }); + return job; } private: @@ -226,6 +246,18 @@ void findByRemoteId(QSharedPointer storage, const QString &ri }); } +void DummyResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) +{ + m_fbb.Clear(); + auto commandData = m_fbb.CreateVector(reinterpret_cast(data.data()), data.size()); + auto builder = Akonadi2::QueuedCommandBuilder(m_fbb); + builder.add_commandId(commandId); + builder.add_command(commandData); + auto buffer = builder.Finish(); + Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); + mq.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); +} + Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeline) { qDebug() << "synchronizeWithSource"; @@ -259,11 +291,7 @@ Async::Job DummyResource::synchronizeWithSource(Akonadi2::Pipeline *pipeli builder.add_attachment(attachment); auto buffer = builder.Finish(); DummyCalendar::FinishDummyEventBuffer(m_fbb, buffer); - //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. - const auto key = QUuid::createUuid().toString().toUtf8(); - //TODO Create queuedcommand and push into synchronizerQueue - //* create message in store directly, add command to messagequeue waiting for processing. - // pipeline->newEntity(key, m_fbb.GetBufferPointer(), m_fbb.GetSize()); + enqueueCommand(mSynchronizerQueue, Akonadi2::Commands::CreateEntityCommand, QByteArray::fromRawData(reinterpret_cast(m_fbb.GetBufferPointer()), m_fbb.GetSize())); } else { //modification //TODO diff and create modification if necessary } @@ -279,14 +307,7 @@ void DummyResource::processCommand(int commandId, const QByteArray &data, uint s //TODO instead of copying the command including the full entity first into the command queue, we could directly //create a new revision, only pushing a handle into the commandqueue with the relevant changeset (for changereplay). //The problem is that we then require write access from multiple threads (or even processes to avoid sending the full entity over the wire). - m_fbb.Clear(); - auto commandData = m_fbb.CreateVector(reinterpret_cast(data.data()), data.size()); - auto builder = Akonadi2::QueuedCommandBuilder(m_fbb); - builder.add_commandId(commandId); - builder.add_command(commandData); - auto buffer = builder.Finish(); - Akonadi2::FinishQueuedCommandBuffer(m_fbb, buffer); - mUserQueue.enqueue(m_fbb.GetBufferPointer(), m_fbb.GetSize()); + enqueueCommand(mUserQueue, commandId, data); } DummyResourceFactory::DummyResourceFactory(QObject *parent) diff --git a/dummyresource/resourcefactory.h b/dummyresource/resourcefactory.h index 682f25c..6043fb6 100644 --- a/dummyresource/resourcefactory.h +++ b/dummyresource/resourcefactory.h @@ -39,6 +39,7 @@ public: void configurePipeline(Akonadi2::Pipeline *pipeline); private: + void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); flatbuffers::FlatBufferBuilder m_fbb; MessageQueue mUserQueue; MessageQueue mSynchronizerQueue; diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index ddb59a5..c469796 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp @@ -14,7 +14,7 @@ static void removeFromDisk(const QString &name) { - Akonadi2::Storage store(Akonadi2::Store::storageLocation(), "org.kde.dummy", Akonadi2::Storage::ReadWrite); + Akonadi2::Storage store(Akonadi2::Store::storageLocation(), name, Akonadi2::Storage::ReadWrite); store.removeFromDisk(); } @@ -33,6 +33,9 @@ private Q_SLOTS: void cleanupTestCase() { + removeFromDisk("org.kde.dummy"); + removeFromDisk("org.kde.dummy.userqueue"); + removeFromDisk("org.kde.dummy.synchronizerqueue"); } void testProcessCommand() @@ -60,13 +63,23 @@ private Q_SLOTS: Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); const QByteArray command(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); + { + flatbuffers::Verifier verifyer(reinterpret_cast(command.data()), command.size()); + QVERIFY(Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)); + } + //Actual test Akonadi2::Pipeline pipeline("org.kde.dummy"); + QSignalSpy revisionSpy(&pipeline, SIGNAL(revisionUpdated())); DummyResource resource; resource.configurePipeline(&pipeline); resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); - //TODO wait until the pipeline has processed the command - QTest::qWait(1000); + resource.processCommand(Akonadi2::Commands::CreateEntityCommand, command, command.size(), &pipeline); + + QVERIFY(revisionSpy.isValid()); + QTRY_COMPARE(revisionSpy.count(), 2); + QTest::qWait(100); + QCOMPARE(revisionSpy.count(), 2); } // void testResourceSync() -- cgit v1.2.3