Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--  common/pipeline.cpp  |  99
1 file changed, 47 insertions(+), 52 deletions(-)
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index ce864f7..e257857 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -40,45 +40,45 @@
 
 SINK_DEBUG_AREA("pipeline")
 
-namespace Sink {
+using namespace Sink;
+using namespace Sink::Storage;
 
 class Pipeline::Private
 {
 public:
-    Private(const QString &resourceName) : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), revisionChanged(false), resourceInstanceIdentifier(resourceName.toUtf8())
+    Private(const ResourceContext &context) : resourceContext(context), storage(Sink::storageLocation(), context.instanceId(), DataStore::ReadWrite), revisionChanged(false)
     {
     }
 
-    Storage storage;
-    Storage::Transaction transaction;
+    ResourceContext resourceContext;
+    DataStore storage;
+    DataStore::Transaction transaction;
     QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors;
     bool revisionChanged;
     void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
    QTime transactionTime;
    int transactionItemCount;
-    QByteArray resourceType;
-    QByteArray resourceInstanceIdentifier;
 };
 
 void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
 {
     SinkTrace() << "Committing new revision: " << uid << newRevision;
-    Storage::mainDatabase(transaction, bufferType)
-        .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
-            [uid, newRevision](const Storage::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; });
+    DataStore::mainDatabase(transaction, bufferType)
+        .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
+            [uid, newRevision](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; });
     revisionChanged = true;
-    Storage::setMaxRevision(transaction, newRevision);
-    Storage::recordRevision(transaction, newRevision, uid, bufferType);
+    DataStore::setMaxRevision(transaction, newRevision);
+    DataStore::recordRevision(transaction, newRevision, uid, bufferType);
 }
 
 
-Pipeline::Pipeline(const QString &resourceName, QObject *parent) : QObject(parent), d(new Private(resourceName))
+Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context))
 {
 }
 
 Pipeline::~Pipeline()
 {
-    d->transaction = Storage::Transaction();
+    d->transaction = DataStore::Transaction();
 }
 
 void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors)
@@ -86,16 +86,11 @@ void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preproc
     auto &list = d->processors[entityType];
     list.clear();
     for (auto p : processors) {
-        p->setup(d->resourceType, d->resourceInstanceIdentifier, this);
+        p->setup(d->resourceContext.resourceType, d->resourceContext.instanceId(), this);
         list.append(QSharedPointer<Preprocessor>(p));
     }
 }
 
-void Pipeline::setResourceType(const QByteArray &resourceType)
-{
-    d->resourceType = resourceType;
-}
-
 void Pipeline::startTransaction()
 {
     // TODO call for all types
@@ -109,7 +104,7 @@ void Pipeline::startTransaction()
     SinkTrace() << "Starting transaction.";
     d->transactionTime.start();
     d->transactionItemCount = 0;
-    d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
+    d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) {
         SinkWarning() << error.message;
     });
 
@@ -119,7 +114,7 @@ void Pipeline::startTransaction()
     if (d->storage.exists()) {
         while (!d->transaction.validateNamedDatabases()) {
             SinkWarning() << "Opened an invalid transaction!!!!!!";
-            d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
+            d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) {
                 SinkWarning() << error.message;
             });
         }
@@ -135,29 +130,29 @@ void Pipeline::commit()
     // }
     if (!d->revisionChanged) {
         d->transaction.abort();
-        d->transaction = Storage::Transaction();
+        d->transaction = DataStore::Transaction();
         return;
     }
-    const auto revision = Storage::maxRevision(d->transaction);
+    const auto revision = DataStore::maxRevision(d->transaction);
     const auto elapsed = d->transactionTime.elapsed();
     SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
               << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]";
     if (d->transaction) {
         d->transaction.commit();
     }
-    d->transaction = Storage::Transaction();
+    d->transaction = DataStore::Transaction();
     if (d->revisionChanged) {
         d->revisionChanged = false;
         emit revisionUpdated(revision);
     }
 }
 
-Storage::Transaction &Pipeline::transaction()
+DataStore::Transaction &Pipeline::transaction()
 {
     return d->transaction;
 }
 
-Storage &Pipeline::storage() const
+DataStore &Pipeline::storage() const
 {
     return d->storage;
 }
@@ -180,14 +175,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
     QByteArray key;
     if (createEntity->entityId()) {
         key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size());
-        if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) {
+        if (DataStore::mainDatabase(d->transaction, bufferType).contains(key)) {
             SinkError() << "An entity with this id already exists: " << key;
             return KAsync::error<qint64>(0);
         }
     }
 
     if (key.isEmpty()) {
-        key = Sink::Storage::generateUid();
+        key = DataStore::generateUid();
     }
     SinkTrace() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
     Q_ASSERT(!key.isEmpty());
@@ -205,7 +200,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
         return KAsync::error<qint64>(0);
     }
 
-    auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
+    auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
     if (!adaptorFactory) {
         SinkWarning() << "no adaptor factory for type " << bufferType;
         return KAsync::error<qint64>(0);
@@ -214,10 +209,10 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
     auto adaptor = adaptorFactory->createAdaptor(*entity);
     auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties());
     foreach (const auto &processor, d->processors[bufferType]) {
-        processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction);
+        processor->newEntity(key, DataStore::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction);
     }
     //The maxRevision may have changed meanwhile if the entity created sub-entities
-    const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
+    const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
 
     // Add metadata buffer
     flatbuffers::FlatBufferBuilder metadataFbb;
@@ -233,6 +228,8 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
 
     d->storeNewRevision(newRevision, fbb, bufferType, key);
 
+    //FIXME entityStore->create(bufferType, memoryAdaptor, replayToSource)
+
     return KAsync::value(newRevision);
 }
 
@@ -273,7 +270,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
     }
 
     // TODO use only readPropertyMapper and writePropertyMapper
-    auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
+    auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
     if (!adaptorFactory) {
         SinkWarning() << "no adaptor factory for type " << bufferType;
         return KAsync::error<qint64>(0);
@@ -284,7 +281,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
     auto diff = adaptorFactory->createAdaptor(*diffEntity);
 
     QSharedPointer<ApplicationDomain::BufferAdaptor> current;
-    Storage::mainDatabase(d->transaction, bufferType)
+    DataStore::mainDatabase(d->transaction, bufferType)
         .findLatest(key,
             [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
                 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
@@ -295,7 +292,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
                 }
                 return false;
             },
-            [baseRevision](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; });
+            [baseRevision](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; });
 
     if (!current) {
         SinkWarning() << "Failed to read local value " << key;
@@ -323,10 +320,10 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
 
     newAdaptor->resetChangedProperties();
     foreach (const auto &processor, d->processors[bufferType]) {
-        processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction);
+        processor->modifiedEntity(key, DataStore::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction);
     }
     //The maxRevision may have changed meanwhile if the entity created sub-entities
-    const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
+    const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
 
     // Add metadata buffer
     flatbuffers::FlatBufferBuilder metadataFbb;
@@ -369,7 +366,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
 
     bool found = false;
     bool alreadyRemoved = false;
-    Storage::mainDatabase(d->transaction, bufferType)
+    DataStore::mainDatabase(d->transaction, bufferType)
         .findLatest(key,
             [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
                 auto entity = GetEntity(data.data());
@@ -382,7 +379,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
                 }
                 return false;
             },
-            [](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; });
+            [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; });
 
     if (!found) {
         SinkWarning() << "Failed to find entity " << key;
@@ -393,7 +390,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
         return KAsync::error<qint64>(0);
     }
 
-    const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
+    const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
 
     // Add metadata buffer
     flatbuffers::FlatBufferBuilder metadataFbb;
@@ -407,14 +404,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
     flatbuffers::FlatBufferBuilder fbb;
     EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
 
-    auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
+    auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
     if (!adaptorFactory) {
         SinkWarning() << "no adaptor factory for type " << bufferType;
         return KAsync::error<qint64>(0);
     }
 
     QSharedPointer<ApplicationDomain::BufferAdaptor> current;
-    Storage::mainDatabase(d->transaction, bufferType)
+    DataStore::mainDatabase(d->transaction, bufferType)
         .findLatest(key,
             [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
                 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
@@ -425,7 +422,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
                 }
                 return false;
             },
-            [this](const Storage::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; });
+            [this](const DataStore::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; });
 
     d->storeNewRevision(newRevision, fbb, bufferType, key);
 
@@ -439,10 +436,10 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
 void Pipeline::cleanupRevision(qint64 revision)
 {
     d->revisionChanged = true;
-    const auto uid = Storage::getUidFromRevision(d->transaction, revision);
-    const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision);
+    const auto uid = DataStore::getUidFromRevision(d->transaction, revision);
+    const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision);
     SinkTrace() << "Cleaning up revision " << revision << uid << bufferType;
-    Storage::mainDatabase(d->transaction, bufferType)
+    DataStore::mainDatabase(d->transaction, bufferType)
         .scan(uid,
             [&](const QByteArray &key, const QByteArray &data) -> bool {
                 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
@@ -453,20 +450,20 @@ void Pipeline::cleanupRevision(qint64 revision)
                     const qint64 rev = metadata->revision();
                     // Remove old revisions, and the current if the entity has already been removed
                     if (rev < revision || metadata->operation() == Operation_Removal) {
-                        Storage::removeRevision(d->transaction, rev);
-                        Storage::mainDatabase(d->transaction, bufferType).remove(key);
+                        DataStore::removeRevision(d->transaction, rev);
+                        DataStore::mainDatabase(d->transaction, bufferType).remove(key);
                     }
                 }
 
                 return true;
             },
-            [](const Storage::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true);
-    Storage::setCleanedUpRevision(d->transaction, revision);
+            [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true);
+    DataStore::setCleanedUpRevision(d->transaction, revision);
 }
 
 qint64 Pipeline::cleanedUpRevision()
 {
-    return Storage::cleanedUpRevision(d->transaction);
+    return DataStore::cleanedUpRevision(d->transaction);
 }
 
 class Preprocessor::Private {
@@ -523,8 +520,6 @@ void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomain
     d->pipeline->newEntity(data, data.size()).exec();
 }
 
-} // namespace Sink
-
 #pragma clang diagnostic push
 #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
 #include "moc_pipeline.cpp"
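Usage note (not part of the commit): a minimal sketch of the call-site pattern this patch implies. The old two-step setup (construct the Pipeline with a resource name, then call setResourceType()) is replaced by passing a single ResourceContext that carries both the instance id and the resource type. Only context.instanceId() and context.resourceType are visible in the diff; the ResourceContext constructor and the instance/type values below are assumptions for illustration.

// Hypothetical usage sketch, assuming a ResourceContext(id, type) constructor.
#include "pipeline.h"
#include "resourcecontext.h"

void setupPipeline()
{
    // Before this patch:
    //   Sink::Pipeline pipeline("org.kde.dummy.instance1", parent);
    //   pipeline.setResourceType("org.kde.dummy");

    // After this patch: the resource identity travels as one ResourceContext value.
    Sink::ResourceContext context{"org.kde.dummy.instance1", "org.kde.dummy"}; // assumed constructor
    Sink::Pipeline pipeline(context);

    pipeline.startTransaction();
    // ... feed newEntity()/modifiedEntity()/deletedEntity() commands here ...
    pipeline.commit();
}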