diff options
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 44 |
1 files changed, 19 insertions, 25 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index ed7dd46..7136882 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -46,9 +46,6 @@ static int sCommitInterval = 10; | |||
46 | 46 | ||
47 | using namespace Sink; | 47 | using namespace Sink; |
48 | 48 | ||
49 | #undef DEBUG_AREA | ||
50 | #define DEBUG_AREA "resource.commandprocessor" | ||
51 | |||
52 | /** | 49 | /** |
53 | * Drives the pipeline using the output from all command queues | 50 | * Drives the pipeline using the output from all command queues |
54 | */ | 51 | */ |
@@ -56,12 +53,13 @@ class CommandProcessor : public QObject | |||
56 | { | 53 | { |
57 | Q_OBJECT | 54 | Q_OBJECT |
58 | typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; | 55 | typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; |
56 | SINK_DEBUG_AREA("commandprocessor") | ||
59 | 57 | ||
60 | public: | 58 | public: |
61 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) | 59 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) |
62 | { | 60 | { |
63 | mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { | 61 | mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
64 | Warning() << error.message; | 62 | SinkWarning() << error.message; |
65 | })); | 63 | })); |
66 | 64 | ||
67 | for (auto queue : mCommandQueues) { | 65 | for (auto queue : mCommandQueues) { |
@@ -80,7 +78,6 @@ public: | |||
80 | mInspect = f; | 78 | mInspect = f; |
81 | } | 79 | } |
82 | 80 | ||
83 | |||
84 | signals: | 81 | signals: |
85 | void error(int errorCode, const QString &errorMessage); | 82 | void error(int errorCode, const QString &errorMessage); |
86 | 83 | ||
@@ -114,7 +111,7 @@ private slots: | |||
114 | 111 | ||
115 | KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand) | 112 | KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand) |
116 | { | 113 | { |
117 | Trace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); | 114 | SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); |
118 | // Throw command into appropriate pipeline | 115 | // Throw command into appropriate pipeline |
119 | switch (queuedCommand->commandId()) { | 116 | switch (queuedCommand->commandId()) { |
120 | case Sink::Commands::DeleteEntityCommand: | 117 | case Sink::Commands::DeleteEntityCommand: |
@@ -138,21 +135,21 @@ private slots: | |||
138 | { | 135 | { |
139 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); | 136 | flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); |
140 | if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { | 137 | if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { |
141 | Warning() << "invalid buffer"; | 138 | SinkWarning() << "invalid buffer"; |
142 | // return KAsync::error<void, qint64>(1, "Invalid Buffer"); | 139 | // return KAsync::error<void, qint64>(1, "Invalid Buffer"); |
143 | } | 140 | } |
144 | auto queuedCommand = Sink::GetQueuedCommand(data.constData()); | 141 | auto queuedCommand = Sink::GetQueuedCommand(data.constData()); |
145 | const auto commandId = queuedCommand->commandId(); | 142 | const auto commandId = queuedCommand->commandId(); |
146 | Trace() << "Dequeued Command: " << Sink::Commands::name(commandId); | 143 | SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId); |
147 | return processQueuedCommand(queuedCommand) | 144 | return processQueuedCommand(queuedCommand) |
148 | .then<qint64, qint64>( | 145 | .then<qint64, qint64>( |
149 | [commandId](qint64 createdRevision) -> qint64 { | 146 | [this, commandId](qint64 createdRevision) -> qint64 { |
150 | Trace() << "Command pipeline processed: " << Sink::Commands::name(commandId); | 147 | SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId); |
151 | return createdRevision; | 148 | return createdRevision; |
152 | }, | 149 | }, |
153 | [](int errorCode, QString errorMessage) { | 150 | [](int errorCode, QString errorMessage) { |
154 | // FIXME propagate error, we didn't handle it | 151 | // FIXME propagate error, we didn't handle it |
155 | Warning() << "Error while processing queue command: " << errorMessage; | 152 | SinkWarning() << "Error while processing queue command: " << errorMessage; |
156 | }); | 153 | }); |
157 | } | 154 | } |
158 | 155 | ||
@@ -169,7 +166,7 @@ private slots: | |||
169 | return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { | 166 | return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { |
170 | processQueuedCommand(data) | 167 | processQueuedCommand(data) |
171 | .then<void, qint64>([&future, this, time](qint64 createdRevision) { | 168 | .then<void, qint64>([&future, this, time](qint64 createdRevision) { |
172 | Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); | 169 | SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); |
173 | future.setFinished(); | 170 | future.setFinished(); |
174 | }) | 171 | }) |
175 | .exec(); | 172 | .exec(); |
@@ -178,7 +175,7 @@ private slots: | |||
178 | .then<void>([&future, queue]() { future.setFinished(); }, | 175 | .then<void>([&future, queue]() { future.setFinished(); }, |
179 | [&future](int i, QString error) { | 176 | [&future](int i, QString error) { |
180 | if (i != MessageQueue::ErrorCodes::NoMessageFound) { | 177 | if (i != MessageQueue::ErrorCodes::NoMessageFound) { |
181 | Warning() << "Error while getting message from messagequeue: " << error; | 178 | SinkWarning() << "Error while getting message from messagequeue: " << error; |
182 | } | 179 | } |
183 | future.setFinished(); | 180 | future.setFinished(); |
184 | }) | 181 | }) |
@@ -192,12 +189,12 @@ private slots: | |||
192 | auto time = QSharedPointer<QTime>::create(); | 189 | auto time = QSharedPointer<QTime>::create(); |
193 | time->start(); | 190 | time->start(); |
194 | mPipeline->startTransaction(); | 191 | mPipeline->startTransaction(); |
195 | Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; | 192 | SinkTrace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; |
196 | for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { | 193 | for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { |
197 | mPipeline->cleanupRevision(revision); | 194 | mPipeline->cleanupRevision(revision); |
198 | } | 195 | } |
199 | mPipeline->commit(); | 196 | mPipeline->commit(); |
200 | Trace() << "Cleanup done." << Log::TraceTime(time->elapsed()); | 197 | SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed()); |
201 | 198 | ||
202 | // Go through all message queues | 199 | // Go through all message queues |
203 | auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); | 200 | auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); |
@@ -208,8 +205,8 @@ private slots: | |||
208 | 205 | ||
209 | auto queue = it->next(); | 206 | auto queue = it->next(); |
210 | processQueue(queue) | 207 | processQueue(queue) |
211 | .then<void>([&future, time]() { | 208 | .then<void>([this, &future, time]() { |
212 | Trace() << "Queue processed." << Log::TraceTime(time->elapsed()); | 209 | SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed()); |
213 | future.setFinished(); | 210 | future.setFinished(); |
214 | }) | 211 | }) |
215 | .exec(); | 212 | .exec(); |
@@ -226,9 +223,6 @@ private: | |||
226 | InspectionFunction mInspect; | 223 | InspectionFunction mInspect; |
227 | }; | 224 | }; |
228 | 225 | ||
229 | #undef DEBUG_AREA | ||
230 | #define DEBUG_AREA "resource" | ||
231 | |||
232 | GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline ) | 226 | GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline ) |
233 | : Sink::Resource(), | 227 | : Sink::Resource(), |
234 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), | 228 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), |
@@ -301,7 +295,7 @@ GenericResource::~GenericResource() | |||
301 | KAsync::Job<void> GenericResource::inspect( | 295 | KAsync::Job<void> GenericResource::inspect( |
302 | int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) | 296 | int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) |
303 | { | 297 | { |
304 | Warning() << "Inspection not implemented"; | 298 | SinkWarning() << "Inspection not implemented"; |
305 | return KAsync::null<void>(); | 299 | return KAsync::null<void>(); |
306 | } | 300 | } |
307 | 301 | ||
@@ -363,7 +357,7 @@ void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &chan | |||
363 | 357 | ||
364 | void GenericResource::removeDataFromDisk() | 358 | void GenericResource::removeDataFromDisk() |
365 | { | 359 | { |
366 | Log() << "Removing the resource from disk: " << mResourceInstanceIdentifier; | 360 | SinkLog() << "Removing the resource from disk: " << mResourceInstanceIdentifier; |
367 | //Ensure we have no transaction or databases open | 361 | //Ensure we have no transaction or databases open |
368 | mSynchronizer.clear(); | 362 | mSynchronizer.clear(); |
369 | mChangeReplay.clear(); | 363 | mChangeReplay.clear(); |
@@ -391,7 +385,7 @@ qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier) | |||
391 | 385 | ||
392 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) | 386 | void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) |
393 | { | 387 | { |
394 | Warning() << "Received error from Processor: " << errorCode << errorMessage; | 388 | SinkWarning() << "Received error from Processor: " << errorCode << errorMessage; |
395 | mError = errorCode; | 389 | mError = errorCode; |
396 | } | 390 | } |
397 | 391 | ||
@@ -435,12 +429,12 @@ KAsync::Job<void> GenericResource::synchronizeWithSource() | |||
435 | n.code = Sink::ApplicationDomain::BusyStatus; | 429 | n.code = Sink::ApplicationDomain::BusyStatus; |
436 | emit notify(n); | 430 | emit notify(n); |
437 | 431 | ||
438 | Log() << " Synchronizing"; | 432 | SinkLog() << " Synchronizing"; |
439 | // Changereplay would deadlock otherwise when trying to open the synchronization store | 433 | // Changereplay would deadlock otherwise when trying to open the synchronization store |
440 | enableChangeReplay(false); | 434 | enableChangeReplay(false); |
441 | mSynchronizer->synchronize() | 435 | mSynchronizer->synchronize() |
442 | .then<void>([this, &future]() { | 436 | .then<void>([this, &future]() { |
443 | Log() << "Done Synchronizing"; | 437 | SinkLog() << "Done Synchronizing"; |
444 | Sink::Notification n; | 438 | Sink::Notification n; |
445 | n.id = "sync"; | 439 | n.id = "sync"; |
446 | n.type = Sink::Notification::Status; | 440 | n.type = Sink::Notification::Status; |