summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp57
1 files changed, 57 insertions, 0 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 7c4d4ea..7b83957 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -27,6 +27,7 @@
27#include "deleteentity_generated.h" 27#include "deleteentity_generated.h"
28#include "inspection_generated.h" 28#include "inspection_generated.h"
29#include "notification_generated.h" 29#include "notification_generated.h"
30#include "flush_generated.h"
30#include "domainadaptor.h" 31#include "domainadaptor.h"
31#include "commands.h" 32#include "commands.h"
32#include "index.h" 33#include "index.h"
@@ -54,6 +55,7 @@ class CommandProcessor : public QObject
54{ 55{
55 Q_OBJECT 56 Q_OBJECT
56 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction; 57 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction;
58 typedef std::function<KAsync::Job<void>(void const *, size_t)> FlushFunction;
57 SINK_DEBUG_AREA("commandprocessor") 59 SINK_DEBUG_AREA("commandprocessor")
58 60
59public: 61public:
@@ -75,6 +77,11 @@ public:
75 mInspect = f; 77 mInspect = f;
76 } 78 }
77 79
80 void setFlushCommand(const FlushFunction &f)
81 {
82 mFlush = f;
83 }
84
78signals: 85signals:
79 void error(int errorCode, const QString &errorMessage); 86 void error(int errorCode, const QString &errorMessage);
80 87
@@ -124,6 +131,13 @@ private slots:
124 } else { 131 } else {
125 return KAsync::error<qint64>(-1, "Missing inspection command."); 132 return KAsync::error<qint64>(-1, "Missing inspection command.");
126 } 133 }
134 case Sink::Commands::FlushCommand:
135 if (mFlush) {
136 return mFlush(queuedCommand->command()->Data(), queuedCommand->command()->size())
137 .syncThen<qint64>([]() { return -1; });
138 } else {
139 return KAsync::error<qint64>(-1, "Missing inspection command.");
140 }
127 default: 141 default:
128 return KAsync::error<qint64>(-1, "Unhandled command"); 142 return KAsync::error<qint64>(-1, "Unhandled command");
129 } 143 }
@@ -219,6 +233,7 @@ private:
219 // The lowest revision we no longer need 233 // The lowest revision we no longer need
220 qint64 mLowerBoundRevision; 234 qint64 mLowerBoundRevision;
221 InspectionFunction mInspect; 235 InspectionFunction mInspect;
236 FlushFunction mFlush;
222}; 237};
223 238
224GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline ) 239GenericResource::GenericResource(const ResourceContext &resourceContext, const QSharedPointer<Pipeline> &pipeline )
@@ -266,6 +281,26 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q
266 } 281 }
267 return KAsync::error<void>(-1, "Invalid inspection command."); 282 return KAsync::error<void>(-1, "Invalid inspection command.");
268 }); 283 });
284 mProcessor->setFlushCommand([this](void const *command, size_t size) {
285 flatbuffers::Verifier verifier((const uint8_t *)command, size);
286 if (Sink::Commands::VerifyFlushBuffer(verifier)) {
287 auto buffer = Sink::Commands::GetFlush(command);
288 const auto flushType = buffer->type();
289 const auto flushId = BufferUtils::extractBuffer(buffer->id());
290 if (flushType == Sink::Flush::FlushReplayQueue) {
291 SinkTrace() << "Flushing synchronizer ";
292 mSynchronizer->flush(flushType, flushId);
293 } else {
294 SinkTrace() << "Emitting flush completion" << flushId;
295 Sink::Notification n;
296 n.type = Sink::Notification::FlushCompletion;
297 n.id = flushId;
298 emit notify(n);
299 }
300 return KAsync::null<void>();
301 }
302 return KAsync::error<void>(-1, "Invalid flush command.");
303 });
269 { 304 {
270 auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 305 auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
271 Q_ASSERT(ret); 306 Q_ASSERT(ret);
@@ -371,6 +406,10 @@ void GenericResource::enqueueCommand(MessageQueue &mq, int commandId, const QByt
371 406
372void GenericResource::processCommand(int commandId, const QByteArray &data) 407void GenericResource::processCommand(int commandId, const QByteArray &data)
373{ 408{
409 if (commandId == Commands::FlushCommand) {
410 processFlushCommand(data);
411 return;
412 }
374 static int modifications = 0; 413 static int modifications = 0;
375 mUserQueue.startTransaction(); 414 mUserQueue.startTransaction();
376 enqueueCommand(mUserQueue, commandId, data); 415 enqueueCommand(mUserQueue, commandId, data);
@@ -384,6 +423,24 @@ void GenericResource::processCommand(int commandId, const QByteArray &data)
384 } 423 }
385} 424}
386 425
426void GenericResource::processFlushCommand(const QByteArray &data)
427{
428 flatbuffers::Verifier verifier((const uint8_t *)data.constData(), data.size());
429 if (Sink::Commands::VerifyFlushBuffer(verifier)) {
430 auto buffer = Sink::Commands::GetFlush(data.constData());
431 const auto flushType = buffer->type();
432 const auto flushId = BufferUtils::extractBuffer(buffer->id());
433 if (flushType == Sink::Flush::FlushSynchronization) {
434 mSynchronizer->flush(flushType, flushId);
435 } else {
436 mUserQueue.startTransaction();
437 enqueueCommand(mUserQueue, Commands::FlushCommand, data);
438 mUserQueue.commit();
439 }
440 }
441
442}
443
387KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query) 444KAsync::Job<void> GenericResource::synchronizeWithSource(const Sink::QueryBase &query)
388{ 445{
389 return KAsync::start<void>([this, query] { 446 return KAsync::start<void>([this, query] {