summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt37
-rw-r--r--common/SinkConfig.cmake.in (renamed from common/SinkCommonConfig.cmake.in)2
-rw-r--r--common/clientapi.h140
-rw-r--r--common/commands.cpp2
-rw-r--r--common/commands.h11
-rw-r--r--common/commands/notification.fbs2
-rw-r--r--common/definitions.h5
-rw-r--r--common/domain/applicationdomaintype.cpp45
-rw-r--r--common/domain/applicationdomaintype.h60
-rw-r--r--common/domain/event.cpp2
-rw-r--r--common/domainadaptor.h3
-rw-r--r--common/domaintypeadaptorfactoryinterface.h6
-rw-r--r--common/entitybuffer.cpp7
-rw-r--r--common/entitybuffer.h5
-rw-r--r--common/facade.cpp9
-rw-r--r--common/facade.h6
-rw-r--r--common/facadefactory.h3
-rw-r--r--common/genericresource.cpp77
-rw-r--r--common/genericresource.h6
-rw-r--r--common/index.h3
-rw-r--r--common/inspection.h2
-rw-r--r--common/listener.cpp17
-rw-r--r--common/listener.h3
-rw-r--r--common/listmodelresult.h125
-rw-r--r--common/log.cpp165
-rw-r--r--common/log.h75
-rw-r--r--common/messagequeue.cpp4
-rw-r--r--common/messagequeue.h3
-rw-r--r--common/modelresult.cpp3
-rw-r--r--common/notification.h4
-rw-r--r--common/notifier.cpp69
-rw-r--r--common/notifier.h (renamed from common/listmodelresult.cpp)29
-rw-r--r--common/pipeline.cpp174
-rw-r--r--common/pipeline.h8
-rw-r--r--common/propertymapper.h13
-rw-r--r--common/queryrunner.cpp49
-rw-r--r--common/queryrunner.h11
-rw-r--r--common/resource.cpp15
-rw-r--r--common/resource.h11
-rw-r--r--common/resourceaccess.cpp78
-rw-r--r--common/resourceaccess.h6
-rw-r--r--common/resourceconfig.h3
-rw-r--r--common/resourcecontrol.cpp125
-rw-r--r--common/resourcecontrol.h62
-rw-r--r--common/resultprovider.h13
-rw-r--r--common/storage.h13
-rw-r--r--common/storage_common.cpp5
-rw-r--r--common/storage_lmdb.cpp14
-rw-r--r--common/store.cpp (renamed from common/clientapi.cpp)140
-rw-r--r--common/store.h101
-rw-r--r--common/threadboundary.h4
-rw-r--r--common/typeindex.cpp7
52 files changed, 1114 insertions, 668 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index a80ef95..fe72605 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -1,41 +1,45 @@
1include_directories(${CMAKE_CURRENT_BINARY_DIR}) 1include_directories(${CMAKE_CURRENT_BINARY_DIR})
2include_directories(domain) 2include_directories(domain)
3 3
4project(sinkcommon) 4project(sink)
5 5
6ecm_setup_version("0.1" VARIABLE_PREFIX SinkCommon 6ecm_setup_version("0.1" VARIABLE_PREFIX Sink
7 VERSION_HEADER "${CMAKE_CURRENT_BINARY_DIR}/sinkcommon_version.h" 7 VERSION_HEADER "${CMAKE_CURRENT_BINARY_DIR}/sink_version.h"
8 PACKAGE_VERSION_FILE "${CMAKE_CURRENT_BINARY_DIR}/SinkCommonConfigVersion.cmake" 8 PACKAGE_VERSION_FILE "${CMAKE_CURRENT_BINARY_DIR}/SinkConfigVersion.cmake"
9 SOVERSION 0 9 SOVERSION 0
10) 10)
11 11
12########### CMake Config Files ########### 12########### CMake Config Files ###########
13set(CMAKECONFIG_INSTALL_DIR "${KDE_INSTALL_CMAKEPACKAGEDIR}/SinkCommon") 13set(CMAKECONFIG_INSTALL_DIR "${KDE_INSTALL_CMAKEPACKAGEDIR}/Sink")
14 14
15ecm_configure_package_config_file( 15ecm_configure_package_config_file(
16 "${CMAKE_CURRENT_SOURCE_DIR}/SinkCommonConfig.cmake.in" 16 "${CMAKE_CURRENT_SOURCE_DIR}/SinkConfig.cmake.in"
17 "${CMAKE_CURRENT_BINARY_DIR}/SinkCommonConfig.cmake" 17 "${CMAKE_CURRENT_BINARY_DIR}/SinkConfig.cmake"
18 INSTALL_DESTINATION ${CMAKECONFIG_INSTALL_DIR} 18 INSTALL_DESTINATION ${CMAKECONFIG_INSTALL_DIR}
19) 19)
20 20
21install(FILES 21install(FILES
22 "${CMAKE_CURRENT_BINARY_DIR}/SinkCommonConfig.cmake" 22 "${CMAKE_CURRENT_BINARY_DIR}/SinkConfig.cmake"
23 "${CMAKE_CURRENT_BINARY_DIR}/SinkCommonConfigVersion.cmake" 23 "${CMAKE_CURRENT_BINARY_DIR}/SinkConfigVersion.cmake"
24 DESTINATION "${CMAKECONFIG_INSTALL_DIR}" 24 DESTINATION "${CMAKECONFIG_INSTALL_DIR}"
25 COMPONENT Devel 25 COMPONENT Devel
26) 26)
27 27
28install(EXPORT SinkCommonTargets DESTINATION "${CMAKECONFIG_INSTALL_DIR}" FILE SinkCommonTargets.cmake) 28add_definitions("-fvisibility=hidden")
29
30install(EXPORT SinkTargets DESTINATION "${CMAKECONFIG_INSTALL_DIR}" FILE SinkTargets.cmake)
29 31
30set(storage_SRCS storage_lmdb.cpp) 32set(storage_SRCS storage_lmdb.cpp)
31set(storage_LIBS lmdb) 33set(storage_LIBS lmdb)
32 34
33set(command_SRCS 35set(command_SRCS
36 store.cpp
37 notifier.cpp
38 resourcecontrol.cpp
34 modelresult.cpp 39 modelresult.cpp
35 definitions.cpp 40 definitions.cpp
36 log.cpp 41 log.cpp
37 entitybuffer.cpp 42 entitybuffer.cpp
38 clientapi.cpp
39 facadefactory.cpp 43 facadefactory.cpp
40 commands.cpp 44 commands.cpp
41 facade.cpp 45 facade.cpp
@@ -85,7 +89,7 @@ generate_flatbuffers(
85 queuedcommand 89 queuedcommand
86) 90)
87 91
88generate_export_header(${PROJECT_NAME} BASE_NAME SinkCommon EXPORT_FILE_NAME sinkcommon_export.h) 92generate_export_header(${PROJECT_NAME} BASE_NAME Sink EXPORT_FILE_NAME sink_export.h)
89SET_TARGET_PROPERTIES(${PROJECT_NAME} 93SET_TARGET_PROPERTIES(${PROJECT_NAME}
90 PROPERTIES LINKER_LANGUAGE CXX 94 PROPERTIES LINKER_LANGUAGE CXX
91 VERSION "0.1" 95 VERSION "0.1"
@@ -95,15 +99,20 @@ SET_TARGET_PROPERTIES(${PROJECT_NAME}
95qt5_use_modules(${PROJECT_NAME} Network) 99qt5_use_modules(${PROJECT_NAME} Network)
96target_link_libraries(${PROJECT_NAME} ${storage_LIBS} KF5::Async) 100target_link_libraries(${PROJECT_NAME} ${storage_LIBS} KF5::Async)
97install(TARGETS ${PROJECT_NAME} 101install(TARGETS ${PROJECT_NAME}
98 EXPORT SinkCommonTargets ${KDE_INSTALL_TARGETS_DEFAULT_ARGS} ${LIBRARY_NAMELINK} ) 102 EXPORT SinkTargets ${KDE_INSTALL_TARGETS_DEFAULT_ARGS} ${LIBRARY_NAMELINK} )
103
104add_clang_static_analysis(${PROJECT_NAME})
99 105
100install(FILES 106install(FILES
101 clientapi.h 107 store.h
108 notifier.h
109 resourcecontrol.h
102 domain/applicationdomaintype.h 110 domain/applicationdomaintype.h
103 query.h 111 query.h
104 inspection.h 112 inspection.h
105 notification.h 113 notification.h
106 bufferadaptor.h 114 bufferadaptor.h
115 ${CMAKE_CURRENT_BINARY_DIR}/sink_export.h
107 DESTINATION ${INCLUDE_INSTALL_DIR}/${PROJECT_NAME} COMPONENT Devel 116 DESTINATION ${INCLUDE_INSTALL_DIR}/${PROJECT_NAME} COMPONENT Devel
108) 117)
109 118
diff --git a/common/SinkCommonConfig.cmake.in b/common/SinkConfig.cmake.in
index 6c2a1c2..930b2db 100644
--- a/common/SinkCommonConfig.cmake.in
+++ b/common/SinkConfig.cmake.in
@@ -2,4 +2,4 @@
2 2
3find_dependency(KF5Mime "@KMIME_LIB_VERSION@") 3find_dependency(KF5Mime "@KMIME_LIB_VERSION@")
4 4
5include("${CMAKE_CURRENT_LIST_DIR}/SinkCommonTargets.cmake") 5include("${CMAKE_CURRENT_LIST_DIR}/SinkTargets.cmake")
diff --git a/common/clientapi.h b/common/clientapi.h
deleted file mode 100644
index 64c4f64..0000000
--- a/common/clientapi.h
+++ /dev/null
@@ -1,140 +0,0 @@
1/*
2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include <QString>
24#include <QSharedPointer>
25
26#include <Async/Async>
27
28#include "query.h"
29#include "inspection.h"
30#include "applicationdomaintype.h"
31
32class QAbstractItemModel;
33
34namespace Sink {
35class ResourceAccess;
36class Notification;
37
38/**
39 * Store interface used in the client API.
40 */
41class Store {
42public:
43 static QString storageLocation();
44 static QByteArray resourceName(const QByteArray &instanceIdentifier);
45
46 enum Roles {
47 DomainObjectRole = Qt::UserRole + 1, //Must be the same as in ModelResult
48 ChildrenFetchedRole,
49 DomainObjectBaseRole
50 };
51
52 /**
53 * Asynchronusly load a dataset with tree structure information
54 */
55 template <class DomainType>
56 static QSharedPointer<QAbstractItemModel> loadModel(Query query);
57
58 /**
59 * Create a new entity.
60 */
61 template <class DomainType>
62 static KAsync::Job<void> create(const DomainType &domainObject);
63
64 /**
65 * Modify an entity.
66 *
67 * This includes moving etc. since these are also simple settings on a property.
68 */
69 template <class DomainType>
70 static KAsync::Job<void> modify(const DomainType &domainObject);
71
72 /**
73 * Remove an entity.
74 */
75 template <class DomainType>
76 static KAsync::Job<void> remove(const DomainType &domainObject);
77
78 /**
79 * Synchronize data to local cache.
80 */
81 static KAsync::Job<void> synchronize(const Sink::Query &query);
82
83 /**
84 * Shutdown resource.
85 */
86 static KAsync::Job<void> shutdown(const QByteArray &resourceIdentifier);
87
88 /**
89 * Start resource.
90 *
91 * The resource is ready for operation once this command completes.
92 * This command is only necessary if a resource was shutdown previously,
93 * otherwise the resource process will automatically start as necessary.
94 */
95 static KAsync::Job<void> start(const QByteArray &resourceIdentifier);
96
97 /**
98 * Flushes any pending messages to disk
99 */
100 static KAsync::Job<void> flushMessageQueue(const QByteArrayList &resourceIdentifier);
101
102 /**
103 * Flushes any pending messages that haven't been replayed to the source.
104 */
105 static KAsync::Job<void> flushReplayQueue(const QByteArrayList &resourceIdentifier);
106
107 /**
108 * Removes a resource from disk.
109 */
110 static void removeFromDisk(const QByteArray &resourceIdentifier);
111
112 template <class DomainType>
113 static KAsync::Job<DomainType> fetchOne(const Sink::Query &query);
114
115 template <class DomainType>
116 static KAsync::Job<QList<typename DomainType::Ptr> > fetchAll(const Sink::Query &query);
117
118 template <class DomainType>
119 static KAsync::Job<QList<typename DomainType::Ptr> > fetch(const Sink::Query &query, int minimumAmount = 0);
120};
121
122namespace Resources {
123 template <class DomainType>
124 KAsync::Job<void> inspect(const Inspection &inspectionCommand);
125}
126
127class Notifier {
128public:
129 Notifier(const QSharedPointer<ResourceAccess> &resourceAccess);
130 // Notifier(const QByteArray &resource);
131 // Notifier(const QByteArrayList &resource);
132 void registerHandler(std::function<void(const Notification &)>);
133
134private:
135 class Private;
136 QScopedPointer<Private> d;
137};
138
139}
140
diff --git a/common/commands.cpp b/common/commands.cpp
index 8b915f0..5d38afa 100644
--- a/common/commands.cpp
+++ b/common/commands.cpp
@@ -61,6 +61,8 @@ QByteArray name(int commandId)
61 return "RevisionReplayed"; 61 return "RevisionReplayed";
62 case InspectionCommand: 62 case InspectionCommand:
63 return "Inspection"; 63 return "Inspection";
64 case RemoveFromDiskCommand:
65 return "RemoveFromDisk";
64 case CustomCommand: 66 case CustomCommand:
65 return "Custom"; 67 return "Custom";
66 }; 68 };
diff --git a/common/commands.h b/common/commands.h
index 19c827e..64abd76 100644
--- a/common/commands.h
+++ b/common/commands.h
@@ -20,7 +20,7 @@
20 20
21#pragma once 21#pragma once
22 22
23#include <sinkcommon_export.h> 23#include "sink_export.h"
24#include <flatbuffers/flatbuffers.h> 24#include <flatbuffers/flatbuffers.h>
25#include <QByteArray> 25#include <QByteArray>
26 26
@@ -48,16 +48,17 @@ enum CommandIds {
48 PingCommand, 48 PingCommand,
49 RevisionReplayedCommand, 49 RevisionReplayedCommand,
50 InspectionCommand, 50 InspectionCommand,
51 RemoveFromDiskCommand,
51 CustomCommand = 0xffff 52 CustomCommand = 0xffff
52}; 53};
53 54
54 55
55QByteArray name(int commandId); 56QByteArray name(int commandId);
56 57
57int SINKCOMMON_EXPORT headerSize(); 58int SINK_EXPORT headerSize();
58void SINKCOMMON_EXPORT write(QIODevice *device, int messageId, int commandId); 59void SINK_EXPORT write(QIODevice *device, int messageId, int commandId);
59void SINKCOMMON_EXPORT write(QIODevice *device, int messageId, int commandId, const char *buffer, uint size); 60void SINK_EXPORT write(QIODevice *device, int messageId, int commandId, const char *buffer, uint size);
60void SINKCOMMON_EXPORT write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb); 61void SINK_EXPORT write(QIODevice *device, int messageId, int commandId, flatbuffers::FlatBufferBuilder &fbb);
61 62
62} 63}
63 64
diff --git a/common/commands/notification.fbs b/common/commands/notification.fbs
index ff01fc5..8750ff5 100644
--- a/common/commands/notification.fbs
+++ b/common/commands/notification.fbs
@@ -1,6 +1,6 @@
1namespace Sink.Commands; 1namespace Sink.Commands;
2 2
3enum NotificationType : byte { Shutdown = 1, Status, Warning, Progress, Inspection } 3enum NotificationType : byte { Shutdown = 1, Status, Warning, Progress, Inspection, RevisionUpdate }
4enum NotificationCode : byte { Success = 0, Failure = 1, UserCode } 4enum NotificationCode : byte { Success = 0, Failure = 1, UserCode }
5 5
6table Notification { 6table Notification {
diff --git a/common/definitions.h b/common/definitions.h
index 5834f01..029d3f8 100644
--- a/common/definitions.h
+++ b/common/definitions.h
@@ -20,10 +20,11 @@
20 20
21#pragma once 21#pragma once
22 22
23#include "sink_export.h"
23#include <QString> 24#include <QString>
24#include <QByteArray> 25#include <QByteArray>
25 26
26namespace Sink { 27namespace Sink {
27 QString storageLocation(); 28 QString SINK_EXPORT storageLocation();
28 QByteArray resourceName(const QByteArray &instanceIdentifier); 29 QByteArray SINK_EXPORT resourceName(const QByteArray &instanceIdentifier);
29} 30}
diff --git a/common/domain/applicationdomaintype.cpp b/common/domain/applicationdomaintype.cpp
index b0433be..df10327 100644
--- a/common/domain/applicationdomaintype.cpp
+++ b/common/domain/applicationdomaintype.cpp
@@ -64,6 +64,12 @@ ApplicationDomainType::~ApplicationDomainType()
64{ 64{
65} 65}
66 66
67bool ApplicationDomainType::hasProperty(const QByteArray &key) const
68{
69 Q_ASSERT(mAdaptor);
70 return mAdaptor->availableProperties().contains(key);
71}
72
67QVariant ApplicationDomainType::getProperty(const QByteArray &key) const 73QVariant ApplicationDomainType::getProperty(const QByteArray &key) const
68{ 74{
69 Q_ASSERT(mAdaptor); 75 Q_ASSERT(mAdaptor);
@@ -76,13 +82,18 @@ QVariant ApplicationDomainType::getProperty(const QByteArray &key) const
76void ApplicationDomainType::setProperty(const QByteArray &key, const QVariant &value) 82void ApplicationDomainType::setProperty(const QByteArray &key, const QVariant &value)
77{ 83{
78 Q_ASSERT(mAdaptor); 84 Q_ASSERT(mAdaptor);
79 mChangeSet.insert(key, value); 85 mChangeSet.insert(key);
80 mAdaptor->setProperty(key, value); 86 mAdaptor->setProperty(key, value);
81} 87}
82 88
89void ApplicationDomainType::setChangedProperties(const QSet<QByteArray> &changeset)
90{
91 mChangeSet = changeset;
92}
93
83QByteArrayList ApplicationDomainType::changedProperties() const 94QByteArrayList ApplicationDomainType::changedProperties() const
84{ 95{
85 return mChangeSet.keys(); 96 return mChangeSet.toList();
86} 97}
87 98
88qint64 ApplicationDomainType::revision() const 99qint64 ApplicationDomainType::revision() const
@@ -100,6 +111,36 @@ QByteArray ApplicationDomainType::identifier() const
100 return mIdentifier; 111 return mIdentifier;
101} 112}
102 113
114Entity::~Entity()
115{
116
117}
118
119Event::~Event()
120{
121
122}
123
124Todo::~Todo()
125{
126
127}
128
129Mail::~Mail()
130{
131
132}
133
134Folder::~Folder()
135{
136
137}
138
139SinkResource::~SinkResource()
140{
141
142}
143
103template<> 144template<>
104QByteArray getTypeName<Event>() 145QByteArray getTypeName<Event>()
105{ 146{
diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h
index 63f030c..32d8999 100644
--- a/common/domain/applicationdomaintype.h
+++ b/common/domain/applicationdomaintype.h
@@ -19,9 +19,11 @@
19 */ 19 */
20#pragma once 20#pragma once
21 21
22#include "sink_export.h"
22#include <QSharedPointer> 23#include <QSharedPointer>
23#include <QVariant> 24#include <QVariant>
24#include <QByteArray> 25#include <QByteArray>
26#include <QDebug>
25#include "bufferadaptor.h" 27#include "bufferadaptor.h"
26 28
27namespace Sink { 29namespace Sink {
@@ -35,7 +37,7 @@ namespace ApplicationDomain {
35 * 37 *
36 * ApplicationDomainTypes don't adhere to any standard and are meant to be extended frequently (hence the non-typesafe interface). 38 * ApplicationDomainTypes don't adhere to any standard and are meant to be extended frequently (hence the non-typesafe interface).
37 */ 39 */
38class ApplicationDomainType { 40class SINK_EXPORT ApplicationDomainType {
39public: 41public:
40 typedef QSharedPointer<ApplicationDomainType> Ptr; 42 typedef QSharedPointer<ApplicationDomainType> Ptr;
41 43
@@ -55,16 +57,19 @@ public:
55 57
56 virtual ~ApplicationDomainType(); 58 virtual ~ApplicationDomainType();
57 59
58 virtual QVariant getProperty(const QByteArray &key) const; 60 bool hasProperty(const QByteArray &key) const;
59 virtual void setProperty(const QByteArray &key, const QVariant &value); 61 QVariant getProperty(const QByteArray &key) const;
60 virtual QByteArrayList changedProperties() const; 62 void setProperty(const QByteArray &key, const QVariant &value);
63 void setChangedProperties(const QSet<QByteArray> &changeset);
64 QByteArrayList changedProperties() const;
61 qint64 revision() const; 65 qint64 revision() const;
62 QByteArray resourceInstanceIdentifier() const; 66 QByteArray resourceInstanceIdentifier() const;
63 QByteArray identifier() const; 67 QByteArray identifier() const;
64 68
65private: 69private:
70 friend QDebug operator<<(QDebug, const ApplicationDomainType &);
66 QSharedPointer<BufferAdaptor> mAdaptor; 71 QSharedPointer<BufferAdaptor> mAdaptor;
67 QHash<QByteArray, QVariant> mChangeSet; 72 QSet<QByteArray> mChangeSet;
68 /* 73 /*
69 * Each domain object needs to store the resource, identifier, revision triple so we can link back to the storage location. 74 * Each domain object needs to store the resource, identifier, revision triple so we can link back to the storage location.
70 */ 75 */
@@ -82,34 +87,50 @@ inline bool operator==(const ApplicationDomainType& lhs, const ApplicationDomain
82 && lhs.resourceInstanceIdentifier() == rhs.resourceInstanceIdentifier(); 87 && lhs.resourceInstanceIdentifier() == rhs.resourceInstanceIdentifier();
83} 88}
84 89
85struct Entity : public ApplicationDomainType { 90inline QDebug operator<< (QDebug d, const ApplicationDomainType &type)
91{
92 d << "ApplicationDomainType(\n";
93 for (const auto &property : type.mAdaptor->availableProperties()) {
94 d << " " << property << "\t" << type.getProperty(property) << "\n";
95 }
96 d << ")";
97 return d;
98}
99
100struct SINK_EXPORT Entity : public ApplicationDomainType {
86 typedef QSharedPointer<Entity> Ptr; 101 typedef QSharedPointer<Entity> Ptr;
87 using ApplicationDomainType::ApplicationDomainType; 102 using ApplicationDomainType::ApplicationDomainType;
103 virtual ~Entity();
88}; 104};
89 105
90struct Event : public Entity { 106struct SINK_EXPORT Event : public Entity {
91 typedef QSharedPointer<Event> Ptr; 107 typedef QSharedPointer<Event> Ptr;
92 using Entity::Entity; 108 using Entity::Entity;
109 virtual ~Event();
93}; 110};
94 111
95struct Todo : public Entity { 112struct SINK_EXPORT Todo : public Entity {
96 typedef QSharedPointer<Todo> Ptr; 113 typedef QSharedPointer<Todo> Ptr;
97 using Entity::Entity; 114 using Entity::Entity;
115 virtual ~Todo();
98}; 116};
99 117
100struct Calendar : public Entity { 118struct SINK_EXPORT Calendar : public Entity {
101 typedef QSharedPointer<Calendar> Ptr; 119 typedef QSharedPointer<Calendar> Ptr;
102 using Entity::Entity; 120 using Entity::Entity;
121 virtual ~Calendar();
103}; 122};
104 123
105struct Mail : public Entity { 124struct SINK_EXPORT Mail : public Entity {
106 typedef QSharedPointer<Mail> Ptr; 125 typedef QSharedPointer<Mail> Ptr;
107 using Entity::Entity; 126 using Entity::Entity;
127 virtual ~Mail();
108}; 128};
109 129
110struct Folder : public Entity { 130struct SINK_EXPORT Folder : public Entity {
111 typedef QSharedPointer<Folder> Ptr; 131 typedef QSharedPointer<Folder> Ptr;
112 using Entity::Entity; 132 using Entity::Entity;
133 virtual ~Folder();
113}; 134};
114 135
115/** 136/**
@@ -118,9 +139,10 @@ struct Folder : public Entity {
118 * This type is used for configuration of resources, 139 * This type is used for configuration of resources,
119 * and for creating and removing resource instances. 140 * and for creating and removing resource instances.
120 */ 141 */
121struct SinkResource : public ApplicationDomainType { 142struct SINK_EXPORT SinkResource : public ApplicationDomainType {
122 typedef QSharedPointer<SinkResource> Ptr; 143 typedef QSharedPointer<SinkResource> Ptr;
123 using ApplicationDomainType::ApplicationDomainType; 144 using ApplicationDomainType::ApplicationDomainType;
145 virtual ~SinkResource();
124}; 146};
125 147
126/** 148/**
@@ -129,22 +151,22 @@ struct SinkResource : public ApplicationDomainType {
129 * Do not store these types to disk, they may change over time. 151 * Do not store these types to disk, they may change over time.
130 */ 152 */
131template<class DomainType> 153template<class DomainType>
132QByteArray getTypeName(); 154QByteArray SINK_EXPORT getTypeName();
133 155
134template<> 156template<>
135QByteArray getTypeName<Event>(); 157QByteArray SINK_EXPORT getTypeName<Event>();
136 158
137template<> 159template<>
138QByteArray getTypeName<Todo>(); 160QByteArray SINK_EXPORT getTypeName<Todo>();
139 161
140template<> 162template<>
141QByteArray getTypeName<SinkResource>(); 163QByteArray SINK_EXPORT getTypeName<SinkResource>();
142 164
143template<> 165template<>
144QByteArray getTypeName<Mail>(); 166QByteArray SINK_EXPORT getTypeName<Mail>();
145 167
146template<> 168template<>
147QByteArray getTypeName<Folder>(); 169QByteArray SINK_EXPORT getTypeName<Folder>();
148 170
149/** 171/**
150 * Type implementation. 172 * Type implementation.
@@ -153,7 +175,7 @@ QByteArray getTypeName<Folder>();
153 * Contains all non-resource specific, but type-specific code. 175 * Contains all non-resource specific, but type-specific code.
154 */ 176 */
155template<typename DomainType> 177template<typename DomainType>
156class TypeImplementation; 178class SINK_EXPORT TypeImplementation;
157 179
158} 180}
159} 181}
diff --git a/common/domain/event.cpp b/common/domain/event.cpp
index 9f81eb8..4210125 100644
--- a/common/domain/event.cpp
+++ b/common/domain/event.cpp
@@ -69,6 +69,7 @@ QSharedPointer<ReadPropertyMapper<TypeImplementation<Event>::Buffer> > TypeImple
69{ 69{
70 auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); 70 auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create();
71 propertyMapper->addMapping<QString, Buffer>("summary", &Buffer::summary); 71 propertyMapper->addMapping<QString, Buffer>("summary", &Buffer::summary);
72 propertyMapper->addMapping<QString, Buffer>("description", &Buffer::description);
72 propertyMapper->addMapping<QString, Buffer>("uid", &Buffer::uid); 73 propertyMapper->addMapping<QString, Buffer>("uid", &Buffer::uid);
73 propertyMapper->addMapping<QByteArray, Buffer>("attachment", &Buffer::attachment); 74 propertyMapper->addMapping<QByteArray, Buffer>("attachment", &Buffer::attachment);
74 return propertyMapper; 75 return propertyMapper;
@@ -78,6 +79,7 @@ QSharedPointer<WritePropertyMapper<TypeImplementation<Event>::BufferBuilder> > T
78{ 79{
79 auto propertyMapper = QSharedPointer<WritePropertyMapper<BufferBuilder> >::create(); 80 auto propertyMapper = QSharedPointer<WritePropertyMapper<BufferBuilder> >::create();
80 propertyMapper->addMapping<QString>("summary", &BufferBuilder::add_summary); 81 propertyMapper->addMapping<QString>("summary", &BufferBuilder::add_summary);
82 propertyMapper->addMapping<QString>("description", &BufferBuilder::add_description);
81 propertyMapper->addMapping<QString>("uid", &BufferBuilder::add_uid); 83 propertyMapper->addMapping<QString>("uid", &BufferBuilder::add_uid);
82 propertyMapper->addMapping<QByteArray>("attachment", &BufferBuilder::add_attachment); 84 propertyMapper->addMapping<QByteArray>("attachment", &BufferBuilder::add_attachment);
83 return propertyMapper; 85 return propertyMapper;
diff --git a/common/domainadaptor.h b/common/domainadaptor.h
index d43fad7..893c8d0 100644
--- a/common/domainadaptor.h
+++ b/common/domainadaptor.h
@@ -19,6 +19,7 @@
19 19
20#pragma once 20#pragma once
21 21
22#include "sink_export.h"
22#include <QVariant> 23#include <QVariant>
23#include <QByteArray> 24#include <QByteArray>
24#include <functional> 25#include <functional>
@@ -121,7 +122,7 @@ public:
121 * This is required by the facade the read the value, and by the pipeline preprocessors to access the domain values in a generic way. 122 * This is required by the facade the read the value, and by the pipeline preprocessors to access the domain values in a generic way.
122 */ 123 */
123template<typename DomainType, typename ResourceBuffer, typename ResourceBuilder> 124template<typename DomainType, typename ResourceBuffer, typename ResourceBuilder>
124class DomainTypeAdaptorFactory : public DomainTypeAdaptorFactoryInterface 125class SINK_EXPORT DomainTypeAdaptorFactory : public DomainTypeAdaptorFactoryInterface
125{ 126{
126 typedef typename Sink::ApplicationDomain::TypeImplementation<DomainType>::Buffer LocalBuffer; 127 typedef typename Sink::ApplicationDomain::TypeImplementation<DomainType>::Buffer LocalBuffer;
127 typedef typename Sink::ApplicationDomain::TypeImplementation<DomainType>::BufferBuilder LocalBuilder; 128 typedef typename Sink::ApplicationDomain::TypeImplementation<DomainType>::BufferBuilder LocalBuilder;
diff --git a/common/domaintypeadaptorfactoryinterface.h b/common/domaintypeadaptorfactoryinterface.h
index d974bbf..4d0ce04 100644
--- a/common/domaintypeadaptorfactoryinterface.h
+++ b/common/domaintypeadaptorfactoryinterface.h
@@ -38,5 +38,11 @@ public:
38 typedef QSharedPointer<DomainTypeAdaptorFactoryInterface> Ptr; 38 typedef QSharedPointer<DomainTypeAdaptorFactoryInterface> Ptr;
39 virtual ~DomainTypeAdaptorFactoryInterface() {}; 39 virtual ~DomainTypeAdaptorFactoryInterface() {};
40 virtual QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> createAdaptor(const Sink::Entity &entity) = 0; 40 virtual QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> createAdaptor(const Sink::Entity &entity) = 0;
41
42 /*
43 * Creates a buffer from @param domainType
44 *
45 * Note that this only serialized parameters that are part of ApplicationDomainType::changedProperties()
46 */
41 virtual void createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0; 47 virtual void createBuffer(const Sink::ApplicationDomain::ApplicationDomainType &domainType, flatbuffers::FlatBufferBuilder &fbb, void const *metadataData = 0, size_t metadataSize = 0) = 0;
42}; 48};
diff --git a/common/entitybuffer.cpp b/common/entitybuffer.cpp
index 0e5435a..b4a5cb2 100644
--- a/common/entitybuffer.cpp
+++ b/common/entitybuffer.cpp
@@ -18,6 +18,12 @@ EntityBuffer::EntityBuffer(const void *dataValue, int dataSize)
18 } 18 }
19} 19}
20 20
21EntityBuffer::EntityBuffer(const QByteArray &data)
22 : EntityBuffer(data.constData(), data.size())
23{
24
25}
26
21bool EntityBuffer::isValid() const 27bool EntityBuffer::isValid() const
22{ 28{
23 return mEntity; 29 return mEntity;
@@ -25,6 +31,7 @@ bool EntityBuffer::isValid() const
25 31
26const Sink::Entity &EntityBuffer::entity() 32const Sink::Entity &EntityBuffer::entity()
27{ 33{
34 Q_ASSERT(mEntity);
28 return *mEntity; 35 return *mEntity;
29} 36}
30 37
diff --git a/common/entitybuffer.h b/common/entitybuffer.h
index c9c2453..474a619 100644
--- a/common/entitybuffer.h
+++ b/common/entitybuffer.h
@@ -1,14 +1,17 @@
1#pragma once 1#pragma once
2 2
3#include "sink_export.h"
3#include <functional> 4#include <functional>
4#include <flatbuffers/flatbuffers.h> 5#include <flatbuffers/flatbuffers.h>
6#include <QByteArray>
5 7
6namespace Sink { 8namespace Sink {
7struct Entity; 9struct Entity;
8 10
9class EntityBuffer { 11class SINK_EXPORT EntityBuffer {
10public: 12public:
11 EntityBuffer(const void *dataValue, int size); 13 EntityBuffer(const void *dataValue, int size);
14 EntityBuffer(const QByteArray &data);
12 const uint8_t *resourceBuffer(); 15 const uint8_t *resourceBuffer();
13 const uint8_t *metadataBuffer(); 16 const uint8_t *metadataBuffer();
14 const uint8_t *localBuffer(); 17 const uint8_t *localBuffer();
diff --git a/common/facade.cpp b/common/facade.cpp
index 8cb776c..1b91ce4 100644
--- a/common/facade.cpp
+++ b/common/facade.cpp
@@ -29,6 +29,14 @@
29 29
30using namespace Sink; 30using namespace Sink;
31 31
32#undef DEBUG_AREA
33#define DEBUG_AREA "client.facade"
34
35/**
36 * A factory for resource access instances that caches the instance for some time.
37 *
38 * This avoids constantly recreating connections, and should allow a single process to have one connection per resource.
39 */
32class ResourceAccessFactory { 40class ResourceAccessFactory {
33public: 41public:
34 static ResourceAccessFactory &instance() 42 static ResourceAccessFactory &instance()
@@ -140,6 +148,7 @@ QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr>
140{ 148{
141 //The runner lives for the lifetime of the query 149 //The runner lives for the lifetime of the query
142 auto runner = new QueryRunner<DomainType>(query, mResourceAccess, mResourceInstanceIdentifier, mDomainTypeAdaptorFactory, bufferTypeForDomainType()); 150 auto runner = new QueryRunner<DomainType>(query, mResourceAccess, mResourceInstanceIdentifier, mDomainTypeAdaptorFactory, bufferTypeForDomainType());
151 runner->setResultTransformation(mResultTransformation);
143 return qMakePair(KAsync::null<void>(), runner->emitter()); 152 return qMakePair(KAsync::null<void>(), runner->emitter());
144} 153}
145 154
diff --git a/common/facade.h b/common/facade.h
index c25464f..a85b1de 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -19,6 +19,7 @@
19 19
20#pragma once 20#pragma once
21 21
22#include "sink_export.h"
22#include "facadeinterface.h" 23#include "facadeinterface.h"
23 24
24#include <QByteArray> 25#include <QByteArray>
@@ -43,7 +44,7 @@ namespace Sink {
43 * Additionally a resource only has to provide a synchronizer plugin to execute the synchronization 44 * Additionally a resource only has to provide a synchronizer plugin to execute the synchronization
44 */ 45 */
45template <typename DomainType> 46template <typename DomainType>
46class GenericFacade: public Sink::StoreFacade<DomainType> 47class SINK_EXPORT GenericFacade: public Sink::StoreFacade<DomainType>
47{ 48{
48public: 49public:
49 /** 50 /**
@@ -59,9 +60,10 @@ public:
59 KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE; 60 KAsync::Job<void> create(const DomainType &domainObject) Q_DECL_OVERRIDE;
60 KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE; 61 KAsync::Job<void> modify(const DomainType &domainObject) Q_DECL_OVERRIDE;
61 KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE; 62 KAsync::Job<void> remove(const DomainType &domainObject) Q_DECL_OVERRIDE;
62 QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> load(const Sink::Query &query) Q_DECL_OVERRIDE; 63 virtual QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> load(const Sink::Query &query) Q_DECL_OVERRIDE;
63 64
64protected: 65protected:
66 std::function<void(Sink::ApplicationDomain::ApplicationDomainType &domainObject)> mResultTransformation;
65 //TODO use one resource access instance per application & per resource 67 //TODO use one resource access instance per application & per resource
66 QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess; 68 QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess;
67 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; 69 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory;
diff --git a/common/facadefactory.h b/common/facadefactory.h
index 3dca63b..ef2a3f9 100644
--- a/common/facadefactory.h
+++ b/common/facadefactory.h
@@ -20,6 +20,7 @@
20 20
21#pragma once 21#pragma once
22 22
23#include "sink_export.h"
23#include <QByteArray> 24#include <QByteArray>
24#include <QDebug> 25#include <QDebug>
25#include <QMutex> 26#include <QMutex>
@@ -37,7 +38,7 @@ namespace Sink {
37 * 38 *
38 * If we were to provide default implementations for certain capabilities. Here would be the place to do so. 39 * If we were to provide default implementations for certain capabilities. Here would be the place to do so.
39 */ 40 */
40class FacadeFactory { 41class SINK_EXPORT FacadeFactory {
41public: 42public:
42 typedef std::function<std::shared_ptr<void>(const QByteArray &)> FactoryFunction; 43 typedef std::function<std::shared_ptr<void>(const QByteArray &)> FactoryFunction;
43 44
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index c7326d3..2688df0 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -17,11 +17,17 @@
17 17
18#include <QUuid> 18#include <QUuid>
19#include <QDataStream> 19#include <QDataStream>
20#include <QTime>
20 21
21static int sBatchSize = 100; 22static int sBatchSize = 100;
23//This interval directly affects the roundtrip time of single commands
24static int sCommitInterval = 10;
22 25
23using namespace Sink; 26using namespace Sink;
24 27
28#undef DEBUG_AREA
29#define DEBUG_AREA "resource.changereplay"
30
25/** 31/**
26 * Replays changes from the storage one by one. 32 * Replays changes from the storage one by one.
27 * 33 *
@@ -87,7 +93,7 @@ public Q_SLOTS:
87 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); 93 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
88 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); 94 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision);
89 const auto key = Storage::assembleKey(uid, revision); 95 const auto key = Storage::assembleKey(uid, revision);
90 mainStoreTransaction.openDatabase(type + ".main").scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { 96 Storage::mainDatabase(mainStoreTransaction, type).scan(key, [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool {
91 mReplayFunction(type, key, value).exec(); 97 mReplayFunction(type, key, value).exec();
92 //TODO make for loop async, and pass to async replay function together with type 98 //TODO make for loop async, and pass to async replay function together with type
93 Trace() << "Replaying " << key; 99 Trace() << "Replaying " << key;
@@ -110,6 +116,9 @@ private:
110 ReplayFunction mReplayFunction; 116 ReplayFunction mReplayFunction;
111}; 117};
112 118
119#undef DEBUG_AREA
120#define DEBUG_AREA "resource.commandprocessor"
121
113/** 122/**
114 * Drives the pipeline using the output from all command queues 123 * Drives the pipeline using the output from all command queues
115 */ 124 */
@@ -197,7 +206,6 @@ private slots:
197 default: 206 default:
198 return KAsync::error<qint64>(-1, "Unhandled command"); 207 return KAsync::error<qint64>(-1, "Unhandled command");
199 } 208 }
200 return KAsync::null<qint64>();
201 } 209 }
202 210
203 KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data) 211 KAsync::Job<qint64, qint64> processQueuedCommand(const QByteArray &data)
@@ -226,15 +234,17 @@ private slots:
226 //Process all messages of this queue 234 //Process all messages of this queue
227 KAsync::Job<void> processQueue(MessageQueue *queue) 235 KAsync::Job<void> processQueue(MessageQueue *queue)
228 { 236 {
237 auto time = QSharedPointer<QTime>::create();
229 return KAsync::start<void>([this](){ 238 return KAsync::start<void>([this](){
230 mPipeline->startTransaction(); 239 mPipeline->startTransaction();
231 }).then(KAsync::dowhile( 240 }).then(KAsync::dowhile(
232 [queue]() { return !queue->isEmpty(); }, 241 [queue]() { return !queue->isEmpty(); },
233 [this, queue](KAsync::Future<void> &future) { 242 [this, queue, time](KAsync::Future<void> &future) {
234 queue->dequeueBatch(sBatchSize, [this](const QByteArray &data) { 243 queue->dequeueBatch(sBatchSize, [this, time](const QByteArray &data) {
235 return KAsync::start<void>([this, data](KAsync::Future<void> &future) { 244 time->start();
236 processQueuedCommand(data).then<void, qint64>([&future, this](qint64 createdRevision) { 245 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) {
237 Trace() << "Created revision " << createdRevision; 246 processQueuedCommand(data).then<void, qint64>([&future, this, time](qint64 createdRevision) {
247 Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
238 future.setFinished(); 248 future.setFinished();
239 }).exec(); 249 }).exec();
240 }); 250 });
@@ -256,21 +266,27 @@ private slots:
256 266
257 KAsync::Job<void> processPipeline() 267 KAsync::Job<void> processPipeline()
258 { 268 {
269 auto time = QSharedPointer<QTime>::create();
270 time->start();
259 mPipeline->startTransaction(); 271 mPipeline->startTransaction();
260 Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; 272 Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision;
261 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { 273 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) {
262 mPipeline->cleanupRevision(revision); 274 mPipeline->cleanupRevision(revision);
263 } 275 }
264 mPipeline->commit(); 276 mPipeline->commit();
277 Trace() << "Cleanup done." << Log::TraceTime(time->elapsed());
265 278
266 //Go through all message queues 279 //Go through all message queues
267 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues); 280 auto it = QSharedPointer<QListIterator<MessageQueue*> >::create(mCommandQueues);
268 return KAsync::dowhile( 281 return KAsync::dowhile(
269 [it]() { return it->hasNext(); }, 282 [it]() { return it->hasNext(); },
270 [it, this](KAsync::Future<void> &future) { 283 [it, this](KAsync::Future<void> &future) {
284 auto time = QSharedPointer<QTime>::create();
285 time->start();
286
271 auto queue = it->next(); 287 auto queue = it->next();
272 processQueue(queue).then<void>([&future]() { 288 processQueue(queue).then<void>([&future, time]() {
273 Trace() << "Queue processed"; 289 Trace() << "Queue processed." << Log::TraceTime(time->elapsed());
274 future.setFinished(); 290 future.setFinished();
275 }).exec(); 291 }).exec();
276 } 292 }
@@ -287,6 +303,8 @@ private:
287 InspectionFunction mInspect; 303 InspectionFunction mInspect;
288}; 304};
289 305
306#undef DEBUG_AREA
307#define DEBUG_AREA "resource"
290 308
291GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) 309GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline)
292 : Sink::Resource(), 310 : Sink::Resource(),
@@ -313,12 +331,14 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
313 QVariant expectedValue; 331 QVariant expectedValue;
314 s >> expectedValue; 332 s >> expectedValue;
315 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then<void>([=]() { 333 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then<void>([=]() {
334 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
316 Sink::Notification n; 335 Sink::Notification n;
317 n.type = Sink::Commands::NotificationType_Inspection; 336 n.type = Sink::Commands::NotificationType_Inspection;
318 n.id = inspectionId; 337 n.id = inspectionId;
319 n.code = Sink::Commands::NotificationCode_Success; 338 n.code = Sink::Commands::NotificationCode_Success;
320 emit notify(n); 339 emit notify(n);
321 }, [=](int code, const QString &message) { 340 }, [=](int code, const QString &message) {
341 Log() << "Inspection failed: "<< inspectionType << inspectionId << entityId << message;
322 Sink::Notification n; 342 Sink::Notification n;
323 n.type = Sink::Commands::NotificationType_Inspection; 343 n.type = Sink::Commands::NotificationType_Inspection;
324 n.message = message; 344 n.message = message;
@@ -341,7 +361,7 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
341 mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); 361 mClientLowerBoundRevision = mPipeline->cleanedUpRevision();
342 mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); 362 mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision());
343 363
344 mCommitQueueTimer.setInterval(100); 364 mCommitQueueTimer.setInterval(sCommitInterval);
345 mCommitQueueTimer.setSingleShot(true); 365 mCommitQueueTimer.setSingleShot(true);
346 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); 366 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit);
347} 367}
@@ -381,13 +401,18 @@ KAsync::Job<void> GenericResource::replay(Sink::Storage &synchronizationStore, c
381 return KAsync::null<void>(); 401 return KAsync::null<void>();
382} 402}
383 403
404void GenericResource::removeDataFromDisk()
405{
406 removeFromDisk(mResourceInstanceIdentifier);
407}
408
384void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) 409void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier)
385{ 410{
386 Warning() << "Removing from generic resource";
387 Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadWrite).removeFromDisk(); 411 Sink::Storage(Sink::storageLocation(), instanceIdentifier, Sink::Storage::ReadWrite).removeFromDisk();
388 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadWrite).removeFromDisk(); 412 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".userqueue", Sink::Storage::ReadWrite).removeFromDisk();
389 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadWrite).removeFromDisk(); 413 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronizerqueue", Sink::Storage::ReadWrite).removeFromDisk();
390 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadWrite).removeFromDisk(); 414 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".changereplay", Sink::Storage::ReadWrite).removeFromDisk();
415 Sink::Storage(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::ReadWrite).removeFromDisk();
391} 416}
392 417
393qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) 418qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier)
@@ -556,38 +581,39 @@ void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, co
556 581
557void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) 582void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
558{ 583{
559 Index index("rid.mapping." + bufferType, transaction); 584 Index("rid.mapping." + bufferType, transaction).add(remoteId, localId);;
560 Index localIndex("localid.mapping." + bufferType, transaction); 585 Index("localid.mapping." + bufferType, transaction).add(localId, remoteId);
561 index.add(remoteId, localId);
562 localIndex.add(localId, remoteId);
563} 586}
564 587
565void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) 588void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
566{ 589{
567 Index index("rid.mapping." + bufferType, transaction); 590 Index("rid.mapping." + bufferType, transaction).remove(remoteId, localId);
568 Index localIndex("localid.mapping." + bufferType, transaction); 591 Index("localid.mapping." + bufferType, transaction).remove(localId, remoteId);
569 index.remove(remoteId, localId); 592}
570 localIndex.remove(localId, remoteId); 593
594void GenericResource::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
595{
596 const auto oldRemoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId);
597 removeRemoteId(bufferType, localId, oldRemoteId, transaction);
598 recordRemoteId(bufferType, localId, remoteId, transaction);
571} 599}
572 600
573QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) 601QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction)
574{ 602{
575 //Lookup local id for remote id, or insert a new pair otherwise 603 //Lookup local id for remote id, or insert a new pair otherwise
576 Index index("rid.mapping." + bufferType, transaction); 604 Index index("rid.mapping." + bufferType, transaction);
577 Index localIndex("localid.mapping." + bufferType, transaction);
578 QByteArray sinkId = index.lookup(remoteId); 605 QByteArray sinkId = index.lookup(remoteId);
579 if (sinkId.isEmpty()) { 606 if (sinkId.isEmpty()) {
580 sinkId = QUuid::createUuid().toString().toUtf8(); 607 sinkId = QUuid::createUuid().toString().toUtf8();
581 index.add(remoteId, sinkId); 608 index.add(remoteId, sinkId);
582 localIndex.add(sinkId, remoteId); 609 Index("localid.mapping." + bufferType, transaction).add(sinkId, remoteId);
583 } 610 }
584 return sinkId; 611 return sinkId;
585} 612}
586 613
587QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction) 614QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction)
588{ 615{
589 Index index("localid.mapping." + bufferType, transaction); 616 QByteArray remoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId);
590 QByteArray remoteId = index.lookup(localId);
591 if (remoteId.isEmpty()) { 617 if (remoteId.isEmpty()) {
592 Warning() << "Couldn't find the remote id for " << localId; 618 Warning() << "Couldn't find the remote id for " << localId;
593 return QByteArray(); 619 return QByteArray();
@@ -633,7 +659,7 @@ static QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> getLatest(const Si
633 659
634void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 660void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
635{ 661{
636 auto mainDatabase = transaction.openDatabase(bufferType + ".main"); 662 auto mainDatabase = Storage::mainDatabase(transaction, bufferType);
637 const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); 663 const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction);
638 const auto found = mainDatabase.contains(sinkId); 664 const auto found = mainDatabase.contains(sinkId);
639 if (!found) { 665 if (!found) {
@@ -663,4 +689,7 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si
663} 689}
664 690
665 691
692#pragma clang diagnostic push
693#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
666#include "genericresource.moc" 694#include "genericresource.moc"
695#pragma clang diagnostic pop
diff --git a/common/genericresource.h b/common/genericresource.h
index 4ae2645..ceb6b61 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -19,7 +19,7 @@
19 */ 19 */
20#pragma once 20#pragma once
21 21
22#include <sinkcommon_export.h> 22#include "sink_export.h"
23#include <resource.h> 23#include <resource.h>
24#include <messagequeue.h> 24#include <messagequeue.h>
25#include <flatbuffers/flatbuffers.h> 25#include <flatbuffers/flatbuffers.h>
@@ -37,7 +37,7 @@ class Preprocessor;
37/** 37/**
38 * Generic Resource implementation. 38 * Generic Resource implementation.
39 */ 39 */
40class SINKCOMMON_EXPORT GenericResource : public Resource 40class SINK_EXPORT GenericResource : public Resource
41{ 41{
42public: 42public:
43 GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline = QSharedPointer<Pipeline>()); 43 GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline = QSharedPointer<Pipeline>());
@@ -52,6 +52,7 @@ public:
52 52
53 int error() const; 53 int error() const;
54 54
55 void removeDataFromDisk() Q_DECL_OVERRIDE;
55 static void removeFromDisk(const QByteArray &instanceIdentifier); 56 static void removeFromDisk(const QByteArray &instanceIdentifier);
56 static qint64 diskUsage(const QByteArray &instanceIdentifier); 57 static qint64 diskUsage(const QByteArray &instanceIdentifier);
57 58
@@ -74,6 +75,7 @@ protected:
74 */ 75 */
75 void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); 76 void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction);
76 void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction); 77 void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction);
78 void updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction);
77 79
78 /** 80 /**
79 * Tries to find a local id for the remote id, and creates a new local id otherwise. 81 * Tries to find a local id for the remote id, and creates a new local id otherwise.
diff --git a/common/index.h b/common/index.h
index 20213b2..1a5b250 100644
--- a/common/index.h
+++ b/common/index.h
@@ -1,5 +1,6 @@
1#pragma once 1#pragma once
2 2
3#include "sink_export.h"
3#include <string> 4#include <string>
4#include <functional> 5#include <functional>
5#include <QString> 6#include <QString>
@@ -8,7 +9,7 @@
8/** 9/**
9 * An index for value pairs. 10 * An index for value pairs.
10 */ 11 */
11class Index 12class SINK_EXPORT Index
12{ 13{
13public: 14public:
14 enum ErrorCodes { 15 enum ErrorCodes {
diff --git a/common/inspection.h b/common/inspection.h
index b9f6bf3..d85eab6 100644
--- a/common/inspection.h
+++ b/common/inspection.h
@@ -24,7 +24,7 @@
24#include "applicationdomaintype.h" 24#include "applicationdomaintype.h"
25 25
26namespace Sink { 26namespace Sink {
27 namespace Resources { 27 namespace ResourceControl {
28 28
29struct Inspection { 29struct Inspection {
30 static Inspection PropertyInspection(const Sink::ApplicationDomain::Entity &entity, const QByteArray &property, const QVariant &expectedValue) 30 static Inspection PropertyInspection(const Sink::ApplicationDomain::Entity &entity, const QByteArray &property, const QVariant &expectedValue)
diff --git a/common/listener.cpp b/common/listener.cpp
index 13ebbbb..ed6f305 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -39,6 +39,9 @@
39#include <QTime> 39#include <QTime>
40#include <QDataStream> 40#include <QDataStream>
41 41
42#undef DEBUG_AREA
43#define DEBUG_AREA "resource.communication"
44
42Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent) 45Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent)
43 : QObject(parent), 46 : QObject(parent),
44 m_server(new QLocalServer(this)), 47 m_server(new QLocalServer(this)),
@@ -234,7 +237,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
234 job = job.then<void>(loadResource()->processAllMessages()); 237 job = job.then<void>(loadResource()->processAllMessages());
235 } 238 }
236 job.then<void>([callback, timer]() { 239 job.then<void>([callback, timer]() {
237 Trace() << "Sync took " << timer->elapsed(); 240 Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed());
238 callback(true); 241 callback(true);
239 }).exec(); 242 }).exec();
240 return; 243 return;
@@ -272,6 +275,14 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
272 loadResource()->setLowerBoundRevision(lowerBoundRevision()); 275 loadResource()->setLowerBoundRevision(lowerBoundRevision());
273 } 276 }
274 break; 277 break;
278 case Sink::Commands::RemoveFromDiskCommand: {
279 Log() << QString("\tReceived a remove from disk command from %1").arg(client.name);
280 m_resource->removeDataFromDisk();
281 delete m_resource;
282 m_resource = nullptr;
283 loadResource()->setLowerBoundRevision(0);
284 }
285 break;
275 default: 286 default:
276 if (commandId > Sink::Commands::CustomCommand) { 287 if (commandId > Sink::Commands::CustomCommand) {
277 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; 288 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId;
@@ -424,3 +435,7 @@ Sink::Resource *Listener::loadResource()
424 return m_resource; 435 return m_resource;
425} 436}
426 437
438#pragma clang diagnostic push
439#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
440#include "moc_listener.cpp"
441#pragma clang diagnostic pop
diff --git a/common/listener.h b/common/listener.h
index e17f315..5bcc93e 100644
--- a/common/listener.h
+++ b/common/listener.h
@@ -19,6 +19,7 @@
19 19
20#pragma once 20#pragma once
21 21
22#include "sink_export.h"
22#include <QObject> 23#include <QObject>
23 24
24#include <QPointer> 25#include <QPointer>
@@ -56,7 +57,7 @@ public:
56 qint64 currentRevision; 57 qint64 currentRevision;
57}; 58};
58 59
59class Listener : public QObject 60class SINK_EXPORT Listener : public QObject
60{ 61{
61 Q_OBJECT 62 Q_OBJECT
62 63
diff --git a/common/listmodelresult.h b/common/listmodelresult.h
deleted file mode 100644
index 71a0d09..0000000
--- a/common/listmodelresult.h
+++ /dev/null
@@ -1,125 +0,0 @@
1/*
2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22#include <QAbstractListModel>
23#include <QDebug>
24
25#include "resultprovider.h"
26
27enum Roles {
28 DomainObjectRole = Qt::UserRole + 1
29};
30
31template<class T>
32class ListModelResult : public QAbstractListModel
33{
34public:
35
36 ListModelResult(const QList<QByteArray> &propertyColumns)
37 :QAbstractListModel(),
38 mPropertyColumns(propertyColumns)
39 {
40 }
41
42 ListModelResult(const QSharedPointer<Sink::ResultEmitter<T> > &emitter, const QList<QByteArray> &propertyColumns)
43 :QAbstractListModel(),
44 mPropertyColumns(propertyColumns)
45 {
46 setEmitter(emitter);
47 }
48
49 void setEmitter(const QSharedPointer<Sink::ResultEmitter<T> > &emitter)
50 {
51 beginResetModel();
52 mEntities.clear();
53 mEmitter = emitter;
54 emitter->onAdded([this](const T &value) {
55 const auto keys = mEntities.keys();
56 int index = 0;
57 for (; index < keys.size(); index++) {
58 if (value->identifier() < keys.at(index)) {
59 break;
60 }
61 }
62 beginInsertRows(QModelIndex(), index, index);
63 mEntities.insert(value->identifier(), value);
64 endInsertRows();
65 });
66 emitter->onModified([this](const T &value) {
67 auto i = mEntities.keys().indexOf(value->identifier());
68 mEntities.remove(value->identifier());
69 mEntities.insert(value->identifier(), value);
70 auto idx = index(i, 0, QModelIndex());
71 emit dataChanged(idx, idx);
72 });
73 emitter->onRemoved([this](const T &value) {
74 auto index = mEntities.keys().indexOf(value->identifier());
75 beginRemoveRows(QModelIndex(), index, index);
76 mEntities.remove(value->identifier());
77 endRemoveRows();
78 });
79 emitter->onInitialResultSetComplete([this]() {
80 });
81 emitter->onComplete([this]() {
82 mEmitter.clear();
83 });
84 emitter->onClear([this]() {
85 beginResetModel();
86 mEntities.clear();
87 endResetModel();
88 });
89 endResetModel();
90 }
91
92 int rowCount(const QModelIndex &parent = QModelIndex()) const
93 {
94 return mEntities.size();
95 }
96
97 int columnCount(const QModelIndex &parent = QModelIndex()) const
98 {
99 return mPropertyColumns.size();
100 }
101
102 virtual QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const
103 {
104 if (index.row() >= mEntities.size()) {
105 qWarning() << "Out of bounds access";
106 return QVariant();
107 }
108 if (role == Qt::DisplayRole) {
109 if (index.column() < mPropertyColumns.size()) {
110 auto entity = mEntities.value(mEntities.keys().at(index.row()));
111 return entity->getProperty(mPropertyColumns.at(index.column())).toString();
112 }
113 }
114 if (role == DomainObjectRole) {
115 return QVariant::fromValue(mEntities.value(mEntities.keys().at(index.row())));
116 }
117 return QVariant();
118 }
119
120private:
121 QSharedPointer<Sink::ResultEmitter<T> > mEmitter;
122 QMap<QByteArray, T> mEntities;
123 QList<QByteArray> mPropertyColumns;
124};
125
diff --git a/common/log.cpp b/common/log.cpp
index 45bbec1..96c6f82 100644
--- a/common/log.cpp
+++ b/common/log.cpp
@@ -3,11 +3,19 @@
3#include <QString> 3#include <QString>
4#include <QIODevice> 4#include <QIODevice>
5#include <QCoreApplication> 5#include <QCoreApplication>
6#include <QSettings>
7#include <QStandardPaths>
8#include <QSharedPointer>
6#include <iostream> 9#include <iostream>
7#include <unistd.h> 10#include <unistd.h>
8 11
9using namespace Sink::Log; 12using namespace Sink::Log;
10 13
14static QSharedPointer<QSettings> config()
15{
16 return QSharedPointer<QSettings>::create(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/sink/log.ini", QSettings::IniFormat);
17}
18
11class DebugStream: public QIODevice 19class DebugStream: public QIODevice
12{ 20{
13public: 21public:
@@ -17,24 +25,24 @@ public:
17 { 25 {
18 open(WriteOnly); 26 open(WriteOnly);
19 } 27 }
20 virtual ~DebugStream(){}; 28 virtual ~DebugStream();
21 29
22 bool isSequential() const { return true; } 30 bool isSequential() const { return true; }
23 qint64 readData(char *, qint64) { return 0; /* eof */ } 31 qint64 readData(char *, qint64) { return 0; /* eof */ }
24 qint64 readLineData(char *, qint64) { return 0; /* eof */ } 32 qint64 readLineData(char *, qint64) { return 0; /* eof */ }
25 qint64 writeData(const char *data, qint64 len) 33 qint64 writeData(const char *data, qint64 len)
26 { 34 {
27 const QByteArray buf = QByteArray::fromRawData(data, len); 35 std::cout << data << std::endl;
28 // if (!qgetenv("IMAP_TRACE").isEmpty()) {
29 // qt_message_output(QtDebugMsg, buf.trimmed().constData());
30 std::cout << buf.trimmed().constData() << std::endl;
31 // }
32 return len; 36 return len;
33 } 37 }
34private: 38private:
35 Q_DISABLE_COPY(DebugStream) 39 Q_DISABLE_COPY(DebugStream)
36}; 40};
37 41
42//Virtual method anchor
43DebugStream::~DebugStream()
44{}
45
38class NullStream: public QIODevice 46class NullStream: public QIODevice
39{ 47{
40public: 48public:
@@ -43,7 +51,7 @@ public:
43 { 51 {
44 open(WriteOnly); 52 open(WriteOnly);
45 } 53 }
46 virtual ~NullStream(){}; 54 virtual ~NullStream();
47 55
48 bool isSequential() const { return true; } 56 bool isSequential() const { return true; }
49 qint64 readData(char *, qint64) { return 0; /* eof */ } 57 qint64 readData(char *, qint64) { return 0; /* eof */ }
@@ -56,6 +64,10 @@ private:
56 Q_DISABLE_COPY(NullStream) 64 Q_DISABLE_COPY(NullStream)
57}; 65};
58 66
67//Virtual method anchor
68NullStream::~NullStream()
69{}
70
59 /* 71 /*
60 * ANSI color codes: 72 * ANSI color codes:
61 * 0: reset colors/style 73 * 0: reset colors/style
@@ -130,31 +142,95 @@ DebugLevel Sink::Log::debugLevelFromName(const QByteArray &name)
130 142
131void Sink::Log::setDebugOutputLevel(DebugLevel debugLevel) 143void Sink::Log::setDebugOutputLevel(DebugLevel debugLevel)
132{ 144{
133 qputenv("SINKDEBUGLEVEL", debugLevelName(debugLevel)); 145 config()->setValue("level", debugLevel);
134} 146}
135 147
136Sink::Log::DebugLevel Sink::Log::debugOutputLevel() 148Sink::Log::DebugLevel Sink::Log::debugOutputLevel()
137{ 149{
138 return debugLevelFromName(qgetenv("SINKDEBUGLEVEL")); 150 return static_cast<Sink::Log::DebugLevel>(config()->value("level", Sink::Log::Log).toInt());
151}
152
153void Sink::Log::setDebugOutputFilter(FilterType type, const QByteArrayList &filter)
154{
155 switch (type) {
156 case ApplicationName:
157 config()->setValue("applicationfilter", QVariant::fromValue(filter));
158 break;
159 case Area:
160 config()->setValue("areafilter", QVariant::fromValue(filter));
161 break;
162 }
163}
164
165QByteArrayList Sink::Log::debugOutputFilter(FilterType type)
166{
167 switch (type) {
168 case ApplicationName:
169 return config()->value("applicationfilter").value<QByteArrayList>();
170 case Area:
171 return config()->value("areafilter").value<QByteArrayList>();
172 }
173}
174
175void Sink::Log::setDebugOutputFields(const QByteArrayList &output)
176{
177 config()->setValue("outputfields", QVariant::fromValue(output));
178}
179
180QByteArrayList Sink::Log::debugOutputFields()
181{
182 return config()->value("outputfields").value<QByteArrayList>();
183}
184
185static QByteArray getProgramName()
186{
187 if (QCoreApplication::instance()) {
188 return QCoreApplication::instance()->applicationName().toLocal8Bit();
189 } else {
190 return "<unknown program name>";
191 }
192}
193
194static bool containsItemStartingWith(const QByteArray &pattern, const QByteArrayList &list)
195{
196 for (const auto &item : list) {
197 if (pattern.startsWith(item)) {
198 return true;
199 }
200 }
201 return false;
202}
203
204static bool caseInsensitiveContains(const QByteArray &pattern, const QByteArrayList &list)
205{
206 for (const auto &item : list) {
207 if (item.toLower() == pattern) {
208 return true;
209 }
210 }
211 return false;
139} 212}
140 213
141QDebug Sink::Log::debugStream(DebugLevel debugLevel, int line, const char* file, const char* function, const char* debugArea) 214QDebug Sink::Log::debugStream(DebugLevel debugLevel, int line, const char* file, const char* function, const char* debugArea)
142{ 215{
143 DebugLevel debugOutputLevel = debugLevelFromName(qgetenv("SINKDEBUGLEVEL")); 216 static NullStream nullstream;
144 if (debugLevel < debugOutputLevel) { 217 if (debugLevel < debugOutputLevel()) {
145 static NullStream stream; 218 return QDebug(&nullstream);
146 return QDebug(&stream);
147 } 219 }
148 220
149 static DebugStream stream; 221 auto areas = debugOutputFilter(Sink::Log::Area);
150 QDebug debug(&stream); 222 if (debugArea && !areas.isEmpty()) {
223 if (!containsItemStartingWith(debugArea, areas)) {
224 return QDebug(&nullstream);
225 }
226 }
227 static QByteArray programName = getProgramName();
151 228
152 static QByteArray programName; 229 auto filter = debugOutputFilter(Sink::Log::ApplicationName);
153 if (programName.isEmpty()) { 230 if (!filter.isEmpty() && !filter.contains(programName)) {
154 if (QCoreApplication::instance()) 231 if (!containsItemStartingWith(programName, filter)) {
155 programName = QCoreApplication::instance()->applicationName().toLocal8Bit(); 232 return QDebug(&nullstream);
156 else 233 }
157 programName = "<unknown program name>";
158 } 234 }
159 235
160 QString prefix; 236 QString prefix;
@@ -175,29 +251,54 @@ QDebug Sink::Log::debugStream(DebugLevel debugLevel, int line, const char* file,
175 prefix = "Error: "; 251 prefix = "Error: ";
176 prefixColorCode = ANSI_Colors::Red; 252 prefixColorCode = ANSI_Colors::Red;
177 break; 253 break;
178 default:
179 break;
180 }; 254 };
181 255
182 bool showLocation = false; 256 auto debugOutput = debugOutputFields();
183 bool showProgram = true; 257
258 bool showLocation = debugOutput.isEmpty() ? false : caseInsensitiveContains("location", debugOutput);
259 bool showFunction = debugOutput.isEmpty() ? false : caseInsensitiveContains("function", debugOutput);
260 bool showProgram = debugOutput.isEmpty() ? false : caseInsensitiveContains("application", debugOutput);
261 bool useColor = true;
262 bool multiline = false;
184 263
185 const QString resetColor = colorCommand(ANSI_Colors::Reset); 264 const QString resetColor = colorCommand(ANSI_Colors::Reset);
186 QString output; 265 QString output;
187 output += colorCommand(QList<int>() << ANSI_Colors::Bold << prefixColorCode) + prefix + resetColor; 266 if (useColor) {
267 output += colorCommand(QList<int>() << ANSI_Colors::Bold << prefixColorCode);
268 }
269 output += prefix;
270 if (useColor) {
271 output += resetColor;
272 }
188 if (showProgram) { 273 if (showProgram) {
189 output += QString(" %1(%2)").arg(QString::fromLatin1(programName)).arg(unsigned(getpid())); 274 int width = 10;
275 output += QString(" %1(%2)").arg(QString::fromLatin1(programName).leftJustified(width, ' ', true)).arg(unsigned(getpid())).rightJustified(width + 8, ' ');
190 } 276 }
191 if (debugArea) { 277 if (debugArea) {
192 output += colorCommand(QList<int>() << ANSI_Colors::Bold << prefixColorCode) + QString(" %1 ").arg(QString::fromLatin1(debugArea)) + resetColor; 278 if (useColor) {
279 output += colorCommand(QList<int>() << ANSI_Colors::Bold << prefixColorCode);
280 }
281 output += QString(" %1 ").arg(QString::fromLatin1(debugArea).leftJustified(25, ' ', true));
282 if (useColor) {
283 output += resetColor;
284 }
285 }
286 if (showFunction) {
287 output += QString(" %3").arg(QString::fromLatin1(function).leftJustified(25, ' ', true));
193 } 288 }
194 if (showLocation) { 289 if (showLocation) {
195 output += QString(" %3").arg(function); 290 const auto filename = QString::fromLatin1(file).split('/').last();
196 output += QString("%1:%2").arg(file).arg(line); 291 output += QString(" %1:%2").arg(filename.right(25)).arg(QString::number(line).leftJustified(4, ' ')).leftJustified(30, ' ', true);
292 }
293 if (multiline) {
294 output += "\n ";
197 } 295 }
198 output += ":"; 296 output += ":";
199 297
200 debug << output; 298 static DebugStream stream;
299 QDebug debug(&stream);
300
301 debug.noquote().nospace() << output;
201 302
202 return debug; 303 return debug.space().quote();
203} 304}
diff --git a/common/log.h b/common/log.h
index 483f16f..415c7f7 100644
--- a/common/log.h
+++ b/common/log.h
@@ -1,5 +1,6 @@
1#pragma once 1#pragma once
2 2
3#include "sink_export.h"
3#include <QDebug> 4#include <QDebug>
4 5
5namespace Sink { 6namespace Sink {
@@ -12,19 +13,75 @@ enum DebugLevel {
12 Error 13 Error
13}; 14};
14 15
15QByteArray debugLevelName(DebugLevel debugLevel); 16QByteArray SINK_EXPORT debugLevelName(DebugLevel debugLevel);
16DebugLevel debugLevelFromName(const QByteArray &name); 17DebugLevel SINK_EXPORT debugLevelFromName(const QByteArray &name);
17 18
18void setDebugOutputLevel(DebugLevel); 19/**
19DebugLevel debugOutputLevel(); 20 * Sets the debug output level.
21 *
22 * Everything below is ignored.
23 */
24void SINK_EXPORT setDebugOutputLevel(DebugLevel);
25DebugLevel SINK_EXPORT debugOutputLevel();
20 26
21QDebug debugStream(DebugLevel debugLevel, int line, const char* file, const char* function, const char* debugArea = 0); 27enum FilterType {
28 Area,
29 ApplicationName
30};
31
32/**
33 * Sets a debug output filter.
34 *
35 * Everything that is not matching the filter is ignored.
36 * An empty filter matches everything.
37 *
38 * Note: In case of resources the application name is the identifier.
39 */
40void SINK_EXPORT setDebugOutputFilter(FilterType, const QByteArrayList &filter);
41QByteArrayList SINK_EXPORT debugOutputFilter(FilterType type);
42
43/**
44 * Set the debug output fields.
45 *
46 * Currently supported are:
47 * * Name: Application name used for filter.
48 * * Function: The function name:
49 * * Location: The source code location.
50 *
51 * These are additional items to the default ones (level, area, message).
52 */
53void SINK_EXPORT setDebugOutputFields(const QByteArrayList &filter);
54QByteArrayList SINK_EXPORT debugOutputFields();
55
56QDebug SINK_EXPORT debugStream(DebugLevel debugLevel, int line, const char* file, const char* function, const char* debugArea = 0);
57
58struct SINK_EXPORT TraceTime
59{
60 TraceTime(int i) : time(i){};
61 const int time;
62};
63
64inline QDebug SINK_EXPORT operator<<(QDebug d, const TraceTime &time)
65{
66 d << time.time << "[ms]";
67 return d;
68}
22 69
23} 70}
24} 71}
25 72
26#define Trace() Sink::Log::debugStream(Sink::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO) 73#define DEBUG_AREA nullptr
27#define Log() Sink::Log::debugStream(Sink::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO) 74
28#define Warning() Sink::Log::debugStream(Sink::Log::DebugLevel::Warning, __LINE__, __FILE__, Q_FUNC_INFO) 75#define Trace_() Sink::Log::debugStream(Sink::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO)
76#define Log_() Sink::Log::debugStream(Sink::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO)
77
78#define Trace_area(AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, AREA)
79#define Log_area(AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, AREA)
80#define Warning_area(AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Warning, __LINE__, __FILE__, Q_FUNC_INFO, AREA)
81#define Error_area(AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Error, __LINE__, __FILE__, Q_FUNC_INFO, AREA)
82
83#define Trace() Sink::Log::debugStream(Sink::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, DEBUG_AREA)
84#define Log() Sink::Log::debugStream(Sink::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, DEBUG_AREA)
85#define Warning() Sink::Log::debugStream(Sink::Log::DebugLevel::Warning, __LINE__, __FILE__, Q_FUNC_INFO, DEBUG_AREA)
29//FIXME Error clashes with Storage::Error and MessageQueue::Error 86//FIXME Error clashes with Storage::Error and MessageQueue::Error
30#define ErrorMsg() Sink::Log::debugStream(Sink::Log::DebugLevel::Error, __LINE__, __FILE__, Q_FUNC_INFO) 87#define ErrorMsg() Sink::Log::debugStream(Sink::Log::DebugLevel::Error, __LINE__, __FILE__, Q_FUNC_INFO, DEBUG_AREA)
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index 1055922..73198a5 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -174,3 +174,7 @@ bool MessageQueue::isEmpty()
174 return count == 0; 174 return count == 0;
175} 175}
176 176
177#pragma clang diagnostic push
178#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
179#include "moc_messagequeue.cpp"
180#pragma clang diagnostic pop
diff --git a/common/messagequeue.h b/common/messagequeue.h
index 3206388..9399055 100644
--- a/common/messagequeue.h
+++ b/common/messagequeue.h
@@ -1,5 +1,6 @@
1#pragma once 1#pragma once
2 2
3#include "sink_export.h"
3#include <QObject> 4#include <QObject>
4#include <QByteArrayList> 5#include <QByteArrayList>
5#include <string> 6#include <string>
@@ -11,7 +12,7 @@
11/** 12/**
12 * A persistent FIFO message queue. 13 * A persistent FIFO message queue.
13 */ 14 */
14class MessageQueue : public QObject 15class SINK_EXPORT MessageQueue : public QObject
15{ 16{
16 Q_OBJECT 17 Q_OBJECT
17public: 18public:
diff --git a/common/modelresult.cpp b/common/modelresult.cpp
index 3a9fb95..f28c665 100644
--- a/common/modelresult.cpp
+++ b/common/modelresult.cpp
@@ -24,6 +24,9 @@
24#include "domain/folder.h" 24#include "domain/folder.h"
25#include "log.h" 25#include "log.h"
26 26
27#undef DEBUG_AREA
28#define DEBUG_AREA "client.modelresult"
29
27static uint qHash(const Sink::ApplicationDomain::ApplicationDomainType &type) 30static uint qHash(const Sink::ApplicationDomain::ApplicationDomainType &type)
28{ 31{
29 Q_ASSERT(!type.resourceInstanceIdentifier().isEmpty()); 32 Q_ASSERT(!type.resourceInstanceIdentifier().isEmpty());
diff --git a/common/notification.h b/common/notification.h
index ae24bd2..cf69a99 100644
--- a/common/notification.h
+++ b/common/notification.h
@@ -19,7 +19,7 @@
19 */ 19 */
20#pragma once 20#pragma once
21 21
22#include <sinkcommon_export.h> 22#include "sink_export.h"
23#include <QString> 23#include <QString>
24 24
25namespace Sink 25namespace Sink
@@ -28,7 +28,7 @@ namespace Sink
28/** 28/**
29 * A notification 29 * A notification
30 */ 30 */
31class SINKCOMMON_EXPORT Notification 31class SINK_EXPORT Notification
32{ 32{
33public: 33public:
34 QByteArray id; 34 QByteArray id;
diff --git a/common/notifier.cpp b/common/notifier.cpp
new file mode 100644
index 0000000..e4248df
--- /dev/null
+++ b/common/notifier.cpp
@@ -0,0 +1,69 @@
1/*
2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21#include "notifier.h"
22
23#include <functional>
24
25#include "resourceaccess.h"
26#include "log.h"
27
28using namespace Sink;
29
30class Sink::Notifier::Private {
31public:
32 Private()
33 : context(new QObject)
34 {
35
36 }
37 QList<QSharedPointer<ResourceAccess> > resourceAccess;
38 QList<std::function<void(const Notification &)> > handler;
39 QSharedPointer<QObject> context;
40};
41
42Notifier::Notifier(const QSharedPointer<ResourceAccess> &resourceAccess)
43 : d(new Sink::Notifier::Private)
44{
45 QObject::connect(resourceAccess.data(), &ResourceAccess::notification, d->context.data(), [this](const Notification &notification) {
46 for (const auto &handler : d->handler) {
47 handler(notification);
48 }
49 });
50 d->resourceAccess << resourceAccess;
51}
52
53Notifier::Notifier(const QByteArray &instanceIdentifier)
54 : d(new Sink::Notifier::Private)
55{
56 auto resourceAccess = Sink::ResourceAccess::Ptr::create(instanceIdentifier);
57 resourceAccess->open();
58 QObject::connect(resourceAccess.data(), &ResourceAccess::notification, d->context.data(), [this](const Notification &notification) {
59 for (const auto &handler : d->handler) {
60 handler(notification);
61 }
62 });
63 d->resourceAccess << resourceAccess;
64}
65
66void Notifier::registerHandler(std::function<void(const Notification &)> handler)
67{
68 d->handler << handler;
69}
diff --git a/common/listmodelresult.cpp b/common/notifier.h
index 6ef1c5f..d16a311 100644
--- a/common/listmodelresult.cpp
+++ b/common/notifier.h
@@ -1,5 +1,5 @@
1/* 1/*
2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> 2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
3 * 3 *
4 * This library is free software; you can redistribute it and/or 4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public 5 * modify it under the terms of the GNU Lesser General Public
@@ -18,4 +18,29 @@
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */ 19 */
20 20
21#include "listmodelresult.h" 21#pragma once
22
23#include "sink_export.h"
24#include <QByteArray>
25#include <QSharedPointer>
26
27#include <Async/Async>
28
29class QAbstractItemModel;
30
31namespace Sink {
32class ResourceAccess;
33class Notification;
34
35class SINK_EXPORT Notifier {
36public:
37 Notifier(const QSharedPointer<ResourceAccess> &resourceAccess);
38 Notifier(const QByteArray &resourceInstanceIdentifier);
39 void registerHandler(std::function<void(const Notification &)>);
40
41private:
42 class Private;
43 QSharedPointer<Private> d;
44};
45
46}
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 401c26d..93d8236 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -25,6 +25,7 @@
25#include <QVector> 25#include <QVector>
26#include <QUuid> 26#include <QUuid>
27#include <QDebug> 27#include <QDebug>
28#include <QTime>
28#include "entity_generated.h" 29#include "entity_generated.h"
29#include "metadata_generated.h" 30#include "metadata_generated.h"
30#include "createentity_generated.h" 31#include "createentity_generated.h"
@@ -36,6 +37,9 @@
36#include "definitions.h" 37#include "definitions.h"
37#include "bufferutils.h" 38#include "bufferutils.h"
38 39
40#undef DEBUG_AREA
41#define DEBUG_AREA "resource.pipeline"
42
39namespace Sink 43namespace Sink
40{ 44{
41 45
@@ -53,8 +57,24 @@ public:
53 QHash<QString, QVector<Preprocessor *> > processors; 57 QHash<QString, QVector<Preprocessor *> > processors;
54 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory; 58 QHash<QString, DomainTypeAdaptorFactoryInterface::Ptr> adaptorFactory;
55 bool revisionChanged; 59 bool revisionChanged;
60 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
61 QTime transactionTime;
62 int transactionItemCount;
56}; 63};
57 64
65void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
66{
67 Storage::mainDatabase(transaction, bufferType).write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
68 [uid, newRevision](const Storage::Error &error) {
69 Warning() << "Failed to write entity" << uid << newRevision;
70 }
71 );
72 revisionChanged = true;
73 Storage::setMaxRevision(transaction, newRevision);
74 Storage::recordRevision(transaction, newRevision, uid, bufferType);
75}
76
77
58Pipeline::Pipeline(const QString &resourceName, QObject *parent) 78Pipeline::Pipeline(const QString &resourceName, QObject *parent)
59 : QObject(parent), 79 : QObject(parent),
60 d(new Private(resourceName)) 80 d(new Private(resourceName))
@@ -86,7 +106,10 @@ void Pipeline::startTransaction()
86 if (d->transaction) { 106 if (d->transaction) {
87 return; 107 return;
88 } 108 }
89 d->transaction = std::move(storage().createTransaction(Sink::Storage::ReadWrite)); 109 Trace() << "Starting transaction.";
110 d->transactionTime.start();
111 d->transactionItemCount = 0;
112 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite));
90} 113}
91 114
92void Pipeline::commit() 115void Pipeline::commit()
@@ -96,8 +119,9 @@ void Pipeline::commit()
96 // for (auto processor : d->processors[bufferType]) { 119 // for (auto processor : d->processors[bufferType]) {
97 // processor->finalize(); 120 // processor->finalize();
98 // } 121 // }
99 const auto revision = Sink::Storage::maxRevision(d->transaction); 122 const auto revision = Storage::maxRevision(d->transaction);
100 Trace() << "Committing " << revision; 123 const auto elapsed = d->transactionTime.elapsed();
124 Trace() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " << (double)elapsed/(double)qMax(d->transactionItemCount, 1) << "[ms/item]";
101 if (d->transaction) { 125 if (d->transaction) {
102 d->transaction.commit(); 126 d->transaction.commit();
103 } 127 }
@@ -118,41 +142,30 @@ Storage &Pipeline::storage() const
118 return d->storage; 142 return d->storage;
119} 143}
120 144
121void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
122{
123 d->transaction.openDatabase(bufferType + ".main").write(Sink::Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
124 [](const Sink::Storage::Error &error) {
125 Warning() << "Failed to write entity";
126 }
127 );
128 d->revisionChanged = true;
129 Sink::Storage::setMaxRevision(d->transaction, newRevision);
130 Sink::Storage::recordRevision(d->transaction, newRevision, uid, bufferType);
131}
132
133KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) 145KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
134{ 146{
135 Trace() << "Pipeline: New Entity"; 147 Trace() << "Pipeline: New Entity";
148 d->transactionItemCount++;
136 149
137 { 150 {
138 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 151 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
139 if (!Sink::Commands::VerifyCreateEntityBuffer(verifyer)) { 152 if (!Commands::VerifyCreateEntityBuffer(verifyer)) {
140 Warning() << "invalid buffer, not a create entity buffer"; 153 Warning() << "invalid buffer, not a create entity buffer";
141 return KAsync::error<qint64>(0); 154 return KAsync::error<qint64>(0);
142 } 155 }
143 } 156 }
144 auto createEntity = Sink::Commands::GetCreateEntity(command); 157 auto createEntity = Commands::GetCreateEntity(command);
145 158
146 const bool replayToSource = createEntity->replayToSource(); 159 const bool replayToSource = createEntity->replayToSource();
147 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size()); 160 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(createEntity->domainType()->Data()), createEntity->domainType()->size());
148 { 161 {
149 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 162 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
150 if (!Sink::VerifyEntityBuffer(verifyer)) { 163 if (!VerifyEntityBuffer(verifyer)) {
151 Warning() << "invalid buffer, not an entity buffer"; 164 Warning() << "invalid buffer, not an entity buffer";
152 return KAsync::error<qint64>(0); 165 return KAsync::error<qint64>(0);
153 } 166 }
154 } 167 }
155 auto entity = Sink::GetEntity(createEntity->delta()->Data()); 168 auto entity = GetEntity(createEntity->delta()->Data());
156 if (!entity->resource()->size() && !entity->local()->size()) { 169 if (!entity->resource()->size() && !entity->local()->size()) {
157 Warning() << "No local and no resource buffer while trying to create entity."; 170 Warning() << "No local and no resource buffer while trying to create entity.";
158 return KAsync::error<qint64>(0); 171 return KAsync::error<qint64>(0);
@@ -161,7 +174,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
161 QByteArray key; 174 QByteArray key;
162 if (createEntity->entityId()) { 175 if (createEntity->entityId()) {
163 key = QByteArray(reinterpret_cast<char const*>(createEntity->entityId()->Data()), createEntity->entityId()->size()); 176 key = QByteArray(reinterpret_cast<char const*>(createEntity->entityId()->Data()), createEntity->entityId()->size());
164 if (d->transaction.openDatabase(bufferType + ".main").contains(key)) { 177 if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) {
165 ErrorMsg() << "An entity with this id already exists: " << key; 178 ErrorMsg() << "An entity with this id already exists: " << key;
166 return KAsync::error<qint64>(0); 179 return KAsync::error<qint64>(0);
167 } 180 }
@@ -171,21 +184,21 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
171 key = QUuid::createUuid().toString().toUtf8(); 184 key = QUuid::createUuid().toString().toUtf8();
172 } 185 }
173 Q_ASSERT(!key.isEmpty()); 186 Q_ASSERT(!key.isEmpty());
174 const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; 187 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
175 188
176 //Add metadata buffer 189 //Add metadata buffer
177 flatbuffers::FlatBufferBuilder metadataFbb; 190 flatbuffers::FlatBufferBuilder metadataFbb;
178 auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); 191 auto metadataBuilder = MetadataBuilder(metadataFbb);
179 metadataBuilder.add_revision(newRevision); 192 metadataBuilder.add_revision(newRevision);
180 metadataBuilder.add_operation(Sink::Operation_Creation); 193 metadataBuilder.add_operation(Operation_Creation);
181 metadataBuilder.add_replayToSource(replayToSource); 194 metadataBuilder.add_replayToSource(replayToSource);
182 auto metadataBuffer = metadataBuilder.Finish(); 195 auto metadataBuffer = metadataBuilder.Finish();
183 Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); 196 FinishMetadataBuffer(metadataFbb, metadataBuffer);
184 197
185 flatbuffers::FlatBufferBuilder fbb; 198 flatbuffers::FlatBufferBuilder fbb;
186 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size()); 199 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), entity->resource()->Data(), entity->resource()->size(), entity->local()->Data(), entity->local()->size());
187 200
188 storeNewRevision(newRevision, fbb, bufferType, key); 201 d->storeNewRevision(newRevision, fbb, bufferType, key);
189 202
190 auto adaptorFactory = d->adaptorFactory.value(bufferType); 203 auto adaptorFactory = d->adaptorFactory.value(bufferType);
191 if (!adaptorFactory) { 204 if (!adaptorFactory) {
@@ -194,14 +207,14 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
194 } 207 }
195 208
196 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType; 209 Log() << "Pipeline: wrote entity: " << key << newRevision << bufferType;
197 d->transaction.openDatabase(bufferType + ".main").scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool { 210 Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, key](const QByteArray &, const QByteArray &value) -> bool {
198 auto entity = Sink::GetEntity(value); 211 auto entity = GetEntity(value);
199 auto adaptor = adaptorFactory->createAdaptor(*entity); 212 auto adaptor = adaptorFactory->createAdaptor(*entity);
200 for (auto processor : d->processors[bufferType]) { 213 for (auto processor : d->processors[bufferType]) {
201 processor->newEntity(key, newRevision, *adaptor, d->transaction); 214 processor->newEntity(key, newRevision, *adaptor, d->transaction);
202 } 215 }
203 return false; 216 return false;
204 }, [this](const Sink::Storage::Error &error) { 217 }, [this](const Storage::Error &error) {
205 ErrorMsg() << "Failed to find value in pipeline: " << error.message; 218 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
206 }); 219 });
207 return KAsync::start<qint64>([newRevision](){ 220 return KAsync::start<qint64>([newRevision](){
@@ -212,17 +225,18 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
212KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) 225KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
213{ 226{
214 Trace() << "Pipeline: Modified Entity"; 227 Trace() << "Pipeline: Modified Entity";
228 d->transactionItemCount++;
215 229
216 const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; 230 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
217 231
218 { 232 {
219 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 233 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
220 if (!Sink::Commands::VerifyModifyEntityBuffer(verifyer)) { 234 if (!Commands::VerifyModifyEntityBuffer(verifyer)) {
221 Warning() << "invalid buffer, not a modify entity buffer"; 235 Warning() << "invalid buffer, not a modify entity buffer";
222 return KAsync::error<qint64>(0); 236 return KAsync::error<qint64>(0);
223 } 237 }
224 } 238 }
225 auto modifyEntity = Sink::Commands::GetModifyEntity(command); 239 auto modifyEntity = Commands::GetModifyEntity(command);
226 Q_ASSERT(modifyEntity); 240 Q_ASSERT(modifyEntity);
227 241
228 const qint64 baseRevision = modifyEntity->revision(); 242 const qint64 baseRevision = modifyEntity->revision();
@@ -236,7 +250,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
236 } 250 }
237 { 251 {
238 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); 252 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
239 if (!Sink::VerifyEntityBuffer(verifyer)) { 253 if (!VerifyEntityBuffer(verifyer)) {
240 Warning() << "invalid buffer, not an entity buffer"; 254 Warning() << "invalid buffer, not an entity buffer";
241 return KAsync::error<qint64>(0); 255 return KAsync::error<qint64>(0);
242 } 256 }
@@ -249,13 +263,13 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
249 return KAsync::error<qint64>(0); 263 return KAsync::error<qint64>(0);
250 } 264 }
251 265
252 auto diffEntity = Sink::GetEntity(modifyEntity->delta()->Data()); 266 auto diffEntity = GetEntity(modifyEntity->delta()->Data());
253 Q_ASSERT(diffEntity); 267 Q_ASSERT(diffEntity);
254 auto diff = adaptorFactory->createAdaptor(*diffEntity); 268 auto diff = adaptorFactory->createAdaptor(*diffEntity);
255 269
256 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 270 QSharedPointer<ApplicationDomain::BufferAdaptor> current;
257 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 271 Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
258 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 272 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
259 if (!buffer.isValid()) { 273 if (!buffer.isValid()) {
260 Warning() << "Read invalid buffer from disk"; 274 Warning() << "Read invalid buffer from disk";
261 } else { 275 } else {
@@ -273,15 +287,22 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
273 } 287 }
274 288
275 //resource and uid don't matter at this point 289 //resource and uid don't matter at this point
276 const Sink::ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current); 290 const ApplicationDomain::ApplicationDomainType existingObject("", "", newRevision, current);
277 auto newObject = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<Sink::ApplicationDomain::ApplicationDomainType>(existingObject); 291 auto newObject = ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(existingObject);
278 292
279 //Apply diff 293 //Apply diff
280 //FIXME only apply the properties that are available in the buffer 294 //FIXME only apply the properties that are available in the buffer
281 Trace() << "Applying changed properties: " << diff->availableProperties(); 295 Trace() << "Applying changed properties: " << diff->availableProperties();
296 QSet<QByteArray> changeset;
282 for (const auto &property : diff->availableProperties()) { 297 for (const auto &property : diff->availableProperties()) {
283 newObject->setProperty(property, diff->getProperty(property)); 298 changeset << property;
299 const auto value = diff->getProperty(property);
300 if (value.isValid()) {
301 newObject->setProperty(property, value);
302 }
284 } 303 }
304 //Altough we only set some properties, we want all to be serialized
305 newObject->setChangedProperties(changeset);
285 306
286 //Remove deletions 307 //Remove deletions
287 if (modifyEntity->deletions()) { 308 if (modifyEntity->deletions()) {
@@ -292,26 +313,29 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
292 313
293 //Add metadata buffer 314 //Add metadata buffer
294 flatbuffers::FlatBufferBuilder metadataFbb; 315 flatbuffers::FlatBufferBuilder metadataFbb;
295 auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); 316 auto metadataBuilder = MetadataBuilder(metadataFbb);
296 metadataBuilder.add_revision(newRevision); 317 metadataBuilder.add_revision(newRevision);
297 metadataBuilder.add_operation(Sink::Operation_Modification); 318 metadataBuilder.add_operation(Operation_Modification);
298 metadataBuilder.add_replayToSource(replayToSource); 319 metadataBuilder.add_replayToSource(replayToSource);
299 auto metadataBuffer = metadataBuilder.Finish(); 320 auto metadataBuffer = metadataBuilder.Finish();
300 Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); 321 FinishMetadataBuffer(metadataFbb, metadataBuffer);
301 322
302 flatbuffers::FlatBufferBuilder fbb; 323 flatbuffers::FlatBufferBuilder fbb;
303 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 324 adaptorFactory->createBuffer(*newObject, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
304 325
305 storeNewRevision(newRevision, fbb, bufferType, key); 326 d->storeNewRevision(newRevision, fbb, bufferType, key);
306 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType; 327 Log() << "Pipeline: modified entity: " << key << newRevision << bufferType;
307 d->transaction.openDatabase(bufferType + ".main").scan(Sink::Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &, const QByteArray &value) -> bool { 328 Storage::mainDatabase(d->transaction, bufferType).scan(Storage::assembleKey(key, newRevision), [this, bufferType, newRevision, adaptorFactory, current, key](const QByteArray &k, const QByteArray &value) -> bool {
308 auto entity = Sink::GetEntity(value); 329 if (value.isEmpty()) {
330 ErrorMsg() << "Read buffer is empty.";
331 }
332 auto entity = GetEntity(value.data());
309 auto newEntity = adaptorFactory->createAdaptor(*entity); 333 auto newEntity = adaptorFactory->createAdaptor(*entity);
310 for (auto processor : d->processors[bufferType]) { 334 for (auto processor : d->processors[bufferType]) {
311 processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction); 335 processor->modifiedEntity(key, newRevision, *current, *newEntity, d->transaction);
312 } 336 }
313 return false; 337 return false;
314 }, [this](const Sink::Storage::Error &error) { 338 }, [this](const Storage::Error &error) {
315 ErrorMsg() << "Failed to find value in pipeline: " << error.message; 339 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
316 }); 340 });
317 return KAsync::start<qint64>([newRevision](){ 341 return KAsync::start<qint64>([newRevision](){
@@ -322,15 +346,16 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
322KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) 346KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
323{ 347{
324 Trace() << "Pipeline: Deleted Entity"; 348 Trace() << "Pipeline: Deleted Entity";
349 d->transactionItemCount++;
325 350
326 { 351 {
327 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 352 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
328 if (!Sink::Commands::VerifyDeleteEntityBuffer(verifyer)) { 353 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) {
329 Warning() << "invalid buffer, not a delete entity buffer"; 354 Warning() << "invalid buffer, not a delete entity buffer";
330 return KAsync::error<qint64>(0); 355 return KAsync::error<qint64>(0);
331 } 356 }
332 } 357 }
333 auto deleteEntity = Sink::Commands::GetDeleteEntity(command); 358 auto deleteEntity = Commands::GetDeleteEntity(command);
334 359
335 const bool replayToSource = deleteEntity->replayToSource(); 360 const bool replayToSource = deleteEntity->replayToSource();
336 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 361 const QByteArray bufferType = QByteArray(reinterpret_cast<char const*>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
@@ -338,13 +363,12 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
338 363
339 bool found = false; 364 bool found = false;
340 bool alreadyRemoved = false; 365 bool alreadyRemoved = false;
341 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { 366 Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
342 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 367 auto entity = GetEntity(data.data());
343 auto entity = Sink::GetEntity(data.data());
344 if (entity && entity->metadata()) { 368 if (entity && entity->metadata()) {
345 auto metadata = Sink::GetMetadata(entity->metadata()->Data()); 369 auto metadata = GetMetadata(entity->metadata()->Data());
346 found = true; 370 found = true;
347 if (metadata->operation() == Sink::Operation_Removal) { 371 if (metadata->operation() == Operation_Removal) {
348 alreadyRemoved = true; 372 alreadyRemoved = true;
349 } 373 }
350 374
@@ -364,16 +388,16 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
364 return KAsync::error<qint64>(0); 388 return KAsync::error<qint64>(0);
365 } 389 }
366 390
367 const qint64 newRevision = Sink::Storage::maxRevision(d->transaction) + 1; 391 const qint64 newRevision = Storage::maxRevision(d->transaction) + 1;
368 392
369 //Add metadata buffer 393 //Add metadata buffer
370 flatbuffers::FlatBufferBuilder metadataFbb; 394 flatbuffers::FlatBufferBuilder metadataFbb;
371 auto metadataBuilder = Sink::MetadataBuilder(metadataFbb); 395 auto metadataBuilder = MetadataBuilder(metadataFbb);
372 metadataBuilder.add_revision(newRevision); 396 metadataBuilder.add_revision(newRevision);
373 metadataBuilder.add_operation(Sink::Operation_Removal); 397 metadataBuilder.add_operation(Operation_Removal);
374 metadataBuilder.add_replayToSource(replayToSource); 398 metadataBuilder.add_replayToSource(replayToSource);
375 auto metadataBuffer = metadataBuilder.Finish(); 399 auto metadataBuffer = metadataBuilder.Finish();
376 Sink::FinishMetadataBuffer(metadataFbb, metadataBuffer); 400 FinishMetadataBuffer(metadataFbb, metadataBuffer);
377 401
378 flatbuffers::FlatBufferBuilder fbb; 402 flatbuffers::FlatBufferBuilder fbb;
379 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0); 403 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
@@ -384,20 +408,20 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
384 return KAsync::error<qint64>(0); 408 return KAsync::error<qint64>(0);
385 } 409 }
386 410
387 QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; 411 QSharedPointer<ApplicationDomain::BufferAdaptor> current;
388 d->transaction.openDatabase(bufferType + ".main").findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool { 412 Storage::mainDatabase(d->transaction, bufferType).findLatest(key, [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
389 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 413 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
390 if (!buffer.isValid()) { 414 if (!buffer.isValid()) {
391 Warning() << "Read invalid buffer from disk"; 415 Warning() << "Read invalid buffer from disk";
392 } else { 416 } else {
393 current = adaptorFactory->createAdaptor(buffer.entity()); 417 current = adaptorFactory->createAdaptor(buffer.entity());
394 } 418 }
395 return false; 419 return false;
396 }, [this](const Sink::Storage::Error &error) { 420 }, [this](const Storage::Error &error) {
397 ErrorMsg() << "Failed to find value in pipeline: " << error.message; 421 ErrorMsg() << "Failed to find value in pipeline: " << error.message;
398 }); 422 });
399 423
400 storeNewRevision(newRevision, fbb, bufferType, key); 424 d->storeNewRevision(newRevision, fbb, bufferType, key);
401 Log() << "Pipeline: deleted entity: "<< newRevision; 425 Log() << "Pipeline: deleted entity: "<< newRevision;
402 426
403 for (auto processor : d->processors[bufferType]) { 427 for (auto processor : d->processors[bufferType]) {
@@ -411,33 +435,33 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
411 435
412void Pipeline::cleanupRevision(qint64 revision) 436void Pipeline::cleanupRevision(qint64 revision)
413{ 437{
414 const auto uid = Sink::Storage::getUidFromRevision(d->transaction, revision); 438 const auto uid = Storage::getUidFromRevision(d->transaction, revision);
415 const auto bufferType = Sink::Storage::getTypeFromRevision(d->transaction, revision); 439 const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision);
416 Trace() << "Cleaning up revision " << revision << uid << bufferType; 440 Trace() << "Cleaning up revision " << revision << uid << bufferType;
417 d->transaction.openDatabase(bufferType + ".main").scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool { 441 Storage::mainDatabase(d->transaction, bufferType).scan(uid, [&](const QByteArray &key, const QByteArray &data) -> bool {
418 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 442 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
419 if (!buffer.isValid()) { 443 if (!buffer.isValid()) {
420 Warning() << "Read invalid buffer from disk"; 444 Warning() << "Read invalid buffer from disk";
421 } else { 445 } else {
422 const auto metadata = flatbuffers::GetRoot<Sink::Metadata>(buffer.metadataBuffer()); 446 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
423 const qint64 rev = metadata->revision(); 447 const qint64 rev = metadata->revision();
424 //Remove old revisions, and the current if the entity has already been removed 448 //Remove old revisions, and the current if the entity has already been removed
425 if (rev < revision || metadata->operation() == Sink::Operation_Removal) { 449 if (rev < revision || metadata->operation() == Operation_Removal) {
426 Sink::Storage::removeRevision(d->transaction, rev); 450 Storage::removeRevision(d->transaction, rev);
427 d->transaction.openDatabase(bufferType + ".main").remove(key); 451 Storage::mainDatabase(d->transaction, bufferType).remove(key);
428 } 452 }
429 } 453 }
430 454
431 return true; 455 return true;
432 }, [](const Sink::Storage::Error &error) { 456 }, [](const Storage::Error &error) {
433 Warning() << "Error while reading: " << error.message; 457 Warning() << "Error while reading: " << error.message;
434 }, true); 458 }, true);
435 Sink::Storage::setCleanedUpRevision(d->transaction, revision); 459 Storage::setCleanedUpRevision(d->transaction, revision);
436} 460}
437 461
438qint64 Pipeline::cleanedUpRevision() 462qint64 Pipeline::cleanedUpRevision()
439{ 463{
440 return Sink::Storage::cleanedUpRevision(d->transaction); 464 return Storage::cleanedUpRevision(d->transaction);
441} 465}
442 466
443Preprocessor::Preprocessor() 467Preprocessor::Preprocessor()
@@ -459,3 +483,7 @@ void Preprocessor::finalize()
459 483
460} // namespace Sink 484} // namespace Sink
461 485
486#pragma clang diagnostic push
487#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
488#include "moc_pipeline.cpp"
489#pragma clang diagnostic pop
diff --git a/common/pipeline.h b/common/pipeline.h
index 60a5fa5..096771f 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -25,7 +25,7 @@
25#include <QSharedDataPointer> 25#include <QSharedDataPointer>
26#include <QObject> 26#include <QObject>
27 27
28#include <sinkcommon_export.h> 28#include "sink_export.h"
29#include <storage.h> 29#include <storage.h>
30 30
31#include <Async/Async> 31#include <Async/Async>
@@ -37,7 +37,7 @@ namespace Sink
37 37
38class Preprocessor; 38class Preprocessor;
39 39
40class SINKCOMMON_EXPORT Pipeline : public QObject 40class SINK_EXPORT Pipeline : public QObject
41{ 41{
42 Q_OBJECT 42 Q_OBJECT
43 43
@@ -73,13 +73,11 @@ Q_SIGNALS:
73 void revisionUpdated(qint64); 73 void revisionUpdated(qint64);
74 74
75private: 75private:
76 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
77
78 class Private; 76 class Private;
79 Private * const d; 77 Private * const d;
80}; 78};
81 79
82class SINKCOMMON_EXPORT Preprocessor 80class SINK_EXPORT Preprocessor
83{ 81{
84public: 82public:
85 Preprocessor(); 83 Preprocessor();
diff --git a/common/propertymapper.h b/common/propertymapper.h
index efde72c..57202ab 100644
--- a/common/propertymapper.h
+++ b/common/propertymapper.h
@@ -19,6 +19,7 @@
19 19
20#pragma once 20#pragma once
21 21
22#include "sink_export.h"
22#include <QVariant> 23#include <QVariant>
23#include <QByteArray> 24#include <QByteArray>
24#include <functional> 25#include <functional>
@@ -28,17 +29,17 @@
28 * Defines how to convert qt primitives to flatbuffer ones 29 * Defines how to convert qt primitives to flatbuffer ones
29 */ 30 */
30template <class T> 31template <class T>
31flatbuffers::uoffset_t variantToProperty(const QVariant &, flatbuffers::FlatBufferBuilder &fbb); 32flatbuffers::uoffset_t SINK_EXPORT variantToProperty(const QVariant &, flatbuffers::FlatBufferBuilder &fbb);
32 33
33/** 34/**
34 * Defines how to convert flatbuffer primitives to qt ones 35 * Defines how to convert flatbuffer primitives to qt ones
35 */ 36 */
36template <typename T> 37template <typename T>
37QVariant propertyToVariant(const flatbuffers::String *); 38QVariant SINK_EXPORT propertyToVariant(const flatbuffers::String *);
38template <typename T> 39template <typename T>
39QVariant propertyToVariant(uint8_t); 40QVariant SINK_EXPORT propertyToVariant(uint8_t);
40template <typename T> 41template <typename T>
41QVariant propertyToVariant(const flatbuffers::Vector<uint8_t> *); 42QVariant SINK_EXPORT propertyToVariant(const flatbuffers::Vector<uint8_t> *);
42 43
43 44
44/** 45/**
@@ -52,6 +53,8 @@ template<typename BufferType>
52class ReadPropertyMapper 53class ReadPropertyMapper
53{ 54{
54public: 55public:
56 virtual ~ReadPropertyMapper(){};
57
55 virtual QVariant getProperty(const QByteArray &key, BufferType const *buffer) const 58 virtual QVariant getProperty(const QByteArray &key, BufferType const *buffer) const
56 { 59 {
57 if (mReadAccessors.contains(key)) { 60 if (mReadAccessors.contains(key)) {
@@ -106,6 +109,8 @@ template<typename BufferBuilder>
106class WritePropertyMapper 109class WritePropertyMapper
107{ 110{
108public: 111public:
112 virtual ~WritePropertyMapper(){};
113
109 virtual void setProperty(const QByteArray &key, const QVariant &value, QList<std::function<void(BufferBuilder &)> > &builderCalls, flatbuffers::FlatBufferBuilder &fbb) const 114 virtual void setProperty(const QByteArray &key, const QVariant &value, QList<std::function<void(BufferBuilder &)> > &builderCalls, flatbuffers::FlatBufferBuilder &fbb) const
110 { 115 {
111 if (mWriteAccessors.contains(key)) { 116 if (mWriteAccessors.contains(key)) {
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 25d69b1..22682d3 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -26,6 +26,9 @@
26#include "domainadaptor.h" 26#include "domainadaptor.h"
27#include "asyncutils.h" 27#include "asyncutils.h"
28 28
29#undef DEBUG_AREA
30#define DEBUG_AREA "client.queryrunner"
31
29using namespace Sink; 32using namespace Sink;
30 33
31/* 34/*
@@ -38,14 +41,14 @@ template<typename DomainType>
38class QueryWorker : public QObject 41class QueryWorker : public QObject
39{ 42{
40public: 43public:
41 QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType); 44 QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation);
42 virtual ~QueryWorker(); 45 virtual ~QueryWorker();
43 46
44 qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 47 qint64 executeIncrementalQuery(const Sink::Query &query, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
45 qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider); 48 qint64 executeInitialQuery(const Sink::Query &query, const typename DomainType::Ptr &parent, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider);
46 49
47private: 50private:
48 static void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties); 51 void replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties);
49 52
50 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback); 53 void readEntity(const Sink::Storage::NamedDatabase &db, const QByteArray &key, const std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> &resultCallback);
51 54
@@ -57,6 +60,7 @@ private:
57 qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery); 60 qint64 load(const Sink::Query &query, const std::function<ResultSet(Sink::Storage::Transaction &, QSet<QByteArray> &)> &baseSetRetriever, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, bool initialQuery);
58 61
59private: 62private:
63 QueryRunnerBase::ResultTransformation mResultTransformation;
60 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory; 64 DomainTypeAdaptorFactoryInterface::Ptr mDomainTypeAdaptorFactory;
61 QByteArray mResourceInstanceIdentifier; 65 QByteArray mResourceInstanceIdentifier;
62 QByteArray mBufferType; 66 QByteArray mBufferType;
@@ -72,11 +76,11 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
72{ 76{
73 Trace() << "Starting query"; 77 Trace() << "Starting query";
74 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load. 78 //We delegate loading of initial data to the result provider, os it can decide for itself what it needs to load.
75 mResultProvider->setFetcher([this, query, instanceIdentifier, factory, bufferType](const typename DomainType::Ptr &parent) { 79 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) {
76 Trace() << "Running fetcher"; 80 Trace() << "Running fetcher";
77 auto resultProvider = mResultProvider; 81 auto resultProvider = mResultProvider;
78 async::run<qint64>([query, instanceIdentifier, factory, bufferType, parent, resultProvider]() -> qint64 { 82 async::run<qint64>([=]() -> qint64 {
79 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); 83 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
80 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider); 84 const qint64 newRevision = worker.executeInitialQuery(query, parent, *resultProvider);
81 return newRevision; 85 return newRevision;
82 }) 86 })
@@ -91,18 +95,17 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
91 // In case of a live query we keep the runner for as long alive as the result provider exists 95 // In case of a live query we keep the runner for as long alive as the result provider exists
92 if (query.liveQuery) { 96 if (query.liveQuery) {
93 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting 97 //Incremental updates are always loaded directly, leaving it up to the result to discard the changes if they are not interesting
94 setQuery([this, query, instanceIdentifier, factory, bufferType] () -> KAsync::Job<void> { 98 setQuery([=] () -> KAsync::Job<void> {
95 auto resultProvider = mResultProvider; 99 auto resultProvider = mResultProvider;
96 return async::run<qint64>([query, instanceIdentifier, factory, bufferType, resultProvider]() -> qint64 { 100 return async::run<qint64>([=]() -> qint64 {
97 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType); 101 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
98 const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider); 102 const qint64 newRevision = worker.executeIncrementalQuery(query, *resultProvider);
99 return newRevision; 103 return newRevision;
100 }) 104 })
101 .template then<void, qint64>([query, this](qint64 newRevision) { 105 .template then<void, qint64>([query, this](qint64 newRevision) {
102 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise. 106 //Only send the revision replayed information if we're connected to the resource, there's no need to start the resource otherwise.
103 mResourceAccess->sendRevisionReplayedCommand(newRevision); 107 mResourceAccess->sendRevisionReplayedCommand(newRevision);
104 }) 108 });
105 .template then<void>([](){});
106 }); 109 });
107 //Ensure the connection is open, if it wasn't already opened 110 //Ensure the connection is open, if it wasn't already opened
108 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates 111 //TODO If we are not connected already, we have to check for the latest revision once connected, otherwise we could miss some updates
@@ -118,6 +121,12 @@ QueryRunner<DomainType>::~QueryRunner()
118} 121}
119 122
120template<class DomainType> 123template<class DomainType>
124void QueryRunner<DomainType>::setResultTransformation(const ResultTransformation &transformation)
125{
126 mResultTransformation = transformation;
127}
128
129template<class DomainType>
121typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter() 130typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainType>::emitter()
122{ 131{
123 return mResultProvider->emitter(); 132 return mResultProvider->emitter();
@@ -129,7 +138,7 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction,
129{ 138{
130 //TODO use a result set with an iterator, to read values on demand 139 //TODO use a result set with an iterator, to read values on demand
131 QVector<QByteArray> keys; 140 QVector<QByteArray> keys;
132 transaction.openDatabase(bufferType + ".main").scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool { 141 Storage::mainDatabase(transaction, bufferType).scan(QByteArray(), [&](const QByteArray &key, const QByteArray &value) -> bool {
133 //Skip internals 142 //Skip internals
134 if (Sink::Storage::isInternalKey(key)) { 143 if (Sink::Storage::isInternalKey(key)) {
135 return true; 144 return true;
@@ -147,8 +156,9 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction,
147 156
148 157
149template<class DomainType> 158template<class DomainType>
150QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) 159QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation)
151 : QObject(), 160 : QObject(),
161 mResultTransformation(transformation),
152 mDomainTypeAdaptorFactory(factory), 162 mDomainTypeAdaptorFactory(factory),
153 mResourceInstanceIdentifier(instanceIdentifier), 163 mResourceInstanceIdentifier(instanceIdentifier),
154 mBufferType(bufferType), 164 mBufferType(bufferType),
@@ -167,20 +177,25 @@ template<class DomainType>
167void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties) 177void QueryWorker<DomainType>::replaySet(ResultSet &resultSet, Sink::ResultProviderInterface<typename DomainType::Ptr> &resultProvider, const QList<QByteArray> &properties)
168{ 178{
169 int counter = 0; 179 int counter = 0;
170 while (resultSet.next([&resultProvider, &counter, &properties](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool { 180 while (resultSet.next([this, &resultProvider, &counter, &properties](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &value, Sink::Operation operation) -> bool {
181 //FIXME allow maildir resource to set the mimeMessage property
182 auto valueCopy = Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>();
183 if (mResultTransformation) {
184 mResultTransformation(*valueCopy);
185 }
171 counter++; 186 counter++;
172 switch (operation) { 187 switch (operation) {
173 case Sink::Operation_Creation: 188 case Sink::Operation_Creation:
174 // Trace() << "Got creation"; 189 // Trace() << "Got creation";
175 resultProvider.add(Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>()); 190 resultProvider.add(valueCopy);
176 break; 191 break;
177 case Sink::Operation_Modification: 192 case Sink::Operation_Modification:
178 // Trace() << "Got modification"; 193 // Trace() << "Got modification";
179 resultProvider.modify(Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>()); 194 resultProvider.modify(valueCopy);
180 break; 195 break;
181 case Sink::Operation_Removal: 196 case Sink::Operation_Removal:
182 // Trace() << "Got removal"; 197 // Trace() << "Got removal";
183 resultProvider.remove(Sink::ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<DomainType>(*value, properties).template staticCast<DomainType>()); 198 resultProvider.remove(valueCopy);
184 break; 199 break;
185 } 200 }
186 return true; 201 return true;
@@ -319,7 +334,7 @@ qint64 QueryWorker<DomainType>::load(const Sink::Query &query, const std::functi
319 Warning() << "Error during query: " << error.store << error.message; 334 Warning() << "Error during query: " << error.store << error.message;
320 }); 335 });
321 auto transaction = storage.createTransaction(Sink::Storage::ReadOnly); 336 auto transaction = storage.createTransaction(Sink::Storage::ReadOnly);
322 auto db = transaction.openDatabase(mBufferType + ".main"); 337 auto db = Storage::mainDatabase(transaction, mBufferType);
323 338
324 QSet<QByteArray> remainingFilters; 339 QSet<QByteArray> remainingFilters;
325 auto resultSet = baseSetRetriever(transaction, remainingFilters); 340 auto resultSet = baseSetRetriever(transaction, remainingFilters);
diff --git a/common/queryrunner.h b/common/queryrunner.h
index 0ee6a81..04e4587 100644
--- a/common/queryrunner.h
+++ b/common/queryrunner.h
@@ -32,6 +32,9 @@
32class QueryRunnerBase : public QObject 32class QueryRunnerBase : public QObject
33{ 33{
34 Q_OBJECT 34 Q_OBJECT
35public:
36 typedef std::function<void(Sink::ApplicationDomain::ApplicationDomainType &domainObject)> ResultTransformation;
37
35protected: 38protected:
36 typedef std::function<KAsync::Job<void>()> QueryFunction; 39 typedef std::function<KAsync::Job<void>()> QueryFunction;
37 40
@@ -43,7 +46,6 @@ protected:
43 queryFunction = query; 46 queryFunction = query;
44 } 47 }
45 48
46
47protected slots: 49protected slots:
48 /** 50 /**
49 * Rerun query with new revision 51 * Rerun query with new revision
@@ -82,10 +84,17 @@ public:
82 QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType); 84 QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType);
83 virtual ~QueryRunner(); 85 virtual ~QueryRunner();
84 86
87 /**
88 * Allows you to run a transformation on every result.
89 * This transformation is executed in the query thread.
90 */
91 void setResultTransformation(const ResultTransformation &transformation);
92
85 typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter(); 93 typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter();
86 94
87private: 95private:
88 QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess; 96 QSharedPointer<Sink::ResourceAccessInterface> mResourceAccess;
89 QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr> > mResultProvider; 97 QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr> > mResultProvider;
98 ResultTransformation mResultTransformation;
90}; 99};
91 100
diff --git a/common/resource.cpp b/common/resource.cpp
index 8c448a8..5cbb2f2 100644
--- a/common/resource.cpp
+++ b/common/resource.cpp
@@ -34,7 +34,7 @@ Resource::Resource()
34 : QObject(), 34 : QObject(),
35 d(0) 35 d(0)
36{ 36{
37 37 Q_UNUSED(d);
38} 38}
39 39
40Resource::~Resource() 40Resource::~Resource()
@@ -63,6 +63,11 @@ void Resource::setLowerBoundRevision(qint64 revision)
63 Q_UNUSED(revision) 63 Q_UNUSED(revision)
64} 64}
65 65
66void Resource::removeDataFromDisk()
67{
68}
69
70
66class ResourceFactory::Private 71class ResourceFactory::Private
67{ 72{
68public: 73public:
@@ -75,7 +80,7 @@ ResourceFactory::ResourceFactory(QObject *parent)
75 : QObject(parent), 80 : QObject(parent),
76 d(0) 81 d(0)
77{ 82{
78 83 Q_UNUSED(d);
79} 84}
80 85
81ResourceFactory::~ResourceFactory() 86ResourceFactory::~ResourceFactory()
@@ -129,3 +134,9 @@ ResourceFactory *ResourceFactory::load(const QString &resourceName)
129} 134}
130 135
131} // namespace Sink 136} // namespace Sink
137
138//Ignore warning I don't know how to fix in a moc file
139#pragma clang diagnostic push
140#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
141#include "moc_resource.cpp"
142#pragma clang diagnostic pop
diff --git a/common/resource.h b/common/resource.h
index 30d6c46..ab30cb9 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -19,7 +19,7 @@
19 */ 19 */
20#pragma once 20#pragma once
21 21
22#include <sinkcommon_export.h> 22#include "sink_export.h"
23 23
24#include <Async/Async> 24#include <Async/Async>
25#include "notification.h" 25#include "notification.h"
@@ -31,7 +31,7 @@ class FacadeFactory;
31/** 31/**
32 * Resource interface 32 * Resource interface
33 */ 33 */
34class SINKCOMMON_EXPORT Resource : public QObject 34class SINK_EXPORT Resource : public QObject
35{ 35{
36 Q_OBJECT 36 Q_OBJECT
37public: 37public:
@@ -55,6 +55,11 @@ public:
55 */ 55 */
56 virtual void setLowerBoundRevision(qint64 revision); 56 virtual void setLowerBoundRevision(qint64 revision);
57 57
58 /**
59 * Remove the data from disk
60 */
61 virtual void removeDataFromDisk();
62
58Q_SIGNALS: 63Q_SIGNALS:
59 void revisionUpdated(qint64); 64 void revisionUpdated(qint64);
60 void notify(Notification); 65 void notify(Notification);
@@ -67,7 +72,7 @@ private:
67/** 72/**
68 * Factory interface for resource to implement. 73 * Factory interface for resource to implement.
69 */ 74 */
70class ResourceFactory : public QObject 75class SINK_EXPORT ResourceFactory : public QObject
71{ 76{
72public: 77public:
73 static ResourceFactory *load(const QString &resourceName); 78 static ResourceFactory *load(const QString &resourceName);
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 0c435c9..80d60e8 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -41,11 +41,13 @@
41#include <QProcess> 41#include <QProcess>
42#include <QDataStream> 42#include <QDataStream>
43#include <QBuffer> 43#include <QBuffer>
44#include <QTime>
44 45
45#undef Trace 46#undef Trace
46#define Trace() Sink::Log::debugStream(Sink::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess") 47#define TracePrivate() Trace_area("client.communication." + resourceInstanceIdentifier)
48#define Trace() Trace_area("client.communication." + d->resourceInstanceIdentifier)
47#undef Log 49#undef Log
48#define Log(IDENTIFIER) Sink::Log::debugStream(Sink::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess("+IDENTIFIER+")") 50#define Log() Log_area("client.communication." + d->resourceInstanceIdentifier)
49 51
50static void queuedInvoke(const std::function<void()> &f, QObject *context = 0) 52static void queuedInvoke(const std::function<void()> &f, QObject *context = 0)
51{ 53{
@@ -168,45 +170,48 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect()
168 return !socket; 170 return !socket;
169 }, 171 },
170 [this, counter](KAsync::Future<void> &future) { 172 [this, counter](KAsync::Future<void> &future) {
171 Trace() << "Loop"; 173 TracePrivate() << "Loop";
172 KAsync::wait(50) 174 connectToServer(resourceInstanceIdentifier)
173 .then(connectToServer(resourceInstanceIdentifier))
174 .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { 175 .then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) {
175 Q_ASSERT(s); 176 Q_ASSERT(s);
176 socket = s; 177 socket = s;
177 future.setFinished(); 178 future.setFinished();
178 }, 179 }, [&future, counter, this](int errorCode, const QString &errorString) {
179 [&future, counter](int errorCode, const QString &errorString) { 180 static int waitTime = 10;
180 const int maxRetries = 10; 181 static int timeout = 500;
182 static int maxRetries = timeout / waitTime;
181 if (*counter > maxRetries) { 183 if (*counter > maxRetries) {
182 Trace() << "Giving up"; 184 TracePrivate() << "Giving up";
183 future.setError(-1, "Failed to connect to socket"); 185 future.setError(-1, "Failed to connect to socket");
184 } else { 186 } else {
185 future.setFinished(); 187 KAsync::wait(waitTime).then<void>([&future]() {
188 future.setFinished();
189 }).exec();
186 } 190 }
187 *counter = *counter + 1; 191 *counter = *counter + 1;
188 }).exec(); 192 })
193 .exec();
189 }); 194 });
190} 195}
191 196
192KAsync::Job<void> ResourceAccess::Private::initializeSocket() 197KAsync::Job<void> ResourceAccess::Private::initializeSocket()
193{ 198{
194 return KAsync::start<void>([this](KAsync::Future<void> &future) { 199 return KAsync::start<void>([this](KAsync::Future<void> &future) {
195 Trace() << "Trying to connect"; 200 TracePrivate() << "Trying to connect";
196 connectToServer(resourceInstanceIdentifier).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) { 201 connectToServer(resourceInstanceIdentifier).then<void, QSharedPointer<QLocalSocket> >([this, &future](const QSharedPointer<QLocalSocket> &s) {
197 Trace() << "Connected to resource, without having to start it."; 202 TracePrivate() << "Connected to resource, without having to start it.";
198 Q_ASSERT(s); 203 Q_ASSERT(s);
199 socket = s; 204 socket = s;
200 future.setFinished(); 205 future.setFinished();
201 }, 206 },
202 [this, &future](int errorCode, const QString &errorString) { 207 [this, &future](int errorCode, const QString &errorString) {
203 Trace() << "Failed to connect, starting resource"; 208 TracePrivate() << "Failed to connect, starting resource";
204 //We failed to connect, so let's start the resource 209 //We failed to connect, so let's start the resource
205 QStringList args; 210 QStringList args;
206 args << resourceInstanceIdentifier; 211 args << resourceInstanceIdentifier;
207 qint64 pid = 0; 212 qint64 pid = 0;
208 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { 213 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) {
209 Trace() << "Started resource " << pid; 214 TracePrivate() << "Started resource " << pid;
210 tryToConnect() 215 tryToConnect()
211 .then<void>([&future]() { 216 .then<void>([&future]() {
212 future.setFinished(); 217 future.setFinished();
@@ -232,12 +237,12 @@ ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier)
232 : ResourceAccessInterface(), 237 : ResourceAccessInterface(),
233 d(new Private(getResourceName(resourceInstanceIdentifier), resourceInstanceIdentifier, this)) 238 d(new Private(getResourceName(resourceInstanceIdentifier), resourceInstanceIdentifier, this))
234{ 239{
235 log("Starting access"); 240 Log() << "Starting access";
236} 241}
237 242
238ResourceAccess::~ResourceAccess() 243ResourceAccess::~ResourceAccess()
239{ 244{
240 log("Closing access"); 245 Log() << "Closing access";
241 if (!d->resultHandler.isEmpty()) { 246 if (!d->resultHandler.isEmpty()) {
242 Warning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys(); 247 Warning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys();
243 } 248 }
@@ -380,9 +385,11 @@ void ResourceAccess::open()
380 if (d->openingSocket) { 385 if (d->openingSocket) {
381 return; 386 return;
382 } 387 }
388 auto time = QSharedPointer<QTime>::create();
389 time->start();
383 d->openingSocket = true; 390 d->openingSocket = true;
384 d->initializeSocket().then<void>([this]() { 391 d->initializeSocket().then<void>([this, time]() {
385 Trace() << "Socket is initialized"; 392 Trace() << "Socket is initialized." << Log::TraceTime(time->elapsed());
386 d->openingSocket = false; 393 d->openingSocket = false;
387 QObject::connect(d->socket.data(), &QLocalSocket::disconnected, 394 QObject::connect(d->socket.data(), &QLocalSocket::disconnected,
388 this, &ResourceAccess::disconnected); 395 this, &ResourceAccess::disconnected);
@@ -400,7 +407,7 @@ void ResourceAccess::open()
400 407
401void ResourceAccess::close() 408void ResourceAccess::close()
402{ 409{
403 log(QString("Closing %1").arg(d->socket->fullServerName())); 410 Log() << QString("Closing %1").arg(d->socket->fullServerName());
404 Trace() << "Pending commands: " << d->pendingCommands.size(); 411 Trace() << "Pending commands: " << d->pendingCommands.size();
405 Trace() << "Queued commands: " << d->commandQueue.size(); 412 Trace() << "Queued commands: " << d->commandQueue.size();
406 d->socket->close(); 413 d->socket->close();
@@ -412,7 +419,7 @@ void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command)
412 //TODO: we should have a timeout for commands 419 //TODO: we should have a timeout for commands
413 d->messageId++; 420 d->messageId++;
414 const auto messageId = d->messageId; 421 const auto messageId = d->messageId;
415 log(QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId)); 422 Log() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId);
416 Q_ASSERT(command->callback); 423 Q_ASSERT(command->callback);
417 registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) { 424 registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) {
418 Trace() << "Command complete " << messageId; 425 Trace() << "Command complete " << messageId;
@@ -452,7 +459,7 @@ void ResourceAccess::connected()
452 return; 459 return;
453 } 460 }
454 461
455 log(QString("Connected: %1").arg(d->socket->fullServerName())); 462 Log() << QString("Connected: %1").arg(d->socket->fullServerName());
456 463
457 { 464 {
458 flatbuffers::FlatBufferBuilder fbb; 465 flatbuffers::FlatBufferBuilder fbb;
@@ -472,7 +479,7 @@ void ResourceAccess::connected()
472 479
473void ResourceAccess::disconnected() 480void ResourceAccess::disconnected()
474{ 481{
475 log(QString("Disconnected from %1").arg(d->socket->fullServerName())); 482 Log() << QString("Disconnected from %1").arg(d->socket->fullServerName());
476 d->socket->close(); 483 d->socket->close();
477 emit ready(false); 484 emit ready(false);
478} 485}
@@ -480,7 +487,7 @@ void ResourceAccess::disconnected()
480void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error) 487void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
481{ 488{
482 if (error == QLocalSocket::PeerClosedError) { 489 if (error == QLocalSocket::PeerClosedError) {
483 Log(d->resourceInstanceIdentifier) << "The resource closed the connection."; 490 Log() << "The resource closed the connection.";
484 d->abortPendingOperations(); 491 d->abortPendingOperations();
485 } else { 492 } else {
486 Warning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString()); 493 Warning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString());
@@ -524,14 +531,17 @@ bool ResourceAccess::processMessageBuffer()
524 switch (commandId) { 531 switch (commandId) {
525 case Commands::RevisionUpdateCommand: { 532 case Commands::RevisionUpdateCommand: {
526 auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); 533 auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize);
527 log(QString("Revision updated to: %1").arg(buffer->revision())); 534 Log() << QString("Revision updated to: %1").arg(buffer->revision());
535 Notification n;
536 n.type = Sink::Commands::NotificationType::NotificationType_RevisionUpdate;
537 emit notification(n);
528 emit revisionChanged(buffer->revision()); 538 emit revisionChanged(buffer->revision());
529 539
530 break; 540 break;
531 } 541 }
532 case Commands::CommandCompletionCommand: { 542 case Commands::CommandCompletionCommand: {
533 auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); 543 auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
534 log(QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); 544 Log() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully");
535 545
536 d->completeCommands << buffer->id(); 546 d->completeCommands << buffer->id();
537 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 547 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
@@ -544,11 +554,11 @@ bool ResourceAccess::processMessageBuffer()
544 auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize); 554 auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize);
545 switch (buffer->type()) { 555 switch (buffer->type()) {
546 case Sink::Commands::NotificationType::NotificationType_Shutdown: 556 case Sink::Commands::NotificationType::NotificationType_Shutdown:
547 Log(d->resourceInstanceIdentifier) << "Received shutdown notification."; 557 Log() << "Received shutdown notification.";
548 close(); 558 close();
549 break; 559 break;
550 case Sink::Commands::NotificationType::NotificationType_Inspection: { 560 case Sink::Commands::NotificationType::NotificationType_Inspection: {
551 Log(d->resourceInstanceIdentifier) << "Received inspection notification."; 561 Log() << "Received inspection notification.";
552 Notification n; 562 Notification n;
553 if (buffer->identifier()) { 563 if (buffer->identifier()) {
554 //Don't use fromRawData, the buffer is gone once we invoke emit notification 564 //Don't use fromRawData, the buffer is gone once we invoke emit notification
@@ -566,6 +576,10 @@ bool ResourceAccess::processMessageBuffer()
566 }, this); 576 }, this);
567 } 577 }
568 break; 578 break;
579 case Sink::Commands::NotificationType::NotificationType_Status:
580 case Sink::Commands::NotificationType::NotificationType_Warning:
581 case Sink::Commands::NotificationType::NotificationType_Progress:
582 case Sink::Commands::NotificationType::NotificationType_RevisionUpdate:
569 default: 583 default:
570 Warning() << "Received unknown notification: " << buffer->type(); 584 Warning() << "Received unknown notification: " << buffer->type();
571 break; 585 break;
@@ -580,9 +594,9 @@ bool ResourceAccess::processMessageBuffer()
580 return d->partialMessageBuffer.size() >= headerSize; 594 return d->partialMessageBuffer.size() >= headerSize;
581} 595}
582 596
583void ResourceAccess::log(const QString &message)
584{
585 Log(d->resourceInstanceIdentifier) << this << message;
586} 597}
587 598
588} 599#pragma clang diagnostic push
600#pragma clang diagnostic ignored "-Wundefined-reinterpret-cast"
601#include "moc_resourceaccess.cpp"
602#pragma clang diagnostic pop
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 73b676b..4c10adb 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -20,6 +20,7 @@
20 20
21#pragma once 21#pragma once
22 22
23#include "sink_export.h"
23#include <QLocalSocket> 24#include <QLocalSocket>
24#include <QObject> 25#include <QObject>
25#include <QTimer> 26#include <QTimer>
@@ -34,7 +35,7 @@ namespace Sink
34 35
35struct QueuedCommand; 36struct QueuedCommand;
36 37
37class ResourceAccessInterface : public QObject 38class SINK_EXPORT ResourceAccessInterface : public QObject
38{ 39{
39 Q_OBJECT 40 Q_OBJECT
40public: 41public:
@@ -62,7 +63,7 @@ public Q_SLOTS:
62 virtual void close() = 0; 63 virtual void close() = 0;
63}; 64};
64 65
65class ResourceAccess : public ResourceAccessInterface 66class SINK_EXPORT ResourceAccess : public ResourceAccessInterface
66{ 67{
67 Q_OBJECT 68 Q_OBJECT
68public: 69public:
@@ -100,7 +101,6 @@ private Q_SLOTS:
100 101
101private: 102private:
102 void connected(); 103 void connected();
103 void log(const QString &message);
104 void registerCallback(uint messageId, const std::function<void(int error, const QString &)> &callback); 104 void registerCallback(uint messageId, const std::function<void(int error, const QString &)> &callback);
105 105
106 void sendCommand(const QSharedPointer<QueuedCommand> &command); 106 void sendCommand(const QSharedPointer<QueuedCommand> &command);
diff --git a/common/resourceconfig.h b/common/resourceconfig.h
index cc9cb94..2108caa 100644
--- a/common/resourceconfig.h
+++ b/common/resourceconfig.h
@@ -19,12 +19,13 @@
19 19
20#pragma once 20#pragma once
21 21
22#include "sink_export.h"
22#include <QList> 23#include <QList>
23#include <QByteArray> 24#include <QByteArray>
24#include <QVariant> 25#include <QVariant>
25#include <QMap> 26#include <QMap>
26 27
27class ResourceConfig 28class SINK_EXPORT ResourceConfig
28{ 29{
29public: 30public:
30 static QMap<QByteArray, QByteArray> getResources(); 31 static QMap<QByteArray, QByteArray> getResources();
diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp
new file mode 100644
index 0000000..83558bd
--- /dev/null
+++ b/common/resourcecontrol.cpp
@@ -0,0 +1,125 @@
1/*
2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21#include "resourcecontrol.h"
22
23#include <QTime>
24#include <QUuid>
25#include <functional>
26
27#include "resourceaccess.h"
28#include "commands.h"
29#include "log.h"
30#include "notifier.h"
31
32#undef DEBUG_AREA
33#define DEBUG_AREA "client.resourcecontrol"
34
35namespace Sink
36{
37
38KAsync::Job<void> ResourceControl::shutdown(const QByteArray &identifier)
39{
40 Trace() << "shutdown " << identifier;
41 auto time = QSharedPointer<QTime>::create();
42 time->start();
43 return ResourceAccess::connectToServer(identifier).then<void, QSharedPointer<QLocalSocket>>([identifier, time](QSharedPointer<QLocalSocket> socket, KAsync::Future<void> &future) {
44 //We can't currently reuse the socket
45 socket->close();
46 auto resourceAccess = QSharedPointer<Sink::ResourceAccess>::create(identifier);
47 resourceAccess->open();
48 resourceAccess->sendCommand(Sink::Commands::ShutdownCommand).then<void>([&future, resourceAccess, time]() {
49 Trace() << "Shutdown complete." << Log::TraceTime(time->elapsed());
50 future.setFinished();
51 }).exec();
52 },
53 [](int, const QString &) {
54 Trace() << "Resource is already closed.";
55 //Resource isn't started, nothing to shutdown
56 });
57}
58
59KAsync::Job<void> ResourceControl::start(const QByteArray &identifier)
60{
61 Trace() << "start " << identifier;
62 auto time = QSharedPointer<QTime>::create();
63 time->start();
64 auto resourceAccess = QSharedPointer<Sink::ResourceAccess>::create(identifier);
65 resourceAccess->open();
66 return resourceAccess->sendCommand(Sink::Commands::PingCommand).then<void>([resourceAccess, time]() {
67 Trace() << "Start complete." << Log::TraceTime(time->elapsed());
68 });
69}
70
71KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier)
72{
73 Trace() << "flushMessageQueue" << resourceIdentifier;
74 return KAsync::iterate(resourceIdentifier)
75 .template each<void, QByteArray>([](const QByteArray &resource, KAsync::Future<void> &future) {
76 Trace() << "Flushing message queue " << resource;
77 auto resourceAccess = QSharedPointer<Sink::ResourceAccess>::create(resource);
78 resourceAccess->open();
79 resourceAccess->synchronizeResource(false, true).then<void>([&future, resourceAccess]() {
80 future.setFinished();
81 }).exec();
82 });
83}
84
85KAsync::Job<void> ResourceControl::flushReplayQueue(const QByteArrayList &resourceIdentifier)
86{
87 return flushMessageQueue(resourceIdentifier);
88}
89
90template <class DomainType>
91KAsync::Job<void> ResourceControl::inspect(const Inspection &inspectionCommand)
92{
93 auto resource = inspectionCommand.resourceIdentifier;
94
95 auto time = QSharedPointer<QTime>::create();
96 time->start();
97 Trace() << "Sending inspection " << resource;
98 auto resourceAccess = QSharedPointer<Sink::ResourceAccess>::create(resource);
99 resourceAccess->open();
100 auto notifier = QSharedPointer<Sink::Notifier>::create(resourceAccess);
101 auto id = QUuid::createUuid().toByteArray();
102 return resourceAccess->sendInspectionCommand(id, ApplicationDomain::getTypeName<DomainType>(), inspectionCommand.entityIdentifier, inspectionCommand.property, inspectionCommand.expectedValue)
103 .template then<void>([resourceAccess, notifier, id, time](KAsync::Future<void> &future) {
104 notifier->registerHandler([&future, id, time](const Notification &notification) {
105 if (notification.id == id) {
106 Trace() << "Inspection complete." << Log::TraceTime(time->elapsed());
107 if (notification.code) {
108 future.setError(-1, "Inspection returned an error: " + notification.message);
109 } else {
110 future.setFinished();
111 }
112 }
113 });
114 });
115}
116
117#define REGISTER_TYPE(T) template KAsync::Job<void> ResourceControl::inspect<T>(const Inspection &); \
118
119REGISTER_TYPE(ApplicationDomain::Event);
120REGISTER_TYPE(ApplicationDomain::Mail);
121REGISTER_TYPE(ApplicationDomain::Folder);
122REGISTER_TYPE(ApplicationDomain::SinkResource);
123
124} // namespace Sink
125
diff --git a/common/resourcecontrol.h b/common/resourcecontrol.h
new file mode 100644
index 0000000..5bfa67f
--- /dev/null
+++ b/common/resourcecontrol.h
@@ -0,0 +1,62 @@
1/*
2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include "sink_export.h"
24#include <QByteArray>
25
26#include <Async/Async>
27
28#include "inspection.h"
29
30namespace Sink {
31namespace ResourceControl {
32
33template <class DomainType>
34KAsync::Job<void> SINK_EXPORT inspect(const Inspection &inspectionCommand);
35
36/**
37 * Shutdown resource.
38 */
39KAsync::Job<void> SINK_EXPORT shutdown(const QByteArray &resourceIdentifier);
40
41/**
42 * Start resource.
43 *
44 * The resource is ready for operation once this command completes.
45 * This command is only necessary if a resource was shutdown previously,
46 * otherwise the resource process will automatically start as necessary.
47 */
48KAsync::Job<void> SINK_EXPORT start(const QByteArray &resourceIdentifier);
49
50/**
51 * Flushes any pending messages to disk
52 */
53KAsync::Job<void> SINK_EXPORT flushMessageQueue(const QByteArrayList &resourceIdentifier);
54
55/**
56 * Flushes any pending messages that haven't been replayed to the source.
57 */
58KAsync::Job<void> SINK_EXPORT flushReplayQueue(const QByteArrayList &resourceIdentifier);
59
60 }
61}
62
diff --git a/common/resultprovider.h b/common/resultprovider.h
index 6958dbf..2d6efbe 100644
--- a/common/resultprovider.h
+++ b/common/resultprovider.h
@@ -27,8 +27,6 @@
27#include "resultset.h" 27#include "resultset.h"
28#include "log.h" 28#include "log.h"
29 29
30using namespace async;
31
32namespace Sink { 30namespace Sink {
33 31
34/** 32/**
@@ -47,6 +45,11 @@ public:
47 45
48 } 46 }
49 47
48 virtual ~ResultProviderInterface()
49 {
50
51 }
52
50 virtual void add(const T &value) = 0; 53 virtual void add(const T &value) = 0;
51 virtual void modify(const T &value) = 0; 54 virtual void modify(const T &value) = 0;
52 virtual void remove(const T &value) = 0; 55 virtual void remove(const T &value) = 0;
@@ -183,7 +186,7 @@ public:
183 186
184 void onDone(const std::function<void()> &callback) 187 void onDone(const std::function<void()> &callback)
185 { 188 {
186 mThreadBoundary = QSharedPointer<ThreadBoundary>::create(); 189 mThreadBoundary = QSharedPointer<async::ThreadBoundary>::create();
187 mOnDoneCallback = callback; 190 mOnDoneCallback = callback;
188 } 191 }
189 192
@@ -212,7 +215,7 @@ private:
212 215
213 QWeakPointer<ResultEmitter<T> > mResultEmitter; 216 QWeakPointer<ResultEmitter<T> > mResultEmitter;
214 std::function<void()> mOnDoneCallback; 217 std::function<void()> mOnDoneCallback;
215 QSharedPointer<ThreadBoundary> mThreadBoundary; 218 QSharedPointer<async::ThreadBoundary> mThreadBoundary;
216 std::function<void(const T &parent)> mFetcher; 219 std::function<void(const T &parent)> mFetcher;
217}; 220};
218 221
@@ -327,7 +330,7 @@ private:
327 std::function<void(void)> clearHandler; 330 std::function<void(void)> clearHandler;
328 331
329 std::function<void(const DomainType &parent)> mFetcher; 332 std::function<void(const DomainType &parent)> mFetcher;
330 ThreadBoundary mThreadBoundary; 333 async::ThreadBoundary mThreadBoundary;
331}; 334};
332 335
333template<class DomainType> 336template<class DomainType>
diff --git a/common/storage.h b/common/storage.h
index 2d34f1f..ac03947 100644
--- a/common/storage.h
+++ b/common/storage.h
@@ -21,7 +21,7 @@
21 21
22#pragma once 22#pragma once
23 23
24#include <sinkcommon_export.h> 24#include "sink_export.h"
25#include <string> 25#include <string>
26#include <functional> 26#include <functional>
27#include <QString> 27#include <QString>
@@ -29,7 +29,7 @@
29namespace Sink 29namespace Sink
30{ 30{
31 31
32class SINKCOMMON_EXPORT Storage { 32class SINK_EXPORT Storage {
33public: 33public:
34 enum AccessMode { ReadOnly, ReadWrite }; 34 enum AccessMode { ReadOnly, ReadWrite };
35 35
@@ -180,6 +180,13 @@ public:
180 qint64 diskUsage() const; 180 qint64 diskUsage() const;
181 void removeFromDisk() const; 181 void removeFromDisk() const;
182 182
183 /**
184 * Clears all cached environments.
185 *
186 * This only ever has to be called if a database was removed from another process.
187 */
188 static void clearEnv();
189
183 static qint64 maxRevision(const Sink::Storage::Transaction &); 190 static qint64 maxRevision(const Sink::Storage::Transaction &);
184 static void setMaxRevision(Sink::Storage::Transaction &, qint64 revision); 191 static void setMaxRevision(Sink::Storage::Transaction &, qint64 revision);
185 192
@@ -200,6 +207,8 @@ public:
200 static QByteArray assembleKey(const QByteArray &key, qint64 revision); 207 static QByteArray assembleKey(const QByteArray &key, qint64 revision);
201 static QByteArray uidFromKey(const QByteArray &key); 208 static QByteArray uidFromKey(const QByteArray &key);
202 209
210 static NamedDatabase mainDatabase(const Sink::Storage::Transaction &, const QByteArray &type);
211
203private: 212private:
204 std::function<void(const Storage::Error &error)> mErrorHandler; 213 std::function<void(const Storage::Error &error)> mErrorHandler;
205 214
diff --git a/common/storage_common.cpp b/common/storage_common.cpp
index ea97ac2..0b842d1 100644
--- a/common/storage_common.cpp
+++ b/common/storage_common.cpp
@@ -156,6 +156,11 @@ QByteArray Storage::uidFromKey(const QByteArray &key)
156 return key.mid(0, 38); 156 return key.mid(0, 38);
157} 157}
158 158
159Storage::NamedDatabase Storage::mainDatabase(const Sink::Storage::Transaction &t, const QByteArray &type)
160{
161 return t.openDatabase(type + ".main");
162}
163
159bool Storage::NamedDatabase::contains(const QByteArray &uid) 164bool Storage::NamedDatabase::contains(const QByteArray &uid)
160{ 165{
161 bool found = false; 166 bool found = false;
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp
index 6539eb0..1efffc4 100644
--- a/common/storage_lmdb.cpp
+++ b/common/storage_lmdb.cpp
@@ -156,8 +156,8 @@ void Storage::NamedDatabase::remove(const QByteArray &k, const QByteArray &value
156 const std::function<void(const Storage::Error &error)> &errorHandler) 156 const std::function<void(const Storage::Error &error)> &errorHandler)
157{ 157{
158 if (!d || !d->transaction) { 158 if (!d || !d->transaction) {
159 Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "Not open");
160 if (d) { 159 if (d) {
160 Error error(d->name.toLatin1() + d->db, ErrorCodes::GenericError, "Not open");
161 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error); 161 errorHandler ? errorHandler(error) : d->defaultErrorHandler(error);
162 } 162 }
163 return; 163 return;
@@ -443,8 +443,8 @@ Storage::NamedDatabase Storage::Transaction::openDatabase(const QByteArray &db,
443 d->implicitCommit = true; 443 d->implicitCommit = true;
444 auto p = new Storage::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction); 444 auto p = new Storage::NamedDatabase::Private(db, allowDuplicates, d->defaultErrorHandler, d->name, d->transaction);
445 if (!p->openDatabase(d->requestedRead, errorHandler)) { 445 if (!p->openDatabase(d->requestedRead, errorHandler)) {
446 return Storage::NamedDatabase();
447 delete p; 446 delete p;
447 return Storage::NamedDatabase();
448 } 448 }
449 return Storage::NamedDatabase(p); 449 return Storage::NamedDatabase(p);
450} 450}
@@ -463,7 +463,7 @@ QList<QByteArray> Storage::Transaction::getDatabaseNames() const
463 MDB_val data; 463 MDB_val data;
464 MDB_cursor *cursor; 464 MDB_cursor *cursor;
465 465
466 rc = mdb_cursor_open(d->transaction, d->dbi, &cursor); 466 mdb_cursor_open(d->transaction, d->dbi, &cursor);
467 if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_FIRST)) == 0) { 467 if ((rc = mdb_cursor_get(cursor, &key, &data, MDB_FIRST)) == 0) {
468 list << QByteArray::fromRawData((char*)key.mv_data, key.mv_size); 468 list << QByteArray::fromRawData((char*)key.mv_data, key.mv_size);
469 while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) { 469 while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) {
@@ -615,4 +615,12 @@ void Storage::removeFromDisk() const
615 mdb_env_close(env); 615 mdb_env_close(env);
616} 616}
617 617
618void Storage::clearEnv()
619{
620 for (auto env : Storage::Private::sEnvironments) {
621 mdb_env_close(env);
622 }
623 Storage::Private::sEnvironments.clear();
624}
625
618} // namespace Sink 626} // namespace Sink
diff --git a/common/clientapi.cpp b/common/store.cpp
index be9f3fd..6f080b5 100644
--- a/common/clientapi.cpp
+++ b/common/store.cpp
@@ -1,5 +1,5 @@
1/* 1/*
2 * Copyright (C) 2014 Christian Mollekopf <chrigi_1@fastmail.fm> 2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
3 * 3 *
4 * This library is free software; you can redistribute it and/or 4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public 5 * modify it under the terms of the GNU Lesser General Public
@@ -18,14 +18,10 @@
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */ 19 */
20 20
21#include "clientapi.h" 21#include "store.h"
22 22
23#include <QtConcurrent/QtConcurrentRun> 23#include <QTime>
24#include <QTimer>
25#include <QEventLoop>
26#include <QAbstractItemModel> 24#include <QAbstractItemModel>
27#include <QDir>
28#include <QUuid>
29#include <functional> 25#include <functional>
30#include <memory> 26#include <memory>
31 27
@@ -39,6 +35,9 @@
39#include "storage.h" 35#include "storage.h"
40#include "log.h" 36#include "log.h"
41 37
38#undef DEBUG_AREA
39#define DEBUG_AREA "client.store"
40
42namespace Sink 41namespace Sink
43{ 42{
44 43
@@ -47,11 +46,6 @@ QString Store::storageLocation()
47 return Sink::storageLocation(); 46 return Sink::storageLocation();
48} 47}
49 48
50QByteArray Store::resourceName(const QByteArray &instanceIdentifier)
51{
52 return Sink::resourceName(instanceIdentifier);
53}
54
55static QList<QByteArray> getResources(const QList<QByteArray> &resourceFilter, const QByteArray &type) 49static QList<QByteArray> getResources(const QList<QByteArray> &resourceFilter, const QByteArray &type)
56{ 50{
57 //Return the global resource (signified by an empty name) for types that don't eblong to a specific resource 51 //Return the global resource (signified by an empty name) for types that don't eblong to a specific resource
@@ -156,47 +150,21 @@ KAsync::Job<void> Store::remove(const DomainType &domainObject)
156 }); 150 });
157} 151}
158 152
159KAsync::Job<void> Store::shutdown(const QByteArray &identifier) 153KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
160{
161 Trace() << "shutdown " << identifier;
162 return ResourceAccess::connectToServer(identifier).then<void, QSharedPointer<QLocalSocket>>([identifier](QSharedPointer<QLocalSocket> socket, KAsync::Future<void> &future) {
163 //We can't currently reuse the socket
164 socket->close();
165 auto resourceAccess = QSharedPointer<Sink::ResourceAccess>::create(identifier);
166 resourceAccess->open();
167 resourceAccess->sendCommand(Sink::Commands::ShutdownCommand).then<void>([&future, resourceAccess]() {
168 Trace() << "Shutdown complete";
169 future.setFinished();
170 }).exec();
171 },
172 [](int, const QString &) {
173 Trace() << "Resource is already closed.";
174 //Resource isn't started, nothing to shutdown
175 })
176 //FIXME JOBAPI this is only required because we don't care about the return value of connectToServer
177 .template then<void>([](){});
178}
179
180KAsync::Job<void> Store::start(const QByteArray &identifier)
181{ 154{
182 Trace() << "start " << identifier; 155 //All databases are going to become invalid, nuke the environments
156 //TODO: all clients should react to a notification the resource
157 Sink::Storage::clearEnv();
158 Trace() << "Remove data from disk " << identifier;
159 auto time = QSharedPointer<QTime>::create();
160 time->start();
183 auto resourceAccess = QSharedPointer<Sink::ResourceAccess>::create(identifier); 161 auto resourceAccess = QSharedPointer<Sink::ResourceAccess>::create(identifier);
184 resourceAccess->open(); 162 resourceAccess->open();
185 return resourceAccess->sendCommand(Sink::Commands::PingCommand).then<void>([resourceAccess]() { 163 return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand).then<void>([resourceAccess, time]() {
186 Trace() << "Start complete"; 164 Trace() << "Remove from disk complete." << Log::TraceTime(time->elapsed());
187 }); 165 });
188} 166}
189 167
190void Store::removeFromDisk(const QByteArray &identifier)
191{
192 //TODO By calling the resource executable with a --remove option instead
193 //we can ensure that no other resource process is running at the same time
194 QDir dir(Sink::storageLocation());
195 for (const auto &folder : dir.entryList(QStringList() << identifier + "*")) {
196 Sink::Storage(Sink::storageLocation(), folder, Sink::Storage::ReadWrite).removeFromDisk();
197 }
198}
199
200KAsync::Job<void> Store::synchronize(const Sink::Query &query) 168KAsync::Job<void> Store::synchronize(const Sink::Query &query)
201{ 169{
202 Trace() << "synchronize" << query.resources; 170 Trace() << "synchronize" << query.resources;
@@ -208,30 +176,7 @@ KAsync::Job<void> Store::synchronize(const Sink::Query &query)
208 resourceAccess->synchronizeResource(true, false).then<void>([&future, resourceAccess]() { 176 resourceAccess->synchronizeResource(true, false).then<void>([&future, resourceAccess]() {
209 future.setFinished(); 177 future.setFinished();
210 }).exec(); 178 }).exec();
211 }) 179 });
212 //FIXME JOBAPI this is only required because we don't care about the return value of each (and each shouldn't even have a return value)
213 .template then<void>([](){});
214}
215
216KAsync::Job<void> Store::flushMessageQueue(const QByteArrayList &resourceIdentifier)
217{
218 Trace() << "flushMessageQueue" << resourceIdentifier;
219 return KAsync::iterate(resourceIdentifier)
220 .template each<void, QByteArray>([](const QByteArray &resource, KAsync::Future<void> &future) {
221 Trace() << "Flushing message queue " << resource;
222 auto resourceAccess = QSharedPointer<Sink::ResourceAccess>::create(resource);
223 resourceAccess->open();
224 resourceAccess->synchronizeResource(false, true).then<void>([&future, resourceAccess]() {
225 future.setFinished();
226 }).exec();
227 })
228 //FIXME JOBAPI this is only required because we don't care about the return value of each (and each shouldn't even have a return value)
229 .template then<void>([](){});
230}
231
232KAsync::Job<void> Store::flushReplayQueue(const QByteArrayList &resourceIdentifier)
233{
234 return flushMessageQueue(resourceIdentifier);
235} 180}
236 181
237template <class DomainType> 182template <class DomainType>
@@ -295,63 +240,10 @@ KAsync::Job<QList<typename DomainType::Ptr> > Store::fetch(const Sink::Query &qu
295 }); 240 });
296} 241}
297 242
298template <class DomainType>
299KAsync::Job<void> Resources::inspect(const Inspection &inspectionCommand)
300{
301 auto resource = inspectionCommand.resourceIdentifier;
302
303 Trace() << "Sending inspection " << resource;
304 auto resourceAccess = QSharedPointer<Sink::ResourceAccess>::create(resource);
305 resourceAccess->open();
306 auto notifier = QSharedPointer<Sink::Notifier>::create(resourceAccess);
307 auto id = QUuid::createUuid().toByteArray();
308 return resourceAccess->sendInspectionCommand(id, ApplicationDomain::getTypeName<DomainType>(), inspectionCommand.entityIdentifier, inspectionCommand.property, inspectionCommand.expectedValue)
309 .template then<void>([resourceAccess, notifier, id](KAsync::Future<void> &future) {
310 notifier->registerHandler([&future, id](const Notification &notification) {
311 if (notification.id == id) {
312 if (notification.code) {
313 future.setError(-1, "Inspection returned an error: " + notification.message);
314 } else {
315 future.setFinished();
316 }
317 }
318 });
319 });
320}
321
322class Sink::Notifier::Private {
323public:
324 Private()
325 : context(new QObject)
326 {
327
328 }
329 QList<QSharedPointer<ResourceAccess> > resourceAccess;
330 QList<std::function<void(const Notification &)> > handler;
331 QSharedPointer<QObject> context;
332};
333
334Notifier::Notifier(const QSharedPointer<ResourceAccess> &resourceAccess)
335 : d(new Sink::Notifier::Private)
336{
337 QObject::connect(resourceAccess.data(), &ResourceAccess::notification, d->context.data(), [this](const Notification &notification) {
338 for (const auto &handler : d->handler) {
339 handler(notification);
340 }
341 });
342 d->resourceAccess << resourceAccess;
343}
344
345void Notifier::registerHandler(std::function<void(const Notification &)> handler)
346{
347 d->handler << handler;
348}
349
350#define REGISTER_TYPE(T) template KAsync::Job<void> Store::remove<T>(const T &domainObject); \ 243#define REGISTER_TYPE(T) template KAsync::Job<void> Store::remove<T>(const T &domainObject); \
351 template KAsync::Job<void> Store::create<T>(const T &domainObject); \ 244 template KAsync::Job<void> Store::create<T>(const T &domainObject); \
352 template KAsync::Job<void> Store::modify<T>(const T &domainObject); \ 245 template KAsync::Job<void> Store::modify<T>(const T &domainObject); \
353 template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(Query query); \ 246 template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(Query query); \
354 template KAsync::Job<void> Resources::inspect<T>(const Inspection &); \
355 template KAsync::Job<T> Store::fetchOne<T>(const Query &); \ 247 template KAsync::Job<T> Store::fetchOne<T>(const Query &); \
356 template KAsync::Job<QList<T::Ptr> > Store::fetchAll<T>(const Query &); \ 248 template KAsync::Job<QList<T::Ptr> > Store::fetchAll<T>(const Query &); \
357 template KAsync::Job<QList<T::Ptr> > Store::fetch<T>(const Query &, int); \ 249 template KAsync::Job<QList<T::Ptr> > Store::fetch<T>(const Query &, int); \
diff --git a/common/store.h b/common/store.h
new file mode 100644
index 0000000..6696833
--- /dev/null
+++ b/common/store.h
@@ -0,0 +1,101 @@
1/*
2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include "sink_export.h"
24#include <QString>
25#include <QSharedPointer>
26
27#include <Async/Async>
28
29#include "query.h"
30#include "applicationdomaintype.h"
31
32class QAbstractItemModel;
33
34namespace Sink {
35
36/**
37 * The unified Sink Store.
38 *
39 * This is the primary interface for clients to interact with Sink.
40 * It provides a unified store where all data provided by various resources can be accessed and modified.
41 */
42namespace Store {
43
44QString SINK_EXPORT storageLocation();
45
46enum Roles {
47 DomainObjectRole = Qt::UserRole + 1, //Must be the same as in ModelResult
48 ChildrenFetchedRole,
49 DomainObjectBaseRole
50};
51
52/**
53 * Asynchronusly load a dataset with tree structure information
54 */
55template <class DomainType>
56QSharedPointer<QAbstractItemModel> SINK_EXPORT loadModel(Query query);
57
58/**
59 * Create a new entity.
60 */
61template <class DomainType>
62KAsync::Job<void> SINK_EXPORT create(const DomainType &domainObject);
63
64/**
65 * Modify an entity.
66 *
67 * This includes moving etc. since these are also simple settings on a property.
68 */
69template <class DomainType>
70KAsync::Job<void> SINK_EXPORT modify(const DomainType &domainObject);
71
72/**
73 * Remove an entity.
74 */
75template <class DomainType>
76KAsync::Job<void> SINK_EXPORT remove(const DomainType &domainObject);
77
78/**
79 * Synchronize data to local cache.
80 */
81KAsync::Job<void> SINK_EXPORT synchronize(const Sink::Query &query);
82
83/**
84 * Removes all resource data from disk.
85 *
86 * This will not touch the configuration. All commands that that arrived at the resource before this command will be dropped. All commands that arrived later will be executed.
87 */
88KAsync::Job<void> SINK_EXPORT removeDataFromDisk(const QByteArray &resourceIdentifier);
89
90template <class DomainType>
91KAsync::Job<DomainType> SINK_EXPORT fetchOne(const Sink::Query &query);
92
93template <class DomainType>
94KAsync::Job<QList<typename DomainType::Ptr> > SINK_EXPORT fetchAll(const Sink::Query &query);
95
96template <class DomainType>
97KAsync::Job<QList<typename DomainType::Ptr> > SINK_EXPORT fetch(const Sink::Query &query, int minimumAmount = 0);
98
99 }
100}
101
diff --git a/common/threadboundary.h b/common/threadboundary.h
index 0d8ed3b..7bea4ea 100644
--- a/common/threadboundary.h
+++ b/common/threadboundary.h
@@ -20,6 +20,8 @@
20 20
21#pragma once 21#pragma once
22 22
23#include "sink_export.h"
24
23#include <QObject> 25#include <QObject>
24#include <functional> 26#include <functional>
25 27
@@ -29,7 +31,7 @@ namespace async {
29* A helper class to invoke a method in a different thread using the event loop. 31* A helper class to invoke a method in a different thread using the event loop.
30* The ThreadBoundary object must live in the thread where the function should be called. 32* The ThreadBoundary object must live in the thread where the function should be called.
31*/ 33*/
32class ThreadBoundary : public QObject { 34class SINK_EXPORT ThreadBoundary : public QObject {
33 Q_OBJECT 35 Q_OBJECT
34public: 36public:
35 ThreadBoundary(); 37 ThreadBoundary();
diff --git a/common/typeindex.cpp b/common/typeindex.cpp
index 03ad8f7..b1bcf6a 100644
--- a/common/typeindex.cpp
+++ b/common/typeindex.cpp
@@ -22,6 +22,9 @@
22#include "index.h" 22#include "index.h"
23#include <QDateTime> 23#include <QDateTime>
24 24
25#undef DEBUG_AREA
26#define DEBUG_AREA "common.typeindex"
27
25TypeIndex::TypeIndex(const QByteArray &type) 28TypeIndex::TypeIndex(const QByteArray &type)
26 : mType(type) 29 : mType(type)
27{ 30{
@@ -111,9 +114,9 @@ ResultSet TypeIndex::query(const Sink::Query &query, QSet<QByteArray> &appliedFi
111 Warning() << "Error in index: " << error.message << property; 114 Warning() << "Error in index: " << error.message << property;
112 }); 115 });
113 appliedFilters << property; 116 appliedFilters << property;
117 Trace() << "Index lookup on " << property << " found " << keys.size() << " keys.";
118 return ResultSet(keys);
114 } 119 }
115 Trace() << "Index lookup on " << property << " found " << keys.size() << " keys.";
116 return ResultSet(keys);
117 } 120 }
118 Trace() << "No matching index"; 121 Trace() << "No matching index";
119 return ResultSet(keys); 122 return ResultSet(keys);