summaryrefslogtreecommitdiffstats
path: root/common
diff options
context:
space:
mode:
Diffstat (limited to 'common')
-rw-r--r--common/changereplay.cpp28
-rw-r--r--common/changereplay.h1
-rw-r--r--common/commands/notification.fbs7
-rw-r--r--common/configstore.cpp12
-rw-r--r--common/definitions.cpp14
-rw-r--r--common/definitions.h2
-rw-r--r--common/domain/applicationdomaintype.cpp16
-rw-r--r--common/domain/applicationdomaintype.h62
-rw-r--r--common/domain/folder.cpp4
-rw-r--r--common/domain/mail.cpp4
-rw-r--r--common/domainadaptor.h15
-rw-r--r--common/entityreader.cpp50
-rw-r--r--common/facade.cpp9
-rw-r--r--common/facade.h3
-rw-r--r--common/genericresource.cpp103
-rw-r--r--common/genericresource.h5
-rw-r--r--common/index.cpp9
-rw-r--r--common/index.h2
-rw-r--r--common/listener.cpp112
-rw-r--r--common/listener.h12
-rw-r--r--common/log.cpp104
-rw-r--r--common/log.h28
-rw-r--r--common/mailpreprocessor.cpp20
-rw-r--r--common/messagequeue.cpp6
-rw-r--r--common/modelresult.cpp19
-rw-r--r--common/notification.h13
-rw-r--r--common/pipeline.cpp90
-rw-r--r--common/pipeline.h4
-rw-r--r--common/queryrunner.cpp44
-rw-r--r--common/queryrunner.h4
-rw-r--r--common/remoteidmap.cpp4
-rw-r--r--common/resource.h2
-rw-r--r--common/resourceaccess.cpp144
-rw-r--r--common/resourceaccess.h16
-rw-r--r--common/resourceconfig.cpp4
-rw-r--r--common/resourcecontrol.cpp21
-rw-r--r--common/resourcefacade.cpp278
-rw-r--r--common/resourcefacade.h29
-rw-r--r--common/sourcewriteback.cpp24
-rw-r--r--common/specialpurposepreprocessor.cpp4
-rw-r--r--common/storage.h27
-rw-r--r--common/storage_common.cpp12
-rw-r--r--common/storage_lmdb.cpp61
-rw-r--r--common/store.cpp55
-rw-r--r--common/synchronizer.cpp38
-rw-r--r--common/test.cpp39
-rw-r--r--common/typeindex.cpp21
47 files changed, 999 insertions, 582 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index 0096bd0..4b7d593 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -26,13 +26,12 @@
26 26
27using namespace Sink; 27using namespace Sink;
28 28
29#undef DEBUG_AREA 29SINK_DEBUG_AREA("changereplay");
30#define DEBUG_AREA "resource.changereplay"
31 30
32ChangeReplay::ChangeReplay(const QByteArray &resourceName) 31ChangeReplay::ChangeReplay(const QByteArray &resourceName)
33 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false) 32 : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayInProgress(false)
34{ 33{
35 Trace() << "Created change replay: " << resourceName; 34 SinkTrace() << "Created change replay: " << resourceName;
36} 35}
37 36
38qint64 ChangeReplay::getLastReplayedRevision() 37qint64 ChangeReplay::getLastReplayedRevision()
@@ -51,10 +50,10 @@ qint64 ChangeReplay::getLastReplayedRevision()
51bool ChangeReplay::allChangesReplayed() 50bool ChangeReplay::allChangesReplayed()
52{ 51{
53 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 52 const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
54 Warning() << error.message; 53 SinkWarning() << error.message;
55 })); 54 }));
56 const qint64 lastReplayedRevision = getLastReplayedRevision(); 55 const qint64 lastReplayedRevision = getLastReplayedRevision();
57 Trace() << "All changes replayed " << topRevision << lastReplayedRevision; 56 SinkTrace() << "All changes replayed " << topRevision << lastReplayedRevision;
58 return (lastReplayedRevision >= topRevision); 57 return (lastReplayedRevision >= topRevision);
59} 58}
60 59
@@ -62,10 +61,10 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
62{ 61{
63 mReplayInProgress = true; 62 mReplayInProgress = true;
64 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 63 auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
65 Warning() << error.message; 64 SinkWarning() << error.message;
66 }); 65 });
67 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 66 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
68 Warning() << error.message; 67 SinkWarning() << error.message;
69 }); 68 });
70 qint64 lastReplayedRevision = 0; 69 qint64 lastReplayedRevision = 0;
71 replayStoreTransaction.openDatabase().scan("lastReplayedRevision", 70 replayStoreTransaction.openDatabase().scan("lastReplayedRevision",
@@ -78,14 +77,14 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
78 77
79 auto recordReplayedRevision = [this](qint64 revision) { 78 auto recordReplayedRevision = [this](qint64 revision) {
80 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 79 auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
81 Warning() << error.message; 80 SinkWarning() << error.message;
82 }); 81 });
83 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); 82 replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision));
84 replayStoreTransaction.commit(); 83 replayStoreTransaction.commit();
85 }; 84 };
86 85
87 if (lastReplayedRevision < topRevision) { 86 if (lastReplayedRevision < topRevision) {
88 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; 87 SinkTrace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision;
89 qint64 revision = lastReplayedRevision + 1; 88 qint64 revision = lastReplayedRevision + 1;
90 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); 89 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
91 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); 90 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision);
@@ -94,25 +93,25 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
94 Storage::mainDatabase(mainStoreTransaction, type) 93 Storage::mainDatabase(mainStoreTransaction, type)
95 .scan(key, 94 .scan(key,
96 [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool { 95 [&lastReplayedRevision, type, this, &replayJob](const QByteArray &key, const QByteArray &value) -> bool {
97 Trace() << "Replaying " << key; 96 SinkTrace() << "Replaying " << key;
98 replayJob = replay(type, key, value); 97 replayJob = replay(type, key, value);
99 return false; 98 return false;
100 }, 99 },
101 [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); 100 [key](const Storage::Error &) { SinkError() << "Failed to replay change " << key; });
102 return replayJob.then<void>([this, revision, recordReplayedRevision]() { 101 return replayJob.then<void>([this, revision, recordReplayedRevision]() {
103 Trace() << "Replayed until " << revision; 102 SinkTrace() << "Replayed until " << revision;
104 recordReplayedRevision(revision); 103 recordReplayedRevision(revision);
105 //replay until we're done 104 //replay until we're done
106 replayNextRevision().exec(); 105 replayNextRevision().exec();
107 }, 106 },
108 [this, revision, recordReplayedRevision](int, QString) { 107 [this, revision, recordReplayedRevision](int, QString) {
109 Trace() << "Change replay failed" << revision; 108 SinkTrace() << "Change replay failed" << revision;
110 //We're probably not online or so, so postpone retrying 109 //We're probably not online or so, so postpone retrying
111 mReplayInProgress = false; 110 mReplayInProgress = false;
112 emit changesReplayed(); 111 emit changesReplayed();
113 }); 112 });
114 } else { 113 } else {
115 Trace() << "No changes to replay"; 114 SinkTrace() << "No changes to replay";
116 mReplayInProgress = false; 115 mReplayInProgress = false;
117 emit changesReplayed(); 116 emit changesReplayed();
118 } 117 }
@@ -122,6 +121,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
122void ChangeReplay::revisionChanged() 121void ChangeReplay::revisionChanged()
123{ 122{
124 if (!mReplayInProgress) { 123 if (!mReplayInProgress) {
124 emit replayingChanges();
125 replayNextRevision().exec(); 125 replayNextRevision().exec();
126 } 126 }
127} 127}
diff --git a/common/changereplay.h b/common/changereplay.h
index aba3dd0..6c1c1db 100644
--- a/common/changereplay.h
+++ b/common/changereplay.h
@@ -45,6 +45,7 @@ public:
45 45
46signals: 46signals:
47 void changesReplayed(); 47 void changesReplayed();
48 void replayingChanges();
48 49
49public slots: 50public slots:
50 void revisionChanged(); 51 void revisionChanged();
diff --git a/common/commands/notification.fbs b/common/commands/notification.fbs
index 8750ff5..c82fad3 100644
--- a/common/commands/notification.fbs
+++ b/common/commands/notification.fbs
@@ -1,13 +1,10 @@
1namespace Sink.Commands; 1namespace Sink.Commands;
2 2
3enum NotificationType : byte { Shutdown = 1, Status, Warning, Progress, Inspection, RevisionUpdate }
4enum NotificationCode : byte { Success = 0, Failure = 1, UserCode }
5
6table Notification { 3table Notification {
7 type: NotificationType = Status; 4 type: int = 0; //See notification.h
8 identifier: string; //An identifier that links back to the something related to the notification (e.g. an entity id or a command id) 5 identifier: string; //An identifier that links back to the something related to the notification (e.g. an entity id or a command id)
9 message: string; 6 message: string;
10 code: int = 0; //Of type NotificationCode 7 code: int = 0; //See notification.h
11} 8}
12 9
13root_type Notification; 10root_type Notification;
diff --git a/common/configstore.cpp b/common/configstore.cpp
index a8469ba..9a68662 100644
--- a/common/configstore.cpp
+++ b/common/configstore.cpp
@@ -20,13 +20,15 @@
20 20
21#include <QSettings> 21#include <QSettings>
22#include <QSharedPointer> 22#include <QSharedPointer>
23#include <QStandardPaths>
24#include <QFile> 23#include <QFile>
25#include <log.h> 24#include <log.h>
25#include <definitions.h>
26
27SINK_DEBUG_AREA("configstore")
26 28
27static QSharedPointer<QSettings> getConfig(const QByteArray &identifier) 29static QSharedPointer<QSettings> getConfig(const QByteArray &identifier)
28{ 30{
29 return QSharedPointer<QSettings>::create(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/sink/" + identifier + ".ini", QSettings::IniFormat); 31 return QSharedPointer<QSettings>::create(Sink::configLocation() + "/" + identifier + ".ini", QSettings::IniFormat);
30} 32}
31 33
32ConfigStore::ConfigStore(const QByteArray &identifier) 34ConfigStore::ConfigStore(const QByteArray &identifier)
@@ -50,7 +52,7 @@ QMap<QByteArray, QByteArray> ConfigStore::getEntries()
50 52
51void ConfigStore::add(const QByteArray &identifier, const QByteArray &type) 53void ConfigStore::add(const QByteArray &identifier, const QByteArray &type)
52{ 54{
53 Trace() << "Adding " << identifier; 55 SinkTrace() << "Adding " << identifier;
54 mConfig->beginGroup(QString::fromLatin1(identifier)); 56 mConfig->beginGroup(QString::fromLatin1(identifier));
55 mConfig->setValue("type", type); 57 mConfig->setValue("type", type);
56 mConfig->endGroup(); 58 mConfig->endGroup();
@@ -59,7 +61,7 @@ void ConfigStore::add(const QByteArray &identifier, const QByteArray &type)
59 61
60void ConfigStore::remove(const QByteArray &identifier) 62void ConfigStore::remove(const QByteArray &identifier)
61{ 63{
62 Trace() << "Removing " << identifier; 64 SinkTrace() << "Removing " << identifier;
63 mConfig->beginGroup(QString::fromLatin1(identifier)); 65 mConfig->beginGroup(QString::fromLatin1(identifier));
64 mConfig->remove(""); 66 mConfig->remove("");
65 mConfig->endGroup(); 67 mConfig->endGroup();
@@ -75,7 +77,7 @@ void ConfigStore::clear()
75 77
76void ConfigStore::modify(const QByteArray &identifier, const QMap<QByteArray, QVariant> &configuration) 78void ConfigStore::modify(const QByteArray &identifier, const QMap<QByteArray, QVariant> &configuration)
77{ 79{
78 Trace() << "Modifying " << identifier; 80 SinkTrace() << "Modifying " << identifier;
79 auto config = getConfig(identifier); 81 auto config = getConfig(identifier);
80 config->clear(); 82 config->clear();
81 for (const auto &key : configuration.keys()) { 83 for (const auto &key : configuration.keys()) {
diff --git a/common/definitions.cpp b/common/definitions.cpp
index 362faf7..3fc4700 100644
--- a/common/definitions.cpp
+++ b/common/definitions.cpp
@@ -25,12 +25,22 @@
25 25
26QString Sink::storageLocation() 26QString Sink::storageLocation()
27{ 27{
28 return QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/sink/storage"; 28 return dataLocation() + "/storage";
29}
30
31QString Sink::dataLocation()
32{
33 return QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/sink";
34}
35
36QString Sink::configLocation()
37{
38 return QStandardPaths::writableLocation(QStandardPaths::GenericConfigLocation) + "/sink";
29} 39}
30 40
31QString Sink::temporaryFileLocation() 41QString Sink::temporaryFileLocation()
32{ 42{
33 auto path = QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/sink/temporaryFiles"; 43 auto path = dataLocation() + "/temporaryFiles";
34 //FIXME create in a singleton on startup? 44 //FIXME create in a singleton on startup?
35 QDir dir; 45 QDir dir;
36 dir.mkpath(path); 46 dir.mkpath(path);
diff --git a/common/definitions.h b/common/definitions.h
index 1008235..e8cd45e 100644
--- a/common/definitions.h
+++ b/common/definitions.h
@@ -26,6 +26,8 @@
26 26
27namespace Sink { 27namespace Sink {
28QString SINK_EXPORT storageLocation(); 28QString SINK_EXPORT storageLocation();
29QString SINK_EXPORT dataLocation();
30QString SINK_EXPORT configLocation();
29QString SINK_EXPORT temporaryFileLocation(); 31QString SINK_EXPORT temporaryFileLocation();
30QString SINK_EXPORT resourceStorageLocation(const QByteArray &resourceInstanceIdentifier); 32QString SINK_EXPORT resourceStorageLocation(const QByteArray &resourceInstanceIdentifier);
31} 33}
diff --git a/common/domain/applicationdomaintype.cpp b/common/domain/applicationdomaintype.cpp
index 44eeb13..ce113c2 100644
--- a/common/domain/applicationdomaintype.cpp
+++ b/common/domain/applicationdomaintype.cpp
@@ -24,6 +24,8 @@
24#include "storage.h" //for generateUid() 24#include "storage.h" //for generateUid()
25#include <QFile> 25#include <QFile>
26 26
27SINK_DEBUG_AREA("applicationdomaintype");
28
27namespace Sink { 29namespace Sink {
28namespace ApplicationDomain { 30namespace ApplicationDomain {
29 31
@@ -82,7 +84,7 @@ QVariant ApplicationDomainType::getProperty(const QByteArray &key) const
82{ 84{
83 Q_ASSERT(mAdaptor); 85 Q_ASSERT(mAdaptor);
84 if (!mAdaptor->availableProperties().contains(key)) { 86 if (!mAdaptor->availableProperties().contains(key)) {
85 Warning() << "No such property available " << key; 87 SinkWarning() << "No such property available " << key;
86 } 88 }
87 return mAdaptor->getProperty(key); 89 return mAdaptor->getProperty(key);
88} 90}
@@ -105,7 +107,7 @@ QByteArray ApplicationDomainType::getBlobProperty(const QByteArray &key) const
105 const auto path = getProperty(key).toByteArray(); 107 const auto path = getProperty(key).toByteArray();
106 QFile file(path); 108 QFile file(path);
107 if (!file.open(QIODevice::ReadOnly)) { 109 if (!file.open(QIODevice::ReadOnly)) {
108 ErrorMsg() << "Failed to open the file: " << file.errorString() << path; 110 SinkError() << "Failed to open the file: " << file.errorString() << path;
109 return QByteArray(); 111 return QByteArray();
110 } 112 }
111 return file.readAll(); 113 return file.readAll();
@@ -116,7 +118,7 @@ void ApplicationDomainType::setBlobProperty(const QByteArray &key, const QByteAr
116 const auto path = Sink::temporaryFileLocation() + "/" + QUuid::createUuid().toString(); 118 const auto path = Sink::temporaryFileLocation() + "/" + QUuid::createUuid().toString();
117 QFile file(path); 119 QFile file(path);
118 if (!file.open(QIODevice::WriteOnly)) { 120 if (!file.open(QIODevice::WriteOnly)) {
119 ErrorMsg() << "Failed to open the file: " << file.errorString() << path; 121 SinkError() << "Failed to open the file: " << file.errorString() << path;
120 return; 122 return;
121 } 123 }
122 file.write(value); 124 file.write(value);
@@ -251,7 +253,7 @@ namespace DummyResource {
251 SinkResource create(const QByteArray &account) 253 SinkResource create(const QByteArray &account)
252 { 254 {
253 auto &&resource = ApplicationDomainType::createEntity<SinkResource>(); 255 auto &&resource = ApplicationDomainType::createEntity<SinkResource>();
254 resource.setProperty("type", "org.kde.dummy"); 256 resource.setProperty("type", "sink.dummy");
255 resource.setProperty("account", account); 257 resource.setProperty("account", account);
256 resource.setProperty("capabilities", QVariant::fromValue(QByteArrayList() << ResourceCapabilities::Mail::storage << "-folder.rename")); 258 resource.setProperty("capabilities", QVariant::fromValue(QByteArrayList() << ResourceCapabilities::Mail::storage << "-folder.rename"));
257 // resource.setProperty("capabilities", QVariant::fromValue(QByteArrayList() << ResourceCapabilities::Mail::storage << ResourceCapabilities::Mail::drafts << "-folder.rename" << ResourceCapabilities::Mail::trash)); 259 // resource.setProperty("capabilities", QVariant::fromValue(QByteArrayList() << ResourceCapabilities::Mail::storage << ResourceCapabilities::Mail::drafts << "-folder.rename" << ResourceCapabilities::Mail::trash));
@@ -263,7 +265,7 @@ namespace MaildirResource {
263 SinkResource create(const QByteArray &account) 265 SinkResource create(const QByteArray &account)
264 { 266 {
265 auto &&resource = ApplicationDomainType::createEntity<SinkResource>(); 267 auto &&resource = ApplicationDomainType::createEntity<SinkResource>();
266 resource.setProperty("type", "org.kde.maildir"); 268 resource.setProperty("type", "sink.maildir");
267 resource.setProperty("account", account); 269 resource.setProperty("account", account);
268 resource.setProperty("capabilities", QVariant::fromValue(QByteArrayList() << ResourceCapabilities::Mail::storage << ResourceCapabilities::Mail::drafts << "-folder.rename" << ResourceCapabilities::Mail::trash)); 270 resource.setProperty("capabilities", QVariant::fromValue(QByteArrayList() << ResourceCapabilities::Mail::storage << ResourceCapabilities::Mail::drafts << "-folder.rename" << ResourceCapabilities::Mail::trash));
269 return resource; 271 return resource;
@@ -274,7 +276,7 @@ namespace MailtransportResource {
274 SinkResource create(const QByteArray &account) 276 SinkResource create(const QByteArray &account)
275 { 277 {
276 auto &&resource = ApplicationDomainType::createEntity<SinkResource>(); 278 auto &&resource = ApplicationDomainType::createEntity<SinkResource>();
277 resource.setProperty("type", "org.kde.mailtransport"); 279 resource.setProperty("type", "sink.mailtransport");
278 resource.setProperty("account", account); 280 resource.setProperty("account", account);
279 resource.setProperty("capabilities", QVariant::fromValue(QByteArrayList() << ResourceCapabilities::Mail::transport)); 281 resource.setProperty("capabilities", QVariant::fromValue(QByteArrayList() << ResourceCapabilities::Mail::transport));
280 return resource; 282 return resource;
@@ -285,7 +287,7 @@ namespace ImapResource {
285 SinkResource create(const QByteArray &account) 287 SinkResource create(const QByteArray &account)
286 { 288 {
287 auto &&resource = ApplicationDomainType::createEntity<SinkResource>(); 289 auto &&resource = ApplicationDomainType::createEntity<SinkResource>();
288 resource.setProperty("type", "org.kde.imap"); 290 resource.setProperty("type", "sink.imap");
289 resource.setProperty("account", account); 291 resource.setProperty("account", account);
290 resource.setProperty("capabilities", QVariant::fromValue(QByteArrayList() << ResourceCapabilities::Mail::storage << ResourceCapabilities::Mail::drafts << ResourceCapabilities::Mail::folderhierarchy << ResourceCapabilities::Mail::trash)); 292 resource.setProperty("capabilities", QVariant::fromValue(QByteArrayList() << ResourceCapabilities::Mail::storage << ResourceCapabilities::Mail::drafts << ResourceCapabilities::Mail::folderhierarchy << ResourceCapabilities::Mail::trash));
291 return resource; 293 return resource;
diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h
index 849c3e2..5efb936 100644
--- a/common/domain/applicationdomaintype.h
+++ b/common/domain/applicationdomaintype.h
@@ -52,6 +52,13 @@
52 void setExtracted##NAME(const TYPE &value) { setProperty(NAME::name, QVariant::fromValue(value)); } \ 52 void setExtracted##NAME(const TYPE &value) { setProperty(NAME::name, QVariant::fromValue(value)); } \
53 TYPE get##NAME() const { return getProperty(NAME::name).value<TYPE>(); } \ 53 TYPE get##NAME() const { return getProperty(NAME::name).value<TYPE>(); } \
54 54
55#define SINK_STATUS_PROPERTY(TYPE, NAME, LOWERCASENAME) \
56 struct NAME { \
57 static constexpr const char *name = #LOWERCASENAME; \
58 typedef TYPE Type; \
59 }; \
60 void setStatus##NAME(const TYPE &value) { setProperty(NAME::name, QVariant::fromValue(value)); } \
61 TYPE get##NAME() const { return getProperty(NAME::name).value<TYPE>(); } \
55 62
56#define SINK_BLOB_PROPERTY(NAME, LOWERCASENAME) \ 63#define SINK_BLOB_PROPERTY(NAME, LOWERCASENAME) \
57 struct NAME { \ 64 struct NAME { \
@@ -76,6 +83,14 @@
76namespace Sink { 83namespace Sink {
77namespace ApplicationDomain { 84namespace ApplicationDomain {
78 85
86struct SINK_EXPORT Error {
87
88};
89
90struct SINK_EXPORT Progress {
91
92};
93
79/** 94/**
80 * The domain type interface has two purposes: 95 * The domain type interface has two purposes:
81 * * provide a unified interface to read buffers (for zero-copy reading) 96 * * provide a unified interface to read buffers (for zero-copy reading)
@@ -215,6 +230,38 @@ struct SINK_EXPORT Mail : public Entity {
215}; 230};
216 231
217/** 232/**
233 * The status of an account or resource.
234 *
235 * It is set as follows:
236 * * By default the status is offline.
237 * * If a connection to the server could be established the status is Connected.
238 * * If an error occurred that keeps the resource from operating (so non transient), the resource enters the error state.
239 * * If a long running operation is started the resource goes to the busy state (and return to the previous state after that).
240 */
241enum SINK_EXPORT Status {
242 OfflineStatus,
243 ConnectedStatus,
244 BusyStatus,
245 ErrorStatus
246};
247
248struct SINK_EXPORT SinkAccount : public ApplicationDomainType {
249 typedef QSharedPointer<SinkAccount> Ptr;
250 explicit SinkAccount(const QByteArray &resourceInstanceIdentifier, const QByteArray &identifier, qint64 revision, const QSharedPointer<BufferAdaptor> &adaptor);
251 explicit SinkAccount(const QByteArray &identifier);
252 SinkAccount();
253 virtual ~SinkAccount();
254
255 SINK_PROPERTY(QString, Name, name);
256 SINK_PROPERTY(QString, Icon, icon);
257 SINK_PROPERTY(QString, AccountType, accountType);
258 SINK_STATUS_PROPERTY(int, Status, status);
259 SINK_STATUS_PROPERTY(ApplicationDomain::Error, Error, error);
260 SINK_STATUS_PROPERTY(ApplicationDomain::Progress, Progress, progress);
261};
262
263
264/**
218 * Represents an sink resource. 265 * Represents an sink resource.
219 * 266 *
220 * This type is used for configuration of resources, 267 * This type is used for configuration of resources,
@@ -226,14 +273,13 @@ struct SINK_EXPORT SinkResource : public ApplicationDomainType {
226 explicit SinkResource(const QByteArray &identifier); 273 explicit SinkResource(const QByteArray &identifier);
227 SinkResource(); 274 SinkResource();
228 virtual ~SinkResource(); 275 virtual ~SinkResource();
229};
230 276
231struct SINK_EXPORT SinkAccount : public ApplicationDomainType { 277 SINK_REFERENCE_PROPERTY(SinkAccount, Account, account);
232 typedef QSharedPointer<SinkAccount> Ptr; 278 SINK_PROPERTY(QString, ResourceType, resourceType);
233 explicit SinkAccount(const QByteArray &resourceInstanceIdentifier, const QByteArray &identifier, qint64 revision, const QSharedPointer<BufferAdaptor> &adaptor); 279 SINK_PROPERTY(QByteArrayList, Capabilities, capabilities);
234 explicit SinkAccount(const QByteArray &identifier); 280 SINK_STATUS_PROPERTY(int, Status, status);
235 SinkAccount(); 281 SINK_STATUS_PROPERTY(ApplicationDomain::Error, Error, error);
236 virtual ~SinkAccount(); 282 SINK_STATUS_PROPERTY(ApplicationDomain::Progress, Progress, progress);
237}; 283};
238 284
239struct SINK_EXPORT Identity : public ApplicationDomainType { 285struct SINK_EXPORT Identity : public ApplicationDomainType {
@@ -330,3 +376,5 @@ Q_DECLARE_METATYPE(Sink::ApplicationDomain::SinkAccount)
330Q_DECLARE_METATYPE(Sink::ApplicationDomain::SinkAccount::Ptr) 376Q_DECLARE_METATYPE(Sink::ApplicationDomain::SinkAccount::Ptr)
331Q_DECLARE_METATYPE(Sink::ApplicationDomain::Identity) 377Q_DECLARE_METATYPE(Sink::ApplicationDomain::Identity)
332Q_DECLARE_METATYPE(Sink::ApplicationDomain::Identity::Ptr) 378Q_DECLARE_METATYPE(Sink::ApplicationDomain::Identity::Ptr)
379Q_DECLARE_METATYPE(Sink::ApplicationDomain::Error)
380Q_DECLARE_METATYPE(Sink::ApplicationDomain::Progress)
diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp
index 309ca3f..ddb0c10 100644
--- a/common/domain/folder.cpp
+++ b/common/domain/folder.cpp
@@ -35,6 +35,8 @@
35 35
36#include "folder_generated.h" 36#include "folder_generated.h"
37 37
38SINK_DEBUG_AREA("folder");
39
38static QMutex sMutex; 40static QMutex sMutex;
39 41
40using namespace Sink::ApplicationDomain; 42using namespace Sink::ApplicationDomain;
@@ -58,7 +60,7 @@ ResultSet TypeImplementation<Folder>::queryIndexes(const Sink::Query &query, con
58 60
59void TypeImplementation<Folder>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) 61void TypeImplementation<Folder>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction)
60{ 62{
61 Trace() << "Indexing " << identifier; 63 SinkTrace() << "Indexing " << identifier;
62 getIndex().add(identifier, bufferAdaptor, transaction); 64 getIndex().add(identifier, bufferAdaptor, transaction);
63} 65}
64 66
diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp
index 5b35a9a..13e1305 100644
--- a/common/domain/mail.cpp
+++ b/common/domain/mail.cpp
@@ -35,6 +35,8 @@
35 35
36#include "mail_generated.h" 36#include "mail_generated.h"
37 37
38SINK_DEBUG_AREA("mail");
39
38static QMutex sMutex; 40static QMutex sMutex;
39 41
40using namespace Sink::ApplicationDomain; 42using namespace Sink::ApplicationDomain;
@@ -63,7 +65,7 @@ ResultSet TypeImplementation<Mail>::queryIndexes(const Sink::Query &query, const
63 65
64void TypeImplementation<Mail>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction) 66void TypeImplementation<Mail>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::Transaction &transaction)
65{ 67{
66 Trace() << "Indexing " << identifier; 68 SinkTrace() << "Indexing " << identifier;
67 getIndex().add(identifier, bufferAdaptor, transaction); 69 getIndex().add(identifier, bufferAdaptor, transaction);
68} 70}
69 71
diff --git a/common/domainadaptor.h b/common/domainadaptor.h
index 8ac8171..25448f3 100644
--- a/common/domainadaptor.h
+++ b/common/domainadaptor.h
@@ -47,12 +47,12 @@ createBufferPart(const Sink::ApplicationDomain::ApplicationDomainType &domainObj
47 // First create a primitives such as strings using the mappings 47 // First create a primitives such as strings using the mappings
48 QList<std::function<void(Builder &)>> propertiesToAddToResource; 48 QList<std::function<void(Builder &)>> propertiesToAddToResource;
49 for (const auto &property : domainObject.changedProperties()) { 49 for (const auto &property : domainObject.changedProperties()) {
50 // Trace() << "copying property " << property; 50 // SinkTrace() << "copying property " << property;
51 const auto value = domainObject.getProperty(property); 51 const auto value = domainObject.getProperty(property);
52 if (mapper.hasMapping(property)) { 52 if (mapper.hasMapping(property)) {
53 mapper.setProperty(property, domainObject.getProperty(property), propertiesToAddToResource, fbb); 53 mapper.setProperty(property, domainObject.getProperty(property), propertiesToAddToResource, fbb);
54 } else { 54 } else {
55 // Trace() << "no mapping for property available " << property; 55 // SinkTrace() << "no mapping for property available " << property;
56 } 56 }
57 } 57 }
58 58
@@ -79,7 +79,7 @@ static void createBufferPartBuffer(const Sink::ApplicationDomain::ApplicationDom
79 fbb.Finish(pos, "AKFB"); 79 fbb.Finish(pos, "AKFB");
80 flatbuffers::Verifier verifier(fbb.GetBufferPointer(), fbb.GetSize()); 80 flatbuffers::Verifier verifier(fbb.GetBufferPointer(), fbb.GetSize());
81 if (!verifier.VerifyBuffer<Buffer>()) { 81 if (!verifier.VerifyBuffer<Buffer>()) {
82 Warning() << "Created invalid uffer"; 82 SinkWarning_(0, "bufferadaptor") << "Created invalid uffer";
83 } 83 }
84} 84}
85 85
@@ -89,6 +89,7 @@ static void createBufferPartBuffer(const Sink::ApplicationDomain::ApplicationDom
89template <class LocalBuffer, class ResourceBuffer> 89template <class LocalBuffer, class ResourceBuffer>
90class GenericBufferAdaptor : public Sink::ApplicationDomain::BufferAdaptor 90class GenericBufferAdaptor : public Sink::ApplicationDomain::BufferAdaptor
91{ 91{
92 SINK_DEBUG_AREA("bufferadaptor")
92public: 93public:
93 GenericBufferAdaptor() : BufferAdaptor() 94 GenericBufferAdaptor() : BufferAdaptor()
94 { 95 {
@@ -96,7 +97,7 @@ public:
96 97
97 virtual void setProperty(const QByteArray &key, const QVariant &value) Q_DECL_OVERRIDE 98 virtual void setProperty(const QByteArray &key, const QVariant &value) Q_DECL_OVERRIDE
98 { 99 {
99 Warning() << "Can't set property " << key; 100 SinkWarning() << "Can't set property " << key;
100 Q_ASSERT(false); 101 Q_ASSERT(false);
101 } 102 }
102 103
@@ -107,7 +108,7 @@ public:
107 } else if (mLocalBuffer && mLocalMapper->hasMapping(key)) { 108 } else if (mLocalBuffer && mLocalMapper->hasMapping(key)) {
108 return mLocalMapper->getProperty(key, mLocalBuffer); 109 return mLocalMapper->getProperty(key, mLocalBuffer);
109 } 110 }
110 Warning() << "No mapping available for key " << key << mLocalBuffer << mResourceBuffer; 111 SinkWarning() << "No mapping available for key " << key << mLocalBuffer << mResourceBuffer;
111 return QVariant(); 112 return QVariant();
112 } 113 }
113 114
@@ -168,13 +169,13 @@ public:
168 { 169 {
169 flatbuffers::FlatBufferBuilder localFbb; 170 flatbuffers::FlatBufferBuilder localFbb;
170 if (mLocalWriteMapper) { 171 if (mLocalWriteMapper) {
171 // Trace() << "Creating local buffer part"; 172 // SinkTrace() << "Creating local buffer part";
172 createBufferPartBuffer<LocalBuffer, LocalBuilder>(domainObject, localFbb, *mLocalWriteMapper); 173 createBufferPartBuffer<LocalBuffer, LocalBuilder>(domainObject, localFbb, *mLocalWriteMapper);
173 } 174 }
174 175
175 flatbuffers::FlatBufferBuilder resFbb; 176 flatbuffers::FlatBufferBuilder resFbb;
176 if (mResourceWriteMapper) { 177 if (mResourceWriteMapper) {
177 // Trace() << "Creating resouce buffer part"; 178 // SinkTrace() << "Creating resouce buffer part";
178 createBufferPartBuffer<ResourceBuffer, ResourceBuilder>(domainObject, resFbb, *mResourceWriteMapper); 179 createBufferPartBuffer<ResourceBuffer, ResourceBuilder>(domainObject, resFbb, *mResourceWriteMapper);
179 } 180 }
180 181
diff --git a/common/entityreader.cpp b/common/entityreader.cpp
index c15f73f..411e7e4 100644
--- a/common/entityreader.cpp
+++ b/common/entityreader.cpp
@@ -23,6 +23,8 @@
23#include "storage.h" 23#include "storage.h"
24#include "query.h" 24#include "query.h"
25 25
26SINK_DEBUG_AREA("entityreader")
27
26using namespace Sink; 28using namespace Sink;
27 29
28QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision) 30QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory, qint64 &retrievedRevision)
@@ -32,15 +34,15 @@ QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getLat
32 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { 34 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool {
33 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 35 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
34 if (!buffer.isValid()) { 36 if (!buffer.isValid()) {
35 Warning() << "Read invalid buffer from disk"; 37 SinkWarning() << "Read invalid buffer from disk";
36 } else { 38 } else {
37 Trace() << "Found value " << key; 39 SinkTrace() << "Found value " << key;
38 current = adaptorFactory.createAdaptor(buffer.entity()); 40 current = adaptorFactory.createAdaptor(buffer.entity());
39 retrievedRevision = Sink::Storage::revisionFromKey(key); 41 retrievedRevision = Sink::Storage::revisionFromKey(key);
40 } 42 }
41 return false; 43 return false;
42 }, 44 },
43 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); 45 [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; });
44 return current; 46 return current;
45} 47}
46 48
@@ -51,14 +53,14 @@ QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::get(co
51 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool { 53 [&current, &adaptorFactory, &retrievedRevision](const QByteArray &key, const QByteArray &data) -> bool {
52 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 54 Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
53 if (!buffer.isValid()) { 55 if (!buffer.isValid()) {
54 Warning() << "Read invalid buffer from disk"; 56 SinkWarning() << "Read invalid buffer from disk";
55 } else { 57 } else {
56 current = adaptorFactory.createAdaptor(buffer.entity()); 58 current = adaptorFactory.createAdaptor(buffer.entity());
57 retrievedRevision = Sink::Storage::revisionFromKey(key); 59 retrievedRevision = Sink::Storage::revisionFromKey(key);
58 } 60 }
59 return false; 61 return false;
60 }, 62 },
61 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); 63 [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; });
62 return current; 64 return current;
63} 65}
64 66
@@ -74,7 +76,7 @@ QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityReaderUtils::getPre
74 } 76 }
75 return true; 77 return true;
76 }, 78 },
77 [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }, true); 79 [](const Sink::Storage::Error &error) { SinkWarning() << "Failed to read current value from storage: " << error.message; }, true);
78 return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision); 80 return get(db, Sink::Storage::assembleKey(uid, latestRevision), adaptorFactory, retrievedRevision);
79} 81}
80 82
@@ -86,7 +88,7 @@ EntityReader<DomainType>::EntityReader(const QByteArray &resourceType, const QBy
86 mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr) 88 mDomainTypeAdaptorFactory(*mDomainTypeAdaptorFactoryPtr)
87{ 89{
88 Q_ASSERT(!resourceType.isEmpty()); 90 Q_ASSERT(!resourceType.isEmpty());
89 Trace() << "resourceType " << resourceType; 91 SinkTrace() << "resourceType " << resourceType;
90 Q_ASSERT(mDomainTypeAdaptorFactoryPtr); 92 Q_ASSERT(mDomainTypeAdaptorFactoryPtr);
91} 93}
92 94
@@ -165,13 +167,13 @@ void EntityReader<DomainType>::readEntity(const Sink::Storage::NamedDatabase &db
165 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation); 167 resultCallback(DomainType::Ptr::create(mResourceInstanceIdentifier, Sink::Storage::uidFromKey(key), revision, adaptor), operation);
166 return false; 168 return false;
167 }, 169 },
168 [&](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message << key; }); 170 [&](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message << key; });
169} 171}
170 172
171static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType) 173static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction, const QByteArray &bufferType)
172{ 174{
173 // TODO use a result set with an iterator, to read values on demand 175 // TODO use a result set with an iterator, to read values on demand
174 Trace() << "Looking for : " << bufferType; 176 SinkTrace() << "Looking for : " << bufferType;
175 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate. 177 //The scan can return duplicate results if we have multiple revisions, so we use a set to deduplicate.
176 QSet<QByteArray> keys; 178 QSet<QByteArray> keys;
177 Storage::mainDatabase(transaction, bufferType) 179 Storage::mainDatabase(transaction, bufferType)
@@ -179,14 +181,14 @@ static inline ResultSet fullScan(const Sink::Storage::Transaction &transaction,
179 [&](const QByteArray &key, const QByteArray &value) -> bool { 181 [&](const QByteArray &key, const QByteArray &value) -> bool {
180 if (keys.contains(Sink::Storage::uidFromKey(key))) { 182 if (keys.contains(Sink::Storage::uidFromKey(key))) {
181 //Not something that should persist if the replay works, so we keep a message for now. 183 //Not something that should persist if the replay works, so we keep a message for now.
182 Trace() << "Multiple revisions for key: " << key; 184 SinkTrace() << "Multiple revisions for key: " << key;
183 } 185 }
184 keys << Sink::Storage::uidFromKey(key); 186 keys << Sink::Storage::uidFromKey(key);
185 return true; 187 return true;
186 }, 188 },
187 [](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.message; }); 189 [](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.message; });
188 190
189 Trace() << "Full scan retrieved " << keys.size() << " results."; 191 SinkTrace() << "Full scan retrieved " << keys.size() << " results.";
190 return ResultSet(keys.toList().toVector()); 192 return ResultSet(keys.toList().toVector());
191} 193}
192 194
@@ -224,7 +226,7 @@ ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision
224 while (*revisionCounter <= topRevision) { 226 while (*revisionCounter <= topRevision) {
225 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter); 227 const auto uid = Sink::Storage::getUidFromRevision(mTransaction, *revisionCounter);
226 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter); 228 const auto type = Sink::Storage::getTypeFromRevision(mTransaction, *revisionCounter);
227 // Trace() << "Revision" << *revisionCounter << type << uid; 229 // SinkTrace() << "Revision" << *revisionCounter << type << uid;
228 Q_ASSERT(!uid.isEmpty()); 230 Q_ASSERT(!uid.isEmpty());
229 Q_ASSERT(!type.isEmpty()); 231 Q_ASSERT(!type.isEmpty());
230 if (type != bufferType) { 232 if (type != bufferType) {
@@ -236,7 +238,7 @@ ResultSet EntityReader<DomainType>::loadIncrementalResultSet(qint64 baseRevision
236 *revisionCounter += 1; 238 *revisionCounter += 1;
237 return key; 239 return key;
238 } 240 }
239 Trace() << "Finished reading incremental result set:" << *revisionCounter; 241 SinkTrace() << "Finished reading incremental result set:" << *revisionCounter;
240 // We're done 242 // We're done
241 return QByteArray(); 243 return QByteArray();
242 }); 244 });
@@ -248,7 +250,7 @@ ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const
248{ 250{
249 const bool sortingRequired = !sortProperty.isEmpty(); 251 const bool sortingRequired = !sortProperty.isEmpty();
250 if (initialQuery && sortingRequired) { 252 if (initialQuery && sortingRequired) {
251 Trace() << "Sorting the resultset in memory according to property: " << sortProperty; 253 SinkTrace() << "Sorting the resultset in memory according to property: " << sortProperty;
252 // Sort the complete set by reading the sort property and filling into a sorted map 254 // Sort the complete set by reading the sort property and filling into a sorted map
253 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create(); 255 auto sortedMap = QSharedPointer<QMap<QByteArray, QByteArray>>::create();
254 while (resultSet.next()) { 256 while (resultSet.next()) {
@@ -271,7 +273,7 @@ ResultSet EntityReader<DomainType>::filterAndSortSet(ResultSet &resultSet, const
271 }); 273 });
272 } 274 }
273 275
274 Trace() << "Sorted " << sortedMap->size() << " values."; 276 SinkTrace() << "Sorted " << sortedMap->size() << " values.";
275 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap); 277 auto iterator = QSharedPointer<QMapIterator<QByteArray, QByteArray>>::create(*sortedMap);
276 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery]( 278 ResultSet::ValueGenerator generator = [this, iterator, sortedMap, &db, filter, initialQuery](
277 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool { 279 std::function<void(const Sink::ApplicationDomain::ApplicationDomainType::Ptr &, Sink::Operation)> callback) -> bool {
@@ -330,11 +332,11 @@ QPair<qint64, qint64> EntityReader<DomainType>::load(const Sink::Query &query, c
330 QSet<QByteArray> remainingFilters; 332 QSet<QByteArray> remainingFilters;
331 QByteArray remainingSorting; 333 QByteArray remainingSorting;
332 auto resultSet = baseSetRetriever(remainingFilters, remainingSorting); 334 auto resultSet = baseSetRetriever(remainingFilters, remainingSorting);
333 Trace() << "Base set retrieved. " << Log::TraceTime(time.elapsed()); 335 SinkTrace() << "Base set retrieved. " << Log::TraceTime(time.elapsed());
334 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting); 336 auto filteredSet = filterAndSortSet(resultSet, getFilter(remainingFilters, query), db, initialQuery, remainingSorting);
335 Trace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed()); 337 SinkTrace() << "Filtered set retrieved. " << Log::TraceTime(time.elapsed());
336 auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback); 338 auto replayedEntities = replaySet(filteredSet, offset, batchSize, callback);
337 // Trace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed()); 339 // SinkTrace() << "Filtered set replayed. " << Log::TraceTime(time.elapsed());
338 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities); 340 return qMakePair(Sink::Storage::maxRevision(mTransaction), replayedEntities);
339} 341}
340 342
@@ -346,7 +348,7 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeInitialQuery(const Sink::
346 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { 348 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
347 return loadInitialResultSet(query, remainingFilters, remainingSorting); 349 return loadInitialResultSet(query, remainingFilters, remainingSorting);
348 }, true, offset, batchsize, callback); 350 }, true, offset, batchsize, callback);
349 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 351 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
350 return revisionAndReplayedEntities; 352 return revisionAndReplayedEntities;
351} 353}
352 354
@@ -359,7 +361,7 @@ QPair<qint64, qint64> EntityReader<DomainType>::executeIncrementalQuery(const Si
359 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet { 361 auto revisionAndReplayedEntities = load(query, [&](QSet<QByteArray> &remainingFilters, QByteArray &remainingSorting) -> ResultSet {
360 return loadIncrementalResultSet(baseRevision, query, remainingFilters); 362 return loadIncrementalResultSet(baseRevision, query, remainingFilters);
361 }, false, 0, 0, callback); 363 }, false, 0, 0, callback);
362 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 364 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
363 return revisionAndReplayedEntities; 365 return revisionAndReplayedEntities;
364} 366}
365 367
@@ -377,7 +379,7 @@ EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, con
377 const auto property = domainObject->getProperty(filterProperty); 379 const auto property = domainObject->getProperty(filterProperty);
378 const auto comparator = query.propertyFilter.value(filterProperty); 380 const auto comparator = query.propertyFilter.value(filterProperty);
379 if (!comparator.matches(property)) { 381 if (!comparator.matches(property)) {
380 Trace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value; 382 SinkTrace() << "Filtering entity due to property mismatch on filter: " << filterProperty << property << ":" << comparator.value;
381 return false; 383 return false;
382 } 384 }
383 } 385 }
@@ -388,7 +390,7 @@ EntityReader<DomainType>::getFilter(const QSet<QByteArray> remainingFilters, con
388template <class DomainType> 390template <class DomainType>
389qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback) 391qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int batchSize, const std::function<bool(const typename DomainType::Ptr &value, Sink::Operation operation)> &callback)
390{ 392{
391 Trace() << "Skipping over " << offset << " results"; 393 SinkTrace() << "Skipping over " << offset << " results";
392 resultSet.skip(offset); 394 resultSet.skip(offset);
393 int counter = 0; 395 int counter = 0;
394 while (!batchSize || (counter < batchSize)) { 396 while (!batchSize || (counter < batchSize)) {
@@ -401,7 +403,7 @@ qint64 EntityReader<DomainType>::replaySet(ResultSet &resultSet, int offset, int
401 break; 403 break;
402 } 404 }
403 }; 405 };
404 Trace() << "Replayed " << counter << " results." 406 SinkTrace() << "Replayed " << counter << " results."
405 << "Limit " << batchSize; 407 << "Limit " << batchSize;
406 return counter; 408 return counter;
407} 409}
diff --git a/common/facade.cpp b/common/facade.cpp
index 2660300..72f7414 100644
--- a/common/facade.cpp
+++ b/common/facade.cpp
@@ -30,9 +30,6 @@
30 30
31using namespace Sink; 31using namespace Sink;
32 32
33#undef DEBUG_AREA
34#define DEBUG_AREA "client.facade"
35
36template <class DomainType> 33template <class DomainType>
37GenericFacade<DomainType>::GenericFacade( 34GenericFacade<DomainType>::GenericFacade(
38 const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory, const QSharedPointer<Sink::ResourceAccessInterface> resourceAccess) 35 const QByteArray &resourceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &adaptorFactory, const QSharedPointer<Sink::ResourceAccessInterface> resourceAccess)
@@ -59,7 +56,7 @@ template <class DomainType>
59KAsync::Job<void> GenericFacade<DomainType>::create(const DomainType &domainObject) 56KAsync::Job<void> GenericFacade<DomainType>::create(const DomainType &domainObject)
60{ 57{
61 if (!mDomainTypeAdaptorFactory) { 58 if (!mDomainTypeAdaptorFactory) {
62 Warning() << "No domain type adaptor factory available"; 59 SinkWarning() << "No domain type adaptor factory available";
63 return KAsync::error<void>(); 60 return KAsync::error<void>();
64 } 61 }
65 flatbuffers::FlatBufferBuilder entityFbb; 62 flatbuffers::FlatBufferBuilder entityFbb;
@@ -71,10 +68,10 @@ template <class DomainType>
71KAsync::Job<void> GenericFacade<DomainType>::modify(const DomainType &domainObject) 68KAsync::Job<void> GenericFacade<DomainType>::modify(const DomainType &domainObject)
72{ 69{
73 if (!mDomainTypeAdaptorFactory) { 70 if (!mDomainTypeAdaptorFactory) {
74 Warning() << "No domain type adaptor factory available"; 71 SinkWarning() << "No domain type adaptor factory available";
75 return KAsync::error<void>(); 72 return KAsync::error<void>();
76 } 73 }
77 Trace() << "Modifying entity: " << domainObject.identifier() << domainObject.changedProperties(); 74 SinkTrace() << "Modifying entity: " << domainObject.identifier() << domainObject.changedProperties();
78 flatbuffers::FlatBufferBuilder entityFbb; 75 flatbuffers::FlatBufferBuilder entityFbb;
79 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb); 76 mDomainTypeAdaptorFactory->createBuffer(domainObject, entityFbb);
80 return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), BufferUtils::extractBuffer(entityFbb), domainObject.changedProperties()); 77 return mResourceAccess->sendModifyCommand(domainObject.identifier(), domainObject.revision(), bufferTypeForDomainType(), QByteArrayList(), BufferUtils::extractBuffer(entityFbb), domainObject.changedProperties());
diff --git a/common/facade.h b/common/facade.h
index 658ccb8..b193580 100644
--- a/common/facade.h
+++ b/common/facade.h
@@ -46,6 +46,9 @@ namespace Sink {
46template <typename DomainType> 46template <typename DomainType>
47class SINK_EXPORT GenericFacade : public Sink::StoreFacade<DomainType> 47class SINK_EXPORT GenericFacade : public Sink::StoreFacade<DomainType>
48{ 48{
49protected:
50 SINK_DEBUG_AREA("facade")
51 SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier)
49public: 52public:
50 /** 53 /**
51 * Create a new GenericFacade 54 * Create a new GenericFacade
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index c06c22a..7136882 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -46,9 +46,6 @@ static int sCommitInterval = 10;
46 46
47using namespace Sink; 47using namespace Sink;
48 48
49#undef DEBUG_AREA
50#define DEBUG_AREA "resource.commandprocessor"
51
52/** 49/**
53 * Drives the pipeline using the output from all command queues 50 * Drives the pipeline using the output from all command queues
54 */ 51 */
@@ -56,12 +53,13 @@ class CommandProcessor : public QObject
56{ 53{
57 Q_OBJECT 54 Q_OBJECT
58 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; 55 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction;
56 SINK_DEBUG_AREA("commandprocessor")
59 57
60public: 58public:
61 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) 59 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false)
62 { 60 {
63 mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 61 mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
64 Warning() << error.message; 62 SinkWarning() << error.message;
65 })); 63 }));
66 64
67 for (auto queue : mCommandQueues) { 65 for (auto queue : mCommandQueues) {
@@ -80,7 +78,6 @@ public:
80 mInspect = f; 78 mInspect = f;
81 } 79 }
82 80
83
84signals: 81signals:
85 void error(int errorCode, const QString &errorMessage); 82 void error(int errorCode, const QString &errorMessage);
86 83
@@ -114,7 +111,7 @@ private slots:
114 111
115 KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand) 112 KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand)
116 { 113 {
117 Trace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); 114 SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId());
118 // Throw command into appropriate pipeline 115 // Throw command into appropriate pipeline
119 switch (queuedCommand->commandId()) { 116 switch (queuedCommand->commandId()) {
120 case Sink::Commands::DeleteEntityCommand: 117 case Sink::Commands::DeleteEntityCommand:
@@ -138,21 +135,21 @@ private slots:
138 { 135 {
139 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); 136 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
140 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { 137 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
141 Warning() << "invalid buffer"; 138 SinkWarning() << "invalid buffer";
142 // return KAsync::error<void, qint64>(1, "Invalid Buffer"); 139 // return KAsync::error<void, qint64>(1, "Invalid Buffer");
143 } 140 }
144 auto queuedCommand = Sink::GetQueuedCommand(data.constData()); 141 auto queuedCommand = Sink::GetQueuedCommand(data.constData());
145 const auto commandId = queuedCommand->commandId(); 142 const auto commandId = queuedCommand->commandId();
146 Trace() << "Dequeued Command: " << Sink::Commands::name(commandId); 143 SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId);
147 return processQueuedCommand(queuedCommand) 144 return processQueuedCommand(queuedCommand)
148 .then<qint64, qint64>( 145 .then<qint64, qint64>(
149 [commandId](qint64 createdRevision) -> qint64 { 146 [this, commandId](qint64 createdRevision) -> qint64 {
150 Trace() << "Command pipeline processed: " << Sink::Commands::name(commandId); 147 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId);
151 return createdRevision; 148 return createdRevision;
152 }, 149 },
153 [](int errorCode, QString errorMessage) { 150 [](int errorCode, QString errorMessage) {
154 // FIXME propagate error, we didn't handle it 151 // FIXME propagate error, we didn't handle it
155 Warning() << "Error while processing queue command: " << errorMessage; 152 SinkWarning() << "Error while processing queue command: " << errorMessage;
156 }); 153 });
157 } 154 }
158 155
@@ -169,7 +166,7 @@ private slots:
169 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { 166 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) {
170 processQueuedCommand(data) 167 processQueuedCommand(data)
171 .then<void, qint64>([&future, this, time](qint64 createdRevision) { 168 .then<void, qint64>([&future, this, time](qint64 createdRevision) {
172 Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); 169 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
173 future.setFinished(); 170 future.setFinished();
174 }) 171 })
175 .exec(); 172 .exec();
@@ -178,7 +175,7 @@ private slots:
178 .then<void>([&future, queue]() { future.setFinished(); }, 175 .then<void>([&future, queue]() { future.setFinished(); },
179 [&future](int i, QString error) { 176 [&future](int i, QString error) {
180 if (i != MessageQueue::ErrorCodes::NoMessageFound) { 177 if (i != MessageQueue::ErrorCodes::NoMessageFound) {
181 Warning() << "Error while getting message from messagequeue: " << error; 178 SinkWarning() << "Error while getting message from messagequeue: " << error;
182 } 179 }
183 future.setFinished(); 180 future.setFinished();
184 }) 181 })
@@ -192,12 +189,12 @@ private slots:
192 auto time = QSharedPointer<QTime>::create(); 189 auto time = QSharedPointer<QTime>::create();
193 time->start(); 190 time->start();
194 mPipeline->startTransaction(); 191 mPipeline->startTransaction();
195 Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; 192 SinkTrace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision;
196 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { 193 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) {
197 mPipeline->cleanupRevision(revision); 194 mPipeline->cleanupRevision(revision);
198 } 195 }
199 mPipeline->commit(); 196 mPipeline->commit();
200 Trace() << "Cleanup done." << Log::TraceTime(time->elapsed()); 197 SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed());
201 198
202 // Go through all message queues 199 // Go through all message queues
203 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); 200 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues);
@@ -208,8 +205,8 @@ private slots:
208 205
209 auto queue = it->next(); 206 auto queue = it->next();
210 processQueue(queue) 207 processQueue(queue)
211 .then<void>([&future, time]() { 208 .then<void>([this, &future, time]() {
212 Trace() << "Queue processed." << Log::TraceTime(time->elapsed()); 209 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed());
213 future.setFinished(); 210 future.setFinished();
214 }) 211 })
215 .exec(); 212 .exec();
@@ -226,9 +223,6 @@ private:
226 InspectionFunction mInspect; 223 InspectionFunction mInspect;
227}; 224};
228 225
229#undef DEBUG_AREA
230#define DEBUG_AREA "resource"
231
232GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline ) 226GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline )
233 : Sink::Resource(), 227 : Sink::Resource(),
234 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), 228 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"),
@@ -240,7 +234,7 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
240 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 234 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
241{ 235{
242 mPipeline->setResourceType(mResourceType); 236 mPipeline->setResourceType(mResourceType);
243 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); 237 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue));
244 mProcessor->setInspectionCommand([this](void const *command, size_t size) { 238 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
245 flatbuffers::Verifier verifier((const uint8_t *)command, size); 239 flatbuffers::Verifier verifier((const uint8_t *)command, size);
246 if (Sink::Commands::VerifyInspectionBuffer(verifier)) { 240 if (Sink::Commands::VerifyInspectionBuffer(verifier)) {
@@ -260,18 +254,18 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
260 [=]() { 254 [=]() {
261 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; 255 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
262 Sink::Notification n; 256 Sink::Notification n;
263 n.type = Sink::Commands::NotificationType_Inspection; 257 n.type = Sink::Notification::Inspection;
264 n.id = inspectionId; 258 n.id = inspectionId;
265 n.code = Sink::Commands::NotificationCode_Success; 259 n.code = Sink::Notification::Success;
266 emit notify(n); 260 emit notify(n);
267 }, 261 },
268 [=](int code, const QString &message) { 262 [=](int code, const QString &message) {
269 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message; 263 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message;
270 Sink::Notification n; 264 Sink::Notification n;
271 n.type = Sink::Commands::NotificationType_Inspection; 265 n.type = Sink::Notification::Inspection;
272 n.message = message; 266 n.message = message;
273 n.id = inspectionId; 267 n.id = inspectionId;
274 n.code = Sink::Commands::NotificationCode_Failure; 268 n.code = Sink::Notification::Failure;
275 emit notify(n); 269 emit notify(n);
276 }) 270 })
277 .exec(); 271 .exec();
@@ -279,8 +273,14 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
279 } 273 }
280 return KAsync::error<void>(-1, "Invalid inspection command."); 274 return KAsync::error<void>(-1, "Invalid inspection command.");
281 }); 275 });
282 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 276 {
283 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 277 auto ret =QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
278 Q_ASSERT(ret);
279 }
280 {
281 auto ret = QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
282 Q_ASSERT(ret);
283 }
284 mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); 284 mClientLowerBoundRevision = mPipeline->cleanedUpRevision();
285 285
286 mCommitQueueTimer.setInterval(sCommitInterval); 286 mCommitQueueTimer.setInterval(sCommitInterval);
@@ -290,13 +290,12 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
290 290
291GenericResource::~GenericResource() 291GenericResource::~GenericResource()
292{ 292{
293 delete mProcessor;
294} 293}
295 294
296KAsync::Job<void> GenericResource::inspect( 295KAsync::Job<void> GenericResource::inspect(
297 int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) 296 int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
298{ 297{
299 Warning() << "Inspection not implemented"; 298 SinkWarning() << "Inspection not implemented";
300 return KAsync::null<void>(); 299 return KAsync::null<void>();
301} 300}
302 301
@@ -329,13 +328,36 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync
329void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay) 328void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay)
330{ 329{
331 mChangeReplay = changeReplay; 330 mChangeReplay = changeReplay;
331 {
332 auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::replayingChanges, [this]() {
333 Sink::Notification n;
334 n.id = "changereplay";
335 n.type = Sink::Notification::Status;
336 n.message = "Replaying changes.";
337 n.code = Sink::ApplicationDomain::BusyStatus;
338 emit notify(n);
339 });
340 Q_ASSERT(ret);
341 }
342 {
343 auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, [this]() {
344 Sink::Notification n;
345 n.id = "changereplay";
346 n.type = Sink::Notification::Status;
347 n.message = "All changes have been replayed.";
348 n.code = Sink::ApplicationDomain::ConnectedStatus;
349 emit notify(n);
350 });
351 Q_ASSERT(ret);
352 }
353
332 mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); 354 mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision());
333 enableChangeReplay(true); 355 enableChangeReplay(true);
334} 356}
335 357
336void GenericResource::removeDataFromDisk() 358void GenericResource::removeDataFromDisk()
337{ 359{
338 Log() << "Removing the resource from disk: " << mResourceInstanceIdentifier; 360 SinkLog() << "Removing the resource from disk: " << mResourceInstanceIdentifier;
339 //Ensure we have no transaction or databases open 361 //Ensure we have no transaction or databases open
340 mSynchronizer.clear(); 362 mSynchronizer.clear();
341 mChangeReplay.clear(); 363 mChangeReplay.clear();
@@ -363,7 +385,7 @@ qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier)
363 385
364void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) 386void GenericResource::onProcessorError(int errorCode, const QString &errorMessage)
365{ 387{
366 Warning() << "Received error from Processor: " << errorCode << errorMessage; 388 SinkWarning() << "Received error from Processor: " << errorCode << errorMessage;
367 mError = errorCode; 389 mError = errorCode;
368} 390}
369 391
@@ -399,12 +421,27 @@ void GenericResource::processCommand(int commandId, const QByteArray &data)
399KAsync::Job<void> GenericResource::synchronizeWithSource() 421KAsync::Job<void> GenericResource::synchronizeWithSource()
400{ 422{
401 return KAsync::start<void>([this](KAsync::Future<void> &future) { 423 return KAsync::start<void>([this](KAsync::Future<void> &future) {
402 Log() << " Synchronizing"; 424
425 Sink::Notification n;
426 n.id = "sync";
427 n.type = Sink::Notification::Status;
428 n.message = "Synchronization has started.";
429 n.code = Sink::ApplicationDomain::BusyStatus;
430 emit notify(n);
431
432 SinkLog() << " Synchronizing";
403 // Changereplay would deadlock otherwise when trying to open the synchronization store 433 // Changereplay would deadlock otherwise when trying to open the synchronization store
404 enableChangeReplay(false); 434 enableChangeReplay(false);
405 mSynchronizer->synchronize() 435 mSynchronizer->synchronize()
406 .then<void>([this, &future]() { 436 .then<void>([this, &future]() {
407 Log() << "Done Synchronizing"; 437 SinkLog() << "Done Synchronizing";
438 Sink::Notification n;
439 n.id = "sync";
440 n.type = Sink::Notification::Status;
441 n.message = "Synchronization has ended.";
442 n.code = Sink::ApplicationDomain::ConnectedStatus;
443 emit notify(n);
444
408 enableChangeReplay(true); 445 enableChangeReplay(true);
409 future.setFinished(); 446 future.setFinished();
410 }, [this, &future](int errorCode, const QString &error) { 447 }, [this, &future](int errorCode, const QString &error) {
diff --git a/common/genericresource.h b/common/genericresource.h
index 0878968..25892ca 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -40,6 +40,9 @@ class Synchronizer;
40 */ 40 */
41class SINK_EXPORT GenericResource : public Resource 41class SINK_EXPORT GenericResource : public Resource
42{ 42{
43protected:
44 SINK_DEBUG_AREA("resource")
45 SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier)
43public: 46public:
44 GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline); 47 GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline);
45 virtual ~GenericResource(); 48 virtual ~GenericResource();
@@ -77,7 +80,7 @@ protected:
77 QSharedPointer<Pipeline> mPipeline; 80 QSharedPointer<Pipeline> mPipeline;
78 81
79private: 82private:
80 CommandProcessor *mProcessor; 83 std::unique_ptr<CommandProcessor> mProcessor;
81 QSharedPointer<ChangeReplay> mChangeReplay; 84 QSharedPointer<ChangeReplay> mChangeReplay;
82 QSharedPointer<Synchronizer> mSynchronizer; 85 QSharedPointer<Synchronizer> mSynchronizer;
83 int mError; 86 int mError;
diff --git a/common/index.cpp b/common/index.cpp
index 151f7af..beed45c 100644
--- a/common/index.cpp
+++ b/common/index.cpp
@@ -2,8 +2,7 @@
2 2
3#include "log.h" 3#include "log.h"
4 4
5#undef Trace 5SINK_DEBUG_AREA("index")
6#define Trace() Trace_area("index." + mName.toLatin1())
7 6
8Index::Index(const QString &storageRoot, const QString &name, Sink::Storage::AccessMode mode) 7Index::Index(const QString &storageRoot, const QString &name, Sink::Storage::AccessMode mode)
9 : mTransaction(Sink::Storage(storageRoot, name, mode).createTransaction(mode)), 8 : mTransaction(Sink::Storage(storageRoot, name, mode).createTransaction(mode)),
@@ -34,8 +33,8 @@ void Index::lookup(const QByteArray &key, const std::function<void(const QByteAr
34 resultHandler(value); 33 resultHandler(value);
35 return true; 34 return true;
36 }, 35 },
37 [errorHandler](const Sink::Storage::Error &error) { 36 [this, errorHandler](const Sink::Storage::Error &error) {
38 Warning() << "Error while retrieving value" << error.message; 37 SinkWarning() << "Error while retrieving value" << error.message;
39 errorHandler(Error(error.store, error.code, error.message)); 38 errorHandler(Error(error.store, error.code, error.message));
40 }, 39 },
41 matchSubStringKeys); 40 matchSubStringKeys);
@@ -45,6 +44,6 @@ QByteArray Index::lookup(const QByteArray &key)
45{ 44{
46 QByteArray result; 45 QByteArray result;
47 //We have to create a deep copy, otherwise the returned data may become invalid when the transaction ends. 46 //We have to create a deep copy, otherwise the returned data may become invalid when the transaction ends.
48 lookup(key, [&result](const QByteArray &value) { result = QByteArray(value.constData(), value.size()); }, [this](const Index::Error &error) { Trace() << "Error while retrieving value" << error.message; }); 47 lookup(key, [&result](const QByteArray &value) { result = QByteArray(value.constData(), value.size()); }, [this](const Index::Error &error) { SinkTrace() << "Error while retrieving value" << error.message; });
49 return result; 48 return result;
50} 49}
diff --git a/common/index.h b/common/index.h
index 0345f56..bfedf9a 100644
--- a/common/index.h
+++ b/common/index.h
@@ -5,6 +5,7 @@
5#include <functional> 5#include <functional>
6#include <QString> 6#include <QString>
7#include "storage.h" 7#include "storage.h"
8#include "log.h"
8 9
9/** 10/**
10 * An index for value pairs. 11 * An index for value pairs.
@@ -43,4 +44,5 @@ private:
43 Sink::Storage::Transaction mTransaction; 44 Sink::Storage::Transaction mTransaction;
44 Sink::Storage::NamedDatabase mDb; 45 Sink::Storage::NamedDatabase mDb;
45 QString mName; 46 QString mName;
47 SINK_DEBUG_COMPONENT(mName.toLatin1())
46}; 48};
diff --git a/common/listener.cpp b/common/listener.cpp
index 84afe16..2c5c1df 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -39,39 +39,35 @@
39#include <QTime> 39#include <QTime>
40#include <QDataStream> 40#include <QDataStream>
41 41
42#undef DEBUG_AREA
43#define DEBUG_AREA "resource.communication"
44
45Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent) 42Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent)
46 : QObject(parent), 43 : QObject(parent),
47 m_server(new QLocalServer(this)), 44 m_server(new QLocalServer(this)),
48 m_resourceName(resourceType), 45 m_resourceName(resourceType),
49 m_resourceInstanceIdentifier(resourceInstanceIdentifier), 46 m_resourceInstanceIdentifier(resourceInstanceIdentifier),
50 m_resource(0),
51 m_clientBufferProcessesTimer(new QTimer(this)), 47 m_clientBufferProcessesTimer(new QTimer(this)),
52 m_messageId(0) 48 m_messageId(0)
53{ 49{
54 connect(m_server, &QLocalServer::newConnection, this, &Listener::acceptConnection); 50 connect(m_server.get(), &QLocalServer::newConnection, this, &Listener::acceptConnection);
55 Trace() << "Trying to open " << m_resourceInstanceIdentifier; 51 SinkTrace() << "Trying to open " << m_resourceInstanceIdentifier;
56 52
57 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { 53 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) {
58 m_server->removeServer(m_resourceInstanceIdentifier); 54 m_server->removeServer(m_resourceInstanceIdentifier);
59 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) { 55 if (!m_server->listen(QString::fromLatin1(m_resourceInstanceIdentifier))) {
60 Warning() << "Utter failure to start server"; 56 SinkWarning() << "Utter failure to start server";
61 exit(-1); 57 exit(-1);
62 } 58 }
63 } 59 }
64 60
65 if (m_server->isListening()) { 61 if (m_server->isListening()) {
66 Log() << QString("Listening on %1").arg(m_server->serverName()); 62 SinkLog() << QString("Listening on %1").arg(m_server->serverName());
67 } 63 }
68 64
69 m_checkConnectionsTimer = new QTimer; 65 m_checkConnectionsTimer = std::unique_ptr<QTimer>(new QTimer);
70 m_checkConnectionsTimer->setSingleShot(true); 66 m_checkConnectionsTimer->setSingleShot(true);
71 m_checkConnectionsTimer->setInterval(1000); 67 m_checkConnectionsTimer->setInterval(1000);
72 connect(m_checkConnectionsTimer, &QTimer::timeout, [this]() { 68 connect(m_checkConnectionsTimer.get(), &QTimer::timeout, [this]() {
73 if (m_connections.isEmpty()) { 69 if (m_connections.isEmpty()) {
74 Log() << QString("No connections, shutting down."); 70 SinkLog() << QString("No connections, shutting down.");
75 quit(); 71 quit();
76 } 72 }
77 }); 73 });
@@ -80,18 +76,19 @@ Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArra
80 // or even just drop down to invoking the method queued? => invoke queued unless we need throttling 76 // or even just drop down to invoking the method queued? => invoke queued unless we need throttling
81 m_clientBufferProcessesTimer->setInterval(0); 77 m_clientBufferProcessesTimer->setInterval(0);
82 m_clientBufferProcessesTimer->setSingleShot(true); 78 m_clientBufferProcessesTimer->setSingleShot(true);
83 connect(m_clientBufferProcessesTimer, &QTimer::timeout, this, &Listener::processClientBuffers); 79 connect(m_clientBufferProcessesTimer.get(), &QTimer::timeout, this, &Listener::processClientBuffers);
84} 80}
85 81
86Listener::~Listener() 82Listener::~Listener()
87{ 83{
84 closeAllConnections();
88} 85}
89 86
90void Listener::emergencyAbortAllConnections() 87void Listener::emergencyAbortAllConnections()
91{ 88{
92 for (Client &client : m_connections) { 89 for (Client &client : m_connections) {
93 if (client.socket) { 90 if (client.socket) {
94 Warning() << "Sending panic"; 91 SinkWarning() << "Sending panic";
95 client.socket->write("PANIC"); 92 client.socket->write("PANIC");
96 client.socket->waitForBytesWritten(); 93 client.socket->waitForBytesWritten();
97 disconnect(client.socket, 0, this, 0); 94 disconnect(client.socket, 0, this, 0);
@@ -120,11 +117,11 @@ void Listener::closeAllConnections()
120 117
121void Listener::acceptConnection() 118void Listener::acceptConnection()
122{ 119{
123 Trace() << "Accepting connection"; 120 SinkTrace() << "Accepting connection";
124 QLocalSocket *socket = m_server->nextPendingConnection(); 121 QLocalSocket *socket = m_server->nextPendingConnection();
125 122
126 if (!socket) { 123 if (!socket) {
127 Warning() << "Accepted connection but didn't get a socket for it"; 124 SinkWarning() << "Accepted connection but didn't get a socket for it";
128 return; 125 return;
129 } 126 }
130 127
@@ -135,7 +132,7 @@ void Listener::acceptConnection()
135 132
136 // If this is the first client, set the lower limit for revision cleanup 133 // If this is the first client, set the lower limit for revision cleanup
137 if (m_connections.size() == 1) { 134 if (m_connections.size() == 1) {
138 loadResource()->setLowerBoundRevision(0); 135 loadResource().setLowerBoundRevision(0);
139 } 136 }
140 137
141 if (socket->bytesAvailable()) { 138 if (socket->bytesAvailable()) {
@@ -156,13 +153,13 @@ void Listener::clientDropped()
156 const Client &client = it.next(); 153 const Client &client = it.next();
157 if (client.socket == socket) { 154 if (client.socket == socket) {
158 dropped = true; 155 dropped = true;
159 Log() << QString("Dropped connection: %1").arg(client.name) << socket; 156 SinkLog() << QString("Dropped connection: %1").arg(client.name) << socket;
160 it.remove(); 157 it.remove();
161 break; 158 break;
162 } 159 }
163 } 160 }
164 if (!dropped) { 161 if (!dropped) {
165 Warning() << "Failed to find connection for disconnected socket: " << socket; 162 SinkWarning() << "Failed to find connection for disconnected socket: " << socket;
166 } 163 }
167 164
168 checkConnections(); 165 checkConnections();
@@ -172,7 +169,7 @@ void Listener::checkConnections()
172{ 169{
173 // If this was the last client, disengage the lower limit for revision cleanup 170 // If this was the last client, disengage the lower limit for revision cleanup
174 if (m_connections.isEmpty()) { 171 if (m_connections.isEmpty()) {
175 loadResource()->setLowerBoundRevision(std::numeric_limits<qint64>::max()); 172 loadResource().setLowerBoundRevision(std::numeric_limits<qint64>::max());
176 } 173 }
177 m_checkConnectionsTimer->start(); 174 m_checkConnectionsTimer->start();
178} 175}
@@ -188,7 +185,7 @@ void Listener::onDataAvailable()
188 185
189void Listener::readFromSocket(QLocalSocket *socket) 186void Listener::readFromSocket(QLocalSocket *socket)
190{ 187{
191 Trace() << "Reading from socket..."; 188 SinkTrace() << "Reading from socket...";
192 for (Client &client : m_connections) { 189 for (Client &client : m_connections) {
193 if (client.socket == socket) { 190 if (client.socket == socket) {
194 client.commandBuffer += socket->readAll(); 191 client.commandBuffer += socket->readAll();
@@ -231,7 +228,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
231 auto buffer = Sink::Commands::GetHandshake(commandBuffer.constData()); 228 auto buffer = Sink::Commands::GetHandshake(commandBuffer.constData());
232 client.name = buffer->name()->c_str(); 229 client.name = buffer->name()->c_str();
233 } else { 230 } else {
234 Warning() << "received invalid command"; 231 SinkWarning() << "received invalid command";
235 } 232 }
236 break; 233 break;
237 } 234 }
@@ -239,27 +236,27 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
239 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); 236 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
240 if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { 237 if (Sink::Commands::VerifySynchronizeBuffer(verifier)) {
241 auto buffer = Sink::Commands::GetSynchronize(commandBuffer.constData()); 238 auto buffer = Sink::Commands::GetSynchronize(commandBuffer.constData());
242 Trace() << QString("Synchronize request (id %1) from %2").arg(messageId).arg(client.name); 239 SinkTrace() << QString("Synchronize request (id %1) from %2").arg(messageId).arg(client.name);
243 auto timer = QSharedPointer<QTime>::create(); 240 auto timer = QSharedPointer<QTime>::create();
244 timer->start(); 241 timer->start();
245 auto job = KAsync::null<void>(); 242 auto job = KAsync::null<void>();
246 if (buffer->sourceSync()) { 243 if (buffer->sourceSync()) {
247 job = loadResource()->synchronizeWithSource(); 244 job = loadResource().synchronizeWithSource();
248 } 245 }
249 if (buffer->localSync()) { 246 if (buffer->localSync()) {
250 job = job.then<void>(loadResource()->processAllMessages()); 247 job = job.then<void>(loadResource().processAllMessages());
251 } 248 }
252 job.then<void>([callback, timer]() { 249 job.then<void>([callback, timer]() {
253 Trace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); 250 SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed());
254 callback(true); 251 callback(true);
255 }, [callback](int errorCode, const QString &msg) { 252 }, [callback](int errorCode, const QString &msg) {
256 Warning() << "Sync failed: " << msg; 253 SinkWarning() << "Sync failed: " << msg;
257 callback(false); 254 callback(false);
258 }) 255 })
259 .exec(); 256 .exec();
260 return; 257 return;
261 } else { 258 } else {
262 Warning() << "received invalid command"; 259 SinkWarning() << "received invalid command";
263 } 260 }
264 break; 261 break;
265 } 262 }
@@ -268,44 +265,43 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c
268 case Sink::Commands::DeleteEntityCommand: 265 case Sink::Commands::DeleteEntityCommand:
269 case Sink::Commands::ModifyEntityCommand: 266 case Sink::Commands::ModifyEntityCommand:
270 case Sink::Commands::CreateEntityCommand: 267 case Sink::Commands::CreateEntityCommand:
271 Trace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name; 268 SinkTrace() << "Command id " << messageId << " of type \"" << Sink::Commands::name(commandId) << "\" from " << client.name;
272 loadResource()->processCommand(commandId, commandBuffer); 269 loadResource().processCommand(commandId, commandBuffer);
273 break; 270 break;
274 case Sink::Commands::ShutdownCommand: 271 case Sink::Commands::ShutdownCommand:
275 Log() << QString("Received shutdown command from %1").arg(client.name); 272 SinkLog() << QString("Received shutdown command from %1").arg(client.name);
276 // Immediately reject new connections 273 // Immediately reject new connections
277 m_server->close(); 274 m_server->close();
278 QTimer::singleShot(0, this, &Listener::quit); 275 QTimer::singleShot(0, this, &Listener::quit);
279 break; 276 break;
280 case Sink::Commands::PingCommand: 277 case Sink::Commands::PingCommand:
281 Trace() << QString("Received ping command from %1").arg(client.name); 278 SinkTrace() << QString("Received ping command from %1").arg(client.name);
282 break; 279 break;
283 case Sink::Commands::RevisionReplayedCommand: { 280 case Sink::Commands::RevisionReplayedCommand: {
284 Trace() << QString("Received revision replayed command from %1").arg(client.name); 281 SinkTrace() << QString("Received revision replayed command from %1").arg(client.name);
285 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); 282 flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
286 if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { 283 if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) {
287 auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); 284 auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData());
288 client.currentRevision = buffer->revision(); 285 client.currentRevision = buffer->revision();
289 } else { 286 } else {
290 Warning() << "received invalid command"; 287 SinkWarning() << "received invalid command";
291 } 288 }
292 loadResource()->setLowerBoundRevision(lowerBoundRevision()); 289 loadResource().setLowerBoundRevision(lowerBoundRevision());
293 } break; 290 } break;
294 case Sink::Commands::RemoveFromDiskCommand: { 291 case Sink::Commands::RemoveFromDiskCommand: {
295 Log() << QString("Received a remove from disk command from %1").arg(client.name); 292 SinkLog() << QString("Received a remove from disk command from %1").arg(client.name);
296 delete m_resource; 293 m_resource.reset(nullptr);
297 m_resource = nullptr; 294 loadResource().removeDataFromDisk();
298 loadResource()->removeDataFromDisk();
299 m_server->close(); 295 m_server->close();
300 QTimer::singleShot(0, this, &Listener::quit); 296 QTimer::singleShot(0, this, &Listener::quit);
301 } break; 297 } break;
302 default: 298 default:
303 if (commandId > Sink::Commands::CustomCommand) { 299 if (commandId > Sink::Commands::CustomCommand) {
304 Log() << QString("Received custom command from %1: ").arg(client.name) << commandId; 300 SinkLog() << QString("Received custom command from %1: ").arg(client.name) << commandId;
305 loadResource()->processCommand(commandId, commandBuffer); 301 loadResource().processCommand(commandId, commandBuffer);
306 } else { 302 } else {
307 success = false; 303 success = false;
308 ErrorMsg() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId; 304 SinkError() << QString("\tReceived invalid command from %1: ").arg(client.name) << commandId;
309 } 305 }
310 break; 306 break;
311 } 307 }
@@ -330,7 +326,7 @@ qint64 Listener::lowerBoundRevision()
330void Listener::quit() 326void Listener::quit()
331{ 327{
332 // Broadcast shutdown notifications to open clients, so they don't try to restart the resource 328 // Broadcast shutdown notifications to open clients, so they don't try to restart the resource
333 auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Commands::NotificationType::NotificationType_Shutdown); 329 auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Notification::Shutdown);
334 Sink::Commands::FinishNotificationBuffer(m_fbb, command); 330 Sink::Commands::FinishNotificationBuffer(m_fbb, command);
335 for (Client &client : m_connections) { 331 for (Client &client : m_connections) {
336 if (client.socket && client.socket->isOpen()) { 332 if (client.socket && client.socket->isOpen()) {
@@ -353,7 +349,7 @@ bool Listener::processClientBuffer(Client &client)
353 const uint messageId = *(uint *)client.commandBuffer.constData(); 349 const uint messageId = *(uint *)client.commandBuffer.constData();
354 const int commandId = *(int *)(client.commandBuffer.constData() + sizeof(uint)); 350 const int commandId = *(int *)(client.commandBuffer.constData() + sizeof(uint));
355 const uint size = *(uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint)); 351 const uint size = *(uint *)(client.commandBuffer.constData() + sizeof(int) + sizeof(uint));
356 Trace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size; 352 SinkTrace() << "Received message. Id:" << messageId << " CommandId: " << commandId << " Size: " << size;
357 353
358 // TODO: reject messages above a certain size? 354 // TODO: reject messages above a certain size?
359 355
@@ -366,11 +362,11 @@ bool Listener::processClientBuffer(Client &client)
366 const QByteArray commandBuffer = client.commandBuffer.left(size); 362 const QByteArray commandBuffer = client.commandBuffer.left(size);
367 client.commandBuffer.remove(0, size); 363 client.commandBuffer.remove(0, size);
368 processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) { 364 processCommand(commandId, messageId, commandBuffer, client, [this, messageId, commandId, socket, clientName](bool success) {
369 Trace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName); 365 SinkTrace() << QString("Completed command messageid %1 of type \"%2\" from %3").arg(messageId).arg(QString(Sink::Commands::name(commandId))).arg(clientName);
370 if (socket) { 366 if (socket) {
371 sendCommandCompleted(socket.data(), messageId, success); 367 sendCommandCompleted(socket.data(), messageId, success);
372 } else { 368 } else {
373 Log() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName); 369 SinkLog() << QString("Socket became invalid before we could send a response. client: %1").arg(clientName);
374 } 370 }
375 }); 371 });
376 372
@@ -407,7 +403,7 @@ void Listener::updateClientsWithRevision(qint64 revision)
407 continue; 403 continue;
408 } 404 }
409 405
410 Trace() << "Sending revision update for " << client.name << revision; 406 SinkTrace() << "Sending revision update for " << client.name << revision;
411 Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::RevisionUpdateCommand, m_fbb); 407 Sink::Commands::write(client.socket, ++m_messageId, Sink::Commands::RevisionUpdateCommand, m_fbb);
412 } 408 }
413 m_fbb.Clear(); 409 m_fbb.Clear();
@@ -418,7 +414,7 @@ void Listener::notify(const Sink::Notification &notification)
418 auto messageString = m_fbb.CreateString(notification.message.toUtf8().constData(), notification.message.toUtf8().size()); 414 auto messageString = m_fbb.CreateString(notification.message.toUtf8().constData(), notification.message.toUtf8().size());
419 auto idString = m_fbb.CreateString(notification.id.constData(), notification.id.size()); 415 auto idString = m_fbb.CreateString(notification.id.constData(), notification.id.size());
420 Sink::Commands::NotificationBuilder builder(m_fbb); 416 Sink::Commands::NotificationBuilder builder(m_fbb);
421 builder.add_type(static_cast<Sink::Commands::NotificationType>(notification.type)); 417 builder.add_type(notification.type);
422 builder.add_code(notification.code); 418 builder.add_code(notification.code);
423 builder.add_identifier(idString); 419 builder.add_identifier(idString);
424 builder.add_message(messageString); 420 builder.add_message(messageString);
@@ -432,25 +428,25 @@ void Listener::notify(const Sink::Notification &notification)
432 m_fbb.Clear(); 428 m_fbb.Clear();
433} 429}
434 430
435Sink::Resource *Listener::loadResource() 431Sink::Resource &Listener::loadResource()
436{ 432{
437 if (!m_resource) { 433 if (!m_resource) {
438 if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) { 434 if (Sink::ResourceFactory *resourceFactory = Sink::ResourceFactory::load(m_resourceName)) {
439 m_resource = resourceFactory->createResource(m_resourceInstanceIdentifier); 435 m_resource = std::unique_ptr<Sink::Resource>(resourceFactory->createResource(m_resourceInstanceIdentifier));
440 if (!m_resource) { 436 if (!m_resource) {
441 ErrorMsg() << "Failed to instantiate the resource " << m_resourceName; 437 SinkError() << "Failed to instantiate the resource " << m_resourceName;
442 m_resource = new Sink::Resource; 438 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource);
443 } 439 }
444 Trace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory); 440 SinkTrace() << QString("Resource factory: %1").arg((qlonglong)resourceFactory);
445 Trace() << QString("\tResource: %1").arg((qlonglong)m_resource); 441 SinkTrace() << QString("\tResource: %1").arg((qlonglong)m_resource.get());
446 connect(m_resource, &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision); 442 connect(m_resource.get(), &Sink::Resource::revisionUpdated, this, &Listener::refreshRevision);
447 connect(m_resource, &Sink::Resource::notify, this, &Listener::notify); 443 connect(m_resource.get(), &Sink::Resource::notify, this, &Listener::notify);
448 } else { 444 } else {
449 ErrorMsg() << "Failed to load resource " << m_resourceName; 445 SinkError() << "Failed to load resource " << m_resourceName;
450 m_resource = new Sink::Resource; 446 m_resource = std::unique_ptr<Sink::Resource>(new Sink::Resource);
451 } 447 }
452 } 448 }
453 return m_resource; 449 return *m_resource;
454} 450}
455 451
456#pragma clang diagnostic push 452#pragma clang diagnostic push
diff --git a/common/listener.h b/common/listener.h
index 5e376c7..d6c537a 100644
--- a/common/listener.h
+++ b/common/listener.h
@@ -25,6 +25,7 @@
25#include <QPointer> 25#include <QPointer>
26#include <QLocalSocket> 26#include <QLocalSocket>
27#include <flatbuffers/flatbuffers.h> 27#include <flatbuffers/flatbuffers.h>
28#include <log.h>
28 29
29namespace Sink { 30namespace Sink {
30class Resource; 31class Resource;
@@ -54,6 +55,7 @@ public:
54class SINK_EXPORT Listener : public QObject 55class SINK_EXPORT Listener : public QObject
55{ 56{
56 Q_OBJECT 57 Q_OBJECT
58 SINK_DEBUG_AREA("communication")
57 59
58public: 60public:
59 Listener(const QByteArray &resourceName, const QByteArray &resourceType, QObject *parent = 0); 61 Listener(const QByteArray &resourceName, const QByteArray &resourceType, QObject *parent = 0);
@@ -81,17 +83,17 @@ private:
81 bool processClientBuffer(Client &client); 83 bool processClientBuffer(Client &client);
82 void sendCommandCompleted(QLocalSocket *socket, uint messageId, bool success); 84 void sendCommandCompleted(QLocalSocket *socket, uint messageId, bool success);
83 void updateClientsWithRevision(qint64); 85 void updateClientsWithRevision(qint64);
84 Sink::Resource *loadResource(); 86 Sink::Resource &loadResource();
85 void readFromSocket(QLocalSocket *socket); 87 void readFromSocket(QLocalSocket *socket);
86 qint64 lowerBoundRevision(); 88 qint64 lowerBoundRevision();
87 89
88 QLocalServer *m_server; 90 std::unique_ptr<QLocalServer> m_server;
89 QVector<Client> m_connections; 91 QVector<Client> m_connections;
90 flatbuffers::FlatBufferBuilder m_fbb; 92 flatbuffers::FlatBufferBuilder m_fbb;
91 const QByteArray m_resourceName; 93 const QByteArray m_resourceName;
92 const QByteArray m_resourceInstanceIdentifier; 94 const QByteArray m_resourceInstanceIdentifier;
93 Sink::Resource *m_resource; 95 std::unique_ptr<Sink::Resource> m_resource;
94 QTimer *m_clientBufferProcessesTimer; 96 std::unique_ptr<QTimer> m_clientBufferProcessesTimer;
95 QTimer *m_checkConnectionsTimer; 97 std::unique_ptr<QTimer> m_checkConnectionsTimer;
96 int m_messageId; 98 int m_messageId;
97}; 99};
diff --git a/common/log.cpp b/common/log.cpp
index b0f6237..099c043 100644
--- a/common/log.cpp
+++ b/common/log.cpp
@@ -4,16 +4,25 @@
4#include <QIODevice> 4#include <QIODevice>
5#include <QCoreApplication> 5#include <QCoreApplication>
6#include <QSettings> 6#include <QSettings>
7#include <QStandardPaths>
8#include <QSharedPointer> 7#include <QSharedPointer>
8#include <QMutex>
9#include <QMutexLocker>
9#include <iostream> 10#include <iostream>
10#include <unistd.h> 11#include <unistd.h>
12#include <memory>
13#include <definitions.h>
11 14
12using namespace Sink::Log; 15using namespace Sink::Log;
13 16
14static QSharedPointer<QSettings> config() 17static QSharedPointer<QSettings> config()
15{ 18{
16 return QSharedPointer<QSettings>::create(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/sink/log.ini", QSettings::IniFormat); 19 return QSharedPointer<QSettings>::create(Sink::configLocation() + "/log.ini", QSettings::IniFormat);
20}
21
22static QByteArray sPrimaryComponent;
23void Sink::Log::setPrimaryComponent(const QString &component)
24{
25 sPrimaryComponent = component.toUtf8();
17} 26}
18 27
19class DebugStream : public QIODevice 28class DebugStream : public QIODevice
@@ -212,9 +221,63 @@ static QByteArray getProgramName()
212 } 221 }
213} 222}
214 223
224static QSharedPointer<QSettings> debugAreasConfig()
225{
226 return QSharedPointer<QSettings>::create(Sink::dataLocation() + "/debugAreas.ini", QSettings::IniFormat);
227}
228
229class DebugAreaCollector {
230public:
231 DebugAreaCollector()
232 {
233 QMutexLocker locker(&mutex);
234 mDebugAreas = debugAreasConfig()->value("areas").value<QString>().split(';').toSet();
235 }
236
237 ~DebugAreaCollector()
238 {
239 QMutexLocker locker(&mutex);
240 mDebugAreas += debugAreasConfig()->value("areas").value<QString>().split(';').toSet();
241 debugAreasConfig()->setValue("areas", QVariant::fromValue(mDebugAreas.toList().join(';')));
242 }
243
244 void add(const QString &area)
245 {
246 QMutexLocker locker(&mutex);
247 mDebugAreas << area;
248 }
249
250 QSet<QString> debugAreas()
251 {
252 QMutexLocker locker(&mutex);
253 return mDebugAreas;
254 }
255
256 QMutex mutex;
257 QSet<QString> mDebugAreas;
258};
259
260static auto sDebugAreaCollector = std::unique_ptr<DebugAreaCollector>(new DebugAreaCollector);
261
262QSet<QString> Sink::Log::debugAreas()
263{
264 return sDebugAreaCollector->debugAreas();
265}
266
267static void collectDebugArea(const QString &debugArea)
268{
269 sDebugAreaCollector->add(debugArea);
270}
271
215static bool containsItemStartingWith(const QByteArray &pattern, const QByteArrayList &list) 272static bool containsItemStartingWith(const QByteArray &pattern, const QByteArrayList &list)
216{ 273{
217 for (const auto &item : list) { 274 for (const auto &item : list) {
275 if (item.startsWith('*')) {
276 auto stripped = item.mid(1);
277 if (pattern.contains(stripped)) {
278 return true;
279 }
280 }
218 if (pattern.startsWith(item)) { 281 if (pattern.startsWith(item)) {
219 return true; 282 return true;
220 } 283 }
@@ -232,24 +295,23 @@ static bool caseInsensitiveContains(const QByteArray &pattern, const QByteArrayL
232 return false; 295 return false;
233} 296}
234 297
235QDebug Sink::Log::debugStream(DebugLevel debugLevel, int line, const char *file, const char *function, const char *debugArea) 298QDebug Sink::Log::debugStream(DebugLevel debugLevel, int line, const char *file, const char *function, const char *debugArea, const char *debugComponent)
236{ 299{
237 static NullStream nullstream; 300 static NullStream nullstream;
238 if (debugLevel < debugOutputLevel()) { 301 if (debugLevel < debugOutputLevel()) {
239 return QDebug(&nullstream); 302 return QDebug(&nullstream);
240 } 303 }
241 304
242 auto areas = debugOutputFilter(Sink::Log::Area); 305 if (sPrimaryComponent.isEmpty()) {
243 if (debugArea && !areas.isEmpty()) { 306 sPrimaryComponent = getProgramName();
244 if (!containsItemStartingWith(debugArea, areas)) {
245 return QDebug(&nullstream);
246 }
247 } 307 }
248 static QByteArray programName = getProgramName(); 308 QString fullDebugArea = sPrimaryComponent + "." + (debugComponent ? (QString::fromLatin1(debugComponent) + ".") : "") + (debugArea ? QString::fromLatin1(debugArea) : "");
309
310 collectDebugArea(fullDebugArea);
249 311
250 auto filter = debugOutputFilter(Sink::Log::ApplicationName); 312 auto areas = debugOutputFilter(Sink::Log::Area);
251 if (!filter.isEmpty() && !filter.contains(programName)) { 313 if (!areas.isEmpty()) {
252 if (!containsItemStartingWith(programName, filter)) { 314 if (!containsItemStartingWith(fullDebugArea.toUtf8(), areas)) {
253 return QDebug(&nullstream); 315 return QDebug(&nullstream);
254 } 316 }
255 } 317 }
@@ -293,19 +355,17 @@ QDebug Sink::Log::debugStream(DebugLevel debugLevel, int line, const char *file,
293 } 355 }
294 if (showProgram) { 356 if (showProgram) {
295 int width = 10; 357 int width = 10;
296 output += QString(" %1(%2)").arg(QString::fromLatin1(programName).leftJustified(width, ' ', true)).arg(unsigned(getpid())).rightJustified(width + 8, ' '); 358 output += QString(" %1(%2)").arg(QString::fromLatin1(getProgramName()).leftJustified(width, ' ', true)).arg(unsigned(getpid())).rightJustified(width + 8, ' ');
297 } 359 }
298 if (debugArea) { 360 if (useColor) {
299 if (useColor) { 361 output += colorCommand(QList<int>() << ANSI_Colors::Bold << prefixColorCode);
300 output += colorCommand(QList<int>() << ANSI_Colors::Bold << prefixColorCode); 362 }
301 } 363 output += QString(" %1 ").arg(fullDebugArea.leftJustified(25, ' ', true));
302 output += QString(" %1 ").arg(QString::fromLatin1(debugArea).leftJustified(25, ' ', true)); 364 if (useColor) {
303 if (useColor) { 365 output += resetColor;
304 output += resetColor;
305 }
306 } 366 }
307 if (showFunction) { 367 if (showFunction) {
308 output += QString(" %3").arg(QString::fromLatin1(function).leftJustified(25, ' ', true)); 368 output += QString(" %3").arg(fullDebugArea.leftJustified(25, ' ', true));
309 } 369 }
310 if (showLocation) { 370 if (showLocation) {
311 const auto filename = QString::fromLatin1(file).split('/').last(); 371 const auto filename = QString::fromLatin1(file).split('/').last();
diff --git a/common/log.h b/common/log.h
index 0e92ea9..f47a3ae 100644
--- a/common/log.h
+++ b/common/log.h
@@ -14,6 +14,9 @@ enum DebugLevel
14 Error 14 Error
15}; 15};
16 16
17void SINK_EXPORT setPrimaryComponent(const QString &component);
18QSet<QString> SINK_EXPORT debugAreas();
19
17QByteArray SINK_EXPORT debugLevelName(DebugLevel debugLevel); 20QByteArray SINK_EXPORT debugLevelName(DebugLevel debugLevel);
18DebugLevel SINK_EXPORT debugLevelFromName(const QByteArray &name); 21DebugLevel SINK_EXPORT debugLevelFromName(const QByteArray &name);
19 22
@@ -55,7 +58,7 @@ QByteArrayList SINK_EXPORT debugOutputFilter(FilterType type);
55void SINK_EXPORT setDebugOutputFields(const QByteArrayList &filter); 58void SINK_EXPORT setDebugOutputFields(const QByteArrayList &filter);
56QByteArrayList SINK_EXPORT debugOutputFields(); 59QByteArrayList SINK_EXPORT debugOutputFields();
57 60
58QDebug SINK_EXPORT debugStream(DebugLevel debugLevel, int line, const char *file, const char *function, const char *debugArea = 0); 61QDebug SINK_EXPORT debugStream(DebugLevel debugLevel, int line, const char *file, const char *function, const char *debugArea = 0, const char *debugComponent = 0);
59 62
60struct SINK_EXPORT TraceTime 63struct SINK_EXPORT TraceTime
61{ 64{
@@ -71,18 +74,23 @@ inline QDebug SINK_EXPORT operator<<(QDebug d, const TraceTime &time)
71} 74}
72} 75}
73 76
74#define DEBUG_AREA nullptr 77static const char *getComponentName() { return nullptr; }
75
76#define Trace_() Sink::Log::debugStream(Sink::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO)
77#define Log_() Sink::Log::debugStream(Sink::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO)
78 78
79#define Trace_area(AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, AREA) 79#define Trace_area(AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, AREA)
80#define Log_area(AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, AREA) 80#define Log_area(AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, AREA)
81#define Warning_area(AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Warning, __LINE__, __FILE__, Q_FUNC_INFO, AREA) 81#define Warning_area(AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Warning, __LINE__, __FILE__, Q_FUNC_INFO, AREA)
82#define Error_area(AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Error, __LINE__, __FILE__, Q_FUNC_INFO, AREA) 82#define Error_area(AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Error, __LINE__, __FILE__, Q_FUNC_INFO, AREA)
83 83
84#define Trace() Sink::Log::debugStream(Sink::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, DEBUG_AREA) 84#define SinkTrace_(COMPONENT, AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, AREA, COMPONENT)
85#define Log() Sink::Log::debugStream(Sink::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, DEBUG_AREA) 85#define SinkLog_(COMPONENT, AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, AREA, COMPONENT)
86#define Warning() Sink::Log::debugStream(Sink::Log::DebugLevel::Warning, __LINE__, __FILE__, Q_FUNC_INFO, DEBUG_AREA) 86#define SinkWarning_(COMPONENT, AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Warning, __LINE__, __FILE__, Q_FUNC_INFO, AREA, COMPONENT)
87// FIXME Error clashes with Storage::Error and MessageQueue::Error 87#define SinkError_(COMPONENT, AREA) Sink::Log::debugStream(Sink::Log::DebugLevel::Error, __LINE__, __FILE__, Q_FUNC_INFO, AREA, COMPONENT)
88#define ErrorMsg() Sink::Log::debugStream(Sink::Log::DebugLevel::Error, __LINE__, __FILE__, Q_FUNC_INFO, DEBUG_AREA) 88
89#define SinkTrace() Sink::Log::debugStream(Sink::Log::DebugLevel::Trace, __LINE__, __FILE__, Q_FUNC_INFO, s_sinkDebugArea, getComponentName())
90#define SinkLog() Sink::Log::debugStream(Sink::Log::DebugLevel::Log, __LINE__, __FILE__, Q_FUNC_INFO, s_sinkDebugArea, getComponentName())
91#define SinkWarning() Sink::Log::debugStream(Sink::Log::DebugLevel::Warning, __LINE__, __FILE__, Q_FUNC_INFO, s_sinkDebugArea, getComponentName())
92#define SinkError() Sink::Log::debugStream(Sink::Log::DebugLevel::Error, __LINE__, __FILE__, Q_FUNC_INFO, s_sinkDebugArea, getComponentName())
93
94#define SINK_DEBUG_AREA(AREA) static constexpr const char* s_sinkDebugArea{AREA};
95#define SINK_DEBUG_COMPONENT(COMPONENT) const char* getComponentName() const { return COMPONENT; };
96#define SINK_DEBUG_COMPONENT_STATIC(COMPONENT) static const char* getComponentName() { return COMPONENT; };
diff --git a/common/mailpreprocessor.cpp b/common/mailpreprocessor.cpp
index 005a93e..2863ad4 100644
--- a/common/mailpreprocessor.cpp
+++ b/common/mailpreprocessor.cpp
@@ -29,6 +29,8 @@
29 29
30using namespace Sink; 30using namespace Sink;
31 31
32SINK_DEBUG_AREA("mailpreprocessor")
33
32QString MailPropertyExtractor::getFilePathFromMimeMessagePath(const QString &s) const 34QString MailPropertyExtractor::getFilePathFromMimeMessagePath(const QString &s) const
33{ 35{
34 return s; 36 return s;
@@ -37,20 +39,24 @@ QString MailPropertyExtractor::getFilePathFromMimeMessagePath(const QString &s)
37void MailPropertyExtractor::updatedIndexedProperties(Sink::ApplicationDomain::Mail &mail) 39void MailPropertyExtractor::updatedIndexedProperties(Sink::ApplicationDomain::Mail &mail)
38{ 40{
39 const auto mimeMessagePath = getFilePathFromMimeMessagePath(mail.getMimeMessagePath()); 41 const auto mimeMessagePath = getFilePathFromMimeMessagePath(mail.getMimeMessagePath());
40 Trace() << "Updating indexed properties " << mimeMessagePath; 42 if (mimeMessagePath.isNull()) {
43 SinkTrace() << "No mime message";
44 return;
45 }
46 SinkTrace() << "Updating indexed properties " << mimeMessagePath;
41 QFile f(mimeMessagePath); 47 QFile f(mimeMessagePath);
42 if (!f.open(QIODevice::ReadOnly)) { 48 if (!f.open(QIODevice::ReadOnly)) {
43 Warning() << "Failed to open the file: " << mimeMessagePath; 49 SinkWarning() << "Failed to open the file: " << mimeMessagePath;
44 return; 50 return;
45 } 51 }
46 if (!f.size()) { 52 if (!f.size()) {
47 Warning() << "The file is empty."; 53 SinkWarning() << "The file is empty.";
48 return; 54 return;
49 } 55 }
50 const auto mappedSize = qMin((qint64)8000, f.size()); 56 const auto mappedSize = qMin((qint64)8000, f.size());
51 auto mapped = f.map(0, mappedSize); 57 auto mapped = f.map(0, mappedSize);
52 if (!mapped) { 58 if (!mapped) {
53 Warning() << "Failed to map the file: " << f.errorString(); 59 SinkWarning() << "Failed to map the file: " << f.errorString();
54 return; 60 return;
55 } 61 }
56 62
@@ -85,15 +91,15 @@ QString MimeMessageMover::moveMessage(const QString &oldPath, const Sink::Applic
85 const auto filePath = directory + "/" + mail.identifier(); 91 const auto filePath = directory + "/" + mail.identifier();
86 if (oldPath != filePath) { 92 if (oldPath != filePath) {
87 if (!QDir().mkpath(directory)) { 93 if (!QDir().mkpath(directory)) {
88 Warning() << "Failed to create the directory: " << directory; 94 SinkWarning() << "Failed to create the directory: " << directory;
89 } 95 }
90 QFile::remove(filePath); 96 QFile::remove(filePath);
91 QFile origFile(oldPath); 97 QFile origFile(oldPath);
92 if (!origFile.open(QIODevice::ReadWrite)) { 98 if (!origFile.open(QIODevice::ReadWrite)) {
93 Warning() << "Failed to open the original file with write rights: " << origFile.errorString(); 99 SinkWarning() << "Failed to open the original file with write rights: " << origFile.errorString();
94 } 100 }
95 if (!origFile.rename(filePath)) { 101 if (!origFile.rename(filePath)) {
96 Warning() << "Failed to move the file from: " << oldPath << " to " << filePath << ". " << origFile.errorString(); 102 SinkWarning() << "Failed to move the file from: " << oldPath << " to " << filePath << ". " << origFile.errorString();
97 } 103 }
98 origFile.close(); 104 origFile.close();
99 return filePath; 105 return filePath;
diff --git a/common/messagequeue.cpp b/common/messagequeue.cpp
index b9f11f8..a6e44e3 100644
--- a/common/messagequeue.cpp
+++ b/common/messagequeue.cpp
@@ -3,6 +3,8 @@
3#include <QDebug> 3#include <QDebug>
4#include <log.h> 4#include <log.h>
5 5
6SINK_DEBUG_AREA("messagequeue")
7
6static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void>> &futures) 8static KAsync::Job<void> waitForCompletion(QList<KAsync::Future<void>> &futures)
7{ 9{
8 auto context = new QObject; 10 auto context = new QObject;
@@ -128,7 +130,7 @@ KAsync::Job<void> MessageQueue::dequeueBatch(int maxBatchSize, const std::functi
128 return false; 130 return false;
129 }, 131 },
130 [](const Sink::Storage::Error &error) { 132 [](const Sink::Storage::Error &error) {
131 ErrorMsg() << "Error while retrieving value" << error.message; 133 SinkError() << "Error while retrieving value" << error.message;
132 // errorHandler(Error(error.store, error.code, error.message)); 134 // errorHandler(Error(error.store, error.code, error.message));
133 }); 135 });
134 136
@@ -164,7 +166,7 @@ bool MessageQueue::isEmpty()
164 } 166 }
165 return true; 167 return true;
166 }, 168 },
167 [](const Sink::Storage::Error &error) { ErrorMsg() << "Error while checking if empty" << error.message; }); 169 [](const Sink::Storage::Error &error) { SinkError() << "Error while checking if empty" << error.message; });
168 } 170 }
169 return count == 0; 171 return count == 0;
170} 172}
diff --git a/common/modelresult.cpp b/common/modelresult.cpp
index 3778d4d..56a39ee 100644
--- a/common/modelresult.cpp
+++ b/common/modelresult.cpp
@@ -25,8 +25,7 @@
25#include "domain/folder.h" 25#include "domain/folder.h"
26#include "log.h" 26#include "log.h"
27 27
28#undef DEBUG_AREA 28SINK_DEBUG_AREA("modelresult")
29#define DEBUG_AREA "client.modelresult"
30 29
31static uint qHash(const Sink::ApplicationDomain::ApplicationDomainType &type) 30static uint qHash(const Sink::ApplicationDomain::ApplicationDomainType &type)
32{ 31{
@@ -123,7 +122,7 @@ QModelIndex ModelResult<T, Ptr>::index(int row, int column, const QModelIndex &p
123 const auto childId = list.at(row); 122 const auto childId = list.at(row);
124 return createIndex(row, column, childId); 123 return createIndex(row, column, childId);
125 } 124 }
126 Warning() << "Index not available " << row << column << parent; 125 SinkWarning() << "Index not available " << row << column << parent;
127 Q_ASSERT(false); 126 Q_ASSERT(false);
128 return QModelIndex(); 127 return QModelIndex();
129} 128}
@@ -174,7 +173,7 @@ bool ModelResult<T, Ptr>::canFetchMore(const QModelIndex &parent) const
174template <class T, class Ptr> 173template <class T, class Ptr>
175void ModelResult<T, Ptr>::fetchMore(const QModelIndex &parent) 174void ModelResult<T, Ptr>::fetchMore(const QModelIndex &parent)
176{ 175{
177 Trace() << "Fetching more: " << parent; 176 SinkTrace() << "Fetching more: " << parent;
178 fetchEntities(parent); 177 fetchEntities(parent);
179} 178}
180 179
@@ -185,7 +184,7 @@ void ModelResult<T, Ptr>::add(const Ptr &value)
185 const auto id = parentId(value); 184 const auto id = parentId(value);
186 // Ignore updates we get before the initial fetch is done 185 // Ignore updates we get before the initial fetch is done
187 if (!mEntityChildrenFetched.contains(id)) { 186 if (!mEntityChildrenFetched.contains(id)) {
188 Trace() << "Too early" << id; 187 SinkTrace() << "Too early" << id;
189 return; 188 return;
190 } 189 }
191 auto parent = createIndexFromId(id); 190 auto parent = createIndexFromId(id);
@@ -198,7 +197,7 @@ void ModelResult<T, Ptr>::add(const Ptr &value)
198 } 197 }
199 } 198 }
200 if (mEntities.contains(childId)) { 199 if (mEntities.contains(childId)) {
201 Warning() << "Entity already in model " << value->identifier(); 200 SinkWarning() << "Entity already in model " << value->identifier();
202 return; 201 return;
203 } 202 }
204 // qDebug() << "Inserting rows " << index << parent; 203 // qDebug() << "Inserting rows " << index << parent;
@@ -234,18 +233,18 @@ void ModelResult<T, Ptr>::fetchEntities(const QModelIndex &parent)
234 const auto id = getIdentifier(parent); 233 const auto id = getIdentifier(parent);
235 mEntityChildrenFetchComplete.remove(id); 234 mEntityChildrenFetchComplete.remove(id);
236 mEntityChildrenFetched.insert(id); 235 mEntityChildrenFetched.insert(id);
237 Trace() << "Loading child entities of parent " << id; 236 SinkTrace() << "Loading child entities of parent " << id;
238 if (loadEntities) { 237 if (loadEntities) {
239 loadEntities(parent.data(DomainObjectRole).template value<Ptr>()); 238 loadEntities(parent.data(DomainObjectRole).template value<Ptr>());
240 } else { 239 } else {
241 Warning() << "No way to fetch entities"; 240 SinkWarning() << "No way to fetch entities";
242 } 241 }
243} 242}
244 243
245template <class T, class Ptr> 244template <class T, class Ptr>
246void ModelResult<T, Ptr>::setFetcher(const std::function<void(const Ptr &parent)> &fetcher) 245void ModelResult<T, Ptr>::setFetcher(const std::function<void(const Ptr &parent)> &fetcher)
247{ 246{
248 Trace() << "Setting fetcher"; 247 SinkTrace() << "Setting fetcher";
249 loadEntities = fetcher; 248 loadEntities = fetcher;
250} 249}
251 250
@@ -270,7 +269,7 @@ void ModelResult<T, Ptr>::setEmitter(const typename Sink::ResultEmitter<Ptr>::Pt
270 }); 269 });
271 }); 270 });
272 emitter->onInitialResultSetComplete([this](const Ptr &parent) { 271 emitter->onInitialResultSetComplete([this](const Ptr &parent) {
273 Trace() << "Initial result set complete"; 272 SinkTrace() << "Initial result set complete";
274 const qint64 parentId = parent ? qHash(*parent) : 0; 273 const qint64 parentId = parent ? qHash(*parent) : 0;
275 const auto parentIndex = createIndexFromId(parentId); 274 const auto parentIndex = createIndexFromId(parentId);
276 mEntityChildrenFetchComplete.insert(parentId); 275 mEntityChildrenFetchComplete.insert(parentId);
diff --git a/common/notification.h b/common/notification.h
index 0eb796d..0a267e6 100644
--- a/common/notification.h
+++ b/common/notification.h
@@ -30,6 +30,19 @@ namespace Sink {
30class SINK_EXPORT Notification 30class SINK_EXPORT Notification
31{ 31{
32public: 32public:
33 enum NoticationType {
34 Shutdown,
35 Status,
36 Warning,
37 Progress,
38 Inspection,
39 RevisionUpdate
40 };
41 enum InspectionCode {
42 Success,
43 Failure
44 };
45
33 QByteArray id; 46 QByteArray id;
34 int type; 47 int type;
35 QString message; 48 QString message;
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index c6d5297..f1a4a32 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -38,8 +38,7 @@
38#include "definitions.h" 38#include "definitions.h"
39#include "bufferutils.h" 39#include "bufferutils.h"
40 40
41#undef DEBUG_AREA 41SINK_DEBUG_AREA("pipeline")
42#define DEBUG_AREA "resource.pipeline"
43 42
44namespace Sink { 43namespace Sink {
45 44
@@ -52,7 +51,7 @@ public:
52 51
53 Storage storage; 52 Storage storage;
54 Storage::Transaction transaction; 53 Storage::Transaction transaction;
55 QHash<QString, QVector<Preprocessor *>> processors; 54 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors;
56 bool revisionChanged; 55 bool revisionChanged;
57 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid); 56 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
58 QTime transactionTime; 57 QTime transactionTime;
@@ -63,10 +62,10 @@ public:
63 62
64void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid) 63void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
65{ 64{
66 Trace() << "Committing new revision: " << uid << newRevision; 65 SinkTrace() << "Committing new revision: " << uid << newRevision;
67 Storage::mainDatabase(transaction, bufferType) 66 Storage::mainDatabase(transaction, bufferType)
68 .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb), 67 .write(Storage::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
69 [uid, newRevision](const Storage::Error &error) { Warning() << "Failed to write entity" << uid << newRevision; }); 68 [uid, newRevision](const Storage::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; });
70 revisionChanged = true; 69 revisionChanged = true;
71 Storage::setMaxRevision(transaction, newRevision); 70 Storage::setMaxRevision(transaction, newRevision);
72 Storage::recordRevision(transaction, newRevision, uid, bufferType); 71 Storage::recordRevision(transaction, newRevision, uid, bufferType);
@@ -79,15 +78,17 @@ Pipeline::Pipeline(const QString &resourceName, QObject *parent) : QObject(paren
79 78
80Pipeline::~Pipeline() 79Pipeline::~Pipeline()
81{ 80{
82 delete d; 81 d->transaction = Storage::Transaction();
83} 82}
84 83
85void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) 84void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors)
86{ 85{
86 auto &list = d->processors[entityType];
87 list.clear();
87 for (auto p : processors) { 88 for (auto p : processors) {
88 p->setup(d->resourceType, d->resourceInstanceIdentifier, this); 89 p->setup(d->resourceType, d->resourceInstanceIdentifier, this);
90 list.append(QSharedPointer<Preprocessor>(p));
89 } 91 }
90 d->processors[entityType] = processors;
91} 92}
92 93
93void Pipeline::setResourceType(const QByteArray &resourceType) 94void Pipeline::setResourceType(const QByteArray &resourceType)
@@ -105,21 +106,21 @@ void Pipeline::startTransaction()
105 if (d->transaction) { 106 if (d->transaction) {
106 return; 107 return;
107 } 108 }
108 Trace() << "Starting transaction."; 109 SinkTrace() << "Starting transaction.";
109 d->transactionTime.start(); 110 d->transactionTime.start();
110 d->transactionItemCount = 0; 111 d->transactionItemCount = 0;
111 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 112 d->transaction = storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
112 Warning() << error.message; 113 SinkWarning() << error.message;
113 })); 114 });
114 115
115 //FIXME this is a temporary measure to recover from a failure to open the named databases correctly. 116 //FIXME this is a temporary measure to recover from a failure to open the named databases correctly.
116 //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already). 117 //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already).
117 //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync... 118 //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync...
118 if (d->storage.exists()) { 119 if (d->storage.exists()) {
119 while (!d->transaction.validateNamedDatabases()) { 120 while (!d->transaction.validateNamedDatabases()) {
120 Warning() << "Opened an invalid transaction!!!!!!"; 121 SinkWarning() << "Opened an invalid transaction!!!!!!";
121 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) { 122 d->transaction = std::move(storage().createTransaction(Storage::ReadWrite, [](const Sink::Storage::Error &error) {
122 Warning() << error.message; 123 SinkWarning() << error.message;
123 })); 124 }));
124 } 125 }
125 } 126 }
@@ -139,7 +140,7 @@ void Pipeline::commit()
139 } 140 }
140 const auto revision = Storage::maxRevision(d->transaction); 141 const auto revision = Storage::maxRevision(d->transaction);
141 const auto elapsed = d->transactionTime.elapsed(); 142 const auto elapsed = d->transactionTime.elapsed();
142 Log() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " 143 SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
143 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; 144 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]";
144 if (d->transaction) { 145 if (d->transaction) {
145 d->transaction.commit(); 146 d->transaction.commit();
@@ -168,7 +169,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
168 { 169 {
169 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 170 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
170 if (!Commands::VerifyCreateEntityBuffer(verifyer)) { 171 if (!Commands::VerifyCreateEntityBuffer(verifyer)) {
171 Warning() << "invalid buffer, not a create entity buffer"; 172 SinkWarning() << "invalid buffer, not a create entity buffer";
172 return KAsync::error<qint64>(0); 173 return KAsync::error<qint64>(0);
173 } 174 }
174 } 175 }
@@ -180,7 +181,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
180 if (createEntity->entityId()) { 181 if (createEntity->entityId()) {
181 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); 182 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size());
182 if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) { 183 if (Storage::mainDatabase(d->transaction, bufferType).contains(key)) {
183 ErrorMsg() << "An entity with this id already exists: " << key; 184 SinkError() << "An entity with this id already exists: " << key;
184 return KAsync::error<qint64>(0); 185 return KAsync::error<qint64>(0);
185 } 186 }
186 } 187 }
@@ -188,31 +189,31 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
188 if (key.isEmpty()) { 189 if (key.isEmpty()) {
189 key = Sink::Storage::generateUid(); 190 key = Sink::Storage::generateUid();
190 } 191 }
191 Log() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 192 SinkLog() << "New Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
192 Q_ASSERT(!key.isEmpty()); 193 Q_ASSERT(!key.isEmpty());
193 194
194 { 195 {
195 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size()); 196 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(createEntity->delta()->Data()), createEntity->delta()->size());
196 if (!VerifyEntityBuffer(verifyer)) { 197 if (!VerifyEntityBuffer(verifyer)) {
197 Warning() << "invalid buffer, not an entity buffer"; 198 SinkWarning() << "invalid buffer, not an entity buffer";
198 return KAsync::error<qint64>(0); 199 return KAsync::error<qint64>(0);
199 } 200 }
200 } 201 }
201 auto entity = GetEntity(createEntity->delta()->Data()); 202 auto entity = GetEntity(createEntity->delta()->Data());
202 if (!entity->resource()->size() && !entity->local()->size()) { 203 if (!entity->resource()->size() && !entity->local()->size()) {
203 Warning() << "No local and no resource buffer while trying to create entity."; 204 SinkWarning() << "No local and no resource buffer while trying to create entity.";
204 return KAsync::error<qint64>(0); 205 return KAsync::error<qint64>(0);
205 } 206 }
206 207
207 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 208 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
208 if (!adaptorFactory) { 209 if (!adaptorFactory) {
209 Warning() << "no adaptor factory for type " << bufferType; 210 SinkWarning() << "no adaptor factory for type " << bufferType;
210 return KAsync::error<qint64>(0); 211 return KAsync::error<qint64>(0);
211 } 212 }
212 213
213 auto adaptor = adaptorFactory->createAdaptor(*entity); 214 auto adaptor = adaptorFactory->createAdaptor(*entity);
214 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); 215 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties());
215 for (auto processor : d->processors[bufferType]) { 216 foreach (const auto &processor, d->processors[bufferType]) {
216 processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction); 217 processor->newEntity(key, Storage::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction);
217 } 218 }
218 //The maxRevision may have changed meanwhile if the entity created sub-entities 219 //The maxRevision may have changed meanwhile if the entity created sub-entities
@@ -242,7 +243,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
242 { 243 {
243 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 244 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
244 if (!Commands::VerifyModifyEntityBuffer(verifyer)) { 245 if (!Commands::VerifyModifyEntityBuffer(verifyer)) {
245 Warning() << "invalid buffer, not a modify entity buffer"; 246 SinkWarning() << "invalid buffer, not a modify entity buffer";
246 return KAsync::error<qint64>(0); 247 return KAsync::error<qint64>(0);
247 } 248 }
248 } 249 }
@@ -252,21 +253,21 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
252 if (modifyEntity->modifiedProperties()) { 253 if (modifyEntity->modifiedProperties()) {
253 changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties()); 254 changeset = BufferUtils::fromVector(*modifyEntity->modifiedProperties());
254 } else { 255 } else {
255 Warning() << "No changeset available"; 256 SinkWarning() << "No changeset available";
256 } 257 }
257 const qint64 baseRevision = modifyEntity->revision(); 258 const qint64 baseRevision = modifyEntity->revision();
258 const bool replayToSource = modifyEntity->replayToSource(); 259 const bool replayToSource = modifyEntity->replayToSource();
259 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); 260 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
260 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); 261 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
261 Log() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 262 SinkLog() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
262 if (bufferType.isEmpty() || key.isEmpty()) { 263 if (bufferType.isEmpty() || key.isEmpty()) {
263 Warning() << "entity type or key " << bufferType << key; 264 SinkWarning() << "entity type or key " << bufferType << key;
264 return KAsync::error<qint64>(0); 265 return KAsync::error<qint64>(0);
265 } 266 }
266 { 267 {
267 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size()); 268 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(modifyEntity->delta()->Data()), modifyEntity->delta()->size());
268 if (!VerifyEntityBuffer(verifyer)) { 269 if (!VerifyEntityBuffer(verifyer)) {
269 Warning() << "invalid buffer, not an entity buffer"; 270 SinkWarning() << "invalid buffer, not an entity buffer";
270 return KAsync::error<qint64>(0); 271 return KAsync::error<qint64>(0);
271 } 272 }
272 } 273 }
@@ -274,7 +275,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
274 // TODO use only readPropertyMapper and writePropertyMapper 275 // TODO use only readPropertyMapper and writePropertyMapper
275 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 276 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
276 if (!adaptorFactory) { 277 if (!adaptorFactory) {
277 Warning() << "no adaptor factory for type " << bufferType; 278 SinkWarning() << "no adaptor factory for type " << bufferType;
278 return KAsync::error<qint64>(0); 279 return KAsync::error<qint64>(0);
279 } 280 }
280 281
@@ -288,16 +289,16 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
288 [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { 289 [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
289 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 290 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
290 if (!buffer.isValid()) { 291 if (!buffer.isValid()) {
291 Warning() << "Read invalid buffer from disk"; 292 SinkWarning() << "Read invalid buffer from disk";
292 } else { 293 } else {
293 current = adaptorFactory->createAdaptor(buffer.entity()); 294 current = adaptorFactory->createAdaptor(buffer.entity());
294 } 295 }
295 return false; 296 return false;
296 }, 297 },
297 [baseRevision](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; }); 298 [baseRevision](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; });
298 299
299 if (!current) { 300 if (!current) {
300 Warning() << "Failed to read local value " << key; 301 SinkWarning() << "Failed to read local value " << key;
301 return KAsync::error<qint64>(0); 302 return KAsync::error<qint64>(0);
302 } 303 }
303 304
@@ -305,7 +306,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
305 306
306 // Apply diff 307 // Apply diff
307 // FIXME only apply the properties that are available in the buffer 308 // FIXME only apply the properties that are available in the buffer
308 Trace() << "Applying changed properties: " << changeset; 309 SinkTrace() << "Applying changed properties: " << changeset;
309 for (const auto &property : changeset) { 310 for (const auto &property : changeset) {
310 const auto value = diff->getProperty(property); 311 const auto value = diff->getProperty(property);
311 if (value.isValid()) { 312 if (value.isValid()) {
@@ -321,7 +322,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
321 } 322 }
322 323
323 newAdaptor->resetChangedProperties(); 324 newAdaptor->resetChangedProperties();
324 for (auto processor : d->processors[bufferType]) { 325 foreach (const auto &processor, d->processors[bufferType]) {
325 processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); 326 processor->modifiedEntity(key, Storage::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction);
326 } 327 }
327 //The maxRevision may have changed meanwhile if the entity created sub-entities 328 //The maxRevision may have changed meanwhile if the entity created sub-entities
@@ -355,7 +356,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
355 { 356 {
356 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size); 357 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(command), size);
357 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) { 358 if (!Commands::VerifyDeleteEntityBuffer(verifyer)) {
358 Warning() << "invalid buffer, not a delete entity buffer"; 359 SinkWarning() << "invalid buffer, not a delete entity buffer";
359 return KAsync::error<qint64>(0); 360 return KAsync::error<qint64>(0);
360 } 361 }
361 } 362 }
@@ -364,7 +365,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
364 const bool replayToSource = deleteEntity->replayToSource(); 365 const bool replayToSource = deleteEntity->replayToSource();
365 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size()); 366 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(deleteEntity->domainType()->Data()), deleteEntity->domainType()->size());
366 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 367 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
367 Log() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 368 SinkLog() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
368 369
369 bool found = false; 370 bool found = false;
370 bool alreadyRemoved = false; 371 bool alreadyRemoved = false;
@@ -381,14 +382,14 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
381 } 382 }
382 return false; 383 return false;
383 }, 384 },
384 [](const Storage::Error &error) { Warning() << "Failed to read old revision from storage: " << error.message; }); 385 [](const Storage::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; });
385 386
386 if (!found) { 387 if (!found) {
387 Warning() << "Failed to find entity " << key; 388 SinkWarning() << "Failed to find entity " << key;
388 return KAsync::error<qint64>(0); 389 return KAsync::error<qint64>(0);
389 } 390 }
390 if (alreadyRemoved) { 391 if (alreadyRemoved) {
391 Warning() << "Entity is already removed " << key; 392 SinkWarning() << "Entity is already removed " << key;
392 return KAsync::error<qint64>(0); 393 return KAsync::error<qint64>(0);
393 } 394 }
394 395
@@ -408,7 +409,7 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
408 409
409 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType); 410 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceType, bufferType);
410 if (!adaptorFactory) { 411 if (!adaptorFactory) {
411 Warning() << "no adaptor factory for type " << bufferType; 412 SinkWarning() << "no adaptor factory for type " << bufferType;
412 return KAsync::error<qint64>(0); 413 return KAsync::error<qint64>(0);
413 } 414 }
414 415
@@ -418,17 +419,17 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
418 [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool { 419 [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
419 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 420 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
420 if (!buffer.isValid()) { 421 if (!buffer.isValid()) {
421 Warning() << "Read invalid buffer from disk"; 422 SinkWarning() << "Read invalid buffer from disk";
422 } else { 423 } else {
423 current = adaptorFactory->createAdaptor(buffer.entity()); 424 current = adaptorFactory->createAdaptor(buffer.entity());
424 } 425 }
425 return false; 426 return false;
426 }, 427 },
427 [this](const Storage::Error &error) { ErrorMsg() << "Failed to find value in pipeline: " << error.message; }); 428 [this](const Storage::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; });
428 429
429 d->storeNewRevision(newRevision, fbb, bufferType, key); 430 d->storeNewRevision(newRevision, fbb, bufferType, key);
430 431
431 for (auto processor : d->processors[bufferType]) { 432 foreach (const auto &processor, d->processors[bufferType]) {
432 processor->deletedEntity(key, newRevision, *current, d->transaction); 433 processor->deletedEntity(key, newRevision, *current, d->transaction);
433 } 434 }
434 435
@@ -440,13 +441,13 @@ void Pipeline::cleanupRevision(qint64 revision)
440 d->revisionChanged = true; 441 d->revisionChanged = true;
441 const auto uid = Storage::getUidFromRevision(d->transaction, revision); 442 const auto uid = Storage::getUidFromRevision(d->transaction, revision);
442 const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision); 443 const auto bufferType = Storage::getTypeFromRevision(d->transaction, revision);
443 Trace() << "Cleaning up revision " << revision << uid << bufferType; 444 SinkTrace() << "Cleaning up revision " << revision << uid << bufferType;
444 Storage::mainDatabase(d->transaction, bufferType) 445 Storage::mainDatabase(d->transaction, bufferType)
445 .scan(uid, 446 .scan(uid,
446 [&](const QByteArray &key, const QByteArray &data) -> bool { 447 [&](const QByteArray &key, const QByteArray &data) -> bool {
447 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); 448 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
448 if (!buffer.isValid()) { 449 if (!buffer.isValid()) {
449 Warning() << "Read invalid buffer from disk"; 450 SinkWarning() << "Read invalid buffer from disk";
450 } else { 451 } else {
451 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer()); 452 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
452 const qint64 rev = metadata->revision(); 453 const qint64 rev = metadata->revision();
@@ -459,7 +460,7 @@ void Pipeline::cleanupRevision(qint64 revision)
459 460
460 return true; 461 return true;
461 }, 462 },
462 [](const Storage::Error &error) { Warning() << "Error while reading: " << error.message; }, true); 463 [](const Storage::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true);
463 Storage::setCleanedUpRevision(d->transaction, revision); 464 Storage::setCleanedUpRevision(d->transaction, revision);
464} 465}
465 466
@@ -481,7 +482,6 @@ Preprocessor::Preprocessor() : d(new Preprocessor::Private)
481 482
482Preprocessor::~Preprocessor() 483Preprocessor::~Preprocessor()
483{ 484{
484 delete d;
485} 485}
486 486
487void Preprocessor::setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *pipeline) 487void Preprocessor::setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *pipeline)
diff --git a/common/pipeline.h b/common/pipeline.h
index d04d795..ef89cf0 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -72,7 +72,7 @@ signals:
72 72
73private: 73private:
74 class Private; 74 class Private;
75 Private *const d; 75 const std::unique_ptr<Private> d;
76}; 76};
77 77
78class SINK_EXPORT Preprocessor 78class SINK_EXPORT Preprocessor
@@ -103,7 +103,7 @@ protected:
103private: 103private:
104 friend class Pipeline; 104 friend class Pipeline;
105 class Private; 105 class Private;
106 Private *const d; 106 const std::unique_ptr<Private> d;
107}; 107};
108 108
109template<typename DomainType> 109template<typename DomainType>
diff --git a/common/queryrunner.cpp b/common/queryrunner.cpp
index 78a4b94..2e2e96d 100644
--- a/common/queryrunner.cpp
+++ b/common/queryrunner.cpp
@@ -29,8 +29,7 @@
29#include "asyncutils.h" 29#include "asyncutils.h"
30#include "entityreader.h" 30#include "entityreader.h"
31 31
32#undef DEBUG_AREA 32SINK_DEBUG_AREA("queryrunner")
33#define DEBUG_AREA "client.queryrunner"
34 33
35using namespace Sink; 34using namespace Sink;
36 35
@@ -43,6 +42,8 @@ using namespace Sink;
43template <typename DomainType> 42template <typename DomainType>
44class QueryWorker : public QObject 43class QueryWorker : public QObject
45{ 44{
45 // SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier, mId)
46 SINK_DEBUG_COMPONENT(mResourceInstanceIdentifier)
46public: 47public:
47 QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType, 48 QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &, const QByteArray &bufferType,
48 const QueryRunnerBase::ResultTransformation &transformation); 49 const QueryRunnerBase::ResultTransformation &transformation);
@@ -61,22 +62,19 @@ private:
61 QByteArray mId; //Used for identification in debug output 62 QByteArray mId; //Used for identification in debug output
62}; 63};
63 64
64#undef Trace
65#define Trace() Trace_area(DEBUG_AREA)
66
67template <class DomainType> 65template <class DomainType>
68QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier, 66QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::ResourceAccessInterface::Ptr &resourceAccess, const QByteArray &instanceIdentifier,
69 const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType) 67 const DomainTypeAdaptorFactoryInterface::Ptr &factory, const QByteArray &bufferType)
70 : QueryRunnerBase(), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit) 68 : QueryRunnerBase(), mResourceAccess(resourceAccess), mResultProvider(new ResultProvider<typename DomainType::Ptr>), mBatchSize(query.limit)
71{ 69{
72 Trace() << "Starting query"; 70 SinkTrace() << "Starting query";
73 if (query.limit && query.sortProperty.isEmpty()) { 71 if (query.limit && query.sortProperty.isEmpty()) {
74 Warning() << "A limited query without sorting is typically a bad idea."; 72 SinkWarning() << "A limited query without sorting is typically a bad idea.";
75 } 73 }
76 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load. 74 // We delegate loading of initial data to the result provider, so it can decide for itself what it needs to load.
77 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) { 75 mResultProvider->setFetcher([=](const typename DomainType::Ptr &parent) {
78 const QByteArray parentId = parent ? parent->identifier() : QByteArray(); 76 const QByteArray parentId = parent ? parent->identifier() : QByteArray();
79 Trace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize; 77 SinkTrace() << "Running fetcher. Offset: " << mOffset[parentId] << " Batchsize: " << mBatchSize;
80 auto resultProvider = mResultProvider; 78 auto resultProvider = mResultProvider;
81 if (query.synchronousQuery) { 79 if (query.synchronousQuery) {
82 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation); 80 QueryWorker<DomainType> worker(query, instanceIdentifier, factory, bufferType, mResultTransformation);
@@ -123,12 +121,15 @@ QueryRunner<DomainType>::QueryRunner(const Sink::Query &query, const Sink::Resou
123 mResourceAccess->open(); 121 mResourceAccess->open();
124 QObject::connect(mResourceAccess.data(), &Sink::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged); 122 QObject::connect(mResourceAccess.data(), &Sink::ResourceAccess::revisionChanged, this, &QueryRunner::revisionChanged);
125 } 123 }
124 mResultProvider->onDone([this]() {
125 delete this;
126 });
126} 127}
127 128
128template <class DomainType> 129template <class DomainType>
129QueryRunner<DomainType>::~QueryRunner() 130QueryRunner<DomainType>::~QueryRunner()
130{ 131{
131 Trace() << "Stopped query"; 132 SinkTrace() << "Stopped query";
132} 133}
133 134
134template <class DomainType> 135template <class DomainType>
@@ -144,21 +145,18 @@ typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr QueryRunner<DomainTy
144} 145}
145 146
146 147
147#undef Trace
148#define Trace() Trace_area("client.queryrunner." + mId)
149
150template <class DomainType> 148template <class DomainType>
151QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory, 149QueryWorker<DomainType>::QueryWorker(const Sink::Query &query, const QByteArray &instanceIdentifier, const DomainTypeAdaptorFactoryInterface::Ptr &factory,
152 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation) 150 const QByteArray &bufferType, const QueryRunnerBase::ResultTransformation &transformation)
153 : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mId(QUuid::createUuid().toByteArray()) 151 : QObject(), mResultTransformation(transformation), mDomainTypeAdaptorFactory(factory), mResourceInstanceIdentifier(instanceIdentifier), mId(QUuid::createUuid().toByteArray())
154{ 152{
155 Trace() << "Starting query worker"; 153 SinkTrace() << "Starting query worker";
156} 154}
157 155
158template <class DomainType> 156template <class DomainType>
159QueryWorker<DomainType>::~QueryWorker() 157QueryWorker<DomainType>::~QueryWorker()
160{ 158{
161 Trace() << "Stopped query worker"; 159 SinkTrace() << "Stopped query worker";
162} 160}
163 161
164template <class DomainType> 162template <class DomainType>
@@ -171,15 +169,15 @@ std::function<bool(const typename DomainType::Ptr &, Sink::Operation)> QueryWork
171 } 169 }
172 switch (operation) { 170 switch (operation) {
173 case Sink::Operation_Creation: 171 case Sink::Operation_Creation:
174 // Trace() << "Got creation"; 172 // SinkTrace() << "Got creation";
175 resultProvider.add(valueCopy); 173 resultProvider.add(valueCopy);
176 break; 174 break;
177 case Sink::Operation_Modification: 175 case Sink::Operation_Modification:
178 // Trace() << "Got modification"; 176 // SinkTrace() << "Got modification";
179 resultProvider.modify(valueCopy); 177 resultProvider.modify(valueCopy);
180 break; 178 break;
181 case Sink::Operation_Removal: 179 case Sink::Operation_Removal:
182 // Trace() << "Got removal"; 180 // SinkTrace() << "Got removal";
183 resultProvider.remove(valueCopy); 181 resultProvider.remove(valueCopy);
184 break; 182 break;
185 } 183 }
@@ -197,7 +195,7 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeIncrementalQuery(const Sin
197 195
198 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); 196 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction);
199 auto revisionAndReplayedEntities = reader.executeIncrementalQuery(query, resultProvider.revision(), resultProviderCallback(query, resultProvider)); 197 auto revisionAndReplayedEntities = reader.executeIncrementalQuery(query, resultProvider.revision(), resultProviderCallback(query, resultProvider));
200 Trace() << "Incremental query took: " << Log::TraceTime(time.elapsed()); 198 SinkTrace() << "Incremental query took: " << Log::TraceTime(time.elapsed());
201 return revisionAndReplayedEntities; 199 return revisionAndReplayedEntities;
202} 200}
203 201
@@ -209,10 +207,10 @@ Storage::Transaction QueryWorker<DomainType>::getTransaction()
209 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier); 207 Sink::Storage storage(Sink::storageLocation(), mResourceInstanceIdentifier);
210 if (!storage.exists()) { 208 if (!storage.exists()) {
211 //This is not an error if the resource wasn't started before 209 //This is not an error if the resource wasn't started before
212 Log() << "Store doesn't exist: " << mResourceInstanceIdentifier; 210 SinkLog() << "Store doesn't exist: " << mResourceInstanceIdentifier;
213 return Sink::Storage::Transaction(); 211 return Sink::Storage::Transaction();
214 } 212 }
215 storage.setDefaultErrorHandler([](const Sink::Storage::Error &error) { Warning() << "Error during query: " << error.store << error.message; }); 213 storage.setDefaultErrorHandler([this](const Sink::Storage::Error &error) { SinkWarning() << "Error during query: " << error.store << error.message; });
216 transaction = storage.createTransaction(Sink::Storage::ReadOnly); 214 transaction = storage.createTransaction(Sink::Storage::ReadOnly);
217 } 215 }
218 216
@@ -235,10 +233,10 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
235 auto modifiedQuery = query; 233 auto modifiedQuery = query;
236 if (!query.parentProperty.isEmpty()) { 234 if (!query.parentProperty.isEmpty()) {
237 if (parent) { 235 if (parent) {
238 Trace() << "Running initial query for parent:" << parent->identifier(); 236 SinkTrace() << "Running initial query for parent:" << parent->identifier();
239 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(parent->identifier())); 237 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(parent->identifier()));
240 } else { 238 } else {
241 Trace() << "Running initial query for toplevel"; 239 SinkTrace() << "Running initial query for toplevel";
242 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(QVariant())); 240 modifiedQuery.propertyFilter.insert(query.parentProperty, Query::Comparator(QVariant()));
243 } 241 }
244 } 242 }
@@ -247,7 +245,7 @@ QPair<qint64, qint64> QueryWorker<DomainType>::executeInitialQuery(
247 245
248 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction); 246 Sink::EntityReader<DomainType> reader(*mDomainTypeAdaptorFactory, mResourceInstanceIdentifier, transaction);
249 auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider)); 247 auto revisionAndReplayedEntities = reader.executeInitialQuery(modifiedQuery, offset, batchsize, resultProviderCallback(query, resultProvider));
250 Trace() << "Initial query took: " << Log::TraceTime(time.elapsed()); 248 SinkTrace() << "Initial query took: " << Log::TraceTime(time.elapsed());
251 return revisionAndReplayedEntities; 249 return revisionAndReplayedEntities;
252} 250}
253 251
diff --git a/common/queryrunner.h b/common/queryrunner.h
index e6d5a54..155528e 100644
--- a/common/queryrunner.h
+++ b/common/queryrunner.h
@@ -32,6 +32,8 @@
32class QueryRunnerBase : public QObject 32class QueryRunnerBase : public QObject
33{ 33{
34 Q_OBJECT 34 Q_OBJECT
35protected:
36 SINK_DEBUG_AREA("queryrunner")
35public: 37public:
36 typedef std::function<void(Sink::ApplicationDomain::ApplicationDomainType &domainObject)> ResultTransformation; 38 typedef std::function<void(Sink::ApplicationDomain::ApplicationDomainType &domainObject)> ResultTransformation;
37 39
@@ -52,7 +54,7 @@ protected slots:
52 */ 54 */
53 void revisionChanged(qint64 newRevision) 55 void revisionChanged(qint64 newRevision)
54 { 56 {
55 Trace() << "New revision: " << newRevision; 57 SinkTrace() << "New revision: " << newRevision;
56 run().exec(); 58 run().exec();
57 } 59 }
58 60
diff --git a/common/remoteidmap.cpp b/common/remoteidmap.cpp
index bbcd641..20a054d 100644
--- a/common/remoteidmap.cpp
+++ b/common/remoteidmap.cpp
@@ -25,6 +25,8 @@
25 25
26using namespace Sink; 26using namespace Sink;
27 27
28SINK_DEBUG_AREA("remoteidmap")
29
28RemoteIdMap::RemoteIdMap(Sink::Storage::Transaction &transaction) 30RemoteIdMap::RemoteIdMap(Sink::Storage::Transaction &transaction)
29 : mTransaction(transaction) 31 : mTransaction(transaction)
30{ 32{
@@ -67,7 +69,7 @@ QByteArray RemoteIdMap::resolveLocalId(const QByteArray &bufferType, const QByte
67{ 69{
68 QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); 70 QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId);
69 if (remoteId.isEmpty()) { 71 if (remoteId.isEmpty()) {
70 Warning() << "Couldn't find the remote id for " << localId; 72 SinkWarning() << "Couldn't find the remote id for " << localId;
71 return QByteArray(); 73 return QByteArray();
72 } 74 }
73 return remoteId; 75 return remoteId;
diff --git a/common/resource.h b/common/resource.h
index d6c3c5f..1c35838 100644
--- a/common/resource.h
+++ b/common/resource.h
@@ -91,4 +91,4 @@ private:
91 91
92} // namespace Sink 92} // namespace Sink
93 93
94Q_DECLARE_INTERFACE(Sink::ResourceFactory, "org.kde.sink.resourcefactory") 94Q_DECLARE_INTERFACE(Sink::ResourceFactory, "sink.sink.resourcefactory")
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index d3bd85f..c878143 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -44,12 +44,6 @@
44#include <QBuffer> 44#include <QBuffer>
45#include <QTime> 45#include <QTime>
46 46
47#undef Trace
48#define TracePrivate() Trace_area("client.communication." + resourceInstanceIdentifier)
49#define Trace() Trace_area("client.communication." + d->resourceInstanceIdentifier)
50#undef Log
51#define Log() Log_area("client.communication." + d->resourceInstanceIdentifier)
52
53static void queuedInvoke(const std::function<void()> &f, QObject *context = 0) 47static void queuedInvoke(const std::function<void()> &f, QObject *context = 0)
54{ 48{
55 auto timer = QSharedPointer<QTimer>::create(); 49 auto timer = QSharedPointer<QTimer>::create();
@@ -100,8 +94,10 @@ public:
100 QHash<uint, bool> completeCommands; 94 QHash<uint, bool> completeCommands;
101 uint messageId; 95 uint messageId;
102 bool openingSocket; 96 bool openingSocket;
97 SINK_DEBUG_COMPONENT(resourceInstanceIdentifier)
103}; 98};
104 99
100
105ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q) 101ResourceAccess::Private::Private(const QByteArray &name, const QByteArray &instanceIdentifier, ResourceAccess *q)
106 : resourceName(name), resourceInstanceIdentifier(instanceIdentifier), messageId(0), openingSocket(false) 102 : resourceName(name), resourceInstanceIdentifier(instanceIdentifier), messageId(0), openingSocket(false)
107{ 103{
@@ -111,7 +107,7 @@ void ResourceAccess::Private::abortPendingOperations()
111{ 107{
112 callCallbacks(); 108 callCallbacks();
113 if (!resultHandler.isEmpty()) { 109 if (!resultHandler.isEmpty()) {
114 Warning() << "Aborting pending operations " << resultHandler.keys(); 110 SinkWarning() << "Aborting pending operations " << resultHandler.keys();
115 } 111 }
116 auto handlers = resultHandler.values(); 112 auto handlers = resultHandler.values();
117 resultHandler.clear(); 113 resultHandler.clear();
@@ -165,7 +161,7 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect()
165 auto counter = QSharedPointer<int>::create(0); 161 auto counter = QSharedPointer<int>::create(0);
166 return KAsync::dowhile([this]() -> bool { return !socket; }, 162 return KAsync::dowhile([this]() -> bool { return !socket; },
167 [this, counter](KAsync::Future<void> &future) { 163 [this, counter](KAsync::Future<void> &future) {
168 TracePrivate() << "Loop"; 164 SinkTrace() << "Loop";
169 connectToServer(resourceInstanceIdentifier) 165 connectToServer(resourceInstanceIdentifier)
170 .then<void, QSharedPointer<QLocalSocket>>( 166 .then<void, QSharedPointer<QLocalSocket>>(
171 [this, &future](const QSharedPointer<QLocalSocket> &s) { 167 [this, &future](const QSharedPointer<QLocalSocket> &s) {
@@ -178,7 +174,7 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect()
178 static int timeout = 500; 174 static int timeout = 500;
179 static int maxRetries = timeout / waitTime; 175 static int maxRetries = timeout / waitTime;
180 if (*counter > maxRetries) { 176 if (*counter > maxRetries) {
181 TracePrivate() << "Giving up"; 177 SinkTrace() << "Giving up";
182 future.setError(-1, "Failed to connect to socket"); 178 future.setError(-1, "Failed to connect to socket");
183 } else { 179 } else {
184 KAsync::wait(waitTime).then<void>([&future]() { future.setFinished(); }).exec(); 180 KAsync::wait(waitTime).then<void>([&future]() { future.setFinished(); }).exec();
@@ -192,17 +188,17 @@ KAsync::Job<void> ResourceAccess::Private::tryToConnect()
192KAsync::Job<void> ResourceAccess::Private::initializeSocket() 188KAsync::Job<void> ResourceAccess::Private::initializeSocket()
193{ 189{
194 return KAsync::start<void>([this](KAsync::Future<void> &future) { 190 return KAsync::start<void>([this](KAsync::Future<void> &future) {
195 TracePrivate() << "Trying to connect"; 191 SinkTrace() << "Trying to connect";
196 connectToServer(resourceInstanceIdentifier) 192 connectToServer(resourceInstanceIdentifier)
197 .then<void, QSharedPointer<QLocalSocket>>( 193 .then<void, QSharedPointer<QLocalSocket>>(
198 [this, &future](const QSharedPointer<QLocalSocket> &s) { 194 [this, &future](const QSharedPointer<QLocalSocket> &s) {
199 TracePrivate() << "Connected to resource, without having to start it."; 195 SinkTrace() << "Connected to resource, without having to start it.";
200 Q_ASSERT(s); 196 Q_ASSERT(s);
201 socket = s; 197 socket = s;
202 future.setFinished(); 198 future.setFinished();
203 }, 199 },
204 [this, &future](int errorCode, const QString &errorString) { 200 [this, &future](int errorCode, const QString &errorString) {
205 TracePrivate() << "Failed to connect, starting resource"; 201 SinkTrace() << "Failed to connect, starting resource";
206 // We failed to connect, so let's start the resource 202 // We failed to connect, so let's start the resource
207 QStringList args; 203 QStringList args;
208 if (Sink::Test::testModeEnabled()) { 204 if (Sink::Test::testModeEnabled()) {
@@ -211,16 +207,16 @@ KAsync::Job<void> ResourceAccess::Private::initializeSocket()
211 args << resourceInstanceIdentifier << resourceName; 207 args << resourceInstanceIdentifier << resourceName;
212 qint64 pid = 0; 208 qint64 pid = 0;
213 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) { 209 if (QProcess::startDetached("sink_synchronizer", args, QDir::homePath(), &pid)) {
214 TracePrivate() << "Started resource " << pid; 210 SinkTrace() << "Started resource " << pid;
215 tryToConnect() 211 tryToConnect()
216 .then<void>([&future]() { future.setFinished(); }, 212 .then<void>([&future]() { future.setFinished(); },
217 [this, &future](int errorCode, const QString &errorString) { 213 [this, &future](int errorCode, const QString &errorString) {
218 Warning() << "Failed to connect to started resource"; 214 SinkWarning() << "Failed to connect to started resource";
219 future.setError(errorCode, errorString); 215 future.setError(errorCode, errorString);
220 }) 216 })
221 .exec(); 217 .exec();
222 } else { 218 } else {
223 Warning() << "Failed to start resource"; 219 SinkWarning() << "Failed to start resource";
224 } 220 }
225 }) 221 })
226 .exec(); 222 .exec();
@@ -230,14 +226,15 @@ KAsync::Job<void> ResourceAccess::Private::initializeSocket()
230ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType) 226ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType)
231 : ResourceAccessInterface(), d(new Private(resourceType, resourceInstanceIdentifier, this)) 227 : ResourceAccessInterface(), d(new Private(resourceType, resourceInstanceIdentifier, this))
232{ 228{
233 Trace() << "Starting access"; 229 mResourceStatus = Sink::ApplicationDomain::OfflineStatus;
230 SinkTrace() << "Starting access";
234} 231}
235 232
236ResourceAccess::~ResourceAccess() 233ResourceAccess::~ResourceAccess()
237{ 234{
238 Log() << "Closing access"; 235 SinkLog() << "Closing access";
239 if (!d->resultHandler.isEmpty()) { 236 if (!d->resultHandler.isEmpty()) {
240 Warning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys(); 237 SinkWarning() << "Left jobs running while shutting down ResourceAccess: " << d->resultHandler.keys();
241 } 238 }
242} 239}
243 240
@@ -294,7 +291,7 @@ KAsync::Job<void> ResourceAccess::sendCommand(int commandId, flatbuffers::FlatBu
294 291
295KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync) 292KAsync::Job<void> ResourceAccess::synchronizeResource(bool sourceSync, bool localSync)
296{ 293{
297 Trace() << "Sending synchronize command: " << sourceSync << localSync; 294 SinkTrace() << "Sending synchronize command: " << sourceSync << localSync;
298 flatbuffers::FlatBufferBuilder fbb; 295 flatbuffers::FlatBufferBuilder fbb;
299 auto command = Sink::Commands::CreateSynchronize(fbb, sourceSync, localSync); 296 auto command = Sink::Commands::CreateSynchronize(fbb, sourceSync, localSync);
300 Sink::Commands::FinishSynchronizeBuffer(fbb, command); 297 Sink::Commands::FinishSynchronizeBuffer(fbb, command);
@@ -375,7 +372,7 @@ ResourceAccess::sendInspectionCommand(int inspectionType, const QByteArray &insp
375void ResourceAccess::open() 372void ResourceAccess::open()
376{ 373{
377 if (d->socket && d->socket->isValid()) { 374 if (d->socket && d->socket->isValid()) {
378 // Trace() << "Socket valid, so not opening again"; 375 // SinkTrace() << "Socket valid, so not opening again";
379 return; 376 return;
380 } 377 }
381 if (d->openingSocket) { 378 if (d->openingSocket) {
@@ -387,7 +384,7 @@ void ResourceAccess::open()
387 d->initializeSocket() 384 d->initializeSocket()
388 .then<void>( 385 .then<void>(
389 [this, time]() { 386 [this, time]() {
390 Trace() << "Socket is initialized." << Log::TraceTime(time->elapsed()); 387 SinkTrace() << "Socket is initialized." << Log::TraceTime(time->elapsed());
391 d->openingSocket = false; 388 d->openingSocket = false;
392 QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected); 389 QObject::connect(d->socket.data(), &QLocalSocket::disconnected, this, &ResourceAccess::disconnected);
393 QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError))); 390 QObject::connect(d->socket.data(), SIGNAL(error(QLocalSocket::LocalSocketError)), this, SLOT(connectionError(QLocalSocket::LocalSocketError)));
@@ -396,16 +393,16 @@ void ResourceAccess::open()
396 }, 393 },
397 [this](int error, const QString &errorString) { 394 [this](int error, const QString &errorString) {
398 d->openingSocket = false; 395 d->openingSocket = false;
399 Warning() << "Failed to initialize socket " << errorString; 396 SinkWarning() << "Failed to initialize socket " << errorString;
400 }) 397 })
401 .exec(); 398 .exec();
402} 399}
403 400
404void ResourceAccess::close() 401void ResourceAccess::close()
405{ 402{
406 Log() << QString("Closing %1").arg(d->socket->fullServerName()); 403 SinkLog() << QString("Closing %1").arg(d->socket->fullServerName());
407 Trace() << "Pending commands: " << d->pendingCommands.size(); 404 SinkTrace() << "Pending commands: " << d->pendingCommands.size();
408 Trace() << "Queued commands: " << d->commandQueue.size(); 405 SinkTrace() << "Queued commands: " << d->commandQueue.size();
409 d->socket->close(); 406 d->socket->close();
410} 407}
411 408
@@ -415,10 +412,10 @@ void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command)
415 // TODO: we should have a timeout for commands 412 // TODO: we should have a timeout for commands
416 d->messageId++; 413 d->messageId++;
417 const auto messageId = d->messageId; 414 const auto messageId = d->messageId;
418 Trace() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId); 415 SinkTrace() << QString("Sending command \"%1\" with messageId %2").arg(QString(Sink::Commands::name(command->commandId))).arg(d->messageId);
419 Q_ASSERT(command->callback); 416 Q_ASSERT(command->callback);
420 registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) { 417 registerCallback(d->messageId, [this, messageId, command](int errorCode, QString errorMessage) {
421 Trace() << "Command complete " << messageId; 418 SinkTrace() << "Command complete " << messageId;
422 d->pendingCommands.remove(messageId); 419 d->pendingCommands.remove(messageId);
423 command->callback(errorCode, errorMessage); 420 command->callback(errorCode, errorMessage);
424 }); 421 });
@@ -430,8 +427,8 @@ void ResourceAccess::sendCommand(const QSharedPointer<QueuedCommand> &command)
430void ResourceAccess::processCommandQueue() 427void ResourceAccess::processCommandQueue()
431{ 428{
432 // TODO: serialize instead of blast them all through the socket? 429 // TODO: serialize instead of blast them all through the socket?
433 Trace() << "We have " << d->commandQueue.size() << " queued commands"; 430 SinkTrace() << "We have " << d->commandQueue.size() << " queued commands";
434 Trace() << "Pending commands: " << d->pendingCommands.size(); 431 SinkTrace() << "Pending commands: " << d->pendingCommands.size();
435 for (auto command : d->commandQueue) { 432 for (auto command : d->commandQueue) {
436 sendCommand(command); 433 sendCommand(command);
437 } 434 }
@@ -440,9 +437,9 @@ void ResourceAccess::processCommandQueue()
440 437
441void ResourceAccess::processPendingCommandQueue() 438void ResourceAccess::processPendingCommandQueue()
442{ 439{
443 Trace() << "We have " << d->pendingCommands.size() << " pending commands"; 440 SinkTrace() << "We have " << d->pendingCommands.size() << " pending commands";
444 for (auto command : d->pendingCommands) { 441 for (auto command : d->pendingCommands) {
445 Trace() << "Reenquing command " << command->commandId; 442 SinkTrace() << "Reenquing command " << command->commandId;
446 d->commandQueue << command; 443 d->commandQueue << command;
447 } 444 }
448 d->pendingCommands.clear(); 445 d->pendingCommands.clear();
@@ -452,11 +449,11 @@ void ResourceAccess::processPendingCommandQueue()
452void ResourceAccess::connected() 449void ResourceAccess::connected()
453{ 450{
454 if (!isReady()) { 451 if (!isReady()) {
455 Trace() << "Connected but not ready?"; 452 SinkTrace() << "Connected but not ready?";
456 return; 453 return;
457 } 454 }
458 455
459 Trace() << QString("Connected: %1").arg(d->socket->fullServerName()); 456 SinkTrace() << QString("Connected: %1").arg(d->socket->fullServerName());
460 457
461 { 458 {
462 flatbuffers::FlatBufferBuilder fbb; 459 flatbuffers::FlatBufferBuilder fbb;
@@ -476,7 +473,7 @@ void ResourceAccess::connected()
476 473
477void ResourceAccess::disconnected() 474void ResourceAccess::disconnected()
478{ 475{
479 Log() << QString("Disconnected from %1").arg(d->socket->fullServerName()); 476 SinkLog() << QString("Disconnected from %1").arg(d->socket->fullServerName());
480 d->socket->close(); 477 d->socket->close();
481 emit ready(false); 478 emit ready(false);
482} 479}
@@ -485,15 +482,15 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
485{ 482{
486 const bool resourceCrashed = d->partialMessageBuffer.contains("PANIC"); 483 const bool resourceCrashed = d->partialMessageBuffer.contains("PANIC");
487 if (resourceCrashed) { 484 if (resourceCrashed) {
488 ErrorMsg() << "The resource crashed!"; 485 SinkError() << "The resource crashed!";
489 d->abortPendingOperations(); 486 d->abortPendingOperations();
490 } else if (error == QLocalSocket::PeerClosedError) { 487 } else if (error == QLocalSocket::PeerClosedError) {
491 Log() << "The resource closed the connection."; 488 SinkLog() << "The resource closed the connection.";
492 d->abortPendingOperations(); 489 d->abortPendingOperations();
493 } else { 490 } else {
494 Warning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString()); 491 SinkWarning() << QString("Connection error: %1 : %2").arg(error).arg(d->socket->errorString());
495 if (d->pendingCommands.size()) { 492 if (d->pendingCommands.size()) {
496 Trace() << "Reconnecting due to pending operations: " << d->pendingCommands.size(); 493 SinkTrace() << "Reconnecting due to pending operations: " << d->pendingCommands.size();
497 open(); 494 open();
498 } 495 }
499 } 496 }
@@ -502,7 +499,7 @@ void ResourceAccess::connectionError(QLocalSocket::LocalSocketError error)
502void ResourceAccess::readResourceMessage() 499void ResourceAccess::readResourceMessage()
503{ 500{
504 if (!d->socket || !d->socket->isValid()) { 501 if (!d->socket || !d->socket->isValid()) {
505 Warning() << "No socket available"; 502 SinkWarning() << "No socket available";
506 return; 503 return;
507 } 504 }
508 505
@@ -513,11 +510,27 @@ void ResourceAccess::readResourceMessage()
513 } 510 }
514} 511}
515 512
513static Sink::Notification getNotification(const Sink::Commands::Notification *buffer)
514{
515 Sink::Notification n;
516 if (buffer->identifier()) {
517 // Don't use fromRawData, the buffer is gone once we invoke emit notification
518 n.id = BufferUtils::extractBufferCopy(buffer->identifier());
519 }
520 if (buffer->message()) {
521 // Don't use fromRawData, the buffer is gone once we invoke emit notification
522 n.message = BufferUtils::extractBufferCopy(buffer->message());
523 }
524 n.type = buffer->type();
525 n.code = buffer->code();
526 return n;
527}
528
516bool ResourceAccess::processMessageBuffer() 529bool ResourceAccess::processMessageBuffer()
517{ 530{
518 static const int headerSize = Commands::headerSize(); 531 static const int headerSize = Commands::headerSize();
519 if (d->partialMessageBuffer.size() < headerSize) { 532 if (d->partialMessageBuffer.size() < headerSize) {
520 Warning() << "command too small"; 533 SinkWarning() << "command too small";
521 return false; 534 return false;
522 } 535 }
523 536
@@ -526,16 +539,16 @@ bool ResourceAccess::processMessageBuffer()
526 const uint size = *(int *)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint)); 539 const uint size = *(int *)(d->partialMessageBuffer.constData() + sizeof(int) + sizeof(uint));
527 540
528 if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) { 541 if (size > (uint)(d->partialMessageBuffer.size() - headerSize)) {
529 Warning() << "command too small"; 542 SinkWarning() << "command too small";
530 return false; 543 return false;
531 } 544 }
532 545
533 switch (commandId) { 546 switch (commandId) {
534 case Commands::RevisionUpdateCommand: { 547 case Commands::RevisionUpdateCommand: {
535 auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); 548 auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize);
536 Trace() << QString("Revision updated to: %1").arg(buffer->revision()); 549 SinkTrace() << QString("Revision updated to: %1").arg(buffer->revision());
537 Notification n; 550 Notification n;
538 n.type = Sink::Commands::NotificationType::NotificationType_RevisionUpdate; 551 n.type = Sink::Notification::RevisionUpdate;
539 emit notification(n); 552 emit notification(n);
540 emit revisionChanged(buffer->revision()); 553 emit revisionChanged(buffer->revision());
541 554
@@ -543,7 +556,7 @@ bool ResourceAccess::processMessageBuffer()
543 } 556 }
544 case Commands::CommandCompletionCommand: { 557 case Commands::CommandCompletionCommand: {
545 auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize); 558 auto buffer = Commands::GetCommandCompletion(d->partialMessageBuffer.constData() + headerSize);
546 Trace() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully"); 559 SinkTrace() << QString("Command with messageId %1 completed %2").arg(buffer->id()).arg(buffer->success() ? "sucessfully" : "unsuccessfully");
547 560
548 d->completeCommands.insert(buffer->id(), buffer->success()); 561 d->completeCommands.insert(buffer->id(), buffer->success());
549 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 562 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
@@ -553,32 +566,34 @@ bool ResourceAccess::processMessageBuffer()
553 case Commands::NotificationCommand: { 566 case Commands::NotificationCommand: {
554 auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize); 567 auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize);
555 switch (buffer->type()) { 568 switch (buffer->type()) {
556 case Sink::Commands::NotificationType::NotificationType_Shutdown: 569 case Sink::Notification::Shutdown:
557 Log() << "Received shutdown notification."; 570 SinkLog() << "Received shutdown notification.";
558 close(); 571 close();
559 break; 572 break;
560 case Sink::Commands::NotificationType::NotificationType_Inspection: { 573 case Sink::Notification::Inspection: {
561 Trace() << "Received inspection notification."; 574 SinkTrace() << "Received inspection notification.";
562 Notification n; 575 auto n = getNotification(buffer);
563 if (buffer->identifier()) {
564 // Don't use fromRawData, the buffer is gone once we invoke emit notification
565 n.id = BufferUtils::extractBufferCopy(buffer->identifier());
566 }
567 if (buffer->message()) {
568 // Don't use fromRawData, the buffer is gone once we invoke emit notification
569 n.message = BufferUtils::extractBufferCopy(buffer->message());
570 }
571 n.type = buffer->type();
572 n.code = buffer->code();
573 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 576 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
574 queuedInvoke([=]() { emit notification(n); }, this); 577 queuedInvoke([=]() { emit notification(n); }, this);
575 } break; 578 } break;
576 case Sink::Commands::NotificationType::NotificationType_Status: 579 case Sink::Notification::Status:
577 case Sink::Commands::NotificationType::NotificationType_Warning: 580 if (mResourceStatus == buffer->code()) {
578 case Sink::Commands::NotificationType::NotificationType_Progress: 581 SinkTrace() << "Got an unnecessary status notification";
579 case Sink::Commands::NotificationType::NotificationType_RevisionUpdate: 582 break;
583 }
584 mResourceStatus = buffer->code();
585 SinkTrace() << "Updated status: " << mResourceStatus;
586 [[clang::fallthrough]];
587 case Sink::Notification::Warning:
588 [[clang::fallthrough]];
589 case Sink::Notification::Progress: {
590 auto n = getNotification(buffer);
591 SinkTrace() << "Received notification: " << n.type;
592 emit notification(n);
593 } break;
594 case Sink::Notification::RevisionUpdate:
580 default: 595 default:
581 Warning() << "Received unknown notification: " << buffer->type(); 596 SinkWarning() << "Received unknown notification: " << buffer->type();
582 break; 597 break;
583 } 598 }
584 break; 599 break;
@@ -624,6 +639,7 @@ Sink::ResourceAccess::Ptr ResourceAccessFactory::getAccess(const QByteArray &ins
624 } 639 }
625 if (!mTimer.contains(instanceIdentifier)) { 640 if (!mTimer.contains(instanceIdentifier)) {
626 auto timer = new QTimer; 641 auto timer = new QTimer;
642 timer->setSingleShot(true);
627 // Drop connection after 3 seconds (which is a random value) 643 // Drop connection after 3 seconds (which is a random value)
628 QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() { mCache.remove(instanceIdentifier); }); 644 QObject::connect(timer, &QTimer::timeout, timer, [this, instanceIdentifier]() { mCache.remove(instanceIdentifier); });
629 timer->setInterval(3000); 645 timer->setInterval(3000);
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 69d52b4..5d66246 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -29,6 +29,7 @@
29 29
30#include <flatbuffers/flatbuffers.h> 30#include <flatbuffers/flatbuffers.h>
31#include "notification.h" 31#include "notification.h"
32#include "log.h"
32 33
33namespace Sink { 34namespace Sink {
34 35
@@ -72,19 +73,28 @@ public:
72 return KAsync::null<void>(); 73 return KAsync::null<void>();
73 }; 74 };
74 75
76 int getResourceStatus() const
77 {
78 return mResourceStatus;
79 }
80
75signals: 81signals:
76 void ready(bool isReady); 82 void ready(bool isReady);
77 void revisionChanged(qint64 revision); 83 void revisionChanged(qint64 revision);
78 void notification(Notification revision); 84 void notification(Notification notification);
79 85
80public slots: 86public slots:
81 virtual void open() = 0; 87 virtual void open() = 0;
82 virtual void close() = 0; 88 virtual void close() = 0;
89
90protected:
91 int mResourceStatus;
83}; 92};
84 93
85class SINK_EXPORT ResourceAccess : public ResourceAccessInterface 94class SINK_EXPORT ResourceAccess : public ResourceAccessInterface
86{ 95{
87 Q_OBJECT 96 Q_OBJECT
97 SINK_DEBUG_AREA("communication")
88public: 98public:
89 typedef QSharedPointer<ResourceAccess> Ptr; 99 typedef QSharedPointer<ResourceAccess> Ptr;
90 100
@@ -130,6 +140,7 @@ private:
130 140
131 class Private; 141 class Private;
132 Private *const d; 142 Private *const d;
143 // SINK_DEBUG_COMPONENT(d->resourceInstanceIdentifier)
133}; 144};
134 145
135/** 146/**
@@ -137,8 +148,9 @@ private:
137 * 148 *
138 * This avoids constantly recreating connections, and should allow a single process to have one connection per resource. 149 * This avoids constantly recreating connections, and should allow a single process to have one connection per resource.
139 */ 150 */
140class ResourceAccessFactory 151class SINK_EXPORT ResourceAccessFactory
141{ 152{
153 SINK_DEBUG_AREA("ResourceAccessFactory")
142public: 154public:
143 static ResourceAccessFactory &instance(); 155 static ResourceAccessFactory &instance();
144 Sink::ResourceAccess::Ptr getAccess(const QByteArray &instanceIdentifier, const QByteArray resourceType); 156 Sink::ResourceAccess::Ptr getAccess(const QByteArray &instanceIdentifier, const QByteArray resourceType);
diff --git a/common/resourceconfig.cpp b/common/resourceconfig.cpp
index a4e5fc5..39f9ddb 100644
--- a/common/resourceconfig.cpp
+++ b/common/resourceconfig.cpp
@@ -20,13 +20,13 @@
20 20
21#include <QSettings> 21#include <QSettings>
22#include <QSharedPointer> 22#include <QSharedPointer>
23#include <QStandardPaths>
24#include <QFile> 23#include <QFile>
25#include <log.h> 24#include <log.h>
25#include <definitions.h>
26 26
27static QSharedPointer<QSettings> getConfig(const QByteArray &identifier) 27static QSharedPointer<QSettings> getConfig(const QByteArray &identifier)
28{ 28{
29 return QSharedPointer<QSettings>::create(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation) + "/sink/" + identifier + ".ini", QSettings::IniFormat); 29 return QSharedPointer<QSettings>::create(Sink::configLocation() +"/" + identifier + ".ini", QSettings::IniFormat);
30} 30}
31 31
32QByteArray ResourceConfig::newIdentifier(const QByteArray &type) 32QByteArray ResourceConfig::newIdentifier(const QByteArray &type)
diff --git a/common/resourcecontrol.cpp b/common/resourcecontrol.cpp
index 5c2cd06..7d092a4 100644
--- a/common/resourcecontrol.cpp
+++ b/common/resourcecontrol.cpp
@@ -30,14 +30,13 @@
30#include "log.h" 30#include "log.h"
31#include "notifier.h" 31#include "notifier.h"
32 32
33#undef DEBUG_AREA 33SINK_DEBUG_AREA("resourcecontrol")
34#define DEBUG_AREA "client.resourcecontrol"
35 34
36namespace Sink { 35namespace Sink {
37 36
38KAsync::Job<void> ResourceControl::shutdown(const QByteArray &identifier) 37KAsync::Job<void> ResourceControl::shutdown(const QByteArray &identifier)
39{ 38{
40 Trace() << "shutdown " << identifier; 39 SinkTrace() << "shutdown " << identifier;
41 auto time = QSharedPointer<QTime>::create(); 40 auto time = QSharedPointer<QTime>::create();
42 time->start(); 41 time->start();
43 return ResourceAccess::connectToServer(identifier) 42 return ResourceAccess::connectToServer(identifier)
@@ -50,33 +49,33 @@ KAsync::Job<void> ResourceControl::shutdown(const QByteArray &identifier)
50 resourceAccess->sendCommand(Sink::Commands::ShutdownCommand) 49 resourceAccess->sendCommand(Sink::Commands::ShutdownCommand)
51 .then<void>([&future, resourceAccess, time]() { 50 .then<void>([&future, resourceAccess, time]() {
52 resourceAccess->close(); 51 resourceAccess->close();
53 Trace() << "Shutdown complete." << Log::TraceTime(time->elapsed()); 52 SinkTrace() << "Shutdown complete." << Log::TraceTime(time->elapsed());
54 future.setFinished(); 53 future.setFinished();
55 }) 54 })
56 .exec(); 55 .exec();
57 }, 56 },
58 [](int, const QString &) { 57 [](int, const QString &) {
59 Trace() << "Resource is already closed."; 58 SinkTrace() << "Resource is already closed.";
60 // Resource isn't started, nothing to shutdown 59 // Resource isn't started, nothing to shutdown
61 }); 60 });
62} 61}
63 62
64KAsync::Job<void> ResourceControl::start(const QByteArray &identifier) 63KAsync::Job<void> ResourceControl::start(const QByteArray &identifier)
65{ 64{
66 Trace() << "start " << identifier; 65 SinkTrace() << "start " << identifier;
67 auto time = QSharedPointer<QTime>::create(); 66 auto time = QSharedPointer<QTime>::create();
68 time->start(); 67 time->start();
69 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); 68 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier));
70 resourceAccess->open(); 69 resourceAccess->open();
71 return resourceAccess->sendCommand(Sink::Commands::PingCommand).then<void>([resourceAccess, time]() { Trace() << "Start complete." << Log::TraceTime(time->elapsed()); }); 70 return resourceAccess->sendCommand(Sink::Commands::PingCommand).then<void>([resourceAccess, time]() { SinkTrace() << "Start complete." << Log::TraceTime(time->elapsed()); });
72} 71}
73 72
74KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier) 73KAsync::Job<void> ResourceControl::flushMessageQueue(const QByteArrayList &resourceIdentifier)
75{ 74{
76 Trace() << "flushMessageQueue" << resourceIdentifier; 75 SinkTrace() << "flushMessageQueue" << resourceIdentifier;
77 return KAsync::iterate(resourceIdentifier) 76 return KAsync::iterate(resourceIdentifier)
78 .template each<void, QByteArray>([](const QByteArray &resource, KAsync::Future<void> &future) { 77 .template each<void, QByteArray>([](const QByteArray &resource, KAsync::Future<void> &future) {
79 Trace() << "Flushing message queue " << resource; 78 SinkTrace() << "Flushing message queue " << resource;
80 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); 79 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource));
81 resourceAccess->open(); 80 resourceAccess->open();
82 resourceAccess->synchronizeResource(false, true).then<void>([&future, resourceAccess]() { future.setFinished(); }).exec(); 81 resourceAccess->synchronizeResource(false, true).then<void>([&future, resourceAccess]() { future.setFinished(); }).exec();
@@ -95,7 +94,7 @@ KAsync::Job<void> ResourceControl::inspect(const Inspection &inspectionCommand)
95 94
96 auto time = QSharedPointer<QTime>::create(); 95 auto time = QSharedPointer<QTime>::create();
97 time->start(); 96 time->start();
98 Trace() << "Sending inspection " << resource; 97 SinkTrace() << "Sending inspection " << resource;
99 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); 98 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource));
100 resourceAccess->open(); 99 resourceAccess->open();
101 auto notifier = QSharedPointer<Sink::Notifier>::create(resourceAccess); 100 auto notifier = QSharedPointer<Sink::Notifier>::create(resourceAccess);
@@ -104,7 +103,7 @@ KAsync::Job<void> ResourceControl::inspect(const Inspection &inspectionCommand)
104 .template then<void>([resourceAccess, notifier, id, time](KAsync::Future<void> &future) { 103 .template then<void>([resourceAccess, notifier, id, time](KAsync::Future<void> &future) {
105 notifier->registerHandler([&future, id, time](const Notification &notification) { 104 notifier->registerHandler([&future, id, time](const Notification &notification) {
106 if (notification.id == id) { 105 if (notification.id == id) {
107 Trace() << "Inspection complete." << Log::TraceTime(time->elapsed()); 106 SinkTrace() << "Inspection complete." << Log::TraceTime(time->elapsed());
108 if (notification.code) { 107 if (notification.code) {
109 future.setError(-1, "Inspection returned an error: " + notification.message); 108 future.setError(-1, "Inspection returned an error: " + notification.message);
110 } else { 109 } else {
diff --git a/common/resourcefacade.cpp b/common/resourcefacade.cpp
index 3901f43..583d6ec 100644
--- a/common/resourcefacade.cpp
+++ b/common/resourcefacade.cpp
@@ -22,23 +22,19 @@
22#include "query.h" 22#include "query.h"
23#include "definitions.h" 23#include "definitions.h"
24#include "storage.h" 24#include "storage.h"
25#include "store.h"
26#include "resourceaccess.h"
25#include <QDir> 27#include <QDir>
26 28
27template <typename DomainType> 29using namespace Sink;
28ConfigNotifier LocalStorageFacade<DomainType>::sConfigNotifier;
29 30
30template <typename DomainType> 31SINK_DEBUG_AREA("ResourceFacade")
31LocalStorageFacade<DomainType>::LocalStorageFacade(const QByteArray &identifier) : Sink::StoreFacade<DomainType>(), mConfigStore(identifier), mResourceInstanceIdentifier(identifier)
32{
33}
34 32
35template <typename DomainType> 33template<typename DomainType>
36LocalStorageFacade<DomainType>::~LocalStorageFacade() 34ConfigNotifier LocalStorageFacade<DomainType>::sConfigNotifier;
37{
38}
39 35
40template <typename DomainType> 36template <typename DomainType>
41typename DomainType::Ptr LocalStorageFacade<DomainType>::readFromConfig(ConfigStore &configStore, const QByteArray &id, const QByteArray &type) 37static typename DomainType::Ptr readFromConfig(ConfigStore &configStore, const QByteArray &id, const QByteArray &type)
42{ 38{
43 auto object = DomainType::Ptr::create(id); 39 auto object = DomainType::Ptr::create(id);
44 object->setProperty("type", type); 40 object->setProperty("type", type);
@@ -49,10 +45,127 @@ typename DomainType::Ptr LocalStorageFacade<DomainType>::readFromConfig(ConfigSt
49 return object; 45 return object;
50} 46}
51 47
48static bool matchesFilter(const QHash<QByteArray, Query::Comparator> &filter, const QMap<QByteArray, QVariant> &properties)
49{
50 for (const auto &filterProperty : filter.keys()) {
51 if (filterProperty == "type") {
52 continue;
53 }
54 if (!filter.value(filterProperty).matches(properties.value(filterProperty))) {
55 return false;
56 }
57 }
58 return true;
59}
60
61template<typename DomainType>
62LocalStorageQueryRunner<DomainType>::LocalStorageQueryRunner(const Query &query, const QByteArray &identifier, ConfigNotifier &configNotifier)
63 : mResultProvider(new ResultProvider<typename DomainType::Ptr>), mConfigStore(identifier), mGuard(new QObject)
64{
65 QObject *guard = new QObject;
66 mResultProvider->setFetcher([this, query, guard, &configNotifier](const QSharedPointer<DomainType> &) {
67 const auto entries = mConfigStore.getEntries();
68 for (const auto &res : entries.keys()) {
69 const auto type = entries.value(res);
70
71 if (query.propertyFilter.contains("type") && query.propertyFilter.value("type").value.toByteArray() != type) {
72 SinkTrace() << "Skipping due to type.";
73 continue;
74 }
75 if (!query.ids.isEmpty() && !query.ids.contains(res)) {
76 continue;
77 }
78 const auto configurationValues = mConfigStore.get(res);
79 if (!matchesFilter(query.propertyFilter, configurationValues)){
80 SinkTrace() << "Skipping due to filter.";
81 continue;
82 }
83 SinkTrace() << "Found match " << res;
84 auto entity = readFromConfig<DomainType>(mConfigStore, res, type);
85 updateStatus(*entity);
86 mResultProvider->add(entity);
87 }
88 if (query.liveQuery) {
89 {
90 auto ret = QObject::connect(&configNotifier, &ConfigNotifier::added, guard, [this](const ApplicationDomain::ApplicationDomainType::Ptr &entry) {
91 auto entity = entry.staticCast<DomainType>();
92 updateStatus(*entity);
93 mResultProvider->add(entity);
94 });
95 Q_ASSERT(ret);
96 }
97 {
98 auto ret = QObject::connect(&configNotifier, &ConfigNotifier::modified, guard, [this](const ApplicationDomain::ApplicationDomainType::Ptr &entry) {
99 auto entity = entry.staticCast<DomainType>();
100 updateStatus(*entity);
101 mResultProvider->modify(entity);
102 });
103 Q_ASSERT(ret);
104 }
105 {
106 auto ret = QObject::connect(&configNotifier, &ConfigNotifier::removed, guard, [this](const ApplicationDomain::ApplicationDomainType::Ptr &entry) {
107 mResultProvider->remove(entry.staticCast<DomainType>());
108 });
109 Q_ASSERT(ret);
110 }
111 }
112 // TODO initialResultSetComplete should be implicit
113 mResultProvider->initialResultSetComplete(typename DomainType::Ptr());
114 mResultProvider->complete();
115 });
116 mResultProvider->onDone([=]() { delete guard; delete this; });
117}
118
119template<typename DomainType>
120QObject *LocalStorageQueryRunner<DomainType>::guard() const
121{
122 return mGuard.get();
123}
124
125template<typename DomainType>
126void LocalStorageQueryRunner<DomainType>::updateStatus(DomainType &entity)
127{
128 if (mStatusUpdater) {
129 mStatusUpdater(entity);
130 }
131}
132
133template<typename DomainType>
134void LocalStorageQueryRunner<DomainType>::setStatusUpdater(const std::function<void(DomainType &)> &updater)
135{
136 mStatusUpdater = updater;
137}
138
139template<typename DomainType>
140void LocalStorageQueryRunner<DomainType>::statusChanged(const QByteArray &identifier)
141{
142 SinkTrace() << "Status changed " << identifier;
143 auto entity = readFromConfig<DomainType>(mConfigStore, identifier, ApplicationDomain::getTypeName<DomainType>());
144 updateStatus(*entity);
145 mResultProvider->modify(entity);
146}
147
148template<typename DomainType>
149typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr LocalStorageQueryRunner<DomainType>::emitter()
150{
151 return mResultProvider->emitter();
152}
153
154
155template <typename DomainType>
156LocalStorageFacade<DomainType>::LocalStorageFacade(const QByteArray &identifier) : StoreFacade<DomainType>(), mIdentifier(identifier), mConfigStore(identifier)
157{
158}
159
160template <typename DomainType>
161LocalStorageFacade<DomainType>::~LocalStorageFacade()
162{
163}
164
52template <typename DomainType> 165template <typename DomainType>
53typename DomainType::Ptr LocalStorageFacade<DomainType>::readFromConfig(const QByteArray &id, const QByteArray &type) 166typename DomainType::Ptr LocalStorageFacade<DomainType>::readFromConfig(const QByteArray &id, const QByteArray &type)
54{ 167{
55 return readFromConfig(mConfigStore, id, type); 168 return ::readFromConfig<DomainType>(mConfigStore, id, type);
56} 169}
57 170
58template <typename DomainType> 171template <typename DomainType>
@@ -84,7 +197,7 @@ KAsync::Job<void> LocalStorageFacade<DomainType>::modify(const DomainType &domai
84 return KAsync::start<void>([domainObject, this]() { 197 return KAsync::start<void>([domainObject, this]() {
85 const QByteArray identifier = domainObject.identifier(); 198 const QByteArray identifier = domainObject.identifier();
86 if (identifier.isEmpty()) { 199 if (identifier.isEmpty()) {
87 Warning() << "We need an \"identifier\" property to identify the entity to configure."; 200 SinkWarning() << "We need an \"identifier\" property to identify the entity to configure.";
88 return; 201 return;
89 } 202 }
90 auto changedProperties = domainObject.changedProperties(); 203 auto changedProperties = domainObject.changedProperties();
@@ -110,77 +223,22 @@ KAsync::Job<void> LocalStorageFacade<DomainType>::remove(const DomainType &domai
110 return KAsync::start<void>([domainObject, this]() { 223 return KAsync::start<void>([domainObject, this]() {
111 const QByteArray identifier = domainObject.identifier(); 224 const QByteArray identifier = domainObject.identifier();
112 if (identifier.isEmpty()) { 225 if (identifier.isEmpty()) {
113 Warning() << "We need an \"identifier\" property to identify the entity to configure"; 226 SinkWarning() << "We need an \"identifier\" property to identify the entity to configure";
114 return; 227 return;
115 } 228 }
116 Trace() << "Removing: " << identifier; 229 SinkTrace() << "Removing: " << identifier;
117 mConfigStore.remove(identifier); 230 mConfigStore.remove(identifier);
118 sConfigNotifier.remove(QSharedPointer<DomainType>::create(domainObject)); 231 sConfigNotifier.remove(QSharedPointer<DomainType>::create(domainObject));
119 }); 232 });
120} 233}
121 234
122static bool matchesFilter(const QHash<QByteArray, Sink::Query::Comparator> &filter, const QMap<QByteArray, QVariant> &properties)
123{
124 for (const auto &filterProperty : filter.keys()) {
125 if (filterProperty == "type") {
126 continue;
127 }
128 if (!filter.value(filterProperty).matches(properties.value(filterProperty))) {
129 return false;
130 }
131 }
132 return true;
133}
134
135template <typename DomainType> 235template <typename DomainType>
136QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr> LocalStorageFacade<DomainType>::load(const Sink::Query &query) 236QPair<KAsync::Job<void>, typename ResultEmitter<typename DomainType::Ptr>::Ptr> LocalStorageFacade<DomainType>::load(const Query &query)
137{ 237{
138 QObject *guard = new QObject; 238 auto runner = new LocalStorageQueryRunner<DomainType>(query, mIdentifier, sConfigNotifier);
139 auto resultProvider = new Sink::ResultProvider<typename DomainType::Ptr>(); 239 return qMakePair(KAsync::null<void>(), runner->emitter());
140 auto emitter = resultProvider->emitter();
141 auto identifier = mResourceInstanceIdentifier;
142 resultProvider->setFetcher([identifier, query, guard, resultProvider](const QSharedPointer<DomainType> &) {
143 ConfigStore mConfigStore(identifier);
144 const auto entries = mConfigStore.getEntries();
145 for (const auto &res : entries.keys()) {
146 const auto type = entries.value(res);
147
148 if (query.propertyFilter.contains("type") && query.propertyFilter.value("type").value.toByteArray() != type) {
149 Trace() << "Skipping due to type.";
150 continue;
151 }
152 if (!query.ids.isEmpty() && !query.ids.contains(res)) {
153 continue;
154 }
155 const auto configurationValues = mConfigStore.get(res);
156 if (!matchesFilter(query.propertyFilter, configurationValues)){
157 Trace() << "Skipping due to filter.";
158 continue;
159 }
160 Trace() << "Found match " << res;
161 resultProvider->add(readFromConfig(mConfigStore, res, type));
162 }
163 if (query.liveQuery) {
164 QObject::connect(&sConfigNotifier, &ConfigNotifier::modified, guard, [resultProvider](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &entry) {
165 resultProvider->modify(entry.staticCast<DomainType>());
166 });
167 QObject::connect(&sConfigNotifier, &ConfigNotifier::added, guard, [resultProvider](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &entry) {
168 resultProvider->add(entry.staticCast<DomainType>());
169 });
170 QObject::connect(&sConfigNotifier, &ConfigNotifier::removed, guard,[resultProvider](const Sink::ApplicationDomain::ApplicationDomainType::Ptr &entry) {
171 resultProvider->remove(entry.staticCast<DomainType>());
172 });
173 }
174 // TODO initialResultSetComplete should be implicit
175 resultProvider->initialResultSetComplete(typename DomainType::Ptr());
176 resultProvider->complete();
177 });
178 resultProvider->onDone([=]() { delete resultProvider; delete guard; });
179
180 return qMakePair(KAsync::null<void>(), emitter);
181} 240}
182 241
183
184ResourceFacade::ResourceFacade() : LocalStorageFacade<Sink::ApplicationDomain::SinkResource>("resources") 242ResourceFacade::ResourceFacade() : LocalStorageFacade<Sink::ApplicationDomain::SinkResource>("resources")
185{ 243{
186} 244}
@@ -192,13 +250,28 @@ ResourceFacade::~ResourceFacade()
192KAsync::Job<void> ResourceFacade::remove(const Sink::ApplicationDomain::SinkResource &resource) 250KAsync::Job<void> ResourceFacade::remove(const Sink::ApplicationDomain::SinkResource &resource)
193{ 251{
194 const auto identifier = resource.identifier(); 252 const auto identifier = resource.identifier();
195 return LocalStorageFacade<Sink::ApplicationDomain::SinkResource>::remove(resource).then<void>([identifier]() { 253 return Sink::Store::removeDataFromDisk(identifier).then(LocalStorageFacade<Sink::ApplicationDomain::SinkResource>::remove(resource));
196 // TODO shutdown resource, or use the resource process with a --remove option to cleanup (so we can take advantage of the file locking) 254}
197 QDir dir(Sink::storageLocation()); 255
198 for (const auto &folder : dir.entryList(QStringList() << identifier + "*")) { 256QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename ApplicationDomain::SinkResource::Ptr>::Ptr> ResourceFacade::load(const Sink::Query &query)
199 Sink::Storage(Sink::storageLocation(), folder, Sink::Storage::ReadWrite).removeFromDisk(); 257{
258 auto runner = new LocalStorageQueryRunner<ApplicationDomain::SinkResource>(query, mIdentifier, sConfigNotifier);
259 auto monitoredResources = QSharedPointer<QSet<QByteArray>>::create();
260 runner->setStatusUpdater([runner, monitoredResources](ApplicationDomain::SinkResource &resource) {
261 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource.identifier(), ResourceConfig::getResourceType(resource.identifier()));
262 if (!monitoredResources->contains(resource.identifier())) {
263 auto ret = QObject::connect(resourceAccess.data(), &ResourceAccess::notification, runner->guard(), [resource, runner, resourceAccess](const Notification &notification) {
264 SinkTrace() << "Received notification in facade: " << notification.type;
265 if (notification.type == Notification::Status) {
266 runner->statusChanged(resource.identifier());
267 }
268 });
269 Q_ASSERT(ret);
270 monitoredResources->insert(resource.identifier());
200 } 271 }
272 resource.setStatusStatus(resourceAccess->getResourceStatus());
201 }); 273 });
274 return qMakePair(KAsync::null<void>(), runner->emitter());
202} 275}
203 276
204 277
@@ -210,6 +283,55 @@ AccountFacade::~AccountFacade()
210{ 283{
211} 284}
212 285
286QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename ApplicationDomain::SinkAccount::Ptr>::Ptr> AccountFacade::load(const Sink::Query &query)
287{
288 auto runner = new LocalStorageQueryRunner<ApplicationDomain::SinkAccount>(query, mIdentifier, sConfigNotifier);
289 auto monitoredResources = QSharedPointer<QSet<QByteArray>>::create();
290 runner->setStatusUpdater([runner, monitoredResources](ApplicationDomain::SinkAccount &account) {
291 Query query;
292 query.filter<ApplicationDomain::SinkResource::Account>(account.identifier());
293 const auto resources = Store::read<ApplicationDomain::SinkResource>(query);
294 SinkTrace() << "Found resource belonging to the account " << account.identifier() << " : " << resources;
295 auto accountIdentifier = account.identifier();
296 ApplicationDomain::Status status = ApplicationDomain::ConnectedStatus;
297 for (const auto &resource : resources) {
298 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource.identifier(), ResourceConfig::getResourceType(resource.identifier()));
299 if (!monitoredResources->contains(resource.identifier())) {
300 auto ret = QObject::connect(resourceAccess.data(), &ResourceAccess::notification, runner->guard(), [resource, runner, resourceAccess, accountIdentifier](const Notification &notification) {
301 SinkTrace() << "Received notification in facade: " << notification.type;
302 if (notification.type == Notification::Status) {
303 runner->statusChanged(accountIdentifier);
304 }
305 });
306 Q_ASSERT(ret);
307 monitoredResources->insert(resource.identifier());
308 }
309
310 //Figure out overall status
311 auto s = resourceAccess->getResourceStatus();
312 switch (s) {
313 case ApplicationDomain::ErrorStatus:
314 status = ApplicationDomain::ErrorStatus;
315 break;
316 case ApplicationDomain::OfflineStatus:
317 if (status == ApplicationDomain::ConnectedStatus) {
318 status = ApplicationDomain::OfflineStatus;
319 }
320 break;
321 case ApplicationDomain::ConnectedStatus:
322 break;
323 case ApplicationDomain::BusyStatus:
324 if (status != ApplicationDomain::ErrorStatus) {
325 status = ApplicationDomain::BusyStatus;
326 }
327 break;
328 }
329 }
330 account.setStatusStatus(status);
331 });
332 return qMakePair(KAsync::null<void>(), runner->emitter());
333}
334
213IdentityFacade::IdentityFacade() : LocalStorageFacade<Sink::ApplicationDomain::Identity>("identities") 335IdentityFacade::IdentityFacade() : LocalStorageFacade<Sink::ApplicationDomain::Identity>("identities")
214{ 336{
215} 337}
diff --git a/common/resourcefacade.h b/common/resourcefacade.h
index 989375d..23c453a 100644
--- a/common/resourcefacade.h
+++ b/common/resourcefacade.h
@@ -56,6 +56,24 @@ signals:
56}; 56};
57 57
58template <typename DomainType> 58template <typename DomainType>
59class LocalStorageQueryRunner
60{
61public:
62 LocalStorageQueryRunner(const Sink::Query &query, const QByteArray &identifier, ConfigNotifier &configNotifier);
63 typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr emitter();
64 void setStatusUpdater(const std::function<void(DomainType &)> &);
65 void statusChanged(const QByteArray &identifier);
66 QObject *guard() const;
67
68private:
69 void updateStatus(DomainType &entity);
70 std::function<void(DomainType &)> mStatusUpdater;
71 QSharedPointer<Sink::ResultProvider<typename DomainType::Ptr>> mResultProvider;
72 ConfigStore mConfigStore;
73 std::unique_ptr<QObject> mGuard;
74};
75
76template <typename DomainType>
59class LocalStorageFacade : public Sink::StoreFacade<DomainType> 77class LocalStorageFacade : public Sink::StoreFacade<DomainType>
60{ 78{
61public: 79public:
@@ -65,13 +83,14 @@ public:
65 virtual KAsync::Job<void> modify(const DomainType &resource) Q_DECL_OVERRIDE; 83 virtual KAsync::Job<void> modify(const DomainType &resource) Q_DECL_OVERRIDE;
66 virtual KAsync::Job<void> remove(const DomainType &resource) Q_DECL_OVERRIDE; 84 virtual KAsync::Job<void> remove(const DomainType &resource) Q_DECL_OVERRIDE;
67 virtual QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr> load(const Sink::Query &query) Q_DECL_OVERRIDE; 85 virtual QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename DomainType::Ptr>::Ptr> load(const Sink::Query &query) Q_DECL_OVERRIDE;
86
87protected:
88 QByteArray mIdentifier;
89 static ConfigNotifier sConfigNotifier;
90
68private: 91private:
69 typename DomainType::Ptr readFromConfig(const QByteArray &id, const QByteArray &type); 92 typename DomainType::Ptr readFromConfig(const QByteArray &id, const QByteArray &type);
70 static typename DomainType::Ptr readFromConfig(ConfigStore &store, const QByteArray &id, const QByteArray &type);
71
72 ConfigStore mConfigStore; 93 ConfigStore mConfigStore;
73 static ConfigNotifier sConfigNotifier;
74 QByteArray mResourceInstanceIdentifier;
75}; 94};
76 95
77class ResourceFacade : public LocalStorageFacade<Sink::ApplicationDomain::SinkResource> 96class ResourceFacade : public LocalStorageFacade<Sink::ApplicationDomain::SinkResource>
@@ -80,6 +99,7 @@ public:
80 ResourceFacade(); 99 ResourceFacade();
81 virtual ~ResourceFacade(); 100 virtual ~ResourceFacade();
82 virtual KAsync::Job<void> remove(const Sink::ApplicationDomain::SinkResource &resource) Q_DECL_OVERRIDE; 101 virtual KAsync::Job<void> remove(const Sink::ApplicationDomain::SinkResource &resource) Q_DECL_OVERRIDE;
102 virtual QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename Sink::ApplicationDomain::SinkResource::Ptr>::Ptr> load(const Sink::Query &query) Q_DECL_OVERRIDE;
83}; 103};
84 104
85class AccountFacade : public LocalStorageFacade<Sink::ApplicationDomain::SinkAccount> 105class AccountFacade : public LocalStorageFacade<Sink::ApplicationDomain::SinkAccount>
@@ -87,6 +107,7 @@ class AccountFacade : public LocalStorageFacade<Sink::ApplicationDomain::SinkAcc
87public: 107public:
88 AccountFacade(); 108 AccountFacade();
89 virtual ~AccountFacade(); 109 virtual ~AccountFacade();
110 virtual QPair<KAsync::Job<void>, typename Sink::ResultEmitter<typename Sink::ApplicationDomain::SinkAccount::Ptr>::Ptr> load(const Sink::Query &query) Q_DECL_OVERRIDE;
90}; 111};
91 112
92class IdentityFacade : public LocalStorageFacade<Sink::ApplicationDomain::Identity> 113class IdentityFacade : public LocalStorageFacade<Sink::ApplicationDomain::Identity>
diff --git a/common/sourcewriteback.cpp b/common/sourcewriteback.cpp
index a277606..7d21ea6 100644
--- a/common/sourcewriteback.cpp
+++ b/common/sourcewriteback.cpp
@@ -26,6 +26,8 @@
26#define ENTITY_TYPE_MAIL "mail" 26#define ENTITY_TYPE_MAIL "mail"
27#define ENTITY_TYPE_FOLDER "folder" 27#define ENTITY_TYPE_FOLDER "folder"
28 28
29SINK_DEBUG_AREA("sourcewriteback")
30
29using namespace Sink; 31using namespace Sink;
30 32
31SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) 33SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
@@ -55,14 +57,14 @@ RemoteIdMap &SourceWriteBack::syncStore()
55 57
56KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) 58KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value)
57{ 59{
58 Trace() << "Replaying" << type << key; 60 SinkTrace() << "Replaying" << type << key;
59 61
60 Sink::EntityBuffer buffer(value); 62 Sink::EntityBuffer buffer(value);
61 const Sink::Entity &entity = buffer.entity(); 63 const Sink::Entity &entity = buffer.entity();
62 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); 64 const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
63 Q_ASSERT(metadataBuffer); 65 Q_ASSERT(metadataBuffer);
64 if (!metadataBuffer->replayToSource()) { 66 if (!metadataBuffer->replayToSource()) {
65 Trace() << "Change is coming from the source"; 67 SinkTrace() << "Change is coming from the source";
66 return KAsync::null<void>(); 68 return KAsync::null<void>();
67 } 69 }
68 Q_ASSERT(!mSyncStore); 70 Q_ASSERT(!mSyncStore);
@@ -81,11 +83,11 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr
81 if (operation != Sink::Operation_Creation) { 83 if (operation != Sink::Operation_Creation) {
82 oldRemoteId = syncStore().resolveLocalId(type, uid); 84 oldRemoteId = syncStore().resolveLocalId(type, uid);
83 if (oldRemoteId.isEmpty()) { 85 if (oldRemoteId.isEmpty()) {
84 Warning() << "Couldn't find the remote id for: " << type << uid; 86 SinkWarning() << "Couldn't find the remote id for: " << type << uid;
85 return KAsync::error<void>(1, "Couldn't find the remote id."); 87 return KAsync::error<void>(1, "Couldn't find the remote id.");
86 } 88 }
87 } 89 }
88 Trace() << "Replaying " << key << type << uid << oldRemoteId; 90 SinkTrace() << "Replaying " << key << type << uid << oldRemoteId;
89 91
90 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); 92 KAsync::Job<QByteArray> job = KAsync::null<QByteArray>();
91 if (type == ENTITY_TYPE_FOLDER) { 93 if (type == ENTITY_TYPE_FOLDER) {
@@ -98,24 +100,24 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr
98 100
99 return job.then<void, QByteArray>([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) { 101 return job.then<void, QByteArray>([this, operation, type, uid, oldRemoteId](const QByteArray &remoteId) {
100 if (operation == Sink::Operation_Creation) { 102 if (operation == Sink::Operation_Creation) {
101 Trace() << "Replayed creation with remote id: " << remoteId; 103 SinkTrace() << "Replayed creation with remote id: " << remoteId;
102 if (remoteId.isEmpty()) { 104 if (remoteId.isEmpty()) {
103 Warning() << "Returned an empty remoteId from the creation"; 105 SinkWarning() << "Returned an empty remoteId from the creation";
104 } else { 106 } else {
105 syncStore().recordRemoteId(type, uid, remoteId); 107 syncStore().recordRemoteId(type, uid, remoteId);
106 } 108 }
107 } else if (operation == Sink::Operation_Modification) { 109 } else if (operation == Sink::Operation_Modification) {
108 Trace() << "Replayed modification with remote id: " << remoteId; 110 SinkTrace() << "Replayed modification with remote id: " << remoteId;
109 if (remoteId.isEmpty()) { 111 if (remoteId.isEmpty()) {
110 Warning() << "Returned an empty remoteId from the creation"; 112 SinkWarning() << "Returned an empty remoteId from the creation";
111 } else { 113 } else {
112 syncStore().updateRemoteId(type, uid, remoteId); 114 syncStore().updateRemoteId(type, uid, remoteId);
113 } 115 }
114 } else if (operation == Sink::Operation_Removal) { 116 } else if (operation == Sink::Operation_Removal) {
115 Trace() << "Replayed removal with remote id: " << oldRemoteId; 117 SinkTrace() << "Replayed removal with remote id: " << oldRemoteId;
116 syncStore().removeRemoteId(type, uid, oldRemoteId); 118 syncStore().removeRemoteId(type, uid, oldRemoteId);
117 } else { 119 } else {
118 ErrorMsg() << "Unkown operation" << operation; 120 SinkError() << "Unkown operation" << operation;
119 } 121 }
120 122
121 mSyncStore.clear(); 123 mSyncStore.clear();
@@ -123,7 +125,7 @@ KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArr
123 mTransaction.abort(); 125 mTransaction.abort();
124 mSyncTransaction.commit(); 126 mSyncTransaction.commit();
125 }, [this](int errorCode, const QString &errorMessage) { 127 }, [this](int errorCode, const QString &errorMessage) {
126 Warning() << "Failed to replay change: " << errorMessage; 128 SinkWarning() << "Failed to replay change: " << errorMessage;
127 mSyncStore.clear(); 129 mSyncStore.clear();
128 mEntityStore.clear(); 130 mEntityStore.clear();
129 mTransaction.abort(); 131 mTransaction.abort();
diff --git a/common/specialpurposepreprocessor.cpp b/common/specialpurposepreprocessor.cpp
index 2892105..0985880 100644
--- a/common/specialpurposepreprocessor.cpp
+++ b/common/specialpurposepreprocessor.cpp
@@ -5,6 +5,8 @@
5 5
6using namespace Sink; 6using namespace Sink;
7 7
8SINK_DEBUG_AREA("SpecialPurposeProcessor")
9
8static QHash<QByteArray, QString> specialPurposeFolders() 10static QHash<QByteArray, QString> specialPurposeFolders()
9{ 11{
10 QHash<QByteArray, QString> hash; 12 QHash<QByteArray, QString> hash;
@@ -53,7 +55,7 @@ QByteArray SpecialPurposeProcessor::ensureFolder(Sink::Storage::Transaction &tra
53 return false; 55 return false;
54 }); 56 });
55 if (!mSpecialPurposeFolders.contains(specialPurpose)) { 57 if (!mSpecialPurposeFolders.contains(specialPurpose)) {
56 Trace() << "Failed to find a drafts folder, creating a new one"; 58 SinkTrace() << "Failed to find a drafts folder, creating a new one";
57 auto folder = ApplicationDomain::Folder::create(mResourceInstanceIdentifier); 59 auto folder = ApplicationDomain::Folder::create(mResourceInstanceIdentifier);
58 folder.setSpecialPurpose(QByteArrayList() << specialPurpose); 60 folder.setSpecialPurpose(QByteArrayList() << specialPurpose);
59 folder.setName(sSpecialPurposeFolders.value(specialPurpose)); 61 folder.setName(sSpecialPurposeFolders.value(specialPurpose));
diff --git a/common/storage.h b/common/storage.h
index e7b4a3e..4ef20d5 100644
--- a/common/storage.h
+++ b/common/storage.h
@@ -103,18 +103,8 @@ public:
103 */ 103 */
104 bool contains(const QByteArray &uid); 104 bool contains(const QByteArray &uid);
105 105
106 NamedDatabase(NamedDatabase &&other) : d(other.d) 106 NamedDatabase(NamedDatabase &&other);
107 { 107 NamedDatabase &operator=(NamedDatabase &&other);
108 d = other.d;
109 other.d = nullptr;
110 }
111
112 NamedDatabase &operator=(NamedDatabase &&other)
113 {
114 d = other.d;
115 other.d = nullptr;
116 return *this;
117 }
118 108
119 operator bool() const 109 operator bool() const
120 { 110 {
@@ -146,17 +136,8 @@ public:
146 NamedDatabase openDatabase(const QByteArray &name = QByteArray("default"), 136 NamedDatabase openDatabase(const QByteArray &name = QByteArray("default"),
147 const std::function<void(const Storage::Error &error)> &errorHandler = std::function<void(const Storage::Error &error)>(), bool allowDuplicates = false) const; 137 const std::function<void(const Storage::Error &error)> &errorHandler = std::function<void(const Storage::Error &error)>(), bool allowDuplicates = false) const;
148 138
149 Transaction(Transaction &&other) : d(other.d) 139 Transaction(Transaction &&other);
150 { 140 Transaction &operator=(Transaction &&other);
151 d = other.d;
152 other.d = nullptr;
153 }
154 Transaction &operator=(Transaction &&other)
155 {
156 d = other.d;
157 other.d = nullptr;
158 return *this;
159 }
160 141
161 operator bool() const; 142 operator bool() const;
162 143
diff --git a/common/storage_common.cpp b/common/storage_common.cpp
index 6982a4c..1f2594e 100644
--- a/common/storage_common.cpp
+++ b/common/storage_common.cpp
@@ -24,6 +24,8 @@
24#include "log.h" 24#include "log.h"
25#include <QUuid> 25#include <QUuid>
26 26
27SINK_DEBUG_AREA("storage")
28
27namespace Sink { 29namespace Sink {
28 30
29static const char *s_internalPrefix = "__internal"; 31static const char *s_internalPrefix = "__internal";
@@ -31,7 +33,7 @@ static const int s_internalPrefixSize = strlen(s_internalPrefix);
31 33
32void errorHandler(const Storage::Error &error) 34void errorHandler(const Storage::Error &error)
33{ 35{
34 Warning() << "Database error in " << error.store << ", code " << error.code << ", message: " << error.message; 36 SinkWarning() << "Database error in " << error.store << ", code " << error.code << ", message: " << error.message;
35} 37}
36 38
37std::function<void(const Storage::Error &error)> Storage::basicErrorHandler() 39std::function<void(const Storage::Error &error)> Storage::basicErrorHandler()
@@ -67,7 +69,7 @@ qint64 Storage::maxRevision(const Sink::Storage::Transaction &transaction)
67 }, 69 },
68 [](const Error &error) { 70 [](const Error &error) {
69 if (error.code != Sink::Storage::NotFound) { 71 if (error.code != Sink::Storage::NotFound) {
70 Warning() << "Coultn'd find the maximum revision."; 72 SinkWarning() << "Coultn'd find the maximum revision.";
71 } 73 }
72 }); 74 });
73 return r; 75 return r;
@@ -88,7 +90,7 @@ qint64 Storage::cleanedUpRevision(const Sink::Storage::Transaction &transaction)
88 }, 90 },
89 [](const Error &error) { 91 [](const Error &error) {
90 if (error.code != Sink::Storage::NotFound) { 92 if (error.code != Sink::Storage::NotFound) {
91 Warning() << "Coultn'd find the maximum revision."; 93 SinkWarning() << "Coultn'd find the maximum revision.";
92 } 94 }
93 }); 95 });
94 return r; 96 return r;
@@ -103,7 +105,7 @@ QByteArray Storage::getUidFromRevision(const Sink::Storage::Transaction &transac
103 uid = value; 105 uid = value;
104 return false; 106 return false;
105 }, 107 },
106 [revision](const Error &error) { Warning() << "Coultn'd find uid for revision: " << revision << error.message; }); 108 [revision](const Error &error) { SinkWarning() << "Coultn'd find uid for revision: " << revision << error.message; });
107 return uid; 109 return uid;
108} 110}
109 111
@@ -116,7 +118,7 @@ QByteArray Storage::getTypeFromRevision(const Sink::Storage::Transaction &transa
116 type = value; 118 type = value;
117 return false; 119 return false;
118 }, 120 },
119 [revision](const Error &error) { Warning() << "Coultn'd find type for revision " << revision; }); 121 [revision](const Error &error) { SinkWarning() << "Coultn'd find type for revision " << revision; });
120 return type; 122 return type;
121} 123}
122 124
diff --git a/common/storage_lmdb.cpp b/common/storage_lmdb.cpp
index 3687594..79f4465 100644
--- a/common/storage_lmdb.cpp
+++ b/common/storage_lmdb.cpp
@@ -34,10 +34,8 @@
34#include <lmdb.h> 34#include <lmdb.h>
35#include "log.h" 35#include "log.h"
36 36
37#undef Trace 37SINK_DEBUG_AREA("storage")
38#define Trace() Trace_area("storage." + d->storageRoot.toLatin1() + '/' + d->name.toLatin1()) 38// SINK_DEBUG_COMPONENT(d->storageRoot.toLatin1() + '/' + d->name.toLatin1())
39#undef Warning
40#define Warning() Warning_area("storage")
41 39
42namespace Sink { 40namespace Sink {
43 41
@@ -103,6 +101,21 @@ Storage::NamedDatabase::NamedDatabase(NamedDatabase::Private *prv) : d(prv)
103{ 101{
104} 102}
105 103
104Storage::NamedDatabase::NamedDatabase(NamedDatabase &&other) : d(nullptr)
105{
106 *this = std::move(other);
107}
108
109Storage::NamedDatabase &Storage::NamedDatabase::operator=(Storage::NamedDatabase &&other)
110{
111 if (&other != this) {
112 delete d;
113 d = other.d;
114 other.d = nullptr;
115 }
116 return *this;
117}
118
106Storage::NamedDatabase::~NamedDatabase() 119Storage::NamedDatabase::~NamedDatabase()
107{ 120{
108 delete d; 121 delete d;
@@ -339,7 +352,7 @@ qint64 Storage::NamedDatabase::getSize()
339 MDB_stat stat; 352 MDB_stat stat;
340 rc = mdb_stat(d->transaction, d->dbi, &stat); 353 rc = mdb_stat(d->transaction, d->dbi, &stat);
341 if (rc) { 354 if (rc) {
342 Warning() << "Something went wrong " << QByteArray(mdb_strerror(rc)); 355 SinkWarning() << "Something went wrong " << QByteArray(mdb_strerror(rc));
343 } 356 }
344 // std::cout << "overflow_pages: " << stat.ms_overflow_pages << std::endl; 357 // std::cout << "overflow_pages: " << stat.ms_overflow_pages << std::endl;
345 // std::cout << "page size: " << stat.ms_psize << std::endl; 358 // std::cout << "page size: " << stat.ms_psize << std::endl;
@@ -398,6 +411,21 @@ Storage::Transaction::Transaction(Transaction::Private *prv) : d(prv)
398 d->startTransaction(); 411 d->startTransaction();
399} 412}
400 413
414Storage::Transaction::Transaction(Transaction &&other) : d(nullptr)
415{
416 *this = std::move(other);
417}
418
419Storage::Transaction &Storage::Transaction::operator=(Storage::Transaction &&other)
420{
421 if (&other != this) {
422 delete d;
423 d = other.d;
424 other.d = nullptr;
425 }
426 return *this;
427}
428
401Storage::Transaction::~Transaction() 429Storage::Transaction::~Transaction()
402{ 430{
403 if (d && d->transaction) { 431 if (d && d->transaction) {
@@ -452,7 +480,7 @@ static bool ensureCorrectDb(Storage::NamedDatabase &database, const QByteArray &
452 bool openedTheWrongDatabase = false; 480 bool openedTheWrongDatabase = false;
453 auto count = database.scan("__internal_dbname", [db, &openedTheWrongDatabase](const QByteArray &key, const QByteArray &value) ->bool { 481 auto count = database.scan("__internal_dbname", [db, &openedTheWrongDatabase](const QByteArray &key, const QByteArray &value) ->bool {
454 if (value != db) { 482 if (value != db) {
455 Warning() << "Opened the wrong database, got " << value << " instead of " << db; 483 SinkWarning() << "Opened the wrong database, got " << value << " instead of " << db;
456 openedTheWrongDatabase = true; 484 openedTheWrongDatabase = true;
457 } 485 }
458 return false; 486 return false;
@@ -475,7 +503,7 @@ bool Storage::Transaction::validateNamedDatabases()
475 for (const auto &dbName : databases) { 503 for (const auto &dbName : databases) {
476 auto db = openDatabase(dbName); 504 auto db = openDatabase(dbName);
477 if (!db) { 505 if (!db) {
478 Warning() << "Failed to open the database: " << dbName; 506 SinkWarning() << "Failed to open the database: " << dbName;
479 return false; 507 return false;
480 } 508 }
481 } 509 }
@@ -497,7 +525,7 @@ Storage::NamedDatabase Storage::Transaction::openDatabase(const QByteArray &db,
497 } 525 }
498 auto database = Storage::NamedDatabase(p); 526 auto database = Storage::NamedDatabase(p);
499 if (!ensureCorrectDb(database, db, d->requestedRead)) { 527 if (!ensureCorrectDb(database, db, d->requestedRead)) {
500 Warning() << "Failed to open the database" << db; 528 SinkWarning() << "Failed to open the database" << db;
501 return Storage::NamedDatabase(); 529 return Storage::NamedDatabase();
502 } 530 }
503 return database; 531 return database;
@@ -506,7 +534,7 @@ Storage::NamedDatabase Storage::Transaction::openDatabase(const QByteArray &db,
506QList<QByteArray> Storage::Transaction::getDatabaseNames() const 534QList<QByteArray> Storage::Transaction::getDatabaseNames() const
507{ 535{
508 if (!d) { 536 if (!d) {
509 Warning() << "Invalid transaction"; 537 SinkWarning() << "Invalid transaction";
510 return QList<QByteArray>(); 538 return QList<QByteArray>();
511 } 539 }
512 540
@@ -529,11 +557,12 @@ QList<QByteArray> Storage::Transaction::getDatabaseNames() const
529 rc = 0; 557 rc = 0;
530 } 558 }
531 if (rc) { 559 if (rc) {
532 Warning() << "Failed to get a value" << rc; 560 SinkWarning() << "Failed to get a value" << rc;
533 } 561 }
534 } 562 }
563 mdb_cursor_close(cursor);
535 } else { 564 } else {
536 Warning() << "Failed to open db" << rc << QByteArray(mdb_strerror(rc)); 565 SinkWarning() << "Failed to open db" << rc << QByteArray(mdb_strerror(rc));
537 } 566 }
538 return list; 567 return list;
539} 568}
@@ -580,7 +609,7 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m) : st
580 int rc = 0; 609 int rc = 0;
581 if ((rc = mdb_env_create(&env))) { 610 if ((rc = mdb_env_create(&env))) {
582 // TODO: handle error 611 // TODO: handle error
583 Warning() << "mdb_env_create: " << rc << " " << mdb_strerror(rc); 612 SinkWarning() << "mdb_env_create: " << rc << " " << mdb_strerror(rc);
584 } else { 613 } else {
585 mdb_env_set_maxdbs(env, 50); 614 mdb_env_set_maxdbs(env, 50);
586 unsigned int flags = MDB_NOTLS; 615 unsigned int flags = MDB_NOTLS;
@@ -588,11 +617,13 @@ Storage::Private::Private(const QString &s, const QString &n, AccessMode m) : st
588 flags |= MDB_RDONLY; 617 flags |= MDB_RDONLY;
589 } 618 }
590 if ((rc = mdb_env_open(env, fullPath.toStdString().data(), flags, 0664))) { 619 if ((rc = mdb_env_open(env, fullPath.toStdString().data(), flags, 0664))) {
591 Warning() << "mdb_env_open: " << rc << " " << mdb_strerror(rc); 620 SinkWarning() << "mdb_env_open: " << rc << " " << mdb_strerror(rc);
592 mdb_env_close(env); 621 mdb_env_close(env);
593 env = 0; 622 env = 0;
594 } else { 623 } else {
595 // FIXME: dynamic resize 624 // FIXME: dynamic resize
625 // In order to run valgrind this size must be smaller than half your available RAM
626 // https://github.com/BVLC/caffe/issues/2404
596 const size_t dbSize = (size_t)10485760 * (size_t)8000; // 1MB * 8000 627 const size_t dbSize = (size_t)10485760 * (size_t)8000; // 1MB * 8000
597 mdb_env_set_mapsize(env, dbSize); 628 mdb_env_set_mapsize(env, dbSize);
598 sEnvironments.insert(fullPath, env); 629 sEnvironments.insert(fullPath, env);
@@ -648,7 +679,7 @@ qint64 Storage::diskUsage() const
648{ 679{
649 QFileInfo info(d->storageRoot + '/' + d->name + "/data.mdb"); 680 QFileInfo info(d->storageRoot + '/' + d->name + "/data.mdb");
650 if (!info.exists()) { 681 if (!info.exists()) {
651 Warning() << "Tried to get filesize for non-existant file: " << info.path(); 682 SinkWarning() << "Tried to get filesize for non-existant file: " << info.path();
652 } 683 }
653 return info.size(); 684 return info.size();
654} 685}
@@ -658,7 +689,7 @@ void Storage::removeFromDisk() const
658 const QString fullPath(d->storageRoot + '/' + d->name); 689 const QString fullPath(d->storageRoot + '/' + d->name);
659 QMutexLocker locker(&d->sMutex); 690 QMutexLocker locker(&d->sMutex);
660 QDir dir(fullPath); 691 QDir dir(fullPath);
661 Trace() << "Removing database from disk: " << fullPath; 692 SinkTrace() << "Removing database from disk: " << fullPath;
662 if (!dir.removeRecursively()) { 693 if (!dir.removeRecursively()) {
663 Error error(d->name.toLatin1(), ErrorCodes::GenericError, QString("Failed to remove directory %1 %2").arg(d->storageRoot).arg(d->name).toLatin1()); 694 Error error(d->name.toLatin1(), ErrorCodes::GenericError, QString("Failed to remove directory %1 %2").arg(d->storageRoot).arg(d->name).toLatin1());
664 defaultErrorHandler()(error); 695 defaultErrorHandler()(error);
diff --git a/common/store.cpp b/common/store.cpp
index 1162a18..a58287b 100644
--- a/common/store.cpp
+++ b/common/store.cpp
@@ -36,8 +36,7 @@
36#include "storage.h" 36#include "storage.h"
37#include "log.h" 37#include "log.h"
38 38
39#undef DEBUG_AREA 39SINK_DEBUG_AREA("store")
40#define DEBUG_AREA "client.store"
41 40
42namespace Sink { 41namespace Sink {
43 42
@@ -88,24 +87,24 @@ static QMap<QByteArray, QByteArray> getResources(const QList<QByteArray> &resour
88 } 87 }
89 resources.insert(res, configuredResources.value(res)); 88 resources.insert(res, configuredResources.value(res));
90 } else { 89 } else {
91 Warning() << "Resource is not existing: " << res; 90 SinkWarning() << "Resource is not existing: " << res;
92 } 91 }
93 } 92 }
94 } 93 }
95 Trace() << "Found resources: " << resources; 94 SinkTrace() << "Found resources: " << resources;
96 return resources; 95 return resources;
97} 96}
98 97
99template <class DomainType> 98template <class DomainType>
100QSharedPointer<QAbstractItemModel> Store::loadModel(Query query) 99QSharedPointer<QAbstractItemModel> Store::loadModel(Query query)
101{ 100{
102 Trace() << "Query: " << ApplicationDomain::getTypeName<DomainType>(); 101 SinkTrace() << "Query: " << ApplicationDomain::getTypeName<DomainType>();
103 Trace() << " Requested: " << query.requestedProperties; 102 SinkTrace() << " Requested: " << query.requestedProperties;
104 Trace() << " Filter: " << query.propertyFilter; 103 SinkTrace() << " Filter: " << query.propertyFilter;
105 Trace() << " Parent: " << query.parentProperty; 104 SinkTrace() << " Parent: " << query.parentProperty;
106 Trace() << " Ids: " << query.ids; 105 SinkTrace() << " Ids: " << query.ids;
107 Trace() << " IsLive: " << query.liveQuery; 106 SinkTrace() << " IsLive: " << query.liveQuery;
108 Trace() << " Sorting: " << query.sortProperty; 107 SinkTrace() << " Sorting: " << query.sortProperty;
109 auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>>::create(query, query.requestedProperties); 108 auto model = QSharedPointer<ModelResult<DomainType, typename DomainType::Ptr>>::create(query, query.requestedProperties);
110 109
111 //* Client defines lifetime of model 110 //* Client defines lifetime of model
@@ -123,16 +122,16 @@ QSharedPointer<QAbstractItemModel> Store::loadModel(Query query)
123 const auto resourceType = resources.value(resourceInstanceIdentifier); 122 const auto resourceType = resources.value(resourceInstanceIdentifier);
124 auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceType, resourceInstanceIdentifier); 123 auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceType, resourceInstanceIdentifier);
125 if (facade) { 124 if (facade) {
126 Trace() << "Trying to fetch from resource " << resourceInstanceIdentifier; 125 SinkTrace() << "Trying to fetch from resource " << resourceInstanceIdentifier;
127 auto result = facade->load(query); 126 auto result = facade->load(query);
128 if (result.second) { 127 if (result.second) {
129 aggregatingEmitter->addEmitter(result.second); 128 aggregatingEmitter->addEmitter(result.second);
130 } else { 129 } else {
131 Warning() << "Null emitter for resource " << resourceInstanceIdentifier; 130 SinkWarning() << "Null emitter for resource " << resourceInstanceIdentifier;
132 } 131 }
133 result.first.template then<void>([&future]() { future.setFinished(); }).exec(); 132 result.first.template then<void>([&future]() { future.setFinished(); }).exec();
134 } else { 133 } else {
135 Trace() << "Couldn' find a facade for " << resourceInstanceIdentifier; 134 SinkTrace() << "Couldn' find a facade for " << resourceInstanceIdentifier;
136 // Ignore the error and carry on 135 // Ignore the error and carry on
137 future.setFinished(); 136 future.setFinished();
138 } 137 }
@@ -164,7 +163,7 @@ KAsync::Job<void> Store::create(const DomainType &domainObject)
164{ 163{
165 // Potentially move to separate thread as well 164 // Potentially move to separate thread as well
166 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); 165 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
167 return facade->create(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { Warning() << "Failed to create"; }); 166 return facade->create(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to create"; });
168} 167}
169 168
170template <class DomainType> 169template <class DomainType>
@@ -172,7 +171,7 @@ KAsync::Job<void> Store::modify(const DomainType &domainObject)
172{ 171{
173 // Potentially move to separate thread as well 172 // Potentially move to separate thread as well
174 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); 173 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
175 return facade->modify(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { Warning() << "Failed to modify"; }); 174 return facade->modify(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to modify"; });
176} 175}
177 176
178template <class DomainType> 177template <class DomainType>
@@ -180,7 +179,7 @@ KAsync::Job<void> Store::remove(const DomainType &domainObject)
180{ 179{
181 // Potentially move to separate thread as well 180 // Potentially move to separate thread as well
182 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier()); 181 auto facade = getFacade<DomainType>(domainObject.resourceInstanceIdentifier());
183 return facade->remove(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { Warning() << "Failed to remove"; }); 182 return facade->remove(domainObject).template then<void>([facade]() {}, [](int errorCode, const QString &error) { SinkWarning() << "Failed to remove"; });
184} 183}
185 184
186KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier) 185KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
@@ -188,28 +187,28 @@ KAsync::Job<void> Store::removeDataFromDisk(const QByteArray &identifier)
188 // All databases are going to become invalid, nuke the environments 187 // All databases are going to become invalid, nuke the environments
189 // TODO: all clients should react to a notification the resource 188 // TODO: all clients should react to a notification the resource
190 Sink::Storage::clearEnv(); 189 Sink::Storage::clearEnv();
191 Trace() << "Remove data from disk " << identifier; 190 SinkTrace() << "Remove data from disk " << identifier;
192 auto time = QSharedPointer<QTime>::create(); 191 auto time = QSharedPointer<QTime>::create();
193 time->start(); 192 time->start();
194 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier)); 193 auto resourceAccess = ResourceAccessFactory::instance().getAccess(identifier, ResourceConfig::getResourceType(identifier));
195 resourceAccess->open(); 194 resourceAccess->open();
196 return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand) 195 return resourceAccess->sendCommand(Sink::Commands::RemoveFromDiskCommand)
197 .then<void>([resourceAccess, time]() { Trace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); }); 196 .then<void>([resourceAccess, time]() { SinkTrace() << "Remove from disk complete." << Log::TraceTime(time->elapsed()); });
198} 197}
199 198
200KAsync::Job<void> Store::synchronize(const Sink::Query &query) 199KAsync::Job<void> Store::synchronize(const Sink::Query &query)
201{ 200{
202 Trace() << "synchronize" << query.resources; 201 SinkTrace() << "synchronize" << query.resources;
203 auto resources = getResources(query.resources, query.accounts).keys(); 202 auto resources = getResources(query.resources, query.accounts).keys();
204 //FIXME only necessary because each doesn't propagate errors 203 //FIXME only necessary because each doesn't propagate errors
205 auto error = new bool; 204 auto error = new bool;
206 return KAsync::iterate(resources) 205 return KAsync::iterate(resources)
207 .template each<void, QByteArray>([query, error](const QByteArray &resource, KAsync::Future<void> &future) { 206 .template each<void, QByteArray>([query, error](const QByteArray &resource, KAsync::Future<void> &future) {
208 Trace() << "Synchronizing " << resource; 207 SinkTrace() << "Synchronizing " << resource;
209 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource)); 208 auto resourceAccess = ResourceAccessFactory::instance().getAccess(resource, ResourceConfig::getResourceType(resource));
210 resourceAccess->open(); 209 resourceAccess->open();
211 resourceAccess->synchronizeResource(true, false).then<void>([resourceAccess, &future]() {Trace() << "synced."; future.setFinished(); }, 210 resourceAccess->synchronizeResource(true, false).then<void>([resourceAccess, &future]() {SinkTrace() << "synced."; future.setFinished(); },
212 [&future, error](int errorCode, QString msg) { *error = true; Warning() << "Error during sync."; future.setError(errorCode, msg); }).exec(); 211 [&future, error](int errorCode, QString msg) { *error = true; SinkWarning() << "Error during sync."; future.setError(errorCode, msg); }).exec();
213 }).then<void>([error](KAsync::Future<void> &future) { 212 }).then<void>([error](KAsync::Future<void> &future) {
214 if (*error) { 213 if (*error) {
215 future.setError(1, "Error during sync."); 214 future.setError(1, "Error during sync.");
@@ -306,25 +305,25 @@ QList<DomainType> Store::read(const Sink::Query &q)
306 auto resources = getResources(query.resources, query.accounts, ApplicationDomain::getTypeName<DomainType>()); 305 auto resources = getResources(query.resources, query.accounts, ApplicationDomain::getTypeName<DomainType>());
307 auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create(); 306 auto aggregatingEmitter = AggregatingResultEmitter<typename DomainType::Ptr>::Ptr::create();
308 aggregatingEmitter->onAdded([&list](const typename DomainType::Ptr &value){ 307 aggregatingEmitter->onAdded([&list](const typename DomainType::Ptr &value){
309 Trace() << "Found value: " << value->identifier(); 308 SinkTrace() << "Found value: " << value->identifier();
310 list << *value; 309 list << *value;
311 }); 310 });
312 for (const auto resourceInstanceIdentifier : resources.keys()) { 311 for (const auto resourceInstanceIdentifier : resources.keys()) {
313 const auto resourceType = resources.value(resourceInstanceIdentifier); 312 const auto resourceType = resources.value(resourceInstanceIdentifier);
314 Trace() << "Looking for " << resourceType << resourceInstanceIdentifier; 313 SinkTrace() << "Looking for " << resourceType << resourceInstanceIdentifier;
315 auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceType, resourceInstanceIdentifier); 314 auto facade = FacadeFactory::instance().getFacade<DomainType>(resourceType, resourceInstanceIdentifier);
316 if (facade) { 315 if (facade) {
317 Trace() << "Trying to fetch from resource " << resourceInstanceIdentifier; 316 SinkTrace() << "Trying to fetch from resource " << resourceInstanceIdentifier;
318 auto result = facade->load(query); 317 auto result = facade->load(query);
319 if (result.second) { 318 if (result.second) {
320 aggregatingEmitter->addEmitter(result.second); 319 aggregatingEmitter->addEmitter(result.second);
321 } else { 320 } else {
322 Warning() << "Null emitter for resource " << resourceInstanceIdentifier; 321 SinkWarning() << "Null emitter for resource " << resourceInstanceIdentifier;
323 } 322 }
324 result.first.exec(); 323 result.first.exec();
325 aggregatingEmitter->fetch(typename DomainType::Ptr()); 324 aggregatingEmitter->fetch(typename DomainType::Ptr());
326 } else { 325 } else {
327 Trace() << "Couldn't find a facade for " << resourceInstanceIdentifier; 326 SinkTrace() << "Couldn't find a facade for " << resourceInstanceIdentifier;
328 // Ignore the error and carry on 327 // Ignore the error and carry on
329 } 328 }
330 } 329 }
diff --git a/common/synchronizer.cpp b/common/synchronizer.cpp
index 1374d00..2d4fb8d 100644
--- a/common/synchronizer.cpp
+++ b/common/synchronizer.cpp
@@ -29,6 +29,8 @@
29#include "modifyentity_generated.h" 29#include "modifyentity_generated.h"
30#include "deleteentity_generated.h" 30#include "deleteentity_generated.h"
31 31
32SINK_DEBUG_AREA("synchronizer")
33
32using namespace Sink; 34using namespace Sink;
33 35
34Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) 36Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier)
@@ -37,7 +39,7 @@ Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &res
37 mResourceType(resourceType), 39 mResourceType(resourceType),
38 mResourceInstanceIdentifier(resourceInstanceIdentifier) 40 mResourceInstanceIdentifier(resourceInstanceIdentifier)
39{ 41{
40 Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; 42 SinkTrace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier;
41} 43}
42 44
43Synchronizer::~Synchronizer() 45Synchronizer::~Synchronizer()
@@ -129,11 +131,11 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func
129 entryGenerator([this, bufferType, &exists](const QByteArray &key) { 131 entryGenerator([this, bufferType, &exists](const QByteArray &key) {
130 auto sinkId = Sink::Storage::uidFromKey(key); 132 auto sinkId = Sink::Storage::uidFromKey(key);
131 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); 133 const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId);
132 Trace() << "Checking for removal " << key << remoteId; 134 SinkTrace() << "Checking for removal " << key << remoteId;
133 // If we have no remoteId, the entity hasn't been replayed to the source yet 135 // If we have no remoteId, the entity hasn't been replayed to the source yet
134 if (!remoteId.isEmpty()) { 136 if (!remoteId.isEmpty()) {
135 if (!exists(remoteId)) { 137 if (!exists(remoteId)) {
136 Trace() << "Found a removed entity: " << sinkId; 138 SinkTrace() << "Found a removed entity: " << sinkId;
137 deleteEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, 139 deleteEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType,
138 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); 140 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); });
139 } 141 }
@@ -143,14 +145,14 @@ void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::func
143 145
144void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) 146void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity)
145{ 147{
146 Trace() << "Create or modify" << bufferType << remoteId; 148 SinkTrace() << "Create or modify" << bufferType << remoteId;
147 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); 149 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType);
148 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 150 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
149 const auto found = mainDatabase.contains(sinkId); 151 const auto found = mainDatabase.contains(sinkId);
150 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); 152 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType);
151 Q_ASSERT(adaptorFactory); 153 Q_ASSERT(adaptorFactory);
152 if (!found) { 154 if (!found) {
153 Trace() << "Found a new entity: " << remoteId; 155 SinkTrace() << "Found a new entity: " << remoteId;
154 createEntity( 156 createEntity(
155 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); 157 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
156 } else { // modification 158 } else { // modification
@@ -159,17 +161,17 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
159 bool changed = false; 161 bool changed = false;
160 for (const auto &property : entity.changedProperties()) { 162 for (const auto &property : entity.changedProperties()) {
161 if (entity.getProperty(property) != current->getProperty(property)) { 163 if (entity.getProperty(property) != current->getProperty(property)) {
162 Trace() << "Property changed " << sinkId << property; 164 SinkTrace() << "Property changed " << sinkId << property;
163 changed = true; 165 changed = true;
164 } 166 }
165 } 167 }
166 if (changed) { 168 if (changed) {
167 Trace() << "Found a modified entity: " << remoteId; 169 SinkTrace() << "Found a modified entity: " << remoteId;
168 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, 170 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory,
169 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); 171 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
170 } 172 }
171 } else { 173 } else {
172 Warning() << "Failed to get current entity"; 174 SinkWarning() << "Failed to get current entity";
173 } 175 }
174 } 176 }
175} 177}
@@ -178,7 +180,7 @@ template<typename DomainType>
178void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria) 180void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const DomainType &entity, const QHash<QByteArray, Sink::Query::Comparator> &mergeCriteria)
179{ 181{
180 182
181 Trace() << "Create or modify" << bufferType << remoteId; 183 SinkTrace() << "Create or modify" << bufferType << remoteId;
182 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType); 184 auto mainDatabase = Storage::mainDatabase(transaction(), bufferType);
183 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); 185 const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId);
184 const auto found = mainDatabase.contains(sinkId); 186 const auto found = mainDatabase.contains(sinkId);
@@ -192,17 +194,17 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
192 reader.query(query, 194 reader.query(query,
193 [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{ 195 [this, bufferType, remoteId, &merge](const DomainType &o) -> bool{
194 merge = true; 196 merge = true;
195 Trace() << "Merging local entity with remote entity: " << o.identifier() << remoteId; 197 SinkTrace() << "Merging local entity with remote entity: " << o.identifier() << remoteId;
196 syncStore().recordRemoteId(bufferType, o.identifier(), remoteId); 198 syncStore().recordRemoteId(bufferType, o.identifier(), remoteId);
197 return false; 199 return false;
198 }); 200 });
199 if (!merge) { 201 if (!merge) {
200 Trace() << "Found a new entity: " << remoteId; 202 SinkTrace() << "Found a new entity: " << remoteId;
201 createEntity( 203 createEntity(
202 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); 204 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
203 } 205 }
204 } else { 206 } else {
205 Trace() << "Found a new entity: " << remoteId; 207 SinkTrace() << "Found a new entity: " << remoteId;
206 createEntity( 208 createEntity(
207 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); 209 sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); });
208 } 210 }
@@ -212,17 +214,17 @@ void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray
212 bool changed = false; 214 bool changed = false;
213 for (const auto &property : entity.changedProperties()) { 215 for (const auto &property : entity.changedProperties()) {
214 if (entity.getProperty(property) != current->getProperty(property)) { 216 if (entity.getProperty(property) != current->getProperty(property)) {
215 Trace() << "Property changed " << sinkId << property; 217 SinkTrace() << "Property changed " << sinkId << property;
216 changed = true; 218 changed = true;
217 } 219 }
218 } 220 }
219 if (changed) { 221 if (changed) {
220 Trace() << "Found a modified entity: " << remoteId; 222 SinkTrace() << "Found a modified entity: " << remoteId;
221 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory, 223 modifyEntity(sinkId, Sink::Storage::maxRevision(transaction()), bufferType, entity, *adaptorFactory,
222 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); 224 [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); });
223 } 225 }
224 } else { 226 } else {
225 Warning() << "Failed to get current entity"; 227 SinkWarning() << "Failed to get current entity";
226 } 228 }
227 } 229 }
228} 230}
@@ -239,7 +241,7 @@ void Synchronizer::modify(const DomainType &entity)
239 241
240KAsync::Job<void> Synchronizer::synchronize() 242KAsync::Job<void> Synchronizer::synchronize()
241{ 243{
242 Trace() << "Synchronizing"; 244 SinkTrace() << "Synchronizing";
243 mSyncInProgress = true; 245 mSyncInProgress = true;
244 mMessageQueue->startTransaction(); 246 mMessageQueue->startTransaction();
245 return synchronizeWithSource().then<void>([this]() { 247 return synchronizeWithSource().then<void>([this]() {
@@ -265,7 +267,7 @@ void Synchronizer::commit()
265Sink::Storage::Transaction &Synchronizer::transaction() 267Sink::Storage::Transaction &Synchronizer::transaction()
266{ 268{
267 if (!mTransaction) { 269 if (!mTransaction) {
268 Trace() << "Starting transaction"; 270 SinkTrace() << "Starting transaction";
269 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); 271 mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly);
270 } 272 }
271 return mTransaction; 273 return mTransaction;
@@ -274,7 +276,7 @@ Sink::Storage::Transaction &Synchronizer::transaction()
274Sink::Storage::Transaction &Synchronizer::syncTransaction() 276Sink::Storage::Transaction &Synchronizer::syncTransaction()
275{ 277{
276 if (!mSyncTransaction) { 278 if (!mSyncTransaction) {
277 Trace() << "Starting transaction"; 279 SinkTrace() << "Starting transaction";
278 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); 280 mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite);
279 } 281 }
280 return mSyncTransaction; 282 return mSyncTransaction;
diff --git a/common/test.cpp b/common/test.cpp
index 59ad9ec..5b4c899 100644
--- a/common/test.cpp
+++ b/common/test.cpp
@@ -27,11 +27,17 @@
27#include "facadefactory.h" 27#include "facadefactory.h"
28#include "query.h" 28#include "query.h"
29#include "resourceconfig.h" 29#include "resourceconfig.h"
30#include "definitions.h"
31
32SINK_DEBUG_AREA("test")
30 33
31using namespace Sink; 34using namespace Sink;
32 35
33void Sink::Test::initTest() 36void Sink::Test::initTest()
34{ 37{
38 auto logIniFile = Sink::configLocation() + "/log.ini";
39 auto areaAutocompletionFile = Sink::dataLocation() + "/debugAreas.ini";
40
35 setTestModeEnabled(true); 41 setTestModeEnabled(true);
36 // qDebug() << "Removing " << QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation); 42 // qDebug() << "Removing " << QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation);
37 QDir(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation)).removeRecursively(); 43 QDir(QStandardPaths::writableLocation(QStandardPaths::GenericDataLocation)).removeRecursively();
@@ -45,6 +51,31 @@ void Sink::Test::initTest()
45 QDir(QStandardPaths::writableLocation(QStandardPaths::CacheLocation)).removeRecursively(); 51 QDir(QStandardPaths::writableLocation(QStandardPaths::CacheLocation)).removeRecursively();
46 // qDebug() << "Removing " << QStandardPaths::writableLocation(QStandardPaths::GenericCacheLocation); 52 // qDebug() << "Removing " << QStandardPaths::writableLocation(QStandardPaths::GenericCacheLocation);
47 QDir(QStandardPaths::writableLocation(QStandardPaths::GenericCacheLocation)).removeRecursively(); 53 QDir(QStandardPaths::writableLocation(QStandardPaths::GenericCacheLocation)).removeRecursively();
54 Log::setPrimaryComponent("test");
55
56 //We copy those files so we can control debug output from outside the test with sinksh
57 {
58 QFile file(logIniFile);
59 if (!file.open(QIODevice::ReadOnly)) {
60 qWarning() << "Failed to open the file: " << logIniFile;
61 }
62 QDir dir;
63 dir.mkpath(Sink::configLocation());
64 if (!file.copy(Sink::configLocation() + "/log.ini")) {
65 qWarning() << "Failed to move the file: " << Sink::configLocation() + "/log.ini";
66 }
67 }
68 {
69 QFile file(areaAutocompletionFile);
70 if (!file.open(QIODevice::ReadOnly)) {
71 qWarning() << "Failed to open the file: " << logIniFile;
72 }
73 QDir dir;
74 dir.mkpath(Sink::dataLocation());
75 if (!file.copy(Sink::dataLocation() + "/debugAreas.ini")) {
76 qWarning() << "Failed to move the file: " << Sink::configLocation() + "/log.ini";
77 }
78 }
48} 79}
49 80
50void Sink::Test::setTestModeEnabled(bool enabled) 81void Sink::Test::setTestModeEnabled(bool enabled)
@@ -102,7 +133,7 @@ public:
102 { 133 {
103 auto resultProvider = new Sink::ResultProvider<typename T::Ptr>(); 134 auto resultProvider = new Sink::ResultProvider<typename T::Ptr>();
104 resultProvider->onDone([resultProvider]() { 135 resultProvider->onDone([resultProvider]() {
105 Trace() << "Result provider is done"; 136 SinkTrace() << "Result provider is done";
106 delete resultProvider; 137 delete resultProvider;
107 }); 138 });
108 // We have to do it this way, otherwise we're not setting the fetcher right 139 // We have to do it this way, otherwise we're not setting the fetcher right
@@ -110,11 +141,11 @@ public:
110 141
111 resultProvider->setFetcher([query, resultProvider, this](const typename T::Ptr &parent) { 142 resultProvider->setFetcher([query, resultProvider, this](const typename T::Ptr &parent) {
112 if (parent) { 143 if (parent) {
113 Trace() << "Running the fetcher " << parent->identifier(); 144 SinkTrace() << "Running the fetcher " << parent->identifier();
114 } else { 145 } else {
115 Trace() << "Running the fetcher."; 146 SinkTrace() << "Running the fetcher.";
116 } 147 }
117 Trace() << "-------------------------."; 148 SinkTrace() << "-------------------------.";
118 for (const auto &res : mTestAccount->entities<T>()) { 149 for (const auto &res : mTestAccount->entities<T>()) {
119 qDebug() << "Parent filter " << query.propertyFilter.value("parent").value.toByteArray() << res->identifier() << res->getProperty("parent").toByteArray(); 150 qDebug() << "Parent filter " << query.propertyFilter.value("parent").value.toByteArray() << res->identifier() << res->getProperty("parent").toByteArray();
120 auto parentProperty = res->getProperty("parent").toByteArray(); 151 auto parentProperty = res->getProperty("parent").toByteArray();
diff --git a/common/typeindex.cpp b/common/typeindex.cpp
index 05bbf5c..78195d3 100644
--- a/common/typeindex.cpp
+++ b/common/typeindex.cpp
@@ -22,8 +22,7 @@
22#include "index.h" 22#include "index.h"
23#include <QDateTime> 23#include <QDateTime>
24 24
25#undef DEBUG_AREA 25SINK_DEBUG_AREA("typeindex")
26#define DEBUG_AREA "common.typeindex"
27 26
28static QByteArray getByteArray(const QVariant &value) 27static QByteArray getByteArray(const QVariant &value)
29{ 28{
@@ -63,7 +62,7 @@ template <>
63void TypeIndex::addProperty<QByteArray>(const QByteArray &property) 62void TypeIndex::addProperty<QByteArray>(const QByteArray &property)
64{ 63{
65 auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) { 64 auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) {
66 // Trace() << "Indexing " << mType + ".index." + property << value.toByteArray(); 65 // SinkTrace() << "Indexing " << mType + ".index." + property << value.toByteArray();
67 Index(indexName(property), transaction).add(getByteArray(value), identifier); 66 Index(indexName(property), transaction).add(getByteArray(value), identifier);
68 }; 67 };
69 mIndexer.insert(property, indexer); 68 mIndexer.insert(property, indexer);
@@ -74,7 +73,7 @@ template <>
74void TypeIndex::addProperty<QString>(const QByteArray &property) 73void TypeIndex::addProperty<QString>(const QByteArray &property)
75{ 74{
76 auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) { 75 auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) {
77 // Trace() << "Indexing " << mType + ".index." + property << value.toByteArray(); 76 // SinkTrace() << "Indexing " << mType + ".index." + property << value.toByteArray();
78 Index(indexName(property), transaction).add(getByteArray(value), identifier); 77 Index(indexName(property), transaction).add(getByteArray(value), identifier);
79 }; 78 };
80 mIndexer.insert(property, indexer); 79 mIndexer.insert(property, indexer);
@@ -85,7 +84,7 @@ template <>
85void TypeIndex::addProperty<QDateTime>(const QByteArray &property) 84void TypeIndex::addProperty<QDateTime>(const QByteArray &property)
86{ 85{
87 auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) { 86 auto indexer = [this, property](const QByteArray &identifier, const QVariant &value, Sink::Storage::Transaction &transaction) {
88 // Trace() << "Indexing " << mType + ".index." + property << date.toString(); 87 // SinkTrace() << "Indexing " << mType + ".index." + property << date.toString();
89 Index(indexName(property), transaction).add(getByteArray(value), identifier); 88 Index(indexName(property), transaction).add(getByteArray(value), identifier);
90 }; 89 };
91 mIndexer.insert(property, indexer); 90 mIndexer.insert(property, indexer);
@@ -143,12 +142,12 @@ ResultSet TypeIndex::query(const Sink::Query &query, QSet<QByteArray> &appliedFi
143 if (query.propertyFilter.contains(it.key()) && query.sortProperty == it.value()) { 142 if (query.propertyFilter.contains(it.key()) && query.sortProperty == it.value()) {
144 Index index(indexName(it.key(), it.value()), transaction); 143 Index index(indexName(it.key(), it.value()), transaction);
145 const auto lookupKey = getByteArray(query.propertyFilter.value(it.key()).value); 144 const auto lookupKey = getByteArray(query.propertyFilter.value(it.key()).value);
146 Trace() << "looking for " << lookupKey; 145 SinkTrace() << "looking for " << lookupKey;
147 index.lookup(lookupKey, [&](const QByteArray &value) { keys << value; }, 146 index.lookup(lookupKey, [&](const QByteArray &value) { keys << value; },
148 [it](const Index::Error &error) { Warning() << "Error in index: " << error.message << it.key() << it.value(); }, true); 147 [it](const Index::Error &error) { SinkWarning() << "Error in index: " << error.message << it.key() << it.value(); }, true);
149 appliedFilters << it.key(); 148 appliedFilters << it.key();
150 appliedSorting = it.value(); 149 appliedSorting = it.value();
151 Trace() << "Index lookup on " << it.key() << it.value() << " found " << keys.size() << " keys."; 150 SinkTrace() << "Index lookup on " << it.key() << it.value() << " found " << keys.size() << " keys.";
152 return ResultSet(keys); 151 return ResultSet(keys);
153 } 152 }
154 } 153 }
@@ -157,12 +156,12 @@ ResultSet TypeIndex::query(const Sink::Query &query, QSet<QByteArray> &appliedFi
157 Index index(indexName(property), transaction); 156 Index index(indexName(property), transaction);
158 const auto lookupKey = getByteArray(query.propertyFilter.value(property).value); 157 const auto lookupKey = getByteArray(query.propertyFilter.value(property).value);
159 index.lookup( 158 index.lookup(
160 lookupKey, [&](const QByteArray &value) { keys << value; }, [property](const Index::Error &error) { Warning() << "Error in index: " << error.message << property; }); 159 lookupKey, [&](const QByteArray &value) { keys << value; }, [property](const Index::Error &error) { SinkWarning() << "Error in index: " << error.message << property; });
161 appliedFilters << property; 160 appliedFilters << property;
162 Trace() << "Index lookup on " << property << " found " << keys.size() << " keys."; 161 SinkTrace() << "Index lookup on " << property << " found " << keys.size() << " keys.";
163 return ResultSet(keys); 162 return ResultSet(keys);
164 } 163 }
165 } 164 }
166 Trace() << "No matching index"; 165 SinkTrace() << "No matching index";
167 return ResultSet(keys); 166 return ResultSet(keys);
168} 167}