diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-01-15 23:08:34 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2015-01-15 23:08:34 +0100 |
commit | 0faf38f2ad9672fb46c77cae7317f44c72ebd10e (patch) | |
tree | 8776253e72d83b4f824631618af5c7393f9d964d /common/pipeline.cpp | |
parent | a0f517df8c4633d33820186954bdf803941e92bf (diff) | |
download | sink-0faf38f2ad9672fb46c77cae7317f44c72ebd10e.tar.gz sink-0faf38f2ad9672fb46c77cae7317f44c72ebd10e.zip |
Async message queue processing.
The Job/Future in Pipeline::newEntity for some reason crashes with async pipeline processing.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 79 |
1 files changed, 43 insertions, 36 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 9cc7450..339a39c 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -95,44 +95,49 @@ void Pipeline::null() | |||
95 | 95 | ||
96 | Async::Job<void> Pipeline::newEntity(void const *command, size_t size) | 96 | Async::Job<void> Pipeline::newEntity(void const *command, size_t size) |
97 | { | 97 | { |
98 | qDebug() << "new entity"; | 98 | qDebug() << "new entity" << size; |
99 | Async::start<void>([&](Async::Future<void> future) { | ||
100 | |||
101 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. | ||
102 | const auto key = QUuid::createUuid().toString().toUtf8(); | ||
103 | |||
104 | //TODO figure out if we already have created a revision for the message? | ||
105 | const qint64 newRevision = storage().maxRevision() + 1; | ||
106 | |||
107 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); | ||
108 | //TODO rename createEntitiy->domainType to bufferType | ||
109 | const QString entityType = QString::fromUtf8(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); | ||
110 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); | ||
111 | // | ||
112 | // const QString entityType; | ||
113 | // auto entity = Akonadi2::GetEntity(0); | ||
114 | |||
115 | //Add metadata buffer | ||
116 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
117 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | ||
118 | metadataBuilder.add_revision(newRevision); | ||
119 | metadataBuilder.add_processed(false); | ||
120 | auto metadataBuffer = metadataBuilder.Finish(); | ||
121 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
122 | //TODO we should reserve some space in metadata for in-place updates | ||
123 | |||
124 | flatbuffers::FlatBufferBuilder fbb; | ||
125 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | ||
126 | |||
127 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); | ||
128 | storage().setMaxRevision(newRevision); | ||
129 | 99 | ||
100 | //TODO toRFC4122 would probably be more efficient, but results in non-printable keys. | ||
101 | const auto key = QUuid::createUuid().toString().toUtf8(); | ||
102 | |||
103 | //TODO figure out if we already have created a revision for the message? | ||
104 | const qint64 newRevision = storage().maxRevision() + 1; | ||
105 | |||
106 | { | ||
107 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); | ||
108 | if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) { | ||
109 | qWarning() << "invalid buffer"; | ||
110 | return Async::null<void>(); | ||
111 | } | ||
112 | } | ||
113 | |||
114 | auto createEntity = Akonadi2::Commands::GetCreateEntity(command); | ||
115 | |||
116 | //TODO rename createEntitiy->domainType to bufferType | ||
117 | const QString entityType = QString::fromUtf8(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); | ||
118 | auto entity = Akonadi2::GetEntity(createEntity->delta()->Data()); | ||
119 | |||
120 | //Add metadata buffer | ||
121 | flatbuffers::FlatBufferBuilder metadataFbb; | ||
122 | auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); | ||
123 | metadataBuilder.add_revision(newRevision); | ||
124 | metadataBuilder.add_processed(false); | ||
125 | auto metadataBuffer = metadataBuilder.Finish(); | ||
126 | Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); | ||
127 | //TODO we should reserve some space in metadata for in-place updates | ||
128 | |||
129 | flatbuffers::FlatBufferBuilder fbb; | ||
130 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); | ||
131 | |||
132 | storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); | ||
133 | storage().setMaxRevision(newRevision); | ||
134 | |||
135 | return Async::start<void>([this, key, entityType](Async::Future<void> &future) { | ||
130 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { | 136 | PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { |
131 | future.setFinished(); | 137 | future.setFinished(); |
132 | }); | 138 | }); |
133 | d->activePipelines << state; | 139 | d->activePipelines << state; |
134 | state.step(); | 140 | state.step(); |
135 | |||
136 | }); | 141 | }); |
137 | } | 142 | } |
138 | 143 | ||
@@ -157,10 +162,12 @@ void Pipeline::pipelineStepped(const PipelineState &state) | |||
157 | 162 | ||
158 | void Pipeline::scheduleStep() | 163 | void Pipeline::scheduleStep() |
159 | { | 164 | { |
160 | if (!d->stepScheduled) { | 165 | // if (!d->stepScheduled) { |
161 | d->stepScheduled = true; | 166 | // d->stepScheduled = true; |
162 | QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); | 167 | // QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); |
163 | } | 168 | // } |
169 | //FIXME make async again. For some reason the job in newEntity crashes if pipeline processing is async. | ||
170 | stepPipelines(); | ||
164 | } | 171 | } |
165 | 172 | ||
166 | void Pipeline::stepPipelines() | 173 | void Pipeline::stepPipelines() |