diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-06-03 12:41:07 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-06-03 12:41:07 +0200 |
commit | 77562cdae63e0ec7b09e8ece6af97165ba9e48dd (patch) | |
tree | b24d23335fd856881b7284532e7d4d36caa5ca13 | |
parent | 6569a6dceec10c77578184ce68c26e20ba27fa39 (diff) | |
download | sink-77562cdae63e0ec7b09e8ece6af97165ba9e48dd.tar.gz sink-77562cdae63e0ec7b09e8ece6af97165ba9e48dd.zip |
A way to retrieve the last revision during changereplay.
-rw-r--r-- | common/entitystore.cpp | 26 | ||||
-rw-r--r-- | common/entitystore.h | 29 | ||||
-rw-r--r-- | common/sourcewriteback.h | 6 | ||||
-rw-r--r-- | common/storage.h | 1 | ||||
-rw-r--r-- | common/storage_common.cpp | 5 | ||||
-rw-r--r-- | common/synchronizer.cpp | 5 |
6 files changed, 60 insertions, 12 deletions
diff --git a/common/entitystore.cpp b/common/entitystore.cpp index 5f44609..5296d53 100644 --- a/common/entitystore.cpp +++ b/common/entitystore.cpp | |||
@@ -28,17 +28,18 @@ EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resou | |||
28 | 28 | ||
29 | } | 29 | } |
30 | 30 | ||
31 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | 31 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) |
32 | { | 32 | { |
33 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | 33 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; |
34 | db.findLatest(uid, | 34 | db.findLatest(uid, |
35 | [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 35 | [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { |
36 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 36 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
37 | if (!buffer.isValid()) { | 37 | if (!buffer.isValid()) { |
38 | Warning() << "Read invalid buffer from disk"; | 38 | Warning() << "Read invalid buffer from disk"; |
39 | } else { | 39 | } else { |
40 | Trace() << "Found value " << key; | 40 | Trace() << "Found value " << key; |
41 | current = adaptorFactory.createAdaptor(buffer.entity()); | 41 | current = adaptorFactory.createAdaptor(buffer.entity()); |
42 | retrievedRevision = Sink::Storage::revisionFromKey(key); | ||
42 | } | 43 | } |
43 | return false; | 44 | return false; |
44 | }, | 45 | }, |
@@ -46,19 +47,36 @@ QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(co | |||
46 | return current; | 47 | return current; |
47 | } | 48 | } |
48 | 49 | ||
49 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory) | 50 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) |
50 | { | 51 | { |
51 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | 52 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; |
52 | db.scan(key, | 53 | db.scan(key, |
53 | [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | 54 | [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { |
54 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 55 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); |
55 | if (!buffer.isValid()) { | 56 | if (!buffer.isValid()) { |
56 | Warning() << "Read invalid buffer from disk"; | 57 | Warning() << "Read invalid buffer from disk"; |
57 | } else { | 58 | } else { |
58 | current = adaptorFactory.createAdaptor(buffer.entity()); | 59 | current = adaptorFactory.createAdaptor(buffer.entity()); |
60 | retrievedRevision = Sink::Storage::revisionFromKey(key); | ||
59 | } | 61 | } |
60 | return false; | 62 | return false; |
61 | }, | 63 | }, |
62 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); | 64 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); |
63 | return current; | 65 | return current; |
64 | } | 66 | } |
67 | |||
68 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) | ||
69 | { | ||
70 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | ||
71 | qint64 latestRevision = 0; | ||
72 | db.scan(uid, | ||
73 | [¤t, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { | ||
74 | auto foundRevision = Sink::Storage::revisionFromKey(key); | ||
75 | if (foundRevision < revision && foundRevision > latestRevision) { | ||
76 | latestRevision = foundRevision; | ||
77 | } | ||
78 | return true; | ||
79 | }, | ||
80 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }, true); | ||
81 | return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); | ||
82 | } | ||
diff --git a/common/entitystore.h b/common/entitystore.h index b6f8713..24f43b1 100644 --- a/common/entitystore.h +++ b/common/entitystore.h | |||
@@ -37,11 +37,12 @@ public: | |||
37 | { | 37 | { |
38 | auto typeName = ApplicationDomain::getTypeName<T>(); | 38 | auto typeName = ApplicationDomain::getTypeName<T>(); |
39 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | 39 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); |
40 | auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType)); | 40 | qint64 retrievedRevision = 0; |
41 | auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType), retrievedRevision); | ||
41 | if (!bufferAdaptor) { | 42 | if (!bufferAdaptor) { |
42 | return T(); | 43 | return T(); |
43 | } | 44 | } |
44 | return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor); | 45 | return T(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); |
45 | } | 46 | } |
46 | 47 | ||
47 | template<typename T> | 48 | template<typename T> |
@@ -49,17 +50,33 @@ public: | |||
49 | { | 50 | { |
50 | auto typeName = ApplicationDomain::getTypeName<T>(); | 51 | auto typeName = ApplicationDomain::getTypeName<T>(); |
51 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | 52 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); |
52 | auto bufferAdaptor = get(mainDatabase, key, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType)); | 53 | qint64 retrievedRevision = 0; |
54 | auto bufferAdaptor = get(mainDatabase, key, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType), retrievedRevision); | ||
53 | const auto identifier = Storage::uidFromKey(key); | 55 | const auto identifier = Storage::uidFromKey(key); |
54 | if (!bufferAdaptor) { | 56 | if (!bufferAdaptor) { |
55 | return T(); | 57 | return T(); |
56 | } | 58 | } |
57 | return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor); | 59 | return T(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); |
58 | } | 60 | } |
59 | 61 | ||
62 | template<typename T> | ||
63 | T readPrevious(const QByteArray &uid, qint64 revision) const | ||
64 | { | ||
65 | auto typeName = ApplicationDomain::getTypeName<T>(); | ||
66 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | ||
67 | qint64 retrievedRevision = 0; | ||
68 | auto bufferAdaptor = getPrevious(mainDatabase, uid, revision, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType), retrievedRevision); | ||
69 | if (!bufferAdaptor) { | ||
70 | return T(); | ||
71 | } | ||
72 | return T(mResourceInstanceIdentifier, uid, retrievedRevision, bufferAdaptor); | ||
73 | } | ||
74 | |||
75 | |||
60 | 76 | ||
61 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory); | 77 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); |
62 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory); | 78 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); |
79 | static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision); | ||
63 | private: | 80 | private: |
64 | QByteArray mResourceType; | 81 | QByteArray mResourceType; |
65 | QByteArray mResourceInstanceIdentifier; | 82 | QByteArray mResourceInstanceIdentifier; |
diff --git a/common/sourcewriteback.h b/common/sourcewriteback.h index 6c6eaab..9fe5c66 100644 --- a/common/sourcewriteback.h +++ b/common/sourcewriteback.h | |||
@@ -48,6 +48,12 @@ protected: | |||
48 | //Read/Write access to sync storage | 48 | //Read/Write access to sync storage |
49 | RemoteIdMap &syncStore(); | 49 | RemoteIdMap &syncStore(); |
50 | 50 | ||
51 | template <typename T> | ||
52 | T getPrevious(const T &entity) | ||
53 | { | ||
54 | return store().readPrevious<T>(entity.identifier(), entity.revision()); | ||
55 | } | ||
56 | |||
51 | private: | 57 | private: |
52 | //Read only access to main storage | 58 | //Read only access to main storage |
53 | EntityStore &store(); | 59 | EntityStore &store(); |
diff --git a/common/storage.h b/common/storage.h index 0527c4f..2661439 100644 --- a/common/storage.h +++ b/common/storage.h | |||
@@ -216,6 +216,7 @@ public: | |||
216 | 216 | ||
217 | static QByteArray assembleKey(const QByteArray &key, qint64 revision); | 217 | static QByteArray assembleKey(const QByteArray &key, qint64 revision); |
218 | static QByteArray uidFromKey(const QByteArray &key); | 218 | static QByteArray uidFromKey(const QByteArray &key); |
219 | static qint64 revisionFromKey(const QByteArray &key); | ||
219 | 220 | ||
220 | static NamedDatabase mainDatabase(const Sink::Storage::Transaction &, const QByteArray &type); | 221 | static NamedDatabase mainDatabase(const Sink::Storage::Transaction &, const QByteArray &type); |
221 | 222 | ||
diff --git a/common/storage_common.cpp b/common/storage_common.cpp index 4ca484a..8227a98 100644 --- a/common/storage_common.cpp +++ b/common/storage_common.cpp | |||
@@ -163,6 +163,11 @@ QByteArray Storage::uidFromKey(const QByteArray &key) | |||
163 | return key.mid(0, 38); | 163 | return key.mid(0, 38); |
164 | } | 164 | } |
165 | 165 | ||
166 | qint64 Storage::revisionFromKey(const QByteArray &key) | ||
167 | { | ||
168 | return key.mid(39).toLongLong(); | ||
169 | } | ||
170 | |||
166 | Storage::NamedDatabase Storage::mainDatabase(const Sink::Storage::Transaction &t, const QByteArray &type) | 171 | Storage::NamedDatabase Storage::mainDatabase(const Sink::Storage::Transaction &t, const QByteArray &type) |
167 | { | 172 | { |
168 | return t.openDatabase(type + ".main"); | 173 | return t.openDatabase(type + ".main"); |
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp index 4bd8a5b..1bac5d9 100644 --- a/common/synchronizer.cpp +++ b/common/synchronizer.cpp | |||
@@ -119,8 +119,8 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func | |||
119 | { | 119 | { |
120 | entryGenerator([this, bufferType, &exists](const QByteArray &key) { | 120 | entryGenerator([this, bufferType, &exists](const QByteArray &key) { |
121 | auto sinkId = Sink::Storage::uidFromKey(key); | 121 | auto sinkId = Sink::Storage::uidFromKey(key); |
122 | Trace() << "Checking for removal " << key; | ||
123 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); | 122 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); |
123 | Trace() << "Checking for removal " << key << remoteId; | ||
124 | // If we have no remoteId, the entity hasn't been replayed to the source yet | 124 | // If we have no remoteId, the entity hasn't been replayed to the source yet |
125 | if (!remoteId.isEmpty()) { | 125 | if (!remoteId.isEmpty()) { |
126 | if (!exists(remoteId)) { | 126 | if (!exists(remoteId)) { |
@@ -144,7 +144,8 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray | |||
144 | createEntity( | 144 | createEntity( |
145 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); | 145 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); |
146 | } else { // modification | 146 | } else { // modification |
147 | if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) { | 147 | qint64 retrievedRevision = 0; |
148 | if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory, retrievedRevision)) { | ||
148 | bool changed = false; | 149 | bool changed = false; |
149 | for (const auto &property : entity.changedProperties()) { | 150 | for (const auto &property : entity.changedProperties()) { |
150 | if (entity.getProperty(property) != current->getProperty(property)) { | 151 | if (entity.getProperty(property) != current->getProperty(property)) { |