diff options
author | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-25 08:27:06 +0100 |
---|---|---|
committer | Christian Mollekopf <chrigi_1@fastmail.fm> | 2016-11-25 09:23:55 +0100 |
commit | 22af1ed535b4afc8db3804e72bc5adb1a1b28d60 (patch) | |
tree | 07665f41d5b40d658e95a64bb76020f1fd9d088e /common/genericresource.cpp | |
parent | 64d7d7bdd1edb2bcc305ca007784d0708cf7ef3c (diff) | |
download | sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.tar.gz sink-22af1ed535b4afc8db3804e72bc5adb1a1b28d60.zip |
Added the flush command.
Instead of trying to actually flush queues, we send a special command
through the same queues as the other commands and can thus guarantee
that the respective commands have been processed without blocking
anything.
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 7c4d4ea..7b83957 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -27,6 +27,7 @@ | |||
27 | #include "deleteentity_generated.h" | 27 | #include "deleteentity_generated.h" |
28 | #include "inspection_generated.h" | 28 | #include "inspection_generated.h" |
29 | #include "notification_generated.h" | 29 | #include "notification_generated.h" |
30 | #include "flush_generated.h" | ||
30 | #include "domainadaptor.h" | 31 | #include "domainadaptor.h" |
31 | #include "commands.h" | 32 | #include "commands.h" |
32 | #include "index.h" | 33 | #include "index.h" |
@@ -54,6 +55,7 @@ class CommandProcessor : public QObject | |||
54 | { | 55 | { |
55 | Q_OBJECT | 56 | Q_OBJECT |
56 | typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; | 57 | typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; |
58 | typedef std::function<KAsync::Job<void>(void const *, size_t)> FlushFunction; | ||
57 | SINK_DEBUG_AREA("commandprocessor") | 59 | SINK_DEBUG_AREA("commandprocessor") |
58 | 60 | ||
59 | public: | 61 | public: |
@@ -75,6 +77,11 @@ public: | |||
75 | mInspect = f; | 77 | mInspect = f; |
76 | } | 78 | } |
77 | 79 | ||
80 | void setFlushCommand(const FlushFunction &f) | ||
81 | { | ||
82 | mFlush = f; | ||
83 | } | ||
84 | |||
78 | signals: | 85 | signals: |
79 | void error(int errorCode, const QString &errorMessage); | 86 | void error(int errorCode, const QString &errorMessage); |
80 | 87 | ||
@@ -124,6 +131,13 @@ private slots: | |||
124 | } else { | 131 | } else { |
125 | return KAsync::error<qint64>(-1, "Missing inspection command."); | 132 | return KAsync::error<qint64>(-1, "Missing inspection command."); |
126 | } | 133 | } |
134 | case Sink::Commands::FlushCommand: | ||
135 | if (mFlush) { | ||
136 | return mFlush(queuedCommand->command()->Data(), queuedCommand->command()->size()) | ||
137 | .syncThen<qint64>([]() { return -1; }); | ||
138 | } else { | ||
139 | return KAsync::error<qint64>(-1, "Missing inspection command."); | ||
140 | } | ||
127 | default: | 141 | default: |
128 | return KAsync::error<qint64>(-1, "Unhandled command"); | 142 | return KAsync::error<qint64>(-1, "Unhandled command"); |
129 | } | 143 | } |
@@ -219,6 +233,7 @@ private: | |||
219 | // The lowest revision we no longer need | 233 | // The lowest revision we no longer need |
220 | qint64 mLowerBoundRevision; | 234 | qint64 mLowerBoundRevision; |
221 | InspectionFunction mInspect; | 235 | InspectionFunction mInspect; |
236 | FlushFunction mFlush; | ||
222 | }; | 237 | }; |
223 | 238 | ||
224 | GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) | 239 | GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) |
@@ -266,6 +281,26 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q | |||
266 | } | 281 | } |
267 | return KAsync::error<void>(-1, "Invalid inspection command."); | 282 | return KAsync::error<void>(-1, "Invalid inspection command."); |
268 | }); | 283 | }); |
284 | mProcessor->setFlushCommand([this](void const *command, size_t size) { | ||
285 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | ||
286 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { | ||
287 | auto buffer = Sink::Commands::GetFlush(command); | ||
288 | const auto flushType = buffer->type(); | ||
289 | const auto flushId = BufferUtils::extractBuffer(buffer->id()); | ||
290 | if (flushType == Sink::Flush::FlushReplayQueue) { | ||
291 | SinkTrace() << "Flushing synchronizer "; | ||
292 | mSynchronizer->flush(flushType, flushId); | ||
293 | } else { | ||
294 | SinkTrace() << "Emitting flush completion" << flushId; | ||
295 | Sink::Notification n; | ||
296 | n.type = Sink::Notification::FlushCompletion; | ||
297 | n.id = flushId; | ||
298 | emit notify(n); | ||
299 | } | ||
300 | return KAsync::null<void>(); | ||
301 | } | ||
302 | return KAsync::error<void>(-1, "Invalid flush command."); | ||
303 | }); | ||
269 | { | 304 | { |
270 | auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 305 | auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
271 | Q_ASSERT(ret); | 306 | Q_ASSERT(ret); |
@@ -371,6 +406,10 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt | |||
371 | 406 | ||
372 | void GenericResource::processCommand(int commandId, const QByteArray &data) | 407 | void GenericResource::processCommand(int commandId, const QByteArray &data) |
373 | { | 408 | { |
409 | if (commandId == Commands::FlushCommand) { | ||
410 | processFlushCommand(data); | ||
411 | return; | ||
412 | } | ||
374 | static int modifications = 0; | 413 | static int modifications = 0; |
375 | mUserQueue.startTransaction(); | 414 | mUserQueue.startTransaction(); |
376 | enqueueCommand(mUserQueue, commandId, data); | 415 | enqueueCommand(mUserQueue, commandId, data); |
@@ -384,6 +423,24 @@ void GenericResource::processCommand(int commandId, const QByteArray &data) | |||
384 | } | 423 | } |
385 | } | 424 | } |
386 | 425 | ||
426 | void GenericResource::processFlushCommand(const QByteArray &data) | ||
427 | { | ||
428 | flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size()); | ||
429 | if (Sink::Commands::VerifyFlushBuffer(verifier)) { | ||
430 | auto buffer = Sink::Commands::GetFlush(data.constData()); | ||
431 | const auto flushType = buffer->type(); | ||
432 | const auto flushId = BufferUtils::extractBuffer(buffer->id()); | ||
433 | if (flushType == Sink::Flush::FlushSynchronization) { | ||
434 | mSynchronizer->flush(flushType, flushId); | ||
435 | } else { | ||
436 | mUserQueue.startTransaction(); | ||
437 | enqueueCommand(mUserQueue, Commands::FlushCommand, data); | ||
438 | mUserQueue.commit(); | ||
439 | } | ||
440 | } | ||
441 | |||
442 | } | ||
443 | |||
387 | KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) | 444 | KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) |
388 | { | 445 | { |
389 | return KAsync::start<void>([this, query] { | 446 | return KAsync::start<void>([this, query] { |