From 237b9ae4113e7a9f489632296941becb71afdb45 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Sun, 16 Oct 2016 14:55:20 +0200 Subject: Refactor how the storage is used. This is the initial refactoring to improve how we deal with the storage. It does a couple of things: * Rename Sink::Storage to Sink::Storage::DataStore to free up the Sink::Storage namespace * Introduce a Sink::ResourceContext to have a single object that can be passed around containing everything that is necessary to operate on a resource. This is a lot better than the multiple separate parameters that we used to pass around all over the place, while still allowing for dependency injection for tests. * Tie storage access together using the new EntityStore that directly works with ApplicationDomainTypes. This gives us a central place where main storage, indexes and buffer adaptors are tied together, which will also give us a place to implement external indexes, such as a fulltextindex using xapian. * Use ApplicationDomainTypes as the default way to pass around entities. Instead of using various ways to pass around entities (buffers, buffer adaptors, ApplicationDomainTypes), only use a single way. The old approach was confusing, and was only done as: * optimization; really shouldn't be necessary and otherwise I'm sure we can find better ways to optimize ApplicationDomainType itself. * a way to account for entities that have multiple buffers, a concept that I no longer deem relevant. While this commit does the bulk of the work to get there, the following commits will refactor more stuff to get things back to normal. --- common/CMakeLists.txt | 1 + common/adaptorfactoryregistry.cpp | 12 + common/adaptorfactoryregistry.h | 3 + common/changereplay.cpp | 33 +- common/changereplay.h | 11 +- common/datastorequery.cpp | 62 +--- common/datastorequery.h | 9 +- common/domain/applicationdomaintype.cpp | 2 +- common/domain/applicationdomaintype.h | 2 + common/domain/event.cpp | 15 +- common/domain/event.h | 11 +- common/domain/folder.cpp | 17 +- common/domain/folder.h | 10 +- common/domain/mail.cpp | 52 ++-- common/domain/mail.h | 11 +- common/domainadaptor.h | 7 +- common/domaintypeadaptorfactoryinterface.h | 4 +- common/entityreader.cpp | 209 +++++++------ common/entityreader.h | 16 +- common/entitystore.cpp | 5 +- common/entitystore.h | 30 +- common/facade.cpp | 22 +- common/facade.h | 18 +- common/facadefactory.cpp | 3 +- common/facadefactory.h | 7 +- common/genericresource.cpp | 33 +- common/genericresource.h | 5 +- common/index.cpp | 12 +- common/index.h | 8 +- common/indexupdater.h | 16 +- common/listener.cpp | 6 +- common/mailpreprocessor.cpp | 10 +- common/mailpreprocessor.h | 10 +- common/messagequeue.cpp | 20 +- common/messagequeue.h | 4 +- common/pipeline.cpp | 99 +++--- common/pipeline.h | 44 +-- common/queryrunner.cpp | 106 ++++--- common/queryrunner.h | 9 +- common/remoteidmap.cpp | 6 +- common/remoteidmap.h | 4 +- common/resource.h | 3 +- common/resourcecontext.h | 77 +++++ common/sourcewriteback.cpp | 31 +- common/sourcewriteback.h | 13 +- common/specialpurposepreprocessor.cpp | 48 +-- common/specialpurposepreprocessor.h | 8 +- common/storage.h | 52 ++-- common/storage/entitystore.cpp | 338 +++++++++++++++++++++ common/storage/entitystore.h | 109 +++++++ common/storage_common.cpp | 50 +-- common/storage_lmdb.cpp | 100 +++--- common/store.cpp | 2 +- common/synchronizer.cpp | 148 ++++----- common/synchronizer.h | 31 +- common/test.cpp | 4 +- common/typeindex.cpp | 24 +- common/typeindex.h | 20 +- examples/dummyresource/facade.cpp | 12 +- examples/dummyresource/facade.h | 6 +- examples/dummyresource/resourcefactory.cpp | 16 +- examples/dummyresource/resourcefactory.h | 4 +- examples/imapresource/facade.cpp | 8 +- examples/imapresource/facade.h | 4 +- examples/imapresource/imapresource.cpp | 66 ++-- examples/imapresource/imapresource.h | 4 +- examples/maildirresource/facade.cpp | 11 +- examples/maildirresource/facade.h | 4 +- examples/maildirresource/maildirresource.cpp | 77 ++--- examples/maildirresource/maildirresource.h | 4 +- .../mailtransportresource.cpp | 32 +- .../mailtransportresource/mailtransportresource.h | 4 +- sinksh/syntax_modules/sink_stat.cpp | 6 +- tests/clientapitest.cpp | 6 +- .../databasepopulationandfacadequerybenchmark.cpp | 22 +- tests/dummyresourcebenchmark.cpp | 4 +- tests/dummyresourcetest.cpp | 9 +- tests/dummyresourcewritebenchmark.cpp | 4 +- tests/hawd/dataset.cpp | 8 +- tests/hawd/dataset.h | 4 +- tests/indextest.cpp | 6 +- tests/mailquerybenchmark.cpp | 11 +- tests/messagequeuetest.cpp | 4 +- tests/pipelinebenchmark.cpp | 11 +- tests/pipelinetest.cpp | 49 ++- tests/querytest.cpp | 2 +- tests/storagebenchmark.cpp | 24 +- tests/storagetest.cpp | 136 ++++----- tests/testimplementations.h | 10 +- 89 files changed, 1524 insertions(+), 1066 deletions(-) create mode 100644 common/resourcecontext.h create mode 100644 common/storage/entitystore.cpp create mode 100644 common/storage/entitystore.h diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 84fe474..e1e7a51 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -76,6 +76,7 @@ set(command_SRCS mailpreprocessor.cpp specialpurposepreprocessor.cpp datastorequery.cpp + storage/entitystore.cpp ${storage_SRCS}) add_library(${PROJECT_NAME} SHARED ${command_SRCS}) diff --git a/common/adaptorfactoryregistry.cpp b/common/adaptorfactoryregistry.cpp index 323a02d..91b5a4c 100644 --- a/common/adaptorfactoryregistry.cpp +++ b/common/adaptorfactoryregistry.cpp @@ -61,8 +61,20 @@ std::shared_ptr AdaptorFactoryRegistry::getFa return std::static_pointer_cast(ptr); } +QMap AdaptorFactoryRegistry::getFactories(const QByteArray &resource) +{ + QMap map; + for (const auto &type : mTypes.values(resource)) { + auto f = getFactory(resource, type); + //Convert the std::shared_ptr to a QSharedPointer + map.insert(type, DomainTypeAdaptorFactoryInterface::Ptr(f.get(), [](DomainTypeAdaptorFactoryInterface *) {})); + } + return map; +} + void AdaptorFactoryRegistry::registerFactory(const QByteArray &resource, const std::shared_ptr &instance, const QByteArray typeName) { + mTypes.insert(resource, typeName); mRegistry.insert(key(resource, typeName), instance); } diff --git a/common/adaptorfactoryregistry.h b/common/adaptorfactoryregistry.h index f06120a..47f2612 100644 --- a/common/adaptorfactoryregistry.h +++ b/common/adaptorfactoryregistry.h @@ -54,11 +54,14 @@ public: std::shared_ptr getFactory(const QByteArray &resource, const QByteArray &typeName); + QMap getFactories(const QByteArray &resource); + private: AdaptorFactoryRegistry(); void registerFactory(const QByteArray &resource, const std::shared_ptr &instance, const QByteArray typeName); QHash> mRegistry; + QMultiHash mTypes; static QMutex sMutex; }; } diff --git a/common/changereplay.cpp b/common/changereplay.cpp index e3b7158..6e58564 100644 --- a/common/changereplay.cpp +++ b/common/changereplay.cpp @@ -27,31 +27,32 @@ #include using namespace Sink; +using namespace Sink::Storage; SINK_DEBUG_AREA("changereplay"); -ChangeReplay::ChangeReplay(const QByteArray &resourceName) - : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false) +ChangeReplay::ChangeReplay(const ResourceContext &resourceContext) + : mStorage(storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly), mChangeReplayStore(storageLocation(), resourceContext.instanceId() + ".changereplay", DataStore::ReadWrite), mReplayInProgress(false) { - SinkTrace() << "Created change replay: " << resourceName; + SinkTrace() << "Created change replay: " << resourceContext.instanceId(); } qint64 ChangeReplay::getLastReplayedRevision() { qint64 lastReplayedRevision = 0; - auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); + auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly); replayStoreTransaction.openDatabase().scan("lastReplayedRevision", [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { lastReplayedRevision = value.toLongLong(); return false; }, - [](const Storage::Error &) {}); + [](const DataStore::Error &) {}); return lastReplayedRevision; } bool ChangeReplay::allChangesReplayed() { - const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { + const qint64 topRevision = DataStore::maxRevision(mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << error.message; })); const qint64 lastReplayedRevision = getLastReplayedRevision(); @@ -61,7 +62,7 @@ bool ChangeReplay::allChangesReplayed() void ChangeReplay::recordReplayedRevision(qint64 revision) { - auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { + auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadWrite, [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << error.message; }); replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); @@ -74,10 +75,10 @@ KAsync::Job ChangeReplay::replayNextRevision() auto topRevision = QSharedPointer::create(0); return KAsync::syncStart([this, lastReplayedRevision, topRevision]() { mReplayInProgress = true; - mMainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { + mMainStoreTransaction = mStorage.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << error.message; }); - auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { + auto replayStoreTransaction = mChangeReplayStore.createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << error.message; }); replayStoreTransaction.openDatabase().scan("lastReplayedRevision", @@ -85,8 +86,8 @@ KAsync::Job ChangeReplay::replayNextRevision() *lastReplayedRevision = value.toLongLong(); return false; }, - [](const Storage::Error &) {}); - *topRevision = Storage::maxRevision(mMainStoreTransaction); + [](const DataStore::Error &) {}); + *topRevision = DataStore::maxRevision(mMainStoreTransaction); SinkTrace() << "Changereplay from " << *lastReplayedRevision << " to " << *topRevision; }) .then(KAsync::dowhile( @@ -98,11 +99,11 @@ KAsync::Job ChangeReplay::replayNextRevision() qint64 revision = *lastReplayedRevision + 1; KAsync::Job replayJob = KAsync::null(); while (revision <= *topRevision) { - const auto uid = Storage::getUidFromRevision(mMainStoreTransaction, revision); - const auto type = Storage::getTypeFromRevision(mMainStoreTransaction, revision); - const auto key = Storage::assembleKey(uid, revision); + const auto uid = DataStore::getUidFromRevision(mMainStoreTransaction, revision); + const auto type = DataStore::getTypeFromRevision(mMainStoreTransaction, revision); + const auto key = DataStore::assembleKey(uid, revision); bool exitLoop = false; - Storage::mainDatabase(mMainStoreTransaction, type) + DataStore::mainDatabase(mMainStoreTransaction, type) .scan(key, [&lastReplayedRevision, type, this, &replayJob, &exitLoop, revision](const QByteArray &key, const QByteArray &value) -> bool { SinkTrace() << "Replaying " << key; @@ -123,7 +124,7 @@ KAsync::Job ChangeReplay::replayNextRevision() } return false; }, - [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; }); + [key](const DataStore::Error &) { SinkError() << "Failed to replay change " << key; }); if (exitLoop) { break; } diff --git a/common/changereplay.h b/common/changereplay.h index 88d6ce3..e86c4f2 100644 --- a/common/changereplay.h +++ b/common/changereplay.h @@ -24,6 +24,7 @@ #include #include "storage.h" +#include "resourcecontext.h" namespace Sink { @@ -38,7 +39,7 @@ class SINK_EXPORT ChangeReplay : public QObject { Q_OBJECT public: - ChangeReplay(const QByteArray &resourceName); + ChangeReplay(const ResourceContext &resourceContext); qint64 getLastReplayedRevision(); bool allChangesReplayed(); @@ -53,20 +54,20 @@ public slots: protected: virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) = 0; - Sink::Storage mStorage; + Sink::Storage::DataStore mStorage; private: void recordReplayedRevision(qint64 revision); KAsync::Job replayNextRevision(); - Sink::Storage mChangeReplayStore; + Sink::Storage::DataStore mChangeReplayStore; bool mReplayInProgress; - Sink::Storage::Transaction mMainStoreTransaction; + Sink::Storage::DataStore::Transaction mMainStoreTransaction; }; class NullChangeReplay : public ChangeReplay { public: - NullChangeReplay(const QByteArray &resourceName) : ChangeReplay(resourceName) {} + NullChangeReplay(const ResourceContext &resourceContext) : ChangeReplay(resourceContext) {} KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return KAsync::null(); } bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE { return false; } }; diff --git a/common/datastorequery.cpp b/common/datastorequery.cpp index 7b7d3a3..d4a83b1 100644 --- a/common/datastorequery.cpp +++ b/common/datastorequery.cpp @@ -28,6 +28,7 @@ #include "event.h" using namespace Sink; +using namespace Sink::Storage; SINK_DEBUG_AREA("datastorequery") @@ -299,42 +300,18 @@ public: } }; -DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function getProperty) - : mQuery(query), mTransaction(transaction), mType(type), mTypeIndex(typeIndex), mDb(Storage::mainDatabase(mTransaction, mType)), mGetProperty(getProperty) +DataStoreQuery::DataStoreQuery(const Sink::Query &query, const QByteArray &type, EntityStore::Ptr store, TypeIndex &typeIndex, std::function getProperty) + : mQuery(query), mType(type), mTypeIndex(typeIndex), mGetProperty(getProperty), mStore(store) { setupQuery(); } -static inline QVector fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) -{ - // TODO use a result set with an iterator, to read values on demand - SinkTrace() << "Looking for : " << bufferType; - //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. - QSet keys; - Storage::mainDatabase(transaction, bufferType) - .scan(QByteArray(), - [&](const QByteArray &key, const QByteArray &value) -> bool { - if (keys.contains(Sink::Storage::uidFromKey(key))) { - //Not something that should persist if the replay works, so we keep a message for now. - SinkTrace() << "Multiple revisions for key: " << key; - } - keys << Sink::Storage::uidFromKey(key); - return true; - }, - [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; }); - - SinkTrace() << "Full scan retrieved " << keys.size() << " results."; - return keys.toList().toVector(); -} - void DataStoreQuery::readEntity(const QByteArray &key, const BufferCallback &resultCallback) { - mDb.findLatest(key, - [=](const QByteArray &key, const QByteArray &value) -> bool { - resultCallback(Sink::Storage::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); + mStore->readLatest(mType, key, [=](const QByteArray &key, const Sink::EntityBuffer &buffer) { + resultCallback(DataStore::uidFromKey(key), buffer); return false; - }, - [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); + }); } QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArray &property) @@ -344,7 +321,7 @@ QVariant DataStoreQuery::getProperty(const Sink::Entity &entity, const QByteArra QVector DataStoreQuery::indexLookup(const QByteArray &property, const QVariant &value) { - return mTypeIndex.lookup(property, value, mTransaction); + return mStore->indexLookup(mType, property, value); } /* ResultSet DataStoreQuery::filterAndSortSet(ResultSet &resultSet, const FilterFunction &filter, const QByteArray &sortProperty) */ @@ -444,7 +421,7 @@ QSharedPointer prepareQuery(const QByteArray &type, Args && ... QByteArrayList DataStoreQuery::executeSubquery(const Query &subquery) { Q_ASSERT(!subquery.type.isEmpty()); - auto sub = prepareQuery(subquery.type, subquery, mTransaction); + auto sub = prepareQuery(subquery.type, subquery, mStore); auto result = sub->execute(); QByteArrayList ids; while (result.next([&ids](const ResultSet::Result &result) { @@ -476,13 +453,13 @@ void DataStoreQuery::setupQuery() } else { QSet appliedFilters; - auto resultSet = mTypeIndex.query(mQuery, appliedFilters, appliedSorting, mTransaction); + auto resultSet = mStore->indexLookup(mType, mQuery, appliedFilters, appliedSorting); remainingFilters = remainingFilters - appliedFilters; // We do a full scan if there were no indexes available to create the initial set. if (appliedFilters.isEmpty()) { // TODO this should be replaced by an index lookup on the uid index - mSource = Source::Ptr::create(fullScan(mTransaction, mType), this); + mSource = Source::Ptr::create(mStore->fullScan(mType), this); } else { mSource = Source::Ptr::create(resultSet, this); } @@ -523,26 +500,11 @@ void DataStoreQuery::setupQuery() QVector DataStoreQuery::loadIncrementalResultSet(qint64 baseRevision) { - const auto bufferType = mType; auto revisionCounter = QSharedPointer::create(baseRevision); QVector changedKeys; - const qint64 topRevision = Sink::Storage::maxRevision(mTransaction); - // Spit out the revision keys one by one. - while (*revisionCounter <= topRevision) { - const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); - const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); - // SinkTrace() << "Revision" << *revisionCounter << type << uid; - Q_ASSERT(!uid.isEmpty()); - Q_ASSERT(!type.isEmpty()); - if (type != bufferType) { - // Skip revision - *revisionCounter += 1; - continue; - } - const auto key = Sink::Storage::assembleKey(uid, *revisionCounter); - *revisionCounter += 1; + mStore->readRevisions(baseRevision, mType, [&](const QByteArray &key) { changedKeys << key; - } + }); SinkTrace() << "Finished reading incremental result set:" << *revisionCounter; return changedKeys; } diff --git a/common/datastorequery.h b/common/datastorequery.h index 164d721..4cf25b2 100644 --- a/common/datastorequery.h +++ b/common/datastorequery.h @@ -25,6 +25,7 @@ #include "query.h" #include "entitybuffer.h" #include "log.h" +#include "storage/entitystore.h" class Source; @@ -35,11 +36,11 @@ class DataStoreQuery { public: typedef QSharedPointer Ptr; - DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::Transaction &transaction, TypeIndex &typeIndex, std::function getProperty); + DataStoreQuery(const Sink::Query &query, const QByteArray &type, Sink::Storage::EntityStore::Ptr store, TypeIndex &typeIndex, std::function getProperty); ResultSet execute(); ResultSet update(qint64 baseRevision); -protected: +private: typedef std::function FilterFunction; typedef std::function BufferCallback; @@ -56,15 +57,15 @@ protected: QByteArrayList executeSubquery(const Sink::Query &subquery); Sink::Query mQuery; - Sink::Storage::Transaction &mTransaction; const QByteArray mType; TypeIndex &mTypeIndex; - Sink::Storage::NamedDatabase mDb; std::function mGetProperty; bool mInitialQuery; QSharedPointer mCollector; QSharedPointer mSource; + QSharedPointer mStore; + SINK_DEBUG_COMPONENT(mType) }; diff --git a/common/domain/applicationdomaintype.cpp b/common/domain/applicationdomaintype.cpp index 2a0d977..3109966 100644 --- a/common/domain/applicationdomaintype.cpp +++ b/common/domain/applicationdomaintype.cpp @@ -73,7 +73,7 @@ ApplicationDomainType::~ApplicationDomainType() QByteArray ApplicationDomainType::generateUid() { - return Sink::Storage::generateUid(); + return Sink::Storage::DataStore::generateUid(); } bool ApplicationDomainType::hasProperty(const QByteArray &key) const diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h index e581e07..39ce2b9 100644 --- a/common/domain/applicationdomaintype.h +++ b/common/domain/applicationdomaintype.h @@ -241,6 +241,8 @@ struct SINK_EXPORT SinkResource : public ApplicationDomainType { struct SINK_EXPORT Entity : public ApplicationDomainType { typedef QSharedPointer Ptr; using ApplicationDomainType::ApplicationDomainType; + Entity() = default; + Entity(const ApplicationDomainType &other) : ApplicationDomainType(other) {} virtual ~Entity(); }; diff --git a/common/domain/event.cpp b/common/domain/event.cpp index f3abd62..d801592 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp @@ -42,23 +42,28 @@ static QMutex sMutex; using namespace Sink::ApplicationDomain; +void TypeImplementation::configureIndex(TypeIndex &index) +{ + index.addProperty(Event::Uid::name); +} + static TypeIndex &getIndex() { QMutexLocker locker(&sMutex); static TypeIndex *index = 0; if (!index) { index = new TypeIndex("event"); - index->addProperty("uid"); + TypeImplementation::configureIndex(*index); } return *index; } -void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) +void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) { return getIndex().add(identifier, bufferAdaptor, transaction); } -void TypeImplementation::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) +void TypeImplementation::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) { return getIndex().remove(identifier, bufferAdaptor, transaction); } @@ -83,10 +88,10 @@ QSharedPointer::BufferBuilder> > T return propertyMapper; } -DataStoreQuery::Ptr TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) +DataStoreQuery::Ptr TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::EntityStore::Ptr store) { auto mapper = initializeReadPropertyMapper(); - return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { + return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), store, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); return mapper->getProperty(property, localBuffer); diff --git a/common/domain/event.h b/common/domain/event.h index 684b58e..ce9691d 100644 --- a/common/domain/event.h +++ b/common/domain/event.h @@ -21,6 +21,7 @@ #include "applicationdomaintype.h" #include "storage.h" +#include "storage/entitystore.h" class ResultSet; class QByteArray; @@ -32,6 +33,8 @@ class WritePropertyMapper; class DataStoreQuery; +class TypeIndex; + namespace Sink { class Query; @@ -51,10 +54,12 @@ class TypeImplementation { public: typedef Sink::ApplicationDomain::Buffer::Event Buffer; typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; + static void configureIndex(TypeIndex &index); static QSet indexedProperties(); - static QSharedPointer prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); - static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); - static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); + static QSharedPointer prepareQuery(const Sink::Query &query, Sink::Storage::EntityStore::Ptr store); + + static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); + static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); static QSharedPointer > initializeWritePropertyMapper(); }; diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp index 824fa0b..f04a3e7 100644 --- a/common/domain/folder.cpp +++ b/common/domain/folder.cpp @@ -44,25 +44,30 @@ static QMutex sMutex; using namespace Sink::ApplicationDomain; +void TypeImplementation::configureIndex(TypeIndex &index) +{ + index.addProperty(Folder::Parent::name); + index.addProperty(Folder::Name::name); +} + static TypeIndex &getIndex() { QMutexLocker locker(&sMutex); static TypeIndex *index = 0; if (!index) { index = new TypeIndex("folder"); - index->addProperty("parent"); - index->addProperty("name"); + TypeImplementation::configureIndex(*index); } return *index; } -void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) +void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) { SinkTrace() << "Indexing " << identifier; getIndex().add(identifier, bufferAdaptor, transaction); } -void TypeImplementation::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) +void TypeImplementation::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) { getIndex().remove(identifier, bufferAdaptor, transaction); } @@ -87,10 +92,10 @@ QSharedPointer::BufferBuilder> > return propertyMapper; } -DataStoreQuery::Ptr TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) +DataStoreQuery::Ptr TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::EntityStore::Ptr store) { auto mapper = initializeReadPropertyMapper(); - return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { + return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), store, getIndex(), [mapper](const Sink::Entity &entity, const QByteArray &property) { const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); return mapper->getProperty(property, localBuffer); }); diff --git a/common/domain/folder.h b/common/domain/folder.h index e4631de..0a52b01 100644 --- a/common/domain/folder.h +++ b/common/domain/folder.h @@ -21,6 +21,7 @@ #include "applicationdomaintype.h" #include "storage.h" +#include "storage/entitystore.h" class ResultSet; class QByteArray; @@ -31,6 +32,8 @@ class ReadPropertyMapper; template class WritePropertyMapper; +class TypeIndex; + namespace Sink { class Query; @@ -45,10 +48,11 @@ class TypeImplementation { public: typedef Sink::ApplicationDomain::Buffer::Folder Buffer; typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; - static QSharedPointer prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); + static void configureIndex(TypeIndex &index); + static QSharedPointer prepareQuery(const Sink::Query &query, Sink::Storage::EntityStore::Ptr store); static QSet indexedProperties(); - static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); - static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); + static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); + static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); static QSharedPointer > initializeWritePropertyMapper(); }; diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp index 2b6eb84..1b46e28 100644 --- a/common/domain/mail.cpp +++ b/common/domain/mail.cpp @@ -45,25 +45,31 @@ static QMutex sMutex; using namespace Sink; using namespace Sink::ApplicationDomain; +void TypeImplementation::configureIndex(TypeIndex &index) +{ + index.addProperty(Mail::Uid::name); + index.addProperty(Mail::Sender::name); + index.addProperty(Mail::SenderName::name); + /* index->addProperty(Mail::Subject::name); */ + /* index->addFulltextProperty(Mail::Subject::name); */ + index.addProperty(Mail::Date::name); + index.addProperty(Mail::Folder::name); + index.addPropertyWithSorting(Mail::Folder::name, Mail::Date::name); + index.addProperty(Mail::MessageId::name); + index.addProperty(Mail::ParentMessageId::name); + + index.addProperty(); + index.addSecondaryProperty(); + index.addSecondaryProperty(); +} + static TypeIndex &getIndex() { QMutexLocker locker(&sMutex); static TypeIndex *index = 0; if (!index) { index = new TypeIndex("mail"); - index->addProperty(Mail::Uid::name); - index->addProperty(Mail::Sender::name); - index->addProperty(Mail::SenderName::name); - index->addProperty(Mail::Subject::name); - index->addProperty(Mail::Date::name); - index->addProperty(Mail::Folder::name); - index->addPropertyWithSorting(Mail::Folder::name, Mail::Date::name); - index->addProperty(Mail::MessageId::name); - index->addProperty(Mail::ParentMessageId::name); - - index->addProperty(); - index->addSecondaryProperty(); - index->addSecondaryProperty(); + TypeImplementation::configureIndex(*index); } return *index; } @@ -122,7 +128,7 @@ static QString stripOffPrefixes(const QString &subject) } -static void updateThreadingIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) +static void updateThreadingIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) { auto messageId = bufferAdaptor.getProperty(Mail::MessageId::name); auto parentMessageId = bufferAdaptor.getProperty(Mail::ParentMessageId::name); @@ -164,16 +170,17 @@ static void updateThreadingIndex(const QByteArray &identifier, const BufferAdapt } } -void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) +void TypeImplementation::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) { SinkTrace() << "Indexing " << identifier; getIndex().add(identifier, bufferAdaptor, transaction); updateThreadingIndex(identifier, bufferAdaptor, transaction); } -void TypeImplementation::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) +void TypeImplementation::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) { getIndex().remove(identifier, bufferAdaptor, transaction); + //TODO cleanup threading index } QSharedPointer::Buffer> > TypeImplementation::initializeReadPropertyMapper() @@ -218,18 +225,21 @@ QSharedPointer::BufferBuilder> > Ty } -DataStoreQuery::Ptr TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction) +DataStoreQuery::Ptr TypeImplementation::prepareQuery(const Sink::Query &query, Sink::Storage::EntityStore::Ptr store) { auto mapper = initializeReadPropertyMapper(); - return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), transaction, getIndex(), [mapper, &transaction](const Sink::Entity &entity, const QByteArray &property) -> QVariant { + return DataStoreQuery::Ptr::create(query, ApplicationDomain::getTypeName(), store, getIndex(), [mapper, store](const Sink::Entity &entity, const QByteArray &property) -> QVariant { if (property == Mail::ThreadId::name) { const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); Q_ASSERT(localBuffer); auto messageId = mapper->getProperty(Mail::MessageId::name, localBuffer); + //FIXME //This is an index property that we have too lookup - auto thread = getIndex().secondaryLookup(messageId, transaction); - Q_ASSERT(!thread.isEmpty()); - return thread.first(); + /* auto thread = getIndex().secondaryLookup(messageId); */ + /* auto thread = store->secondaryLookup(messageId); */ + /* Q_ASSERT(!thread.isEmpty()); */ + /* return thread.first(); */ + return QVariant(); } else { const auto localBuffer = Sink::EntityBuffer::readBuffer(entity.local()); Q_ASSERT(localBuffer); diff --git a/common/domain/mail.h b/common/domain/mail.h index ea3ef9e..6c1f670 100644 --- a/common/domain/mail.h +++ b/common/domain/mail.h @@ -21,7 +21,7 @@ #include "applicationdomaintype.h" #include "storage.h" -#include "datastorequery.h" +#include "storage/entitystore.h" class ResultSet; class QByteArray; @@ -32,6 +32,8 @@ class ReadPropertyMapper; template class WritePropertyMapper; +class TypeIndex; + namespace Sink { class Query; @@ -46,10 +48,11 @@ class TypeImplementation { public: typedef Sink::ApplicationDomain::Buffer::Mail Buffer; typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; - static QSharedPointer prepareQuery(const Sink::Query &query, Sink::Storage::Transaction &transaction); + static void configureIndex(TypeIndex &index); + static QSharedPointer prepareQuery(const Sink::Query &query, Sink::Storage::EntityStore::Ptr storage); static QSet indexedProperties(); - static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); - static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); + static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); + static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); static QSharedPointer > initializeReadPropertyMapper(); static QSharedPointer > initializeWritePropertyMapper(); }; diff --git a/common/domainadaptor.h b/common/domainadaptor.h index 16fc8c2..6a9d755 100644 --- a/common/domainadaptor.h +++ b/common/domainadaptor.h @@ -164,7 +164,7 @@ public: return adaptor; } - virtual void + virtual bool createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainObject, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) Q_DECL_OVERRIDE { flatbuffers::FlatBufferBuilder localFbb; @@ -180,15 +180,16 @@ public: } Sink::EntityBuffer::assembleEntityBuffer(fbb, metadataData, metadataSize, resFbb.GetBufferPointer(), resFbb.GetSize(), localFbb.GetBufferPointer(), localFbb.GetSize()); + return true; } - virtual void createBuffer(const QSharedPointer &bufferAdaptor, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) Q_DECL_OVERRIDE + virtual bool createBuffer(const QSharedPointer &bufferAdaptor, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) Q_DECL_OVERRIDE { //TODO rewrite the unterlying functions so we don't have to wrap the bufferAdaptor auto newObject = Sink::ApplicationDomain::ApplicationDomainType("", "", 0, bufferAdaptor); //Serialize all properties newObject.setChangedProperties(bufferAdaptor->availableProperties().toSet()); - createBuffer(newObject, fbb, metadataData, metadataSize); + return createBuffer(newObject, fbb, metadataData, metadataSize); } diff --git a/common/domaintypeadaptorfactoryinterface.h b/common/domaintypeadaptorfactoryinterface.h index b498796..8829c87 100644 --- a/common/domaintypeadaptorfactoryinterface.h +++ b/common/domaintypeadaptorfactoryinterface.h @@ -44,7 +44,7 @@ public: * * Note that this only serialized parameters that are part of ApplicationDomainType::changedProperties() */ - virtual void + virtual bool createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; - virtual void createBuffer(const QSharedPointer &bufferAdaptor, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; + virtual bool createBuffer(const QSharedPointer &bufferAdaptor, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; }; diff --git a/common/entityreader.cpp b/common/entityreader.cpp index cca1511..c49d1f7 100644 --- a/common/entityreader.cpp +++ b/common/entityreader.cpp @@ -28,75 +28,82 @@ SINK_DEBUG_AREA("entityreader") using namespace Sink; -QSharedPointer EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) -{ - QSharedPointer current; - db.findLatest(uid, - [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { - Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - SinkWarning() << "Read invalid buffer from disk"; - } else { - SinkTrace() << "Found value " << key; - current = adaptorFactory.createAdaptor(buffer.entity()); - retrievedRevision = Sink::Storage::revisionFromKey(key); - } - return false; - }, - [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); - return current; -} - -QSharedPointer EntityReaderUtils::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) -{ - QSharedPointer current; - db.scan(key, - [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { - Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); - if (!buffer.isValid()) { - SinkWarning() << "Read invalid buffer from disk"; - } else { - current = adaptorFactory.createAdaptor(buffer.entity()); - retrievedRevision = Sink::Storage::revisionFromKey(key); - } - return false; - }, - [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); - return current; -} - -QSharedPointer EntityReaderUtils::getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) -{ - QSharedPointer current; - qint64 latestRevision = 0; - db.scan(uid, - [¤t, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { - auto foundRevision = Sink::Storage::revisionFromKey(key); - if (foundRevision < revision && foundRevision > latestRevision) { - latestRevision = foundRevision; - } - return true; - }, - [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); - return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); -} +/* QSharedPointer EntityReaderUtils::getLatest(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */ +/* { */ +/* QSharedPointer current; */ +/* db.findLatest(uid, */ +/* [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */ +/* Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); */ +/* if (!buffer.isValid()) { */ +/* SinkWarning() << "Read invalid buffer from disk"; */ +/* } else { */ +/* SinkTrace() << "Found value " << key; */ +/* current = adaptorFactory.createAdaptor(buffer.entity()); */ +/* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */ +/* } */ +/* return false; */ +/* }, */ +/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */ +/* return current; */ +/* } */ + +/* QSharedPointer EntityReaderUtils::get(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */ +/* { */ +/* QSharedPointer current; */ +/* db.scan(key, */ +/* [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */ +/* Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); */ +/* if (!buffer.isValid()) { */ +/* SinkWarning() << "Read invalid buffer from disk"; */ +/* } else { */ +/* current = adaptorFactory.createAdaptor(buffer.entity()); */ +/* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */ +/* } */ +/* return false; */ +/* }, */ +/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */ +/* return current; */ +/* } */ + +/* QSharedPointer EntityReaderUtils::getPrevious(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */ +/* { */ +/* QSharedPointer current; */ +/* qint64 latestRevision = 0; */ +/* db.scan(uid, */ +/* [¤t, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { */ +/* auto foundRevision = Sink::Storage::DataStore::revisionFromKey(key); */ +/* if (foundRevision < revision && foundRevision > latestRevision) { */ +/* latestRevision = foundRevision; */ +/* } */ +/* return true; */ +/* }, */ +/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); */ +/* return get(db, Sink::Storage::DataStore::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); */ +/* } */ + +/* template */ +/* EntityReader::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */ +/* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */ +/* mTransaction(transaction), */ +/* mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory(resourceType)), */ +/* mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) */ +/* { */ +/* Q_ASSERT(!resourceType.isEmpty()); */ +/* Q_ASSERT(mDomainTypeAdaptorFactoryPtr); */ +/* } */ + +/* template */ +/* EntityReader::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */ +/* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */ +/* mTransaction(transaction), */ +/* mDomainTypeAdaptorFactory(domainTypeAdaptorFactory) */ +/* { */ + +/* } */ template -EntityReader::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) - : mResourceInstanceIdentifier(resourceInstanceIdentifier), - mTransaction(transaction), - mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory(resourceType)), - mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) -{ - Q_ASSERT(!resourceType.isEmpty()); - Q_ASSERT(mDomainTypeAdaptorFactoryPtr); -} - -template -EntityReader::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) - : mResourceInstanceIdentifier(resourceInstanceIdentifier), - mTransaction(transaction), - mDomainTypeAdaptorFactory(domainTypeAdaptorFactory) +EntityReader::EntityReader(Storage::EntityStore &entityStore) + : mEntityStore(entityStore) { } @@ -105,40 +112,28 @@ template DomainType EntityReader::read(const QByteArray &identifier) const { auto typeName = ApplicationDomain::getTypeName(); - auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); - qint64 retrievedRevision = 0; - auto bufferAdaptor = EntityReaderUtils::getLatest(mainDatabase, identifier, mDomainTypeAdaptorFactory, retrievedRevision); - if (!bufferAdaptor) { - return DomainType(); - } - return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); + return mEntityStore.readLatest(identifier); } template DomainType EntityReader::readFromKey(const QByteArray &key) const { - auto typeName = ApplicationDomain::getTypeName(); - auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); - qint64 retrievedRevision = 0; - auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); - const auto identifier = Storage::uidFromKey(key); - if (!bufferAdaptor) { - return DomainType(); - } - return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); + /* auto typeName = ApplicationDomain::getTypeName(); */ + /* auto mainDatabase = Storage::DataStore::mainDatabase(mTransaction, typeName); */ + /* qint64 retrievedRevision = 0; */ + /* auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); */ + /* const auto identifier = Storage::DataStore::uidFromKey(key); */ + /* if (!bufferAdaptor) { */ + /* return DomainType(); */ + /* } */ + /* return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); */ + return mEntityStore.readEntity(key); } template DomainType EntityReader::readPrevious(const QByteArray &uid, qint64 revision) const { - auto typeName = ApplicationDomain::getTypeName(); - auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); - qint64 retrievedRevision = 0; - auto bufferAdaptor = EntityReaderUtils::getPrevious(mainDatabase, uid, revision, mDomainTypeAdaptorFactory, retrievedRevision); - if (!bufferAdaptor) { - return DomainType(); - } - return DomainType(mResourceInstanceIdentifier, uid, retrievedRevision, bufferAdaptor); + return mEntityStore.readPrevious(uid, revision); } template @@ -157,14 +152,14 @@ QPair EntityReader::executeInitialQuery(const Sink:: QTime time; time.start(); - auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, mTransaction); + auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){})); auto resultSet = preparedQuery->execute(); SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); - return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); + return qMakePair(mEntityStore.maxRevision(), replayedEntities); } template @@ -174,14 +169,14 @@ QPair EntityReader::executeIncrementalQuery(const Si time.start(); const qint64 baseRevision = lastRevision + 1; - auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, mTransaction); + auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){})); auto resultSet = preparedQuery->update(baseRevision); SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); auto replayedEntities = replaySet(resultSet, 0, 0, callback); SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); - return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); + return qMakePair(mEntityStore.maxRevision(), replayedEntities); } template @@ -190,18 +185,18 @@ qint64 EntityReader::replaySet(ResultSet &resultSet, int offset, int SinkTrace() << "Skipping over " << offset << " results"; resultSet.skip(offset); int counter = 0; - while (!batchSize || (counter < batchSize)) { - const bool ret = - resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { - counter++; - auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(result.buffer.entity()); - Q_ASSERT(adaptor); - return callback(QSharedPointer::create(mResourceInstanceIdentifier, result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); - }); - if (!ret) { - break; - } - }; + /* while (!batchSize || (counter < batchSize)) { */ + /* const bool ret = */ + /* resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { */ + /* counter++; */ + /* auto adaptor = mResourceContext.adaptorFactory().createAdaptor(result.buffer.entity()); */ + /* Q_ASSERT(adaptor); */ + /* return callback(QSharedPointer::create(mResourceContext, result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); */ + /* }); */ + /* if (!ret) { */ + /* break; */ + /* } */ + /* }; */ SinkTrace() << "Replayed " << counter << " results." << "Limit " << batchSize; return counter; diff --git a/common/entityreader.h b/common/entityreader.h index 1e7b086..a641106 100644 --- a/common/entityreader.h +++ b/common/entityreader.h @@ -30,9 +30,9 @@ namespace Sink { namespace EntityReaderUtils { - SINK_EXPORT QSharedPointer getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); - SINK_EXPORT QSharedPointer get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); - SINK_EXPORT QSharedPointer getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); + SINK_EXPORT QSharedPointer getLatest(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); + SINK_EXPORT QSharedPointer get(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); + SINK_EXPORT QSharedPointer getPrevious(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); }; /** @@ -41,7 +41,7 @@ namespace EntityReaderUtils { * All callbacks will be called before the end of the function. * The caller must ensure passed in references remain valid for the lifetime of the object. * - * This class is meaent to be instantiated temporarily during reads on the stack. + * This class is meant to be instantiated temporarily during reads on the stack. * * Note that all objects returned in callbacks are only valid during the execution of the callback and may start pointing into invalid memory if shallow-copied. */ @@ -51,8 +51,7 @@ class SINK_EXPORT EntityReader typedef std::function &aggregateValues)> ResultCallback; public: - EntityReader(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); - EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction); + EntityReader(Storage::EntityStore &store); /** * Reads the latest revision of an entity identified by @param uid @@ -90,10 +89,7 @@ private: qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); private: - QByteArray mResourceInstanceIdentifier; - Sink::Storage::Transaction &mTransaction; - std::shared_ptr mDomainTypeAdaptorFactoryPtr; - DomainTypeAdaptorFactoryInterface &mDomainTypeAdaptorFactory; + Sink::Storage::EntityStore &mEntityStore; }; } diff --git a/common/entitystore.cpp b/common/entitystore.cpp index 5fb213d..b7b03aa 100644 --- a/common/entitystore.cpp +++ b/common/entitystore.cpp @@ -21,9 +21,8 @@ using namespace Sink; -EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) - : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier), - mTransaction(transaction) +EntityStore::EntityStore(Storage::EntityStore &store_) + : store(store_) { } diff --git a/common/entitystore.h b/common/entitystore.h index 6bfe414..3d9ca36 100644 --- a/common/entitystore.h +++ b/common/entitystore.h @@ -20,50 +20,42 @@ #pragma once #include "sink_export.h" -#include -#include "storage.h" -#include "adaptorfactoryregistry.h" -#include "entityreader.h" +#include "storage/entitystore.h" namespace Sink { class SINK_EXPORT EntityStore { public: - EntityStore(const QByteArray &resourceType, const QByteArray &mResourceInstanceIdentifier, Sink::Storage::Transaction &transaction); + EntityStore(Storage::EntityStore &store); template T read(const QByteArray &identifier) const { - EntityReader reader(mResourceType, mResourceInstanceIdentifier, mTransaction); - return reader.read(identifier); + return store.readLatest(identifier); } template T readFromKey(const QByteArray &key) const { - EntityReader reader(mResourceType, mResourceInstanceIdentifier, mTransaction); - return reader.readFromKey(key); + return store.readEntity(key); } template T readPrevious(const QByteArray &uid, qint64 revision) const { - EntityReader reader(mResourceType, mResourceInstanceIdentifier, mTransaction); - return reader.readPrevious(uid, revision); + return store.readPrevious(uid, revision); } - template - EntityReader reader() - { - return EntityReader(mResourceType, mResourceInstanceIdentifier, mTransaction); - } + /* template */ + /* EntityReader reader() */ + /* { */ + /* return EntityReader(mResourceType, mResourceInstanceIdentifier, mTransaction); */ + /* } */ private: - QByteArray mResourceType; - QByteArray mResourceInstanceIdentifier; - Sink::Storage::Transaction &mTransaction; + Sink::Storage::EntityStore &store; }; } diff --git a/common/facade.cpp b/common/facade.cpp index 72f7414..3ec58e3 100644 --- a/common/facade.cpp +++ b/common/facade.cpp @@ -31,13 +31,9 @@ using namespace Sink; template -GenericFacade::GenericFacade( - const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory, const QSharedPointer resourceAccess) - : Sink::StoreFacade(), mResourceAccess(resourceAccess), mDomainTypeAdaptorFactory(adaptorFactory), mResourceInstanceIdentifier(resourceIdentifier) +GenericFacade::GenericFacade(const ResourceContext &context) + : Sink::StoreFacade(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess()) { - if (!mResourceAccess) { - mResourceAccess = ResourceAccessFactory::instance().getAccess(resourceIdentifier, ResourceConfig::getResourceType(resourceIdentifier)); - } } template @@ -55,25 +51,23 @@ QByteArray GenericFacade::bufferTypeForDomainType() template KAsync::Job GenericFacade::create(const DomainType &domainObject) { - if (!mDomainTypeAdaptorFactory) { + flatbuffers::FlatBufferBuilder entityFbb; + if (!mResourceContext.adaptorFactory().createBuffer(domainObject, entityFbb)) { SinkWarning() << "No domain type adaptor factory available"; return KAsync::error(); } - flatbuffers::FlatBufferBuilder entityFbb; - mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); return mResourceAccess->sendCreateCommand(domainObject.identifier(), bufferTypeForDomainType(), BufferUtils::extractBuffer(entityFbb)); } template KAsync::Job GenericFacade::modify(const DomainType &domainObject) { - if (!mDomainTypeAdaptorFactory) { + SinkTrace() << "Modifying entity: " << domainObject.identifier() << domainObject.changedProperties(); + flatbuffers::FlatBufferBuilder entityFbb; + if (!mResourceContext.adaptorFactory().createBuffer(domainObject, entityFbb)) { SinkWarning() << "No domain type adaptor factory available"; return KAsync::error(); } - SinkTrace() << "Modifying entity: " << domainObject.identifier() << domainObject.changedProperties(); - flatbuffers::FlatBufferBuilder entityFbb; - mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), BufferUtils::extractBuffer(entityFbb), domainObject.changedProperties()); } @@ -87,7 +81,7 @@ template QPair, typename ResultEmitter::Ptr> GenericFacade::load(const Sink::Query &query) { // The runner lives for the lifetime of the query - auto runner = new QueryRunner(query, mResourceAccess, mResourceInstanceIdentifier, mDomainTypeAdaptorFactory, bufferTypeForDomainType()); + auto runner = new QueryRunner(query, mResourceContext, bufferTypeForDomainType()); runner->setResultTransformation(mResultTransformation); return qMakePair(KAsync::null(), runner->emitter()); } diff --git a/common/facade.h b/common/facade.h index b193580..50d93e0 100644 --- a/common/facade.h +++ b/common/facade.h @@ -28,6 +28,7 @@ #include "resourceaccess.h" #include "domaintypeadaptorfactoryinterface.h" #include "storage.h" +#include "resourcecontext.h" namespace Sink { @@ -48,7 +49,7 @@ class SINK_EXPORT GenericFacade : public Sink::StoreFacade { protected: SINK_DEBUG_AREA("facade") - SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier) + SINK_DEBUG_COMPONENT(mResourceContext.resourceInstanceIdentifier) public: /** * Create a new GenericFacade @@ -56,8 +57,7 @@ public: * @param resourceIdentifier is the identifier of the resource instance * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa */ - GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), - const QSharedPointer resourceAccess = QSharedPointer()); + GenericFacade(const ResourceContext &context); virtual ~GenericFacade(); static QByteArray bufferTypeForDomainType(); @@ -68,20 +68,18 @@ public: protected: std::function mResultTransformation; - // TODO use one resource access instance per application & per resource - QSharedPointer mResourceAccess; - DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; - QByteArray mResourceInstanceIdentifier; + ResourceContext mResourceContext; + Sink::ResourceAccessInterface::Ptr mResourceAccess; }; /** - * A default facade implemenation that simply instantiates a generic resource with the given DomainTypeAdaptorFactory + * A default facade implemenation that simply instantiates a generic resource */ -template +template class DefaultFacade : public GenericFacade { public: - DefaultFacade(const QByteArray &resourceIdentifier) : GenericFacade(resourceIdentifier, QSharedPointer::create()) {} + DefaultFacade(const ResourceContext &context) : GenericFacade(context) {} virtual ~DefaultFacade(){} }; diff --git a/common/facadefactory.cpp b/common/facadefactory.cpp index b5a0ff2..107d575 100644 --- a/common/facadefactory.cpp +++ b/common/facadefactory.cpp @@ -21,6 +21,7 @@ #include "resourcefacade.h" #include "resource.h" +#include "adaptorfactoryregistry.h" using namespace Sink; @@ -72,7 +73,7 @@ std::shared_ptr FacadeFactory::getFacade(const QByteArray &resource, const } if (auto factoryFunction = mFacadeRegistry.value(k)) { - return factoryFunction(instanceIdentifier); + return factoryFunction(ResourceContext{instanceIdentifier, resource, AdaptorFactoryRegistry::instance().getFactories(resource)}); } qWarning() << "Failed to find facade for resource: " << resource << " and type: " << typeName; return std::shared_ptr(); diff --git a/common/facadefactory.h b/common/facadefactory.h index 7313970..8d41705 100644 --- a/common/facadefactory.h +++ b/common/facadefactory.h @@ -29,6 +29,7 @@ #include "facadeinterface.h" #include "applicationdomaintype.h" +#include "resourcecontext.h" #include "log.h" namespace Sink { @@ -41,7 +42,7 @@ namespace Sink { class SINK_EXPORT FacadeFactory { public: - typedef std::function(const QByteArray &)> FactoryFunction; + typedef std::function(const ResourceContext &)> FactoryFunction; void registerStaticFacades(); @@ -52,13 +53,13 @@ public: template void registerFacade(const QByteArray &resource) { - registerFacade(resource, [](const QByteArray &instanceIdentifier) { return std::make_shared(instanceIdentifier); }, ApplicationDomain::getTypeName()); + registerFacade(resource, [](const ResourceContext &context) { return std::make_shared(context); }, ApplicationDomain::getTypeName()); } template void registerFacade() { - registerFacade(QByteArray(), [](const QByteArray &) { return std::make_shared(); }, ApplicationDomain::getTypeName()); + registerFacade(QByteArray(), [](const ResourceContext &) { return std::make_shared(); }, ApplicationDomain::getTypeName()); } /* diff --git a/common/genericresource.cpp b/common/genericresource.cpp index ef6edc8..e0d395a 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -45,6 +45,7 @@ static int sBatchSize = 100; static int sCommitInterval = 10; using namespace Sink; +using namespace Sink::Storage; /** * Drives the pipeline using the output from all command queues @@ -58,7 +59,7 @@ class CommandProcessor : public QObject public: CommandProcessor(Sink::Pipeline *pipeline, QList commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) { - mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { + mLowerBoundRevision = DataStore::maxRevision(mPipeline->storage().createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << error.message; })); @@ -226,17 +227,15 @@ private: InspectionFunction mInspect; }; -GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer &pipeline ) +GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer &pipeline ) : Sink::Resource(), - mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), - mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), - mResourceType(resourceType), - mResourceInstanceIdentifier(resourceInstanceIdentifier), - mPipeline(pipeline ? pipeline : QSharedPointer::create(resourceInstanceIdentifier)), + mResourceContext(resourceContext), + mUserQueue(Sink::storageLocation(), resourceContext.instanceId() + ".userqueue"), + mSynchronizerQueue(Sink::storageLocation(), resourceContext.instanceId() + ".synchronizerqueue"), + mPipeline(pipeline ? pipeline : QSharedPointer::create(resourceContext)), mError(0), mClientLowerBoundRevision(std::numeric_limits::max()) { - mPipeline->setResourceType(mResourceType); mProcessor = std::unique_ptr(new CommandProcessor(mPipeline.data(), QList() << &mUserQueue << &mSynchronizerQueue)); mProcessor->setInspectionCommand([this](void const *command, size_t size) { flatbuffers::Verifier verifier((const uint8_t *)command, size); @@ -357,19 +356,19 @@ void GenericResource::setupChangereplay(const QSharedPointer &chan void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) { - Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadWrite).removeFromDisk(); - Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadWrite).removeFromDisk(); - Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadWrite).removeFromDisk(); - Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadWrite).removeFromDisk(); - Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::ReadWrite).removeFromDisk(); + Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk(); + Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); + Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); + Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); + Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); } qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) { - auto size = Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadOnly).diskUsage(); - size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadOnly).diskUsage(); - size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadOnly).diskUsage(); - size += Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadOnly).diskUsage(); + auto size = Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadOnly).diskUsage(); + size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::DataStore::ReadOnly).diskUsage(); + size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::DataStore::ReadOnly).diskUsage(); + size += Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::DataStore::ReadOnly).diskUsage(); return size; } diff --git a/common/genericresource.h b/common/genericresource.h index ec43939..687e307 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -43,7 +43,7 @@ class SINK_EXPORT GenericResource : public Resource protected: SINK_DEBUG_AREA("resource") public: - GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer &pipeline); + GenericResource(const Sink::ResourceContext &context, const QSharedPointer &pipeline); virtual ~GenericResource(); virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; @@ -71,10 +71,9 @@ protected: void onProcessorError(int errorCode, const QString &errorMessage); void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); + ResourceContext mResourceContext; MessageQueue mUserQueue; MessageQueue mSynchronizerQueue; - QByteArray mResourceType; - QByteArray mResourceInstanceIdentifier; QSharedPointer mPipeline; private: diff --git a/common/index.cpp b/common/index.cpp index beed45c..c864e77 100644 --- a/common/index.cpp +++ b/common/index.cpp @@ -4,15 +4,15 @@ SINK_DEBUG_AREA("index") -Index::Index(const QString &storageRoot, const QString &name, Sink::Storage::AccessMode mode) - : mTransaction(Sink::Storage(storageRoot, name, mode).createTransaction(mode)), - mDb(mTransaction.openDatabase(name.toLatin1(), std::function(), true)), +Index::Index(const QString &storageRoot, const QString &name, Sink::Storage::DataStore::AccessMode mode) + : mTransaction(Sink::Storage::DataStore(storageRoot, name, mode).createTransaction(mode)), + mDb(mTransaction.openDatabase(name.toLatin1(), std::function(), true)), mName(name) { } -Index::Index(const QByteArray &name, Sink::Storage::Transaction &transaction) - : mDb(transaction.openDatabase(name, std::function(), true)), mName(name) +Index::Index(const QByteArray &name, Sink::Storage::DataStore::Transaction &transaction) + : mDb(transaction.openDatabase(name, std::function(), true)), mName(name) { } @@ -33,7 +33,7 @@ void Index::lookup(const QByteArray &key, const std::function class DefaultIndexUpdater : public Sink::Preprocessor { public: - void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE { Sink::ApplicationDomain::TypeImplementation::index(uid, newEntity, transaction); } void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, - Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE { Sink::ApplicationDomain::TypeImplementation::removeIndex(uid, oldEntity, transaction); Sink::ApplicationDomain::TypeImplementation::index(uid, newEntity, transaction); } - void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE { Sink::ApplicationDomain::TypeImplementation::removeIndex(uid, oldEntity, transaction); } diff --git a/common/listener.cpp b/common/listener.cpp index 1a8f392..0742017 100644 --- a/common/listener.cpp +++ b/common/listener.cpp @@ -23,6 +23,8 @@ #include "common/resource.h" #include "common/log.h" #include "common/definitions.h" +#include "common/resourcecontext.h" +#include "common/adaptorfactoryregistry.h" // commands #include "common/commandcompletion_generated.h" @@ -455,8 +457,8 @@ void Listener::notify(const Sink::Notification ¬ification) Sink::Resource &Listener::loadResource() { if (!m_resource) { - if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { - m_resource = std::unique_ptr(resourceFactory->createResource(m_resourceInstanceIdentifier)); + if (auto resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { + m_resource = std::unique_ptr(resourceFactory->createResource(Sink::ResourceContext{m_resourceInstanceIdentifier, m_resourceName, Sink::AdaptorFactoryRegistry::instance().getFactories(m_resourceName)})); if (!m_resource) { SinkError() << "Failed to instantiate the resource " << m_resourceName; m_resource = std::unique_ptr(new Sink::Resource); diff --git a/common/mailpreprocessor.cpp b/common/mailpreprocessor.cpp index ec5748f..b978323 100644 --- a/common/mailpreprocessor.cpp +++ b/common/mailpreprocessor.cpp @@ -116,7 +116,7 @@ static void updatedIndexedProperties(Sink::ApplicationDomain::Mail &mail, KMime: } } -void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::Transaction &transaction) +void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) { MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(mail.getMimeMessagePath())); auto msg = mimeMessageReader.mimeMessage(); @@ -125,7 +125,7 @@ void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink: } } -void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::Transaction &transaction) +void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::DataStore::Transaction &transaction) { MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(newMail.getMimeMessagePath())); auto msg = mimeMessageReader.mimeMessage(); @@ -161,21 +161,21 @@ QString MimeMessageMover::moveMessage(const QString &oldPath, const Sink::Applic return oldPath; } -void MimeMessageMover::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::Transaction &transaction) +void MimeMessageMover::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) { if (!mail.getMimeMessagePath().isEmpty()) { mail.setMimeMessagePath(moveMessage(mail.getMimeMessagePath(), mail)); } } -void MimeMessageMover::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail, Sink::Storage::Transaction &transaction) +void MimeMessageMover::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail, Sink::Storage::DataStore::Transaction &transaction) { if (!newMail.getMimeMessagePath().isEmpty()) { newMail.setMimeMessagePath(moveMessage(newMail.getMimeMessagePath(), newMail)); } } -void MimeMessageMover::deletedEntity(const Sink::ApplicationDomain::Mail &mail, Sink::Storage::Transaction &transaction) +void MimeMessageMover::deletedEntity(const Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) { QFile::remove(mail.getMimeMessagePath()); } diff --git a/common/mailpreprocessor.h b/common/mailpreprocessor.h index b7cd0e7..c66517e 100644 --- a/common/mailpreprocessor.h +++ b/common/mailpreprocessor.h @@ -24,8 +24,8 @@ class SINK_EXPORT MailPropertyExtractor : public Sink::EntityPreprocessor MessageQueue::dequeueBatch(int maxBatchSize, const std::functi return KAsync::start([this, maxBatchSize, resultHandler, resultCount](KAsync::Future &future) { int count = 0; QList> waitCondition; - mStorage.createTransaction(Sink::Storage::ReadOnly) + mStorage.createTransaction(Sink::Storage::DataStore::ReadOnly) .openDatabase() .scan("", [this, resultHandler, resultCount, &count, maxBatchSize, &waitCondition](const QByteArray &key, const QByteArray &value) -> bool { @@ -101,7 +101,7 @@ KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::functi } return false; }, - [](const Sink::Storage::Error &error) { + [](const Sink::Storage::DataStore::Error &error) { SinkError() << "Error while retrieving value" << error.message; // errorHandler(Error(error.store, error.code, error.message)); }); @@ -126,7 +126,7 @@ KAsync::Job MessageQueue::dequeueBatch(int maxBatchSize, const std::functi bool MessageQueue::isEmpty() { int count = 0; - auto t = mStorage.createTransaction(Sink::Storage::ReadOnly); + auto t = mStorage.createTransaction(Sink::Storage::DataStore::ReadOnly); auto db = t.openDatabase(); if (db) { db.scan("", @@ -137,7 +137,7 @@ bool MessageQueue::isEmpty() } return true; }, - [](const Sink::Storage::Error &error) { SinkError() << "Error while checking if empty" << error.message; }); + [](const Sink::Storage::DataStore::Error &error) { SinkError() << "Error while checking if empty" << error.message; }); } return count == 0; } diff --git a/common/messagequeue.h b/common/messagequeue.h index 6f0bddb..f23ddcf 100644 --- a/common/messagequeue.h +++ b/common/messagequeue.h @@ -56,7 +56,7 @@ private slots: private: Q_DISABLE_COPY(MessageQueue); - Sink::Storage mStorage; - Sink::Storage::Transaction mWriteTransaction; + Sink::Storage::DataStore mStorage; + Sink::Storage::DataStore::Transaction mWriteTransaction; QByteArrayList mPendingRemoval; }; diff --git a/common/pipeline.cpp b/common/pipeline.cpp index ce864f7..e257857 100644 --- a/common/pipeline.cpp +++ b/common/pipeline.cpp @@ -40,45 +40,45 @@ SINK_DEBUG_AREA("pipeline") -namespace Sink { +using namespace Sink; +using namespace Sink::Storage; class Pipeline::Private { public: - Private(const QString &resourceName) : storage(Sink::storageLocation(), resourceName, Storage::ReadWrite), revisionChanged(false), resourceInstanceIdentifier(resourceName.toUtf8()) + Private(const ResourceContext &context) : resourceContext(context), storage(Sink::storageLocation(), context.instanceId(), DataStore::ReadWrite), revisionChanged(false) { } - Storage storage; - Storage::Transaction transaction; + ResourceContext resourceContext; + DataStore storage; + DataStore::Transaction transaction; QHash>> processors; bool revisionChanged; void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); QTime transactionTime; int transactionItemCount; - QByteArray resourceType; - QByteArray resourceInstanceIdentifier; }; void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) { SinkTrace() << "Committing new revision: " << uid << newRevision; - Storage::mainDatabase(transaction, bufferType) - .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), - [uid, newRevision](const Storage::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); + DataStore::mainDatabase(transaction, bufferType) + .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), + [uid, newRevision](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; }); revisionChanged = true; - Storage::setMaxRevision(transaction, newRevision); - Storage::recordRevision(transaction, newRevision, uid, bufferType); + DataStore::setMaxRevision(transaction, newRevision); + DataStore::recordRevision(transaction, newRevision, uid, bufferType); } -Pipeline::Pipeline(const QString &resourceName, QObject *parent) : QObject(parent), d(new Private(resourceName)) +Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) { } Pipeline::~Pipeline() { - d->transaction = Storage::Transaction(); + d->transaction = DataStore::Transaction(); } void Pipeline::setPreprocessors(const QString &entityType, const QVector &processors) @@ -86,16 +86,11 @@ void Pipeline::setPreprocessors(const QString &entityType, const QVectorprocessors[entityType]; list.clear(); for (auto p : processors) { - p->setup(d->resourceType, d->resourceInstanceIdentifier, this); + p->setup(d->resourceContext.resourceType, d->resourceContext.instanceId(), this); list.append(QSharedPointer(p)); } } -void Pipeline::setResourceType(const QByteArray &resourceType) -{ - d->resourceType = resourceType; -} - void Pipeline::startTransaction() { // TODO call for all types @@ -109,7 +104,7 @@ void Pipeline::startTransaction() SinkTrace() << "Starting transaction."; d->transactionTime.start(); d->transactionItemCount = 0; - d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { + d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { SinkWarning() << error.message; }); @@ -119,7 +114,7 @@ void Pipeline::startTransaction() if (d->storage.exists()) { while (!d->transaction.validateNamedDatabases()) { SinkWarning() << "Opened an invalid transaction!!!!!!"; - d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { + d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { SinkWarning() << error.message; }); } @@ -135,29 +130,29 @@ void Pipeline::commit() // } if (!d->revisionChanged) { d->transaction.abort(); - d->transaction = Storage::Transaction(); + d->transaction = DataStore::Transaction(); return; } - const auto revision = Storage::maxRevision(d->transaction); + const auto revision = DataStore::maxRevision(d->transaction); const auto elapsed = d->transactionTime.elapsed(); SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; if (d->transaction) { d->transaction.commit(); } - d->transaction = Storage::Transaction(); + d->transaction = DataStore::Transaction(); if (d->revisionChanged) { d->revisionChanged = false; emit revisionUpdated(revision); } } -Storage::Transaction &Pipeline::transaction() +DataStore::Transaction &Pipeline::transaction() { return d->transaction; } -Storage &Pipeline::storage() const +DataStore &Pipeline::storage() const { return d->storage; } @@ -180,14 +175,14 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) QByteArray key; if (createEntity->entityId()) { key = QByteArray(reinterpret_cast(createEntity->entityId()->Data()), createEntity->entityId()->size()); - if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { + if (DataStore::mainDatabase(d->transaction, bufferType).contains(key)) { SinkError() << "An entity with this id already exists: " << key; return KAsync::error(0); } } if (key.isEmpty()) { - key = Sink::Storage::generateUid(); + key = DataStore::generateUid(); } SinkTrace() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; Q_ASSERT(!key.isEmpty()); @@ -205,7 +200,7 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) return KAsync::error(0); } - auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); + auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); if (!adaptorFactory) { SinkWarning() << "no adaptor factory for type " << bufferType; return KAsync::error(0); @@ -214,10 +209,10 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) auto adaptor = adaptorFactory->createAdaptor(*entity); auto memoryAdaptor = QSharedPointer::create(*(adaptor), adaptor->availableProperties()); foreach (const auto &processor, d->processors[bufferType]) { - processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); + processor->newEntity(key, DataStore::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); } //The maxRevision may have changed meanwhile if the entity created sub-entities - const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; + const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; @@ -233,6 +228,8 @@ KAsync::Job Pipeline::newEntity(void const *command, size_t size) d->storeNewRevision(newRevision, fbb, bufferType, key); + //FIXME entityStore->create(bufferType, memoryAdaptor, replayToSource) + return KAsync::value(newRevision); } @@ -273,7 +270,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) } // TODO use only readPropertyMapper and writePropertyMapper - auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); + auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); if (!adaptorFactory) { SinkWarning() << "no adaptor factory for type " << bufferType; return KAsync::error(0); @@ -284,7 +281,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) auto diff = adaptorFactory->createAdaptor(*diffEntity); QSharedPointer current; - Storage::mainDatabase(d->transaction, bufferType) + DataStore::mainDatabase(d->transaction, bufferType) .findLatest(key, [¤t, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { EntityBuffer buffer(const_cast(data.data()), data.size()); @@ -295,7 +292,7 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) } return false; }, - [baseRevision](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); + [baseRevision](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); if (!current) { SinkWarning() << "Failed to read local value " << key; @@ -323,10 +320,10 @@ KAsync::Job Pipeline::modifiedEntity(void const *command, size_t size) newAdaptor->resetChangedProperties(); foreach (const auto &processor, d->processors[bufferType]) { - processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); + processor->modifiedEntity(key, DataStore::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); } //The maxRevision may have changed meanwhile if the entity created sub-entities - const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; + const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; @@ -369,7 +366,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) bool found = false; bool alreadyRemoved = false; - Storage::mainDatabase(d->transaction, bufferType) + DataStore::mainDatabase(d->transaction, bufferType) .findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { auto entity = GetEntity(data.data()); @@ -382,7 +379,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) } return false; }, - [](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); + [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; }); if (!found) { SinkWarning() << "Failed to find entity " << key; @@ -393,7 +390,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) return KAsync::error(0); } - const qint64 newRevision = Storage::maxRevision(d->transaction) + 1; + const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1; // Add metadata buffer flatbuffers::FlatBufferBuilder metadataFbb; @@ -407,14 +404,14 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) flatbuffers::FlatBufferBuilder fbb; EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); - auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); + auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); if (!adaptorFactory) { SinkWarning() << "no adaptor factory for type " << bufferType; return KAsync::error(0); } QSharedPointer current; - Storage::mainDatabase(d->transaction, bufferType) + DataStore::mainDatabase(d->transaction, bufferType) .findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, ¤t](const QByteArray &, const QByteArray &data) -> bool { EntityBuffer buffer(const_cast(data.data()), data.size()); @@ -425,7 +422,7 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) } return false; }, - [this](const Storage::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; }); + [this](const DataStore::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; }); d->storeNewRevision(newRevision, fbb, bufferType, key); @@ -439,10 +436,10 @@ KAsync::Job Pipeline::deletedEntity(void const *command, size_t size) void Pipeline::cleanupRevision(qint64 revision) { d->revisionChanged = true; - const auto uid = Storage::getUidFromRevision(d->transaction, revision); - const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); + const auto uid = DataStore::getUidFromRevision(d->transaction, revision); + const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision); SinkTrace() << "Cleaning up revision " << revision << uid << bufferType; - Storage::mainDatabase(d->transaction, bufferType) + DataStore::mainDatabase(d->transaction, bufferType) .scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { EntityBuffer buffer(const_cast(data.data()), data.size()); @@ -453,20 +450,20 @@ void Pipeline::cleanupRevision(qint64 revision) const qint64 rev = metadata->revision(); // Remove old revisions, and the current if the entity has already been removed if (rev < revision || metadata->operation() == Operation_Removal) { - Storage::removeRevision(d->transaction, rev); - Storage::mainDatabase(d->transaction, bufferType).remove(key); + DataStore::removeRevision(d->transaction, rev); + DataStore::mainDatabase(d->transaction, bufferType).remove(key); } } return true; }, - [](const Storage::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); - Storage::setCleanedUpRevision(d->transaction, revision); + [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true); + DataStore::setCleanedUpRevision(d->transaction, revision); } qint64 Pipeline::cleanedUpRevision() { - return Storage::cleanedUpRevision(d->transaction); + return DataStore::cleanedUpRevision(d->transaction); } class Preprocessor::Private { @@ -523,8 +520,6 @@ void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomain d->pipeline->newEntity(data, data.size()).exec(); } -} // namespace Sink - #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" #include "moc_pipeline.cpp" diff --git a/common/pipeline.h b/common/pipeline.h index ef89cf0..bf94017 100644 --- a/common/pipeline.h +++ b/common/pipeline.h @@ -1,5 +1,6 @@ /* * Copyright (C) 2014 Aaron Seigo + * Copyright (C) 2015 Christian Mollekopf * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -41,16 +42,15 @@ class SINK_EXPORT Pipeline : public QObject Q_OBJECT public: - Pipeline(const QString &storagePath, QObject *parent = 0); + Pipeline(const ResourceContext &context); ~Pipeline(); - Storage &storage() const; + Storage::DataStore &storage() const; - void setResourceType(const QByteArray &resourceType); void setPreprocessors(const QString &entityType, const QVector &preprocessors); void startTransaction(); void commit(); - Storage::Transaction &transaction(); + Storage::DataStore::Transaction &transaction(); KAsync::Job newEntity(void const *command, size_t size); KAsync::Job modifiedEntity(void const *command, size_t size); @@ -82,10 +82,10 @@ public: virtual ~Preprocessor(); virtual void startBatch(); - virtual void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) {}; - virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, - Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) {}; - virtual void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) {}; + virtual void newEntity(const QByteArray &uid, qint64 revision, ApplicationDomain::BufferAdaptor &newEntity, Storage::DataStore::Transaction &transaction) {}; + virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, + ApplicationDomain::BufferAdaptor &newEntity, Storage::DataStore::Transaction &transaction) {}; + virtual void deletedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, Storage::DataStore::Transaction &transaction) {}; virtual void finalize(); void setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *); @@ -94,9 +94,9 @@ protected: template void createEntity(const DomainType &entity) { - createEntity(entity, Sink::ApplicationDomain::getTypeName()); + createEntity(entity, ApplicationDomain::getTypeName()); } - void createEntity(const Sink::ApplicationDomain::ApplicationDomainType &entity, const QByteArray &type); + void createEntity(const ApplicationDomain::ApplicationDomainType &entity, const QByteArray &type); QByteArray resourceInstanceIdentifier() const; @@ -110,27 +110,27 @@ template class SINK_EXPORT EntityPreprocessor: public Preprocessor { public: - virtual void newEntity(DomainType &, Sink::Storage::Transaction &transaction) {}; - virtual void modifiedEntity(const DomainType &oldEntity, DomainType &newEntity, Sink::Storage::Transaction &transaction) {}; - virtual void deletedEntity(const DomainType &oldEntity, Sink::Storage::Transaction &transaction) {}; + virtual void newEntity(DomainType &, Storage::DataStore::Transaction &transaction) {}; + virtual void modifiedEntity(const DomainType &oldEntity, DomainType &newEntity, Storage::DataStore::Transaction &transaction) {}; + virtual void deletedEntity(const DomainType &oldEntity, Storage::DataStore::Transaction &transaction) {}; private: - static void nullDeleter(Sink::ApplicationDomain::BufferAdaptor *) {} - virtual void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + static void nullDeleter(ApplicationDomain::BufferAdaptor *) {} + virtual void newEntity(const QByteArray &uid, qint64 revision, ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE { - auto o = DomainType("", uid, revision, QSharedPointer(&bufferAdaptor, nullDeleter)); + auto o = DomainType("", uid, revision, QSharedPointer(&bufferAdaptor, nullDeleter)); newEntity(o, transaction); } - virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, - Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, + ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE { - auto o = DomainType("", uid, revision, QSharedPointer(&bufferAdaptor, nullDeleter)); - modifiedEntity(DomainType("", uid, 0, QSharedPointer(const_cast(&oldEntity), nullDeleter)), o, transaction); + auto o = DomainType("", uid, revision, QSharedPointer(&bufferAdaptor, nullDeleter)); + modifiedEntity(DomainType("", uid, 0, QSharedPointer(const_cast(&oldEntity), nullDeleter)), o, transaction); } - virtual void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + virtual void deletedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE { - deletedEntity(DomainType("", uid, revision, QSharedPointer(const_cast(&bufferAdaptor), nullDeleter)), transaction); + deletedEntity(DomainType("", uid, revision, QSharedPointer(const_cast(&bufferAdaptor), nullDeleter)), transaction); } }; diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp index f037cfc..e7963a3 100644 --- a/common/queryrunner.cpp +++ b/common/queryrunner.cpp @@ -28,11 +28,13 @@ #include "definitions.h" #include "domainadaptor.h" #include "asyncutils.h" -#include "entityreader.h" +#include "storage.h" +#include "datastorequery.h" SINK_DEBUG_AREA("queryrunner") using namespace Sink; +using namespace Sink::Storage; /* * This class wraps the actual query implementation. @@ -43,30 +45,28 @@ using namespace Sink; template class QueryWorker : public QObject { + typedef std::function &aggregateValues)> ResultCallback; // SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier, mId) - SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier) + SINK_DEBUG_COMPONENT(mResourceContext.resourceInstanceIdentifier) public: - QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, - const QueryRunnerBase::ResultTransformation &transformation); + QueryWorker(const Sink::Query &query, const ResourceContext &context, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation); virtual ~QueryWorker(); + qint64 replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback); QPair executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider); QPair executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface &resultProvider, int offset, int batchsize); private: - Storage::Transaction getTransaction(); std::function &)> resultProviderCallback(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider); QueryRunnerBase::ResultTransformation mResultTransformation; - DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; - QByteArray mResourceInstanceIdentifier; + ResourceContext mResourceContext; QByteArray mId; //Used for identification in debug output }; template -QueryRunner::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, - const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) - : QueryRunnerBase(), mResourceInstanceIdentifier(instanceIdentifier), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider), mBatchSize(query.limit) +QueryRunner::QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType) + : QueryRunnerBase(), mResourceContext(context), mResourceAccess(mResourceContext.resourceAccess()), mResultProvider(new ResultProvider), mBatchSize(query.limit) { SinkTrace() << "Starting query"; if (query.limit && query.sortProperty.isEmpty()) { @@ -79,16 +79,17 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou SinkTrace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; auto resultProvider = mResultProvider; if (query.synchronousQuery) { - QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); + QueryWorker worker(query, mResourceContext, bufferType, mResultTransformation); worker.executeInitialQuery(query, parent, *resultProvider, mOffset[parentId], mBatchSize); resultProvider->initialResultSetComplete(parent); } else { auto resultTransformation = mResultTransformation; auto offset = mOffset[parentId]; auto batchSize = mBatchSize; + auto resourceContext = mResourceContext; //The lambda will be executed in a separate thread, so we're extra careful - async::run >([resultTransformation, offset, batchSize, query, bufferType, instanceIdentifier, factory, resultProvider, parent]() { - QueryWorker worker(query, instanceIdentifier, factory, bufferType, resultTransformation); + async::run >([resultTransformation, offset, batchSize, query, bufferType, resourceContext, resultProvider, parent]() { + QueryWorker worker(query, resourceContext, bufferType, resultTransformation); const auto newRevisionAndReplayedEntities = worker.executeInitialQuery(query, parent, *resultProvider, offset, batchSize); return newRevisionAndReplayedEntities; }) @@ -115,8 +116,9 @@ QueryRunner::QueryRunner(const Sink::Query &query, const Sink::Resou // Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting setQuery([=]() -> KAsync::Job { auto resultProvider = mResultProvider; + auto resourceContext = mResourceContext; return async::run >([=]() { - QueryWorker worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); + QueryWorker worker(query, resourceContext, bufferType, mResultTransformation); const auto newRevisionAndReplayedEntities = worker.executeIncrementalQuery(query, *resultProvider); return newRevisionAndReplayedEntities; }) @@ -158,11 +160,10 @@ typename Sink::ResultEmitter::Ptr QueryRunneremitter(); } - template -QueryWorker::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, +QueryWorker::QueryWorker(const Sink::Query &query, const Sink::ResourceContext &resourceContext, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) - : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mId(QUuid::createUuid().toByteArray()) + : QObject(), mResultTransformation(transformation), mResourceContext(resourceContext), mId(QUuid::createUuid().toByteArray()) { SinkTrace() << "Starting query worker"; } @@ -202,42 +203,47 @@ std::function +qint64 QueryWorker::replaySet(ResultSet &resultSet, int offset, int batchSize, const ResultCallback &callback) +{ + SinkTrace() << "Skipping over " << offset << " results"; + resultSet.skip(offset); + int counter = 0; + while (!batchSize || (counter < batchSize)) { + const bool ret = + resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { + counter++; + auto adaptor = mResourceContext.adaptorFactory().createAdaptor(result.buffer.entity()); + Q_ASSERT(adaptor); + return callback(QSharedPointer::create(mResourceContext.instanceId(), result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); + }); + if (!ret) { + break; + } + }; + SinkTrace() << "Replayed " << counter << " results." + << "Limit " << batchSize; + return counter; +} + template QPair QueryWorker::executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface &resultProvider) { QTime time; time.start(); - auto transaction = getTransaction(); + auto entityStore = EntityStore::Ptr::create(mResourceContext); - Sink::EntityReader reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); - auto revisionAndReplayedEntities = reader.executeIncrementalQuery(query, resultProvider.revision(), resultProviderCallback(query, resultProvider)); - SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); - return revisionAndReplayedEntities; -} + const qint64 baseRevision = resultProvider.revision() + 1; -template -Storage::Transaction QueryWorker::getTransaction() -{ - Sink::Storage::Transaction transaction; - { - Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); - if (!storage.exists()) { - //This is not an error if the resource wasn't started before - SinkLog() << "Store doesn't exist: " << mResourceInstanceIdentifier; - return Sink::Storage::Transaction(); - } - storage.setDefaultErrorHandler([this](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.store << error.message; }); - transaction = storage.createTransaction(Sink::Storage::ReadOnly); - } + auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, entityStore); + auto resultSet = preparedQuery->update(baseRevision); - //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. - //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). - while (!transaction.validateNamedDatabases()) { - Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); - transaction = storage.createTransaction(Sink::Storage::ReadOnly); - } - return transaction; + SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); + auto replayedEntities = replaySet(resultSet, 0, 0, resultProviderCallback(query, resultProvider)); + + SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); + return qMakePair(entityStore->maxRevision(), replayedEntities); } template @@ -258,12 +264,16 @@ QPair QueryWorker::executeInitialQuery( } } - auto transaction = getTransaction(); + auto entityStore = EntityStore::Ptr::create(mResourceContext); + + auto preparedQuery = ApplicationDomain::TypeImplementation::prepareQuery(query, entityStore); + auto resultSet = preparedQuery->execute(); + + SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); + auto replayedEntities = replaySet(resultSet, offset, batchsize, resultProviderCallback(query, resultProvider)); - Sink::EntityReader reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); - auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider)); SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); - return revisionAndReplayedEntities; + return qMakePair(entityStore->maxRevision(), replayedEntities); } template class QueryRunner; diff --git a/common/queryrunner.h b/common/queryrunner.h index 78aabf6..9bd4791 100644 --- a/common/queryrunner.h +++ b/common/queryrunner.h @@ -20,10 +20,10 @@ #pragma once #include +#include "resourcecontext.h" #include "resourceaccess.h" #include "resultprovider.h" #include "domaintypeadaptorfactoryinterface.h" -#include "storage.h" #include "query.h" #include "log.h" @@ -84,8 +84,7 @@ template class QueryRunner : public QueryRunnerBase { public: - QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, - const QByteArray &bufferType); + QueryRunner(const Sink::Query &query, const Sink::ResourceContext &context, const QByteArray &bufferType); virtual ~QueryRunner(); /** @@ -97,8 +96,8 @@ public: typename Sink::ResultEmitter::Ptr emitter(); private: - QByteArray mResourceInstanceIdentifier; - SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier) + Sink::ResourceContext mResourceContext; + SINK_DEBUG_COMPONENT(mResourceContext.resourceInstanceIdentifier) QSharedPointer mResourceAccess; QSharedPointer> mResultProvider; ResultTransformation mResultTransformation; diff --git a/common/remoteidmap.cpp b/common/remoteidmap.cpp index 2c3e5c7..da57cf6 100644 --- a/common/remoteidmap.cpp +++ b/common/remoteidmap.cpp @@ -27,7 +27,7 @@ using namespace Sink; SINK_DEBUG_AREA("remoteidmap") -RemoteIdMap::RemoteIdMap(Sink::Storage::Transaction &transaction) +RemoteIdMap::RemoteIdMap(Sink::Storage::DataStore::Transaction &transaction) : mTransaction(transaction) { @@ -58,7 +58,7 @@ QByteArray RemoteIdMap::resolveRemoteId(const QByteArray &bufferType, const QByt Index index("rid.mapping." + bufferType, mTransaction); QByteArray sinkId = index.lookup(remoteId); if (sinkId.isEmpty()) { - sinkId = Sink::Storage::generateUid(); + sinkId = Sink::Storage::DataStore::generateUid(); index.add(remoteId, sinkId); Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId); } @@ -81,7 +81,7 @@ QByteArray RemoteIdMap::readValue(const QByteArray &key) mTransaction.openDatabase("values").scan(key, [&value](const QByteArray &, const QByteArray &v) { value = v; return false; - }, [](const Sink::Storage::Error &) { + }, [](const Sink::Storage::DataStore::Error &) { //Ignore errors because we may not find the value }); return value; diff --git a/common/remoteidmap.h b/common/remoteidmap.h index bf08621..32c5efd 100644 --- a/common/remoteidmap.h +++ b/common/remoteidmap.h @@ -31,7 +31,7 @@ namespace Sink { class SINK_EXPORT RemoteIdMap { public: - RemoteIdMap(Sink::Storage::Transaction &); + RemoteIdMap(Sink::Storage::DataStore::Transaction &); /** * Records a localId to remoteId mapping @@ -58,7 +58,7 @@ public: void writeValue(const QByteArray &key, const QByteArray &value); private: - Sink::Storage::Transaction &mTransaction; + Sink::Storage::DataStore::Transaction &mTransaction; }; } diff --git a/common/resource.h b/common/resource.h index d468aca..426585d 100644 --- a/common/resource.h +++ b/common/resource.h @@ -27,6 +27,7 @@ namespace Sink { class FacadeFactory; class AdaptorFactoryRegistry; +class ResourceContext; /** * Resource interface @@ -75,7 +76,7 @@ public: ResourceFactory(QObject *parent); virtual ~ResourceFactory(); - virtual Resource *createResource(const QByteArray &instanceIdentifier) = 0; + virtual Resource *createResource(const ResourceContext &context) = 0; virtual void registerFacades(FacadeFactory &factory) = 0; virtual void registerAdaptorFactories(AdaptorFactoryRegistry ®istry) {}; virtual void removeDataFromDisk(const QByteArray &instanceIdentifier) = 0; diff --git a/common/resourcecontext.h b/common/resourcecontext.h new file mode 100644 index 0000000..6058ac7 --- /dev/null +++ b/common/resourcecontext.h @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#pragma once + +#include "domaintypeadaptorfactoryinterface.h" +#include "applicationdomaintype.h" +#include "resourceaccess.h" +#include +#include + +namespace Sink { + +/* + * A context object that can be passed around so each part of the system knows in what context it works. + * + * This is necessary because we can't rely on a singleton or thread-local storage since multiple resources can be accessed from the same thread/process. + */ +struct ResourceContext { + const QByteArray resourceInstanceIdentifier; + const QByteArray resourceType; + QMap adaptorFactories; + //TODO prehaps use a weak pointer to not mess up lifetime management + ResourceAccessInterface::Ptr mResourceAccess; + + + ResourceContext(const QByteArray &identifier, const QByteArray &resourceType_, const QMap &factories = QMap()) + : resourceInstanceIdentifier(identifier), + resourceType(resourceType_), + adaptorFactories(factories) + { + } + + QByteArray instanceId() const + { + return resourceInstanceIdentifier; + } + + DomainTypeAdaptorFactoryInterface &adaptorFactory(const QByteArray &type) const + { + auto factory = adaptorFactories.value(type); + Q_ASSERT(factory); + return *factory; + } + + template + DomainTypeAdaptorFactoryInterface &adaptorFactory() + { + return adaptorFactory(ApplicationDomain::getTypeName()); + } + + ResourceAccessInterface::Ptr resourceAccess() + { + if (!mResourceAccess) { + mResourceAccess = ResourceAccessFactory::instance().getAccess(resourceInstanceIdentifier, resourceType); + } + return mResourceAccess; + } +}; + +} diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp index 702d8e3..204793e 100644 --- a/common/sourcewriteback.cpp +++ b/common/sourcewriteback.cpp @@ -22,6 +22,8 @@ #include "definitions.h" #include "log.h" #include "bufferutils.h" +#include "entitybuffer.h" +#include "entity_generated.h" #define ENTITY_TYPE_MAIL "mail" #define ENTITY_TYPE_FOLDER "folder" @@ -30,21 +32,21 @@ SINK_DEBUG_AREA("sourcewriteback") using namespace Sink; -SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) - : ChangeReplay(resourceInstanceIdentifier), - mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), - mResourceType(resourceType), - mResourceInstanceIdentifier(resourceInstanceIdentifier) +SourceWriteBack::SourceWriteBack(const ResourceContext &context) + : ChangeReplay(context), + mResourceContext(context), + mSyncStorage(Sink::storageLocation(), context.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadWrite), + mEntityStore(QSharedPointer::create(mResourceContext)) { } EntityStore &SourceWriteBack::store() { - if (!mEntityStore) { - mEntityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, mTransaction); + if (!mEntityStoreWrapper) { + mEntityStoreWrapper = QSharedPointer::create(*mEntityStore); } - return *mEntityStore; + return *mEntityStoreWrapper; } RemoteIdMap &SourceWriteBack::syncStore() @@ -76,15 +78,14 @@ KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArr const auto metadataBuffer = Sink::EntityBuffer::readBuffer(entity.metadata()); Q_ASSERT(metadataBuffer); Q_ASSERT(!mSyncStore); - Q_ASSERT(!mEntityStore); - Q_ASSERT(!mTransaction); + Q_ASSERT(!mEntityStoreWrapper); Q_ASSERT(!mSyncTransaction); - mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); - mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); + mEntityStore->startTransaction(Storage::DataStore::ReadOnly); + mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::ReadWrite); // const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; - const auto uid = Sink::Storage::uidFromKey(key); + const auto uid = Sink::Storage::DataStore::uidFromKey(key); const auto modifiedProperties = metadataBuffer->modifiedProperties() ? BufferUtils::fromVector(*metadataBuffer->modifiedProperties()) : QByteArrayList(); QByteArray oldRemoteId; @@ -133,9 +134,9 @@ KAsync::Job SourceWriteBack::replay(const QByteArray &type, const QByteArr SinkWarning() << "Failed to replay change: " << error.errorMessage; } mSyncStore.clear(); - mEntityStore.clear(); - mTransaction.abort(); + mEntityStoreWrapper.clear(); mSyncTransaction.commit(); + mEntityStore->abortTransaction(); }); } diff --git a/common/sourcewriteback.h b/common/sourcewriteback.h index 8031573..327d1ad 100644 --- a/common/sourcewriteback.h +++ b/common/sourcewriteback.h @@ -25,6 +25,7 @@ #include "storage.h" #include "entitystore.h" #include "remoteidmap.h" +#include "metadata_generated.h" namespace Sink { @@ -34,7 +35,7 @@ namespace Sink { class SINK_EXPORT SourceWriteBack : public ChangeReplay { public: - SourceWriteBack(const QByteArray &resourceType,const QByteArray &resourceInstanceIdentifier); + SourceWriteBack(const ResourceContext &resourceContext); protected: ///Base implementation calls the replay$Type calls @@ -58,12 +59,12 @@ protected: private: //Read only access to main storage EntityStore &store(); - - Sink::Storage mSyncStorage; + ResourceContext mResourceContext; + Sink::Storage::DataStore mSyncStorage; QSharedPointer mSyncStore; - QSharedPointer mEntityStore; - Sink::Storage::Transaction mTransaction; - Sink::Storage::Transaction mSyncTransaction; + QSharedPointer mEntityStore; + QSharedPointer mEntityStoreWrapper; + Sink::Storage::DataStore::Transaction mSyncTransaction; QByteArray mResourceType; QByteArray mResourceInstanceIdentifier; }; diff --git a/common/specialpurposepreprocessor.cpp b/common/specialpurposepreprocessor.cpp index b9ad94a..a6ee373 100644 --- a/common/specialpurposepreprocessor.cpp +++ b/common/specialpurposepreprocessor.cpp @@ -11,6 +11,8 @@ static QHash specialPurposeFolders() { QHash hash; //FIXME localize + //TODO inbox + //TODO use standardized values hash.insert("drafts", "Drafts"); hash.insert("trash", "Trash"); hash.insert("inbox", "Inbox"); @@ -45,31 +47,31 @@ QByteArray getSpecialPurposeType(const QString &name) SpecialPurposeProcessor::SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier) {} -QByteArray SpecialPurposeProcessor::ensureFolder(Sink::Storage::Transaction &transaction, const QByteArray &specialPurpose) +QByteArray SpecialPurposeProcessor::ensureFolder(Sink::Storage::DataStore::Transaction &transaction, const QByteArray &specialPurpose) { - if (!mSpecialPurposeFolders.contains(specialPurpose)) { - //Try to find an existing drafts folder - Sink::EntityReader reader(mResourceType, mResourceInstanceIdentifier, transaction); - reader.query(Sink::Query().filter(Query::Comparator(specialPurpose, Query::Comparator::Contains)), - [this, specialPurpose](const ApplicationDomain::Folder &f) -> bool{ - mSpecialPurposeFolders.insert(specialPurpose, f.identifier()); - return false; - }); - if (!mSpecialPurposeFolders.contains(specialPurpose)) { - SinkTrace() << "Failed to find a drafts folder, creating a new one"; - auto folder = ApplicationDomain::Folder::create(mResourceInstanceIdentifier); - folder.setSpecialPurpose(QByteArrayList() << specialPurpose); - folder.setName(sSpecialPurposeFolders.value(specialPurpose)); - folder.setIcon("folder"); - //This processes the pipeline synchronously - createEntity(folder); - mSpecialPurposeFolders.insert(specialPurpose, folder.identifier()); - } - } + /* if (!mSpecialPurposeFolders.contains(specialPurpose)) { */ + /* //Try to find an existing drafts folder */ + /* Sink::EntityReader reader(mResourceType, mResourceInstanceIdentifier, transaction); */ + /* reader.query(Sink::Query().filter(Query::Comparator(specialPurpose, Query::Comparator::Contains)), */ + /* [this, specialPurpose](const ApplicationDomain::Folder &f) -> bool{ */ + /* mSpecialPurposeFolders.insert(specialPurpose, f.identifier()); */ + /* return false; */ + /* }); */ + /* if (!mSpecialPurposeFolders.contains(specialPurpose)) { */ + /* SinkTrace() << "Failed to find a drafts folder, creating a new one"; */ + /* auto folder = ApplicationDomain::Folder::create(mResourceInstanceIdentifier); */ + /* folder.setSpecialPurpose(QByteArrayList() << specialPurpose); */ + /* folder.setName(sSpecialPurposeFolders.value(specialPurpose)); */ + /* folder.setIcon("folder"); */ + /* //This processes the pipeline synchronously */ + /* createEntity(folder); */ + /* mSpecialPurposeFolders.insert(specialPurpose, folder.identifier()); */ + /* } */ + /* } */ return mSpecialPurposeFolders.value(specialPurpose); } -void SpecialPurposeProcessor::moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) +void SpecialPurposeProcessor::moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) { if (newEntity.getProperty("trash").toBool()) { newEntity.setProperty("folder", ensureFolder(transaction, "trash")); @@ -80,12 +82,12 @@ void SpecialPurposeProcessor::moveToFolder(Sink::ApplicationDomain::BufferAdapto } } -void SpecialPurposeProcessor::newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) +void SpecialPurposeProcessor::newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) { moveToFolder(newEntity, transaction); } -void SpecialPurposeProcessor::modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) +void SpecialPurposeProcessor::modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) { moveToFolder(newEntity, transaction); } diff --git a/common/specialpurposepreprocessor.h b/common/specialpurposepreprocessor.h index a33701b..8b2d9e9 100644 --- a/common/specialpurposepreprocessor.h +++ b/common/specialpurposepreprocessor.h @@ -30,12 +30,12 @@ class SINK_EXPORT SpecialPurposeProcessor : public Sink::Preprocessor public: SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); - QByteArray ensureFolder(Sink::Storage::Transaction &transaction, const QByteArray &specialPurpose); + QByteArray ensureFolder(Sink::Storage::DataStore::Transaction &transaction, const QByteArray &specialPurpose); - void moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction); + void moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction); - void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE; - void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE; + void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; + void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; QHash mSpecialPurposeFolders; QByteArray mResourceType; diff --git a/common/storage.h b/common/storage.h index 4ef20d5..e368b05 100644 --- a/common/storage.h +++ b/common/storage.h @@ -27,8 +27,9 @@ #include namespace Sink { +namespace Storage { -class SINK_EXPORT Storage +class SINK_EXPORT DataStore { public: enum AccessMode @@ -66,16 +67,16 @@ public: /** * Write a value */ - bool write(const QByteArray &key, const QByteArray &value, const std::function &errorHandler = std::function()); + bool write(const QByteArray &key, const QByteArray &value, const std::function &errorHandler = std::function()); /** * Remove a key */ - void remove(const QByteArray &key, const std::function &errorHandler = std::function()); + void remove(const QByteArray &key, const std::function &errorHandler = std::function()); /** * Remove a key-value pair */ - void remove(const QByteArray &key, const QByteArray &value, const std::function &errorHandler = std::function()); + void remove(const QByteArray &key, const QByteArray &value, const std::function &errorHandler = std::function()); /** * Read values with a given key. @@ -87,7 +88,7 @@ public: * @return The number of values retrieved. */ int scan(const QByteArray &key, const std::function &resultHandler, - const std::function &errorHandler = std::function(), bool findSubstringKeys = false, bool skipInternalKeys = true) const; + const std::function &errorHandler = std::function(), bool findSubstringKeys = false, bool skipInternalKeys = true) const; /** * Finds the last value in a series matched by prefix. @@ -96,7 +97,7 @@ public: * Note that this relies on a key scheme like $uid$revision. */ void findLatest(const QByteArray &uid, const std::function &resultHandler, - const std::function &errorHandler = std::function()) const; + const std::function &errorHandler = std::function()) const; /** * Returns true if the database contains the substring key. @@ -127,14 +128,14 @@ public: public: Transaction(); ~Transaction(); - bool commit(const std::function &errorHandler = std::function()); + bool commit(const std::function &errorHandler = std::function()); void abort(); QList getDatabaseNames() const; bool validateNamedDatabases(); NamedDatabase openDatabase(const QByteArray &name = QByteArray("default"), - const std::function &errorHandler = std::function(), bool allowDuplicates = false) const; + const std::function &errorHandler = std::function(), bool allowDuplicates = false) const; Transaction(Transaction &&other); Transaction &operator=(Transaction &&other); @@ -144,29 +145,29 @@ public: private: Transaction(Transaction &other); Transaction &operator=(Transaction &other); - friend Storage; + friend DataStore; class Private; Transaction(Private *); Private *d; }; - Storage(const QString &storageRoot, const QString &name, AccessMode mode = ReadOnly); - ~Storage(); + DataStore(const QString &storageRoot, const QString &name, AccessMode mode = ReadOnly); + ~DataStore(); - Transaction createTransaction(AccessMode mode = ReadWrite, const std::function &errorHandler = std::function()); + Transaction createTransaction(AccessMode mode = ReadWrite, const std::function &errorHandler = std::function()); /** * Set the default error handler. */ - void setDefaultErrorHandler(const std::function &errorHandler); - std::function defaultErrorHandler() const; + void setDefaultErrorHandler(const std::function &errorHandler); + std::function defaultErrorHandler() const; /** * A basic error handler that writes to std::cerr. * * Used if nothing else is configured. */ - static std::function basicErrorHandler(); + static std::function basicErrorHandler(); qint64 diskUsage() const; void removeFromDisk() const; @@ -178,16 +179,16 @@ public: */ static void clearEnv(); - static qint64 maxRevision(const Sink::Storage::Transaction &); - static void setMaxRevision(Sink::Storage::Transaction &, qint64 revision); + static qint64 maxRevision(const Transaction &); + static void setMaxRevision(Transaction &, qint64 revision); - static qint64 cleanedUpRevision(const Sink::Storage::Transaction &); - static void setCleanedUpRevision(Sink::Storage::Transaction &, qint64 revision); + static qint64 cleanedUpRevision(const Transaction &); + static void setCleanedUpRevision(Transaction &, qint64 revision); - static QByteArray getUidFromRevision(const Sink::Storage::Transaction &, qint64 revision); - static QByteArray getTypeFromRevision(const Sink::Storage::Transaction &, qint64 revision); - static void recordRevision(Sink::Storage::Transaction &, qint64 revision, const QByteArray &uid, const QByteArray &type); - static void removeRevision(Sink::Storage::Transaction &, qint64 revision); + static QByteArray getUidFromRevision(const Transaction &, qint64 revision); + static QByteArray getTypeFromRevision(const Transaction &, qint64 revision); + static void recordRevision(Transaction &, qint64 revision, const QByteArray &uid, const QByteArray &type); + static void removeRevision(Transaction &, qint64 revision); bool exists() const; @@ -199,16 +200,17 @@ public: static QByteArray uidFromKey(const QByteArray &key); static qint64 revisionFromKey(const QByteArray &key); - static NamedDatabase mainDatabase(const Sink::Storage::Transaction &, const QByteArray &type); + static NamedDatabase mainDatabase(const Transaction &, const QByteArray &type); static QByteArray generateUid(); private: - std::function mErrorHandler; + std::function mErrorHandler; private: class Private; Private *const d; }; +} } // namespace Sink diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp new file mode 100644 index 0000000..9615eca --- /dev/null +++ b/common/storage/entitystore.cpp @@ -0,0 +1,338 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#include "entitystore.h" + +#include "entitybuffer.h" +#include "log.h" +#include "typeindex.h" +#include "definitions.h" +#include "resourcecontext.h" +#include "index.h" + +#include "mail.h" +#include "folder.h" +#include "event.h" + +using namespace Sink; +using namespace Sink::Storage; + +SINK_DEBUG_AREA("entitystore"); + +class EntityStore::Private { +public: + Private(const ResourceContext &context) : resourceContext(context) {} + + ResourceContext resourceContext; + DataStore::Transaction transaction; + QHash > indexByType; + + DataStore::Transaction &getTransaction() + { + if (transaction) { + return transaction; + } + + Sink::Storage::DataStore store(Sink::storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly); + transaction = store.createTransaction(DataStore::ReadOnly); + Q_ASSERT(transaction.validateNamedDatabases()); + return transaction; + } + + /* template */ + /* TypeIndex &typeIndex(const QByteArray &type) */ + /* { */ + /* if (indexByType.contains(type)) { */ + /* return *indexByType.value(type); */ + /* } */ + /* auto index = QSharedPointer::create(type); */ + /* ApplicationDomain::TypeImplementation::configureIndex(*index); */ + /* indexByType.insert(type, index); */ + /* return *index; */ + /* } */ + + TypeIndex &typeIndex(const QByteArray &type) + { + /* return applyType(type); */ + if (indexByType.contains(type)) { + return *indexByType.value(type); + } + auto index = QSharedPointer::create(type); + //TODO expand for all types + /* TypeHelper::configureIndex(*index); */ + // Try this: (T would i.e. become + // TypeHelper::T::configureIndex(*index); + if (type == ApplicationDomain::getTypeName()) { + ApplicationDomain::TypeImplementation::configureIndex(*index); + } else if (type == ApplicationDomain::getTypeName()) { + ApplicationDomain::TypeImplementation::configureIndex(*index); + } else if (type == ApplicationDomain::getTypeName()) { + ApplicationDomain::TypeImplementation::configureIndex(*index); + } else { + Q_ASSERT(false); + SinkError() << "Unkonwn type " << type; + } + indexByType.insert(type, index); + return *index; + } +}; + +EntityStore::EntityStore(const ResourceContext &context) + : d(new EntityStore::Private{context}) +{ + +} + +void EntityStore::startTransaction(Sink::Storage::DataStore::AccessMode accessMode) +{ + Sink::Storage::DataStore store(Sink::storageLocation(), d->resourceContext.instanceId(), accessMode); + d->transaction = store.createTransaction(accessMode); + Q_ASSERT(d->transaction.validateNamedDatabases()); +} + +void EntityStore::commitTransaction() +{ + d->transaction.commit(); + d->transaction = Storage::DataStore::Transaction(); +} + +void EntityStore::abortTransaction() +{ + d->transaction.abort(); + d->transaction = Storage::DataStore::Transaction(); +} + +QVector EntityStore::fullScan(const QByteArray &type) +{ + SinkTrace() << "Looking for : " << type; + //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. + QSet keys; + DataStore::mainDatabase(d->getTransaction(), type) + .scan(QByteArray(), + [&](const QByteArray &key, const QByteArray &value) -> bool { + const auto uid = DataStore::uidFromKey(key); + if (keys.contains(uid)) { + //Not something that should persist if the replay works, so we keep a message for now. + SinkTrace() << "Multiple revisions for key: " << key; + } + keys << uid; + return true; + }, + [](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message; }); + + SinkTrace() << "Full scan retrieved " << keys.size() << " results."; + return keys.toList().toVector(); +} + +QVector EntityStore::indexLookup(const QByteArray &type, const Query &query, QSet &appliedFilters, QByteArray &appliedSorting) +{ + return d->typeIndex(type).query(query, appliedFilters, appliedSorting, d->getTransaction()); +} + +QVector EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value) +{ + return d->typeIndex(type).lookup(property, value, d->getTransaction()); +} + +void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value, const std::function &callback) +{ + auto list = d->typeIndex(type).lookup(property, value, d->getTransaction()); + for (const auto &uid : list) { + callback(uid); + } + /* Index index(type + ".index." + property, d->transaction); */ + /* index.lookup(value, [&](const QByteArray &sinkId) { */ + /* callback(sinkId); */ + /* }, */ + /* [&](const Index::Error &error) { */ + /* SinkWarning() << "Error in index: " << error.message << property; */ + /* }); */ +} + +void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback) +{ + auto db = DataStore::mainDatabase(d->getTransaction(), type); + db.findLatest(uid, + [=](const QByteArray &key, const QByteArray &value) -> bool { + callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); + return false; + }, + [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message << uid; }); +} + +void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback) +{ + readLatest(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) { + auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity()); + callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor}); + }); +} + +ApplicationDomain::ApplicationDomainType EntityStore::readLatest(const QByteArray &type, const QByteArray &uid) +{ + ApplicationDomain::ApplicationDomainType dt; + readLatest(type, uid, [&](const ApplicationDomain::ApplicationDomainType &entity) { + dt = entity; + }); + return dt; +} + +void EntityStore::readEntity(const QByteArray &type, const QByteArray &key, const std::function callback) +{ + auto db = DataStore::mainDatabase(d->getTransaction(), type); + db.scan(key, + [=](const QByteArray &key, const QByteArray &value) -> bool { + callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size())); + return false; + }, + [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message << key; }); +} + +void EntityStore::readEntity(const QByteArray &type, const QByteArray &uid, const std::function callback) +{ + readEntity(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) { + auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity()); + callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor}); + }); +} + +ApplicationDomain::ApplicationDomainType EntityStore::readEntity(const QByteArray &type, const QByteArray &uid) +{ + ApplicationDomain::ApplicationDomainType dt; + readEntity(type, uid, [&](const ApplicationDomain::ApplicationDomainType &entity) { + dt = entity; + }); + return dt; +} + + +void EntityStore::readAll(const QByteArray &type, const std::function &callback) +{ + auto db = DataStore::mainDatabase(d->getTransaction(), type); + db.scan("", + [=](const QByteArray &key, const QByteArray &value) -> bool { + auto uid = DataStore::uidFromKey(key); + auto buffer = Sink::EntityBuffer{value.data(), value.size()}; + auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity()); + callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor}); + return true; + }, + [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message; }); +} + +void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function &callback) +{ + qint64 revisionCounter = baseRevision; + const qint64 topRevision = DataStore::maxRevision(d->getTransaction()); + // Spit out the revision keys one by one. + while (revisionCounter <= topRevision) { + const auto uid = DataStore::getUidFromRevision(d->getTransaction(), revisionCounter); + const auto type = DataStore::getTypeFromRevision(d->getTransaction(), revisionCounter); + // SinkTrace() << "Revision" << *revisionCounter << type << uid; + Q_ASSERT(!uid.isEmpty()); + Q_ASSERT(!type.isEmpty()); + if (type != expectedType) { + // Skip revision + revisionCounter++; + continue; + } + const auto key = DataStore::assembleKey(uid, revisionCounter); + revisionCounter++; + callback(key); + } +} + +void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function callback) +{ + auto db = DataStore::mainDatabase(d->getTransaction(), type); + qint64 latestRevision = 0; + db.scan(uid, + [&latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { + const auto foundRevision = Sink::Storage::DataStore::revisionFromKey(key); + if (foundRevision < revision && foundRevision > latestRevision) { + latestRevision = foundRevision; + } + return true; + }, + [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); + return readEntity(type, Sink::Storage::DataStore::assembleKey(uid, latestRevision), callback); +} + +void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function callback) +{ + readPrevious(type, uid, revision, [&](const QByteArray &uid, const EntityBuffer &buffer) { + auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity()); + callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor}); + }); +} + +ApplicationDomain::ApplicationDomainType EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision) +{ + ApplicationDomain::ApplicationDomainType dt; + readPrevious(type, uid, revision, [&](const ApplicationDomain::ApplicationDomainType &entity) { + dt = entity; + }); + return dt; +} + +void EntityStore::readAllUids(const QByteArray &type, const std::function callback) +{ + //TODO use uid index instead + //FIXME we currently report each uid for every revision with the same uid + auto db = DataStore::mainDatabase(d->getTransaction(), type); + db.scan("", + [callback](const QByteArray &key, const QByteArray &) -> bool { + callback(Sink::Storage::DataStore::uidFromKey(key)); + return true; + }, + [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); +} + +bool EntityStore::contains(const QByteArray &type, const QByteArray &uid) +{ + return DataStore::mainDatabase(d->getTransaction(), type).contains(uid); +} + +qint64 EntityStore::maxRevision() +{ + return DataStore::maxRevision(d->getTransaction()); +} + +/* DataStore::Transaction getTransaction() */ +/* { */ +/* Sink::Storage::DataStore::Transaction transaction; */ +/* { */ +/* Sink::Storage::DataStore storage(Sink::storageLocation(), mResourceInstanceIdentifier); */ +/* if (!storage.exists()) { */ +/* //This is not an error if the resource wasn't started before */ +/* SinkLog() << "Store doesn't exist: " << mResourceInstanceIdentifier; */ +/* return Sink::Storage::DataStore::Transaction(); */ +/* } */ +/* storage.setDefaultErrorHandler([this](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Error during query: " << error.store << error.message; }); */ +/* transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly); */ +/* } */ + +/* //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. */ +/* //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). */ +/* while (!transaction.validateNamedDatabases()) { */ +/* Sink::Storage::DataStore storage(Sink::storageLocation(), mResourceInstanceIdentifier); */ +/* transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly); */ +/* } */ +/* return transaction; */ +/* } */ diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h new file mode 100644 index 0000000..de29e87 --- /dev/null +++ b/common/storage/entitystore.h @@ -0,0 +1,109 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#pragma once + +#include "sink_export.h" + +#include +#include "domaintypeadaptorfactoryinterface.h" +#include "query.h" +#include "storage.h" +#include "resourcecontext.h" + +namespace Sink { +class EntityBuffer; +namespace Storage { + +class SINK_EXPORT EntityStore +{ +public: + typedef QSharedPointer Ptr; + EntityStore(const ResourceContext &resourceContext); + + void add(const ApplicationDomain::ApplicationDomainType &); + void modify(const ApplicationDomain::ApplicationDomainType &); + void remove(const ApplicationDomain::ApplicationDomainType &); + + void startTransaction(Sink::Storage::DataStore::AccessMode); + void commitTransaction(); + void abortTransaction(); + + QVector fullScan(const QByteArray &type); + QVector indexLookup(const QByteArray &type, const Query &query, QSet &appliedFilters, QByteArray &appliedSorting); + QVector indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value); + void indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value, const std::function &callback); + template + void indexLookup(const QVariant &value, const std::function &callback) { + return indexLookup(ApplicationDomain::getTypeName(), PropertyType::name, value, callback); + } + + void readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback); + void readLatest(const QByteArray &type, const QByteArray &uid, const std::function callback); + + ApplicationDomain::ApplicationDomainType readLatest(const QByteArray &type, const QByteArray &uid); + + template + T readLatest(const QByteArray &uid) { + return T(readLatest(ApplicationDomain::getTypeName(), uid)); + } + + void readEntity(const QByteArray &type, const QByteArray &uid, const std::function callback); + void readEntity(const QByteArray &type, const QByteArray &uid, const std::function callback); + ApplicationDomain::ApplicationDomainType readEntity(const QByteArray &type, const QByteArray &key); + + template + T readEntity(const QByteArray &key) { + return T(readEntity(ApplicationDomain::getTypeName(), key)); + } + + + void readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function callback); + void readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function callback); + ApplicationDomain::ApplicationDomainType readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision); + + template + T readPrevious(const QByteArray &uid, qint64 revision) { + return T(readPrevious(ApplicationDomain::getTypeName(), uid, revision)); + } + + void readAllUids(const QByteArray &type, const std::function callback); + + void readAll(const QByteArray &type, const std::function &callback); + + template + void readAll(const std::function &callback) { + return readAll(ApplicationDomain::getTypeName(), [&](const ApplicationDomain::ApplicationDomainType &entity) { + callback(T(entity)); + }); + } + + void readRevisions(qint64 baseRevision, const QByteArray &type, const std::function &callback); + + bool contains(const QByteArray &type, const QByteArray &uid); + + qint64 maxRevision(); + +private: + class Private; + const QSharedPointer d; +}; + +} +} diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 1f2594e..60ef83d 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp @@ -27,26 +27,27 @@ SINK_DEBUG_AREA("storage") namespace Sink { +namespace Storage { static const char *s_internalPrefix = "__internal"; static const int s_internalPrefixSize = strlen(s_internalPrefix); -void errorHandler(const Storage::Error &error) +void errorHandler(const DataStore::Error &error) { SinkWarning() << "Database error in " << error.store << ", code " << error.code << ", message: " << error.message; } -std::function Storage::basicErrorHandler() +std::function DataStore::basicErrorHandler() { return errorHandler; } -void Storage::setDefaultErrorHandler(const std::function &errorHandler) +void DataStore::setDefaultErrorHandler(const std::function &errorHandler) { mErrorHandler = errorHandler; } -std::function Storage::defaultErrorHandler() const +std::function DataStore::defaultErrorHandler() const { if (mErrorHandler) { return mErrorHandler; @@ -54,12 +55,12 @@ std::function Storage::defaultErrorHandler() return basicErrorHandler(); } -void Storage::setMaxRevision(Sink::Storage::Transaction &transaction, qint64 revision) +void DataStore::setMaxRevision(DataStore::Transaction &transaction, qint64 revision) { transaction.openDatabase().write("__internal_maxRevision", QByteArray::number(revision)); } -qint64 Storage::maxRevision(const Sink::Storage::Transaction &transaction) +qint64 DataStore::maxRevision(const DataStore::Transaction &transaction) { qint64 r = 0; transaction.openDatabase().scan("__internal_maxRevision", @@ -68,19 +69,19 @@ qint64 Storage::maxRevision(const Sink::Storage::Transaction &transaction) return false; }, [](const Error &error) { - if (error.code != Sink::Storage::NotFound) { + if (error.code != DataStore::NotFound) { SinkWarning() << "Coultn'd find the maximum revision."; } }); return r; } -void Storage::setCleanedUpRevision(Sink::Storage::Transaction &transaction, qint64 revision) +void DataStore::setCleanedUpRevision(DataStore::Transaction &transaction, qint64 revision) { transaction.openDatabase().write("__internal_cleanedUpRevision", QByteArray::number(revision)); } -qint64 Storage::cleanedUpRevision(const Sink::Storage::Transaction &transaction) +qint64 DataStore::cleanedUpRevision(const DataStore::Transaction &transaction) { qint64 r = 0; transaction.openDatabase().scan("__internal_cleanedUpRevision", @@ -89,14 +90,14 @@ qint64 Storage::cleanedUpRevision(const Sink::Storage::Transaction &transaction) return false; }, [](const Error &error) { - if (error.code != Sink::Storage::NotFound) { + if (error.code != DataStore::NotFound) { SinkWarning() << "Coultn'd find the maximum revision."; } }); return r; } -QByteArray Storage::getUidFromRevision(const Sink::Storage::Transaction &transaction, qint64 revision) +QByteArray DataStore::getUidFromRevision(const DataStore::Transaction &transaction, qint64 revision) { QByteArray uid; transaction.openDatabase("revisions") @@ -109,7 +110,7 @@ QByteArray Storage::getUidFromRevision(const Sink::Storage::Transaction &transac return uid; } -QByteArray Storage::getTypeFromRevision(const Sink::Storage::Transaction &transaction, qint64 revision) +QByteArray DataStore::getTypeFromRevision(const DataStore::Transaction &transaction, qint64 revision) { QByteArray type; transaction.openDatabase("revisionType") @@ -122,25 +123,25 @@ QByteArray Storage::getTypeFromRevision(const Sink::Storage::Transaction &transa return type; } -void Storage::recordRevision(Sink::Storage::Transaction &transaction, qint64 revision, const QByteArray &uid, const QByteArray &type) +void DataStore::recordRevision(DataStore::Transaction &transaction, qint64 revision, const QByteArray &uid, const QByteArray &type) { // TODO use integerkeys transaction.openDatabase("revisions").write(QByteArray::number(revision), uid); transaction.openDatabase("revisionType").write(QByteArray::number(revision), type); } -void Storage::removeRevision(Sink::Storage::Transaction &transaction, qint64 revision) +void DataStore::removeRevision(DataStore::Transaction &transaction, qint64 revision) { transaction.openDatabase("revisions").remove(QByteArray::number(revision)); transaction.openDatabase("revisionType").remove(QByteArray::number(revision)); } -bool Storage::isInternalKey(const char *key) +bool DataStore::isInternalKey(const char *key) { return key && strncmp(key, s_internalPrefix, s_internalPrefixSize) == 0; } -bool Storage::isInternalKey(void *key, int size) +bool DataStore::isInternalKey(void *key, int size) { if (size < 1) { return false; @@ -149,39 +150,39 @@ bool Storage::isInternalKey(void *key, int size) return key && strncmp(static_cast(key), s_internalPrefix, (size > s_internalPrefixSize ? s_internalPrefixSize : size)) == 0; } -bool Storage::isInternalKey(const QByteArray &key) +bool DataStore::isInternalKey(const QByteArray &key) { return key.startsWith(s_internalPrefix); } -QByteArray Storage::assembleKey(const QByteArray &key, qint64 revision) +QByteArray DataStore::assembleKey(const QByteArray &key, qint64 revision) { Q_ASSERT(revision <= 9223372036854775807); Q_ASSERT(key.size() == 38); return key + QByteArray::number(revision).rightJustified(19, '0', false); } -QByteArray Storage::uidFromKey(const QByteArray &key) +QByteArray DataStore::uidFromKey(const QByteArray &key) { return key.mid(0, 38); } -qint64 Storage::revisionFromKey(const QByteArray &key) +qint64 DataStore::revisionFromKey(const QByteArray &key) { return key.mid(39).toLongLong(); } -QByteArray Storage::generateUid() +QByteArray DataStore::generateUid() { return QUuid::createUuid().toByteArray(); } -Storage::NamedDatabase Storage::mainDatabase(const Sink::Storage::Transaction &t, const QByteArray &type) +DataStore::NamedDatabase DataStore::mainDatabase(const DataStore::Transaction &t, const QByteArray &type) { return t.openDatabase(type + ".main"); } -bool Storage::NamedDatabase::contains(const QByteArray &uid) +bool DataStore::NamedDatabase::contains(const QByteArray &uid) { bool found = false; scan(uid, @@ -189,8 +190,9 @@ bool Storage::NamedDatabase::contains(const QByteArray &uid) found = true; return false; }, - [this](const Sink::Storage::Error &error) {}, true); + [this](const DataStore::Error &error) {}, true); return found; } +} } // namespace Sink diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index 6f11af3..e418472 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp @@ -39,6 +39,7 @@ SINK_DEBUG_AREA("storage") // SINK_DEBUG_COMPONENT(d->storageRoot.toLatin1() + '/' + d->name.toLatin1()) namespace Sink { +namespace Storage { QMutex sMutex; QHash sEnvironments; @@ -47,17 +48,17 @@ int getErrorCode(int e) { switch (e) { case MDB_NOTFOUND: - return Storage::ErrorCodes::NotFound; + return DataStore::ErrorCodes::NotFound; default: break; } return -1; } -class Storage::NamedDatabase::Private +class DataStore::NamedDatabase::Private { public: - Private(const QByteArray &_db, bool _allowDuplicates, const std::function &_defaultErrorHandler, const QString &_name, MDB_txn *_txn) + Private(const QByteArray &_db, bool _allowDuplicates, const std::function &_defaultErrorHandler, const QString &_name, MDB_txn *_txn) : db(_db), transaction(_txn), allowDuplicates(_allowDuplicates), defaultErrorHandler(_defaultErrorHandler), name(_name) { } @@ -70,10 +71,10 @@ public: MDB_txn *transaction; MDB_dbi dbi; bool allowDuplicates; - std::function defaultErrorHandler; + std::function defaultErrorHandler; QString name; - bool openDatabase(bool readOnly, std::function errorHandler) + bool openDatabase(bool readOnly, std::function errorHandler) { unsigned int flags = 0; if (!readOnly) { @@ -97,20 +98,20 @@ public: } }; -Storage::NamedDatabase::NamedDatabase() : d(nullptr) +DataStore::NamedDatabase::NamedDatabase() : d(nullptr) { } -Storage::NamedDatabase::NamedDatabase(NamedDatabase::Private *prv) : d(prv) +DataStore::NamedDatabase::NamedDatabase(NamedDatabase::Private *prv) : d(prv) { } -Storage::NamedDatabase::NamedDatabase(NamedDatabase &&other) : d(nullptr) +DataStore::NamedDatabase::NamedDatabase(NamedDatabase &&other) : d(nullptr) { *this = std::move(other); } -Storage::NamedDatabase &Storage::NamedDatabase::operator=(Storage::NamedDatabase &&other) +DataStore::NamedDatabase &DataStore::NamedDatabase::operator=(DataStore::NamedDatabase &&other) { if (&other != this) { delete d; @@ -120,12 +121,12 @@ Storage::NamedDatabase &Storage::NamedDatabase::operator=(Storage::NamedDatabase return *this; } -Storage::NamedDatabase::~NamedDatabase() +DataStore::NamedDatabase::~NamedDatabase() { delete d; } -bool Storage::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function &errorHandler) +bool DataStore::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sValue, const std::function &errorHandler) { if (!d || !d->transaction) { Error error("", ErrorCodes::GenericError, "Not open"); @@ -161,12 +162,12 @@ bool Storage::NamedDatabase::write(const QByteArray &sKey, const QByteArray &sVa return !rc; } -void Storage::NamedDatabase::remove(const QByteArray &k, const std::function &errorHandler) +void DataStore::NamedDatabase::remove(const QByteArray &k, const std::function &errorHandler) { remove(k, QByteArray(), errorHandler); } -void Storage::NamedDatabase::remove(const QByteArray &k, const QByteArray &value, const std::function &errorHandler) +void DataStore::NamedDatabase::remove(const QByteArray &k, const QByteArray &value, const std::function &errorHandler) { if (!d || !d->transaction) { if (d) { @@ -195,8 +196,8 @@ void Storage::NamedDatabase::remove(const QByteArray &k, const QByteArray &value } } -int Storage::NamedDatabase::scan(const QByteArray &k, const std::function &resultHandler, - const std::function &errorHandler, bool findSubstringKeys, bool skipInternalKeys) const +int DataStore::NamedDatabase::scan(const QByteArray &k, const std::function &resultHandler, + const std::function &errorHandler, bool findSubstringKeys, bool skipInternalKeys) const { if (!d || !d->transaction) { // Not an error. We rely on this to read nothing from non-existing databases. @@ -278,8 +279,8 @@ int Storage::NamedDatabase::scan(const QByteArray &k, const std::function &resultHandler, - const std::function &errorHandler) const +void DataStore::NamedDatabase::findLatest(const QByteArray &k, const std::function &resultHandler, + const std::function &errorHandler) const { if (!d || !d->transaction) { // Not an error. We rely on this to read nothing from non-existing databases. @@ -346,7 +347,7 @@ void Storage::NamedDatabase::findLatest(const QByteArray &k, const std::function return; } -qint64 Storage::NamedDatabase::getSize() +qint64 DataStore::NamedDatabase::getSize() { if (!d || !d->transaction) { return -1; @@ -368,10 +369,10 @@ qint64 Storage::NamedDatabase::getSize() } -class Storage::Transaction::Private +class DataStore::Transaction::Private { public: - Private(bool _requestRead, const std::function &_defaultErrorHandler, const QString &_name, MDB_env *_env) + Private(bool _requestRead, const std::function &_defaultErrorHandler, const QString &_name, MDB_env *_env) : env(_env), transaction(nullptr), requestedRead(_requestRead), defaultErrorHandler(_defaultErrorHandler), name(_name), implicitCommit(false), error(false), modificationCounter(0) { } @@ -383,7 +384,7 @@ public: MDB_txn *transaction; MDB_dbi dbi; bool requestedRead; - std::function defaultErrorHandler; + std::function defaultErrorHandler; QString name; bool implicitCommit; bool error; @@ -406,21 +407,21 @@ public: } }; -Storage::Transaction::Transaction() : d(nullptr) +DataStore::Transaction::Transaction() : d(nullptr) { } -Storage::Transaction::Transaction(Transaction::Private *prv) : d(prv) +DataStore::Transaction::Transaction(Transaction::Private *prv) : d(prv) { d->startTransaction(); } -Storage::Transaction::Transaction(Transaction &&other) : d(nullptr) +DataStore::Transaction::Transaction(Transaction &&other) : d(nullptr) { *this = std::move(other); } -Storage::Transaction &Storage::Transaction::operator=(Storage::Transaction &&other) +DataStore::Transaction &DataStore::Transaction::operator=(DataStore::Transaction &&other) { if (&other != this) { delete d; @@ -430,7 +431,7 @@ Storage::Transaction &Storage::Transaction::operator=(Storage::Transaction &&oth return *this; } -Storage::Transaction::~Transaction() +DataStore::Transaction::~Transaction() { if (d && d->transaction) { if (d->implicitCommit && !d->error) { @@ -443,12 +444,12 @@ Storage::Transaction::~Transaction() delete d; } -Storage::Transaction::operator bool() const +DataStore::Transaction::operator bool() const { return (d && d->transaction); } -bool Storage::Transaction::commit(const std::function &errorHandler) +bool DataStore::Transaction::commit(const std::function &errorHandler) { if (!d || !d->transaction) { return false; @@ -467,7 +468,7 @@ bool Storage::Transaction::commit(const std::functiontransaction) { return; @@ -481,7 +482,7 @@ void Storage::Transaction::abort() //Ensure that we opened the correct database by comparing the expected identifier with the one //we write to the database on first open. -static bool ensureCorrectDb(Storage::NamedDatabase &database, const QByteArray &db, bool readOnly) +static bool ensureCorrectDb(DataStore::NamedDatabase &database, const QByteArray &db, bool readOnly) { bool openedTheWrongDatabase = false; auto count = database.scan("__internal_dbname", [db, &openedTheWrongDatabase](const QByteArray &key, const QByteArray &value) ->bool { @@ -491,7 +492,7 @@ static bool ensureCorrectDb(Storage::NamedDatabase &database, const QByteArray & } return false; }, - [](const Storage::Error &error) -> bool{ + [](const DataStore::Error &error) -> bool{ return false; }, false); //This is the first time we open this database in a write transaction, write the db name @@ -503,7 +504,7 @@ static bool ensureCorrectDb(Storage::NamedDatabase &database, const QByteArray & return !openedTheWrongDatabase; } -bool Storage::Transaction::validateNamedDatabases() +bool DataStore::Transaction::validateNamedDatabases() { auto databases = getDatabaseNames(); for (const auto &dbName : databases) { @@ -516,28 +517,28 @@ bool Storage::Transaction::validateNamedDatabases() return true; } -Storage::NamedDatabase Storage::Transaction::openDatabase(const QByteArray &db, const std::function &errorHandler, bool allowDuplicates) const +DataStore::NamedDatabase DataStore::Transaction::openDatabase(const QByteArray &db, const std::function &errorHandler, bool allowDuplicates) const { if (!d) { - return Storage::NamedDatabase(); + return DataStore::NamedDatabase(); } Q_ASSERT(d->transaction); // We don't now if anything changed d->implicitCommit = true; - auto p = new Storage::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); + auto p = new DataStore::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); if (!p->openDatabase(d->requestedRead, errorHandler)) { delete p; - return Storage::NamedDatabase(); + return DataStore::NamedDatabase(); } - auto database = Storage::NamedDatabase(p); + auto database = DataStore::NamedDatabase(p); if (!ensureCorrectDb(database, db, d->requestedRead)) { SinkWarning() << "Failed to open the database" << db; - return Storage::NamedDatabase(); + return DataStore::NamedDatabase(); } return database; } -QList Storage::Transaction::getDatabaseNames() const +QList DataStore::Transaction::getDatabaseNames() const { if (!d) { SinkWarning() << "Invalid transaction"; @@ -574,7 +575,7 @@ QList Storage::Transaction::getDatabaseNames() const } -class Storage::Private +class DataStore::Private { public: Private(const QString &s, const QString &n, AccessMode m); @@ -587,7 +588,7 @@ public: AccessMode mode; }; -Storage::Private::Private(const QString &s, const QString &n, AccessMode m) : storageRoot(s), name(n), env(0), mode(m) +DataStore::Private::Private(const QString &s, const QString &n, AccessMode m) : storageRoot(s), name(n), env(0), mode(m) { const QString fullPath(storageRoot + '/' + name); QFileInfo dirInfo(fullPath); @@ -639,27 +640,27 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m) : st } } -Storage::Private::~Private() +DataStore::Private::~Private() { //We never close the environment (unless we remove the db), since we should only open the environment once per process (as per lmdb docs) //and create storage instance from all over the place. Thus, we're not closing it here on purpose. } -Storage::Storage(const QString &storageRoot, const QString &name, AccessMode mode) : d(new Private(storageRoot, name, mode)) +DataStore::DataStore(const QString &storageRoot, const QString &name, AccessMode mode) : d(new Private(storageRoot, name, mode)) { } -Storage::~Storage() +DataStore::~DataStore() { delete d; } -bool Storage::exists() const +bool DataStore::exists() const { return (d->env != 0); } -Storage::Transaction Storage::createTransaction(AccessMode type, const std::function &errorHandlerArg) +DataStore::Transaction DataStore::createTransaction(AccessMode type, const std::function &errorHandlerArg) { auto errorHandler = errorHandlerArg ? errorHandlerArg : defaultErrorHandler(); if (!d->env) { @@ -677,7 +678,7 @@ Storage::Transaction Storage::createTransaction(AccessMode type, const std::func return Transaction(new Transaction::Private(requestedRead, defaultErrorHandler(), d->name, d->env)); } -qint64 Storage::diskUsage() const +qint64 DataStore::diskUsage() const { QFileInfo info(d->storageRoot + '/' + d->name + "/data.mdb"); if (!info.exists()) { @@ -686,7 +687,7 @@ qint64 Storage::diskUsage() const return info.size(); } -void Storage::removeFromDisk() const +void DataStore::removeFromDisk() const { const QString fullPath(d->storageRoot + '/' + d->name); QMutexLocker locker(&sMutex); @@ -701,7 +702,7 @@ void Storage::removeFromDisk() const } } -void Storage::clearEnv() +void DataStore::clearEnv() { for (auto env : sEnvironments) { mdb_env_close(env); @@ -709,4 +710,5 @@ void Storage::clearEnv() sEnvironments.clear(); } +} } // namespace Sink diff --git a/common/store.cpp b/common/store.cpp index 0ecdcd2..52fec2e 100644 --- a/common/store.cpp +++ b/common/store.cpp @@ -230,7 +230,7 @@ KAsync::Job Store::removeDataFromDisk(const QByteArray &identifier) { // All databases are going to become invalid, nuke the environments // TODO: all clients should react to a notification the resource - Sink::Storage::clearEnv(); + Sink::Storage::DataStore::clearEnv(); SinkTrace() << "Remove data from disk " << identifier; auto time = QSharedPointer::create(); time->start(); diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 53db82f..5ddd77c 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -24,7 +24,7 @@ #include "bufferutils.h" #include "entitystore.h" #include "remoteidmap.h" -#include "adaptorfactoryregistry.h" +#include "entityreader.h" #include "createentity_generated.h" #include "modifyentity_generated.h" #include "deleteentity_generated.h" @@ -33,13 +33,12 @@ SINK_DEBUG_AREA("synchronizer") using namespace Sink; -Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) - : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), - mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), - mResourceType(resourceType), - mResourceInstanceIdentifier(resourceInstanceIdentifier) +Synchronizer::Synchronizer(const Sink::ResourceContext &context) + : mResourceContext(context), + mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), + mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) { - SinkTrace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; + SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); } Synchronizer::~Synchronizer() @@ -59,11 +58,9 @@ void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) mEnqueue(commandId, data); } -EntityStore &Synchronizer::store() +Storage::EntityStore &Synchronizer::store() { - if (!mEntityStore) { - mEntityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, transaction()); - } + mEntityStore->startTransaction(Sink::Storage::DataStore::ReadOnly); return *mEntityStore; } @@ -75,13 +72,12 @@ RemoteIdMap &Synchronizer::syncStore() return *mSyncStore; } -void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, - DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) +void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject) { // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; - adaptorFactory.createBuffer(domainObject, entityFbb); + mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; // This is the resource type and not the domain type auto entityId = fbb.CreateString(sinkId.toStdString()); @@ -89,18 +85,17 @@ void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &buff auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); Sink::Commands::FinishCreateEntityBuffer(fbb, location); - callback(BufferUtils::extractBuffer(fbb)); + enqueueCommand(Sink::Commands::CreateEntityCommand, BufferUtils::extractBuffer(fbb)); } -void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, - DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback) +void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject) { // FIXME removals QByteArrayList deletedProperties; // These changes are coming from the source const auto replayToSource = false; flatbuffers::FlatBufferBuilder entityFbb; - adaptorFactory.createBuffer(domainObject, entityFbb); + mResourceContext.adaptorFactory(bufferType).createBuffer(domainObject, entityFbb); flatbuffers::FlatBufferBuilder fbb; auto entityId = fbb.CreateString(sinkId.toStdString()); auto modifiedProperties = BufferUtils::toVector(fbb, domainObject.changedProperties()); @@ -110,10 +105,10 @@ void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); auto location = Sink::Commands::CreateModifyEntity(fbb, revision, entityId, deletions, type, delta, replayToSource, modifiedProperties); Sink::Commands::FinishModifyEntityBuffer(fbb, location); - callback(BufferUtils::extractBuffer(fbb)); + enqueueCommand(Sink::Commands::ModifyEntityCommand, BufferUtils::extractBuffer(fbb)); } -void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function callback) +void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType) { // These changes are coming from the source const auto replayToSource = false; @@ -123,63 +118,69 @@ void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const auto type = fbb.CreateString(bufferType.toStdString()); auto location = Sink::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); Sink::Commands::FinishDeleteEntityBuffer(fbb, location); - callback(BufferUtils::extractBuffer(fbb)); + enqueueCommand(Sink::Commands::DeleteEntityCommand, BufferUtils::extractBuffer(fbb)); } void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function &callback)> &entryGenerator, std::function exists) { - entryGenerator([this, bufferType, &exists](const QByteArray &key) { - auto sinkId = Sink::Storage::uidFromKey(key); + entryGenerator([this, bufferType, &exists](const QByteArray &sinkId) { const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); - SinkTrace() << "Checking for removal " << key << remoteId; + SinkTrace() << "Checking for removal " << sinkId << remoteId; // If we have no remoteId, the entity hasn't been replayed to the source yet if (!remoteId.isEmpty()) { if (!exists(remoteId)) { SinkTrace() << "Found a removed entity: " << sinkId; - deleteEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, - [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); + deleteEntity(sinkId, mEntityStore->maxRevision(), bufferType); } } }); } -void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) +void Synchronizer::scanForRemovals(const QByteArray &bufferType, std::function exists) { - auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); - const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); - auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); - Q_ASSERT(adaptorFactory); - qint64 retrievedRevision = 0; - if (auto current = EntityReaderUtils::getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) { + scanForRemovals(bufferType, + [this, &bufferType](const std::function &callback) { + store().readAllUids(bufferType, [callback](const QByteArray &uid) { + callback(uid); + }); + }, + exists + ); +} + +void Synchronizer::modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity) +{ + store.readLatest(bufferType, sinkId, [&, this](const Sink::ApplicationDomain::ApplicationDomainType ¤t) { bool changed = false; for (const auto &property : entity.changedProperties()) { - if (entity.getProperty(property) != current->getProperty(property)) { + if (entity.getProperty(property) != current.getProperty(property)) { SinkTrace() << "Property changed " << sinkId << property; changed = true; } } if (changed) { - SinkTrace() << "Found a modified entity: " << remoteId; - modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, - [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); + SinkTrace() << "Found a modified entity: " << sinkId; + modifyEntity(sinkId, store.maxRevision(), bufferType, entity); } - } else { - SinkWarning() << "Failed to get current entity"; - } + }); +} + +void Synchronizer::modify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) +{ + const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); + Storage::EntityStore store(mResourceContext); + modifyIfChanged(store, bufferType, sinkId, entity); } void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) { SinkTrace() << "Create or modify" << bufferType << remoteId; - auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); + Storage::EntityStore store(mResourceContext); const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); - const auto found = mainDatabase.contains(sinkId); + const auto found = store.contains(bufferType, sinkId); if (!found) { SinkTrace() << "Found a new entity: " << remoteId; - auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); - Q_ASSERT(adaptorFactory); - createEntity( - sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); + createEntity(sinkId, bufferType, entity); } else { // modification modify(bufferType, remoteId, entity); } @@ -190,10 +191,9 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray { SinkTrace() << "Create or modify" << bufferType << remoteId; - auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); - const auto found = mainDatabase.contains(sinkId); - auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); + Storage::EntityStore store(mResourceContext); + const auto found = store.contains(bufferType, sinkId); if (!found) { if (!mergeCriteria.isEmpty()) { Sink::Query query; @@ -201,7 +201,8 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray query.filter(it.key(), it.value()); } bool merge = false; - Sink::EntityReader reader(mResourceType, mResourceInstanceIdentifier, transaction()); + Storage::EntityStore store(mResourceContext); + Sink::EntityReader reader(store); reader.query(query, [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{ merge = true; @@ -211,43 +212,21 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray }); if (!merge) { SinkTrace() << "Found a new entity: " << remoteId; - createEntity( - sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); + createEntity(sinkId, bufferType, entity); } } else { SinkTrace() << "Found a new entity: " << remoteId; - createEntity( - sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); + createEntity(sinkId, bufferType, entity); } } else { // modification - qint64 retrievedRevision = 0; - if (auto current = EntityReaderUtils::getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) { - bool changed = false; - for (const auto &property : entity.changedProperties()) { - if (entity.getProperty(property) != current->getProperty(property)) { - SinkTrace() << "Property changed " << sinkId << property; - changed = true; - } - } - if (changed) { - SinkTrace() << "Found a modified entity: " << remoteId; - modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, - [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); - } - } else { - SinkWarning() << "Failed to get current entity"; - } + modifyIfChanged(store, bufferType, sinkId, entity); } } template void Synchronizer::modify(const DomainType &entity) { - const auto bufferType = ApplicationDomain::getTypeName(); - const auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); - Q_ASSERT(adaptorFactory); - modifyEntity(entity.identifier(), entity.revision(), bufferType, entity, *adaptorFactory, - [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); + modifyEntity(entity.identifier(), entity.revision(), ApplicationDomain::getTypeName(), entity); } KAsync::Job Synchronizer::synchronize() @@ -257,7 +236,6 @@ KAsync::Job Synchronizer::synchronize() mMessageQueue->startTransaction(); return synchronizeWithSource().syncThen([this]() { mSyncStore.clear(); - mEntityStore.clear(); mMessageQueue->commit(); mSyncInProgress = false; }); @@ -266,8 +244,7 @@ KAsync::Job Synchronizer::synchronize() void Synchronizer::commit() { mMessageQueue->commit(); - mTransaction.abort(); - mEntityStore.clear(); + mEntityStore->abortTransaction(); mSyncTransaction.commit(); mSyncStore.clear(); if (mSyncInProgress) { @@ -275,20 +252,11 @@ void Synchronizer::commit() } } -Sink::Storage::Transaction &Synchronizer::transaction() -{ - if (!mTransaction) { - SinkTrace() << "Starting transaction"; - mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); - } - return mTransaction; -} - -Sink::Storage::Transaction &Synchronizer::syncTransaction() +Sink::Storage::DataStore::DataStore::Transaction &Synchronizer::syncTransaction() { if (!mSyncTransaction) { SinkTrace() << "Starting transaction"; - mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); + mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::DataStore::DataStore::ReadWrite); } return mSyncTransaction; } diff --git a/common/synchronizer.h b/common/synchronizer.h index 5f60128..f3319f6 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h @@ -37,31 +37,28 @@ class RemoteIdMap; class SINK_EXPORT Synchronizer { public: - Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); + Synchronizer(const Sink::ResourceContext &resourceContext); virtual ~Synchronizer(); void setup(const std::function &enqueueCommandCallback, MessageQueue &messageQueue); KAsync::Job synchronize(); //Read only access to main storage - EntityStore &store(); + Storage::EntityStore &store(); //Read/Write access to sync storage RemoteIdMap &syncStore(); void commit(); - Sink::Storage::Transaction &transaction(); - Sink::Storage::Transaction &syncTransaction(); + Sink::Storage::DataStore::Transaction &syncTransaction(); protected: ///Calls the callback to enqueue the command void enqueueCommand(int commandId, const QByteArray &data); - static void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, - DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); - static void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, - DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function callback); - static void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, std::function callback); + void createEntity(const QByteArray &localId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject); + void modifyEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject); + void deleteEntity(const QByteArray &localId, qint64 revision, const QByteArray &bufferType); /** * A synchronous algorithm to remove entities that are no longer existing. @@ -74,7 +71,8 @@ protected: * All functions are called synchronously, and both @param entryGenerator and @param exists need to be synchronous. */ void scanForRemovals(const QByteArray &bufferType, - const std::function &callback)> &entryGenerator, std::function exists); + const std::function &callback)> &entryGenerator, std::function exists); + void scanForRemovals(const QByteArray &bufferType, std::function exists); /** * An algorithm to create or modify the entity. @@ -96,14 +94,13 @@ protected: virtual KAsync::Job synchronizeWithSource() = 0; private: + void modifyIfChanged(Storage::EntityStore &store, const QByteArray &bufferType, const QByteArray &sinkId, const Sink::ApplicationDomain::ApplicationDomainType &entity); + + Sink::ResourceContext mResourceContext; + Sink::Storage::EntityStore::Ptr mEntityStore; QSharedPointer mSyncStore; - QSharedPointer mEntityStore; - Sink::Storage mStorage; - Sink::Storage mSyncStorage; - QByteArray mResourceType; - QByteArray mResourceInstanceIdentifier; - Sink::Storage::Transaction mTransaction; - Sink::Storage::Transaction mSyncTransaction; + Sink::Storage::DataStore mSyncStorage; + Sink::Storage::DataStore::Transaction mSyncTransaction; std::function mEnqueue; MessageQueue *mMessageQueue; bool mSyncInProgress; diff --git a/common/test.cpp b/common/test.cpp index 7bba125..0982293 100644 --- a/common/test.cpp +++ b/common/test.cpp @@ -104,11 +104,11 @@ public: facade->mTestAccount = testAccount; map.insert(instanceIdentifier, facade); bool alwaysReturnFacade = instanceIdentifier.isEmpty(); - Sink::FacadeFactory::instance().registerFacade>("testresource", [alwaysReturnFacade](const QByteArray &instanceIdentifier) { + Sink::FacadeFactory::instance().registerFacade>("testresource", [alwaysReturnFacade](const Sink::ResourceContext &context) { if (alwaysReturnFacade) { return map.value(QByteArray()); } - return map.value(instanceIdentifier); + return map.value(context.resourceInstanceIdentifier); }); return facade; } diff --git a/common/typeindex.cpp b/common/typeindex.cpp index 816e7ee..64c2a01 100644 --- a/common/typeindex.cpp +++ b/common/typeindex.cpp @@ -66,7 +66,7 @@ QByteArray TypeIndex::indexName(const QByteArray &property, const QByteArray &so template <> void TypeIndex::addProperty(const QByteArray &property) { - auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) { + auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { // SinkTrace() << "Indexing " << mType + ".index." + property << value.toByteArray(); Index(indexName(property), transaction).add(getByteArray(value), identifier); }; @@ -77,7 +77,7 @@ void TypeIndex::addProperty(const QByteArray &property) template <> void TypeIndex::addProperty(const QByteArray &property) { - auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) { + auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { // SinkTrace() << "Indexing " << mType + ".index." + property << value.toByteArray(); Index(indexName(property), transaction).add(getByteArray(value), identifier); }; @@ -88,7 +88,7 @@ void TypeIndex::addProperty(const QByteArray &property) template <> void TypeIndex::addProperty(const QByteArray &property) { - auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) { + auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { //SinkTrace() << "Indexing " << mType + ".index." + property << getByteArray(value); Index(indexName(property), transaction).add(getByteArray(value), identifier); }; @@ -99,7 +99,7 @@ void TypeIndex::addProperty(const QByteArray &property) template <> void TypeIndex::addPropertyWithSorting(const QByteArray &property, const QByteArray &sortProperty) { - auto indexer = [=](const QByteArray &identifier, const QVariant &value, const QVariant &sortValue, Sink::Storage::Transaction &transaction) { + auto indexer = [=](const QByteArray &identifier, const QVariant &value, const QVariant &sortValue, Sink::Storage::DataStore::Transaction &transaction) { const auto date = sortValue.toDateTime(); const auto propertyValue = getByteArray(value); Index(indexName(property, sortProperty), transaction).add(propertyValue + toSortableByteArray(date), identifier); @@ -108,7 +108,7 @@ void TypeIndex::addPropertyWithSorting(const QByteArray & mSortedProperties.insert(property, sortProperty); } -void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) +void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) { for (const auto &property : mProperties) { const auto value = bufferAdaptor.getProperty(property); @@ -123,7 +123,7 @@ void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain: } } -void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) +void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) { for (const auto &property : mProperties) { const auto value = bufferAdaptor.getProperty(property); @@ -159,7 +159,7 @@ static QVector indexLookup(Index &index, Query::Comparator filter) return keys; } -QVector TypeIndex::query(const Sink::Query &query, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction) +QVector TypeIndex::query(const Sink::Query &query, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::DataStore::Transaction &transaction) { QVector keys; for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { @@ -185,7 +185,7 @@ QVector TypeIndex::query(const Sink::Query &query, QSet return keys; } -QVector TypeIndex::lookup(const QByteArray &property, const QVariant &value, Sink::Storage::Transaction &transaction) +QVector TypeIndex::lookup(const QByteArray &property, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { SinkTrace() << "Index lookup on property: " << property << mSecondaryProperties.keys() << mProperties; if (mProperties.contains(property)) { @@ -218,19 +218,19 @@ QVector TypeIndex::lookup(const QByteArray &property, const QVariant } template <> -void TypeIndex::index(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::Transaction &transaction) +void TypeIndex::index(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction) { Index(indexName(leftName + rightName), transaction).add(getByteArray(leftValue), getByteArray(rightValue)); } template <> -void TypeIndex::index(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::Transaction &transaction) +void TypeIndex::index(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction) { Index(indexName(leftName + rightName), transaction).add(getByteArray(leftValue), getByteArray(rightValue)); } template <> -QVector TypeIndex::secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value, Sink::Storage::Transaction &transaction) +QVector TypeIndex::secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { QVector keys; Index index(indexName(leftName + rightName), transaction); @@ -242,7 +242,7 @@ QVector TypeIndex::secondaryLookup(const QByteArray &lef } template <> -QVector TypeIndex::secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value, Sink::Storage::Transaction &transaction) +QVector TypeIndex::secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { QVector keys; Index index(indexName(leftName + rightName), transaction); diff --git a/common/typeindex.h b/common/typeindex.h index 4972e95..2638577 100644 --- a/common/typeindex.h +++ b/common/typeindex.h @@ -52,29 +52,29 @@ public: { mSecondaryProperties.insert(Left::name, Right::name); } - void add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); - void remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction); + void add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); + void remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); - QVector query(const Sink::Query &query, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::Transaction &transaction); - QVector lookup(const QByteArray &property, const QVariant &value, Sink::Storage::Transaction &transaction); + QVector query(const Sink::Query &query, QSet &appliedFilters, QByteArray &appliedSorting, Sink::Storage::DataStore::Transaction &transaction); + QVector lookup(const QByteArray &property, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction); template - QVector secondaryLookup(const QVariant &value, Sink::Storage::Transaction &transaction) + QVector secondaryLookup(const QVariant &value, Sink::Storage::DataStore::Transaction &transaction) { return secondaryLookup(Left::name, Right::name, value, transaction); } template - QVector secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value, Sink::Storage::Transaction &transaction); + QVector secondaryLookup(const QByteArray &leftName, const QByteArray &rightName, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction); template - void index(const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::Transaction &transaction) + void index(const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction) { index(Left::name, Right::name, leftValue, rightValue, transaction); } template - void index(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::Transaction &transaction); + void index(const QByteArray &leftName, const QByteArray &rightName, const QVariant &leftValue, const QVariant &rightValue, Sink::Storage::DataStore::Transaction &transaction); private: @@ -85,6 +85,6 @@ private: QMap mSortedProperties; // QMap mSecondaryProperties; - QHash> mIndexer; - QHash> mSortIndexer; + QHash> mIndexer; + QHash> mSortIndexer; }; diff --git a/examples/dummyresource/facade.cpp b/examples/dummyresource/facade.cpp index 120498a..4343eba 100644 --- a/examples/dummyresource/facade.cpp +++ b/examples/dummyresource/facade.cpp @@ -21,8 +21,8 @@ #include "domainadaptor.h" -DummyResourceFacade::DummyResourceFacade(const QByteArray &instanceIdentifier) - : Sink::GenericFacade(instanceIdentifier, QSharedPointer::create()) +DummyResourceFacade::DummyResourceFacade(const Sink::ResourceContext &context) + : Sink::GenericFacade(context) { } @@ -31,8 +31,8 @@ DummyResourceFacade::~DummyResourceFacade() } -DummyResourceMailFacade::DummyResourceMailFacade(const QByteArray &instanceIdentifier) - : Sink::GenericFacade(instanceIdentifier, QSharedPointer::create()) +DummyResourceMailFacade::DummyResourceMailFacade(const Sink::ResourceContext &context) + : Sink::GenericFacade(context) { } @@ -41,8 +41,8 @@ DummyResourceMailFacade::~DummyResourceMailFacade() } -DummyResourceFolderFacade::DummyResourceFolderFacade(const QByteArray &instanceIdentifier) - : Sink::GenericFacade(instanceIdentifier, QSharedPointer::create()) +DummyResourceFolderFacade::DummyResourceFolderFacade(const Sink::ResourceContext &context) + : Sink::GenericFacade(context) { } diff --git a/examples/dummyresource/facade.h b/examples/dummyresource/facade.h index 5e0096d..1bb45fd 100644 --- a/examples/dummyresource/facade.h +++ b/examples/dummyresource/facade.h @@ -25,20 +25,20 @@ class DummyResourceFacade : public Sink::GenericFacade { public: - DummyResourceFacade(const QByteArray &instanceIdentifier); + DummyResourceFacade(const Sink::ResourceContext &context); virtual ~DummyResourceFacade(); }; class DummyResourceMailFacade : public Sink::GenericFacade { public: - DummyResourceMailFacade(const QByteArray &instanceIdentifier); + DummyResourceMailFacade(const Sink::ResourceContext &context); virtual ~DummyResourceMailFacade(); }; class DummyResourceFolderFacade : public Sink::GenericFacade { public: - DummyResourceFolderFacade(const QByteArray &instanceIdentifier); + DummyResourceFolderFacade(const Sink::ResourceContext &context); virtual ~DummyResourceFolderFacade(); }; diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index 6d14721..e288be2 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp @@ -53,8 +53,8 @@ SINK_DEBUG_AREA("dummyresource") class DummySynchronizer : public Sink::Synchronizer { public: - DummySynchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) - : Sink::Synchronizer(resourceType, resourceInstanceIdentifier) + DummySynchronizer(const Sink::ResourceContext &context) + : Sink::Synchronizer(context) { } @@ -129,11 +129,11 @@ class DummySynchronizer : public Sink::Synchronizer { }; -DummyResource::DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer &pipeline) - : Sink::GenericResource(PLUGIN_NAME, instanceIdentifier, pipeline) +DummyResource::DummyResource(const Sink::ResourceContext &resourceContext, const QSharedPointer &pipeline) + : Sink::GenericResource(resourceContext, pipeline) { - setupSynchronizer(QSharedPointer::create(PLUGIN_NAME, instanceIdentifier)); - setupChangereplay(QSharedPointer::create(instanceIdentifier)); + setupSynchronizer(QSharedPointer::create(resourceContext)); + setupChangereplay(QSharedPointer::create(resourceContext)); setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new MailPropertyExtractor << new DefaultIndexUpdater); setupPreprocessors(ENTITY_TYPE_FOLDER, @@ -182,9 +182,9 @@ DummyResourceFactory::DummyResourceFactory(QObject *parent) } -Sink::Resource *DummyResourceFactory::createResource(const QByteArray &instanceIdentifier) +Sink::Resource *DummyResourceFactory::createResource(const Sink::ResourceContext &resourceContext) { - return new DummyResource(instanceIdentifier); + return new DummyResource(resourceContext); } void DummyResourceFactory::registerFacades(Sink::FacadeFactory &factory) diff --git a/examples/dummyresource/resourcefactory.h b/examples/dummyresource/resourcefactory.h index 0a29d53..3dd82ff 100644 --- a/examples/dummyresource/resourcefactory.h +++ b/examples/dummyresource/resourcefactory.h @@ -32,7 +32,7 @@ class DummyResource : public Sink::GenericResource { public: - DummyResource(const QByteArray &instanceIdentifier, const QSharedPointer &pipeline = QSharedPointer()); + DummyResource(const Sink::ResourceContext &resourceContext, const QSharedPointer &pipeline = QSharedPointer()); virtual ~DummyResource(); KAsync::Job synchronizeWithSource() Q_DECL_OVERRIDE; @@ -48,7 +48,7 @@ class DummyResourceFactory : public Sink::ResourceFactory public: DummyResourceFactory(QObject *parent = 0); - Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; + Sink::Resource *createResource(const Sink::ResourceContext &resourceContext) Q_DECL_OVERRIDE; void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE; void registerAdaptorFactories(Sink::AdaptorFactoryRegistry ®istry) Q_DECL_OVERRIDE; void removeDataFromDisk(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; diff --git a/examples/imapresource/facade.cpp b/examples/imapresource/facade.cpp index d338b01..2829bb1 100644 --- a/examples/imapresource/facade.cpp +++ b/examples/imapresource/facade.cpp @@ -25,8 +25,8 @@ #include "domainadaptor.h" #include "queryrunner.h" -ImapResourceMailFacade::ImapResourceMailFacade(const QByteArray &instanceIdentifier) - : Sink::GenericFacade(instanceIdentifier, QSharedPointer::create()) +ImapResourceMailFacade::ImapResourceMailFacade(const Sink::ResourceContext &context) + : Sink::GenericFacade(context) { } @@ -34,8 +34,8 @@ ImapResourceMailFacade::~ImapResourceMailFacade() { } -ImapResourceFolderFacade::ImapResourceFolderFacade(const QByteArray &instanceIdentifier) - : Sink::GenericFacade(instanceIdentifier, QSharedPointer::create()) +ImapResourceFolderFacade::ImapResourceFolderFacade(const Sink::ResourceContext &context) + : Sink::GenericFacade(context) { } diff --git a/examples/imapresource/facade.h b/examples/imapresource/facade.h index 479ad96..1d24856 100644 --- a/examples/imapresource/facade.h +++ b/examples/imapresource/facade.h @@ -24,13 +24,13 @@ class ImapResourceMailFacade : public Sink::GenericFacade { public: - ImapResourceMailFacade(const QByteArray &instanceIdentifier); + ImapResourceMailFacade(const Sink::ResourceContext &context); virtual ~ImapResourceMailFacade(); }; class ImapResourceFolderFacade : public Sink::GenericFacade { public: - ImapResourceFolderFacade(const QByteArray &instanceIdentifier); + ImapResourceFolderFacade(const Sink::ResourceContext &context); virtual ~ImapResourceFolderFacade(); }; diff --git a/examples/imapresource/imapresource.cpp b/examples/imapresource/imapresource.cpp index c72579c..0ea07bf 100644 --- a/examples/imapresource/imapresource.cpp +++ b/examples/imapresource/imapresource.cpp @@ -50,8 +50,8 @@ #include #include "imapserverproxy.h" -#include "entityreader.h" #include "mailpreprocessor.h" +#include "adaptorfactoryregistry.h" #include "specialpurposepreprocessor.h" //This is the resources entity type, and not the domain type @@ -92,8 +92,8 @@ static QByteArray assembleMailRid(const ApplicationDomain::Mail &mail, qint64 im class ImapSynchronizer : public Sink::Synchronizer { public: - ImapSynchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) - : Sink::Synchronizer(resourceType, resourceInstanceIdentifier) + ImapSynchronizer(const ResourceContext &resourceContext) + : Sink::Synchronizer(resourceContext) { } @@ -126,17 +126,6 @@ public: SinkTrace() << "Found folders " << folderList.size(); scanForRemovals(bufferType, - [this, &bufferType](const std::function &callback) { - //TODO Instead of iterating over all entries in the database, which can also pick up the same item multiple times, - //we should rather iterate over an index that contains every uid exactly once. The remoteId index would be such an index, - //but we currently fail to iterate over all entries in an index it seems. - // auto remoteIds = synchronizationTransaction.openDatabase("rid.mapping." + bufferType, std::function(), true); - auto mainDatabase = Sink::Storage::mainDatabase(transaction(), bufferType); - mainDatabase.scan("", [&](const QByteArray &key, const QByteArray &) { - callback(key); - return true; - }); - }, [&folderList](const QByteArray &remoteId) -> bool { // folderList.contains(remoteId) for (const auto &folderPath : folderList) { @@ -190,18 +179,12 @@ public: const auto folderLocalId = syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, path.toUtf8()); int count = 0; - auto property = Sink::ApplicationDomain::Mail::Folder::name; + scanForRemovals(bufferType, [&](const std::function &callback) { - Index index(bufferType + ".index." + property, transaction()); - index.lookup(folderLocalId, [&](const QByteArray &sinkId) { - callback(sinkId); - }, - [&](const Index::Error &error) { - SinkWarning() << "Error in index: " << error.message << property; - }); + store().indexLookup(folderLocalId, callback); }, - [messages, path, &count](const QByteArray &remoteId) -> bool { + [&](const QByteArray &remoteId) -> bool { if (messages.contains(uidFromMailRid(remoteId))) { return true; } @@ -347,7 +330,7 @@ public: class ImapWriteback : public Sink::SourceWriteBack { public: - ImapWriteback(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) : Sink::SourceWriteBack(resourceType, resourceInstanceIdentifier) + ImapWriteback(const ResourceContext &resourceContext) : Sink::SourceWriteBack(resourceContext) { } @@ -514,10 +497,10 @@ public: QByteArray mResourceInstanceIdentifier; }; -ImapResource::ImapResource(const QByteArray &instanceIdentifier, const QSharedPointer &pipeline) - : Sink::GenericResource(PLUGIN_NAME, instanceIdentifier, pipeline) +ImapResource::ImapResource(const ResourceContext &resourceContext, const QSharedPointer &pipeline) + : Sink::GenericResource(resourceContext, pipeline) { - auto config = ResourceConfig::getConfiguration(instanceIdentifier); + auto config = ResourceConfig::getConfiguration(resourceContext.instanceId()); mServer = config.value("server").toString(); mPort = config.value("port").toInt(); mUser = config.value("username").toString(); @@ -532,46 +515,45 @@ ImapResource::ImapResource(const QByteArray &instanceIdentifier, const QSharedPo mPort = list.at(1).toInt(); } - auto synchronizer = QSharedPointer::create(PLUGIN_NAME, instanceIdentifier); + auto synchronizer = QSharedPointer::create(resourceContext); synchronizer->mServer = mServer; synchronizer->mPort = mPort; synchronizer->mUser = mUser; synchronizer->mPassword = mPassword; - synchronizer->mResourceInstanceIdentifier = instanceIdentifier; setupSynchronizer(synchronizer); - auto changereplay = QSharedPointer::create(PLUGIN_NAME, instanceIdentifier); + auto changereplay = QSharedPointer::create(resourceContext); changereplay->mServer = mServer; changereplay->mPort = mPort; changereplay->mUser = mUser; changereplay->mPassword = mPassword; setupChangereplay(changereplay); - setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new SpecialPurposeProcessor(mResourceType, mResourceInstanceIdentifier) << new MimeMessageMover << new MailPropertyExtractor << new DefaultIndexUpdater); + setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MimeMessageMover << new MailPropertyExtractor << new DefaultIndexUpdater); setupPreprocessors(ENTITY_TYPE_FOLDER, QVector() << new DefaultIndexUpdater); } void ImapResource::removeFromDisk(const QByteArray &instanceIdentifier) { GenericResource::removeFromDisk(instanceIdentifier); - Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::ReadWrite).removeFromDisk(); + Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); } KAsync::Job ImapResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) { - auto synchronizationStore = QSharedPointer::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadOnly); - auto synchronizationTransaction = synchronizationStore->createTransaction(Sink::Storage::ReadOnly); + auto synchronizationStore = QSharedPointer::create(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadOnly); + auto synchronizationTransaction = synchronizationStore->createTransaction(Sink::Storage::DataStore::ReadOnly); - auto mainStore = QSharedPointer::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); - auto transaction = mainStore->createTransaction(Sink::Storage::ReadOnly); + auto mainStore = QSharedPointer::create(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly); + auto transaction = mainStore->createTransaction(Sink::Storage::DataStore::ReadOnly); - auto entityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, transaction); + Sink::Storage::EntityStore entityStore(mResourceContext); auto syncStore = QSharedPointer::create(synchronizationTransaction); SinkTrace() << "Inspecting " << inspectionType << domainType << entityId << property << expectedValue; if (domainType == ENTITY_TYPE_MAIL) { - const auto mail = entityStore->read(entityId); - const auto folder = entityStore->read(mail.getFolder()); + const auto mail = entityStore.readLatest(entityId); + const auto folder = entityStore.readLatest(mail.getFolder()); const auto folderRemoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, mail.getFolder()); const auto mailRemoteId = syncStore->resolveLocalId(ENTITY_TYPE_MAIL, mail.identifier()); if (mailRemoteId.isEmpty() || folderRemoteId.isEmpty()) { @@ -635,7 +617,7 @@ KAsync::Job ImapResource::inspect(int inspectionType, const QByteArray &in } if (domainType == ENTITY_TYPE_FOLDER) { const auto remoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, entityId); - const auto folder = entityStore->read(entityId); + const auto folder = entityStore.readLatest(entityId); if (inspectionType == Sink::ResourceControl::Inspection::CacheIntegrityInspectionType) { SinkLog() << "Inspecting cache integrity" << remoteId; @@ -698,9 +680,9 @@ ImapResourceFactory::ImapResourceFactory(QObject *parent) } -Sink::Resource *ImapResourceFactory::createResource(const QByteArray &instanceIdentifier) +Sink::Resource *ImapResourceFactory::createResource(const ResourceContext &context) { - return new ImapResource(instanceIdentifier); + return new ImapResource(context); } void ImapResourceFactory::registerFacades(Sink::FacadeFactory &factory) diff --git a/examples/imapresource/imapresource.h b/examples/imapresource/imapresource.h index 236e695..684a3c9 100644 --- a/examples/imapresource/imapresource.h +++ b/examples/imapresource/imapresource.h @@ -42,7 +42,7 @@ struct Folder; class ImapResource : public Sink::GenericResource { public: - ImapResource(const QByteArray &instanceIdentifier, const QSharedPointer &pipeline = QSharedPointer()); + ImapResource(const Sink::ResourceContext &resourceContext, const QSharedPointer &pipeline = QSharedPointer()); KAsync::Job inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE; static void removeFromDisk(const QByteArray &instanceIdentifier); @@ -62,7 +62,7 @@ class ImapResourceFactory : public Sink::ResourceFactory public: ImapResourceFactory(QObject *parent = 0); - Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; + Sink::Resource *createResource(const Sink::ResourceContext &resourceContext) Q_DECL_OVERRIDE; void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE; void registerAdaptorFactories(Sink::AdaptorFactoryRegistry ®istry) Q_DECL_OVERRIDE; void removeDataFromDisk(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; diff --git a/examples/maildirresource/facade.cpp b/examples/maildirresource/facade.cpp index 256b255..ba53c5f 100644 --- a/examples/maildirresource/facade.cpp +++ b/examples/maildirresource/facade.cpp @@ -22,11 +22,10 @@ #include #include -#include "domainadaptor.h" -#include "queryrunner.h" +#include "query.h" -MaildirResourceMailFacade::MaildirResourceMailFacade(const QByteArray &instanceIdentifier) - : Sink::GenericFacade(instanceIdentifier, QSharedPointer::create()) +MaildirResourceMailFacade::MaildirResourceMailFacade(const Sink::ResourceContext &context) + : Sink::GenericFacade(context) { mResultTransformation = [](Sink::ApplicationDomain::ApplicationDomainType &value) { if (value.hasProperty("mimeMessage")) { @@ -62,8 +61,8 @@ QPair, Sink::ResultEmitter } -MaildirResourceFolderFacade::MaildirResourceFolderFacade(const QByteArray &instanceIdentifier) - : Sink::GenericFacade(instanceIdentifier, QSharedPointer::create()) +MaildirResourceFolderFacade::MaildirResourceFolderFacade(const Sink::ResourceContext &context) + : Sink::GenericFacade(context) { } diff --git a/examples/maildirresource/facade.h b/examples/maildirresource/facade.h index 38981d0..fdb693e 100644 --- a/examples/maildirresource/facade.h +++ b/examples/maildirresource/facade.h @@ -24,7 +24,7 @@ class MaildirResourceMailFacade : public Sink::GenericFacade { public: - MaildirResourceMailFacade(const QByteArray &instanceIdentifier); + MaildirResourceMailFacade(const Sink::ResourceContext &context); virtual ~MaildirResourceMailFacade(); QPair, Sink::ResultEmitter::Ptr> load(const Sink::Query &query) Q_DECL_OVERRIDE; }; @@ -32,6 +32,6 @@ public: class MaildirResourceFolderFacade : public Sink::GenericFacade { public: - MaildirResourceFolderFacade(const QByteArray &instanceIdentifier); + MaildirResourceFolderFacade(const Sink::ResourceContext &context); virtual ~MaildirResourceFolderFacade(); }; diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp index 1ed7fc8..b89d78c 100644 --- a/examples/maildirresource/maildirresource.cpp +++ b/examples/maildirresource/maildirresource.cpp @@ -85,13 +85,13 @@ class MaildirMimeMessageMover : public Sink::Preprocessor public: MaildirMimeMessageMover(const QByteArray &resourceInstanceIdentifier, const QString &maildirPath) : mResourceInstanceIdentifier(resourceInstanceIdentifier), mMaildirPath(maildirPath) {} - QString getPath(const QByteArray &folderIdentifier, Sink::Storage::Transaction &transaction) + QString getPath(const QByteArray &folderIdentifier, Sink::Storage::DataStore::Transaction &transaction) { if (folderIdentifier.isEmpty()) { return mMaildirPath; } QString folderPath; - auto db = Sink::Storage::mainDatabase(transaction, ENTITY_TYPE_FOLDER); + auto db = Sink::Storage::DataStore::mainDatabase(transaction, ENTITY_TYPE_FOLDER); db.findLatest(folderIdentifier, [&](const QByteArray &, const QByteArray &value) { Sink::EntityBuffer buffer(value); const Sink::Entity &entity = buffer.entity(); @@ -108,7 +108,7 @@ public: return folderPath; } - QString moveMessage(const QString &oldPath, const QByteArray &folder, Sink::Storage::Transaction &transaction) + QString moveMessage(const QString &oldPath, const QByteArray &folder, Sink::Storage::DataStore::Transaction &transaction) { if (oldPath.startsWith(Sink::temporaryFileLocation())) { const auto path = getPath(folder, transaction); @@ -141,7 +141,7 @@ public: } } - void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE { const auto mimeMessage = newEntity.getProperty("mimeMessage"); if (mimeMessage.isValid()) { @@ -150,7 +150,7 @@ public: } void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, - Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE { const auto mimeMessage = newEntity.getProperty("mimeMessage"); const auto newFolder = newEntity.getProperty("folder"); @@ -185,7 +185,7 @@ public: maildir.changeEntryFlags(identifier, flags); } - void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE { const auto filePath = getFilePathFromMimeMessagePath(oldEntity.getProperty("mimeMessage").toString()); QFile::remove(filePath); @@ -199,7 +199,7 @@ class FolderPreprocessor : public Sink::Preprocessor public: FolderPreprocessor(const QString maildirPath) : mMaildirPath(maildirPath) {} - void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE { auto folderName = newEntity.getProperty("name").toString(); const auto path = mMaildirPath + "/" + folderName; @@ -208,11 +208,11 @@ public: } void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, - Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE { } - void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE { } QString mMaildirPath; @@ -221,8 +221,8 @@ public: class MaildirSynchronizer : public Sink::Synchronizer { public: - MaildirSynchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) - : Sink::Synchronizer(resourceType, resourceInstanceIdentifier) + MaildirSynchronizer(const Sink::ResourceContext &resourceContext) + : Sink::Synchronizer(resourceContext) { } @@ -278,19 +278,7 @@ public: const QByteArray bufferType = ENTITY_TYPE_FOLDER; QStringList folderList = listAvailableFolders(); SinkTrace() << "Found folders " << folderList; - scanForRemovals(bufferType, - [this, &bufferType](const std::function &callback) { - //TODO Instead of iterating over all entries in the database, which can also pick up the same item multiple times, - //we should rather iterate over an index that contains every uid exactly once. The remoteId index would be such an index, - //but we currently fail to iterate over all entries in an index it seems. - // auto remoteIds = synchronizationTransaction.openDatabase("rid.mapping." + bufferType, std::function(), true); - auto mainDatabase = Sink::Storage::mainDatabase(transaction(), bufferType); - mainDatabase.scan("", [&](const QByteArray &key, const QByteArray &) { - callback(key); - return true; - }); - }, [&folderList](const QByteArray &remoteId) -> bool { return folderList.contains(remoteId); } @@ -323,16 +311,9 @@ public: const auto folderLocalId = syncStore().resolveRemoteId(ENTITY_TYPE_FOLDER, path.toUtf8()); - auto property = "folder"; scanForRemovals(bufferType, [&](const std::function &callback) { - Index index(bufferType + ".index." + property, transaction()); - index.lookup(folderLocalId, [&](const QByteArray &sinkId) { - callback(sinkId); - }, - [&](const Index::Error &error) { - SinkWarning() << "Error in index: " << error.message << property; - }); + store().indexLookup(folderLocalId, callback); }, [](const QByteArray &remoteId) -> bool { return QFile(remoteId).exists(); @@ -392,7 +373,7 @@ public: class MaildirWriteback : public Sink::SourceWriteBack { public: - MaildirWriteback(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) : Sink::SourceWriteBack(resourceType, resourceInstanceIdentifier) + MaildirWriteback(const Sink::ResourceContext &resourceContext) : Sink::SourceWriteBack(resourceContext) { } @@ -442,24 +423,24 @@ public: }; -MaildirResource::MaildirResource(const QByteArray &instanceIdentifier, const QSharedPointer &pipeline) - : Sink::GenericResource(PLUGIN_NAME, instanceIdentifier, pipeline) +MaildirResource::MaildirResource(const Sink::ResourceContext &resourceContext, const QSharedPointer &pipeline) + : Sink::GenericResource(resourceContext, pipeline) { - auto config = ResourceConfig::getConfiguration(instanceIdentifier); + auto config = ResourceConfig::getConfiguration(resourceContext.instanceId()); mMaildirPath = QDir::cleanPath(QDir::fromNativeSeparators(config.value("path").toString())); //Chop a trailing slash if necessary if (mMaildirPath.endsWith("/")) { mMaildirPath.chop(1); } - auto synchronizer = QSharedPointer::create(PLUGIN_NAME, instanceIdentifier); + auto synchronizer = QSharedPointer::create(resourceContext); synchronizer->mMaildirPath = mMaildirPath; setupSynchronizer(synchronizer); - auto changereplay = QSharedPointer::create(PLUGIN_NAME, instanceIdentifier); + auto changereplay = QSharedPointer::create(resourceContext); changereplay->mMaildirPath = mMaildirPath; setupChangereplay(changereplay); - setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new SpecialPurposeProcessor(mResourceType, mResourceInstanceIdentifier) << new MaildirMimeMessageMover(mResourceInstanceIdentifier, mMaildirPath) << new MaildirMailPropertyExtractor << new DefaultIndexUpdater); + setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MaildirMimeMessageMover(resourceContext.instanceId(), mMaildirPath) << new MaildirMailPropertyExtractor << new DefaultIndexUpdater); setupPreprocessors(ENTITY_TYPE_FOLDER, QVector() << new FolderPreprocessor(mMaildirPath) << new DefaultIndexUpdater); KPIM::Maildir dir(mMaildirPath, true); @@ -480,24 +461,24 @@ MaildirResource::MaildirResource(const QByteArray &instanceIdentifier, const QSh void MaildirResource::removeFromDisk(const QByteArray &instanceIdentifier) { GenericResource::removeFromDisk(instanceIdentifier); - Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::ReadWrite).removeFromDisk(); + Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); } KAsync::Job MaildirResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) { - auto synchronizationStore = QSharedPointer::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadOnly); - auto synchronizationTransaction = synchronizationStore->createTransaction(Sink::Storage::ReadOnly); + auto synchronizationStore = QSharedPointer::create(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadOnly); + auto synchronizationTransaction = synchronizationStore->createTransaction(Sink::Storage::DataStore::ReadOnly); - auto mainStore = QSharedPointer::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); - auto transaction = mainStore->createTransaction(Sink::Storage::ReadOnly); + auto mainStore = QSharedPointer::create(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly); + auto transaction = mainStore->createTransaction(Sink::Storage::DataStore::ReadOnly); - auto entityStore = QSharedPointer::create(mResourceType, mResourceInstanceIdentifier, transaction); + Sink::Storage::EntityStore entityStore(mResourceContext); auto syncStore = QSharedPointer::create(synchronizationTransaction); SinkTrace() << "Inspecting " << inspectionType << domainType << entityId << property << expectedValue; if (domainType == ENTITY_TYPE_MAIL) { - auto mail = entityStore->read(entityId); + auto mail = entityStore.readLatest(entityId); const auto filePath = getFilePathFromMimeMessagePath(mail.getMimeMessagePath()); if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) { @@ -530,7 +511,7 @@ KAsync::Job MaildirResource::inspect(int inspectionType, const QByteArray } if (domainType == ENTITY_TYPE_FOLDER) { const auto remoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, entityId); - auto folder = entityStore->read(entityId); + auto folder = entityStore.readLatest(entityId); if (inspectionType == Sink::ResourceControl::Inspection::CacheIntegrityInspectionType) { SinkTrace() << "Inspecting cache integrity" << remoteId; @@ -577,9 +558,9 @@ MaildirResourceFactory::MaildirResourceFactory(QObject *parent) } -Sink::Resource *MaildirResourceFactory::createResource(const QByteArray &instanceIdentifier) +Sink::Resource *MaildirResourceFactory::createResource(const ResourceContext &context) { - return new MaildirResource(instanceIdentifier); + return new MaildirResource(context); } void MaildirResourceFactory::registerFacades(Sink::FacadeFactory &factory) diff --git a/examples/maildirresource/maildirresource.h b/examples/maildirresource/maildirresource.h index 490e1e6..6265819 100644 --- a/examples/maildirresource/maildirresource.h +++ b/examples/maildirresource/maildirresource.h @@ -45,7 +45,7 @@ class MaildirFolderAdaptorFactory; class MaildirResource : public Sink::GenericResource { public: - MaildirResource(const QByteArray &instanceIdentifier, const QSharedPointer &pipeline = QSharedPointer()); + MaildirResource(const Sink::ResourceContext &resourceContext, const QSharedPointer &pipeline = QSharedPointer()); KAsync::Job inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE; static void removeFromDisk(const QByteArray &instanceIdentifier); private: @@ -64,7 +64,7 @@ class MaildirResourceFactory : public Sink::ResourceFactory public: MaildirResourceFactory(QObject *parent = 0); - Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; + Sink::Resource *createResource(const Sink::ResourceContext &context) Q_DECL_OVERRIDE; void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE; void registerAdaptorFactories(Sink::AdaptorFactoryRegistry ®istry) Q_DECL_OVERRIDE; void removeDataFromDisk(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; diff --git a/examples/mailtransportresource/mailtransportresource.cpp b/examples/mailtransportresource/mailtransportresource.cpp index 3ce9476..9a22c41 100644 --- a/examples/mailtransportresource/mailtransportresource.cpp +++ b/examples/mailtransportresource/mailtransportresource.cpp @@ -41,6 +41,7 @@ #include #include #include +#include #define ENTITY_TYPE_MAIL "mail" @@ -52,7 +53,7 @@ using namespace Sink; class MailtransportWriteback : public Sink::SourceWriteBack { public: - MailtransportWriteback(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) : Sink::SourceWriteBack(resourceType, resourceInstanceIdentifier) + MailtransportWriteback(const Sink::ResourceContext &resourceContext) : Sink::SourceWriteBack(resourceContext) { } @@ -74,9 +75,9 @@ public: class MailtransportSynchronizer : public Sink::Synchronizer { public: - MailtransportSynchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) - : Sink::Synchronizer(resourceType, resourceInstanceIdentifier), - mResourceInstanceIdentifier(resourceInstanceIdentifier) + MailtransportSynchronizer(const Sink::ResourceContext &resourceContext) + : Sink::Synchronizer(resourceContext), + mResourceInstanceIdentifier(resourceContext.instanceId()) { } @@ -112,10 +113,9 @@ public: { SinkLog() << " Synchronizing"; return KAsync::start([this](KAsync::Future future) { - Sink::Query query; QList toSend; SinkLog() << " Looking for mail"; - store().reader().query(query, [&](const ApplicationDomain::Mail &mail) -> bool { + store().readAll([&](const ApplicationDomain::Mail &mail) -> bool { SinkTrace() << "Found mail: " << mail.identifier(); if (!mail.getSent()) { toSend << mail; @@ -143,10 +143,10 @@ public: MailtransportResource::Settings mSettings; }; -MailtransportResource::MailtransportResource(const QByteArray &instanceIdentifier, const QSharedPointer &pipeline) - : Sink::GenericResource(PLUGIN_NAME, instanceIdentifier, pipeline) +MailtransportResource::MailtransportResource(const Sink::ResourceContext &resourceContext, const QSharedPointer &pipeline) + : Sink::GenericResource(resourceContext, pipeline) { - auto config = ResourceConfig::getConfiguration(instanceIdentifier); + auto config = ResourceConfig::getConfiguration(resourceContext.instanceId()); mSettings = {config.value("server").toString(), config.value("username").toString(), config.value("cacert").toString(), @@ -154,11 +154,11 @@ MailtransportResource::MailtransportResource(const QByteArray &instanceIdentifie config.value("testmode").toBool() }; - auto synchronizer = QSharedPointer::create(PLUGIN_NAME, instanceIdentifier); + auto synchronizer = QSharedPointer::create(resourceContext); synchronizer->mSettings = mSettings; setupSynchronizer(synchronizer); - auto changereplay = QSharedPointer::create(PLUGIN_NAME, instanceIdentifier); + auto changereplay = QSharedPointer::create(resourceContext); changereplay->mSettings = mSettings; setupChangereplay(changereplay); @@ -168,14 +168,14 @@ MailtransportResource::MailtransportResource(const QByteArray &instanceIdentifie void MailtransportResource::removeFromDisk(const QByteArray &instanceIdentifier) { GenericResource::removeFromDisk(instanceIdentifier); - Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::ReadWrite).removeFromDisk(); + Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); } KAsync::Job MailtransportResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) { if (domainType == ENTITY_TYPE_MAIL) { if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { - auto path = resourceStorageLocation(mResourceInstanceIdentifier) + "/test/" + entityId; + auto path = resourceStorageLocation(mResourceContext.instanceId()) + "/test/" + entityId; if (QFileInfo::exists(path)) { return KAsync::null(); } @@ -191,14 +191,14 @@ MailtransportResourceFactory::MailtransportResourceFactory(QObject *parent) } -Sink::Resource *MailtransportResourceFactory::createResource(const QByteArray &instanceIdentifier) +Sink::Resource *MailtransportResourceFactory::createResource(const Sink::ResourceContext &context) { - return new MailtransportResource(instanceIdentifier); + return new MailtransportResource(context); } void MailtransportResourceFactory::registerFacades(Sink::FacadeFactory &factory) { - factory.registerFacade>>(PLUGIN_NAME); + factory.registerFacade>(PLUGIN_NAME); } void MailtransportResourceFactory::registerAdaptorFactories(Sink::AdaptorFactoryRegistry ®istry) diff --git a/examples/mailtransportresource/mailtransportresource.h b/examples/mailtransportresource/mailtransportresource.h index dcc33df..212880c 100644 --- a/examples/mailtransportresource/mailtransportresource.h +++ b/examples/mailtransportresource/mailtransportresource.h @@ -28,7 +28,7 @@ class MailtransportResource : public Sink::GenericResource { public: - MailtransportResource(const QByteArray &instanceIdentifier, const QSharedPointer &pipeline = QSharedPointer()); + MailtransportResource(const Sink::ResourceContext &resourceContext, const QSharedPointer &pipeline = QSharedPointer()); KAsync::Job inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE; static void removeFromDisk(const QByteArray &instanceIdentifier); @@ -52,7 +52,7 @@ class MailtransportResourceFactory : public Sink::ResourceFactory public: MailtransportResourceFactory(QObject *parent = 0); - Sink::Resource *createResource(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; + Sink::Resource *createResource(const Sink::ResourceContext &resourceContext) Q_DECL_OVERRIDE; void registerFacades(Sink::FacadeFactory &factory) Q_DECL_OVERRIDE; void registerAdaptorFactories(Sink::AdaptorFactoryRegistry ®istry) Q_DECL_OVERRIDE; void removeDataFromDisk(const QByteArray &instanceIdentifier) Q_DECL_OVERRIDE; diff --git a/sinksh/syntax_modules/sink_stat.cpp b/sinksh/syntax_modules/sink_stat.cpp index 9f0fe44..5978c01 100644 --- a/sinksh/syntax_modules/sink_stat.cpp +++ b/sinksh/syntax_modules/sink_stat.cpp @@ -42,8 +42,8 @@ void statResources(const QStringList &resources, const State &state) { qint64 total = 0; for (const auto &resource : resources) { - Sink::Storage storage(Sink::storageLocation(), resource, Sink::Storage::ReadOnly); - auto transaction = storage.createTransaction(Sink::Storage::ReadOnly); + Sink::Storage::DataStore storage(Sink::storageLocation(), resource, Sink::Storage::DataStore::ReadOnly); + auto transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly); QList databases = transaction.getDatabaseNames(); for (const auto &databaseName : databases) { @@ -57,7 +57,7 @@ void statResources(const QStringList &resources, const State &state) QDir dir(Sink::storageLocation()); for (const auto &folder : dir.entryList(QStringList() << resource + "*")) { - diskUsage += Sink::Storage(Sink::storageLocation(), folder, Sink::Storage::ReadOnly).diskUsage(); + diskUsage += Sink::Storage::DataStore(Sink::storageLocation(), folder, Sink::Storage::DataStore::ReadOnly).diskUsage(); } auto size = diskUsage / 1024; state.printLine(QObject::tr("Disk usage [kb]: %1").arg(size), 1); diff --git a/tests/clientapitest.cpp b/tests/clientapitest.cpp index fd3d5f0..94c78a7 100644 --- a/tests/clientapitest.cpp +++ b/tests/clientapitest.cpp @@ -22,11 +22,13 @@ public: auto facade = std::make_shared>(); map.insert(instanceIdentifier, facade); bool alwaysReturnFacade = instanceIdentifier.isEmpty(); - Sink::FacadeFactory::instance().registerFacade>("dummyresource", [alwaysReturnFacade](const QByteArray &instanceIdentifier) { + Sink::FacadeFactory::instance().registerFacade>("dummyresource", [alwaysReturnFacade](const Sink::ResourceContext &context) { if (alwaysReturnFacade) { + Q_ASSERT(map.contains(QByteArray())); return map.value(QByteArray()); } - return map.value(instanceIdentifier); + Q_ASSERT(map.contains(context.instanceId())); + return map.value(context.instanceId()); }); return facade; } diff --git a/tests/databasepopulationandfacadequerybenchmark.cpp b/tests/databasepopulationandfacadequerybenchmark.cpp index 5efe292..4e00bd4 100644 --- a/tests/databasepopulationandfacadequerybenchmark.cpp +++ b/tests/databasepopulationandfacadequerybenchmark.cpp @@ -38,13 +38,13 @@ class DatabasePopulationAndFacadeQueryBenchmark : public QObject void populateDatabase(int count) { - Sink::Storage(Sink::storageLocation(), "identifier", Sink::Storage::ReadWrite).removeFromDisk(); + Sink::Storage::DataStore(Sink::storageLocation(), "identifier", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); // Setup auto domainTypeAdaptorFactory = QSharedPointer::create(); { - Sink::Storage storage(Sink::storageLocation(), identifier, Sink::Storage::ReadWrite); - auto transaction = storage.createTransaction(Sink::Storage::ReadWrite); - auto db = Sink::Storage::mainDatabase(transaction, "event"); + Sink::Storage::DataStore storage(Sink::storageLocation(), identifier, Sink::Storage::DataStore::ReadWrite); + auto transaction = storage.createTransaction(Sink::Storage::DataStore::ReadWrite); + auto db = Sink::Storage::DataStore::mainDatabase(transaction, "event"); int bufferSizeTotal = 0; int keysSizeTotal = 0; @@ -58,15 +58,15 @@ class DatabasePopulationAndFacadeQueryBenchmark : public QObject flatbuffers::FlatBufferBuilder fbb; domainTypeAdaptorFactory->createBuffer(*domainObject, fbb); const auto buffer = QByteArray::fromRawData(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); - const auto key = Sink::Storage::generateUid(); + const auto key = Sink::Storage::DataStore::generateUid(); db.write(key, buffer); bufferSizeTotal += buffer.size(); keysSizeTotal += key.size(); } transaction.commit(); - transaction = storage.createTransaction(Sink::Storage::ReadOnly); - db = Sink::Storage::mainDatabase(transaction, "event"); + transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly); + db = Sink::Storage::DataStore::mainDatabase(transaction, "event"); auto dataSizeTotal = count * (QByteArray("uid").size() + QByteArray("summary").size() + attachment.size()); auto size = db.getSize(); @@ -100,7 +100,11 @@ class DatabasePopulationAndFacadeQueryBenchmark : public QObject auto resultSet = QSharedPointer>::create(); auto resourceAccess = QSharedPointer::create(); - TestResourceFacade facade(identifier, resourceAccess); + + QMap factories; + Sink::ResourceContext context{identifier, "test", factories}; + context.mResourceAccess = resourceAccess; + TestResourceFacade facade(context); auto ret = facade.load(query); ret.first.exec().waitForFinished(); @@ -118,7 +122,7 @@ class DatabasePopulationAndFacadeQueryBenchmark : public QObject const auto finalRss = getCurrentRSS(); const auto rssGrowth = finalRss - startingRss; // Since the database is memory mapped it is attributted to the resident set size. - const auto rssWithoutDb = finalRss - Sink::Storage(Sink::storageLocation(), identifier, Sink::Storage::ReadWrite).diskUsage(); + const auto rssWithoutDb = finalRss - Sink::Storage::DataStore(Sink::storageLocation(), identifier, Sink::Storage::DataStore::ReadWrite).diskUsage(); const auto peakRss = getPeakRSS(); // How much peak deviates from final rss in percent (should be around 0) const auto percentageRssError = static_cast(peakRss - finalRss) * 100.0 / static_cast(finalRss); diff --git a/tests/dummyresourcebenchmark.cpp b/tests/dummyresourcebenchmark.cpp index d0ecef7..a2de316 100644 --- a/tests/dummyresourcebenchmark.cpp +++ b/tests/dummyresourcebenchmark.cpp @@ -9,7 +9,6 @@ #include "resourcecontrol.h" #include "commands.h" #include "entitybuffer.h" -#include "pipeline.h" #include "log.h" #include "resourceconfig.h" #include "notification_generated.h" @@ -151,8 +150,7 @@ private slots: QTime time; time.start(); - auto pipeline = QSharedPointer::create("sink.dummy.instance1"); - DummyResource resource("sink.dummy.instance1", pipeline); + DummyResource resource(Sink::ResourceContext{"sink.dummy.instance1", "test"}); flatbuffers::FlatBufferBuilder eventFbb; eventFbb.Clear(); diff --git a/tests/dummyresourcetest.cpp b/tests/dummyresourcetest.cpp index be6e3a5..0883a13 100644 --- a/tests/dummyresourcetest.cpp +++ b/tests/dummyresourcetest.cpp @@ -13,6 +13,7 @@ #include "log.h" #include "test.h" #include "testutils.h" +#include "adaptorfactoryregistry.h" using namespace Sink; using namespace Sink::ApplicationDomain; @@ -28,6 +29,11 @@ class DummyResourceTest : public QObject QTime time; + Sink::ResourceContext getContext() + { + return Sink::ResourceContext{"sink.dummy.instance1", "sink.dummy", Sink::AdaptorFactoryRegistry::instance().getFactories("sink.dummy")}; + } + private slots: void initTestCase() { @@ -129,8 +135,7 @@ private slots: void testResourceSync() { - auto pipeline = QSharedPointer::create("sink.dummy.instance1"); - ::DummyResource resource("sink.dummy.instance1", pipeline); + ::DummyResource resource(getContext()); auto job = resource.synchronizeWithSource(); // TODO pass in optional timeout? auto future = job.exec(); diff --git a/tests/dummyresourcewritebenchmark.cpp b/tests/dummyresourcewritebenchmark.cpp index 5cd7007..facd60c 100644 --- a/tests/dummyresourcewritebenchmark.cpp +++ b/tests/dummyresourcewritebenchmark.cpp @@ -9,7 +9,6 @@ #include "store.h" #include "commands.h" #include "entitybuffer.h" -#include "pipeline.h" #include "log.h" #include "resourceconfig.h" #include "definitions.h" @@ -109,8 +108,7 @@ class DummyResourceWriteBenchmark : public QObject QTime time; time.start(); - auto pipeline = QSharedPointer::create("sink.dummy.instance1"); - DummyResource resource("sink.dummy.instance1", pipeline); + ::DummyResource resource(Sink::ResourceContext{"sink.dummy.instance1", "dummy"}); int bufferSize = 0; auto command = createEntityBuffer(bufferSize); diff --git a/tests/hawd/dataset.cpp b/tests/hawd/dataset.cpp index c023f31..fb2d7e6 100644 --- a/tests/hawd/dataset.cpp +++ b/tests/hawd/dataset.cpp @@ -215,7 +215,7 @@ QString Dataset::Row::toString(const QStringList &cols, int standardCols, const Dataset::Dataset(const QString &name, const State &state) : m_definition(state.datasetDefinition(name)), - m_storage(state.resultsPath(), name, Sink::Storage::ReadWrite), + m_storage(state.resultsPath(), name, Sink::Storage::DataStore::ReadWrite), m_transaction(m_storage.createTransaction()), m_commitHash(state.commitHash()) { @@ -270,13 +270,13 @@ void Dataset::eachRow(const std::function &resultHandler) resultHandler(row); return true; }, - Sink::Storage::basicErrorHandler()); + Sink::Storage::DataStore::basicErrorHandler()); } Dataset::Row Dataset::row(qint64 key) { if (key < 1) { - Row row(*this, Sink::Storage::maxRevision(m_transaction)); + Row row(*this, Sink::Storage::DataStore::maxRevision(m_transaction)); row.setCommitHash(m_commitHash); return row; } @@ -287,7 +287,7 @@ Dataset::Row Dataset::row(qint64 key) row.fromBinary(value); return true; }, - Sink::Storage::basicErrorHandler() + Sink::Storage::DataStore::basicErrorHandler() ); return row; } diff --git a/tests/hawd/dataset.h b/tests/hawd/dataset.h index 0fca8f0..bb2aae5 100644 --- a/tests/hawd/dataset.h +++ b/tests/hawd/dataset.h @@ -84,8 +84,8 @@ public: private: DatasetDefinition m_definition; - Sink::Storage m_storage; - Sink::Storage::Transaction m_transaction; + Sink::Storage::DataStore m_storage; + Sink::Storage::DataStore::Transaction m_transaction; QString m_commitHash; }; diff --git a/tests/indextest.cpp b/tests/indextest.cpp index 8566803..d6a28d6 100644 --- a/tests/indextest.cpp +++ b/tests/indextest.cpp @@ -16,19 +16,19 @@ class IndexTest : public QObject private slots: void initTestCase() { - Sink::Storage store("./testindex", "sink.dummy.testindex", Sink::Storage::ReadWrite); + Sink::Storage::DataStore store("./testindex", "sink.dummy.testindex", Sink::Storage::DataStore::ReadWrite); store.removeFromDisk(); } void cleanup() { - Sink::Storage store("./testindex", "sink.dummy.testindex", Sink::Storage::ReadWrite); + Sink::Storage::DataStore store("./testindex", "sink.dummy.testindex", Sink::Storage::DataStore::ReadWrite); store.removeFromDisk(); } void testIndex() { - Index index("./testindex", "sink.dummy.testindex", Sink::Storage::ReadWrite); + Index index("./testindex", "sink.dummy.testindex", Sink::Storage::DataStore::ReadWrite); // The first key is specifically a substring of the second key index.add("key", "value1"); index.add("keyFoo", "value2"); diff --git a/tests/mailquerybenchmark.cpp b/tests/mailquerybenchmark.cpp index 1d96819..c44b9f6 100644 --- a/tests/mailquerybenchmark.cpp +++ b/tests/mailquerybenchmark.cpp @@ -62,8 +62,7 @@ class MailQueryBenchmark : public QObject { TestResource::removeFromDisk(resourceIdentifier); - auto pipeline = QSharedPointer::create(resourceIdentifier); - pipeline->setResourceType("test"); + auto pipeline = QSharedPointer::create(Sink::ResourceContext{resourceIdentifier, "test"}); auto indexer = QSharedPointer>::create(); @@ -94,10 +93,10 @@ class MailQueryBenchmark : public QObject // Benchmark QTime time; time.start(); - auto resultSet = QSharedPointer>::create(); - auto resourceAccess = QSharedPointer::create(); - TestMailResourceFacade facade(resourceIdentifier, resourceAccess); + Sink::ResourceContext context{resourceIdentifier, "test"}; + context.mResourceAccess = QSharedPointer::create(); + TestMailResourceFacade facade(context); auto ret = facade.load(query); ret.first.exec().waitForFinished(); @@ -115,7 +114,7 @@ class MailQueryBenchmark : public QObject const auto finalRss = getCurrentRSS(); const auto rssGrowth = finalRss - startingRss; // Since the database is memory mapped it is attributted to the resident set size. - const auto rssWithoutDb = finalRss - Sink::Storage(Sink::storageLocation(), resourceIdentifier, Sink::Storage::ReadWrite).diskUsage(); + const auto rssWithoutDb = finalRss - Sink::Storage::DataStore(Sink::storageLocation(), resourceIdentifier, Sink::Storage::DataStore::ReadWrite).diskUsage(); const auto peakRss = getPeakRSS(); // How much peak deviates from final rss in percent (should be around 0) const auto percentageRssError = static_cast(peakRss - finalRss) * 100.0 / static_cast(finalRss); diff --git a/tests/messagequeuetest.cpp b/tests/messagequeuetest.cpp index e79bba2..83fa23f 100644 --- a/tests/messagequeuetest.cpp +++ b/tests/messagequeuetest.cpp @@ -21,7 +21,7 @@ private slots: void initTestCase() { Sink::Test::initTest(); - Sink::Storage store(Sink::Store::storageLocation(), "sink.dummy.testqueue", Sink::Storage::ReadWrite); + Sink::Storage::DataStore store(Sink::Store::storageLocation(), "sink.dummy.testqueue", Sink::Storage::DataStore::ReadWrite); store.removeFromDisk(); } @@ -31,7 +31,7 @@ private slots: void cleanup() { - Sink::Storage store(Sink::Store::storageLocation(), "sink.dummy.testqueue", Sink::Storage::ReadWrite); + Sink::Storage::DataStore store(Sink::Store::storageLocation(), "sink.dummy.testqueue", Sink::Storage::DataStore::ReadWrite); store.removeFromDisk(); } diff --git a/tests/pipelinebenchmark.cpp b/tests/pipelinebenchmark.cpp index 0c0b9e6..16806c7 100644 --- a/tests/pipelinebenchmark.cpp +++ b/tests/pipelinebenchmark.cpp @@ -47,7 +47,7 @@ // class IndexUpdater : public Sink::Preprocessor { // public: -// void newEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE +// void newEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE // { // for (int i = 0; i < 10; i++) { // Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction); @@ -56,11 +56,11 @@ // } // // void modifiedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity, -// Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE +// Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE // { // } // -// void deletedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE +// void deletedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE // { // } // }; @@ -83,9 +83,8 @@ class PipelineBenchmark : public QObject { TestResource::removeFromDisk(resourceIdentifier); - auto pipeline = QSharedPointer::create(resourceIdentifier); + auto pipeline = QSharedPointer::create(Sink::ResourceContext{resourceIdentifier, "test"}); pipeline->setPreprocessors("mail", preprocessors); - pipeline->setResourceType("test"); QTime time; time.start(); @@ -112,7 +111,7 @@ class PipelineBenchmark : public QObject // Print memory layout, RSS is what is in memory // std::system("exec pmap -x \"$PPID\""); // - std::cout << "Size: " << Sink::Storage(Sink::storageLocation(), resourceIdentifier, Sink::Storage::ReadOnly).diskUsage() / 1024 << " [kb]" << std::endl; + std::cout << "Size: " << Sink::Storage::DataStore(Sink::storageLocation(), resourceIdentifier, Sink::Storage::DataStore::ReadOnly).diskUsage() / 1024 << " [kb]" << std::endl; std::cout << "Time: " << allProcessedTime << " [ms]" << std::endl; HAWD::Dataset dataset("pipeline", mHawdState); diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp index 7216f62..112453e 100644 --- a/tests/pipelinetest.cpp +++ b/tests/pipelinetest.cpp @@ -23,14 +23,14 @@ static void removeFromDisk(const QString &name) { - Sink::Storage store(Sink::Store::storageLocation(), name, Sink::Storage::ReadWrite); + Sink::Storage::DataStore store(Sink::Store::storageLocation(), name, Sink::Storage::DataStore::ReadWrite); store.removeFromDisk(); } static QList getKeys(const QByteArray &dbEnv, const QByteArray &name) { - Sink::Storage store(Sink::storageLocation(), dbEnv, Sink::Storage::ReadOnly); - auto transaction = store.createTransaction(Sink::Storage::ReadOnly); + Sink::Storage::DataStore store(Sink::storageLocation(), dbEnv, Sink::Storage::DataStore::ReadOnly); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadOnly); auto db = transaction.openDatabase(name, nullptr, false); QList result; db.scan("", [&](const QByteArray &key, const QByteArray &value) { @@ -42,8 +42,8 @@ static QList getKeys(const QByteArray &dbEnv, const QByteArray &name static QByteArray getEntity(const QByteArray &dbEnv, const QByteArray &name, const QByteArray &uid) { - Sink::Storage store(Sink::storageLocation(), dbEnv, Sink::Storage::ReadOnly); - auto transaction = store.createTransaction(Sink::Storage::ReadOnly); + Sink::Storage::DataStore store(Sink::storageLocation(), dbEnv, Sink::Storage::DataStore::ReadOnly); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadOnly); auto db = transaction.openDatabase(name, nullptr, false); QByteArray result; db.scan(uid, [&](const QByteArray &key, const QByteArray &value) { @@ -152,20 +152,20 @@ QByteArray deleteEntityCommand(const QByteArray &uid, qint64 revision) class TestProcessor : public Sink::Preprocessor { public: - void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE { newUids << uid; newRevisions << revision; } void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, - Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE { modifiedUids << uid; modifiedRevisions << revision; } - void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::Transaction &transaction) Q_DECL_OVERRIDE + void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE { deletedUids << uid; deletedRevisions << revision; @@ -203,8 +203,7 @@ private slots: flatbuffers::FlatBufferBuilder entityFbb; auto command = createEntityCommand(createEvent(entityFbb)); - Sink::Pipeline pipeline("sink.pipelinetest.instance1"); - pipeline.setResourceType("test"); + Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); pipeline.startTransaction(); pipeline.newEntity(command.constData(), command.size()); @@ -220,8 +219,7 @@ private slots: flatbuffers::FlatBufferBuilder entityFbb; auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); - Sink::Pipeline pipeline("sink.pipelinetest.instance1"); - pipeline.setResourceType("test"); + Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); auto adaptorFactory = QSharedPointer::create(); @@ -234,7 +232,7 @@ private slots: auto keys = getKeys("sink.pipelinetest.instance1", "event.main"); QCOMPARE(keys.size(), 1); const auto key = keys.first(); - const auto uid = Sink::Storage::uidFromKey(key); + const auto uid = Sink::Storage::DataStore::uidFromKey(key); // Execute the modification entityFbb.Clear(); @@ -244,7 +242,7 @@ private slots: pipeline.commit(); // Ensure we've got the new revision with the modification - auto buffer = getEntity("sink.pipelinetest.instance1", "event.main", Sink::Storage::assembleKey(uid, 2)); + auto buffer = getEntity("sink.pipelinetest.instance1", "event.main", Sink::Storage::DataStore::assembleKey(uid, 2)); QVERIFY(!buffer.isEmpty()); Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); @@ -269,8 +267,7 @@ private slots: flatbuffers::FlatBufferBuilder entityFbb; auto command = createEntityCommand(createEvent(entityFbb)); - Sink::Pipeline pipeline("sink.pipelinetest.instance1"); - pipeline.setResourceType("test"); + Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); auto adaptorFactory = QSharedPointer::create(); @@ -282,7 +279,7 @@ private slots: // Get uid of written entity auto keys = getKeys("sink.pipelinetest.instance1", "event.main"); QCOMPARE(keys.size(), 1); - const auto uid = Sink::Storage::uidFromKey(keys.first()); + const auto uid = Sink::Storage::DataStore::uidFromKey(keys.first()); // Create another operation inbetween @@ -302,7 +299,7 @@ private slots: pipeline.commit(); // Ensure we've got the new revision with the modification - auto buffer = getEntity("sink.pipelinetest.instance1", "event.main", Sink::Storage::assembleKey(uid, 3)); + auto buffer = getEntity("sink.pipelinetest.instance1", "event.main", Sink::Storage::DataStore::assembleKey(uid, 3)); QVERIFY(!buffer.isEmpty()); Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); @@ -313,8 +310,7 @@ private slots: { flatbuffers::FlatBufferBuilder entityFbb; auto command = createEntityCommand(createEvent(entityFbb)); - Sink::Pipeline pipeline("sink.pipelinetest.instance1"); - pipeline.setResourceType("test"); + Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); // Create the initial revision pipeline.startTransaction(); @@ -324,7 +320,7 @@ private slots: auto result = getKeys("sink.pipelinetest.instance1", "event.main"); QCOMPARE(result.size(), 1); - const auto uid = Sink::Storage::uidFromKey(result.first()); + const auto uid = Sink::Storage::DataStore::uidFromKey(result.first()); // Delete entity auto deleteCommand = deleteEntityCommand(uid, 1); @@ -350,8 +346,7 @@ private slots: auto testProcessor = new TestProcessor; - Sink::Pipeline pipeline("sink.pipelinetest.instance1"); - pipeline.setResourceType("test"); + Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); pipeline.setPreprocessors("event", QVector() << testProcessor); pipeline.startTransaction(); // pipeline.setAdaptorFactory("event", QSharedPointer::create()); @@ -363,21 +358,21 @@ private slots: QCOMPARE(testProcessor->newUids.size(), 1); QCOMPARE(testProcessor->newRevisions.size(), 1); // Key doesn't contain revision and is just the uid - QCOMPARE(testProcessor->newUids.at(0), Sink::Storage::uidFromKey(testProcessor->newUids.at(0))); + QCOMPARE(testProcessor->newUids.at(0), Sink::Storage::DataStore::uidFromKey(testProcessor->newUids.at(0))); } pipeline.commit(); entityFbb.Clear(); pipeline.startTransaction(); auto keys = getKeys("sink.pipelinetest.instance1", "event.main"); QCOMPARE(keys.size(), 1); - const auto uid = Sink::Storage::uidFromKey(keys.first()); + const auto uid = Sink::Storage::DataStore::uidFromKey(keys.first()); { auto modifyCommand = modifyEntityCommand(createEvent(entityFbb, "summary2"), uid, 1); pipeline.modifiedEntity(modifyCommand.constData(), modifyCommand.size()); QCOMPARE(testProcessor->modifiedUids.size(), 1); QCOMPARE(testProcessor->modifiedRevisions.size(), 1); // Key doesn't contain revision and is just the uid - QCOMPARE(testProcessor->modifiedUids.at(0), Sink::Storage::uidFromKey(testProcessor->modifiedUids.at(0))); + QCOMPARE(testProcessor->modifiedUids.at(0), Sink::Storage::DataStore::uidFromKey(testProcessor->modifiedUids.at(0))); } pipeline.commit(); entityFbb.Clear(); @@ -389,7 +384,7 @@ private slots: QCOMPARE(testProcessor->deletedUids.size(), 1); QCOMPARE(testProcessor->deletedSummaries.size(), 1); // Key doesn't contain revision and is just the uid - QCOMPARE(testProcessor->deletedUids.at(0), Sink::Storage::uidFromKey(testProcessor->deletedUids.at(0))); + QCOMPARE(testProcessor->deletedUids.at(0), Sink::Storage::DataStore::uidFromKey(testProcessor->deletedUids.at(0))); QCOMPARE(testProcessor->deletedSummaries.at(0), QByteArray("summary2")); } } diff --git a/tests/querytest.cpp b/tests/querytest.cpp index c5c251a..9ae3c74 100644 --- a/tests/querytest.cpp +++ b/tests/querytest.cpp @@ -64,7 +64,7 @@ private slots: // Setup { Mail mail("sink.dummy.instance1"); - Sink::Store::create(mail).exec().waitForFinished(); + VERIFYEXEC(Sink::Store::create(mail)); } // Test diff --git a/tests/storagebenchmark.cpp b/tests/storagebenchmark.cpp index a1ddcc9..906844e 100644 --- a/tests/storagebenchmark.cpp +++ b/tests/storagebenchmark.cpp @@ -62,7 +62,7 @@ private slots: void cleanupTestCase() { - Sink::Storage store(testDataPath, dbName); + Sink::Storage::DataStore store(testDataPath, dbName); store.removeFromDisk(); } @@ -70,7 +70,7 @@ private slots: { auto event = createEvent(); - QScopedPointer store(new Sink::Storage(testDataPath, dbName, Sink::Storage::ReadWrite)); + QScopedPointer store(new Sink::Storage::DataStore(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite)); const char *keyPrefix = "key"; @@ -78,12 +78,12 @@ private slots: time.start(); // Test db write time { - auto transaction = store->createTransaction(Sink::Storage::ReadWrite); + auto transaction = store->createTransaction(Sink::Storage::DataStore::ReadWrite); for (int i = 0; i < count; i++) { transaction.openDatabase().write(keyPrefix + QByteArray::number(i), event); if ((i % 10000) == 0) { transaction.commit(); - transaction = store->createTransaction(Sink::Storage::ReadWrite); + transaction = store->createTransaction(Sink::Storage::DataStore::ReadWrite); } } transaction.commit(); @@ -105,7 +105,7 @@ private slots: // Db read time { - auto transaction = store->createTransaction(Sink::Storage::ReadOnly); + auto transaction = store->createTransaction(Sink::Storage::DataStore::ReadOnly); auto db = transaction.openDatabase(); for (int i = 0; i < count; i++) { db.scan(keyPrefix + QByteArray::number(i), [](const QByteArray &key, const QByteArray &value) -> bool { return true; }); @@ -126,7 +126,7 @@ private slots: void testSizes() { - Sink::Storage store(testDataPath, dbName); + Sink::Storage::DataStore store(testDataPath, dbName); qDebug() << "Database size [kb]: " << store.diskUsage() / 1024; QFileInfo fileInfo(filePath); @@ -135,11 +135,11 @@ private slots: void testScan() { - QScopedPointer store(new Sink::Storage(testDataPath, dbName, Sink::Storage::ReadOnly)); + QScopedPointer store(new Sink::Storage::DataStore(testDataPath, dbName, Sink::Storage::DataStore::ReadOnly)); QBENCHMARK { int hit = 0; - store->createTransaction(Sink::Storage::ReadOnly) + store->createTransaction(Sink::Storage::DataStore::ReadOnly) .openDatabase() .scan("", [&](const QByteArray &key, const QByteArray &value) -> bool { if (key == "key10000") { @@ -154,8 +154,8 @@ private slots: void testKeyLookup() { - QScopedPointer store(new Sink::Storage(testDataPath, dbName, Sink::Storage::ReadOnly)); - auto transaction = store->createTransaction(Sink::Storage::ReadOnly); + QScopedPointer store(new Sink::Storage::DataStore(testDataPath, dbName, Sink::Storage::DataStore::ReadOnly)); + auto transaction = store->createTransaction(Sink::Storage::DataStore::ReadOnly); auto db = transaction.openDatabase(); QBENCHMARK { @@ -170,8 +170,8 @@ private slots: void testFindLatest() { - QScopedPointer store(new Sink::Storage(testDataPath, dbName, Sink::Storage::ReadOnly)); - auto transaction = store->createTransaction(Sink::Storage::ReadOnly); + QScopedPointer store(new Sink::Storage::DataStore(testDataPath, dbName, Sink::Storage::DataStore::ReadOnly)); + auto transaction = store->createTransaction(Sink::Storage::DataStore::ReadOnly); auto db = transaction.openDatabase(); QBENCHMARK { diff --git a/tests/storagetest.cpp b/tests/storagetest.cpp index aa12ec1..5a517c7 100644 --- a/tests/storagetest.cpp +++ b/tests/storagetest.cpp @@ -21,14 +21,14 @@ private: void populate(int count) { - Sink::Storage storage(testDataPath, dbName, Sink::Storage::ReadWrite); - auto transaction = storage.createTransaction(Sink::Storage::ReadWrite); + Sink::Storage::DataStore storage(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + auto transaction = storage.createTransaction(Sink::Storage::DataStore::ReadWrite); for (int i = 0; i < count; i++) { // This should perhaps become an implementation detail of the db? if (i % 10000 == 0) { if (i > 0) { transaction.commit(); - transaction = storage.createTransaction(Sink::Storage::ReadWrite); + transaction = storage.createTransaction(Sink::Storage::DataStore::ReadWrite); } } transaction.openDatabase().write(keyPrefix + QByteArray::number(i), keyPrefix + QByteArray::number(i)); @@ -36,12 +36,12 @@ private: transaction.commit(); } - bool verify(Sink::Storage &storage, int i) + bool verify(Sink::Storage::DataStore &storage, int i) { bool success = true; bool keyMatch = true; const auto reference = keyPrefix + QByteArray::number(i); - storage.createTransaction(Sink::Storage::ReadOnly) + storage.createTransaction(Sink::Storage::DataStore::ReadOnly) .openDatabase() .scan(keyPrefix + QByteArray::number(i), [&keyMatch, &reference](const QByteArray &key, const QByteArray &value) -> bool { @@ -51,7 +51,7 @@ private: } return keyMatch; }, - [&success](const Sink::Storage::Error &error) { + [&success](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; success = false; }); @@ -63,20 +63,20 @@ private slots: { testDataPath = "./testdb"; dbName = "test"; - Sink::Storage storage(testDataPath, dbName); + Sink::Storage::DataStore storage(testDataPath, dbName); storage.removeFromDisk(); } void cleanup() { - Sink::Storage storage(testDataPath, dbName); + Sink::Storage::DataStore storage(testDataPath, dbName); storage.removeFromDisk(); } void testCleanup() { populate(1); - Sink::Storage storage(testDataPath, dbName); + Sink::Storage::DataStore storage(testDataPath, dbName); storage.removeFromDisk(); QFileInfo info(testDataPath + "/" + dbName); QVERIFY(!info.exists()); @@ -90,7 +90,7 @@ private slots: // ensure we can read everything back correctly { - Sink::Storage storage(testDataPath, dbName); + Sink::Storage::DataStore storage(testDataPath, dbName); for (int i = 0; i < count; i++) { QVERIFY(verify(storage, i)); } @@ -105,8 +105,8 @@ private slots: // ensure we can scan for values { int hit = 0; - Sink::Storage store(testDataPath, dbName); - store.createTransaction(Sink::Storage::ReadOnly) + Sink::Storage::DataStore store(testDataPath, dbName); + store.createTransaction(Sink::Storage::DataStore::ReadOnly) .openDatabase() .scan("", [&](const QByteArray &key, const QByteArray &value) -> bool { if (key == "key50") { @@ -121,8 +121,8 @@ private slots: { int hit = 0; bool foundInvalidValue = false; - Sink::Storage store(testDataPath, dbName); - store.createTransaction(Sink::Storage::ReadOnly) + Sink::Storage::DataStore store(testDataPath, dbName); + store.createTransaction(Sink::Storage::DataStore::ReadOnly) .openDatabase() .scan("key50", [&](const QByteArray &key, const QByteArray &value) -> bool { if (key != "key50") { @@ -139,10 +139,10 @@ private slots: void testNestedOperations() { populate(3); - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - auto transaction = store.createTransaction(Sink::Storage::ReadWrite); + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); transaction.openDatabase().scan("key1", [&](const QByteArray &key, const QByteArray &value) -> bool { - transaction.openDatabase().remove(key, [](const Sink::Storage::Error &) { QVERIFY(false); }); + transaction.openDatabase().remove(key, [](const Sink::Storage::DataStore::Error &) { QVERIFY(false); }); return false; }); } @@ -150,11 +150,11 @@ private slots: void testNestedTransactions() { populate(3); - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - store.createTransaction(Sink::Storage::ReadOnly) + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + store.createTransaction(Sink::Storage::DataStore::ReadOnly) .openDatabase() .scan("key1", [&](const QByteArray &key, const QByteArray &value) -> bool { - store.createTransaction(Sink::Storage::ReadWrite).openDatabase().remove(key, [](const Sink::Storage::Error &) { QVERIFY(false); }); + store.createTransaction(Sink::Storage::DataStore::ReadWrite).openDatabase().remove(key, [](const Sink::Storage::DataStore::Error &) { QVERIFY(false); }); return false; }); } @@ -163,9 +163,9 @@ private slots: { bool gotResult = false; bool gotError = false; - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - auto transaction = store.createTransaction(Sink::Storage::ReadOnly); - auto db = transaction.openDatabase("default", [&](const Sink::Storage::Error &error) { + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadOnly); + auto db = transaction.openDatabase("default", [&](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; gotError = true; }); @@ -174,7 +174,7 @@ private slots: gotResult = true; return false; }, - [&](const Sink::Storage::Error &error) { + [&](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; gotError = true; }); @@ -199,8 +199,8 @@ private slots: const int concurrencyLevel = 20; for (int num = 0; num < concurrencyLevel; num++) { futures << QtConcurrent::run([this, count, &error]() { - Sink::Storage storage(testDataPath, dbName, Sink::Storage::ReadOnly); - Sink::Storage storage2(testDataPath, dbName + "2", Sink::Storage::ReadOnly); + Sink::Storage::DataStore storage(testDataPath, dbName, Sink::Storage::DataStore::ReadOnly); + Sink::Storage::DataStore storage2(testDataPath, dbName + "2", Sink::Storage::DataStore::ReadOnly); for (int i = 0; i < count; i++) { if (!verify(storage, i)) { error = true; @@ -216,9 +216,9 @@ private slots: } { - Sink::Storage storage(testDataPath, dbName); + Sink::Storage::DataStore storage(testDataPath, dbName); storage.removeFromDisk(); - Sink::Storage storage2(testDataPath, dbName + "2"); + Sink::Storage::DataStore storage2(testDataPath, dbName + "2"); storage2.removeFromDisk(); } } @@ -227,8 +227,8 @@ private slots: { bool gotResult = false; bool gotError = false; - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - auto transaction = store.createTransaction(Sink::Storage::ReadWrite); + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("default", nullptr, false); db.write("key", "value"); db.write("key", "value"); @@ -238,7 +238,7 @@ private slots: gotResult = true; return true; }, - [&](const Sink::Storage::Error &error) { + [&](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; gotError = true; }); @@ -252,8 +252,8 @@ private slots: { bool gotResult = false; bool gotError = false; - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - auto transaction = store.createTransaction(Sink::Storage::ReadWrite); + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("default", nullptr, true); db.write("key", "value1"); db.write("key", "value2"); @@ -262,7 +262,7 @@ private slots: gotResult = true; return true; }, - [&](const Sink::Storage::Error &error) { + [&](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; gotError = true; }); @@ -275,15 +275,15 @@ private slots: { bool gotResult = false; bool gotError = false; - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadOnly); - int numValues = store.createTransaction(Sink::Storage::ReadOnly) + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadOnly); + int numValues = store.createTransaction(Sink::Storage::DataStore::ReadOnly) .openDatabase("test") .scan("", [&](const QByteArray &key, const QByteArray &value) -> bool { gotResult = true; return false; }, - [&](const Sink::Storage::Error &error) { + [&](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; gotError = true; }); @@ -295,10 +295,10 @@ private slots: void testWriteToNamedDb() { bool gotError = false; - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - store.createTransaction(Sink::Storage::ReadWrite) + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + store.createTransaction(Sink::Storage::DataStore::ReadWrite) .openDatabase("test") - .write("key1", "value1", [&](const Sink::Storage::Error &error) { + .write("key1", "value1", [&](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; gotError = true; }); @@ -308,10 +308,10 @@ private slots: void testWriteDuplicatesToNamedDb() { bool gotError = false; - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - store.createTransaction(Sink::Storage::ReadWrite) + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + store.createTransaction(Sink::Storage::DataStore::ReadWrite) .openDatabase("test", nullptr, true) - .write("key1", "value1", [&](const Sink::Storage::Error &error) { + .write("key1", "value1", [&](const Sink::Storage::DataStore::Error &error) { qDebug() << error.message; gotError = true; }); @@ -321,8 +321,8 @@ private slots: // By default we want only exact matches void testSubstringKeys() { - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - auto transaction = store.createTransaction(Sink::Storage::ReadWrite); + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, true); db.write("sub", "value1"); db.write("subsub", "value2"); @@ -333,8 +333,8 @@ private slots: void testFindSubstringKeys() { - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - auto transaction = store.createTransaction(Sink::Storage::ReadWrite); + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, false); db.write("sub", "value1"); db.write("subsub", "value2"); @@ -346,8 +346,8 @@ private slots: void testFindSubstringKeysWithDuplicatesEnabled() { - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - auto transaction = store.createTransaction(Sink::Storage::ReadWrite); + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, true); db.write("sub", "value1"); db.write("subsub", "value2"); @@ -359,8 +359,8 @@ private slots: void testKeySorting() { - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - auto transaction = store.createTransaction(Sink::Storage::ReadWrite); + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, false); db.write("sub_2", "value2"); db.write("sub_1", "value1"); @@ -380,8 +380,8 @@ private slots: // Ensure we don't retrieve a key that is greater than the current key. We only want equal keys. void testKeyRange() { - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - auto transaction = store.createTransaction(Sink::Storage::ReadWrite); + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, true); db.write("sub1", "value1"); int numValues = db.scan("sub", [&](const QByteArray &key, const QByteArray &value) -> bool { return true; }); @@ -391,8 +391,8 @@ private slots: void testFindLatest() { - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - auto transaction = store.createTransaction(Sink::Storage::ReadWrite); + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, false); db.write("sub1", "value1"); db.write("sub2", "value2"); @@ -406,8 +406,8 @@ private slots: void testFindLatestInSingle() { - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - auto transaction = store.createTransaction(Sink::Storage::ReadWrite); + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, false); db.write("sub2", "value2"); QByteArray result; @@ -418,8 +418,8 @@ private slots: void testFindLast() { - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - auto transaction = store.createTransaction(Sink::Storage::ReadWrite); + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); auto db = transaction.openDatabase("test", nullptr, false); db.write("sub2", "value2"); db.write("wub3", "value3"); @@ -431,23 +431,23 @@ private slots: void testRecordRevision() { - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - auto transaction = store.createTransaction(Sink::Storage::ReadWrite); - Sink::Storage::recordRevision(transaction, 1, "uid", "type"); - QCOMPARE(Sink::Storage::getTypeFromRevision(transaction, 1), QByteArray("type")); - QCOMPARE(Sink::Storage::getUidFromRevision(transaction, 1), QByteArray("uid")); + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); + Sink::Storage::DataStore::recordRevision(transaction, 1, "uid", "type"); + QCOMPARE(Sink::Storage::DataStore::getTypeFromRevision(transaction, 1), QByteArray("type")); + QCOMPARE(Sink::Storage::DataStore::getUidFromRevision(transaction, 1), QByteArray("uid")); } void testRecordRevisionSorting() { - Sink::Storage store(testDataPath, dbName, Sink::Storage::ReadWrite); - auto transaction = store.createTransaction(Sink::Storage::ReadWrite); + Sink::Storage::DataStore store(testDataPath, dbName, Sink::Storage::DataStore::ReadWrite); + auto transaction = store.createTransaction(Sink::Storage::DataStore::ReadWrite); QByteArray result; auto db = transaction.openDatabase("test", nullptr, false); const auto uid = "{c5d06a9f-1534-4c52-b8ea-415db68bdadf}"; //Ensure we can sort 1 and 10 properly (by default string comparison 10 comes before 6) - db.write(Sink::Storage::assembleKey(uid, 6), "value1"); - db.write(Sink::Storage::assembleKey(uid, 10), "value2"); + db.write(Sink::Storage::DataStore::assembleKey(uid, 6), "value1"); + db.write(Sink::Storage::DataStore::assembleKey(uid, 10), "value2"); db.findLatest(uid, [&](const QByteArray &key, const QByteArray &value) { result = value; }); QCOMPARE(result, QByteArray("value2")); } diff --git a/tests/testimplementations.h b/tests/testimplementations.h index d188c0c..cf7a3da 100644 --- a/tests/testimplementations.h +++ b/tests/testimplementations.h @@ -83,8 +83,8 @@ public slots: class TestResourceFacade : public Sink::GenericFacade { public: - TestResourceFacade(const QByteArray &instanceIdentifier, const QSharedPointer resourceAccess) - : Sink::GenericFacade(instanceIdentifier, QSharedPointer::create(), resourceAccess) + TestResourceFacade(const Sink::ResourceContext &resourceContext) + : Sink::GenericFacade(resourceContext) { } virtual ~TestResourceFacade() @@ -95,8 +95,8 @@ public: class TestMailResourceFacade : public Sink::GenericFacade { public: - TestMailResourceFacade(const QByteArray &instanceIdentifier, const QSharedPointer resourceAccess) - : Sink::GenericFacade(instanceIdentifier, QSharedPointer::create(), resourceAccess) + TestMailResourceFacade(const Sink::ResourceContext &resourceContext) + : Sink::GenericFacade(resourceContext) { } virtual ~TestMailResourceFacade() @@ -107,7 +107,7 @@ public: class TestResource : public Sink::GenericResource { public: - TestResource(const QByteArray &instanceIdentifier, QSharedPointer pipeline) : Sink::GenericResource("test", instanceIdentifier, pipeline) + TestResource(const Sink::ResourceContext &resourceContext, QSharedPointer pipeline) : Sink::GenericResource(resourceContext, pipeline) { } -- cgit v1.2.3