From 4d9746c828558c9f872e0aed52442863affb25d5 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Thu, 3 Mar 2016 09:01:05 +0100 Subject: Fromatted the whole codebase with clang-format. clang-format -i */**{.cpp,.h} --- common/asyncutils.h | 27 +-- common/bufferadaptor.h | 54 +++-- common/bufferutils.h | 29 ++- common/commands.cpp | 22 +- common/commands.h | 10 +- common/definitions.h | 4 +- common/domainadaptor.cpp | 1 - common/domainadaptor.h | 52 ++-- common/domaintypeadaptorfactoryinterface.h | 17 +- common/entitybuffer.cpp | 23 +- common/entitybuffer.h | 14 +- common/facade.cpp | 26 +- common/facade.h | 15 +- common/facadefactory.cpp | 2 +- common/facadefactory.h | 17 +- common/facadeinterface.h | 24 +- common/genericresource.cpp | 378 ++++++++++++++--------------- common/genericresource.h | 29 ++- common/index.cpp | 40 ++- common/index.h | 12 +- common/indexupdater.h | 22 +- common/inspection.h | 11 +- common/listener.cpp | 61 +++-- common/listener.h | 16 +- common/log.cpp | 75 +++--- common/log.h | 13 +- common/messagequeue.cpp | 157 ++++++------ common/messagequeue.h | 17 +- common/modelresult.cpp | 66 +++-- common/modelresult.h | 10 +- common/notification.h | 4 +- common/notifier.cpp | 17 +- common/notifier.h | 4 +- common/pipeline.cpp | 247 +++++++++---------- common/pipeline.h | 11 +- common/propertymapper.cpp | 8 +- common/propertymapper.h | 71 +++--- common/query.h | 14 +- common/queryrunner.cpp | 292 +++++++++++----------- common/queryrunner.h | 12 +- common/resource.cpp | 29 +-- common/resource.h | 9 +- common/resourceaccess.cpp | 253 +++++++++---------- common/resourceaccess.h | 60 +++-- common/resourceconfig.cpp | 1 - common/resourcecontrol.cpp | 56 +++-- common/resourcecontrol.h | 6 +- common/resourcefacade.cpp | 22 +- common/resourcefacade.h | 7 +- common/resultprovider.h | 124 +++++----- common/resultset.cpp | 44 +--- common/resultset.h | 60 ++--- common/storage.h | 77 +++--- common/storage_common.cpp | 78 +++--- common/storage_lmdb.cpp | 132 ++++------ common/store.cpp | 145 ++++++----- common/store.h | 19 +- common/synclistresult.h | 26 +- common/threadboundary.cpp | 9 +- common/threadboundary.h | 8 +- common/typeindex.cpp | 39 ++- common/typeindex.h | 9 +- 62 files changed, 1513 insertions(+), 1624 deletions(-) (limited to 'common') diff --git a/common/asyncutils.h b/common/asyncutils.h index ddcc37c..2cf010e 100644 --- a/common/asyncutils.h +++ b/common/asyncutils.h @@ -24,19 +24,18 @@ #include namespace async { - template - KAsync::Job run(const std::function &f) - { - return KAsync::start([f](KAsync::Future &future) { - auto result = QtConcurrent::run(f); - auto watcher = new QFutureWatcher; - watcher->setFuture(result); - QObject::connect(watcher, &QFutureWatcher::finished, watcher, [&future, watcher]() { - future.setValue(watcher->future().result()); - delete watcher; - future.setFinished(); - }); +template +KAsync::Job run(const std::function &f) +{ + return KAsync::start([f](KAsync::Future &future) { + auto result = QtConcurrent::run(f); + auto watcher = new QFutureWatcher; + watcher->setFuture(result); + QObject::connect(watcher, &QFutureWatcher::finished, watcher, [&future, watcher]() { + future.setValue(watcher->future().result()); + delete watcher; + future.setFinished(); }); - } - + }); +} } diff --git a/common/bufferadaptor.h b/common/bufferadaptor.h index 892635f..0087643 100644 --- a/common/bufferadaptor.h +++ b/common/bufferadaptor.h @@ -30,44 +30,64 @@ namespace ApplicationDomain { /** * This class has to be implemented by resources and can be used as generic interface to access the buffer properties */ -class BufferAdaptor { +class BufferAdaptor +{ public: - virtual ~BufferAdaptor() {} - virtual QVariant getProperty(const QByteArray &key) const { return QVariant(); } - virtual void setProperty(const QByteArray &key, const QVariant &value) {} - virtual QList availableProperties() const { return QList(); } + virtual ~BufferAdaptor() + { + } + virtual QVariant getProperty(const QByteArray &key) const + { + return QVariant(); + } + virtual void setProperty(const QByteArray &key, const QVariant &value) + { + } + virtual QList availableProperties() const + { + return QList(); + } }; -class MemoryBufferAdaptor : public BufferAdaptor { +class MemoryBufferAdaptor : public BufferAdaptor +{ public: - MemoryBufferAdaptor() - : BufferAdaptor() + MemoryBufferAdaptor() : BufferAdaptor() { } - MemoryBufferAdaptor(const BufferAdaptor &buffer, const QList &properties) - : BufferAdaptor() + MemoryBufferAdaptor(const BufferAdaptor &buffer, const QList &properties) : BufferAdaptor() { if (properties.isEmpty()) { - for(const auto &property : buffer.availableProperties()) { + for (const auto &property : buffer.availableProperties()) { mValues.insert(property, buffer.getProperty(property)); } } else { - for(const auto &property : properties) { + for (const auto &property : properties) { mValues.insert(property, buffer.getProperty(property)); } } } - virtual ~MemoryBufferAdaptor() {} + virtual ~MemoryBufferAdaptor() + { + } - virtual QVariant getProperty(const QByteArray &key) const { return mValues.value(key); } - virtual void setProperty(const QByteArray &key, const QVariant &value) { mValues.insert(key, value); } - virtual QByteArrayList availableProperties() const { return mValues.keys(); } + virtual QVariant getProperty(const QByteArray &key) const + { + return mValues.value(key); + } + virtual void setProperty(const QByteArray &key, const QVariant &value) + { + mValues.insert(key, value); + } + virtual QByteArrayList availableProperties() const + { + return mValues.keys(); + } private: QHash mValues; }; - } } diff --git a/common/bufferutils.h b/common/bufferutils.h index b08b7f8..1eb5d15 100644 --- a/common/bufferutils.h +++ b/common/bufferutils.h @@ -5,22 +5,21 @@ namespace Sink { namespace BufferUtils { - template - static QByteArray extractBuffer(const T *data) - { - return QByteArray::fromRawData(reinterpret_cast(data->Data()), data->size()); - } +template +static QByteArray extractBuffer(const T *data) +{ + return QByteArray::fromRawData(reinterpret_cast(data->Data()), data->size()); +} - template - static QByteArray extractBufferCopy(const T *data) - { - return QByteArray(reinterpret_cast(data->Data()), data->size()); - } +template +static QByteArray extractBufferCopy(const T *data) +{ + return QByteArray(reinterpret_cast(data->Data()), data->size()); +} - static QByteArray extractBuffer(const flatbuffers::FlatBufferBuilder &fbb) - { - return QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); - } +static QByteArray extractBuffer(const flatbuffers::FlatBufferBuilder &fbb) +{ + return QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); +} } } - diff --git a/common/commands.cpp b/common/commands.cpp index 5d38afa..91657b8 100644 --- a/common/commands.cpp +++ b/common/commands.cpp @@ -22,15 +22,13 @@ #include -namespace Sink -{ +namespace Sink { -namespace Commands -{ +namespace Commands { QByteArray name(int commandId) { - switch(commandId) { + switch (commandId) { case UnknownCommand: return "Unknown"; case CommandCompletionCommand: @@ -85,9 +83,9 @@ void write(QIODevice *device, int messageId, int commandId, const char *buffer, size = 0; } - device->write((const char*)&messageId, sizeof(int)); - device->write((const char*)&commandId, sizeof(int)); - device->write((const char*)&size, sizeof(uint)); + device->write((const char *)&messageId, sizeof(int)); + device->write((const char *)&commandId, sizeof(int)); + device->write((const char *)&size, sizeof(uint)); if (buffer) { device->write(buffer, size); } @@ -96,10 +94,10 @@ void write(QIODevice *device, int messageId, int commandId, const char *buffer, void write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb) { const int dataSize = fbb.GetSize(); - device->write((const char*)&messageId, sizeof(int)); - device->write((const char*)&commandId, sizeof(int)); - device->write((const char*)&dataSize, sizeof(int)); - device->write((const char*)fbb.GetBufferPointer(), dataSize); + device->write((const char *)&messageId, sizeof(int)); + device->write((const char *)&commandId, sizeof(int)); + device->write((const char *)&dataSize, sizeof(int)); + device->write((const char *)fbb.GetBufferPointer(), dataSize); } } // namespace Commands diff --git a/common/commands.h b/common/commands.h index 64abd76..b97bbc6 100644 --- a/common/commands.h +++ b/common/commands.h @@ -26,13 +26,12 @@ class QIODevice; -namespace Sink -{ +namespace Sink { -namespace Commands -{ +namespace Commands { -enum CommandIds { +enum CommandIds +{ UnknownCommand = 0, CommandCompletionCommand, HandshakeCommand, @@ -59,7 +58,6 @@ int SINK_EXPORT headerSize(); void SINK_EXPORT write(QIODevice *device, int messageId, int commandId); void SINK_EXPORT write(QIODevice *device, int messageId, int commandId, const char *buffer, uint size); void SINK_EXPORT write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb); - } } // namespace Sink diff --git a/common/definitions.h b/common/definitions.h index 029d3f8..96ec27e 100644 --- a/common/definitions.h +++ b/common/definitions.h @@ -25,6 +25,6 @@ #include namespace Sink { - QString SINK_EXPORT storageLocation(); - QByteArray SINK_EXPORT resourceName(const QByteArray &instanceIdentifier); +QString SINK_EXPORT storageLocation(); +QByteArray SINK_EXPORT resourceName(const QByteArray &instanceIdentifier); } diff --git a/common/domainadaptor.cpp b/common/domainadaptor.cpp index 2955cab..dcacc94 100644 --- a/common/domainadaptor.cpp +++ b/common/domainadaptor.cpp @@ -18,4 +18,3 @@ */ #include "domainadaptor.h" - diff --git a/common/domainadaptor.h b/common/domainadaptor.h index 9ce5a8a..0159c6c 100644 --- a/common/domainadaptor.h +++ b/common/domainadaptor.h @@ -40,10 +40,11 @@ * Create a buffer from a domain object using the provided mappings */ template -flatbuffers::Offset createBufferPart(const Sink::ApplicationDomain::ApplicationDomainType &domainObject, flatbuffers::FlatBufferBuilder &fbb, const WritePropertyMapper &mapper) +flatbuffers::Offset +createBufferPart(const Sink::ApplicationDomain::ApplicationDomainType &domainObject, flatbuffers::FlatBufferBuilder &fbb, const WritePropertyMapper &mapper) { - //First create a primitives such as strings using the mappings - QList > propertiesToAddToResource; + // First create a primitives such as strings using the mappings + QList> propertiesToAddToResource; for (const auto &property : domainObject.changedProperties()) { // Trace() << "copying property " << property; const auto value = domainObject.getProperty(property); @@ -54,7 +55,7 @@ flatbuffers::Offset createBufferPart(const Sink::ApplicationDomain::Appl } } - //Then create all porperties using the above generated builderCalls + // Then create all porperties using the above generated builderCalls Builder builder(fbb); for (auto propertyBuilder : propertiesToAddToResource) { propertyBuilder(builder); @@ -64,7 +65,7 @@ flatbuffers::Offset createBufferPart(const Sink::ApplicationDomain::Appl /** * Create the buffer and finish the FlatBufferBuilder. - * + * * After this the buffer can be extracted from the FlatBufferBuilder object. */ template @@ -88,10 +89,8 @@ template class GenericBufferAdaptor : public Sink::ApplicationDomain::BufferAdaptor { public: - GenericBufferAdaptor() - : BufferAdaptor() + GenericBufferAdaptor() : BufferAdaptor() { - } virtual QVariant getProperty(const QByteArray &key) const @@ -112,8 +111,8 @@ public: LocalBuffer const *mLocalBuffer; ResourceBuffer const *mResourceBuffer; - QSharedPointer > mLocalMapper; - QSharedPointer > mResourceMapper; + QSharedPointer> mLocalMapper; + QSharedPointer> mResourceMapper; }; /** @@ -121,23 +120,23 @@ public: * It defines how values are split accross local and resource buffer. * This is required by the facade the read the value, and by the pipeline preprocessors to access the domain values in a generic way. */ -template +template class SINK_EXPORT DomainTypeAdaptorFactory : public DomainTypeAdaptorFactoryInterface { typedef typename Sink::ApplicationDomain::TypeImplementation::Buffer LocalBuffer; typedef typename Sink::ApplicationDomain::TypeImplementation::BufferBuilder LocalBuilder; + public: - DomainTypeAdaptorFactory() : - mLocalMapper(Sink::ApplicationDomain::TypeImplementation::initializeReadPropertyMapper()), - mResourceMapper(QSharedPointer >::create()), - mLocalWriteMapper(Sink::ApplicationDomain::TypeImplementation::initializeWritePropertyMapper()), - mResourceWriteMapper(QSharedPointer >::create()) - {}; - virtual ~DomainTypeAdaptorFactory() {}; + DomainTypeAdaptorFactory() + : mLocalMapper(Sink::ApplicationDomain::TypeImplementation::initializeReadPropertyMapper()), + mResourceMapper(QSharedPointer>::create()), + mLocalWriteMapper(Sink::ApplicationDomain::TypeImplementation::initializeWritePropertyMapper()), + mResourceWriteMapper(QSharedPointer>::create()){}; + virtual ~DomainTypeAdaptorFactory(){}; /** * Creates an adaptor for the given domain and resource types. - * + * * This returns by default a GenericBufferAdaptor initialized with the corresponding property mappers. */ virtual QSharedPointer createAdaptor(const Sink::Entity &entity) Q_DECL_OVERRIDE @@ -146,7 +145,7 @@ public: const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); // const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); - auto adaptor = QSharedPointer >::create(); + auto adaptor = QSharedPointer>::create(); adaptor->mLocalBuffer = localBuffer; adaptor->mLocalMapper = mLocalMapper; adaptor->mResourceBuffer = resourceBuffer; @@ -154,7 +153,8 @@ public: return adaptor; } - virtual void createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainObject, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) Q_DECL_OVERRIDE + virtual void + createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainObject, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) Q_DECL_OVERRIDE { flatbuffers::FlatBufferBuilder localFbb; if (mLocalWriteMapper) { @@ -173,10 +173,8 @@ public: protected: - QSharedPointer > mLocalMapper; - QSharedPointer > mResourceMapper; - QSharedPointer > mLocalWriteMapper; - QSharedPointer > mResourceWriteMapper; + QSharedPointer> mLocalMapper; + QSharedPointer> mResourceMapper; + QSharedPointer> mLocalWriteMapper; + QSharedPointer> mResourceWriteMapper; }; - - diff --git a/common/domaintypeadaptorfactoryinterface.h b/common/domaintypeadaptorfactoryinterface.h index 4d0ce04..72aa9b9 100644 --- a/common/domaintypeadaptorfactoryinterface.h +++ b/common/domaintypeadaptorfactoryinterface.h @@ -21,22 +21,22 @@ #include namespace Sink { - namespace ApplicationDomain { - class BufferAdaptor; - class ApplicationDomainType; - } - struct Entity; +namespace ApplicationDomain { +class BufferAdaptor; +class ApplicationDomainType; +} +struct Entity; } namespace flatbuffers { - class FlatBufferBuilder; +class FlatBufferBuilder; } class DomainTypeAdaptorFactoryInterface { public: typedef QSharedPointer Ptr; - virtual ~DomainTypeAdaptorFactoryInterface() {}; + virtual ~DomainTypeAdaptorFactoryInterface(){}; virtual QSharedPointer createAdaptor(const Sink::Entity &entity) = 0; /* @@ -44,5 +44,6 @@ public: * * Note that this only serialized parameters that are part of ApplicationDomainType::changedProperties() */ - virtual void createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; + virtual void + createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; }; diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp index b4a5cb2..950bc46 100644 --- a/common/entitybuffer.cpp +++ b/common/entitybuffer.cpp @@ -6,8 +6,7 @@ using namespace Sink; -EntityBuffer::EntityBuffer(const void *dataValue, int dataSize) - : mEntity(nullptr) +EntityBuffer::EntityBuffer(const void *dataValue, int dataSize) : mEntity(nullptr) { flatbuffers::Verifier verifyer(reinterpret_cast(dataValue), dataSize); // Q_ASSERT(Sink::VerifyEntity(verifyer)); @@ -18,10 +17,8 @@ EntityBuffer::EntityBuffer(const void *dataValue, int dataSize) } } -EntityBuffer::EntityBuffer(const QByteArray &data) - : EntityBuffer(data.constData(), data.size()) +EntityBuffer::EntityBuffer(const QByteArray &data) : EntityBuffer(data.constData(), data.size()) { - } bool EntityBuffer::isValid() const @@ -35,7 +32,7 @@ const Sink::Entity &EntityBuffer::entity() return *mEntity; } -const uint8_t* EntityBuffer::resourceBuffer() +const uint8_t *EntityBuffer::resourceBuffer() { if (!mEntity) { qDebug() << "no buffer"; @@ -44,7 +41,7 @@ const uint8_t* EntityBuffer::resourceBuffer() return mEntity->resource()->Data(); } -const uint8_t* EntityBuffer::metadataBuffer() +const uint8_t *EntityBuffer::metadataBuffer() { if (!mEntity) { return nullptr; @@ -52,7 +49,7 @@ const uint8_t* EntityBuffer::metadataBuffer() return mEntity->metadata()->Data(); } -const uint8_t* EntityBuffer::localBuffer() +const uint8_t *EntityBuffer::localBuffer() { if (!mEntity) { return nullptr; @@ -68,17 +65,18 @@ void EntityBuffer::extractResourceBuffer(void *dataValue, int dataSize, const st } } -flatbuffers::Offset > EntityBuffer::appendAsVector(flatbuffers::FlatBufferBuilder &fbb, void const *data, size_t size) +flatbuffers::Offset> EntityBuffer::appendAsVector(flatbuffers::FlatBufferBuilder &fbb, void const *data, size_t size) { - //Since we do memcpy trickery, this will only work on little endian + // Since we do memcpy trickery, this will only work on little endian assert(FLATBUFFERS_LITTLEENDIAN); uint8_t *rawDataPtr = Q_NULLPTR; const auto pos = fbb.CreateUninitializedVector(size, &rawDataPtr); - std::memcpy((void*)rawDataPtr, data, size); + std::memcpy((void *)rawDataPtr, data, size); return pos; } -void EntityBuffer::assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void const *metadataData, size_t metadataSize, void const *resourceData, size_t resourceSize, void const *localData, size_t localSize) +void EntityBuffer::assembleEntityBuffer( + flatbuffers::FlatBufferBuilder &fbb, void const *metadataData, size_t metadataSize, void const *resourceData, size_t resourceSize, void const *localData, size_t localSize) { auto metadata = appendAsVector(fbb, metadataData, metadataSize); auto resource = appendAsVector(fbb, resourceData, resourceSize); @@ -86,4 +84,3 @@ void EntityBuffer::assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, voi auto entity = Sink::CreateEntity(fbb, metadata, resource, local); Sink::FinishEntityBuffer(fbb, entity); } - diff --git a/common/entitybuffer.h b/common/entitybuffer.h index 474a619..24f0b6b 100644 --- a/common/entitybuffer.h +++ b/common/entitybuffer.h @@ -8,7 +8,8 @@ namespace Sink { struct Entity; -class SINK_EXPORT EntityBuffer { +class SINK_EXPORT EntityBuffer +{ public: EntityBuffer(const void *dataValue, int size); EntityBuffer(const QByteArray &data); @@ -25,9 +26,10 @@ public: * We can't use union's either (which would allow to have a field that stores a selection of tables), as we don't want to modify * the entity schema for each resource's buffers. */ - static void assembleEntityBuffer(flatbuffers::FlatBufferBuilder &fbb, void const *metadataData, size_t metadataSize, void const *resourceData, size_t resourceSize, void const *localData, size_t localSize); - static flatbuffers::Offset > appendAsVector(flatbuffers::FlatBufferBuilder &fbb, void const *data, size_t size); - template + static void assembleEntityBuffer( + flatbuffers::FlatBufferBuilder &fbb, void const *metadataData, size_t metadataSize, void const *resourceData, size_t resourceSize, void const *localData, size_t localSize); + static flatbuffers::Offset> appendAsVector(flatbuffers::FlatBufferBuilder &fbb, void const *data, size_t size); + template static const T *readBuffer(const uint8_t *data, int size) { flatbuffers::Verifier verifier(data, size); @@ -37,7 +39,7 @@ public: return nullptr; } - template + template static const T *readBuffer(const flatbuffers::Vector *data) { if (data) { @@ -50,6 +52,4 @@ public: private: const Entity *mEntity; }; - } - diff --git a/common/facade.cpp b/common/facade.cpp index 1219887..803f85c 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -32,31 +32,29 @@ using namespace Sink; #undef DEBUG_AREA #define DEBUG_AREA "client.facade" -template -GenericFacade::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer resourceAccess) - : Sink::StoreFacade(), - mResourceAccess(resourceAccess), - mDomainTypeAdaptorFactory(adaptorFactory), - mResourceInstanceIdentifier(resourceIdentifier) +template +GenericFacade::GenericFacade( + const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory, const QSharedPointer resourceAccess) + : Sink::StoreFacade(), mResourceAccess(resourceAccess), mDomainTypeAdaptorFactory(adaptorFactory), mResourceInstanceIdentifier(resourceIdentifier) { if (!mResourceAccess) { mResourceAccess = ResourceAccessFactory::instance().getAccess(resourceIdentifier); } } -template +template GenericFacade::~GenericFacade() { } -template +template QByteArray GenericFacade::bufferTypeForDomainType() { - //We happen to have a one to one mapping + // We happen to have a one to one mapping return Sink::ApplicationDomain::getTypeName(); } -template +template KAsync::Job GenericFacade::create(const DomainType &domainObject) { if (!mDomainTypeAdaptorFactory) { @@ -68,7 +66,7 @@ KAsync::Job GenericFacade::create(const DomainType &domainObje return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), BufferUtils::extractBuffer(entityFbb)); } -template +template KAsync::Job GenericFacade::modify(const DomainType &domainObject) { if (!mDomainTypeAdaptorFactory) { @@ -80,16 +78,16 @@ KAsync::Job GenericFacade::modify(const DomainType &domainObje return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), BufferUtils::extractBuffer(entityFbb)); } -template +template KAsync::Job GenericFacade::remove(const DomainType &domainObject) { return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType()); } -template +template QPair, typename ResultEmitter::Ptr> GenericFacade::load(const Sink::Query &query) { - //The runner lives for the lifetime of the query + // The runner lives for the lifetime of the query auto runner = new QueryRunner(query, mResourceAccess, mResourceInstanceIdentifier, mDomainTypeAdaptorFactory, bufferTypeForDomainType()); runner->setResultTransformation(mResultTransformation); return qMakePair(KAsync::null(), runner->emitter()); diff --git a/common/facade.h b/common/facade.h index 99fbcdc..a24ac7a 100644 --- a/common/facade.h +++ b/common/facade.h @@ -33,8 +33,9 @@ namespace Sink { /** * Default facade implementation for resources that are implemented in a separate process using the ResourceAccess class. - * - * Ideally a basic resource has no implementation effort for the facades and can simply instanciate default implementations (meaning it only has to implement the factory with all supported types). + * + * Ideally a basic resource has no implementation effort for the facades and can simply instanciate default implementations (meaning it only has to implement the factory with all + * supported types). * A resource has to implement: * * A facade factory registering all available facades * * An adaptor factory if it uses special resource buffers (default implementation can be used otherwise) @@ -43,16 +44,17 @@ namespace Sink { * Additionally a resource only has to provide a synchronizer plugin to execute the synchronization */ template -class SINK_EXPORT GenericFacade: public Sink::StoreFacade +class SINK_EXPORT GenericFacade : public Sink::StoreFacade { public: /** * Create a new GenericFacade - * + * * @param resourceIdentifier is the identifier of the resource instance * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa */ - GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer resourceAccess = QSharedPointer()); + GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), + const QSharedPointer resourceAccess = QSharedPointer()); ~GenericFacade(); static QByteArray bufferTypeForDomainType(); @@ -63,10 +65,9 @@ public: protected: std::function mResultTransformation; - //TODO use one resource access instance per application & per resource + // TODO use one resource access instance per application & per resource QSharedPointer mResourceAccess; DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; QByteArray mResourceInstanceIdentifier; }; - } diff --git a/common/facadefactory.cpp b/common/facadefactory.cpp index d9ee5f7..ab52681 100644 --- a/common/facadefactory.cpp +++ b/common/facadefactory.cpp @@ -64,7 +64,7 @@ std::shared_ptr FacadeFactory::getFacade(const QByteArray &resource, const const QByteArray k = key(resource, typeName); if (!mFacadeRegistry.contains(k)) { locker.unlock(); - //This will call FacadeFactory::instace() internally + // This will call FacadeFactory::instace() internally Sink::ResourceFactory::load(QString::fromLatin1(resource)); locker.relock(); } diff --git a/common/facadefactory.h b/common/facadefactory.h index ef2a3f9..83b8898 100644 --- a/common/facadefactory.h +++ b/common/facadefactory.h @@ -38,7 +38,8 @@ namespace Sink { * * If we were to provide default implementations for certain capabilities. Here would be the place to do so. */ -class SINK_EXPORT FacadeFactory { +class SINK_EXPORT FacadeFactory +{ public: typedef std::function(const QByteArray &)> FactoryFunction; @@ -48,10 +49,10 @@ public: static QByteArray key(const QByteArray &resource, const QByteArray &type); - template + template void registerFacade(const QByteArray &resource) { - registerFacade(resource, [](const QByteArray &instanceIdentifier){ return std::make_shared(instanceIdentifier); }, ApplicationDomain::getTypeName()); + registerFacade(resource, [](const QByteArray &instanceIdentifier) { return std::make_shared(instanceIdentifier); }, ApplicationDomain::getTypeName()); } /* @@ -59,7 +60,7 @@ public: * * Primarily for testing. */ - template + template void registerFacade(const QByteArray &resource, const FactoryFunction &customFactoryFunction) { registerFacade(resource, customFactoryFunction, ApplicationDomain::getTypeName()); @@ -72,11 +73,11 @@ public: */ void resetFactory(); - template - std::shared_ptr > getFacade(const QByteArray &resource, const QByteArray &instanceIdentifier) + template + std::shared_ptr> getFacade(const QByteArray &resource, const QByteArray &instanceIdentifier) { const QByteArray typeName = ApplicationDomain::getTypeName(); - return std::static_pointer_cast >(getFacade(resource, instanceIdentifier, typeName)); + return std::static_pointer_cast>(getFacade(resource, instanceIdentifier, typeName)); } private: @@ -87,6 +88,4 @@ private: QHash mFacadeRegistry; static QMutex sMutex; }; - } - diff --git a/common/facadeinterface.h b/common/facadeinterface.h index 17cba5e..7d5dd7d 100644 --- a/common/facadeinterface.h +++ b/common/facadeinterface.h @@ -32,17 +32,21 @@ class Query; /** * Interface for the store facade. - * + * * All methods are synchronous. * Facades are stateful (they hold connections to resources and database). - * + * * TODO: would it make sense to split the write, read and notification parts? (we could potentially save some connections) */ -template -class StoreFacade { +template +class StoreFacade +{ public: virtual ~StoreFacade(){}; - QByteArray type() const { return ApplicationDomain::getTypeName(); } + QByteArray type() const + { + return ApplicationDomain::getTypeName(); + } /** * Create an entity in the store. @@ -68,11 +72,12 @@ public: /** * Load entities from the store. */ - virtual QPair, typename Sink::ResultEmitter::Ptr > load(const Query &query) = 0; + virtual QPair, typename Sink::ResultEmitter::Ptr> load(const Query &query) = 0; }; -template -class NullFacade : public StoreFacade { +template +class NullFacade : public StoreFacade +{ public: virtual ~NullFacade(){}; KAsync::Job create(const DomainType &domainObject) @@ -90,10 +95,9 @@ public: return KAsync::error(-1, "Failed to create a facade"); } - QPair, typename Sink::ResultEmitter::Ptr > load(const Query &query) + QPair, typename Sink::ResultEmitter::Ptr> load(const Query &query) { return qMakePair(KAsync::null(), typename Sink::ResultEmitter::Ptr()); } }; - } diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 74a8cfb..9c9a12f 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -20,7 +20,7 @@ #include static int sBatchSize = 100; -//This interval directly affects the roundtrip time of single commands +// This interval directly affects the roundtrip time of single commands static int sCommitInterval = 10; using namespace Sink; @@ -39,26 +39,23 @@ class ChangeReplay : public QObject { Q_OBJECT public: - typedef std::function(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction; ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction) - : mStorage(storageLocation(), resourceName, Storage::ReadOnly), - mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), - mReplayFunction(replayFunction) + : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayFunction(replayFunction) { - } qint64 getLastReplayedRevision() { qint64 lastReplayedRevision = 0; auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); - replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { - lastReplayedRevision = value.toLongLong(); - return false; - }, [](const Storage::Error &) { - }); + replayStoreTransaction.openDatabase().scan("lastReplayedRevision", + [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { + lastReplayedRevision = value.toLongLong(); + return false; + }, + [](const Storage::Error &) {}); return lastReplayedRevision; } @@ -79,28 +76,30 @@ public slots: auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); qint64 lastReplayedRevision = 1; - replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { - lastReplayedRevision = value.toLongLong(); - return false; - }, [](const Storage::Error &) { - }); + replayStoreTransaction.openDatabase().scan("lastReplayedRevision", + [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { + lastReplayedRevision = value.toLongLong(); + return false; + }, + [](const Storage::Error &) {}); const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; if (lastReplayedRevision <= topRevision) { qint64 revision = lastReplayedRevision; - for (;revision <= topRevision; revision++) { + for (; revision <= topRevision; revision++) { const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); const auto key = Storage::assembleKey(uid, revision); - Storage::mainDatabase(mainStoreTransaction, type).scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { - mReplayFunction(type, key, value).exec(); - //TODO make for loop async, and pass to async replay function together with type - Trace() << "Replaying " << key; - return false; - }, [key](const Storage::Error &) { - ErrorMsg() << "Failed to replay change " << key; - }); + Storage::mainDatabase(mainStoreTransaction, type) + .scan(key, + [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { + mReplayFunction(type, key, value).exec(); + // TODO make for loop async, and pass to async replay function together with type + Trace() << "Replaying " << key; + return false; + }, + [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); } revision--; replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); @@ -126,15 +125,12 @@ class CommandProcessor : public QObject { Q_OBJECT typedef std::function(void const *, size_t)> InspectionFunction; + public: - CommandProcessor(Sink::Pipeline *pipeline, QList commandQueues) - : QObject(), - mPipeline(pipeline), - mCommandQueues(commandQueues), - mProcessingLock(false) + CommandProcessor(Sink::Pipeline *pipeline, QList commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) { mPipeline->startTransaction(); - //FIXME Should be initialized to the current value of the change replay queue + // FIXME Should be initialized to the current value of the change replay queue mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); mPipeline->commit(); @@ -176,18 +172,20 @@ private slots: return; } mProcessingLock = true; - auto job = processPipeline().then([this]() { - mProcessingLock = false; - if (messagesToProcessAvailable()) { - process(); - } - }).exec(); + auto job = processPipeline() + .then([this]() { + mProcessingLock = false; + if (messagesToProcessAvailable()) { + process(); + } + }) + .exec(); } KAsync::Job processQueuedCommand(const Sink::QueuedCommand *queuedCommand) { Log() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); - //Throw command into appropriate pipeline + // Throw command into appropriate pipeline switch (queuedCommand->commandId()) { case Sink::Commands::DeleteEntityCommand: return mPipeline->deletedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); @@ -197,9 +195,7 @@ private slots: return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); case Sink::Commands::InspectionCommand: if (mInspect) { - return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then([]() { - return -1; - }); + return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then([]() { return -1; }); } else { return KAsync::error(-1, "Missing inspection command."); } @@ -218,50 +214,47 @@ private slots: auto queuedCommand = Sink::GetQueuedCommand(data.constData()); const auto commandId = queuedCommand->commandId(); Trace() << "Dequeued Command: " << Sink::Commands::name(commandId); - return processQueuedCommand(queuedCommand).then( - [commandId](qint64 createdRevision) -> qint64 { - Trace() << "Command pipeline processed: " << Sink::Commands::name(commandId); - return createdRevision; - } - , - [](int errorCode, QString errorMessage) { - //FIXME propagate error, we didn't handle it - Warning() << "Error while processing queue command: " << errorMessage; - } - ); + return processQueuedCommand(queuedCommand) + .then( + [commandId](qint64 createdRevision) -> qint64 { + Trace() << "Command pipeline processed: " << Sink::Commands::name(commandId); + return createdRevision; + }, + [](int errorCode, QString errorMessage) { + // FIXME propagate error, we didn't handle it + Warning() << "Error while processing queue command: " << errorMessage; + }); } - //Process all messages of this queue + // Process all messages of this queue KAsync::Job processQueue(MessageQueue *queue) { auto time = QSharedPointer::create(); - return KAsync::start([this](){ - mPipeline->startTransaction(); - }).then(KAsync::dowhile( - [queue]() { return !queue->isEmpty(); }, - [this, queue, time](KAsync::Future &future) { - queue->dequeueBatch(sBatchSize, [this, time](const QByteArray &data) { - time->start(); - return KAsync::start([this, data, time](KAsync::Future &future) { - processQueuedCommand(data).then([&future, this, time](qint64 createdRevision) { - Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); + return KAsync::start([this]() { mPipeline->startTransaction(); }) + .then(KAsync::dowhile([queue]() { return !queue->isEmpty(); }, + [this, queue, time](KAsync::Future &future) { + queue->dequeueBatch(sBatchSize, + [this, time](const QByteArray &data) { + time->start(); + return KAsync::start([this, data, time](KAsync::Future &future) { + processQueuedCommand(data) + .then([&future, this, time](qint64 createdRevision) { + Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); + future.setFinished(); + }) + .exec(); + }); + }) + .then([&future, queue]() { future.setFinished(); }, + [&future](int i, QString error) { + if (i != MessageQueue::ErrorCodes::NoMessageFound) { + Warning() << "Error while getting message from messagequeue: " << error; + } future.setFinished(); - }).exec(); - }); - } - ).then([&future, queue](){ - future.setFinished(); - }, - [&future](int i, QString error) { - if (i != MessageQueue::ErrorCodes::NoMessageFound) { - Warning() << "Error while getting message from messagequeue: " << error; - } - future.setFinished(); - }).exec(); - } - )).then([this]() { - mPipeline->commit(); - }); + }) + .exec(); + })) + .then([this]() { mPipeline->commit(); }); } KAsync::Job processPipeline() @@ -276,29 +269,29 @@ private slots: mPipeline->commit(); Trace() << "Cleanup done." << Log::TraceTime(time->elapsed()); - //Go through all message queues - auto it = QSharedPointer >::create(mCommandQueues); - return KAsync::dowhile( - [it]() { return it->hasNext(); }, + // Go through all message queues + auto it = QSharedPointer>::create(mCommandQueues); + return KAsync::dowhile([it]() { return it->hasNext(); }, [it, this](KAsync::Future &future) { auto time = QSharedPointer::create(); time->start(); auto queue = it->next(); - processQueue(queue).then([&future, time]() { - Trace() << "Queue processed." << Log::TraceTime(time->elapsed()); - future.setFinished(); - }).exec(); - } - ); + processQueue(queue) + .then([&future, time]() { + Trace() << "Queue processed." << Log::TraceTime(time->elapsed()); + future.setFinished(); + }) + .exec(); + }); } private: Sink::Pipeline *mPipeline; - //Ordered by priority - QList mCommandQueues; + // Ordered by priority + QList mCommandQueues; bool mProcessingLock; - //The lowest revision we no longer need + // The lowest revision we no longer need qint64 mLowerBoundRevision; InspectionFunction mInspect; }; @@ -308,14 +301,14 @@ private: GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer &pipeline) : Sink::Resource(), - mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), - mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), - mResourceInstanceIdentifier(resourceInstanceIdentifier), - mPipeline(pipeline ? pipeline : QSharedPointer::create(resourceInstanceIdentifier)), - mError(0), - mClientLowerBoundRevision(std::numeric_limits::max()) -{ - mProcessor = new CommandProcessor(mPipeline.data(), QList() << &mUserQueue << &mSynchronizerQueue); + mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), + mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), + mResourceInstanceIdentifier(resourceInstanceIdentifier), + mPipeline(pipeline ? pipeline : QSharedPointer::create(resourceInstanceIdentifier)), + mError(0), + mClientLowerBoundRevision(std::numeric_limits::max()) +{ + mProcessor = new CommandProcessor(mPipeline.data(), QList() << &mUserQueue << &mSynchronizerQueue); mProcessor->setInspectionCommand([this](void const *command, size_t size) { flatbuffers::Verifier verifier((const uint8_t *)command, size); if (Sink::Commands::VerifyInspectionBuffer(verifier)) { @@ -330,22 +323,26 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c QDataStream s(expectedValueString); QVariant expectedValue; s >> expectedValue; - inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then([=]() { - Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; - Sink::Notification n; - n.type = Sink::Commands::NotificationType_Inspection; - n.id = inspectionId; - n.code = Sink::Commands::NotificationCode_Success; - emit notify(n); - }, [=](int code, const QString &message) { - Log() << "Inspection failed: "<< inspectionType << inspectionId << entityId << message; - Sink::Notification n; - n.type = Sink::Commands::NotificationType_Inspection; - n.message = message; - n.id = inspectionId; - n.code = Sink::Commands::NotificationCode_Failure; - emit notify(n); - }).exec(); + inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) + .then( + [=]() { + Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; + Sink::Notification n; + n.type = Sink::Commands::NotificationType_Inspection; + n.id = inspectionId; + n.code = Sink::Commands::NotificationCode_Success; + emit notify(n); + }, + [=](int code, const QString &message) { + Log() << "Inspection failed: " << inspectionType << inspectionId << entityId << message; + Sink::Notification n; + n.type = Sink::Commands::NotificationType_Inspection; + n.message = message; + n.id = inspectionId; + n.code = Sink::Commands::NotificationCode_Failure; + emit notify(n); + }) + .exec(); return KAsync::null(); } return KAsync::error(-1, "Invalid inspection command."); @@ -353,9 +350,9 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { - //This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync) + // This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync) auto synchronizationStore = QSharedPointer::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); - return this->replay(*synchronizationStore, type, key, value).then([synchronizationStore](){}); + return this->replay(*synchronizationStore, type, key, value).then([synchronizationStore]() {}); }); enableChangeReplay(true); mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); @@ -372,7 +369,8 @@ GenericResource::~GenericResource() delete mSourceChangeReplay; } -KAsync::Job GenericResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) +KAsync::Job GenericResource::inspect( + int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) { Warning() << "Inspection not implemented"; return KAsync::null(); @@ -390,7 +388,7 @@ void GenericResource::enableChangeReplay(bool enable) } } -void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector &preprocessors) +void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector &preprocessors) { mPipeline->setPreprocessors(type, preprocessors); mPipeline->setAdaptorFactory(type, factory); @@ -463,14 +461,16 @@ KAsync::Job GenericResource::synchronizeWithSource() { return KAsync::start([this]() { Log() << " Synchronizing"; - //Changereplay would deadlock otherwise when trying to open the synchronization store + // Changereplay would deadlock otherwise when trying to open the synchronization store enableChangeReplay(false); auto mainStore = QSharedPointer::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); auto syncStore = QSharedPointer::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); - synchronizeWithSource(*mainStore, *syncStore).then([this, mainStore, syncStore]() { - Log() << "Done Synchronizing"; - enableChangeReplay(true); - }).exec(); + synchronizeWithSource(*mainStore, *syncStore) + .then([this, mainStore, syncStore]() { + Log() << "Done Synchronizing"; + enableChangeReplay(true); + }) + .exec(); }); } @@ -484,42 +484,39 @@ static void waitForDrained(KAsync::Future &f, MessageQueue &queue) if (queue.isEmpty()) { f.setFinished(); } else { - QObject::connect(&queue, &MessageQueue::drained, [&f]() { - f.setFinished(); - }); + QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); }); } }; KAsync::Job GenericResource::processAllMessages() { - //We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. - //TODO: report errors while processing sync? - //TODO JOBAPI: A helper that waits for n events and then continues? + // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. + // TODO: report errors while processing sync? + // TODO JOBAPI: A helper that waits for n events and then continues? return KAsync::start([this](KAsync::Future &f) { - if (mCommitQueueTimer.isActive()) { - auto context = new QObject; - QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { - delete context; + if (mCommitQueueTimer.isActive()) { + auto context = new QObject; + QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { + delete context; + f.setFinished(); + }); + } else { + f.setFinished(); + } + }) + .then([this](KAsync::Future &f) { waitForDrained(f, mSynchronizerQueue); }) + .then([this](KAsync::Future &f) { waitForDrained(f, mUserQueue); }) + .then([this](KAsync::Future &f) { + if (mSourceChangeReplay->allChangesReplayed()) { f.setFinished(); - }); - } else { - f.setFinished(); - } - }).then([this](KAsync::Future &f) { - waitForDrained(f, mSynchronizerQueue); - }).then([this](KAsync::Future &f) { - waitForDrained(f, mUserQueue); - }).then([this](KAsync::Future &f) { - if (mSourceChangeReplay->allChangesReplayed()) { - f.setFinished(); - } else { - auto context = new QObject; - QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { - delete context; - f.setFinished(); - }); - } - }); + } else { + auto context = new QObject; + QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { + delete context; + f.setFinished(); + }); + } + }); } void GenericResource::updateLowerBoundRevision() @@ -533,14 +530,15 @@ void GenericResource::setLowerBoundRevision(qint64 revision) updateLowerBoundRevision(); } -void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) +void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, + DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) { - //These changes are coming from the source + // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; adaptorFactory.createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; - //This is the resource type and not the domain type + // This is the resource type and not the domain type auto entityId = fbb.CreateString(sinkId.toStdString()); auto type = fbb.CreateString(bufferType.toStdString()); auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); @@ -549,18 +547,19 @@ void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &b callback(BufferUtils::extractBuffer(fbb)); } -void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) +void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, + DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) { - //These changes are coming from the source + // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; adaptorFactory.createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); - //This is the resource type and not the domain type + // This is the resource type and not the domain type auto type = fbb.CreateString(bufferType.toStdString()); auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); - //TODO removals + // TODO removals auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); Sink::Commands::FinishModifyEntityBuffer(fbb, location); callback(BufferUtils::extractBuffer(fbb)); @@ -568,11 +567,11 @@ void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, co void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function callback) { - //These changes are coming from the source + // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); - //This is the resource type and not the domain type + // This is the resource type and not the domain type auto type = fbb.CreateString(bufferType.toStdString()); auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); Sink::Commands::FinishDeleteEntityBuffer(fbb, location); @@ -581,7 +580,8 @@ void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, co void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) { - Index("rid.mapping." + bufferType, transaction).add(remoteId, localId);; + Index("rid.mapping." + bufferType, transaction).add(remoteId, localId); + ; Index("localid.mapping." + bufferType, transaction).add(localId, remoteId); } @@ -600,7 +600,7 @@ void GenericResource::updateRemoteId(const QByteArray &bufferType, const QByteAr QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) { - //Lookup local id for remote id, or insert a new pair otherwise + // Lookup local id for remote id, or insert a new pair otherwise Index index("rid.mapping." + bufferType, transaction); QByteArray sinkId = index.lookup(remoteId); if (sinkId.isEmpty()) { @@ -621,19 +621,19 @@ QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const Q return remoteId; } -void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists) +void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, + const std::function &callback)> &entryGenerator, std::function exists) { entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { auto sinkId = Sink::Storage::uidFromKey(key); Trace() << "Checking for removal " << key; const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction); - //If we have no remoteId, the entity hasn't been replayed to the source yet + // If we have no remoteId, the entity hasn't been replayed to the source yet if (!remoteId.isEmpty()) { if (!exists(remoteId)) { Trace() << "Found a removed entity: " << sinkId; - deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, [this](const QByteArray &buffer) { - enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); - }); + deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, + [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); }); } } }); @@ -642,32 +642,31 @@ void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, S static QSharedPointer getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) { QSharedPointer current; - db.findLatest(uid, [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { - Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - Warning() << "Read invalid buffer from disk"; - } else { - current = adaptorFactory.createAdaptor(buffer.entity()); - } - return false; - }, - [](const Sink::Storage::Error &error) { - Warning() << "Failed to read current value from storage: " << error.message; - }); + db.findLatest(uid, + [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { + Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + Warning() << "Read invalid buffer from disk"; + } else { + current = adaptorFactory.createAdaptor(buffer.entity()); + } + return false; + }, + [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); return current; } -void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) +void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, + DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { auto mainDatabase = Storage::mainDatabase(transaction, bufferType); const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); const auto found = mainDatabase.contains(sinkId); if (!found) { Trace() << "Found a new entity: " << remoteId; - createEntity(sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { - enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); - }); - } else { //modification + createEntity( + sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); }); + } else { // modification if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) { bool changed = false; for (const auto &property : entity.changedProperties()) { @@ -678,9 +677,8 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si } if (changed) { Trace() << "Found a modified entity: " << remoteId; - modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { - enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); - }); + modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, + [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); }); } } else { Warning() << "Failed to get current entity"; diff --git a/common/genericresource.h b/common/genericresource.h index b9bb994..9665d6b 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -29,8 +29,7 @@ class CommandProcessor; class ChangeReplay; -namespace Sink -{ +namespace Sink { class Pipeline; class Preprocessor; @@ -48,7 +47,8 @@ public: virtual KAsync::Job synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore); virtual KAsync::Job processAllMessages() Q_DECL_OVERRIDE; virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; - virtual KAsync::Job inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue); + virtual KAsync::Job + inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue); int error() const; @@ -61,13 +61,15 @@ private slots: protected: void enableChangeReplay(bool); - void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector &preprocessors); + void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector &preprocessors); virtual KAsync::Job replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value); void onProcessorError(int errorCode, const QString &errorMessage); void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); - static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); - static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); + static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, + DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); + static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, + DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function callback); /** @@ -79,36 +81,38 @@ protected: /** * Tries to find a local id for the remote id, and creates a new local id otherwise. - * + * * The new local id is recorded in the local to remote id mapping. */ QByteArray resolveRemoteId(const QByteArray &type, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); /** * Tries to find a remote id for a local id. - * + * * This can fail if the entity hasn't been written back to the server yet. */ QByteArray resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction); /** * A synchronous algorithm to remove entities that are no longer existing. - * + * * A list of entities is generated by @param entryGenerator. * The entiry Generator typically iterates over an index to produce all existing entries. * This algorithm calls @param exists for every entity of type @param type, with its remoteId. For every entity where @param exists returns false, * an entity delete command is enqueued. - * + * * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. */ - void scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists); + void scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, + const std::function &callback)> &entryGenerator, std::function exists); /** * An algorithm to create or modify the entity. * * Depending on whether the entity is locally available, or has changed. */ - void createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); + void createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, + const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity); MessageQueue mUserQueue; MessageQueue mSynchronizerQueue; @@ -122,5 +126,4 @@ private: QTimer mCommitQueueTimer; qint64 mClientLowerBoundRevision; }; - } diff --git a/common/index.cpp b/common/index.cpp index e35b838..b5e9980 100644 --- a/common/index.cpp +++ b/common/index.cpp @@ -7,17 +7,14 @@ Index::Index(const QString &storageRoot, const QString &name, Sink::Storage::AccessMode mode) : mTransaction(Sink::Storage(storageRoot, name, mode).createTransaction(mode)), - mDb(mTransaction.openDatabase(name.toLatin1(), std::function(), true)), - mName(name) + mDb(mTransaction.openDatabase(name.toLatin1(), std::function(), true)), + mName(name) { - } Index::Index(const QByteArray &name, Sink::Storage::Transaction &transaction) - : mDb(transaction.openDatabase(name, std::function(), true)), - mName(name) + : mDb(transaction.openDatabase(name, std::function(), true)), mName(name) { - } void Index::add(const QByteArray &key, const QByteArray &value) @@ -30,30 +27,23 @@ void Index::remove(const QByteArray &key, const QByteArray &value) mDb.remove(key, value); } -void Index::lookup(const QByteArray &key, const std::function &resultHandler, - const std::function &errorHandler, bool matchSubStringKeys) +void Index::lookup(const QByteArray &key, const std::function &resultHandler, const std::function &errorHandler, bool matchSubStringKeys) { - mDb.scan(key, [this, resultHandler](const QByteArray &key, const QByteArray &value) -> bool { - resultHandler(value); - return true; - }, - [errorHandler](const Sink::Storage::Error &error) { - Warning() << "Error while retrieving value" << error.message; - errorHandler(Error(error.store, error.code, error.message)); - }, - matchSubStringKeys); + mDb.scan(key, + [this, resultHandler](const QByteArray &key, const QByteArray &value) -> bool { + resultHandler(value); + return true; + }, + [errorHandler](const Sink::Storage::Error &error) { + Warning() << "Error while retrieving value" << error.message; + errorHandler(Error(error.store, error.code, error.message)); + }, + matchSubStringKeys); } QByteArray Index::lookup(const QByteArray &key) { QByteArray result; - lookup(key, - [&result](const QByteArray &value) { - result = value; - }, - [this](const Index::Error &error) { - Trace() << "Error while retrieving value" << error.message; - }); + lookup(key, [&result](const QByteArray &value) { result = value; }, [this](const Index::Error &error) { Trace() << "Error while retrieving value" << error.message; }); return result; } - diff --git a/common/index.h b/common/index.h index 3ee322a..0345f56 100644 --- a/common/index.h +++ b/common/index.h @@ -12,15 +12,17 @@ class SINK_EXPORT Index { public: - enum ErrorCodes { + enum ErrorCodes + { IndexNotAvailable = -1 }; class Error { public: - Error(const QByteArray &s, int c, const QByteArray &m) - : store(s), message(m), code(c) {} + Error(const QByteArray &s, int c, const QByteArray &m) : store(s), message(m), code(c) + { + } QByteArray store; QByteArray message; int code; @@ -32,8 +34,8 @@ public: void add(const QByteArray &key, const QByteArray &value); void remove(const QByteArray &key, const QByteArray &value); - void lookup(const QByteArray &key, const std::function &resultHandler, - const std::function &errorHandler, bool matchSubStringKeys = false); + void lookup(const QByteArray &key, const std::function &resultHandler, const std::function &errorHandler, + bool matchSubStringKeys = false); QByteArray lookup(const QByteArray &key); private: diff --git a/common/indexupdater.h b/common/indexupdater.h index ced220b..deaaa16 100644 --- a/common/indexupdater.h +++ b/common/indexupdater.h @@ -21,14 +21,11 @@ #include #include -class IndexUpdater : public Sink::Preprocessor { +class IndexUpdater : public Sink::Preprocessor +{ public: - IndexUpdater(const QByteArray &index, const QByteArray &type, const QByteArray &property) - :mIndexIdentifier(index), - mBufferType(type), - mProperty(property) + IndexUpdater(const QByteArray &index, const QByteArray &type, const QByteArray &property) : mIndexIdentifier(index), mBufferType(type), mProperty(property) { - } void newEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE @@ -36,7 +33,8 @@ public: add(newEntity.getProperty(mProperty), uid, transaction); } - void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity, + Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE { remove(oldEntity.getProperty(mProperty), uid, transaction); add(newEntity.getProperty(mProperty), uid, transaction); @@ -57,7 +55,7 @@ private: void remove(const QVariant &value, const QByteArray &uid, Sink::Storage::Transaction &transaction) { - //TODO hide notfound error + // TODO hide notfound error Index(mIndexIdentifier, transaction).remove(value.toByteArray(), uid); } @@ -66,15 +64,17 @@ private: QByteArray mProperty; }; -template -class DefaultIndexUpdater : public Sink::Preprocessor { +template +class DefaultIndexUpdater : public Sink::Preprocessor +{ public: void newEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE { Sink::ApplicationDomain::TypeImplementation::index(uid, newEntity, transaction); } - void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity, + Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE { Sink::ApplicationDomain::TypeImplementation::removeIndex(uid, oldEntity, transaction); Sink::ApplicationDomain::TypeImplementation::index(uid, newEntity, transaction); diff --git a/common/inspection.h b/common/inspection.h index d85eab6..7abcd1c 100644 --- a/common/inspection.h +++ b/common/inspection.h @@ -24,9 +24,10 @@ #include "applicationdomaintype.h" namespace Sink { - namespace ResourceControl { +namespace ResourceControl { -struct Inspection { +struct Inspection +{ static Inspection PropertyInspection(const Sink::ApplicationDomain::Entity &entity, const QByteArray &property, const QVariant &expectedValue) { Inspection inspection; @@ -46,7 +47,8 @@ struct Inspection { return inspection; } - enum Type { + enum Type + { PropertyInspectionType, ExistenceInspectionType }; @@ -55,6 +57,5 @@ struct Inspection { QByteArray property; QVariant expectedValue; }; - - } +} } diff --git a/common/listener.cpp b/common/listener.cpp index ed6f305..145267a 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -51,8 +51,7 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent m_clientBufferProcessesTimer(new QTimer(this)), m_messageId(0) { - connect(m_server, &QLocalServer::newConnection, - this, &Listener::acceptConnection); + connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); Trace() << "Trying to open " << m_resourceInstanceIdentifier; if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { @@ -77,12 +76,11 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent } }); - //TODO: experiment with different timeouts + // TODO: experiment with different timeouts // or even just drop down to invoking the method queued? => invoke queued unless we need throttling m_clientBufferProcessesTimer->setInterval(0); m_clientBufferProcessesTimer->setSingleShot(true); - connect(m_clientBufferProcessesTimer, &QTimer::timeout, - this, &Listener::processClientBuffers); + connect(m_clientBufferProcessesTimer, &QTimer::timeout, this, &Listener::processClientBuffers); } Listener::~Listener() @@ -91,7 +89,7 @@ Listener::~Listener() void Listener::closeAllConnections() { - for (Client &client: m_connections) { + for (Client &client : m_connections) { if (client.socket) { disconnect(client.socket, 0, this, 0); client.socket->close(); @@ -114,13 +112,11 @@ void Listener::acceptConnection() } m_connections << Client("Unknown Client", socket); - connect(socket, &QIODevice::readyRead, - this, &Listener::onDataAvailable); - connect(socket, &QLocalSocket::disconnected, - this, &Listener::clientDropped); + connect(socket, &QIODevice::readyRead, this, &Listener::onDataAvailable); + connect(socket, &QLocalSocket::disconnected, this, &Listener::clientDropped); m_checkConnectionsTimer->stop(); - //If this is the first client, set the lower limit for revision cleanup + // If this is the first client, set the lower limit for revision cleanup if (m_connections.size() == 1) { loadResource()->setLowerBoundRevision(0); } @@ -157,7 +153,7 @@ void Listener::clientDropped() void Listener::checkConnections() { - //If this was the last client, disengage the lower limit for revision cleanup + // If this was the last client, disengage the lower limit for revision cleanup if (m_connections.isEmpty()) { loadResource()->setLowerBoundRevision(std::numeric_limits::max()); } @@ -176,7 +172,7 @@ void Listener::onDataAvailable() void Listener::readFromSocket(QLocalSocket *socket) { Trace() << "Reading from socket..."; - for (Client &client: m_connections) { + for (Client &client : m_connections) { if (client.socket == socket) { client.commandBuffer += socket->readAll(); if (!m_clientBufferProcessesTimer->isActive()) { @@ -189,11 +185,11 @@ void Listener::readFromSocket(QLocalSocket *socket) void Listener::processClientBuffers() { - //TODO: we should not process all clients, but iterate async over them and process + // TODO: we should not process all clients, but iterate async over them and process // one command from each in turn to ensure all clients get fair handling of // commands? bool again = false; - for (Client &client: m_connections) { + for (Client &client : m_connections) { if (!client.socket || !client.socket->isValid() || client.commandBuffer.isEmpty()) { continue; } @@ -237,9 +233,10 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c job = job.then(loadResource()->processAllMessages()); } job.then([callback, timer]() { - Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); - callback(true); - }).exec(); + Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); + callback(true); + }) + .exec(); return; } else { Warning() << "received invalid command"; @@ -256,7 +253,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c break; case Sink::Commands::ShutdownCommand: Log() << QString("\tReceived shutdown command from %1").arg(client.name); - //Immediately reject new connections + // Immediately reject new connections m_server->close(); QTimer::singleShot(0, this, &Listener::quit); break; @@ -273,16 +270,14 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c Warning() << "received invalid command"; } loadResource()->setLowerBoundRevision(lowerBoundRevision()); - } - break; + } break; case Sink::Commands::RemoveFromDiskCommand: { Log() << QString("\tReceived a remove from disk command from %1").arg(client.name); m_resource->removeDataFromDisk(); delete m_resource; m_resource = nullptr; loadResource()->setLowerBoundRevision(0); - } - break; + } break; default: if (commandId > Sink::Commands::CustomCommand) { Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; @@ -313,7 +308,7 @@ qint64 Listener::lowerBoundRevision() void Listener::quit() { - //Broadcast shutdown notifications to open clients, so they don't try to restart the resource + // Broadcast shutdown notifications to open clients, so they don't try to restart the resource auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Commands::NotificationType::NotificationType_Shutdown); Sink::Commands::FinishNotificationBuffer(m_fbb, command); for (Client &client : m_connections) { @@ -323,7 +318,7 @@ void Listener::quit() } m_fbb.Clear(); - //Connections will be cleaned up later + // Connections will be cleaned up later emit noClients(); } @@ -334,12 +329,12 @@ bool Listener::processClientBuffer(Client &client) return false; } - const uint messageId = *(uint*)client.commandBuffer.constData(); - const int commandId = *(int*)(client.commandBuffer.constData() + sizeof(uint)); - const uint size = *(uint*)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); + const uint messageId = *(uint *)client.commandBuffer.constData(); + const int commandId = *(int *)(client.commandBuffer.constData() + sizeof(uint)); + const uint size = *(uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); Trace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; - //TODO: reject messages above a certain size? + // TODO: reject messages above a certain size? const bool commandComplete = size <= uint(client.commandBuffer.size() - headerSize); if (commandComplete) { @@ -386,7 +381,7 @@ void Listener::updateClientsWithRevision(qint64 revision) auto command = Sink::Commands::CreateRevisionUpdate(m_fbb, revision); Sink::Commands::FinishRevisionUpdateBuffer(m_fbb, command); - for (const Client &client: m_connections) { + for (const Client &client : m_connections) { if (!client.socket || !client.socket->isValid()) { continue; } @@ -423,10 +418,8 @@ Sink::Resource *Listener::loadResource() m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier); Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); Trace() << QString("\tResource: %1").arg((qlonglong)m_resource); - connect(m_resource, &Sink::Resource::revisionUpdated, - this, &Listener::refreshRevision); - connect(m_resource, &Sink::Resource::notify, - this, &Listener::notify); + connect(m_resource, &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); + connect(m_resource, &Sink::Resource::notify, this, &Listener::notify); } else { ErrorMsg() << "Failed to load resource " << m_resourceName; m_resource = new Sink::Resource; diff --git a/common/listener.h b/common/listener.h index 49ded1a..aca7c50 100644 --- a/common/listener.h +++ b/common/listener.h @@ -26,10 +26,9 @@ #include #include -namespace Sink -{ - class Resource; - class Notification; +namespace Sink { +class Resource; +class Notification; } class QTimer; @@ -38,16 +37,11 @@ class QLocalServer; class Client { public: - Client() - : socket(nullptr), - currentRevision(0) + Client() : socket(nullptr), currentRevision(0) { } - Client(const QString &n, QLocalSocket *s) - : name(n), - socket(s), - currentRevision(0) + Client(const QString &n, QLocalSocket *s) : name(n), socket(s), currentRevision(0) { } diff --git a/common/log.cpp b/common/log.cpp index 96c6f82..b0f6237 100644 --- a/common/log.cpp +++ b/common/log.cpp @@ -16,67 +16,88 @@ static QSharedPointer config() return QSharedPointer::create(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/sink/log.ini", QSettings::IniFormat); } -class DebugStream: public QIODevice +class DebugStream : public QIODevice { public: QString m_location; - DebugStream() - : QIODevice() + DebugStream() : QIODevice() { open(WriteOnly); } virtual ~DebugStream(); - bool isSequential() const { return true; } - qint64 readData(char *, qint64) { return 0; /* eof */ } - qint64 readLineData(char *, qint64) { return 0; /* eof */ } + bool isSequential() const + { + return true; + } + qint64 readData(char *, qint64) + { + return 0; /* eof */ + } + qint64 readLineData(char *, qint64) + { + return 0; /* eof */ + } qint64 writeData(const char *data, qint64 len) { std::cout << data << std::endl; return len; } + private: Q_DISABLE_COPY(DebugStream) }; -//Virtual method anchor +// Virtual method anchor DebugStream::~DebugStream() -{} +{ +} -class NullStream: public QIODevice +class NullStream : public QIODevice { public: - NullStream() - : QIODevice() + NullStream() : QIODevice() { open(WriteOnly); } virtual ~NullStream(); - bool isSequential() const { return true; } - qint64 readData(char *, qint64) { return 0; /* eof */ } - qint64 readLineData(char *, qint64) { return 0; /* eof */ } + bool isSequential() const + { + return true; + } + qint64 readData(char *, qint64) + { + return 0; /* eof */ + } + qint64 readLineData(char *, qint64) + { + return 0; /* eof */ + } qint64 writeData(const char *data, qint64 len) { return len; } + private: Q_DISABLE_COPY(NullStream) }; -//Virtual method anchor +// Virtual method anchor NullStream::~NullStream() -{} - - /* - * ANSI color codes: - * 0: reset colors/style - * 1: bold - * 4: underline - * 30 - 37: black, red, green, yellow, blue, magenta, cyan, and white text - * 40 - 47: black, red, green, yellow, blue, magenta, cyan, and white background - */ -enum ANSI_Colors { +{ +} + +/* + * ANSI color codes: + * 0: reset colors/style + * 1: bold + * 4: underline + * 30 - 37: black, red, green, yellow, blue, magenta, cyan, and white text + * 40 - 47: black, red, green, yellow, blue, magenta, cyan, and white background + */ +enum ANSI_Colors +{ DoNothing = -1, Reset = 0, Bold = 1, @@ -211,7 +232,7 @@ static bool caseInsensitiveContains(const QByteArray &pattern, const QByteArrayL return false; } -QDebug Sink::Log::debugStream(DebugLevel debugLevel, int line, const char* file, const char* function, const char* debugArea) +QDebug Sink::Log::debugStream(DebugLevel debugLevel, int line, const char *file, const char *function, const char *debugArea) { static NullStream nullstream; if (debugLevel < debugOutputLevel()) { diff --git a/common/log.h b/common/log.h index 415c7f7..0e92ea9 100644 --- a/common/log.h +++ b/common/log.h @@ -6,7 +6,8 @@ namespace Sink { namespace Log { -enum DebugLevel { +enum DebugLevel +{ Trace, Log, Warning, @@ -24,14 +25,15 @@ DebugLevel SINK_EXPORT debugLevelFromName(const QByteArray &name); void SINK_EXPORT setDebugOutputLevel(DebugLevel); DebugLevel SINK_EXPORT debugOutputLevel(); -enum FilterType { +enum FilterType +{ Area, ApplicationName }; /** * Sets a debug output filter. - * + * * Everything that is not matching the filter is ignored. * An empty filter matches everything. * @@ -53,7 +55,7 @@ QByteArrayList SINK_EXPORT debugOutputFilter(FilterType type); void SINK_EXPORT setDebugOutputFields(const QByteArrayList &filter); QByteArrayList SINK_EXPORT debugOutputFields(); -QDebug SINK_EXPORT debugStream(DebugLevel debugLevel, int line, const char* file, const char* function, const char* debugArea = 0); +QDebug SINK_EXPORT debugStream(DebugLevel debugLevel, int line, const char *file, const char *function, const char *debugArea = 0); struct SINK_EXPORT TraceTime { @@ -66,7 +68,6 @@ inline QDebug SINK_EXPORT operator<<(QDebug d, const TraceTime &time) d << time.time << "[ms]"; return d; } - } } @@ -83,5 +84,5 @@ inline QDebug SINK_EXPORT operator<<(QDebug d, const TraceTime &time) #define Trace() Sink::Log::debugStream(Sink::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, DEBUG_AREA) #define Log() Sink::Log::debugStream(Sink::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, DEBUG_AREA) #define Warning() Sink::Log::debugStream(Sink::Log::DebugLevel::Warning, __LINE__, __FILE__, Q_FUNC_INFO, DEBUG_AREA) -//FIXME Error clashes with Storage::Error and MessageQueue::Error +// FIXME Error clashes with Storage::Error and MessageQueue::Error #define ErrorMsg() Sink::Log::debugStream(Sink::Log::DebugLevel::Error, __LINE__, __FILE__, Q_FUNC_INFO, DEBUG_AREA) diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp index 73198a5..fd86635 100644 --- a/common/messagequeue.cpp +++ b/common/messagequeue.cpp @@ -3,41 +3,38 @@ #include #include -static KAsync::Job waitForCompletion(QList > &futures) +static KAsync::Job waitForCompletion(QList> &futures) { auto context = new QObject; return KAsync::start([futures, context](KAsync::Future &future) { - const auto total = futures.size(); - auto count = QSharedPointer::create(); - int i = 0; - for (KAsync::Future subFuture : futures) { - i++; - if (subFuture.isFinished()) { - *count += 1; - continue; - } - //FIXME bind lifetime all watcher to future (repectively the main job - auto watcher = QSharedPointer >::create(); - QObject::connect(watcher.data(), &KAsync::FutureWatcher::futureReady, - [count, total, &future](){ - *count += 1; - if (*count == total) { - future.setFinished(); - } - }); - watcher->setFuture(subFuture); - context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); - } - if (*count == total) { - future.setFinished(); - } - }).then([context]() { - delete context; - }); + const auto total = futures.size(); + auto count = QSharedPointer::create(); + int i = 0; + for (KAsync::Future subFuture : futures) { + i++; + if (subFuture.isFinished()) { + *count += 1; + continue; + } + // FIXME bind lifetime all watcher to future (repectively the main job + auto watcher = QSharedPointer>::create(); + QObject::connect(watcher.data(), &KAsync::FutureWatcher::futureReady, [count, total, &future]() { + *count += 1; + if (*count == total) { + future.setFinished(); + } + }); + watcher->setFuture(subFuture); + context->setProperty(QString("future%1").arg(i).toLatin1().data(), QVariant::fromValue(watcher)); + } + if (*count == total) { + future.setFinished(); + } + }) + .then([context]() { delete context; }); } -MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) - : mStorage(storageRoot, name, Sink::Storage::ReadWrite) +MessageQueue::MessageQueue(const QString &storageRoot, const QString &name) : mStorage(storageRoot, name, Sink::Storage::ReadWrite) { } @@ -47,7 +44,7 @@ MessageQueue::~MessageQueue() void MessageQueue::enqueue(void const *msg, size_t size) { - enqueue(QByteArray::fromRawData(static_cast(msg), size)); + enqueue(QByteArray::fromRawData(static_cast(msg), size)); } void MessageQueue::startTransaction() @@ -96,19 +93,13 @@ void MessageQueue::processRemovals() mPendingRemoval.clear(); } -void MessageQueue::dequeue(const std::function)> &resultHandler, - const std::function &errorHandler) +void MessageQueue::dequeue(const std::function)> &resultHandler, const std::function &errorHandler) { dequeueBatch(1, [resultHandler](const QByteArray &value) { - return KAsync::start([&value,resultHandler](KAsync::Future &future) { - resultHandler(const_cast(static_cast(value.data())), value.size(), [&future](bool success){ - future.setFinished(); - }); + return KAsync::start([&value, resultHandler](KAsync::Future &future) { + resultHandler(const_cast(static_cast(value.data())), value.size(), [&future](bool success) { future.setFinished(); }); }); - }).then([](){}, - [errorHandler](int error, const QString &errorString) { - errorHandler(Error("messagequeue", error, errorString.toLatin1())); - }).exec(); + }).then([]() {}, [errorHandler](int error, const QString &errorString) { errorHandler(Error("messagequeue", error, errorString.toLatin1())); }).exec(); } KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::function(const QByteArray &)> &resultHandler) @@ -116,41 +107,46 @@ KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::functi auto resultCount = QSharedPointer::create(0); return KAsync::start([this, maxBatchSize, resultHandler, resultCount](KAsync::Future &future) { int count = 0; - QList > waitCondition; - mStorage.createTransaction(Sink::Storage::ReadOnly).openDatabase().scan("", [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { - if (Sink::Storage::isInternalKey(key) || mPendingRemoval.contains(key)) { - return true; - } - *resultCount += 1; - //We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) - mPendingRemoval << QByteArray(key.constData(), key.size()); + QList> waitCondition; + mStorage.createTransaction(Sink::Storage::ReadOnly) + .openDatabase() + .scan("", + [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { + if (Sink::Storage::isInternalKey(key) || mPendingRemoval.contains(key)) { + return true; + } + *resultCount += 1; + // We need a copy of the key here, otherwise we can't store it in the lambda (the pointers will become invalid) + mPendingRemoval << QByteArray(key.constData(), key.size()); - waitCondition << resultHandler(value).exec(); + waitCondition << resultHandler(value).exec(); - count++; - if (count < maxBatchSize) { - return true; - } - return false; - }, - [](const Sink::Storage::Error &error) { - ErrorMsg() << "Error while retrieving value" << error.message; - // errorHandler(Error(error.store, error.code, error.message)); - }); + count++; + if (count < maxBatchSize) { + return true; + } + return false; + }, + [](const Sink::Storage::Error &error) { + ErrorMsg() << "Error while retrieving value" << error.message; + // errorHandler(Error(error.store, error.code, error.message)); + }); // Trace() << "Waiting on " << waitCondition.size() << " results"; - ::waitForCompletion(waitCondition).then([this, resultCount, &future]() { - processRemovals(); - if (*resultCount == 0) { - future.setError(static_cast(ErrorCodes::NoMessageFound), "No message found"); - future.setFinished(); - } else { - if (isEmpty()) { - emit this->drained(); + ::waitForCompletion(waitCondition) + .then([this, resultCount, &future]() { + processRemovals(); + if (*resultCount == 0) { + future.setError(static_cast(ErrorCodes::NoMessageFound), "No message found"); + future.setFinished(); + } else { + if (isEmpty()) { + emit this->drained(); + } + future.setFinished(); } - future.setFinished(); - } - }).exec(); + }) + .exec(); }); } @@ -160,16 +156,15 @@ bool MessageQueue::isEmpty() auto t = mStorage.createTransaction(Sink::Storage::ReadOnly); auto db = t.openDatabase(); if (db) { - db.scan("", [&count, this](const QByteArray &key, const QByteArray &value) -> bool { - if (!Sink::Storage::isInternalKey(key) && !mPendingRemoval.contains(key)) { - count++; - return false; - } - return true; - }, - [](const Sink::Storage::Error &error) { - ErrorMsg() << "Error while checking if empty" << error.message; - }); + db.scan("", + [&count, this](const QByteArray &key, const QByteArray &value) -> bool { + if (!Sink::Storage::isInternalKey(key) && !mPendingRemoval.contains(key)) { + count++; + return false; + } + return true; + }, + [](const Sink::Storage::Error &error) { ErrorMsg() << "Error while checking if empty" << error.message; }); } return count == 0; } diff --git a/common/messagequeue.h b/common/messagequeue.h index 9399055..6f0bddb 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h @@ -16,14 +16,16 @@ class SINK_EXPORT MessageQueue : public QObject { Q_OBJECT public: - enum ErrorCodes { + enum ErrorCodes + { NoMessageFound }; class Error { public: - Error(const QByteArray &s, int c, const QByteArray &m) - : store(s), message(m), code(c) {} + Error(const QByteArray &s, int c, const QByteArray &m) : store(s), message(m), code(c) + { + } QByteArray store; QByteArray message; int code; @@ -35,11 +37,10 @@ public: void startTransaction(); void enqueue(void const *msg, size_t size); void enqueue(const QByteArray &value); - //Dequeue a message. This will return a new message everytime called. - //Call the result handler with a success response to remove the message from the store. - //TODO track processing progress to avoid processing the same message with the same preprocessor twice? - void dequeue(const std::function)> & resultHandler, - const std::function &errorHandler); + // Dequeue a message. This will return a new message everytime called. + // Call the result handler with a success response to remove the message from the store. + // TODO track processing progress to avoid processing the same message with the same preprocessor twice? + void dequeue(const std::function)> &resultHandler, const std::function &errorHandler); KAsync::Job dequeueBatch(int maxBatchSize, const std::function(const QByteArray &)> &resultHandler); bool isEmpty(); diff --git a/common/modelresult.cpp b/common/modelresult.cpp index ceefa76..1c06a7d 100644 --- a/common/modelresult.cpp +++ b/common/modelresult.cpp @@ -34,11 +34,9 @@ static uint qHash(const Sink::ApplicationDomain::ApplicationDomainType &type) return qHash(type.resourceInstanceIdentifier() + type.identifier()); } -template +template ModelResult::ModelResult(const Sink::Query &query, const QList &propertyColumns) - :QAbstractItemModel(), - mPropertyColumns(propertyColumns), - mQuery(query) + : QAbstractItemModel(), mPropertyColumns(propertyColumns), mQuery(query) { } @@ -50,7 +48,7 @@ static qint64 getIdentifier(const QModelIndex &idx) return idx.internalId(); } -template +template qint64 ModelResult::parentId(const Ptr &value) { if (!mQuery.parentProperty.isEmpty()) { @@ -62,19 +60,19 @@ qint64 ModelResult::parentId(const Ptr &value) return 0; } -template +template int ModelResult::rowCount(const QModelIndex &parent) const { return mTree[getIdentifier(parent)].size(); } -template +template int ModelResult::columnCount(const QModelIndex &parent) const { return mPropertyColumns.size(); } -template +template QVariant ModelResult::headerData(int section, Qt::Orientation orientation, int role) const { if (role == Qt::DisplayRole) { @@ -85,7 +83,7 @@ QVariant ModelResult::headerData(int section, Qt::Orientation orientatio return QVariant(); } -template +template QVariant ModelResult::data(const QModelIndex &index, int role) const { if (role == DomainObjectRole && index.isValid()) { @@ -94,7 +92,7 @@ QVariant ModelResult::data(const QModelIndex &index, int role) const } if (role == DomainObjectBaseRole && index.isValid()) { Q_ASSERT(mEntities.contains(index.internalId())); - return QVariant::fromValue(mEntities.value(index.internalId()). template staticCast()); + return QVariant::fromValue(mEntities.value(index.internalId()).template staticCast()); } if (role == ChildrenFetchedRole) { return childrenFetched(index); @@ -111,7 +109,7 @@ QVariant ModelResult::data(const QModelIndex &index, int role) const return QVariant(); } -template +template QModelIndex ModelResult::index(int row, int column, const QModelIndex &parent) const { const auto id = getIdentifier(parent); @@ -124,7 +122,7 @@ QModelIndex ModelResult::index(int row, int column, const QModelIndex &p return QModelIndex(); } -template +template QModelIndex ModelResult::createIndexFromId(const qint64 &id) const { if (id == 0) { @@ -135,7 +133,7 @@ QModelIndex ModelResult::createIndexFromId(const qint64 &id) const return createIndex(row, 0, id); } -template +template QModelIndex ModelResult::parent(const QModelIndex &index) const { auto id = getIdentifier(index); @@ -143,7 +141,7 @@ QModelIndex ModelResult::parent(const QModelIndex &index) const return createIndexFromId(parentId); } -template +template bool ModelResult::hasChildren(const QModelIndex &parent) const { if (mQuery.parentProperty.isEmpty() && parent.isValid()) { @@ -152,26 +150,26 @@ bool ModelResult::hasChildren(const QModelIndex &parent) const return QAbstractItemModel::hasChildren(parent); } -template +template bool ModelResult::canFetchMore(const QModelIndex &parent) const { const auto id = parent.internalId(); return !mEntityChildrenFetched.contains(id) || mEntityChildrenFetchComplete.contains(id); } -template +template void ModelResult::fetchMore(const QModelIndex &parent) { Trace() << "Fetching more: " << parent; fetchEntities(parent); } -template +template void ModelResult::add(const Ptr &value) { const auto childId = qHash(*value); const auto id = parentId(value); - //Ignore updates we get before the initial fetch is done + // Ignore updates we get before the initial fetch is done if (!mEntityChildrenFetched.contains(id)) { Trace() << "Too early" << id; return; @@ -199,7 +197,7 @@ void ModelResult::add(const Ptr &value) } -template +template void ModelResult::remove(const Ptr &value) { auto childId = qHash(*value); @@ -211,11 +209,11 @@ void ModelResult::remove(const Ptr &value) mEntities.remove(childId); mTree[id].removeAll(childId); mParents.remove(childId); - //TODO remove children + // TODO remove children endRemoveRows(); } -template +template void ModelResult::fetchEntities(const QModelIndex &parent) { const auto id = getIdentifier(parent); @@ -229,26 +227,20 @@ void ModelResult::fetchEntities(const QModelIndex &parent) } } -template +template void ModelResult::setFetcher(const std::function &fetcher) { Trace() << "Setting fetcher"; loadEntities = fetcher; } -template +template void ModelResult::setEmitter(const typename Sink::ResultEmitter::Ptr &emitter) { - setFetcher([this](const Ptr &parent) {mEmitter->fetch(parent);}); - emitter->onAdded([this](const Ptr &value) { - this->add(value); - }); - emitter->onModified([this](const Ptr &value) { - this->modify(value); - }); - emitter->onRemoved([this](const Ptr &value) { - this->remove(value); - }); + setFetcher([this](const Ptr &parent) { mEmitter->fetch(parent); }); + emitter->onAdded([this](const Ptr &value) { this->add(value); }); + emitter->onModified([this](const Ptr &value) { this->modify(value); }); + emitter->onRemoved([this](const Ptr &value) { this->remove(value); }); emitter->onInitialResultSetComplete([this](const Ptr &parent) { const qint64 parentId = parent ? qHash(*parent) : 0; const auto parentIndex = createIndexFromId(parentId); @@ -258,18 +250,18 @@ void ModelResult::setEmitter(const typename Sink::ResultEmitter::Pt mEmitter = emitter; } -template +template bool ModelResult::childrenFetched(const QModelIndex &index) const { return mEntityChildrenFetchComplete.contains(getIdentifier(index)); } -template +template void ModelResult::modify(const Ptr &value) { auto childId = qHash(*value); auto id = parentId(value); - //Ignore updates we get before the initial fetch is done + // Ignore updates we get before the initial fetch is done if (!mEntityChildrenFetched.contains(id)) { return; } @@ -278,7 +270,7 @@ void ModelResult::modify(const Ptr &value) auto i = mTree[id].indexOf(childId); mEntities.remove(childId); mEntities.insert(childId, value); - //TODO check for change of parents + // TODO check for change of parents auto idx = index(i, 0, parent); emit dataChanged(idx, idx); } diff --git a/common/modelresult.h b/common/modelresult.h index 062517f..0f0c06a 100644 --- a/common/modelresult.h +++ b/common/modelresult.h @@ -28,11 +28,12 @@ #include "query.h" #include "resultprovider.h" -template +template class ModelResult : public QAbstractItemModel { public: - enum Roles { + enum Roles + { DomainObjectRole = Qt::UserRole + 1, ChildrenFetchedRole, DomainObjectBaseRole @@ -46,7 +47,7 @@ public: int columnCount(const QModelIndex &parent = QModelIndex()) const; QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const; QVariant headerData(int section, Qt::Orientation orientation, int role = Qt::DisplayRole) const; - QModelIndex index(int row, int column, const QModelIndex & parent = QModelIndex()) const; + QModelIndex index(int row, int column, const QModelIndex &parent = QModelIndex()) const; QModelIndex parent(const QModelIndex &index) const; bool hasChildren(const QModelIndex &parent = QModelIndex()) const; @@ -66,7 +67,7 @@ private: QModelIndex createIndexFromId(const qint64 &id) const; void fetchEntities(const QModelIndex &parent); - //TODO we should be able to directly use T as index, with an appropriate hash function, and thus have a QMap and QList + // TODO we should be able to directly use T as index, with an appropriate hash function, and thus have a QMap and QList QMap mEntities; QMap /* child entity id*/> mTree; QMap mParents; @@ -77,4 +78,3 @@ private: std::function loadEntities; typename Sink::ResultEmitter::Ptr mEmitter; }; - diff --git a/common/notification.h b/common/notification.h index cf69a99..0eb796d 100644 --- a/common/notification.h +++ b/common/notification.h @@ -22,8 +22,7 @@ #include "sink_export.h" #include -namespace Sink -{ +namespace Sink { /** * A notification @@ -36,5 +35,4 @@ public: QString message; int code; }; - } diff --git a/common/notifier.cpp b/common/notifier.cpp index e4248df..25d0b85 100644 --- a/common/notifier.cpp +++ b/common/notifier.cpp @@ -27,20 +27,18 @@ using namespace Sink; -class Sink::Notifier::Private { +class Sink::Notifier::Private +{ public: - Private() - : context(new QObject) + Private() : context(new QObject) { - } - QList > resourceAccess; - QList > handler; + QList> resourceAccess; + QList> handler; QSharedPointer context; }; -Notifier::Notifier(const QSharedPointer &resourceAccess) - : d(new Sink::Notifier::Private) +Notifier::Notifier(const QSharedPointer &resourceAccess) : d(new Sink::Notifier::Private) { QObject::connect(resourceAccess.data(), &ResourceAccess::notification, d->context.data(), [this](const Notification ¬ification) { for (const auto &handler : d->handler) { @@ -50,8 +48,7 @@ Notifier::Notifier(const QSharedPointer &resourceAccess) d->resourceAccess << resourceAccess; } -Notifier::Notifier(const QByteArray &instanceIdentifier) - : d(new Sink::Notifier::Private) +Notifier::Notifier(const QByteArray &instanceIdentifier) : d(new Sink::Notifier::Private) { auto resourceAccess = Sink::ResourceAccess::Ptr::create(instanceIdentifier); resourceAccess->open(); diff --git a/common/notifier.h b/common/notifier.h index d16a311..9e75dde 100644 --- a/common/notifier.h +++ b/common/notifier.h @@ -32,7 +32,8 @@ namespace Sink { class ResourceAccess; class Notification; -class SINK_EXPORT Notifier { +class SINK_EXPORT Notifier +{ public: Notifier(const QSharedPointer &resourceAccess); Notifier(const QByteArray &resourceInstanceIdentifier); @@ -42,5 +43,4 @@ private: class Private; QSharedPointer d; }; - } diff --git a/common/pipeline.cpp b/common/pipeline.cpp index 35e582b..65a2f5b 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -40,21 +40,18 @@ #undef DEBUG_AREA #define DEBUG_AREA "resource.pipeline" -namespace Sink -{ +namespace Sink { class Pipeline::Private { public: - Private(const QString &resourceName) - : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), - revisionChanged(false) + Private(const QString &resourceName) : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), revisionChanged(false) { } Storage storage; Storage::Transaction transaction; - QHash > processors; + QHash> processors; QHash adaptorFactory; bool revisionChanged; void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); @@ -64,20 +61,16 @@ public: void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) { - Storage::mainDatabase(transaction, bufferType).write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), - [uid, newRevision](const Storage::Error &error) { - Warning() << "Failed to write entity" << uid << newRevision; - } - ); + Storage::mainDatabase(transaction, bufferType) + .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), + [uid, newRevision](const Storage::Error &error) { Warning() << "Failed to write entity" << uid << newRevision; }); revisionChanged = true; Storage::setMaxRevision(transaction, newRevision); Storage::recordRevision(transaction, newRevision, uid, bufferType); } -Pipeline::Pipeline(const QString &resourceName, QObject *parent) - : QObject(parent), - d(new Private(resourceName)) +Pipeline::Pipeline(const QString &resourceName, QObject *parent) : QObject(parent), d(new Private(resourceName)) { } @@ -98,8 +91,8 @@ void Pipeline::setAdaptorFactory(const QString &entityType, DomainTypeAdaptorFac void Pipeline::startTransaction() { - //TODO call for all types - //But avoid doing it during cleanup + // TODO call for all types + // But avoid doing it during cleanup // for (auto processor : d->processors[bufferType]) { // processor->startBatch(); // } @@ -114,14 +107,15 @@ void Pipeline::startTransaction() void Pipeline::commit() { - //TODO call for all types - //But avoid doing it during cleanup + // TODO call for all types + // But avoid doing it during cleanup // for (auto processor : d->processors[bufferType]) { // processor->finalize(); // } const auto revision = Storage::maxRevision(d->transaction); const auto elapsed = d->transactionTime.elapsed(); - Trace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " << (double)elapsed/(double)qMax(d->transactionItemCount, 1) << "[ms/item]"; + Trace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " + << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; if (d->transaction) { d->transaction.commit(); } @@ -157,7 +151,7 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) auto createEntity = Commands::GetCreateEntity(command); const bool replayToSource = createEntity->replayToSource(); - const QByteArray bufferType = QByteArray(reinterpret_cast(createEntity->domainType()->Data()), createEntity->domainType()->size()); + const QByteArray bufferType = QByteArray(reinterpret_cast(createEntity->domainType()->Data()), createEntity->domainType()->size()); { flatbuffers::Verifier verifyer(reinterpret_cast(createEntity->delta()->Data()), createEntity->delta()->size()); if (!VerifyEntityBuffer(verifyer)) { @@ -173,7 +167,7 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) QByteArray key; if (createEntity->entityId()) { - key = QByteArray(reinterpret_cast(createEntity->entityId()->Data()), createEntity->entityId()->size()); + key = QByteArray(reinterpret_cast(createEntity->entityId()->Data()), createEntity->entityId()->size()); if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { ErrorMsg() << "An entity with this id already exists: " << key; return KAsync::error(0); @@ -186,7 +180,7 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) Q_ASSERT(!key.isEmpty()); const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; - //Add metadata buffer + // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); @@ -196,7 +190,8 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) FinishMetadataBuffer(metadataFbb, metadataBuffer); flatbuffers::FlatBufferBuilder fbb; - EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); + EntityBuffer::assembleEntityBuffer( + fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); d->storeNewRevision(newRevision, fbb, bufferType, key); @@ -207,20 +202,19 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) } Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; - Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { - auto entity = GetEntity(value); - Q_ASSERT(entity->resource() || entity->local()); - auto adaptor = adaptorFactory->createAdaptor(*entity); - for (auto processor : d->processors[bufferType]) { - processor->newEntity(key, newRevision, *adaptor, d->transaction); - } - return false; - }, [this](const Storage::Error &error) { - ErrorMsg() << "Failed to find value in pipeline: " << error.message; - }); - return KAsync::start([newRevision](){ - return newRevision; - }); + Storage::mainDatabase(d->transaction, bufferType) + .scan(Storage::assembleKey(key, newRevision), + [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { + auto entity = GetEntity(value); + Q_ASSERT(entity->resource() || entity->local()); + auto adaptor = adaptorFactory->createAdaptor(*entity); + for (auto processor : d->processors[bufferType]) { + processor->newEntity(key, newRevision, *adaptor, d->transaction); + } + return false; + }, + [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); + return KAsync::start([newRevision]() { return newRevision; }); } KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) @@ -242,9 +236,9 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) const qint64 baseRevision = modifyEntity->revision(); const bool replayToSource = modifyEntity->replayToSource(); - //TODO rename modifyEntity->domainType to bufferType - const QByteArray bufferType = QByteArray(reinterpret_cast(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); - const QByteArray key = QByteArray(reinterpret_cast(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); + // TODO rename modifyEntity->domainType to bufferType + const QByteArray bufferType = QByteArray(reinterpret_cast(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); + const QByteArray key = QByteArray(reinterpret_cast(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); if (bufferType.isEmpty() || key.isEmpty()) { Warning() << "entity type or key " << bufferType << key; return KAsync::error(0); @@ -257,7 +251,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) } } - //TODO use only readPropertyMapper and writePropertyMapper + // TODO use only readPropertyMapper and writePropertyMapper auto adaptorFactory = d->adaptorFactory.value(bufferType); if (!adaptorFactory) { Warning() << "no adaptor factory for type " << bufferType; @@ -269,30 +263,30 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) auto diff = adaptorFactory->createAdaptor(*diffEntity); QSharedPointer current; - Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { - EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - Warning() << "Read invalid buffer from disk"; - } else { - current = adaptorFactory->createAdaptor(buffer.entity()); - } - return false; - }, - [baseRevision](const Storage::Error &error) { - Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; - }); + Storage::mainDatabase(d->transaction, bufferType) + .findLatest(key, + [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { + EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + Warning() << "Read invalid buffer from disk"; + } else { + current = adaptorFactory->createAdaptor(buffer.entity()); + } + return false; + }, + [baseRevision](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); if (!current) { Warning() << "Failed to read local value " << key; return KAsync::error(0); } - //resource and uid don't matter at this point + // resource and uid don't matter at this point const ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); auto newObject = ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(existingObject); - //Apply diff - //FIXME only apply the properties that are available in the buffer + // Apply diff + // FIXME only apply the properties that are available in the buffer Trace() << "Applying changed properties: " << diff->availableProperties(); QSet changeset; for (const auto &property : diff->availableProperties()) { @@ -302,17 +296,17 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) newObject->setProperty(property, value); } } - //Altough we only set some properties, we want all to be serialized + // Altough we only set some properties, we want all to be serialized newObject->setChangedProperties(changeset); - //Remove deletions + // Remove deletions if (modifyEntity->deletions()) { for (const auto &property : *modifyEntity->deletions()) { newObject->setProperty(BufferUtils::extractBuffer(property), QVariant()); } } - //Add metadata buffer + // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); @@ -326,22 +320,21 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) d->storeNewRevision(newRevision, fbb, bufferType, key); Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; - Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { - if (value.isEmpty()) { - ErrorMsg() << "Read buffer is empty."; - } - auto entity = GetEntity(value.data()); - auto newEntity = adaptorFactory->createAdaptor(*entity); - for (auto processor : d->processors[bufferType]) { - processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); - } - return false; - }, [this](const Storage::Error &error) { - ErrorMsg() << "Failed to find value in pipeline: " << error.message; - }); - return KAsync::start([newRevision](){ - return newRevision; - }); + Storage::mainDatabase(d->transaction, bufferType) + .scan(Storage::assembleKey(key, newRevision), + [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool { + if (value.isEmpty()) { + ErrorMsg() << "Read buffer is empty."; + } + auto entity = GetEntity(value.data()); + auto newEntity = adaptorFactory->createAdaptor(*entity); + for (auto processor : d->processors[bufferType]) { + processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); + } + return false; + }, + [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); + return KAsync::start([newRevision]() { return newRevision; }); } KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) @@ -359,26 +352,25 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) auto deleteEntity = Commands::GetDeleteEntity(command); const bool replayToSource = deleteEntity->replayToSource(); - const QByteArray bufferType = QByteArray(reinterpret_cast(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); - const QByteArray key = QByteArray(reinterpret_cast(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); + const QByteArray bufferType = QByteArray(reinterpret_cast(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); + const QByteArray key = QByteArray(reinterpret_cast(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); bool found = false; bool alreadyRemoved = false; - Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { - auto entity = GetEntity(data.data()); - if (entity && entity->metadata()) { - auto metadata = GetMetadata(entity->metadata()->Data()); - found = true; - if (metadata->operation() == Operation_Removal) { - alreadyRemoved = true; - } - - } - return false; - }, - [](const Storage::Error &error) { - Warning() << "Failed to read old revision from storage: " << error.message; - }); + Storage::mainDatabase(d->transaction, bufferType) + .findLatest(key, + [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { + auto entity = GetEntity(data.data()); + if (entity && entity->metadata()) { + auto metadata = GetMetadata(entity->metadata()->Data()); + found = true; + if (metadata->operation() == Operation_Removal) { + alreadyRemoved = true; + } + } + return false; + }, + [](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message; }); if (!found) { Warning() << "Failed to find entity " << key; @@ -391,7 +383,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; - //Add metadata buffer + // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; auto metadataBuilder = MetadataBuilder(metadataFbb); metadataBuilder.add_revision(newRevision); @@ -410,28 +402,27 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) } QSharedPointer current; - Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { - EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - Warning() << "Read invalid buffer from disk"; - } else { - current = adaptorFactory->createAdaptor(buffer.entity()); - } - return false; - }, [this](const Storage::Error &error) { - ErrorMsg() << "Failed to find value in pipeline: " << error.message; - }); + Storage::mainDatabase(d->transaction, bufferType) + .findLatest(key, + [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { + EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + Warning() << "Read invalid buffer from disk"; + } else { + current = adaptorFactory->createAdaptor(buffer.entity()); + } + return false; + }, + [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); d->storeNewRevision(newRevision, fbb, bufferType, key); - Log() << "Pipeline: deleted entity: "<< newRevision; + Log() << "Pipeline: deleted entity: " << newRevision; for (auto processor : d->processors[bufferType]) { processor->deletedEntity(key, newRevision, *current, d->transaction); } - return KAsync::start([newRevision](){ - return newRevision; - }); + return KAsync::start([newRevision]() { return newRevision; }); } void Pipeline::cleanupRevision(qint64 revision) @@ -439,24 +430,25 @@ void Pipeline::cleanupRevision(qint64 revision) const auto uid = Storage::getUidFromRevision(d->transaction, revision); const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); Trace() << "Cleaning up revision " << revision << uid << bufferType; - Storage::mainDatabase(d->transaction, bufferType).scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { - EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - Warning() << "Read invalid buffer from disk"; - } else { - const auto metadata = flatbuffers::GetRoot(buffer.metadataBuffer()); - const qint64 rev = metadata->revision(); - //Remove old revisions, and the current if the entity has already been removed - if (rev < revision || metadata->operation() == Operation_Removal) { - Storage::removeRevision(d->transaction, rev); - Storage::mainDatabase(d->transaction, bufferType).remove(key); - } - } - - return true; - }, [](const Storage::Error &error) { - Warning() << "Error while reading: " << error.message; - }, true); + Storage::mainDatabase(d->transaction, bufferType) + .scan(uid, + [&](const QByteArray &key, const QByteArray &data) -> bool { + EntityBuffer buffer(const_cast(data.data()), data.size()); + if (!buffer.isValid()) { + Warning() << "Read invalid buffer from disk"; + } else { + const auto metadata = flatbuffers::GetRoot(buffer.metadataBuffer()); + const qint64 rev = metadata->revision(); + // Remove old revisions, and the current if the entity has already been removed + if (rev < revision || metadata->operation() == Operation_Removal) { + Storage::removeRevision(d->transaction, rev); + Storage::mainDatabase(d->transaction, bufferType).remove(key); + } + } + + return true; + }, + [](const Storage::Error &error) { Warning() << "Error while reading: " << error.message; }, true); Storage::setCleanedUpRevision(d->transaction, revision); } @@ -465,8 +457,7 @@ qint64 Pipeline::cleanedUpRevision() return Storage::cleanedUpRevision(d->transaction); } -Preprocessor::Preprocessor() - : d(0) +Preprocessor::Preprocessor() : d(0) { } diff --git a/common/pipeline.h b/common/pipeline.h index 0f989e4..dc2cc4d 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -32,8 +32,7 @@ #include "domainadaptor.h" -namespace Sink -{ +namespace Sink { class Preprocessor; @@ -74,7 +73,7 @@ signals: private: class Private; - Private * const d; + Private *const d; }; class SINK_EXPORT Preprocessor @@ -85,14 +84,14 @@ public: virtual void startBatch(); virtual void newEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; - virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; + virtual void modifiedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, + const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) = 0; virtual void deletedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) = 0; virtual void finalize(); private: class Private; - Private * const d; + Private *const d; }; } // namespace Sink - diff --git a/common/propertymapper.cpp b/common/propertymapper.cpp index 5348b11..ebe5cb3 100644 --- a/common/propertymapper.cpp +++ b/common/propertymapper.cpp @@ -51,7 +51,7 @@ template <> QVariant propertyToVariant(const flatbuffers::String *property) { if (property) { - //We have to copy the memory, otherwise it would become eventually invalid + // We have to copy the memory, otherwise it would become eventually invalid return QString::fromStdString(property->c_str()); } return QVariant(); @@ -61,7 +61,7 @@ template <> QVariant propertyToVariant(const flatbuffers::String *property) { if (property) { - //We have to copy the memory, otherwise it would become eventually invalid + // We have to copy the memory, otherwise it would become eventually invalid return QString::fromStdString(property->c_str()).toUtf8(); } return QVariant(); @@ -71,7 +71,7 @@ template <> QVariant propertyToVariant(const flatbuffers::Vector *property) { if (property) { - //We have to copy the memory, otherwise it would become eventually invalid + // We have to copy the memory, otherwise it would become eventually invalid return QByteArray(reinterpret_cast(property->Data()), property->Length()); } return QVariant(); @@ -87,7 +87,7 @@ template <> QVariant propertyToVariant(const flatbuffers::String *property) { if (property) { - //We have to copy the memory, otherwise it would become eventually invalid + // We have to copy the memory, otherwise it would become eventually invalid return QDateTime::fromString(QString::fromStdString(property->c_str())); } return QVariant(); diff --git a/common/propertymapper.h b/common/propertymapper.h index 57202ab..cf8ce7b 100644 --- a/common/propertymapper.h +++ b/common/propertymapper.h @@ -49,7 +49,7 @@ QVariant SINK_EXPORT propertyToVariant(const flatbuffers::Vector *); * a virtual method per property, the property mapper can be filled with accessors * that extract the properties from resource types. */ -template +template class ReadPropertyMapper { public: @@ -63,63 +63,66 @@ public: } return QVariant(); } - bool hasMapping(const QByteArray &key) const { return mReadAccessors.contains(key); } - QList availableProperties() const { return mReadAccessors.keys(); } - void addMapping(const QByteArray &property, const std::function &mapping) { + bool hasMapping(const QByteArray &key) const + { + return mReadAccessors.contains(key); + } + QList availableProperties() const + { + return mReadAccessors.keys(); + } + void addMapping(const QByteArray &property, const std::function &mapping) + { mReadAccessors.insert(property, mapping); } template void addMapping(const QByteArray &name, const flatbuffers::String *(Buffer::*f)() const) { - addMapping(name, [f](Buffer const *buffer) -> QVariant { - return propertyToVariant((buffer->*f)()); - }); + addMapping(name, [f](Buffer const *buffer) -> QVariant { return propertyToVariant((buffer->*f)()); }); } template void addMapping(const QByteArray &name, uint8_t (Buffer::*f)() const) { - addMapping(name, [f](Buffer const *buffer) -> QVariant { - return propertyToVariant((buffer->*f)()); - }); + addMapping(name, [f](Buffer const *buffer) -> QVariant { return propertyToVariant((buffer->*f)()); }); } template void addMapping(const QByteArray &name, bool (Buffer::*f)() const) { - addMapping(name, [f](Buffer const *buffer) -> QVariant { - return propertyToVariant((buffer->*f)()); - }); + addMapping(name, [f](Buffer const *buffer) -> QVariant { return propertyToVariant((buffer->*f)()); }); } template - void addMapping(const QByteArray &name, const flatbuffers::Vector * (Buffer::*f)() const) + void addMapping(const QByteArray &name, const flatbuffers::Vector *(Buffer::*f)() const) { - addMapping(name, [f](Buffer const *buffer) -> QVariant { - return propertyToVariant((buffer->*f)()); - }); + addMapping(name, [f](Buffer const *buffer) -> QVariant { return propertyToVariant((buffer->*f)()); }); } - + private: - QHash > mReadAccessors; + QHash> mReadAccessors; }; -template +template class WritePropertyMapper { public: virtual ~WritePropertyMapper(){}; - virtual void setProperty(const QByteArray &key, const QVariant &value, QList > &builderCalls, flatbuffers::FlatBufferBuilder &fbb) const + virtual void setProperty(const QByteArray &key, const QVariant &value, QList> &builderCalls, flatbuffers::FlatBufferBuilder &fbb) const { if (mWriteAccessors.contains(key)) { auto accessor = mWriteAccessors.value(key); builderCalls << accessor(value, fbb); } } - bool hasMapping(const QByteArray &key) const { return mWriteAccessors.contains(key); } - void addMapping(const QByteArray &property, const std::function(const QVariant &, flatbuffers::FlatBufferBuilder &)> &mapping) { + bool hasMapping(const QByteArray &key) const + { + return mWriteAccessors.contains(key); + } + void addMapping(const QByteArray &property, const std::function(const QVariant &, flatbuffers::FlatBufferBuilder &)> &mapping) + { mWriteAccessors.insert(property, mapping); } @@ -127,9 +130,7 @@ public: void addMapping(const QByteArray &name, void (BufferBuilder::*f)(uint8_t)) { addMapping(name, [f](const QVariant &value, flatbuffers::FlatBufferBuilder &fbb) -> std::function { - return [value, f](BufferBuilder &builder) { - (builder.*f)(value.value()); - }; + return [value, f](BufferBuilder &builder) { (builder.*f)(value.value()); }; }); } @@ -137,9 +138,7 @@ public: void addMapping(const QByteArray &name, void (BufferBuilder::*f)(bool)) { addMapping(name, [f](const QVariant &value, flatbuffers::FlatBufferBuilder &fbb) -> std::function { - return [value, f](BufferBuilder &builder) { - (builder.*f)(value.value()); - }; + return [value, f](BufferBuilder &builder) { (builder.*f)(value.value()); }; }); } @@ -148,23 +147,19 @@ public: { addMapping(name, [f](const QVariant &value, flatbuffers::FlatBufferBuilder &fbb) -> std::function { auto offset = variantToProperty(value, fbb); - return [offset, f](BufferBuilder &builder) { - (builder.*f)(offset); - }; + return [offset, f](BufferBuilder &builder) { (builder.*f)(offset); }; }); } template - void addMapping(const QByteArray &name, void (BufferBuilder::*f)(flatbuffers::Offset >)) + void addMapping(const QByteArray &name, void (BufferBuilder::*f)(flatbuffers::Offset>)) { addMapping(name, [f](const QVariant &value, flatbuffers::FlatBufferBuilder &fbb) -> std::function { auto offset = variantToProperty(value, fbb); - return [offset, f](BufferBuilder &builder) { - (builder.*f)(offset); - }; + return [offset, f](BufferBuilder &builder) { (builder.*f)(offset); }; }); } + private: - QHash(const QVariant &, flatbuffers::FlatBufferBuilder &)> > mWriteAccessors; + QHash(const QVariant &, flatbuffers::FlatBufferBuilder &)>> mWriteAccessors; }; - diff --git a/common/query.h b/common/query.h index 60c5630..3a56c9f 100644 --- a/common/query.h +++ b/common/query.h @@ -32,7 +32,8 @@ namespace Sink { class Query { public: - enum Flag { + enum Flag + { /** Leave the query running an contiously update the result set. */ LiveQuery }; @@ -102,11 +103,11 @@ public: return query; } - Query(Flags flags = Flags()) - : limit(0) - {} + Query(Flags flags = Flags()) : limit(0) + { + } - Query& operator+=(const Query& rhs) + Query &operator+=(const Query &rhs) { resources += rhs.resources; ids += rhs.ids; @@ -121,7 +122,7 @@ public: return *this; } - friend Query operator+(Query lhs, const Query& rhs) + friend Query operator+(Query lhs, const Query &rhs) { lhs += rhs; return lhs; @@ -136,7 +137,6 @@ public: bool liveQuery; int limit; }; - } Q_DECLARE_OPERATORS_FOR_FLAGS(Sink::Query::Flags) diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index 5ac1344..c150159 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -39,11 +39,12 @@ using namespace Sink; * This is a worker object that can be moved to a thread to execute the query. * The only interaction point is the ResultProvider, which handles the threadsafe reporting of the result. */ -template +template class QueryWorker : public QObject { public: - QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); + QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, + const QueryRunnerBase::ResultTransformation &transformation); virtual ~QueryWorker(); qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider); @@ -52,14 +53,17 @@ public: private: void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface &resultProvider, const QList &properties, int offset, int batchSize); - void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback); + void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, + const std::function &resultCallback); ResultSet loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet &remainingFilters, QByteArray &remainingSorting); ResultSet loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet &remainingFilters); - ResultSet filterAndSortSet(ResultSet &resultSet, const std::function &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); + ResultSet filterAndSortSet(ResultSet &resultSet, const std::function &filter, + const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty); std::function getFilter(const QSet remainingFilters, const Sink::Query &query); - qint64 load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, Sink::ResultProviderInterface &resultProvider, bool initialQuery, int offset, int batchSize); + qint64 load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, + Sink::ResultProviderInterface &resultProvider, bool initialQuery, int offset, int batchSize); private: QueryRunnerBase::ResultTransformation mResultTransformation; @@ -70,176 +74,171 @@ private: }; -template -QueryRunner::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) - : QueryRunnerBase(), - mResourceAccess(resourceAccess), - mResultProvider(new ResultProvider), - mOffset(0), - mBatchSize(query.limit) +template +QueryRunner::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, + const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) + : QueryRunnerBase(), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider), mOffset(0), mBatchSize(query.limit) { Trace() << "Starting query"; if (query.limit && query.sortProperty.isEmpty()) { Warning() << "A limited query without sorting is typically a bad idea."; } - //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. + // We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { Trace() << "Running fetcher. Offset: " << mOffset << " Batchsize: " << mBatchSize; auto resultProvider = mResultProvider; async::run([=]() -> qint64 { - QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); - const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize); - return newRevision; - }) + QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); + const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider, mOffset, mBatchSize); + return newRevision; + }) .template then([query, this](qint64 newRevision) { mOffset += mBatchSize; - //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. + // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. if (query.liveQuery) { mResourceAccess->sendRevisionReplayedCommand(newRevision); } - }).exec(); + }) + .exec(); }); // In case of a live query we keep the runner for as long alive as the result provider exists if (query.liveQuery) { - //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting - setQuery([=] () -> KAsync::Job { + // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting + setQuery([=]() -> KAsync::Job { auto resultProvider = mResultProvider; return async::run([=]() -> qint64 { - QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); - const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); - return newRevision; - }) + QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); + const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); + return newRevision; + }) .template then([query, this](qint64 newRevision) { - //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. + // Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. mResourceAccess->sendRevisionReplayedCommand(newRevision); }); }); - //Ensure the connection is open, if it wasn't already opened - //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates + // Ensure the connection is open, if it wasn't already opened + // TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates mResourceAccess->open(); QObject::connect(mResourceAccess.data(), &Sink::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged); } } -template +template QueryRunner::~QueryRunner() { Trace() << "Stopped query"; } -template +template void QueryRunner::setResultTransformation(const ResultTransformation &transformation) { mResultTransformation = transformation; } -template +template typename Sink::ResultEmitter::Ptr QueryRunner::emitter() { return mResultProvider->emitter(); } - static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) { - //TODO use a result set with an iterator, to read values on demand + // TODO use a result set with an iterator, to read values on demand QVector keys; - Storage::mainDatabase(transaction, bufferType).scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { - //Skip internals - if (Sink::Storage::isInternalKey(key)) { - return true; - } - keys << Sink::Storage::uidFromKey(key); - return true; - }, - [](const Sink::Storage::Error &error) { - Warning() << "Error during query: " << error.message; - }); + Storage::mainDatabase(transaction, bufferType) + .scan(QByteArray(), + [&](const QByteArray &key, const QByteArray &value) -> bool { + // Skip internals + if (Sink::Storage::isInternalKey(key)) { + return true; + } + keys << Sink::Storage::uidFromKey(key); + return true; + }, + [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; }); Trace() << "Full scan retrieved " << keys.size() << " results."; return ResultSet(keys); } -template -QueryWorker::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) - : QObject(), - mResultTransformation(transformation), - mDomainTypeAdaptorFactory(factory), - mResourceInstanceIdentifier(instanceIdentifier), - mBufferType(bufferType), - mQuery(query) +template +QueryWorker::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, + const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) + : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mBufferType(bufferType), mQuery(query) { Trace() << "Starting query worker"; } -template +template QueryWorker::~QueryWorker() { Trace() << "Stopped query worker"; } -template +template void QueryWorker::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface &resultProvider, const QList &properties, int offset, int batchSize) { Trace() << "Skipping over " << offset << " results"; resultSet.skip(offset); int counter; for (counter = 0; !batchSize || (counter < batchSize); counter++) { - const bool ret = resultSet.next([this, &resultProvider, &counter, &properties, batchSize](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { - //FIXME allow maildir resource to set the mimeMessage property - auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value, properties).template staticCast(); - if (mResultTransformation) { - mResultTransformation(*valueCopy); - } - switch (operation) { - case Sink::Operation_Creation: - // Trace() << "Got creation"; - resultProvider.add(valueCopy); - break; - case Sink::Operation_Modification: - // Trace() << "Got modification"; - resultProvider.modify(valueCopy); - break; - case Sink::Operation_Removal: - // Trace() << "Got removal"; - resultProvider.remove(valueCopy); - break; - } - return true; - }); + const bool ret = + resultSet.next([this, &resultProvider, &counter, &properties, batchSize](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { + // FIXME allow maildir resource to set the mimeMessage property + auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation(*value, properties).template staticCast(); + if (mResultTransformation) { + mResultTransformation(*valueCopy); + } + switch (operation) { + case Sink::Operation_Creation: + // Trace() << "Got creation"; + resultProvider.add(valueCopy); + break; + case Sink::Operation_Modification: + // Trace() << "Got modification"; + resultProvider.modify(valueCopy); + break; + case Sink::Operation_Removal: + // Trace() << "Got removal"; + resultProvider.remove(valueCopy); + break; + } + return true; + }); if (!ret) { break; } }; - Trace() << "Replayed " << counter << " results." << "Limit " << batchSize; + Trace() << "Replayed " << counter << " results." + << "Limit " << batchSize; } -template -void QueryWorker::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function &resultCallback) +template +void QueryWorker::readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, + const std::function &resultCallback) { - //This only works for a 1:1 mapping of resource to domain types. - //Not i.e. for tags that are stored as flags in each entity of an imap store. - //additional properties that don't have a 1:1 mapping (such as separately stored tags), - //could be added to the adaptor. - db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { - Sink::EntityBuffer buffer(value.data(), value.size()); - const Sink::Entity &entity = buffer.entity(); - const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); - const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; - const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; - auto adaptor = mDomainTypeAdaptorFactory->createAdaptor(entity); - resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); - return false; - }, - [](const Sink::Storage::Error &error) { - Warning() << "Error during query: " << error.message; - }); + // This only works for a 1:1 mapping of resource to domain types. + // Not i.e. for tags that are stored as flags in each entity of an imap store. + // additional properties that don't have a 1:1 mapping (such as separately stored tags), + // could be added to the adaptor. + db.findLatest(key, + [=](const QByteArray &key, const QByteArray &value) -> bool { + Sink::EntityBuffer buffer(value.data(), value.size()); + const Sink::Entity &entity = buffer.entity(); + const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); + const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; + auto adaptor = mDomainTypeAdaptorFactory->createAdaptor(entity); + resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); + return false; + }, + [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; }); } -template +template ResultSet QueryWorker::loadInitialResultSet(const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet &remainingFilters, QByteArray &remainingSorting) { if (!query.ids.isEmpty()) { @@ -253,15 +252,15 @@ ResultSet QueryWorker::loadInitialResultSet(const Sink::Query &query remainingSorting = query.sortProperty; } - //We do a full scan if there were no indexes available to create the initial set. + // We do a full scan if there were no indexes available to create the initial set. if (appliedFilters.isEmpty()) { - //TODO this should be replaced by an index lookup as well + // TODO this should be replaced by an index lookup as well resultSet = fullScan(transaction, mBufferType); } return resultSet; } -template +template ResultSet QueryWorker::loadIncrementalResultSet(qint64 baseRevision, const Sink::Query &query, Sink::Storage::Transaction &transaction, QSet &remainingFilters) { const auto bufferType = mBufferType; @@ -269,13 +268,13 @@ ResultSet QueryWorker::loadIncrementalResultSet(qint64 baseRevision, remainingFilters = query.propertyFilter.keys().toSet(); return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { const qint64 topRevision = Sink::Storage::maxRevision(transaction); - //Spit out the revision keys one by one. + // Spit out the revision keys one by one. while (*revisionCounter <= topRevision) { const auto uid = Sink::Storage::getUidFromRevision(transaction, *revisionCounter); const auto type = Sink::Storage::getTypeFromRevision(transaction, *revisionCounter); // Trace() << "Revision" << *revisionCounter << type << uid; if (type != bufferType) { - //Skip revision + // Skip revision *revisionCounter += 1; continue; } @@ -284,45 +283,47 @@ ResultSet QueryWorker::loadIncrementalResultSet(qint64 baseRevision, return key; } Trace() << "Finished reading incremental result set:" << *revisionCounter; - //We're done + // We're done return QByteArray(); }); } -template -ResultSet QueryWorker::filterAndSortSet(ResultSet &resultSet, const std::function &filter, const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) +template +ResultSet QueryWorker::filterAndSortSet(ResultSet &resultSet, const std::function &filter, + const Sink::Storage::NamedDatabase &db, bool initialQuery, const QByteArray &sortProperty) { const bool sortingRequired = !sortProperty.isEmpty(); if (initialQuery && sortingRequired) { Trace() << "Sorting the resultset in memory according to property: " << sortProperty; - //Sort the complete set by reading the sort property and filling into a sorted map + // Sort the complete set by reading the sort property and filling into a sorted map auto sortedMap = QSharedPointer>::create(); while (resultSet.next()) { - //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) - readEntity(db, resultSet.id(), [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { - //We're not interested in removals during the initial query - if ((operation != Sink::Operation_Removal) && filter(domainObject)) { - if (!sortProperty.isEmpty()) { - const auto sortValue = domainObject->getProperty(sortProperty); - if (sortValue.type() == QVariant::DateTime) { - sortedMap->insert(QByteArray::number(std::numeric_limits::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier()); + // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) + readEntity(db, resultSet.id(), + [this, filter, initialQuery, sortedMap, sortProperty, &resultSet](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { + // We're not interested in removals during the initial query + if ((operation != Sink::Operation_Removal) && filter(domainObject)) { + if (!sortProperty.isEmpty()) { + const auto sortValue = domainObject->getProperty(sortProperty); + if (sortValue.type() == QVariant::DateTime) { + sortedMap->insert(QByteArray::number(std::numeric_limits::max() - sortValue.toDateTime().toTime_t()), domainObject->identifier()); + } else { + sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier()); + } } else { - sortedMap->insert(sortValue.toString().toLatin1(), domainObject->identifier()); + sortedMap->insert(domainObject->identifier(), domainObject->identifier()); } - } else { - sortedMap->insert(domainObject->identifier(), domainObject->identifier()); } - } - }); + }); } Trace() << "Sorted " << sortedMap->size() << " values."; - auto iterator = QSharedPointer >::create(*sortedMap); - ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery](std::function callback) -> bool { + auto iterator = QSharedPointer>::create(*sortedMap); + ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery]( + std::function callback) -> bool { if (iterator->hasNext()) { - readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { - callback(domainObject, Sink::Operation_Creation); - }); + readEntity(db, iterator->next().value(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, + Sink::Operation operation) { callback(domainObject, Sink::Operation_Creation); }); return true; } return false; @@ -336,19 +337,21 @@ ResultSet QueryWorker::filterAndSortSet(ResultSet &resultSet, const return ResultSet(generator, skip); } else { auto resultSetPtr = QSharedPointer::create(resultSet); - ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery](std::function callback) -> bool { + ResultSet::ValueGenerator generator = [this, resultSetPtr, &db, filter, initialQuery]( + std::function callback) -> bool { if (resultSetPtr->next()) { - //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) + // readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Sink::Operation operation) { if (initialQuery) { - //We're not interested in removals during the initial query + // We're not interested in removals during the initial query if ((operation != Sink::Operation_Removal) && filter(domainObject)) { - //In the initial set every entity is new + // In the initial set every entity is new callback(domainObject, Sink::Operation_Creation); - } } else { - //Always remove removals, they probably don't match due to non-available properties + } + } else { + // Always remove removals, they probably don't match due to non-available properties if ((operation == Sink::Operation_Removal) || filter(domainObject)) { - //TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) + // TODO only replay if this is in the currently visible set (or just always replay, worst case we have a couple to many results) callback(domainObject, operation); } } @@ -357,15 +360,14 @@ ResultSet QueryWorker::filterAndSortSet(ResultSet &resultSet, const } return false; }; - auto skip = [resultSetPtr]() { - resultSetPtr->skip(1); - }; + auto skip = [resultSetPtr]() { resultSetPtr->skip(1); }; return ResultSet(generator, skip); } } -template -std::function QueryWorker::getFilter(const QSet remainingFilters, const Sink::Query &query) +template +std::function +QueryWorker::getFilter(const QSet remainingFilters, const Sink::Query &query) { return [remainingFilters, query](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { if (!query.ids.isEmpty()) { @@ -376,7 +378,7 @@ std::functiongetProperty(filterProperty); if (property.isValid()) { - //TODO implement other comparison operators than equality + // TODO implement other comparison operators than equality if (property != query.propertyFilter.value(filterProperty)) { Trace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << query.propertyFilter.value(filterProperty); return false; @@ -389,16 +391,15 @@ std::function -qint64 QueryWorker::load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, Sink::ResultProviderInterface &resultProvider, bool initialQuery, int offset, int batchSize) +template +qint64 QueryWorker::load(const Sink::Query &query, const std::function &, QByteArray &)> &baseSetRetriever, + Sink::ResultProviderInterface &resultProvider, bool initialQuery, int offset, int batchSize) { QTime time; time.start(); Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); - storage.setDefaultErrorHandler([](const Sink::Storage::Error &error) { - Warning() << "Error during query: " << error.store << error.message; - }); + storage.setDefaultErrorHandler([](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.store << error.message; }); auto transaction = storage.createTransaction(Sink::Storage::ReadOnly); auto db = Storage::mainDatabase(transaction, mBufferType); @@ -414,7 +415,7 @@ qint64 QueryWorker::load(const Sink::Query &query, const std::functi return Sink::Storage::maxRevision(transaction); } -template +template qint64 QueryWorker::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) { QTime time; @@ -429,8 +430,9 @@ qint64 QueryWorker::executeIncrementalQuery(const Sink::Query &query return revision; } -template -qint64 QueryWorker::executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize) +template +qint64 QueryWorker::executeInitialQuery( + const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize) { QTime time; time.start(); diff --git a/common/queryrunner.h b/common/queryrunner.h index 436e2e0..adaf297 100644 --- a/common/queryrunner.h +++ b/common/queryrunner.h @@ -70,18 +70,19 @@ private: /** * A QueryRunner runs a query and updates the corresponding result set. - * + * * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), * and by how long a result set must be updated. If the query is one off the runner dies after the execution, * otherwise it lives on the react to changes and updates the corresponding result set. - * + * * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. */ -template +template class QueryRunner : public QueryRunnerBase { public: - QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType); + QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, + const QByteArray &bufferType); virtual ~QueryRunner(); /** @@ -94,9 +95,8 @@ public: private: QSharedPointer mResourceAccess; - QSharedPointer > mResultProvider; + QSharedPointer> mResultProvider; ResultTransformation mResultTransformation; int mOffset; int mBatchSize; }; - diff --git a/common/resource.cpp b/common/resource.cpp index 5cbb2f2..6713686 100644 --- a/common/resource.cpp +++ b/common/resource.cpp @@ -27,19 +27,16 @@ #include "facadefactory.h" -namespace Sink -{ +namespace Sink { -Resource::Resource() - : QObject(), - d(0) +Resource::Resource() : QObject(), d(0) { Q_UNUSED(d); } Resource::~Resource() { - //delete d; + // delete d; } void Resource::processCommand(int commandId, const QByteArray &data) @@ -71,21 +68,19 @@ void Resource::removeDataFromDisk() class ResourceFactory::Private { public: - static QHash > s_loadedFactories; + static QHash> s_loadedFactories; }; -QHash > ResourceFactory::Private::s_loadedFactories; +QHash> ResourceFactory::Private::s_loadedFactories; -ResourceFactory::ResourceFactory(QObject *parent) - : QObject(parent), - d(0) +ResourceFactory::ResourceFactory(QObject *parent) : QObject(parent), d(0) { Q_UNUSED(d); } ResourceFactory::~ResourceFactory() { - //delete d; + // delete d; } ResourceFactory *ResourceFactory::load(const QString &resourceName) @@ -95,15 +90,15 @@ ResourceFactory *ResourceFactory::load(const QString &resourceName) return factory; } - for (auto const &path: QCoreApplication::instance()->libraryPaths()) { + for (auto const &path : QCoreApplication::instance()->libraryPaths()) { if (path.endsWith(QLatin1String("plugins"))) { QDir pluginDir(path); - //TODO: centralize this so that it is easy to change centrally + // TODO: centralize this so that it is easy to change centrally // also ref'd in cmake as ${SINK_RESOURCE_PLUGINS_PATH} pluginDir.cd(QStringLiteral("sink")); pluginDir.cd(QStringLiteral("resources")); - for (const QString &fileName: pluginDir.entryList(QDir::Files)) { + for (const QString &fileName : pluginDir.entryList(QDir::Files)) { const QString path = pluginDir.absoluteFilePath(fileName); QPluginLoader loader(path); @@ -115,7 +110,7 @@ ResourceFactory *ResourceFactory::load(const QString &resourceName) if (factory) { Private::s_loadedFactories.insert(resourceName, factory); factory->registerFacades(FacadeFactory::instance()); - //TODO: if we need more data on it const QJsonObject json = loader.metaData()[QStringLiteral("MetaData")].toObject(); + // TODO: if we need more data on it const QJsonObject json = loader.metaData()[QStringLiteral("MetaData")].toObject(); return factory; } else { qWarning() << "Plugin for" << resourceName << "from plugin" << loader.fileName() << "produced the wrong object type:" << object; @@ -135,7 +130,7 @@ ResourceFactory *ResourceFactory::load(const QString &resourceName) } // namespace Sink -//Ignore warning I don't know how to fix in a moc file +// Ignore warning I don't know how to fix in a moc file #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" #include "moc_resource.cpp" diff --git a/common/resource.h b/common/resource.h index d0100f6..0e7cd11 100644 --- a/common/resource.h +++ b/common/resource.h @@ -24,8 +24,7 @@ #include #include "notification.h" -namespace Sink -{ +namespace Sink { class FacadeFactory; /** @@ -66,7 +65,7 @@ signals: private: class Private; - Private * const d; + Private *const d; }; /** @@ -85,11 +84,9 @@ public: private: class Private; - Private * const d; + Private *const d; }; } // namespace Sink Q_DECLARE_INTERFACE(Sink::ResourceFactory, "org.kde.sink.resourcefactory") - - diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index 0716ae2..c8c8189 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp @@ -53,27 +53,20 @@ static void queuedInvoke(const std::function &f, QObject *context = 0) { auto timer = QSharedPointer::create(); timer->setSingleShot(true); - QObject::connect(timer.data(), &QTimer::timeout, context, [f, timer]() { - f(); - }); + QObject::connect(timer.data(), &QTimer::timeout, context, [f, timer]() { f(); }); timer->start(0); } -namespace Sink -{ +namespace Sink { struct QueuedCommand { public: - QueuedCommand(int commandId, const std::function &callback) - : commandId(commandId), - callback(callback) - {} - - QueuedCommand(int commandId, const QByteArray &b, const std::function &callback) - : commandId(commandId), - buffer(b), - callback(callback) + QueuedCommand(int commandId, const std::function &callback) : commandId(commandId), callback(callback) + { + } + + QueuedCommand(int commandId, const QByteArray &b, const std::function &callback) : commandId(commandId), buffer(b), callback(callback) { } @@ -102,17 +95,14 @@ public: QByteArray partialMessageBuffer; QVector> commandQueue; QMap> pendingCommands; - QMultiMap > resultHandler; + QMultiMap> resultHandler; QSet completeCommands; uint messageId; bool openingSocket; }; ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q) - : resourceName(name), - resourceInstanceIdentifier(instanceIdentifier), - messageId(0), - openingSocket(false) + : resourceName(name), resourceInstanceIdentifier(instanceIdentifier), messageId(0), openingSocket(false) { } @@ -124,7 +114,7 @@ void ResourceAccess::Private::abortPendingOperations() } auto handlers = resultHandler.values(); resultHandler.clear(); - for(auto handler : handlers) { + for (auto handler : handlers) { handler(1, "The resource closed unexpectedly"); } } @@ -132,20 +122,20 @@ void ResourceAccess::Private::abortPendingOperations() void ResourceAccess::Private::callCallbacks() { for (auto id : completeCommands) { - //We remove the callbacks first because the handler can kill resourceaccess directly + // We remove the callbacks first because the handler can kill resourceaccess directly const auto callbacks = resultHandler.values(id); resultHandler.remove(id); - for(auto handler : callbacks) { + for (auto handler : callbacks) { handler(0, QString()); } } } -//Connects to server and returns connected socket on success -KAsync::Job > ResourceAccess::connectToServer(const QByteArray &identifier) +// Connects to server and returns connected socket on success +KAsync::Job> ResourceAccess::connectToServer(const QByteArray &identifier) { auto s = QSharedPointer::create(); - return KAsync::start >([identifier, s](KAsync::Future > &future) { + return KAsync::start>([identifier, s](KAsync::Future> &future) { s->setServerName(identifier); auto context = new QObject; QObject::connect(s.data(), &QLocalSocket::connected, context, [&future, &s, context]() { @@ -154,7 +144,7 @@ KAsync::Job > ResourceAccess::connectToServer(const future.setValue(s); future.setFinished(); }); - QObject::connect(s.data(), static_cast(&QLocalSocket::error), context, [&future, context](QLocalSocket::LocalSocketError) { + QObject::connect(s.data(), static_cast(&QLocalSocket::error), context, [&future, context](QLocalSocket::LocalSocketError) { delete context; future.setError(-1, "Failed to connect to server."); }); @@ -164,66 +154,67 @@ KAsync::Job > ResourceAccess::connectToServer(const KAsync::Job ResourceAccess::Private::tryToConnect() { - //We may have a socket from the last connection leftover + // We may have a socket from the last connection leftover socket.reset(); auto counter = QSharedPointer::create(0); - return KAsync::dowhile([this]() -> bool { - return !socket; - }, - [this, counter](KAsync::Future &future) { - TracePrivate() << "Loop"; - connectToServer(resourceInstanceIdentifier) - .then >([this, &future](const QSharedPointer &s) { - Q_ASSERT(s); - socket = s; - future.setFinished(); - }, [&future, counter, this](int errorCode, const QString &errorString) { - static int waitTime = 10; - static int timeout = 500; - static int maxRetries = timeout / waitTime; - if (*counter > maxRetries) { - TracePrivate() << "Giving up"; - future.setError(-1, "Failed to connect to socket"); - } else { - KAsync::wait(waitTime).then([&future]() { - future.setFinished(); - }).exec(); - } - *counter = *counter + 1; - }) - .exec(); - }); + return KAsync::dowhile([this]() -> bool { return !socket; }, + [this, counter](KAsync::Future &future) { + TracePrivate() << "Loop"; + connectToServer(resourceInstanceIdentifier) + .then>( + [this, &future](const QSharedPointer &s) { + Q_ASSERT(s); + socket = s; + future.setFinished(); + }, + [&future, counter, this](int errorCode, const QString &errorString) { + static int waitTime = 10; + static int timeout = 500; + static int maxRetries = timeout / waitTime; + if (*counter > maxRetries) { + TracePrivate() << "Giving up"; + future.setError(-1, "Failed to connect to socket"); + } else { + KAsync::wait(waitTime).then([&future]() { future.setFinished(); }).exec(); + } + *counter = *counter + 1; + }) + .exec(); + }); } KAsync::Job ResourceAccess::Private::initializeSocket() { return KAsync::start([this](KAsync::Future &future) { TracePrivate() << "Trying to connect"; - connectToServer(resourceInstanceIdentifier).then >([this, &future](const QSharedPointer &s) { - TracePrivate() << "Connected to resource, without having to start it."; - Q_ASSERT(s); - socket = s; - future.setFinished(); - }, - [this, &future](int errorCode, const QString &errorString) { - TracePrivate() << "Failed to connect, starting resource"; - //We failed to connect, so let's start the resource - QStringList args; - args << resourceInstanceIdentifier; - qint64 pid = 0; - if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { - TracePrivate() << "Started resource " << pid; - tryToConnect() - .then([&future]() { + connectToServer(resourceInstanceIdentifier) + .then>( + [this, &future](const QSharedPointer &s) { + TracePrivate() << "Connected to resource, without having to start it."; + Q_ASSERT(s); + socket = s; future.setFinished(); - }, [this, &future](int errorCode, const QString &errorString) { - Warning() << "Failed to connect to started resource"; - future.setError(errorCode, errorString); - }).exec(); - } else { - Warning() << "Failed to start resource"; - } - }).exec(); + }, + [this, &future](int errorCode, const QString &errorString) { + TracePrivate() << "Failed to connect, starting resource"; + // We failed to connect, so let's start the resource + QStringList args; + args << resourceInstanceIdentifier; + qint64 pid = 0; + if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { + TracePrivate() << "Started resource " << pid; + tryToConnect() + .then([&future]() { future.setFinished(); }, + [this, &future](int errorCode, const QString &errorString) { + Warning() << "Failed to connect to started resource"; + future.setError(errorCode, errorString); + }) + .exec(); + } else { + Warning() << "Failed to start resource"; + } + }) + .exec(); }); } @@ -235,8 +226,7 @@ static QByteArray getResourceName(const QByteArray &instanceIdentifier) } ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier) - : ResourceAccessInterface(), - d(new Private(getResourceName(resourceInstanceIdentifier), resourceInstanceIdentifier, this)) + : ResourceAccessInterface(), d(new Private(getResourceName(resourceInstanceIdentifier), resourceInstanceIdentifier, this)) { Log() << "Starting access"; } @@ -280,10 +270,10 @@ KAsync::Job ResourceAccess::sendCommand(int commandId) }); } -KAsync::Job ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) +KAsync::Job ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) { - //The flatbuffer is transient, but we want to store it until the job is executed - QByteArray buffer(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); + // The flatbuffer is transient, but we want to store it until the job is executed + QByteArray buffer(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); return KAsync::start([commandId, buffer, this](KAsync::Future &f) { auto callback = [&f](int error, const QString &errorMessage) { if (error) { @@ -313,7 +303,7 @@ KAsync::Job ResourceAccess::synchronizeResource(bool sourceSync, bool loca KAsync::Job ResourceAccess::sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) { flatbuffers::FlatBufferBuilder fbb; - //This is the resource buffer type and not the domain type + // This is the resource buffer type and not the domain type auto type = fbb.CreateString(resourceBufferType.constData()); auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); auto location = Sink::Commands::CreateCreateEntity(fbb, 0, type, delta); @@ -322,13 +312,14 @@ KAsync::Job ResourceAccess::sendCreateCommand(const QByteArray &resourceBu return sendCommand(Sink::Commands::CreateEntityCommand, fbb); } -KAsync::Job ResourceAccess::sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) +KAsync::Job +ResourceAccess::sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) { flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(uid.constData()); - //This is the resource buffer type and not the domain type + // This is the resource buffer type and not the domain type auto type = fbb.CreateString(resourceBufferType.constData()); - //FIXME + // FIXME auto deletions = 0; auto delta = Sink::EntityBuffer::appendAsVector(fbb, buffer.constData(), buffer.size()); auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta); @@ -341,7 +332,7 @@ KAsync::Job ResourceAccess::sendDeleteCommand(const QByteArray &uid, qint6 { flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(uid.constData()); - //This is the resource buffer type and not the domain type + // This is the resource buffer type and not the domain type auto type = fbb.CreateString(resourceBufferType.constData()); auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type); Sink::Commands::FinishDeleteEntityBuffer(fbb, location); @@ -358,7 +349,8 @@ KAsync::Job ResourceAccess::sendRevisionReplayedCommand(qint64 revision) return sendCommand(Sink::Commands::RevisionReplayedCommand, fbb); } -KAsync::Job ResourceAccess::sendInspectionCommand(const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) +KAsync::Job +ResourceAccess::sendInspectionCommand(const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) { flatbuffers::FlatBufferBuilder fbb; auto id = fbb.CreateString(inspectionId.toStdString()); @@ -371,7 +363,7 @@ KAsync::Job ResourceAccess::sendInspectionCommand(const QByteArray &inspec s << expectedValue; auto expected = fbb.CreateString(array.toStdString()); - auto location = Sink::Commands::CreateInspection (fbb, id, 0, entity, domain, prop, expected); + auto location = Sink::Commands::CreateInspection(fbb, id, 0, entity, domain, prop, expected); Sink::Commands::FinishInspectionBuffer(fbb, location); open(); return sendCommand(Sink::Commands::InspectionCommand, fbb); @@ -389,21 +381,21 @@ void ResourceAccess::open() auto time = QSharedPointer::create(); time->start(); d->openingSocket = true; - d->initializeSocket().then([this, time]() { - Trace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); - d->openingSocket = false; - QObject::connect(d->socket.data(), &QLocalSocket::disconnected, - this, &ResourceAccess::disconnected); - QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), - this, SLOT(connectionError(QLocalSocket::LocalSocketError))); - QObject::connect(d->socket.data(), &QIODevice::readyRead, - this, &ResourceAccess::readResourceMessage); - connected(); - }, - [this](int error, const QString &errorString) { - d->openingSocket = false; - Warning() << "Failed to initialize socket " << errorString; - }).exec(); + d->initializeSocket() + .then( + [this, time]() { + Trace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); + d->openingSocket = false; + QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); + QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); + QObject::connect(d->socket.data(), &QIODevice::readyRead, this, &ResourceAccess::readResourceMessage); + connected(); + }, + [this](int error, const QString &errorString) { + d->openingSocket = false; + Warning() << "Failed to initialize socket " << errorString; + }) + .exec(); } void ResourceAccess::close() @@ -417,7 +409,7 @@ void ResourceAccess::close() void ResourceAccess::sendCommand(const QSharedPointer &command) { Q_ASSERT(isReady()); - //TODO: we should have a timeout for commands + // TODO: we should have a timeout for commands d->messageId++; const auto messageId = d->messageId; Log() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId); @@ -427,17 +419,17 @@ void ResourceAccess::sendCommand(const QSharedPointer &command) d->pendingCommands.remove(messageId); command->callback(errorCode, errorMessage); }); - //Keep track of the command until we're sure it arrived + // Keep track of the command until we're sure it arrived d->pendingCommands.insert(d->messageId, command); Commands::write(d->socket.data(), d->messageId, command->commandId, command->buffer.constData(), command->buffer.size()); } void ResourceAccess::processCommandQueue() { - //TODO: serialize instead of blast them all through the socket? + // TODO: serialize instead of blast them all through the socket? Trace() << "We have " << d->commandQueue.size() << " queued commands"; Trace() << "Pending commands: " << d->pendingCommands.size(); - for (auto command: d->commandQueue) { + for (auto command : d->commandQueue) { sendCommand(command); } d->commandQueue.clear(); @@ -446,7 +438,7 @@ void ResourceAccess::processCommandQueue() void ResourceAccess::processPendingCommandQueue() { Trace() << "We have " << d->pendingCommands.size() << " pending commands"; - for (auto command: d->pendingCommands) { + for (auto command : d->pendingCommands) { Trace() << "Reenquing command " << command->commandId; d->commandQueue << command; } @@ -471,9 +463,9 @@ void ResourceAccess::connected() Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb); } - //Reenqueue pending commands, we failed to send them + // Reenqueue pending commands, we failed to send them processPendingCommandQueue(); - //Send queued commands + // Send queued commands processCommandQueue(); emit ready(true); @@ -510,7 +502,8 @@ void ResourceAccess::readResourceMessage() d->partialMessageBuffer += d->socket->readAll(); // should be scheduled rather than processed all at once - while (processMessageBuffer()) {} + while (processMessageBuffer()) { + } } bool ResourceAccess::processMessageBuffer() @@ -521,9 +514,9 @@ bool ResourceAccess::processMessageBuffer() return false; } - //const uint messageId = *(int*)(d->partialMessageBuffer.constData()); - const int commandId = *(int*)(d->partialMessageBuffer.constData() + sizeof(uint)); - const uint size = *(int*)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); + // const uint messageId = *(int*)(d->partialMessageBuffer.constData()); + const int commandId = *(int *)(d->partialMessageBuffer.constData() + sizeof(uint)); + const uint size = *(int *)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) { Warning() << "command too small"; @@ -546,10 +539,8 @@ bool ResourceAccess::processMessageBuffer() Log() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"); d->completeCommands << buffer->id(); - //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first - queuedInvoke([=]() { - d->callCallbacks(); - }, this); + // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first + queuedInvoke([=]() { d->callCallbacks(); }, this); break; } case Commands::NotificationCommand: { @@ -563,21 +554,18 @@ bool ResourceAccess::processMessageBuffer() Log() << "Received inspection notification."; Notification n; if (buffer->identifier()) { - //Don't use fromRawData, the buffer is gone once we invoke emit notification + // Don't use fromRawData, the buffer is gone once we invoke emit notification n.id = BufferUtils::extractBufferCopy(buffer->identifier()); } if (buffer->message()) { - //Don't use fromRawData, the buffer is gone once we invoke emit notification + // Don't use fromRawData, the buffer is gone once we invoke emit notification n.message = BufferUtils::extractBufferCopy(buffer->message()); } n.type = buffer->type(); n.code = buffer->code(); - //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first - queuedInvoke([=]() { - emit notification(n); - }, this); - } - break; + // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first + queuedInvoke([=]() { emit notification(n); }, this); + } break; case Sink::Commands::NotificationType::NotificationType_Status: case Sink::Commands::NotificationType::NotificationType_Warning: case Sink::Commands::NotificationType::NotificationType_Progress: @@ -608,7 +596,7 @@ ResourceAccessFactory &ResourceAccessFactory::instance() Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &instanceIdentifier) { if (!mCache.contains(instanceIdentifier)) { - //Reuse the pointer if something else kept the resourceaccess alive + // Reuse the pointer if something else kept the resourceaccess alive if (mWeakCache.contains(instanceIdentifier)) { auto sharedPointer = mWeakCache.value(instanceIdentifier).toStrongRef(); if (sharedPointer) { @@ -616,7 +604,7 @@ Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &ins } } if (!mCache.contains(instanceIdentifier)) { - //Create a new instance if necessary + // Create a new instance if necessary auto sharedPointer = Sink::ResourceAccess::Ptr::create(instanceIdentifier); QObject::connect(sharedPointer.data(), &Sink::ResourceAccess::ready, sharedPointer.data(), [this, instanceIdentifier](bool ready) { if (!ready) { @@ -629,10 +617,8 @@ Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &ins } if (!mTimer.contains(instanceIdentifier)) { auto timer = new QTimer; - //Drop connection after 3 seconds (which is a random value) - QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() { - mCache.remove(instanceIdentifier); - }); + // Drop connection after 3 seconds (which is a random value) + QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() { mCache.remove(instanceIdentifier); }); timer->setInterval(3000); mTimer.insert(instanceIdentifier, timer); } @@ -640,7 +626,6 @@ Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &ins timer->start(); return mCache.value(instanceIdentifier); } - } #pragma clang diagnostic push diff --git a/common/resourceaccess.h b/common/resourceaccess.h index ce7e174..bd9af65 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h @@ -30,8 +30,7 @@ #include #include "notification.h" -namespace Sink -{ +namespace Sink { struct QueuedCommand; @@ -41,17 +40,37 @@ class SINK_EXPORT ResourceAccessInterface : public QObject public: typedef QSharedPointer Ptr; - ResourceAccessInterface() {} - virtual ~ResourceAccessInterface() {} + ResourceAccessInterface() + { + } + virtual ~ResourceAccessInterface() + { + } virtual KAsync::Job sendCommand(int commandId) = 0; virtual KAsync::Job sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) = 0; virtual KAsync::Job synchronizeResource(bool remoteSync, bool localSync) = 0; - virtual KAsync::Job sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) { return KAsync::null(); }; - virtual KAsync::Job sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) { return KAsync::null(); }; - virtual KAsync::Job sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) { return KAsync::null(); }; - virtual KAsync::Job sendRevisionReplayedCommand(qint64 revision) {return KAsync::null(); }; - virtual KAsync::Job sendInspectionCommand(const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expecedValue) {return KAsync::null(); }; + virtual KAsync::Job sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) + { + return KAsync::null(); + }; + virtual KAsync::Job sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) + { + return KAsync::null(); + }; + virtual KAsync::Job sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) + { + return KAsync::null(); + }; + virtual KAsync::Job sendRevisionReplayedCommand(qint64 revision) + { + return KAsync::null(); + }; + virtual KAsync::Job + sendInspectionCommand(const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expecedValue) + { + return KAsync::null(); + }; signals: void ready(bool isReady); @@ -79,21 +98,23 @@ public: KAsync::Job sendCommand(int commandId, flatbuffers::FlatBufferBuilder &fbb) Q_DECL_OVERRIDE; KAsync::Job synchronizeResource(bool remoteSync, bool localSync) Q_DECL_OVERRIDE; KAsync::Job sendCreateCommand(const QByteArray &resourceBufferType, const QByteArray &buffer) Q_DECL_OVERRIDE; - KAsync::Job sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) Q_DECL_OVERRIDE; + KAsync::Job + sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) Q_DECL_OVERRIDE; KAsync::Job sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) Q_DECL_OVERRIDE; KAsync::Job sendRevisionReplayedCommand(qint64 revision) Q_DECL_OVERRIDE; - KAsync::Job sendInspectionCommand(const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expecedValue) Q_DECL_OVERRIDE; + KAsync::Job + sendInspectionCommand(const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expecedValue) Q_DECL_OVERRIDE; /** * Tries to connect to server, and returns a connected socket on success. */ - static KAsync::Job > connectToServer(const QByteArray &identifier); + static KAsync::Job> connectToServer(const QByteArray &identifier); public slots: void open() Q_DECL_OVERRIDE; void close() Q_DECL_OVERRIDE; private slots: - //TODO: move these to the Private class + // TODO: move these to the Private class void disconnected(); void connectionError(QLocalSocket::LocalSocketError error); void readResourceMessage(); @@ -108,23 +129,22 @@ private: void processPendingCommandQueue(); class Private; - Private * const d; + Private *const d; }; /** * A factory for resource access instances that caches the instance for some time. - * + * * This avoids constantly recreating connections, and should allow a single process to have one connection per resource. */ -class ResourceAccessFactory { +class ResourceAccessFactory +{ public: static ResourceAccessFactory &instance(); Sink::ResourceAccess::Ptr getAccess(const QByteArray &instanceIdentifier); - QHash > mWeakCache; + QHash> mWeakCache; QHash mCache; - QHash mTimer; + QHash mTimer; }; - - } diff --git a/common/resourceconfig.cpp b/common/resourceconfig.cpp index a34340b..b988718 100644 --- a/common/resourceconfig.cpp +++ b/common/resourceconfig.cpp @@ -101,4 +101,3 @@ QMap ResourceConfig::getConfiguration(const QByteArray &id } return configuration; } - diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp index 20125ac..d8b0972 100644 --- a/common/resourcecontrol.cpp +++ b/common/resourcecontrol.cpp @@ -32,28 +32,31 @@ #undef DEBUG_AREA #define DEBUG_AREA "client.resourcecontrol" -namespace Sink -{ +namespace Sink { KAsync::Job ResourceControl::shutdown(const QByteArray &identifier) { Trace() << "shutdown " << identifier; auto time = QSharedPointer::create(); time->start(); - return ResourceAccess::connectToServer(identifier).then>([identifier, time](QSharedPointer socket, KAsync::Future &future) { - //We can't currently reuse the socket - socket->close(); - auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier); - resourceAccess->open(); - resourceAccess->sendCommand(Sink::Commands::ShutdownCommand).then([&future, resourceAccess, time]() { - Trace() << "Shutdown complete." << Log::TraceTime(time->elapsed()); - future.setFinished(); - }).exec(); - }, - [](int, const QString &) { - Trace() << "Resource is already closed."; - //Resource isn't started, nothing to shutdown - }); + return ResourceAccess::connectToServer(identifier) + .then>( + [identifier, time](QSharedPointer socket, KAsync::Future &future) { + // We can't currently reuse the socket + socket->close(); + auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier); + resourceAccess->open(); + resourceAccess->sendCommand(Sink::Commands::ShutdownCommand) + .then([&future, resourceAccess, time]() { + Trace() << "Shutdown complete." << Log::TraceTime(time->elapsed()); + future.setFinished(); + }) + .exec(); + }, + [](int, const QString &) { + Trace() << "Resource is already closed."; + // Resource isn't started, nothing to shutdown + }); } KAsync::Job ResourceControl::start(const QByteArray &identifier) @@ -63,23 +66,19 @@ KAsync::Job ResourceControl::start(const QByteArray &identifier) time->start(); auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier); resourceAccess->open(); - return resourceAccess->sendCommand(Sink::Commands::PingCommand).then([resourceAccess, time]() { - Trace() << "Start complete." << Log::TraceTime(time->elapsed()); - }); + return resourceAccess->sendCommand(Sink::Commands::PingCommand).then([resourceAccess, time]() { Trace() << "Start complete." << Log::TraceTime(time->elapsed()); }); } KAsync::Job ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier) { Trace() << "flushMessageQueue" << resourceIdentifier; return KAsync::iterate(resourceIdentifier) - .template each([](const QByteArray &resource, KAsync::Future &future) { - Trace() << "Flushing message queue " << resource; - auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource); - resourceAccess->open(); - resourceAccess->synchronizeResource(false, true).then([&future, resourceAccess]() { - future.setFinished(); - }).exec(); - }); + .template each([](const QByteArray &resource, KAsync::Future &future) { + Trace() << "Flushing message queue " << resource; + auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource); + resourceAccess->open(); + resourceAccess->synchronizeResource(false, true).then([&future, resourceAccess]() { future.setFinished(); }).exec(); + }); } KAsync::Job ResourceControl::flushReplayQueue(const QByteArrayList &resourceIdentifier) @@ -114,7 +113,7 @@ KAsync::Job ResourceControl::inspect(const Inspection &inspectionCommand) }); } -#define REGISTER_TYPE(T) template KAsync::Job ResourceControl::inspect(const Inspection &); \ +#define REGISTER_TYPE(T) template KAsync::Job ResourceControl::inspect(const Inspection &); REGISTER_TYPE(ApplicationDomain::Event); REGISTER_TYPE(ApplicationDomain::Mail); @@ -122,4 +121,3 @@ REGISTER_TYPE(ApplicationDomain::Folder); REGISTER_TYPE(ApplicationDomain::SinkResource); } // namespace Sink - diff --git a/common/resourcecontrol.h b/common/resourcecontrol.h index 5bfa67f..d483153 100644 --- a/common/resourcecontrol.h +++ b/common/resourcecontrol.h @@ -40,7 +40,7 @@ KAsync::Job SINK_EXPORT shutdown(const QByteArray &resourceIdentifier); /** * Start resource. - * + * * The resource is ready for operation once this command completes. * This command is only necessary if a resource was shutdown previously, * otherwise the resource process will automatically start as necessary. @@ -56,7 +56,5 @@ KAsync::Job SINK_EXPORT flushMessageQueue(const QByteArrayList &resourceId * Flushes any pending messages that haven't been replayed to the source. */ KAsync::Job SINK_EXPORT flushReplayQueue(const QByteArrayList &resourceIdentifier); - - } } - +} diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index 414e390..9294926 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp @@ -24,15 +24,12 @@ #include "storage.h" #include -ResourceFacade::ResourceFacade(const QByteArray &) - : Sink::StoreFacade() +ResourceFacade::ResourceFacade(const QByteArray &) : Sink::StoreFacade() { - } ResourceFacade::~ResourceFacade() { - } KAsync::Job ResourceFacade::create(const Sink::ApplicationDomain::SinkResource &resource) @@ -40,14 +37,14 @@ KAsync::Job ResourceFacade::create(const Sink::ApplicationDomain::SinkReso return KAsync::start([resource, this]() { const QByteArray type = resource.getProperty("type").toByteArray(); const QByteArray providedIdentifier = resource.getProperty("identifier").toByteArray(); - //It is currently a requirement that the resource starts with the type + // It is currently a requirement that the resource starts with the type const QByteArray identifier = providedIdentifier.isEmpty() ? ResourceConfig::newIdentifier(type) : providedIdentifier; ResourceConfig::addResource(identifier, type); auto changedProperties = resource.changedProperties(); changedProperties.removeOne("identifier"); changedProperties.removeOne("type"); if (!changedProperties.isEmpty()) { - //We have some configuration values + // We have some configuration values QMap configurationValues; for (const auto &property : changedProperties) { configurationValues.insert(property, resource.getProperty(property)); @@ -69,7 +66,7 @@ KAsync::Job ResourceFacade::modify(const Sink::ApplicationDomain::SinkReso changedProperties.removeOne("identifier"); changedProperties.removeOne("type"); if (!changedProperties.isEmpty()) { - //We have some configuration values + // We have some configuration values QMap configurationValues; for (const auto &property : changedProperties) { configurationValues.insert(property, resource.getProperty(property)); @@ -88,7 +85,7 @@ KAsync::Job ResourceFacade::remove(const Sink::ApplicationDomain::SinkReso return; } ResourceConfig::removeResource(identifier); - //TODO shutdown resource, or use the resource process with a --remove option to cleanup (so we can take advantage of the file locking) + // TODO shutdown resource, or use the resource process with a --remove option to cleanup (so we can take advantage of the file locking) QDir dir(Sink::storageLocation()); for (const auto &folder : dir.entryList(QStringList() << identifier + "*")) { Sink::Storage(Sink::storageLocation(), folder, Sink::Storage::ReadWrite).removeFromDisk(); @@ -96,14 +93,12 @@ KAsync::Job ResourceFacade::remove(const Sink::ApplicationDomain::SinkReso }); } -QPair, typename Sink::ResultEmitter::Ptr > ResourceFacade::load(const Sink::Query &query) +QPair, typename Sink::ResultEmitter::Ptr> ResourceFacade::load(const Sink::Query &query) { auto resultProvider = new Sink::ResultProvider(); auto emitter = resultProvider->emitter(); resultProvider->setFetcher([](const QSharedPointer &) {}); - resultProvider->onDone([resultProvider]() { - delete resultProvider; - }); + resultProvider->onDone([resultProvider]() { delete resultProvider; }); auto job = KAsync::start([query, resultProvider]() { const auto configuredResources = ResourceConfig::getResources(); for (const auto &res : configuredResources.keys()) { @@ -114,10 +109,9 @@ QPair, typename Sink::ResultEmitteradd(resource); } } - //TODO initialResultSetComplete should be implicit + // TODO initialResultSetComplete should be implicit resultProvider->initialResultSetComplete(Sink::ApplicationDomain::SinkResource::Ptr()); resultProvider->complete(); }); return qMakePair(job, emitter); } - diff --git a/common/resourcefacade.h b/common/resourcefacade.h index 5ddaa79..3de0e25 100644 --- a/common/resourcefacade.h +++ b/common/resourcefacade.h @@ -26,8 +26,8 @@ #include "common/domain/applicationdomaintype.h" namespace Sink { - class Query; - class Inspection; +class Query; +class Inspection; } class ResourceFacade : public Sink::StoreFacade @@ -38,6 +38,5 @@ public: KAsync::Job create(const Sink::ApplicationDomain::SinkResource &resource) Q_DECL_OVERRIDE; KAsync::Job modify(const Sink::ApplicationDomain::SinkResource &resource) Q_DECL_OVERRIDE; KAsync::Job remove(const Sink::ApplicationDomain::SinkResource &resource) Q_DECL_OVERRIDE; - QPair, typename Sink::ResultEmitter::Ptr > load(const Sink::Query &query) Q_DECL_OVERRIDE; + QPair, typename Sink::ResultEmitter::Ptr> load(const Sink::Query &query) Q_DECL_OVERRIDE; }; - diff --git a/common/resultprovider.h b/common/resultprovider.h index 2d6efbe..5561ff2 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h @@ -32,22 +32,19 @@ namespace Sink { /** * Query result set */ -template +template class ResultEmitter; -template +template class ResultProviderInterface { public: - ResultProviderInterface() - : mRevision(0) + ResultProviderInterface() : mRevision(0) { - } virtual ~ResultProviderInterface() { - } virtual void add(const T &value) = 0; @@ -75,17 +72,18 @@ private: /* * The promise side for the result emitter */ -template -class ResultProvider : public ResultProviderInterface { +template +class ResultProvider : public ResultProviderInterface +{ private: void callInMainThreadOnEmitter(void (ResultEmitter::*f)()) { - //We use the eventloop to call the addHandler directly from the main eventloop. - //That way the result emitter implementation doesn't have to care about threadsafety at all. - //The alternative would be to make all handlers of the emitter threadsafe. + // We use the eventloop to call the addHandler directly from the main eventloop. + // That way the result emitter implementation doesn't have to care about threadsafety at all. + // The alternative would be to make all handlers of the emitter threadsafe. if (auto emitter = mResultEmitter.toStrongRef()) { auto weakEmitter = mResultEmitter; - //We don't want to keep the emitter alive here, so we only capture a weak reference + // We don't want to keep the emitter alive here, so we only capture a weak reference emitter->mThreadBoundary.callInMainThread([weakEmitter, f]() { if (auto strongRef = weakEmitter.toStrongRef()) { (strongRef.data()->*f)(); @@ -96,27 +94,27 @@ private: void callInMainThreadOnEmitter(const std::function &f) { - //We use the eventloop to call the addHandler directly from the main eventloop. - //That way the result emitter implementation doesn't have to care about threadsafety at all. - //The alternative would be to make all handlers of the emitter threadsafe. + // We use the eventloop to call the addHandler directly from the main eventloop. + // That way the result emitter implementation doesn't have to care about threadsafety at all. + // The alternative would be to make all handlers of the emitter threadsafe. if (auto emitter = mResultEmitter.toStrongRef()) { emitter->mThreadBoundary.callInMainThread(f); } } public: - typedef QSharedPointer > Ptr; + typedef QSharedPointer> Ptr; virtual ~ResultProvider() { } - //Called from worker thread + // Called from worker thread void add(const T &value) { - //Because I don't know how to use bind + // Because I don't know how to use bind auto weakEmitter = mResultEmitter; - callInMainThreadOnEmitter([weakEmitter, value](){ + callInMainThreadOnEmitter([weakEmitter, value]() { if (auto strongRef = weakEmitter.toStrongRef()) { strongRef->addHandler(value); } @@ -125,9 +123,9 @@ public: void modify(const T &value) { - //Because I don't know how to use bind + // Because I don't know how to use bind auto weakEmitter = mResultEmitter; - callInMainThreadOnEmitter([weakEmitter, value](){ + callInMainThreadOnEmitter([weakEmitter, value]() { if (auto strongRef = weakEmitter.toStrongRef()) { strongRef->modifyHandler(value); } @@ -136,9 +134,9 @@ public: void remove(const T &value) { - //Because I don't know how to use bind + // Because I don't know how to use bind auto weakEmitter = mResultEmitter; - callInMainThreadOnEmitter([weakEmitter, value](){ + callInMainThreadOnEmitter([weakEmitter, value]() { if (auto strongRef = weakEmitter.toStrongRef()) { strongRef->removeHandler(value); } @@ -147,16 +145,16 @@ public: void initialResultSetComplete(const T &parent) { - //Because I don't know how to use bind + // Because I don't know how to use bind auto weakEmitter = mResultEmitter; - callInMainThreadOnEmitter([weakEmitter, parent](){ + callInMainThreadOnEmitter([weakEmitter, parent]() { if (auto strongRef = weakEmitter.toStrongRef()) { strongRef->initialResultSetComplete(parent); } }); } - //Called from worker thread + // Called from worker thread void complete() { callInMainThreadOnEmitter(&ResultEmitter::complete); @@ -168,11 +166,14 @@ public: } - QSharedPointer > emitter() + QSharedPointer> emitter() { if (!mResultEmitter) { - //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again - auto sharedPtr = QSharedPointer >(new ResultEmitter, [this](ResultEmitter *emitter){ mThreadBoundary->callInMainThread([this]() {done();}); delete emitter; }); + // We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again + auto sharedPtr = QSharedPointer>(new ResultEmitter, [this](ResultEmitter *emitter) { + mThreadBoundary->callInMainThread([this]() { done(); }); + delete emitter; + }); mResultEmitter = sharedPtr; sharedPtr->setFetcher([this](const T &parent) { Q_ASSERT(mFetcher); @@ -192,7 +193,7 @@ public: bool isDone() const { - //The existance of the emitter currently defines wether we're done or not. + // The existance of the emitter currently defines wether we're done or not. return mResultEmitter.toStrongRef().isNull(); } @@ -208,12 +209,12 @@ private: if (mOnDoneCallback) { auto callback = mOnDoneCallback; mOnDoneCallback = std::function(); - //This may delete this object + // This may delete this object callback(); } } - QWeakPointer > mResultEmitter; + QWeakPointer> mResultEmitter; std::function mOnDoneCallback; QSharedPointer mThreadBoundary; std::function mFetcher; @@ -231,32 +232,32 @@ private: * * build sync interfaces that block when accessing the value * */ -template -class ResultEmitter { +template +class ResultEmitter +{ public: - typedef QSharedPointer > Ptr; + typedef QSharedPointer> Ptr; virtual ~ResultEmitter() { - } - void onAdded(const std::function &handler) + void onAdded(const std::function &handler) { addHandler = handler; } - void onModified(const std::function &handler) + void onModified(const std::function &handler) { modifyHandler = handler; } - void onRemoved(const std::function &handler) + void onRemoved(const std::function &handler) { removeHandler = handler; } - void onInitialResultSetComplete(const std::function &handler) + void onInitialResultSetComplete(const std::function &handler) { initialResultSetCompleteHandler = handler; } @@ -322,10 +323,10 @@ public: private: friend class ResultProvider; - std::function addHandler; - std::function modifyHandler; - std::function removeHandler; - std::function initialResultSetCompleteHandler; + std::function addHandler; + std::function modifyHandler; + std::function removeHandler; + std::function initialResultSetCompleteHandler; std::function completeHandler; std::function clearHandler; @@ -333,37 +334,28 @@ private: async::ThreadBoundary mThreadBoundary; }; -template -class AggregatingResultEmitter : public ResultEmitter { +template +class AggregatingResultEmitter : public ResultEmitter +{ public: - typedef QSharedPointer > Ptr; + typedef QSharedPointer> Ptr; void addEmitter(const typename ResultEmitter::Ptr &emitter) { - emitter->onAdded([this](const DomainType &value) { - this->add(value); - }); - emitter->onModified([this](const DomainType &value) { - this->modify(value); - }); - emitter->onRemoved([this](const DomainType &value) { - this->remove(value); - }); + emitter->onAdded([this](const DomainType &value) { this->add(value); }); + emitter->onModified([this](const DomainType &value) { this->modify(value); }); + emitter->onRemoved([this](const DomainType &value) { this->remove(value); }); auto ptr = emitter.data(); emitter->onInitialResultSetComplete([this, ptr](const DomainType &parent) { auto hashValue = qHash(parent); mInitialResultSetInProgress.remove(hashValue, ptr); - //Normally a parent is only in a single resource, except the toplevel (invalid) parent + // Normally a parent is only in a single resource, except the toplevel (invalid) parent if (!mInitialResultSetInProgress.contains(hashValue)) { this->initialResultSetComplete(parent); } }); - emitter->onComplete([this]() { - this->complete(); - }); - emitter->onClear([this]() { - this->clear(); - }); + emitter->onComplete([this]() { this->complete(); }); + emitter->onClear([this]() { this->clear(); }); mEmitter << emitter; } @@ -382,10 +374,6 @@ public: private: QList::Ptr> mEmitter; - QMultiMap*> mInitialResultSetInProgress; + QMultiMap *> mInitialResultSetInProgress; }; - - - } - diff --git a/common/resultset.cpp b/common/resultset.cpp index 6e1479a..293035b 100644 --- a/common/resultset.cpp +++ b/common/resultset.cpp @@ -20,56 +20,38 @@ #include "common/log.h" -ResultSet::ResultSet() - : mIt(nullptr) +ResultSet::ResultSet() : mIt(nullptr) { - } -ResultSet::ResultSet(const ValueGenerator &generator, const SkipValue &skip) - : mIt(nullptr), - mValueGenerator(generator), - mSkip(skip) +ResultSet::ResultSet(const ValueGenerator &generator, const SkipValue &skip) : mIt(nullptr), mValueGenerator(generator), mSkip(skip) { - } -ResultSet::ResultSet(const IdGenerator &generator) - : mIt(nullptr), - mGenerator(generator), - mSkip([this]() { - next(); - }) +ResultSet::ResultSet(const IdGenerator &generator) : mIt(nullptr), mGenerator(generator), mSkip([this]() { next(); }) { - } ResultSet::ResultSet(const QVector &resultSet) : mResultSet(resultSet), - mIt(mResultSet.constBegin()), - mSkip([this]() { - if (mIt != mResultSet.constEnd()) { - mIt++; - } - }), - mFirst(true) + mIt(mResultSet.constBegin()), + mSkip([this]() { + if (mIt != mResultSet.constEnd()) { + mIt++; + } + }), + mFirst(true) { - } -ResultSet::ResultSet(const ResultSet &other) - : mResultSet(other.mResultSet), - mIt(nullptr), - mFirst(true) +ResultSet::ResultSet(const ResultSet &other) : mResultSet(other.mResultSet), mIt(nullptr), mFirst(true) { if (other.mValueGenerator) { mValueGenerator = other.mValueGenerator; mSkip = other.mSkip; } else if (other.mGenerator) { mGenerator = other.mGenerator; - mSkip = [this]() { - next(); - }; + mSkip = [this]() { next(); }; } else { mResultSet = other.mResultSet; mIt = mResultSet.constBegin(); @@ -96,7 +78,7 @@ bool ResultSet::next() return true; } } else { - next([](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation){ return false; }); + next([](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation) { return false; }); } return false; } diff --git a/common/resultset.h b/common/resultset.h index e513460..88f7055 100644 --- a/common/resultset.h +++ b/common/resultset.h @@ -28,34 +28,34 @@ * * We'll eventually want to lazy load results in next(). */ -class ResultSet { - public: - typedef std::function)> ValueGenerator; - typedef std::function IdGenerator; - typedef std::function SkipValue; - - ResultSet(); - ResultSet(const ValueGenerator &generator, const SkipValue &skip); - ResultSet(const IdGenerator &generator); - ResultSet(const QVector &resultSet); - ResultSet(const ResultSet &other); - - bool next(); - bool next(std::function callback); - - void skip(int number); - - QByteArray id(); - - bool isEmpty(); - - private: - QVector mResultSet; - QVector::ConstIterator mIt; - QByteArray mCurrentValue; - IdGenerator mGenerator; - ValueGenerator mValueGenerator; - SkipValue mSkip; - bool mFirst; +class ResultSet +{ +public: + typedef std::function)> ValueGenerator; + typedef std::function IdGenerator; + typedef std::function SkipValue; + + ResultSet(); + ResultSet(const ValueGenerator &generator, const SkipValue &skip); + ResultSet(const IdGenerator &generator); + ResultSet(const QVector &resultSet); + ResultSet(const ResultSet &other); + + bool next(); + bool next(std::function callback); + + void skip(int number); + + QByteArray id(); + + bool isEmpty(); + +private: + QVector mResultSet; + QVector::ConstIterator mIt; + QByteArray mCurrentValue; + IdGenerator mGenerator; + ValueGenerator mValueGenerator; + SkipValue mSkip; + bool mFirst; }; - diff --git a/common/storage.h b/common/storage.h index 663d192..b051daa 100644 --- a/common/storage.h +++ b/common/storage.h @@ -26,14 +26,19 @@ #include #include -namespace Sink -{ +namespace Sink { -class SINK_EXPORT Storage { +class SINK_EXPORT Storage +{ public: - enum AccessMode { ReadOnly, ReadWrite }; + enum AccessMode + { + ReadOnly, + ReadWrite + }; - enum ErrorCodes { + enum ErrorCodes + { GenericError, NotOpen, ReadOnlyError, @@ -44,8 +49,9 @@ public: class Error { public: - Error(const QByteArray &s, int c, const QByteArray &m) - : store(s), message(m), code(c) {} + Error(const QByteArray &s, int c, const QByteArray &m) : store(s), message(m), code(c) + { + } QByteArray store; QByteArray message; int code; @@ -65,13 +71,11 @@ public: /** * Remove a key */ - void remove(const QByteArray &key, - const std::function &errorHandler = std::function()); + void remove(const QByteArray &key, const std::function &errorHandler = std::function()); /** * Remove a key-value pair */ - void remove(const QByteArray &key, const QByteArray &value, - const std::function &errorHandler = std::function()); + void remove(const QByteArray &key, const QByteArray &value, const std::function &errorHandler = std::function()); /** * Read values with a given key. @@ -82,9 +86,8 @@ public: * * @return The number of values retrieved. */ - int scan(const QByteArray &key, - const std::function &resultHandler, - const std::function &errorHandler = std::function(), bool findSubstringKeys = false) const; + int scan(const QByteArray &key, const std::function &resultHandler, + const std::function &errorHandler = std::function(), bool findSubstringKeys = false) const; /** * Finds the last value in a series matched by prefix. @@ -92,28 +95,29 @@ public: * This is used to match by uid prefix and find the highest revision. * Note that this relies on a key scheme like $uid$revision. */ - void findLatest(const QByteArray &uid, - const std::function &resultHandler, - const std::function &errorHandler = std::function()) const; + void findLatest(const QByteArray &uid, const std::function &resultHandler, + const std::function &errorHandler = std::function()) const; /** * Returns true if the database contains the substring key. */ bool contains(const QByteArray &uid); - NamedDatabase(NamedDatabase&& other) : d(other.d) + NamedDatabase(NamedDatabase &&other) : d(other.d) { d = other.d; other.d = nullptr; } - NamedDatabase& operator=(NamedDatabase&& other) { + NamedDatabase &operator=(NamedDatabase &&other) + { d = other.d; other.d = nullptr; return *this; } - operator bool() const { + operator bool() const + { return (d != nullptr); } @@ -121,10 +125,10 @@ public: private: friend Transaction; - NamedDatabase(NamedDatabase& other); - NamedDatabase& operator=(NamedDatabase& other); + NamedDatabase(NamedDatabase &other); + NamedDatabase &operator=(NamedDatabase &other); class Private; - NamedDatabase(Private*); + NamedDatabase(Private *); Private *d; }; @@ -138,37 +142,39 @@ public: QList getDatabaseNames() const; - NamedDatabase openDatabase(const QByteArray &name = QByteArray("default"), const std::function &errorHandler = std::function(), bool allowDuplicates = false) const; + NamedDatabase openDatabase(const QByteArray &name = QByteArray("default"), + const std::function &errorHandler = std::function(), bool allowDuplicates = false) const; - Transaction(Transaction&& other) : d(other.d) + Transaction(Transaction &&other) : d(other.d) { d = other.d; other.d = nullptr; } - Transaction& operator=(Transaction&& other) { + Transaction &operator=(Transaction &&other) + { d = other.d; other.d = nullptr; return *this; } - operator bool() const { + operator bool() const + { return (d != nullptr); } private: - Transaction(Transaction& other); - Transaction& operator=(Transaction& other); + Transaction(Transaction &other); + Transaction &operator=(Transaction &other); friend Storage; class Private; - Transaction(Private*); + Transaction(Private *); Private *d; }; Storage(const QString &storageRoot, const QString &name, AccessMode mode = ReadOnly); ~Storage(); - Transaction createTransaction(AccessMode mode = ReadWrite, - const std::function &errorHandler = std::function()); + Transaction createTransaction(AccessMode mode = ReadWrite, const std::function &errorHandler = std::function()); /** * Set the default error handler. @@ -178,7 +184,7 @@ public: /** * A basic error handler that writes to std::cerr. - * + * * Used if nothing else is configured. */ static std::function basicErrorHandler(); @@ -188,7 +194,7 @@ public: /** * Clears all cached environments. - * + * * This only ever has to be called if a database was removed from another process. */ static void clearEnv(); @@ -220,8 +226,7 @@ private: private: class Private; - Private * const d; + Private *const d; }; } // namespace Sink - diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 0b842d1..2873f5f 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp @@ -23,8 +23,7 @@ #include "log.h" -namespace Sink -{ +namespace Sink { static const char *s_internalPrefix = "__internal"; static const int s_internalPrefixSize = strlen(s_internalPrefix); @@ -60,14 +59,16 @@ void Storage::setMaxRevision(Sink::Storage::Transaction &transaction, qint64 rev qint64 Storage::maxRevision(const Sink::Storage::Transaction &transaction) { qint64 r = 0; - transaction.openDatabase().scan("__internal_maxRevision", [&](const QByteArray &, const QByteArray &revision) -> bool { - r = revision.toLongLong(); - return false; - }, [](const Error &error){ - if (error.code != Sink::Storage::NotFound) { - Warning() << "Coultn'd find the maximum revision."; - } - }); + transaction.openDatabase().scan("__internal_maxRevision", + [&](const QByteArray &, const QByteArray &revision) -> bool { + r = revision.toLongLong(); + return false; + }, + [](const Error &error) { + if (error.code != Sink::Storage::NotFound) { + Warning() << "Coultn'd find the maximum revision."; + } + }); return r; } @@ -79,44 +80,48 @@ void Storage::setCleanedUpRevision(Sink::Storage::Transaction &transaction, qint qint64 Storage::cleanedUpRevision(const Sink::Storage::Transaction &transaction) { qint64 r = 0; - transaction.openDatabase().scan("__internal_cleanedUpRevision", [&](const QByteArray &, const QByteArray &revision) -> bool { - r = revision.toLongLong(); - return false; - }, [](const Error &error){ - if (error.code != Sink::Storage::NotFound) { - Warning() << "Coultn'd find the maximum revision."; - } - }); + transaction.openDatabase().scan("__internal_cleanedUpRevision", + [&](const QByteArray &, const QByteArray &revision) -> bool { + r = revision.toLongLong(); + return false; + }, + [](const Error &error) { + if (error.code != Sink::Storage::NotFound) { + Warning() << "Coultn'd find the maximum revision."; + } + }); return r; } QByteArray Storage::getUidFromRevision(const Sink::Storage::Transaction &transaction, qint64 revision) { QByteArray uid; - transaction.openDatabase("revisions").scan(QByteArray::number(revision), [&](const QByteArray &, const QByteArray &value) -> bool { - uid = value; - return false; - }, [revision](const Error &error){ - Warning() << "Coultn'd find uid for revision " << revision; - }); + transaction.openDatabase("revisions") + .scan(QByteArray::number(revision), + [&](const QByteArray &, const QByteArray &value) -> bool { + uid = value; + return false; + }, + [revision](const Error &error) { Warning() << "Coultn'd find uid for revision " << revision; }); return uid; } QByteArray Storage::getTypeFromRevision(const Sink::Storage::Transaction &transaction, qint64 revision) { QByteArray type; - transaction.openDatabase("revisionType").scan(QByteArray::number(revision), [&](const QByteArray &, const QByteArray &value) -> bool { - type = value; - return false; - }, [revision](const Error &error){ - Warning() << "Coultn'd find type for revision " << revision; - }); + transaction.openDatabase("revisionType") + .scan(QByteArray::number(revision), + [&](const QByteArray &, const QByteArray &value) -> bool { + type = value; + return false; + }, + [revision](const Error &error) { Warning() << "Coultn'd find type for revision " << revision; }); return type; } void Storage::recordRevision(Sink::Storage::Transaction &transaction, qint64 revision, const QByteArray &uid, const QByteArray &type) { - //TODO use integerkeys + // TODO use integerkeys transaction.openDatabase("revisions").write(QByteArray::number(revision), uid); transaction.openDatabase("revisionType").write(QByteArray::number(revision), type); } @@ -164,11 +169,12 @@ Storage::NamedDatabase Storage::mainDatabase(const Sink::Storage::Transaction &t bool Storage::NamedDatabase::contains(const QByteArray &uid) { bool found = false; - scan(uid, [&found](const QByteArray &, const QByteArray &) -> bool { - found = true; - return false; - }, [this](const Sink::Storage::Error &error) { - }, true); + scan(uid, + [&found](const QByteArray &, const QByteArray &) -> bool { + found = true; + return false; + }, + [this](const Sink::Storage::Error &error) {}, true); return found; } diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index 2d8b187..878a5d9 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -56,17 +56,12 @@ class Storage::NamedDatabase::Private { public: Private(const QByteArray &_db, bool _allowDuplicates, const std::function &_defaultErrorHandler, const QString &_name, MDB_txn *_txn) - : db(_db), - transaction(_txn), - allowDuplicates(_allowDuplicates), - defaultErrorHandler(_defaultErrorHandler), - name(_name) + : db(_db), transaction(_txn), allowDuplicates(_allowDuplicates), defaultErrorHandler(_defaultErrorHandler), name(_name) { } ~Private() { - } QByteArray db; @@ -88,7 +83,7 @@ public: if (const int rc = mdb_dbi_open(transaction, db.constData(), flags, &dbi)) { dbi = 0; transaction = 0; - //The database is not existing, ignore in read-only mode + // The database is not existing, ignore in read-only mode if (!(readOnly && rc == MDB_NOTFOUND)) { Error error(name.toLatin1(), ErrorCodes::GenericError, "Error while opening database: " + QByteArray(mdb_strerror(rc))); errorHandler ? errorHandler(error) : defaultErrorHandler(error); @@ -99,14 +94,11 @@ public: } }; -Storage::NamedDatabase::NamedDatabase() - : d(nullptr) +Storage::NamedDatabase::NamedDatabase() : d(nullptr) { - } -Storage::NamedDatabase::NamedDatabase(NamedDatabase::Private *prv) - : d(prv) +Storage::NamedDatabase::NamedDatabase(NamedDatabase::Private *prv) : d(prv) { } @@ -138,9 +130,9 @@ bool Storage::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sVa int rc; MDB_val key, data; key.mv_size = keySize; - key.mv_data = const_cast(keyPtr); + key.mv_data = const_cast(keyPtr); data.mv_size = valueSize; - data.mv_data = const_cast(valuePtr); + data.mv_data = const_cast(valuePtr); rc = mdb_put(d->transaction, d->dbi, &key, &data, 0); if (rc) { @@ -151,14 +143,12 @@ bool Storage::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sVa return !rc; } -void Storage::NamedDatabase::remove(const QByteArray &k, - const std::function &errorHandler) +void Storage::NamedDatabase::remove(const QByteArray &k, const std::function &errorHandler) { remove(k, QByteArray(), errorHandler); } -void Storage::NamedDatabase::remove(const QByteArray &k, const QByteArray &value, - const std::function &errorHandler) +void Storage::NamedDatabase::remove(const QByteArray &k, const QByteArray &value, const std::function &errorHandler) { if (!d || !d->transaction) { if (d) { @@ -171,13 +161,13 @@ void Storage::NamedDatabase::remove(const QByteArray &k, const QByteArray &value int rc; MDB_val key; key.mv_size = k.size(); - key.mv_data = const_cast(static_cast(k.data())); + key.mv_data = const_cast(static_cast(k.data())); if (value.isEmpty()) { rc = mdb_del(d->transaction, d->dbi, &key, 0); } else { MDB_val data; data.mv_size = value.size(); - data.mv_data = const_cast(static_cast(value.data())); + data.mv_data = const_cast(static_cast(value.data())); rc = mdb_del(d->transaction, d->dbi, &key, &data); } @@ -187,13 +177,11 @@ void Storage::NamedDatabase::remove(const QByteArray &k, const QByteArray &value } } -int Storage::NamedDatabase::scan(const QByteArray &k, - const std::function &resultHandler, - const std::function &errorHandler, - bool findSubstringKeys) const +int Storage::NamedDatabase::scan(const QByteArray &k, const std::function &resultHandler, + const std::function &errorHandler, bool findSubstringKeys) const { if (!d || !d->transaction) { - //Not an error. We rely on this to read nothing from non-existing databases. + // Not an error. We rely on this to read nothing from non-existing databases. return 0; } @@ -202,7 +190,7 @@ int Storage::NamedDatabase::scan(const QByteArray &k, MDB_val data; MDB_cursor *cursor; - key.mv_data = (void*)k.constData(); + key.mv_data = (void *)k.constData(); key.mv_size = k.size(); rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); @@ -220,21 +208,21 @@ int Storage::NamedDatabase::scan(const QByteArray &k, op = MDB_SET_RANGE; } if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) { - //The first lookup will find a key that is equal or greather than our key - if (QByteArray::fromRawData((char*)key.mv_data, key.mv_size).startsWith(k)) { + // The first lookup will find a key that is equal or greather than our key + if (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) { numberOfRetrievedValues++; - if (resultHandler(QByteArray::fromRawData((char*)key.mv_data, key.mv_size), QByteArray::fromRawData((char*)data.mv_data, data.mv_size))) { + if (resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size))) { if (findSubstringKeys) { - //Reset the key to what we search for - key.mv_data = (void*)k.constData(); + // Reset the key to what we search for + key.mv_data = (void *)k.constData(); key.mv_size = k.size(); } MDB_cursor_op nextOp = (d->allowDuplicates && !findSubstringKeys) ? MDB_NEXT_DUP : MDB_NEXT; while ((rc = mdb_cursor_get(cursor, &key, &data, nextOp)) == 0) { - //Every consequent lookup simply iterates through the list - if (QByteArray::fromRawData((char*)key.mv_data, key.mv_size).startsWith(k)) { + // Every consequent lookup simply iterates through the list + if (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) { numberOfRetrievedValues++; - if (!resultHandler(QByteArray::fromRawData((char*)key.mv_data, key.mv_size), QByteArray::fromRawData((char*)data.mv_data, data.mv_size))) { + if (!resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size))) { break; } } @@ -243,14 +231,14 @@ int Storage::NamedDatabase::scan(const QByteArray &k, } } - //We never find the last value + // We never find the last value if (rc == MDB_NOTFOUND) { rc = 0; } } else { if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_SET)) == 0) { numberOfRetrievedValues++; - resultHandler(QByteArray::fromRawData((char*)key.mv_data, key.mv_size), QByteArray::fromRawData((char*)data.mv_data, data.mv_size)); + resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size)); } } @@ -264,12 +252,11 @@ int Storage::NamedDatabase::scan(const QByteArray &k, return numberOfRetrievedValues; } -void Storage::NamedDatabase::findLatest(const QByteArray &k, - const std::function &resultHandler, - const std::function &errorHandler) const +void Storage::NamedDatabase::findLatest(const QByteArray &k, const std::function &resultHandler, + const std::function &errorHandler) const { if (!d || !d->transaction) { - //Not an error. We rely on this to read nothing from non-existing databases. + // Not an error. We rely on this to read nothing from non-existing databases. return; } @@ -278,7 +265,7 @@ void Storage::NamedDatabase::findLatest(const QByteArray &k, MDB_val data; MDB_cursor *cursor; - key.mv_data = (void*)k.constData(); + key.mv_data = (void *)k.constData(); key.mv_size = k.size(); rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); @@ -290,10 +277,10 @@ void Storage::NamedDatabase::findLatest(const QByteArray &k, MDB_cursor_op op = MDB_SET_RANGE; if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) { - //The first lookup will find a key that is equal or greather than our key - if (QByteArray::fromRawData((char*)key.mv_data, key.mv_size).startsWith(k)) { + // The first lookup will find a key that is equal or greather than our key + if (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) { bool advanced = false; - while (QByteArray::fromRawData((char*)key.mv_data, key.mv_size).startsWith(k)) { + while (QByteArray::fromRawData((char *)key.mv_data, key.mv_size).startsWith(k)) { advanced = true; MDB_cursor_op nextOp = MDB_NEXT; rc = mdb_cursor_get(cursor, &key, &data, nextOp); @@ -303,17 +290,17 @@ void Storage::NamedDatabase::findLatest(const QByteArray &k, } if (advanced) { MDB_cursor_op prefOp = MDB_PREV; - //We read past the end above, just take the last value + // We read past the end above, just take the last value if (rc == MDB_NOTFOUND) { prefOp = MDB_LAST; } rc = mdb_cursor_get(cursor, &key, &data, prefOp); - resultHandler(QByteArray::fromRawData((char*)key.mv_data, key.mv_size), QByteArray::fromRawData((char*)data.mv_data, data.mv_size)); + resultHandler(QByteArray::fromRawData((char *)key.mv_data, key.mv_size), QByteArray::fromRawData((char *)data.mv_data, data.mv_size)); } } } - //We never find the last value + // We never find the last value if (rc == MDB_NOTFOUND) { rc = 0; } @@ -350,25 +337,15 @@ qint64 Storage::NamedDatabase::getSize() } - - class Storage::Transaction::Private { public: Private(bool _requestRead, const std::function &_defaultErrorHandler, const QString &_name, MDB_env *_env) - : env(_env), - requestedRead(_requestRead), - defaultErrorHandler(_defaultErrorHandler), - name(_name), - implicitCommit(false), - error(false), - modificationCounter(0) + : env(_env), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0) { - } ~Private() { - } MDB_env *env; @@ -391,14 +368,11 @@ public: } }; -Storage::Transaction::Transaction() - : d(nullptr) +Storage::Transaction::Transaction() : d(nullptr) { - } -Storage::Transaction::Transaction(Transaction::Private *prv) - : d(prv) +Storage::Transaction::Transaction(Transaction::Private *prv) : d(prv) { d->startTransaction(); } @@ -449,7 +423,7 @@ Storage::NamedDatabase Storage::Transaction::openDatabase(const QByteArray &db, if (!d) { return Storage::NamedDatabase(); } - //We don't now if anything changed + // We don't now if anything changed d->implicitCommit = true; auto p = new Storage::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); if (!p->openDatabase(d->requestedRead, errorHandler)) { @@ -475,9 +449,9 @@ QList Storage::Transaction::getDatabaseNames() const mdb_cursor_open(d->transaction, d->dbi, &cursor); if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_FIRST)) == 0) { - list << QByteArray::fromRawData((char*)key.mv_data, key.mv_size); + list << QByteArray::fromRawData((char *)key.mv_data, key.mv_size); while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) { - list << QByteArray::fromRawData((char*)key.mv_data, key.mv_size); + list << QByteArray::fromRawData((char *)key.mv_data, key.mv_size); } } else { Warning() << "Failed to get a value" << rc; @@ -489,9 +463,6 @@ QList Storage::Transaction::getDatabaseNames() const } - - - class Storage::Private { public: @@ -504,17 +475,13 @@ public: MDB_env *env; AccessMode mode; static QMutex sMutex; - static QHash sEnvironments; + static QHash sEnvironments; }; QMutex Storage::Private::sMutex; -QHash Storage::Private::sEnvironments; +QHash Storage::Private::sEnvironments; -Storage::Private::Private(const QString &s, const QString &n, AccessMode m) - : storageRoot(s), - name(n), - env(0), - mode(m) +Storage::Private::Private(const QString &s, const QString &n, AccessMode m) : storageRoot(s), name(n), env(0), mode(m) { const QString fullPath(storageRoot + '/' + name); QFileInfo dirInfo(fullPath); @@ -525,11 +492,11 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m) if (mode == ReadWrite && !dirInfo.permission(QFile::WriteOwner)) { qCritical() << fullPath << "does not have write permissions. Aborting"; } else if (dirInfo.exists()) { - //Ensure the environment is only created once + // Ensure the environment is only created once QMutexLocker locker(&sMutex); /* - * It seems we can only ever have one environment open in the process. + * It seems we can only ever have one environment open in the process. * Otherwise multi-threading breaks. */ env = sEnvironments.value(fullPath); @@ -549,8 +516,8 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m) mdb_env_close(env); env = 0; } else { - //FIXME: dynamic resize - const size_t dbSize = (size_t)10485760 * (size_t)8000; //1MB * 8000 + // FIXME: dynamic resize + const size_t dbSize = (size_t)10485760 * (size_t)8000; // 1MB * 8000 mdb_env_set_mapsize(env, dbSize); sEnvironments.insert(fullPath, env); } @@ -561,7 +528,7 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m) Storage::Private::~Private() { - //Since we can have only one environment open per process, we currently leak the environments. + // Since we can have only one environment open per process, we currently leak the environments. // if (env) { // //mdb_dbi_close should not be necessary and is potentially dangerous (see docs) // mdb_dbi_close(env, dbi); @@ -569,8 +536,7 @@ Storage::Private::~Private() // } } -Storage::Storage(const QString &storageRoot, const QString &name, AccessMode mode) - : d(new Private(storageRoot, name, mode)) +Storage::Storage(const QString &storageRoot, const QString &name, AccessMode mode) : d(new Private(storageRoot, name, mode)) { } diff --git a/common/store.cpp b/common/store.cpp index 6847d22..68f73c8 100644 --- a/common/store.cpp +++ b/common/store.cpp @@ -38,8 +38,7 @@ #undef DEBUG_AREA #define DEBUG_AREA "client.store" -namespace Sink -{ +namespace Sink { QString Store::storageLocation() { @@ -48,7 +47,7 @@ QString Store::storageLocation() static QList getResources(const QList &resourceFilter, const QByteArray &type) { - //Return the global resource (signified by an empty name) for types that don't eblong to a specific resource + // Return the global resource (signified by an empty name) for types that don't eblong to a specific resource if (type == "sinkresource") { return QList() << ""; } @@ -56,7 +55,7 @@ static QList getResources(const QList &resourceFilter, c const auto configuredResources = ResourceConfig::getResources(); if (resourceFilter.isEmpty()) { for (const auto &res : configuredResources.keys()) { - //TODO filter by entity type + // TODO filter by entity type resources << res; } } else { @@ -82,7 +81,7 @@ QSharedPointer Store::loadModel(Query query) Trace() << " Ids: " << query.ids; Trace() << " IsLive: " << query.liveQuery; Trace() << " Sorting: " << query.sortProperty; - auto model = QSharedPointer >::create(query, query.requestedProperties); + auto model = QSharedPointer>::create(query, query.requestedProperties); //* Client defines lifetime of model //* The model lifetime defines the duration of live-queries @@ -95,120 +94,117 @@ QSharedPointer Store::loadModel(Query query) auto aggregatingEmitter = AggregatingResultEmitter::Ptr::create(); model->setEmitter(aggregatingEmitter); KAsync::iterate(resources) - .template each([query, aggregatingEmitter](const QByteArray &resource, KAsync::Future &future) { - auto facade = FacadeFactory::instance().getFacade(resourceName(resource), resource); - if (facade) { - Trace() << "Trying to fetch from resource " << resource; - auto result = facade->load(query); - aggregatingEmitter->addEmitter(result.second); - result.first.template then([&future](){future.setFinished();}).exec(); - } else { - Trace() << "Couldn' find a facade for " << resource; - //Ignore the error and carry on - future.setFinished(); - } - }).exec(); + .template each([query, aggregatingEmitter](const QByteArray &resource, KAsync::Future &future) { + auto facade = FacadeFactory::instance().getFacade(resourceName(resource), resource); + if (facade) { + Trace() << "Trying to fetch from resource " << resource; + auto result = facade->load(query); + aggregatingEmitter->addEmitter(result.second); + result.first.template then([&future]() { future.setFinished(); }).exec(); + } else { + Trace() << "Couldn' find a facade for " << resource; + // Ignore the error and carry on + future.setFinished(); + } + }) + .exec(); model->fetchMore(QModelIndex()); return model; } template -static std::shared_ptr > getFacade(const QByteArray &resourceInstanceIdentifier) +static std::shared_ptr> getFacade(const QByteArray &resourceInstanceIdentifier) { if (auto facade = FacadeFactory::instance().getFacade(resourceName(resourceInstanceIdentifier), resourceInstanceIdentifier)) { return facade; } - return std::make_shared >(); + return std::make_shared>(); } template -KAsync::Job Store::create(const DomainType &domainObject) { - //Potentially move to separate thread as well +KAsync::Job Store::create(const DomainType &domainObject) +{ + // Potentially move to separate thread as well auto facade = getFacade(domainObject.resourceInstanceIdentifier()); - return facade->create(domainObject).template then([facade](){}, [](int errorCode, const QString &error) { - Warning() << "Failed to create"; - }); + return facade->create(domainObject).template then([facade]() {}, [](int errorCode, const QString &error) { Warning() << "Failed to create"; }); } template KAsync::Job Store::modify(const DomainType &domainObject) { - //Potentially move to separate thread as well + // Potentially move to separate thread as well auto facade = getFacade(domainObject.resourceInstanceIdentifier()); - return facade->modify(domainObject).template then([facade](){}, [](int errorCode, const QString &error) { - Warning() << "Failed to modify"; - }); + return facade->modify(domainObject).template then([facade]() {}, [](int errorCode, const QString &error) { Warning() << "Failed to modify"; }); } template KAsync::Job Store::remove(const DomainType &domainObject) { - //Potentially move to separate thread as well + // Potentially move to separate thread as well auto facade = getFacade(domainObject.resourceInstanceIdentifier()); - return facade->remove(domainObject).template then([facade](){}, [](int errorCode, const QString &error) { - Warning() << "Failed to remove"; - }); + return facade->remove(domainObject).template then([facade]() {}, [](int errorCode, const QString &error) { Warning() << "Failed to remove"; }); } KAsync::Job Store::removeDataFromDisk(const QByteArray &identifier) { - //All databases are going to become invalid, nuke the environments - //TODO: all clients should react to a notification the resource + // All databases are going to become invalid, nuke the environments + // TODO: all clients should react to a notification the resource Sink::Storage::clearEnv(); Trace() << "Remove data from disk " << identifier; auto time = QSharedPointer::create(); time->start(); auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier); resourceAccess->open(); - return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand).then([resourceAccess, time]() { - Trace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); - }); + return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) + .then([resourceAccess, time]() { Trace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); }); } KAsync::Job Store::synchronize(const Sink::Query &query) { Trace() << "synchronize" << query.resources; return KAsync::iterate(query.resources) - .template each([query](const QByteArray &resource, KAsync::Future &future) { - Trace() << "Synchronizing " << resource; - auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource); - resourceAccess->open(); - resourceAccess->synchronizeResource(true, false).then([&future, resourceAccess]() { - future.setFinished(); - }).exec(); - }); + .template each([query](const QByteArray &resource, KAsync::Future &future) { + Trace() << "Synchronizing " << resource; + auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource); + resourceAccess->open(); + resourceAccess->synchronizeResource(true, false).then([&future, resourceAccess]() { future.setFinished(); }).exec(); + }); } template KAsync::Job Store::fetchOne(const Sink::Query &query) { return KAsync::start([query](KAsync::Future &future) { - //FIXME We could do this more elegantly if composed jobs would have the correct type (In that case we'd simply return the value from then continuation, and could avoid the outer job entirely) + // FIXME We could do this more elegantly if composed jobs would have the correct type (In that case we'd simply return the value from then continuation, and could avoid the + // outer job entirely) fetch(query, 1) - .template then >([&future](const QList &list){ - future.setValue(*list.first()); - future.setFinished(); - }, [&future](int errorCode, const QString &errorMessage) { - future.setError(errorCode, errorMessage); - future.setFinished(); - }).exec(); + .template then>( + [&future](const QList &list) { + future.setValue(*list.first()); + future.setFinished(); + }, + [&future](int errorCode, const QString &errorMessage) { + future.setError(errorCode, errorMessage); + future.setFinished(); + }) + .exec(); }); } template -KAsync::Job > Store::fetchAll(const Sink::Query &query) +KAsync::Job> Store::fetchAll(const Sink::Query &query) { return fetch(query); } template -KAsync::Job > Store::fetch(const Sink::Query &query, int minimumAmount) +KAsync::Job> Store::fetch(const Sink::Query &query, int minimumAmount) { auto model = loadModel(query); - auto list = QSharedPointer >::create(); + auto list = QSharedPointer>::create(); auto context = QSharedPointer::create(); - return KAsync::start >([model, list, context, minimumAmount](KAsync::Future > &future) { + return KAsync::start>([model, list, context, minimumAmount](KAsync::Future> &future) { if (model->rowCount() >= 1) { for (int i = 0; i < model->rowCount(); i++) { list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value()); @@ -219,16 +215,17 @@ KAsync::Job > Store::fetch(const Sink::Query &qu list->append(model->index(i, 0, QModelIndex()).data(Sink::Store::DomainObjectRole).template value()); } }); - QObject::connect(model.data(), &QAbstractItemModel::dataChanged, context.data(), [model, &future, list, minimumAmount](const QModelIndex &, const QModelIndex &, const QVector &roles) { - if (roles.contains(ModelResult::ChildrenFetchedRole)) { - if (list->size() < minimumAmount) { - future.setError(1, "Not enough values."); - } else { - future.setValue(*list); + QObject::connect(model.data(), &QAbstractItemModel::dataChanged, context.data(), + [model, &future, list, minimumAmount](const QModelIndex &, const QModelIndex &, const QVector &roles) { + if (roles.contains(ModelResult::ChildrenFetchedRole)) { + if (list->size() < minimumAmount) { + future.setError(1, "Not enough values."); + } else { + future.setValue(*list); + } + future.setFinished(); } - future.setFinished(); - } - }); + }); } if (model->data(QModelIndex(), ModelResult::ChildrenFetchedRole).toBool()) { if (list->size() < minimumAmount) { @@ -241,13 +238,14 @@ KAsync::Job > Store::fetch(const Sink::Query &qu }); } -#define REGISTER_TYPE(T) template KAsync::Job Store::remove(const T &domainObject); \ - template KAsync::Job Store::create(const T &domainObject); \ - template KAsync::Job Store::modify(const T &domainObject); \ +#define REGISTER_TYPE(T) \ + template KAsync::Job Store::remove(const T &domainObject); \ + template KAsync::Job Store::create(const T &domainObject); \ + template KAsync::Job Store::modify(const T &domainObject); \ template QSharedPointer Store::loadModel(Query query); \ - template KAsync::Job Store::fetchOne(const Query &); \ - template KAsync::Job > Store::fetchAll(const Query &); \ - template KAsync::Job > Store::fetch(const Query &, int); \ + template KAsync::Job Store::fetchOne(const Query &); \ + template KAsync::Job> Store::fetchAll(const Query &); \ + template KAsync::Job> Store::fetch(const Query &, int); REGISTER_TYPE(ApplicationDomain::Event); REGISTER_TYPE(ApplicationDomain::Mail); @@ -255,4 +253,3 @@ REGISTER_TYPE(ApplicationDomain::Folder); REGISTER_TYPE(ApplicationDomain::SinkResource); } // namespace Sink - diff --git a/common/store.h b/common/store.h index 6696833..af8e971 100644 --- a/common/store.h +++ b/common/store.h @@ -35,7 +35,7 @@ namespace Sink { /** * The unified Sink Store. - * + * * This is the primary interface for clients to interact with Sink. * It provides a unified store where all data provided by various resources can be accessed and modified. */ @@ -43,8 +43,9 @@ namespace Store { QString SINK_EXPORT storageLocation(); -enum Roles { - DomainObjectRole = Qt::UserRole + 1, //Must be the same as in ModelResult +enum Roles +{ + DomainObjectRole = Qt::UserRole + 1, // Must be the same as in ModelResult ChildrenFetchedRole, DomainObjectBaseRole }; @@ -63,7 +64,7 @@ KAsync::Job SINK_EXPORT create(const DomainType &domainObject); /** * Modify an entity. - * + * * This includes moving etc. since these are also simple settings on a property. */ template @@ -82,7 +83,7 @@ KAsync::Job SINK_EXPORT synchronize(const Sink::Query &query); /** * Removes all resource data from disk. - * + * * This will not touch the configuration. All commands that that arrived at the resource before this command will be dropped. All commands that arrived later will be executed. */ KAsync::Job SINK_EXPORT removeDataFromDisk(const QByteArray &resourceIdentifier); @@ -91,11 +92,9 @@ template KAsync::Job SINK_EXPORT fetchOne(const Sink::Query &query); template -KAsync::Job > SINK_EXPORT fetchAll(const Sink::Query &query); +KAsync::Job> SINK_EXPORT fetchAll(const Sink::Query &query); template -KAsync::Job > SINK_EXPORT fetch(const Sink::Query &query, int minimumAmount = 0); - - } +KAsync::Job> SINK_EXPORT fetch(const Sink::Query &query, int minimumAmount = 0); +} } - diff --git a/common/synclistresult.h b/common/synclistresult.h index 055714f..344c0ef 100644 --- a/common/synclistresult.h +++ b/common/synclistresult.h @@ -16,16 +16,13 @@ namespace async { * * WARNING: The nested eventloop can cause all sorts of trouble. Use only in testing code. */ -template -class SyncListResult : public QList { +template +class SyncListResult : public QList +{ public: - SyncListResult(const QSharedPointer > &emitter) - :QList(), - mEmitter(emitter) + SyncListResult(const QSharedPointer> &emitter) : QList(), mEmitter(emitter) { - emitter->onAdded([this](const T &value) { - this->append(value); - }); + emitter->onAdded([this](const T &value) { this->append(value); }); emitter->onModified([this](const T &value) { for (auto it = this->begin(); it != this->end(); it++) { if (**it == *value) { @@ -46,16 +43,12 @@ public: emitter->onInitialResultSetComplete([this]() { if (eventLoopAborter) { eventLoopAborter(); - //Be safe in case of a second invocation of the complete handler + // Be safe in case of a second invocation of the complete handler eventLoopAborter = std::function(); } }); - emitter->onComplete([this]() { - mEmitter.clear(); - }); - emitter->onClear([this]() { - this->clear(); - }); + emitter->onComplete([this]() { mEmitter.clear(); }); + emitter->onClear([this]() { this->clear(); }); } void exec() @@ -66,8 +59,7 @@ public: } private: - QSharedPointer > mEmitter; + QSharedPointer> mEmitter; std::function eventLoopAborter; }; - } diff --git a/common/threadboundary.cpp b/common/threadboundary.cpp index 238f5b4..705009b 100644 --- a/common/threadboundary.cpp +++ b/common/threadboundary.cpp @@ -23,13 +23,12 @@ Q_DECLARE_METATYPE(std::function); namespace async { -ThreadBoundary::ThreadBoundary() - : QObject() +ThreadBoundary::ThreadBoundary() : QObject() { - qRegisterMetaType >("std::function"); + qRegisterMetaType>("std::function"); } -ThreadBoundary:: ~ThreadBoundary() +ThreadBoundary::~ThreadBoundary() { } @@ -47,6 +46,4 @@ void ThreadBoundary::runInMainThread(std::function f) { f(); } - } - diff --git a/common/threadboundary.h b/common/threadboundary.h index 7bea4ea..dd86b20 100644 --- a/common/threadboundary.h +++ b/common/threadboundary.h @@ -31,17 +31,17 @@ namespace async { * A helper class to invoke a method in a different thread using the event loop. * The ThreadBoundary object must live in the thread where the function should be called. */ -class SINK_EXPORT ThreadBoundary : public QObject { +class SINK_EXPORT ThreadBoundary : public QObject +{ Q_OBJECT public: ThreadBoundary(); virtual ~ThreadBoundary(); - //Call in worker thread + // Call in worker thread void callInMainThread(std::function f); public slots: - //Get's called in main thread by it's eventloop + // Get's called in main thread by it's eventloop void runInMainThread(std::function f); }; - } diff --git a/common/typeindex.cpp b/common/typeindex.cpp index ddf5df5..1321469 100644 --- a/common/typeindex.cpp +++ b/common/typeindex.cpp @@ -30,13 +30,13 @@ static QByteArray getByteArray(const QVariant &value) if (value.isValid() && !value.toByteArray().isEmpty()) { return value.toByteArray(); } - //LMDB can't handle empty keys, so use something different + // LMDB can't handle empty keys, so use something different return "toplevel"; } static QByteArray toSortableByteArray(const QDateTime &date) { - //Sort invalid last + // Sort invalid last if (!date.isValid()) { return QByteArray::number(std::numeric_limits::max()); } @@ -44,10 +44,8 @@ static QByteArray toSortableByteArray(const QDateTime &date) } -TypeIndex::TypeIndex(const QByteArray &type) - : mType(type) +TypeIndex::TypeIndex(const QByteArray &type) : mType(type) { - } QByteArray TypeIndex::indexName(const QByteArray &property, const QByteArray &sortProperty) const @@ -58,7 +56,7 @@ QByteArray TypeIndex::indexName(const QByteArray &property, const QByteArray &so return mType + ".index." + property + ".sort." + sortProperty; } -template<> +template <> void TypeIndex::addProperty(const QByteArray &property) { auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) { @@ -69,7 +67,7 @@ void TypeIndex::addProperty(const QByteArray &property) mProperties << property; } -template<> +template <> void TypeIndex::addProperty(const QByteArray &property) { auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) { @@ -80,7 +78,7 @@ void TypeIndex::addProperty(const QByteArray &property) mProperties << property; } -template<> +template <> void TypeIndex::addProperty(const QByteArray &property) { auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) { @@ -94,7 +92,7 @@ void TypeIndex::addProperty(const QByteArray &property) mProperties << property; } -template<> +template <> void TypeIndex::addPropertyWithSorting(const QByteArray &property, const QByteArray &sortProperty) { auto indexer = [=](const QByteArray &identifier, const QVariant &value, const QVariant &sortValue, Sink::Storage::Transaction &transaction) { @@ -102,7 +100,7 @@ void TypeIndex::addPropertyWithSorting(const QByteArray & const auto propertyValue = getByteArray(value); Index(indexName(property, sortProperty), transaction).add(propertyValue + toSortableByteArray(date), identifier); }; - mSortIndexer.insert(property+sortProperty, indexer); + mSortIndexer.insert(property + sortProperty, indexer); mSortedProperties.insert(property, sortProperty); } @@ -116,7 +114,7 @@ void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain: for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { const auto value = bufferAdaptor.getProperty(it.key()); const auto sortValue = bufferAdaptor.getProperty(it.value()); - auto indexer = mSortIndexer.value(it.key()+it.value()); + auto indexer = mSortIndexer.value(it.key() + it.value()); indexer(identifier, value, sortValue, transaction); } } @@ -125,7 +123,7 @@ void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDoma { for (const auto &property : mProperties) { const auto value = bufferAdaptor.getProperty(property); - //FIXME don't always convert to byte array + // FIXME don't always convert to byte array Index(indexName(property), transaction).remove(getByteArray(value), identifier); } for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { @@ -147,12 +145,8 @@ ResultSet TypeIndex::query(const Sink::Query &query, QSet &appliedFi Index index(indexName(it.key(), it.value()), transaction); const auto lookupKey = getByteArray(query.propertyFilter.value(it.key())); Trace() << "looking for " << lookupKey; - index.lookup(lookupKey, [&](const QByteArray &value) { - keys << value; - }, - [it](const Index::Error &error) { - Warning() << "Error in index: " << error.message << it.key() << it.value(); - }, true); + index.lookup(lookupKey, [&](const QByteArray &value) { keys << value; }, + [it](const Index::Error &error) { Warning() << "Error in index: " << error.message << it.key() << it.value(); }, true); appliedFilters << it.key(); appliedSorting = it.value(); Trace() << "Index lookup on " << it.key() << it.value() << " found " << keys.size() << " keys."; @@ -163,12 +157,8 @@ ResultSet TypeIndex::query(const Sink::Query &query, QSet &appliedFi if (query.propertyFilter.contains(property)) { Index index(indexName(property), transaction); const auto lookupKey = getByteArray(query.propertyFilter.value(property)); - index.lookup(lookupKey, [&](const QByteArray &value) { - keys << value; - }, - [property](const Index::Error &error) { - Warning() << "Error in index: " << error.message << property; - }); + index.lookup( + lookupKey, [&](const QByteArray &value) { keys << value; }, [property](const Index::Error &error) { Warning() << "Error in index: " << error.message << property; }); appliedFilters << property; Trace() << "Index lookup on " << property << " found " << keys.size() << " keys."; return ResultSet(keys); @@ -177,4 +167,3 @@ ResultSet TypeIndex::query(const Sink::Query &query, QSet &appliedFi Trace() << "No matching index"; return ResultSet(keys); } - diff --git a/common/typeindex.h b/common/typeindex.h index c19780c..a16179c 100644 --- a/common/typeindex.h +++ b/common/typeindex.h @@ -29,9 +29,9 @@ class TypeIndex public: TypeIndex(const QByteArray &type); - template + template void addProperty(const QByteArray &property); - template + template void addPropertyWithSorting(const QByteArray &property, const QByteArray &sortProperty); void add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); @@ -44,7 +44,6 @@ private: QByteArray mType; QByteArrayList mProperties; QMap mSortedProperties; - QHash > mIndexer; - QHash > mSortIndexer; + QHash> mIndexer; + QHash> mSortIndexer; }; - -- cgit v1.2.3