Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--  common/pipeline.cpp  39
1 file changed, 18 insertions, 21 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 63a60ce..9813f60 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -63,6 +63,7 @@ public:
 
 void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
 {
+    Trace() << "Committing new revision: " << uid << newRevision;
     Storage::mainDatabase(transaction, bufferType)
         .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
             [uid, newRevision](const Storage::Error &error) { Warning() << "Failed to write entity" << uid << newRevision; });
@@ -121,7 +122,7 @@ void Pipeline::commit()
     // }
     const auto revision = Storage::maxRevision(d->transaction);
     const auto elapsed = d->transactionTime.elapsed();
-    Trace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
+    Log() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
           << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]";
     if (d->transaction) {
         d->transaction.commit();
@@ -145,7 +146,6 @@ Storage &Pipeline::storage() const
 
 KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
 {
-    Trace() << "Pipeline: New Entity";
     d->transactionItemCount++;
 
     {
@@ -159,19 +159,6 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
 
     const bool replayToSource = createEntity->replayToSource();
     const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(createEntity->domainType()->Data()), createEntity->domainType()->size());
-    {
-        flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
-        if (!VerifyEntityBuffer(verifyer)) {
-            Warning() << "invalid buffer, not an entity buffer";
-            return KAsync::error<qint64>(0);
-        }
-    }
-    auto entity = GetEntity(createEntity->delta()->Data());
-    if (!entity->resource()->size() && !entity->local()->size()) {
-        Warning() << "No local and no resource buffer while trying to create entity.";
-        return KAsync::error<qint64>(0);
-    }
-
     QByteArray key;
     if (createEntity->entityId()) {
         key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size());
@@ -184,8 +171,22 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
     if (key.isEmpty()) {
         key = Sink::Storage::generateUid();
     }
+    Log() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
     Q_ASSERT(!key.isEmpty());
 
+    {
+        flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
+        if (!VerifyEntityBuffer(verifyer)) {
+            Warning() << "invalid buffer, not an entity buffer";
+            return KAsync::error<qint64>(0);
+        }
+    }
+    auto entity = GetEntity(createEntity->delta()->Data());
+    if (!entity->resource()->size() && !entity->local()->size()) {
+        Warning() << "No local and no resource buffer while trying to create entity.";
+        return KAsync::error<qint64>(0);
+    }
+
     auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
     if (!adaptorFactory) {
         Warning() << "no adaptor factory for type " << bufferType;
@@ -214,13 +215,11 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
 
     d->storeNewRevision(newRevision, fbb, bufferType, key);
 
-    Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
     return KAsync::start<qint64>([newRevision]() { return newRevision; });
 }
 
 KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
 {
-    Trace() << "Pipeline: Modified Entity";
     d->transactionItemCount++;
 
     {
@@ -240,9 +239,9 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
     }
     const qint64 baseRevision = modifyEntity->revision();
     const bool replayToSource = modifyEntity->replayToSource();
-    // TODO rename modifyEntity->domainType to bufferType
     const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
     const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
+    Log() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
     if (bufferType.isEmpty() || key.isEmpty()) {
         Warning() << "entity type or key " << bufferType << key;
         return KAsync::error<qint64>(0);
@@ -328,13 +327,11 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
     adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
 
     d->storeNewRevision(newRevision, fbb, bufferType, key);
-    Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
     return KAsync::start<qint64>([newRevision]() { return newRevision; });
 }
 
 KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
 {
-    Trace() << "Pipeline: Deleted Entity";
     d->transactionItemCount++;
 
     {
@@ -349,6 +346,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
     const bool replayToSource = deleteEntity->replayToSource();
     const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
     const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
+    Log() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
 
     bool found = false;
     bool alreadyRemoved = false;
@@ -411,7 +409,6 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
         [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; });
 
     d->storeNewRevision(newRevision, fbb, bufferType, key);
-    Log() << "Pipeline: deleted entity: " << newRevision;
 
     for (auto processor : d->processors[bufferType]) {
         processor->deletedEntity(key, newRevision, *current, d->transaction);