summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp275
1 files changed, 55 insertions, 220 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index e257857..ea59ae9 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -37,6 +37,7 @@
37#include "adaptorfactoryregistry.h" 37#include "adaptorfactoryregistry.h"
38#include "definitions.h" 38#include "definitions.h"
39#include "bufferutils.h" 39#include "bufferutils.h"
40#include "storage/entitystore.h"
40 41
41SINK_DEBUG_AREA("pipeline") 42SINK_DEBUG_AREA("pipeline")
42 43
@@ -46,31 +47,18 @@ using namespace Sink::Storage;
46class Pipeline::Private 47class Pipeline::Private
47{ 48{
48public: 49public:
49 Private(const ResourceContext &context) : resourceContext(context), storage(Sink::storageLocation(), context.instanceId(), DataStore::ReadWrite), revisionChanged(false) 50 Private(const ResourceContext &context) : resourceContext(context), entityStore(context), revisionChanged(false)
50 { 51 {
51 } 52 }
52 53
53 ResourceContext resourceContext; 54 ResourceContext resourceContext;
54 DataStore storage; 55 Storage::EntityStore entityStore;
55 DataStore::Transaction transaction;
56 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; 56 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors;
57 bool revisionChanged; 57 bool revisionChanged;
58 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
59 QTime transactionTime; 58 QTime transactionTime;
60 int transactionItemCount; 59 int transactionItemCount;
61}; 60};
62 61
63void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
64{
65 SinkTrace() << "Committing new revision: " << uid << newRevision;
66 DataStore::mainDatabase(transaction, bufferType)
67 .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
68 [uid, newRevision](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; });
69 revisionChanged = true;
70 DataStore::setMaxRevision(transaction, newRevision);
71 DataStore::recordRevision(transaction, newRevision, uid, bufferType);
72}
73
74 62
75Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) 63Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context))
76{ 64{
@@ -78,7 +66,6 @@ Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Pri
78 66
79Pipeline::~Pipeline() 67Pipeline::~Pipeline()
80{ 68{
81 d->transaction = DataStore::Transaction();
82} 69}
83 70
84void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) 71void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors)
@@ -98,27 +85,10 @@ void Pipeline::startTransaction()
98 // for (auto processor : d->processors[bufferType]) { 85 // for (auto processor : d->processors[bufferType]) {
99 // processor->startBatch(); 86 // processor->startBatch();
100 // } 87 // }
101 if (d->transaction) {
102 return;
103 }
104 SinkTrace() << "Starting transaction."; 88 SinkTrace() << "Starting transaction.";
105 d->transactionTime.start(); 89 d->transactionTime.start();
106 d->transactionItemCount = 0; 90 d->transactionItemCount = 0;
107 d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { 91 d->entityStore.startTransaction(DataStore::ReadWrite);
108 SinkWarning() << error.message;
109 });
110
111 //FIXME this is a temporary measure to recover from a failure to open the named databases correctly.
112 //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already).
113 //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync...
114 if (d->storage.exists()) {
115 while (!d->transaction.validateNamedDatabases()) {
116 SinkWarning() << "Opened an invalid transaction!!!!!!";
117 d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) {
118 SinkWarning() << error.message;
119 });
120 }
121 }
122} 92}
123 93
124void Pipeline::commit() 94void Pipeline::commit()
@@ -129,34 +99,20 @@ void Pipeline::commit()
129 // processor->finalize(); 99 // processor->finalize();
130 // } 100 // }
131 if (!d->revisionChanged) { 101 if (!d->revisionChanged) {
132 d->transaction.abort(); 102 d->entityStore.abortTransaction();
133 d->transaction = DataStore::Transaction();
134 return; 103 return;
135 } 104 }
136 const auto revision = DataStore::maxRevision(d->transaction); 105 const auto revision = d->entityStore.maxRevision();
137 const auto elapsed = d->transactionTime.elapsed(); 106 const auto elapsed = d->transactionTime.elapsed();
138 SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " 107 SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
139 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; 108 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]";
140 if (d->transaction) { 109 d->entityStore.commitTransaction();
141 d->transaction.commit();
142 }
143 d->transaction = DataStore::Transaction();
144 if (d->revisionChanged) { 110 if (d->revisionChanged) {
145 d->revisionChanged = false; 111 d->revisionChanged = false;
146 emit revisionUpdated(revision); 112 emit revisionUpdated(revision);
147 } 113 }
148} 114}
149 115
150DataStore::Transaction &Pipeline::transaction()
151{
152 return d->transaction;
153}
154
155DataStore &Pipeline::storage() const
156{
157 return d->storage;
158}
159
160KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) 116KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
161{ 117{
162 d->transactionItemCount++; 118 d->transactionItemCount++;
@@ -175,7 +131,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
175 QByteArray key; 131 QByteArray key;
176 if (createEntity->entityId()) { 132 if (createEntity->entityId()) {
177 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); 133 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size());
178 if (DataStore::mainDatabase(d->transaction, bufferType).contains(key)) { 134 if (d->entityStore.contains(bufferType, key)) {
179 SinkError() << "An entity with this id already exists: " << key; 135 SinkError() << "An entity with this id already exists: " << key;
180 return KAsync::error<qint64>(0); 136 return KAsync::error<qint64>(0);
181 } 137 }
@@ -208,29 +164,23 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
208 164
209 auto adaptor = adaptorFactory->createAdaptor(*entity); 165 auto adaptor = adaptorFactory->createAdaptor(*entity);
210 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); 166 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties());
211 foreach (const auto &processor, d->processors[bufferType]) {
212 processor->newEntity(key, DataStore::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction);
213 }
214 //The maxRevision may have changed meanwhile if the entity created sub-entities
215 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
216
217 // Add metadata buffer
218 flatbuffers::FlatBufferBuilder metadataFbb;
219 auto metadataBuilder = MetadataBuilder(metadataFbb);
220 metadataBuilder.add_revision(newRevision);
221 metadataBuilder.add_operation(Operation_Creation);
222 metadataBuilder.add_replayToSource(replayToSource);
223 auto metadataBuffer = metadataBuilder.Finish();
224 FinishMetadataBuffer(metadataFbb, metadataBuffer);
225 167
226 flatbuffers::FlatBufferBuilder fbb; 168 d->revisionChanged = true;
227 adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 169 auto revision = d->entityStore.maxRevision();
170 auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor};
171 o.setChangedProperties(o.availableProperties().toSet());
228 172
229 d->storeNewRevision(newRevision, fbb, bufferType, key); 173 auto preprocess = [&, this](ApplicationDomain::ApplicationDomainType &newEntity) {
174 foreach (const auto &processor, d->processors[bufferType]) {
175 processor->newEntity(newEntity);
176 }
177 };
230 178
231 //FIXME entityStore->create(bufferType, memoryAdaptor, replayToSource) 179 if (!d->entityStore.add(bufferType, o, replayToSource, preprocess)) {
180 return KAsync::error<qint64>(0);
181 }
232 182
233 return KAsync::value(newRevision); 183 return KAsync::value(d->entityStore.maxRevision());
234} 184}
235 185
236KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) 186KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
@@ -254,6 +204,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
254 } 204 }
255 const qint64 baseRevision = modifyEntity->revision(); 205 const qint64 baseRevision = modifyEntity->revision();
256 const bool replayToSource = modifyEntity->replayToSource(); 206 const bool replayToSource = modifyEntity->replayToSource();
207
257 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); 208 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
258 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); 209 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
259 SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 210 SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
@@ -269,7 +220,6 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
269 } 220 }
270 } 221 }
271 222
272 // TODO use only readPropertyMapper and writePropertyMapper
273 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); 223 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
274 if (!adaptorFactory) { 224 if (!adaptorFactory) {
275 SinkWarning() << "no adaptor factory for type " << bufferType; 225 SinkWarning() << "no adaptor factory for type " << bufferType;
@@ -278,72 +228,26 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
278 228
279 auto diffEntity = GetEntity(modifyEntity->delta()->Data()); 229 auto diffEntity = GetEntity(modifyEntity->delta()->Data());
280 Q_ASSERT(diffEntity); 230 Q_ASSERT(diffEntity);
281 auto diff = adaptorFactory->createAdaptor(*diffEntity); 231 Sink::ApplicationDomain::ApplicationDomainType diff{d->resourceContext.instanceId(), key, baseRevision, adaptorFactory->createAdaptor(*diffEntity)};
282 232 diff.setChangedProperties(changeset.toSet());
283 QSharedPointer<ApplicationDomain::BufferAdaptor> current;
284 DataStore::mainDatabase(d->transaction, bufferType)
285 .findLatest(key,
286 [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
287 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
288 if (!buffer.isValid()) {
289 SinkWarning() << "Read invalid buffer from disk";
290 } else {
291 current = adaptorFactory->createAdaptor(buffer.entity());
292 }
293 return false;
294 },
295 [baseRevision](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; });
296
297 if (!current) {
298 SinkWarning() << "Failed to read local value " << key;
299 return KAsync::error<qint64>(0);
300 }
301
302 auto newAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(current), current->availableProperties());
303
304 // Apply diff
305 // FIXME only apply the properties that are available in the buffer
306 SinkTrace() << "Applying changed properties: " << changeset;
307 for (const auto &property : changeset) {
308 const auto value = diff->getProperty(property);
309 if (value.isValid()) {
310 newAdaptor->setProperty(property, value);
311 }
312 }
313 233
314 // Remove deletions 234 QByteArrayList deletions;
315 if (modifyEntity->deletions()) { 235 if (modifyEntity->deletions()) {
316 for (const flatbuffers::String *property : *modifyEntity->deletions()) { 236 deletions = BufferUtils::fromVector(*modifyEntity->deletions());
317 newAdaptor->setProperty(BufferUtils::extractBuffer(property), QVariant());
318 }
319 } 237 }
320 238
321 newAdaptor->resetChangedProperties(); 239 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) {
322 foreach (const auto &processor, d->processors[bufferType]) { 240 foreach (const auto &processor, d->processors[bufferType]) {
323 processor->modifiedEntity(key, DataStore::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); 241 processor->modifiedEntity(oldEntity, newEntity);
324 } 242 }
325 //The maxRevision may have changed meanwhile if the entity created sub-entities 243 };
326 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
327 244
328 // Add metadata buffer 245 d->revisionChanged = true;
329 flatbuffers::FlatBufferBuilder metadataFbb; 246 if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) {
330 { 247 return KAsync::error<qint64>(0);
331 //We add availableProperties to account for the properties that have been changed by the preprocessors
332 auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newAdaptor->changedProperties());
333 auto metadataBuilder = MetadataBuilder(metadataFbb);
334 metadataBuilder.add_revision(newRevision);
335 metadataBuilder.add_operation(Operation_Modification);
336 metadataBuilder.add_replayToSource(replayToSource);
337 metadataBuilder.add_modifiedProperties(modifiedProperties);
338 auto metadataBuffer = metadataBuilder.Finish();
339 FinishMetadataBuffer(metadataFbb, metadataBuffer);
340 } 248 }
341 249
342 flatbuffers::FlatBufferBuilder fbb; 250 return KAsync::value(d->entityStore.maxRevision());
343 adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
344
345 d->storeNewRevision(newRevision, fbb, bufferType, key);
346 return KAsync::value(newRevision);
347} 251}
348 252
349KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) 253KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
@@ -364,106 +268,38 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
364 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 268 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
365 SinkTrace() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 269 SinkTrace() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
366 270
367 bool found = false; 271 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) {
368 bool alreadyRemoved = false; 272 foreach (const auto &processor, d->processors[bufferType]) {
369 DataStore::mainDatabase(d->transaction, bufferType) 273 processor->deletedEntity(oldEntity);
370 .findLatest(key, 274 }
371 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { 275 };
372 auto entity = GetEntity(data.data());
373 if (entity && entity->metadata()) {
374 auto metadata = GetMetadata(entity->metadata()->Data());
375 found = true;
376 if (metadata->operation() == Operation_Removal) {
377 alreadyRemoved = true;
378 }
379 }
380 return false;
381 },
382 [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; });
383
384 if (!found) {
385 SinkWarning() << "Failed to find entity " << key;
386 return KAsync::error<qint64>(0);
387 }
388 if (alreadyRemoved) {
389 SinkWarning() << "Entity is already removed " << key;
390 return KAsync::error<qint64>(0);
391 }
392
393 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
394
395 // Add metadata buffer
396 flatbuffers::FlatBufferBuilder metadataFbb;
397 auto metadataBuilder = MetadataBuilder(metadataFbb);
398 metadataBuilder.add_revision(newRevision);
399 metadataBuilder.add_operation(Operation_Removal);
400 metadataBuilder.add_replayToSource(replayToSource);
401 auto metadataBuffer = metadataBuilder.Finish();
402 FinishMetadataBuffer(metadataFbb, metadataBuffer);
403
404 flatbuffers::FlatBufferBuilder fbb;
405 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
406 276
407 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); 277 d->revisionChanged = true;
408 if (!adaptorFactory) { 278 if (!d->entityStore.remove(bufferType, key, replayToSource, preprocess)) {
409 SinkWarning() << "no adaptor factory for type " << bufferType;
410 return KAsync::error<qint64>(0); 279 return KAsync::error<qint64>(0);
411 } 280 }
412 281
413 QSharedPointer<ApplicationDomain::BufferAdaptor> current; 282 return KAsync::value(d->entityStore.maxRevision());
414 DataStore::mainDatabase(d->transaction, bufferType)
415 .findLatest(key,
416 [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
417 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
418 if (!buffer.isValid()) {
419 SinkWarning() << "Read invalid buffer from disk";
420 } else {
421 current = adaptorFactory->createAdaptor(buffer.entity());
422 }
423 return false;
424 },
425 [this](const DataStore::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; });
426
427 d->storeNewRevision(newRevision, fbb, bufferType, key);
428
429 foreach (const auto &processor, d->processors[bufferType]) {
430 processor->deletedEntity(key, newRevision, *current, d->transaction);
431 }
432
433 return KAsync::value(newRevision);
434} 283}
435 284
436void Pipeline::cleanupRevision(qint64 revision) 285void Pipeline::cleanupRevision(qint64 revision)
437{ 286{
287 d->entityStore.cleanupRevision(revision);
438 d->revisionChanged = true; 288 d->revisionChanged = true;
439 const auto uid = DataStore::getUidFromRevision(d->transaction, revision);
440 const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision);
441 SinkTrace() << "Cleaning up revision " << revision << uid << bufferType;
442 DataStore::mainDatabase(d->transaction, bufferType)
443 .scan(uid,
444 [&](const QByteArray &key, const QByteArray &data) -> bool {
445 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
446 if (!buffer.isValid()) {
447 SinkWarning() << "Read invalid buffer from disk";
448 } else {
449 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
450 const qint64 rev = metadata->revision();
451 // Remove old revisions, and the current if the entity has already been removed
452 if (rev < revision || metadata->operation() == Operation_Removal) {
453 DataStore::removeRevision(d->transaction, rev);
454 DataStore::mainDatabase(d->transaction, bufferType).remove(key);
455 }
456 }
457
458 return true;
459 },
460 [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true);
461 DataStore::setCleanedUpRevision(d->transaction, revision);
462} 289}
463 290
464qint64 Pipeline::cleanedUpRevision() 291qint64 Pipeline::cleanedUpRevision()
465{ 292{
466 return DataStore::cleanedUpRevision(d->transaction); 293 /* return d->entityStore.cleanedUpRevision(); */
294 /* return DataStore::cleanedUpRevision(d->transaction); */
295 //FIXME Just move the whole cleanup revision iteration into the entitystore
296 return 0;
297}
298
299qint64 Pipeline::revision()
300{
301 //FIXME Just move the whole cleanup revision iteration into the entitystore
302 return 0;
467} 303}
468 304
469class Preprocessor::Private { 305class Preprocessor::Private {
@@ -492,7 +328,7 @@ void Preprocessor::startBatch()
492{ 328{
493} 329}
494 330
495void Preprocessor::finalize() 331void Preprocessor::finalizeBatch()
496{ 332{
497} 333}
498 334
@@ -510,7 +346,6 @@ void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomain
510 346
511 flatbuffers::FlatBufferBuilder fbb; 347 flatbuffers::FlatBufferBuilder fbb;
512 auto entityId = fbb.CreateString(entity.identifier()); 348 auto entityId = fbb.CreateString(entity.identifier());
513 // This is the resource buffer type and not the domain type
514 auto type = fbb.CreateString(typeName); 349 auto type = fbb.CreateString(typeName);
515 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size()); 350 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size());
516 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); 351 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta);