diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-07-11 11:55:29 +0200 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-07-11 11:55:29 +0200 |
commit | 3a3118e768e1447dc7524328e84b8d7faef81fe1 (patch) | |
tree | af5582170ed6164fffc9365f34b17bf449c0db40 /common/genericresource.cpp | |
parent | f9379318d801df204cc50385c5eca1f28e91755e (diff) | |
parent | ce2fd2666f084eebe443598f6f3740a02913091e (diff) | |
download | sink-3a3118e768e1447dc7524328e84b8d7faef81fe1.tar.gz sink-3a3118e768e1447dc7524328e84b8d7faef81fe1.zip |
Merge branch 'feature/notifications' into develop
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 103 |
1 files changed, 70 insertions, 33 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index c06c22a..7136882 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -46,9 +46,6 @@ static int sCommitInterval = 10; | |||
46 | 46 | ||
47 | using namespace Sink; | 47 | using namespace Sink; |
48 | 48 | ||
49 | #undef DEBUG_AREA | ||
50 | #define DEBUG_AREA "resource.commandprocessor" | ||
51 | |||
52 | /** | 49 | /** |
53 | * Drives the pipeline using the output from all command queues | 50 | * Drives the pipeline using the output from all command queues |
54 | */ | 51 | */ |
@@ -56,12 +53,13 @@ class CommandProcessor : public QObject | |||
56 | { | 53 | { |
57 | Q_OBJECT | 54 | Q_OBJECT |
58 | typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; | 55 | typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; |
56 | SINK_DEBUG_AREA("commandprocessor") | ||
59 | 57 | ||
60 | public: | 58 | public: |
61 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) | 59 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) |
62 | { | 60 | { |
63 | mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 61 | mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
64 | Warning() << error.message; | 62 | SinkWarning() << error.message; |
65 | })); | 63 | })); |
66 | 64 | ||
67 | for (auto queue : mCommandQueues) { | 65 | for (auto queue : mCommandQueues) { |
@@ -80,7 +78,6 @@ public: | |||
80 | mInspect = f; | 78 | mInspect = f; |
81 | } | 79 | } |
82 | 80 | ||
83 | |||
84 | signals: | 81 | signals: |
85 | void error(int errorCode, const QString &errorMessage); | 82 | void error(int errorCode, const QString &errorMessage); |
86 | 83 | ||
@@ -114,7 +111,7 @@ private slots: | |||
114 | 111 | ||
115 | KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand) | 112 | KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand) |
116 | { | 113 | { |
117 | Trace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); | 114 | SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); |
118 | // Throw command into appropriate pipeline | 115 | // Throw command into appropriate pipeline |
119 | switch (queuedCommand->commandId()) { | 116 | switch (queuedCommand->commandId()) { |
120 | case Sink::Commands::DeleteEntityCommand: | 117 | case Sink::Commands::DeleteEntityCommand: |
@@ -138,21 +135,21 @@ private slots: | |||
138 | { | 135 | { |
139 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); | 136 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); |
140 | if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { | 137 | if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { |
141 | Warning() << "invalid buffer"; | 138 | SinkWarning() << "invalid buffer"; |
142 | // return KAsync::error<void, qint64>(1, "Invalid Buffer"); | 139 | // return KAsync::error<void, qint64>(1, "Invalid Buffer"); |
143 | } | 140 | } |
144 | auto queuedCommand = Sink::GetQueuedCommand(data.constData()); | 141 | auto queuedCommand = Sink::GetQueuedCommand(data.constData()); |
145 | const auto commandId = queuedCommand->commandId(); | 142 | const auto commandId = queuedCommand->commandId(); |
146 | Trace() << "Dequeued Command: " << Sink::Commands::name(commandId); | 143 | SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); |
147 | return processQueuedCommand(queuedCommand) | 144 | return processQueuedCommand(queuedCommand) |
148 | .then<qint64, qint64>( | 145 | .then<qint64, qint64>( |
149 | [commandId](qint64 createdRevision) -> qint64 { | 146 | [this, commandId](qint64 createdRevision) -> qint64 { |
150 | Trace() << "Command pipeline processed: " << Sink::Commands::name(commandId); | 147 | SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); |
151 | return createdRevision; | 148 | return createdRevision; |
152 | }, | 149 | }, |
153 | [](int errorCode, QString errorMessage) { | 150 | [](int errorCode, QString errorMessage) { |
154 | // FIXME propagate error, we didn't handle it | 151 | // FIXME propagate error, we didn't handle it |
155 | Warning() << "Error while processing queue command: " << errorMessage; | 152 | SinkWarning() << "Error while processing queue command: " << errorMessage; |
156 | }); | 153 | }); |
157 | } | 154 | } |
158 | 155 | ||
@@ -169,7 +166,7 @@ private slots: | |||
169 | return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { | 166 | return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { |
170 | processQueuedCommand(data) | 167 | processQueuedCommand(data) |
171 | .then<void, qint64>([&future, this, time](qint64 createdRevision) { | 168 | .then<void, qint64>([&future, this, time](qint64 createdRevision) { |
172 | Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); | 169 | SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); |
173 | future.setFinished(); | 170 | future.setFinished(); |
174 | }) | 171 | }) |
175 | .exec(); | 172 | .exec(); |
@@ -178,7 +175,7 @@ private slots: | |||
178 | .then<void>([&future, queue]() { future.setFinished(); }, | 175 | .then<void>([&future, queue]() { future.setFinished(); }, |
179 | [&future](int i, QString error) { | 176 | [&future](int i, QString error) { |
180 | if (i != MessageQueue::ErrorCodes::NoMessageFound) { | 177 | if (i != MessageQueue::ErrorCodes::NoMessageFound) { |
181 | Warning() << "Error while getting message from messagequeue: " << error; | 178 | SinkWarning() << "Error while getting message from messagequeue: " << error; |
182 | } | 179 | } |
183 | future.setFinished(); | 180 | future.setFinished(); |
184 | }) | 181 | }) |
@@ -192,12 +189,12 @@ private slots: | |||
192 | auto time = QSharedPointer<QTime>::create(); | 189 | auto time = QSharedPointer<QTime>::create(); |
193 | time->start(); | 190 | time->start(); |
194 | mPipeline->startTransaction(); | 191 | mPipeline->startTransaction(); |
195 | Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; | 192 | SinkTrace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; |
196 | for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { | 193 | for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { |
197 | mPipeline->cleanupRevision(revision); | 194 | mPipeline->cleanupRevision(revision); |
198 | } | 195 | } |
199 | mPipeline->commit(); | 196 | mPipeline->commit(); |
200 | Trace() << "Cleanup done." << Log::TraceTime(time->elapsed()); | 197 | SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed()); |
201 | 198 | ||
202 | // Go through all message queues | 199 | // Go through all message queues |
203 | auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); | 200 | auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); |
@@ -208,8 +205,8 @@ private slots: | |||
208 | 205 | ||
209 | auto queue = it->next(); | 206 | auto queue = it->next(); |
210 | processQueue(queue) | 207 | processQueue(queue) |
211 | .then<void>([&future, time]() { | 208 | .then<void>([this, &future, time]() { |
212 | Trace() << "Queue processed." << Log::TraceTime(time->elapsed()); | 209 | SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); |
213 | future.setFinished(); | 210 | future.setFinished(); |
214 | }) | 211 | }) |
215 | .exec(); | 212 | .exec(); |
@@ -226,9 +223,6 @@ private: | |||
226 | InspectionFunction mInspect; | 223 | InspectionFunction mInspect; |
227 | }; | 224 | }; |
228 | 225 | ||
229 | #undef DEBUG_AREA | ||
230 | #define DEBUG_AREA "resource" | ||
231 | |||
232 | GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline ) | 226 | GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline ) |
233 | : Sink::Resource(), | 227 | : Sink::Resource(), |
234 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), | 228 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), |
@@ -240,7 +234,7 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra | |||
240 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 234 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
241 | { | 235 | { |
242 | mPipeline->setResourceType(mResourceType); | 236 | mPipeline->setResourceType(mResourceType); |
243 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); | 237 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); |
244 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | 238 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { |
245 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | 239 | flatbuffers::Verifier verifier((const uint8_t *)command, size); |
246 | if (Sink::Commands::VerifyInspectionBuffer(verifier)) { | 240 | if (Sink::Commands::VerifyInspectionBuffer(verifier)) { |
@@ -260,18 +254,18 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra | |||
260 | [=]() { | 254 | [=]() { |
261 | Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; | 255 | Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; |
262 | Sink::Notification n; | 256 | Sink::Notification n; |
263 | n.type = Sink::Commands::NotificationType_Inspection; | 257 | n.type = Sink::Notification::Inspection; |
264 | n.id = inspectionId; | 258 | n.id = inspectionId; |
265 | n.code = Sink::Commands::NotificationCode_Success; | 259 | n.code = Sink::Notification::Success; |
266 | emit notify(n); | 260 | emit notify(n); |
267 | }, | 261 | }, |
268 | [=](int code, const QString &message) { | 262 | [=](int code, const QString &message) { |
269 | Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message; | 263 | Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << message; |
270 | Sink::Notification n; | 264 | Sink::Notification n; |
271 | n.type = Sink::Commands::NotificationType_Inspection; | 265 | n.type = Sink::Notification::Inspection; |
272 | n.message = message; | 266 | n.message = message; |
273 | n.id = inspectionId; | 267 | n.id = inspectionId; |
274 | n.code = Sink::Commands::NotificationCode_Failure; | 268 | n.code = Sink::Notification::Failure; |
275 | emit notify(n); | 269 | emit notify(n); |
276 | }) | 270 | }) |
277 | .exec(); | 271 | .exec(); |
@@ -279,8 +273,14 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra | |||
279 | } | 273 | } |
280 | return KAsync::error<void>(-1, "Invalid inspection command."); | 274 | return KAsync::error<void>(-1, "Invalid inspection command."); |
281 | }); | 275 | }); |
282 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 276 | { |
283 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 277 | auto ret =QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
278 | Q_ASSERT(ret); | ||
279 | } | ||
280 | { | ||
281 | auto ret = QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | ||
282 | Q_ASSERT(ret); | ||
283 | } | ||
284 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); | 284 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); |
285 | 285 | ||
286 | mCommitQueueTimer.setInterval(sCommitInterval); | 286 | mCommitQueueTimer.setInterval(sCommitInterval); |
@@ -290,13 +290,12 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra | |||
290 | 290 | ||
291 | GenericResource::~GenericResource() | 291 | GenericResource::~GenericResource() |
292 | { | 292 | { |
293 | delete mProcessor; | ||
294 | } | 293 | } |
295 | 294 | ||
296 | KAsync::Job<void> GenericResource::inspect( | 295 | KAsync::Job<void> GenericResource::inspect( |
297 | int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) | 296 | int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) |
298 | { | 297 | { |
299 | Warning() << "Inspection not implemented"; | 298 | SinkWarning() << "Inspection not implemented"; |
300 | return KAsync::null<void>(); | 299 | return KAsync::null<void>(); |
301 | } | 300 | } |
302 | 301 | ||
@@ -329,13 +328,36 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync | |||
329 | void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay) | 328 | void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay) |
330 | { | 329 | { |
331 | mChangeReplay = changeReplay; | 330 | mChangeReplay = changeReplay; |
331 | { | ||
332 | auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::replayingChanges, [this]() { | ||
333 | Sink::Notification n; | ||
334 | n.id = "changereplay"; | ||
335 | n.type = Sink::Notification::Status; | ||
336 | n.message = "Replaying changes."; | ||
337 | n.code = Sink::ApplicationDomain::BusyStatus; | ||
338 | emit notify(n); | ||
339 | }); | ||
340 | Q_ASSERT(ret); | ||
341 | } | ||
342 | { | ||
343 | auto ret = QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, [this]() { | ||
344 | Sink::Notification n; | ||
345 | n.id = "changereplay"; | ||
346 | n.type = Sink::Notification::Status; | ||
347 | n.message = "All changes have been replayed."; | ||
348 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
349 | emit notify(n); | ||
350 | }); | ||
351 | Q_ASSERT(ret); | ||
352 | } | ||
353 | |||
332 | mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); | 354 | mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); |
333 | enableChangeReplay(true); | 355 | enableChangeReplay(true); |
334 | } | 356 | } |
335 | 357 | ||
336 | void GenericResource::removeDataFromDisk() | 358 | void GenericResource::removeDataFromDisk() |
337 | { | 359 | { |
338 | Log() << "Removing the resource from disk: " << mResourceInstanceIdentifier; | 360 | SinkLog() << "Removing the resource from disk: " << mResourceInstanceIdentifier; |
339 | //Ensure we have no transaction or databases open | 361 | //Ensure we have no transaction or databases open |
340 | mSynchronizer.clear(); | 362 | mSynchronizer.clear(); |
341 | mChangeReplay.clear(); | 363 | mChangeReplay.clear(); |
@@ -363,7 +385,7 @@ qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) | |||
363 | 385 | ||
364 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) | 386 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) |
365 | { | 387 | { |
366 | Warning() << "Received error from Processor: " << errorCode << errorMessage; | 388 | SinkWarning() << "Received error from Processor: " << errorCode << errorMessage; |
367 | mError = errorCode; | 389 | mError = errorCode; |
368 | } | 390 | } |
369 | 391 | ||
@@ -399,12 +421,27 @@ void GenericResource::processCommand(int commandId, const QByteArray &data) | |||
399 | KAsync::Job<void> GenericResource::synchronizeWithSource() | 421 | KAsync::Job<void> GenericResource::synchronizeWithSource() |
400 | { | 422 | { |
401 | return KAsync::start<void>([this](KAsync::Future<void> &future) { | 423 | return KAsync::start<void>([this](KAsync::Future<void> &future) { |
402 | Log() << " Synchronizing"; | 424 | |
425 | Sink::Notification n; | ||
426 | n.id = "sync"; | ||
427 | n.type = Sink::Notification::Status; | ||
428 | n.message = "Synchronization has started."; | ||
429 | n.code = Sink::ApplicationDomain::BusyStatus; | ||
430 | emit notify(n); | ||
431 | |||
432 | SinkLog() << " Synchronizing"; | ||
403 | // Changereplay would deadlock otherwise when trying to open the synchronization store | 433 | // Changereplay would deadlock otherwise when trying to open the synchronization store |
404 | enableChangeReplay(false); | 434 | enableChangeReplay(false); |
405 | mSynchronizer->synchronize() | 435 | mSynchronizer->synchronize() |
406 | .then<void>([this, &future]() { | 436 | .then<void>([this, &future]() { |
407 | Log() << "Done Synchronizing"; | 437 | SinkLog() << "Done Synchronizing"; |
438 | Sink::Notification n; | ||
439 | n.id = "sync"; | ||
440 | n.type = Sink::Notification::Status; | ||
441 | n.message = "Synchronization has ended."; | ||
442 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
443 | emit notify(n); | ||
444 | |||
408 | enableChangeReplay(true); | 445 | enableChangeReplay(true); |
409 | future.setFinished(); | 446 | future.setFinished(); |
410 | }, [this, &future](int errorCode, const QString &error) { | 447 | }, [this, &future](int errorCode, const QString &error) { |