summaryrefslogtreecommitdiffstats
path: root/common/pipeline.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-16 14:55:20 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-21 09:02:21 +0200
commit237b9ae4113e7a9f489632296941becb71afdb45 (patch)
tree01cde58f495944f01cad9d282391d4efd2897141 /common/pipeline.cpp
parent95d11bf0be98a4e3c08502fe23417b800233ce14 (diff)
downloadsink-237b9ae4113e7a9f489632296941becb71afdb45.tar.gz
sink-237b9ae4113e7a9f489632296941becb71afdb45.zip
Refactor how the storage is used.
This is the initial refactoring to improve how we deal with the storage. It does a couple of things: * Rename Sink::Storage to Sink::Storage::DataStore to free up the Sink::Storage namespace * Introduce a Sink::ResourceContext to have a single object that can be passed around containing everything that is necessary to operate on a resource. This is a lot better than the multiple separate parameters that we used to pass around all over the place, while still allowing for dependency injection for tests. * Tie storage access together using the new EntityStore that directly works with ApplicationDomainTypes. This gives us a central place where main storage, indexes and buffer adaptors are tied together, which will also give us a place to implement external indexes, such as a fulltextindex using xapian. * Use ApplicationDomainTypes as the default way to pass around entities. Instead of using various ways to pass around entities (buffers, buffer adaptors, ApplicationDomainTypes), only use a single way. The old approach was confusing, and was only done as: * optimization; really shouldn't be necessary and otherwise I'm sure we can find better ways to optimize ApplicationDomainType itself. * a way to account for entities that have multiple buffers, a concept that I no longer deem relevant. While this commit does the bulk of the work to get there, the following commits will refactor more stuff to get things back to normal.
Diffstat (limited to 'common/pipeline.cpp')
-rw-r--r--common/pipeline.cpp99
1 files changed, 47 insertions, 52 deletions
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index ce864f7..e257857 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -40,45 +40,45 @@
40 40
41SINK_DEBUG_AREA("pipeline") 41SINK_DEBUG_AREA("pipeline")
42 42
43namespace Sink { 43using namespace Sink;
44using namespace Sink::Storage;
44 45
45class Pipeline::Private 46class Pipeline::Private
46{ 47{
47public: 48public:
48 Private(const QString &resourceName) : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), revisionChanged(false), resourceInstanceIdentifier(resourceName.toUtf8()) 49 Private(const ResourceContext &context) : resourceContext(context), storage(Sink::storageLocation(), context.instanceId(), DataStore::ReadWrite), revisionChanged(false)
49 { 50 {
50 } 51 }
51 52
52 Storage storage; 53 ResourceContext resourceContext;
53 Storage::Transaction transaction; 54 DataStore storage;
55 DataStore::Transaction transaction;
54 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; 56 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors;
55 bool revisionChanged; 57 bool revisionChanged;
56 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); 58 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
57 QTime transactionTime; 59 QTime transactionTime;
58 int transactionItemCount; 60 int transactionItemCount;
59 QByteArray resourceType;
60 QByteArray resourceInstanceIdentifier;
61}; 61};
62 62
63void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 63void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
64{ 64{
65 SinkTrace() << "Committing new revision: " << uid << newRevision; 65 SinkTrace() << "Committing new revision: " << uid << newRevision;
66 Storage::mainDatabase(transaction, bufferType) 66 DataStore::mainDatabase(transaction, bufferType)
67 .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), 67 .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
68 [uid, newRevision](const Storage::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); 68 [uid, newRevision](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; });
69 revisionChanged = true; 69 revisionChanged = true;
70 Storage::setMaxRevision(transaction, newRevision); 70 DataStore::setMaxRevision(transaction, newRevision);
71 Storage::recordRevision(transaction, newRevision, uid, bufferType); 71 DataStore::recordRevision(transaction, newRevision, uid, bufferType);
72} 72}
73 73
74 74
75Pipeline::Pipeline(const QString &resourceName, QObject *parent) : QObject(parent), d(new Private(resourceName)) 75Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context))
76{ 76{
77} 77}
78 78
79Pipeline::~Pipeline() 79Pipeline::~Pipeline()
80{ 80{
81 d->transaction = Storage::Transaction(); 81 d->transaction = DataStore::Transaction();
82} 82}
83 83
84void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) 84void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors)
@@ -86,16 +86,11 @@ void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preproc
86 auto &list = d->processors[entityType]; 86 auto &list = d->processors[entityType];
87 list.clear(); 87 list.clear();
88 for (auto p : processors) { 88 for (auto p : processors) {
89 p->setup(d->resourceType, d->resourceInstanceIdentifier, this); 89 p->setup(d->resourceContext.resourceType, d->resourceContext.instanceId(), this);
90 list.append(QSharedPointer<Preprocessor>(p)); 90 list.append(QSharedPointer<Preprocessor>(p));
91 } 91 }
92} 92}
93 93
94void Pipeline::setResourceType(const QByteArray &resourceType)
95{
96 d->resourceType = resourceType;
97}
98
99void Pipeline::startTransaction() 94void Pipeline::startTransaction()
100{ 95{
101 // TODO call for all types 96 // TODO call for all types
@@ -109,7 +104,7 @@ void Pipeline::startTransaction()
109 SinkTrace() << "Starting transaction."; 104 SinkTrace() << "Starting transaction.";
110 d->transactionTime.start(); 105 d->transactionTime.start();
111 d->transactionItemCount = 0; 106 d->transactionItemCount = 0;
112 d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 107 d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) {
113 SinkWarning() << error.message; 108 SinkWarning() << error.message;
114 }); 109 });
115 110
@@ -119,7 +114,7 @@ void Pipeline::startTransaction()
119 if (d->storage.exists()) { 114 if (d->storage.exists()) {
120 while (!d->transaction.validateNamedDatabases()) { 115 while (!d->transaction.validateNamedDatabases()) {
121 SinkWarning() << "Opened an invalid transaction!!!!!!"; 116 SinkWarning() << "Opened an invalid transaction!!!!!!";
122 d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 117 d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) {
123 SinkWarning() << error.message; 118 SinkWarning() << error.message;
124 }); 119 });
125 } 120 }
@@ -135,29 +130,29 @@ void Pipeline::commit()
135 // } 130 // }
136 if (!d->revisionChanged) { 131 if (!d->revisionChanged) {
137 d->transaction.abort(); 132 d->transaction.abort();
138 d->transaction = Storage::Transaction(); 133 d->transaction = DataStore::Transaction();
139 return; 134 return;
140 } 135 }
141 const auto revision = Storage::maxRevision(d->transaction); 136 const auto revision = DataStore::maxRevision(d->transaction);
142 const auto elapsed = d->transactionTime.elapsed(); 137 const auto elapsed = d->transactionTime.elapsed();
143 SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " 138 SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
144 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; 139 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]";
145 if (d->transaction) { 140 if (d->transaction) {
146 d->transaction.commit(); 141 d->transaction.commit();
147 } 142 }
148 d->transaction = Storage::Transaction(); 143 d->transaction = DataStore::Transaction();
149 if (d->revisionChanged) { 144 if (d->revisionChanged) {
150 d->revisionChanged = false; 145 d->revisionChanged = false;
151 emit revisionUpdated(revision); 146 emit revisionUpdated(revision);
152 } 147 }
153} 148}
154 149
155Storage::Transaction &Pipeline::transaction() 150DataStore::Transaction &Pipeline::transaction()
156{ 151{
157 return d->transaction; 152 return d->transaction;
158} 153}
159 154
160Storage &Pipeline::storage() const 155DataStore &Pipeline::storage() const
161{ 156{
162 return d->storage; 157 return d->storage;
163} 158}
@@ -180,14 +175,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
180 QByteArray key; 175 QByteArray key;
181 if (createEntity->entityId()) { 176 if (createEntity->entityId()) {
182 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); 177 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size());
183 if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { 178 if (DataStore::mainDatabase(d->transaction, bufferType).contains(key)) {
184 SinkError() << "An entity with this id already exists: " << key; 179 SinkError() << "An entity with this id already exists: " << key;
185 return KAsync::error<qint64>(0); 180 return KAsync::error<qint64>(0);
186 } 181 }
187 } 182 }
188 183
189 if (key.isEmpty()) { 184 if (key.isEmpty()) {
190 key = Sink::Storage::generateUid(); 185 key = DataStore::generateUid();
191 } 186 }
192 SinkTrace() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 187 SinkTrace() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
193 Q_ASSERT(!key.isEmpty()); 188 Q_ASSERT(!key.isEmpty());
@@ -205,7 +200,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
205 return KAsync::error<qint64>(0); 200 return KAsync::error<qint64>(0);
206 } 201 }
207 202
208 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 203 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
209 if (!adaptorFactory) { 204 if (!adaptorFactory) {
210 SinkWarning() << "no adaptor factory for type " << bufferType; 205 SinkWarning() << "no adaptor factory for type " << bufferType;
211 return KAsync::error<qint64>(0); 206 return KAsync::error<qint64>(0);
@@ -214,10 +209,10 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
214 auto adaptor = adaptorFactory->createAdaptor(*entity); 209 auto adaptor = adaptorFactory->createAdaptor(*entity);
215 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); 210 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties());
216 foreach (const auto &processor, d->processors[bufferType]) { 211 foreach (const auto &processor, d->processors[bufferType]) {
217 processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); 212 processor->newEntity(key, DataStore::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction);
218 } 213 }
219 //The maxRevision may have changed meanwhile if the entity created sub-entities 214 //The maxRevision may have changed meanwhile if the entity created sub-entities
220 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; 215 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
221 216
222 // Add metadata buffer 217 // Add metadata buffer
223 flatbuffers::FlatBufferBuilder metadataFbb; 218 flatbuffers::FlatBufferBuilder metadataFbb;
@@ -233,6 +228,8 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
233 228
234 d->storeNewRevision(newRevision, fbb, bufferType, key); 229 d->storeNewRevision(newRevision, fbb, bufferType, key);
235 230
231 //FIXME entityStore->create(bufferType, memoryAdaptor, replayToSource)
232
236 return KAsync::value(newRevision); 233 return KAsync::value(newRevision);
237} 234}
238 235
@@ -273,7 +270,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
273 } 270 }
274 271
275 // TODO use only readPropertyMapper and writePropertyMapper 272 // TODO use only readPropertyMapper and writePropertyMapper
276 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 273 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
277 if (!adaptorFactory) { 274 if (!adaptorFactory) {
278 SinkWarning() << "no adaptor factory for type " << bufferType; 275 SinkWarning() << "no adaptor factory for type " << bufferType;
279 return KAsync::error<qint64>(0); 276 return KAsync::error<qint64>(0);
@@ -284,7 +281,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
284 auto diff = adaptorFactory->createAdaptor(*diffEntity); 281 auto diff = adaptorFactory->createAdaptor(*diffEntity);
285 282
286 QSharedPointer<ApplicationDomain::BufferAdaptor> current; 283 QSharedPointer<ApplicationDomain::BufferAdaptor> current;
287 Storage::mainDatabase(d->transaction, bufferType) 284 DataStore::mainDatabase(d->transaction, bufferType)
288 .findLatest(key, 285 .findLatest(key,
289 [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 286 [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
290 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 287 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
@@ -295,7 +292,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
295 } 292 }
296 return false; 293 return false;
297 }, 294 },
298 [baseRevision](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); 295 [baseRevision](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; });
299 296
300 if (!current) { 297 if (!current) {
301 SinkWarning() << "Failed to read local value " << key; 298 SinkWarning() << "Failed to read local value " << key;
@@ -323,10 +320,10 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
323 320
324 newAdaptor->resetChangedProperties(); 321 newAdaptor->resetChangedProperties();
325 foreach (const auto &processor, d->processors[bufferType]) { 322 foreach (const auto &processor, d->processors[bufferType]) {
326 processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); 323 processor->modifiedEntity(key, DataStore::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction);
327 } 324 }
328 //The maxRevision may have changed meanwhile if the entity created sub-entities 325 //The maxRevision may have changed meanwhile if the entity created sub-entities
329 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; 326 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
330 327
331 // Add metadata buffer 328 // Add metadata buffer
332 flatbuffers::FlatBufferBuilder metadataFbb; 329 flatbuffers::FlatBufferBuilder metadataFbb;
@@ -369,7 +366,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
369 366
370 bool found = false; 367 bool found = false;
371 bool alreadyRemoved = false; 368 bool alreadyRemoved = false;
372 Storage::mainDatabase(d->transaction, bufferType) 369 DataStore::mainDatabase(d->transaction, bufferType)
373 .findLatest(key, 370 .findLatest(key,
374 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { 371 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
375 auto entity = GetEntity(data.data()); 372 auto entity = GetEntity(data.data());
@@ -382,7 +379,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
382 } 379 }
383 return false; 380 return false;
384 }, 381 },
385 [](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); 382 [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; });
386 383
387 if (!found) { 384 if (!found) {
388 SinkWarning() << "Failed to find entity " << key; 385 SinkWarning() << "Failed to find entity " << key;
@@ -393,7 +390,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
393 return KAsync::error<qint64>(0); 390 return KAsync::error<qint64>(0);
394 } 391 }
395 392
396 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; 393 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
397 394
398 // Add metadata buffer 395 // Add metadata buffer
399 flatbuffers::FlatBufferBuilder metadataFbb; 396 flatbuffers::FlatBufferBuilder metadataFbb;
@@ -407,14 +404,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
407 flatbuffers::FlatBufferBuilder fbb; 404 flatbuffers::FlatBufferBuilder fbb;
408 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); 405 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
409 406
410 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 407 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
411 if (!adaptorFactory) { 408 if (!adaptorFactory) {
412 SinkWarning() << "no adaptor factory for type " << bufferType; 409 SinkWarning() << "no adaptor factory for type " << bufferType;
413 return KAsync::error<qint64>(0); 410 return KAsync::error<qint64>(0);
414 } 411 }
415 412
416 QSharedPointer<ApplicationDomain::BufferAdaptor> current; 413 QSharedPointer<ApplicationDomain::BufferAdaptor> current;
417 Storage::mainDatabase(d->transaction, bufferType) 414 DataStore::mainDatabase(d->transaction, bufferType)
418 .findLatest(key, 415 .findLatest(key,
419 [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool { 416 [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
420 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 417 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
@@ -425,7 +422,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
425 } 422 }
426 return false; 423 return false;
427 }, 424 },
428 [this](const Storage::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; }); 425 [this](const DataStore::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; });
429 426
430 d->storeNewRevision(newRevision, fbb, bufferType, key); 427 d->storeNewRevision(newRevision, fbb, bufferType, key);
431 428
@@ -439,10 +436,10 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
439void Pipeline::cleanupRevision(qint64 revision) 436void Pipeline::cleanupRevision(qint64 revision)
440{ 437{
441 d->revisionChanged = true; 438 d->revisionChanged = true;
442 const auto uid = Storage::getUidFromRevision(d->transaction, revision); 439 const auto uid = DataStore::getUidFromRevision(d->transaction, revision);
443 const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); 440 const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision);
444 SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; 441 SinkTrace() << "Cleaning up revision " << revision << uid << bufferType;
445 Storage::mainDatabase(d->transaction, bufferType) 442 DataStore::mainDatabase(d->transaction, bufferType)
446 .scan(uid, 443 .scan(uid,
447 [&](const QByteArray &key, const QByteArray &data) -> bool { 444 [&](const QByteArray &key, const QByteArray &data) -> bool {
448 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 445 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
@@ -453,20 +450,20 @@ void Pipeline::cleanupRevision(qint64 revision)
453 const qint64 rev = metadata->revision(); 450 const qint64 rev = metadata->revision();
454 // Remove old revisions, and the current if the entity has already been removed 451 // Remove old revisions, and the current if the entity has already been removed
455 if (rev < revision || metadata->operation() == Operation_Removal) { 452 if (rev < revision || metadata->operation() == Operation_Removal) {
456 Storage::removeRevision(d->transaction, rev); 453 DataStore::removeRevision(d->transaction, rev);
457 Storage::mainDatabase(d->transaction, bufferType).remove(key); 454 DataStore::mainDatabase(d->transaction, bufferType).remove(key);
458 } 455 }
459 } 456 }
460 457
461 return true; 458 return true;
462 }, 459 },
463 [](const Storage::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); 460 [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true);
464 Storage::setCleanedUpRevision(d->transaction, revision); 461 DataStore::setCleanedUpRevision(d->transaction, revision);
465} 462}
466 463
467qint64 Pipeline::cleanedUpRevision() 464qint64 Pipeline::cleanedUpRevision()
468{ 465{
469 return Storage::cleanedUpRevision(d->transaction); 466 return DataStore::cleanedUpRevision(d->transaction);
470} 467}
471 468
472class Preprocessor::Private { 469class Preprocessor::Private {
@@ -523,8 +520,6 @@ void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomain
523 d->pipeline->newEntity(data, data.size()).exec(); 520 d->pipeline->newEntity(data, data.size()).exec();
524} 521}
525 522
526} // namespace Sink
527
528#pragma clang diagnostic push 523#pragma clang diagnostic push
529#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" 524#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
530#include "moc_pipeline.cpp" 525#include "moc_pipeline.cpp"