summaryrefslogtreecommitdiffstats
path: root/common/commandprocessor.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 23:04:59 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 23:04:59 +0100
commit3e7b8fe8b8cca75b546c8cac2c09ce231861f21b (patch)
tree36cf1849ff30a986de56d931d60ce1c88660ec83 /common/commandprocessor.cpp
parent7fdcc36a1a352bb869020ade8a8aa697c3e8b80c (diff)
downloadsink-3e7b8fe8b8cca75b546c8cac2c09ce231861f21b.tar.gz
sink-3e7b8fe8b8cca75b546c8cac2c09ce231861f21b.zip
Used the CommandProcessor as central place for all command processing.
Diffstat (limited to 'common/commandprocessor.cpp')
-rw-r--r--common/commandprocessor.cpp210
1 files changed, 208 insertions, 2 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
index 57fe524..bdff905 100644
--- a/common/commandprocessor.cpp
+++ b/common/commandprocessor.cpp
@@ -19,25 +19,171 @@
19 */ 19 */
20#include "commandprocessor.h" 20#include "commandprocessor.h"
21 21
22#include <QDataStream>
23
22#include "commands.h" 24#include "commands.h"
23#include "messagequeue.h" 25#include "messagequeue.h"
24#include "queuedcommand_generated.h"
25#include "flush_generated.h" 26#include "flush_generated.h"
26#include "inspector.h" 27#include "inspector.h"
27#include "synchronizer.h" 28#include "synchronizer.h"
28#include "pipeline.h" 29#include "pipeline.h"
29#include "bufferutils.h" 30#include "bufferutils.h"
31#include "definitions.h"
32#include "storage.h"
33
34#include "queuedcommand_generated.h"
35#include "revisionreplayed_generated.h"
36#include "synchronize_generated.h"
30 37
31static int sBatchSize = 100; 38static int sBatchSize = 100;
39// This interval directly affects the roundtrip time of single commands
40static int sCommitInterval = 10;
41
32 42
33using namespace Sink; 43using namespace Sink;
44using namespace Sink::Storage;
34 45
35CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false), mLowerBoundRevision(0) 46CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId)
47 : QObject(),
48 mPipeline(pipeline),
49 mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"),
50 mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"),
51 mCommandQueues(QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue), mProcessingLock(false), mLowerBoundRevision(0)
36{ 52{
37 for (auto queue : mCommandQueues) { 53 for (auto queue : mCommandQueues) {
38 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); 54 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process);
39 Q_UNUSED(ret); 55 Q_UNUSED(ret);
40 } 56 }
57
58 mCommitQueueTimer.setInterval(sCommitInterval);
59 mCommitQueueTimer.setSingleShot(true);
60 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit);
61}
62
63static void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data)
64{
65 flatbuffers::FlatBufferBuilder fbb;
66 auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size());
67 auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData);
68 Sink::FinishQueuedCommandBuffer(fbb, buffer);
69 mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize());
70}
71
72void CommandProcessor::processCommand(int commandId, const QByteArray &data)
73{
74 switch (commandId) {
75 case Commands::FlushCommand:
76 processFlushCommand(data);
77 break;
78 case Commands::SynchronizeCommand:
79 processSynchronizeCommand(data);
80 break;
81 // case Commands::RevisionReplayedCommand:
82 // processRevisionReplayedCommand(data);
83 // break;
84 default: {
85 static int modifications = 0;
86 mUserQueue.startTransaction();
87 enqueueCommand(mUserQueue, commandId, data);
88 modifications++;
89 if (modifications >= sBatchSize) {
90 mUserQueue.commit();
91 modifications = 0;
92 mCommitQueueTimer.stop();
93 } else {
94 mCommitQueueTimer.start();
95 }
96 }
97 };
98}
99
100void CommandProcessor::processFlushCommand(const QByteArray &data)
101{
102 flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size());
103 if (Sink::Commands::VerifyFlushBuffer(verifier)) {
104 auto buffer = Sink::Commands::GetFlush(data.constData());
105 const auto flushType = buffer->type();
106 const auto flushId = BufferUtils::extractBuffer(buffer->id());
107 if (flushType == Sink::Flush::FlushSynchronization) {
108 mSynchronizer->flush(flushType, flushId);
109 } else {
110 mUserQueue.startTransaction();
111 enqueueCommand(mUserQueue, Commands::FlushCommand, data);
112 mUserQueue.commit();
113 }
114 }
115
116}
117
118void CommandProcessor::processSynchronizeCommand(const QByteArray &data)
119{
120 flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size());
121 if (Sink::Commands::VerifySynchronizeBuffer(verifier)) {
122 auto buffer = Sink::Commands::GetSynchronize(data.constData());
123 auto timer = QSharedPointer<QTime>::create();
124 timer->start();
125 auto job = KAsync::null<void>();
126 Sink::QueryBase query;
127 if (buffer->query()) {
128 auto data = QByteArray::fromStdString(buffer->query()->str());
129 QDataStream stream(&data, QIODevice::ReadOnly);
130 stream >> query;
131 }
132 job = synchronizeWithSource(query);
133 job.then<void>([timer](const KAsync::Error &error) {
134 if (error) {
135 SinkWarning() << "Sync failed: " << error.errorMessage;
136 return KAsync::error(error);
137 } else {
138 SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed());
139 return KAsync::null();
140 }
141 })
142 .exec();
143 return;
144 } else {
145 SinkWarning() << "received invalid command";
146 }
147}
148
149// void CommandProcessor::processRevisionReplayedCommand(const QByteArray &data)
150// {
151// flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size());
152// if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) {
153// auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData());
154// client.currentRevision = buffer->revision();
155// } else {
156// SinkWarning() << "received invalid command";
157// }
158// loadResource().setLowerBoundRevision(lowerBoundRevision());
159// }
160
161KAsync::Job<void> CommandProcessor::synchronizeWithSource(const Sink::QueryBase &query)
162{
163 return KAsync::start<void>([this, query] {
164 Sink::Notification n;
165 n.id = "sync";
166 n.type = Sink::Notification::Status;
167 n.message = "Synchronization has started.";
168 n.code = Sink::ApplicationDomain::BusyStatus;
169 emit notify(n);
170
171 SinkLog() << " Synchronizing";
172 return mSynchronizer->synchronize(query)
173 .then<void>([this](const KAsync::Error &error) {
174 if (!error) {
175 SinkLog() << "Done Synchronizing";
176 Sink::Notification n;
177 n.id = "sync";
178 n.type = Sink::Notification::Status;
179 n.message = "Synchronization has ended.";
180 n.code = Sink::ApplicationDomain::ConnectedStatus;
181 emit notify(n);
182 return KAsync::null();
183 }
184 return KAsync::error(error);
185 });
186 });
41} 187}
42 188
43void CommandProcessor::setOldestUsedRevision(qint64 revision) 189void CommandProcessor::setOldestUsedRevision(qint64 revision)
@@ -186,6 +332,27 @@ void CommandProcessor::setInspector(const QSharedPointer<Inspector> &inspector)
186void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) 332void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer)
187{ 333{
188 mSynchronizer = synchronizer; 334 mSynchronizer = synchronizer;
335 mSynchronizer->setup([this](int commandId, const QByteArray &data) {
336 enqueueCommand(mSynchronizerQueue, commandId, data);
337 }, mSynchronizerQueue);
338
339 QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() {
340 Sink::Notification n;
341 n.id = "changereplay";
342 n.type = Sink::Notification::Status;
343 n.message = "Replaying changes.";
344 n.code = Sink::ApplicationDomain::BusyStatus;
345 emit notify(n);
346 });
347 QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() {
348 Sink::Notification n;
349 n.id = "changereplay";
350 n.type = Sink::Notification::Status;
351 n.message = "All changes have been replayed.";
352 n.code = Sink::ApplicationDomain::ConnectedStatus;
353 emit notify(n);
354 });
355
189 QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); 356 QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify);
190 setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); 357 setOldestUsedRevision(mSynchronizer->getLastReplayedRevision());
191} 358}
@@ -213,3 +380,42 @@ KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size)
213 return KAsync::error<void>(-1, "Invalid flush command."); 380 return KAsync::error<void>(-1, "Invalid flush command.");
214} 381}
215 382
383static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue)
384{
385 if (queue.isEmpty()) {
386 f.setFinished();
387 } else {
388 QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); });
389 }
390};
391
392KAsync::Job<void> CommandProcessor::processAllMessages()
393{
394 // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed.
395 // TODO: report errors while processing sync?
396 // TODO JOBAPI: A helper that waits for n events and then continues?
397 return KAsync::start<void>([this](KAsync::Future<void> &f) {
398 if (mCommitQueueTimer.isActive()) {
399 auto context = new QObject;
400 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() {
401 delete context;
402 f.setFinished();
403 });
404 } else {
405 f.setFinished();
406 }
407 })
408 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); })
409 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); })
410 .then<void>([this](KAsync::Future<void> &f) {
411 if (mSynchronizer->allChangesReplayed()) {
412 f.setFinished();
413 } else {
414 auto context = new QObject;
415 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() {
416 delete context;
417 f.setFinished();
418 });
419 }
420 });
421}