summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-05 15:22:10 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-05 15:22:10 +0200
commitb4df9eb5f1f4a0ac2b1272fc34d4b8aad473008b (patch)
tree9abf529061432031afefd6a8bfa821a9779f763d
parentf9379318d801df204cc50385c5eca1f28e91755e (diff)
downloadsink-b4df9eb5f1f4a0ac2b1272fc34d4b8aad473008b.tar.gz
sink-b4df9eb5f1f4a0ac2b1272fc34d4b8aad473008b.zip
Prepare for making the resource status available
-rw-r--r--common/changereplay.cpp1
-rw-r--r--common/changereplay.h1
-rw-r--r--common/commands/notification.fbs7
-rw-r--r--common/domain/applicationdomaintype.h62
-rw-r--r--common/genericresource.cpp40
-rw-r--r--common/listener.cpp4
-rw-r--r--common/notification.h13
-rw-r--r--common/resourceaccess.cpp49
-rw-r--r--common/resourceaccess.h10
-rw-r--r--tests/dummyresourcebenchmark.cpp2
10 files changed, 151 insertions, 38 deletions
diff --git a/common/changereplay.cpp b/common/changereplay.cpp
index 0096bd0..78c0ff5 100644
--- a/common/changereplay.cpp
+++ b/common/changereplay.cpp
@@ -86,6 +86,7 @@ KAsync::Job<void> ChangeReplay::replayNextRevision()
86 86
87 if (lastReplayedRevision < topRevision) { 87 if (lastReplayedRevision < topRevision) {
88 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; 88 Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision;
89 emit replayingChanges();
89 qint64 revision = lastReplayedRevision + 1; 90 qint64 revision = lastReplayedRevision + 1;
90 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); 91 const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision);
91 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); 92 const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision);
diff --git a/common/changereplay.h b/common/changereplay.h
index aba3dd0..6c1c1db 100644
--- a/common/changereplay.h
+++ b/common/changereplay.h
@@ -45,6 +45,7 @@ public:
45 45
46signals: 46signals:
47 void changesReplayed(); 47 void changesReplayed();
48 void replayingChanges();
48 49
49public slots: 50public slots:
50 void revisionChanged(); 51 void revisionChanged();
diff --git a/common/commands/notification.fbs b/common/commands/notification.fbs
index 8750ff5..c82fad3 100644
--- a/common/commands/notification.fbs
+++ b/common/commands/notification.fbs
@@ -1,13 +1,10 @@
1namespace Sink.Commands; 1namespace Sink.Commands;
2 2
3enum NotificationType : byte { Shutdown = 1, Status, Warning, Progress, Inspection, RevisionUpdate }
4enum NotificationCode : byte { Success = 0, Failure = 1, UserCode }
5
6table Notification { 3table Notification {
7 type: NotificationType = Status; 4 type: int = 0; //See notification.h
8 identifier: string; //An identifier that links back to the something related to the notification (e.g. an entity id or a command id) 5 identifier: string; //An identifier that links back to the something related to the notification (e.g. an entity id or a command id)
9 message: string; 6 message: string;
10 code: int = 0; //Of type NotificationCode 7 code: int = 0; //See notification.h
11} 8}
12 9
13root_type Notification; 10root_type Notification;
diff --git a/common/domain/applicationdomaintype.h b/common/domain/applicationdomaintype.h
index 849c3e2..5efb936 100644
--- a/common/domain/applicationdomaintype.h
+++ b/common/domain/applicationdomaintype.h
@@ -52,6 +52,13 @@
52 void setExtracted##NAME(const TYPE &value) { setProperty(NAME::name, QVariant::fromValue(value)); } \ 52 void setExtracted##NAME(const TYPE &value) { setProperty(NAME::name, QVariant::fromValue(value)); } \
53 TYPE get##NAME() const { return getProperty(NAME::name).value<TYPE>(); } \ 53 TYPE get##NAME() const { return getProperty(NAME::name).value<TYPE>(); } \
54 54
55#define SINK_STATUS_PROPERTY(TYPE, NAME, LOWERCASENAME) \
56 struct NAME { \
57 static constexpr const char *name = #LOWERCASENAME; \
58 typedef TYPE Type; \
59 }; \
60 void setStatus##NAME(const TYPE &value) { setProperty(NAME::name, QVariant::fromValue(value)); } \
61 TYPE get##NAME() const { return getProperty(NAME::name).value<TYPE>(); } \
55 62
56#define SINK_BLOB_PROPERTY(NAME, LOWERCASENAME) \ 63#define SINK_BLOB_PROPERTY(NAME, LOWERCASENAME) \
57 struct NAME { \ 64 struct NAME { \
@@ -76,6 +83,14 @@
76namespace Sink { 83namespace Sink {
77namespace ApplicationDomain { 84namespace ApplicationDomain {
78 85
86struct SINK_EXPORT Error {
87
88};
89
90struct SINK_EXPORT Progress {
91
92};
93
79/** 94/**
80 * The domain type interface has two purposes: 95 * The domain type interface has two purposes:
81 * * provide a unified interface to read buffers (for zero-copy reading) 96 * * provide a unified interface to read buffers (for zero-copy reading)
@@ -215,6 +230,38 @@ struct SINK_EXPORT Mail : public Entity {
215}; 230};
216 231
217/** 232/**
233 * The status of an account or resource.
234 *
235 * It is set as follows:
236 * * By default the status is offline.
237 * * If a connection to the server could be established the status is Connected.
238 * * If an error occurred that keeps the resource from operating (so non transient), the resource enters the error state.
239 * * If a long running operation is started the resource goes to the busy state (and return to the previous state after that).
240 */
241enum SINK_EXPORT Status {
242 OfflineStatus,
243 ConnectedStatus,
244 BusyStatus,
245 ErrorStatus
246};
247
248struct SINK_EXPORT SinkAccount : public ApplicationDomainType {
249 typedef QSharedPointer<SinkAccount> Ptr;
250 explicit SinkAccount(const QByteArray &resourceInstanceIdentifier, const QByteArray &identifier, qint64 revision, const QSharedPointer<BufferAdaptor> &adaptor);
251 explicit SinkAccount(const QByteArray &identifier);
252 SinkAccount();
253 virtual ~SinkAccount();
254
255 SINK_PROPERTY(QString, Name, name);
256 SINK_PROPERTY(QString, Icon, icon);
257 SINK_PROPERTY(QString, AccountType, accountType);
258 SINK_STATUS_PROPERTY(int, Status, status);
259 SINK_STATUS_PROPERTY(ApplicationDomain::Error, Error, error);
260 SINK_STATUS_PROPERTY(ApplicationDomain::Progress, Progress, progress);
261};
262
263
264/**
218 * Represents an sink resource. 265 * Represents an sink resource.
219 * 266 *
220 * This type is used for configuration of resources, 267 * This type is used for configuration of resources,
@@ -226,14 +273,13 @@ struct SINK_EXPORT SinkResource : public ApplicationDomainType {
226 explicit SinkResource(const QByteArray &identifier); 273 explicit SinkResource(const QByteArray &identifier);
227 SinkResource(); 274 SinkResource();
228 virtual ~SinkResource(); 275 virtual ~SinkResource();
229};
230 276
231struct SINK_EXPORT SinkAccount : public ApplicationDomainType { 277 SINK_REFERENCE_PROPERTY(SinkAccount, Account, account);
232 typedef QSharedPointer<SinkAccount> Ptr; 278 SINK_PROPERTY(QString, ResourceType, resourceType);
233 explicit SinkAccount(const QByteArray &resourceInstanceIdentifier, const QByteArray &identifier, qint64 revision, const QSharedPointer<BufferAdaptor> &adaptor); 279 SINK_PROPERTY(QByteArrayList, Capabilities, capabilities);
234 explicit SinkAccount(const QByteArray &identifier); 280 SINK_STATUS_PROPERTY(int, Status, status);
235 SinkAccount(); 281 SINK_STATUS_PROPERTY(ApplicationDomain::Error, Error, error);
236 virtual ~SinkAccount(); 282 SINK_STATUS_PROPERTY(ApplicationDomain::Progress, Progress, progress);
237}; 283};
238 284
239struct SINK_EXPORT Identity : public ApplicationDomainType { 285struct SINK_EXPORT Identity : public ApplicationDomainType {
@@ -330,3 +376,5 @@ Q_DECLARE_METATYPE(Sink::ApplicationDomain::SinkAccount)
330Q_DECLARE_METATYPE(Sink::ApplicationDomain::SinkAccount::Ptr) 376Q_DECLARE_METATYPE(Sink::ApplicationDomain::SinkAccount::Ptr)
331Q_DECLARE_METATYPE(Sink::ApplicationDomain::Identity) 377Q_DECLARE_METATYPE(Sink::ApplicationDomain::Identity)
332Q_DECLARE_METATYPE(Sink::ApplicationDomain::Identity::Ptr) 378Q_DECLARE_METATYPE(Sink::ApplicationDomain::Identity::Ptr)
379Q_DECLARE_METATYPE(Sink::ApplicationDomain::Error)
380Q_DECLARE_METATYPE(Sink::ApplicationDomain::Progress)
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index c06c22a..5522174 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -260,18 +260,18 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
260 [=]() { 260 [=]() {
261 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; 261 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
262 Sink::Notification n; 262 Sink::Notification n;
263 n.type = Sink::Commands::NotificationType_Inspection; 263 n.type = Sink::Notification::Inspection;
264 n.id = inspectionId; 264 n.id = inspectionId;
265 n.code = Sink::Commands::NotificationCode_Success; 265 n.code = Sink::Notification::Success;
266 emit notify(n); 266 emit notify(n);
267 }, 267 },
268 [=](int code, const QString &message) { 268 [=](int code, const QString &message) {
269 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message; 269 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message;
270 Sink::Notification n; 270 Sink::Notification n;
271 n.type = Sink::Commands::NotificationType_Inspection; 271 n.type = Sink::Notification::Inspection;
272 n.message = message; 272 n.message = message;
273 n.id = inspectionId; 273 n.id = inspectionId;
274 n.code = Sink::Commands::NotificationCode_Failure; 274 n.code = Sink::Notification::Failure;
275 emit notify(n); 275 emit notify(n);
276 }) 276 })
277 .exec(); 277 .exec();
@@ -283,6 +283,23 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
283 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 283 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
284 mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); 284 mClientLowerBoundRevision = mPipeline->cleanedUpRevision();
285 285
286 QObject::connect(mChangeReplay.data(), &ChangeReplay::replayingChanges, [this]() {
287 Sink::Notification n;
288 n.id = "changereplay";
289 n.type = Sink::Notification::Status;
290 n.message = "Replaying changes.";
291 n.code = Sink::ApplicationDomain::BusyStatus;
292 emit notify(n);
293 });
294 QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, [this]() {
295 Sink::Notification n;
296 n.id = "changereplay";
297 n.type = Sink::Notification::Status;
298 n.message = "All changes have been replayed.";
299 n.code = Sink::ApplicationDomain::ConnectedStatus;
300 emit notify(n);
301 });
302
286 mCommitQueueTimer.setInterval(sCommitInterval); 303 mCommitQueueTimer.setInterval(sCommitInterval);
287 mCommitQueueTimer.setSingleShot(true); 304 mCommitQueueTimer.setSingleShot(true);
288 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); 305 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit);
@@ -399,12 +416,27 @@ void GenericResource::processCommand(int commandId, const QByteArray &data)
399KAsync::Job<void> GenericResource::synchronizeWithSource() 416KAsync::Job<void> GenericResource::synchronizeWithSource()
400{ 417{
401 return KAsync::start<void>([this](KAsync::Future<void> &future) { 418 return KAsync::start<void>([this](KAsync::Future<void> &future) {
419
420 Sink::Notification n;
421 n.id = "sync";
422 n.type = Sink::Notification::Status;
423 n.message = "Synchronization has started.";
424 n.code = Sink::ApplicationDomain::BusyStatus;
425 emit notify(n);
426
402 Log() << " Synchronizing"; 427 Log() << " Synchronizing";
403 // Changereplay would deadlock otherwise when trying to open the synchronization store 428 // Changereplay would deadlock otherwise when trying to open the synchronization store
404 enableChangeReplay(false); 429 enableChangeReplay(false);
405 mSynchronizer->synchronize() 430 mSynchronizer->synchronize()
406 .then<void>([this, &future]() { 431 .then<void>([this, &future]() {
407 Log() << "Done Synchronizing"; 432 Log() << "Done Synchronizing";
433 Sink::Notification n;
434 n.id = "sync";
435 n.type = Sink::Notification::Status;
436 n.message = "Synchronization has ended.";
437 n.code = Sink::ApplicationDomain::ConnectedStatus;
438 emit notify(n);
439
408 enableChangeReplay(true); 440 enableChangeReplay(true);
409 future.setFinished(); 441 future.setFinished();
410 }, [this, &future](int errorCode, const QString &error) { 442 }, [this, &future](int errorCode, const QString &error) {
diff --git a/common/listener.cpp b/common/listener.cpp
index 84afe16..d2fc510 100644
--- a/common/listener.cpp
+++ b/common/listener.cpp
@@ -330,7 +330,7 @@ qint64 Listener::lowerBoundRevision()
330void Listener::quit() 330void Listener::quit()
331{ 331{
332 // Broadcast shutdown notifications to open clients, so they don't try to restart the resource 332 // Broadcast shutdown notifications to open clients, so they don't try to restart the resource
333 auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Commands::NotificationType::NotificationType_Shutdown); 333 auto command = Sink::Commands::CreateNotification(m_fbb, Sink::Notification::Shutdown);
334 Sink::Commands::FinishNotificationBuffer(m_fbb, command); 334 Sink::Commands::FinishNotificationBuffer(m_fbb, command);
335 for (Client &client : m_connections) { 335 for (Client &client : m_connections) {
336 if (client.socket && client.socket->isOpen()) { 336 if (client.socket && client.socket->isOpen()) {
@@ -418,7 +418,7 @@ void Listener::notify(const Sink::Notification &notification)
418 auto messageString = m_fbb.CreateString(notification.message.toUtf8().constData(), notification.message.toUtf8().size()); 418 auto messageString = m_fbb.CreateString(notification.message.toUtf8().constData(), notification.message.toUtf8().size());
419 auto idString = m_fbb.CreateString(notification.id.constData(), notification.id.size()); 419 auto idString = m_fbb.CreateString(notification.id.constData(), notification.id.size());
420 Sink::Commands::NotificationBuilder builder(m_fbb); 420 Sink::Commands::NotificationBuilder builder(m_fbb);
421 builder.add_type(static_cast<Sink::Commands::NotificationType>(notification.type)); 421 builder.add_type(notification.type);
422 builder.add_code(notification.code); 422 builder.add_code(notification.code);
423 builder.add_identifier(idString); 423 builder.add_identifier(idString);
424 builder.add_message(messageString); 424 builder.add_message(messageString);
diff --git a/common/notification.h b/common/notification.h
index 0eb796d..0a267e6 100644
--- a/common/notification.h
+++ b/common/notification.h
@@ -30,6 +30,19 @@ namespace Sink {
30class SINK_EXPORT Notification 30class SINK_EXPORT Notification
31{ 31{
32public: 32public:
33 enum NoticationType {
34 Shutdown,
35 Status,
36 Warning,
37 Progress,
38 Inspection,
39 RevisionUpdate
40 };
41 enum InspectionCode {
42 Success,
43 Failure
44 };
45
33 QByteArray id; 46 QByteArray id;
34 int type; 47 int type;
35 QString message; 48 QString message;
diff --git a/common/resourceaccess.cpp b/common/resourceaccess.cpp
index d3bd85f..95b4a7e 100644
--- a/common/resourceaccess.cpp
+++ b/common/resourceaccess.cpp
@@ -230,6 +230,7 @@ KAsync::Job<void> ResourceAccess::Private::initializeSocket()
230ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType) 230ResourceAccess::ResourceAccess(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType)
231 : ResourceAccessInterface(), d(new Private(resourceType, resourceInstanceIdentifier, this)) 231 : ResourceAccessInterface(), d(new Private(resourceType, resourceInstanceIdentifier, this))
232{ 232{
233 mResourceStatus = Sink::ApplicationDomain::OfflineStatus;
233 Trace() << "Starting access"; 234 Trace() << "Starting access";
234} 235}
235 236
@@ -513,6 +514,22 @@ void ResourceAccess::readResourceMessage()
513 } 514 }
514} 515}
515 516
517static Sink::Notification getNotification(const Sink::Commands::Notification *buffer)
518{
519 Sink::Notification n;
520 if (buffer->identifier()) {
521 // Don't use fromRawData, the buffer is gone once we invoke emit notification
522 n.id = BufferUtils::extractBufferCopy(buffer->identifier());
523 }
524 if (buffer->message()) {
525 // Don't use fromRawData, the buffer is gone once we invoke emit notification
526 n.message = BufferUtils::extractBufferCopy(buffer->message());
527 }
528 n.type = buffer->type();
529 n.code = buffer->code();
530 return n;
531}
532
516bool ResourceAccess::processMessageBuffer() 533bool ResourceAccess::processMessageBuffer()
517{ 534{
518 static const int headerSize = Commands::headerSize(); 535 static const int headerSize = Commands::headerSize();
@@ -535,7 +552,7 @@ bool ResourceAccess::processMessageBuffer()
535 auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize); 552 auto buffer = Commands::GetRevisionUpdate(d->partialMessageBuffer.constData() + headerSize);
536 Trace() << QString("Revision updated to: %1").arg(buffer->revision()); 553 Trace() << QString("Revision updated to: %1").arg(buffer->revision());
537 Notification n; 554 Notification n;
538 n.type = Sink::Commands::NotificationType::NotificationType_RevisionUpdate; 555 n.type = Sink::Notification::RevisionUpdate;
539 emit notification(n); 556 emit notification(n);
540 emit revisionChanged(buffer->revision()); 557 emit revisionChanged(buffer->revision());
541 558
@@ -553,30 +570,26 @@ bool ResourceAccess::processMessageBuffer()
553 case Commands::NotificationCommand: { 570 case Commands::NotificationCommand: {
554 auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize); 571 auto buffer = Commands::GetNotification(d->partialMessageBuffer.constData() + headerSize);
555 switch (buffer->type()) { 572 switch (buffer->type()) {
556 case Sink::Commands::NotificationType::NotificationType_Shutdown: 573 case Sink::Notification::Shutdown:
557 Log() << "Received shutdown notification."; 574 Log() << "Received shutdown notification.";
558 close(); 575 close();
559 break; 576 break;
560 case Sink::Commands::NotificationType::NotificationType_Inspection: { 577 case Sink::Notification::Inspection: {
561 Trace() << "Received inspection notification."; 578 Trace() << "Received inspection notification.";
562 Notification n; 579 auto n = getNotification(buffer);
563 if (buffer->identifier()) {
564 // Don't use fromRawData, the buffer is gone once we invoke emit notification
565 n.id = BufferUtils::extractBufferCopy(buffer->identifier());
566 }
567 if (buffer->message()) {
568 // Don't use fromRawData, the buffer is gone once we invoke emit notification
569 n.message = BufferUtils::extractBufferCopy(buffer->message());
570 }
571 n.type = buffer->type();
572 n.code = buffer->code();
573 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first 580 // The callbacks can result in this object getting destroyed directly, so we need to ensure we finish our work first
574 queuedInvoke([=]() { emit notification(n); }, this); 581 queuedInvoke([=]() { emit notification(n); }, this);
575 } break; 582 } break;
576 case Sink::Commands::NotificationType::NotificationType_Status: 583 case Sink::Notification::Status:
577 case Sink::Commands::NotificationType::NotificationType_Warning: 584 mResourceStatus = buffer->code();
578 case Sink::Commands::NotificationType::NotificationType_Progress: 585 [[clang::fallthrough]];
579 case Sink::Commands::NotificationType::NotificationType_RevisionUpdate: 586 case Sink::Notification::Warning:
587 [[clang::fallthrough]];
588 case Sink::Notification::Progress: {
589 auto n = getNotification(buffer);
590 emit notification(n);
591 } break;
592 case Sink::Notification::RevisionUpdate:
580 default: 593 default:
581 Warning() << "Received unknown notification: " << buffer->type(); 594 Warning() << "Received unknown notification: " << buffer->type();
582 break; 595 break;
diff --git a/common/resourceaccess.h b/common/resourceaccess.h
index 69d52b4..5c65998 100644
--- a/common/resourceaccess.h
+++ b/common/resourceaccess.h
@@ -72,14 +72,22 @@ public:
72 return KAsync::null<void>(); 72 return KAsync::null<void>();
73 }; 73 };
74 74
75 int getResourceStatus() const
76 {
77 return mResourceStatus;
78 }
79
75signals: 80signals:
76 void ready(bool isReady); 81 void ready(bool isReady);
77 void revisionChanged(qint64 revision); 82 void revisionChanged(qint64 revision);
78 void notification(Notification revision); 83 void notification(Notification notification);
79 84
80public slots: 85public slots:
81 virtual void open() = 0; 86 virtual void open() = 0;
82 virtual void close() = 0; 87 virtual void close() = 0;
88
89protected:
90 int mResourceStatus;
83}; 91};
84 92
85class SINK_EXPORT ResourceAccess : public ResourceAccessInterface 93class SINK_EXPORT ResourceAccess : public ResourceAccessInterface
diff --git a/tests/dummyresourcebenchmark.cpp b/tests/dummyresourcebenchmark.cpp
index d5f98c3..7e334a6 100644
--- a/tests/dummyresourcebenchmark.cpp
+++ b/tests/dummyresourcebenchmark.cpp
@@ -90,7 +90,7 @@ private slots:
90 bool gotNotification = false; 90 bool gotNotification = false;
91 int duration = 0; 91 int duration = 0;
92 notifier->registerHandler([&gotNotification, &duration, &time](const Sink::Notification &notification) { 92 notifier->registerHandler([&gotNotification, &duration, &time](const Sink::Notification &notification) {
93 if (notification.type == Sink::Commands::NotificationType::NotificationType_RevisionUpdate) { 93 if (notification.type == Sink::Notification::RevisionUpdate) {
94 gotNotification = true; 94 gotNotification = true;
95 duration = time.elapsed(); 95 duration = time.elapsed();
96 } 96 }