summaryrefslogtreecommitdiffstats
path: root/common/entityreader.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-16 14:55:20 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-10-21 09:02:21 +0200
commit237b9ae4113e7a9f489632296941becb71afdb45 (patch)
tree01cde58f495944f01cad9d282391d4efd2897141 /common/entityreader.cpp
parent95d11bf0be98a4e3c08502fe23417b800233ce14 (diff)
downloadsink-237b9ae4113e7a9f489632296941becb71afdb45.tar.gz
sink-237b9ae4113e7a9f489632296941becb71afdb45.zip
Refactor how the storage is used.
This is the initial refactoring to improve how we deal with the storage. It does a couple of things: * Rename Sink::Storage to Sink::Storage::DataStore to free up the Sink::Storage namespace * Introduce a Sink::ResourceContext to have a single object that can be passed around containing everything that is necessary to operate on a resource. This is a lot better than the multiple separate parameters that we used to pass around all over the place, while still allowing for dependency injection for tests. * Tie storage access together using the new EntityStore that directly works with ApplicationDomainTypes. This gives us a central place where main storage, indexes and buffer adaptors are tied together, which will also give us a place to implement external indexes, such as a fulltextindex using xapian. * Use ApplicationDomainTypes as the default way to pass around entities. Instead of using various ways to pass around entities (buffers, buffer adaptors, ApplicationDomainTypes), only use a single way. The old approach was confusing, and was only done as: * optimization; really shouldn't be necessary and otherwise I'm sure we can find better ways to optimize ApplicationDomainType itself. * a way to account for entities that have multiple buffers, a concept that I no longer deem relevant. While this commit does the bulk of the work to get there, the following commits will refactor more stuff to get things back to normal.
Diffstat (limited to 'common/entityreader.cpp')
-rw-r--r--common/entityreader.cpp209
1 files changed, 102 insertions, 107 deletions
diff --git a/common/entityreader.cpp b/common/entityreader.cpp
index cca1511..c49d1f7 100644
--- a/common/entityreader.cpp
+++ b/common/entityreader.cpp
@@ -28,75 +28,82 @@ SINK_DEBUG_AREA("entityreader")
28 28
29using namespace Sink; 29using namespace Sink;
30 30
31QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) 31/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */
32{ 32/* { */
33 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 33/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */
34 db.findLatest(uid, 34/* db.findLatest(uid, */
35 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { 35/* [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */
36 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 36/* Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); */
37 if (!buffer.isValid()) { 37/* if (!buffer.isValid()) { */
38 SinkWarning() << "Read invalid buffer from disk"; 38/* SinkWarning() << "Read invalid buffer from disk"; */
39 } else { 39/* } else { */
40 SinkTrace() << "Found value " << key; 40/* SinkTrace() << "Found value " << key; */
41 current = adaptorFactory.createAdaptor(buffer.entity()); 41/* current = adaptorFactory.createAdaptor(buffer.entity()); */
42 retrievedRevision = Sink::Storage::revisionFromKey(key); 42/* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */
43 } 43/* } */
44 return false; 44/* return false; */
45 }, 45/* }, */
46 [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); 46/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */
47 return current; 47/* return current; */
48} 48/* } */
49 49
50QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(const Sink::Storage::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) 50/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &key, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */
51{ 51/* { */
52 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 52/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */
53 db.scan(key, 53/* db.scan(key, */
54 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { 54/* [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { */
55 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 55/* Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); */
56 if (!buffer.isValid()) { 56/* if (!buffer.isValid()) { */
57 SinkWarning() << "Read invalid buffer from disk"; 57/* SinkWarning() << "Read invalid buffer from disk"; */
58 } else { 58/* } else { */
59 current = adaptorFactory.createAdaptor(buffer.entity()); 59/* current = adaptorFactory.createAdaptor(buffer.entity()); */
60 retrievedRevision = Sink::Storage::revisionFromKey(key); 60/* retrievedRevision = Sink::Storage::DataStore::revisionFromKey(key); */
61 } 61/* } */
62 return false; 62/* return false; */
63 }, 63/* }, */
64 [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); 64/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }); */
65 return current; 65/* return current; */
66} 66/* } */
67 67
68QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPrevious(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) 68/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPrevious(const Sink::Storage::DataStore::NamedDatabase &db, const QByteArray &uid, qint64 revision, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) */
69{ 69/* { */
70 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 70/* QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; */
71 qint64 latestRevision = 0; 71/* qint64 latestRevision = 0; */
72 db.scan(uid, 72/* db.scan(uid, */
73 [&current, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { 73/* [&current, &latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool { */
74 auto foundRevision = Sink::Storage::revisionFromKey(key); 74/* auto foundRevision = Sink::Storage::DataStore::revisionFromKey(key); */
75 if (foundRevision < revision && foundRevision > latestRevision) { 75/* if (foundRevision < revision && foundRevision > latestRevision) { */
76 latestRevision = foundRevision; 76/* latestRevision = foundRevision; */
77 } 77/* } */
78 return true; 78/* return true; */
79 }, 79/* }, */
80 [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); 80/* [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true); */
81 return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); 81/* return get(db, Sink::Storage::DataStore::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); */
82} 82/* } */
83
84/* template <class DomainType> */
85/* EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */
86/* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */
87/* mTransaction(transaction), */
88/* mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory<DomainType>(resourceType)), */
89/* mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) */
90/* { */
91/* Q_ASSERT(!resourceType.isEmpty()); */
92/* Q_ASSERT(mDomainTypeAdaptorFactoryPtr); */
93/* } */
94
95/* template <class DomainType> */
96/* EntityReader<DomainType>::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::DataStore::Transaction &transaction) */
97/* : mResourceInstanceIdentifier(resourceInstanceIdentifier), */
98/* mTransaction(transaction), */
99/* mDomainTypeAdaptorFactory(domainTypeAdaptorFactory) */
100/* { */
101
102/* } */
83 103
84template <class DomainType> 104template <class DomainType>
85EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) 105EntityReader<DomainType>::EntityReader(Storage::EntityStore &entityStore)
86 : mResourceInstanceIdentifier(resourceInstanceIdentifier), 106 : mEntityStore(entityStore)
87 mTransaction(transaction),
88 mDomainTypeAdaptorFactoryPtr(Sink::AdaptorFactoryRegistry::instance().getFactory<DomainType>(resourceType)),
89 mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr)
90{
91 Q_ASSERT(!resourceType.isEmpty());
92 Q_ASSERT(mDomainTypeAdaptorFactoryPtr);
93}
94
95template <class DomainType>
96EntityReader<DomainType>::EntityReader(DomainTypeAdaptorFactoryInterface &domainTypeAdaptorFactory, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction)
97 : mResourceInstanceIdentifier(resourceInstanceIdentifier),
98 mTransaction(transaction),
99 mDomainTypeAdaptorFactory(domainTypeAdaptorFactory)
100{ 107{
101 108
102} 109}
@@ -105,40 +112,28 @@ template <class DomainType>
105DomainType EntityReader<DomainType>::read(const QByteArray &identifier) const 112DomainType EntityReader<DomainType>::read(const QByteArray &identifier) const
106{ 113{
107 auto typeName = ApplicationDomain::getTypeName<DomainType>(); 114 auto typeName = ApplicationDomain::getTypeName<DomainType>();
108 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); 115 return mEntityStore.readLatest<DomainType>(identifier);
109 qint64 retrievedRevision = 0;
110 auto bufferAdaptor = EntityReaderUtils::getLatest(mainDatabase, identifier, mDomainTypeAdaptorFactory, retrievedRevision);
111 if (!bufferAdaptor) {
112 return DomainType();
113 }
114 return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor);
115} 116}
116 117
117template <class DomainType> 118template <class DomainType>
118DomainType EntityReader<DomainType>::readFromKey(const QByteArray &key) const 119DomainType EntityReader<DomainType>::readFromKey(const QByteArray &key) const
119{ 120{
120 auto typeName = ApplicationDomain::getTypeName<DomainType>(); 121 /* auto typeName = ApplicationDomain::getTypeName<DomainType>(); */
121 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); 122 /* auto mainDatabase = Storage::DataStore::mainDatabase(mTransaction, typeName); */
122 qint64 retrievedRevision = 0; 123 /* qint64 retrievedRevision = 0; */
123 auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); 124 /* auto bufferAdaptor = EntityReaderUtils::get(mainDatabase, key, mDomainTypeAdaptorFactory, retrievedRevision); */
124 const auto identifier = Storage::uidFromKey(key); 125 /* const auto identifier = Storage::DataStore::uidFromKey(key); */
125 if (!bufferAdaptor) { 126 /* if (!bufferAdaptor) { */
126 return DomainType(); 127 /* return DomainType(); */
127 } 128 /* } */
128 return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); 129 /* return DomainType(mResourceInstanceIdentifier, identifier, retrievedRevision, bufferAdaptor); */
130 return mEntityStore.readEntity<DomainType>(key);
129} 131}
130 132
131template <class DomainType> 133template <class DomainType>
132DomainType EntityReader<DomainType>::readPrevious(const QByteArray &uid, qint64 revision) const 134DomainType EntityReader<DomainType>::readPrevious(const QByteArray &uid, qint64 revision) const
133{ 135{
134 auto typeName = ApplicationDomain::getTypeName<DomainType>(); 136 return mEntityStore.readPrevious<DomainType>(uid, revision);
135 auto mainDatabase = Storage::mainDatabase(mTransaction, typeName);
136 qint64 retrievedRevision = 0;
137 auto bufferAdaptor = EntityReaderUtils::getPrevious(mainDatabase, uid, revision, mDomainTypeAdaptorFactory, retrievedRevision);
138 if (!bufferAdaptor) {
139 return DomainType();
140 }
141 return DomainType(mResourceInstanceIdentifier, uid, retrievedRevision, bufferAdaptor);
142} 137}
143 138
144template <class DomainType> 139template <class DomainType>
@@ -157,14 +152,14 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::
157 QTime time; 152 QTime time;
158 time.start(); 153 time.start();
159 154
160 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); 155 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){}));
161 auto resultSet = preparedQuery->execute(); 156 auto resultSet = preparedQuery->execute();
162 157
163 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 158 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
164 auto replayedEntities = replaySet(resultSet, offset, batchsize, callback); 159 auto replayedEntities = replaySet(resultSet, offset, batchsize, callback);
165 160
166 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 161 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
167 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); 162 return qMakePair(mEntityStore.maxRevision(), replayedEntities);
168} 163}
169 164
170template <class DomainType> 165template <class DomainType>
@@ -174,14 +169,14 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si
174 time.start(); 169 time.start();
175 const qint64 baseRevision = lastRevision + 1; 170 const qint64 baseRevision = lastRevision + 1;
176 171
177 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, mTransaction); 172 auto preparedQuery = ApplicationDomain::TypeImplementation<DomainType>::prepareQuery(query, Storage::EntityStore::Ptr(&mEntityStore, [](Storage::EntityStore *){}));
178 auto resultSet = preparedQuery->update(baseRevision); 173 auto resultSet = preparedQuery->update(baseRevision);
179 174
180 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 175 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
181 auto replayedEntities = replaySet(resultSet, 0, 0, callback); 176 auto replayedEntities = replaySet(resultSet, 0, 0, callback);
182 177
183 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); 178 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
184 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); 179 return qMakePair(mEntityStore.maxRevision(), replayedEntities);
185} 180}
186 181
187template <class DomainType> 182template <class DomainType>
@@ -190,18 +185,18 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int
190 SinkTrace() << "Skipping over " << offset << " results"; 185 SinkTrace() << "Skipping over " << offset << " results";
191 resultSet.skip(offset); 186 resultSet.skip(offset);
192 int counter = 0; 187 int counter = 0;
193 while (!batchSize || (counter < batchSize)) { 188 /* while (!batchSize || (counter < batchSize)) { */
194 const bool ret = 189 /* const bool ret = */
195 resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { 190 /* resultSet.next([this, &counter, callback](const ResultSet::Result &result) -> bool { */
196 counter++; 191 /* counter++; */
197 auto adaptor = mDomainTypeAdaptorFactory.createAdaptor(result.buffer.entity()); 192 /* auto adaptor = mResourceContext.adaptorFactory<DomainType>().createAdaptor(result.buffer.entity()); */
198 Q_ASSERT(adaptor); 193 /* Q_ASSERT(adaptor); */
199 return callback(QSharedPointer<DomainType>::create(mResourceInstanceIdentifier, result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); 194 /* return callback(QSharedPointer<DomainType>::create(mResourceContext, result.uid, result.buffer.revision(), adaptor), result.operation, result.aggregateValues); */
200 }); 195 /* }); */
201 if (!ret) { 196 /* if (!ret) { */
202 break; 197 /* break; */
203 } 198 /* } */
204 }; 199 /* }; */
205 SinkTrace() << "Replayed " << counter << " results." 200 SinkTrace() << "Replayed " << counter << " results."
206 << "Limit " << batchSize; 201 << "Limit " << batchSize;
207 return counter; 202 return counter;