diff options
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 99 |
1 files changed, 47 insertions, 52 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index ce864f7..e257857 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -40,45 +40,45 @@ | |||
40 | 40 | ||
41 | SINK_DEBUG_AREA("pipeline") | 41 | SINK_DEBUG_AREA("pipeline") |
42 | 42 | ||
43 | namespace Sink { | 43 | using namespace Sink; |
44 | using namespace Sink::Storage; | ||
44 | 45 | ||
45 | class Pipeline::Private | 46 | class Pipeline::Private |
46 | { | 47 | { |
47 | public: | 48 | public: |
48 | Private(const QString &resourceName) : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), revisionChanged(false), resourceInstanceIdentifier(resourceName.toUtf8()) | 49 | Private(const ResourceContext &context) : resourceContext(context), storage(Sink::storageLocation(), context.instanceId(), DataStore::ReadWrite), revisionChanged(false) |
49 | { | 50 | { |
50 | } | 51 | } |
51 | 52 | ||
52 | Storage storage; | 53 | ResourceContext resourceContext; |
53 | Storage::Transaction transaction; | 54 | DataStore storage; |
55 | DataStore::Transaction transaction; | ||
54 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; | 56 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; |
55 | bool revisionChanged; | 57 | bool revisionChanged; |
56 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | 58 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); |
57 | QTime transactionTime; | 59 | QTime transactionTime; |
58 | int transactionItemCount; | 60 | int transactionItemCount; |
59 | QByteArray resourceType; | ||
60 | QByteArray resourceInstanceIdentifier; | ||
61 | }; | 61 | }; |
62 | 62 | ||
63 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 63 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
64 | { | 64 | { |
65 | SinkTrace() << "Committing new revision: " << uid << newRevision; | 65 | SinkTrace() << "Committing new revision: " << uid << newRevision; |
66 | Storage::mainDatabase(transaction, bufferType) | 66 | DataStore::mainDatabase(transaction, bufferType) |
67 | .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | 67 | .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), |
68 | [uid, newRevision](const Storage::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); | 68 | [uid, newRevision](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); |
69 | revisionChanged = true; | 69 | revisionChanged = true; |
70 | Storage::setMaxRevision(transaction, newRevision); | 70 | DataStore::setMaxRevision(transaction, newRevision); |
71 | Storage::recordRevision(transaction, newRevision, uid, bufferType); | 71 | DataStore::recordRevision(transaction, newRevision, uid, bufferType); |
72 | } | 72 | } |
73 | 73 | ||
74 | 74 | ||
75 | Pipeline::Pipeline(const QString &resourceName, QObject *parent) : QObject(parent), d(new Private(resourceName)) | 75 | Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) |
76 | { | 76 | { |
77 | } | 77 | } |
78 | 78 | ||
79 | Pipeline::~Pipeline() | 79 | Pipeline::~Pipeline() |
80 | { | 80 | { |
81 | d->transaction = Storage::Transaction(); | 81 | d->transaction = DataStore::Transaction(); |
82 | } | 82 | } |
83 | 83 | ||
84 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) | 84 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) |
@@ -86,16 +86,11 @@ void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preproc | |||
86 | auto &list = d->processors[entityType]; | 86 | auto &list = d->processors[entityType]; |
87 | list.clear(); | 87 | list.clear(); |
88 | for (auto p : processors) { | 88 | for (auto p : processors) { |
89 | p->setup(d->resourceType, d->resourceInstanceIdentifier, this); | 89 | p->setup(d->resourceContext.resourceType, d->resourceContext.instanceId(), this); |
90 | list.append(QSharedPointer<Preprocessor>(p)); | 90 | list.append(QSharedPointer<Preprocessor>(p)); |
91 | } | 91 | } |
92 | } | 92 | } |
93 | 93 | ||
94 | void Pipeline::setResourceType(const QByteArray &resourceType) | ||
95 | { | ||
96 | d->resourceType = resourceType; | ||
97 | } | ||
98 | |||
99 | void Pipeline::startTransaction() | 94 | void Pipeline::startTransaction() |
100 | { | 95 | { |
101 | // TODO call for all types | 96 | // TODO call for all types |
@@ -109,7 +104,7 @@ void Pipeline::startTransaction() | |||
109 | SinkTrace() << "Starting transaction."; | 104 | SinkTrace() << "Starting transaction."; |
110 | d->transactionTime.start(); | 105 | d->transactionTime.start(); |
111 | d->transactionItemCount = 0; | 106 | d->transactionItemCount = 0; |
112 | d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | 107 | d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { |
113 | SinkWarning() << error.message; | 108 | SinkWarning() << error.message; |
114 | }); | 109 | }); |
115 | 110 | ||
@@ -119,7 +114,7 @@ void Pipeline::startTransaction() | |||
119 | if (d->storage.exists()) { | 114 | if (d->storage.exists()) { |
120 | while (!d->transaction.validateNamedDatabases()) { | 115 | while (!d->transaction.validateNamedDatabases()) { |
121 | SinkWarning() << "Opened an invalid transaction!!!!!!"; | 116 | SinkWarning() << "Opened an invalid transaction!!!!!!"; |
122 | d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | 117 | d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { |
123 | SinkWarning() << error.message; | 118 | SinkWarning() << error.message; |
124 | }); | 119 | }); |
125 | } | 120 | } |
@@ -135,29 +130,29 @@ void Pipeline::commit() | |||
135 | // } | 130 | // } |
136 | if (!d->revisionChanged) { | 131 | if (!d->revisionChanged) { |
137 | d->transaction.abort(); | 132 | d->transaction.abort(); |
138 | d->transaction = Storage::Transaction(); | 133 | d->transaction = DataStore::Transaction(); |
139 | return; | 134 | return; |
140 | } | 135 | } |
141 | const auto revision = Storage::maxRevision(d->transaction); | 136 | const auto revision = DataStore::maxRevision(d->transaction); |
142 | const auto elapsed = d->transactionTime.elapsed(); | 137 | const auto elapsed = d->transactionTime.elapsed(); |
143 | SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " | 138 | SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " |
144 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; | 139 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; |
145 | if (d->transaction) { | 140 | if (d->transaction) { |
146 | d->transaction.commit(); | 141 | d->transaction.commit(); |
147 | } | 142 | } |
148 | d->transaction = Storage::Transaction(); | 143 | d->transaction = DataStore::Transaction(); |
149 | if (d->revisionChanged) { | 144 | if (d->revisionChanged) { |
150 | d->revisionChanged = false; | 145 | d->revisionChanged = false; |
151 | emit revisionUpdated(revision); | 146 | emit revisionUpdated(revision); |
152 | } | 147 | } |
153 | } | 148 | } |
154 | 149 | ||
155 | Storage::Transaction &Pipeline::transaction() | 150 | DataStore::Transaction &Pipeline::transaction() |
156 | { | 151 | { |
157 | return d->transaction; | 152 | return d->transaction; |
158 | } | 153 | } |
159 | 154 | ||
160 | Storage &Pipeline::storage() const | 155 | DataStore &Pipeline::storage() const |
161 | { | 156 | { |
162 | return d->storage; | 157 | return d->storage; |
163 | } | 158 | } |
@@ -180,14 +175,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
180 | QByteArray key; | 175 | QByteArray key; |
181 | if (createEntity->entityId()) { | 176 | if (createEntity->entityId()) { |
182 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); | 177 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); |
183 | if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { | 178 | if (DataStore::mainDatabase(d->transaction, bufferType).contains(key)) { |
184 | SinkError() << "An entity with this id already exists: " << key; | 179 | SinkError() << "An entity with this id already exists: " << key; |
185 | return KAsync::error<qint64>(0); | 180 | return KAsync::error<qint64>(0); |
186 | } | 181 | } |
187 | } | 182 | } |
188 | 183 | ||
189 | if (key.isEmpty()) { | 184 | if (key.isEmpty()) { |
190 | key = Sink::Storage::generateUid(); | 185 | key = DataStore::generateUid(); |
191 | } | 186 | } |
192 | SinkTrace() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 187 | SinkTrace() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
193 | Q_ASSERT(!key.isEmpty()); | 188 | Q_ASSERT(!key.isEmpty()); |
@@ -205,7 +200,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
205 | return KAsync::error<qint64>(0); | 200 | return KAsync::error<qint64>(0); |
206 | } | 201 | } |
207 | 202 | ||
208 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); | 203 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); |
209 | if (!adaptorFactory) { | 204 | if (!adaptorFactory) { |
210 | SinkWarning() << "no adaptor factory for type " << bufferType; | 205 | SinkWarning() << "no adaptor factory for type " << bufferType; |
211 | return KAsync::error<qint64>(0); | 206 | return KAsync::error<qint64>(0); |
@@ -214,10 +209,10 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
214 | auto adaptor = adaptorFactory->createAdaptor(*entity); | 209 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
215 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); | 210 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); |
216 | foreach (const auto &processor, d->processors[bufferType]) { | 211 | foreach (const auto &processor, d->processors[bufferType]) { |
217 | processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); | 212 | processor->newEntity(key, DataStore::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); |
218 | } | 213 | } |
219 | //The maxRevision may have changed meanwhile if the entity created sub-entities | 214 | //The maxRevision may have changed meanwhile if the entity created sub-entities |
220 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; | 215 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; |
221 | 216 | ||
222 | // Add metadata buffer | 217 | // Add metadata buffer |
223 | flatbuffers::FlatBufferBuilder metadataFbb; | 218 | flatbuffers::FlatBufferBuilder metadataFbb; |
@@ -233,6 +228,8 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
233 | 228 | ||
234 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 229 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
235 | 230 | ||
231 | //FIXME entityStore->create(bufferType, memoryAdaptor, replayToSource) | ||
232 | |||
236 | return KAsync::value(newRevision); | 233 | return KAsync::value(newRevision); |
237 | } | 234 | } |
238 | 235 | ||
@@ -273,7 +270,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
273 | } | 270 | } |
274 | 271 | ||
275 | // TODO use only readPropertyMapper and writePropertyMapper | 272 | // TODO use only readPropertyMapper and writePropertyMapper |
276 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); | 273 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); |
277 | if (!adaptorFactory) { | 274 | if (!adaptorFactory) { |
278 | SinkWarning() << "no adaptor factory for type " << bufferType; | 275 | SinkWarning() << "no adaptor factory for type " << bufferType; |
279 | return KAsync::error<qint64>(0); | 276 | return KAsync::error<qint64>(0); |
@@ -284,7 +281,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
284 | auto diff = adaptorFactory->createAdaptor(*diffEntity); | 281 | auto diff = adaptorFactory->createAdaptor(*diffEntity); |
285 | 282 | ||
286 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; | 283 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; |
287 | Storage::mainDatabase(d->transaction, bufferType) | 284 | DataStore::mainDatabase(d->transaction, bufferType) |
288 | .findLatest(key, | 285 | .findLatest(key, |
289 | [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 286 | [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { |
290 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 287 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
@@ -295,7 +292,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
295 | } | 292 | } |
296 | return false; | 293 | return false; |
297 | }, | 294 | }, |
298 | [baseRevision](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); | 295 | [baseRevision](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); |
299 | 296 | ||
300 | if (!current) { | 297 | if (!current) { |
301 | SinkWarning() << "Failed to read local value " << key; | 298 | SinkWarning() << "Failed to read local value " << key; |
@@ -323,10 +320,10 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
323 | 320 | ||
324 | newAdaptor->resetChangedProperties(); | 321 | newAdaptor->resetChangedProperties(); |
325 | foreach (const auto &processor, d->processors[bufferType]) { | 322 | foreach (const auto &processor, d->processors[bufferType]) { |
326 | processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); | 323 | processor->modifiedEntity(key, DataStore::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); |
327 | } | 324 | } |
328 | //The maxRevision may have changed meanwhile if the entity created sub-entities | 325 | //The maxRevision may have changed meanwhile if the entity created sub-entities |
329 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; | 326 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; |
330 | 327 | ||
331 | // Add metadata buffer | 328 | // Add metadata buffer |
332 | flatbuffers::FlatBufferBuilder metadataFbb; | 329 | flatbuffers::FlatBufferBuilder metadataFbb; |
@@ -369,7 +366,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
369 | 366 | ||
370 | bool found = false; | 367 | bool found = false; |
371 | bool alreadyRemoved = false; | 368 | bool alreadyRemoved = false; |
372 | Storage::mainDatabase(d->transaction, bufferType) | 369 | DataStore::mainDatabase(d->transaction, bufferType) |
373 | .findLatest(key, | 370 | .findLatest(key, |
374 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | 371 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { |
375 | auto entity = GetEntity(data.data()); | 372 | auto entity = GetEntity(data.data()); |
@@ -382,7 +379,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
382 | } | 379 | } |
383 | return false; | 380 | return false; |
384 | }, | 381 | }, |
385 | [](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); | 382 | [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); |
386 | 383 | ||
387 | if (!found) { | 384 | if (!found) { |
388 | SinkWarning() << "Failed to find entity " << key; | 385 | SinkWarning() << "Failed to find entity " << key; |
@@ -393,7 +390,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
393 | return KAsync::error<qint64>(0); | 390 | return KAsync::error<qint64>(0); |
394 | } | 391 | } |
395 | 392 | ||
396 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; | 393 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; |
397 | 394 | ||
398 | // Add metadata buffer | 395 | // Add metadata buffer |
399 | flatbuffers::FlatBufferBuilder metadataFbb; | 396 | flatbuffers::FlatBufferBuilder metadataFbb; |
@@ -407,14 +404,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
407 | flatbuffers::FlatBufferBuilder fbb; | 404 | flatbuffers::FlatBufferBuilder fbb; |
408 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | 405 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); |
409 | 406 | ||
410 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); | 407 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); |
411 | if (!adaptorFactory) { | 408 | if (!adaptorFactory) { |
412 | SinkWarning() << "no adaptor factory for type " << bufferType; | 409 | SinkWarning() << "no adaptor factory for type " << bufferType; |
413 | return KAsync::error<qint64>(0); | 410 | return KAsync::error<qint64>(0); |
414 | } | 411 | } |
415 | 412 | ||
416 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; | 413 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; |
417 | Storage::mainDatabase(d->transaction, bufferType) | 414 | DataStore::mainDatabase(d->transaction, bufferType) |
418 | .findLatest(key, | 415 | .findLatest(key, |
419 | [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { | 416 | [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { |
420 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 417 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
@@ -425,7 +422,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
425 | } | 422 | } |
426 | return false; | 423 | return false; |
427 | }, | 424 | }, |
428 | [this](const Storage::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; }); | 425 | [this](const DataStore::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; }); |
429 | 426 | ||
430 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 427 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
431 | 428 | ||
@@ -439,10 +436,10 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
439 | void Pipeline::cleanupRevision(qint64 revision) | 436 | void Pipeline::cleanupRevision(qint64 revision) |
440 | { | 437 | { |
441 | d->revisionChanged = true; | 438 | d->revisionChanged = true; |
442 | const auto uid = Storage::getUidFromRevision(d->transaction, revision); | 439 | const auto uid = DataStore::getUidFromRevision(d->transaction, revision); |
443 | const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); | 440 | const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); |
444 | SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; | 441 | SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; |
445 | Storage::mainDatabase(d->transaction, bufferType) | 442 | DataStore::mainDatabase(d->transaction, bufferType) |
446 | .scan(uid, | 443 | .scan(uid, |
447 | [&](const QByteArray &key, const QByteArray &data) -> bool { | 444 | [&](const QByteArray &key, const QByteArray &data) -> bool { |
448 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 445 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
@@ -453,20 +450,20 @@ void Pipeline::cleanupRevision(qint64 revision) | |||
453 | const qint64 rev = metadata->revision(); | 450 | const qint64 rev = metadata->revision(); |
454 | // Remove old revisions, and the current if the entity has already been removed | 451 | // Remove old revisions, and the current if the entity has already been removed |
455 | if (rev < revision || metadata->operation() == Operation_Removal) { | 452 | if (rev < revision || metadata->operation() == Operation_Removal) { |
456 | Storage::removeRevision(d->transaction, rev); | 453 | DataStore::removeRevision(d->transaction, rev); |
457 | Storage::mainDatabase(d->transaction, bufferType).remove(key); | 454 | DataStore::mainDatabase(d->transaction, bufferType).remove(key); |
458 | } | 455 | } |
459 | } | 456 | } |
460 | 457 | ||
461 | return true; | 458 | return true; |
462 | }, | 459 | }, |
463 | [](const Storage::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); | 460 | [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); |
464 | Storage::setCleanedUpRevision(d->transaction, revision); | 461 | DataStore::setCleanedUpRevision(d->transaction, revision); |
465 | } | 462 | } |
466 | 463 | ||
467 | qint64 Pipeline::cleanedUpRevision() | 464 | qint64 Pipeline::cleanedUpRevision() |
468 | { | 465 | { |
469 | return Storage::cleanedUpRevision(d->transaction); | 466 | return DataStore::cleanedUpRevision(d->transaction); |
470 | } | 467 | } |
471 | 468 | ||
472 | class Preprocessor::Private { | 469 | class Preprocessor::Private { |
@@ -523,8 +520,6 @@ void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomain | |||
523 | d->pipeline->newEntity(data, data.size()).exec(); | 520 | d->pipeline->newEntity(data, data.size()).exec(); |
524 | } | 521 | } |
525 | 522 | ||
526 | } // namespace Sink | ||
527 | |||
528 | #pragma clang diagnostic push | 523 | #pragma clang diagnostic push |
529 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" | 524 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" |
530 | #include "moc_pipeline.cpp" | 525 | #include "moc_pipeline.cpp" |