diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-16 14:55:20 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-21 09:02:21 +0200 |
commit | 237b9ae4113e7a9f489632296941becb71afdb45 (patch) | |
tree | 01cde58f495944f01cad9d282391d4efd2897141 /common/pipeline.cpp | |
parent | 95d11bf0be98a4e3c08502fe23417b800233ce14 (diff) | |
download | sink-237b9ae4113e7a9f489632296941becb71afdb45.tar.gz sink-237b9ae4113e7a9f489632296941becb71afdb45.zip |
Refactor how the storage is used.
This is the initial refactoring to improve how we deal with the storage.
It does a couple of things:
* Rename Sink::Storage to Sink::Storage::DataStore to free up the
Sink::Storage namespace
* Introduce a Sink::ResourceContext to have a single object that can be
passed around containing everything that is necessary to operate on a
resource. This is a lot better than the multiple separate parameters
that we used to pass around all over the place, while still allowing
for dependency injection for tests.
* Tie storage access together using the new EntityStore that directly
works with ApplicationDomainTypes. This gives us a central place where
main storage, indexes and buffer adaptors are tied together, which
will also give us a place to implement external indexes, such as a
fulltextindex using xapian.
* Use ApplicationDomainTypes as the default way to pass around entities.
Instead of using various ways to pass around entities (buffers,
buffer adaptors, ApplicationDomainTypes), only use a single way.
The old approach was confusing, and was only done as:
* optimization; really shouldn't be necessary and otherwise I'm sure
we can find better ways to optimize ApplicationDomainType itself.
* a way to account for entities that have multiple buffers, a concept
that I no longer deem relevant.
While this commit does the bulk of the work to get there, the following
commits will refactor more stuff to get things back to normal.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r-- | common/pipeline.cpp | 99 |
1 files changed, 47 insertions, 52 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp index ce864f7..e257857 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp | |||
@@ -40,45 +40,45 @@ | |||
40 | 40 | ||
41 | SINK_DEBUG_AREA("pipeline") | 41 | SINK_DEBUG_AREA("pipeline") |
42 | 42 | ||
43 | namespace Sink { | 43 | using namespace Sink; |
44 | using namespace Sink::Storage; | ||
44 | 45 | ||
45 | class Pipeline::Private | 46 | class Pipeline::Private |
46 | { | 47 | { |
47 | public: | 48 | public: |
48 | Private(const QString &resourceName) : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), revisionChanged(false), resourceInstanceIdentifier(resourceName.toUtf8()) | 49 | Private(const ResourceContext &context) : resourceContext(context), storage(Sink::storageLocation(), context.instanceId(), DataStore::ReadWrite), revisionChanged(false) |
49 | { | 50 | { |
50 | } | 51 | } |
51 | 52 | ||
52 | Storage storage; | 53 | ResourceContext resourceContext; |
53 | Storage::Transaction transaction; | 54 | DataStore storage; |
55 | DataStore::Transaction transaction; | ||
54 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; | 56 | QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; |
55 | bool revisionChanged; | 57 | bool revisionChanged; |
56 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); | 58 | void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); |
57 | QTime transactionTime; | 59 | QTime transactionTime; |
58 | int transactionItemCount; | 60 | int transactionItemCount; |
59 | QByteArray resourceType; | ||
60 | QByteArray resourceInstanceIdentifier; | ||
61 | }; | 61 | }; |
62 | 62 | ||
63 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) | 63 | void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) |
64 | { | 64 | { |
65 | SinkTrace() << "Committing new revision: " << uid << newRevision; | 65 | SinkTrace() << "Committing new revision: " << uid << newRevision; |
66 | Storage::mainDatabase(transaction, bufferType) | 66 | DataStore::mainDatabase(transaction, bufferType) |
67 | .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), | 67 | .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), |
68 | [uid, newRevision](const Storage::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); | 68 | [uid, newRevision](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); |
69 | revisionChanged = true; | 69 | revisionChanged = true; |
70 | Storage::setMaxRevision(transaction, newRevision); | 70 | DataStore::setMaxRevision(transaction, newRevision); |
71 | Storage::recordRevision(transaction, newRevision, uid, bufferType); | 71 | DataStore::recordRevision(transaction, newRevision, uid, bufferType); |
72 | } | 72 | } |
73 | 73 | ||
74 | 74 | ||
75 | Pipeline::Pipeline(const QString &resourceName, QObject *parent) : QObject(parent), d(new Private(resourceName)) | 75 | Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) |
76 | { | 76 | { |
77 | } | 77 | } |
78 | 78 | ||
79 | Pipeline::~Pipeline() | 79 | Pipeline::~Pipeline() |
80 | { | 80 | { |
81 | d->transaction = Storage::Transaction(); | 81 | d->transaction = DataStore::Transaction(); |
82 | } | 82 | } |
83 | 83 | ||
84 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) | 84 | void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) |
@@ -86,16 +86,11 @@ void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preproc | |||
86 | auto &list = d->processors[entityType]; | 86 | auto &list = d->processors[entityType]; |
87 | list.clear(); | 87 | list.clear(); |
88 | for (auto p : processors) { | 88 | for (auto p : processors) { |
89 | p->setup(d->resourceType, d->resourceInstanceIdentifier, this); | 89 | p->setup(d->resourceContext.resourceType, d->resourceContext.instanceId(), this); |
90 | list.append(QSharedPointer<Preprocessor>(p)); | 90 | list.append(QSharedPointer<Preprocessor>(p)); |
91 | } | 91 | } |
92 | } | 92 | } |
93 | 93 | ||
94 | void Pipeline::setResourceType(const QByteArray &resourceType) | ||
95 | { | ||
96 | d->resourceType = resourceType; | ||
97 | } | ||
98 | |||
99 | void Pipeline::startTransaction() | 94 | void Pipeline::startTransaction() |
100 | { | 95 | { |
101 | // TODO call for all types | 96 | // TODO call for all types |
@@ -109,7 +104,7 @@ void Pipeline::startTransaction() | |||
109 | SinkTrace() << "Starting transaction."; | 104 | SinkTrace() << "Starting transaction."; |
110 | d->transactionTime.start(); | 105 | d->transactionTime.start(); |
111 | d->transactionItemCount = 0; | 106 | d->transactionItemCount = 0; |
112 | d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | 107 | d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { |
113 | SinkWarning() << error.message; | 108 | SinkWarning() << error.message; |
114 | }); | 109 | }); |
115 | 110 | ||
@@ -119,7 +114,7 @@ void Pipeline::startTransaction() | |||
119 | if (d->storage.exists()) { | 114 | if (d->storage.exists()) { |
120 | while (!d->transaction.validateNamedDatabases()) { | 115 | while (!d->transaction.validateNamedDatabases()) { |
121 | SinkWarning() << "Opened an invalid transaction!!!!!!"; | 116 | SinkWarning() << "Opened an invalid transaction!!!!!!"; |
122 | d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { | 117 | d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { |
123 | SinkWarning() << error.message; | 118 | SinkWarning() << error.message; |
124 | }); | 119 | }); |
125 | } | 120 | } |
@@ -135,29 +130,29 @@ void Pipeline::commit() | |||
135 | // } | 130 | // } |
136 | if (!d->revisionChanged) { | 131 | if (!d->revisionChanged) { |
137 | d->transaction.abort(); | 132 | d->transaction.abort(); |
138 | d->transaction = Storage::Transaction(); | 133 | d->transaction = DataStore::Transaction(); |
139 | return; | 134 | return; |
140 | } | 135 | } |
141 | const auto revision = Storage::maxRevision(d->transaction); | 136 | const auto revision = DataStore::maxRevision(d->transaction); |
142 | const auto elapsed = d->transactionTime.elapsed(); | 137 | const auto elapsed = d->transactionTime.elapsed(); |
143 | SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " | 138 | SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " |
144 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; | 139 | << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; |
145 | if (d->transaction) { | 140 | if (d->transaction) { |
146 | d->transaction.commit(); | 141 | d->transaction.commit(); |
147 | } | 142 | } |
148 | d->transaction = Storage::Transaction(); | 143 | d->transaction = DataStore::Transaction(); |
149 | if (d->revisionChanged) { | 144 | if (d->revisionChanged) { |
150 | d->revisionChanged = false; | 145 | d->revisionChanged = false; |
151 | emit revisionUpdated(revision); | 146 | emit revisionUpdated(revision); |
152 | } | 147 | } |
153 | } | 148 | } |
154 | 149 | ||
155 | Storage::Transaction &Pipeline::transaction() | 150 | DataStore::Transaction &Pipeline::transaction() |
156 | { | 151 | { |
157 | return d->transaction; | 152 | return d->transaction; |
158 | } | 153 | } |
159 | 154 | ||
160 | Storage &Pipeline::storage() const | 155 | DataStore &Pipeline::storage() const |
161 | { | 156 | { |
162 | return d->storage; | 157 | return d->storage; |
163 | } | 158 | } |
@@ -180,14 +175,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
180 | QByteArray key; | 175 | QByteArray key; |
181 | if (createEntity->entityId()) { | 176 | if (createEntity->entityId()) { |
182 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); | 177 | key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); |
183 | if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { | 178 | if (DataStore::mainDatabase(d->transaction, bufferType).contains(key)) { |
184 | SinkError() << "An entity with this id already exists: " << key; | 179 | SinkError() << "An entity with this id already exists: " << key; |
185 | return KAsync::error<qint64>(0); | 180 | return KAsync::error<qint64>(0); |
186 | } | 181 | } |
187 | } | 182 | } |
188 | 183 | ||
189 | if (key.isEmpty()) { | 184 | if (key.isEmpty()) { |
190 | key = Sink::Storage::generateUid(); | 185 | key = DataStore::generateUid(); |
191 | } | 186 | } |
192 | SinkTrace() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; | 187 | SinkTrace() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; |
193 | Q_ASSERT(!key.isEmpty()); | 188 | Q_ASSERT(!key.isEmpty()); |
@@ -205,7 +200,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
205 | return KAsync::error<qint64>(0); | 200 | return KAsync::error<qint64>(0); |
206 | } | 201 | } |
207 | 202 | ||
208 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); | 203 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); |
209 | if (!adaptorFactory) { | 204 | if (!adaptorFactory) { |
210 | SinkWarning() << "no adaptor factory for type " << bufferType; | 205 | SinkWarning() << "no adaptor factory for type " << bufferType; |
211 | return KAsync::error<qint64>(0); | 206 | return KAsync::error<qint64>(0); |
@@ -214,10 +209,10 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
214 | auto adaptor = adaptorFactory->createAdaptor(*entity); | 209 | auto adaptor = adaptorFactory->createAdaptor(*entity); |
215 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); | 210 | auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); |
216 | foreach (const auto &processor, d->processors[bufferType]) { | 211 | foreach (const auto &processor, d->processors[bufferType]) { |
217 | processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); | 212 | processor->newEntity(key, DataStore::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); |
218 | } | 213 | } |
219 | //The maxRevision may have changed meanwhile if the entity created sub-entities | 214 | //The maxRevision may have changed meanwhile if the entity created sub-entities |
220 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; | 215 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; |
221 | 216 | ||
222 | // Add metadata buffer | 217 | // Add metadata buffer |
223 | flatbuffers::FlatBufferBuilder metadataFbb; | 218 | flatbuffers::FlatBufferBuilder metadataFbb; |
@@ -233,6 +228,8 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) | |||
233 | 228 | ||
234 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 229 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
235 | 230 | ||
231 | //FIXME entityStore->create(bufferType, memoryAdaptor, replayToSource) | ||
232 | |||
236 | return KAsync::value(newRevision); | 233 | return KAsync::value(newRevision); |
237 | } | 234 | } |
238 | 235 | ||
@@ -273,7 +270,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
273 | } | 270 | } |
274 | 271 | ||
275 | // TODO use only readPropertyMapper and writePropertyMapper | 272 | // TODO use only readPropertyMapper and writePropertyMapper |
276 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); | 273 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); |
277 | if (!adaptorFactory) { | 274 | if (!adaptorFactory) { |
278 | SinkWarning() << "no adaptor factory for type " << bufferType; | 275 | SinkWarning() << "no adaptor factory for type " << bufferType; |
279 | return KAsync::error<qint64>(0); | 276 | return KAsync::error<qint64>(0); |
@@ -284,7 +281,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
284 | auto diff = adaptorFactory->createAdaptor(*diffEntity); | 281 | auto diff = adaptorFactory->createAdaptor(*diffEntity); |
285 | 282 | ||
286 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; | 283 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; |
287 | Storage::mainDatabase(d->transaction, bufferType) | 284 | DataStore::mainDatabase(d->transaction, bufferType) |
288 | .findLatest(key, | 285 | .findLatest(key, |
289 | [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 286 | [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { |
290 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 287 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
@@ -295,7 +292,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
295 | } | 292 | } |
296 | return false; | 293 | return false; |
297 | }, | 294 | }, |
298 | [baseRevision](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); | 295 | [baseRevision](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); |
299 | 296 | ||
300 | if (!current) { | 297 | if (!current) { |
301 | SinkWarning() << "Failed to read local value " << key; | 298 | SinkWarning() << "Failed to read local value " << key; |
@@ -323,10 +320,10 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) | |||
323 | 320 | ||
324 | newAdaptor->resetChangedProperties(); | 321 | newAdaptor->resetChangedProperties(); |
325 | foreach (const auto &processor, d->processors[bufferType]) { | 322 | foreach (const auto &processor, d->processors[bufferType]) { |
326 | processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); | 323 | processor->modifiedEntity(key, DataStore::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); |
327 | } | 324 | } |
328 | //The maxRevision may have changed meanwhile if the entity created sub-entities | 325 | //The maxRevision may have changed meanwhile if the entity created sub-entities |
329 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; | 326 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; |
330 | 327 | ||
331 | // Add metadata buffer | 328 | // Add metadata buffer |
332 | flatbuffers::FlatBufferBuilder metadataFbb; | 329 | flatbuffers::FlatBufferBuilder metadataFbb; |
@@ -369,7 +366,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
369 | 366 | ||
370 | bool found = false; | 367 | bool found = false; |
371 | bool alreadyRemoved = false; | 368 | bool alreadyRemoved = false; |
372 | Storage::mainDatabase(d->transaction, bufferType) | 369 | DataStore::mainDatabase(d->transaction, bufferType) |
373 | .findLatest(key, | 370 | .findLatest(key, |
374 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { | 371 | [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { |
375 | auto entity = GetEntity(data.data()); | 372 | auto entity = GetEntity(data.data()); |
@@ -382,7 +379,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
382 | } | 379 | } |
383 | return false; | 380 | return false; |
384 | }, | 381 | }, |
385 | [](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); | 382 | [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); |
386 | 383 | ||
387 | if (!found) { | 384 | if (!found) { |
388 | SinkWarning() << "Failed to find entity " << key; | 385 | SinkWarning() << "Failed to find entity " << key; |
@@ -393,7 +390,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
393 | return KAsync::error<qint64>(0); | 390 | return KAsync::error<qint64>(0); |
394 | } | 391 | } |
395 | 392 | ||
396 | const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; | 393 | const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; |
397 | 394 | ||
398 | // Add metadata buffer | 395 | // Add metadata buffer |
399 | flatbuffers::FlatBufferBuilder metadataFbb; | 396 | flatbuffers::FlatBufferBuilder metadataFbb; |
@@ -407,14 +404,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
407 | flatbuffers::FlatBufferBuilder fbb; | 404 | flatbuffers::FlatBufferBuilder fbb; |
408 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); | 405 | EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); |
409 | 406 | ||
410 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); | 407 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); |
411 | if (!adaptorFactory) { | 408 | if (!adaptorFactory) { |
412 | SinkWarning() << "no adaptor factory for type " << bufferType; | 409 | SinkWarning() << "no adaptor factory for type " << bufferType; |
413 | return KAsync::error<qint64>(0); | 410 | return KAsync::error<qint64>(0); |
414 | } | 411 | } |
415 | 412 | ||
416 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; | 413 | QSharedPointer<ApplicationDomain::BufferAdaptor> current; |
417 | Storage::mainDatabase(d->transaction, bufferType) | 414 | DataStore::mainDatabase(d->transaction, bufferType) |
418 | .findLatest(key, | 415 | .findLatest(key, |
419 | [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { | 416 | [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { |
420 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 417 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
@@ -425,7 +422,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
425 | } | 422 | } |
426 | return false; | 423 | return false; |
427 | }, | 424 | }, |
428 | [this](const Storage::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; }); | 425 | [this](const DataStore::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; }); |
429 | 426 | ||
430 | d->storeNewRevision(newRevision, fbb, bufferType, key); | 427 | d->storeNewRevision(newRevision, fbb, bufferType, key); |
431 | 428 | ||
@@ -439,10 +436,10 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) | |||
439 | void Pipeline::cleanupRevision(qint64 revision) | 436 | void Pipeline::cleanupRevision(qint64 revision) |
440 | { | 437 | { |
441 | d->revisionChanged = true; | 438 | d->revisionChanged = true; |
442 | const auto uid = Storage::getUidFromRevision(d->transaction, revision); | 439 | const auto uid = DataStore::getUidFromRevision(d->transaction, revision); |
443 | const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); | 440 | const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); |
444 | SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; | 441 | SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; |
445 | Storage::mainDatabase(d->transaction, bufferType) | 442 | DataStore::mainDatabase(d->transaction, bufferType) |
446 | .scan(uid, | 443 | .scan(uid, |
447 | [&](const QByteArray &key, const QByteArray &data) -> bool { | 444 | [&](const QByteArray &key, const QByteArray &data) -> bool { |
448 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 445 | EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
@@ -453,20 +450,20 @@ void Pipeline::cleanupRevision(qint64 revision) | |||
453 | const qint64 rev = metadata->revision(); | 450 | const qint64 rev = metadata->revision(); |
454 | // Remove old revisions, and the current if the entity has already been removed | 451 | // Remove old revisions, and the current if the entity has already been removed |
455 | if (rev < revision || metadata->operation() == Operation_Removal) { | 452 | if (rev < revision || metadata->operation() == Operation_Removal) { |
456 | Storage::removeRevision(d->transaction, rev); | 453 | DataStore::removeRevision(d->transaction, rev); |
457 | Storage::mainDatabase(d->transaction, bufferType).remove(key); | 454 | DataStore::mainDatabase(d->transaction, bufferType).remove(key); |
458 | } | 455 | } |
459 | } | 456 | } |
460 | 457 | ||
461 | return true; | 458 | return true; |
462 | }, | 459 | }, |
463 | [](const Storage::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); | 460 | [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); |
464 | Storage::setCleanedUpRevision(d->transaction, revision); | 461 | DataStore::setCleanedUpRevision(d->transaction, revision); |
465 | } | 462 | } |
466 | 463 | ||
467 | qint64 Pipeline::cleanedUpRevision() | 464 | qint64 Pipeline::cleanedUpRevision() |
468 | { | 465 | { |
469 | return Storage::cleanedUpRevision(d->transaction); | 466 | return DataStore::cleanedUpRevision(d->transaction); |
470 | } | 467 | } |
471 | 468 | ||
472 | class Preprocessor::Private { | 469 | class Preprocessor::Private { |
@@ -523,8 +520,6 @@ void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomain | |||
523 | d->pipeline->newEntity(data, data.size()).exec(); | 520 | d->pipeline->newEntity(data, data.size()).exec(); |
524 | } | 521 | } |
525 | 522 | ||
526 | } // namespace Sink | ||
527 | |||
528 | #pragma clang diagnostic push | 523 | #pragma clang diagnostic push |
529 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" | 524 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" |
530 | #include "moc_pipeline.cpp" | 525 | #include "moc_pipeline.cpp" |