diff options
Diffstat (limited to 'common')
29 files changed, 1386 insertions, 614 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index b4a4703..e56ece9 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt | |||
@@ -12,10 +12,10 @@ else (STORAGE_unqlite) | |||
12 | endif (STORAGE_unqlite) | 12 | endif (STORAGE_unqlite) |
13 | 13 | ||
14 | set(command_SRCS | 14 | set(command_SRCS |
15 | modelresult.cpp | ||
15 | definitions.cpp | 16 | definitions.cpp |
16 | log.cpp | 17 | log.cpp |
17 | entitybuffer.cpp | 18 | entitybuffer.cpp |
18 | entitystorage.cpp | ||
19 | clientapi.cpp | 19 | clientapi.cpp |
20 | facadefactory.cpp | 20 | facadefactory.cpp |
21 | commands.cpp | 21 | commands.cpp |
@@ -26,6 +26,7 @@ set(command_SRCS | |||
26 | resource.cpp | 26 | resource.cpp |
27 | genericresource.cpp | 27 | genericresource.cpp |
28 | resourceaccess.cpp | 28 | resourceaccess.cpp |
29 | queryrunner.cpp | ||
29 | listener.cpp | 30 | listener.cpp |
30 | storage_common.cpp | 31 | storage_common.cpp |
31 | threadboundary.cpp | 32 | threadboundary.cpp |
@@ -36,6 +37,7 @@ set(command_SRCS | |||
36 | domain/applicationdomaintype.cpp | 37 | domain/applicationdomaintype.cpp |
37 | domain/event.cpp | 38 | domain/event.cpp |
38 | domain/mail.cpp | 39 | domain/mail.cpp |
40 | domain/folder.cpp | ||
39 | ${storage_SRCS}) | 41 | ${storage_SRCS}) |
40 | 42 | ||
41 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) | 43 | add_library(${PROJECT_NAME} SHARED ${command_SRCS}) |
@@ -55,6 +57,8 @@ generate_flatbuffers( | |||
55 | commands/revisionreplayed | 57 | commands/revisionreplayed |
56 | domain/event | 58 | domain/event |
57 | domain/mail | 59 | domain/mail |
60 | domain/folder | ||
61 | domain/dummy | ||
58 | entity | 62 | entity |
59 | metadata | 63 | metadata |
60 | queuedcommand | 64 | queuedcommand |
@@ -70,12 +74,6 @@ install(FILES | |||
70 | clientapi.h | 74 | clientapi.h |
71 | domain/applicationdomaintype.h | 75 | domain/applicationdomaintype.h |
72 | query.h | 76 | query.h |
73 | threadboundary.h | ||
74 | resultprovider.h | ||
75 | facadefactory.h | ||
76 | log.h | ||
77 | listmodelresult.h | ||
78 | bufferadaptor.h | 77 | bufferadaptor.h |
79 | facadeinterface.h | ||
80 | DESTINATION ${INCLUDE_INSTALL_DIR}/${PROJECT_NAME} COMPONENT Devel | 78 | DESTINATION ${INCLUDE_INSTALL_DIR}/${PROJECT_NAME} COMPONENT Devel |
81 | ) | 79 | ) |
diff --git a/common/clientapi.cpp b/common/clientapi.cpp index f99ebb8..29b7e68 100644 --- a/common/clientapi.cpp +++ b/common/clientapi.cpp | |||
@@ -1,19 +1,47 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
1 | 20 | ||
2 | #include "clientapi.h" | 21 | #include "clientapi.h" |
22 | |||
23 | #include <QtConcurrent/QtConcurrentRun> | ||
24 | #include <QTimer> | ||
25 | #include <QEventLoop> | ||
26 | #include <QAbstractItemModel> | ||
27 | #include <functional> | ||
28 | #include <memory> | ||
29 | |||
3 | #include "resourceaccess.h" | 30 | #include "resourceaccess.h" |
4 | #include "commands.h" | 31 | #include "commands.h" |
5 | #include "resourcefacade.h" | 32 | #include "resourcefacade.h" |
6 | #include "log.h" | 33 | #include "log.h" |
7 | #include "definitions.h" | 34 | #include "definitions.h" |
8 | #include "resourceconfig.h" | 35 | #include "resourceconfig.h" |
9 | #include <QtConcurrent/QtConcurrentRun> | 36 | #include "facadefactory.h" |
10 | #include <QTimer> | 37 | #include "modelresult.h" |
38 | #include "log.h" | ||
11 | 39 | ||
12 | #define ASYNCINTHREAD | 40 | #define ASYNCINTHREAD |
13 | 41 | ||
14 | namespace async | 42 | namespace async |
15 | { | 43 | { |
16 | void run(const std::function<void()> &runner) { | 44 | static void run(const std::function<void()> &runner) { |
17 | auto timer = new QTimer(); | 45 | auto timer = new QTimer(); |
18 | timer->setSingleShot(true); | 46 | timer->setSingleShot(true); |
19 | QObject::connect(timer, &QTimer::timeout, [runner, timer]() { | 47 | QObject::connect(timer, &QTimer::timeout, [runner, timer]() { |
@@ -69,6 +97,76 @@ QList<QByteArray> Store::getResources(const QList<QByteArray> &resourceFilter, c | |||
69 | return resources; | 97 | return resources; |
70 | } | 98 | } |
71 | 99 | ||
100 | template <class DomainType> | ||
101 | QSharedPointer<QAbstractItemModel> Store::loadModel(Query query) | ||
102 | { | ||
103 | auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr> >::create(query, query.requestedProperties.toList()); | ||
104 | |||
105 | //* Client defines lifetime of model | ||
106 | //* The model lifetime defines the duration of live-queries | ||
107 | //* The facade needs to life for the duration of any calls being made (assuming we get rid of any internal callbacks | ||
108 | //* The emitter needs to live or the duration of query (respectively, the model) | ||
109 | //* The result provider needs to live for as long as results are provided (until the last thread exits). | ||
110 | |||
111 | // Query all resources and aggregate results | ||
112 | KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName<DomainType>())) | ||
113 | .template each<void, QByteArray>([query, model](const QByteArray &resource, KAsync::Future<void> &future) { | ||
114 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resource), resource); | ||
115 | if (facade) { | ||
116 | Trace() << "Trying to fetch from resource"; | ||
117 | auto result = facade->load(query); | ||
118 | auto emitter = result.second; | ||
119 | //TODO use aggregating emitter instead | ||
120 | model->setEmitter(emitter); | ||
121 | model->fetchMore(QModelIndex()); | ||
122 | result.first.template then<void>([&future](){future.setFinished();}).exec(); | ||
123 | } else { | ||
124 | //Ignore the error and carry on | ||
125 | future.setFinished(); | ||
126 | } | ||
127 | }).exec(); | ||
128 | |||
129 | return model; | ||
130 | } | ||
131 | |||
132 | template <class DomainType> | ||
133 | static std::shared_ptr<StoreFacade<DomainType> > getFacade(const QByteArray &resourceInstanceIdentifier) | ||
134 | { | ||
135 | if (auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resourceInstanceIdentifier), resourceInstanceIdentifier)) { | ||
136 | return facade; | ||
137 | } | ||
138 | return std::make_shared<NullFacade<DomainType> >(); | ||
139 | } | ||
140 | |||
141 | template <class DomainType> | ||
142 | KAsync::Job<void> Store::create(const DomainType &domainObject) { | ||
143 | //Potentially move to separate thread as well | ||
144 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); | ||
145 | return facade->create(domainObject).template then<void>([facade](){}, [](int errorCode, const QString &error) { | ||
146 | Warning() << "Failed to create"; | ||
147 | }); | ||
148 | } | ||
149 | |||
150 | template <class DomainType> | ||
151 | KAsync::Job<void> Store::modify(const DomainType &domainObject) | ||
152 | { | ||
153 | //Potentially move to separate thread as well | ||
154 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); | ||
155 | return facade->modify(domainObject).template then<void>([facade](){}, [](int errorCode, const QString &error) { | ||
156 | Warning() << "Failed to modify"; | ||
157 | }); | ||
158 | } | ||
159 | |||
160 | template <class DomainType> | ||
161 | KAsync::Job<void> Store::remove(const DomainType &domainObject) | ||
162 | { | ||
163 | //Potentially move to separate thread as well | ||
164 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); | ||
165 | return facade->remove(domainObject).template then<void>([facade](){}, [](int errorCode, const QString &error) { | ||
166 | Warning() << "Failed to remove"; | ||
167 | }); | ||
168 | } | ||
169 | |||
72 | KAsync::Job<void> Store::shutdown(const QByteArray &identifier) | 170 | KAsync::Job<void> Store::shutdown(const QByteArray &identifier) |
73 | { | 171 | { |
74 | Trace() << "shutdown"; | 172 | Trace() << "shutdown"; |
@@ -95,7 +193,7 @@ KAsync::Job<void> Store::synchronize(const Akonadi2::Query &query) | |||
95 | .template each<void, QByteArray>([query](const QByteArray &resource, KAsync::Future<void> &future) { | 193 | .template each<void, QByteArray>([query](const QByteArray &resource, KAsync::Future<void> &future) { |
96 | auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resource); | 194 | auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resource); |
97 | resourceAccess->open(); | 195 | resourceAccess->open(); |
98 | resourceAccess->synchronizeResource(true, false).then<void>([&future]() { | 196 | resourceAccess->synchronizeResource(query.syncOnDemand, query.processAll).then<void>([&future, resourceAccess]() { |
99 | future.setFinished(); | 197 | future.setFinished(); |
100 | }).exec(); | 198 | }).exec(); |
101 | }) | 199 | }) |
@@ -103,4 +201,15 @@ KAsync::Job<void> Store::synchronize(const Akonadi2::Query &query) | |||
103 | .template then<void>([](){}); | 201 | .template then<void>([](){}); |
104 | } | 202 | } |
105 | 203 | ||
204 | #define REGISTER_TYPE(T) template KAsync::Job<void> Store::remove<T>(const T &domainObject); \ | ||
205 | template KAsync::Job<void> Store::create<T>(const T &domainObject); \ | ||
206 | template KAsync::Job<void> Store::modify<T>(const T &domainObject); \ | ||
207 | template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(Query query); \ | ||
208 | |||
209 | REGISTER_TYPE(ApplicationDomain::Event); | ||
210 | REGISTER_TYPE(ApplicationDomain::Mail); | ||
211 | REGISTER_TYPE(ApplicationDomain::Folder); | ||
212 | REGISTER_TYPE(ApplicationDomain::AkonadiResource); | ||
213 | |||
106 | } // namespace Akonadi2 | 214 | } // namespace Akonadi2 |
215 | |||
diff --git a/common/clientapi.h b/common/clientapi.h index 9a32188..8f87562 100644 --- a/common/clientapi.h +++ b/common/clientapi.h | |||
@@ -22,28 +22,16 @@ | |||
22 | 22 | ||
23 | #include <QString> | 23 | #include <QString> |
24 | #include <QSharedPointer> | 24 | #include <QSharedPointer> |
25 | #include <QEventLoop> | ||
26 | #include <functional> | ||
27 | #include <memory> | ||
28 | 25 | ||
29 | #include <Async/Async> | 26 | #include <Async/Async> |
30 | 27 | ||
31 | #include "query.h" | 28 | #include "query.h" |
32 | #include "resultprovider.h" | ||
33 | #include "applicationdomaintype.h" | 29 | #include "applicationdomaintype.h" |
34 | #include "facadefactory.h" | ||
35 | #include "log.h" | ||
36 | 30 | ||
37 | namespace async { | 31 | class QAbstractItemModel; |
38 | //This should abstract if we execute from eventloop or in thread. | ||
39 | //It supposed to allow the caller to finish the current method before executing the runner. | ||
40 | void run(const std::function<void()> &runner); | ||
41 | } | ||
42 | 32 | ||
43 | namespace Akonadi2 { | 33 | namespace Akonadi2 { |
44 | 34 | ||
45 | using namespace async; | ||
46 | |||
47 | /** | 35 | /** |
48 | * Store interface used in the client API. | 36 | * Store interface used in the client API. |
49 | */ | 37 | */ |
@@ -55,77 +43,22 @@ public: | |||
55 | static QString storageLocation(); | 43 | static QString storageLocation(); |
56 | static QByteArray resourceName(const QByteArray &instanceIdentifier); | 44 | static QByteArray resourceName(const QByteArray &instanceIdentifier); |
57 | 45 | ||
58 | /** | 46 | enum Roles { |
59 | * Asynchronusly load a dataset | 47 | DomainObjectRole = Qt::UserRole + 1, //Must be the same as in ModelResult |
60 | */ | 48 | ChildrenFetchedRole |
61 | template <class DomainType> | 49 | }; |
62 | static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query) | ||
63 | { | ||
64 | auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create(); | ||
65 | |||
66 | //Execute the search in a thread. | ||
67 | //We must guarantee that the emitter is returned before the first result is emitted. | ||
68 | //The result provider must be threadsafe. | ||
69 | async::run([query, resultSet](){ | ||
70 | QEventLoop eventLoop; | ||
71 | resultSet->onDone([&eventLoop](){ | ||
72 | eventLoop.quit(); | ||
73 | }); | ||
74 | // Query all resources and aggregate results | ||
75 | KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName<DomainType>())) | ||
76 | .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, KAsync::Future<void> &future) { | ||
77 | auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resource), resource); | ||
78 | if (facade) { | ||
79 | facade->load(query, resultSet).template then<void>([&future](){future.setFinished();}).exec(); | ||
80 | //Keep the facade alive for the lifetime of the resultSet. | ||
81 | resultSet->setFacade(facade); | ||
82 | } else { | ||
83 | //Ignore the error and carry on | ||
84 | future.setFinished(); | ||
85 | } | ||
86 | }).template then<void>([query, resultSet]() { | ||
87 | resultSet->initialResultSetComplete(); | ||
88 | if (!query.liveQuery) { | ||
89 | resultSet->complete(); | ||
90 | } | ||
91 | }).exec(); | ||
92 | |||
93 | //Keep the thread alive until the result is ready | ||
94 | if (!resultSet->isDone()) { | ||
95 | eventLoop.exec(); | ||
96 | } | ||
97 | }); | ||
98 | return resultSet->emitter(); | ||
99 | } | ||
100 | 50 | ||
101 | /** | 51 | /** |
102 | * Asynchronusly load a dataset with tree structure information | 52 | * Asynchronusly load a dataset with tree structure information |
103 | */ | 53 | */ |
104 | // template <class DomainType> | ||
105 | // static TreeSet<DomainType> loadTree(Query) | ||
106 | // { | ||
107 | |||
108 | // } | ||
109 | template <class DomainType> | 54 | template <class DomainType> |
110 | static std::shared_ptr<StoreFacade<DomainType> > getFacade(const QByteArray &resourceInstanceIdentifier) | 55 | static QSharedPointer<QAbstractItemModel> loadModel(Query query); |
111 | { | ||
112 | if (auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resourceInstanceIdentifier), resourceInstanceIdentifier)) { | ||
113 | return facade; | ||
114 | } | ||
115 | return std::make_shared<NullFacade<DomainType> >(); | ||
116 | } | ||
117 | 56 | ||
118 | /** | 57 | /** |
119 | * Create a new entity. | 58 | * Create a new entity. |
120 | */ | 59 | */ |
121 | template <class DomainType> | 60 | template <class DomainType> |
122 | static KAsync::Job<void> create(const DomainType &domainObject) { | 61 | static KAsync::Job<void> create(const DomainType &domainObject); |
123 | //Potentially move to separate thread as well | ||
124 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); | ||
125 | return facade->create(domainObject).template then<void>([facade](){}, [](int errorCode, const QString &error) { | ||
126 | Warning() << "Failed to create"; | ||
127 | }); | ||
128 | } | ||
129 | 62 | ||
130 | /** | 63 | /** |
131 | * Modify an entity. | 64 | * Modify an entity. |
@@ -133,25 +66,13 @@ public: | |||
133 | * This includes moving etc. since these are also simple settings on a property. | 66 | * This includes moving etc. since these are also simple settings on a property. |
134 | */ | 67 | */ |
135 | template <class DomainType> | 68 | template <class DomainType> |
136 | static KAsync::Job<void> modify(const DomainType &domainObject) { | 69 | static KAsync::Job<void> modify(const DomainType &domainObject); |
137 | //Potentially move to separate thread as well | ||
138 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); | ||
139 | return facade->modify(domainObject).template then<void>([facade](){}, [](int errorCode, const QString &error) { | ||
140 | Warning() << "Failed to modify"; | ||
141 | }); | ||
142 | } | ||
143 | 70 | ||
144 | /** | 71 | /** |
145 | * Remove an entity. | 72 | * Remove an entity. |
146 | */ | 73 | */ |
147 | template <class DomainType> | 74 | template <class DomainType> |
148 | static KAsync::Job<void> remove(const DomainType &domainObject) { | 75 | static KAsync::Job<void> remove(const DomainType &domainObject); |
149 | //Potentially move to separate thread as well | ||
150 | auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); | ||
151 | return facade->remove(domainObject).template then<void>([facade](){}, [](int errorCode, const QString &error) { | ||
152 | Warning() << "Failed to remove"; | ||
153 | }); | ||
154 | } | ||
155 | 76 | ||
156 | /** | 77 | /** |
157 | * Shutdown resource. | 78 | * Shutdown resource. |
diff --git a/common/domain/applicationdomaintype.cpp b/common/domain/applicationdomaintype.cpp index 1b5d870..c9a8bba 100644 --- a/common/domain/applicationdomaintype.cpp +++ b/common/domain/applicationdomaintype.cpp | |||
@@ -60,10 +60,13 @@ ApplicationDomainType& ApplicationDomainType::operator=(const ApplicationDomainT | |||
60 | return *this; | 60 | return *this; |
61 | } | 61 | } |
62 | 62 | ||
63 | ApplicationDomainType::~ApplicationDomainType() {} | 63 | ApplicationDomainType::~ApplicationDomainType() |
64 | { | ||
65 | } | ||
64 | 66 | ||
65 | QVariant ApplicationDomainType::getProperty(const QByteArray &key) const | 67 | QVariant ApplicationDomainType::getProperty(const QByteArray &key) const |
66 | { | 68 | { |
69 | Q_ASSERT(mAdaptor); | ||
67 | if (!mAdaptor->availableProperties().contains(key)) { | 70 | if (!mAdaptor->availableProperties().contains(key)) { |
68 | Warning() << "No such property available " << key; | 71 | Warning() << "No such property available " << key; |
69 | } | 72 | } |
@@ -72,7 +75,9 @@ QVariant ApplicationDomainType::getProperty(const QByteArray &key) const | |||
72 | 75 | ||
73 | void ApplicationDomainType::setProperty(const QByteArray &key, const QVariant &value) | 76 | void ApplicationDomainType::setProperty(const QByteArray &key, const QVariant &value) |
74 | { | 77 | { |
75 | mChangeSet.insert(key, value); mAdaptor->setProperty(key, value); | 78 | Q_ASSERT(mAdaptor); |
79 | mChangeSet.insert(key, value); | ||
80 | mAdaptor->setProperty(key, value); | ||
76 | } | 81 | } |
77 | 82 | ||
78 | QByteArrayList ApplicationDomainType::changedProperties() const | 83 | QByteArrayList ApplicationDomainType::changedProperties() const |
diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h index 5514d26..227ab4d 100644 --- a/common/domain/applicationdomaintype.h +++ b/common/domain/applicationdomaintype.h | |||
@@ -160,3 +160,7 @@ Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Event) | |||
160 | Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Event::Ptr) | 160 | Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Event::Ptr) |
161 | Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Mail) | 161 | Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Mail) |
162 | Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Mail::Ptr) | 162 | Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Mail::Ptr) |
163 | Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Folder) | ||
164 | Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Folder::Ptr) | ||
165 | Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::AkonadiResource) | ||
166 | Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::AkonadiResource::Ptr) | ||
diff --git a/common/domain/dummy.fbs b/common/domain/dummy.fbs new file mode 100644 index 0000000..8816b09 --- /dev/null +++ b/common/domain/dummy.fbs | |||
@@ -0,0 +1,7 @@ | |||
1 | namespace Akonadi2.ApplicationDomain.Buffer; | ||
2 | |||
3 | table Dummy { | ||
4 | } | ||
5 | |||
6 | root_type Dummy; | ||
7 | file_identifier "AKFB"; | ||
diff --git a/common/domain/event.cpp b/common/domain/event.cpp index 87e13bc..42c13e2 100644 --- a/common/domain/event.cpp +++ b/common/domain/event.cpp | |||
@@ -47,6 +47,7 @@ ResultSet TypeImplementation<Event>::queryIndexes(const Akonadi2::Query &query, | |||
47 | }); | 47 | }); |
48 | appliedFilters << "uid"; | 48 | appliedFilters << "uid"; |
49 | } | 49 | } |
50 | Trace() << "Index lookup found " << keys.size() << " keys."; | ||
50 | return ResultSet(keys); | 51 | return ResultSet(keys); |
51 | } | 52 | } |
52 | 53 | ||
diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp new file mode 100644 index 0000000..989d2c4 --- /dev/null +++ b/common/domain/folder.cpp | |||
@@ -0,0 +1,100 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastfolder.fm> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | #include "folder.h" | ||
20 | |||
21 | #include <QVector> | ||
22 | #include <QByteArray> | ||
23 | #include <QString> | ||
24 | |||
25 | #include "../resultset.h" | ||
26 | #include "../index.h" | ||
27 | #include "../storage.h" | ||
28 | #include "../log.h" | ||
29 | #include "../propertymapper.h" | ||
30 | #include "../query.h" | ||
31 | #include "../definitions.h" | ||
32 | |||
33 | #include "folder_generated.h" | ||
34 | |||
35 | using namespace Akonadi2::ApplicationDomain; | ||
36 | |||
37 | ResultSet TypeImplementation<Folder>::queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction) | ||
38 | { | ||
39 | QVector<QByteArray> keys; | ||
40 | if (query.propertyFilter.contains("parent")) { | ||
41 | Index index("folder.index.parent", transaction); | ||
42 | auto lookupKey = query.propertyFilter.value("parent").toByteArray(); | ||
43 | if (lookupKey.isEmpty()) { | ||
44 | lookupKey = "toplevel"; | ||
45 | } | ||
46 | index.lookup(lookupKey, [&](const QByteArray &value) { | ||
47 | keys << value; | ||
48 | }, | ||
49 | [](const Index::Error &error) { | ||
50 | Warning() << "Error in uid index: " << error.message; | ||
51 | }); | ||
52 | appliedFilters << "parent"; | ||
53 | } | ||
54 | Trace() << "Index lookup found " << keys.size() << " keys."; | ||
55 | return ResultSet(keys); | ||
56 | } | ||
57 | |||
58 | void TypeImplementation<Folder>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction) | ||
59 | { | ||
60 | const auto parent = bufferAdaptor.getProperty("parent"); | ||
61 | Trace() << "indexing " << identifier << " with parent " << parent.toByteArray(); | ||
62 | if (parent.isValid()) { | ||
63 | Q_ASSERT(!parent.toByteArray().isEmpty()); | ||
64 | Index("folder.index.parent", transaction).add(parent.toByteArray(), identifier); | ||
65 | } else { | ||
66 | Index("folder.index.parent", transaction).add("toplevel", identifier); | ||
67 | } | ||
68 | } | ||
69 | |||
70 | void TypeImplementation<Folder>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction) | ||
71 | { | ||
72 | const auto parent = bufferAdaptor.getProperty("parent"); | ||
73 | if (parent.isValid()) { | ||
74 | Index("folder.index.parent", transaction).remove(parent.toByteArray(), identifier); | ||
75 | } else { | ||
76 | Index("folder.index.parent", transaction).remove("toplevel", identifier); | ||
77 | } | ||
78 | } | ||
79 | |||
80 | QSharedPointer<ReadPropertyMapper<TypeImplementation<Folder>::Buffer> > TypeImplementation<Folder>::initializeReadPropertyMapper() | ||
81 | { | ||
82 | auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); | ||
83 | propertyMapper->addMapping<QString, Buffer>("parent", &Buffer::parent); | ||
84 | propertyMapper->addMapping<QString, Buffer>("name", &Buffer::name); | ||
85 | return propertyMapper; | ||
86 | } | ||
87 | |||
88 | QSharedPointer<WritePropertyMapper<TypeImplementation<Folder>::BufferBuilder> > TypeImplementation<Folder>::initializeWritePropertyMapper() | ||
89 | { | ||
90 | auto propertyMapper = QSharedPointer<WritePropertyMapper<BufferBuilder> >::create(); | ||
91 | propertyMapper->addMapping("parent", [](const QVariant &value, flatbuffers::FlatBufferBuilder &fbb) -> std::function<void(BufferBuilder &)> { | ||
92 | auto offset = variantToProperty<QString>(value, fbb); | ||
93 | return [offset](BufferBuilder &builder) { builder.add_parent(offset); }; | ||
94 | }); | ||
95 | propertyMapper->addMapping("name", [](const QVariant &value, flatbuffers::FlatBufferBuilder &fbb) -> std::function<void(BufferBuilder &)> { | ||
96 | auto offset = variantToProperty<QString>(value, fbb); | ||
97 | return [offset](BufferBuilder &builder) { builder.add_name(offset); }; | ||
98 | }); | ||
99 | return propertyMapper; | ||
100 | } | ||
diff --git a/common/domain/folder.fbs b/common/domain/folder.fbs new file mode 100644 index 0000000..3476d58 --- /dev/null +++ b/common/domain/folder.fbs | |||
@@ -0,0 +1,9 @@ | |||
1 | namespace Akonadi2.ApplicationDomain.Buffer; | ||
2 | |||
3 | table Folder { | ||
4 | name:string; | ||
5 | parent:string; | ||
6 | } | ||
7 | |||
8 | root_type Folder; | ||
9 | file_identifier "AKFB"; | ||
diff --git a/common/domain/folder.h b/common/domain/folder.h new file mode 100644 index 0000000..545836f --- /dev/null +++ b/common/domain/folder.h | |||
@@ -0,0 +1,56 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | #pragma once | ||
20 | |||
21 | #include "applicationdomaintype.h" | ||
22 | |||
23 | #include "storage.h" | ||
24 | |||
25 | class ResultSet; | ||
26 | class QByteArray; | ||
27 | |||
28 | template<typename T> | ||
29 | class ReadPropertyMapper; | ||
30 | template<typename T> | ||
31 | class WritePropertyMapper; | ||
32 | |||
33 | namespace Akonadi2 { | ||
34 | class Query; | ||
35 | |||
36 | namespace ApplicationDomain { | ||
37 | namespace Buffer { | ||
38 | struct Folder; | ||
39 | struct FolderBuilder; | ||
40 | } | ||
41 | |||
42 | template<> | ||
43 | class TypeImplementation<Akonadi2::ApplicationDomain::Folder> { | ||
44 | public: | ||
45 | typedef Akonadi2::ApplicationDomain::Buffer::Folder Buffer; | ||
46 | typedef Akonadi2::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; | ||
47 | static QSet<QByteArray> indexedProperties(); | ||
48 | static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction); | ||
49 | static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction); | ||
50 | static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction); | ||
51 | static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); | ||
52 | static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); | ||
53 | }; | ||
54 | |||
55 | } | ||
56 | } | ||
diff --git a/common/domainadaptor.h b/common/domainadaptor.h index b14fbcd..b541e23 100644 --- a/common/domainadaptor.h +++ b/common/domainadaptor.h | |||
@@ -1,5 +1,5 @@ | |||
1 | /* | 1 | /* |
2 | * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> | 2 | * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> |
3 | * | 3 | * |
4 | * This program is free software; you can redistribute it and/or modify | 4 | * This program is free software; you can redistribute it and/or modify |
5 | * it under the terms of the GNU General Public License as published by | 5 | * it under the terms of the GNU General Public License as published by |
@@ -23,9 +23,11 @@ | |||
23 | #include <QByteArray> | 23 | #include <QByteArray> |
24 | #include <functional> | 24 | #include <functional> |
25 | 25 | ||
26 | #include "domaintypeadaptorfactoryinterface.h" | ||
26 | #include "domain/applicationdomaintype.h" | 27 | #include "domain/applicationdomaintype.h" |
27 | #include "domain/event.h" | 28 | #include "domain/event.h" |
28 | #include "domain/mail.h" | 29 | #include "domain/mail.h" |
30 | #include "domain/folder.h" | ||
29 | #include "bufferadaptor.h" | 31 | #include "bufferadaptor.h" |
30 | #include "entity_generated.h" | 32 | #include "entity_generated.h" |
31 | #include "metadata_generated.h" | 33 | #include "metadata_generated.h" |
@@ -123,15 +125,6 @@ public: | |||
123 | QSharedPointer<ReadPropertyMapper<ResourceBuffer> > mResourceMapper; | 125 | QSharedPointer<ReadPropertyMapper<ResourceBuffer> > mResourceMapper; |
124 | }; | 126 | }; |
125 | 127 | ||
126 | class DomainTypeAdaptorFactoryInterface | ||
127 | { | ||
128 | public: | ||
129 | typedef QSharedPointer<DomainTypeAdaptorFactoryInterface> Ptr; | ||
130 | virtual ~DomainTypeAdaptorFactoryInterface() {}; | ||
131 | virtual QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> createAdaptor(const Akonadi2::Entity &entity) = 0; | ||
132 | virtual void createBuffer(const Akonadi2::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; | ||
133 | }; | ||
134 | |||
135 | /** | 128 | /** |
136 | * The factory should define how to go from an entitybuffer (local + resource buffer), to a domain type adapter. | 129 | * The factory should define how to go from an entitybuffer (local + resource buffer), to a domain type adapter. |
137 | * It defines how values are split accross local and resource buffer. | 130 | * It defines how values are split accross local and resource buffer. |
diff --git a/common/domaintypeadaptorfactoryinterface.h b/common/domaintypeadaptorfactoryinterface.h new file mode 100644 index 0000000..8c99aa1 --- /dev/null +++ b/common/domaintypeadaptorfactoryinterface.h | |||
@@ -0,0 +1,42 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | #pragma once | ||
20 | |||
21 | #include <QSharedPointer> | ||
22 | |||
23 | namespace Akonadi2 { | ||
24 | namespace ApplicationDomain { | ||
25 | class BufferAdaptor; | ||
26 | class ApplicationDomainType; | ||
27 | } | ||
28 | struct Entity; | ||
29 | } | ||
30 | |||
31 | namespace flatbuffers { | ||
32 | class FlatBufferBuilder; | ||
33 | } | ||
34 | |||
35 | class DomainTypeAdaptorFactoryInterface | ||
36 | { | ||
37 | public: | ||
38 | typedef QSharedPointer<DomainTypeAdaptorFactoryInterface> Ptr; | ||
39 | virtual ~DomainTypeAdaptorFactoryInterface() {}; | ||
40 | virtual QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> createAdaptor(const Akonadi2::Entity &entity) = 0; | ||
41 | virtual void createBuffer(const Akonadi2::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; | ||
42 | }; | ||
diff --git a/common/entitystorage.cpp b/common/entitystorage.cpp deleted file mode 100644 index 5d4df9f..0000000 --- a/common/entitystorage.cpp +++ /dev/null | |||
@@ -1,74 +0,0 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | |||
20 | #include "entitystorage.h" | ||
21 | |||
22 | ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery) | ||
23 | { | ||
24 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | ||
25 | |||
26 | //Read through the source values and return whatever matches the filter | ||
27 | std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool { | ||
28 | while (resultSetPtr->next()) { | ||
29 | readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { | ||
30 | //Always remove removals, they probably don't match due to non-available properties | ||
31 | if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { | ||
32 | if (initialQuery) { | ||
33 | //We're not interested in removals during the initial query | ||
34 | if (operation != Akonadi2::Operation_Removal) { | ||
35 | callback(domainObject, Akonadi2::Operation_Creation); | ||
36 | } | ||
37 | } else { | ||
38 | callback(domainObject, operation); | ||
39 | } | ||
40 | } | ||
41 | }); | ||
42 | } | ||
43 | return false; | ||
44 | }; | ||
45 | return ResultSet(generator); | ||
46 | } | ||
47 | |||
48 | |||
49 | ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision) | ||
50 | { | ||
51 | QSet<QByteArray> remainingFilters = query.propertyFilter.keys().toSet(); | ||
52 | ResultSet resultSet; | ||
53 | const bool initialQuery = (baseRevision == 1); | ||
54 | if (initialQuery) { | ||
55 | Trace() << "Initial result set update"; | ||
56 | resultSet = loadInitialResultSet(query, transaction, remainingFilters); | ||
57 | } else { | ||
58 | //TODO fallback in case the old revision is no longer available to clear + redo complete initial scan | ||
59 | Trace() << "Incremental result set update" << baseRevision; | ||
60 | resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); | ||
61 | } | ||
62 | |||
63 | auto filter = [remainingFilters, query, baseRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | ||
64 | for (const auto &filterProperty : remainingFilters) { | ||
65 | //TODO implement other comparison operators than equality | ||
66 | if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) { | ||
67 | return false; | ||
68 | } | ||
69 | } | ||
70 | return true; | ||
71 | }; | ||
72 | |||
73 | return filteredSet(resultSet, filter, transaction, initialQuery); | ||
74 | } | ||
diff --git a/common/entitystorage.h b/common/entitystorage.h deleted file mode 100644 index 8e73083..0000000 --- a/common/entitystorage.h +++ /dev/null | |||
@@ -1,126 +0,0 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This program is free software; you can redistribute it and/or modify | ||
5 | * it under the terms of the GNU General Public License as published by | ||
6 | * the Free Software Foundation; either version 2 of the License, or | ||
7 | * (at your option) any later version. | ||
8 | * | ||
9 | * This program is distributed in the hope that it will be useful, | ||
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
12 | * GNU General Public License for more details. | ||
13 | * | ||
14 | * You should have received a copy of the GNU General Public License | ||
15 | * along with this program; if not, write to the | ||
16 | * Free Software Foundation, Inc., | ||
17 | * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
18 | */ | ||
19 | #pragma once | ||
20 | |||
21 | #include <QByteArray> | ||
22 | |||
23 | #include "query.h" | ||
24 | #include "domainadaptor.h" | ||
25 | #include "entitybuffer.h" | ||
26 | #include "log.h" | ||
27 | #include "storage.h" | ||
28 | #include "resultset.h" | ||
29 | #include "resultprovider.h" | ||
30 | #include "definitions.h" | ||
31 | |||
32 | /** | ||
33 | * Wraps storage, entity adaptor factory and indexes into one. | ||
34 | * | ||
35 | */ | ||
36 | class EntityStorageBase | ||
37 | { | ||
38 | public: | ||
39 | typedef std::function<ResultSet (const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)> InitialResultLoader; | ||
40 | typedef std::function<ResultSet (qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)> IncrementalResultLoader; | ||
41 | typedef std::function<void(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)> EntityReader; | ||
42 | |||
43 | /** | ||
44 | * Returns the initial result set that still needs to be filtered. | ||
45 | * | ||
46 | * To make this efficient indexes should be chosen that are as selective as possible. | ||
47 | */ | ||
48 | InitialResultLoader loadInitialResultSet; | ||
49 | /** | ||
50 | * Returns the incremental result set that still needs to be filtered. | ||
51 | */ | ||
52 | IncrementalResultLoader loadIncrementalResultSet; | ||
53 | |||
54 | /** | ||
55 | * Loads a single entity by uid from storage. | ||
56 | */ | ||
57 | EntityReader readEntity; | ||
58 | |||
59 | protected: | ||
60 | EntityStorageBase(const QByteArray &instanceIdentifier) | ||
61 | : mResourceInstanceIdentifier(instanceIdentifier) | ||
62 | { | ||
63 | |||
64 | } | ||
65 | |||
66 | virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &) = 0; | ||
67 | |||
68 | ResultSet getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision); | ||
69 | |||
70 | QByteArray mResourceInstanceIdentifier; | ||
71 | |||
72 | private: | ||
73 | ResultSet filteredSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, bool isInitialQuery); | ||
74 | }; | ||
75 | |||
76 | template<typename DomainType> | ||
77 | class EntityStorage : public EntityStorageBase | ||
78 | { | ||
79 | |||
80 | public: | ||
81 | |||
82 | EntityStorage(const QByteArray &instanceIdentifier) | ||
83 | : EntityStorageBase(instanceIdentifier) | ||
84 | { | ||
85 | } | ||
86 | |||
87 | protected: | ||
88 | Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &object) Q_DECL_OVERRIDE | ||
89 | { | ||
90 | return Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(object); | ||
91 | } | ||
92 | |||
93 | public: | ||
94 | |||
95 | virtual qint64 read(const Akonadi2::Query &query, qint64 baseRevision, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) | ||
96 | { | ||
97 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); | ||
98 | storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { | ||
99 | Warning() << "Error during query: " << error.store << error.message; | ||
100 | }); | ||
101 | |||
102 | auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); | ||
103 | |||
104 | Log() << "Querying" << baseRevision << Akonadi2::Storage::maxRevision(transaction); | ||
105 | auto resultSet = getResultSet(query, transaction, baseRevision); | ||
106 | while(resultSet.next([this, resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { | ||
107 | switch (operation) { | ||
108 | case Akonadi2::Operation_Creation: | ||
109 | Trace() << "Got creation"; | ||
110 | resultProvider->add(copy(*value).template staticCast<DomainType>()); | ||
111 | break; | ||
112 | case Akonadi2::Operation_Modification: | ||
113 | Trace() << "Got modification"; | ||
114 | resultProvider->modify(copy(*value).template staticCast<DomainType>()); | ||
115 | break; | ||
116 | case Akonadi2::Operation_Removal: | ||
117 | Trace() << "Got removal"; | ||
118 | resultProvider->remove(copy(*value).template staticCast<DomainType>()); | ||
119 | break; | ||
120 | } | ||
121 | return true; | ||
122 | })){}; | ||
123 | return Akonadi2::Storage::maxRevision(transaction); | ||
124 | } | ||
125 | |||
126 | }; | ||
diff --git a/common/facade.cpp b/common/facade.cpp index e51b32a..1d6b9a7 100644 --- a/common/facade.cpp +++ b/common/facade.cpp | |||
@@ -1,5 +1,5 @@ | |||
1 | /* | 1 | /* |
2 | * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm> | 2 | * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm> |
3 | * | 3 | * |
4 | * This program is free software; you can redistribute it and/or modify | 4 | * This program is free software; you can redistribute it and/or modify |
5 | * it under the terms of the GNU General Public License as published by | 5 | * it under the terms of the GNU General Public License as published by |
@@ -18,3 +18,82 @@ | |||
18 | */ | 18 | */ |
19 | 19 | ||
20 | #include "facade.h" | 20 | #include "facade.h" |
21 | |||
22 | #include "commands.h" | ||
23 | #include "log.h" | ||
24 | #include "storage.h" | ||
25 | #include "definitions.h" | ||
26 | #include "domainadaptor.h" | ||
27 | #include "queryrunner.h" | ||
28 | |||
29 | using namespace Akonadi2; | ||
30 | |||
31 | |||
32 | template<class DomainType> | ||
33 | GenericFacade<DomainType>::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess) | ||
34 | : Akonadi2::StoreFacade<DomainType>(), | ||
35 | mResourceAccess(resourceAccess), | ||
36 | mDomainTypeAdaptorFactory(adaptorFactory), | ||
37 | mResourceInstanceIdentifier(resourceIdentifier) | ||
38 | { | ||
39 | if (!mResourceAccess) { | ||
40 | mResourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resourceIdentifier); | ||
41 | } | ||
42 | } | ||
43 | |||
44 | template<class DomainType> | ||
45 | GenericFacade<DomainType>::~GenericFacade() | ||
46 | { | ||
47 | } | ||
48 | |||
49 | template<class DomainType> | ||
50 | QByteArray GenericFacade<DomainType>::bufferTypeForDomainType() | ||
51 | { | ||
52 | //We happen to have a one to one mapping | ||
53 | return Akonadi2::ApplicationDomain::getTypeName<DomainType>(); | ||
54 | } | ||
55 | |||
56 | template<class DomainType> | ||
57 | KAsync::Job<void> GenericFacade<DomainType>::create(const DomainType &domainObject) | ||
58 | { | ||
59 | if (!mDomainTypeAdaptorFactory) { | ||
60 | Warning() << "No domain type adaptor factory available"; | ||
61 | return KAsync::error<void>(); | ||
62 | } | ||
63 | flatbuffers::FlatBufferBuilder entityFbb; | ||
64 | mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); | ||
65 | return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize())); | ||
66 | } | ||
67 | |||
68 | template<class DomainType> | ||
69 | KAsync::Job<void> GenericFacade<DomainType>::modify(const DomainType &domainObject) | ||
70 | { | ||
71 | if (!mDomainTypeAdaptorFactory) { | ||
72 | Warning() << "No domain type adaptor factory available"; | ||
73 | return KAsync::error<void>(); | ||
74 | } | ||
75 | flatbuffers::FlatBufferBuilder entityFbb; | ||
76 | mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); | ||
77 | return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize())); | ||
78 | } | ||
79 | |||
80 | template<class DomainType> | ||
81 | KAsync::Job<void> GenericFacade<DomainType>::remove(const DomainType &domainObject) | ||
82 | { | ||
83 | return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType()); | ||
84 | } | ||
85 | |||
86 | template<class DomainType> | ||
87 | QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> GenericFacade<DomainType>::load(const Akonadi2::Query &query) | ||
88 | { | ||
89 | //The runner lives for the lifetime of the query | ||
90 | auto runner = new QueryRunner<DomainType>(query, mResourceAccess, mResourceInstanceIdentifier, mDomainTypeAdaptorFactory, bufferTypeForDomainType()); | ||
91 | return qMakePair(KAsync::null<void>(), runner->emitter()); | ||
92 | } | ||
93 | |||
94 | |||
95 | template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Folder>; | ||
96 | template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Mail>; | ||
97 | template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Event>; | ||
98 | |||
99 | #include "facade.moc" | ||
diff --git a/common/facade.h b/common/facade.h index 643ebec..de67e05 100644 --- a/common/facade.h +++ b/common/facade.h | |||
@@ -1,5 +1,5 @@ | |||
1 | /* | 1 | /* |
2 | * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> | 2 | * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> |
3 | * | 3 | * |
4 | * This program is free software; you can redistribute it and/or modify | 4 | * This program is free software; you can redistribute it and/or modify |
5 | * it under the terms of the GNU General Public License as published by | 5 | * it under the terms of the GNU General Public License as published by |
@@ -25,85 +25,12 @@ | |||
25 | #include <Async/Async> | 25 | #include <Async/Async> |
26 | 26 | ||
27 | #include "resourceaccess.h" | 27 | #include "resourceaccess.h" |
28 | #include "commands.h" | ||
29 | #include "domainadaptor.h" | ||
30 | #include "log.h" | ||
31 | #include "resultset.h" | 28 | #include "resultset.h" |
32 | #include "entitystorage.h" | 29 | #include "domaintypeadaptorfactoryinterface.h" |
33 | 30 | #include "storage.h" | |
34 | /** | ||
35 | * A QueryRunner runs a query and updates the corresponding result set. | ||
36 | * | ||
37 | * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), | ||
38 | * and by how long a result set must be updated. If the query is one off the runner dies after the execution, | ||
39 | * otherwise it lives on the react to changes and updates the corresponding result set. | ||
40 | * | ||
41 | * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. | ||
42 | */ | ||
43 | class QueryRunner : public QObject | ||
44 | { | ||
45 | Q_OBJECT | ||
46 | public: | ||
47 | typedef std::function<KAsync::Job<qint64>(qint64 oldRevision)> QueryFunction; | ||
48 | |||
49 | QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {}; | ||
50 | /** | ||
51 | * Starts query | ||
52 | */ | ||
53 | KAsync::Job<void> run(qint64 newRevision = 0) | ||
54 | { | ||
55 | //TODO: JOBAPI: that last empty .then should not be necessary | ||
56 | //TODO: remove newRevision | ||
57 | return queryFunction(mLatestRevision + 1).then<void, qint64>([this](qint64 revision) { | ||
58 | mLatestRevision = revision; | ||
59 | }).then<void>([](){}); | ||
60 | } | ||
61 | |||
62 | /** | ||
63 | * Set the query to run | ||
64 | */ | ||
65 | void setQuery(const QueryFunction &query) | ||
66 | { | ||
67 | queryFunction = query; | ||
68 | } | ||
69 | |||
70 | public slots: | ||
71 | /** | ||
72 | * Rerun query with new revision | ||
73 | */ | ||
74 | void revisionChanged(qint64 newRevision) | ||
75 | { | ||
76 | Trace() << "New revision: " << newRevision; | ||
77 | run(newRevision).exec(); | ||
78 | } | ||
79 | |||
80 | private: | ||
81 | QueryFunction queryFunction; | ||
82 | qint64 mLatestRevision; | ||
83 | }; | ||
84 | |||
85 | static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
86 | { | ||
87 | //TODO use a result set with an iterator, to read values on demand | ||
88 | QVector<QByteArray> keys; | ||
89 | transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
90 | //Skip internals | ||
91 | if (Akonadi2::Storage::isInternalKey(key)) { | ||
92 | return true; | ||
93 | } | ||
94 | keys << Akonadi2::Storage::uidFromKey(key); | ||
95 | return true; | ||
96 | }, | ||
97 | [](const Akonadi2::Storage::Error &error) { | ||
98 | qWarning() << "Error during query: " << error.message; | ||
99 | }); | ||
100 | |||
101 | Trace() << "Full scan found " << keys.size() << " results"; | ||
102 | return ResultSet(keys); | ||
103 | } | ||
104 | |||
105 | 31 | ||
106 | namespace Akonadi2 { | 32 | namespace Akonadi2 { |
33 | |||
107 | /** | 34 | /** |
108 | * Default facade implementation for resources that are implemented in a separate process using the ResourceAccess class. | 35 | * Default facade implementation for resources that are implemented in a separate process using the ResourceAccess class. |
109 | * | 36 | * |
@@ -125,185 +52,18 @@ public: | |||
125 | * @param resourceIdentifier is the identifier of the resource instance | 52 | * @param resourceIdentifier is the identifier of the resource instance |
126 | * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa | 53 | * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa |
127 | */ | 54 | */ |
128 | GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<EntityStorage<DomainType> > storage = QSharedPointer<EntityStorage<DomainType> >(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>()) | 55 | GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>()); |
129 | : Akonadi2::StoreFacade<DomainType>(), | 56 | ~GenericFacade(); |
130 | mResourceAccess(resourceAccess), | ||
131 | mStorage(storage), | ||
132 | mDomainTypeAdaptorFactory(adaptorFactory), | ||
133 | mResourceInstanceIdentifier(resourceIdentifier) | ||
134 | { | ||
135 | if (!mResourceAccess) { | ||
136 | mResourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resourceIdentifier); | ||
137 | } | ||
138 | if (!mStorage) { | ||
139 | mStorage = QSharedPointer<EntityStorage<DomainType> >::create(resourceIdentifier); | ||
140 | const auto bufferType = bufferTypeForDomainType(); | ||
141 | |||
142 | mStorage->readEntity = [bufferType, this] (const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback) | ||
143 | { | ||
144 | //This only works for a 1:1 mapping of resource to domain types. | ||
145 | //Not i.e. for tags that are stored as flags in each entity of an imap store. | ||
146 | //additional properties that don't have a 1:1 mapping (such as separately stored tags), | ||
147 | //could be added to the adaptor. | ||
148 | transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { | ||
149 | Akonadi2::EntityBuffer buffer(value.data(), value.size()); | ||
150 | const Akonadi2::Entity &entity = buffer.entity(); | ||
151 | const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata()); | ||
152 | Q_ASSERT(metadataBuffer); | ||
153 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
154 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); | ||
155 | return false; | ||
156 | }, | ||
157 | [](const Akonadi2::Storage::Error &error) { | ||
158 | qWarning() << "Error during query: " << error.message; | ||
159 | }); | ||
160 | }; | ||
161 | |||
162 | mStorage->loadInitialResultSet = [bufferType, this] (const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet | ||
163 | { | ||
164 | QSet<QByteArray> appliedFilters; | ||
165 | auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); | ||
166 | remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; | ||
167 | |||
168 | //We do a full scan if there were no indexes available to create the initial set. | ||
169 | if (appliedFilters.isEmpty()) { | ||
170 | //TODO this should be replaced by an index lookup as well | ||
171 | return fullScan(transaction, bufferType); | ||
172 | } | ||
173 | return resultSet; | ||
174 | }; | ||
175 | |||
176 | mStorage->loadIncrementalResultSet = [bufferType, this] (qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet | ||
177 | { | ||
178 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
179 | return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray { | ||
180 | const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); | ||
181 | //Spit out the revision keys one by one. | ||
182 | while (*revisionCounter <= topRevision) { | ||
183 | const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); | ||
184 | const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); | ||
185 | Trace() << "Revision" << *revisionCounter << type << uid; | ||
186 | if (type != bufferType) { | ||
187 | //Skip revision | ||
188 | *revisionCounter += 1; | ||
189 | continue; | ||
190 | } | ||
191 | const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); | ||
192 | *revisionCounter += 1; | ||
193 | return key; | ||
194 | } | ||
195 | //We're done | ||
196 | return QByteArray(); | ||
197 | }); | ||
198 | }; | ||
199 | } | ||
200 | } | ||
201 | |||
202 | ~GenericFacade() | ||
203 | { | ||
204 | } | ||
205 | |||
206 | static QByteArray bufferTypeForDomainType() | ||
207 | { | ||
208 | //We happen to have a one to one mapping | ||
209 | return Akonadi2::ApplicationDomain::getTypeName<DomainType>(); | ||
210 | } | ||
211 | |||
212 | KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE | ||
213 | { | ||
214 | if (!mDomainTypeAdaptorFactory) { | ||
215 | Warning() << "No domain type adaptor factory available"; | ||
216 | return KAsync::error<void>(); | ||
217 | } | ||
218 | flatbuffers::FlatBufferBuilder entityFbb; | ||
219 | mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); | ||
220 | return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize())); | ||
221 | } | ||
222 | |||
223 | KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE | ||
224 | { | ||
225 | if (!mDomainTypeAdaptorFactory) { | ||
226 | Warning() << "No domain type adaptor factory available"; | ||
227 | return KAsync::error<void>(); | ||
228 | } | ||
229 | flatbuffers::FlatBufferBuilder entityFbb; | ||
230 | mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); | ||
231 | return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize())); | ||
232 | } | ||
233 | |||
234 | KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE | ||
235 | { | ||
236 | return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType()); | ||
237 | } | ||
238 | |||
239 | //TODO JOBAPI return job from sync continuation to execute it as subjob? | ||
240 | KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE | ||
241 | { | ||
242 | auto runner = QSharedPointer<QueryRunner>::create(query); | ||
243 | QWeakPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > weakResultProvider = resultProvider; | ||
244 | runner->setQuery([this, weakResultProvider, query] (qint64 oldRevision) -> KAsync::Job<qint64> { | ||
245 | return KAsync::start<qint64>([this, weakResultProvider, query, oldRevision](KAsync::Future<qint64> &future) { | ||
246 | Trace() << "Executing query " << oldRevision; | ||
247 | auto resultProvider = weakResultProvider.toStrongRef(); | ||
248 | if (!resultProvider) { | ||
249 | Warning() << "Tried executing query after result provider is already gone"; | ||
250 | future.setError(0, QString()); | ||
251 | future.setFinished(); | ||
252 | return; | ||
253 | } | ||
254 | load(query, resultProvider, oldRevision).template then<void, qint64>([&future, this](qint64 queriedRevision) { | ||
255 | //TODO set revision in result provider? | ||
256 | //TODO update all existing results with new revision | ||
257 | mResourceAccess->sendRevisionReplayedCommand(queriedRevision); | ||
258 | future.setValue(queriedRevision); | ||
259 | future.setFinished(); | ||
260 | }).exec(); | ||
261 | }); | ||
262 | }); | ||
263 | |||
264 | //In case of a live query we keep the runner for as long alive as the result provider exists | ||
265 | if (query.liveQuery) { | ||
266 | resultProvider->setQueryRunner(runner); | ||
267 | //Ensure the connection is open, if it wasn't already opened | ||
268 | //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates | ||
269 | mResourceAccess->open(); | ||
270 | QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged); | ||
271 | } | ||
272 | |||
273 | //We have to capture the runner to keep it alive | ||
274 | return synchronizeResource(query).template then<void>([runner](KAsync::Future<void> &future) { | ||
275 | runner->run().then<void>([&future]() { | ||
276 | future.setFinished(); | ||
277 | }).exec(); | ||
278 | }, | ||
279 | [](int error, const QString &errorString) { | ||
280 | Warning() << "Error during sync " << error << errorString; | ||
281 | }); | ||
282 | } | ||
283 | |||
284 | private: | ||
285 | KAsync::Job<void> synchronizeResource(const Akonadi2::Query &query) | ||
286 | { | ||
287 | //TODO check if a sync is necessary | ||
288 | //TODO Only sync what was requested | ||
289 | //TODO timeout | ||
290 | if (query.syncOnDemand || query.processAll) { | ||
291 | return mResourceAccess->synchronizeResource(query.syncOnDemand, query.processAll); | ||
292 | } | ||
293 | return KAsync::null<void>(); | ||
294 | } | ||
295 | 57 | ||
296 | virtual KAsync::Job<qint64> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider, qint64 oldRevision) | 58 | static QByteArray bufferTypeForDomainType(); |
297 | { | 59 | KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE; |
298 | return KAsync::start<qint64>([=]() -> qint64 { | 60 | KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE; |
299 | return mStorage->read(query, oldRevision, resultProvider); | 61 | KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE; |
300 | }); | 62 | QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> load(const Akonadi2::Query &query) Q_DECL_OVERRIDE; |
301 | } | ||
302 | 63 | ||
303 | protected: | 64 | protected: |
304 | //TODO use one resource access instance per application & per resource | 65 | //TODO use one resource access instance per application & per resource |
305 | QSharedPointer<Akonadi2::ResourceAccessInterface> mResourceAccess; | 66 | QSharedPointer<Akonadi2::ResourceAccessInterface> mResourceAccess; |
306 | QSharedPointer<EntityStorage<DomainType> > mStorage; | ||
307 | DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; | 67 | DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; |
308 | QByteArray mResourceInstanceIdentifier; | 68 | QByteArray mResourceInstanceIdentifier; |
309 | }; | 69 | }; |
diff --git a/common/facadeinterface.h b/common/facadeinterface.h index 3a38db8..318abf3 100644 --- a/common/facadeinterface.h +++ b/common/facadeinterface.h | |||
@@ -23,6 +23,7 @@ | |||
23 | #include <Async/Async> | 23 | #include <Async/Async> |
24 | #include <QByteArray> | 24 | #include <QByteArray> |
25 | #include <QSharedPointer> | 25 | #include <QSharedPointer> |
26 | #include <QPair> | ||
26 | #include "applicationdomaintype.h" | 27 | #include "applicationdomaintype.h" |
27 | #include "resultprovider.h" | 28 | #include "resultprovider.h" |
28 | 29 | ||
@@ -42,10 +43,32 @@ class StoreFacade { | |||
42 | public: | 43 | public: |
43 | virtual ~StoreFacade(){}; | 44 | virtual ~StoreFacade(){}; |
44 | QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); } | 45 | QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); } |
46 | |||
47 | /** | ||
48 | * Create an entity in the store. | ||
49 | * | ||
50 | * The job returns succefully once the task has been successfully placed in the queue | ||
51 | */ | ||
45 | virtual KAsync::Job<void> create(const DomainType &domainObject) = 0; | 52 | virtual KAsync::Job<void> create(const DomainType &domainObject) = 0; |
53 | |||
54 | /** | ||
55 | * Modify an entity in the store. | ||
56 | * | ||
57 | * The job returns succefully once the task has been successfully placed in the queue | ||
58 | */ | ||
46 | virtual KAsync::Job<void> modify(const DomainType &domainObject) = 0; | 59 | virtual KAsync::Job<void> modify(const DomainType &domainObject) = 0; |
60 | |||
61 | /** | ||
62 | * Remove an entity from the store. | ||
63 | * | ||
64 | * The job returns succefully once the task has been successfully placed in the queue | ||
65 | */ | ||
47 | virtual KAsync::Job<void> remove(const DomainType &domainObject) = 0; | 66 | virtual KAsync::Job<void> remove(const DomainType &domainObject) = 0; |
48 | virtual KAsync::Job<void> load(const Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) = 0; | 67 | |
68 | /** | ||
69 | * Load entities from the store. | ||
70 | */ | ||
71 | virtual QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr > load(const Query &query) = 0; | ||
49 | }; | 72 | }; |
50 | 73 | ||
51 | template<class DomainType> | 74 | template<class DomainType> |
@@ -67,9 +90,9 @@ public: | |||
67 | return KAsync::error<void>(-1, "Failed to create a facade"); | 90 | return KAsync::error<void>(-1, "Failed to create a facade"); |
68 | } | 91 | } |
69 | 92 | ||
70 | KAsync::Job<void> load(const Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) | 93 | QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr > load(const Query &query) |
71 | { | 94 | { |
72 | return KAsync::error<void>(-1, "Failed to create a facade"); | 95 | return qMakePair(KAsync::null<void>(), typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr()); |
73 | } | 96 | } |
74 | }; | 97 | }; |
75 | 98 | ||
diff --git a/common/modelresult.cpp b/common/modelresult.cpp new file mode 100644 index 0000000..c7fcd49 --- /dev/null +++ b/common/modelresult.cpp | |||
@@ -0,0 +1,251 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | #include "modelresult.h" | ||
21 | |||
22 | #include <QDebug> | ||
23 | |||
24 | #include "domain/folder.h" | ||
25 | #include "log.h" | ||
26 | |||
27 | template<class T, class Ptr> | ||
28 | ModelResult<T, Ptr>::ModelResult(const Akonadi2::Query &query, const QList<QByteArray> &propertyColumns) | ||
29 | :QAbstractItemModel(), | ||
30 | mPropertyColumns(propertyColumns), | ||
31 | mQuery(query) | ||
32 | { | ||
33 | } | ||
34 | |||
35 | static qint64 getIdentifier(const QModelIndex &idx) | ||
36 | { | ||
37 | if (!idx.isValid()) { | ||
38 | return 0; | ||
39 | } | ||
40 | return idx.internalId(); | ||
41 | } | ||
42 | |||
43 | template<class T, class Ptr> | ||
44 | qint64 ModelResult<T, Ptr>::parentId(const Ptr &value) | ||
45 | { | ||
46 | if (!mQuery.parentProperty.isEmpty()) { | ||
47 | const auto property = value->getProperty(mQuery.parentProperty).toByteArray(); | ||
48 | if (!property.isEmpty()) { | ||
49 | return qHash(property); | ||
50 | } | ||
51 | } | ||
52 | return 0; | ||
53 | } | ||
54 | |||
55 | template<class T, class Ptr> | ||
56 | int ModelResult<T, Ptr>::rowCount(const QModelIndex &parent) const | ||
57 | { | ||
58 | return mTree[getIdentifier(parent)].size(); | ||
59 | } | ||
60 | |||
61 | template<class T, class Ptr> | ||
62 | int ModelResult<T, Ptr>::columnCount(const QModelIndex &parent) const | ||
63 | { | ||
64 | return mPropertyColumns.size(); | ||
65 | } | ||
66 | |||
67 | template<class T, class Ptr> | ||
68 | QVariant ModelResult<T, Ptr>::data(const QModelIndex &index, int role) const | ||
69 | { | ||
70 | if (role == DomainObjectRole) { | ||
71 | Q_ASSERT(mEntities.contains(index.internalId())); | ||
72 | return QVariant::fromValue(mEntities.value(index.internalId())); | ||
73 | } | ||
74 | if (role == ChildrenFetchedRole) { | ||
75 | return childrenFetched(index); | ||
76 | } | ||
77 | if (role == Qt::DisplayRole) { | ||
78 | if (index.column() < mPropertyColumns.size()) { | ||
79 | Q_ASSERT(mEntities.contains(index.internalId())); | ||
80 | auto entity = mEntities.value(index.internalId()); | ||
81 | return entity->getProperty(mPropertyColumns.at(index.column())).toString(); | ||
82 | } else { | ||
83 | return "No data available"; | ||
84 | } | ||
85 | } | ||
86 | return QVariant(); | ||
87 | } | ||
88 | |||
89 | template<class T, class Ptr> | ||
90 | QModelIndex ModelResult<T, Ptr>::index(int row, int column, const QModelIndex &parent) const | ||
91 | { | ||
92 | const auto id = getIdentifier(parent); | ||
93 | const auto childId = mTree.value(id).at(row); | ||
94 | return createIndex(row, column, childId); | ||
95 | } | ||
96 | |||
97 | template<class T, class Ptr> | ||
98 | QModelIndex ModelResult<T, Ptr>::createIndexFromId(const qint64 &id) const | ||
99 | { | ||
100 | if (id == 0) { | ||
101 | return QModelIndex(); | ||
102 | } | ||
103 | auto grandParentId = mParents.value(id, 0); | ||
104 | auto row = mTree.value(grandParentId).indexOf(id); | ||
105 | return createIndex(row, 0, id); | ||
106 | } | ||
107 | |||
108 | template<class T, class Ptr> | ||
109 | QModelIndex ModelResult<T, Ptr>::parent(const QModelIndex &index) const | ||
110 | { | ||
111 | auto id = getIdentifier(index); | ||
112 | auto parentId = mParents.value(id); | ||
113 | return createIndexFromId(parentId); | ||
114 | } | ||
115 | |||
116 | template<class T, class Ptr> | ||
117 | bool ModelResult<T, Ptr>::hasChildren(const QModelIndex &parent) const | ||
118 | { | ||
119 | if (mQuery.parentProperty.isEmpty() && parent.isValid()) { | ||
120 | return false; | ||
121 | } | ||
122 | return QAbstractItemModel::hasChildren(parent); | ||
123 | } | ||
124 | |||
125 | template<class T, class Ptr> | ||
126 | bool ModelResult<T, Ptr>::canFetchMore(const QModelIndex &parent) const | ||
127 | { | ||
128 | return !mEntityChildrenFetched.contains(parent.internalId()); | ||
129 | } | ||
130 | |||
131 | template<class T, class Ptr> | ||
132 | void ModelResult<T, Ptr>::fetchMore(const QModelIndex &parent) | ||
133 | { | ||
134 | fetchEntities(parent); | ||
135 | } | ||
136 | |||
137 | template<class T, class Ptr> | ||
138 | void ModelResult<T, Ptr>::add(const Ptr &value) | ||
139 | { | ||
140 | const auto childId = qHash(value->identifier()); | ||
141 | const auto id = parentId(value); | ||
142 | //Ignore updates we get before the initial fetch is done | ||
143 | if (!mEntityChildrenFetched.contains(id)) { | ||
144 | return; | ||
145 | } | ||
146 | auto parent = createIndexFromId(id); | ||
147 | // qDebug() << "Added entity " << childId << value->identifier() << id; | ||
148 | const auto keys = mTree[id]; | ||
149 | int index = 0; | ||
150 | for (; index < keys.size(); index++) { | ||
151 | if (childId < keys.at(index)) { | ||
152 | break; | ||
153 | } | ||
154 | } | ||
155 | if (mEntities.contains(childId)) { | ||
156 | Warning() << "Entity already in model " << value->identifier(); | ||
157 | return; | ||
158 | } | ||
159 | // qDebug() << "Inserting rows " << index << parent; | ||
160 | beginInsertRows(parent, index, index); | ||
161 | mEntities.insert(childId, value); | ||
162 | mTree[id].insert(index, childId); | ||
163 | mParents.insert(childId, id); | ||
164 | endInsertRows(); | ||
165 | // qDebug() << "Inserted rows " << mTree[id].size(); | ||
166 | } | ||
167 | |||
168 | |||
169 | template<class T, class Ptr> | ||
170 | void ModelResult<T, Ptr>::remove(const Ptr &value) | ||
171 | { | ||
172 | auto childId = qHash(value->identifier()); | ||
173 | auto id = parentId(value); | ||
174 | auto parent = createIndexFromId(id); | ||
175 | // qDebug() << "Removed entity" << childId; | ||
176 | auto index = mTree[id].indexOf(qHash(value->identifier())); | ||
177 | beginRemoveRows(parent, index, index); | ||
178 | mEntities.remove(childId); | ||
179 | mTree[id].removeAll(childId); | ||
180 | mParents.remove(childId); | ||
181 | //TODO remove children | ||
182 | endRemoveRows(); | ||
183 | } | ||
184 | |||
185 | template<class T, class Ptr> | ||
186 | void ModelResult<T, Ptr>::fetchEntities(const QModelIndex &parent) | ||
187 | { | ||
188 | const auto id = getIdentifier(parent); | ||
189 | mEntityChildrenFetched.insert(id); | ||
190 | Trace() << "Loading child entities"; | ||
191 | loadEntities(parent.data(DomainObjectRole).template value<Ptr>()); | ||
192 | } | ||
193 | |||
194 | template<class T, class Ptr> | ||
195 | void ModelResult<T, Ptr>::setFetcher(const std::function<void(const Ptr &parent)> &fetcher) | ||
196 | { | ||
197 | Trace() << "Setting fetcher"; | ||
198 | loadEntities = fetcher; | ||
199 | } | ||
200 | |||
201 | template<class T, class Ptr> | ||
202 | void ModelResult<T, Ptr>::setEmitter(const typename Akonadi2::ResultEmitter<Ptr>::Ptr &emitter) | ||
203 | { | ||
204 | setFetcher(emitter->mFetcher); | ||
205 | emitter->onAdded([this](const Ptr &value) { | ||
206 | this->add(value); | ||
207 | }); | ||
208 | emitter->onModified([this](const Ptr &value) { | ||
209 | this->modify(value); | ||
210 | }); | ||
211 | emitter->onRemoved([this](const Ptr &value) { | ||
212 | this->remove(value); | ||
213 | }); | ||
214 | emitter->onInitialResultSetComplete([this](const Ptr &parent) { | ||
215 | const qint64 parentId = parent ? qHash(parent->identifier()) : 0; | ||
216 | const auto parentIndex = createIndexFromId(parentId); | ||
217 | mEntityChildrenFetchComplete.insert(parentId); | ||
218 | emit dataChanged(parentIndex, parentIndex, QVector<int>() << ChildrenFetchedRole); | ||
219 | }); | ||
220 | mEmitter = emitter; | ||
221 | } | ||
222 | |||
223 | template<class T, class Ptr> | ||
224 | bool ModelResult<T, Ptr>::childrenFetched(const QModelIndex &index) const | ||
225 | { | ||
226 | return mEntityChildrenFetchComplete.contains(getIdentifier(index)); | ||
227 | } | ||
228 | |||
229 | template<class T, class Ptr> | ||
230 | void ModelResult<T, Ptr>::modify(const Ptr &value) | ||
231 | { | ||
232 | auto childId = qHash(value->identifier()); | ||
233 | auto id = parentId(value); | ||
234 | //Ignore updates we get before the initial fetch is done | ||
235 | if (!mEntityChildrenFetched.contains(id)) { | ||
236 | return; | ||
237 | } | ||
238 | auto parent = createIndexFromId(id); | ||
239 | // qDebug() << "Modified entity" << childId; | ||
240 | auto i = mTree[id].indexOf(childId); | ||
241 | mEntities.remove(childId); | ||
242 | mEntities.insert(childId, value); | ||
243 | //TODO check for change of parents | ||
244 | auto idx = index(i, 0, parent); | ||
245 | emit dataChanged(idx, idx); | ||
246 | } | ||
247 | |||
248 | template class ModelResult<Akonadi2::ApplicationDomain::Folder, Akonadi2::ApplicationDomain::Folder::Ptr>; | ||
249 | template class ModelResult<Akonadi2::ApplicationDomain::Mail, Akonadi2::ApplicationDomain::Mail::Ptr>; | ||
250 | template class ModelResult<Akonadi2::ApplicationDomain::Event, Akonadi2::ApplicationDomain::Event::Ptr>; | ||
251 | template class ModelResult<Akonadi2::ApplicationDomain::AkonadiResource, Akonadi2::ApplicationDomain::AkonadiResource::Ptr>; | ||
diff --git a/common/modelresult.h b/common/modelresult.h new file mode 100644 index 0000000..700064b --- /dev/null +++ b/common/modelresult.h | |||
@@ -0,0 +1,78 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
20 | |||
21 | #pragma once | ||
22 | |||
23 | #include <QAbstractItemModel> | ||
24 | #include <QModelIndex> | ||
25 | #include <QDebug> | ||
26 | #include <QSharedPointer> | ||
27 | #include <functional> | ||
28 | #include "query.h" | ||
29 | #include "resultprovider.h" | ||
30 | |||
31 | template<class T, class Ptr> | ||
32 | class ModelResult : public QAbstractItemModel | ||
33 | { | ||
34 | public: | ||
35 | enum Roles { | ||
36 | DomainObjectRole = Qt::UserRole + 1, | ||
37 | ChildrenFetchedRole | ||
38 | }; | ||
39 | |||
40 | ModelResult(const Akonadi2::Query &query, const QList<QByteArray> &propertyColumns); | ||
41 | |||
42 | void setEmitter(const typename Akonadi2::ResultEmitter<Ptr>::Ptr &); | ||
43 | |||
44 | int rowCount(const QModelIndex &parent = QModelIndex()) const; | ||
45 | int columnCount(const QModelIndex &parent = QModelIndex()) const; | ||
46 | QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const; | ||
47 | QModelIndex index(int row, int column, const QModelIndex & parent = QModelIndex()) const; | ||
48 | QModelIndex parent(const QModelIndex &index) const; | ||
49 | bool hasChildren(const QModelIndex &parent = QModelIndex()) const; | ||
50 | |||
51 | bool canFetchMore(const QModelIndex &parent) const; | ||
52 | void fetchMore(const QModelIndex &parent); | ||
53 | |||
54 | void add(const Ptr &value); | ||
55 | void modify(const Ptr &value); | ||
56 | void remove(const Ptr &value); | ||
57 | |||
58 | void setFetcher(const std::function<void(const Ptr &parent)> &fetcher); | ||
59 | |||
60 | bool childrenFetched(const QModelIndex &) const; | ||
61 | |||
62 | private: | ||
63 | qint64 parentId(const Ptr &value); | ||
64 | QModelIndex createIndexFromId(const qint64 &id) const; | ||
65 | void fetchEntities(const QModelIndex &parent); | ||
66 | |||
67 | //TODO we should be able to directly use T as index, with an appropriate hash function, and thus have a QMap<T, T> and QList<T> | ||
68 | QMap<qint64 /* entity id */, Ptr> mEntities; | ||
69 | QMap<qint64 /* parent entity id */, QList<qint64> /* child entity id*/> mTree; | ||
70 | QMap<qint64 /* child entity id */, qint64 /* parent entity id*/> mParents; | ||
71 | QSet<qint64 /* entity id */> mEntityChildrenFetched; | ||
72 | QSet<qint64 /* entity id */> mEntityChildrenFetchComplete; | ||
73 | QList<QByteArray> mPropertyColumns; | ||
74 | Akonadi2::Query mQuery; | ||
75 | std::function<void(const Ptr &)> loadEntities; | ||
76 | typename Akonadi2::ResultEmitter<Ptr>::Ptr mEmitter; | ||
77 | }; | ||
78 | |||
diff --git a/common/query.h b/common/query.h index 0cad9fb..5313fa9 100644 --- a/common/query.h +++ b/common/query.h | |||
@@ -53,6 +53,7 @@ public: | |||
53 | QHash<QByteArray, QVariant> propertyFilter; | 53 | QHash<QByteArray, QVariant> propertyFilter; |
54 | //Properties to retrieve | 54 | //Properties to retrieve |
55 | QSet<QByteArray> requestedProperties; | 55 | QSet<QByteArray> requestedProperties; |
56 | QByteArray parentProperty; | ||
56 | bool syncOnDemand; | 57 | bool syncOnDemand; |
57 | bool processAll; | 58 | bool processAll; |
58 | //If live query is false, this query will not continuously be updated | 59 | //If live query is false, this query will not continuously be updated |
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp new file mode 100644 index 0000000..e365cfc --- /dev/null +++ b/common/queryrunner.cpp | |||
@@ -0,0 +1,312 @@ | |||
1 | /* | ||
2 | Copyright (c) 2015 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | |||
4 | This library is free software; you can redistribute it and/or modify it | ||
5 | under the terms of the GNU Library General Public License as published by | ||
6 | the Free Software Foundation; either version 2 of the License, or (at your | ||
7 | option) any later version. | ||
8 | |||
9 | This library is distributed in the hope that it will be useful, but WITHOUT | ||
10 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | ||
11 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public | ||
12 | License for more details. | ||
13 | |||
14 | You should have received a copy of the GNU Library General Public License | ||
15 | along with this library; see the file COPYING.LIB. If not, write to the | ||
16 | Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA | ||
17 | 02110-1301, USA. | ||
18 | */ | ||
19 | #include "queryrunner.h" | ||
20 | |||
21 | #include <QtConcurrent/QtConcurrentRun> | ||
22 | #include <QFuture> | ||
23 | #include <QFutureWatcher> | ||
24 | #include <QTime> | ||
25 | #include "commands.h" | ||
26 | #include "log.h" | ||
27 | #include "storage.h" | ||
28 | #include "definitions.h" | ||
29 | #include "domainadaptor.h" | ||
30 | |||
31 | using namespace Akonadi2; | ||
32 | |||
33 | static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType) | ||
34 | { | ||
35 | //TODO use a result set with an iterator, to read values on demand | ||
36 | QVector<QByteArray> keys; | ||
37 | transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { | ||
38 | //Skip internals | ||
39 | if (Akonadi2::Storage::isInternalKey(key)) { | ||
40 | return true; | ||
41 | } | ||
42 | keys << Akonadi2::Storage::uidFromKey(key); | ||
43 | return true; | ||
44 | }, | ||
45 | [](const Akonadi2::Storage::Error &error) { | ||
46 | qWarning() << "Error during query: " << error.message; | ||
47 | }); | ||
48 | |||
49 | Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results"; | ||
50 | return ResultSet(keys); | ||
51 | } | ||
52 | |||
53 | template<class DomainType> | ||
54 | QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) | ||
55 | : QueryRunnerBase(), | ||
56 | mResourceAccess(resourceAccess), | ||
57 | mResultProvider(new ResultProvider<typename DomainType::Ptr>), | ||
58 | mDomainTypeAdaptorFactory(factory), | ||
59 | mQuery(query), | ||
60 | mResourceInstanceIdentifier(instanceIdentifier), | ||
61 | mBufferType(bufferType) | ||
62 | { | ||
63 | Trace() << "Starting query"; | ||
64 | //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. | ||
65 | mResultProvider->setFetcher([this, query](const typename DomainType::Ptr &parent) { | ||
66 | Trace() << "Running fetcher"; | ||
67 | |||
68 | // auto watcher = new QFutureWatcher<qint64>; | ||
69 | // QObject::connect(watcher, &QFutureWatcher::finished, [](qint64 newRevision) { | ||
70 | // mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
71 | // }); | ||
72 | // auto future = QtConcurrent::run([&resultProvider]() -> qint64 { | ||
73 | // const qint64 newRevision = executeInitialQuery(query, parent, resultProvider); | ||
74 | // return newRevision; | ||
75 | // }); | ||
76 | // watcher->setFuture(future); | ||
77 | const qint64 newRevision = executeInitialQuery(query, parent, *mResultProvider); | ||
78 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
79 | }); | ||
80 | |||
81 | |||
82 | //In case of a live query we keep the runner for as long alive as the result provider exists | ||
83 | if (query.liveQuery) { | ||
84 | //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting | ||
85 | setQuery([this, query] () -> KAsync::Job<void> { | ||
86 | return KAsync::start<void>([this, query](KAsync::Future<void> &future) { | ||
87 | //TODO execute in thread | ||
88 | const qint64 newRevision = executeIncrementalQuery(query, *mResultProvider); | ||
89 | mResourceAccess->sendRevisionReplayedCommand(newRevision); | ||
90 | future.setFinished(); | ||
91 | }); | ||
92 | }); | ||
93 | //Ensure the connection is open, if it wasn't already opened | ||
94 | //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates | ||
95 | mResourceAccess->open(); | ||
96 | QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged); | ||
97 | } | ||
98 | } | ||
99 | |||
100 | template<class DomainType> | ||
101 | QueryRunner<DomainType>::~QueryRunner() | ||
102 | { | ||
103 | Trace() << "Stopped query"; | ||
104 | } | ||
105 | |||
106 | template<class DomainType> | ||
107 | typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter() | ||
108 | { | ||
109 | return mResultProvider->emitter(); | ||
110 | } | ||
111 | |||
112 | //TODO move into result provider? | ||
113 | template<class DomainType> | ||
114 | void QueryRunner<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | ||
115 | { | ||
116 | // Trace() << "Replay set"; | ||
117 | int counter = 0; | ||
118 | while (resultSet.next([&resultProvider, &counter](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool { | ||
119 | counter++; | ||
120 | switch (operation) { | ||
121 | case Akonadi2::Operation_Creation: | ||
122 | // Trace() << "Got creation"; | ||
123 | resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | ||
124 | break; | ||
125 | case Akonadi2::Operation_Modification: | ||
126 | // Trace() << "Got modification"; | ||
127 | resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | ||
128 | break; | ||
129 | case Akonadi2::Operation_Removal: | ||
130 | // Trace() << "Got removal"; | ||
131 | resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>()); | ||
132 | break; | ||
133 | } | ||
134 | return true; | ||
135 | })){}; | ||
136 | Trace() << "Replayed " << counter << " results"; | ||
137 | } | ||
138 | |||
139 | template<class DomainType> | ||
140 | void QueryRunner<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback) | ||
141 | { | ||
142 | //This only works for a 1:1 mapping of resource to domain types. | ||
143 | //Not i.e. for tags that are stored as flags in each entity of an imap store. | ||
144 | //additional properties that don't have a 1:1 mapping (such as separately stored tags), | ||
145 | //could be added to the adaptor. | ||
146 | db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool { | ||
147 | Akonadi2::EntityBuffer buffer(value.data(), value.size()); | ||
148 | const Akonadi2::Entity &entity = buffer.entity(); | ||
149 | const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata()); | ||
150 | Q_ASSERT(metadataBuffer); | ||
151 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
152 | resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation()); | ||
153 | return false; | ||
154 | }, | ||
155 | [](const Akonadi2::Storage::Error &error) { | ||
156 | qWarning() << "Error during query: " << error.message; | ||
157 | }); | ||
158 | } | ||
159 | |||
160 | template<class DomainType> | ||
161 | ResultSet QueryRunner<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | ||
162 | { | ||
163 | QSet<QByteArray> appliedFilters; | ||
164 | auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction); | ||
165 | remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters; | ||
166 | |||
167 | //We do a full scan if there were no indexes available to create the initial set. | ||
168 | if (appliedFilters.isEmpty()) { | ||
169 | //TODO this should be replaced by an index lookup as well | ||
170 | resultSet = fullScan(transaction, mBufferType); | ||
171 | } | ||
172 | return resultSet; | ||
173 | } | ||
174 | |||
175 | template<class DomainType> | ||
176 | ResultSet QueryRunner<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) | ||
177 | { | ||
178 | const auto bufferType = mBufferType; | ||
179 | auto revisionCounter = QSharedPointer<qint64>::create(baseRevision); | ||
180 | remainingFilters = query.propertyFilter.keys().toSet(); | ||
181 | return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray { | ||
182 | const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction); | ||
183 | //Spit out the revision keys one by one. | ||
184 | while (*revisionCounter <= topRevision) { | ||
185 | const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter); | ||
186 | const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter); | ||
187 | // Trace() << "Revision" << *revisionCounter << type << uid; | ||
188 | if (type != bufferType) { | ||
189 | //Skip revision | ||
190 | *revisionCounter += 1; | ||
191 | continue; | ||
192 | } | ||
193 | const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter); | ||
194 | *revisionCounter += 1; | ||
195 | return key; | ||
196 | } | ||
197 | Trace() << "Finished reading incremental result set:" << *revisionCounter; | ||
198 | //We're done | ||
199 | return QByteArray(); | ||
200 | }); | ||
201 | } | ||
202 | |||
203 | template<class DomainType> | ||
204 | ResultSet QueryRunner<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery) | ||
205 | { | ||
206 | auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet); | ||
207 | |||
208 | //Read through the source values and return whatever matches the filter | ||
209 | std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool { | ||
210 | while (resultSetPtr->next()) { | ||
211 | //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess) | ||
212 | readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) { | ||
213 | //Always remove removals, they probably don't match due to non-available properties | ||
214 | if (filter(domainObject) || operation == Akonadi2::Operation_Removal) { | ||
215 | if (initialQuery) { | ||
216 | //We're not interested in removals during the initial query | ||
217 | if (operation != Akonadi2::Operation_Removal) { | ||
218 | callback(domainObject, Akonadi2::Operation_Creation); | ||
219 | } | ||
220 | } else { | ||
221 | callback(domainObject, operation); | ||
222 | } | ||
223 | } | ||
224 | }); | ||
225 | } | ||
226 | return false; | ||
227 | }; | ||
228 | return ResultSet(generator); | ||
229 | } | ||
230 | |||
231 | |||
232 | template<class DomainType> | ||
233 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryRunner<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query) | ||
234 | { | ||
235 | return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool { | ||
236 | for (const auto &filterProperty : remainingFilters) { | ||
237 | const auto property = domainObject->getProperty(filterProperty); | ||
238 | if (property.isValid()) { | ||
239 | //TODO implement other comparison operators than equality | ||
240 | if (property != query.propertyFilter.value(filterProperty)) { | ||
241 | Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty); | ||
242 | return false; | ||
243 | } | ||
244 | } else { | ||
245 | Warning() << "Ignored property filter because value is invalid: " << filterProperty; | ||
246 | } | ||
247 | } | ||
248 | return true; | ||
249 | }; | ||
250 | } | ||
251 | |||
252 | template<class DomainType> | ||
253 | qint64 QueryRunner<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery) | ||
254 | { | ||
255 | Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier); | ||
256 | storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) { | ||
257 | Warning() << "Error during query: " << error.store << error.message; | ||
258 | }); | ||
259 | auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly); | ||
260 | auto db = transaction.openDatabase(mBufferType + ".main"); | ||
261 | |||
262 | QSet<QByteArray> remainingFilters; | ||
263 | auto resultSet = baseSetRetriever(transaction, remainingFilters); | ||
264 | auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery); | ||
265 | replaySet(filteredSet, resultProvider); | ||
266 | resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction)); | ||
267 | return Akonadi2::Storage::maxRevision(transaction); | ||
268 | } | ||
269 | |||
270 | |||
271 | template<class DomainType> | ||
272 | qint64 QueryRunner<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | ||
273 | { | ||
274 | QTime time; | ||
275 | time.start(); | ||
276 | |||
277 | const qint64 baseRevision = resultProvider.revision() + 1; | ||
278 | Trace() << "Running incremental query " << baseRevision; | ||
279 | auto revision = load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { | ||
280 | return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters); | ||
281 | }, resultProvider, false); | ||
282 | Trace() << "Incremental query took: " << time.elapsed() << " ms"; | ||
283 | return revision; | ||
284 | } | ||
285 | |||
286 | template<class DomainType> | ||
287 | qint64 QueryRunner<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider) | ||
288 | { | ||
289 | QTime time; | ||
290 | time.start(); | ||
291 | |||
292 | auto modifiedQuery = query; | ||
293 | if (!query.parentProperty.isEmpty()) { | ||
294 | if (parent) { | ||
295 | Trace() << "Running initial query for parent:" << parent->identifier(); | ||
296 | modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier()); | ||
297 | } else { | ||
298 | Trace() << "Running initial query for toplevel"; | ||
299 | modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant()); | ||
300 | } | ||
301 | } | ||
302 | auto revision = load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet { | ||
303 | return loadInitialResultSet(modifiedQuery, transaction, remainingFilters); | ||
304 | }, resultProvider, true); | ||
305 | Trace() << "Initial query took: " << time.elapsed() << " ms"; | ||
306 | resultProvider.initialResultSetComplete(parent); | ||
307 | return revision; | ||
308 | } | ||
309 | |||
310 | template class QueryRunner<Akonadi2::ApplicationDomain::Folder>; | ||
311 | template class QueryRunner<Akonadi2::ApplicationDomain::Mail>; | ||
312 | template class QueryRunner<Akonadi2::ApplicationDomain::Event>; | ||
diff --git a/common/queryrunner.h b/common/queryrunner.h new file mode 100644 index 0000000..c918dcb --- /dev/null +++ b/common/queryrunner.h | |||
@@ -0,0 +1,108 @@ | |||
1 | /* | ||
2 | Copyright (c) 2015 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | |||
4 | This library is free software; you can redistribute it and/or modify it | ||
5 | under the terms of the GNU Library General Public License as published by | ||
6 | the Free Software Foundation; either version 2 of the License, or (at your | ||
7 | option) any later version. | ||
8 | |||
9 | This library is distributed in the hope that it will be useful, but WITHOUT | ||
10 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | ||
11 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public | ||
12 | License for more details. | ||
13 | |||
14 | You should have received a copy of the GNU Library General Public License | ||
15 | along with this library; see the file COPYING.LIB. If not, write to the | ||
16 | Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA | ||
17 | 02110-1301, USA. | ||
18 | */ | ||
19 | |||
20 | #pragma once | ||
21 | |||
22 | #include <QObject> | ||
23 | #include "facadeinterface.h" | ||
24 | #include "resourceaccess.h" | ||
25 | #include "resultprovider.h" | ||
26 | #include "domaintypeadaptorfactoryinterface.h" | ||
27 | #include "storage.h" | ||
28 | #include "query.h" | ||
29 | |||
30 | /** | ||
31 | * A QueryRunner runs a query and updates the corresponding result set. | ||
32 | * | ||
33 | * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work), | ||
34 | * and by how long a result set must be updated. If the query is one off the runner dies after the execution, | ||
35 | * otherwise it lives on the react to changes and updates the corresponding result set. | ||
36 | * | ||
37 | * QueryRunner has to keep ResourceAccess alive in order to keep getting updates. | ||
38 | */ | ||
39 | |||
40 | class QueryRunnerBase : public QObject | ||
41 | { | ||
42 | Q_OBJECT | ||
43 | protected: | ||
44 | typedef std::function<KAsync::Job<void>()> QueryFunction; | ||
45 | |||
46 | /** | ||
47 | * Set the query to run | ||
48 | */ | ||
49 | void setQuery(const QueryFunction &query) | ||
50 | { | ||
51 | queryFunction = query; | ||
52 | } | ||
53 | |||
54 | |||
55 | protected slots: | ||
56 | /** | ||
57 | * Rerun query with new revision | ||
58 | */ | ||
59 | void revisionChanged(qint64 newRevision) | ||
60 | { | ||
61 | Trace() << "New revision: " << newRevision; | ||
62 | run().exec(); | ||
63 | } | ||
64 | |||
65 | private: | ||
66 | /** | ||
67 | * Starts query | ||
68 | */ | ||
69 | KAsync::Job<void> run(qint64 newRevision = 0) | ||
70 | { | ||
71 | return queryFunction(); | ||
72 | } | ||
73 | |||
74 | QueryFunction queryFunction; | ||
75 | }; | ||
76 | |||
77 | template<typename DomainType> | ||
78 | class QueryRunner : public QueryRunnerBase | ||
79 | { | ||
80 | public: | ||
81 | QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType); | ||
82 | virtual ~QueryRunner(); | ||
83 | |||
84 | typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr emitter(); | ||
85 | |||
86 | private: | ||
87 | static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
88 | |||
89 | void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback); | ||
90 | |||
91 | ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); | ||
92 | ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters); | ||
93 | |||
94 | ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery); | ||
95 | std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query); | ||
96 | qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery); | ||
97 | qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
98 | qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); | ||
99 | |||
100 | private: | ||
101 | QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > mResultProvider; | ||
102 | QSharedPointer<Akonadi2::ResourceAccessInterface> mResourceAccess; | ||
103 | DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; | ||
104 | QByteArray mResourceInstanceIdentifier; | ||
105 | QByteArray mBufferType; | ||
106 | Akonadi2::Query mQuery; | ||
107 | }; | ||
108 | |||
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp index bd9e2c9..be25533 100644 --- a/common/resourceaccess.cpp +++ b/common/resourceaccess.cpp | |||
@@ -282,6 +282,7 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatB | |||
282 | 282 | ||
283 | KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) | 283 | KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) |
284 | { | 284 | { |
285 | Trace() << "Sending synchronize command: " << sourceSync << localSync; | ||
285 | flatbuffers::FlatBufferBuilder fbb; | 286 | flatbuffers::FlatBufferBuilder fbb; |
286 | auto command = Akonadi2::CreateSynchronize(fbb, sourceSync, localSync); | 287 | auto command = Akonadi2::CreateSynchronize(fbb, sourceSync, localSync); |
287 | Akonadi2::FinishSynchronizeBuffer(fbb, command); | 288 | Akonadi2::FinishSynchronizeBuffer(fbb, command); |
@@ -340,7 +341,7 @@ KAsync::Job<void> ResourceAccess::sendRevisionReplayedCommand(qint64 revision) | |||
340 | void ResourceAccess::open() | 341 | void ResourceAccess::open() |
341 | { | 342 | { |
342 | if (d->socket && d->socket->isValid()) { | 343 | if (d->socket && d->socket->isValid()) { |
343 | log("Socket valid, so not opening again"); | 344 | // Trace() << "Socket valid, so not opening again"; |
344 | return; | 345 | return; |
345 | } | 346 | } |
346 | if (d->openingSocket) { | 347 | if (d->openingSocket) { |
diff --git a/common/resourceaccess.h b/common/resourceaccess.h index 8e27054..e87a1f7 100644 --- a/common/resourceaccess.h +++ b/common/resourceaccess.h | |||
@@ -37,6 +37,8 @@ class ResourceAccessInterface : public QObject | |||
37 | { | 37 | { |
38 | Q_OBJECT | 38 | Q_OBJECT |
39 | public: | 39 | public: |
40 | typedef QSharedPointer<ResourceAccessInterface> Ptr; | ||
41 | |||
40 | ResourceAccessInterface() {} | 42 | ResourceAccessInterface() {} |
41 | virtual ~ResourceAccessInterface() {} | 43 | virtual ~ResourceAccessInterface() {} |
42 | virtual KAsync::Job<void> sendCommand(int commandId) = 0; | 44 | virtual KAsync::Job<void> sendCommand(int commandId) = 0; |
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp index 54185f8..6510c90 100644 --- a/common/resourcefacade.cpp +++ b/common/resourcefacade.cpp | |||
@@ -54,9 +54,15 @@ KAsync::Job<void> ResourceFacade::remove(const Akonadi2::ApplicationDomain::Akon | |||
54 | }); | 54 | }); |
55 | } | 55 | } |
56 | 56 | ||
57 | KAsync::Job<void> ResourceFacade::load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename Akonadi2::ApplicationDomain::AkonadiResource::Ptr> > &resultProvider) | 57 | QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<Akonadi2::ApplicationDomain::AkonadiResource::Ptr>::Ptr > ResourceFacade::load(const Akonadi2::Query &query) |
58 | { | 58 | { |
59 | return KAsync::start<void>([query, resultProvider]() { | 59 | auto resultProvider = new Akonadi2::ResultProvider<typename Akonadi2::ApplicationDomain::AkonadiResource::Ptr>(); |
60 | auto emitter = resultProvider->emitter(); | ||
61 | resultProvider->setFetcher([](const QSharedPointer<Akonadi2::ApplicationDomain::AkonadiResource> &) {}); | ||
62 | resultProvider->onDone([resultProvider]() { | ||
63 | delete resultProvider; | ||
64 | }); | ||
65 | auto job = KAsync::start<void>([query, resultProvider]() { | ||
60 | const auto configuredResources = ResourceConfig::getResources(); | 66 | const auto configuredResources = ResourceConfig::getResources(); |
61 | for (const auto &res : configuredResources.keys()) { | 67 | for (const auto &res : configuredResources.keys()) { |
62 | const auto type = configuredResources.value(res); | 68 | const auto type = configuredResources.value(res); |
@@ -68,8 +74,9 @@ KAsync::Job<void> ResourceFacade::load(const Akonadi2::Query &query, const QShar | |||
68 | } | 74 | } |
69 | } | 75 | } |
70 | //TODO initialResultSetComplete should be implicit | 76 | //TODO initialResultSetComplete should be implicit |
71 | resultProvider->initialResultSetComplete(); | 77 | resultProvider->initialResultSetComplete(Akonadi2::ApplicationDomain::AkonadiResource::Ptr()); |
72 | resultProvider->complete(); | 78 | resultProvider->complete(); |
73 | }); | 79 | }); |
80 | return qMakePair(job, emitter); | ||
74 | } | 81 | } |
75 | 82 | ||
diff --git a/common/resourcefacade.h b/common/resourcefacade.h index 437ff75..38e0c0e 100644 --- a/common/resourcefacade.h +++ b/common/resourcefacade.h | |||
@@ -37,5 +37,6 @@ public: | |||
37 | KAsync::Job<void> create(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; | 37 | KAsync::Job<void> create(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; |
38 | KAsync::Job<void> modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; | 38 | KAsync::Job<void> modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; |
39 | KAsync::Job<void> remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; | 39 | KAsync::Job<void> remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; |
40 | KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename Akonadi2::ApplicationDomain::AkonadiResource::Ptr> > &resultProvider) Q_DECL_OVERRIDE; | 40 | QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<Akonadi2::ApplicationDomain::AkonadiResource::Ptr>::Ptr > load(const Akonadi2::Query &query) Q_DECL_OVERRIDE; |
41 | }; | 41 | }; |
42 | |||
diff --git a/common/resultprovider.h b/common/resultprovider.h index bc03152..d50f3f6 100644 --- a/common/resultprovider.h +++ b/common/resultprovider.h | |||
@@ -20,9 +20,12 @@ | |||
20 | 20 | ||
21 | #pragma once | 21 | #pragma once |
22 | 22 | ||
23 | #include <QThread> | ||
23 | #include <functional> | 24 | #include <functional> |
24 | #include <memory> | 25 | #include <memory> |
25 | #include "threadboundary.h" | 26 | #include "threadboundary.h" |
27 | #include "resultset.h" | ||
28 | #include "log.h" | ||
26 | 29 | ||
27 | using namespace async; | 30 | using namespace async; |
28 | 31 | ||
@@ -34,11 +37,43 @@ namespace Akonadi2 { | |||
34 | template<class T> | 37 | template<class T> |
35 | class ResultEmitter; | 38 | class ResultEmitter; |
36 | 39 | ||
40 | template<class T> | ||
41 | class ResultProviderInterface | ||
42 | { | ||
43 | public: | ||
44 | ResultProviderInterface() | ||
45 | : mRevision(0) | ||
46 | { | ||
47 | |||
48 | } | ||
49 | |||
50 | virtual void add(const T &value) = 0; | ||
51 | virtual void modify(const T &value) = 0; | ||
52 | virtual void remove(const T &value) = 0; | ||
53 | virtual void initialResultSetComplete(const T &parent) = 0; | ||
54 | virtual void complete() = 0; | ||
55 | virtual void clear() = 0; | ||
56 | virtual void setFetcher(const std::function<void(const T &parent)> &fetcher) = 0; | ||
57 | |||
58 | void setRevision(qint64 revision) | ||
59 | { | ||
60 | mRevision = revision; | ||
61 | } | ||
62 | |||
63 | qint64 revision() const | ||
64 | { | ||
65 | return mRevision; | ||
66 | } | ||
67 | |||
68 | private: | ||
69 | qint64 mRevision; | ||
70 | }; | ||
71 | |||
37 | /* | 72 | /* |
38 | * The promise side for the result emitter | 73 | * The promise side for the result emitter |
39 | */ | 74 | */ |
40 | template<class T> | 75 | template<class T> |
41 | class ResultProvider { | 76 | class ResultProvider : public ResultProviderInterface<T> { |
42 | private: | 77 | private: |
43 | void callInMainThreadOnEmitter(void (ResultEmitter<T>::*f)()) | 78 | void callInMainThreadOnEmitter(void (ResultEmitter<T>::*f)()) |
44 | { | 79 | { |
@@ -69,6 +104,12 @@ private: | |||
69 | } | 104 | } |
70 | 105 | ||
71 | public: | 106 | public: |
107 | typedef QSharedPointer<ResultProvider<T> > Ptr; | ||
108 | |||
109 | virtual ~ResultProvider() | ||
110 | { | ||
111 | } | ||
112 | |||
72 | //Called from worker thread | 113 | //Called from worker thread |
73 | void add(const T &value) | 114 | void add(const T &value) |
74 | { | 115 | { |
@@ -103,9 +144,15 @@ public: | |||
103 | }); | 144 | }); |
104 | } | 145 | } |
105 | 146 | ||
106 | void initialResultSetComplete() | 147 | void initialResultSetComplete(const T &parent) |
107 | { | 148 | { |
108 | callInMainThreadOnEmitter(&ResultEmitter<T>::initialResultSetComplete); | 149 | //Because I don't know how to use bind |
150 | auto weakEmitter = mResultEmitter; | ||
151 | callInMainThreadOnEmitter([weakEmitter, parent](){ | ||
152 | if (auto strongRef = weakEmitter.toStrongRef()) { | ||
153 | strongRef->initialResultSetComplete(parent); | ||
154 | } | ||
155 | }); | ||
109 | } | 156 | } |
110 | 157 | ||
111 | //Called from worker thread | 158 | //Called from worker thread |
@@ -126,30 +173,16 @@ public: | |||
126 | //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again | 173 | //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again |
127 | auto sharedPtr = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter){ mThreadBoundary->callInMainThread([this]() {done();}); delete emitter; }); | 174 | auto sharedPtr = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter){ mThreadBoundary->callInMainThread([this]() {done();}); delete emitter; }); |
128 | mResultEmitter = sharedPtr; | 175 | mResultEmitter = sharedPtr; |
176 | sharedPtr->setFetcher([this](const T &parent) { | ||
177 | Q_ASSERT(mFetcher); | ||
178 | mFetcher(parent); | ||
179 | }); | ||
129 | return sharedPtr; | 180 | return sharedPtr; |
130 | } | 181 | } |
131 | 182 | ||
132 | return mResultEmitter.toStrongRef(); | 183 | return mResultEmitter.toStrongRef(); |
133 | } | 184 | } |
134 | 185 | ||
135 | /** | ||
136 | * For lifetimemanagement only. | ||
137 | * We keep the runner alive as long as the result provider exists. | ||
138 | */ | ||
139 | void setQueryRunner(const QSharedPointer<QObject> &runner) | ||
140 | { | ||
141 | mQueryRunner = runner; | ||
142 | } | ||
143 | |||
144 | /** | ||
145 | * For lifetimemanagement only. | ||
146 | * We keep the runner alive as long as the result provider exists. | ||
147 | */ | ||
148 | void setFacade(const std::shared_ptr<void> &facade) | ||
149 | { | ||
150 | mFacade = facade; | ||
151 | } | ||
152 | |||
153 | void onDone(const std::function<void()> &callback) | 186 | void onDone(const std::function<void()> &callback) |
154 | { | 187 | { |
155 | mThreadBoundary = QSharedPointer<ThreadBoundary>::create(); | 188 | mThreadBoundary = QSharedPointer<ThreadBoundary>::create(); |
@@ -162,21 +195,27 @@ public: | |||
162 | return mResultEmitter.toStrongRef().isNull(); | 195 | return mResultEmitter.toStrongRef().isNull(); |
163 | } | 196 | } |
164 | 197 | ||
198 | void setFetcher(const std::function<void(const T &parent)> &fetcher) | ||
199 | { | ||
200 | mFetcher = fetcher; | ||
201 | } | ||
202 | |||
165 | private: | 203 | private: |
166 | void done() | 204 | void done() |
167 | { | 205 | { |
168 | qWarning() << "done"; | 206 | qWarning() << "done"; |
169 | if (mOnDoneCallback) { | 207 | if (mOnDoneCallback) { |
170 | mOnDoneCallback(); | 208 | auto callback = mOnDoneCallback; |
171 | mOnDoneCallback = std::function<void()>(); | 209 | mOnDoneCallback = std::function<void()>(); |
210 | //This may delete this object | ||
211 | callback(); | ||
172 | } | 212 | } |
173 | } | 213 | } |
174 | 214 | ||
175 | QWeakPointer<ResultEmitter<T> > mResultEmitter; | 215 | QWeakPointer<ResultEmitter<T> > mResultEmitter; |
176 | QSharedPointer<QObject> mQueryRunner; | ||
177 | std::shared_ptr<void> mFacade; | ||
178 | std::function<void()> mOnDoneCallback; | 216 | std::function<void()> mOnDoneCallback; |
179 | QSharedPointer<ThreadBoundary> mThreadBoundary; | 217 | QSharedPointer<ThreadBoundary> mThreadBoundary; |
218 | std::function<void(const T &parent)> mFetcher; | ||
180 | }; | 219 | }; |
181 | 220 | ||
182 | /* | 221 | /* |
@@ -194,6 +233,8 @@ private: | |||
194 | template<class DomainType> | 233 | template<class DomainType> |
195 | class ResultEmitter { | 234 | class ResultEmitter { |
196 | public: | 235 | public: |
236 | typedef QSharedPointer<ResultEmitter<DomainType> > Ptr; | ||
237 | |||
197 | void onAdded(const std::function<void(const DomainType&)> &handler) | 238 | void onAdded(const std::function<void(const DomainType&)> &handler) |
198 | { | 239 | { |
199 | addHandler = handler; | 240 | addHandler = handler; |
@@ -209,7 +250,7 @@ public: | |||
209 | removeHandler = handler; | 250 | removeHandler = handler; |
210 | } | 251 | } |
211 | 252 | ||
212 | void onInitialResultSetComplete(const std::function<void(void)> &handler) | 253 | void onInitialResultSetComplete(const std::function<void(const DomainType&)> &handler) |
213 | { | 254 | { |
214 | initialResultSetCompleteHandler = handler; | 255 | initialResultSetCompleteHandler = handler; |
215 | } | 256 | } |
@@ -239,28 +280,41 @@ public: | |||
239 | removeHandler(value); | 280 | removeHandler(value); |
240 | } | 281 | } |
241 | 282 | ||
242 | void initialResultSetComplete() | 283 | void initialResultSetComplete(const DomainType &parent) |
243 | { | 284 | { |
244 | initialResultSetCompleteHandler(); | 285 | if (initialResultSetCompleteHandler) { |
286 | initialResultSetCompleteHandler(parent); | ||
287 | } | ||
245 | } | 288 | } |
246 | 289 | ||
247 | void complete() | 290 | void complete() |
248 | { | 291 | { |
249 | completeHandler(); | 292 | if (completeHandler) { |
293 | completeHandler(); | ||
294 | } | ||
250 | } | 295 | } |
251 | 296 | ||
252 | void clear() | 297 | void clear() |
253 | { | 298 | { |
254 | clearHandler(); | 299 | if (clearHandler) { |
300 | clearHandler(); | ||
301 | } | ||
255 | } | 302 | } |
256 | 303 | ||
304 | void setFetcher(const std::function<void(const DomainType &parent)> &fetcher) | ||
305 | { | ||
306 | mFetcher = fetcher; | ||
307 | } | ||
308 | |||
309 | std::function<void(const DomainType &parent)> mFetcher; | ||
310 | |||
257 | private: | 311 | private: |
258 | friend class ResultProvider<DomainType>; | 312 | friend class ResultProvider<DomainType>; |
259 | 313 | ||
260 | std::function<void(const DomainType&)> addHandler; | 314 | std::function<void(const DomainType&)> addHandler; |
261 | std::function<void(const DomainType&)> modifyHandler; | 315 | std::function<void(const DomainType&)> modifyHandler; |
262 | std::function<void(const DomainType&)> removeHandler; | 316 | std::function<void(const DomainType&)> removeHandler; |
263 | std::function<void(void)> initialResultSetCompleteHandler; | 317 | std::function<void(const DomainType&)> initialResultSetCompleteHandler; |
264 | std::function<void(void)> completeHandler; | 318 | std::function<void(void)> completeHandler; |
265 | std::function<void(void)> clearHandler; | 319 | std::function<void(void)> clearHandler; |
266 | ThreadBoundary mThreadBoundary; | 320 | ThreadBoundary mThreadBoundary; |
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp index 1516e69..a247e38 100644 --- a/common/storage_lmdb.cpp +++ b/common/storage_lmdb.cpp | |||
@@ -253,22 +253,69 @@ int Storage::NamedDatabase::scan(const QByteArray &k, | |||
253 | 253 | ||
254 | return numberOfRetrievedValues; | 254 | return numberOfRetrievedValues; |
255 | } | 255 | } |
256 | void Storage::NamedDatabase::findLatest(const QByteArray &uid, | 256 | |
257 | void Storage::NamedDatabase::findLatest(const QByteArray &k, | ||
257 | const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler, | 258 | const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler, |
258 | const std::function<void(const Storage::Error &error)> &errorHandler) const | 259 | const std::function<void(const Storage::Error &error)> &errorHandler) const |
259 | { | 260 | { |
260 | QByteArray latestKey; | 261 | if (!d || !d->transaction) { |
261 | scan(uid, [&](const QByteArray &key, const QByteArray &value) -> bool { | 262 | //Not an error. We rely on this to read nothing from non-existing databases. |
262 | latestKey = key; | 263 | return; |
263 | return true; | 264 | } |
264 | }, | ||
265 | errorHandler, true); | ||
266 | 265 | ||
267 | scan(latestKey, [=](const QByteArray &key, const QByteArray &value) -> bool { | 266 | int rc; |
268 | resultHandler(key, value); | 267 | MDB_val key; |
269 | return false; | 268 | MDB_val data; |
270 | }, | 269 | MDB_cursor *cursor; |
271 | errorHandler); | 270 | |
271 | key.mv_data = (void*)k.constData(); | ||
272 | key.mv_size = k.size(); | ||
273 | |||
274 | rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); | ||
275 | if (rc) { | ||
276 | Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Error during mdb_cursor open: ") + QByteArray(mdb_strerror(rc))); | ||
277 | errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); | ||
278 | return; | ||
279 | } | ||
280 | |||
281 | MDB_cursor_op op = MDB_SET_RANGE; | ||
282 | if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) { | ||
283 | //The first lookup will find a key that is equal or greather than our key | ||
284 | if (QByteArray::fromRawData((char*)key.mv_data, key.mv_size).startsWith(k)) { | ||
285 | bool advanced = false; | ||
286 | while (QByteArray::fromRawData((char*)key.mv_data, key.mv_size).startsWith(k)) { | ||
287 | advanced = true; | ||
288 | MDB_cursor_op nextOp = MDB_NEXT; | ||
289 | rc = mdb_cursor_get(cursor, &key, &data, nextOp); | ||
290 | if (rc) { | ||
291 | break; | ||
292 | } | ||
293 | } | ||
294 | if (advanced) { | ||
295 | MDB_cursor_op prefOp = MDB_PREV; | ||
296 | //We read past the end above, just take the last value | ||
297 | if (rc == MDB_NOTFOUND) { | ||
298 | prefOp = MDB_LAST; | ||
299 | } | ||
300 | rc = mdb_cursor_get(cursor, &key, &data, prefOp); | ||
301 | resultHandler(QByteArray::fromRawData((char*)key.mv_data, key.mv_size), QByteArray::fromRawData((char*)data.mv_data, data.mv_size)); | ||
302 | } | ||
303 | } | ||
304 | } | ||
305 | |||
306 | //We never find the last value | ||
307 | if (rc == MDB_NOTFOUND) { | ||
308 | rc = 0; | ||
309 | } | ||
310 | |||
311 | mdb_cursor_close(cursor); | ||
312 | |||
313 | if (rc) { | ||
314 | Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Key: ") + k + " : " + QByteArray(mdb_strerror(rc))); | ||
315 | errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); | ||
316 | } | ||
317 | |||
318 | return; | ||
272 | } | 319 | } |
273 | 320 | ||
274 | 321 | ||
diff --git a/common/threadboundary.cpp b/common/threadboundary.cpp index 47ec508..48fd11a 100644 --- a/common/threadboundary.cpp +++ b/common/threadboundary.cpp | |||
@@ -24,6 +24,9 @@ Q_DECLARE_METATYPE(std::function<void()>); | |||
24 | 24 | ||
25 | namespace async { | 25 | namespace async { |
26 | ThreadBoundary::ThreadBoundary(): QObject() { qRegisterMetaType<std::function<void()> >("std::function<void()>"); } | 26 | ThreadBoundary::ThreadBoundary(): QObject() { qRegisterMetaType<std::function<void()> >("std::function<void()>"); } |
27 | ThreadBoundary:: ~ThreadBoundary() {} | 27 | ThreadBoundary:: ~ThreadBoundary() |
28 | { | ||
29 | } | ||
30 | |||
28 | } | 31 | } |
29 | 32 | ||