diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-28 23:04:59 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-28 23:04:59 +0100 |
commit | 3e7b8fe8b8cca75b546c8cac2c09ce231861f21b (patch) | |
tree | 36cf1849ff30a986de56d931d60ce1c88660ec83 /common/commandprocessor.cpp | |
parent | 7fdcc36a1a352bb869020ade8a8aa697c3e8b80c (diff) | |
download | sink-3e7b8fe8b8cca75b546c8cac2c09ce231861f21b.tar.gz sink-3e7b8fe8b8cca75b546c8cac2c09ce231861f21b.zip |
Used the CommandProcessor as central place for all command processing.
Diffstat (limited to 'common/commandprocessor.cpp')
-rw-r--r-- | common/commandprocessor.cpp | 210 |
1 files changed, 208 insertions, 2 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index 57fe524..bdff905 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp | |||
@@ -19,25 +19,171 @@ | |||
19 | */ | 19 | */ |
20 | #include "commandprocessor.h" | 20 | #include "commandprocessor.h" |
21 | 21 | ||
22 | #include <QDataStream> | ||
23 | |||
22 | #include "commands.h" | 24 | #include "commands.h" |
23 | #include "messagequeue.h" | 25 | #include "messagequeue.h" |
24 | #include "queuedcommand_generated.h" | ||
25 | #include "flush_generated.h" | 26 | #include "flush_generated.h" |
26 | #include "inspector.h" | 27 | #include "inspector.h" |
27 | #include "synchronizer.h" | 28 | #include "synchronizer.h" |
28 | #include "pipeline.h" | 29 | #include "pipeline.h" |
29 | #include "bufferutils.h" | 30 | #include "bufferutils.h" |
31 | #include "definitions.h" | ||
32 | #include "storage.h" | ||
33 | |||
34 | #include "queuedcommand_generated.h" | ||
35 | #include "revisionreplayed_generated.h" | ||
36 | #include "synchronize_generated.h" | ||
30 | 37 | ||
31 | static int sBatchSize = 100; | 38 | static int sBatchSize = 100; |
39 | // This interval directly affects the roundtrip time of single commands | ||
40 | static int sCommitInterval = 10; | ||
41 | |||
32 | 42 | ||
33 | using namespace Sink; | 43 | using namespace Sink; |
44 | using namespace Sink::Storage; | ||
34 | 45 | ||
35 | CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false), mLowerBoundRevision(0) | 46 | CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId) |
47 | : QObject(), | ||
48 | mPipeline(pipeline), | ||
49 | mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), | ||
50 | mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), | ||
51 | mCommandQueues(QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue), mProcessingLock(false), mLowerBoundRevision(0) | ||
36 | { | 52 | { |
37 | for (auto queue : mCommandQueues) { | 53 | for (auto queue : mCommandQueues) { |
38 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); | 54 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); |
39 | Q_UNUSED(ret); | 55 | Q_UNUSED(ret); |
40 | } | 56 | } |
57 | |||
58 | mCommitQueueTimer.setInterval(sCommitInterval); | ||
59 | mCommitQueueTimer.setSingleShot(true); | ||
60 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); | ||
61 | } | ||
62 | |||
63 | static void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) | ||
64 | { | ||
65 | flatbuffers::FlatBufferBuilder fbb; | ||
66 | auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); | ||
67 | auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData); | ||
68 | Sink::FinishQueuedCommandBuffer(fbb, buffer); | ||
69 | mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); | ||
70 | } | ||
71 | |||
72 | void CommandProcessor::processCommand(int commandId, const QByteArray &data) | ||
73 | { | ||
74 | switch (commandId) { | ||
75 | case Commands::FlushCommand: | ||
76 | processFlushCommand(data); | ||
77 | break; | ||
78 | case Commands::SynchronizeCommand: | ||
79 | processSynchronizeCommand(data); | ||
80 | break; | ||
81 | // case Commands::RevisionReplayedCommand: | ||
82 | // processRevisionReplayedCommand(data); | ||
83 | // break; | ||
84 | default: { | ||
85 | static int modifications = 0; | ||
86 | mUserQueue.startTransaction(); | ||
87 | enqueueCommand(mUserQueue, commandId, data); | ||
88 | modifications++; | ||
89 | if (modifications >= sBatchSize) { | ||
90 | mUserQueue.commit(); | ||
91 | modifications = 0; | ||
92 | mCommitQueueTimer.stop(); | ||
93 | } else { | ||
94 | mCommitQueueTimer.start(); | ||
95 | } | ||
96 | } | ||
97 | }; | ||
98 | } | ||
99 | |||
100 | void CommandProcessor::processFlushCommand(const QByteArray &data) | ||
101 | { | ||
102 | flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); | ||
103 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { | ||
104 | auto buffer = Sink::Commands::GetFlush(data.constData()); | ||
105 | const auto flushType = buffer->type(); | ||
106 | const auto flushId = BufferUtils::extractBuffer(buffer->id()); | ||
107 | if (flushType == Sink::Flush::FlushSynchronization) { | ||
108 | mSynchronizer->flush(flushType, flushId); | ||
109 | } else { | ||
110 | mUserQueue.startTransaction(); | ||
111 | enqueueCommand(mUserQueue, Commands::FlushCommand, data); | ||
112 | mUserQueue.commit(); | ||
113 | } | ||
114 | } | ||
115 | |||
116 | } | ||
117 | |||
118 | void CommandProcessor::processSynchronizeCommand(const QByteArray &data) | ||
119 | { | ||
120 | flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); | ||
121 | if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { | ||
122 | auto buffer = Sink::Commands::GetSynchronize(data.constData()); | ||
123 | auto timer = QSharedPointer<QTime>::create(); | ||
124 | timer->start(); | ||
125 | auto job = KAsync::null<void>(); | ||
126 | Sink::QueryBase query; | ||
127 | if (buffer->query()) { | ||
128 | auto data = QByteArray::fromStdString(buffer->query()->str()); | ||
129 | QDataStream stream(&data, QIODevice::ReadOnly); | ||
130 | stream >> query; | ||
131 | } | ||
132 | job = synchronizeWithSource(query); | ||
133 | job.then<void>([timer](const KAsync::Error &error) { | ||
134 | if (error) { | ||
135 | SinkWarning() << "Sync failed: " << error.errorMessage; | ||
136 | return KAsync::error(error); | ||
137 | } else { | ||
138 | SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); | ||
139 | return KAsync::null(); | ||
140 | } | ||
141 | }) | ||
142 | .exec(); | ||
143 | return; | ||
144 | } else { | ||
145 | SinkWarning() << "received invalid command"; | ||
146 | } | ||
147 | } | ||
148 | |||
149 | // void CommandProcessor::processRevisionReplayedCommand(const QByteArray &data) | ||
150 | // { | ||
151 | // flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); | ||
152 | // if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { | ||
153 | // auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); | ||
154 | // client.currentRevision = buffer->revision(); | ||
155 | // } else { | ||
156 | // SinkWarning() << "received invalid command"; | ||
157 | // } | ||
158 | // loadResource().setLowerBoundRevision(lowerBoundRevision()); | ||
159 | // } | ||
160 | |||
161 | KAsync::Job<void> CommandProcessor::synchronizeWithSource(const Sink::QueryBase &query) | ||
162 | { | ||
163 | return KAsync::start<void>([this, query] { | ||
164 | Sink::Notification n; | ||
165 | n.id = "sync"; | ||
166 | n.type = Sink::Notification::Status; | ||
167 | n.message = "Synchronization has started."; | ||
168 | n.code = Sink::ApplicationDomain::BusyStatus; | ||
169 | emit notify(n); | ||
170 | |||
171 | SinkLog() << " Synchronizing"; | ||
172 | return mSynchronizer->synchronize(query) | ||
173 | .then<void>([this](const KAsync::Error &error) { | ||
174 | if (!error) { | ||
175 | SinkLog() << "Done Synchronizing"; | ||
176 | Sink::Notification n; | ||
177 | n.id = "sync"; | ||
178 | n.type = Sink::Notification::Status; | ||
179 | n.message = "Synchronization has ended."; | ||
180 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
181 | emit notify(n); | ||
182 | return KAsync::null(); | ||
183 | } | ||
184 | return KAsync::error(error); | ||
185 | }); | ||
186 | }); | ||
41 | } | 187 | } |
42 | 188 | ||
43 | void CommandProcessor::setOldestUsedRevision(qint64 revision) | 189 | void CommandProcessor::setOldestUsedRevision(qint64 revision) |
@@ -186,6 +332,27 @@ void CommandProcessor::setInspector(const QSharedPointer<Inspector> &inspector) | |||
186 | void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) | 332 | void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) |
187 | { | 333 | { |
188 | mSynchronizer = synchronizer; | 334 | mSynchronizer = synchronizer; |
335 | mSynchronizer->setup([this](int commandId, const QByteArray &data) { | ||
336 | enqueueCommand(mSynchronizerQueue, commandId, data); | ||
337 | }, mSynchronizerQueue); | ||
338 | |||
339 | QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() { | ||
340 | Sink::Notification n; | ||
341 | n.id = "changereplay"; | ||
342 | n.type = Sink::Notification::Status; | ||
343 | n.message = "Replaying changes."; | ||
344 | n.code = Sink::ApplicationDomain::BusyStatus; | ||
345 | emit notify(n); | ||
346 | }); | ||
347 | QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() { | ||
348 | Sink::Notification n; | ||
349 | n.id = "changereplay"; | ||
350 | n.type = Sink::Notification::Status; | ||
351 | n.message = "All changes have been replayed."; | ||
352 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
353 | emit notify(n); | ||
354 | }); | ||
355 | |||
189 | QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); | 356 | QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); |
190 | setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); | 357 | setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); |
191 | } | 358 | } |
@@ -213,3 +380,42 @@ KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size) | |||
213 | return KAsync::error<void>(-1, "Invalid flush command."); | 380 | return KAsync::error<void>(-1, "Invalid flush command."); |
214 | } | 381 | } |
215 | 382 | ||
383 | static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) | ||
384 | { | ||
385 | if (queue.isEmpty()) { | ||
386 | f.setFinished(); | ||
387 | } else { | ||
388 | QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); }); | ||
389 | } | ||
390 | }; | ||
391 | |||
392 | KAsync::Job<void> CommandProcessor::processAllMessages() | ||
393 | { | ||
394 | // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. | ||
395 | // TODO: report errors while processing sync? | ||
396 | // TODO JOBAPI: A helper that waits for n events and then continues? | ||
397 | return KAsync::start<void>([this](KAsync::Future<void> &f) { | ||
398 | if (mCommitQueueTimer.isActive()) { | ||
399 | auto context = new QObject; | ||
400 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { | ||
401 | delete context; | ||
402 | f.setFinished(); | ||
403 | }); | ||
404 | } else { | ||
405 | f.setFinished(); | ||
406 | } | ||
407 | }) | ||
408 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) | ||
409 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) | ||
410 | .then<void>([this](KAsync::Future<void> &f) { | ||
411 | if (mSynchronizer->allChangesReplayed()) { | ||
412 | f.setFinished(); | ||
413 | } else { | ||
414 | auto context = new QObject; | ||
415 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { | ||
416 | delete context; | ||
417 | f.setFinished(); | ||
418 | }); | ||
419 | } | ||
420 | }); | ||
421 | } | ||