summaryrefslogtreecommitdiffstats
path: root/common/storage/entitystore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/storage/entitystore.cpp')
-rw-r--r--common/storage/entitystore.cpp338
1 files changed, 338 insertions, 0 deletions
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
new file mode 100644
index 0000000..9615eca
--- /dev/null
+++ b/common/storage/entitystore.cpp
@@ -0,0 +1,338 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "entitystore.h"
21
22#include "entitybuffer.h"
23#include "log.h"
24#include "typeindex.h"
25#include "definitions.h"
26#include "resourcecontext.h"
27#include "index.h"
28
29#include "mail.h"
30#include "folder.h"
31#include "event.h"
32
33using namespace Sink;
34using namespace Sink::Storage;
35
36SINK_DEBUG_AREA("entitystore");
37
38class EntityStore::Private {
39public:
40 Private(const ResourceContext &context) : resourceContext(context) {}
41
42 ResourceContext resourceContext;
43 DataStore::Transaction transaction;
44 QHash<QByteArray, QSharedPointer<TypeIndex> > indexByType;
45
46 DataStore::Transaction &getTransaction()
47 {
48 if (transaction) {
49 return transaction;
50 }
51
52 Sink::Storage::DataStore store(Sink::storageLocation(), resourceContext.instanceId(), DataStore::ReadOnly);
53 transaction = store.createTransaction(DataStore::ReadOnly);
54 Q_ASSERT(transaction.validateNamedDatabases());
55 return transaction;
56 }
57
58 /* template<typename T> */
59 /* TypeIndex &typeIndex(const QByteArray &type) */
60 /* { */
61 /* if (indexByType.contains(type)) { */
62 /* return *indexByType.value(type); */
63 /* } */
64 /* auto index = QSharedPointer<TypeIndex>::create(type); */
65 /* ApplicationDomain::TypeImplementation<T>::configureIndex(*index); */
66 /* indexByType.insert(type, index); */
67 /* return *index; */
68 /* } */
69
70 TypeIndex &typeIndex(const QByteArray &type)
71 {
72 /* return applyType<typeIndex>(type); */
73 if (indexByType.contains(type)) {
74 return *indexByType.value(type);
75 }
76 auto index = QSharedPointer<TypeIndex>::create(type);
77 //TODO expand for all types
78 /* TypeHelper<type>::configureIndex(*index); */
79 // Try this: (T would i.e. become
80 // TypeHelper<ApplicationDomain::TypeImplementation>::T::configureIndex(*index);
81 if (type == ApplicationDomain::getTypeName<ApplicationDomain::Folder>()) {
82 ApplicationDomain::TypeImplementation<ApplicationDomain::Folder>::configureIndex(*index);
83 } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Mail>()) {
84 ApplicationDomain::TypeImplementation<ApplicationDomain::Mail>::configureIndex(*index);
85 } else if (type == ApplicationDomain::getTypeName<ApplicationDomain::Event>()) {
86 ApplicationDomain::TypeImplementation<ApplicationDomain::Event>::configureIndex(*index);
87 } else {
88 Q_ASSERT(false);
89 SinkError() << "Unkonwn type " << type;
90 }
91 indexByType.insert(type, index);
92 return *index;
93 }
94};
95
96EntityStore::EntityStore(const ResourceContext &context)
97 : d(new EntityStore::Private{context})
98{
99
100}
101
102void EntityStore::startTransaction(Sink::Storage::DataStore::AccessMode accessMode)
103{
104 Sink::Storage::DataStore store(Sink::storageLocation(), d->resourceContext.instanceId(), accessMode);
105 d->transaction = store.createTransaction(accessMode);
106 Q_ASSERT(d->transaction.validateNamedDatabases());
107}
108
109void EntityStore::commitTransaction()
110{
111 d->transaction.commit();
112 d->transaction = Storage::DataStore::Transaction();
113}
114
115void EntityStore::abortTransaction()
116{
117 d->transaction.abort();
118 d->transaction = Storage::DataStore::Transaction();
119}
120
121QVector<QByteArray> EntityStore::fullScan(const QByteArray &type)
122{
123 SinkTrace() << "Looking for : " << type;
124 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
125 QSet<QByteArray> keys;
126 DataStore::mainDatabase(d->getTransaction(), type)
127 .scan(QByteArray(),
128 [&](const QByteArray &key, const QByteArray &value) -> bool {
129 const auto uid = DataStore::uidFromKey(key);
130 if (keys.contains(uid)) {
131 //Not something that should persist if the replay works, so we keep a message for now.
132 SinkTrace() << "Multiple revisions for key: " << key;
133 }
134 keys << uid;
135 return true;
136 },
137 [](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message; });
138
139 SinkTrace() << "Full scan retrieved " << keys.size() << " results.";
140 return keys.toList().toVector();
141}
142
143QVector<QByteArray> EntityStore::indexLookup(const QByteArray &type, const Query &query, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting)
144{
145 return d->typeIndex(type).query(query, appliedFilters, appliedSorting, d->getTransaction());
146}
147
148QVector<QByteArray> EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value)
149{
150 return d->typeIndex(type).lookup(property, value, d->getTransaction());
151}
152
153void EntityStore::indexLookup(const QByteArray &type, const QByteArray &property, const QVariant &value, const std::function<void(const QByteArray &uid)> &callback)
154{
155 auto list = d->typeIndex(type).lookup(property, value, d->getTransaction());
156 for (const auto &uid : list) {
157 callback(uid);
158 }
159 /* Index index(type + ".index." + property, d->transaction); */
160 /* index.lookup(value, [&](const QByteArray &sinkId) { */
161 /* callback(sinkId); */
162 /* }, */
163 /* [&](const Index::Error &error) { */
164 /* SinkWarning() << "Error in index: " << error.message << property; */
165 /* }); */
166}
167
168void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback)
169{
170 auto db = DataStore::mainDatabase(d->getTransaction(), type);
171 db.findLatest(uid,
172 [=](const QByteArray &key, const QByteArray &value) -> bool {
173 callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size()));
174 return false;
175 },
176 [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message << uid; });
177}
178
179void EntityStore::readLatest(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback)
180{
181 readLatest(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) {
182 auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity());
183 callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor});
184 });
185}
186
187ApplicationDomain::ApplicationDomainType EntityStore::readLatest(const QByteArray &type, const QByteArray &uid)
188{
189 ApplicationDomain::ApplicationDomainType dt;
190 readLatest(type, uid, [&](const ApplicationDomain::ApplicationDomainType &entity) {
191 dt = entity;
192 });
193 return dt;
194}
195
196void EntityStore::readEntity(const QByteArray &type, const QByteArray &key, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback)
197{
198 auto db = DataStore::mainDatabase(d->getTransaction(), type);
199 db.scan(key,
200 [=](const QByteArray &key, const QByteArray &value) -> bool {
201 callback(DataStore::uidFromKey(key), Sink::EntityBuffer(value.data(), value.size()));
202 return false;
203 },
204 [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message << key; });
205}
206
207void EntityStore::readEntity(const QByteArray &type, const QByteArray &uid, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback)
208{
209 readEntity(type, uid, [&](const QByteArray &uid, const EntityBuffer &buffer) {
210 auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity());
211 callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor});
212 });
213}
214
215ApplicationDomain::ApplicationDomainType EntityStore::readEntity(const QByteArray &type, const QByteArray &uid)
216{
217 ApplicationDomain::ApplicationDomainType dt;
218 readEntity(type, uid, [&](const ApplicationDomain::ApplicationDomainType &entity) {
219 dt = entity;
220 });
221 return dt;
222}
223
224
225void EntityStore::readAll(const QByteArray &type, const std::function<void(const ApplicationDomain::ApplicationDomainType &entity)> &callback)
226{
227 auto db = DataStore::mainDatabase(d->getTransaction(), type);
228 db.scan("",
229 [=](const QByteArray &key, const QByteArray &value) -> bool {
230 auto uid = DataStore::uidFromKey(key);
231 auto buffer = Sink::EntityBuffer{value.data(), value.size()};
232 auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity());
233 callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor});
234 return true;
235 },
236 [&](const DataStore::Error &error) { SinkWarning() << "Error during query: " << error.message; });
237}
238
239void EntityStore::readRevisions(qint64 baseRevision, const QByteArray &expectedType, const std::function<void(const QByteArray &key)> &callback)
240{
241 qint64 revisionCounter = baseRevision;
242 const qint64 topRevision = DataStore::maxRevision(d->getTransaction());
243 // Spit out the revision keys one by one.
244 while (revisionCounter <= topRevision) {
245 const auto uid = DataStore::getUidFromRevision(d->getTransaction(), revisionCounter);
246 const auto type = DataStore::getTypeFromRevision(d->getTransaction(), revisionCounter);
247 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
248 Q_ASSERT(!uid.isEmpty());
249 Q_ASSERT(!type.isEmpty());
250 if (type != expectedType) {
251 // Skip revision
252 revisionCounter++;
253 continue;
254 }
255 const auto key = DataStore::assembleKey(uid, revisionCounter);
256 revisionCounter++;
257 callback(key);
258 }
259}
260
261void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function<void(const QByteArray &uid, const EntityBuffer &entity)> callback)
262{
263 auto db = DataStore::mainDatabase(d->getTransaction(), type);
264 qint64 latestRevision = 0;
265 db.scan(uid,
266 [&latestRevision, revision](const QByteArray &key, const QByteArray &) -> bool {
267 const auto foundRevision = Sink::Storage::DataStore::revisionFromKey(key);
268 if (foundRevision < revision && foundRevision > latestRevision) {
269 latestRevision = foundRevision;
270 }
271 return true;
272 },
273 [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true);
274 return readEntity(type, Sink::Storage::DataStore::assembleKey(uid, latestRevision), callback);
275}
276
277void EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision, const std::function<void(const ApplicationDomain::ApplicationDomainType &)> callback)
278{
279 readPrevious(type, uid, revision, [&](const QByteArray &uid, const EntityBuffer &buffer) {
280 auto adaptor = d->resourceContext.adaptorFactory(type).createAdaptor(buffer.entity());
281 callback(ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), uid, DataStore::maxRevision(d->getTransaction()), adaptor});
282 });
283}
284
285ApplicationDomain::ApplicationDomainType EntityStore::readPrevious(const QByteArray &type, const QByteArray &uid, qint64 revision)
286{
287 ApplicationDomain::ApplicationDomainType dt;
288 readPrevious(type, uid, revision, [&](const ApplicationDomain::ApplicationDomainType &entity) {
289 dt = entity;
290 });
291 return dt;
292}
293
294void EntityStore::readAllUids(const QByteArray &type, const std::function<void(const QByteArray &uid)> callback)
295{
296 //TODO use uid index instead
297 //FIXME we currently report each uid for every revision with the same uid
298 auto db = DataStore::mainDatabase(d->getTransaction(), type);
299 db.scan("",
300 [callback](const QByteArray &key, const QByteArray &) -> bool {
301 callback(Sink::Storage::DataStore::uidFromKey(key));
302 return true;
303 },
304 [](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; });
305}
306
307bool EntityStore::contains(const QByteArray &type, const QByteArray &uid)
308{
309 return DataStore::mainDatabase(d->getTransaction(), type).contains(uid);
310}
311
312qint64 EntityStore::maxRevision()
313{
314 return DataStore::maxRevision(d->getTransaction());
315}
316
317/* DataStore::Transaction getTransaction() */
318/* { */
319/* Sink::Storage::DataStore::Transaction transaction; */
320/* { */
321/* Sink::Storage::DataStore storage(Sink::storageLocation(), mResourceInstanceIdentifier); */
322/* if (!storage.exists()) { */
323/* //This is not an error if the resource wasn't started before */
324/* SinkLog() << "Store doesn't exist: " << mResourceInstanceIdentifier; */
325/* return Sink::Storage::DataStore::Transaction(); */
326/* } */
327/* storage.setDefaultErrorHandler([this](const Sink::Storage::DataStore::Error &error) { SinkWarning() << "Error during query: " << error.store << error.message; }); */
328/* transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly); */
329/* } */
330
331/* //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. */
332/* //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). */
333/* while (!transaction.validateNamedDatabases()) { */
334/* Sink::Storage::DataStore storage(Sink::storageLocation(), mResourceInstanceIdentifier); */
335/* transaction = storage.createTransaction(Sink::Storage::DataStore::ReadOnly); */
336/* } */
337/* return transaction; */
338/* } */