summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt12
-rw-r--r--common/clientapi.cpp117
-rw-r--r--common/clientapi.h97
-rw-r--r--common/domain/applicationdomaintype.cpp9
-rw-r--r--common/domain/applicationdomaintype.h4
-rw-r--r--common/domain/dummy.fbs7
-rw-r--r--common/domain/event.cpp1
-rw-r--r--common/domain/folder.cpp100
-rw-r--r--common/domain/folder.fbs9
-rw-r--r--common/domain/folder.h56
-rw-r--r--common/domainadaptor.h13
-rw-r--r--common/domaintypeadaptorfactoryinterface.h42
-rw-r--r--common/entitystorage.cpp74
-rw-r--r--common/entitystorage.h126
-rw-r--r--common/facade.cpp81
-rw-r--r--common/facade.h262
-rw-r--r--common/facadeinterface.h29
-rw-r--r--common/modelresult.cpp251
-rw-r--r--common/modelresult.h78
-rw-r--r--common/query.h1
-rw-r--r--common/queryrunner.cpp312
-rw-r--r--common/queryrunner.h108
-rw-r--r--common/resourceaccess.cpp3
-rw-r--r--common/resourceaccess.h2
-rw-r--r--common/resourcefacade.cpp13
-rw-r--r--common/resourcefacade.h3
-rw-r--r--common/resultprovider.h114
-rw-r--r--common/storage_lmdb.cpp71
-rw-r--r--common/threadboundary.cpp5
29 files changed, 1386 insertions, 614 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index b4a4703..e56ece9 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -12,10 +12,10 @@ else (STORAGE_unqlite)
12endif (STORAGE_unqlite) 12endif (STORAGE_unqlite)
13 13
14set(command_SRCS 14set(command_SRCS
15 modelresult.cpp
15 definitions.cpp 16 definitions.cpp
16 log.cpp 17 log.cpp
17 entitybuffer.cpp 18 entitybuffer.cpp
18 entitystorage.cpp
19 clientapi.cpp 19 clientapi.cpp
20 facadefactory.cpp 20 facadefactory.cpp
21 commands.cpp 21 commands.cpp
@@ -26,6 +26,7 @@ set(command_SRCS
26 resource.cpp 26 resource.cpp
27 genericresource.cpp 27 genericresource.cpp
28 resourceaccess.cpp 28 resourceaccess.cpp
29 queryrunner.cpp
29 listener.cpp 30 listener.cpp
30 storage_common.cpp 31 storage_common.cpp
31 threadboundary.cpp 32 threadboundary.cpp
@@ -36,6 +37,7 @@ set(command_SRCS
36 domain/applicationdomaintype.cpp 37 domain/applicationdomaintype.cpp
37 domain/event.cpp 38 domain/event.cpp
38 domain/mail.cpp 39 domain/mail.cpp
40 domain/folder.cpp
39 ${storage_SRCS}) 41 ${storage_SRCS})
40 42
41add_library(${PROJECT_NAME} SHARED ${command_SRCS}) 43add_library(${PROJECT_NAME} SHARED ${command_SRCS})
@@ -55,6 +57,8 @@ generate_flatbuffers(
55 commands/revisionreplayed 57 commands/revisionreplayed
56 domain/event 58 domain/event
57 domain/mail 59 domain/mail
60 domain/folder
61 domain/dummy
58 entity 62 entity
59 metadata 63 metadata
60 queuedcommand 64 queuedcommand
@@ -70,12 +74,6 @@ install(FILES
70 clientapi.h 74 clientapi.h
71 domain/applicationdomaintype.h 75 domain/applicationdomaintype.h
72 query.h 76 query.h
73 threadboundary.h
74 resultprovider.h
75 facadefactory.h
76 log.h
77 listmodelresult.h
78 bufferadaptor.h 77 bufferadaptor.h
79 facadeinterface.h
80 DESTINATION ${INCLUDE_INSTALL_DIR}/${PROJECT_NAME} COMPONENT Devel 78 DESTINATION ${INCLUDE_INSTALL_DIR}/${PROJECT_NAME} COMPONENT Devel
81) 79)
diff --git a/common/clientapi.cpp b/common/clientapi.cpp
index f99ebb8..29b7e68 100644
--- a/common/clientapi.cpp
+++ b/common/clientapi.cpp
@@ -1,19 +1,47 @@
1/*
2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
1 20
2#include "clientapi.h" 21#include "clientapi.h"
22
23#include <QtConcurrent/QtConcurrentRun>
24#include <QTimer>
25#include <QEventLoop>
26#include <QAbstractItemModel>
27#include <functional>
28#include <memory>
29
3#include "resourceaccess.h" 30#include "resourceaccess.h"
4#include "commands.h" 31#include "commands.h"
5#include "resourcefacade.h" 32#include "resourcefacade.h"
6#include "log.h" 33#include "log.h"
7#include "definitions.h" 34#include "definitions.h"
8#include "resourceconfig.h" 35#include "resourceconfig.h"
9#include <QtConcurrent/QtConcurrentRun> 36#include "facadefactory.h"
10#include <QTimer> 37#include "modelresult.h"
38#include "log.h"
11 39
12#define ASYNCINTHREAD 40#define ASYNCINTHREAD
13 41
14namespace async 42namespace async
15{ 43{
16 void run(const std::function<void()> &runner) { 44 static void run(const std::function<void()> &runner) {
17 auto timer = new QTimer(); 45 auto timer = new QTimer();
18 timer->setSingleShot(true); 46 timer->setSingleShot(true);
19 QObject::connect(timer, &QTimer::timeout, [runner, timer]() { 47 QObject::connect(timer, &QTimer::timeout, [runner, timer]() {
@@ -69,6 +97,76 @@ QList<QByteArray> Store::getResources(const QList<QByteArray> &resourceFilter, c
69 return resources; 97 return resources;
70} 98}
71 99
100template <class DomainType>
101QSharedPointer<QAbstractItemModel> Store::loadModel(Query query)
102{
103 auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr> >::create(query, query.requestedProperties.toList());
104
105 //* Client defines lifetime of model
106 //* The model lifetime defines the duration of live-queries
107 //* The facade needs to life for the duration of any calls being made (assuming we get rid of any internal callbacks
108 //* The emitter needs to live or the duration of query (respectively, the model)
109 //* The result provider needs to live for as long as results are provided (until the last thread exits).
110
111 // Query all resources and aggregate results
112 KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName<DomainType>()))
113 .template each<void, QByteArray>([query, model](const QByteArray &resource, KAsync::Future<void> &future) {
114 auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resource), resource);
115 if (facade) {
116 Trace() << "Trying to fetch from resource";
117 auto result = facade->load(query);
118 auto emitter = result.second;
119 //TODO use aggregating emitter instead
120 model->setEmitter(emitter);
121 model->fetchMore(QModelIndex());
122 result.first.template then<void>([&future](){future.setFinished();}).exec();
123 } else {
124 //Ignore the error and carry on
125 future.setFinished();
126 }
127 }).exec();
128
129 return model;
130}
131
132template <class DomainType>
133static std::shared_ptr<StoreFacade<DomainType> > getFacade(const QByteArray &resourceInstanceIdentifier)
134{
135 if (auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resourceInstanceIdentifier), resourceInstanceIdentifier)) {
136 return facade;
137 }
138 return std::make_shared<NullFacade<DomainType> >();
139}
140
141template <class DomainType>
142KAsync::Job<void> Store::create(const DomainType &domainObject) {
143 //Potentially move to separate thread as well
144 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
145 return facade->create(domainObject).template then<void>([facade](){}, [](int errorCode, const QString &error) {
146 Warning() << "Failed to create";
147 });
148}
149
150template <class DomainType>
151KAsync::Job<void> Store::modify(const DomainType &domainObject)
152{
153 //Potentially move to separate thread as well
154 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
155 return facade->modify(domainObject).template then<void>([facade](){}, [](int errorCode, const QString &error) {
156 Warning() << "Failed to modify";
157 });
158}
159
160template <class DomainType>
161KAsync::Job<void> Store::remove(const DomainType &domainObject)
162{
163 //Potentially move to separate thread as well
164 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
165 return facade->remove(domainObject).template then<void>([facade](){}, [](int errorCode, const QString &error) {
166 Warning() << "Failed to remove";
167 });
168}
169
72KAsync::Job<void> Store::shutdown(const QByteArray &identifier) 170KAsync::Job<void> Store::shutdown(const QByteArray &identifier)
73{ 171{
74 Trace() << "shutdown"; 172 Trace() << "shutdown";
@@ -95,7 +193,7 @@ KAsync::Job<void> Store::synchronize(const Akonadi2::Query &query)
95 .template each<void, QByteArray>([query](const QByteArray &resource, KAsync::Future<void> &future) { 193 .template each<void, QByteArray>([query](const QByteArray &resource, KAsync::Future<void> &future) {
96 auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resource); 194 auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resource);
97 resourceAccess->open(); 195 resourceAccess->open();
98 resourceAccess->synchronizeResource(true, false).then<void>([&future]() { 196 resourceAccess->synchronizeResource(query.syncOnDemand, query.processAll).then<void>([&future, resourceAccess]() {
99 future.setFinished(); 197 future.setFinished();
100 }).exec(); 198 }).exec();
101 }) 199 })
@@ -103,4 +201,15 @@ KAsync::Job<void> Store::synchronize(const Akonadi2::Query &query)
103 .template then<void>([](){}); 201 .template then<void>([](){});
104} 202}
105 203
204#define REGISTER_TYPE(T) template KAsync::Job<void> Store::remove<T>(const T &domainObject); \
205 template KAsync::Job<void> Store::create<T>(const T &domainObject); \
206 template KAsync::Job<void> Store::modify<T>(const T &domainObject); \
207 template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(Query query); \
208
209REGISTER_TYPE(ApplicationDomain::Event);
210REGISTER_TYPE(ApplicationDomain::Mail);
211REGISTER_TYPE(ApplicationDomain::Folder);
212REGISTER_TYPE(ApplicationDomain::AkonadiResource);
213
106} // namespace Akonadi2 214} // namespace Akonadi2
215
diff --git a/common/clientapi.h b/common/clientapi.h
index 9a32188..8f87562 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -22,28 +22,16 @@
22 22
23#include <QString> 23#include <QString>
24#include <QSharedPointer> 24#include <QSharedPointer>
25#include <QEventLoop>
26#include <functional>
27#include <memory>
28 25
29#include <Async/Async> 26#include <Async/Async>
30 27
31#include "query.h" 28#include "query.h"
32#include "resultprovider.h"
33#include "applicationdomaintype.h" 29#include "applicationdomaintype.h"
34#include "facadefactory.h"
35#include "log.h"
36 30
37namespace async { 31class QAbstractItemModel;
38 //This should abstract if we execute from eventloop or in thread.
39 //It supposed to allow the caller to finish the current method before executing the runner.
40 void run(const std::function<void()> &runner);
41}
42 32
43namespace Akonadi2 { 33namespace Akonadi2 {
44 34
45using namespace async;
46
47/** 35/**
48 * Store interface used in the client API. 36 * Store interface used in the client API.
49 */ 37 */
@@ -55,77 +43,22 @@ public:
55 static QString storageLocation(); 43 static QString storageLocation();
56 static QByteArray resourceName(const QByteArray &instanceIdentifier); 44 static QByteArray resourceName(const QByteArray &instanceIdentifier);
57 45
58 /** 46 enum Roles {
59 * Asynchronusly load a dataset 47 DomainObjectRole = Qt::UserRole + 1, //Must be the same as in ModelResult
60 */ 48 ChildrenFetchedRole
61 template <class DomainType> 49 };
62 static QSharedPointer<ResultEmitter<typename DomainType::Ptr> > load(Query query)
63 {
64 auto resultSet = QSharedPointer<ResultProvider<typename DomainType::Ptr> >::create();
65
66 //Execute the search in a thread.
67 //We must guarantee that the emitter is returned before the first result is emitted.
68 //The result provider must be threadsafe.
69 async::run([query, resultSet](){
70 QEventLoop eventLoop;
71 resultSet->onDone([&eventLoop](){
72 eventLoop.quit();
73 });
74 // Query all resources and aggregate results
75 KAsync::iterate(getResources(query.resources, ApplicationDomain::getTypeName<DomainType>()))
76 .template each<void, QByteArray>([query, resultSet](const QByteArray &resource, KAsync::Future<void> &future) {
77 auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resource), resource);
78 if (facade) {
79 facade->load(query, resultSet).template then<void>([&future](){future.setFinished();}).exec();
80 //Keep the facade alive for the lifetime of the resultSet.
81 resultSet->setFacade(facade);
82 } else {
83 //Ignore the error and carry on
84 future.setFinished();
85 }
86 }).template then<void>([query, resultSet]() {
87 resultSet->initialResultSetComplete();
88 if (!query.liveQuery) {
89 resultSet->complete();
90 }
91 }).exec();
92
93 //Keep the thread alive until the result is ready
94 if (!resultSet->isDone()) {
95 eventLoop.exec();
96 }
97 });
98 return resultSet->emitter();
99 }
100 50
101 /** 51 /**
102 * Asynchronusly load a dataset with tree structure information 52 * Asynchronusly load a dataset with tree structure information
103 */ 53 */
104 // template <class DomainType>
105 // static TreeSet<DomainType> loadTree(Query)
106 // {
107
108 // }
109 template <class DomainType> 54 template <class DomainType>
110 static std::shared_ptr<StoreFacade<DomainType> > getFacade(const QByteArray &resourceInstanceIdentifier) 55 static QSharedPointer<QAbstractItemModel> loadModel(Query query);
111 {
112 if (auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceName(resourceInstanceIdentifier), resourceInstanceIdentifier)) {
113 return facade;
114 }
115 return std::make_shared<NullFacade<DomainType> >();
116 }
117 56
118 /** 57 /**
119 * Create a new entity. 58 * Create a new entity.
120 */ 59 */
121 template <class DomainType> 60 template <class DomainType>
122 static KAsync::Job<void> create(const DomainType &domainObject) { 61 static KAsync::Job<void> create(const DomainType &domainObject);
123 //Potentially move to separate thread as well
124 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
125 return facade->create(domainObject).template then<void>([facade](){}, [](int errorCode, const QString &error) {
126 Warning() << "Failed to create";
127 });
128 }
129 62
130 /** 63 /**
131 * Modify an entity. 64 * Modify an entity.
@@ -133,25 +66,13 @@ public:
133 * This includes moving etc. since these are also simple settings on a property. 66 * This includes moving etc. since these are also simple settings on a property.
134 */ 67 */
135 template <class DomainType> 68 template <class DomainType>
136 static KAsync::Job<void> modify(const DomainType &domainObject) { 69 static KAsync::Job<void> modify(const DomainType &domainObject);
137 //Potentially move to separate thread as well
138 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
139 return facade->modify(domainObject).template then<void>([facade](){}, [](int errorCode, const QString &error) {
140 Warning() << "Failed to modify";
141 });
142 }
143 70
144 /** 71 /**
145 * Remove an entity. 72 * Remove an entity.
146 */ 73 */
147 template <class DomainType> 74 template <class DomainType>
148 static KAsync::Job<void> remove(const DomainType &domainObject) { 75 static KAsync::Job<void> remove(const DomainType &domainObject);
149 //Potentially move to separate thread as well
150 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
151 return facade->remove(domainObject).template then<void>([facade](){}, [](int errorCode, const QString &error) {
152 Warning() << "Failed to remove";
153 });
154 }
155 76
156 /** 77 /**
157 * Shutdown resource. 78 * Shutdown resource.
diff --git a/common/domain/applicationdomaintype.cpp b/common/domain/applicationdomaintype.cpp
index 1b5d870..c9a8bba 100644
--- a/common/domain/applicationdomaintype.cpp
+++ b/common/domain/applicationdomaintype.cpp
@@ -60,10 +60,13 @@ ApplicationDomainType& ApplicationDomainType::operator=(const ApplicationDomainT
60 return *this; 60 return *this;
61} 61}
62 62
63ApplicationDomainType::~ApplicationDomainType() {} 63ApplicationDomainType::~ApplicationDomainType()
64{
65}
64 66
65QVariant ApplicationDomainType::getProperty(const QByteArray &key) const 67QVariant ApplicationDomainType::getProperty(const QByteArray &key) const
66{ 68{
69 Q_ASSERT(mAdaptor);
67 if (!mAdaptor->availableProperties().contains(key)) { 70 if (!mAdaptor->availableProperties().contains(key)) {
68 Warning() << "No such property available " << key; 71 Warning() << "No such property available " << key;
69 } 72 }
@@ -72,7 +75,9 @@ QVariant ApplicationDomainType::getProperty(const QByteArray &key) const
72 75
73void ApplicationDomainType::setProperty(const QByteArray &key, const QVariant &value) 76void ApplicationDomainType::setProperty(const QByteArray &key, const QVariant &value)
74{ 77{
75 mChangeSet.insert(key, value); mAdaptor->setProperty(key, value); 78 Q_ASSERT(mAdaptor);
79 mChangeSet.insert(key, value);
80 mAdaptor->setProperty(key, value);
76} 81}
77 82
78QByteArrayList ApplicationDomainType::changedProperties() const 83QByteArrayList ApplicationDomainType::changedProperties() const
diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h
index 5514d26..227ab4d 100644
--- a/common/domain/applicationdomaintype.h
+++ b/common/domain/applicationdomaintype.h
@@ -160,3 +160,7 @@ Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Event)
160Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Event::Ptr) 160Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Event::Ptr)
161Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Mail) 161Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Mail)
162Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Mail::Ptr) 162Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Mail::Ptr)
163Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Folder)
164Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Folder::Ptr)
165Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::AkonadiResource)
166Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::AkonadiResource::Ptr)
diff --git a/common/domain/dummy.fbs b/common/domain/dummy.fbs
new file mode 100644
index 0000000..8816b09
--- /dev/null
+++ b/common/domain/dummy.fbs
@@ -0,0 +1,7 @@
1namespace Akonadi2.ApplicationDomain.Buffer;
2
3table Dummy {
4}
5
6root_type Dummy;
7file_identifier "AKFB";
diff --git a/common/domain/event.cpp b/common/domain/event.cpp
index 87e13bc..42c13e2 100644
--- a/common/domain/event.cpp
+++ b/common/domain/event.cpp
@@ -47,6 +47,7 @@ ResultSet TypeImplementation<Event>::queryIndexes(const Akonadi2::Query &query,
47 }); 47 });
48 appliedFilters << "uid"; 48 appliedFilters << "uid";
49 } 49 }
50 Trace() << "Index lookup found " << keys.size() << " keys.";
50 return ResultSet(keys); 51 return ResultSet(keys);
51} 52}
52 53
diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp
new file mode 100644
index 0000000..989d2c4
--- /dev/null
+++ b/common/domain/folder.cpp
@@ -0,0 +1,100 @@
1/*
2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastfolder.fm>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19#include "folder.h"
20
21#include <QVector>
22#include <QByteArray>
23#include <QString>
24
25#include "../resultset.h"
26#include "../index.h"
27#include "../storage.h"
28#include "../log.h"
29#include "../propertymapper.h"
30#include "../query.h"
31#include "../definitions.h"
32
33#include "folder_generated.h"
34
35using namespace Akonadi2::ApplicationDomain;
36
37ResultSet TypeImplementation<Folder>::queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction)
38{
39 QVector<QByteArray> keys;
40 if (query.propertyFilter.contains("parent")) {
41 Index index("folder.index.parent", transaction);
42 auto lookupKey = query.propertyFilter.value("parent").toByteArray();
43 if (lookupKey.isEmpty()) {
44 lookupKey = "toplevel";
45 }
46 index.lookup(lookupKey, [&](const QByteArray &value) {
47 keys << value;
48 },
49 [](const Index::Error &error) {
50 Warning() << "Error in uid index: " << error.message;
51 });
52 appliedFilters << "parent";
53 }
54 Trace() << "Index lookup found " << keys.size() << " keys.";
55 return ResultSet(keys);
56}
57
58void TypeImplementation<Folder>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction)
59{
60 const auto parent = bufferAdaptor.getProperty("parent");
61 Trace() << "indexing " << identifier << " with parent " << parent.toByteArray();
62 if (parent.isValid()) {
63 Q_ASSERT(!parent.toByteArray().isEmpty());
64 Index("folder.index.parent", transaction).add(parent.toByteArray(), identifier);
65 } else {
66 Index("folder.index.parent", transaction).add("toplevel", identifier);
67 }
68}
69
70void TypeImplementation<Folder>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction)
71{
72 const auto parent = bufferAdaptor.getProperty("parent");
73 if (parent.isValid()) {
74 Index("folder.index.parent", transaction).remove(parent.toByteArray(), identifier);
75 } else {
76 Index("folder.index.parent", transaction).remove("toplevel", identifier);
77 }
78}
79
80QSharedPointer<ReadPropertyMapper<TypeImplementation<Folder>::Buffer> > TypeImplementation<Folder>::initializeReadPropertyMapper()
81{
82 auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create();
83 propertyMapper->addMapping<QString, Buffer>("parent", &Buffer::parent);
84 propertyMapper->addMapping<QString, Buffer>("name", &Buffer::name);
85 return propertyMapper;
86}
87
88QSharedPointer<WritePropertyMapper<TypeImplementation<Folder>::BufferBuilder> > TypeImplementation<Folder>::initializeWritePropertyMapper()
89{
90 auto propertyMapper = QSharedPointer<WritePropertyMapper<BufferBuilder> >::create();
91 propertyMapper->addMapping("parent", [](const QVariant &value, flatbuffers::FlatBufferBuilder &fbb) -> std::function<void(BufferBuilder &)> {
92 auto offset = variantToProperty<QString>(value, fbb);
93 return [offset](BufferBuilder &builder) { builder.add_parent(offset); };
94 });
95 propertyMapper->addMapping("name", [](const QVariant &value, flatbuffers::FlatBufferBuilder &fbb) -> std::function<void(BufferBuilder &)> {
96 auto offset = variantToProperty<QString>(value, fbb);
97 return [offset](BufferBuilder &builder) { builder.add_name(offset); };
98 });
99 return propertyMapper;
100}
diff --git a/common/domain/folder.fbs b/common/domain/folder.fbs
new file mode 100644
index 0000000..3476d58
--- /dev/null
+++ b/common/domain/folder.fbs
@@ -0,0 +1,9 @@
1namespace Akonadi2.ApplicationDomain.Buffer;
2
3table Folder {
4 name:string;
5 parent:string;
6}
7
8root_type Folder;
9file_identifier "AKFB";
diff --git a/common/domain/folder.h b/common/domain/folder.h
new file mode 100644
index 0000000..545836f
--- /dev/null
+++ b/common/domain/folder.h
@@ -0,0 +1,56 @@
1/*
2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19#pragma once
20
21#include "applicationdomaintype.h"
22
23#include "storage.h"
24
25class ResultSet;
26class QByteArray;
27
28template<typename T>
29class ReadPropertyMapper;
30template<typename T>
31class WritePropertyMapper;
32
33namespace Akonadi2 {
34 class Query;
35
36namespace ApplicationDomain {
37 namespace Buffer {
38 struct Folder;
39 struct FolderBuilder;
40 }
41
42template<>
43class TypeImplementation<Akonadi2::ApplicationDomain::Folder> {
44public:
45 typedef Akonadi2::ApplicationDomain::Buffer::Folder Buffer;
46 typedef Akonadi2::ApplicationDomain::Buffer::FolderBuilder BufferBuilder;
47 static QSet<QByteArray> indexedProperties();
48 static ResultSet queryIndexes(const Akonadi2::Query &query, const QByteArray &resourceInstanceIdentifier, QSet<QByteArray> &appliedFilters, Akonadi2::Storage::Transaction &transaction);
49 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction);
50 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Akonadi2::Storage::Transaction &transaction);
51 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
52 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper();
53};
54
55}
56}
diff --git a/common/domainadaptor.h b/common/domainadaptor.h
index b14fbcd..b541e23 100644
--- a/common/domainadaptor.h
+++ b/common/domainadaptor.h
@@ -1,5 +1,5 @@
1/* 1/*
2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> 2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm>
3 * 3 *
4 * This program is free software; you can redistribute it and/or modify 4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by 5 * it under the terms of the GNU General Public License as published by
@@ -23,9 +23,11 @@
23#include <QByteArray> 23#include <QByteArray>
24#include <functional> 24#include <functional>
25 25
26#include "domaintypeadaptorfactoryinterface.h"
26#include "domain/applicationdomaintype.h" 27#include "domain/applicationdomaintype.h"
27#include "domain/event.h" 28#include "domain/event.h"
28#include "domain/mail.h" 29#include "domain/mail.h"
30#include "domain/folder.h"
29#include "bufferadaptor.h" 31#include "bufferadaptor.h"
30#include "entity_generated.h" 32#include "entity_generated.h"
31#include "metadata_generated.h" 33#include "metadata_generated.h"
@@ -123,15 +125,6 @@ public:
123 QSharedPointer<ReadPropertyMapper<ResourceBuffer> > mResourceMapper; 125 QSharedPointer<ReadPropertyMapper<ResourceBuffer> > mResourceMapper;
124}; 126};
125 127
126class DomainTypeAdaptorFactoryInterface
127{
128public:
129 typedef QSharedPointer<DomainTypeAdaptorFactoryInterface> Ptr;
130 virtual ~DomainTypeAdaptorFactoryInterface() {};
131 virtual QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> createAdaptor(const Akonadi2::Entity &entity) = 0;
132 virtual void createBuffer(const Akonadi2::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0;
133};
134
135/** 128/**
136 * The factory should define how to go from an entitybuffer (local + resource buffer), to a domain type adapter. 129 * The factory should define how to go from an entitybuffer (local + resource buffer), to a domain type adapter.
137 * It defines how values are split accross local and resource buffer. 130 * It defines how values are split accross local and resource buffer.
diff --git a/common/domaintypeadaptorfactoryinterface.h b/common/domaintypeadaptorfactoryinterface.h
new file mode 100644
index 0000000..8c99aa1
--- /dev/null
+++ b/common/domaintypeadaptorfactoryinterface.h
@@ -0,0 +1,42 @@
1/*
2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19#pragma once
20
21#include <QSharedPointer>
22
23namespace Akonadi2 {
24 namespace ApplicationDomain {
25 class BufferAdaptor;
26 class ApplicationDomainType;
27 }
28 struct Entity;
29}
30
31namespace flatbuffers {
32 class FlatBufferBuilder;
33}
34
35class DomainTypeAdaptorFactoryInterface
36{
37public:
38 typedef QSharedPointer<DomainTypeAdaptorFactoryInterface> Ptr;
39 virtual ~DomainTypeAdaptorFactoryInterface() {};
40 virtual QSharedPointer<Akonadi2::ApplicationDomain::BufferAdaptor> createAdaptor(const Akonadi2::Entity &entity) = 0;
41 virtual void createBuffer(const Akonadi2::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0;
42};
diff --git a/common/entitystorage.cpp b/common/entitystorage.cpp
deleted file mode 100644
index 5d4df9f..0000000
--- a/common/entitystorage.cpp
+++ /dev/null
@@ -1,74 +0,0 @@
1/*
2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20#include "entitystorage.h"
21
22ResultSet EntityStorageBase::filteredSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, bool initialQuery)
23{
24 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
25
26 //Read through the source values and return whatever matches the filter
27 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &transaction, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool {
28 while (resultSetPtr->next()) {
29 readEntity(transaction, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) {
30 //Always remove removals, they probably don't match due to non-available properties
31 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) {
32 if (initialQuery) {
33 //We're not interested in removals during the initial query
34 if (operation != Akonadi2::Operation_Removal) {
35 callback(domainObject, Akonadi2::Operation_Creation);
36 }
37 } else {
38 callback(domainObject, operation);
39 }
40 }
41 });
42 }
43 return false;
44 };
45 return ResultSet(generator);
46}
47
48
49ResultSet EntityStorageBase::getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision)
50{
51 QSet<QByteArray> remainingFilters = query.propertyFilter.keys().toSet();
52 ResultSet resultSet;
53 const bool initialQuery = (baseRevision == 1);
54 if (initialQuery) {
55 Trace() << "Initial result set update";
56 resultSet = loadInitialResultSet(query, transaction, remainingFilters);
57 } else {
58 //TODO fallback in case the old revision is no longer available to clear + redo complete initial scan
59 Trace() << "Incremental result set update" << baseRevision;
60 resultSet = loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
61 }
62
63 auto filter = [remainingFilters, query, baseRevision](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
64 for (const auto &filterProperty : remainingFilters) {
65 //TODO implement other comparison operators than equality
66 if (domainObject->getProperty(filterProperty) != query.propertyFilter.value(filterProperty)) {
67 return false;
68 }
69 }
70 return true;
71 };
72
73 return filteredSet(resultSet, filter, transaction, initialQuery);
74}
diff --git a/common/entitystorage.h b/common/entitystorage.h
deleted file mode 100644
index 8e73083..0000000
--- a/common/entitystorage.h
+++ /dev/null
@@ -1,126 +0,0 @@
1/*
2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19#pragma once
20
21#include <QByteArray>
22
23#include "query.h"
24#include "domainadaptor.h"
25#include "entitybuffer.h"
26#include "log.h"
27#include "storage.h"
28#include "resultset.h"
29#include "resultprovider.h"
30#include "definitions.h"
31
32/**
33 * Wraps storage, entity adaptor factory and indexes into one.
34 *
35 */
36class EntityStorageBase
37{
38public:
39 typedef std::function<ResultSet (const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)> InitialResultLoader;
40 typedef std::function<ResultSet (qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)> IncrementalResultLoader;
41 typedef std::function<void(const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)> EntityReader;
42
43 /**
44 * Returns the initial result set that still needs to be filtered.
45 *
46 * To make this efficient indexes should be chosen that are as selective as possible.
47 */
48 InitialResultLoader loadInitialResultSet;
49 /**
50 * Returns the incremental result set that still needs to be filtered.
51 */
52 IncrementalResultLoader loadIncrementalResultSet;
53
54 /**
55 * Loads a single entity by uid from storage.
56 */
57 EntityReader readEntity;
58
59protected:
60 EntityStorageBase(const QByteArray &instanceIdentifier)
61 : mResourceInstanceIdentifier(instanceIdentifier)
62 {
63
64 }
65
66 virtual Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &) = 0;
67
68 ResultSet getResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, qint64 baseRevision);
69
70 QByteArray mResourceInstanceIdentifier;
71
72private:
73 ResultSet filteredSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::Transaction &transaction, bool isInitialQuery);
74};
75
76template<typename DomainType>
77class EntityStorage : public EntityStorageBase
78{
79
80public:
81
82 EntityStorage(const QByteArray &instanceIdentifier)
83 : EntityStorageBase(instanceIdentifier)
84 {
85 }
86
87protected:
88 Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr copy(const Akonadi2::ApplicationDomain::ApplicationDomainType &object) Q_DECL_OVERRIDE
89 {
90 return Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(object);
91 }
92
93public:
94
95 virtual qint64 read(const Akonadi2::Query &query, qint64 baseRevision, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider)
96 {
97 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
98 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
99 Warning() << "Error during query: " << error.store << error.message;
100 });
101
102 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
103
104 Log() << "Querying" << baseRevision << Akonadi2::Storage::maxRevision(transaction);
105 auto resultSet = getResultSet(query, transaction, baseRevision);
106 while(resultSet.next([this, resultProvider](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
107 switch (operation) {
108 case Akonadi2::Operation_Creation:
109 Trace() << "Got creation";
110 resultProvider->add(copy(*value).template staticCast<DomainType>());
111 break;
112 case Akonadi2::Operation_Modification:
113 Trace() << "Got modification";
114 resultProvider->modify(copy(*value).template staticCast<DomainType>());
115 break;
116 case Akonadi2::Operation_Removal:
117 Trace() << "Got removal";
118 resultProvider->remove(copy(*value).template staticCast<DomainType>());
119 break;
120 }
121 return true;
122 })){};
123 return Akonadi2::Storage::maxRevision(transaction);
124 }
125
126};
diff --git a/common/facade.cpp b/common/facade.cpp
index e51b32a..1d6b9a7 100644
--- a/common/facade.cpp
+++ b/common/facade.cpp
@@ -1,5 +1,5 @@
1/* 1/*
2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm> 2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
3 * 3 *
4 * This program is free software; you can redistribute it and/or modify 4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by 5 * it under the terms of the GNU General Public License as published by
@@ -18,3 +18,82 @@
18 */ 18 */
19 19
20#include "facade.h" 20#include "facade.h"
21
22#include "commands.h"
23#include "log.h"
24#include "storage.h"
25#include "definitions.h"
26#include "domainadaptor.h"
27#include "queryrunner.h"
28
29using namespace Akonadi2;
30
31
32template<class DomainType>
33GenericFacade<DomainType>::GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory , const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess)
34 : Akonadi2::StoreFacade<DomainType>(),
35 mResourceAccess(resourceAccess),
36 mDomainTypeAdaptorFactory(adaptorFactory),
37 mResourceInstanceIdentifier(resourceIdentifier)
38{
39 if (!mResourceAccess) {
40 mResourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resourceIdentifier);
41 }
42}
43
44template<class DomainType>
45GenericFacade<DomainType>::~GenericFacade()
46{
47}
48
49template<class DomainType>
50QByteArray GenericFacade<DomainType>::bufferTypeForDomainType()
51{
52 //We happen to have a one to one mapping
53 return Akonadi2::ApplicationDomain::getTypeName<DomainType>();
54}
55
56template<class DomainType>
57KAsync::Job<void> GenericFacade<DomainType>::create(const DomainType &domainObject)
58{
59 if (!mDomainTypeAdaptorFactory) {
60 Warning() << "No domain type adaptor factory available";
61 return KAsync::error<void>();
62 }
63 flatbuffers::FlatBufferBuilder entityFbb;
64 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
65 return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize()));
66}
67
68template<class DomainType>
69KAsync::Job<void> GenericFacade<DomainType>::modify(const DomainType &domainObject)
70{
71 if (!mDomainTypeAdaptorFactory) {
72 Warning() << "No domain type adaptor factory available";
73 return KAsync::error<void>();
74 }
75 flatbuffers::FlatBufferBuilder entityFbb;
76 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
77 return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize()));
78}
79
80template<class DomainType>
81KAsync::Job<void> GenericFacade<DomainType>::remove(const DomainType &domainObject)
82{
83 return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType());
84}
85
86template<class DomainType>
87QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> GenericFacade<DomainType>::load(const Akonadi2::Query &query)
88{
89 //The runner lives for the lifetime of the query
90 auto runner = new QueryRunner<DomainType>(query, mResourceAccess, mResourceInstanceIdentifier, mDomainTypeAdaptorFactory, bufferTypeForDomainType());
91 return qMakePair(KAsync::null<void>(), runner->emitter());
92}
93
94
95template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Folder>;
96template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Mail>;
97template class Akonadi2::GenericFacade<Akonadi2::ApplicationDomain::Event>;
98
99#include "facade.moc"
diff --git a/common/facade.h b/common/facade.h
index 643ebec..de67e05 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -1,5 +1,5 @@
1/* 1/*
2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> 2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm>
3 * 3 *
4 * This program is free software; you can redistribute it and/or modify 4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by 5 * it under the terms of the GNU General Public License as published by
@@ -25,85 +25,12 @@
25#include <Async/Async> 25#include <Async/Async>
26 26
27#include "resourceaccess.h" 27#include "resourceaccess.h"
28#include "commands.h"
29#include "domainadaptor.h"
30#include "log.h"
31#include "resultset.h" 28#include "resultset.h"
32#include "entitystorage.h" 29#include "domaintypeadaptorfactoryinterface.h"
33 30#include "storage.h"
34/**
35 * A QueryRunner runs a query and updates the corresponding result set.
36 *
37 * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work),
38 * and by how long a result set must be updated. If the query is one off the runner dies after the execution,
39 * otherwise it lives on the react to changes and updates the corresponding result set.
40 *
41 * QueryRunner has to keep ResourceAccess alive in order to keep getting updates.
42 */
43class QueryRunner : public QObject
44{
45 Q_OBJECT
46public:
47 typedef std::function<KAsync::Job<qint64>(qint64 oldRevision)> QueryFunction;
48
49 QueryRunner(const Akonadi2::Query &query) : mLatestRevision(0) {};
50 /**
51 * Starts query
52 */
53 KAsync::Job<void> run(qint64 newRevision = 0)
54 {
55 //TODO: JOBAPI: that last empty .then should not be necessary
56 //TODO: remove newRevision
57 return queryFunction(mLatestRevision + 1).then<void, qint64>([this](qint64 revision) {
58 mLatestRevision = revision;
59 }).then<void>([](){});
60 }
61
62 /**
63 * Set the query to run
64 */
65 void setQuery(const QueryFunction &query)
66 {
67 queryFunction = query;
68 }
69
70public slots:
71 /**
72 * Rerun query with new revision
73 */
74 void revisionChanged(qint64 newRevision)
75 {
76 Trace() << "New revision: " << newRevision;
77 run(newRevision).exec();
78 }
79
80private:
81 QueryFunction queryFunction;
82 qint64 mLatestRevision;
83};
84
85static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
86{
87 //TODO use a result set with an iterator, to read values on demand
88 QVector<QByteArray> keys;
89 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
90 //Skip internals
91 if (Akonadi2::Storage::isInternalKey(key)) {
92 return true;
93 }
94 keys << Akonadi2::Storage::uidFromKey(key);
95 return true;
96 },
97 [](const Akonadi2::Storage::Error &error) {
98 qWarning() << "Error during query: " << error.message;
99 });
100
101 Trace() << "Full scan found " << keys.size() << " results";
102 return ResultSet(keys);
103}
104
105 31
106namespace Akonadi2 { 32namespace Akonadi2 {
33
107/** 34/**
108 * Default facade implementation for resources that are implemented in a separate process using the ResourceAccess class. 35 * Default facade implementation for resources that are implemented in a separate process using the ResourceAccess class.
109 * 36 *
@@ -125,185 +52,18 @@ public:
125 * @param resourceIdentifier is the identifier of the resource instance 52 * @param resourceIdentifier is the identifier of the resource instance
126 * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa 53 * @param adaptorFactory is the adaptor factory used to generate the mappings from domain to resource types and vice versa
127 */ 54 */
128 GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<EntityStorage<DomainType> > storage = QSharedPointer<EntityStorage<DomainType> >(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>()) 55 GenericFacade(const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory = DomainTypeAdaptorFactoryInterface::Ptr(), const QSharedPointer<Akonadi2::ResourceAccessInterface> resourceAccess = QSharedPointer<Akonadi2::ResourceAccessInterface>());
129 : Akonadi2::StoreFacade<DomainType>(), 56 ~GenericFacade();
130 mResourceAccess(resourceAccess),
131 mStorage(storage),
132 mDomainTypeAdaptorFactory(adaptorFactory),
133 mResourceInstanceIdentifier(resourceIdentifier)
134 {
135 if (!mResourceAccess) {
136 mResourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resourceIdentifier);
137 }
138 if (!mStorage) {
139 mStorage = QSharedPointer<EntityStorage<DomainType> >::create(resourceIdentifier);
140 const auto bufferType = bufferTypeForDomainType();
141
142 mStorage->readEntity = [bufferType, this] (const Akonadi2::Storage::Transaction &transaction, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
143 {
144 //This only works for a 1:1 mapping of resource to domain types.
145 //Not i.e. for tags that are stored as flags in each entity of an imap store.
146 //additional properties that don't have a 1:1 mapping (such as separately stored tags),
147 //could be added to the adaptor.
148 transaction.openDatabase(bufferType + ".main").findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
149 Akonadi2::EntityBuffer buffer(value.data(), value.size());
150 const Akonadi2::Entity &entity = buffer.entity();
151 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
152 Q_ASSERT(metadataBuffer);
153 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
154 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation());
155 return false;
156 },
157 [](const Akonadi2::Storage::Error &error) {
158 qWarning() << "Error during query: " << error.message;
159 });
160 };
161
162 mStorage->loadInitialResultSet = [bufferType, this] (const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet
163 {
164 QSet<QByteArray> appliedFilters;
165 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction);
166 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
167
168 //We do a full scan if there were no indexes available to create the initial set.
169 if (appliedFilters.isEmpty()) {
170 //TODO this should be replaced by an index lookup as well
171 return fullScan(transaction, bufferType);
172 }
173 return resultSet;
174 };
175
176 mStorage->loadIncrementalResultSet = [bufferType, this] (qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet
177 {
178 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
179 return ResultSet([bufferType, revisionCounter, &transaction, this]() -> QByteArray {
180 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
181 //Spit out the revision keys one by one.
182 while (*revisionCounter <= topRevision) {
183 const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter);
184 const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter);
185 Trace() << "Revision" << *revisionCounter << type << uid;
186 if (type != bufferType) {
187 //Skip revision
188 *revisionCounter += 1;
189 continue;
190 }
191 const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter);
192 *revisionCounter += 1;
193 return key;
194 }
195 //We're done
196 return QByteArray();
197 });
198 };
199 }
200 }
201
202 ~GenericFacade()
203 {
204 }
205
206 static QByteArray bufferTypeForDomainType()
207 {
208 //We happen to have a one to one mapping
209 return Akonadi2::ApplicationDomain::getTypeName<DomainType>();
210 }
211
212 KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE
213 {
214 if (!mDomainTypeAdaptorFactory) {
215 Warning() << "No domain type adaptor factory available";
216 return KAsync::error<void>();
217 }
218 flatbuffers::FlatBufferBuilder entityFbb;
219 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
220 return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize()));
221 }
222
223 KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE
224 {
225 if (!mDomainTypeAdaptorFactory) {
226 Warning() << "No domain type adaptor factory available";
227 return KAsync::error<void>();
228 }
229 flatbuffers::FlatBufferBuilder entityFbb;
230 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
231 return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize()));
232 }
233
234 KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE
235 {
236 return mResourceAccess->sendDeleteCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType());
237 }
238
239 //TODO JOBAPI return job from sync continuation to execute it as subjob?
240 KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) Q_DECL_OVERRIDE
241 {
242 auto runner = QSharedPointer<QueryRunner>::create(query);
243 QWeakPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > weakResultProvider = resultProvider;
244 runner->setQuery([this, weakResultProvider, query] (qint64 oldRevision) -> KAsync::Job<qint64> {
245 return KAsync::start<qint64>([this, weakResultProvider, query, oldRevision](KAsync::Future<qint64> &future) {
246 Trace() << "Executing query " << oldRevision;
247 auto resultProvider = weakResultProvider.toStrongRef();
248 if (!resultProvider) {
249 Warning() << "Tried executing query after result provider is already gone";
250 future.setError(0, QString());
251 future.setFinished();
252 return;
253 }
254 load(query, resultProvider, oldRevision).template then<void, qint64>([&future, this](qint64 queriedRevision) {
255 //TODO set revision in result provider?
256 //TODO update all existing results with new revision
257 mResourceAccess->sendRevisionReplayedCommand(queriedRevision);
258 future.setValue(queriedRevision);
259 future.setFinished();
260 }).exec();
261 });
262 });
263
264 //In case of a live query we keep the runner for as long alive as the result provider exists
265 if (query.liveQuery) {
266 resultProvider->setQueryRunner(runner);
267 //Ensure the connection is open, if it wasn't already opened
268 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
269 mResourceAccess->open();
270 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, runner.data(), &QueryRunner::revisionChanged);
271 }
272
273 //We have to capture the runner to keep it alive
274 return synchronizeResource(query).template then<void>([runner](KAsync::Future<void> &future) {
275 runner->run().then<void>([&future]() {
276 future.setFinished();
277 }).exec();
278 },
279 [](int error, const QString &errorString) {
280 Warning() << "Error during sync " << error << errorString;
281 });
282 }
283
284private:
285 KAsync::Job<void> synchronizeResource(const Akonadi2::Query &query)
286 {
287 //TODO check if a sync is necessary
288 //TODO Only sync what was requested
289 //TODO timeout
290 if (query.syncOnDemand || query.processAll) {
291 return mResourceAccess->synchronizeResource(query.syncOnDemand, query.processAll);
292 }
293 return KAsync::null<void>();
294 }
295 57
296 virtual KAsync::Job<qint64> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider, qint64 oldRevision) 58 static QByteArray bufferTypeForDomainType();
297 { 59 KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE;
298 return KAsync::start<qint64>([=]() -> qint64 { 60 KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE;
299 return mStorage->read(query, oldRevision, resultProvider); 61 KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE;
300 }); 62 QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> load(const Akonadi2::Query &query) Q_DECL_OVERRIDE;
301 }
302 63
303protected: 64protected:
304 //TODO use one resource access instance per application & per resource 65 //TODO use one resource access instance per application & per resource
305 QSharedPointer<Akonadi2::ResourceAccessInterface> mResourceAccess; 66 QSharedPointer<Akonadi2::ResourceAccessInterface> mResourceAccess;
306 QSharedPointer<EntityStorage<DomainType> > mStorage;
307 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; 67 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory;
308 QByteArray mResourceInstanceIdentifier; 68 QByteArray mResourceInstanceIdentifier;
309}; 69};
diff --git a/common/facadeinterface.h b/common/facadeinterface.h
index 3a38db8..318abf3 100644
--- a/common/facadeinterface.h
+++ b/common/facadeinterface.h
@@ -23,6 +23,7 @@
23#include <Async/Async> 23#include <Async/Async>
24#include <QByteArray> 24#include <QByteArray>
25#include <QSharedPointer> 25#include <QSharedPointer>
26#include <QPair>
26#include "applicationdomaintype.h" 27#include "applicationdomaintype.h"
27#include "resultprovider.h" 28#include "resultprovider.h"
28 29
@@ -42,10 +43,32 @@ class StoreFacade {
42public: 43public:
43 virtual ~StoreFacade(){}; 44 virtual ~StoreFacade(){};
44 QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); } 45 QByteArray type() const { return ApplicationDomain::getTypeName<DomainType>(); }
46
47 /**
48 * Create an entity in the store.
49 *
50 * The job returns succefully once the task has been successfully placed in the queue
51 */
45 virtual KAsync::Job<void> create(const DomainType &domainObject) = 0; 52 virtual KAsync::Job<void> create(const DomainType &domainObject) = 0;
53
54 /**
55 * Modify an entity in the store.
56 *
57 * The job returns succefully once the task has been successfully placed in the queue
58 */
46 virtual KAsync::Job<void> modify(const DomainType &domainObject) = 0; 59 virtual KAsync::Job<void> modify(const DomainType &domainObject) = 0;
60
61 /**
62 * Remove an entity from the store.
63 *
64 * The job returns succefully once the task has been successfully placed in the queue
65 */
47 virtual KAsync::Job<void> remove(const DomainType &domainObject) = 0; 66 virtual KAsync::Job<void> remove(const DomainType &domainObject) = 0;
48 virtual KAsync::Job<void> load(const Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) = 0; 67
68 /**
69 * Load entities from the store.
70 */
71 virtual QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr > load(const Query &query) = 0;
49}; 72};
50 73
51template<class DomainType> 74template<class DomainType>
@@ -67,9 +90,9 @@ public:
67 return KAsync::error<void>(-1, "Failed to create a facade"); 90 return KAsync::error<void>(-1, "Failed to create a facade");
68 } 91 }
69 92
70 KAsync::Job<void> load(const Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > &resultProvider) 93 QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr > load(const Query &query)
71 { 94 {
72 return KAsync::error<void>(-1, "Failed to create a facade"); 95 return qMakePair(KAsync::null<void>(), typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr());
73 } 96 }
74}; 97};
75 98
diff --git a/common/modelresult.cpp b/common/modelresult.cpp
new file mode 100644
index 0000000..c7fcd49
--- /dev/null
+++ b/common/modelresult.cpp
@@ -0,0 +1,251 @@
1/*
2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "modelresult.h"
21
22#include <QDebug>
23
24#include "domain/folder.h"
25#include "log.h"
26
27template<class T, class Ptr>
28ModelResult<T, Ptr>::ModelResult(const Akonadi2::Query &query, const QList<QByteArray> &propertyColumns)
29 :QAbstractItemModel(),
30 mPropertyColumns(propertyColumns),
31 mQuery(query)
32{
33}
34
35static qint64 getIdentifier(const QModelIndex &idx)
36{
37 if (!idx.isValid()) {
38 return 0;
39 }
40 return idx.internalId();
41}
42
43template<class T, class Ptr>
44qint64 ModelResult<T, Ptr>::parentId(const Ptr &value)
45{
46 if (!mQuery.parentProperty.isEmpty()) {
47 const auto property = value->getProperty(mQuery.parentProperty).toByteArray();
48 if (!property.isEmpty()) {
49 return qHash(property);
50 }
51 }
52 return 0;
53}
54
55template<class T, class Ptr>
56int ModelResult<T, Ptr>::rowCount(const QModelIndex &parent) const
57{
58 return mTree[getIdentifier(parent)].size();
59}
60
61template<class T, class Ptr>
62int ModelResult<T, Ptr>::columnCount(const QModelIndex &parent) const
63{
64 return mPropertyColumns.size();
65}
66
67template<class T, class Ptr>
68QVariant ModelResult<T, Ptr>::data(const QModelIndex &index, int role) const
69{
70 if (role == DomainObjectRole) {
71 Q_ASSERT(mEntities.contains(index.internalId()));
72 return QVariant::fromValue(mEntities.value(index.internalId()));
73 }
74 if (role == ChildrenFetchedRole) {
75 return childrenFetched(index);
76 }
77 if (role == Qt::DisplayRole) {
78 if (index.column() < mPropertyColumns.size()) {
79 Q_ASSERT(mEntities.contains(index.internalId()));
80 auto entity = mEntities.value(index.internalId());
81 return entity->getProperty(mPropertyColumns.at(index.column())).toString();
82 } else {
83 return "No data available";
84 }
85 }
86 return QVariant();
87}
88
89template<class T, class Ptr>
90QModelIndex ModelResult<T, Ptr>::index(int row, int column, const QModelIndex &parent) const
91{
92 const auto id = getIdentifier(parent);
93 const auto childId = mTree.value(id).at(row);
94 return createIndex(row, column, childId);
95}
96
97template<class T, class Ptr>
98QModelIndex ModelResult<T, Ptr>::createIndexFromId(const qint64 &id) const
99{
100 if (id == 0) {
101 return QModelIndex();
102 }
103 auto grandParentId = mParents.value(id, 0);
104 auto row = mTree.value(grandParentId).indexOf(id);
105 return createIndex(row, 0, id);
106}
107
108template<class T, class Ptr>
109QModelIndex ModelResult<T, Ptr>::parent(const QModelIndex &index) const
110{
111 auto id = getIdentifier(index);
112 auto parentId = mParents.value(id);
113 return createIndexFromId(parentId);
114}
115
116template<class T, class Ptr>
117bool ModelResult<T, Ptr>::hasChildren(const QModelIndex &parent) const
118{
119 if (mQuery.parentProperty.isEmpty() && parent.isValid()) {
120 return false;
121 }
122 return QAbstractItemModel::hasChildren(parent);
123}
124
125template<class T, class Ptr>
126bool ModelResult<T, Ptr>::canFetchMore(const QModelIndex &parent) const
127{
128 return !mEntityChildrenFetched.contains(parent.internalId());
129}
130
131template<class T, class Ptr>
132void ModelResult<T, Ptr>::fetchMore(const QModelIndex &parent)
133{
134 fetchEntities(parent);
135}
136
137template<class T, class Ptr>
138void ModelResult<T, Ptr>::add(const Ptr &value)
139{
140 const auto childId = qHash(value->identifier());
141 const auto id = parentId(value);
142 //Ignore updates we get before the initial fetch is done
143 if (!mEntityChildrenFetched.contains(id)) {
144 return;
145 }
146 auto parent = createIndexFromId(id);
147 // qDebug() << "Added entity " << childId << value->identifier() << id;
148 const auto keys = mTree[id];
149 int index = 0;
150 for (; index < keys.size(); index++) {
151 if (childId < keys.at(index)) {
152 break;
153 }
154 }
155 if (mEntities.contains(childId)) {
156 Warning() << "Entity already in model " << value->identifier();
157 return;
158 }
159 // qDebug() << "Inserting rows " << index << parent;
160 beginInsertRows(parent, index, index);
161 mEntities.insert(childId, value);
162 mTree[id].insert(index, childId);
163 mParents.insert(childId, id);
164 endInsertRows();
165 // qDebug() << "Inserted rows " << mTree[id].size();
166}
167
168
169template<class T, class Ptr>
170void ModelResult<T, Ptr>::remove(const Ptr &value)
171{
172 auto childId = qHash(value->identifier());
173 auto id = parentId(value);
174 auto parent = createIndexFromId(id);
175 // qDebug() << "Removed entity" << childId;
176 auto index = mTree[id].indexOf(qHash(value->identifier()));
177 beginRemoveRows(parent, index, index);
178 mEntities.remove(childId);
179 mTree[id].removeAll(childId);
180 mParents.remove(childId);
181 //TODO remove children
182 endRemoveRows();
183}
184
185template<class T, class Ptr>
186void ModelResult<T, Ptr>::fetchEntities(const QModelIndex &parent)
187{
188 const auto id = getIdentifier(parent);
189 mEntityChildrenFetched.insert(id);
190 Trace() << "Loading child entities";
191 loadEntities(parent.data(DomainObjectRole).template value<Ptr>());
192}
193
194template<class T, class Ptr>
195void ModelResult<T, Ptr>::setFetcher(const std::function<void(const Ptr &parent)> &fetcher)
196{
197 Trace() << "Setting fetcher";
198 loadEntities = fetcher;
199}
200
201template<class T, class Ptr>
202void ModelResult<T, Ptr>::setEmitter(const typename Akonadi2::ResultEmitter<Ptr>::Ptr &emitter)
203{
204 setFetcher(emitter->mFetcher);
205 emitter->onAdded([this](const Ptr &value) {
206 this->add(value);
207 });
208 emitter->onModified([this](const Ptr &value) {
209 this->modify(value);
210 });
211 emitter->onRemoved([this](const Ptr &value) {
212 this->remove(value);
213 });
214 emitter->onInitialResultSetComplete([this](const Ptr &parent) {
215 const qint64 parentId = parent ? qHash(parent->identifier()) : 0;
216 const auto parentIndex = createIndexFromId(parentId);
217 mEntityChildrenFetchComplete.insert(parentId);
218 emit dataChanged(parentIndex, parentIndex, QVector<int>() << ChildrenFetchedRole);
219 });
220 mEmitter = emitter;
221}
222
223template<class T, class Ptr>
224bool ModelResult<T, Ptr>::childrenFetched(const QModelIndex &index) const
225{
226 return mEntityChildrenFetchComplete.contains(getIdentifier(index));
227}
228
229template<class T, class Ptr>
230void ModelResult<T, Ptr>::modify(const Ptr &value)
231{
232 auto childId = qHash(value->identifier());
233 auto id = parentId(value);
234 //Ignore updates we get before the initial fetch is done
235 if (!mEntityChildrenFetched.contains(id)) {
236 return;
237 }
238 auto parent = createIndexFromId(id);
239 // qDebug() << "Modified entity" << childId;
240 auto i = mTree[id].indexOf(childId);
241 mEntities.remove(childId);
242 mEntities.insert(childId, value);
243 //TODO check for change of parents
244 auto idx = index(i, 0, parent);
245 emit dataChanged(idx, idx);
246}
247
248template class ModelResult<Akonadi2::ApplicationDomain::Folder, Akonadi2::ApplicationDomain::Folder::Ptr>;
249template class ModelResult<Akonadi2::ApplicationDomain::Mail, Akonadi2::ApplicationDomain::Mail::Ptr>;
250template class ModelResult<Akonadi2::ApplicationDomain::Event, Akonadi2::ApplicationDomain::Event::Ptr>;
251template class ModelResult<Akonadi2::ApplicationDomain::AkonadiResource, Akonadi2::ApplicationDomain::AkonadiResource::Ptr>;
diff --git a/common/modelresult.h b/common/modelresult.h
new file mode 100644
index 0000000..700064b
--- /dev/null
+++ b/common/modelresult.h
@@ -0,0 +1,78 @@
1/*
2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include <QAbstractItemModel>
24#include <QModelIndex>
25#include <QDebug>
26#include <QSharedPointer>
27#include <functional>
28#include "query.h"
29#include "resultprovider.h"
30
31template<class T, class Ptr>
32class ModelResult : public QAbstractItemModel
33{
34public:
35 enum Roles {
36 DomainObjectRole = Qt::UserRole + 1,
37 ChildrenFetchedRole
38 };
39
40 ModelResult(const Akonadi2::Query &query, const QList<QByteArray> &propertyColumns);
41
42 void setEmitter(const typename Akonadi2::ResultEmitter<Ptr>::Ptr &);
43
44 int rowCount(const QModelIndex &parent = QModelIndex()) const;
45 int columnCount(const QModelIndex &parent = QModelIndex()) const;
46 QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const;
47 QModelIndex index(int row, int column, const QModelIndex & parent = QModelIndex()) const;
48 QModelIndex parent(const QModelIndex &index) const;
49 bool hasChildren(const QModelIndex &parent = QModelIndex()) const;
50
51 bool canFetchMore(const QModelIndex &parent) const;
52 void fetchMore(const QModelIndex &parent);
53
54 void add(const Ptr &value);
55 void modify(const Ptr &value);
56 void remove(const Ptr &value);
57
58 void setFetcher(const std::function<void(const Ptr &parent)> &fetcher);
59
60 bool childrenFetched(const QModelIndex &) const;
61
62private:
63 qint64 parentId(const Ptr &value);
64 QModelIndex createIndexFromId(const qint64 &id) const;
65 void fetchEntities(const QModelIndex &parent);
66
67 //TODO we should be able to directly use T as index, with an appropriate hash function, and thus have a QMap<T, T> and QList<T>
68 QMap<qint64 /* entity id */, Ptr> mEntities;
69 QMap<qint64 /* parent entity id */, QList<qint64> /* child entity id*/> mTree;
70 QMap<qint64 /* child entity id */, qint64 /* parent entity id*/> mParents;
71 QSet<qint64 /* entity id */> mEntityChildrenFetched;
72 QSet<qint64 /* entity id */> mEntityChildrenFetchComplete;
73 QList<QByteArray> mPropertyColumns;
74 Akonadi2::Query mQuery;
75 std::function<void(const Ptr &)> loadEntities;
76 typename Akonadi2::ResultEmitter<Ptr>::Ptr mEmitter;
77};
78
diff --git a/common/query.h b/common/query.h
index 0cad9fb..5313fa9 100644
--- a/common/query.h
+++ b/common/query.h
@@ -53,6 +53,7 @@ public:
53 QHash<QByteArray, QVariant> propertyFilter; 53 QHash<QByteArray, QVariant> propertyFilter;
54 //Properties to retrieve 54 //Properties to retrieve
55 QSet<QByteArray> requestedProperties; 55 QSet<QByteArray> requestedProperties;
56 QByteArray parentProperty;
56 bool syncOnDemand; 57 bool syncOnDemand;
57 bool processAll; 58 bool processAll;
58 //If live query is false, this query will not continuously be updated 59 //If live query is false, this query will not continuously be updated
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
new file mode 100644
index 0000000..e365cfc
--- /dev/null
+++ b/common/queryrunner.cpp
@@ -0,0 +1,312 @@
1/*
2 Copyright (c) 2015 Christian Mollekopf <mollekopf@kolabsys.com>
3
4 This library is free software; you can redistribute it and/or modify it
5 under the terms of the GNU Library General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or (at your
7 option) any later version.
8
9 This library is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12 License for more details.
13
14 You should have received a copy of the GNU Library General Public License
15 along with this library; see the file COPYING.LIB. If not, write to the
16 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17 02110-1301, USA.
18*/
19#include "queryrunner.h"
20
21#include <QtConcurrent/QtConcurrentRun>
22#include <QFuture>
23#include <QFutureWatcher>
24#include <QTime>
25#include "commands.h"
26#include "log.h"
27#include "storage.h"
28#include "definitions.h"
29#include "domainadaptor.h"
30
31using namespace Akonadi2;
32
33static inline ResultSet fullScan(const Akonadi2::Storage::Transaction &transaction, const QByteArray &bufferType)
34{
35 //TODO use a result set with an iterator, to read values on demand
36 QVector<QByteArray> keys;
37 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
38 //Skip internals
39 if (Akonadi2::Storage::isInternalKey(key)) {
40 return true;
41 }
42 keys << Akonadi2::Storage::uidFromKey(key);
43 return true;
44 },
45 [](const Akonadi2::Storage::Error &error) {
46 qWarning() << "Error during query: " << error.message;
47 });
48
49 Trace() << "Full scan on " << bufferType << " found " << keys.size() << " results";
50 return ResultSet(keys);
51}
52
53template<class DomainType>
54QueryRunner<DomainType>::QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType)
55 : QueryRunnerBase(),
56 mResourceAccess(resourceAccess),
57 mResultProvider(new ResultProvider<typename DomainType::Ptr>),
58 mDomainTypeAdaptorFactory(factory),
59 mQuery(query),
60 mResourceInstanceIdentifier(instanceIdentifier),
61 mBufferType(bufferType)
62{
63 Trace() << "Starting query";
64 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
65 mResultProvider->setFetcher([this, query](const typename DomainType::Ptr &parent) {
66 Trace() << "Running fetcher";
67
68 // auto watcher = new QFutureWatcher<qint64>;
69 // QObject::connect(watcher, &QFutureWatcher::finished, [](qint64 newRevision) {
70 // mResourceAccess->sendRevisionReplayedCommand(newRevision);
71 // });
72 // auto future = QtConcurrent::run([&resultProvider]() -> qint64 {
73 // const qint64 newRevision = executeInitialQuery(query, parent, resultProvider);
74 // return newRevision;
75 // });
76 // watcher->setFuture(future);
77 const qint64 newRevision = executeInitialQuery(query, parent, *mResultProvider);
78 mResourceAccess->sendRevisionReplayedCommand(newRevision);
79 });
80
81
82 //In case of a live query we keep the runner for as long alive as the result provider exists
83 if (query.liveQuery) {
84 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
85 setQuery([this, query] () -> KAsync::Job<void> {
86 return KAsync::start<void>([this, query](KAsync::Future<void> &future) {
87 //TODO execute in thread
88 const qint64 newRevision = executeIncrementalQuery(query, *mResultProvider);
89 mResourceAccess->sendRevisionReplayedCommand(newRevision);
90 future.setFinished();
91 });
92 });
93 //Ensure the connection is open, if it wasn't already opened
94 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
95 mResourceAccess->open();
96 QObject::connect(mResourceAccess.data(), &Akonadi2::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged);
97 }
98}
99
100template<class DomainType>
101QueryRunner<DomainType>::~QueryRunner()
102{
103 Trace() << "Stopped query";
104}
105
106template<class DomainType>
107typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter()
108{
109 return mResultProvider->emitter();
110}
111
112//TODO move into result provider?
113template<class DomainType>
114void QueryRunner<DomainType>::replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
115{
116 // Trace() << "Replay set";
117 int counter = 0;
118 while (resultSet.next([&resultProvider, &counter](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &value, Akonadi2::Operation operation) -> bool {
119 counter++;
120 switch (operation) {
121 case Akonadi2::Operation_Creation:
122 // Trace() << "Got creation";
123 resultProvider.add(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
124 break;
125 case Akonadi2::Operation_Modification:
126 // Trace() << "Got modification";
127 resultProvider.modify(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
128 break;
129 case Akonadi2::Operation_Removal:
130 // Trace() << "Got removal";
131 resultProvider.remove(Akonadi2::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value).template staticCast<DomainType>());
132 break;
133 }
134 return true;
135 })){};
136 Trace() << "Replayed " << counter << " results";
137}
138
139template<class DomainType>
140void QueryRunner<DomainType>::readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback)
141{
142 //This only works for a 1:1 mapping of resource to domain types.
143 //Not i.e. for tags that are stored as flags in each entity of an imap store.
144 //additional properties that don't have a 1:1 mapping (such as separately stored tags),
145 //could be added to the adaptor.
146 db.findLatest(key, [=](const QByteArray &key, const QByteArray &value) -> bool {
147 Akonadi2::EntityBuffer buffer(value.data(), value.size());
148 const Akonadi2::Entity &entity = buffer.entity();
149 const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer<Akonadi2::Metadata>(entity.metadata());
150 Q_ASSERT(metadataBuffer);
151 const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1;
152 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mDomainTypeAdaptorFactory->createAdaptor(entity)), metadataBuffer->operation());
153 return false;
154 },
155 [](const Akonadi2::Storage::Error &error) {
156 qWarning() << "Error during query: " << error.message;
157 });
158}
159
160template<class DomainType>
161ResultSet QueryRunner<DomainType>::loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
162{
163 QSet<QByteArray> appliedFilters;
164 auto resultSet = Akonadi2::ApplicationDomain::TypeImplementation<DomainType>::queryIndexes(query, mResourceInstanceIdentifier, appliedFilters, transaction);
165 remainingFilters = query.propertyFilter.keys().toSet() - appliedFilters;
166
167 //We do a full scan if there were no indexes available to create the initial set.
168 if (appliedFilters.isEmpty()) {
169 //TODO this should be replaced by an index lookup as well
170 resultSet = fullScan(transaction, mBufferType);
171 }
172 return resultSet;
173}
174
175template<class DomainType>
176ResultSet QueryRunner<DomainType>::loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters)
177{
178 const auto bufferType = mBufferType;
179 auto revisionCounter = QSharedPointer<qint64>::create(baseRevision);
180 remainingFilters = query.propertyFilter.keys().toSet();
181 return ResultSet([bufferType, revisionCounter, &transaction]() -> QByteArray {
182 const qint64 topRevision = Akonadi2::Storage::maxRevision(transaction);
183 //Spit out the revision keys one by one.
184 while (*revisionCounter <= topRevision) {
185 const auto uid = Akonadi2::Storage::getUidFromRevision(transaction, *revisionCounter);
186 const auto type = Akonadi2::Storage::getTypeFromRevision(transaction, *revisionCounter);
187 // Trace() << "Revision" << *revisionCounter << type << uid;
188 if (type != bufferType) {
189 //Skip revision
190 *revisionCounter += 1;
191 continue;
192 }
193 const auto key = Akonadi2::Storage::assembleKey(uid, *revisionCounter);
194 *revisionCounter += 1;
195 return key;
196 }
197 Trace() << "Finished reading incremental result set:" << *revisionCounter;
198 //We're done
199 return QByteArray();
200 });
201}
202
203template<class DomainType>
204ResultSet QueryRunner<DomainType>::filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery)
205{
206 auto resultSetPtr = QSharedPointer<ResultSet>::create(resultSet);
207
208 //Read through the source values and return whatever matches the filter
209 std::function<bool(std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)>)> generator = [this, resultSetPtr, &db, filter, initialQuery](std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> callback) -> bool {
210 while (resultSetPtr->next()) {
211 //readEntity is only necessary if we actually want to filter or know the operation type (but not a big deal if we do it always I guess)
212 readEntity(db, resultSetPtr->id(), [this, filter, callback, initialQuery](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject, Akonadi2::Operation operation) {
213 //Always remove removals, they probably don't match due to non-available properties
214 if (filter(domainObject) || operation == Akonadi2::Operation_Removal) {
215 if (initialQuery) {
216 //We're not interested in removals during the initial query
217 if (operation != Akonadi2::Operation_Removal) {
218 callback(domainObject, Akonadi2::Operation_Creation);
219 }
220 } else {
221 callback(domainObject, operation);
222 }
223 }
224 });
225 }
226 return false;
227 };
228 return ResultSet(generator);
229}
230
231
232template<class DomainType>
233std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> QueryRunner<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query)
234{
235 return [remainingFilters, query](const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject) -> bool {
236 for (const auto &filterProperty : remainingFilters) {
237 const auto property = domainObject->getProperty(filterProperty);
238 if (property.isValid()) {
239 //TODO implement other comparison operators than equality
240 if (property != query.propertyFilter.value(filterProperty)) {
241 Trace() << "Filtering entity due to property mismatch: " << domainObject->getProperty(filterProperty);
242 return false;
243 }
244 } else {
245 Warning() << "Ignored property filter because value is invalid: " << filterProperty;
246 }
247 }
248 return true;
249 };
250}
251
252template<class DomainType>
253qint64 QueryRunner<DomainType>::load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery)
254{
255 Akonadi2::Storage storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier);
256 storage.setDefaultErrorHandler([](const Akonadi2::Storage::Error &error) {
257 Warning() << "Error during query: " << error.store << error.message;
258 });
259 auto transaction = storage.createTransaction(Akonadi2::Storage::ReadOnly);
260 auto db = transaction.openDatabase(mBufferType + ".main");
261
262 QSet<QByteArray> remainingFilters;
263 auto resultSet = baseSetRetriever(transaction, remainingFilters);
264 auto filteredSet = filterSet(resultSet, getFilter(remainingFilters, query), db, initialQuery);
265 replaySet(filteredSet, resultProvider);
266 resultProvider.setRevision(Akonadi2::Storage::maxRevision(transaction));
267 return Akonadi2::Storage::maxRevision(transaction);
268}
269
270
271template<class DomainType>
272qint64 QueryRunner<DomainType>::executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
273{
274 QTime time;
275 time.start();
276
277 const qint64 baseRevision = resultProvider.revision() + 1;
278 Trace() << "Running incremental query " << baseRevision;
279 auto revision = load(query, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
280 return loadIncrementalResultSet(baseRevision, query, transaction, remainingFilters);
281 }, resultProvider, false);
282 Trace() << "Incremental query took: " << time.elapsed() << " ms";
283 return revision;
284}
285
286template<class DomainType>
287qint64 QueryRunner<DomainType>::executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider)
288{
289 QTime time;
290 time.start();
291
292 auto modifiedQuery = query;
293 if (!query.parentProperty.isEmpty()) {
294 if (parent) {
295 Trace() << "Running initial query for parent:" << parent->identifier();
296 modifiedQuery.propertyFilter.insert(query.parentProperty, parent->identifier());
297 } else {
298 Trace() << "Running initial query for toplevel";
299 modifiedQuery.propertyFilter.insert(query.parentProperty, QVariant());
300 }
301 }
302 auto revision = load(modifiedQuery, [&](Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters) -> ResultSet {
303 return loadInitialResultSet(modifiedQuery, transaction, remainingFilters);
304 }, resultProvider, true);
305 Trace() << "Initial query took: " << time.elapsed() << " ms";
306 resultProvider.initialResultSetComplete(parent);
307 return revision;
308}
309
310template class QueryRunner<Akonadi2::ApplicationDomain::Folder>;
311template class QueryRunner<Akonadi2::ApplicationDomain::Mail>;
312template class QueryRunner<Akonadi2::ApplicationDomain::Event>;
diff --git a/common/queryrunner.h b/common/queryrunner.h
new file mode 100644
index 0000000..c918dcb
--- /dev/null
+++ b/common/queryrunner.h
@@ -0,0 +1,108 @@
1/*
2 Copyright (c) 2015 Christian Mollekopf <mollekopf@kolabsys.com>
3
4 This library is free software; you can redistribute it and/or modify it
5 under the terms of the GNU Library General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or (at your
7 option) any later version.
8
9 This library is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12 License for more details.
13
14 You should have received a copy of the GNU Library General Public License
15 along with this library; see the file COPYING.LIB. If not, write to the
16 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17 02110-1301, USA.
18*/
19
20#pragma once
21
22#include <QObject>
23#include "facadeinterface.h"
24#include "resourceaccess.h"
25#include "resultprovider.h"
26#include "domaintypeadaptorfactoryinterface.h"
27#include "storage.h"
28#include "query.h"
29
30/**
31 * A QueryRunner runs a query and updates the corresponding result set.
32 *
33 * The lifetime of the QueryRunner is defined by the resut set (otherwise it's doing useless work),
34 * and by how long a result set must be updated. If the query is one off the runner dies after the execution,
35 * otherwise it lives on the react to changes and updates the corresponding result set.
36 *
37 * QueryRunner has to keep ResourceAccess alive in order to keep getting updates.
38 */
39
40class QueryRunnerBase : public QObject
41{
42 Q_OBJECT
43protected:
44 typedef std::function<KAsync::Job<void>()> QueryFunction;
45
46 /**
47 * Set the query to run
48 */
49 void setQuery(const QueryFunction &query)
50 {
51 queryFunction = query;
52 }
53
54
55protected slots:
56 /**
57 * Rerun query with new revision
58 */
59 void revisionChanged(qint64 newRevision)
60 {
61 Trace() << "New revision: " << newRevision;
62 run().exec();
63 }
64
65private:
66 /**
67 * Starts query
68 */
69 KAsync::Job<void> run(qint64 newRevision = 0)
70 {
71 return queryFunction();
72 }
73
74 QueryFunction queryFunction;
75};
76
77template<typename DomainType>
78class QueryRunner : public QueryRunnerBase
79{
80public:
81 QueryRunner(const Akonadi2::Query &query, const Akonadi2::ResourceAccessInterface::Ptr &, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType);
82 virtual ~QueryRunner();
83
84 typename Akonadi2::ResultEmitter<typename DomainType::Ptr>::Ptr emitter();
85
86private:
87 static void replaySet(ResultSet &resultSet, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
88
89 void readEntity(const Akonadi2::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &, Akonadi2::Operation)> &resultCallback);
90
91 ResultSet loadInitialResultSet(const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
92 ResultSet loadIncrementalResultSet(qint64 baseRevision, const Akonadi2::Query &query, Akonadi2::Storage::Transaction &transaction, QSet<QByteArray> &remainingFilters);
93
94 ResultSet filterSet(const ResultSet &resultSet, const std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> &filter, const Akonadi2::Storage::NamedDatabase &db, bool initialQuery);
95 std::function<bool(const Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr &domainObject)> getFilter(const QSet<QByteArray> remainingFilters, const Akonadi2::Query &query);
96 qint64 load(const Akonadi2::Query &query, const std::function<ResultSet(Akonadi2::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery);
97 qint64 executeIncrementalQuery(const Akonadi2::Query &query, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
98 qint64 executeInitialQuery(const Akonadi2::Query &query, const typename DomainType::Ptr &parent, Akonadi2::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
99
100private:
101 QSharedPointer<Akonadi2::ResultProvider<typename DomainType::Ptr> > mResultProvider;
102 QSharedPointer<Akonadi2::ResourceAccessInterface> mResourceAccess;
103 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory;
104 QByteArray mResourceInstanceIdentifier;
105 QByteArray mBufferType;
106 Akonadi2::Query mQuery;
107};
108
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index bd9e2c9..be25533 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -282,6 +282,7 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatB
282 282
283KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) 283KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync)
284{ 284{
285 Trace() << "Sending synchronize command: " << sourceSync << localSync;
285 flatbuffers::FlatBufferBuilder fbb; 286 flatbuffers::FlatBufferBuilder fbb;
286 auto command = Akonadi2::CreateSynchronize(fbb, sourceSync, localSync); 287 auto command = Akonadi2::CreateSynchronize(fbb, sourceSync, localSync);
287 Akonadi2::FinishSynchronizeBuffer(fbb, command); 288 Akonadi2::FinishSynchronizeBuffer(fbb, command);
@@ -340,7 +341,7 @@ KAsync::Job<void> ResourceAccess::sendRevisionReplayedCommand(qint64 revision)
340void ResourceAccess::open() 341void ResourceAccess::open()
341{ 342{
342 if (d->socket && d->socket->isValid()) { 343 if (d->socket && d->socket->isValid()) {
343 log("Socket valid, so not opening again"); 344 // Trace() << "Socket valid, so not opening again";
344 return; 345 return;
345 } 346 }
346 if (d->openingSocket) { 347 if (d->openingSocket) {
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 8e27054..e87a1f7 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -37,6 +37,8 @@ class ResourceAccessInterface : public QObject
37{ 37{
38 Q_OBJECT 38 Q_OBJECT
39public: 39public:
40 typedef QSharedPointer<ResourceAccessInterface> Ptr;
41
40 ResourceAccessInterface() {} 42 ResourceAccessInterface() {}
41 virtual ~ResourceAccessInterface() {} 43 virtual ~ResourceAccessInterface() {}
42 virtual KAsync::Job<void> sendCommand(int commandId) = 0; 44 virtual KAsync::Job<void> sendCommand(int commandId) = 0;
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp
index 54185f8..6510c90 100644
--- a/common/resourcefacade.cpp
+++ b/common/resourcefacade.cpp
@@ -54,9 +54,15 @@ KAsync::Job<void> ResourceFacade::remove(const Akonadi2::ApplicationDomain::Akon
54 }); 54 });
55} 55}
56 56
57KAsync::Job<void> ResourceFacade::load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename Akonadi2::ApplicationDomain::AkonadiResource::Ptr> > &resultProvider) 57QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<Akonadi2::ApplicationDomain::AkonadiResource::Ptr>::Ptr > ResourceFacade::load(const Akonadi2::Query &query)
58{ 58{
59 return KAsync::start<void>([query, resultProvider]() { 59 auto resultProvider = new Akonadi2::ResultProvider<typename Akonadi2::ApplicationDomain::AkonadiResource::Ptr>();
60 auto emitter = resultProvider->emitter();
61 resultProvider->setFetcher([](const QSharedPointer<Akonadi2::ApplicationDomain::AkonadiResource> &) {});
62 resultProvider->onDone([resultProvider]() {
63 delete resultProvider;
64 });
65 auto job = KAsync::start<void>([query, resultProvider]() {
60 const auto configuredResources = ResourceConfig::getResources(); 66 const auto configuredResources = ResourceConfig::getResources();
61 for (const auto &res : configuredResources.keys()) { 67 for (const auto &res : configuredResources.keys()) {
62 const auto type = configuredResources.value(res); 68 const auto type = configuredResources.value(res);
@@ -68,8 +74,9 @@ KAsync::Job<void> ResourceFacade::load(const Akonadi2::Query &query, const QShar
68 } 74 }
69 } 75 }
70 //TODO initialResultSetComplete should be implicit 76 //TODO initialResultSetComplete should be implicit
71 resultProvider->initialResultSetComplete(); 77 resultProvider->initialResultSetComplete(Akonadi2::ApplicationDomain::AkonadiResource::Ptr());
72 resultProvider->complete(); 78 resultProvider->complete();
73 }); 79 });
80 return qMakePair(job, emitter);
74} 81}
75 82
diff --git a/common/resourcefacade.h b/common/resourcefacade.h
index 437ff75..38e0c0e 100644
--- a/common/resourcefacade.h
+++ b/common/resourcefacade.h
@@ -37,5 +37,6 @@ public:
37 KAsync::Job<void> create(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; 37 KAsync::Job<void> create(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE;
38 KAsync::Job<void> modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; 38 KAsync::Job<void> modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE;
39 KAsync::Job<void> remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; 39 KAsync::Job<void> remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE;
40 KAsync::Job<void> load(const Akonadi2::Query &query, const QSharedPointer<Akonadi2::ResultProvider<typename Akonadi2::ApplicationDomain::AkonadiResource::Ptr> > &resultProvider) Q_DECL_OVERRIDE; 40 QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<Akonadi2::ApplicationDomain::AkonadiResource::Ptr>::Ptr > load(const Akonadi2::Query &query) Q_DECL_OVERRIDE;
41}; 41};
42
diff --git a/common/resultprovider.h b/common/resultprovider.h
index bc03152..d50f3f6 100644
--- a/common/resultprovider.h
+++ b/common/resultprovider.h
@@ -20,9 +20,12 @@
20 20
21#pragma once 21#pragma once
22 22
23#include <QThread>
23#include <functional> 24#include <functional>
24#include <memory> 25#include <memory>
25#include "threadboundary.h" 26#include "threadboundary.h"
27#include "resultset.h"
28#include "log.h"
26 29
27using namespace async; 30using namespace async;
28 31
@@ -34,11 +37,43 @@ namespace Akonadi2 {
34template<class T> 37template<class T>
35class ResultEmitter; 38class ResultEmitter;
36 39
40template<class T>
41class ResultProviderInterface
42{
43public:
44 ResultProviderInterface()
45 : mRevision(0)
46 {
47
48 }
49
50 virtual void add(const T &value) = 0;
51 virtual void modify(const T &value) = 0;
52 virtual void remove(const T &value) = 0;
53 virtual void initialResultSetComplete(const T &parent) = 0;
54 virtual void complete() = 0;
55 virtual void clear() = 0;
56 virtual void setFetcher(const std::function<void(const T &parent)> &fetcher) = 0;
57
58 void setRevision(qint64 revision)
59 {
60 mRevision = revision;
61 }
62
63 qint64 revision() const
64 {
65 return mRevision;
66 }
67
68private:
69 qint64 mRevision;
70};
71
37/* 72/*
38* The promise side for the result emitter 73* The promise side for the result emitter
39*/ 74*/
40template<class T> 75template<class T>
41class ResultProvider { 76class ResultProvider : public ResultProviderInterface<T> {
42private: 77private:
43 void callInMainThreadOnEmitter(void (ResultEmitter<T>::*f)()) 78 void callInMainThreadOnEmitter(void (ResultEmitter<T>::*f)())
44 { 79 {
@@ -69,6 +104,12 @@ private:
69 } 104 }
70 105
71public: 106public:
107 typedef QSharedPointer<ResultProvider<T> > Ptr;
108
109 virtual ~ResultProvider()
110 {
111 }
112
72 //Called from worker thread 113 //Called from worker thread
73 void add(const T &value) 114 void add(const T &value)
74 { 115 {
@@ -103,9 +144,15 @@ public:
103 }); 144 });
104 } 145 }
105 146
106 void initialResultSetComplete() 147 void initialResultSetComplete(const T &parent)
107 { 148 {
108 callInMainThreadOnEmitter(&ResultEmitter<T>::initialResultSetComplete); 149 //Because I don't know how to use bind
150 auto weakEmitter = mResultEmitter;
151 callInMainThreadOnEmitter([weakEmitter, parent](){
152 if (auto strongRef = weakEmitter.toStrongRef()) {
153 strongRef->initialResultSetComplete(parent);
154 }
155 });
109 } 156 }
110 157
111 //Called from worker thread 158 //Called from worker thread
@@ -126,30 +173,16 @@ public:
126 //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again 173 //We have to go over a separate var and return that, otherwise we'd delete the emitter immediately again
127 auto sharedPtr = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter){ mThreadBoundary->callInMainThread([this]() {done();}); delete emitter; }); 174 auto sharedPtr = QSharedPointer<ResultEmitter<T> >(new ResultEmitter<T>, [this](ResultEmitter<T> *emitter){ mThreadBoundary->callInMainThread([this]() {done();}); delete emitter; });
128 mResultEmitter = sharedPtr; 175 mResultEmitter = sharedPtr;
176 sharedPtr->setFetcher([this](const T &parent) {
177 Q_ASSERT(mFetcher);
178 mFetcher(parent);
179 });
129 return sharedPtr; 180 return sharedPtr;
130 } 181 }
131 182
132 return mResultEmitter.toStrongRef(); 183 return mResultEmitter.toStrongRef();
133 } 184 }
134 185
135 /**
136 * For lifetimemanagement only.
137 * We keep the runner alive as long as the result provider exists.
138 */
139 void setQueryRunner(const QSharedPointer<QObject> &runner)
140 {
141 mQueryRunner = runner;
142 }
143
144 /**
145 * For lifetimemanagement only.
146 * We keep the runner alive as long as the result provider exists.
147 */
148 void setFacade(const std::shared_ptr<void> &facade)
149 {
150 mFacade = facade;
151 }
152
153 void onDone(const std::function<void()> &callback) 186 void onDone(const std::function<void()> &callback)
154 { 187 {
155 mThreadBoundary = QSharedPointer<ThreadBoundary>::create(); 188 mThreadBoundary = QSharedPointer<ThreadBoundary>::create();
@@ -162,21 +195,27 @@ public:
162 return mResultEmitter.toStrongRef().isNull(); 195 return mResultEmitter.toStrongRef().isNull();
163 } 196 }
164 197
198 void setFetcher(const std::function<void(const T &parent)> &fetcher)
199 {
200 mFetcher = fetcher;
201 }
202
165private: 203private:
166 void done() 204 void done()
167 { 205 {
168 qWarning() << "done"; 206 qWarning() << "done";
169 if (mOnDoneCallback) { 207 if (mOnDoneCallback) {
170 mOnDoneCallback(); 208 auto callback = mOnDoneCallback;
171 mOnDoneCallback = std::function<void()>(); 209 mOnDoneCallback = std::function<void()>();
210 //This may delete this object
211 callback();
172 } 212 }
173 } 213 }
174 214
175 QWeakPointer<ResultEmitter<T> > mResultEmitter; 215 QWeakPointer<ResultEmitter<T> > mResultEmitter;
176 QSharedPointer<QObject> mQueryRunner;
177 std::shared_ptr<void> mFacade;
178 std::function<void()> mOnDoneCallback; 216 std::function<void()> mOnDoneCallback;
179 QSharedPointer<ThreadBoundary> mThreadBoundary; 217 QSharedPointer<ThreadBoundary> mThreadBoundary;
218 std::function<void(const T &parent)> mFetcher;
180}; 219};
181 220
182/* 221/*
@@ -194,6 +233,8 @@ private:
194template<class DomainType> 233template<class DomainType>
195class ResultEmitter { 234class ResultEmitter {
196public: 235public:
236 typedef QSharedPointer<ResultEmitter<DomainType> > Ptr;
237
197 void onAdded(const std::function<void(const DomainType&)> &handler) 238 void onAdded(const std::function<void(const DomainType&)> &handler)
198 { 239 {
199 addHandler = handler; 240 addHandler = handler;
@@ -209,7 +250,7 @@ public:
209 removeHandler = handler; 250 removeHandler = handler;
210 } 251 }
211 252
212 void onInitialResultSetComplete(const std::function<void(void)> &handler) 253 void onInitialResultSetComplete(const std::function<void(const DomainType&)> &handler)
213 { 254 {
214 initialResultSetCompleteHandler = handler; 255 initialResultSetCompleteHandler = handler;
215 } 256 }
@@ -239,28 +280,41 @@ public:
239 removeHandler(value); 280 removeHandler(value);
240 } 281 }
241 282
242 void initialResultSetComplete() 283 void initialResultSetComplete(const DomainType &parent)
243 { 284 {
244 initialResultSetCompleteHandler(); 285 if (initialResultSetCompleteHandler) {
286 initialResultSetCompleteHandler(parent);
287 }
245 } 288 }
246 289
247 void complete() 290 void complete()
248 { 291 {
249 completeHandler(); 292 if (completeHandler) {
293 completeHandler();
294 }
250 } 295 }
251 296
252 void clear() 297 void clear()
253 { 298 {
254 clearHandler(); 299 if (clearHandler) {
300 clearHandler();
301 }
255 } 302 }
256 303
304 void setFetcher(const std::function<void(const DomainType &parent)> &fetcher)
305 {
306 mFetcher = fetcher;
307 }
308
309 std::function<void(const DomainType &parent)> mFetcher;
310
257private: 311private:
258 friend class ResultProvider<DomainType>; 312 friend class ResultProvider<DomainType>;
259 313
260 std::function<void(const DomainType&)> addHandler; 314 std::function<void(const DomainType&)> addHandler;
261 std::function<void(const DomainType&)> modifyHandler; 315 std::function<void(const DomainType&)> modifyHandler;
262 std::function<void(const DomainType&)> removeHandler; 316 std::function<void(const DomainType&)> removeHandler;
263 std::function<void(void)> initialResultSetCompleteHandler; 317 std::function<void(const DomainType&)> initialResultSetCompleteHandler;
264 std::function<void(void)> completeHandler; 318 std::function<void(void)> completeHandler;
265 std::function<void(void)> clearHandler; 319 std::function<void(void)> clearHandler;
266 ThreadBoundary mThreadBoundary; 320 ThreadBoundary mThreadBoundary;
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp
index 1516e69..a247e38 100644
--- a/common/storage_lmdb.cpp
+++ b/common/storage_lmdb.cpp
@@ -253,22 +253,69 @@ int Storage::NamedDatabase::scan(const QByteArray &k,
253 253
254 return numberOfRetrievedValues; 254 return numberOfRetrievedValues;
255} 255}
256void Storage::NamedDatabase::findLatest(const QByteArray &uid, 256
257void Storage::NamedDatabase::findLatest(const QByteArray &k,
257 const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler, 258 const std::function<void(const QByteArray &key, const QByteArray &value)> &resultHandler,
258 const std::function<void(const Storage::Error &error)> &errorHandler) const 259 const std::function<void(const Storage::Error &error)> &errorHandler) const
259{ 260{
260 QByteArray latestKey; 261 if (!d || !d->transaction) {
261 scan(uid, [&](const QByteArray &key, const QByteArray &value) -> bool { 262 //Not an error. We rely on this to read nothing from non-existing databases.
262 latestKey = key; 263 return;
263 return true; 264 }
264 },
265 errorHandler, true);
266 265
267 scan(latestKey, [=](const QByteArray &key, const QByteArray &value) -> bool { 266 int rc;
268 resultHandler(key, value); 267 MDB_val key;
269 return false; 268 MDB_val data;
270 }, 269 MDB_cursor *cursor;
271 errorHandler); 270
271 key.mv_data = (void*)k.constData();
272 key.mv_size = k.size();
273
274 rc = mdb_cursor_open(d->transaction, d->dbi, &cursor);
275 if (rc) {
276 Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Error during mdb_cursor open: ") + QByteArray(mdb_strerror(rc)));
277 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
278 return;
279 }
280
281 MDB_cursor_op op = MDB_SET_RANGE;
282 if ((rc = mdb_cursor_get(cursor, &key, &data, op)) == 0) {
283 //The first lookup will find a key that is equal or greather than our key
284 if (QByteArray::fromRawData((char*)key.mv_data, key.mv_size).startsWith(k)) {
285 bool advanced = false;
286 while (QByteArray::fromRawData((char*)key.mv_data, key.mv_size).startsWith(k)) {
287 advanced = true;
288 MDB_cursor_op nextOp = MDB_NEXT;
289 rc = mdb_cursor_get(cursor, &key, &data, nextOp);
290 if (rc) {
291 break;
292 }
293 }
294 if (advanced) {
295 MDB_cursor_op prefOp = MDB_PREV;
296 //We read past the end above, just take the last value
297 if (rc == MDB_NOTFOUND) {
298 prefOp = MDB_LAST;
299 }
300 rc = mdb_cursor_get(cursor, &key, &data, prefOp);
301 resultHandler(QByteArray::fromRawData((char*)key.mv_data, key.mv_size), QByteArray::fromRawData((char*)data.mv_data, data.mv_size));
302 }
303 }
304 }
305
306 //We never find the last value
307 if (rc == MDB_NOTFOUND) {
308 rc = 0;
309 }
310
311 mdb_cursor_close(cursor);
312
313 if (rc) {
314 Error error(d->name.toLatin1(), getErrorCode(rc), QByteArray("Key: ") + k + " : " + QByteArray(mdb_strerror(rc)));
315 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
316 }
317
318 return;
272} 319}
273 320
274 321
diff --git a/common/threadboundary.cpp b/common/threadboundary.cpp
index 47ec508..48fd11a 100644
--- a/common/threadboundary.cpp
+++ b/common/threadboundary.cpp
@@ -24,6 +24,9 @@ Q_DECLARE_METATYPE(std::function<void()>);
24 24
25namespace async { 25namespace async {
26ThreadBoundary::ThreadBoundary(): QObject() { qRegisterMetaType<std::function<void()> >("std::function<void()>"); } 26ThreadBoundary::ThreadBoundary(): QObject() { qRegisterMetaType<std::function<void()> >("std::function<void()>"); }
27ThreadBoundary:: ~ThreadBoundary() {} 27ThreadBoundary:: ~ThreadBoundary()
28{
29}
30
28} 31}
29 32