From 0faf38f2ad9672fb46c77cae7317f44c72ebd10e Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 15 Jan 2015 23:08:34 +0100 Subject: Async message queue processing. The Job/Future in Pipeline::newEntity for some reason crashes with async pipeline processing. --- common/pipeline.cpp | 79 +++++++++++++++++++++++++++++------------------------ 1 file changed, 43 insertions(+), 36 deletions(-) (limited to 'common/pipeline.cpp') diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 9cc7450..339a39c 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -95,44 +95,49 @@ void Pipeline::null() Async::Job Pipeline::newEntity(void const *command, size_t size) { - qDebug() << "new entity"; - Async::start([&](Async::Future future) { - - //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. - const auto key = QUuid::createUuid().toString().toUtf8(); - - //TODO figure out if we already have created a revision for the message? - const qint64 newRevision = storage().maxRevision() + 1; - - auto createEntity = Akonadi2::Commands::GetCreateEntity(command); - //TODO rename createEntitiy->domainType to bufferType - const QString entityType = QString::fromUtf8(reinterpret_cast(createEntity->domainType()->Data()), createEntity->domainType()->size()); - auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); - // - // const QString entityType; - // auto entity = Akonadi2::GetEntity(0); - - //Add metadata buffer - flatbuffers::FlatBufferBuilder metadataFbb; - auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); - metadataBuilder.add_revision(newRevision); - metadataBuilder.add_processed(false); - auto metadataBuffer = metadataBuilder.Finish(); - Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); - //TODO we should reserve some space in metadata for in-place updates - - flatbuffers::FlatBufferBuilder fbb; - EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); - - storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); - storage().setMaxRevision(newRevision); + qDebug() << "new entity" << size; + //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. + const auto key = QUuid::createUuid().toString().toUtf8(); + + //TODO figure out if we already have created a revision for the message? + const qint64 newRevision = storage().maxRevision() + 1; + + { + flatbuffers::Verifier verifyer(reinterpret_cast(command), size); + if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { + qWarning() << "invalid buffer"; + return Async::null(); + } + } + + auto createEntity = Akonadi2::Commands::GetCreateEntity(command); + + //TODO rename createEntitiy->domainType to bufferType + const QString entityType = QString::fromUtf8(reinterpret_cast(createEntity->domainType()->Data()), createEntity->domainType()->size()); + auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); + + //Add metadata buffer + flatbuffers::FlatBufferBuilder metadataFbb; + auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); + metadataBuilder.add_revision(newRevision); + metadataBuilder.add_processed(false); + auto metadataBuffer = metadataBuilder.Finish(); + Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); + //TODO we should reserve some space in metadata for in-place updates + + flatbuffers::FlatBufferBuilder fbb; + EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); + + storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); + storage().setMaxRevision(newRevision); + + return Async::start([this, key, entityType](Async::Future &future) { PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { future.setFinished(); }); d->activePipelines << state; state.step(); - }); } @@ -157,10 +162,12 @@ void Pipeline::pipelineStepped(const PipelineState &state) void Pipeline::scheduleStep() { - if (!d->stepScheduled) { - d->stepScheduled = true; - QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); - } + // if (!d->stepScheduled) { + // d->stepScheduled = true; + // QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); + // } + //FIXME make async again. For some reason the job in newEntity crashes if pipeline processing is async. + stepPipelines(); } void Pipeline::stepPipelines() -- cgit v1.2.3