diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-16 14:55:20 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-10-21 09:02:21 +0200 |
commit | 237b9ae4113e7a9f489632296941becb71afdb45 (patch) | |
tree | 01cde58f495944f01cad9d282391d4efd2897141 /common/entityreader.cpp | |
parent | 95d11bf0be98a4e3c08502fe23417b800233ce14 (diff) | |
download | sink-237b9ae4113e7a9f489632296941becb71afdb45.tar.gz sink-237b9ae4113e7a9f489632296941becb71afdb45.zip |
Refactor how the storage is used.
This is the initial refactoring to improve how we deal with the storage.
It does a couple of things:
* Rename Sink::Storage to Sink::Storage::DataStore to free up the
Sink::Storage namespace
* Introduce a Sink::ResourceContext to have a single object that can be
passed around containing everything that is necessary to operate on a
resource. This is a lot better than the multiple separate parameters
that we used to pass around all over the place, while still allowing
for dependency injection for tests.
* Tie storage access together using the new EntityStore that directly
works with ApplicationDomainTypes. This gives us a central place where
main storage, indexes and buffer adaptors are tied together, which
will also give us a place to implement external indexes, such as a
fulltextindex using xapian.
* Use ApplicationDomainTypes as the default way to pass around entities.
Instead of using various ways to pass around entities (buffers,
buffer adaptors, ApplicationDomainTypes), only use a single way.
The old approach was confusing, and was only done as:
* optimization; really shouldn't be necessary and otherwise I'm sure
we can find better ways to optimize ApplicationDomainType itself.
* a way to account for entities that have multiple buffers, a concept
that I no longer deem relevant.
While this commit does the bulk of the work to get there, the following
commits will refactor more stuff to get things back to normal.
Diffstat (limited to 'common/entityreader.cpp')
-rw-r--r-- | common/entityreader.cpp | 209 |
1 files changed, 102 insertions, 107 deletions
diff --git a/common/entityreader.cpp b/common/entityreader.cpp index cca1511..c49d1f7 100644 --- a/common/entityreader.cpp +++ b/common/entityreader.cpp | |||
@@ -28,75 +28,82 @@ SINK_DEBUG_AREA("entityreader") | |||
28 | 28 | ||
29 | using namespace Sink; | 29 | using namespace Sink; |
30 | 30 | ||
31 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) | 31 | /* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */ |
32 | { | 32 | /* { */ |
33 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | 33 | /* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */ |
34 | db.findLatest(uid, | 34 | /* db.findLatest(uid, */ |
35 | [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { | 35 | /* [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */ |
36 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 36 | /* Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); */ |
37 | if (!buffer.isValid()) { | 37 | /* if (!buffer.isValid()) { */ |
38 | SinkWarning() << "Read invalid buffer from disk"; | 38 | /* SinkWarning() << "Read invalid buffer from disk"; */ |
39 | } else { | 39 | /* } else { */ |
40 | SinkTrace() << "Found value " << key; | 40 | /* SinkTrace() << "Found value " << key; */ |
41 | current = adaptorFactory.createAdaptor(buffer.entity()); | 41 | /* current = adaptorFactory.createAdaptor(buffer.entity()); */ |
42 | retrievedRevision = Sink::Storage::revisionFromKey(key); | 42 | /* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */ |
43 | } | 43 | /* } */ |
44 | return false; | 44 | /* return false; */ |
45 | }, | 45 | /* }, */ |
46 | [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); | 46 | /* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */ |
47 | return current; | 47 | /* return current; */ |
48 | } | 48 | /* } */ |
49 | 49 | ||
50 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) | 50 | /* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */ |
51 | { | 51 | /* { */ |
52 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | 52 | /* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */ |
53 | db.scan(key, | 53 | /* db.scan(key, */ |
54 | [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { | 54 | /* [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */ |
55 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 55 | /* Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); */ |
56 | if (!buffer.isValid()) { | 56 | /* if (!buffer.isValid()) { */ |
57 | SinkWarning() << "Read invalid buffer from disk"; | 57 | /* SinkWarning() << "Read invalid buffer from disk"; */ |
58 | } else { | 58 | /* } else { */ |
59 | current = adaptorFactory.createAdaptor(buffer.entity()); | 59 | /* current = adaptorFactory.createAdaptor(buffer.entity()); */ |
60 | retrievedRevision = Sink::Storage::revisionFromKey(key); | 60 | /* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */ |
61 | } | 61 | /* } */ |
62 | return false; | 62 | /* return false; */ |
63 | }, | 63 | /* }, */ |
64 | [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); | 64 | /* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */ |
65 | return current; | 65 | /* return current; */ |
66 | } | 66 | /* } */ |
67 | 67 | ||
68 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) | 68 | /* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPrevious(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */ |
69 | { | 69 | /* { */ |
70 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | 70 | /* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */ |
71 | qint64 latestRevision = 0; | 71 | /* qint64 latestRevision = 0; */ |
72 | db.scan(uid, | 72 | /* db.scan(uid, */ |
73 | [¤t, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { | 73 | /* [¤t, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { */ |
74 | auto foundRevision = Sink::Storage::revisionFromKey(key); | 74 | /* auto foundRevision = Sink::Storage::DataStore::revisionFromKey(key); */ |
75 | if (foundRevision < revision && foundRevision > latestRevision) { | 75 | /* if (foundRevision < revision && foundRevision > latestRevision) { */ |
76 | latestRevision = foundRevision; | 76 | /* latestRevision = foundRevision; */ |
77 | } | 77 | /* } */ |
78 | return true; | 78 | /* return true; */ |
79 | }, | 79 | /* }, */ |
80 | [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); | 80 | /* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); */ |
81 | return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); | 81 | /* return get(db, Sink::Storage::DataStore::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); */ |
82 | } | 82 | /* } */ |
83 | |||
84 | /* template <class DomainType> */ | ||
85 | /* EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */ | ||
86 | /* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */ | ||
87 | /* mTransaction(transaction), */ | ||
88 | /* mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory<DomainType>(resourceType)), */ | ||
89 | /* mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) */ | ||
90 | /* { */ | ||
91 | /* Q_ASSERT(!resourceType.isEmpty()); */ | ||
92 | /* Q_ASSERT(mDomainTypeAdaptorFactoryPtr); */ | ||
93 | /* } */ | ||
94 | |||
95 | /* template <class DomainType> */ | ||
96 | /* EntityReader<DomainType>::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */ | ||
97 | /* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */ | ||
98 | /* mTransaction(transaction), */ | ||
99 | /* mDomainTypeAdaptorFactory(domainTypeAdaptorFactory) */ | ||
100 | /* { */ | ||
101 | |||
102 | /* } */ | ||
83 | 103 | ||
84 | template <class DomainType> | 104 | template <class DomainType> |
85 | EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) | 105 | EntityReader<DomainType>::EntityReader(Storage::EntityStore &entityStore) |
86 | : mResourceInstanceIdentifier(resourceInstanceIdentifier), | 106 | : mEntityStore(entityStore) |
87 | mTransaction(transaction), | ||
88 | mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory<DomainType>(resourceType)), | ||
89 | mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) | ||
90 | { | ||
91 | Q_ASSERT(!resourceType.isEmpty()); | ||
92 | Q_ASSERT(mDomainTypeAdaptorFactoryPtr); | ||
93 | } | ||
94 | |||
95 | template <class DomainType> | ||
96 | EntityReader<DomainType>::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) | ||
97 | : mResourceInstanceIdentifier(resourceInstanceIdentifier), | ||
98 | mTransaction(transaction), | ||
99 | mDomainTypeAdaptorFactory(domainTypeAdaptorFactory) | ||
100 | { | 107 | { |
101 | 108 | ||
102 | } | 109 | } |
@@ -105,40 +112,28 @@ template <class DomainType> | |||
105 | DomainType EntityReader<DomainType>::read(const QByteArray &identifier) const | 112 | DomainType EntityReader<DomainType>::read(const QByteArray &identifier) const |
106 | { | 113 | { |
107 | auto typeName = ApplicationDomain::getTypeName<DomainType>(); | 114 | auto typeName = ApplicationDomain::getTypeName<DomainType>(); |
108 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | 115 | return mEntityStore.readLatest<DomainType>(identifier); |
109 | qint64 retrievedRevision = 0; | ||
110 | auto bufferAdaptor = EntityReaderUtils::getLatest(mainDatabase, identifier, mDomainTypeAdaptorFactory, retrievedRevision); | ||
111 | if (!bufferAdaptor) { | ||
112 | return DomainType(); | ||
113 | } | ||
114 | return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); | ||
115 | } | 116 | } |
116 | 117 | ||
117 | template <class DomainType> | 118 | template <class DomainType> |
118 | DomainType EntityReader<DomainType>::readFromKey(const QByteArray &key) const | 119 | DomainType EntityReader<DomainType>::readFromKey(const QByteArray &key) const |
119 | { | 120 | { |
120 | auto typeName = ApplicationDomain::getTypeName<DomainType>(); | 121 | /* auto typeName = ApplicationDomain::getTypeName<DomainType>(); */ |
121 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | 122 | /* auto mainDatabase = Storage::DataStore::mainDatabase(mTransaction, typeName); */ |
122 | qint64 retrievedRevision = 0; | 123 | /* qint64 retrievedRevision = 0; */ |
123 | auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); | 124 | /* auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); */ |
124 | const auto identifier = Storage::uidFromKey(key); | 125 | /* const auto identifier = Storage::DataStore::uidFromKey(key); */ |
125 | if (!bufferAdaptor) { | 126 | /* if (!bufferAdaptor) { */ |
126 | return DomainType(); | 127 | /* return DomainType(); */ |
127 | } | 128 | /* } */ |
128 | return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); | 129 | /* return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); */ |
130 | return mEntityStore.readEntity<DomainType>(key); | ||
129 | } | 131 | } |
130 | 132 | ||
131 | template <class DomainType> | 133 | template <class DomainType> |
132 | DomainType EntityReader<DomainType>::readPrevious(const QByteArray &uid, qint64 revision) const | 134 | DomainType EntityReader<DomainType>::readPrevious(const QByteArray &uid, qint64 revision) const |
133 | { | 135 | { |
134 | auto typeName = ApplicationDomain::getTypeName<DomainType>(); | 136 | return mEntityStore.readPrevious<DomainType>(uid, revision); |
135 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | ||
136 | qint64 retrievedRevision = 0; | ||
137 | auto bufferAdaptor = EntityReaderUtils::getPrevious(mainDatabase, uid, revision, mDomainTypeAdaptorFactory, retrievedRevision); | ||
138 | if (!bufferAdaptor) { | ||
139 | return DomainType(); | ||
140 | } | ||
141 | return DomainType(mResourceInstanceIdentifier, uid, retrievedRevision, bufferAdaptor); | ||
142 | } | 137 | } |
143 | 138 | ||
144 | template <class DomainType> | 139 | template <class DomainType> |
@@ -157,14 +152,14 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink:: | |||
157 | QTime time; | 152 | QTime time; |
158 | time.start(); | 153 | time.start(); |
159 | 154 | ||
160 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); | 155 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){})); |
161 | auto resultSet = preparedQuery->execute(); | 156 | auto resultSet = preparedQuery->execute(); |
162 | 157 | ||
163 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 158 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
164 | auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); | 159 | auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); |
165 | 160 | ||
166 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | 161 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); |
167 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); | 162 | return qMakePair(mEntityStore.maxRevision(), replayedEntities); |
168 | } | 163 | } |
169 | 164 | ||
170 | template <class DomainType> | 165 | template <class DomainType> |
@@ -174,14 +169,14 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si | |||
174 | time.start(); | 169 | time.start(); |
175 | const qint64 baseRevision = lastRevision + 1; | 170 | const qint64 baseRevision = lastRevision + 1; |
176 | 171 | ||
177 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); | 172 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){})); |
178 | auto resultSet = preparedQuery->update(baseRevision); | 173 | auto resultSet = preparedQuery->update(baseRevision); |
179 | 174 | ||
180 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 175 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
181 | auto replayedEntities = replaySet(resultSet, 0, 0, callback); | 176 | auto replayedEntities = replaySet(resultSet, 0, 0, callback); |
182 | 177 | ||
183 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); | 178 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
184 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); | 179 | return qMakePair(mEntityStore.maxRevision(), replayedEntities); |
185 | } | 180 | } |
186 | 181 | ||
187 | template <class DomainType> | 182 | template <class DomainType> |
@@ -190,18 +185,18 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int | |||
190 | SinkTrace() << "Skipping over " << offset << " results"; | 185 | SinkTrace() << "Skipping over " << offset << " results"; |
191 | resultSet.skip(offset); | 186 | resultSet.skip(offset); |
192 | int counter = 0; | 187 | int counter = 0; |
193 | while (!batchSize || (counter < batchSize)) { | 188 | /* while (!batchSize || (counter < batchSize)) { */ |
194 | const bool ret = | 189 | /* const bool ret = */ |
195 | resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { | 190 | /* resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { */ |
196 | counter++; | 191 | /* counter++; */ |
197 | auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(result.buffer.entity()); | 192 | /* auto adaptor = mResourceContext.adaptorFactory<DomainType>().createAdaptor(result.buffer.entity()); */ |
198 | Q_ASSERT(adaptor); | 193 | /* Q_ASSERT(adaptor); */ |
199 | return callback(QSharedPointer<DomainType>::create(mResourceInstanceIdentifier, result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); | 194 | /* return callback(QSharedPointer<DomainType>::create(mResourceContext, result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); */ |
200 | }); | 195 | /* }); */ |
201 | if (!ret) { | 196 | /* if (!ret) { */ |
202 | break; | 197 | /* break; */ |
203 | } | 198 | /* } */ |
204 | }; | 199 | /* }; */ |
205 | SinkTrace() << "Replayed " << counter << " results." | 200 | SinkTrace() << "Replayed " << counter << " results." |
206 | << "Limit " << batchSize; | 201 | << "Limit " << batchSize; |
207 | return counter; | 202 | return counter; |