diff options
Diffstat (limited to 'common')
-rw-r--r-- | common/commandprocessor.cpp | 210 | ||||
-rw-r--r-- | common/commandprocessor.h | 19 | ||||
-rw-r--r-- | common/genericresource.cpp | 167 | ||||
-rw-r--r-- | common/genericresource.h | 13 | ||||
-rw-r--r-- | common/listener.cpp | 40 | ||||
-rw-r--r-- | common/resource.cpp | 10 | ||||
-rw-r--r-- | common/resource.h | 10 | ||||
-rw-r--r-- | common/synchronizer.h | 2 |
8 files changed, 241 insertions, 230 deletions
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index 57fe524..bdff905 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp | |||
@@ -19,25 +19,171 @@ | |||
19 | */ | 19 | */ |
20 | #include "commandprocessor.h" | 20 | #include "commandprocessor.h" |
21 | 21 | ||
22 | #include <QDataStream> | ||
23 | |||
22 | #include "commands.h" | 24 | #include "commands.h" |
23 | #include "messagequeue.h" | 25 | #include "messagequeue.h" |
24 | #include "queuedcommand_generated.h" | ||
25 | #include "flush_generated.h" | 26 | #include "flush_generated.h" |
26 | #include "inspector.h" | 27 | #include "inspector.h" |
27 | #include "synchronizer.h" | 28 | #include "synchronizer.h" |
28 | #include "pipeline.h" | 29 | #include "pipeline.h" |
29 | #include "bufferutils.h" | 30 | #include "bufferutils.h" |
31 | #include "definitions.h" | ||
32 | #include "storage.h" | ||
33 | |||
34 | #include "queuedcommand_generated.h" | ||
35 | #include "revisionreplayed_generated.h" | ||
36 | #include "synchronize_generated.h" | ||
30 | 37 | ||
31 | static int sBatchSize = 100; | 38 | static int sBatchSize = 100; |
39 | // This interval directly affects the roundtrip time of single commands | ||
40 | static int sCommitInterval = 10; | ||
41 | |||
32 | 42 | ||
33 | using namespace Sink; | 43 | using namespace Sink; |
44 | using namespace Sink::Storage; | ||
34 | 45 | ||
35 | CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false), mLowerBoundRevision(0) | 46 | CommandProcessor::CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId) |
47 | : QObject(), | ||
48 | mPipeline(pipeline), | ||
49 | mUserQueue(Sink::storageLocation(), instanceId + ".userqueue"), | ||
50 | mSynchronizerQueue(Sink::storageLocation(), instanceId + ".synchronizerqueue"), | ||
51 | mCommandQueues(QList<MessageQueue*>() << &mUserQueue << &mSynchronizerQueue), mProcessingLock(false), mLowerBoundRevision(0) | ||
36 | { | 52 | { |
37 | for (auto queue : mCommandQueues) { | 53 | for (auto queue : mCommandQueues) { |
38 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); | 54 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); |
39 | Q_UNUSED(ret); | 55 | Q_UNUSED(ret); |
40 | } | 56 | } |
57 | |||
58 | mCommitQueueTimer.setInterval(sCommitInterval); | ||
59 | mCommitQueueTimer.setSingleShot(true); | ||
60 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); | ||
61 | } | ||
62 | |||
63 | static void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) | ||
64 | { | ||
65 | flatbuffers::FlatBufferBuilder fbb; | ||
66 | auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); | ||
67 | auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData); | ||
68 | Sink::FinishQueuedCommandBuffer(fbb, buffer); | ||
69 | mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); | ||
70 | } | ||
71 | |||
72 | void CommandProcessor::processCommand(int commandId, const QByteArray &data) | ||
73 | { | ||
74 | switch (commandId) { | ||
75 | case Commands::FlushCommand: | ||
76 | processFlushCommand(data); | ||
77 | break; | ||
78 | case Commands::SynchronizeCommand: | ||
79 | processSynchronizeCommand(data); | ||
80 | break; | ||
81 | // case Commands::RevisionReplayedCommand: | ||
82 | // processRevisionReplayedCommand(data); | ||
83 | // break; | ||
84 | default: { | ||
85 | static int modifications = 0; | ||
86 | mUserQueue.startTransaction(); | ||
87 | enqueueCommand(mUserQueue, commandId, data); | ||
88 | modifications++; | ||
89 | if (modifications >= sBatchSize) { | ||
90 | mUserQueue.commit(); | ||
91 | modifications = 0; | ||
92 | mCommitQueueTimer.stop(); | ||
93 | } else { | ||
94 | mCommitQueueTimer.start(); | ||
95 | } | ||
96 | } | ||
97 | }; | ||
98 | } | ||
99 | |||
100 | void CommandProcessor::processFlushCommand(const QByteArray &data) | ||
101 | { | ||
102 | flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); | ||
103 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { | ||
104 | auto buffer = Sink::Commands::GetFlush(data.constData()); | ||
105 | const auto flushType = buffer->type(); | ||
106 | const auto flushId = BufferUtils::extractBuffer(buffer->id()); | ||
107 | if (flushType == Sink::Flush::FlushSynchronization) { | ||
108 | mSynchronizer->flush(flushType, flushId); | ||
109 | } else { | ||
110 | mUserQueue.startTransaction(); | ||
111 | enqueueCommand(mUserQueue, Commands::FlushCommand, data); | ||
112 | mUserQueue.commit(); | ||
113 | } | ||
114 | } | ||
115 | |||
116 | } | ||
117 | |||
118 | void CommandProcessor::processSynchronizeCommand(const QByteArray &data) | ||
119 | { | ||
120 | flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); | ||
121 | if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { | ||
122 | auto buffer = Sink::Commands::GetSynchronize(data.constData()); | ||
123 | auto timer = QSharedPointer<QTime>::create(); | ||
124 | timer->start(); | ||
125 | auto job = KAsync::null<void>(); | ||
126 | Sink::QueryBase query; | ||
127 | if (buffer->query()) { | ||
128 | auto data = QByteArray::fromStdString(buffer->query()->str()); | ||
129 | QDataStream stream(&data, QIODevice::ReadOnly); | ||
130 | stream >> query; | ||
131 | } | ||
132 | job = synchronizeWithSource(query); | ||
133 | job.then<void>([timer](const KAsync::Error &error) { | ||
134 | if (error) { | ||
135 | SinkWarning() << "Sync failed: " << error.errorMessage; | ||
136 | return KAsync::error(error); | ||
137 | } else { | ||
138 | SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); | ||
139 | return KAsync::null(); | ||
140 | } | ||
141 | }) | ||
142 | .exec(); | ||
143 | return; | ||
144 | } else { | ||
145 | SinkWarning() << "received invalid command"; | ||
146 | } | ||
147 | } | ||
148 | |||
149 | // void CommandProcessor::processRevisionReplayedCommand(const QByteArray &data) | ||
150 | // { | ||
151 | // flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); | ||
152 | // if (Sink::Commands::VerifyRevisionReplayedBuffer(verifier)) { | ||
153 | // auto buffer = Sink::Commands::GetRevisionReplayed(commandBuffer.constData()); | ||
154 | // client.currentRevision = buffer->revision(); | ||
155 | // } else { | ||
156 | // SinkWarning() << "received invalid command"; | ||
157 | // } | ||
158 | // loadResource().setLowerBoundRevision(lowerBoundRevision()); | ||
159 | // } | ||
160 | |||
161 | KAsync::Job<void> CommandProcessor::synchronizeWithSource(const Sink::QueryBase &query) | ||
162 | { | ||
163 | return KAsync::start<void>([this, query] { | ||
164 | Sink::Notification n; | ||
165 | n.id = "sync"; | ||
166 | n.type = Sink::Notification::Status; | ||
167 | n.message = "Synchronization has started."; | ||
168 | n.code = Sink::ApplicationDomain::BusyStatus; | ||
169 | emit notify(n); | ||
170 | |||
171 | SinkLog() << " Synchronizing"; | ||
172 | return mSynchronizer->synchronize(query) | ||
173 | .then<void>([this](const KAsync::Error &error) { | ||
174 | if (!error) { | ||
175 | SinkLog() << "Done Synchronizing"; | ||
176 | Sink::Notification n; | ||
177 | n.id = "sync"; | ||
178 | n.type = Sink::Notification::Status; | ||
179 | n.message = "Synchronization has ended."; | ||
180 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
181 | emit notify(n); | ||
182 | return KAsync::null(); | ||
183 | } | ||
184 | return KAsync::error(error); | ||
185 | }); | ||
186 | }); | ||
41 | } | 187 | } |
42 | 188 | ||
43 | void CommandProcessor::setOldestUsedRevision(qint64 revision) | 189 | void CommandProcessor::setOldestUsedRevision(qint64 revision) |
@@ -186,6 +332,27 @@ void CommandProcessor::setInspector(const QSharedPointer<Inspector> &inspector) | |||
186 | void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) | 332 | void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) |
187 | { | 333 | { |
188 | mSynchronizer = synchronizer; | 334 | mSynchronizer = synchronizer; |
335 | mSynchronizer->setup([this](int commandId, const QByteArray &data) { | ||
336 | enqueueCommand(mSynchronizerQueue, commandId, data); | ||
337 | }, mSynchronizerQueue); | ||
338 | |||
339 | QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() { | ||
340 | Sink::Notification n; | ||
341 | n.id = "changereplay"; | ||
342 | n.type = Sink::Notification::Status; | ||
343 | n.message = "Replaying changes."; | ||
344 | n.code = Sink::ApplicationDomain::BusyStatus; | ||
345 | emit notify(n); | ||
346 | }); | ||
347 | QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() { | ||
348 | Sink::Notification n; | ||
349 | n.id = "changereplay"; | ||
350 | n.type = Sink::Notification::Status; | ||
351 | n.message = "All changes have been replayed."; | ||
352 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
353 | emit notify(n); | ||
354 | }); | ||
355 | |||
189 | QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); | 356 | QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); |
190 | setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); | 357 | setOldestUsedRevision(mSynchronizer->getLastReplayedRevision()); |
191 | } | 358 | } |
@@ -213,3 +380,42 @@ KAsync::Job<void> CommandProcessor::flush(void const *command, size_t size) | |||
213 | return KAsync::error<void>(-1, "Invalid flush command."); | 380 | return KAsync::error<void>(-1, "Invalid flush command."); |
214 | } | 381 | } |
215 | 382 | ||
383 | static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) | ||
384 | { | ||
385 | if (queue.isEmpty()) { | ||
386 | f.setFinished(); | ||
387 | } else { | ||
388 | QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); }); | ||
389 | } | ||
390 | }; | ||
391 | |||
392 | KAsync::Job<void> CommandProcessor::processAllMessages() | ||
393 | { | ||
394 | // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. | ||
395 | // TODO: report errors while processing sync? | ||
396 | // TODO JOBAPI: A helper that waits for n events and then continues? | ||
397 | return KAsync::start<void>([this](KAsync::Future<void> &f) { | ||
398 | if (mCommitQueueTimer.isActive()) { | ||
399 | auto context = new QObject; | ||
400 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { | ||
401 | delete context; | ||
402 | f.setFinished(); | ||
403 | }); | ||
404 | } else { | ||
405 | f.setFinished(); | ||
406 | } | ||
407 | }) | ||
408 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) | ||
409 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) | ||
410 | .then<void>([this](KAsync::Future<void> &f) { | ||
411 | if (mSynchronizer->allChangesReplayed()) { | ||
412 | f.setFinished(); | ||
413 | } else { | ||
414 | auto context = new QObject; | ||
415 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { | ||
416 | delete context; | ||
417 | f.setFinished(); | ||
418 | }); | ||
419 | } | ||
420 | }); | ||
421 | } | ||
diff --git a/common/commandprocessor.h b/common/commandprocessor.h index d00cf43..a807f46 100644 --- a/common/commandprocessor.h +++ b/common/commandprocessor.h | |||
@@ -22,19 +22,20 @@ | |||
22 | #include "sink_export.h" | 22 | #include "sink_export.h" |
23 | 23 | ||
24 | #include <QObject> | 24 | #include <QObject> |
25 | #include <QTimer> | ||
25 | #include <Async/Async> | 26 | #include <Async/Async> |
26 | #include <functional> | 27 | #include <functional> |
27 | 28 | ||
28 | #include "log.h" | 29 | #include "log.h" |
29 | #include "notification.h" | 30 | #include "notification.h" |
30 | 31 | #include "messagequeue.h" | |
31 | class MessageQueue; | ||
32 | 32 | ||
33 | namespace Sink { | 33 | namespace Sink { |
34 | class Pipeline; | 34 | class Pipeline; |
35 | class Inspector; | 35 | class Inspector; |
36 | class Synchronizer; | 36 | class Synchronizer; |
37 | class QueuedCommand; | 37 | class QueuedCommand; |
38 | class QueryBase; | ||
38 | 39 | ||
39 | /** | 40 | /** |
40 | * Drives the pipeline using the output from all command queues | 41 | * Drives the pipeline using the output from all command queues |
@@ -45,13 +46,17 @@ class CommandProcessor : public QObject | |||
45 | SINK_DEBUG_AREA("commandprocessor") | 46 | SINK_DEBUG_AREA("commandprocessor") |
46 | 47 | ||
47 | public: | 48 | public: |
48 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues); | 49 | CommandProcessor(Sink::Pipeline *pipeline, const QByteArray &instanceId); |
49 | 50 | ||
50 | void setOldestUsedRevision(qint64 revision); | 51 | void setOldestUsedRevision(qint64 revision); |
51 | 52 | ||
52 | void setInspector(const QSharedPointer<Inspector> &inspector); | 53 | void setInspector(const QSharedPointer<Inspector> &inspector); |
53 | void setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); | 54 | void setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); |
54 | 55 | ||
56 | void processCommand(int commandId, const QByteArray &data); | ||
57 | |||
58 | KAsync::Job<void> processAllMessages(); | ||
59 | |||
55 | signals: | 60 | signals: |
56 | void notify(Notification); | 61 | void notify(Notification); |
57 | void error(int errorCode, const QString &errorMessage); | 62 | void error(int errorCode, const QString &errorMessage); |
@@ -68,9 +73,16 @@ private slots: | |||
68 | KAsync::Job<void> processPipeline(); | 73 | KAsync::Job<void> processPipeline(); |
69 | 74 | ||
70 | private: | 75 | private: |
76 | void processFlushCommand(const QByteArray &data); | ||
77 | void processSynchronizeCommand(const QByteArray &data); | ||
78 | // void processRevisionReplayedCommand(const QByteArray &data); | ||
79 | |||
71 | KAsync::Job<void> flush(void const *command, size_t size); | 80 | KAsync::Job<void> flush(void const *command, size_t size); |
81 | KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query); | ||
72 | 82 | ||
73 | Sink::Pipeline *mPipeline; | 83 | Sink::Pipeline *mPipeline; |
84 | MessageQueue mUserQueue; | ||
85 | MessageQueue mSynchronizerQueue; | ||
74 | // Ordered by priority | 86 | // Ordered by priority |
75 | QList<MessageQueue *> mCommandQueues; | 87 | QList<MessageQueue *> mCommandQueues; |
76 | bool mProcessingLock; | 88 | bool mProcessingLock; |
@@ -78,6 +90,7 @@ private: | |||
78 | qint64 mLowerBoundRevision; | 90 | qint64 mLowerBoundRevision; |
79 | QSharedPointer<Synchronizer> mSynchronizer; | 91 | QSharedPointer<Synchronizer> mSynchronizer; |
80 | QSharedPointer<Inspector> mInspector; | 92 | QSharedPointer<Inspector> mInspector; |
93 | QTimer mCommitQueueTimer; | ||
81 | }; | 94 | }; |
82 | 95 | ||
83 | }; | 96 | }; |
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 38da6bf..82112b3 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -19,32 +19,13 @@ | |||
19 | */ | 19 | */ |
20 | #include "genericresource.h" | 20 | #include "genericresource.h" |
21 | 21 | ||
22 | #include "entitybuffer.h" | ||
23 | #include "pipeline.h" | 22 | #include "pipeline.h" |
24 | #include "queuedcommand_generated.h" | ||
25 | #include "createentity_generated.h" | ||
26 | #include "modifyentity_generated.h" | ||
27 | #include "deleteentity_generated.h" | ||
28 | #include "inspection_generated.h" | ||
29 | #include "notification_generated.h" | ||
30 | #include "flush_generated.h" | ||
31 | #include "domainadaptor.h" | 23 | #include "domainadaptor.h" |
32 | #include "commands.h" | ||
33 | #include "index.h" | ||
34 | #include "log.h" | 24 | #include "log.h" |
35 | #include "definitions.h" | ||
36 | #include "bufferutils.h" | ||
37 | #include "adaptorfactoryregistry.h" | ||
38 | #include "synchronizer.h" | 25 | #include "synchronizer.h" |
39 | #include "commandprocessor.h" | 26 | #include "commandprocessor.h" |
40 | 27 | #include "definitions.h" | |
41 | #include <QUuid> | 28 | #include "storage.h" |
42 | #include <QDataStream> | ||
43 | #include <QTime> | ||
44 | |||
45 | static int sBatchSize = 100; | ||
46 | // This interval directly affects the roundtrip time of single commands | ||
47 | static int sCommitInterval = 10; | ||
48 | 29 | ||
49 | using namespace Sink; | 30 | using namespace Sink; |
50 | using namespace Sink::Storage; | 31 | using namespace Sink::Storage; |
@@ -52,20 +33,14 @@ using namespace Sink::Storage; | |||
52 | GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) | 33 | GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) |
53 | : Sink::Resource(), | 34 | : Sink::Resource(), |
54 | mResourceContext(resourceContext), | 35 | mResourceContext(resourceContext), |
55 | mUserQueue(Sink::storageLocation(), resourceContext.instanceId() + ".userqueue"), | ||
56 | mSynchronizerQueue(Sink::storageLocation(), resourceContext.instanceId() + ".synchronizerqueue"), | ||
57 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext)), | 36 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceContext)), |
37 | mProcessor(QSharedPointer<CommandProcessor>::create(mPipeline.data(), resourceContext.instanceId())), | ||
58 | mError(0), | 38 | mError(0), |
59 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 39 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
60 | { | 40 | { |
61 | mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); | 41 | QObject::connect(mProcessor.data(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
62 | QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 42 | QObject::connect(mProcessor.data(), &CommandProcessor::notify, this, &GenericResource::notify); |
63 | QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); | ||
64 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 43 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
65 | |||
66 | mCommitQueueTimer.setInterval(sCommitInterval); | ||
67 | mCommitQueueTimer.setSingleShot(true); | ||
68 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, &mUserQueue, &MessageQueue::commit); | ||
69 | } | 44 | } |
70 | 45 | ||
71 | GenericResource::~GenericResource() | 46 | GenericResource::~GenericResource() |
@@ -80,32 +55,6 @@ void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<S | |||
80 | void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) | 55 | void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) |
81 | { | 56 | { |
82 | mSynchronizer = synchronizer; | 57 | mSynchronizer = synchronizer; |
83 | mSynchronizer->setup([this](int commandId, const QByteArray &data) { | ||
84 | enqueueCommand(mSynchronizerQueue, commandId, data); | ||
85 | }, mSynchronizerQueue); | ||
86 | { | ||
87 | auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::replayingChanges, [this]() { | ||
88 | Sink::Notification n; | ||
89 | n.id = "changereplay"; | ||
90 | n.type = Sink::Notification::Status; | ||
91 | n.message = "Replaying changes."; | ||
92 | n.code = Sink::ApplicationDomain::BusyStatus; | ||
93 | emit notify(n); | ||
94 | }); | ||
95 | Q_ASSERT(ret); | ||
96 | } | ||
97 | { | ||
98 | auto ret = QObject::connect(mSynchronizer.data(), &Synchronizer::changesReplayed, [this]() { | ||
99 | Sink::Notification n; | ||
100 | n.id = "changereplay"; | ||
101 | n.type = Sink::Notification::Status; | ||
102 | n.message = "All changes have been replayed."; | ||
103 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
104 | emit notify(n); | ||
105 | }); | ||
106 | Q_ASSERT(ret); | ||
107 | } | ||
108 | |||
109 | mProcessor->setSynchronizer(synchronizer); | 58 | mProcessor->setSynchronizer(synchronizer); |
110 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); | 59 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSynchronizer.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); |
111 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 60 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
@@ -146,119 +95,19 @@ int GenericResource::error() const | |||
146 | return mError; | 95 | return mError; |
147 | } | 96 | } |
148 | 97 | ||
149 | void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data) | ||
150 | { | ||
151 | flatbuffers::FlatBufferBuilder fbb; | ||
152 | auto commandData = Sink::EntityBuffer::appendAsVector(fbb, data.constData(), data.size()); | ||
153 | auto buffer = Sink::CreateQueuedCommand(fbb, commandId, commandData); | ||
154 | Sink::FinishQueuedCommandBuffer(fbb, buffer); | ||
155 | mq.enqueue(fbb.GetBufferPointer(), fbb.GetSize()); | ||
156 | } | ||
157 | |||
158 | void GenericResource::processCommand(int commandId, const QByteArray &data) | 98 | void GenericResource::processCommand(int commandId, const QByteArray &data) |
159 | { | 99 | { |
160 | if (commandId == Commands::FlushCommand) { | 100 | mProcessor->processCommand(commandId, data); |
161 | processFlushCommand(data); | ||
162 | return; | ||
163 | } | ||
164 | static int modifications = 0; | ||
165 | mUserQueue.startTransaction(); | ||
166 | enqueueCommand(mUserQueue, commandId, data); | ||
167 | modifications++; | ||
168 | if (modifications >= sBatchSize) { | ||
169 | mUserQueue.commit(); | ||
170 | modifications = 0; | ||
171 | mCommitQueueTimer.stop(); | ||
172 | } else { | ||
173 | mCommitQueueTimer.start(); | ||
174 | } | ||
175 | } | ||
176 | |||
177 | void GenericResource::processFlushCommand(const QByteArray &data) | ||
178 | { | ||
179 | flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); | ||
180 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { | ||
181 | auto buffer = Sink::Commands::GetFlush(data.constData()); | ||
182 | const auto flushType = buffer->type(); | ||
183 | const auto flushId = BufferUtils::extractBuffer(buffer->id()); | ||
184 | if (flushType == Sink::Flush::FlushSynchronization) { | ||
185 | mSynchronizer->flush(flushType, flushId); | ||
186 | } else { | ||
187 | mUserQueue.startTransaction(); | ||
188 | enqueueCommand(mUserQueue, Commands::FlushCommand, data); | ||
189 | mUserQueue.commit(); | ||
190 | } | ||
191 | } | ||
192 | |||
193 | } | 101 | } |
194 | 102 | ||
195 | KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) | 103 | KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) |
196 | { | 104 | { |
197 | return KAsync::start<void>([this, query] { | 105 | return mSynchronizer->synchronize(query); |
198 | |||
199 | Sink::Notification n; | ||
200 | n.id = "sync"; | ||
201 | n.type = Sink::Notification::Status; | ||
202 | n.message = "Synchronization has started."; | ||
203 | n.code = Sink::ApplicationDomain::BusyStatus; | ||
204 | emit notify(n); | ||
205 | |||
206 | SinkLog() << " Synchronizing"; | ||
207 | return mSynchronizer->synchronize(query) | ||
208 | .then<void>([this](const KAsync::Error &error) { | ||
209 | if (!error) { | ||
210 | SinkLog() << "Done Synchronizing"; | ||
211 | Sink::Notification n; | ||
212 | n.id = "sync"; | ||
213 | n.type = Sink::Notification::Status; | ||
214 | n.message = "Synchronization has ended."; | ||
215 | n.code = Sink::ApplicationDomain::ConnectedStatus; | ||
216 | emit notify(n); | ||
217 | return KAsync::null(); | ||
218 | } | ||
219 | return KAsync::error(error); | ||
220 | }); | ||
221 | }); | ||
222 | } | 106 | } |
223 | 107 | ||
224 | static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) | ||
225 | { | ||
226 | if (queue.isEmpty()) { | ||
227 | f.setFinished(); | ||
228 | } else { | ||
229 | QObject::connect(&queue, &MessageQueue::drained, [&f]() { f.setFinished(); }); | ||
230 | } | ||
231 | }; | ||
232 | |||
233 | KAsync::Job<void> GenericResource::processAllMessages() | 108 | KAsync::Job<void> GenericResource::processAllMessages() |
234 | { | 109 | { |
235 | // We have to wait for all items to be processed to ensure the synced items are available when a query gets executed. | 110 | return mProcessor->processAllMessages(); |
236 | // TODO: report errors while processing sync? | ||
237 | // TODO JOBAPI: A helper that waits for n events and then continues? | ||
238 | return KAsync::start<void>([this](KAsync::Future<void> &f) { | ||
239 | if (mCommitQueueTimer.isActive()) { | ||
240 | auto context = new QObject; | ||
241 | QObject::connect(&mCommitQueueTimer, &QTimer::timeout, context, [&f, context]() { | ||
242 | delete context; | ||
243 | f.setFinished(); | ||
244 | }); | ||
245 | } else { | ||
246 | f.setFinished(); | ||
247 | } | ||
248 | }) | ||
249 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) | ||
250 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) | ||
251 | .then<void>([this](KAsync::Future<void> &f) { | ||
252 | if (mSynchronizer->allChangesReplayed()) { | ||
253 | f.setFinished(); | ||
254 | } else { | ||
255 | auto context = new QObject; | ||
256 | QObject::connect(mSynchronizer.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { | ||
257 | delete context; | ||
258 | f.setFinished(); | ||
259 | }); | ||
260 | } | ||
261 | }); | ||
262 | } | 111 | } |
263 | 112 | ||
264 | void GenericResource::updateLowerBoundRevision() | 113 | void GenericResource::updateLowerBoundRevision() |
diff --git a/common/genericresource.h b/common/genericresource.h index 0bc47da..cc73f50 100644 --- a/common/genericresource.h +++ b/common/genericresource.h | |||
@@ -47,9 +47,6 @@ public: | |||
47 | virtual ~GenericResource(); | 47 | virtual ~GenericResource(); |
48 | 48 | ||
49 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; | 49 | virtual void processCommand(int commandId, const QByteArray &data) Q_DECL_OVERRIDE; |
50 | virtual void processFlushCommand(const QByteArray &data); | ||
51 | virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE; | ||
52 | virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; | ||
53 | virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; | 50 | virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; |
54 | 51 | ||
55 | int error() const; | 52 | int error() const; |
@@ -57,6 +54,9 @@ public: | |||
57 | static void removeFromDisk(const QByteArray &instanceIdentifier); | 54 | static void removeFromDisk(const QByteArray &instanceIdentifier); |
58 | static qint64 diskUsage(const QByteArray &instanceIdentifier); | 55 | static qint64 diskUsage(const QByteArray &instanceIdentifier); |
59 | 56 | ||
57 | KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query); | ||
58 | KAsync::Job<void> processAllMessages(); | ||
59 | |||
60 | private slots: | 60 | private slots: |
61 | void updateLowerBoundRevision(); | 61 | void updateLowerBoundRevision(); |
62 | 62 | ||
@@ -69,15 +69,12 @@ protected: | |||
69 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); | 69 | void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); |
70 | 70 | ||
71 | ResourceContext mResourceContext; | 71 | ResourceContext mResourceContext; |
72 | MessageQueue mUserQueue; | ||
73 | MessageQueue mSynchronizerQueue; | ||
74 | QSharedPointer<Pipeline> mPipeline; | ||
75 | 72 | ||
76 | private: | 73 | private: |
77 | std::unique_ptr<CommandProcessor> mProcessor; | 74 | QSharedPointer<Pipeline> mPipeline; |
75 | QSharedPointer<CommandProcessor> mProcessor; | ||
78 | QSharedPointer<Synchronizer> mSynchronizer; | 76 | QSharedPointer<Synchronizer> mSynchronizer; |
79 | int mError; | 77 | int mError; |
80 | QTimer mCommitQueueTimer; | ||
81 | qint64 mClientLowerBoundRevision; | 78 | qint64 mClientLowerBoundRevision; |
82 | }; | 79 | }; |
83 | 80 | ||
diff --git a/common/listener.cpp b/common/listener.cpp index 9e80c45..d3ef0f1 100644 --- a/common/listener.cpp +++ b/common/listener.cpp | |||
@@ -30,15 +30,12 @@ | |||
30 | #include "common/commandcompletion_generated.h" | 30 | #include "common/commandcompletion_generated.h" |
31 | #include "common/handshake_generated.h" | 31 | #include "common/handshake_generated.h" |
32 | #include "common/revisionupdate_generated.h" | 32 | #include "common/revisionupdate_generated.h" |
33 | #include "common/synchronize_generated.h" | ||
34 | #include "common/notification_generated.h" | 33 | #include "common/notification_generated.h" |
35 | #include "common/revisionreplayed_generated.h" | 34 | #include "common/revisionreplayed_generated.h" |
36 | 35 | ||
37 | #include <QLocalServer> | 36 | #include <QLocalServer> |
38 | #include <QLocalSocket> | 37 | #include <QLocalSocket> |
39 | #include <QTimer> | 38 | #include <QTimer> |
40 | #include <QTime> | ||
41 | #include <QDataStream> | ||
42 | 39 | ||
43 | Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent) | 40 | Listener::Listener(const QByteArray &resourceInstanceIdentifier, const QByteArray &resourceType, QObject *parent) |
44 | : QObject(parent), | 41 | : QObject(parent), |
@@ -235,39 +232,7 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
235 | } | 232 | } |
236 | break; | 233 | break; |
237 | } | 234 | } |
238 | case Sink::Commands::SynchronizeCommand: { | 235 | case Sink::Commands::SynchronizeCommand: |
239 | flatbuffers::Verifier verifier((const uint8_t *)commandBuffer.constData(), commandBuffer.size()); | ||
240 | if (Sink::Commands::VerifySynchronizeBuffer(verifier)) { | ||
241 | auto buffer = Sink::Commands::GetSynchronize(commandBuffer.constData()); | ||
242 | SinkTrace() << QString("Synchronize request (id %1) from %2").arg(messageId).arg(client.name); | ||
243 | auto timer = QSharedPointer<QTime>::create(); | ||
244 | timer->start(); | ||
245 | auto job = KAsync::null<void>(); | ||
246 | Sink::QueryBase query; | ||
247 | if (buffer->query()) { | ||
248 | auto data = QByteArray::fromStdString(buffer->query()->str()); | ||
249 | QDataStream stream(&data, QIODevice::ReadOnly); | ||
250 | stream >> query; | ||
251 | } | ||
252 | job = loadResource().synchronizeWithSource(query); | ||
253 | job.then<void>([callback, timer](const KAsync::Error &error) { | ||
254 | if (error) { | ||
255 | SinkWarning() << "Sync failed: " << error.errorMessage; | ||
256 | callback(false); | ||
257 | return KAsync::error(error); | ||
258 | } else { | ||
259 | SinkTrace() << "Sync took " << Sink::Log::TraceTime(timer->elapsed()); | ||
260 | callback(true); | ||
261 | return KAsync::null(); | ||
262 | } | ||
263 | }) | ||
264 | .exec(); | ||
265 | return; | ||
266 | } else { | ||
267 | SinkWarning() << "received invalid command"; | ||
268 | } | ||
269 | break; | ||
270 | } | ||
271 | case Sink::Commands::InspectionCommand: | 236 | case Sink::Commands::InspectionCommand: |
272 | case Sink::Commands::DeleteEntityCommand: | 237 | case Sink::Commands::DeleteEntityCommand: |
273 | case Sink::Commands::ModifyEntityCommand: | 238 | case Sink::Commands::ModifyEntityCommand: |
@@ -293,7 +258,8 @@ void Listener::processCommand(int commandId, uint messageId, const QByteArray &c | |||
293 | SinkWarning() << "received invalid command"; | 258 | SinkWarning() << "received invalid command"; |
294 | } | 259 | } |
295 | loadResource().setLowerBoundRevision(lowerBoundRevision()); | 260 | loadResource().setLowerBoundRevision(lowerBoundRevision()); |
296 | } break; | 261 | } |
262 | break; | ||
297 | case Sink::Commands::RemoveFromDiskCommand: { | 263 | case Sink::Commands::RemoveFromDiskCommand: { |
298 | SinkLog() << QString("Received a remove from disk command from %1").arg(client.name); | 264 | SinkLog() << QString("Received a remove from disk command from %1").arg(client.name); |
299 | //Close the resource to ensure no transactions are open | 265 | //Close the resource to ensure no transactions are open |
diff --git a/common/resource.cpp b/common/resource.cpp index f81f094..533a132 100644 --- a/common/resource.cpp +++ b/common/resource.cpp | |||
@@ -46,16 +46,6 @@ void Resource::processCommand(int commandId, const QByteArray &data) | |||
46 | Q_UNUSED(data) | 46 | Q_UNUSED(data) |
47 | } | 47 | } |
48 | 48 | ||
49 | KAsync::Job<void> Resource::synchronizeWithSource(const Sink::QueryBase &query) | ||
50 | { | ||
51 | return KAsync::null<void>(); | ||
52 | } | ||
53 | |||
54 | KAsync::Job<void> Resource::processAllMessages() | ||
55 | { | ||
56 | return KAsync::null<void>(); | ||
57 | } | ||
58 | |||
59 | void Resource::setLowerBoundRevision(qint64 revision) | 49 | void Resource::setLowerBoundRevision(qint64 revision) |
60 | { | 50 | { |
61 | Q_UNUSED(revision) | 51 | Q_UNUSED(revision) |
diff --git a/common/resource.h b/common/resource.h index 3cc326c..7789c53 100644 --- a/common/resource.h +++ b/common/resource.h | |||
@@ -43,16 +43,6 @@ public: | |||
43 | virtual void processCommand(int commandId, const QByteArray &data); | 43 | virtual void processCommand(int commandId, const QByteArray &data); |
44 | 44 | ||
45 | /** | 45 | /** |
46 | * Execute synchronization with the source. | ||
47 | */ | ||
48 | virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &); | ||
49 | |||
50 | /** | ||
51 | * Process all internal messages, thus ensuring the store is up to date and no pending modifications are left. | ||
52 | */ | ||
53 | virtual KAsync::Job<void> processAllMessages(); | ||
54 | |||
55 | /** | ||
56 | * Set the lowest revision that is still referenced by external clients. | 46 | * Set the lowest revision that is still referenced by external clients. |
57 | */ | 47 | */ |
58 | virtual void setLowerBoundRevision(qint64 revision); | 48 | virtual void setLowerBoundRevision(qint64 revision); |
diff --git a/common/synchronizer.h b/common/synchronizer.h index 99d4877..ae597bd 100644 --- a/common/synchronizer.h +++ b/common/synchronizer.h | |||
@@ -66,7 +66,7 @@ public slots: | |||
66 | 66 | ||
67 | protected: | 67 | protected: |
68 | ///Base implementation calls the replay$Type calls | 68 | ///Base implementation calls the replay$Type calls |
69 | virtual KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | 69 | KAsync::Job<void> replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; |
70 | virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; | 70 | virtual bool canReplay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; |
71 | 71 | ||
72 | protected: | 72 | protected: |