diff options
Diffstat (limited to 'common/genericresource.cpp')
-rw-r--r-- | common/genericresource.cpp | 560 |
1 files changed, 303 insertions, 257 deletions
diff --git a/common/genericresource.cpp b/common/genericresource.cpp index eae6ead..cb2ef21 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp | |||
@@ -1,3 +1,22 @@ | |||
1 | /* | ||
2 | * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com> | ||
3 | * | ||
4 | * This library is free software; you can redistribute it and/or | ||
5 | * modify it under the terms of the GNU Lesser General Public | ||
6 | * License as published by the Free Software Foundation; either | ||
7 | * version 2.1 of the License, or (at your option) version 3, or any | ||
8 | * later version accepted by the membership of KDE e.V. (or its | ||
9 | * successor approved by the membership of KDE e.V.), which shall | ||
10 | * act as a proxy defined in Section 6 of version 3 of the license. | ||
11 | * | ||
12 | * This library is distributed in the hope that it will be useful, | ||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
15 | * Lesser General Public License for more details. | ||
16 | * | ||
17 | * You should have received a copy of the GNU Lesser General Public | ||
18 | * License along with this library. If not, see <http://www.gnu.org/licenses/>. | ||
19 | */ | ||
1 | #include "genericresource.h" | 20 | #include "genericresource.h" |
2 | 21 | ||
3 | #include "entitybuffer.h" | 22 | #include "entitybuffer.h" |
@@ -14,6 +33,7 @@ | |||
14 | #include "log.h" | 33 | #include "log.h" |
15 | #include "definitions.h" | 34 | #include "definitions.h" |
16 | #include "bufferutils.h" | 35 | #include "bufferutils.h" |
36 | #include "adaptorfactoryregistry.h" | ||
17 | 37 | ||
18 | #include <QUuid> | 38 | #include <QUuid> |
19 | #include <QDataStream> | 39 | #include <QDataStream> |
@@ -30,96 +50,6 @@ static int sCommitInterval = 10; | |||
30 | using namespace Sink; | 50 | using namespace Sink; |
31 | 51 | ||
32 | #undef DEBUG_AREA | 52 | #undef DEBUG_AREA |
33 | #define DEBUG_AREA "resource.changereplay" | ||
34 | |||
35 | /** | ||
36 | * Replays changes from the storage one by one. | ||
37 | * | ||
38 | * Uses a local database to: | ||
39 | * * Remember what changes have been replayed already. | ||
40 | * * store a mapping of remote to local buffers | ||
41 | */ | ||
42 | class ChangeReplay : public QObject | ||
43 | { | ||
44 | Q_OBJECT | ||
45 | public: | ||
46 | typedef std::function<KAsync::Job<void>(const QByteArray &type, const QByteArray &key, const QByteArray &value)> ReplayFunction; | ||
47 | |||
48 | ChangeReplay(const QString &resourceName, const ReplayFunction &replayFunction) | ||
49 | : mStorage(storageLocation(), resourceName, Storage::ReadOnly), mChangeReplayStore(storageLocation(), resourceName + ".changereplay", Storage::ReadWrite), mReplayFunction(replayFunction) | ||
50 | { | ||
51 | } | ||
52 | |||
53 | qint64 getLastReplayedRevision() | ||
54 | { | ||
55 | qint64 lastReplayedRevision = 0; | ||
56 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadOnly); | ||
57 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
58 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
59 | lastReplayedRevision = value.toLongLong(); | ||
60 | return false; | ||
61 | }, | ||
62 | [](const Storage::Error &) {}); | ||
63 | return lastReplayedRevision; | ||
64 | } | ||
65 | |||
66 | bool allChangesReplayed() | ||
67 | { | ||
68 | const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); | ||
69 | const qint64 lastReplayedRevision = getLastReplayedRevision(); | ||
70 | Trace() << "All changes replayed " << topRevision << lastReplayedRevision; | ||
71 | return (lastReplayedRevision >= topRevision); | ||
72 | } | ||
73 | |||
74 | signals: | ||
75 | void changesReplayed(); | ||
76 | |||
77 | public slots: | ||
78 | void revisionChanged() | ||
79 | { | ||
80 | auto mainStoreTransaction = mStorage.createTransaction(Storage::ReadOnly); | ||
81 | auto replayStoreTransaction = mChangeReplayStore.createTransaction(Storage::ReadWrite); | ||
82 | qint64 lastReplayedRevision = 1; | ||
83 | replayStoreTransaction.openDatabase().scan("lastReplayedRevision", | ||
84 | [&lastReplayedRevision](const QByteArray &key, const QByteArray &value) -> bool { | ||
85 | lastReplayedRevision = value.toLongLong(); | ||
86 | return false; | ||
87 | }, | ||
88 | [](const Storage::Error &) {}); | ||
89 | const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); | ||
90 | |||
91 | Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; | ||
92 | if (lastReplayedRevision <= topRevision) { | ||
93 | qint64 revision = lastReplayedRevision; | ||
94 | for (; revision <= topRevision; revision++) { | ||
95 | const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); | ||
96 | const auto type = Storage::getTypeFromRevision(mainStoreTransaction, revision); | ||
97 | const auto key = Storage::assembleKey(uid, revision); | ||
98 | Storage::mainDatabase(mainStoreTransaction, type) | ||
99 | .scan(key, | ||
100 | [&lastReplayedRevision, type, this](const QByteArray &key, const QByteArray &value) -> bool { | ||
101 | mReplayFunction(type, key, value).exec(); | ||
102 | // TODO make for loop async, and pass to async replay function together with type | ||
103 | Trace() << "Replaying " << key; | ||
104 | return false; | ||
105 | }, | ||
106 | [key](const Storage::Error &) { ErrorMsg() << "Failed to replay change " << key; }); | ||
107 | } | ||
108 | revision--; | ||
109 | replayStoreTransaction.openDatabase().write("lastReplayedRevision", QByteArray::number(revision)); | ||
110 | replayStoreTransaction.commit(); | ||
111 | Trace() << "Replayed until " << revision; | ||
112 | } | ||
113 | emit changesReplayed(); | ||
114 | } | ||
115 | |||
116 | private: | ||
117 | Sink::Storage mStorage; | ||
118 | Sink::Storage mChangeReplayStore; | ||
119 | ReplayFunction mReplayFunction; | ||
120 | }; | ||
121 | |||
122 | #undef DEBUG_AREA | ||
123 | #define DEBUG_AREA "resource.commandprocessor" | 53 | #define DEBUG_AREA "resource.commandprocessor" |
124 | 54 | ||
125 | /** | 55 | /** |
@@ -133,10 +63,9 @@ class CommandProcessor : public QObject | |||
133 | public: | 63 | public: |
134 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) | 64 | CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) |
135 | { | 65 | { |
136 | mPipeline->startTransaction(); | 66 | mLowerBoundRevision = Storage::maxRevision(mPipeline->storage().createTransaction(Storage::ReadOnly, [](const Sink::Storage::Error &error) { |
137 | // FIXME Should be initialized to the current value of the change replay queue | 67 | Warning() << error.message; |
138 | mLowerBoundRevision = Storage::maxRevision(mPipeline->transaction()); | 68 | })); |
139 | mPipeline->commit(); | ||
140 | 69 | ||
141 | for (auto queue : mCommandQueues) { | 70 | for (auto queue : mCommandQueues) { |
142 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); | 71 | const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); |
@@ -303,15 +232,22 @@ private: | |||
303 | #undef DEBUG_AREA | 232 | #undef DEBUG_AREA |
304 | #define DEBUG_AREA "resource" | 233 | #define DEBUG_AREA "resource" |
305 | 234 | ||
306 | GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline) | 235 | GenericResource::GenericResource(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, const QSharedPointer<Pipeline> &pipeline, const QSharedPointer<ChangeReplay> &changeReplay, const QSharedPointer<Synchronizer> &synchronizer) |
307 | : Sink::Resource(), | 236 | : Sink::Resource(), |
308 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), | 237 | mUserQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".userqueue"), |
309 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), | 238 | mSynchronizerQueue(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronizerqueue"), |
239 | mResourceType(resourceType), | ||
310 | mResourceInstanceIdentifier(resourceInstanceIdentifier), | 240 | mResourceInstanceIdentifier(resourceInstanceIdentifier), |
311 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), | 241 | mPipeline(pipeline ? pipeline : QSharedPointer<Sink::Pipeline>::create(resourceInstanceIdentifier)), |
242 | mChangeReplay(changeReplay), | ||
243 | mSynchronizer(synchronizer), | ||
312 | mError(0), | 244 | mError(0), |
313 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) | 245 | mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) |
314 | { | 246 | { |
247 | mPipeline->setResourceType(mResourceType); | ||
248 | mSynchronizer->setup([this](int commandId, const QByteArray &data) { | ||
249 | enqueueCommand(mSynchronizerQueue, commandId, data); | ||
250 | }); | ||
315 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); | 251 | mProcessor = new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue); |
316 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { | 252 | mProcessor->setInspectionCommand([this](void const *command, size_t size) { |
317 | flatbuffers::Verifier verifier((const uint8_t *)command, size); | 253 | flatbuffers::Verifier verifier((const uint8_t *)command, size); |
@@ -353,14 +289,9 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
353 | }); | 289 | }); |
354 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); | 290 | QObject::connect(mProcessor, &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); |
355 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); | 291 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); |
356 | mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { | ||
357 | // This results in a deadlock when a sync is in progress and we try to create a second writing transaction (which is why we turn changereplay off during the sync) | ||
358 | auto synchronizationStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); | ||
359 | return this->replay(*synchronizationStore, type, key, value).then<void>([synchronizationStore]() {}); | ||
360 | }); | ||
361 | enableChangeReplay(true); | 292 | enableChangeReplay(true); |
362 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); | 293 | mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); |
363 | mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); | 294 | mProcessor->setOldestUsedRevision(mChangeReplay->getLastReplayedRevision()); |
364 | 295 | ||
365 | mCommitQueueTimer.setInterval(sCommitInterval); | 296 | mCommitQueueTimer.setInterval(sCommitInterval); |
366 | mCommitQueueTimer.setSingleShot(true); | 297 | mCommitQueueTimer.setSingleShot(true); |
@@ -370,7 +301,6 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c | |||
370 | GenericResource::~GenericResource() | 301 | GenericResource::~GenericResource() |
371 | { | 302 | { |
372 | delete mProcessor; | 303 | delete mProcessor; |
373 | delete mSourceChangeReplay; | ||
374 | } | 304 | } |
375 | 305 | ||
376 | KAsync::Job<void> GenericResource::inspect( | 306 | KAsync::Job<void> GenericResource::inspect( |
@@ -383,86 +313,20 @@ KAsync::Job<void> GenericResource::inspect( | |||
383 | void GenericResource::enableChangeReplay(bool enable) | 313 | void GenericResource::enableChangeReplay(bool enable) |
384 | { | 314 | { |
385 | if (enable) { | 315 | if (enable) { |
386 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged, Qt::QueuedConnection); | 316 | QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged, Qt::QueuedConnection); |
387 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 317 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
388 | mSourceChangeReplay->revisionChanged(); | 318 | mChangeReplay->revisionChanged(); |
389 | } else { | 319 | } else { |
390 | QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); | 320 | QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mChangeReplay.data(), &ChangeReplay::revisionChanged); |
391 | QObject::disconnect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); | 321 | QObject::disconnect(mChangeReplay.data(), &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); |
392 | } | 322 | } |
393 | } | 323 | } |
394 | 324 | ||
395 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) | 325 | void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector<Sink::Preprocessor *> &preprocessors) |
396 | { | 326 | { |
397 | mPipeline->setPreprocessors(type, preprocessors); | 327 | mPipeline->setPreprocessors(type, preprocessors); |
398 | mPipeline->setAdaptorFactory(type, factory); | ||
399 | mAdaptorFactories.insert(type, factory); | ||
400 | } | ||
401 | |||
402 | KAsync::Job<void> GenericResource::replay(Sink::Storage &synchronizationStore, const QByteArray &type, const QByteArray &key, const QByteArray &value) | ||
403 | { | ||
404 | Sink::EntityBuffer buffer(value); | ||
405 | const Sink::Entity &entity = buffer.entity(); | ||
406 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | ||
407 | Q_ASSERT(metadataBuffer); | ||
408 | if (!metadataBuffer->replayToSource()) { | ||
409 | Trace() << "Change is coming from the source"; | ||
410 | return KAsync::null<void>(); | ||
411 | } | ||
412 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
413 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | ||
414 | const auto uid = Sink::Storage::uidFromKey(key); | ||
415 | QByteArray oldRemoteId; | ||
416 | |||
417 | if (operation != Sink::Operation_Creation) { | ||
418 | auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadOnly); | ||
419 | oldRemoteId = resolveLocalId(type, uid, synchronizationTransaction); | ||
420 | } | ||
421 | Trace() << "Replaying " << key << type; | ||
422 | |||
423 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); | ||
424 | if (type == ENTITY_TYPE_FOLDER) { | ||
425 | const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
426 | job = replay(folder, operation, oldRemoteId); | ||
427 | } else if (type == ENTITY_TYPE_MAIL) { | ||
428 | const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
429 | job = replay(mail, operation, oldRemoteId); | ||
430 | } | ||
431 | |||
432 | return job.then<void, QByteArray>([=, &synchronizationStore](const QByteArray &remoteId) { | ||
433 | auto synchronizationTransaction = synchronizationStore.createTransaction(Sink::Storage::ReadWrite); | ||
434 | Trace() << "Replayed change with remote id: " << remoteId; | ||
435 | if (operation == Sink::Operation_Creation) { | ||
436 | if (remoteId.isEmpty()) { | ||
437 | Warning() << "Returned an empty remoteId from the creation"; | ||
438 | } else { | ||
439 | recordRemoteId(type, uid, remoteId, synchronizationTransaction); | ||
440 | } | ||
441 | } else if (operation == Sink::Operation_Modification) { | ||
442 | if (remoteId.isEmpty()) { | ||
443 | Warning() << "Returned an empty remoteId from the creation"; | ||
444 | } else { | ||
445 | updateRemoteId(type, uid, remoteId, synchronizationTransaction); | ||
446 | } | ||
447 | } else if (operation == Sink::Operation_Removal) { | ||
448 | removeRemoteId(type, uid, remoteId, synchronizationTransaction); | ||
449 | } else { | ||
450 | Warning() << "Unkown operation" << operation; | ||
451 | } | ||
452 | }, [](int errorCode, const QString &errorMessage) { | ||
453 | Warning() << "Failed to replay change: " << errorMessage; | ||
454 | }); | ||
455 | } | 328 | } |
456 | 329 | ||
457 | KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) | ||
458 | { | ||
459 | return KAsync::null<QByteArray>(); | ||
460 | } | ||
461 | |||
462 | KAsync::Job<QByteArray> GenericResource::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) | ||
463 | { | ||
464 | return KAsync::null<QByteArray>(); | ||
465 | } | ||
466 | 330 | ||
467 | void GenericResource::removeDataFromDisk() | 331 | void GenericResource::removeDataFromDisk() |
468 | { | 332 | { |
@@ -528,10 +392,8 @@ KAsync::Job<void> GenericResource::synchronizeWithSource() | |||
528 | Log() << " Synchronizing"; | 392 | Log() << " Synchronizing"; |
529 | // Changereplay would deadlock otherwise when trying to open the synchronization store | 393 | // Changereplay would deadlock otherwise when trying to open the synchronization store |
530 | enableChangeReplay(false); | 394 | enableChangeReplay(false); |
531 | auto mainStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier, Sink::Storage::ReadOnly); | 395 | mSynchronizer->synchronize() |
532 | auto syncStore = QSharedPointer<Sink::Storage>::create(Sink::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite); | 396 | .then<void>([this, &future]() { |
533 | synchronizeWithSource(*mainStore, *syncStore) | ||
534 | .then<void>([this, mainStore, syncStore, &future]() { | ||
535 | Log() << "Done Synchronizing"; | 397 | Log() << "Done Synchronizing"; |
536 | enableChangeReplay(true); | 398 | enableChangeReplay(true); |
537 | future.setFinished(); | 399 | future.setFinished(); |
@@ -576,11 +438,11 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
576 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) | 438 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mSynchronizerQueue); }) |
577 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) | 439 | .then<void>([this](KAsync::Future<void> &f) { waitForDrained(f, mUserQueue); }) |
578 | .then<void>([this](KAsync::Future<void> &f) { | 440 | .then<void>([this](KAsync::Future<void> &f) { |
579 | if (mSourceChangeReplay->allChangesReplayed()) { | 441 | if (mChangeReplay->allChangesReplayed()) { |
580 | f.setFinished(); | 442 | f.setFinished(); |
581 | } else { | 443 | } else { |
582 | auto context = new QObject; | 444 | auto context = new QObject; |
583 | QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { | 445 | QObject::connect(mChangeReplay.data(), &ChangeReplay::changesReplayed, context, [&f, context]() { |
584 | delete context; | 446 | delete context; |
585 | f.setFinished(); | 447 | f.setFinished(); |
586 | }); | 448 | }); |
@@ -590,7 +452,7 @@ KAsync::Job<void> GenericResource::processAllMessages() | |||
590 | 452 | ||
591 | void GenericResource::updateLowerBoundRevision() | 453 | void GenericResource::updateLowerBoundRevision() |
592 | { | 454 | { |
593 | mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mSourceChangeReplay->getLastReplayedRevision())); | 455 | mProcessor->setOldestUsedRevision(qMin(mClientLowerBoundRevision, mChangeReplay->getLastReplayedRevision())); |
594 | } | 456 | } |
595 | 457 | ||
596 | void GenericResource::setLowerBoundRevision(qint64 revision) | 458 | void GenericResource::setLowerBoundRevision(qint64 revision) |
@@ -599,7 +461,139 @@ void GenericResource::setLowerBoundRevision(qint64 revision) | |||
599 | updateLowerBoundRevision(); | 461 | updateLowerBoundRevision(); |
600 | } | 462 | } |
601 | 463 | ||
602 | void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 464 | |
465 | |||
466 | |||
467 | EntityStore::EntityStore(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Sink::Storage::Transaction &transaction) | ||
468 | : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier), | ||
469 | mTransaction(transaction) | ||
470 | { | ||
471 | |||
472 | } | ||
473 | |||
474 | template<typename T> | ||
475 | T EntityStore::read(const QByteArray &identifier) const | ||
476 | { | ||
477 | auto typeName = ApplicationDomain::getTypeName<T>(); | ||
478 | auto mainDatabase = Storage::mainDatabase(mTransaction, typeName); | ||
479 | auto bufferAdaptor = getLatest(mainDatabase, identifier, *Sink::AdaptorFactoryRegistry::instance().getFactory<T>(mResourceType)); | ||
480 | Q_ASSERT(bufferAdaptor); | ||
481 | return T(mResourceInstanceIdentifier, identifier, 0, bufferAdaptor); | ||
482 | } | ||
483 | |||
484 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> EntityStore::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | ||
485 | { | ||
486 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | ||
487 | db.findLatest(uid, | ||
488 | [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
489 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
490 | if (!buffer.isValid()) { | ||
491 | Warning() << "Read invalid buffer from disk"; | ||
492 | } else { | ||
493 | Trace() << "Found value " << key; | ||
494 | current = adaptorFactory.createAdaptor(buffer.entity()); | ||
495 | } | ||
496 | return false; | ||
497 | }, | ||
498 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); | ||
499 | return current; | ||
500 | } | ||
501 | |||
502 | |||
503 | |||
504 | SyncStore::SyncStore(Sink::Storage::Transaction &transaction) | ||
505 | : mTransaction(transaction) | ||
506 | { | ||
507 | |||
508 | } | ||
509 | |||
510 | void SyncStore::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
511 | { | ||
512 | Index("rid.mapping." + bufferType, mTransaction).add(remoteId, localId); | ||
513 | Index("localid.mapping." + bufferType, mTransaction).add(localId, remoteId); | ||
514 | } | ||
515 | |||
516 | void SyncStore::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
517 | { | ||
518 | Index("rid.mapping." + bufferType, mTransaction).remove(remoteId, localId); | ||
519 | Index("localid.mapping." + bufferType, mTransaction).remove(localId, remoteId); | ||
520 | } | ||
521 | |||
522 | void SyncStore::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId) | ||
523 | { | ||
524 | const auto oldRemoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); | ||
525 | removeRemoteId(bufferType, localId, oldRemoteId); | ||
526 | recordRemoteId(bufferType, localId, remoteId); | ||
527 | } | ||
528 | |||
529 | QByteArray SyncStore::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId) | ||
530 | { | ||
531 | // Lookup local id for remote id, or insert a new pair otherwise | ||
532 | Index index("rid.mapping." + bufferType, mTransaction); | ||
533 | QByteArray sinkId = index.lookup(remoteId); | ||
534 | if (sinkId.isEmpty()) { | ||
535 | sinkId = QUuid::createUuid().toString().toUtf8(); | ||
536 | index.add(remoteId, sinkId); | ||
537 | Index("localid.mapping." + bufferType, mTransaction).add(sinkId, remoteId); | ||
538 | } | ||
539 | return sinkId; | ||
540 | } | ||
541 | |||
542 | QByteArray SyncStore::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId) | ||
543 | { | ||
544 | QByteArray remoteId = Index("localid.mapping." + bufferType, mTransaction).lookup(localId); | ||
545 | if (remoteId.isEmpty()) { | ||
546 | Warning() << "Couldn't find the remote id for " << localId; | ||
547 | return QByteArray(); | ||
548 | } | ||
549 | return remoteId; | ||
550 | } | ||
551 | |||
552 | |||
553 | |||
554 | |||
555 | |||
556 | |||
557 | |||
558 | |||
559 | Synchronizer::Synchronizer(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
560 | : mStorage(Sink::storageLocation(), resourceInstanceIdentifier, Sink::Storage::ReadOnly), | ||
561 | mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), | ||
562 | mResourceType(resourceType), | ||
563 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | ||
564 | { | ||
565 | Trace() << "Starting synchronizer: " << resourceType << resourceInstanceIdentifier; | ||
566 | |||
567 | } | ||
568 | |||
569 | void Synchronizer::setup(const std::function<void(int commandId, const QByteArray &data)> &enqueueCommandCallback) | ||
570 | { | ||
571 | mEnqueue = enqueueCommandCallback; | ||
572 | } | ||
573 | |||
574 | void Synchronizer::enqueueCommand(int commandId, const QByteArray &data) | ||
575 | { | ||
576 | Q_ASSERT(mEnqueue); | ||
577 | mEnqueue(commandId, data); | ||
578 | } | ||
579 | |||
580 | EntityStore &Synchronizer::store() | ||
581 | { | ||
582 | if (!mEntityStore) { | ||
583 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); | ||
584 | } | ||
585 | return *mEntityStore; | ||
586 | } | ||
587 | |||
588 | SyncStore &Synchronizer::syncStore() | ||
589 | { | ||
590 | if (!mSyncStore) { | ||
591 | mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction); | ||
592 | } | ||
593 | return *mSyncStore; | ||
594 | } | ||
595 | |||
596 | void Synchronizer::createEntity(const QByteArray &sinkId, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | ||
603 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | 597 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) |
604 | { | 598 | { |
605 | // These changes are coming from the source | 599 | // These changes are coming from the source |
@@ -616,7 +610,7 @@ void GenericResource::createEntity(const QByteArray &sinkId, const QByteArray &b | |||
616 | callback(BufferUtils::extractBuffer(fbb)); | 610 | callback(BufferUtils::extractBuffer(fbb)); |
617 | } | 611 | } |
618 | 612 | ||
619 | void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, | 613 | void Synchronizer::modifyEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, const Sink::ApplicationDomain::ApplicationDomainType &domainObject, |
620 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) | 614 | DomainTypeAdaptorFactoryInterface &adaptorFactory, std::function<void(const QByteArray &)> callback) |
621 | { | 615 | { |
622 | // These changes are coming from the source | 616 | // These changes are coming from the source |
@@ -634,7 +628,7 @@ void GenericResource::modifyEntity(const QByteArray &sinkId, qint64 revision, co | |||
634 | callback(BufferUtils::extractBuffer(fbb)); | 628 | callback(BufferUtils::extractBuffer(fbb)); |
635 | } | 629 | } |
636 | 630 | ||
637 | void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) | 631 | void Synchronizer::deleteEntity(const QByteArray &sinkId, qint64 revision, const QByteArray &bufferType, std::function<void(const QByteArray &)> callback) |
638 | { | 632 | { |
639 | // These changes are coming from the source | 633 | // These changes are coming from the source |
640 | const auto replayToSource = false; | 634 | const auto replayToSource = false; |
@@ -647,96 +641,36 @@ void GenericResource::deleteEntity(const QByteArray &sinkId, qint64 revision, co | |||
647 | callback(BufferUtils::extractBuffer(fbb)); | 641 | callback(BufferUtils::extractBuffer(fbb)); |
648 | } | 642 | } |
649 | 643 | ||
650 | void GenericResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | 644 | void Synchronizer::scanForRemovals(const QByteArray &bufferType, const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) |
651 | { | ||
652 | Index("rid.mapping." + bufferType, transaction).add(remoteId, localId); | ||
653 | ; | ||
654 | Index("localid.mapping." + bufferType, transaction).add(localId, remoteId); | ||
655 | } | ||
656 | |||
657 | void GenericResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
658 | { | ||
659 | Index("rid.mapping." + bufferType, transaction).remove(remoteId, localId); | ||
660 | Index("localid.mapping." + bufferType, transaction).remove(localId, remoteId); | ||
661 | } | ||
662 | |||
663 | void GenericResource::updateRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
664 | { | ||
665 | const auto oldRemoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId); | ||
666 | removeRemoteId(bufferType, localId, oldRemoteId, transaction); | ||
667 | recordRemoteId(bufferType, localId, remoteId, transaction); | ||
668 | } | ||
669 | |||
670 | QByteArray GenericResource::resolveRemoteId(const QByteArray &bufferType, const QByteArray &remoteId, Sink::Storage::Transaction &transaction) | ||
671 | { | ||
672 | // Lookup local id for remote id, or insert a new pair otherwise | ||
673 | Index index("rid.mapping." + bufferType, transaction); | ||
674 | QByteArray sinkId = index.lookup(remoteId); | ||
675 | if (sinkId.isEmpty()) { | ||
676 | sinkId = QUuid::createUuid().toString().toUtf8(); | ||
677 | index.add(remoteId, sinkId); | ||
678 | Index("localid.mapping." + bufferType, transaction).add(sinkId, remoteId); | ||
679 | } | ||
680 | return sinkId; | ||
681 | } | ||
682 | |||
683 | QByteArray GenericResource::resolveLocalId(const QByteArray &bufferType, const QByteArray &localId, Sink::Storage::Transaction &transaction) | ||
684 | { | ||
685 | QByteArray remoteId = Index("localid.mapping." + bufferType, transaction).lookup(localId); | ||
686 | if (remoteId.isEmpty()) { | ||
687 | Warning() << "Couldn't find the remote id for " << localId; | ||
688 | return QByteArray(); | ||
689 | } | ||
690 | return remoteId; | ||
691 | } | ||
692 | |||
693 | void GenericResource::scanForRemovals(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, const QByteArray &bufferType, | ||
694 | const std::function<void(const std::function<void(const QByteArray &key)> &callback)> &entryGenerator, std::function<bool(const QByteArray &remoteId)> exists) | ||
695 | { | 645 | { |
696 | entryGenerator([this, &transaction, bufferType, &synchronizationTransaction, &exists](const QByteArray &key) { | 646 | entryGenerator([this, bufferType, &exists](const QByteArray &key) { |
697 | auto sinkId = Sink::Storage::uidFromKey(key); | 647 | auto sinkId = Sink::Storage::uidFromKey(key); |
698 | Trace() << "Checking for removal " << key; | 648 | Trace() << "Checking for removal " << key; |
699 | const auto remoteId = resolveLocalId(bufferType, sinkId, synchronizationTransaction); | 649 | const auto remoteId = syncStore().resolveLocalId(bufferType, sinkId); |
700 | // If we have no remoteId, the entity hasn't been replayed to the source yet | 650 | // If we have no remoteId, the entity hasn't been replayed to the source yet |
701 | if (!remoteId.isEmpty()) { | 651 | if (!remoteId.isEmpty()) { |
702 | if (!exists(remoteId)) { | 652 | if (!exists(remoteId)) { |
703 | Trace() << "Found a removed entity: " << sinkId; | 653 | Trace() << "Found a removed entity: " << sinkId; |
704 | deleteEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, | 654 | deleteEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, |
705 | [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::DeleteEntityCommand, buffer); }); | 655 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::DeleteEntityCommand, buffer); }); |
706 | } | 656 | } |
707 | } | 657 | } |
708 | }); | 658 | }); |
709 | } | 659 | } |
710 | 660 | ||
711 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> GenericResource::getLatest(const Sink::Storage::NamedDatabase &db, const QByteArray &uid, DomainTypeAdaptorFactoryInterface &adaptorFactory) | 661 | void Synchronizer::createOrModify(const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) |
712 | { | ||
713 | QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> current; | ||
714 | db.findLatest(uid, | ||
715 | [¤t, &adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool { | ||
716 | Sink::EntityBuffer buffer(const_cast<const char *>(data.data()), data.size()); | ||
717 | if (!buffer.isValid()) { | ||
718 | Warning() << "Read invalid buffer from disk"; | ||
719 | } else { | ||
720 | current = adaptorFactory.createAdaptor(buffer.entity()); | ||
721 | } | ||
722 | return false; | ||
723 | }, | ||
724 | [](const Sink::Storage::Error &error) { Warning() << "Failed to read current value from storage: " << error.message; }); | ||
725 | return current; | ||
726 | } | ||
727 | |||
728 | void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Sink::Storage::Transaction &synchronizationTransaction, | ||
729 | DomainTypeAdaptorFactoryInterface &adaptorFactory, const QByteArray &bufferType, const QByteArray &remoteId, const Sink::ApplicationDomain::ApplicationDomainType &entity) | ||
730 | { | 662 | { |
731 | auto mainDatabase = Storage::mainDatabase(transaction, bufferType); | 663 | Trace() << "Create or modify" << bufferType << remoteId; |
732 | const auto sinkId = resolveRemoteId(bufferType, remoteId, synchronizationTransaction); | 664 | auto mainDatabase = Storage::mainDatabase(mTransaction, bufferType); |
665 | const auto sinkId = syncStore().resolveRemoteId(bufferType, remoteId); | ||
733 | const auto found = mainDatabase.contains(sinkId); | 666 | const auto found = mainDatabase.contains(sinkId); |
667 | auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(mResourceType, bufferType); | ||
734 | if (!found) { | 668 | if (!found) { |
735 | Trace() << "Found a new entity: " << remoteId; | 669 | Trace() << "Found a new entity: " << remoteId; |
736 | createEntity( | 670 | createEntity( |
737 | sinkId, bufferType, entity, adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::CreateEntityCommand, buffer); }); | 671 | sinkId, bufferType, entity, *adaptorFactory, [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::CreateEntityCommand, buffer); }); |
738 | } else { // modification | 672 | } else { // modification |
739 | if (auto current = getLatest(mainDatabase, sinkId, adaptorFactory)) { | 673 | if (auto current = store().getLatest(mainDatabase, sinkId, *adaptorFactory)) { |
740 | bool changed = false; | 674 | bool changed = false; |
741 | for (const auto &property : entity.changedProperties()) { | 675 | for (const auto &property : entity.changedProperties()) { |
742 | if (entity.getProperty(property) != current->getProperty(property)) { | 676 | if (entity.getProperty(property) != current->getProperty(property)) { |
@@ -746,8 +680,8 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si | |||
746 | } | 680 | } |
747 | if (changed) { | 681 | if (changed) { |
748 | Trace() << "Found a modified entity: " << remoteId; | 682 | Trace() << "Found a modified entity: " << remoteId; |
749 | modifyEntity(sinkId, Sink::Storage::maxRevision(transaction), bufferType, entity, adaptorFactory, | 683 | modifyEntity(sinkId, Sink::Storage::maxRevision(mTransaction), bufferType, entity, *adaptorFactory, |
750 | [this](const QByteArray &buffer) { enqueueCommand(mSynchronizerQueue, Sink::Commands::ModifyEntityCommand, buffer); }); | 684 | [this](const QByteArray &buffer) { enqueueCommand(Sink::Commands::ModifyEntityCommand, buffer); }); |
751 | } | 685 | } |
752 | } else { | 686 | } else { |
753 | Warning() << "Failed to get current entity"; | 687 | Warning() << "Failed to get current entity"; |
@@ -755,6 +689,118 @@ void GenericResource::createOrModify(Sink::Storage::Transaction &transaction, Si | |||
755 | } | 689 | } |
756 | } | 690 | } |
757 | 691 | ||
692 | KAsync::Job<void> Synchronizer::synchronize() | ||
693 | { | ||
694 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
695 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
696 | return synchronizeWithSource().then<void>([this]() { | ||
697 | mTransaction.abort(); | ||
698 | mSyncTransaction.commit(); | ||
699 | mSyncStore.clear(); | ||
700 | mEntityStore.clear(); | ||
701 | }); | ||
702 | } | ||
703 | |||
704 | |||
705 | |||
706 | SourceWriteBack::SourceWriteBack(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) | ||
707 | : ChangeReplay(resourceInstanceIdentifier), | ||
708 | mSyncStorage(Sink::storageLocation(), resourceInstanceIdentifier + ".synchronization", Sink::Storage::ReadWrite), | ||
709 | mResourceType(resourceType), | ||
710 | mResourceInstanceIdentifier(resourceInstanceIdentifier) | ||
711 | { | ||
712 | |||
713 | } | ||
714 | |||
715 | EntityStore &SourceWriteBack::store() | ||
716 | { | ||
717 | if (!mEntityStore) { | ||
718 | mEntityStore = QSharedPointer<EntityStore>::create(mResourceType, mResourceInstanceIdentifier, mTransaction); | ||
719 | } | ||
720 | return *mEntityStore; | ||
721 | } | ||
722 | |||
723 | SyncStore &SourceWriteBack::syncStore() | ||
724 | { | ||
725 | if (!mSyncStore) { | ||
726 | mSyncStore = QSharedPointer<SyncStore>::create(mSyncTransaction); | ||
727 | } | ||
728 | return *mSyncStore; | ||
729 | } | ||
730 | |||
731 | KAsync::Job<void> SourceWriteBack::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) | ||
732 | { | ||
733 | mTransaction = mStorage.createTransaction(Sink::Storage::ReadOnly); | ||
734 | mSyncTransaction = mSyncStorage.createTransaction(Sink::Storage::ReadWrite); | ||
735 | |||
736 | Sink::EntityBuffer buffer(value); | ||
737 | const Sink::Entity &entity = buffer.entity(); | ||
738 | const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata()); | ||
739 | Q_ASSERT(metadataBuffer); | ||
740 | if (!metadataBuffer->replayToSource()) { | ||
741 | Trace() << "Change is coming from the source"; | ||
742 | return KAsync::null<void>(); | ||
743 | } | ||
744 | const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; | ||
745 | const auto operation = metadataBuffer ? metadataBuffer->operation() : Sink::Operation_Creation; | ||
746 | const auto uid = Sink::Storage::uidFromKey(key); | ||
747 | QByteArray oldRemoteId; | ||
748 | |||
749 | if (operation != Sink::Operation_Creation) { | ||
750 | oldRemoteId = syncStore().resolveLocalId(type, uid); | ||
751 | } | ||
752 | Trace() << "Replaying " << key << type; | ||
753 | |||
754 | KAsync::Job<QByteArray> job = KAsync::null<QByteArray>(); | ||
755 | if (type == ENTITY_TYPE_FOLDER) { | ||
756 | auto folder = store().read<ApplicationDomain::Folder>(uid); | ||
757 | // const Sink::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
758 | job = replay(folder, operation, oldRemoteId); | ||
759 | } else if (type == ENTITY_TYPE_MAIL) { | ||
760 | auto mail = store().read<ApplicationDomain::Mail>(uid); | ||
761 | // const Sink::ApplicationDomain::Mail mail(mResourceInstanceIdentifier, uid, revision, mAdaptorFactories.value(type)->createAdaptor(entity)); | ||
762 | job = replay(mail, operation, oldRemoteId); | ||
763 | } | ||
764 | |||
765 | return job.then<void, QByteArray>([this, operation, type, uid](const QByteArray &remoteId) { | ||
766 | Trace() << "Replayed change with remote id: " << remoteId; | ||
767 | if (operation == Sink::Operation_Creation) { | ||
768 | if (remoteId.isEmpty()) { | ||
769 | Warning() << "Returned an empty remoteId from the creation"; | ||
770 | } else { | ||
771 | syncStore().recordRemoteId(type, uid, remoteId); | ||
772 | } | ||
773 | } else if (operation == Sink::Operation_Modification) { | ||
774 | if (remoteId.isEmpty()) { | ||
775 | Warning() << "Returned an empty remoteId from the creation"; | ||
776 | } else { | ||
777 | syncStore().updateRemoteId(type, uid, remoteId); | ||
778 | } | ||
779 | } else if (operation == Sink::Operation_Removal) { | ||
780 | syncStore().removeRemoteId(type, uid, remoteId); | ||
781 | } else { | ||
782 | Warning() << "Unkown operation" << operation; | ||
783 | } | ||
784 | |||
785 | mTransaction.abort(); | ||
786 | mSyncTransaction.commit(); | ||
787 | mSyncStore.clear(); | ||
788 | mEntityStore.clear(); | ||
789 | }, [](int errorCode, const QString &errorMessage) { | ||
790 | Warning() << "Failed to replay change: " << errorMessage; | ||
791 | }); | ||
792 | } | ||
793 | |||
794 | KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Mail &, Sink::Operation, const QByteArray &) | ||
795 | { | ||
796 | return KAsync::null<QByteArray>(); | ||
797 | } | ||
798 | |||
799 | KAsync::Job<QByteArray> SourceWriteBack::replay(const ApplicationDomain::Folder &, Sink::Operation, const QByteArray &) | ||
800 | { | ||
801 | return KAsync::null<QByteArray>(); | ||
802 | } | ||
803 | |||
758 | 804 | ||
759 | #pragma clang diagnostic push | 805 | #pragma clang diagnostic push |
760 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" | 806 | #pragma clang diagnostic ignored "-Wundefined-reinterpret-cast" |