summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-07 22:23:49 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-07-07 22:23:49 +0200
commitda2b049e248c1ad7efeb53685158a205335e4e36 (patch)
tree1e7e5e940e9b760b2108081b1d2f3879cebdb0ff /common/genericresource.cpp
parent9bcb822963fc96c94dbe7dcc4134dcd2dac454ff (diff)
downloadsink-da2b049e248c1ad7efeb53685158a205335e4e36.tar.gz
sink-da2b049e248c1ad7efeb53685158a205335e4e36.zip
A new debug system.
Instead of a single #define as debug area the new system allows for an identifier for each debug message with the structure component.area. The component is a dot separated identifier of the runtime component, such as the process or the plugin. The area is the code component, and can be as such defined at compiletime. The idea of this system is that it becomes possible to i.e. look at the output of all messages in the query subsystem of a specific resource (something that happens in the client process, but in the resource-specific subcomponent). The new macros are supposed to be less likely to clash with other names, hence the new names.
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp44
1 files changed, 19 insertions, 25 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index ed7dd46..7136882 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -46,9 +46,6 @@ static int sCommitInterval = 10;
46 46
47using namespace Sink; 47using namespace Sink;
48 48
49#undef DEBUG_AREA
50#define DEBUG_AREA "resource.commandprocessor"
51
52/** 49/**
53 * Drives the pipeline using the output from all command queues 50 * Drives the pipeline using the output from all command queues
54 */ 51 */
@@ -56,12 +53,13 @@ class CommandProcessor : public QObject
56{ 53{
57 Q_OBJECT 54 Q_OBJECT
58 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; 55 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction;
56 SINK_DEBUG_AREA("commandprocessor")
59 57
60public: 58public:
61 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) 59 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false)
62 { 60 {
63 mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 61 mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
64 Warning() << error.message; 62 SinkWarning() << error.message;
65 })); 63 }));
66 64
67 for (auto queue : mCommandQueues) { 65 for (auto queue : mCommandQueues) {
@@ -80,7 +78,6 @@ public:
80 mInspect = f; 78 mInspect = f;
81 } 79 }
82 80
83
84signals: 81signals:
85 void error(int errorCode, const QString &errorMessage); 82 void error(int errorCode, const QString &errorMessage);
86 83
@@ -114,7 +111,7 @@ private slots:
114 111
115 KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand) 112 KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand)
116 { 113 {
117 Trace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); 114 SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId());
118 // Throw command into appropriate pipeline 115 // Throw command into appropriate pipeline
119 switch (queuedCommand->commandId()) { 116 switch (queuedCommand->commandId()) {
120 case Sink::Commands::DeleteEntityCommand: 117 case Sink::Commands::DeleteEntityCommand:
@@ -138,21 +135,21 @@ private slots:
138 { 135 {
139 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); 136 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
140 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { 137 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
141 Warning() << "invalid buffer"; 138 SinkWarning() << "invalid buffer";
142 // return KAsync::error<void, qint64>(1, "Invalid Buffer"); 139 // return KAsync::error<void, qint64>(1, "Invalid Buffer");
143 } 140 }
144 auto queuedCommand = Sink::GetQueuedCommand(data.constData()); 141 auto queuedCommand = Sink::GetQueuedCommand(data.constData());
145 const auto commandId = queuedCommand->commandId(); 142 const auto commandId = queuedCommand->commandId();
146 Trace() << "Dequeued Command: " << Sink::Commands::name(commandId); 143 SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId);
147 return processQueuedCommand(queuedCommand) 144 return processQueuedCommand(queuedCommand)
148 .then<qint64, qint64>( 145 .then<qint64, qint64>(
149 [commandId](qint64 createdRevision) -> qint64 { 146 [this, commandId](qint64 createdRevision) -> qint64 {
150 Trace() << "Command pipeline processed: " << Sink::Commands::name(commandId); 147 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId);
151 return createdRevision; 148 return createdRevision;
152 }, 149 },
153 [](int errorCode, QString errorMessage) { 150 [](int errorCode, QString errorMessage) {
154 // FIXME propagate error, we didn't handle it 151 // FIXME propagate error, we didn't handle it
155 Warning() << "Error while processing queue command: " << errorMessage; 152 SinkWarning() << "Error while processing queue command: " << errorMessage;
156 }); 153 });
157 } 154 }
158 155
@@ -169,7 +166,7 @@ private slots:
169 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { 166 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) {
170 processQueuedCommand(data) 167 processQueuedCommand(data)
171 .then<void, qint64>([&future, this, time](qint64 createdRevision) { 168 .then<void, qint64>([&future, this, time](qint64 createdRevision) {
172 Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); 169 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
173 future.setFinished(); 170 future.setFinished();
174 }) 171 })
175 .exec(); 172 .exec();
@@ -178,7 +175,7 @@ private slots:
178 .then<void>([&future, queue]() { future.setFinished(); }, 175 .then<void>([&future, queue]() { future.setFinished(); },
179 [&future](int i, QString error) { 176 [&future](int i, QString error) {
180 if (i != MessageQueue::ErrorCodes::NoMessageFound) { 177 if (i != MessageQueue::ErrorCodes::NoMessageFound) {
181 Warning() << "Error while getting message from messagequeue: " << error; 178 SinkWarning() << "Error while getting message from messagequeue: " << error;
182 } 179 }
183 future.setFinished(); 180 future.setFinished();
184 }) 181 })
@@ -192,12 +189,12 @@ private slots:
192 auto time = QSharedPointer<QTime>::create(); 189 auto time = QSharedPointer<QTime>::create();
193 time->start(); 190 time->start();
194 mPipeline->startTransaction(); 191 mPipeline->startTransaction();
195 Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; 192 SinkTrace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision;
196 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { 193 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) {
197 mPipeline->cleanupRevision(revision); 194 mPipeline->cleanupRevision(revision);
198 } 195 }
199 mPipeline->commit(); 196 mPipeline->commit();
200 Trace() << "Cleanup done." << Log::TraceTime(time->elapsed()); 197 SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed());
201 198
202 // Go through all message queues 199 // Go through all message queues
203 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); 200 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues);
@@ -208,8 +205,8 @@ private slots:
208 205
209 auto queue = it->next(); 206 auto queue = it->next();
210 processQueue(queue) 207 processQueue(queue)
211 .then<void>([&future, time]() { 208 .then<void>([this, &future, time]() {
212 Trace() << "Queue processed." << Log::TraceTime(time->elapsed()); 209 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed());
213 future.setFinished(); 210 future.setFinished();
214 }) 211 })
215 .exec(); 212 .exec();
@@ -226,9 +223,6 @@ private:
226 InspectionFunction mInspect; 223 InspectionFunction mInspect;
227}; 224};
228 225
229#undef DEBUG_AREA
230#define DEBUG_AREA "resource"
231
232GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline ) 226GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline )
233 : Sink::Resource(), 227 : Sink::Resource(),
234 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), 228 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"),
@@ -301,7 +295,7 @@ GenericResource::~GenericResource()
301KAsync::Job<void> GenericResource::inspect( 295KAsync::Job<void> GenericResource::inspect(
302 int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) 296 int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
303{ 297{
304 Warning() << "Inspection not implemented"; 298 SinkWarning() << "Inspection not implemented";
305 return KAsync::null<void>(); 299 return KAsync::null<void>();
306} 300}
307 301
@@ -363,7 +357,7 @@ void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &chan
363 357
364void GenericResource::removeDataFromDisk() 358void GenericResource::removeDataFromDisk()
365{ 359{
366 Log() << "Removing the resource from disk: " << mResourceInstanceIdentifier; 360 SinkLog() << "Removing the resource from disk: " << mResourceInstanceIdentifier;
367 //Ensure we have no transaction or databases open 361 //Ensure we have no transaction or databases open
368 mSynchronizer.clear(); 362 mSynchronizer.clear();
369 mChangeReplay.clear(); 363 mChangeReplay.clear();
@@ -391,7 +385,7 @@ qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier)
391 385
392void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) 386void GenericResource::onProcessorError(int errorCode, const QString &errorMessage)
393{ 387{
394 Warning() << "Received error from Processor: " << errorCode << errorMessage; 388 SinkWarning() << "Received error from Processor: " << errorCode << errorMessage;
395 mError = errorCode; 389 mError = errorCode;
396} 390}
397 391
@@ -435,12 +429,12 @@ KAsync::Job<void> GenericResource::synchronizeWithSource()
435 n.code = Sink::ApplicationDomain::BusyStatus; 429 n.code = Sink::ApplicationDomain::BusyStatus;
436 emit notify(n); 430 emit notify(n);
437 431
438 Log() << " Synchronizing"; 432 SinkLog() << " Synchronizing";
439 // Changereplay would deadlock otherwise when trying to open the synchronization store 433 // Changereplay would deadlock otherwise when trying to open the synchronization store
440 enableChangeReplay(false); 434 enableChangeReplay(false);
441 mSynchronizer->synchronize() 435 mSynchronizer->synchronize()
442 .then<void>([this, &future]() { 436 .then<void>([this, &future]() {
443 Log() << "Done Synchronizing"; 437 SinkLog() << "Done Synchronizing";
444 Sink::Notification n; 438 Sink::Notification n;
445 n.id = "sync"; 439 n.id = "sync";
446 n.type = Sink::Notification::Status; 440 n.type = Sink::Notification::Status;