summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/bufferutils.h26
-rw-r--r--common/clientapi.cpp141
-rw-r--r--common/clientapi.h42
-rw-r--r--common/commands.cpp4
-rw-r--r--common/commands.h3
-rw-r--r--common/commands/commandcompletion.fbs2
-rw-r--r--common/commands/fetchentity.fbs2
-rw-r--r--common/commands/handshake.fbs2
-rw-r--r--common/commands/inspection.fbs12
-rw-r--r--common/commands/notification.fbs8
-rw-r--r--common/commands/revisionupdate.fbs2
-rw-r--r--common/commands/synchronize.fbs2
-rw-r--r--common/domain/applicationdomaintype.h27
-rw-r--r--common/facade.cpp5
-rw-r--r--common/genericresource.cpp63
-rw-r--r--common/genericresource.h1
-rw-r--r--common/inspection.h60
-rw-r--r--common/listener.cpp60
-rw-r--r--common/listener.h6
-rw-r--r--common/log.cpp18
-rw-r--r--common/log.h4
-rw-r--r--common/notification.h40
-rw-r--r--common/pipeline.cpp5
-rw-r--r--common/query.h120
-rw-r--r--common/resource.h3
-rw-r--r--common/resourceaccess.cpp79
-rw-r--r--common/resourceaccess.h5
-rw-r--r--common/resourcefacade.h5
-rw-r--r--common/resultprovider.h12
30 files changed, 667 insertions, 93 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 87f4898..85cd621 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -75,6 +75,7 @@ generate_flatbuffers(
75 commands/synchronize 75 commands/synchronize
76 commands/notification 76 commands/notification
77 commands/revisionreplayed 77 commands/revisionreplayed
78 commands/inspection
78 domain/event 79 domain/event
79 domain/mail 80 domain/mail
80 domain/folder 81 domain/folder
diff --git a/common/bufferutils.h b/common/bufferutils.h
new file mode 100644
index 0000000..b0fb75a
--- /dev/null
+++ b/common/bufferutils.h
@@ -0,0 +1,26 @@
1#pragma once
2
3#include <flatbuffers/flatbuffers.h>
4#include <QByteArray>
5
6namespace Akonadi2 {
7namespace BufferUtils {
8 template<typename T>
9 static QByteArray extractBuffer(const T *data)
10 {
11 return QByteArray::fromRawData(reinterpret_cast<char const *>(data->Data()), data->size());
12 }
13
14 template<typename T>
15 static QByteArray extractBufferCopy(const T *data)
16 {
17 return QByteArray(reinterpret_cast<char const *>(data->Data()), data->size());
18 }
19
20 static QByteArray extractBuffer(const flatbuffers::FlatBufferBuilder &fbb)
21 {
22 return QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize());
23 }
24}
25}
26
diff --git a/common/clientapi.cpp b/common/clientapi.cpp
index e7ca99d..824ef19 100644
--- a/common/clientapi.cpp
+++ b/common/clientapi.cpp
@@ -25,6 +25,7 @@
25#include <QEventLoop> 25#include <QEventLoop>
26#include <QAbstractItemModel> 26#include <QAbstractItemModel>
27#include <QDir> 27#include <QDir>
28#include <QUuid>
28#include <functional> 29#include <functional>
29#include <memory> 30#include <memory>
30 31
@@ -204,7 +205,7 @@ KAsync::Job<void> Store::synchronize(const Akonadi2::Query &query)
204 Trace() << "Synchronizing " << resource; 205 Trace() << "Synchronizing " << resource;
205 auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resource); 206 auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resource);
206 resourceAccess->open(); 207 resourceAccess->open();
207 resourceAccess->synchronizeResource(query.syncOnDemand, query.processAll).then<void>([&future, resourceAccess]() { 208 resourceAccess->synchronizeResource(true, false).then<void>([&future, resourceAccess]() {
208 future.setFinished(); 209 future.setFinished();
209 }).exec(); 210 }).exec();
210 }) 211 })
@@ -212,10 +213,148 @@ KAsync::Job<void> Store::synchronize(const Akonadi2::Query &query)
212 .template then<void>([](){}); 213 .template then<void>([](){});
213} 214}
214 215
216KAsync::Job<void> Store::flushMessageQueue(const QByteArrayList &resourceIdentifier)
217{
218 Trace() << "flushMessageQueue" << resourceIdentifier;
219 return KAsync::iterate(resourceIdentifier)
220 .template each<void, QByteArray>([](const QByteArray &resource, KAsync::Future<void> &future) {
221 Trace() << "Flushing message queue " << resource;
222 auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resource);
223 resourceAccess->open();
224 resourceAccess->synchronizeResource(false, true).then<void>([&future, resourceAccess]() {
225 future.setFinished();
226 }).exec();
227 })
228 //FIXME JOBAPI this is only required because we don't care about the return value of each (and each shouldn't even have a return value)
229 .template then<void>([](){});
230}
231
232KAsync::Job<void> Store::flushReplayQueue(const QByteArrayList &resourceIdentifier)
233{
234 return flushMessageQueue(resourceIdentifier);
235}
236
237template <class DomainType>
238KAsync::Job<DomainType> Store::fetchOne(const Akonadi2::Query &query)
239{
240 return KAsync::start<DomainType>([query](KAsync::Future<DomainType> &future) {
241 //FIXME We could do this more elegantly if composed jobs would have the correct type (In that case we'd simply return the value from then continuation, and could avoid the outer job entirely)
242 fetch<DomainType>(query, 1)
243 .template then<void, QList<typename DomainType::Ptr> >([&future](const QList<typename DomainType::Ptr> &list){
244 future.setValue(*list.first());
245 future.setFinished();
246 }, [&future](int errorCode, const QString &errorMessage) {
247 future.setError(errorCode, errorMessage);
248 future.setFinished();
249 }).exec();
250 });
251}
252
253template <class DomainType>
254KAsync::Job<QList<typename DomainType::Ptr> > Store::fetchAll(const Akonadi2::Query &query)
255{
256 return fetch<DomainType>(query);
257}
258
259template <class DomainType>
260KAsync::Job<QList<typename DomainType::Ptr> > Store::fetch(const Akonadi2::Query &query, int minimumAmount)
261{
262 auto model = loadModel<DomainType>(query);
263 auto list = QSharedPointer<QList<typename DomainType::Ptr> >::create();
264 auto context = QSharedPointer<QObject>::create();
265 return KAsync::start<QList<typename DomainType::Ptr> >([model, list, context, minimumAmount](KAsync::Future<QList<typename DomainType::Ptr> > &future) {
266 if (model->rowCount() >= 1) {
267 for (int i = 0; i < model->rowCount(); i++) {
268 list->append(model->index(i, 0, QModelIndex()).data(Akonadi2::Store::DomainObjectRole).template value<typename DomainType::Ptr>());
269 }
270 } else {
271 QObject::connect(model.data(), &QAbstractItemModel::rowsInserted, context.data(), [model, &future, list](const QModelIndex &index, int start, int end) {
272 for (int i = start; i <= end; i++) {
273 list->append(model->index(i, 0, QModelIndex()).data(Akonadi2::Store::DomainObjectRole).template value<typename DomainType::Ptr>());
274 }
275 });
276 QObject::connect(model.data(), &QAbstractItemModel::dataChanged, context.data(), [model, &future, list, minimumAmount](const QModelIndex &, const QModelIndex &, const QVector<int> &roles) {
277 if (roles.contains(ModelResult<DomainType, typename DomainType::Ptr>::ChildrenFetchedRole)) {
278 if (list->size() < minimumAmount) {
279 future.setError(1, "Not enough values.");
280 } else {
281 future.setValue(*list);
282 }
283 future.setFinished();
284 }
285 });
286 }
287 if (model->data(QModelIndex(), ModelResult<DomainType, typename DomainType::Ptr>::ChildrenFetchedRole).toBool()) {
288 if (list->size() < minimumAmount) {
289 future.setError(1, "Not enough values.");
290 } else {
291 future.setValue(*list);
292 }
293 future.setFinished();
294 }
295 });
296}
297
298template <class DomainType>
299KAsync::Job<void> Resources::inspect(const Inspection &inspectionCommand)
300{
301 auto resource = inspectionCommand.resourceIdentifier;
302
303 Trace() << "Sending inspection " << resource;
304 auto resourceAccess = QSharedPointer<Akonadi2::ResourceAccess>::create(resource);
305 resourceAccess->open();
306 auto notifier = QSharedPointer<Akonadi2::Notifier>::create(resourceAccess);
307 auto id = QUuid::createUuid().toByteArray();
308 return resourceAccess->sendInspectionCommand(id, ApplicationDomain::getTypeName<DomainType>(), inspectionCommand.entityIdentifier, inspectionCommand.property, inspectionCommand.expectedValue)
309 .template then<void>([resourceAccess, notifier, id](KAsync::Future<void> &future) {
310 notifier->registerHandler([&future, id](const Notification &notification) {
311 if (notification.id == id) {
312 if (notification.code) {
313 future.setError(-1, "Inspection returned an error: " + notification.message);
314 } else {
315 future.setFinished();
316 }
317 }
318 });
319 });
320}
321
322class Akonadi2::Notifier::Private {
323public:
324 Private()
325 : context(new QObject)
326 {
327
328 }
329 QList<QSharedPointer<ResourceAccess> > resourceAccess;
330 QList<std::function<void(const Notification &)> > handler;
331 QSharedPointer<QObject> context;
332};
333
334Notifier::Notifier(const QSharedPointer<ResourceAccess> &resourceAccess)
335 : d(new Akonadi2::Notifier::Private)
336{
337 QObject::connect(resourceAccess.data(), &ResourceAccess::notification, d->context.data(), [this](const Notification &notification) {
338 for (const auto &handler : d->handler) {
339 handler(notification);
340 }
341 });
342 d->resourceAccess << resourceAccess;
343}
344
345void Notifier::registerHandler(std::function<void(const Notification &)> handler)
346{
347 d->handler << handler;
348}
349
215#define REGISTER_TYPE(T) template KAsync::Job<void> Store::remove<T>(const T &domainObject); \ 350#define REGISTER_TYPE(T) template KAsync::Job<void> Store::remove<T>(const T &domainObject); \
216 template KAsync::Job<void> Store::create<T>(const T &domainObject); \ 351 template KAsync::Job<void> Store::create<T>(const T &domainObject); \
217 template KAsync::Job<void> Store::modify<T>(const T &domainObject); \ 352 template KAsync::Job<void> Store::modify<T>(const T &domainObject); \
218 template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(Query query); \ 353 template QSharedPointer<QAbstractItemModel> Store::loadModel<T>(Query query); \
354 template KAsync::Job<void> Resources::inspect<T>(const Inspection &); \
355 template KAsync::Job<T> Store::fetchOne<T>(const Query &); \
356 template KAsync::Job<QList<T::Ptr> > Store::fetchAll<T>(const Query &); \
357 template KAsync::Job<QList<T::Ptr> > Store::fetch<T>(const Query &, int); \
219 358
220REGISTER_TYPE(ApplicationDomain::Event); 359REGISTER_TYPE(ApplicationDomain::Event);
221REGISTER_TYPE(ApplicationDomain::Mail); 360REGISTER_TYPE(ApplicationDomain::Mail);
diff --git a/common/clientapi.h b/common/clientapi.h
index 4e55432..06376c2 100644
--- a/common/clientapi.h
+++ b/common/clientapi.h
@@ -26,11 +26,14 @@
26#include <Async/Async> 26#include <Async/Async>
27 27
28#include "query.h" 28#include "query.h"
29#include "inspection.h"
29#include "applicationdomaintype.h" 30#include "applicationdomaintype.h"
30 31
31class QAbstractItemModel; 32class QAbstractItemModel;
32 33
33namespace Akonadi2 { 34namespace Akonadi2 {
35class ResourceAccess;
36class Notification;
34 37
35/** 38/**
36 * Store interface used in the client API. 39 * Store interface used in the client API.
@@ -73,6 +76,11 @@ public:
73 static KAsync::Job<void> remove(const DomainType &domainObject); 76 static KAsync::Job<void> remove(const DomainType &domainObject);
74 77
75 /** 78 /**
79 * Synchronize data to local cache.
80 */
81 static KAsync::Job<void> synchronize(const Akonadi2::Query &query);
82
83 /**
76 * Shutdown resource. 84 * Shutdown resource.
77 */ 85 */
78 static KAsync::Job<void> shutdown(const QByteArray &resourceIdentifier); 86 static KAsync::Job<void> shutdown(const QByteArray &resourceIdentifier);
@@ -87,16 +95,46 @@ public:
87 static KAsync::Job<void> start(const QByteArray &resourceIdentifier); 95 static KAsync::Job<void> start(const QByteArray &resourceIdentifier);
88 96
89 /** 97 /**
90 * Synchronize data to local cache. 98 * Flushes any pending messages to disk
91 */ 99 */
92 static KAsync::Job<void> synchronize(const Akonadi2::Query &query); 100 static KAsync::Job<void> flushMessageQueue(const QByteArrayList &resourceIdentifier);
101
102 /**
103 * Flushes any pending messages that haven't been replayed to the source.
104 */
105 static KAsync::Job<void> flushReplayQueue(const QByteArrayList &resourceIdentifier);
93 106
94 /** 107 /**
95 * Removes a resource from disk. 108 * Removes a resource from disk.
96 */ 109 */
97 static void removeFromDisk(const QByteArray &resourceIdentifier); 110 static void removeFromDisk(const QByteArray &resourceIdentifier);
111
112 template <class DomainType>
113 static KAsync::Job<DomainType> fetchOne(const Akonadi2::Query &query);
114
115 template <class DomainType>
116 static KAsync::Job<QList<typename DomainType::Ptr> > fetchAll(const Akonadi2::Query &query);
117
118 template <class DomainType>
119 static KAsync::Job<QList<typename DomainType::Ptr> > fetch(const Akonadi2::Query &query, int minimumAmount = 0);
98}; 120};
99 121
122namespace Resources {
123 template <class DomainType>
124 KAsync::Job<void> inspect(const Inspection &inspectionCommand);
125}
126
127class Notifier {
128public:
129 Notifier(const QSharedPointer<ResourceAccess> &resourceAccess);
130 // Notifier(const QByteArray &resource);
131 // Notifier(const QByteArrayList &resource);
132 void registerHandler(std::function<void(const Notification &)>);
133
134private:
135 class Private;
136 QScopedPointer<Private> d;
137};
100 138
101} 139}
102 140
diff --git a/common/commands.cpp b/common/commands.cpp
index 7a0ae23..35dfb13 100644
--- a/common/commands.cpp
+++ b/common/commands.cpp
@@ -33,7 +33,7 @@ QByteArray name(int commandId)
33 switch(commandId) { 33 switch(commandId) {
34 case UnknownCommand: 34 case UnknownCommand:
35 return "Unknown"; 35 return "Unknown";
36 case CommandCompletion: 36 case CommandCompletionCommand:
37 return "Completion"; 37 return "Completion";
38 case HandshakeCommand: 38 case HandshakeCommand:
39 return "Handshake"; 39 return "Handshake";
@@ -59,6 +59,8 @@ QByteArray name(int commandId)
59 return "Ping"; 59 return "Ping";
60 case RevisionReplayedCommand: 60 case RevisionReplayedCommand:
61 return "RevisionReplayed"; 61 return "RevisionReplayed";
62 case InspectionCommand:
63 return "Inspection";
62 case CustomCommand: 64 case CustomCommand:
63 return "Custom"; 65 return "Custom";
64 }; 66 };
diff --git a/common/commands.h b/common/commands.h
index c68ef90..33d5cd7 100644
--- a/common/commands.h
+++ b/common/commands.h
@@ -34,7 +34,7 @@ namespace Commands
34 34
35enum CommandIds { 35enum CommandIds {
36 UnknownCommand = 0, 36 UnknownCommand = 0,
37 CommandCompletion, 37 CommandCompletionCommand,
38 HandshakeCommand, 38 HandshakeCommand,
39 RevisionUpdateCommand, 39 RevisionUpdateCommand,
40 SynchronizeCommand, 40 SynchronizeCommand,
@@ -47,6 +47,7 @@ enum CommandIds {
47 NotificationCommand, 47 NotificationCommand,
48 PingCommand, 48 PingCommand,
49 RevisionReplayedCommand, 49 RevisionReplayedCommand,
50 InspectionCommand,
50 CustomCommand = 0xffff 51 CustomCommand = 0xffff
51}; 52};
52 53
diff --git a/common/commands/commandcompletion.fbs b/common/commands/commandcompletion.fbs
index 5330b4f..de7ec14 100644
--- a/common/commands/commandcompletion.fbs
+++ b/common/commands/commandcompletion.fbs
@@ -1,4 +1,4 @@
1namespace Akonadi2; 1namespace Akonadi2.Commands;
2 2
3table CommandCompletion { 3table CommandCompletion {
4 id: ulong; 4 id: ulong;
diff --git a/common/commands/fetchentity.fbs b/common/commands/fetchentity.fbs
index ddca275..7a1d74d 100644
--- a/common/commands/fetchentity.fbs
+++ b/common/commands/fetchentity.fbs
@@ -1,4 +1,4 @@
1namespace Akonadi2; 1namespace Akonadi2.Commands;
2 2
3table FetchEntity { 3table FetchEntity {
4 revision: ulong; 4 revision: ulong;
diff --git a/common/commands/handshake.fbs b/common/commands/handshake.fbs
index 52a883a..e824715 100644
--- a/common/commands/handshake.fbs
+++ b/common/commands/handshake.fbs
@@ -1,4 +1,4 @@
1namespace Akonadi2; 1namespace Akonadi2.Commands;
2 2
3table Handshake { 3table Handshake {
4 name: string; 4 name: string;
diff --git a/common/commands/inspection.fbs b/common/commands/inspection.fbs
new file mode 100644
index 0000000..aaae1ae
--- /dev/null
+++ b/common/commands/inspection.fbs
@@ -0,0 +1,12 @@
1namespace Akonadi2.Commands;
2
3table Inspection {
4 id: string;
5 type: int;
6 entityId: string;
7 domainType: string;
8 property: string;
9 expectedValue: string;
10}
11
12root_type Inspection;
diff --git a/common/commands/notification.fbs b/common/commands/notification.fbs
index 6684472..89687cf 100644
--- a/common/commands/notification.fbs
+++ b/common/commands/notification.fbs
@@ -1,9 +1,13 @@
1namespace Akonadi2; 1namespace Akonadi2.Commands;
2 2
3enum NotificationType : byte { Shutdown = 1, Status, Warning, Progress } 3enum NotificationType : byte { Shutdown = 1, Status, Warning, Progress, Inspection }
4enum NotificationCode : byte { Success = 0, Failure = 1, UserCode }
4 5
5table Notification { 6table Notification {
6 type: NotificationType = Status; 7 type: NotificationType = Status;
8 identifier: string; //An identifier that links back to the something related to the notification (e.g. an entity id or a command id)
9 message: string;
10 code: int = 0; //Of type NotificationCode
7} 11}
8 12
9root_type Notification; 13root_type Notification;
diff --git a/common/commands/revisionupdate.fbs b/common/commands/revisionupdate.fbs
index 634bcd0..93fbe34 100644
--- a/common/commands/revisionupdate.fbs
+++ b/common/commands/revisionupdate.fbs
@@ -1,4 +1,4 @@
1namespace Akonadi2; 1namespace Akonadi2.Commands;
2 2
3table RevisionUpdate { 3table RevisionUpdate {
4 revision: ulong; 4 revision: ulong;
diff --git a/common/commands/synchronize.fbs b/common/commands/synchronize.fbs
index d2d0364..7c3ae9a 100644
--- a/common/commands/synchronize.fbs
+++ b/common/commands/synchronize.fbs
@@ -1,4 +1,4 @@
1namespace Akonadi2; 1namespace Akonadi2.Commands;
2 2
3table Synchronize { 3table Synchronize {
4 sourceSync: bool; //Synchronize with source 4 sourceSync: bool; //Synchronize with source
diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h
index cff0172..44d8743 100644
--- a/common/domain/applicationdomaintype.h
+++ b/common/domain/applicationdomaintype.h
@@ -82,29 +82,34 @@ inline bool operator==(const ApplicationDomainType& lhs, const ApplicationDomain
82 && lhs.resourceInstanceIdentifier() == rhs.resourceInstanceIdentifier(); 82 && lhs.resourceInstanceIdentifier() == rhs.resourceInstanceIdentifier();
83} 83}
84 84
85struct Event : public ApplicationDomainType { 85struct Entity : public ApplicationDomainType {
86 typedef QSharedPointer<Event> Ptr; 86 typedef QSharedPointer<Entity> Ptr;
87 using ApplicationDomainType::ApplicationDomainType; 87 using ApplicationDomainType::ApplicationDomainType;
88}; 88};
89 89
90struct Todo : public ApplicationDomainType { 90struct Event : public Entity {
91 typedef QSharedPointer<Event> Ptr;
92 using Entity::Entity;
93};
94
95struct Todo : public Entity {
91 typedef QSharedPointer<Todo> Ptr; 96 typedef QSharedPointer<Todo> Ptr;
92 using ApplicationDomainType::ApplicationDomainType; 97 using Entity::Entity;
93}; 98};
94 99
95struct Calendar : public ApplicationDomainType { 100struct Calendar : public Entity {
96 typedef QSharedPointer<Calendar> Ptr; 101 typedef QSharedPointer<Calendar> Ptr;
97 using ApplicationDomainType::ApplicationDomainType; 102 using Entity::Entity;
98}; 103};
99 104
100struct Mail : public ApplicationDomainType { 105struct Mail : public Entity {
101 typedef QSharedPointer<Mail> Ptr; 106 typedef QSharedPointer<Mail> Ptr;
102 using ApplicationDomainType::ApplicationDomainType; 107 using Entity::Entity;
103}; 108};
104 109
105struct Folder : public ApplicationDomainType { 110struct Folder : public Entity {
106 typedef QSharedPointer<Folder> Ptr; 111 typedef QSharedPointer<Folder> Ptr;
107 using ApplicationDomainType::ApplicationDomainType; 112 using Entity::Entity;
108}; 113};
109 114
110/** 115/**
@@ -155,6 +160,8 @@ class TypeImplementation;
155 160
156Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::ApplicationDomainType) 161Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::ApplicationDomainType)
157Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr) 162Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::ApplicationDomainType::Ptr)
163Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Entity)
164Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Entity::Ptr)
158Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Event) 165Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Event)
159Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Event::Ptr) 166Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Event::Ptr)
160Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Mail) 167Q_DECLARE_METATYPE(Akonadi2::ApplicationDomain::Mail)
diff --git a/common/facade.cpp b/common/facade.cpp
index 22ef84a..91021db 100644
--- a/common/facade.cpp
+++ b/common/facade.cpp
@@ -25,6 +25,7 @@
25#include "definitions.h" 25#include "definitions.h"
26#include "domainadaptor.h" 26#include "domainadaptor.h"
27#include "queryrunner.h" 27#include "queryrunner.h"
28#include "bufferutils.h"
28 29
29using namespace Akonadi2; 30using namespace Akonadi2;
30 31
@@ -113,7 +114,7 @@ KAsync::Job<void> GenericFacade<DomainType>::create(const DomainType &domainObje
113 } 114 }
114 flatbuffers::FlatBufferBuilder entityFbb; 115 flatbuffers::FlatBufferBuilder entityFbb;
115 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); 116 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
116 return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize())); 117 return mResourceAccess->sendCreateCommand(bufferTypeForDomainType(), BufferUtils::extractBuffer(entityFbb));
117} 118}
118 119
119template<class DomainType> 120template<class DomainType>
@@ -125,7 +126,7 @@ KAsync::Job<void> GenericFacade<DomainType>::modify(const DomainType &domainObje
125 } 126 }
126 flatbuffers::FlatBufferBuilder entityFbb; 127 flatbuffers::FlatBufferBuilder entityFbb;
127 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); 128 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
128 return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), QByteArray::fromRawData(reinterpret_cast<const char*>(entityFbb.GetBufferPointer()), entityFbb.GetSize())); 129 return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), BufferUtils::extractBuffer(entityFbb));
129} 130}
130 131
131template<class DomainType> 132template<class DomainType>
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 29acce4..c7f323a 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -6,13 +6,17 @@
6#include "createentity_generated.h" 6#include "createentity_generated.h"
7#include "modifyentity_generated.h" 7#include "modifyentity_generated.h"
8#include "deleteentity_generated.h" 8#include "deleteentity_generated.h"
9#include "inspection_generated.h"
10#include "notification_generated.h"
9#include "domainadaptor.h" 11#include "domainadaptor.h"
10#include "commands.h" 12#include "commands.h"
11#include "index.h" 13#include "index.h"
12#include "log.h" 14#include "log.h"
13#include "definitions.h" 15#include "definitions.h"
16#include "bufferutils.h"
14 17
15#include <QUuid> 18#include <QUuid>
19#include <QDataStream>
16 20
17static int sBatchSize = 100; 21static int sBatchSize = 100;
18 22
@@ -112,6 +116,7 @@ private:
112class CommandProcessor : public QObject 116class CommandProcessor : public QObject
113{ 117{
114 Q_OBJECT 118 Q_OBJECT
119 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction;
115public: 120public:
116 CommandProcessor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues) 121 CommandProcessor(Akonadi2::Pipeline *pipeline, QList<MessageQueue*> commandQueues)
117 : QObject(), 122 : QObject(),
@@ -135,6 +140,11 @@ public:
135 mLowerBoundRevision = revision; 140 mLowerBoundRevision = revision;
136 } 141 }
137 142
143 void setInspectionCommand(const InspectionFunction &f)
144 {
145 mInspect = f;
146 }
147
138 148
139signals: 149signals:
140 void error(int errorCode, const QString &errorMessage); 150 void error(int errorCode, const QString &errorMessage);
@@ -176,6 +186,14 @@ private slots:
176 return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 186 return mPipeline->modifiedEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
177 case Akonadi2::Commands::CreateEntityCommand: 187 case Akonadi2::Commands::CreateEntityCommand:
178 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size()); 188 return mPipeline->newEntity(queuedCommand->command()->Data(), queuedCommand->command()->size());
189 case Akonadi2::Commands::InspectionCommand:
190 if (mInspect) {
191 return mInspect(queuedCommand->command()->Data(), queuedCommand->command()->size()).then<qint64>([]() {
192 return -1;
193 });
194 } else {
195 return KAsync::error<qint64>(-1, "Missing inspection command.");
196 }
179 default: 197 default:
180 return KAsync::error<qint64>(-1, "Unhandled command"); 198 return KAsync::error<qint64>(-1, "Unhandled command");
181 } 199 }
@@ -266,6 +284,7 @@ private:
266 bool mProcessingLock; 284 bool mProcessingLock;
267 //The lowest revision we no longer need 285 //The lowest revision we no longer need
268 qint64 mLowerBoundRevision; 286 qint64 mLowerBoundRevision;
287 InspectionFunction mInspect;
269}; 288};
270 289
271 290
@@ -279,6 +298,38 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c
279 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 298 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
280{ 299{
281 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue); 300 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue);
301 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
302 flatbuffers::Verifier verifier((const uint8_t *)command, size);
303 if (Akonadi2::Commands::VerifyInspectionBuffer(verifier)) {
304 auto buffer = Akonadi2::Commands::GetInspection(command);
305 int inspectionType = buffer->type();
306
307 QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id());
308 QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId());
309 QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType());
310 QByteArray property = BufferUtils::extractBuffer(buffer->property());
311 QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue());
312 QDataStream s(expectedValueString);
313 QVariant expectedValue;
314 s >> expectedValue;
315 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue).then<void>([=]() {
316 Akonadi2::Notification n;
317 n.type = Akonadi2::Commands::NotificationType_Inspection;
318 n.id = inspectionId;
319 n.code = Akonadi2::Commands::NotificationCode_Success;
320 emit notify(n);
321 }, [=](int code, const QString &message) {
322 Akonadi2::Notification n;
323 n.type = Akonadi2::Commands::NotificationType_Inspection;
324 n.message = message;
325 n.id = inspectionId;
326 n.code = Akonadi2::Commands::NotificationCode_Failure;
327 emit notify(n);
328 }).exec();
329 return KAsync::null<void>();
330 }
331 return KAsync::error<void>(-1, "Invalid inspection command.");
332 });
282 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 333 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
283 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 334 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
284 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { 335 mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) {
@@ -301,6 +352,12 @@ GenericResource::~GenericResource()
301 delete mSourceChangeReplay; 352 delete mSourceChangeReplay;
302} 353}
303 354
355KAsync::Job<void> GenericResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
356{
357 Warning() << "Inspection not implemented";
358 return KAsync::null<void>();
359}
360
304void GenericResource::enableChangeReplay(bool enable) 361void GenericResource::enableChangeReplay(bool enable)
305{ 362{
306 if (enable) { 363 if (enable) {
@@ -464,7 +521,7 @@ void GenericResource::createEntity(const QByteArray &akonadiId, const QByteArray
464 auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize()); 521 auto delta = Akonadi2::EntityBuffer::appendAsVector(fbb, entityFbb.GetBufferPointer(), entityFbb.GetSize());
465 auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource); 522 auto location = Akonadi2::Commands::CreateCreateEntity(fbb, entityId, type, delta, replayToSource);
466 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location); 523 Akonadi2::Commands::FinishCreateEntityBuffer(fbb, location);
467 callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 524 callback(BufferUtils::extractBuffer(fbb));
468} 525}
469 526
470void GenericResource::modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) 527void GenericResource::modifyEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, const Akonadi2::ApplicationDomain::ApplicationDomainType &domainObject, DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback)
@@ -481,7 +538,7 @@ void GenericResource::modifyEntity(const QByteArray &akonadiId, qint64 revision,
481 //TODO removals 538 //TODO removals
482 auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource); 539 auto location = Akonadi2::Commands::CreateModifyEntity(fbb, revision, entityId, 0, type, delta, replayToSource);
483 Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location); 540 Akonadi2::Commands::FinishModifyEntityBuffer(fbb, location);
484 callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 541 callback(BufferUtils::extractBuffer(fbb));
485} 542}
486 543
487void GenericResource::deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) 544void GenericResource::deleteEntity(const QByteArray &akonadiId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback)
@@ -494,7 +551,7 @@ void GenericResource::deleteEntity(const QByteArray &akonadiId, qint64 revision,
494 auto type = fbb.CreateString(bufferType.toStdString()); 551 auto type = fbb.CreateString(bufferType.toStdString());
495 auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource); 552 auto location = Akonadi2::Commands::CreateDeleteEntity(fbb, revision, entityId, type, replayToSource);
496 Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location); 553 Akonadi2::Commands::FinishDeleteEntityBuffer(fbb, location);
497 callback(QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize())); 554 callback(BufferUtils::extractBuffer(fbb));
498} 555}
499 556
500void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) 557void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction)
diff --git a/common/genericresource.h b/common/genericresource.h
index f47c6f8..d71061c 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -48,6 +48,7 @@ public:
48 virtual KAsync::Job<void> synchronizeWithSource(Akonadi2::Storage &mainStore, Akonadi2::Storage &synchronizationStore); 48 virtual KAsync::Job<void> synchronizeWithSource(Akonadi2::Storage &mainStore, Akonadi2::Storage &synchronizationStore);
49 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; 49 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE;
50 virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; 50 virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE;
51 virtual KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue);
51 52
52 int error() const; 53 int error() const;
53 54
diff --git a/common/inspection.h b/common/inspection.h
new file mode 100644
index 0000000..ecf5b3d
--- /dev/null
+++ b/common/inspection.h
@@ -0,0 +1,60 @@
1/*
2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#pragma once
21
22#include <QByteArray>
23#include <QVariant>
24#include "applicationdomaintype.h"
25
26namespace Akonadi2 {
27 namespace Resources {
28
29struct Inspection {
30 static Inspection PropertyInspection(const Akonadi2::ApplicationDomain::Entity &entity, const QByteArray &property, const QVariant &expectedValue)
31 {
32 Inspection inspection;
33 inspection.resourceIdentifier = entity.resourceInstanceIdentifier();
34 inspection.entityIdentifier = entity.identifier();
35 inspection.property = property;
36 inspection.expectedValue = expectedValue;
37 return inspection;
38 }
39
40 static Inspection ExistenceInspection(const Akonadi2::ApplicationDomain::Entity &entity, bool exists)
41 {
42 Inspection inspection;
43 inspection.resourceIdentifier = entity.resourceInstanceIdentifier();
44 inspection.entityIdentifier = entity.identifier();
45 inspection.expectedValue = exists;
46 return inspection;
47 }
48
49 enum Type {
50 PropertyInspectionType,
51 ExistenceInspectionType
52 };
53 QByteArray resourceIdentifier;
54 QByteArray entityIdentifier;
55 QByteArray property;
56 QVariant expectedValue;
57};
58
59 }
60}
diff --git a/common/listener.cpp b/common/listener.cpp
index 1b78f01..fa08472 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -31,11 +31,13 @@
31#include "common/synchronize_generated.h" 31#include "common/synchronize_generated.h"
32#include "common/notification_generated.h" 32#include "common/notification_generated.h"
33#include "common/revisionreplayed_generated.h" 33#include "common/revisionreplayed_generated.h"
34#include "common/inspection_generated.h"
34 35
35#include <QLocalServer> 36#include <QLocalServer>
36#include <QLocalSocket> 37#include <QLocalSocket>
37#include <QTimer> 38#include <QTimer>
38#include <QTime> 39#include <QTime>
40#include <QDataStream>
39 41
40Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent) 42Listener::Listener(const QByteArray &resourceInstanceIdentifier, QObject *parent)
41 : QObject(parent), 43 : QObject(parent),
@@ -203,13 +205,14 @@ void Listener::processClientBuffers()
203 } 205 }
204} 206}
205 207
206void Listener::processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback) 208void Listener::processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void(bool)> &callback)
207{ 209{
210 bool success = true;
208 switch (commandId) { 211 switch (commandId) {
209 case Akonadi2::Commands::HandshakeCommand: { 212 case Akonadi2::Commands::HandshakeCommand: {
210 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); 213 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
211 if (Akonadi2::VerifyHandshakeBuffer(verifier)) { 214 if (Akonadi2::Commands::VerifyHandshakeBuffer(verifier)) {
212 auto buffer = Akonadi2::GetHandshake(commandBuffer.constData()); 215 auto buffer = Akonadi2::Commands::GetHandshake(commandBuffer.constData());
213 client.name = buffer->name()->c_str(); 216 client.name = buffer->name()->c_str();
214 } else { 217 } else {
215 Warning() << "received invalid command"; 218 Warning() << "received invalid command";
@@ -218,8 +221,8 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
218 } 221 }
219 case Akonadi2::Commands::SynchronizeCommand: { 222 case Akonadi2::Commands::SynchronizeCommand: {
220 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); 223 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
221 if (Akonadi2::VerifySynchronizeBuffer(verifier)) { 224 if (Akonadi2::Commands::VerifySynchronizeBuffer(verifier)) {
222 auto buffer = Akonadi2::GetSynchronize(commandBuffer.constData()); 225 auto buffer = Akonadi2::Commands::GetSynchronize(commandBuffer.constData());
223 Log() << QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name); 226 Log() << QString("\tSynchronize request (id %1) from %2").arg(messageId).arg(client.name);
224 auto timer = QSharedPointer<QTime>::create(); 227 auto timer = QSharedPointer<QTime>::create();
225 timer->start(); 228 timer->start();
@@ -232,7 +235,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
232 } 235 }
233 job.then<void>([callback, timer]() { 236 job.then<void>([callback, timer]() {
234 Trace() << "Sync took " << timer->elapsed(); 237 Trace() << "Sync took " << timer->elapsed();
235 callback(); 238 callback(true);
236 }).exec(); 239 }).exec();
237 return; 240 return;
238 } else { 241 } else {
@@ -240,6 +243,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
240 } 243 }
241 break; 244 break;
242 } 245 }
246 case Akonadi2::Commands::InspectionCommand:
243 case Akonadi2::Commands::FetchEntityCommand: 247 case Akonadi2::Commands::FetchEntityCommand:
244 case Akonadi2::Commands::DeleteEntityCommand: 248 case Akonadi2::Commands::DeleteEntityCommand:
245 case Akonadi2::Commands::ModifyEntityCommand: 249 case Akonadi2::Commands::ModifyEntityCommand:
@@ -273,11 +277,12 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
273 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId; 277 Log() << QString("\tReceived custom command from %1: ").arg(client.name) << commandId;
274 loadResource()->processCommand(commandId, commandBuffer); 278 loadResource()->processCommand(commandId, commandBuffer);
275 } else { 279 } else {
280 success = false;
276 ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; 281 ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId;
277 } 282 }
278 break; 283 break;
279 } 284 }
280 callback(); 285 callback(success);
281} 286}
282 287
283qint64 Listener::lowerBoundRevision() 288qint64 Listener::lowerBoundRevision()
@@ -298,8 +303,8 @@ qint64 Listener::lowerBoundRevision()
298void Listener::quit() 303void Listener::quit()
299{ 304{
300 //Broadcast shutdown notifications to open clients, so they don't try to restart the resource 305 //Broadcast shutdown notifications to open clients, so they don't try to restart the resource
301 auto command = Akonadi2::CreateNotification(m_fbb, Akonadi2::NotificationType::NotificationType_Shutdown); 306 auto command = Akonadi2::Commands::CreateNotification(m_fbb, Akonadi2::Commands::NotificationType::NotificationType_Shutdown);
302 Akonadi2::FinishNotificationBuffer(m_fbb, command); 307 Akonadi2::Commands::FinishNotificationBuffer(m_fbb, command);
303 for (Client &client : m_connections) { 308 for (Client &client : m_connections) {
304 if (client.socket && client.socket->isOpen()) { 309 if (client.socket && client.socket->isOpen()) {
305 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::NotificationCommand, m_fbb); 310 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::NotificationCommand, m_fbb);
@@ -333,10 +338,10 @@ bool Listener::processClientBuffer(Client &client)
333 auto clientName = client.name; 338 auto clientName = client.name;
334 const QByteArray commandBuffer = client.commandBuffer.left(size); 339 const QByteArray commandBuffer = client.commandBuffer.left(size);
335 client.commandBuffer.remove(0, size); 340 client.commandBuffer.remove(0, size);
336 processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName]() { 341 processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) {
337 Log() << QString("\tCompleted command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Akonadi2::Commands::name(commandId))).arg(clientName); 342 Log() << QString("\tCompleted command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Akonadi2::Commands::name(commandId))).arg(clientName);
338 if (socket) { 343 if (socket) {
339 sendCommandCompleted(socket.data(), messageId); 344 sendCommandCompleted(socket.data(), messageId, success);
340 } else { 345 } else {
341 Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); 346 Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName);
342 } 347 }
@@ -348,15 +353,15 @@ bool Listener::processClientBuffer(Client &client)
348 return false; 353 return false;
349} 354}
350 355
351void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId) 356void Listener::sendCommandCompleted(QLocalSocket *socket, uint messageId, bool success)
352{ 357{
353 if (!socket || !socket->isValid()) { 358 if (!socket || !socket->isValid()) {
354 return; 359 return;
355 } 360 }
356 361
357 auto command = Akonadi2::CreateCommandCompletion(m_fbb, messageId); 362 auto command = Akonadi2::Commands::CreateCommandCompletion(m_fbb, messageId, success);
358 Akonadi2::FinishCommandCompletionBuffer(m_fbb, command); 363 Akonadi2::Commands::FinishCommandCompletionBuffer(m_fbb, command);
359 Akonadi2::Commands::write(socket, ++m_messageId, Akonadi2::Commands::CommandCompletion, m_fbb); 364 Akonadi2::Commands::write(socket, ++m_messageId, Akonadi2::Commands::CommandCompletionCommand, m_fbb);
360 m_fbb.Clear(); 365 m_fbb.Clear();
361} 366}
362 367
@@ -367,8 +372,8 @@ void Listener::refreshRevision(qint64 revision)
367 372
368void Listener::updateClientsWithRevision(qint64 revision) 373void Listener::updateClientsWithRevision(qint64 revision)
369{ 374{
370 auto command = Akonadi2::CreateRevisionUpdate(m_fbb, revision); 375 auto command = Akonadi2::Commands::CreateRevisionUpdate(m_fbb, revision);
371 Akonadi2::FinishRevisionUpdateBuffer(m_fbb, command); 376 Akonadi2::Commands::FinishRevisionUpdateBuffer(m_fbb, command);
372 377
373 for (const Client &client: m_connections) { 378 for (const Client &client: m_connections) {
374 if (!client.socket || !client.socket->isValid()) { 379 if (!client.socket || !client.socket->isValid()) {
@@ -381,6 +386,25 @@ void Listener::updateClientsWithRevision(qint64 revision)
381 m_fbb.Clear(); 386 m_fbb.Clear();
382} 387}
383 388
389void Listener::notify(const Akonadi2::Notification &notification)
390{
391 auto messageString = m_fbb.CreateString(notification.message.toUtf8().constData(), notification.message.toUtf8().size());
392 auto idString = m_fbb.CreateString(notification.id.constData(), notification.id.size());
393 Akonadi2::Commands::NotificationBuilder builder(m_fbb);
394 builder.add_type(static_cast<Akonadi2::Commands::NotificationType>(notification.type));
395 builder.add_code(notification.code);
396 builder.add_identifier(idString);
397 builder.add_message(messageString);
398 auto command = builder.Finish();
399 Akonadi2::Commands::FinishNotificationBuffer(m_fbb, command);
400 for (Client &client : m_connections) {
401 if (client.socket && client.socket->isOpen()) {
402 Akonadi2::Commands::write(client.socket, ++m_messageId, Akonadi2::Commands::NotificationCommand, m_fbb);
403 }
404 }
405 m_fbb.Clear();
406}
407
384Akonadi2::Resource *Listener::loadResource() 408Akonadi2::Resource *Listener::loadResource()
385{ 409{
386 if (!m_resource) { 410 if (!m_resource) {
@@ -390,6 +414,8 @@ Akonadi2::Resource *Listener::loadResource()
390 Trace() << QString("\tResource: %1").arg((qlonglong)m_resource); 414 Trace() << QString("\tResource: %1").arg((qlonglong)m_resource);
391 connect(m_resource, &Akonadi2::Resource::revisionUpdated, 415 connect(m_resource, &Akonadi2::Resource::revisionUpdated,
392 this, &Listener::refreshRevision); 416 this, &Listener::refreshRevision);
417 connect(m_resource, &Akonadi2::Resource::notify,
418 this, &Listener::notify);
393 } else { 419 } else {
394 ErrorMsg() << "Failed to load resource " << m_resourceName; 420 ErrorMsg() << "Failed to load resource " << m_resourceName;
395 m_resource = new Akonadi2::Resource; 421 m_resource = new Akonadi2::Resource;
diff --git a/common/listener.h b/common/listener.h
index 248a190..4112a6a 100644
--- a/common/listener.h
+++ b/common/listener.h
@@ -28,6 +28,7 @@
28namespace Akonadi2 28namespace Akonadi2
29{ 29{
30 class Resource; 30 class Resource;
31 class Notification;
31} 32}
32 33
33class QTimer; 34class QTimer;
@@ -76,12 +77,13 @@ private Q_SLOTS:
76 void onDataAvailable(); 77 void onDataAvailable();
77 void processClientBuffers(); 78 void processClientBuffers();
78 void refreshRevision(qint64); 79 void refreshRevision(qint64);
80 void notify(const Akonadi2::Notification &);
79 void quit(); 81 void quit();
80 82
81private: 83private:
82 void processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void()> &callback); 84 void processCommand(int commandId, uint messageId, const QByteArray &commandBuffer, Client &client, const std::function<void(bool)> &callback);
83 bool processClientBuffer(Client &client); 85 bool processClientBuffer(Client &client);
84 void sendCommandCompleted(QLocalSocket *socket, uint messageId); 86 void sendCommandCompleted(QLocalSocket *socket, uint messageId, bool success);
85 void updateClientsWithRevision(qint64); 87 void updateClientsWithRevision(qint64);
86 Akonadi2::Resource *loadResource(); 88 Akonadi2::Resource *loadResource();
87 void readFromSocket(QLocalSocket *socket); 89 void readFromSocket(QLocalSocket *socket);
diff --git a/common/log.cpp b/common/log.cpp
index c33c700..489e1bd 100644
--- a/common/log.cpp
+++ b/common/log.cpp
@@ -96,7 +96,7 @@ static QString colorCommand(QList<int> colorCodes)
96 return string; 96 return string;
97} 97}
98 98
99QByteArray debugLevelName(DebugLevel debugLevel) 99QByteArray Akonadi2::Log::debugLevelName(DebugLevel debugLevel)
100{ 100{
101 switch (debugLevel) { 101 switch (debugLevel) {
102 case DebugLevel::Trace: 102 case DebugLevel::Trace:
@@ -114,15 +114,16 @@ QByteArray debugLevelName(DebugLevel debugLevel)
114 return QByteArray(); 114 return QByteArray();
115} 115}
116 116
117DebugLevel debugLevelFromName(const QByteArray &name) 117DebugLevel Akonadi2::Log::debugLevelFromName(const QByteArray &name)
118{ 118{
119 if (name.toLower() == "trace") 119 const QByteArray lowercaseName = name.toLower();
120 if (lowercaseName == "trace")
120 return DebugLevel::Trace; 121 return DebugLevel::Trace;
121 if (name.toLower() == "log") 122 if (lowercaseName == "log")
122 return DebugLevel::Log; 123 return DebugLevel::Log;
123 if (name.toLower() == "warning") 124 if (lowercaseName == "warning")
124 return DebugLevel::Warning; 125 return DebugLevel::Warning;
125 if (name.toLower() == "error") 126 if (lowercaseName == "error")
126 return DebugLevel::Error; 127 return DebugLevel::Error;
127 return DebugLevel::Log; 128 return DebugLevel::Log;
128} 129}
@@ -132,6 +133,11 @@ void Akonadi2::Log::setDebugOutputLevel(DebugLevel debugLevel)
132 qputenv("AKONADI2DEBUGLEVEL", debugLevelName(debugLevel)); 133 qputenv("AKONADI2DEBUGLEVEL", debugLevelName(debugLevel));
133} 134}
134 135
136Akonadi2::Log::DebugLevel Akonadi2::Log::debugOutputLevel()
137{
138 return debugLevelFromName(qgetenv("AKONADI2DEBUGLEVEL"));
139}
140
135QDebug Akonadi2::Log::debugStream(DebugLevel debugLevel, int line, const char* file, const char* function, const char* debugArea) 141QDebug Akonadi2::Log::debugStream(DebugLevel debugLevel, int line, const char* file, const char* function, const char* debugArea)
136{ 142{
137 DebugLevel debugOutputLevel = debugLevelFromName(qgetenv("AKONADI2DEBUGLEVEL")); 143 DebugLevel debugOutputLevel = debugLevelFromName(qgetenv("AKONADI2DEBUGLEVEL"));
diff --git a/common/log.h b/common/log.h
index 9db9e8e..e531348 100644
--- a/common/log.h
+++ b/common/log.h
@@ -12,7 +12,11 @@ enum DebugLevel {
12 Error 12 Error
13}; 13};
14 14
15QByteArray debugLevelName(DebugLevel debugLevel);
16DebugLevel debugLevelFromName(const QByteArray &name);
17
15void setDebugOutputLevel(DebugLevel); 18void setDebugOutputLevel(DebugLevel);
19DebugLevel debugOutputLevel();
16 20
17QDebug debugStream(DebugLevel debugLevel, int line, const char* file, const char* function, const char* debugArea = 0); 21QDebug debugStream(DebugLevel debugLevel, int line, const char* file, const char* function, const char* debugArea = 0);
18 22
diff --git a/common/notification.h b/common/notification.h
new file mode 100644
index 0000000..e1b5bff
--- /dev/null
+++ b/common/notification.h
@@ -0,0 +1,40 @@
1/*
2 * Copyright (c) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#pragma once
21
22#include <akonadi2common_export.h>
23#include <QString>
24
25namespace Akonadi2
26{
27
28/**
29 * A notification
30 */
31class AKONADI2COMMON_EXPORT Notification
32{
33public:
34 QByteArray id;
35 int type;
36 QString message;
37 int code;
38};
39
40}
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index 06d8114..a087def 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -34,6 +34,7 @@
34#include "log.h" 34#include "log.h"
35#include "domain/applicationdomaintype.h" 35#include "domain/applicationdomaintype.h"
36#include "definitions.h" 36#include "definitions.h"
37#include "bufferutils.h"
37 38
38namespace Akonadi2 39namespace Akonadi2
39{ 40{
@@ -119,7 +120,7 @@ Storage &Pipeline::storage() const
119 120
120void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 121void Pipeline::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
121{ 122{
122 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), QByteArray::fromRawData(reinterpret_cast<char const *>(fbb.GetBufferPointer()), fbb.GetSize()), 123 d->transaction.openDatabase(bufferType + ".main").write(Akonadi2::Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
123 [](const Akonadi2::Storage::Error &error) { 124 [](const Akonadi2::Storage::Error &error) {
124 Warning() << "Failed to write entity"; 125 Warning() << "Failed to write entity";
125 } 126 }
@@ -285,7 +286,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
285 //Remove deletions 286 //Remove deletions
286 if (modifyEntity->deletions()) { 287 if (modifyEntity->deletions()) {
287 for (const auto &property : *modifyEntity->deletions()) { 288 for (const auto &property : *modifyEntity->deletions()) {
288 newObject->setProperty(QByteArray::fromRawData(property->data(), property->size()), QVariant()); 289 newObject->setProperty(BufferUtils::extractBuffer(property), QVariant());
289 } 290 }
290 } 291 }
291 292
diff --git a/common/query.h b/common/query.h
index 0d0f382..1df32da 100644
--- a/common/query.h
+++ b/common/query.h
@@ -22,42 +22,120 @@
22#include <QByteArrayList> 22#include <QByteArrayList>
23#include <QHash> 23#include <QHash>
24#include <QSet> 24#include <QSet>
25#include "applicationdomaintype.h"
25 26
26namespace Akonadi2 { 27namespace Akonadi2 {
27 28
28/** 29/**
29 * A query that matches a set of objects 30 * A query that matches a set of entities.
30 *
31 * The query will have to be updated regularly similary to the domain objects.
32 * It probably also makes sense to have a domain specific part of the query,
33 * such as what properties we're interested in (necessary information for on-demand
34 * loading of data).
35 *
36 * The query defines:
37 * * what resources to search
38 * * filters on various properties (parent collection, startDate range, ....)
39 * * properties we need (for on-demand querying)
40 *
41 * syncOnDemand: Execute a source sync before executing the query
42 * processAll: Ensure all local messages are processed before querying to guarantee an up-to date dataset.
43 */ 31 */
44class Query 32class Query
45{ 33{
46public: 34public:
47 Query() : syncOnDemand(true), processAll(false), liveQuery(false) {} 35 enum Flag {
48 //Could also be a propertyFilter 36 /** Leave the query running an contiously update the result set. */
37 LiveQuery
38 };
39 Q_DECLARE_FLAGS(Flags, Flag)
40
41 static Query PropertyFilter(const QByteArray &key, const QVariant &value)
42 {
43 Query query;
44 query.propertyFilter.insert(key, value);
45 return query;
46 }
47
48 static Query PropertyFilter(const QByteArray &key, const ApplicationDomain::Entity &entity)
49 {
50 return PropertyFilter(key, QVariant::fromValue(entity.identifier()));
51 }
52
53 static Query ResourceFilter(const QByteArray &identifier)
54 {
55 Query query;
56 query.resources.append(identifier);
57 return query;
58 }
59
60 static Query ResourceFilter(const QByteArrayList &identifier)
61 {
62 Query query;
63 query.resources = identifier;
64 return query;
65 }
66
67 static Query ResourceFilter(const ApplicationDomain::AkonadiResource &entity)
68 {
69 return ResourceFilter(entity.identifier());
70 }
71
72 static Query IdentityFilter(const QByteArray &identifier)
73 {
74 Query query;
75 query.ids << identifier;
76 return query;
77 }
78
79 static Query IdentityFilter(const QByteArrayList &identifier)
80 {
81 Query query;
82 query.ids = identifier;
83 return query;
84 }
85
86 static Query IdentityFilter(const ApplicationDomain::Entity &entity)
87 {
88 return IdentityFilter(entity.identifier());
89 }
90
91 static Query RequestedProperties(const QByteArrayList &properties)
92 {
93 Query query;
94 query.requestedProperties = properties;
95 return query;
96 }
97
98 static Query RequestTree(const QByteArray &parentProperty)
99 {
100 Query query;
101 query.parentProperty = parentProperty;
102 return query;
103 }
104
105 Query(Flags flags = Flags())
106 {}
107
108 Query& operator+=(const Query& rhs)
109 {
110 resources += rhs.resources;
111 ids += rhs.ids;
112 for (auto it = rhs.propertyFilter.constBegin(); it != rhs.propertyFilter.constEnd(); it++) {
113 propertyFilter.insert(it.key(), it.value());
114 }
115 requestedProperties += rhs.requestedProperties;
116 parentProperty = rhs.parentProperty;
117 liveQuery = rhs.liveQuery;
118 syncOnDemand = rhs.syncOnDemand;
119 processAll = rhs.processAll;
120 return *this;
121 }
122
123 friend Query operator+(Query lhs, const Query& rhs)
124 {
125 lhs += rhs;
126 return lhs;
127 }
128
49 QByteArrayList resources; 129 QByteArrayList resources;
50 //Could also be a propertyFilter
51 QByteArrayList ids; 130 QByteArrayList ids;
52 //Filters to apply
53 QHash<QByteArray, QVariant> propertyFilter; 131 QHash<QByteArray, QVariant> propertyFilter;
54 //Properties to retrieve
55 QByteArrayList requestedProperties; 132 QByteArrayList requestedProperties;
56 QByteArray parentProperty; 133 QByteArray parentProperty;
134 bool liveQuery;
57 bool syncOnDemand; 135 bool syncOnDemand;
58 bool processAll; 136 bool processAll;
59 //If live query is false, this query will not continuously be updated
60 bool liveQuery;
61}; 137};
62 138
63} 139}
140
141Q_DECLARE_OPERATORS_FOR_FLAGS(Akonadi2::Query::Flags)
diff --git a/common/resource.h b/common/resource.h
index 4ed21b5..2ae71a0 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -17,10 +17,12 @@
17 * You should have received a copy of the GNU Lesser General Public 17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>. 18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */ 19 */
20#pragma once
20 21
21#include <akonadi2common_export.h> 22#include <akonadi2common_export.h>
22 23
23#include <Async/Async> 24#include <Async/Async>
25#include "notification.h"
24 26
25namespace Akonadi2 27namespace Akonadi2
26{ 28{
@@ -55,6 +57,7 @@ public:
55 57
56Q_SIGNALS: 58Q_SIGNALS:
57 void revisionUpdated(qint64); 59 void revisionUpdated(qint64);
60 void notify(Notification);
58 61
59private: 62private:
60 class Private; 63 class Private;
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index 7be1259..6592699 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -30,19 +30,33 @@
30#include "common/modifyentity_generated.h" 30#include "common/modifyentity_generated.h"
31#include "common/deleteentity_generated.h" 31#include "common/deleteentity_generated.h"
32#include "common/revisionreplayed_generated.h" 32#include "common/revisionreplayed_generated.h"
33#include "common/inspection_generated.h"
33#include "common/entitybuffer.h" 34#include "common/entitybuffer.h"
35#include "common/bufferutils.h"
34#include "log.h" 36#include "log.h"
35 37
36#include <QCoreApplication> 38#include <QCoreApplication>
37#include <QDebug> 39#include <QDebug>
38#include <QDir> 40#include <QDir>
39#include <QProcess> 41#include <QProcess>
42#include <QDataStream>
43#include <QBuffer>
40 44
41#undef Trace 45#undef Trace
42#define Trace() Akonadi2::Log::debugStream(Akonadi2::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess") 46#define Trace() Akonadi2::Log::debugStream(Akonadi2::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess")
43#undef Log 47#undef Log
44#define Log(IDENTIFIER) Akonadi2::Log::debugStream(Akonadi2::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess("+IDENTIFIER+")") 48#define Log(IDENTIFIER) Akonadi2::Log::debugStream(Akonadi2::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, "ResourceAccess("+IDENTIFIER+")")
45 49
50static void queuedInvoke(const std::function<void()> &f, QObject *context = 0)
51{
52 auto timer = QSharedPointer<QTimer>::create();
53 timer->setSingleShot(true);
54 QObject::connect(timer.data(), &QTimer::timeout, context, [f, timer]() {
55 f();
56 });
57 timer->start(0);
58}
59
46namespace Akonadi2 60namespace Akonadi2
47{ 61{
48 62
@@ -284,8 +298,8 @@ KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool loca
284{ 298{
285 Trace() << "Sending synchronize command: " << sourceSync << localSync; 299 Trace() << "Sending synchronize command: " << sourceSync << localSync;
286 flatbuffers::FlatBufferBuilder fbb; 300 flatbuffers::FlatBufferBuilder fbb;
287 auto command = Akonadi2::CreateSynchronize(fbb, sourceSync, localSync); 301 auto command = Akonadi2::Commands::CreateSynchronize(fbb, sourceSync, localSync);
288 Akonadi2::FinishSynchronizeBuffer(fbb, command); 302 Akonadi2::Commands::FinishSynchronizeBuffer(fbb, command);
289 open(); 303 open();
290 return sendCommand(Commands::SynchronizeCommand, fbb); 304 return sendCommand(Commands::SynchronizeCommand, fbb);
291} 305}
@@ -338,6 +352,25 @@ KAsync::Job<void> ResourceAccess::sendRevisionReplayedCommand(qint64 revision)
338 return sendCommand(Akonadi2::Commands::RevisionReplayedCommand, fbb); 352 return sendCommand(Akonadi2::Commands::RevisionReplayedCommand, fbb);
339} 353}
340 354
355KAsync::Job<void> ResourceAccess::sendInspectionCommand(const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
356{
357 flatbuffers::FlatBufferBuilder fbb;
358 auto id = fbb.CreateString(inspectionId.toStdString());
359 auto domain = fbb.CreateString(domainType.toStdString());
360 auto entity = fbb.CreateString(entityId.toStdString());
361 auto prop = fbb.CreateString(property.toStdString());
362
363 QByteArray array;
364 QDataStream s(&array, QIODevice::WriteOnly);
365 s << expectedValue;
366
367 auto expected = fbb.CreateString(array.toStdString());
368 auto location = Akonadi2::Commands::CreateInspection (fbb, id, 0, entity, domain, prop, expected);
369 Akonadi2::Commands::FinishInspectionBuffer(fbb, location);
370 open();
371 return sendCommand(Akonadi2::Commands::InspectionCommand, fbb);
372}
373
341void ResourceAccess::open() 374void ResourceAccess::open()
342{ 375{
343 if (d->socket && d->socket->isValid()) { 376 if (d->socket && d->socket->isValid()) {
@@ -424,8 +457,8 @@ void ResourceAccess::connected()
424 { 457 {
425 flatbuffers::FlatBufferBuilder fbb; 458 flatbuffers::FlatBufferBuilder fbb;
426 auto name = fbb.CreateString(QString("PID: %1 ResourceAccess: %2").arg(QCoreApplication::applicationPid()).arg(reinterpret_cast<qlonglong>(this)).toLatin1()); 459 auto name = fbb.CreateString(QString("PID: %1 ResourceAccess: %2").arg(QCoreApplication::applicationPid()).arg(reinterpret_cast<qlonglong>(this)).toLatin1());
427 auto command = Akonadi2::CreateHandshake(fbb, name); 460 auto command = Akonadi2::Commands::CreateHandshake(fbb, name);
428 Akonadi2::FinishHandshakeBuffer(fbb, command); 461 Akonadi2::Commands::FinishHandshakeBuffer(fbb, command);
429 Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb); 462 Commands::write(d->socket.data(), ++d->messageId, Commands::HandshakeCommand, fbb);
430 } 463 }
431 464
@@ -490,28 +523,49 @@ bool ResourceAccess::processMessageBuffer()
490 523
491 switch (commandId) { 524 switch (commandId) {
492 case Commands::RevisionUpdateCommand: { 525 case Commands::RevisionUpdateCommand: {
493 auto buffer = GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); 526 auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize);
494 log(QString("Revision updated to: %1").arg(buffer->revision())); 527 log(QString("Revision updated to: %1").arg(buffer->revision()));
495 emit revisionChanged(buffer->revision()); 528 emit revisionChanged(buffer->revision());
496 529
497 break; 530 break;
498 } 531 }
499 case Commands::CommandCompletion: { 532 case Commands::CommandCompletionCommand: {
500 auto buffer = GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); 533 auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
501 log(QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully")); 534 log(QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"));
502 535
503 d->completeCommands << buffer->id(); 536 d->completeCommands << buffer->id();
504 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 537 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
505 QMetaObject::invokeMethod(this, "callCallbacks", Qt::QueuedConnection); 538 queuedInvoke([=]() {
539 d->callCallbacks();
540 }, this);
506 break; 541 break;
507 } 542 }
508 case Commands::NotificationCommand: { 543 case Commands::NotificationCommand: {
509 auto buffer = GetNotification(d->partialMessageBuffer.constData() + headerSize); 544 auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize);
510 switch (buffer->type()) { 545 switch (buffer->type()) {
511 case Akonadi2::NotificationType::NotificationType_Shutdown: 546 case Akonadi2::Commands::NotificationType::NotificationType_Shutdown:
512 Log(d->resourceInstanceIdentifier) << "Received shutdown notification."; 547 Log(d->resourceInstanceIdentifier) << "Received shutdown notification.";
513 close(); 548 close();
514 break; 549 break;
550 case Akonadi2::Commands::NotificationType::NotificationType_Inspection: {
551 Log(d->resourceInstanceIdentifier) << "Received inspection notification.";
552 Notification n;
553 if (buffer->identifier()) {
554 //Don't use fromRawData, the buffer is gone once we invoke emit notification
555 n.id = BufferUtils::extractBufferCopy(buffer->identifier());
556 }
557 if (buffer->message()) {
558 //Don't use fromRawData, the buffer is gone once we invoke emit notification
559 n.message = BufferUtils::extractBufferCopy(buffer->message());
560 }
561 n.type = buffer->type();
562 n.code = buffer->code();
563 //The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
564 queuedInvoke([=]() {
565 emit notification(n);
566 }, this);
567 }
568 break;
515 default: 569 default:
516 Warning() << "Received unknown notification: " << buffer->type(); 570 Warning() << "Received unknown notification: " << buffer->type();
517 break; 571 break;
@@ -526,11 +580,6 @@ bool ResourceAccess::processMessageBuffer()
526 return d->partialMessageBuffer.size() >= headerSize; 580 return d->partialMessageBuffer.size() >= headerSize;
527} 581}
528 582
529void ResourceAccess::callCallbacks()
530{
531 d->callCallbacks();
532}
533
534void ResourceAccess::log(const QString &message) 583void ResourceAccess::log(const QString &message)
535{ 584{
536 Log(d->resourceInstanceIdentifier) << this << message; 585 Log(d->resourceInstanceIdentifier) << this << message;
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 7f61b30..2fe83ed 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -27,6 +27,7 @@
27#include <Async/Async> 27#include <Async/Async>
28 28
29#include <flatbuffers/flatbuffers.h> 29#include <flatbuffers/flatbuffers.h>
30#include "notification.h"
30 31
31namespace Akonadi2 32namespace Akonadi2
32{ 33{
@@ -49,10 +50,12 @@ public:
49 virtual KAsync::Job<void> sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) { return KAsync::null<void>(); }; 50 virtual KAsync::Job<void> sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) { return KAsync::null<void>(); };
50 virtual KAsync::Job<void> sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) { return KAsync::null<void>(); }; 51 virtual KAsync::Job<void> sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) { return KAsync::null<void>(); };
51 virtual KAsync::Job<void> sendRevisionReplayedCommand(qint64 revision) {return KAsync::null<void>(); }; 52 virtual KAsync::Job<void> sendRevisionReplayedCommand(qint64 revision) {return KAsync::null<void>(); };
53 virtual KAsync::Job<void> sendInspectionCommand(const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expecedValue) {return KAsync::null<void>(); };
52 54
53Q_SIGNALS: 55Q_SIGNALS:
54 void ready(bool isReady); 56 void ready(bool isReady);
55 void revisionChanged(qint64 revision); 57 void revisionChanged(qint64 revision);
58 void notification(Notification revision);
56 59
57public Q_SLOTS: 60public Q_SLOTS:
58 virtual void open() = 0; 61 virtual void open() = 0;
@@ -78,6 +81,7 @@ public:
78 KAsync::Job<void> sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) Q_DECL_OVERRIDE; 81 KAsync::Job<void> sendModifyCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType, const QByteArrayList &deletedProperties, const QByteArray &buffer) Q_DECL_OVERRIDE;
79 KAsync::Job<void> sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) Q_DECL_OVERRIDE; 82 KAsync::Job<void> sendDeleteCommand(const QByteArray &uid, qint64 revision, const QByteArray &resourceBufferType) Q_DECL_OVERRIDE;
80 KAsync::Job<void> sendRevisionReplayedCommand(qint64 revision) Q_DECL_OVERRIDE; 83 KAsync::Job<void> sendRevisionReplayedCommand(qint64 revision) Q_DECL_OVERRIDE;
84 KAsync::Job<void> sendInspectionCommand(const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expecedValue) Q_DECL_OVERRIDE;
81 /** 85 /**
82 * Tries to connect to server, and returns a connected socket on success. 86 * Tries to connect to server, and returns a connected socket on success.
83 */ 87 */
@@ -93,7 +97,6 @@ private Q_SLOTS:
93 void connectionError(QLocalSocket::LocalSocketError error); 97 void connectionError(QLocalSocket::LocalSocketError error);
94 void readResourceMessage(); 98 void readResourceMessage();
95 bool processMessageBuffer(); 99 bool processMessageBuffer();
96 void callCallbacks();
97 100
98private: 101private:
99 void connected(); 102 void connected();
diff --git a/common/resourcefacade.h b/common/resourcefacade.h
index 38e0c0e..ae3037a 100644
--- a/common/resourcefacade.h
+++ b/common/resourcefacade.h
@@ -27,6 +27,7 @@
27 27
28namespace Akonadi2 { 28namespace Akonadi2 {
29 class Query; 29 class Query;
30 class Inspection;
30} 31}
31 32
32class ResourceFacade : public Akonadi2::StoreFacade<Akonadi2::ApplicationDomain::AkonadiResource> 33class ResourceFacade : public Akonadi2::StoreFacade<Akonadi2::ApplicationDomain::AkonadiResource>
@@ -38,5 +39,9 @@ public:
38 KAsync::Job<void> modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; 39 KAsync::Job<void> modify(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE;
39 KAsync::Job<void> remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE; 40 KAsync::Job<void> remove(const Akonadi2::ApplicationDomain::AkonadiResource &resource) Q_DECL_OVERRIDE;
40 QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<Akonadi2::ApplicationDomain::AkonadiResource::Ptr>::Ptr > load(const Akonadi2::Query &query) Q_DECL_OVERRIDE; 41 QPair<KAsync::Job<void>, typename Akonadi2::ResultEmitter<Akonadi2::ApplicationDomain::AkonadiResource::Ptr>::Ptr > load(const Akonadi2::Query &query) Q_DECL_OVERRIDE;
42 KAsync::Job<void> inspect(const Akonadi2::Inspection &domainObject) Q_DECL_OVERRIDE
43 {
44 return KAsync::error<void>(-1, "Failed to inspect.");
45 }
41}; 46};
42 47
diff --git a/common/resultprovider.h b/common/resultprovider.h
index 7f090e9..a064ab5 100644
--- a/common/resultprovider.h
+++ b/common/resultprovider.h
@@ -346,8 +346,14 @@ public:
346 emitter->onRemoved([this](const DomainType &value) { 346 emitter->onRemoved([this](const DomainType &value) {
347 this->remove(value); 347 this->remove(value);
348 }); 348 });
349 emitter->onInitialResultSetComplete([this](const DomainType &value) { 349 auto ptr = emitter.data();
350 this->initialResultSetComplete(value); 350 emitter->onInitialResultSetComplete([this, ptr](const DomainType &parent) {
351 auto hashValue = qHash(parent);
352 mInitialResultSetInProgress.remove(hashValue, ptr);
353 //Normally a parent is only in a single resource, except the toplevel (invalid) parent
354 if (!mInitialResultSetInProgress.contains(hashValue)) {
355 this->initialResultSetComplete(parent);
356 }
351 }); 357 });
352 emitter->onComplete([this]() { 358 emitter->onComplete([this]() {
353 this->complete(); 359 this->complete();
@@ -365,6 +371,7 @@ public:
365 this->initialResultSetComplete(parent); 371 this->initialResultSetComplete(parent);
366 } else { 372 } else {
367 for (const auto &emitter : mEmitter) { 373 for (const auto &emitter : mEmitter) {
374 mInitialResultSetInProgress.insert(qHash(parent), emitter.data());
368 emitter->fetch(parent); 375 emitter->fetch(parent);
369 } 376 }
370 } 377 }
@@ -372,6 +379,7 @@ public:
372 379
373private: 380private:
374 QList<typename ResultEmitter<DomainType>::Ptr> mEmitter; 381 QList<typename ResultEmitter<DomainType>::Ptr> mEmitter;
382 QMultiMap<qint64, ResultEmitter<DomainType>*> mInitialResultSetInProgress;
375}; 383};
376 384
377 385