diff options
Diffstat (limited to 'common/entityreader.cpp')
-rw-r--r-- | common/entityreader.cpp | 209 |
1 files changed, 102 insertions, 107 deletions
diff --git a/common/entityreader.cpp b/common/entityreader.cpp index cca1511..c49d1f7 100644 --- a/common/entityreader.cpp +++ b/common/entityreader.cpp | |||
@@ -28,75 +28,82 @@ SINK_DEBUG_AREA("entityreader") | |||
28 | 28 | ||
29 | using namespace Sink; | 29 | using namespace Sink; |
30 | 30 | ||
31 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) | 31 | /* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */ |
32 | { | 32 | /* { */ |
33 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | 33 | /* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */ |
34 | db.findLatest(uid, | 34 | /* db.findLatest(uid, */ |
35 | [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { | 35 | /* [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */ |
36 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 36 | /* Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); */ |
37 | if (!buffer.isValid()) { | 37 | /* if (!buffer.isValid()) { */ |
38 | SinkWarning() << "Read invalid buffer from disk"; | 38 | /* SinkWarning() << "Read invalid buffer from disk"; */ |
39 | } else { | 39 | /* } else { */ |
40 | SinkTrace() << "Found value " << key; | 40 | /* SinkTrace() << "Found value " << key; */ |
41 | current = adaptorFactory.createAdaptor(buffer.entity()); | 41 | /* current = adaptorFactory.createAdaptor(buffer.entity()); */ |
42 | retrievedRevision = Sink::Storage::revisionFromKey(key); | 42 | /* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */ |
43 | } | 43 | /* } */ |
44 | return false; | 44 | /* return false; */ |
45 | }, | 45 | /* }, */ |
46 | [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); | 46 | /* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */ |
47 | return current; | 47 | /* return current; */ |
48 | } | 48 | /* } */ |
49 | 49 | ||
50 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) | 50 | /* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */ |
51 | { | 51 | /* { */ |
52 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | 52 | /* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */ |
53 | db.scan(key, | 53 | /* db.scan(key, */ |
54 | [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { | 54 | /* [¤t, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */ |
55 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | 55 | /* Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); */ |
56 | if (!buffer.isValid()) { | 56 | /* if (!buffer.isValid()) { */ |
57 | SinkWarning() << "Read invalid buffer from disk"; | 57 | /* SinkWarning() << "Read invalid buffer from disk"; */ |
58 | } else { | 58 | /* } else { */ |
59 | current = adaptorFactory.createAdaptor(buffer.entity()); | 59 | /* current = adaptorFactory.createAdaptor(buffer.entity()); */ |
60 | retrievedRevision = Sink::Storage::revisionFromKey(key); | 60 | /* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */ |
61 | } | 61 | /* } */ |
62 | return false; | 62 | /* return false; */ |
63 | }, | 63 | /* }, */ |
64 | [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); | 64 | /* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */ |
65 | return current; | 65 | /* return current; */ |
66 | } | 66 | /* } */ |
67 | 67 | ||
68 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) | 68 | /* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPrevious(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */ |
69 | { | 69 | /* { */ |
70 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | 70 | /* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */ |
71 | qint64 latestRevision = 0; | 71 | /* qint64 latestRevision = 0; */ |
72 | db.scan(uid, | 72 | /* db.scan(uid, */ |
73 | [¤t, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { | 73 | /* [¤t, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { */ |
74 | auto foundRevision = Sink::Storage::revisionFromKey(key); | 74 | /* auto foundRevision = Sink::Storage::DataStore::revisionFromKey(key); */ |
75 | if (foundRevision < revision && foundRevision > latestRevision) { | 75 | /* if (foundRevision < revision && foundRevision > latestRevision) { */ |
76 | latestRevision = foundRevision; | 76 | /* latestRevision = foundRevision; */ |
77 | } | 77 | /* } */ |
78 | return true; | 78 | /* return true; */ |
79 | }, | 79 | /* }, */ |
80 | [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); | 80 | /* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); */ |
81 | return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); | 81 | /* return get(db, Sink::Storage::DataStore::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); */ |
82 | } | 82 | /* } */ |
83 | |||
84 | /* template <class DomainType> */ | ||
85 | /* EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */ | ||
86 | /* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */ | ||
87 | /* mTransaction(transaction), */ | ||
88 | /* mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory<DomainType>(resourceType)), */ | ||
89 | /* mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) */ | ||
90 | /* { */ | ||
91 | /* Q_ASSERT(!resourceType.isEmpty()); */ | ||
92 | /* Q_ASSERT(mDomainTypeAdaptorFactoryPtr); */ | ||
93 | /* } */ | ||
94 | |||
95 | /* template <class DomainType> */ | ||
96 | /* EntityReader<DomainType>::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */ | ||
97 | /* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */ | ||
98 | /* mTransaction(transaction), */ | ||
99 | /* mDomainTypeAdaptorFactory(domainTypeAdaptorFactory) */ | ||
100 | /* { */ | ||
101 | |||
102 | /* } */ | ||
83 | 103 | ||
84 | template <class DomainType> | 104 | template <class DomainType> |
85 | EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) | 105 | EntityReader<DomainType>::EntityReader(Storage::EntityStore &entityStore) |
86 | : mResourceInstanceIdentifier(resourceInstanceIdentifier), | 106 | : mEntityStore(entityStore) |
87 | mTransaction(transaction), | ||
88 | mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory<DomainType>(resourceType)), | ||
89 | mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) | ||
90 | { | ||
91 | Q_ASSERT(!resourceType.isEmpty()); | ||
92 | Q_ASSERT(mDomainTypeAdaptorFactoryPtr); | ||
93 | } | ||
94 | |||
95 | template <class DomainType> | ||
96 | EntityReader<DomainType>::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) | ||
97 | : mResourceInstanceIdentifier(resourceInstanceIdentifier), | ||
98 | mTransaction(transaction), | ||
99 | mDomainTypeAdaptorFactory(domainTypeAdaptorFactory) | ||
100 | { | 107 | { |
101 | 108 | ||
102 | } | 109 | } |
@@ -105,40 +112,28 @@ template <class DomainType> | |||
105 | DomainType EntityReader<DomainType>::read(const QByteArray &identifier) const | 112 | DomainType EntityReader<DomainType>::read(const QByteArray &identifier) const |
106 | { | 113 | { |
107 | auto typeName = ApplicationDomain::getTypeName<DomainType>(); | 114 | auto typeName = ApplicationDomain::getTypeName<DomainType>(); |
108 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | 115 | return mEntityStore.readLatest<DomainType>(identifier); |
109 | qint64 retrievedRevision = 0; | ||
110 | auto bufferAdaptor = EntityReaderUtils::getLatest(mainDatabase, identifier, mDomainTypeAdaptorFactory, retrievedRevision); | ||
111 | if (!bufferAdaptor) { | ||
112 | return DomainType(); | ||
113 | } | ||
114 | return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); | ||
115 | } | 116 | } |
116 | 117 | ||
117 | template <class DomainType> | 118 | template <class DomainType> |
118 | DomainType EntityReader<DomainType>::readFromKey(const QByteArray &key) const | 119 | DomainType EntityReader<DomainType>::readFromKey(const QByteArray &key) const |
119 | { | 120 | { |
120 | auto typeName = ApplicationDomain::getTypeName<DomainType>(); | 121 | /* auto typeName = ApplicationDomain::getTypeName<DomainType>(); */ |
121 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | 122 | /* auto mainDatabase = Storage::DataStore::mainDatabase(mTransaction, typeName); */ |
122 | qint64 retrievedRevision = 0; | 123 | /* qint64 retrievedRevision = 0; */ |
123 | auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); | 124 | /* auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); */ |
124 | const auto identifier = Storage::uidFromKey(key); | 125 | /* const auto identifier = Storage::DataStore::uidFromKey(key); */ |
125 | if (!bufferAdaptor) { | 126 | /* if (!bufferAdaptor) { */ |
126 | return DomainType(); | 127 | /* return DomainType(); */ |
127 | } | 128 | /* } */ |
128 | return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); | 129 | /* return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); */ |
130 | return mEntityStore.readEntity<DomainType>(key); | ||
129 | } | 131 | } |
130 | 132 | ||
131 | template <class DomainType> | 133 | template <class DomainType> |
132 | DomainType EntityReader<DomainType>::readPrevious(const QByteArray &uid, qint64 revision) const | 134 | DomainType EntityReader<DomainType>::readPrevious(const QByteArray &uid, qint64 revision) const |
133 | { | 135 | { |
134 | auto typeName = ApplicationDomain::getTypeName<DomainType>(); | 136 | return mEntityStore.readPrevious<DomainType>(uid, revision); |
135 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | ||
136 | qint64 retrievedRevision = 0; | ||
137 | auto bufferAdaptor = EntityReaderUtils::getPrevious(mainDatabase, uid, revision, mDomainTypeAdaptorFactory, retrievedRevision); | ||
138 | if (!bufferAdaptor) { | ||
139 | return DomainType(); | ||
140 | } | ||
141 | return DomainType(mResourceInstanceIdentifier, uid, retrievedRevision, bufferAdaptor); | ||
142 | } | 137 | } |
143 | 138 | ||
144 | template <class DomainType> | 139 | template <class DomainType> |
@@ -157,14 +152,14 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink:: | |||
157 | QTime time; | 152 | QTime time; |
158 | time.start(); | 153 | time.start(); |
159 | 154 | ||
160 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); | 155 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){})); |
161 | auto resultSet = preparedQuery->execute(); | 156 | auto resultSet = preparedQuery->execute(); |
162 | 157 | ||
163 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 158 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
164 | auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); | 159 | auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); |
165 | 160 | ||
166 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); | 161 | SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); |
167 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); | 162 | return qMakePair(mEntityStore.maxRevision(), replayedEntities); |
168 | } | 163 | } |
169 | 164 | ||
170 | template <class DomainType> | 165 | template <class DomainType> |
@@ -174,14 +169,14 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si | |||
174 | time.start(); | 169 | time.start(); |
175 | const qint64 baseRevision = lastRevision + 1; | 170 | const qint64 baseRevision = lastRevision + 1; |
176 | 171 | ||
177 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); | 172 | auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){})); |
178 | auto resultSet = preparedQuery->update(baseRevision); | 173 | auto resultSet = preparedQuery->update(baseRevision); |
179 | 174 | ||
180 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); | 175 | SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); |
181 | auto replayedEntities = replaySet(resultSet, 0, 0, callback); | 176 | auto replayedEntities = replaySet(resultSet, 0, 0, callback); |
182 | 177 | ||
183 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); | 178 | SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); |
184 | return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); | 179 | return qMakePair(mEntityStore.maxRevision(), replayedEntities); |
185 | } | 180 | } |
186 | 181 | ||
187 | template <class DomainType> | 182 | template <class DomainType> |
@@ -190,18 +185,18 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int | |||
190 | SinkTrace() << "Skipping over " << offset << " results"; | 185 | SinkTrace() << "Skipping over " << offset << " results"; |
191 | resultSet.skip(offset); | 186 | resultSet.skip(offset); |
192 | int counter = 0; | 187 | int counter = 0; |
193 | while (!batchSize || (counter < batchSize)) { | 188 | /* while (!batchSize || (counter < batchSize)) { */ |
194 | const bool ret = | 189 | /* const bool ret = */ |
195 | resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { | 190 | /* resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { */ |
196 | counter++; | 191 | /* counter++; */ |
197 | auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(result.buffer.entity()); | 192 | /* auto adaptor = mResourceContext.adaptorFactory<DomainType>().createAdaptor(result.buffer.entity()); */ |
198 | Q_ASSERT(adaptor); | 193 | /* Q_ASSERT(adaptor); */ |
199 | return callback(QSharedPointer<DomainType>::create(mResourceInstanceIdentifier, result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); | 194 | /* return callback(QSharedPointer<DomainType>::create(mResourceContext, result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); */ |
200 | }); | 195 | /* }); */ |
201 | if (!ret) { | 196 | /* if (!ret) { */ |
202 | break; | 197 | /* break; */ |
203 | } | 198 | /* } */ |
204 | }; | 199 | /* }; */ |
205 | SinkTrace() << "Replayed " << counter << " results." | 200 | SinkTrace() << "Replayed " << counter << " results." |
206 | << "Limit " << batchSize; | 201 | << "Limit " << batchSize; |
207 | return counter; | 202 | return counter; |