From 77562cdae63e0ec7b09e8ece6af97165ba9e48dd Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Fri, 3 Jun 2016 12:41:07 +0200 Subject: A way to retrieve the last revision during changereplay. --- common/entitystore.cpp | 26 ++++++++++++++++++++++---- common/entitystore.h | 29 +++++++++++++++++++++++------ common/sourcewriteback.h | 6 ++++++ common/storage.h | 1 + common/storage_common.cpp | 5 +++++ common/synchronizer.cpp | 5 +++-- 6 files changed, 60 insertions(+), 12 deletions(-) (limited to 'common') diff --git a/common/entitystore.cpp b/common/entitystore.cpp index 5f44609..5296d53 100644 --- a/common/entitystore.cpp +++ b/common/entitystore.cpp @@ -28,17 +28,18 @@ EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resou } -QSharedPointer EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) +QSharedPointer EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) { QSharedPointer current; db.findLatest(uid, - [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { + [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); if (!buffer.isValid()) { Warning() << "Read invalid buffer from disk"; } else { Trace() << "Found value " << key; current = adaptorFactory.createAdaptor(buffer.entity()); + retrievedRevision = Sink::Storage::revisionFromKey(key); } return false; }, @@ -46,19 +47,36 @@ QSharedPointer EntityStore::getLatest(co return current; } -QSharedPointer EntityStore::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory) +QSharedPointer EntityStore::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) { QSharedPointer current; db.scan(key, - [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { + [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { Sink::EntityBuffer buffer(const_cast(data.data()), data.size()); if (!buffer.isValid()) { Warning() << "Read invalid buffer from disk"; } else { current = adaptorFactory.createAdaptor(buffer.entity()); + retrievedRevision = Sink::Storage::revisionFromKey(key); } return false; }, [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); return current; } + +QSharedPointer EntityStore::getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) +{ + QSharedPointer current; + qint64 latestRevision = 0; + db.scan(uid, + [¤t, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { + auto foundRevision = Sink::Storage::revisionFromKey(key); + if (foundRevision < revision && foundRevision > latestRevision) { + latestRevision = foundRevision; + } + return true; + }, + [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }, true); + return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); +} diff --git a/common/entitystore.h b/common/entitystore.h index b6f8713..24f43b1 100644 --- a/common/entitystore.h +++ b/common/entitystore.h @@ -37,11 +37,12 @@ public: { auto typeName = ApplicationDomain::getTypeName(); auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); - auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType)); + qint64 retrievedRevision = 0; + auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType), retrievedRevision); if (!bufferAdaptor) { return T(); } - return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor); + return T(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); } template @@ -49,17 +50,33 @@ public: { auto typeName = ApplicationDomain::getTypeName(); auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); - auto bufferAdaptor = get(mainDatabase, key, *Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType)); + qint64 retrievedRevision = 0; + auto bufferAdaptor = get(mainDatabase, key, *Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType), retrievedRevision); const auto identifier = Storage::uidFromKey(key); if (!bufferAdaptor) { return T(); } - return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor); + return T(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); } + template + T readPrevious(const QByteArray &uid, qint64 revision) const + { + auto typeName = ApplicationDomain::getTypeName(); + auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); + qint64 retrievedRevision = 0; + auto bufferAdaptor = getPrevious(mainDatabase, uid, revision, *Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType), retrievedRevision); + if (!bufferAdaptor) { + return T(); + } + return T(mResourceInstanceIdentifier, uid, retrievedRevision, bufferAdaptor); + } + + - static QSharedPointer getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); - static QSharedPointer get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory); + static QSharedPointer getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); + static QSharedPointer get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); + static QSharedPointer getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); private: QByteArray mResourceType; QByteArray mResourceInstanceIdentifier; diff --git a/common/sourcewriteback.h b/common/sourcewriteback.h index 6c6eaab..9fe5c66 100644 --- a/common/sourcewriteback.h +++ b/common/sourcewriteback.h @@ -48,6 +48,12 @@ protected: //Read/Write access to sync storage RemoteIdMap &syncStore(); + template + T getPrevious(const T &entity) + { + return store().readPrevious(entity.identifier(), entity.revision()); + } + private: //Read only access to main storage EntityStore &store(); diff --git a/common/storage.h b/common/storage.h index 0527c4f..2661439 100644 --- a/common/storage.h +++ b/common/storage.h @@ -216,6 +216,7 @@ public: static QByteArray assembleKey(const QByteArray &key, qint64 revision); static QByteArray uidFromKey(const QByteArray &key); + static qint64 revisionFromKey(const QByteArray &key); static NamedDatabase mainDatabase(const Sink::Storage::Transaction &, const QByteArray &type); diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 4ca484a..8227a98 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp @@ -163,6 +163,11 @@ QByteArray Storage::uidFromKey(const QByteArray &key) return key.mid(0, 38); } +qint64 Storage::revisionFromKey(const QByteArray &key) +{ + return key.mid(39).toLongLong(); +} + Storage::NamedDatabase Storage::mainDatabase(const Sink::Storage::Transaction &t, const QByteArray &type) { return t.openDatabase(type + ".main"); diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 4bd8a5b..1bac5d9 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp @@ -119,8 +119,8 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func { entryGenerator([this, bufferType, &exists](const QByteArray &key) { auto sinkId = Sink::Storage::uidFromKey(key); - Trace() << "Checking for removal " << key; const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); + Trace() << "Checking for removal " << key << remoteId; // If we have no remoteId, the entity hasn't been replayed to the source yet if (!remoteId.isEmpty()) { if (!exists(remoteId)) { @@ -144,7 +144,8 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray createEntity( sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); } else { // modification - if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) { + qint64 retrievedRevision = 0; + if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) { bool changed = false; for (const auto &property : entity.changedProperties()) { if (entity.getProperty(property) != current->getProperty(property)) { -- cgit v1.2.3