From 237b9ae4113e7a9f489632296941becb71afdb45 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 16 Oct 2016 14:55:20 +0200 Subject: Refactor how the storage is used. This is the initial refactoring to improve how we deal with the storage. It does a couple of things: * Rename Sink::Storage to Sink::Storage::DataStore to free up the Sink::Storage namespace * Introduce a Sink::ResourceContext to have a single object that can be passed around containing everything that is necessary to operate on a resource. This is a lot better than the multiple separate parameters that we used to pass around all over the place, while still allowing for dependency injection for tests. * Tie storage access together using the new EntityStore that directly works with ApplicationDomainTypes. This gives us a central place where main storage, indexes and buffer adaptors are tied together, which will also give us a place to implement external indexes, such as a full-text index using Xapian. * Use ApplicationDomainTypes as the default way to pass around entities. Instead of using various ways to pass around entities (buffers, buffer adaptors, ApplicationDomainTypes), only use a single way. The old approach was confusing, and was only done as: * an optimization; it really shouldn't be necessary and otherwise I'm sure we can find better ways to optimize ApplicationDomainType itself. * a way to account for entities that have multiple buffers, a concept that I no longer deem relevant. While this commit does the bulk of the work to get there, the following commits will refactor more stuff to get things back to normal. 
--- common/pipeline.cpp | 99 +++++++++++++++++++++++++---------------------------- 1 file changed, 47 insertions(+), 52 deletions(-) (limited to 'common/pipeline.cpp') diff --git a/common/pipeline.cpp b/common/pipeline.cpp index ce864f7..e257857 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -40,45 +40,45 @@ SINK_DEBUG_AREA("pipeline") -namespace Sink { +using namespace Sink; +using namespace Sink::Storage; class Pipeline::Private { public: - Private(const QString &resourceName) : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), revisionChanged(false), resourceInstanceIdentifier(resourceName.toUtf8()) + Private(const ResourceContext &context) : resourceContext(context), storage(Sink::storageLocation(), context.instanceId(), DataStore::ReadWrite), revisionChanged(false) { } - Storage storage; - Storage::Transaction transaction; + ResourceContext resourceContext; + DataStore storage; + DataStore::Transaction transaction; QHash>> processors; bool revisionChanged; void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); QTime transactionTime; int transactionItemCount; - QByteArray resourceType; - QByteArray resourceInstanceIdentifier; }; void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) { SinkTrace() << "Committing new revision: " << uid << newRevision; - Storage::mainDatabase(transaction, bufferType) - .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), - [uid, newRevision](const Storage::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); + DataStore::mainDatabase(transaction, bufferType) + .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), + [uid, newRevision](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; 
}); revisionChanged = true; - Storage::setMaxRevision(transaction, newRevision); - Storage::recordRevision(transaction, newRevision, uid, bufferType); + DataStore::setMaxRevision(transaction, newRevision); + DataStore::recordRevision(transaction, newRevision, uid, bufferType); } -Pipeline::Pipeline(const QString &resourceName, QObject *parent) : QObject(parent), d(new Private(resourceName)) +Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) { } Pipeline::~Pipeline() { - d->transaction = Storage::Transaction(); + d->transaction = DataStore::Transaction(); } void Pipeline::setPreprocessors(const QString &entityType, const QVector &processors) @@ -86,16 +86,11 @@ void Pipeline::setPreprocessors(const QString &entityType, const QVectorprocessors[entityType]; list.clear(); for (auto p : processors) { - p->setup(d->resourceType, d->resourceInstanceIdentifier, this); + p->setup(d->resourceContext.resourceType, d->resourceContext.instanceId(), this); list.append(QSharedPointer(p)); } } -void Pipeline::setResourceType(const QByteArray &resourceType) -{ - d->resourceType = resourceType; -} - void Pipeline::startTransaction() { // TODO call for all types @@ -109,7 +104,7 @@ void Pipeline::startTransaction() SinkTrace() << "Starting transaction."; d->transactionTime.start(); d->transactionItemCount = 0; - d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { + d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { SinkWarning() << error.message; }); @@ -119,7 +114,7 @@ void Pipeline::startTransaction() if (d->storage.exists()) { while (!d->transaction.validateNamedDatabases()) { SinkWarning() << "Opened an invalid transaction!!!!!!"; - d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { + d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error 
&error) { SinkWarning() << error.message; }); } @@ -135,29 +130,29 @@ void Pipeline::commit() // } if (!d->revisionChanged) { d->transaction.abort(); - d->transaction = Storage::Transaction(); + d->transaction = DataStore::Transaction(); return; } - const auto revision = Storage::maxRevision(d->transaction); + const auto revision = DataStore::maxRevision(d->transaction); const auto elapsed = d->transactionTime.elapsed(); SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; if (d->transaction) { d->transaction.commit(); } - d->transaction = Storage::Transaction(); + d->transaction = DataStore::Transaction(); if (d->revisionChanged) { d->revisionChanged = false; emit revisionUpdated(revision); } } -Storage::Transaction &Pipeline::transaction() +DataStore::Transaction &Pipeline::transaction() { return d->transaction; } -Storage &Pipeline::storage() const +DataStore &Pipeline::storage() const { return d->storage; } @@ -180,14 +175,14 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) QByteArray key; if (createEntity->entityId()) { key = QByteArray(reinterpret_cast(createEntity->entityId()->Data()), createEntity->entityId()->size()); - if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { + if (DataStore::mainDatabase(d->transaction, bufferType).contains(key)) { SinkError() << "An entity with this id already exists: " << key; return KAsync::error(0); } } if (key.isEmpty()) { - key = Sink::Storage::generateUid(); + key = DataStore::generateUid(); } SinkTrace() << "New Entity. 
Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; Q_ASSERT(!key.isEmpty()); @@ -205,7 +200,7 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) return KAsync::error(0); } - auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); + auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); if (!adaptorFactory) { SinkWarning() << "no adaptor factory for type " << bufferType; return KAsync::error(0); @@ -214,10 +209,10 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) auto adaptor = adaptorFactory->createAdaptor(*entity); auto memoryAdaptor = QSharedPointer::create(*(adaptor), adaptor->availableProperties()); foreach (const auto &processor, d->processors[bufferType]) { - processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); + processor->newEntity(key, DataStore::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); } //The maxRevision may have changed meanwhile if the entity created sub-entities - const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; + const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; @@ -233,6 +228,8 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) d->storeNewRevision(newRevision, fbb, bufferType, key); + //FIXME entityStore->create(bufferType, memoryAdaptor, replayToSource) + return KAsync::value(newRevision); } @@ -273,7 +270,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) } // TODO use only readPropertyMapper and writePropertyMapper - auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); + auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); if 
(!adaptorFactory) { SinkWarning() << "no adaptor factory for type " << bufferType; return KAsync::error(0); @@ -284,7 +281,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) auto diff = adaptorFactory->createAdaptor(*diffEntity); QSharedPointer current; - Storage::mainDatabase(d->transaction, bufferType) + DataStore::mainDatabase(d->transaction, bufferType) .findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { EntityBuffer buffer(const_cast(data.data()), data.size()); @@ -295,7 +292,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) } return false; }, - [baseRevision](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); + [baseRevision](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); if (!current) { SinkWarning() << "Failed to read local value " << key; @@ -323,10 +320,10 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) newAdaptor->resetChangedProperties(); foreach (const auto &processor, d->processors[bufferType]) { - processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); + processor->modifiedEntity(key, DataStore::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); } //The maxRevision may have changed meanwhile if the entity created sub-entities - const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; + const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; @@ -369,7 +366,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) bool found = false; bool alreadyRemoved = false; - Storage::mainDatabase(d->transaction, bufferType) + DataStore::mainDatabase(d->transaction, 
bufferType) .findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { auto entity = GetEntity(data.data()); @@ -382,7 +379,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) } return false; }, - [](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); + [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); if (!found) { SinkWarning() << "Failed to find entity " << key; @@ -393,7 +390,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) return KAsync::error(0); } - const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; + const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; @@ -407,14 +404,14 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) flatbuffers::FlatBufferBuilder fbb; EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); - auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); + auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); if (!adaptorFactory) { SinkWarning() << "no adaptor factory for type " << bufferType; return KAsync::error(0); } QSharedPointer current; - Storage::mainDatabase(d->transaction, bufferType) + DataStore::mainDatabase(d->transaction, bufferType) .findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { EntityBuffer buffer(const_cast(data.data()), data.size()); @@ -425,7 +422,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) } return false; }, - [this](const Storage::Error &error) { SinkError() << "Failed to find value in pipeline: " << 
error.message; }); + [this](const DataStore::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; }); d->storeNewRevision(newRevision, fbb, bufferType, key); @@ -439,10 +436,10 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) void Pipeline::cleanupRevision(qint64 revision) { d->revisionChanged = true; - const auto uid = Storage::getUidFromRevision(d->transaction, revision); - const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); + const auto uid = DataStore::getUidFromRevision(d->transaction, revision); + const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; - Storage::mainDatabase(d->transaction, bufferType) + DataStore::mainDatabase(d->transaction, bufferType) .scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { EntityBuffer buffer(const_cast(data.data()), data.size()); @@ -453,20 +450,20 @@ void Pipeline::cleanupRevision(qint64 revision) const qint64 rev = metadata->revision(); // Remove old revisions, and the current if the entity has already been removed if (rev < revision || metadata->operation() == Operation_Removal) { - Storage::removeRevision(d->transaction, rev); - Storage::mainDatabase(d->transaction, bufferType).remove(key); + DataStore::removeRevision(d->transaction, rev); + DataStore::mainDatabase(d->transaction, bufferType).remove(key); } } return true; }, - [](const Storage::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); - Storage::setCleanedUpRevision(d->transaction, revision); + [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); + DataStore::setCleanedUpRevision(d->transaction, revision); } qint64 Pipeline::cleanedUpRevision() { - return Storage::cleanedUpRevision(d->transaction); + return DataStore::cleanedUpRevision(d->transaction); } class 
Preprocessor::Private { @@ -523,8 +520,6 @@ void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomain d->pipeline->newEntity(data, data.size()).exec(); } -} // namespace Sink - #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" #include "moc_pipeline.cpp" -- cgit v1.2.3