summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp44
1 files changed, 19 insertions, 25 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index ed7dd46..7136882 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -46,9 +46,6 @@ static int sCommitInterval = 10;
46 46
47using namespace Sink; 47using namespace Sink;
48 48
49#undef DEBUG_AREA
50#define DEBUG_AREA "resource.commandprocessor"
51
52/** 49/**
53 * Drives the pipeline using the output from all command queues 50 * Drives the pipeline using the output from all command queues
54 */ 51 */
@@ -56,12 +53,13 @@ class CommandProcessor : public QObject
56{ 53{
57 Q_OBJECT 54 Q_OBJECT
58 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; 55 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction;
56 SINK_DEBUG_AREA("commandprocessor")
59 57
60public: 58public:
61 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) 59 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false)
62 { 60 {
63 mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { 61 mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) {
64 Warning() << error.message; 62 SinkWarning() << error.message;
65 })); 63 }));
66 64
67 for (auto queue : mCommandQueues) { 65 for (auto queue : mCommandQueues) {
@@ -80,7 +78,6 @@ public:
80 mInspect = f; 78 mInspect = f;
81 } 79 }
82 80
83
84signals: 81signals:
85 void error(int errorCode, const QString &errorMessage); 82 void error(int errorCode, const QString &errorMessage);
86 83
@@ -114,7 +111,7 @@ private slots:
114 111
115 KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand) 112 KAsync::Job<qint64> processQueuedCommand(const Sink::QueuedCommand *queuedCommand)
116 { 113 {
117 Trace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId()); 114 SinkTrace() << "Processing command: " << Sink::Commands::name(queuedCommand->commandId());
118 // Throw command into appropriate pipeline 115 // Throw command into appropriate pipeline
119 switch (queuedCommand->commandId()) { 116 switch (queuedCommand->commandId()) {
120 case Sink::Commands::DeleteEntityCommand: 117 case Sink::Commands::DeleteEntityCommand:
@@ -138,21 +135,21 @@ private slots:
138 { 135 {
139 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size()); 136 flatbuffers::Verifier verifyer(reinterpret_cast<const uint8_t *>(data.constData()), data.size());
140 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) { 137 if (!Sink::VerifyQueuedCommandBuffer(verifyer)) {
141 Warning() << "invalid buffer"; 138 SinkWarning() << "invalid buffer";
142 // return KAsync::error<void, qint64>(1, "Invalid Buffer"); 139 // return KAsync::error<void, qint64>(1, "Invalid Buffer");
143 } 140 }
144 auto queuedCommand = Sink::GetQueuedCommand(data.constData()); 141 auto queuedCommand = Sink::GetQueuedCommand(data.constData());
145 const auto commandId = queuedCommand->commandId(); 142 const auto commandId = queuedCommand->commandId();
146 Trace() << "Dequeued Command: " << Sink::Commands::name(commandId); 143 SinkTrace() << "Dequeued Command: " << Sink::Commands::name(commandId);
147 return processQueuedCommand(queuedCommand) 144 return processQueuedCommand(queuedCommand)
148 .then<qint64, qint64>( 145 .then<qint64, qint64>(
149 [commandId](qint64 createdRevision) -> qint64 { 146 [this, commandId](qint64 createdRevision) -> qint64 {
150 Trace() << "Command pipeline processed: " << Sink::Commands::name(commandId); 147 SinkTrace() << "Command pipeline processed: " << Sink::Commands::name(commandId);
151 return createdRevision; 148 return createdRevision;
152 }, 149 },
153 [](int errorCode, QString errorMessage) { 150 [](int errorCode, QString errorMessage) {
154 // FIXME propagate error, we didn't handle it 151 // FIXME propagate error, we didn't handle it
155 Warning() << "Error while processing queue command: " << errorMessage; 152 SinkWarning() << "Error while processing queue command: " << errorMessage;
156 }); 153 });
157 } 154 }
158 155
@@ -169,7 +166,7 @@ private slots:
169 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) { 166 return KAsync::start<void>([this, data, time](KAsync::Future<void> &future) {
170 processQueuedCommand(data) 167 processQueuedCommand(data)
171 .then<void, qint64>([&future, this, time](qint64 createdRevision) { 168 .then<void, qint64>([&future, this, time](qint64 createdRevision) {
172 Trace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed()); 169 SinkTrace() << "Created revision " << createdRevision << ". Processing took: " << Log::TraceTime(time->elapsed());
173 future.setFinished(); 170 future.setFinished();
174 }) 171 })
175 .exec(); 172 .exec();
@@ -178,7 +175,7 @@ private slots:
178 .then<void>([&future, queue]() { future.setFinished(); }, 175 .then<void>([&future, queue]() { future.setFinished(); },
179 [&future](int i, QString error) { 176 [&future](int i, QString error) {
180 if (i != MessageQueue::ErrorCodes::NoMessageFound) { 177 if (i != MessageQueue::ErrorCodes::NoMessageFound) {
181 Warning() << "Error while getting message from messagequeue: " << error; 178 SinkWarning() << "Error while getting message from messagequeue: " << error;
182 } 179 }
183 future.setFinished(); 180 future.setFinished();
184 }) 181 })
@@ -192,12 +189,12 @@ private slots:
192 auto time = QSharedPointer<QTime>::create(); 189 auto time = QSharedPointer<QTime>::create();
193 time->start(); 190 time->start();
194 mPipeline->startTransaction(); 191 mPipeline->startTransaction();
195 Trace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision; 192 SinkTrace() << "Cleaning up from " << mPipeline->cleanedUpRevision() + 1 << " to " << mLowerBoundRevision;
196 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) { 193 for (qint64 revision = mPipeline->cleanedUpRevision() + 1; revision <= mLowerBoundRevision; revision++) {
197 mPipeline->cleanupRevision(revision); 194 mPipeline->cleanupRevision(revision);
198 } 195 }
199 mPipeline->commit(); 196 mPipeline->commit();
200 Trace() << "Cleanup done." << Log::TraceTime(time->elapsed()); 197 SinkTrace() << "Cleanup done." << Log::TraceTime(time->elapsed());
201 198
202 // Go through all message queues 199 // Go through all message queues
203 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues); 200 auto it = QSharedPointer<QListIterator<MessageQueue *>>::create(mCommandQueues);
@@ -208,8 +205,8 @@ private slots:
208 205
209 auto queue = it->next(); 206 auto queue = it->next();
210 processQueue(queue) 207 processQueue(queue)
211 .then<void>([&future, time]() { 208 .then<void>([this, &future, time]() {
212 Trace() << "Queue processed." << Log::TraceTime(time->elapsed()); 209 SinkTrace() << "Queue processed." << Log::TraceTime(time->elapsed());
213 future.setFinished(); 210 future.setFinished();
214 }) 211 })
215 .exec(); 212 .exec();
@@ -226,9 +223,6 @@ private:
226 InspectionFunction mInspect; 223 InspectionFunction mInspect;
227}; 224};
228 225
229#undef DEBUG_AREA
230#define DEBUG_AREA "resource"
231
232GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline ) 226GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline )
233 : Sink::Resource(), 227 : Sink::Resource(),
234 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), 228 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"),
@@ -301,7 +295,7 @@ GenericResource::~GenericResource()
301KAsync::Job<void> GenericResource::inspect( 295KAsync::Job<void> GenericResource::inspect(
302 int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) 296 int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
303{ 297{
304 Warning() << "Inspection not implemented"; 298 SinkWarning() << "Inspection not implemented";
305 return KAsync::null<void>(); 299 return KAsync::null<void>();
306} 300}
307 301
@@ -363,7 +357,7 @@ void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &chan
363 357
364void GenericResource::removeDataFromDisk() 358void GenericResource::removeDataFromDisk()
365{ 359{
366 Log() << "Removing the resource from disk: " << mResourceInstanceIdentifier; 360 SinkLog() << "Removing the resource from disk: " << mResourceInstanceIdentifier;
367 //Ensure we have no transaction or databases open 361 //Ensure we have no transaction or databases open
368 mSynchronizer.clear(); 362 mSynchronizer.clear();
369 mChangeReplay.clear(); 363 mChangeReplay.clear();
@@ -391,7 +385,7 @@ qint64 GenericResource::diskUsage(const QByteArray &instanceIdentifier)
391 385
392void GenericResource::onProcessorError(int errorCode, const QString &errorMessage) 386void GenericResource::onProcessorError(int errorCode, const QString &errorMessage)
393{ 387{
394 Warning() << "Received error from Processor: " << errorCode << errorMessage; 388 SinkWarning() << "Received error from Processor: " << errorCode << errorMessage;
395 mError = errorCode; 389 mError = errorCode;
396} 390}
397 391
@@ -435,12 +429,12 @@ KAsync::Job<void> GenericResource::synchronizeWithSource()
435 n.code = Sink::ApplicationDomain::BusyStatus; 429 n.code = Sink::ApplicationDomain::BusyStatus;
436 emit notify(n); 430 emit notify(n);
437 431
438 Log() << " Synchronizing"; 432 SinkLog() << " Synchronizing";
439 // Changereplay would deadlock otherwise when trying to open the synchronization store 433 // Changereplay would deadlock otherwise when trying to open the synchronization store
440 enableChangeReplay(false); 434 enableChangeReplay(false);
441 mSynchronizer->synchronize() 435 mSynchronizer->synchronize()
442 .then<void>([this, &future]() { 436 .then<void>([this, &future]() {
443 Log() << "Done Synchronizing"; 437 SinkLog() << "Done Synchronizing";
444 Sink::Notification n; 438 Sink::Notification n;
445 n.id = "sync"; 439 n.id = "sync";
446 n.type = Sink::Notification::Status; 440 n.type = Sink::Notification::Status;