summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-02-01 15:18:13 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-02-01 15:18:13 +0100
commit9809045a2a71cf509857a762fac62e81c25de856 (patch)
treeb74270913a41f9d9f3d243f7c8cc127dbe0e3c00 /common/pipeline.cpp
parent4aa007e5cf59288e7d548ea3f613a10ac0a6057e (diff)
downloadsink-9809045a2a71cf509857a762fac62e81c25de856.tar.gz
sink-9809045a2a71cf509857a762fac62e81c25de856.zip
Centralized main database name
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp18
1 files changed, 9 insertions, 9 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index b7ca638..f861ab6 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -120,7 +120,7 @@ Storage &Pipeline::storage() const
120 120
121void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 121void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
122{ 122{
123 d->transaction.openDatabase(bufferType + ".main").write(Sink::Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), 123 Storage::mainDatabase(d->transaction, bufferType).write(Sink::Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
124 [](const Sink::Storage::Error &error) { 124 [](const Sink::Storage::Error &error) {
125 Warning() << "Failed to write entity"; 125 Warning() << "Failed to write entity";
126 } 126 }
@@ -161,7 +161,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
161 QByteArray key; 161 QByteArray key;
162 if (createEntity->entityId()) { 162 if (createEntity->entityId()) {
163 key = QByteArray(reinterpret_cast<char const*>(createEntity->entityId()->Data()), createEntity->entityId()->size()); 163 key = QByteArray(reinterpret_cast<char const*>(createEntity->entityId()->Data()), createEntity->entityId()->size());
164 if (d->transaction.openDatabase(bufferType + ".main").contains(key)) { 164 if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) {
165 ErrorMsg() << "An entity with this id already exists: " << key; 165 ErrorMsg() << "An entity with this id already exists: " << key;
166 return KAsync::error<qint64>(0); 166 return KAsync::error<qint64>(0);
167 } 167 }
@@ -194,7 +194,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
194 } 194 }
195 195
196 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 196 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
197 d->transaction.openDatabase(bufferType + ".main").scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { 197 Storage::mainDatabase(d->transaction, bufferType).scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool {
198 auto entity = Sink::GetEntity(value); 198 auto entity = Sink::GetEntity(value);
199 auto adaptor = adaptorFactory->createAdaptor(*entity); 199 auto adaptor = adaptorFactory->createAdaptor(*entity);
200 for (auto processor : d->processors[bufferType]) { 200 for (auto processor : d->processors[bufferType]) {
@@ -254,7 +254,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
254 auto diff = adaptorFactory->createAdaptor(*diffEntity); 254 auto diff = adaptorFactory->createAdaptor(*diffEntity);
255 255
256 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 256 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
257 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 257 Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
258 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 258 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
259 if (!buffer.isValid()) { 259 if (!buffer.isValid()) {
260 Warning() << "Read invalid buffer from disk"; 260 Warning() << "Read invalid buffer from disk";
@@ -311,7 +311,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
311 311
312 storeNewRevision(newRevision, fbb, bufferType, key); 312 storeNewRevision(newRevision, fbb, bufferType, key);
313 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; 313 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
314 d->transaction.openDatabase(bufferType + ".main").scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { 314 Storage::mainDatabase(d->transaction, bufferType).scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool {
315 if (value.isEmpty()) { 315 if (value.isEmpty()) {
316 ErrorMsg() << "Read buffer is empty."; 316 ErrorMsg() << "Read buffer is empty.";
317 } 317 }
@@ -348,7 +348,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
348 348
349 bool found = false; 349 bool found = false;
350 bool alreadyRemoved = false; 350 bool alreadyRemoved = false;
351 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { 351 Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
352 auto entity = Sink::GetEntity(data.data()); 352 auto entity = Sink::GetEntity(data.data());
353 if (entity && entity->metadata()) { 353 if (entity && entity->metadata()) {
354 auto metadata = Sink::GetMetadata(entity->metadata()->Data()); 354 auto metadata = Sink::GetMetadata(entity->metadata()->Data());
@@ -394,7 +394,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
394 } 394 }
395 395
396 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 396 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current;
397 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool { 397 Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
398 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 398 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
399 if (!buffer.isValid()) { 399 if (!buffer.isValid()) {
400 Warning() << "Read invalid buffer from disk"; 400 Warning() << "Read invalid buffer from disk";
@@ -423,7 +423,7 @@ void Pipeline::cleanupRevision(qint64 revision)
423 const auto uid = Sink::Storage::getUidFromRevision(d->transaction, revision); 423 const auto uid = Sink::Storage::getUidFromRevision(d->transaction, revision);
424 const auto bufferType = Sink::Storage::getTypeFromRevision(d->transaction, revision); 424 const auto bufferType = Sink::Storage::getTypeFromRevision(d->transaction, revision);
425 Trace() << "Cleaning up revision " << revision << uid << bufferType; 425 Trace() << "Cleaning up revision " << revision << uid << bufferType;
426 d->transaction.openDatabase(bufferType + ".main").scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { 426 Storage::mainDatabase(d->transaction, bufferType).scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool {
427 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 427 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
428 if (!buffer.isValid()) { 428 if (!buffer.isValid()) {
429 Warning() << "Read invalid buffer from disk"; 429 Warning() << "Read invalid buffer from disk";
@@ -433,7 +433,7 @@ void Pipeline::cleanupRevision(qint64 revision)
433 //Remove old revisions, and the current if the entity has already been removed 433 //Remove old revisions, and the current if the entity has already been removed
434 if (rev < revision || metadata->operation() == Sink::Operation_Removal) { 434 if (rev < revision || metadata->operation() == Sink::Operation_Removal) {
435 Sink::Storage::removeRevision(d->transaction, rev); 435 Sink::Storage::removeRevision(d->transaction, rev);
436 d->transaction.openDatabase(bufferType + ".main").remove(key); 436 Storage::mainDatabase(d->transaction, bufferType).remove(key);
437 } 437 }
438 } 438 }
439 439