summaryrefslogtreecommitdiffstats
path: root/common/facade.h
diff options
context:
space:
mode:
Diffstat (limited to 'common/facade.h')
-rw-r--r--common/facade.h262
1 files changed, 11 insertions, 251 deletions
diff --git a/common/facade.h b/common/facade.h
index 643ebec..de67e05 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -1,5 +1,5 @@
1/* 1/*
2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> 2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm>
3 * 3 *
4 * This program is free software; you can redistribute it and/or modify 4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by 5 * it under the terms of the GNU General Public License as published by
@@ -25,85 +25,12 @@
25#include <Async/Async> 25#include <Async/Async>
26 26
27#include "resourceaccess.h" 27#include "resourceaccess.h"
28#include "commands.h"
29#include "domainadaptor.h"
30#include "log.h"
31#include "resultset.h" 28#include "resultset.h"
32#include "entitystorage.h" 29#include "domaintypeadaptorfactoryinterface.h"
33 30#include "storage.h"
34/**
35 * A QueryRunner runs a query and updates the corresponding result set.
36 *
37 * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work),
38 * and by how long a result set must be updated. If the query is one off the runner dies after the execution,
39 * otherwise it lives on the react to changes and updates the corresponding result set.
40 *
41 * QueryRunner has to keep ResourceAccess alive in order to keep getting updates.
42 */
43class QueryRunner : public QObject
44{
45 Q_OBJECT
46public:
47 typedef std::function<KAsync::Job<qint64>(qint64 oldRevision)> QueryFunction;
48
49 QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {};
50 /**
51 * Starts query
52 */
53 KAsync::Job<void> run(qint64 newRevision = 0)
54 {
55 //TODO: JOBAPI: that last empty .then should not be necessary
56 //TODO: remove newRevision
57 return queryFunction(mLatestRevision + 1).then<void, qint64>([this](qint64 revision) {
58 mLatestRevision = revision;
59 }).then<void>([](){});
60 }
61
62 /**
63 * Set the query to run
64 */
65 void setQuery(const QueryFunction &query)
66 {
67 queryFunction = query;
68 }
69
70public slots:
71 /**
72 * Rerun query with new revision
73 */
74 void revisionChanged(qint64 newRevision)
75 {
76 Trace() << "New revision: " << newRevision;
77 run(newRevision).exec();
78 }
79
80private:
81 QueryFunction queryFunction;
82 qint64 mLatestRevision;
83};
84
85static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
86{
87 //TODO use a result set with an iterator, to read values on demand
88 QVector<QByteArray> keys;
89 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
90 //Skip internals
91 if (Akonadi2::Storage::isInternalKey(key)) {
92 return true;
93 }
94 keys << Akonadi2::Storage::uidFromKey(key);
95 return true;
96 },
97 [](const Akonadi2::Storage::Error &error) {
98 qWarning() << "Error during query: " << error.message;
99 });
100
101 Trace() << "Full scan found " << keys.size() << " results";
102 return ResultSet(keys);
103}
104
105 31
106namespace Akonadi2 { 32namespace Akonadi2 {
33
107/** 34/**
108 * Default facade implementation for resources that are implemented in a separate process using the ResourceAccess class. 35 * Default facade implementation for resources that are implemented in a separate process using the ResourceAccess class.
109 * 36 *
@@ -125,185 +52,18 @@ public:
125 * @param resourceIdentifier is the identifier of the resource instance 52 * @param resourceIdentifier is the identifier of the resource instance
126 * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa 53 * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa
127 */ 54 */
128 GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<EntityStorage<DomainType> > storage = QSharedPointer<EntityStorage<DomainType> >(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>()) 55 GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>());
129 : Akonadi2::StoreFacade<DomainType>(), 56 ~GenericFacade();
130 mResourceAccess(resourceAccess),
131 mStorage(storage),
132 mDomainTypeAdaptorFactory(adaptorFactory),
133 mResourceInstanceIdentifier(resourceIdentifier)
134 {
135 if (!mResourceAccess) {
136 mResourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resourceIdentifier);
137 }
138 if (!mStorage) {
139 mStorage = QSharedPointer<EntityStorage<DomainType> >::create(resourceIdentifier);
140 const auto bufferType = bufferTypeForDomainType();
141
142 mStorage->readEntity = [bufferType, this] (const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
143 {
144 //This only works for a 1:1 mapping of resource to domain types.
145 //Not i.e. for tags that are stored as flags in each entity of an imap store.
146 //additional properties that don't have a 1:1 mapping (such as separately stored tags),
147 //could be added to the adaptor.
148 transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
149 Akonadi2::EntityBuffer buffer(value.data(), value.size());
150 const Akonadi2::Entity &entity = buffer.entity();
151 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
152 Q_ASSERT(metadataBuffer);
153 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
154 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation());
155 return false;
156 },
157 [](const Akonadi2::Storage::Error &error) {
158 qWarning() << "Error during query: " << error.message;
159 });
160 };
161
162 mStorage->loadInitialResultSet = [bufferType, this] (const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet
163 {
164 QSet<QByteArray> appliedFilters;
165 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction);
166 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
167
168 //We do a full scan if there were no indexes available to create the initial set.
169 if (appliedFilters.isEmpty()) {
170 //TODO this should be replaced by an index lookup as well
171 return fullScan(transaction, bufferType);
172 }
173 return resultSet;
174 };
175
176 mStorage->loadIncrementalResultSet = [bufferType, this] (qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet
177 {
178 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
179 return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray {
180 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
181 //Spit out the revision keys one by one.
182 while (*revisionCounter <= topRevision) {
183 const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter);
184 const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter);
185 Trace() << "Revision" << *revisionCounter << type << uid;
186 if (type != bufferType) {
187 //Skip revision
188 *revisionCounter += 1;
189 continue;
190 }
191 const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter);
192 *revisionCounter += 1;
193 return key;
194 }
195 //We're done
196 return QByteArray();
197 });
198 };
199 }
200 }
201
202 ~GenericFacade()
203 {
204 }
205
206 static QByteArray bufferTypeForDomainType()
207 {
208 //We happen to have a one to one mapping
209 return Akonadi2::ApplicationDomain::getTypeName<DomainType>();
210 }
211
212 KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE
213 {
214 if (!mDomainTypeAdaptorFactory) {
215 Warning() << "No domain type adaptor factory available";
216 return KAsync::error<void>();
217 }
218 flatbuffers::FlatBufferBuilder entityFbb;
219 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
220 return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize()));
221 }
222
223 KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE
224 {
225 if (!mDomainTypeAdaptorFactory) {
226 Warning() << "No domain type adaptor factory available";
227 return KAsync::error<void>();
228 }
229 flatbuffers::FlatBufferBuilder entityFbb;
230 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
231 return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize()));
232 }
233
234 KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE
235 {
236 return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType());
237 }
238
239 //TODO JOBAPI return job from sync continuation to execute it as subjob?
240 KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE
241 {
242 auto runner = QSharedPointer<QueryRunner>::create(query);
243 QWeakPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > weakResultProvider = resultProvider;
244 runner->setQuery([this, weakResultProvider, query] (qint64 oldRevision) -> KAsync::Job<qint64> {
245 return KAsync::start<qint64>([this, weakResultProvider, query, oldRevision](KAsync::Future<qint64> &future) {
246 Trace() << "Executing query " << oldRevision;
247 auto resultProvider = weakResultProvider.toStrongRef();
248 if (!resultProvider) {
249 Warning() << "Tried executing query after result provider is already gone";
250 future.setError(0, QString());
251 future.setFinished();
252 return;
253 }
254 load(query, resultProvider, oldRevision).template then<void, qint64>([&future, this](qint64 queriedRevision) {
255 //TODO set revision in result provider?
256 //TODO update all existing results with new revision
257 mResourceAccess->sendRevisionReplayedCommand(queriedRevision);
258 future.setValue(queriedRevision);
259 future.setFinished();
260 }).exec();
261 });
262 });
263
264 //In case of a live query we keep the runner for as long alive as the result provider exists
265 if (query.liveQuery) {
266 resultProvider->setQueryRunner(runner);
267 //Ensure the connection is open, if it wasn't already opened
268 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
269 mResourceAccess->open();
270 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged);
271 }
272
273 //We have to capture the runner to keep it alive
274 return synchronizeResource(query).template then<void>([runner](KAsync::Future<void> &future) {
275 runner->run().then<void>([&future]() {
276 future.setFinished();
277 }).exec();
278 },
279 [](int error, const QString &errorString) {
280 Warning() << "Error during sync " << error << errorString;
281 });
282 }
283
284private:
285 KAsync::Job<void> synchronizeResource(const Akonadi2::Query &query)
286 {
287 //TODO check if a sync is necessary
288 //TODO Only sync what was requested
289 //TODO timeout
290 if (query.syncOnDemand || query.processAll) {
291 return mResourceAccess->synchronizeResource(query.syncOnDemand, query.processAll);
292 }
293 return KAsync::null<void>();
294 }
295 57
296 virtual KAsync::Job<qint64> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider, qint64 oldRevision) 58 static QByteArray bufferTypeForDomainType();
297 { 59 KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE;
298 return KAsync::start<qint64>([=]() -> qint64 { 60 KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE;
299 return mStorage->read(query, oldRevision, resultProvider); 61 KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE;
300 }); 62 QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> load(const Akonadi2::Query &query) Q_DECL_OVERRIDE;
301 }
302 63
303protected: 64protected:
304 //TODO use one resource access instance per application & per resource 65 //TODO use one resource access instance per application & per resource
305 QSharedPointer<Akonadi2::ResourceAccessInterface> mResourceAccess; 66 QSharedPointer<Akonadi2::ResourceAccessInterface> mResourceAccess;
306 QSharedPointer<EntityStorage<DomainType> > mStorage;
307 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; 67 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory;
308 QByteArray mResourceInstanceIdentifier; 68 QByteArray mResourceInstanceIdentifier;
309}; 69};