summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp174
1 files changed, 101 insertions, 73 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 401c26d..93d8236 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -25,6 +25,7 @@
25#include <QVector> 25#include <QVector>
26#include <QUuid> 26#include <QUuid>
27#include <QDebug> 27#include <QDebug>
28#include <QTime>
28#include "entity_generated.h" 29#include "entity_generated.h"
29#include "metadata_generated.h" 30#include "metadata_generated.h"
30#include "createentity_generated.h" 31#include "createentity_generated.h"
@@ -36,6 +37,9 @@
36#include "definitions.h" 37#include "definitions.h"
37#include "bufferutils.h" 38#include "bufferutils.h"
38 39
40#undef DEBUG_AREA
41#define DEBUG_AREA "resource.pipeline"
42
39namespace Sink 43namespace Sink
40{ 44{
41 45
@@ -53,8 +57,24 @@ public:
53 QHash<QString, QVector<Preprocessor *> > processors; 57 QHash<QString, QVector<Preprocessor *> > processors;
54 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; 58 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory;
55 bool revisionChanged; 59 bool revisionChanged;
60 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
61 QTime transactionTime;
62 int transactionItemCount;
56}; 63};
57 64
65void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
66{
67 Storage::mainDatabase(transaction, bufferType).write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
68 [uid, newRevision](const Storage::Error &error) {
69 Warning() << "Failed to write entity" << uid << newRevision;
70 }
71 );
72 revisionChanged = true;
73 Storage::setMaxRevision(transaction, newRevision);
74 Storage::recordRevision(transaction, newRevision, uid, bufferType);
75}
76
77
58Pipeline::Pipeline(const QString &resourceName, QObject *parent) 78Pipeline::Pipeline(const QString &resourceName, QObject *parent)
59 : QObject(parent), 79 : QObject(parent),
60 d(new Private(resourceName)) 80 d(new Private(resourceName))
@@ -86,7 +106,10 @@ void Pipeline::startTransaction()
86 if (d->transaction) { 106 if (d->transaction) {
87 return; 107 return;
88 } 108 }
89 d->transaction = std::move(storage().createTransaction(Sink::Storage::ReadWrite)); 109 Trace() << "Starting transaction.";
110 d->transactionTime.start();
111 d->transactionItemCount = 0;
112 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite));
90} 113}
91 114
92void Pipeline::commit() 115void Pipeline::commit()
@@ -96,8 +119,9 @@ void Pipeline::commit()
96 // for (auto processor : d->processors[bufferType]) { 119 // for (auto processor : d->processors[bufferType]) {
97 // processor->finalize(); 120 // processor->finalize();
98 // } 121 // }
99 const auto revision = Sink::Storage::maxRevision(d->transaction); 122 const auto revision = Storage::maxRevision(d->transaction);
100 Trace() << "Committing " << revision; 123 const auto elapsed = d->transactionTime.elapsed();
124 Trace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " << (double)elapsed/(double)qMax(d->transactionItemCount, 1) << "[ms/item]";
101 if (d->transaction) { 125 if (d->transaction) {
102 d->transaction.commit(); 126 d->transaction.commit();
103 } 127 }
@@ -118,41 +142,30 @@ Storage &Pipeline::storage() const
118 return d->storage; 142 return d->storage;
119} 143}
120 144
121void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
122{
123 d->transaction.openDatabase(bufferType + ".main").write(Sink::Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
124 [](const Sink::Storage::Error &error) {
125 Warning() << "Failed to write entity";
126 }
127 );
128 d->revisionChanged = true;
129 Sink::Storage::setMaxRevision(d->transaction, newRevision);
130 Sink::Storage::recordRevision(d->transaction, newRevision, uid, bufferType);
131}
132
133KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) 145KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
134{ 146{
135 Trace() << "Pipeline: New Entity"; 147 Trace() << "Pipeline: New Entity";
148 d->transactionItemCount++;
136 149
137 { 150 {
138 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 151 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
139 if (!Sink::Commands::VerifyCreateEntityBuffer(verifyer)) { 152 if (!Commands::VerifyCreateEntityBuffer(verifyer)) {
140 Warning() << "invalid buffer, not a create entity buffer"; 153 Warning() << "invalid buffer, not a create entity buffer";
141 return KAsync::error<qint64>(0); 154 return KAsync::error<qint64>(0);
142 } 155 }
143 } 156 }
144 auto createEntity = Sink::Commands::GetCreateEntity(command); 157 auto createEntity = Commands::GetCreateEntity(command);
145 158
146 const bool replayToSource = createEntity->replayToSource(); 159 const bool replayToSource = createEntity->replayToSource();
147 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); 160 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size());
148 { 161 {
149 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 162 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
150 if (!Sink::VerifyEntityBuffer(verifyer)) { 163 if (!VerifyEntityBuffer(verifyer)) {
151 Warning() << "invalid buffer, not an entity buffer"; 164 Warning() << "invalid buffer, not an entity buffer";
152 return KAsync::error<qint64>(0); 165 return KAsync::error<qint64>(0);
153 } 166 }
154 } 167 }
155 auto entity = Sink::GetEntity(createEntity->delta()->Data()); 168 auto entity = GetEntity(createEntity->delta()->Data());
156 if (!entity->resource()->size() && !entity->local()->size()) { 169 if (!entity->resource()->size() && !entity->local()->size()) {
157 Warning() << "No local and no resource buffer while trying to create entity."; 170 Warning() << "No local and no resource buffer while trying to create entity.";
158 return KAsync::error<qint64>(0); 171 return KAsync::error<qint64>(0);
@@ -161,7 +174,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
161 QByteArray key; 174 QByteArray key;
162 if (createEntity->entityId()) { 175 if (createEntity->entityId()) {
163 key = QByteArray(reinterpret_cast<char const*>(createEntity->entityId()->Data()), createEntity->entityId()->size()); 176 key = QByteArray(reinterpret_cast<char const*>(createEntity->entityId()->Data()), createEntity->entityId()->size());
164 if (d->transaction.openDatabase(bufferType + ".main").contains(key)) { 177 if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) {
165 ErrorMsg() << "An entity with this id already exists: " << key; 178 ErrorMsg() << "An entity with this id already exists: " << key;
166 return KAsync::error<qint64>(0); 179 return KAsync::error<qint64>(0);
167 } 180 }
@@ -171,21 +184,21 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
171 key = QUuid::createUuid().toString().toUtf8(); 184 key = QUuid::createUuid().toString().toUtf8();
172 } 185 }
173 Q_ASSERT(!key.isEmpty()); 186 Q_ASSERT(!key.isEmpty());
174 const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; 187 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
175 188
176 //Add metadata buffer 189 //Add metadata buffer
177 flatbuffers::FlatBufferBuilder metadataFbb; 190 flatbuffers::FlatBufferBuilder metadataFbb;
178 auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); 191 auto metadataBuilder = MetadataBuilder(metadataFbb);
179 metadataBuilder.add_revision(newRevision); 192 metadataBuilder.add_revision(newRevision);
180 metadataBuilder.add_operation(Sink::Operation_Creation); 193 metadataBuilder.add_operation(Operation_Creation);
181 metadataBuilder.add_replayToSource(replayToSource); 194 metadataBuilder.add_replayToSource(replayToSource);
182 auto metadataBuffer = metadataBuilder.Finish(); 195 auto metadataBuffer = metadataBuilder.Finish();
183 Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); 196 FinishMetadataBuffer(metadataFbb, metadataBuffer);
184 197
185 flatbuffers::FlatBufferBuilder fbb; 198 flatbuffers::FlatBufferBuilder fbb;
186 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); 199 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
187 200
188 storeNewRevision(newRevision, fbb, bufferType, key); 201 d->storeNewRevision(newRevision, fbb, bufferType, key);
189 202
190 auto adaptorFactory = d->adaptorFactory.value(bufferType); 203 auto adaptorFactory = d->adaptorFactory.value(bufferType);
191 if (!adaptorFactory) { 204 if (!adaptorFactory) {
@@ -194,14 +207,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
194 } 207 }
195 208
196 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 209 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
197 d->transaction.openDatabase(bufferType + ".main").scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { 210 Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool {
198 auto entity = Sink::GetEntity(value); 211 auto entity = GetEntity(value);
199 auto adaptor = adaptorFactory->createAdaptor(*entity); 212 auto adaptor = adaptorFactory->createAdaptor(*entity);
200 for (auto processor : d->processors[bufferType]) { 213 for (auto processor : d->processors[bufferType]) {
201 processor->newEntity(key, newRevision, *adaptor, d->transaction); 214 processor->newEntity(key, newRevision, *adaptor, d->transaction);
202 } 215 }
203 return false; 216 return false;
204 }, [this](const Sink::Storage::Error &error) { 217 }, [this](const Storage::Error &error) {
205 ErrorMsg() << "Failed to find value in pipeline: " << error.message; 218 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
206 }); 219 });
207 return KAsync::start<qint64>([newRevision](){ 220 return KAsync::start<qint64>([newRevision](){
@@ -212,17 +225,18 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
212KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) 225KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
213{ 226{
214 Trace() << "Pipeline: Modified Entity"; 227 Trace() << "Pipeline: Modified Entity";
228 d->transactionItemCount++;
215 229
216 const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; 230 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
217 231
218 { 232 {
219 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 233 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
220 if (!Sink::Commands::VerifyModifyEntityBuffer(verifyer)) { 234 if (!Commands::VerifyModifyEntityBuffer(verifyer)) {
221 Warning() << "invalid buffer, not a modify entity buffer"; 235 Warning() << "invalid buffer, not a modify entity buffer";
222 return KAsync::error<qint64>(0); 236 return KAsync::error<qint64>(0);
223 } 237 }
224 } 238 }
225 auto modifyEntity = Sink::Commands::GetModifyEntity(command); 239 auto modifyEntity = Commands::GetModifyEntity(command);
226 Q_ASSERT(modifyEntity); 240 Q_ASSERT(modifyEntity);
227 241
228 const qint64 baseRevision = modifyEntity->revision(); 242 const qint64 baseRevision = modifyEntity->revision();
@@ -236,7 +250,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
236 } 250 }
237 { 251 {
238 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); 252 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
239 if (!Sink::VerifyEntityBuffer(verifyer)) { 253 if (!VerifyEntityBuffer(verifyer)) {
240 Warning() << "invalid buffer, not an entity buffer"; 254 Warning() << "invalid buffer, not an entity buffer";
241 return KAsync::error<qint64>(0); 255 return KAsync::error<qint64>(0);
242 } 256 }
@@ -249,13 +263,13 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
249 return KAsync::error<qint64>(0); 263 return KAsync::error<qint64>(0);
250 } 264 }
251 265
252 auto diffEntity = Sink::GetEntity(modifyEntity->delta()->Data()); 266 auto diffEntity = GetEntity(modifyEntity->delta()->Data());
253 Q_ASSERT(diffEntity); 267 Q_ASSERT(diffEntity);
254 auto diff = adaptorFactory->createAdaptor(*diffEntity); 268 auto diff = adaptorFactory->createAdaptor(*diffEntity);
255 269
256 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 270 QSharedPointer<ApplicationDomain::BufferAdaptor> current;
257 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 271 Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
258 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 272 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
259 if (!buffer.isValid()) { 273 if (!buffer.isValid()) {
260 Warning() << "Read invalid buffer from disk"; 274 Warning() << "Read invalid buffer from disk";
261 } else { 275 } else {
@@ -273,15 +287,22 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
273 } 287 }
274 288
275 //resource and uid don't matter at this point 289 //resource and uid don't matter at this point
276 const Sink::ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); 290 const ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current);
277 auto newObject = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<Sink::ApplicationDomain::ApplicationDomainType>(existingObject); 291 auto newObject = ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(existingObject);
278 292
279 //Apply diff 293 //Apply diff
280 //FIXME only apply the properties that are available in the buffer 294 //FIXME only apply the properties that are available in the buffer
281 Trace() << "Applying changed properties: " << diff->availableProperties(); 295 Trace() << "Applying changed properties: " << diff->availableProperties();
296 QSet<QByteArray> changeset;
282 for (const auto &property : diff->availableProperties()) { 297 for (const auto &property : diff->availableProperties()) {
283 newObject->setProperty(property, diff->getProperty(property)); 298 changeset << property;
299 const auto value = diff->getProperty(property);
300 if (value.isValid()) {
301 newObject->setProperty(property, value);
302 }
284 } 303 }
304 //Altough we only set some properties, we want all to be serialized
305 newObject->setChangedProperties(changeset);
285 306
286 //Remove deletions 307 //Remove deletions
287 if (modifyEntity->deletions()) { 308 if (modifyEntity->deletions()) {
@@ -292,26 +313,29 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
292 313
293 //Add metadata buffer 314 //Add metadata buffer
294 flatbuffers::FlatBufferBuilder metadataFbb; 315 flatbuffers::FlatBufferBuilder metadataFbb;
295 auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); 316 auto metadataBuilder = MetadataBuilder(metadataFbb);
296 metadataBuilder.add_revision(newRevision); 317 metadataBuilder.add_revision(newRevision);
297 metadataBuilder.add_operation(Sink::Operation_Modification); 318 metadataBuilder.add_operation(Operation_Modification);
298 metadataBuilder.add_replayToSource(replayToSource); 319 metadataBuilder.add_replayToSource(replayToSource);
299 auto metadataBuffer = metadataBuilder.Finish(); 320 auto metadataBuffer = metadataBuilder.Finish();
300 Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); 321 FinishMetadataBuffer(metadataFbb, metadataBuffer);
301 322
302 flatbuffers::FlatBufferBuilder fbb; 323 flatbuffers::FlatBufferBuilder fbb;
303 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 324 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
304 325
305 storeNewRevision(newRevision, fbb, bufferType, key); 326 d->storeNewRevision(newRevision, fbb, bufferType, key);
306 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; 327 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
307 d->transaction.openDatabase(bufferType + ".main").scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &, const QByteArray &value) -> bool { 328 Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool {
308 auto entity = Sink::GetEntity(value); 329 if (value.isEmpty()) {
330 ErrorMsg() << "Read buffer is empty.";
331 }
332 auto entity = GetEntity(value.data());
309 auto newEntity = adaptorFactory->createAdaptor(*entity); 333 auto newEntity = adaptorFactory->createAdaptor(*entity);
310 for (auto processor : d->processors[bufferType]) { 334 for (auto processor : d->processors[bufferType]) {
311 processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); 335 processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction);
312 } 336 }
313 return false; 337 return false;
314 }, [this](const Sink::Storage::Error &error) { 338 }, [this](const Storage::Error &error) {
315 ErrorMsg() << "Failed to find value in pipeline: " << error.message; 339 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
316 }); 340 });
317 return KAsync::start<qint64>([newRevision](){ 341 return KAsync::start<qint64>([newRevision](){
@@ -322,15 +346,16 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
322KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) 346KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
323{ 347{
324 Trace() << "Pipeline: Deleted Entity"; 348 Trace() << "Pipeline: Deleted Entity";
349 d->transactionItemCount++;
325 350
326 { 351 {
327 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 352 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
328 if (!Sink::Commands::VerifyDeleteEntityBuffer(verifyer)) { 353 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) {
329 Warning() << "invalid buffer, not a delete entity buffer"; 354 Warning() << "invalid buffer, not a delete entity buffer";
330 return KAsync::error<qint64>(0); 355 return KAsync::error<qint64>(0);
331 } 356 }
332 } 357 }
333 auto deleteEntity = Sink::Commands::GetDeleteEntity(command); 358 auto deleteEntity = Commands::GetDeleteEntity(command);
334 359
335 const bool replayToSource = deleteEntity->replayToSource(); 360 const bool replayToSource = deleteEntity->replayToSource();
336 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 361 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
@@ -338,13 +363,12 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
338 363
339 bool found = false; 364 bool found = false;
340 bool alreadyRemoved = false; 365 bool alreadyRemoved = false;
341 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { 366 Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
342 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 367 auto entity = GetEntity(data.data());
343 auto entity = Sink::GetEntity(data.data());
344 if (entity && entity->metadata()) { 368 if (entity && entity->metadata()) {
345 auto metadata = Sink::GetMetadata(entity->metadata()->Data()); 369 auto metadata = GetMetadata(entity->metadata()->Data());
346 found = true; 370 found = true;
347 if (metadata->operation() == Sink::Operation_Removal) { 371 if (metadata->operation() == Operation_Removal) {
348 alreadyRemoved = true; 372 alreadyRemoved = true;
349 } 373 }
350 374
@@ -364,16 +388,16 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
364 return KAsync::error<qint64>(0); 388 return KAsync::error<qint64>(0);
365 } 389 }
366 390
367 const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; 391 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
368 392
369 //Add metadata buffer 393 //Add metadata buffer
370 flatbuffers::FlatBufferBuilder metadataFbb; 394 flatbuffers::FlatBufferBuilder metadataFbb;
371 auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); 395 auto metadataBuilder = MetadataBuilder(metadataFbb);
372 metadataBuilder.add_revision(newRevision); 396 metadataBuilder.add_revision(newRevision);
373 metadataBuilder.add_operation(Sink::Operation_Removal); 397 metadataBuilder.add_operation(Operation_Removal);
374 metadataBuilder.add_replayToSource(replayToSource); 398 metadataBuilder.add_replayToSource(replayToSource);
375 auto metadataBuffer = metadataBuilder.Finish(); 399 auto metadataBuffer = metadataBuilder.Finish();
376 Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); 400 FinishMetadataBuffer(metadataFbb, metadataBuffer);
377 401
378 flatbuffers::FlatBufferBuilder fbb; 402 flatbuffers::FlatBufferBuilder fbb;
379 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); 403 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
@@ -384,20 +408,20 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
384 return KAsync::error<qint64>(0); 408 return KAsync::error<qint64>(0);
385 } 409 }
386 410
387 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 411 QSharedPointer<ApplicationDomain::BufferAdaptor> current;
388 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool { 412 Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
389 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 413 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
390 if (!buffer.isValid()) { 414 if (!buffer.isValid()) {
391 Warning() << "Read invalid buffer from disk"; 415 Warning() << "Read invalid buffer from disk";
392 } else { 416 } else {
393 current = adaptorFactory->createAdaptor(buffer.entity()); 417 current = adaptorFactory->createAdaptor(buffer.entity());
394 } 418 }
395 return false; 419 return false;
396 }, [this](const Sink::Storage::Error &error) { 420 }, [this](const Storage::Error &error) {
397 ErrorMsg() << "Failed to find value in pipeline: " << error.message; 421 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
398 }); 422 });
399 423
400 storeNewRevision(newRevision, fbb, bufferType, key); 424 d->storeNewRevision(newRevision, fbb, bufferType, key);
401 Log() << "Pipeline: deleted entity: "<< newRevision; 425 Log() << "Pipeline: deleted entity: "<< newRevision;
402 426
403 for (auto processor : d->processors[bufferType]) { 427 for (auto processor : d->processors[bufferType]) {
@@ -411,33 +435,33 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
411 435
412void Pipeline::cleanupRevision(qint64 revision) 436void Pipeline::cleanupRevision(qint64 revision)
413{ 437{
414 const auto uid = Sink::Storage::getUidFromRevision(d->transaction, revision); 438 const auto uid = Storage::getUidFromRevision(d->transaction, revision);
415 const auto bufferType = Sink::Storage::getTypeFromRevision(d->transaction, revision); 439 const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision);
416 Trace() << "Cleaning up revision " << revision << uid << bufferType; 440 Trace() << "Cleaning up revision " << revision << uid << bufferType;
417 d->transaction.openDatabase(bufferType + ".main").scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { 441 Storage::mainDatabase(d->transaction, bufferType).scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool {
418 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 442 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
419 if (!buffer.isValid()) { 443 if (!buffer.isValid()) {
420 Warning() << "Read invalid buffer from disk"; 444 Warning() << "Read invalid buffer from disk";
421 } else { 445 } else {
422 const auto metadata = flatbuffers::GetRoot<Sink::Metadata>(buffer.metadataBuffer()); 446 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
423 const qint64 rev = metadata->revision(); 447 const qint64 rev = metadata->revision();
424 //Remove old revisions, and the current if the entity has already been removed 448 //Remove old revisions, and the current if the entity has already been removed
425 if (rev < revision || metadata->operation() == Sink::Operation_Removal) { 449 if (rev < revision || metadata->operation() == Operation_Removal) {
426 Sink::Storage::removeRevision(d->transaction, rev); 450 Storage::removeRevision(d->transaction, rev);
427 d->transaction.openDatabase(bufferType + ".main").remove(key); 451 Storage::mainDatabase(d->transaction, bufferType).remove(key);
428 } 452 }
429 } 453 }
430 454
431 return true; 455 return true;
432 }, [](const Sink::Storage::Error &error) { 456 }, [](const Storage::Error &error) {
433 Warning() << "Error while reading: " << error.message; 457 Warning() << "Error while reading: " << error.message;
434 }, true); 458 }, true);
435 Sink::Storage::setCleanedUpRevision(d->transaction, revision); 459 Storage::setCleanedUpRevision(d->transaction, revision);
436} 460}
437 461
438qint64 Pipeline::cleanedUpRevision() 462qint64 Pipeline::cleanedUpRevision()
439{ 463{
440 return Sink::Storage::cleanedUpRevision(d->transaction); 464 return Storage::cleanedUpRevision(d->transaction);
441} 465}
442 466
443Preprocessor::Preprocessor() 467Preprocessor::Preprocessor()
@@ -459,3 +483,7 @@ void Preprocessor::finalize()
459 483
460} // namespace Sink 484} // namespace Sink
461 485
486#pragma clang diagnostic push
487#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
488#include "moc_pipeline.cpp"
489#pragma clang diagnostic pop