summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp79
1 files changed, 43 insertions, 36 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 9cc7450..339a39c 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -95,44 +95,49 @@ void Pipeline::null()
95 95
96Async::Job<void> Pipeline::newEntity(void const *command, size_t size) 96Async::Job<void> Pipeline::newEntity(void const *command, size_t size)
97{ 97{
98 qDebug() << "new entity"; 98 qDebug() << "new entity" << size;
99 Async::start<void>([&](Async::Future<void> future) {
100
101 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
102 const auto key = QUuid::createUuid().toString().toUtf8();
103
104 //TODO figure out if we already have created a revision for the message?
105 const qint64 newRevision = storage().maxRevision() + 1;
106
107 auto createEntity = Akonadi2::Commands::GetCreateEntity(command);
108 //TODO rename createEntitiy->domainType to bufferType
109 const QString entityType = QString::fromUtf8(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size());
110 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data());
111 //
112 // const QString entityType;
113 // auto entity = Akonadi2::GetEntity(0);
114
115 //Add metadata buffer
116 flatbuffers::FlatBufferBuilder metadataFbb;
117 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
118 metadataBuilder.add_revision(newRevision);
119 metadataBuilder.add_processed(false);
120 auto metadataBuffer = metadataBuilder.Finish();
121 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
122 //TODO we should reserve some space in metadata for in-place updates
123
124 flatbuffers::FlatBufferBuilder fbb;
125 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
126
127 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
128 storage().setMaxRevision(newRevision);
129 99
100 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
101 const auto key = QUuid::createUuid().toString().toUtf8();
102
103 //TODO figure out if we already have created a revision for the message?
104 const qint64 newRevision = storage().maxRevision() + 1;
105
106 {
107 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
108 if (!Akonadi2::Commands::VerifyCreateEntityBuffer(verifyer)) {
109 qWarning() << "invalid buffer";
110 return Async::null<void>();
111 }
112 }
113
114 auto createEntity = Akonadi2::Commands::GetCreateEntity(command);
115
116 //TODO rename createEntitiy->domainType to bufferType
117 const QString entityType = QString::fromUtf8(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size());
118 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data());
119
120 //Add metadata buffer
121 flatbuffers::FlatBufferBuilder metadataFbb;
122 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
123 metadataBuilder.add_revision(newRevision);
124 metadataBuilder.add_processed(false);
125 auto metadataBuffer = metadataBuilder.Finish();
126 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
127 //TODO we should reserve some space in metadata for in-place updates
128
129 flatbuffers::FlatBufferBuilder fbb;
130 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
131
132 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
133 storage().setMaxRevision(newRevision);
134
135 return Async::start<void>([this, key, entityType](Async::Future<void> &future) {
130 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() { 136 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() {
131 future.setFinished(); 137 future.setFinished();
132 }); 138 });
133 d->activePipelines << state; 139 d->activePipelines << state;
134 state.step(); 140 state.step();
135
136 }); 141 });
137} 142}
138 143
@@ -157,10 +162,12 @@ void Pipeline::pipelineStepped(const PipelineState &state)
157 162
158void Pipeline::scheduleStep() 163void Pipeline::scheduleStep()
159{ 164{
160 if (!d->stepScheduled) { 165 // if (!d->stepScheduled) {
161 d->stepScheduled = true; 166 // d->stepScheduled = true;
162 QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection); 167 // QMetaObject::invokeMethod(this, "stepPipelines", Qt::QueuedConnection);
163 } 168 // }
169 //FIXME make async again. For some reason the job in newEntity crashes if pipeline processing is async.
170 stepPipelines();
164} 171}
165 172
166void Pipeline::stepPipelines() 173void Pipeline::stepPipelines()