summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 23:04:59 +0100
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-11-28 23:04:59 +0100
commit3e7b8fe8b8cca75b546c8cac2c09ce231861f21b (patch)
tree36cf1849ff30a986de56d931d60ce1c88660ec83 /common/genericresource.cpp
parent7fdcc36a1a352bb869020ade8a8aa697c3e8b80c (diff)
downloadsink-3e7b8fe8b8cca75b546c8cac2c09ce231861f21b.tar.gz
sink-3e7b8fe8b8cca75b546c8cac2c09ce231861f21b.zip
Used the CommandProcessor as central place for all command processing.
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp167
1 files changed, 8 insertions, 159 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 38da6bf..82112b3 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -19,32 +19,13 @@
19 */ 19 */
20#include "genericresource.h" 20#include "genericresource.h"
21 21
22#include "entitybuffer.h"
23#include "pipeline.h" 22#include "pipeline.h"
24#include "queuedcommand_generated.h"
25#include "createentity_generated.h"
26#include "modifyentity_generated.h"
27#include "deleteentity_generated.h"
28#include "inspection_generated.h"
29#include "notification_generated.h"
30#include "flush_generated.h"
31#include "domainadaptor.h" 23#include "domainadaptor.h"
32#include "commands.h"
33#include "index.h"
34#include "log.h" 24#include "log.h"
35#include "definitions.h"
36#include "bufferutils.h"
37#include "adaptorfactoryregistry.h"
38#include "synchronizer.h" 25#include "synchronizer.h"
39#include "commandprocessor.h" 26#include "commandprocessor.h"
40 27#include "definitions.h"
41#include <QUuid> 28#include "storage.h"
42#include <QDataStream>
43#include <QTime>
44
45static int sBatchSize = 100;
46// This interval directly affects the roundtrip time of single commands
47static int sCommitInterval = 10;
48 29
49using namespace Sink; 30using namespace Sink;
50using namespace Sink::Storage; 31using namespace Sink::Storage;
@@ -52,20 +33,14 @@ using namespace Sink::Storage;
52GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) 33GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline )
53 : Sink::Resource(), 34 : Sink::Resource(),
54 mResourceContext(resourceContext), 35 mResourceContext(resourceContext),
55 mUserQueue(Sink::storageLocation(), resourceContext.instanceId() + ".userqueue"),
56 mSynchronizerQueue(Sink::storageLocation(), resourceContext.instanceId() + ".synchronizerqueue"),
57 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext)), 36 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext)),
37 mProcessor(QSharedPointer<CommandProcessor>::create(mPipeline.data(), resourceContext.instanceId())),
58 mError(0), 38 mError(0),
59 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 39 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
60{ 40{
61 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); 41 QObject::connect(mProcessor.data(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
62 QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 42 QObject::connect(mProcessor.data(), &CommandProcessor::notify, this, &GenericResource::notify);
63 QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify);
64 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 43 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
65
66 mCommitQueueTimer.setInterval(sCommitInterval);
67 mCommitQueueTimer.setSingleShot(true);
68 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit);
69} 44}
70 45
71GenericResource::~GenericResource() 46GenericResource::~GenericResource()
@@ -80,32 +55,6 @@ void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<S
80void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) 55void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer)
81{ 56{
82 mSynchronizer = synchronizer; 57 mSynchronizer = synchronizer;
83 mSynchronizer->setup([this](int commandId, const QByteArray &data) {
84 enqueueCommand(mSynchronizerQueue, commandId, data);
85 }, mSynchronizerQueue);
86 {
87 auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() {
88 Sink::Notification n;
89 n.id = "changereplay";
90 n.type = Sink::Notification::Status;
91 n.message = "Replaying changes.";
92 n.code = Sink::ApplicationDomain::BusyStatus;
93 emit notify(n);
94 });
95 Q_ASSERT(ret);
96 }
97 {
98 auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() {
99 Sink::Notification n;
100 n.id = "changereplay";
101 n.type = Sink::Notification::Status;
102 n.message = "All changes have been replayed.";
103 n.code = Sink::ApplicationDomain::ConnectedStatus;
104 emit notify(n);
105 });
106 Q_ASSERT(ret);
107 }
108
109 mProcessor->setSynchronizer(synchronizer); 58 mProcessor->setSynchronizer(synchronizer);
110 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); 59 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
111 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 60 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
@@ -146,119 +95,19 @@ int GenericResource::error() const
146 return mError; 95 return mError;
147} 96}
148 97
149void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data)
150{
151 flatbuffers::FlatBufferBuilder fbb;
152 auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size());
153 auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData);
154 Sink::FinishQueuedCommandBuffer(fbb, buffer);
155 mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize());
156}
157
158void GenericResource::processCommand(int commandId, const QByteArray &data) 98void GenericResource::processCommand(int commandId, const QByteArray &data)
159{ 99{
160 if (commandId == Commands::FlushCommand) { 100 mProcessor->processCommand(commandId, data);
161 processFlushCommand(data);
162 return;
163 }
164 static int modifications = 0;
165 mUserQueue.startTransaction();
166 enqueueCommand(mUserQueue, commandId, data);
167 modifications++;
168 if (modifications >= sBatchSize) {
169 mUserQueue.commit();
170 modifications = 0;
171 mCommitQueueTimer.stop();
172 } else {
173 mCommitQueueTimer.start();
174 }
175}
176
177void GenericResource::processFlushCommand(const QByteArray &data)
178{
179 flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size());
180 if (Sink::Commands::VerifyFlushBuffer(verifier)) {
181 auto buffer = Sink::Commands::GetFlush(data.constData());
182 const auto flushType = buffer->type();
183 const auto flushId = BufferUtils::extractBuffer(buffer->id());
184 if (flushType == Sink::Flush::FlushSynchronization) {
185 mSynchronizer->flush(flushType, flushId);
186 } else {
187 mUserQueue.startTransaction();
188 enqueueCommand(mUserQueue, Commands::FlushCommand, data);
189 mUserQueue.commit();
190 }
191 }
192
193} 101}
194 102
195KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) 103KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query)
196{ 104{
197 return KAsync::start<void>([this, query] { 105 return mSynchronizer->synchronize(query);
198
199 Sink::Notification n;
200 n.id = "sync";
201 n.type = Sink::Notification::Status;
202 n.message = "Synchronization has started.";
203 n.code = Sink::ApplicationDomain::BusyStatus;
204 emit notify(n);
205
206 SinkLog() << " Synchronizing";
207 return mSynchronizer->synchronize(query)
208 .then<void>([this](const KAsync::Error &error) {
209 if (!error) {
210 SinkLog() << "Done Synchronizing";
211 Sink::Notification n;
212 n.id = "sync";
213 n.type = Sink::Notification::Status;
214 n.message = "Synchronization has ended.";
215 n.code = Sink::ApplicationDomain::ConnectedStatus;
216 emit notify(n);
217 return KAsync::null();
218 }
219 return KAsync::error(error);
220 });
221 });
222} 106}
223 107
224static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue)
225{
226 if (queue.isEmpty()) {
227 f.setFinished();
228 } else {
229 QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); });
230 }
231};
232
233KAsync::Job<void> GenericResource::processAllMessages() 108KAsync::Job<void> GenericResource::processAllMessages()
234{ 109{
235 // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. 110 return mProcessor->processAllMessages();
236 // TODO: report errors while processing sync?
237 // TODO JOBAPI: A helper that waits for n events and then continues?
238 return KAsync::start<void>([this](KAsync::Future<void> &f) {
239 if (mCommitQueueTimer.isActive()) {
240 auto context = new QObject;
241 QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() {
242 delete context;
243 f.setFinished();
244 });
245 } else {
246 f.setFinished();
247 }
248 })
249 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); })
250 .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); })
251 .then<void>([this](KAsync::Future<void> &f) {
252 if (mSynchronizer->allChangesReplayed()) {
253 f.setFinished();
254 } else {
255 auto context = new QObject;
256 QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() {
257 delete context;
258 f.setFinished();
259 });
260 }
261 });
262} 111}
263 112
264void GenericResource::updateLowerBoundRevision() 113void GenericResource::updateLowerBoundRevision()