summaryrefslogtreecommitdiffstats
path: root/common/entityreader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/entityreader.cpp')
-rw-r--r--common/entityreader.cpp209
1 files changed, 102 insertions, 107 deletions
diff --git a/common/entityreader.cpp b/common/entityreader.cpp
index cca1511..c49d1f7 100644
--- a/common/entityreader.cpp
+++ b/common/entityreader.cpp
@@ -28,75 +28,82 @@ SINK_DEBUG_AREA("entityreader")
28 28
29using namespace Sink; 29using namespace Sink;
30 30
31QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) 31/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */
32{ 32/* { */
33 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 33/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */
34 db.findLatest(uid, 34/* db.findLatest(uid, */
35 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { 35/* [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */
36 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 36/* Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); */
37 if (!buffer.isValid()) { 37/* if (!buffer.isValid()) { */
38 SinkWarning() << "Read invalid buffer from disk"; 38/* SinkWarning() << "Read invalid buffer from disk"; */
39 } else { 39/* } else { */
40 SinkTrace() << "Found value " << key; 40/* SinkTrace() << "Found value " << key; */
41 current = adaptorFactory.createAdaptor(buffer.entity()); 41/* current = adaptorFactory.createAdaptor(buffer.entity()); */
42 retrievedRevision = Sink::Storage::revisionFromKey(key); 42/* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */
43 } 43/* } */
44 return false; 44/* return false; */
45 }, 45/* }, */
46 [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); 46/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */
47 return current; 47/* return current; */
48} 48/* } */
49 49
50QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) 50/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */
51{ 51/* { */
52 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 52/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */
53 db.scan(key, 53/* db.scan(key, */
54 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { 54/* [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */
55 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 55/* Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); */
56 if (!buffer.isValid()) { 56/* if (!buffer.isValid()) { */
57 SinkWarning() << "Read invalid buffer from disk"; 57/* SinkWarning() << "Read invalid buffer from disk"; */
58 } else { 58/* } else { */
59 current = adaptorFactory.createAdaptor(buffer.entity()); 59/* current = adaptorFactory.createAdaptor(buffer.entity()); */
60 retrievedRevision = Sink::Storage::revisionFromKey(key); 60/* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */
61 } 61/* } */
62 return false; 62/* return false; */
63 }, 63/* }, */
64 [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); 64/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */
65 return current; 65/* return current; */
66} 66/* } */
67 67
68QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) 68/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPrevious(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */
69{ 69/* { */
70 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 70/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */
71 qint64 latestRevision = 0; 71/* qint64 latestRevision = 0; */
72 db.scan(uid, 72/* db.scan(uid, */
73 [&current, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { 73/* [&current, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { */
74 auto foundRevision = Sink::Storage::revisionFromKey(key); 74/* auto foundRevision = Sink::Storage::DataStore::revisionFromKey(key); */
75 if (foundRevision < revision && foundRevision > latestRevision) { 75/* if (foundRevision < revision && foundRevision > latestRevision) { */
76 latestRevision = foundRevision; 76/* latestRevision = foundRevision; */
77 } 77/* } */
78 return true; 78/* return true; */
79 }, 79/* }, */
80 [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); 80/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); */
81 return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); 81/* return get(db, Sink::Storage::DataStore::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); */
82} 82/* } */
83
84/* template <class DomainType> */
85/* EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */
86/* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */
87/* mTransaction(transaction), */
88/* mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory<DomainType>(resourceType)), */
89/* mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) */
90/* { */
91/* Q_ASSERT(!resourceType.isEmpty()); */
92/* Q_ASSERT(mDomainTypeAdaptorFactoryPtr); */
93/* } */
94
95/* template <class DomainType> */
96/* EntityReader<DomainType>::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */
97/* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */
98/* mTransaction(transaction), */
99/* mDomainTypeAdaptorFactory(domainTypeAdaptorFactory) */
100/* { */
101
102/* } */
83 103
84template <class DomainType> 104template <class DomainType>
85EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) 105EntityReader<DomainType>::EntityReader(Storage::EntityStore &entityStore)
86 : mResourceInstanceIdentifier(resourceInstanceIdentifier), 106 : mEntityStore(entityStore)
87 mTransaction(transaction),
88 mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory<DomainType>(resourceType)),
89 mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr)
90{
91 Q_ASSERT(!resourceType.isEmpty());
92 Q_ASSERT(mDomainTypeAdaptorFactoryPtr);
93}
94
95template <class DomainType>
96EntityReader<DomainType>::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction)
97 : mResourceInstanceIdentifier(resourceInstanceIdentifier),
98 mTransaction(transaction),
99 mDomainTypeAdaptorFactory(domainTypeAdaptorFactory)
100{ 107{
101 108
102} 109}
@@ -105,40 +112,28 @@ template <class DomainType>
105DomainType EntityReader<DomainType>::read(const QByteArray &identifier) const 112DomainType EntityReader<DomainType>::read(const QByteArray &identifier) const
106{ 113{
107 auto typeName = ApplicationDomain::getTypeName<DomainType>(); 114 auto typeName = ApplicationDomain::getTypeName<DomainType>();
108 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); 115 return mEntityStore.readLatest<DomainType>(identifier);
109 qint64 retrievedRevision = 0;
110 auto bufferAdaptor = EntityReaderUtils::getLatest(mainDatabase, identifier, mDomainTypeAdaptorFactory, retrievedRevision);
111 if (!bufferAdaptor) {
112 return DomainType();
113 }
114 return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor);
115} 116}
116 117
117template <class DomainType> 118template <class DomainType>
118DomainType EntityReader<DomainType>::readFromKey(const QByteArray &key) const 119DomainType EntityReader<DomainType>::readFromKey(const QByteArray &key) const
119{ 120{
120 auto typeName = ApplicationDomain::getTypeName<DomainType>(); 121 /* auto typeName = ApplicationDomain::getTypeName<DomainType>(); */
121 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); 122 /* auto mainDatabase = Storage::DataStore::mainDatabase(mTransaction, typeName); */
122 qint64 retrievedRevision = 0; 123 /* qint64 retrievedRevision = 0; */
123 auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); 124 /* auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); */
124 const auto identifier = Storage::uidFromKey(key); 125 /* const auto identifier = Storage::DataStore::uidFromKey(key); */
125 if (!bufferAdaptor) { 126 /* if (!bufferAdaptor) { */
126 return DomainType(); 127 /* return DomainType(); */
127 } 128 /* } */
128 return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); 129 /* return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); */
130 return mEntityStore.readEntity<DomainType>(key);
129} 131}
130 132
131template <class DomainType> 133template <class DomainType>
132DomainType EntityReader<DomainType>::readPrevious(const QByteArray &uid, qint64 revision) const 134DomainType EntityReader<DomainType>::readPrevious(const QByteArray &uid, qint64 revision) const
133{ 135{
134 auto typeName = ApplicationDomain::getTypeName<DomainType>(); 136 return mEntityStore.readPrevious<DomainType>(uid, revision);
135 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName);
136 qint64 retrievedRevision = 0;
137 auto bufferAdaptor = EntityReaderUtils::getPrevious(mainDatabase, uid, revision, mDomainTypeAdaptorFactory, retrievedRevision);
138 if (!bufferAdaptor) {
139 return DomainType();
140 }
141 return DomainType(mResourceInstanceIdentifier, uid, retrievedRevision, bufferAdaptor);
142} 137}
143 138
144template <class DomainType> 139template <class DomainType>
@@ -157,14 +152,14 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::
157 QTime time; 152 QTime time;
158 time.start(); 153 time.start();
159 154
160 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); 155 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){}));
161 auto resultSet = preparedQuery->execute(); 156 auto resultSet = preparedQuery->execute();
162 157
163 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 158 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
164 auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); 159 auto replayedEntities = replaySet(resultSet, offset, batchsize, callback);
165 160
166 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 161 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
167 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); 162 return qMakePair(mEntityStore.maxRevision(), replayedEntities);
168} 163}
169 164
170template <class DomainType> 165template <class DomainType>
@@ -174,14 +169,14 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si
174 time.start(); 169 time.start();
175 const qint64 baseRevision = lastRevision + 1; 170 const qint64 baseRevision = lastRevision + 1;
176 171
177 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); 172 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){}));
178 auto resultSet = preparedQuery->update(baseRevision); 173 auto resultSet = preparedQuery->update(baseRevision);
179 174
180 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 175 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
181 auto replayedEntities = replaySet(resultSet, 0, 0, callback); 176 auto replayedEntities = replaySet(resultSet, 0, 0, callback);
182 177
183 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); 178 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
184 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); 179 return qMakePair(mEntityStore.maxRevision(), replayedEntities);
185} 180}
186 181
187template <class DomainType> 182template <class DomainType>
@@ -190,18 +185,18 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int
190 SinkTrace() << "Skipping over " << offset << " results"; 185 SinkTrace() << "Skipping over " << offset << " results";
191 resultSet.skip(offset); 186 resultSet.skip(offset);
192 int counter = 0; 187 int counter = 0;
193 while (!batchSize || (counter < batchSize)) { 188 /* while (!batchSize || (counter < batchSize)) { */
194 const bool ret = 189 /* const bool ret = */
195 resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { 190 /* resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { */
196 counter++; 191 /* counter++; */
197 auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(result.buffer.entity()); 192 /* auto adaptor = mResourceContext.adaptorFactory<DomainType>().createAdaptor(result.buffer.entity()); */
198 Q_ASSERT(adaptor); 193 /* Q_ASSERT(adaptor); */
199 return callback(QSharedPointer<DomainType>::create(mResourceInstanceIdentifier, result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); 194 /* return callback(QSharedPointer<DomainType>::create(mResourceContext, result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); */
200 }); 195 /* }); */
201 if (!ret) { 196 /* if (!ret) { */
202 break; 197 /* break; */
203 } 198 /* } */
204 }; 199 /* }; */
205 SinkTrace() << "Replayed " << counter << " results." 200 SinkTrace() << "Replayed " << counter << " results."
206 << "Limit " << batchSize; 201 << "Limit " << batchSize;
207 return counter; 202 return counter;