summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-15 01:56:09 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2015-01-15 01:56:09 +0100
commitbc2a95cad05e454a84c317f1078edb329bd3afd4 (patch)
tree4cb54032b075a8730a532ccd7485db598e859ac8 /common/pipeline.cpp
parent70bb707903da21103b84e0f3effcaa0a24612d5c (diff)
downloadsink-bc2a95cad05e454a84c317f1078edb329bd3afd4.tar.gz
sink-bc2a95cad05e454a84c317f1078edb329bd3afd4.zip
Writing from facade.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp79
1 files changed, 54 insertions, 25 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 10bae54..9cc7450 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -23,10 +23,13 @@
23#include <QByteArray> 23#include <QByteArray>
24#include <QStandardPaths> 24#include <QStandardPaths>
25#include <QVector> 25#include <QVector>
26#include <QUuid>
26#include <QDebug> 27#include <QDebug>
27#include "entity_generated.h" 28#include "entity_generated.h"
28#include "metadata_generated.h" 29#include "metadata_generated.h"
30#include "createentity_generated.h"
29#include "entitybuffer.h" 31#include "entitybuffer.h"
32#include "async/src/async.h"
30 33
31namespace Akonadi2 34namespace Akonadi2
32{ 35{
@@ -90,39 +93,59 @@ void Pipeline::null()
90 // state.step(); 93 // state.step();
91} 94}
92 95
93void Pipeline::newEntity(const QString &entityType, const QByteArray &key, void *resourceBufferData, size_t size) 96Async::Job<void> Pipeline::newEntity(void const *command, size_t size)
94{ 97{
95 const qint64 newRevision = storage().maxRevision() + 1; 98 qDebug() << "new entity";
96 99 Async::start<void>([&](Async::Future<void> future) {
97 //Add metadata buffer 100
98 flatbuffers::FlatBufferBuilder metadataFbb; 101 //TODO toRFC4122 would probably be more efficient, but results in non-printable keys.
99 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb); 102 const auto key = QUuid::createUuid().toString().toUtf8();
100 metadataBuilder.add_revision(newRevision); 103
101 metadataBuilder.add_processed(false); 104 //TODO figure out if we already have created a revision for the message?
102 auto metadataBuffer = metadataBuilder.Finish(); 105 const qint64 newRevision = storage().maxRevision() + 1;
103 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer); 106
104 107 auto createEntity = Akonadi2::Commands::GetCreateEntity(command);
105 flatbuffers::FlatBufferBuilder fbb; 108 //TODO rename createEntitiy->domainType to bufferType
106 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), resourceBufferData, size, 0, 0); 109 const QString entityType = QString::fromUtf8(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size());
107 110 auto entity = Akonadi2::GetEntity(createEntity->delta()->Data());
108 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize()); 111 //
109 storage().setMaxRevision(newRevision); 112 // const QString entityType;
113 // auto entity = Akonadi2::GetEntity(0);
114
115 //Add metadata buffer
116 flatbuffers::FlatBufferBuilder metadataFbb;
117 auto metadataBuilder = Akonadi2::MetadataBuilder(metadataFbb);
118 metadataBuilder.add_revision(newRevision);
119 metadataBuilder.add_processed(false);
120 auto metadataBuffer = metadataBuilder.Finish();
121 Akonadi2::FinishMetadataBuffer(metadataFbb, metadataBuffer);
122 //TODO we should reserve some space in metadata for in-place updates
123
124 flatbuffers::FlatBufferBuilder fbb;
125 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
126
127 storage().write(key.data(), key.size(), fbb.GetBufferPointer(), fbb.GetSize());
128 storage().setMaxRevision(newRevision);
129
130 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType], [&future]() {
131 future.setFinished();
132 });
133 d->activePipelines << state;
134 state.step();
110 135
111 PipelineState state(this, NewPipeline, key, d->newPipeline[entityType]); 136 });
112 d->activePipelines << state;
113 state.step();
114} 137}
115 138
116void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size) 139void Pipeline::modifiedEntity(const QString &entityType, const QByteArray &key, void *data, size_t size)
117{ 140{
118 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType]); 141 PipelineState state(this, ModifiedPipeline, key, d->modifiedPipeline[entityType], [](){});
119 d->activePipelines << state; 142 d->activePipelines << state;
120 state.step(); 143 state.step();
121} 144}
122 145
123void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key) 146void Pipeline::deletedEntity(const QString &entityType, const QByteArray &key)
124{ 147{
125 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType]); 148 PipelineState state(this, DeletedPipeline, key, d->deletedPipeline[entityType], [](){});
126 d->activePipelines << state; 149 d->activePipelines << state;
127 state.step(); 150 state.step();
128} 151}
@@ -160,6 +183,7 @@ void Pipeline::pipelineCompleted(const PipelineState &state)
160 } 183 }
161 184
162 if (state.type() != NullPipeline) { 185 if (state.type() != NullPipeline) {
186 //TODO what revision is finalized?
163 emit revisionUpdated(); 187 emit revisionUpdated();
164 } 188 }
165 scheduleStep(); 189 scheduleStep();
@@ -172,12 +196,13 @@ void Pipeline::pipelineCompleted(const PipelineState &state)
172class PipelineState::Private : public QSharedData 196class PipelineState::Private : public QSharedData
173{ 197{
174public: 198public:
175 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters) 199 Private(Pipeline *p, Pipeline::Type t, const QByteArray &k, QVector<Preprocessor *> filters, const std::function<void()> &c)
176 : pipeline(p), 200 : pipeline(p),
177 type(t), 201 type(t),
178 key(k), 202 key(k),
179 filterIt(filters), 203 filterIt(filters),
180 idle(true) 204 idle(true),
205 callback(c)
181 {} 206 {}
182 207
183 Private() 208 Private()
@@ -191,6 +216,7 @@ public:
191 QByteArray key; 216 QByteArray key;
192 QVectorIterator<Preprocessor *> filterIt; 217 QVectorIterator<Preprocessor *> filterIt;
193 bool idle; 218 bool idle;
219 std::function<void()> callback;
194}; 220};
195 221
196PipelineState::PipelineState() 222PipelineState::PipelineState()
@@ -199,8 +225,8 @@ PipelineState::PipelineState()
199 225
200} 226}
201 227
202PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters) 228PipelineState::PipelineState(Pipeline *pipeline, Pipeline::Type type, const QByteArray &key, const QVector<Preprocessor *> &filters, const std::function<void()> &callback)
203 : d(new Private(pipeline, type, key, filters)) 229 : d(new Private(pipeline, type, key, filters, callback))
204{ 230{
205} 231}
206 232
@@ -247,6 +273,7 @@ void PipelineState::step()
247 273
248 d->idle = false; 274 d->idle = false;
249 if (d->filterIt.hasNext()) { 275 if (d->filterIt.hasNext()) {
276 //TODO skip step if already processed
250 d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool { 277 d->pipeline->storage().scan(d->key.toStdString(), [this](void *keyValue, int keySize, void *dataValue, int dataSize) -> bool {
251 auto entity = Akonadi2::GetEntity(dataValue); 278 auto entity = Akonadi2::GetEntity(dataValue);
252 d->filterIt.next()->process(*this, *entity); 279 d->filterIt.next()->process(*this, *entity);
@@ -254,11 +281,13 @@ void PipelineState::step()
254 }); 281 });
255 } else { 282 } else {
256 d->pipeline->pipelineCompleted(*this); 283 d->pipeline->pipelineCompleted(*this);
284 d->callback();
257 } 285 }
258} 286}
259 287
260void PipelineState::processingCompleted(Preprocessor *filter) 288void PipelineState::processingCompleted(Preprocessor *filter)
261{ 289{
290 //TODO record processing progress
262 if (d->pipeline && filter == d->filterIt.peekPrevious()) { 291 if (d->pipeline && filter == d->filterIt.peekPrevious()) {
263 d->idle = true; 292 d->idle = true;
264 d->pipeline->pipelineStepped(*this); 293 d->pipeline->pipelineStepped(*this);