summaryrefslogtreecommitdiffstats
path: root/common/commandprocessor.h
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 23:04:59 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 23:04:59 +0100
commit3e7b8fe8b8cca75b546c8cac2c09ce231861f21b (patch)
tree36cf1849ff30a986de56d931d60ce1c88660ec83 /common/commandprocessor.h
parent7fdcc36a1a352bb869020ade8a8aa697c3e8b80c (diff)
downloadsink-3e7b8fe8b8cca75b546c8cac2c09ce231861f21b.tar.gz
sink-3e7b8fe8b8cca75b546c8cac2c09ce231861f21b.zip
Used the CommandProcessor as central place for all command processing.
Diffstat (limited to 'common/commandprocessor.h')
-rw-r--r--common/commandprocessor.h19
1 files changed, 16 insertions, 3 deletions
diff --git a/common/commandprocessor.h b/common/commandprocessor.h
index d00cf43..a807f46 100644
--- a/common/commandprocessor.h
+++ b/common/commandprocessor.h
@@ -22,19 +22,20 @@
22#include "sink_export.h" 22#include "sink_export.h"
23 23
24#include <QObject> 24#include <QObject>
25#include <QTimer>
25#include <Async/Async> 26#include <Async/Async>
26#include <functional> 27#include <functional>
27 28
28#include "log.h" 29#include "log.h"
29#include "notification.h" 30#include "notification.h"
30 31#include "messagequeue.h"
31class MessageQueue;
32 32
33namespace Sink { 33namespace Sink {
34 class Pipeline; 34 class Pipeline;
35 class Inspector; 35 class Inspector;
36 class Synchronizer; 36 class Synchronizer;
37 class QueuedCommand; 37 class QueuedCommand;
38 class QueryBase;
38 39
39/** 40/**
40 * Drives the pipeline using the output from all command queues 41 * Drives the pipeline using the output from all command queues
@@ -45,13 +46,17 @@ class CommandProcessor : public QObject
45 SINK_DEBUG_AREA("commandprocessor") 46 SINK_DEBUG_AREA("commandprocessor")
46 47
47public: 48public:
48 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues); 49 CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId);
49 50
50 void setOldestUsedRevision(qint64 revision); 51 void setOldestUsedRevision(qint64 revision);
51 52
52 void setInspector(const QSharedPointer<Inspector> &inspector); 53 void setInspector(const QSharedPointer<Inspector> &inspector);
53 void setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); 54 void setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer);
54 55
56 void processCommand(int commandId, const QByteArray &data);
57
58 KAsync::Job<void> processAllMessages();
59
55signals: 60signals:
56 void notify(Notification); 61 void notify(Notification);
57 void error(int errorCode, const QString &errorMessage); 62 void error(int errorCode, const QString &errorMessage);
@@ -68,9 +73,16 @@ private slots:
68 KAsync::Job<void> processPipeline(); 73 KAsync::Job<void> processPipeline();
69 74
70private: 75private:
76 void processFlushCommand(const QByteArray &data);
77 void processSynchronizeCommand(const QByteArray &data);
78 // void processRevisionReplayedCommand(const QByteArray &data);
79
71 KAsync::Job<void> flush(void const *command, size_t size); 80 KAsync::Job<void> flush(void const *command, size_t size);
81 KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query);
72 82
73 Sink::Pipeline *mPipeline; 83 Sink::Pipeline *mPipeline;
84 MessageQueue mUserQueue;
85 MessageQueue mSynchronizerQueue;
74 // Ordered by priority 86 // Ordered by priority
75 QList<MessageQueue *> mCommandQueues; 87 QList<MessageQueue *> mCommandQueues;
76 bool mProcessingLock; 88 bool mProcessingLock;
@@ -78,6 +90,7 @@ private:
78 qint64 mLowerBoundRevision; 90 qint64 mLowerBoundRevision;
79 QSharedPointer<Synchronizer> mSynchronizer; 91 QSharedPointer<Synchronizer> mSynchronizer;
80 QSharedPointer<Inspector> mInspector; 92 QSharedPointer<Inspector> mInspector;
93 QTimer mCommitQueueTimer;
81}; 94};
82 95
83}; 96};