diff options
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 31 |
1 files changed, 17 insertions, 14 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 568e066..cd3ea02 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -233,22 +233,17 @@ private: | |||
233 | #undef DEBUG_AREA | 233 | #undef DEBUG_AREA |
234 | #define DEBUG_AREA "resource" | 234 | #define DEBUG_AREA "resource" |
235 | 235 | ||
236 | GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer) | 236 | GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline ) |
237 | : Sink::Resource(), | 237 | : Sink::Resource(), |
238 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), | 238 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), |
239 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), | 239 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), |
240 | mResourceType(resourceType), | 240 | mResourceType(resourceType), |
241 | mResourceInstanceIdentifier(resourceInstanceIdentifier), | 241 | mResourceInstanceIdentifier(resourceInstanceIdentifier), |
242 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), | 242 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), |
243 | mChangeReplay(changeReplay), | ||
244 | mSynchronizer(synchronizer), | ||
245 | mError(0), | 243 | mError(0), |
246 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 244 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
247 | { | 245 | { |
248 | mPipeline->setResourceType(mResourceType); | 246 | mPipeline->setResourceType(mResourceType); |
249 | mSynchronizer->setup([this](int commandId, const QByteArray &data) { | ||
250 | enqueueCommand(mSynchronizerQueue, commandId, data); | ||
251 | }); | ||
252 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); | 247 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); |
253 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | 248 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { |
254 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | 249 | flatbuffers::Verifier verifier((const uint8_t *)command, size); |
@@ -290,9 +285,7 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra | |||
290 | }); | 285 | }); |
291 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 286 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
292 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 287 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
293 | enableChangeReplay(true); | ||
294 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); | 288 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); |
295 | mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); | ||
296 | 289 | ||
297 | mCommitQueueTimer.setInterval(sCommitInterval); | 290 | mCommitQueueTimer.setInterval(sCommitInterval); |
298 | mCommitQueueTimer.setSingleShot(true); | 291 | mCommitQueueTimer.setSingleShot(true); |
@@ -313,6 +306,7 @@ KAsync::Job<void> GenericResource::inspect( | |||
313 | 306 | ||
314 | void GenericResource::enableChangeReplay(bool enable) | 307 | void GenericResource::enableChangeReplay(bool enable) |
315 | { | 308 | { |
309 | Q_ASSERT(mChangeReplay); | ||
316 | if (enable) { | 310 | if (enable) { |
317 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); | 311 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); |
318 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 312 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
@@ -323,11 +317,25 @@ void GenericResource::enableChangeReplay(bool enable) | |||
323 | } | 317 | } |
324 | } | 318 | } |
325 | 319 | ||
326 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) | 320 | void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) |
327 | { | 321 | { |
328 | mPipeline->setPreprocessors(type, preprocessors); | 322 | mPipeline->setPreprocessors(type, preprocessors); |
329 | } | 323 | } |
330 | 324 | ||
325 | void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer) | ||
326 | { | ||
327 | mSynchronizer = synchronizer; | ||
328 | mSynchronizer->setup([this](int commandId, const QByteArray &data) { | ||
329 | enqueueCommand(mSynchronizerQueue, commandId, data); | ||
330 | }); | ||
331 | } | ||
332 | |||
333 | void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay) | ||
334 | { | ||
335 | mChangeReplay = changeReplay; | ||
336 | mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); | ||
337 | enableChangeReplay(true); | ||
338 | } | ||
331 | 339 | ||
332 | void GenericResource::removeDataFromDisk() | 340 | void GenericResource::removeDataFromDisk() |
333 | { | 341 | { |
@@ -406,11 +414,6 @@ KAsync::Job<void> GenericResource::synchronizeWithSource() | |||
406 | }); | 414 | }); |
407 | } | 415 | } |
408 | 416 | ||
409 | KAsync::Job<void> GenericResource::synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore) | ||
410 | { | ||
411 | return KAsync::null<void>(); | ||
412 | } | ||
413 | |||
414 | static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) | 417 | static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) |
415 | { | 418 | { |
416 | if (queue.isEmpty()) { | 419 | if (queue.isEmpty()) { |