summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-11 11:55:29 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-11 11:55:29 +0200
commit3a3118e768e1447dc7524328e84b8d7faef81fe1 (patch)
treeaf5582170ed6164fffc9365f34b17bf449c0db40 /common/genericresource.cpp
parentf9379318d801df204cc50385c5eca1f28e91755e (diff)
parentce2fd2666f084eebe443598f6f3740a02913091e (diff)
downloadsink-3a3118e768e1447dc7524328e84b8d7faef81fe1.tar.gz
sink-3a3118e768e1447dc7524328e84b8d7faef81fe1.zip
Merge branch 'feature/notifications' into develop
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp103
1 files changed, 70 insertions, 33 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index c06c22a..7136882 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -46,9 +46,6 @@ static int sCommitInterval = 10;
46 46
47using namespace Sink; 47using namespace Sink;
48 48
49#undef DEBUG_AREA
50#define DEBUG_AREA "resource.commandprocessor"
51
52/** 49/**
53 * Drives the pipeline using the output from all command queues 50 * Drives the pipeline using the output from all command queues
54 */ 51 */
@@ -56,12 +53,13 @@ class CommandProcessor : public QObject
56{ 53{
57 Q_OBJECT 54 Q_OBJECT
58 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; 55 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction;
56 SINK_DEBUG_AREA("commandprocessor")
59 57
60public: 58public:
61 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) 59 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false)
62 { 60 {
63 mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 61 mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
64 Warning() << error.message; 62 SinkWarning() << error.message;
65 })); 63 }));
66 64
67 for (auto queue : mCommandQueues) { 65 for (auto queue : mCommandQueues) {
@@ -80,7 +78,6 @@ public:
80 mInspect = f; 78 mInspect = f;
81 } 79 }
82 80
83
84signals: 81signals:
85 void error(int errorCode, const QString &errorMessage); 82 void error(int errorCode, const QString &errorMessage);
86 83
@@ -114,7 +111,7 @@ private slots:
114 111
115 KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand) 112 KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand)
116 { 113 {
117 Trace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); 114 SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId());
118 // Throw command into appropriate pipeline 115 // Throw command into appropriate pipeline
119 switch (queuedCommand->commandId()) { 116 switch (queuedCommand->commandId()) {
120 case Sink::Commands::DeleteEntityCommand: 117 case Sink::Commands::DeleteEntityCommand:
@@ -138,21 +135,21 @@ private slots:
138 { 135 {
139 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); 136 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
140 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { 137 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
141 Warning() << "invalid buffer"; 138 SinkWarning() << "invalid buffer";
142 // return KAsync::error<void, qint64>(1, "Invalid Buffer"); 139 // return KAsync::error<void, qint64>(1, "Invalid Buffer");
143 } 140 }
144 auto queuedCommand = Sink::GetQueuedCommand(data.constData()); 141 auto queuedCommand = Sink::GetQueuedCommand(data.constData());
145 const auto commandId = queuedCommand->commandId(); 142 const auto commandId = queuedCommand->commandId();
146 Trace() << "Dequeued Command: " << Sink::Commands::name(commandId); 143 SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId);
147 return processQueuedCommand(queuedCommand) 144 return processQueuedCommand(queuedCommand)
148 .then<qint64, qint64>( 145 .then<qint64, qint64>(
149 [commandId](qint64 createdRevision) -> qint64 { 146 [this, commandId](qint64 createdRevision) -> qint64 {
150 Trace() << "Command pipeline processed: " << Sink::Commands::name(commandId); 147 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId);
151 return createdRevision; 148 return createdRevision;
152 }, 149 },
153 [](int errorCode, QString errorMessage) { 150 [](int errorCode, QString errorMessage) {
154 // FIXME propagate error, we didn't handle it 151 // FIXME propagate error, we didn't handle it
155 Warning() << "Error while processing queue command: " << errorMessage; 152 SinkWarning() << "Error while processing queue command: " << errorMessage;
156 }); 153 });
157 } 154 }
158 155
@@ -169,7 +166,7 @@ private slots:
169 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { 166 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) {
170 processQueuedCommand(data) 167 processQueuedCommand(data)
171 .then<void, qint64>([&future, this, time](qint64 createdRevision) { 168 .then<void, qint64>([&future, this, time](qint64 createdRevision) {
172 Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); 169 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
173 future.setFinished(); 170 future.setFinished();
174 }) 171 })
175 .exec(); 172 .exec();
@@ -178,7 +175,7 @@ private slots:
178 .then<void>([&future, queue]() { future.setFinished(); }, 175 .then<void>([&future, queue]() { future.setFinished(); },
179 [&future](int i, QString error) { 176 [&future](int i, QString error) {
180 if (i != MessageQueue::ErrorCodes::NoMessageFound) { 177 if (i != MessageQueue::ErrorCodes::NoMessageFound) {
181 Warning() << "Error while getting message from messagequeue: " << error; 178 SinkWarning() << "Error while getting message from messagequeue: " << error;
182 } 179 }
183 future.setFinished(); 180 future.setFinished();
184 }) 181 })
@@ -192,12 +189,12 @@ private slots:
192 auto time = QSharedPointer<QTime>::create(); 189 auto time = QSharedPointer<QTime>::create();
193 time->start(); 190 time->start();
194 mPipeline->startTransaction(); 191 mPipeline->startTransaction();
195 Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; 192 SinkTrace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision;
196 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { 193 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) {
197 mPipeline->cleanupRevision(revision); 194 mPipeline->cleanupRevision(revision);
198 } 195 }
199 mPipeline->commit(); 196 mPipeline->commit();
200 Trace() << "Cleanup done." << Log::TraceTime(time->elapsed()); 197 SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed());
201 198
202 // Go through all message queues 199 // Go through all message queues
203 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); 200 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues);
@@ -208,8 +205,8 @@ private slots:
208 205
209 auto queue = it->next(); 206 auto queue = it->next();
210 processQueue(queue) 207 processQueue(queue)
211 .then<void>([&future, time]() { 208 .then<void>([this, &future, time]() {
212 Trace() << "Queue processed." << Log::TraceTime(time->elapsed()); 209 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed());
213 future.setFinished(); 210 future.setFinished();
214 }) 211 })
215 .exec(); 212 .exec();
@@ -226,9 +223,6 @@ private:
226 InspectionFunction mInspect; 223 InspectionFunction mInspect;
227}; 224};
228 225
229#undef DEBUG_AREA
230#define DEBUG_AREA "resource"
231
232GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline ) 226GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline )
233 : Sink::Resource(), 227 : Sink::Resource(),
234 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), 228 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"),
@@ -240,7 +234,7 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
240 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 234 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
241{ 235{
242 mPipeline->setResourceType(mResourceType); 236 mPipeline->setResourceType(mResourceType);
243 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); 237 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue));
244 mProcessor->setInspectionCommand([this](void const *command, size_t size) { 238 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
245 flatbuffers::Verifier verifier((const uint8_t *)command, size); 239 flatbuffers::Verifier verifier((const uint8_t *)command, size);
246 if (Sink::Commands::VerifyInspectionBuffer(verifier)) { 240 if (Sink::Commands::VerifyInspectionBuffer(verifier)) {
@@ -260,18 +254,18 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
260 [=]() { 254 [=]() {
261 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; 255 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
262 Sink::Notification n; 256 Sink::Notification n;
263 n.type = Sink::Commands::NotificationType_Inspection; 257 n.type = Sink::Notification::Inspection;
264 n.id = inspectionId; 258 n.id = inspectionId;
265 n.code = Sink::Commands::NotificationCode_Success; 259 n.code = Sink::Notification::Success;
266 emit notify(n); 260 emit notify(n);
267 }, 261 },
268 [=](int code, const QString &message) { 262 [=](int code, const QString &message) {
269 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message; 263 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message;
270 Sink::Notification n; 264 Sink::Notification n;
271 n.type = Sink::Commands::NotificationType_Inspection; 265 n.type = Sink::Notification::Inspection;
272 n.message = message; 266 n.message = message;
273 n.id = inspectionId; 267 n.id = inspectionId;
274 n.code = Sink::Commands::NotificationCode_Failure; 268 n.code = Sink::Notification::Failure;
275 emit notify(n); 269 emit notify(n);
276 }) 270 })
277 .exec(); 271 .exec();
@@ -279,8 +273,14 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
279 } 273 }
280 return KAsync::error<void>(-1, "Invalid inspection command."); 274 return KAsync::error<void>(-1, "Invalid inspection command.");
281 }); 275 });
282 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 276 {
283 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 277 auto ret =QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
278 Q_ASSERT(ret);
279 }
280 {
281 auto ret = QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
282 Q_ASSERT(ret);
283 }
284 mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); 284 mClientLowerBoundRevision = mPipeline->cleanedUpRevision();
285 285
286 mCommitQueueTimer.setInterval(sCommitInterval); 286 mCommitQueueTimer.setInterval(sCommitInterval);
@@ -290,13 +290,12 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
290 290
291GenericResource::~GenericResource() 291GenericResource::~GenericResource()
292{ 292{
293 delete mProcessor;
294} 293}
295 294
296KAsync::Job<void> GenericResource::inspect( 295KAsync::Job<void> GenericResource::inspect(
297 int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) 296 int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
298{ 297{
299 Warning() << "Inspection not implemented"; 298 SinkWarning() << "Inspection not implemented";
300 return KAsync::null<void>(); 299 return KAsync::null<void>();
301} 300}
302 301
@@ -329,13 +328,36 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync
329void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay) 328void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay)
330{ 329{
331 mChangeReplay = changeReplay; 330 mChangeReplay = changeReplay;
331 {
332 auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::replayingChanges, [this]() {
333 Sink::Notification n;
334 n.id = "changereplay";
335 n.type = Sink::Notification::Status;
336 n.message = "Replaying changes.";
337 n.code = Sink::ApplicationDomain::BusyStatus;
338 emit notify(n);
339 });
340 Q_ASSERT(ret);
341 }
342 {
343 auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, [this]() {
344 Sink::Notification n;
345 n.id = "changereplay";
346 n.type = Sink::Notification::Status;
347 n.message = "All changes have been replayed.";
348 n.code = Sink::ApplicationDomain::ConnectedStatus;
349 emit notify(n);
350 });
351 Q_ASSERT(ret);
352 }
353
332 mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); 354 mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision());
333 enableChangeReplay(true); 355 enableChangeReplay(true);
334} 356}
335 357
336void GenericResource::removeDataFromDisk() 358void GenericResource::removeDataFromDisk()
337{ 359{
338 Log() << "Removing the resource from disk: " << mResourceInstanceIdentifier; 360 SinkLog() << "Removing the resource from disk: " << mResourceInstanceIdentifier;
339 //Ensure we have no transaction or databases open 361 //Ensure we have no transaction or databases open
340 mSynchronizer.clear(); 362 mSynchronizer.clear();
341 mChangeReplay.clear(); 363 mChangeReplay.clear();
@@ -363,7 +385,7 @@ qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier)
363 385
364void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) 386void GenericResource::onProcessorError(int errorCode, const QString &errorMessage)
365{ 387{
366 Warning() << "Received error from Processor: " << errorCode << errorMessage; 388 SinkWarning() << "Received error from Processor: " << errorCode << errorMessage;
367 mError = errorCode; 389 mError = errorCode;
368} 390}
369 391
@@ -399,12 +421,27 @@ void GenericResource::processCommand(int commandId, const QByteArray &data)
399KAsync::Job<void> GenericResource::synchronizeWithSource() 421KAsync::Job<void> GenericResource::synchronizeWithSource()
400{ 422{
401 return KAsync::start<void>([this](KAsync::Future<void> &future) { 423 return KAsync::start<void>([this](KAsync::Future<void> &future) {
402 Log() << " Synchronizing"; 424
425 Sink::Notification n;
426 n.id = "sync";
427 n.type = Sink::Notification::Status;
428 n.message = "Synchronization has started.";
429 n.code = Sink::ApplicationDomain::BusyStatus;
430 emit notify(n);
431
432 SinkLog() << " Synchronizing";
403 // Changereplay would deadlock otherwise when trying to open the synchronization store 433 // Changereplay would deadlock otherwise when trying to open the synchronization store
404 enableChangeReplay(false); 434 enableChangeReplay(false);
405 mSynchronizer->synchronize() 435 mSynchronizer->synchronize()
406 .then<void>([this, &future]() { 436 .then<void>([this, &future]() {
407 Log() << "Done Synchronizing"; 437 SinkLog() << "Done Synchronizing";
438 Sink::Notification n;
439 n.id = "sync";
440 n.type = Sink::Notification::Status;
441 n.message = "Synchronization has ended.";
442 n.code = Sink::ApplicationDomain::ConnectedStatus;
443 emit notify(n);
444
408 enableChangeReplay(true); 445 enableChangeReplay(true);
409 future.setFinished(); 446 future.setFinished();
410 }, [this, &future](int errorCode, const QString &error) { 447 }, [this, &future](int errorCode, const QString &error) {