diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-28 23:04:59 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-28 23:04:59 +0100 |
commit | 3e7b8fe8b8cca75b546c8cac2c09ce231861f21b (patch) | |
tree | 36cf1849ff30a986de56d931d60ce1c88660ec83 /common/genericresource.cpp | |
parent | 7fdcc36a1a352bb869020ade8a8aa697c3e8b80c (diff) | |
download | sink-3e7b8fe8b8cca75b546c8cac2c09ce231861f21b.tar.gz sink-3e7b8fe8b8cca75b546c8cac2c09ce231861f21b.zip |
Used the CommandProcessor as central place for all command processing.
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 167 |
1 files changed, 8 insertions, 159 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 38da6bf..82112b3 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -19,32 +19,13 @@ | |||
19 | */ | 19 | */ |
20 | #include "genericresource.h" | 20 | #include "genericresource.h" |
21 | 21 | ||
22 | #include "entitybuffer.h" | ||
23 | #include "pipeline.h" | 22 | #include "pipeline.h" |
24 | #include "queuedcommand_generated.h" | ||
25 | #include "createentity_generated.h" | ||
26 | #include "modifyentity_generated.h" | ||
27 | #include "deleteentity_generated.h" | ||
28 | #include "inspection_generated.h" | ||
29 | #include "notification_generated.h" | ||
30 | #include "flush_generated.h" | ||
31 | #include "domainadaptor.h" | 23 | #include "domainadaptor.h" |
32 | #include "commands.h" | ||
33 | #include "index.h" | ||
34 | #include "log.h" | 24 | #include "log.h" |
35 | #include "definitions.h" | ||
36 | #include "bufferutils.h" | ||
37 | #include "adaptorfactoryregistry.h" | ||
38 | #include "synchronizer.h" | 25 | #include "synchronizer.h" |
39 | #include "commandprocessor.h" | 26 | #include "commandprocessor.h" |
40 | 27 | #include "definitions.h" | |
41 | #include <QUuid> | 28 | #include "storage.h" |
42 | #include <QDataStream> | ||
43 | #include <QTime> | ||
44 | |||
45 | static int sBatchSize = 100; | ||
46 | // This interval directly affects the roundtrip time of single commands | ||
47 | static int sCommitInterval = 10; | ||
48 | 29 | ||
49 | using namespace Sink; | 30 | using namespace Sink; |
50 | using namespace Sink::Storage; | 31 | using namespace Sink::Storage; |
@@ -52,20 +33,14 @@ using namespace Sink::Storage; | |||
52 | GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) | 33 | GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) |
53 | : Sink::Resource(), | 34 | : Sink::Resource(), |
54 | mResourceContext(resourceContext), | 35 | mResourceContext(resourceContext), |
55 | mUserQueue(Sink::storageLocation(), resourceContext.instanceId() + ".userqueue"), | ||
56 | mSynchronizerQueue(Sink::storageLocation(), resourceContext.instanceId() + ".synchronizerqueue"), | ||
57 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext)), | 36 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext)), |
37 | mProcessor(QSharedPointer<CommandProcessor>::create(mPipeline.data(), resourceContext.instanceId())), | ||
58 | mError(0), | 38 | mError(0), |
59 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 39 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
60 | { | 40 | { |
61 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); | 41 | QObject::connect(mProcessor.data(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
62 | QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 42 | QObject::connect(mProcessor.data(), &CommandProcessor::notify, this, &GenericResource::notify); |
63 | QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); | ||
64 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 43 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
65 | |||
66 | mCommitQueueTimer.setInterval(sCommitInterval); | ||
67 | mCommitQueueTimer.setSingleShot(true); | ||
68 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); | ||
69 | } | 44 | } |
70 | 45 | ||
71 | GenericResource::~GenericResource() | 46 | GenericResource::~GenericResource() |
@@ -80,32 +55,6 @@ void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<S | |||
80 | void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) | 55 | void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) |
81 | { | 56 | { |
82 | mSynchronizer = synchronizer; | 57 | mSynchronizer = synchronizer; |
83 | mSynchronizer->setup([this](int commandId, const QByteArray &data) { | ||
84 | enqueueCommand(mSynchronizerQueue, commandId, data); | ||
85 | }, mSynchronizerQueue); | ||
86 | { | ||
87 | auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() { | ||
88 | Sink::Notification n; | ||
89 | n.id = "changereplay"; | ||
90 | n.type = Sink::Notification::Status; | ||
91 | n.message = "Replaying changes."; | ||
92 | n.code = Sink::ApplicationDomain::BusyStatus; | ||
93 | emit notify(n); | ||
94 | }); | ||
95 | Q_ASSERT(ret); | ||
96 | } | ||
97 | { | ||
98 | auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() { | ||
99 | Sink::Notification n; | ||
100 | n.id = "changereplay"; | ||
101 | n.type = Sink::Notification::Status; | ||
102 | n.message = "All changes have been replayed."; | ||
103 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
104 | emit notify(n); | ||
105 | }); | ||
106 | Q_ASSERT(ret); | ||
107 | } | ||
108 | |||
109 | mProcessor->setSynchronizer(synchronizer); | 58 | mProcessor->setSynchronizer(synchronizer); |
110 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); | 59 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); |
111 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 60 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
@@ -146,119 +95,19 @@ int GenericResource::error() const | |||
146 | return mError; | 95 | return mError; |
147 | } | 96 | } |
148 | 97 | ||
149 | void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) | ||
150 | { | ||
151 | flatbuffers::FlatBufferBuilder fbb; | ||
152 | auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); | ||
153 | auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData); | ||
154 | Sink::FinishQueuedCommandBuffer(fbb, buffer); | ||
155 | mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); | ||
156 | } | ||
157 | |||
158 | void GenericResource::processCommand(int commandId, const QByteArray &data) | 98 | void GenericResource::processCommand(int commandId, const QByteArray &data) |
159 | { | 99 | { |
160 | if (commandId == Commands::FlushCommand) { | 100 | mProcessor->processCommand(commandId, data); |
161 | processFlushCommand(data); | ||
162 | return; | ||
163 | } | ||
164 | static int modifications = 0; | ||
165 | mUserQueue.startTransaction(); | ||
166 | enqueueCommand(mUserQueue, commandId, data); | ||
167 | modifications++; | ||
168 | if (modifications >= sBatchSize) { | ||
169 | mUserQueue.commit(); | ||
170 | modifications = 0; | ||
171 | mCommitQueueTimer.stop(); | ||
172 | } else { | ||
173 | mCommitQueueTimer.start(); | ||
174 | } | ||
175 | } | ||
176 | |||
177 | void GenericResource::processFlushCommand(const QByteArray &data) | ||
178 | { | ||
179 | flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); | ||
180 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { | ||
181 | auto buffer = Sink::Commands::GetFlush(data.constData()); | ||
182 | const auto flushType = buffer->type(); | ||
183 | const auto flushId = BufferUtils::extractBuffer(buffer->id()); | ||
184 | if (flushType == Sink::Flush::FlushSynchronization) { | ||
185 | mSynchronizer->flush(flushType, flushId); | ||
186 | } else { | ||
187 | mUserQueue.startTransaction(); | ||
188 | enqueueCommand(mUserQueue, Commands::FlushCommand, data); | ||
189 | mUserQueue.commit(); | ||
190 | } | ||
191 | } | ||
192 | |||
193 | } | 101 | } |
194 | 102 | ||
195 | KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) | 103 | KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) |
196 | { | 104 | { |
197 | return KAsync::start<void>([this, query] { | 105 | return mSynchronizer->synchronize(query); |
198 | |||
199 | Sink::Notification n; | ||
200 | n.id = "sync"; | ||
201 | n.type = Sink::Notification::Status; | ||
202 | n.message = "Synchronization has started."; | ||
203 | n.code = Sink::ApplicationDomain::BusyStatus; | ||
204 | emit notify(n); | ||
205 | |||
206 | SinkLog() << " Synchronizing"; | ||
207 | return mSynchronizer->synchronize(query) | ||
208 | .then<void>([this](const KAsync::Error &error) { | ||
209 | if (!error) { | ||
210 | SinkLog() << "Done Synchronizing"; | ||
211 | Sink::Notification n; | ||
212 | n.id = "sync"; | ||
213 | n.type = Sink::Notification::Status; | ||
214 | n.message = "Synchronization has ended."; | ||
215 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
216 | emit notify(n); | ||
217 | return KAsync::null(); | ||
218 | } | ||
219 | return KAsync::error(error); | ||
220 | }); | ||
221 | }); | ||
222 | } | 106 | } |
223 | 107 | ||
224 | static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) | ||
225 | { | ||
226 | if (queue.isEmpty()) { | ||
227 | f.setFinished(); | ||
228 | } else { | ||
229 | QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); }); | ||
230 | } | ||
231 | }; | ||
232 | |||
233 | KAsync::Job<void> GenericResource::processAllMessages() | 108 | KAsync::Job<void> GenericResource::processAllMessages() |
234 | { | 109 | { |
235 | // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. | 110 | return mProcessor->processAllMessages(); |
236 | // TODO: report errors while processing sync? | ||
237 | // TODO JOBAPI: A helper that waits for n events and then continues? | ||
238 | return KAsync::start<void>([this](KAsync::Future<void> &f) { | ||
239 | if (mCommitQueueTimer.isActive()) { | ||
240 | auto context = new QObject; | ||
241 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { | ||
242 | delete context; | ||
243 | f.setFinished(); | ||
244 | }); | ||
245 | } else { | ||
246 | f.setFinished(); | ||
247 | } | ||
248 | }) | ||
249 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) | ||
250 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) | ||
251 | .then<void>([this](KAsync::Future<void> &f) { | ||
252 | if (mSynchronizer->allChangesReplayed()) { | ||
253 | f.setFinished(); | ||
254 | } else { | ||
255 | auto context = new QObject; | ||
256 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { | ||
257 | delete context; | ||
258 | f.setFinished(); | ||
259 | }); | ||
260 | } | ||
261 | }); | ||
262 | } | 111 | } |
263 | 112 | ||
264 | void GenericResource::updateLowerBoundRevision() | 113 | void GenericResource::updateLowerBoundRevision() |