summaryrefslogtreecommitdiffstats
path: root/common/genericresource.cpp
diff options
context:
space:
mode:
authorChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-29 15:19:21 +0200
committerChristian Mollekopf <chrigi_1@fastmail.fm>2016-05-29 15:19:21 +0200
commitdabd408dcd372f16c7934597db30346869cd8ad8 (patch)
tree0d6513204b7fa6e34cf50733ad0472866ea05e2a /common/genericresource.cpp
parentb441386c4e138d19bbd79d578e0a2ff1b3f54a93 (diff)
downloadsink-dabd408dcd372f16c7934597db30346869cd8ad8.tar.gz
sink-dabd408dcd372f16c7934597db30346869cd8ad8.zip
Fixed genericresource so it works with the maildirresourcetest
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r--common/genericresource.cpp31
1 files changed, 17 insertions, 14 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 568e066..cd3ea02 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -233,22 +233,17 @@ private:
233#undef DEBUG_AREA 233#undef DEBUG_AREA
234#define DEBUG_AREA "resource" 234#define DEBUG_AREA "resource"
235 235
236GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer) 236GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline )
237 : Sink::Resource(), 237 : Sink::Resource(),
238 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), 238 mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"),
239 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), 239 mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"),
240 mResourceType(resourceType), 240 mResourceType(resourceType),
241 mResourceInstanceIdentifier(resourceInstanceIdentifier), 241 mResourceInstanceIdentifier(resourceInstanceIdentifier),
242 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), 242 mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)),
243 mChangeReplay(changeReplay),
244 mSynchronizer(synchronizer),
245 mError(0), 243 mError(0),
246 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 244 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
247{ 245{
248 mPipeline->setResourceType(mResourceType); 246 mPipeline->setResourceType(mResourceType);
249 mSynchronizer->setup([this](int commandId, const QByteArray &data) {
250 enqueueCommand(mSynchronizerQueue, commandId, data);
251 });
252 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); 247 mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue);
253 mProcessor->setInspectionCommand([this](void const *command, size_t size) { 248 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
254 flatbuffers::Verifier verifier((const uint8_t *)command, size); 249 flatbuffers::Verifier verifier((const uint8_t *)command, size);
@@ -290,9 +285,7 @@ GenericResource::GenericResource(const QByteArray &resourceType, const QByteArra
290 }); 285 });
291 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 286 QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
292 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); 287 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
293 enableChangeReplay(true);
294 mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); 288 mClientLowerBoundRevision = mPipeline->cleanedUpRevision();
295 mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision());
296 289
297 mCommitQueueTimer.setInterval(sCommitInterval); 290 mCommitQueueTimer.setInterval(sCommitInterval);
298 mCommitQueueTimer.setSingleShot(true); 291 mCommitQueueTimer.setSingleShot(true);
@@ -313,6 +306,7 @@ KAsync::Job<void> GenericResource::inspect(
313 306
314void GenericResource::enableChangeReplay(bool enable) 307void GenericResource::enableChangeReplay(bool enable)
315{ 308{
309 Q_ASSERT(mChangeReplay);
316 if (enable) { 310 if (enable) {
317 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); 311 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection);
318 QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); 312 QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision);
@@ -323,11 +317,25 @@ void GenericResource::enableChangeReplay(bool enable)
323 } 317 }
324} 318}
325 319
326void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) 320void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors)
327{ 321{
328 mPipeline->setPreprocessors(type, preprocessors); 322 mPipeline->setPreprocessors(type, preprocessors);
329} 323}
330 324
325void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer)
326{
327 mSynchronizer = synchronizer;
328 mSynchronizer->setup([this](int commandId, const QByteArray &data) {
329 enqueueCommand(mSynchronizerQueue, commandId, data);
330 });
331}
332
333void GenericResource::setupChangereplay(const QSharedPointer<ChangeReplay> &changeReplay)
334{
335 mChangeReplay = changeReplay;
336 mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision());
337 enableChangeReplay(true);
338}
331 339
332void GenericResource::removeDataFromDisk() 340void GenericResource::removeDataFromDisk()
333{ 341{
@@ -406,11 +414,6 @@ KAsync::Job<void> GenericResource::synchronizeWithSource()
406 }); 414 });
407} 415}
408 416
409KAsync::Job<void> GenericResource::synchronizeWithSource(Sink::Storage &mainStore, Sink::Storage &synchronizationStore)
410{
411 return KAsync::null<void>();
412}
413
414static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue) 417static void waitForDrained(KAsync::Future<void> &f, MessageQueue &queue)
415{ 418{
416 if (queue.isEmpty()) { 419 if (queue.isEmpty()) {