From e05306d22bc994bcfae869dcd857ec76027495d1 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 28 Dec 2015 10:19:27 +0100 Subject: Changereplay for maildir folders. The sync and changereplay can not run at the same time, or would have to share the transaction otherwise. --- common/genericresource.cpp | 36 ++++++++++++-- common/genericresource.h | 1 + examples/maildirresource/libmaildir/maildir.cpp | 7 +++ examples/maildirresource/libmaildir/maildir.h | 5 ++ examples/maildirresource/maildirresource.cpp | 63 ++++++++++++++++++++++++- examples/maildirresource/maildirresource.h | 6 +++ tests/maildirresourcetest.cpp | 55 ++++++++++++++++++++- 7 files changed, 167 insertions(+), 6 deletions(-) diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 5e6764a..afe3900 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -46,6 +46,13 @@ public: return lastReplayedRevision; } + bool allChangesReplayed() + { + const qint64 topRevision = Storage::maxRevision(mStorage.createTransaction(Storage::ReadOnly)); + const qint64 lastReplayedRevision = getLastReplayedRevision(); + return (lastReplayedRevision >= topRevision); + } + Q_SIGNALS: void changesReplayed(); @@ -62,7 +69,8 @@ public Q_SLOTS: }); const qint64 topRevision = Storage::maxRevision(mainStoreTransaction); - if (lastReplayedRevision < topRevision) { + Trace() << "Changereplay from " << lastReplayedRevision << " to " << topRevision; + if (lastReplayedRevision <= topRevision) { qint64 revision = lastReplayedRevision; for (;revision <= topRevision; revision++) { const auto uid = Storage::getUidFromRevision(mainStoreTransaction, revision); @@ -82,6 +90,7 @@ public Q_SLOTS: replayStoreTransaction.commit(); Trace() << "Replayed until " << revision; } + emit changesReplayed(); } private: @@ -269,8 +278,7 @@ GenericResource::GenericResource(const QByteArray &resourceInstanceIdentifier, c mSourceChangeReplay = new ChangeReplay(resourceInstanceIdentifier, [this](const QByteArray &type, const QByteArray &key, const QByteArray &value) { return this->replay(type, key, value); }); - QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); - QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); + enableChangeReplay(true); mClientLowerBoundRevision = mPipeline->cleanedUpRevision(); mProcessor->setOldestUsedRevision(mSourceChangeReplay->getLastReplayedRevision()); @@ -285,6 +293,18 @@ GenericResource::~GenericResource() delete mSourceChangeReplay; } +void GenericResource::enableChangeReplay(bool enable) +{ + if (enable) { + QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); + QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); + mSourceChangeReplay->revisionChanged(); + } else { + QObject::disconnect(mPipeline.data(), &Pipeline::revisionUpdated, mSourceChangeReplay, &ChangeReplay::revisionChanged); + QObject::disconnect(mSourceChangeReplay, &ChangeReplay::changesReplayed, this, &GenericResource::updateLowerBoundRevision); + } +} + void GenericResource::addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector &preprocessors) { mPipeline->setPreprocessors(type, preprocessors); @@ -380,6 +400,16 @@ KAsync::Job GenericResource::processAllMessages() waitForDrained(f, mSynchronizerQueue); }).then([this](KAsync::Future &f) { waitForDrained(f, mUserQueue); + }).then([this](KAsync::Future &f) { + if (mSourceChangeReplay->allChangesReplayed()) { + f.setFinished(); + } else { + auto context = new QObject; + QObject::connect(mSourceChangeReplay, &ChangeReplay::changesReplayed, context, [&f, context]() { + delete context; + f.setFinished(); + }); + } }); } diff --git a/common/genericresource.h b/common/genericresource.h index 1aa4206..a58a7c3 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -56,6 +56,7 @@ private Q_SLOTS: void updateLowerBoundRevision(); protected: + void enableChangeReplay(bool); void addType(const QByteArray &type, DomainTypeAdaptorFactoryInterface::Ptr factory, const QVector &preprocessors); virtual KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value); void onProcessorError(int errorCode, const QString &errorMessage); diff --git a/examples/maildirresource/libmaildir/maildir.cpp b/examples/maildirresource/libmaildir/maildir.cpp index 67a2d2d..2b0148c 100644 --- a/examples/maildirresource/libmaildir/maildir.cpp +++ b/examples/maildirresource/libmaildir/maildir.cpp @@ -332,6 +332,13 @@ bool Maildir::create() return true; } +bool Maildir::remove() +{ + QDir dir(d->path); + dir.removeRecursively(); + return true; +} + QString Maildir::path() const { return d->path; diff --git a/examples/maildirresource/libmaildir/maildir.h b/examples/maildirresource/libmaildir/maildir.h index 6853033..f80ba5d 100644 --- a/examples/maildirresource/libmaildir/maildir.h +++ b/examples/maildirresource/libmaildir/maildir.h @@ -70,6 +70,11 @@ public: */ bool create(); + /** + * Remove the maildir and everything it contains. + */ + bool remove(); + /** * Returns the path of this maildir. */ diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp index d0b663b..8333f76 100644 --- a/examples/maildirresource/maildirresource.cpp +++ b/examples/maildirresource/maildirresource.cpp @@ -63,6 +63,22 @@ MaildirResource::MaildirResource(const QByteArray &instanceIdentifier, const QSh Trace() << "Started maildir resource for maildir: " << mMaildirPath; } +void MaildirResource::recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) +{ + Index index("rid.mapping." + bufferType, transaction); + Index localIndex("localid.mapping." + bufferType, transaction); + index.add(remoteId, localId); + localIndex.add(localId, remoteId); +} + +void MaildirResource::removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction) +{ + Index index("rid.mapping." + bufferType, transaction); + Index localIndex("localid.mapping." + bufferType, transaction); + index.remove(remoteId, localId); + localIndex.remove(localId, remoteId); +} + QString MaildirResource::resolveRemoteId(const QByteArray &bufferType, const QString &remoteId, Akonadi2::Storage::Transaction &transaction) { //Lookup local id for remote id, or insert a new pair otherwise @@ -332,17 +348,62 @@ KAsync::Job MaildirResource::synchronizeWithSource() { Log() << " Synchronizing"; return KAsync::start([this]() { + //Changereplay would deadlock otherwise when trying to open the synchronization store + enableChangeReplay(false); auto transaction = Akonadi2::Storage(Akonadi2::storageLocation(), mResourceInstanceIdentifier, Akonadi2::Storage::ReadOnly).createTransaction(Akonadi2::Storage::ReadOnly); synchronizeFolders(transaction); for (const auto &folder : listAvailableFolders()) { synchronizeMails(transaction, folder); } + Log() << "Done Synchronizing"; + enableChangeReplay(true); }); } KAsync::Job MaildirResource::replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) { - Trace() << "Replaying " << key; + //This results in a deadlock during sync + Akonadi2::Storage store(Akonadi2::storageLocation(), mResourceInstanceIdentifier + ".synchronization", Akonadi2::Storage::ReadWrite); + auto synchronizationTransaction = store.createTransaction(Akonadi2::Storage::ReadWrite); + const auto uid = Akonadi2::Storage::uidFromKey(key); + const auto remoteId = resolveLocalId(type, uid, synchronizationTransaction); + + Trace() << "Replaying " << key << type; + if (type == ENTITY_TYPE_FOLDER) { + Akonadi2::EntityBuffer buffer(value.data(), value.size()); + const Akonadi2::Entity &entity = buffer.entity(); + const auto metadataBuffer = Akonadi2::EntityBuffer::readBuffer(entity.metadata()); + const qint64 revision = metadataBuffer ? metadataBuffer->revision() : -1; + const auto operation = metadataBuffer ? metadataBuffer->operation() : Akonadi2::Operation_Creation; + if (operation == Akonadi2::Operation_Creation) { + //FIXME: This check only works for new entities + //Figure out wether we have replayed that revision already to the source + if (!remoteId.isEmpty()) { + Trace() << "Change is coming from the source"; + return KAsync::null(); + } + const Akonadi2::ApplicationDomain::Folder folder(mResourceInstanceIdentifier, Akonadi2::Storage::uidFromKey(key), revision, mFolderAdaptorFactory->createAdaptor(entity)); + auto folderName = folder.getProperty("name").toString(); + //TODO handle non toplevel folders + auto path = mMaildirPath + "/" + folderName; + Trace() << "Creating a new folder: " << path; + KPIM::Maildir maildir(path, false); + maildir.create(); + recordRemoteId(ENTITY_TYPE_FOLDER, folder.identifier(), path.toUtf8(), synchronizationTransaction); + } else if (operation == Akonadi2::Operation_Removal) { + const auto uid = Akonadi2::Storage::uidFromKey(key); + const auto remoteId = resolveLocalId(ENTITY_TYPE_FOLDER, uid, synchronizationTransaction); + const auto path = remoteId; + Trace() << "Removing a folder: " << path; + KPIM::Maildir maildir(path, false); + maildir.remove(); + removeRemoteId(ENTITY_TYPE_FOLDER, uid, remoteId.toUtf8(), synchronizationTransaction); + } else if (operation == Akonadi2::Operation_Modification) { + Warning() << "Folder modifications are not implemented"; + } else { + Warning() << "Unkown operation" << operation; + } + } return KAsync::null(); } diff --git a/examples/maildirresource/maildirresource.h b/examples/maildirresource/maildirresource.h index eec1e97..e577e18 100644 --- a/examples/maildirresource/maildirresource.h +++ b/examples/maildirresource/maildirresource.h @@ -40,6 +40,12 @@ public: private: KAsync::Job replay(const QByteArray &type, const QByteArray &key, const QByteArray &value) Q_DECL_OVERRIDE; + /** + * Records a localId to remoteId mapping + */ + void recordRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction); + void removeRemoteId(const QByteArray &bufferType, const QByteArray &localId, const QByteArray &remoteId, Akonadi2::Storage::Transaction &transaction); + /** * Tries to find a local id for the remote id, and creates a new local id otherwise. * diff --git a/tests/maildirresourcetest.cpp b/tests/maildirresourcetest.cpp index 51ea278..1e2d36b 100644 --- a/tests/maildirresourcetest.cpp +++ b/tests/maildirresourcetest.cpp @@ -59,8 +59,6 @@ private Q_SLOTS: copyRecursively(TESTDATAPATH "/maildir1", targetPath); Akonadi2::Log::setDebugOutputLevel(Akonadi2::Log::Trace); - auto factory = Akonadi2::ResourceFactory::load("org.kde.maildir"); - QVERIFY(factory); MaildirResource::removeFromDisk("org.kde.maildir.instance1"); Akonadi2::ApplicationDomain::AkonadiResource resource; resource.setProperty("identifier", "org.kde.maildir.instance1"); @@ -73,6 +71,7 @@ private Q_SLOTS: { Akonadi2::Store::shutdown(QByteArray("org.kde.maildir.instance1")).exec().waitForFinished(); MaildirResource::removeFromDisk("org.kde.maildir.instance1"); + Akonadi2::Store::start(QByteArray("org.kde.maildir.instance1")).exec().waitForFinished(); } void init() @@ -236,6 +235,58 @@ private Q_SLOTS: QCOMPARE(mailModel->rowCount(QModelIndex()), 1); } + void testCreateFolder() + { + Akonadi2::Query query; + query.resources << "org.kde.maildir.instance1"; + query.syncOnDemand = false; + query.processAll = true; + + //Ensure all local data is processed + Akonadi2::Store::synchronize(query).exec().waitForFinished(); + + Akonadi2::ApplicationDomain::Folder folder("org.kde.maildir.instance1"); + folder.setProperty("name", "testCreateFolder"); + + Akonadi2::Store::create(folder).exec().waitForFinished(); + + //Ensure all local data is processed + Akonadi2::Store::synchronize(query).exec().waitForFinished(); + + auto targetPath = tempDir.path() + "/maildir1/testCreateFolder"; + QFileInfo file(targetPath); + QTRY_VERIFY(file.exists()); + QVERIFY(file.isDir()); + } + + void testRemoveFolder() + { + Akonadi2::Query query; + query.resources << "org.kde.maildir.instance1"; + query.syncOnDemand = false; + query.processAll = true; + + auto targetPath = tempDir.path() + "/maildir1/testCreateFolder"; + + Akonadi2::ApplicationDomain::Folder folder("org.kde.maildir.instance1"); + folder.setProperty("name", "testCreateFolder"); + Akonadi2::Store::create(folder).exec().waitForFinished(); + Akonadi2::Store::synchronize(query).exec().waitForFinished(); + QTRY_VERIFY(QFileInfo(targetPath).exists()); + + Akonadi2::Query folderQuery; + folderQuery.resources << "org.kde.maildir.instance1"; + folderQuery.propertyFilter.insert("name", "testCreateFolder"); + auto model = Akonadi2::Store::loadModel(folderQuery); + QTRY_VERIFY(model->data(QModelIndex(), Akonadi2::Store::ChildrenFetchedRole).toBool()); + QCOMPARE(model->rowCount(QModelIndex()), 1); + auto createdFolder = model->index(0, 0, QModelIndex()).data(Akonadi2::Store::DomainObjectRole).value(); + + Akonadi2::Store::remove(*createdFolder).exec().waitForFinished(); + Akonadi2::Store::synchronize(query).exec().waitForFinished(); + QTRY_VERIFY(!QFileInfo(targetPath).exists()); + } + }; QTEST_MAIN(MaildirResourceTest) -- cgit v1.2.3