From 938554f267193b652478fc12343819fa45d76034 Mon Sep 17 00:00:00 2001 From: Christian Mollekopf Date: Mon, 28 Nov 2016 19:33:01 +0100 Subject: Moved inspection commands to a separate inspector. --- common/CMakeLists.txt | 1 + common/commandprocessor.cpp | 27 +- common/commandprocessor.h | 14 +- common/genericresource.cpp | 58 +--- common/genericresource.h | 4 +- common/inspector.cpp | 85 ++++++ common/inspector.h | 52 ++++ examples/dummyresource/resourcefactory.cpp | 44 +-- examples/dummyresource/resourcefactory.h | 1 - examples/imapresource/imapresource.cpp | 306 +++++++++++---------- examples/imapresource/imapresource.h | 7 - examples/maildirresource/maildirresource.cpp | 185 +++++++------ examples/maildirresource/maildirresource.h | 3 +- .../mailtransportresource.cpp | 48 ++-- .../mailtransportresource/mailtransportresource.h | 2 - 15 files changed, 492 insertions(+), 345 deletions(-) create mode 100644 common/inspector.cpp create mode 100644 common/inspector.h diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 018fc22..df44ce5 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -78,6 +78,7 @@ set(command_SRCS mail/threadindexer.cpp notification.cpp commandprocessor.cpp + inspector.cpp ${storage_SRCS}) add_library(${PROJECT_NAME} SHARED ${command_SRCS}) diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp index c9fca37..4ff352b 100644 --- a/common/commandprocessor.cpp +++ b/common/commandprocessor.cpp @@ -22,7 +22,8 @@ #include "commands.h" #include "messagequeue.h" #include "queuedcommand_generated.h" - +#include "inspector.h" +#include "synchronizer.h" #include "pipeline.h" static int sBatchSize = 100; @@ -42,11 +43,6 @@ void CommandProcessor::setOldestUsedRevision(qint64 revision) mLowerBoundRevision = revision; } -void CommandProcessor::setInspectionCommand(const InspectionFunction &f) -{ - mInspect = f; -} - void CommandProcessor::setFlushCommand(const FlushFunction &f) { mFlush = f; @@ -91,12 +87,9 @@ KAsync::Job CommandProcessor::processQueuedCommand(const Sink::QueuedCom case Sink::Commands::CreateEntityCommand: return mPipeline->newEntity(data, size); case Sink::Commands::InspectionCommand: - if (mInspect) { - return mInspect(data, size) + Q_ASSERT(mInspector); + return mInspector->processCommand(data, size) .syncThen([]() { return -1; }); - } else { - return KAsync::error(-1, "Missing inspection command."); - } case Sink::Commands::FlushCommand: if (mFlush) { return mFlush(data, size) @@ -191,3 +184,15 @@ KAsync::Job CommandProcessor::processPipeline() }); } +void CommandProcessor::setInspector(const QSharedPointer &inspector) +{ + mInspector = inspector; + QObject::connect(mInspector.data(), &Inspector::notify, this, &CommandProcessor::notify); +} + +void CommandProcessor::setSynchronizer(const QSharedPointer &synchronizer) +{ + mSynchronizer = synchronizer; + QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify); +} + diff --git a/common/commandprocessor.h b/common/commandprocessor.h index 51d845e..75ae37a 100644 --- a/common/commandprocessor.h +++ b/common/commandprocessor.h @@ -24,12 +24,16 @@ #include #include #include + #include "log.h" +#include "notification.h" class MessageQueue; namespace Sink { class Pipeline; + class Inspector; + class Synchronizer; class QueuedCommand; /** @@ -38,7 +42,6 @@ namespace Sink { class CommandProcessor : public QObject { Q_OBJECT - typedef std::function(void const *, size_t)> InspectionFunction; typedef std::function(void const *, size_t)> FlushFunction; SINK_DEBUG_AREA("commandprocessor") @@ -47,11 +50,13 @@ public: void setOldestUsedRevision(qint64 revision); - void setInspectionCommand(const InspectionFunction &f); - void setFlushCommand(const FlushFunction &f); + void setInspector(const QSharedPointer &inspector); + void setSynchronizer(const QSharedPointer &synchronizer); + signals: + void notify(Notification); void error(int errorCode, const QString &errorMessage); private: @@ -72,8 +77,9 @@ private: bool mProcessingLock; // The lowest revision we no longer need qint64 mLowerBoundRevision; - InspectionFunction mInspect; FlushFunction mFlush; + QSharedPointer mSynchronizer; + QSharedPointer mInspector; }; }; diff --git a/common/genericresource.cpp b/common/genericresource.cpp index 3aa4fce..80e59c9 100644 --- a/common/genericresource.cpp +++ b/common/genericresource.cpp @@ -59,41 +59,6 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q mClientLowerBoundRevision(std::numeric_limits::max()) { mProcessor = std::unique_ptr(new CommandProcessor(mPipeline.data(), QList() << &mUserQueue << &mSynchronizerQueue)); - mProcessor->setInspectionCommand([this](void const *command, size_t size) { - flatbuffers::Verifier verifier((const uint8_t *)command, size); - if (Sink::Commands::VerifyInspectionBuffer(verifier)) { - auto buffer = Sink::Commands::GetInspection(command); - int inspectionType = buffer->type(); - - QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id()); - QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId()); - QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType()); - QByteArray property = BufferUtils::extractBuffer(buffer->property()); - QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue()); - QDataStream s(expectedValueString); - QVariant expectedValue; - s >> expectedValue; - inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) - .then( - [=](const KAsync::Error &error) { - Sink::Notification n; - n.type = Sink::Notification::Inspection; - n.id = inspectionId; - if (error) { - Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage; - n.code = Sink::Notification::Failure; - } else { - Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; - n.code = Sink::Notification::Success; - } - emit notify(n); - return KAsync::null(); - }) - .exec(); - return KAsync::null(); - } - return KAsync::error(-1, "Invalid inspection command."); - }); mProcessor->setFlushCommand([this](void const *command, size_t size) { flatbuffers::Verifier verifier((const uint8_t *)command, size); if (Sink::Commands::VerifyFlushBuffer(verifier)) { @@ -114,14 +79,9 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q } return KAsync::error(-1, "Invalid flush command."); }); - { - auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); - Q_ASSERT(ret); - } - { - auto ret = QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); - Q_ASSERT(ret); - } + QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); + QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify); + QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated); mCommitQueueTimer.setInterval(sCommitInterval); mCommitQueueTimer.setSingleShot(true); @@ -132,13 +92,6 @@ GenericResource::~GenericResource() { } -KAsync::Job GenericResource::inspect( - int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) -{ - SinkWarning() << "Inspection not implemented"; - return KAsync::null(); -} - void GenericResource::setupPreprocessors(const QByteArray &type, const QVector &preprocessors) { mPipeline->setPreprocessors(type, preprocessors); @@ -179,6 +132,11 @@ void GenericResource::setupSynchronizer(const QSharedPointer &sync QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); } +void GenericResource::setupInspector(const QSharedPointer &inspector) +{ + mProcessor->setInspector(inspector); +} + void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) { Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk(); diff --git a/common/genericresource.h b/common/genericresource.h index 12f15f3..0bc47da 100644 --- a/common/genericresource.h +++ b/common/genericresource.h @@ -32,6 +32,7 @@ namespace Sink { class Pipeline; class Preprocessor; class Synchronizer; +class Inspector; class CommandProcessor; /** @@ -50,8 +51,6 @@ public: virtual KAsync::Job synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE; virtual KAsync::Job processAllMessages() Q_DECL_OVERRIDE; virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; - virtual KAsync::Job - inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue); int error() const; @@ -64,6 +63,7 @@ private slots: protected: void setupPreprocessors(const QByteArray &type, const QVector &preprocessors); void setupSynchronizer(const QSharedPointer &synchronizer); + void setupInspector(const QSharedPointer &inspector); void onProcessorError(int errorCode, const QString &errorMessage); void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); diff --git a/common/inspector.cpp b/common/inspector.cpp new file mode 100644 index 0000000..8b4c93a --- /dev/null +++ b/common/inspector.cpp @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#include "inspector.h" + +#include "resourcecontext.h" +#include "inspection_generated.h" +#include "bufferutils.h" + +#include + +using namespace Sink; + +Inspector::Inspector(const ResourceContext &context) + : QObject(), + mResourceContext(context) + // mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)), + // mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite) +{ + // SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId(); +} + +Inspector::~Inspector() +{ + +} + +KAsync::Job Inspector::processCommand(void const *command, size_t size) +{ + flatbuffers::Verifier verifier((const uint8_t *)command, size); + if (Sink::Commands::VerifyInspectionBuffer(verifier)) { + auto buffer = Sink::Commands::GetInspection(command); + int inspectionType = buffer->type(); + + QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id()); + QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId()); + QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType()); + QByteArray property = BufferUtils::extractBuffer(buffer->property()); + QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue()); + QDataStream s(expectedValueString); + QVariant expectedValue; + s >> expectedValue; + inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue) + .then( + [=](const KAsync::Error &error) { + Sink::Notification n; + n.type = Sink::Notification::Inspection; + n.id = inspectionId; + if (error) { + Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage; + n.code = Sink::Notification::Failure; + } else { + Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId; + n.code = Sink::Notification::Success; + } + emit notify(n); + return KAsync::null(); + }) + .exec(); + return KAsync::null(); + } + return KAsync::error(-1, "Invalid inspection command."); +} + +KAsync::Job Inspector::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) +{ + return KAsync::error(-1, "Inspection not implemented."); +} + diff --git a/common/inspector.h b/common/inspector.h new file mode 100644 index 0000000..ff167b1 --- /dev/null +++ b/common/inspector.h @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2016 Christian Mollekopf + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) version 3, or any + * later version accepted by the membership of KDE e.V. (or its + * successor approved by the membership of KDE e.V.), which shall + * act as a proxy defined in Section 6 of version 3 of the license. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ +#pragma once + +#include "sink_export.h" +#include +#include + +#include "notification.h" +#include "resourcecontext.h" + +namespace Sink { + +/** + * Synchronize and add what we don't already have to local queue + */ +class SINK_EXPORT Inspector : public QObject +{ + Q_OBJECT +public: + Inspector(const ResourceContext &resourceContext); + virtual ~Inspector(); + + KAsync::Job processCommand(void const *command, size_t size); + +signals: + void notify(Notification); + +protected: + virtual KAsync::Job inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue); + + Sink::ResourceContext mResourceContext; +}; + +} diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp index f5ab2d9..8e81c79 100644 --- a/examples/dummyresource/resourcefactory.cpp +++ b/examples/dummyresource/resourcefactory.cpp @@ -37,6 +37,7 @@ #include "facadefactory.h" #include "adaptorfactoryregistry.h" #include "synchronizer.h" +#include "inspector.h" #include "mailpreprocessor.h" #include "remoteidmap.h" #include @@ -130,10 +131,36 @@ class DummySynchronizer : public Sink::Synchronizer { }; +class DummyInspector : public Sink::Inspector { +public: + DummyInspector(const Sink::ResourceContext &resourceContext) + : Sink::Inspector(resourceContext) + { + + } + +protected: + KAsync::Job inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE + { + SinkTrace() << "Inspecting " << inspectionType << domainType << entityId << property << expectedValue; + if (property == "testInspection") { + if (expectedValue.toBool()) { + //Success + return KAsync::null(); + } else { + //Failure + return KAsync::error(1, "Failed."); + } + } + return KAsync::null(); + } +}; + DummyResource::DummyResource(const Sink::ResourceContext &resourceContext, const QSharedPointer &pipeline) : Sink::GenericResource(resourceContext, pipeline) { setupSynchronizer(QSharedPointer::create(resourceContext)); + setupInspector(QSharedPointer::create(resourceContext)); setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new MailPropertyExtractor); setupPreprocessors(ENTITY_TYPE_FOLDER, @@ -159,23 +186,6 @@ KAsync::Job DummyResource::synchronizeWithSource(const Sink::QueryBase &qu return GenericResource::synchronizeWithSource(query); } -KAsync::Job DummyResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) -{ - - SinkTrace() << "Inspecting " << inspectionType << domainType << entityId << property << expectedValue; - if (property == "testInspection") { - if (expectedValue.toBool()) { - //Success - return KAsync::null(); - } else { - //Failure - return KAsync::error(1, "Failed."); - } - } - return KAsync::null(); -} - - DummyResourceFactory::DummyResourceFactory(QObject *parent) : Sink::ResourceFactory(parent) { diff --git a/examples/dummyresource/resourcefactory.h b/examples/dummyresource/resourcefactory.h index 8ef27a6..2eb7558 100644 --- a/examples/dummyresource/resourcefactory.h +++ b/examples/dummyresource/resourcefactory.h @@ -33,7 +33,6 @@ public: virtual ~DummyResource(); KAsync::Job synchronizeWithSource(const Sink::QueryBase &) Q_DECL_OVERRIDE; - KAsync::Job inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE; }; class DummyResourceFactory : public Sink::ResourceFactory diff --git a/examples/imapresource/imapresource.cpp b/examples/imapresource/imapresource.cpp index 2aa5a2c..40fa75f 100644 --- a/examples/imapresource/imapresource.cpp +++ b/examples/imapresource/imapresource.cpp @@ -27,6 +27,7 @@ #include "definitions.h" #include "inspection.h" #include "synchronizer.h" +#include "inspector.h" #include "remoteidmap.h" #include "query.h" @@ -553,169 +554,192 @@ public: QByteArray mResourceInstanceIdentifier; }; -ImapResource::ImapResource(const ResourceContext &resourceContext) - : Sink::GenericResource(resourceContext) -{ - auto config = ResourceConfig::getConfiguration(resourceContext.instanceId()); - mServer = config.value("server").toString(); - mPort = config.value("port").toInt(); - mUser = config.value("username").toString(); - mPassword = config.value("password").toString(); - if (mServer.startsWith("imap")) { - mServer.remove("imap://"); - mServer.remove("imaps://"); - } - if (mServer.contains(':')) { - auto list = mServer.split(':'); - mServer = list.at(0); - mPort = list.at(1).toInt(); - } - - auto synchronizer = QSharedPointer::create(resourceContext); - synchronizer->mServer = mServer; - synchronizer->mPort = mPort; - synchronizer->mUser = mUser; - synchronizer->mPassword = mPassword; - setupSynchronizer(synchronizer); - - setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MimeMessageMover << new MailPropertyExtractor); - setupPreprocessors(ENTITY_TYPE_FOLDER, QVector()); -} +class ImapInspector : public Sink::Inspector { +public: + ImapInspector(const Sink::ResourceContext &resourceContext) + : Sink::Inspector(resourceContext) + { -KAsync::Job ImapResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) -{ - auto synchronizationStore = QSharedPointer::create(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadOnly); - auto synchronizationTransaction = synchronizationStore->createTransaction(Sink::Storage::DataStore::ReadOnly); + } - auto mainStore = QSharedPointer::create(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly); - auto transaction = mainStore->createTransaction(Sink::Storage::DataStore::ReadOnly); +protected: + KAsync::Job inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE { + auto synchronizationStore = QSharedPointer::create(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadOnly); + auto synchronizationTransaction = synchronizationStore->createTransaction(Sink::Storage::DataStore::ReadOnly); - Sink::Storage::EntityStore entityStore(mResourceContext); - auto syncStore = QSharedPointer::create(synchronizationTransaction); + auto mainStore = QSharedPointer::create(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly); + auto transaction = mainStore->createTransaction(Sink::Storage::DataStore::ReadOnly); - SinkTrace() << "Inspecting " << inspectionType << domainType << entityId << property << expectedValue; + Sink::Storage::EntityStore entityStore(mResourceContext); + auto syncStore = QSharedPointer::create(synchronizationTransaction); - if (domainType == ENTITY_TYPE_MAIL) { - const auto mail = entityStore.readLatest(entityId); - const auto folder = entityStore.readLatest(mail.getFolder()); - const auto folderRemoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, mail.getFolder()); - const auto mailRemoteId = syncStore->resolveLocalId(ENTITY_TYPE_MAIL, mail.identifier()); - if (mailRemoteId.isEmpty() || folderRemoteId.isEmpty()) { - SinkWarning() << "Missing remote id for folder or mail. " << mailRemoteId << folderRemoteId; - return KAsync::error(); - } - const auto uid = uidFromMailRid(mailRemoteId); - SinkTrace() << "Mail remote id: " << folderRemoteId << mailRemoteId << mail.identifier() << folder.identifier(); + SinkTrace() << "Inspecting " << inspectionType << domainType << entityId << property << expectedValue; - KIMAP2::ImapSet set; - set.add(uid); - if (set.isEmpty()) { - return KAsync::error(1, "Couldn't determine uid of mail."); - } - KIMAP2::FetchJob::FetchScope scope; - scope.mode = KIMAP2::FetchJob::FetchScope::Full; - auto imap = QSharedPointer::create(mServer, mPort); - auto messageByUid = QSharedPointer>::create(); - SinkTrace() << "Connecting to:" << mServer << mPort; - SinkTrace() << "as:" << mUser; - auto inspectionJob = imap->login(mUser, mPassword) - .then(imap->select(folderRemoteId)) - .syncThen([](Imap::SelectResult){}) - .then(imap->fetch(set, scope, [imap, messageByUid](const Imap::Message &message) { - messageByUid->insert(message.uid, message); - })); - - if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) { - if (property == "unread") { - return inspectionJob.then([=]() { - auto msg = messageByUid->value(uid); - if (expectedValue.toBool() && msg.flags.contains(Imap::Flags::Seen)) { - return KAsync::error(1, "Expected unread but couldn't find it."); - } - if (!expectedValue.toBool() && !msg.flags.contains(Imap::Flags::Seen)) { - return KAsync::error(1, "Expected read but couldn't find it."); - } - return KAsync::null(); - }); - } - if (property == "subject") { - return inspectionJob.then([=]() { - auto msg = messageByUid->value(uid); - if (msg.msg->subject(true)->asUnicodeString() != expectedValue.toString()) { - return KAsync::error(1, "Subject not as expected: " + msg.msg->subject(true)->asUnicodeString()); - } - return KAsync::null(); - }); + if (domainType == ENTITY_TYPE_MAIL) { + const auto mail = entityStore.readLatest(entityId); + const auto folder = entityStore.readLatest(mail.getFolder()); + const auto folderRemoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, mail.getFolder()); + const auto mailRemoteId = syncStore->resolveLocalId(ENTITY_TYPE_MAIL, mail.identifier()); + if (mailRemoteId.isEmpty() || folderRemoteId.isEmpty()) { + SinkWarning() << "Missing remote id for folder or mail. " << mailRemoteId << folderRemoteId; + return KAsync::error(); } - } - if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { - return inspectionJob.then([=]() { - if (!messageByUid->contains(uid)) { - SinkWarning() << "Existing messages are: " << messageByUid->keys(); - SinkWarning() << "We're looking for: " << uid; - return KAsync::error(1, "Couldn't find message: " + mailRemoteId); - } - return KAsync::null(); - }); - } - } - if (domainType == ENTITY_TYPE_FOLDER) { - const auto remoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, entityId); - const auto folder = entityStore.readLatest(entityId); - - if (inspectionType == Sink::ResourceControl::Inspection::CacheIntegrityInspectionType) { - SinkLog() << "Inspecting cache integrity" << remoteId; + const auto uid = uidFromMailRid(mailRemoteId); + SinkTrace() << "Mail remote id: " << folderRemoteId << mailRemoteId << mail.identifier() << folder.identifier(); - int expectedCount = 0; - Index index("mail.index.folder", transaction); - index.lookup(entityId, [&](const QByteArray &sinkId) { - expectedCount++; - }, - [&](const Index::Error &error) { - SinkWarning() << "Error in index: " << error.message << property; - }); - - auto set = KIMAP2::ImapSet::fromImapSequenceSet("1:*"); + KIMAP2::ImapSet set; + set.add(uid); + if (set.isEmpty()) { + return KAsync::error(1, "Couldn't determine uid of mail."); + } KIMAP2::FetchJob::FetchScope scope; - scope.mode = KIMAP2::FetchJob::FetchScope::Headers; + scope.mode = KIMAP2::FetchJob::FetchScope::Full; auto imap = QSharedPointer::create(mServer, mPort); auto messageByUid = QSharedPointer>::create(); - return imap->login(mUser, mPassword) - .then(imap->select(remoteId).syncThen([](){})) - .then(imap->fetch(set, scope, [=](const Imap::Message message) { + SinkTrace() << "Connecting to:" << mServer << mPort; + SinkTrace() << "as:" << mUser; + auto inspectionJob = imap->login(mUser, mPassword) + .then(imap->select(folderRemoteId)) + .syncThen([](Imap::SelectResult){}) + .then(imap->fetch(set, scope, [imap, messageByUid](const Imap::Message &message) { messageByUid->insert(message.uid, message); - })) - .then([imap, messageByUid, expectedCount]() { - if (messageByUid->size() != expectedCount) { - return KAsync::error(1, QString("Wrong number of messages on the server; found %1 instead of %2.").arg(messageByUid->size()).arg(expectedCount)); + })); + + if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) { + if (property == "unread") { + return inspectionJob.then([=]() { + auto msg = messageByUid->value(uid); + if (expectedValue.toBool() && msg.flags.contains(Imap::Flags::Seen)) { + return KAsync::error(1, "Expected unread but couldn't find it."); + } + if (!expectedValue.toBool() && !msg.flags.contains(Imap::Flags::Seen)) { + return KAsync::error(1, "Expected read but couldn't find it."); + } + return KAsync::null(); + }); + } + if (property == "subject") { + return inspectionJob.then([=]() { + auto msg = messageByUid->value(uid); + if (msg.msg->subject(true)->asUnicodeString() != expectedValue.toString()) { + return KAsync::error(1, "Subject not as expected: " + msg.msg->subject(true)->asUnicodeString()); + } + return KAsync::null(); + }); + } + } + if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { + return inspectionJob.then([=]() { + if (!messageByUid->contains(uid)) { + SinkWarning() << "Existing messages are: " << messageByUid->keys(); + SinkWarning() << "We're looking for: " << uid; + return KAsync::error(1, "Couldn't find message: " + mailRemoteId); } return KAsync::null(); }); + } } - if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { - auto folderByPath = QSharedPointer>::create(); - auto folderByName = QSharedPointer>::create(); + if (domainType == ENTITY_TYPE_FOLDER) { + const auto remoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, entityId); + const auto folder = entityStore.readLatest(entityId); - auto imap = QSharedPointer::create(mServer, mPort); - auto inspectionJob = imap->login(mUser, mPassword) - .then(imap->fetchFolders([=](const Imap::Folder &f) { - *folderByPath << f.normalizedPath(); - *folderByName << f.name(); - })) - .then([this, folderByName, folderByPath, folder, remoteId, imap]() { - if (!folderByName->contains(folder.getName())) { - SinkWarning() << "Existing folders are: " << *folderByPath; - SinkWarning() << "We're looking for: " << folder.getName(); - return KAsync::error(1, "Wrong folder name: " + remoteId); - } - return KAsync::null(); + if (inspectionType == Sink::ResourceControl::Inspection::CacheIntegrityInspectionType) { + SinkLog() << "Inspecting cache integrity" << remoteId; + + int expectedCount = 0; + Index index("mail.index.folder", transaction); + index.lookup(entityId, [&](const QByteArray &sinkId) { + expectedCount++; + }, + [&](const Index::Error &error) { + SinkWarning() << "Error in index: " << error.message << property; }); - return inspectionJob; + auto set = KIMAP2::ImapSet::fromImapSequenceSet("1:*"); + KIMAP2::FetchJob::FetchScope scope; + scope.mode = KIMAP2::FetchJob::FetchScope::Headers; + auto imap = QSharedPointer::create(mServer, mPort); + auto messageByUid = QSharedPointer>::create(); + return imap->login(mUser, mPassword) + .then(imap->select(remoteId).syncThen([](){})) + .then(imap->fetch(set, scope, [=](const Imap::Message message) { + messageByUid->insert(message.uid, message); + })) + .then([imap, messageByUid, expectedCount]() { + if (messageByUid->size() != expectedCount) { + return KAsync::error(1, QString("Wrong number of messages on the server; found %1 instead of %2.").arg(messageByUid->size()).arg(expectedCount)); + } + return KAsync::null(); + }); + } + if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { + auto folderByPath = QSharedPointer>::create(); + auto folderByName = QSharedPointer>::create(); + + auto imap = QSharedPointer::create(mServer, mPort); + auto inspectionJob = imap->login(mUser, mPassword) + .then(imap->fetchFolders([=](const Imap::Folder &f) { + *folderByPath << f.normalizedPath(); + *folderByName << f.name(); + })) + .then([this, folderByName, folderByPath, folder, remoteId, imap]() { + if (!folderByName->contains(folder.getName())) { + SinkWarning() << "Existing folders are: " << *folderByPath; + SinkWarning() << "We're looking for: " << folder.getName(); + return KAsync::error(1, "Wrong folder name: " + remoteId); + } + return KAsync::null(); + }); + + return inspectionJob; + } + } + return KAsync::null(); + } + +public: + QString mServer; + int mPort; + QString mUser; + QString mPassword; +}; + +ImapResource::ImapResource(const ResourceContext &resourceContext) + : Sink::GenericResource(resourceContext) +{ + auto config = ResourceConfig::getConfiguration(resourceContext.instanceId()); + auto server = config.value("server").toString(); + auto port = config.value("port").toInt(); + auto user = config.value("username").toString(); + auto password = config.value("password").toString(); + if (server.startsWith("imap")) { + server.remove("imap://"); + server.remove("imaps://"); + } + if (server.contains(':')) { + auto list = server.split(':'); + server = list.at(0); + port = list.at(1).toInt(); } - return KAsync::null(); + + auto synchronizer = QSharedPointer::create(resourceContext); + synchronizer->mServer = server; + synchronizer->mPort = port; + synchronizer->mUser = user; + synchronizer->mPassword = password; + setupSynchronizer(synchronizer); + + auto inspector = QSharedPointer::create(resourceContext); + inspector->mServer = server; + inspector->mPort = port; + inspector->mUser = user; + inspector->mPassword = password; + setupInspector(inspector); + + setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MimeMessageMover << new MailPropertyExtractor); + setupPreprocessors(ENTITY_TYPE_FOLDER, QVector()); } ImapResourceFactory::ImapResourceFactory(QObject *parent) diff --git a/examples/imapresource/imapresource.h b/examples/imapresource/imapresource.h index d345d64..aeb1200 100644 --- a/examples/imapresource/imapresource.h +++ b/examples/imapresource/imapresource.h @@ -40,13 +40,6 @@ class ImapResource : public Sink::GenericResource { public: ImapResource(const Sink::ResourceContext &resourceContext); - KAsync::Job inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE; - -private: - QString mServer; - int mPort; - QString mUser; - QString mPassword; }; class ImapResourceFactory : public Sink::ResourceFactory diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp index ee84bde..2b19789 100644 --- a/examples/maildirresource/maildirresource.cpp +++ b/examples/maildirresource/maildirresource.cpp @@ -27,6 +27,7 @@ #include "libmaildir/maildir.h" #include "inspection.h" #include "synchronizer.h" +#include "inspector.h" #include "facadefactory.h" #include "adaptorfactoryregistry.h" @@ -425,6 +426,102 @@ public: QString mMaildirPath; }; +class MaildirInspector : public Sink::Inspector { +public: + MaildirInspector(const Sink::ResourceContext &resourceContext) + : Sink::Inspector(resourceContext) + { + + } +protected: + + KAsync::Job inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE { + auto synchronizationStore = QSharedPointer::create(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadOnly); + auto synchronizationTransaction = synchronizationStore->createTransaction(Sink::Storage::DataStore::ReadOnly); + + auto mainStore = QSharedPointer::create(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly); + auto transaction = mainStore->createTransaction(Sink::Storage::DataStore::ReadOnly); + + Sink::Storage::EntityStore entityStore(mResourceContext); + auto syncStore = QSharedPointer::create(synchronizationTransaction); + + SinkTrace() << "Inspecting " << inspectionType << domainType << entityId << property << expectedValue; + + if (domainType == ENTITY_TYPE_MAIL) { + auto mail = entityStore.readLatest(entityId); + const auto filePath = getFilePathFromMimeMessagePath(mail.getMimeMessagePath()); + + if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) { + if (property == "unread") { + const auto flags = KPIM::Maildir::readEntryFlags(filePath.split('/').last()); + if (expectedValue.toBool() && (flags & KPIM::Maildir::Seen)) { + return KAsync::error(1, "Expected unread but couldn't find it."); + } + if (!expectedValue.toBool() && !(flags & KPIM::Maildir::Seen)) { + return KAsync::error(1, "Expected read but couldn't find it."); + } + return KAsync::null(); + } + if (property == "subject") { + KMime::Message *msg = new KMime::Message; + msg->setHead(KMime::CRLFtoLF(KPIM::Maildir::readEntryHeadersFromFile(filePath))); + msg->parse(); + + if (msg->subject(true)->asUnicodeString() != expectedValue.toString()) { + return KAsync::error(1, "Subject not as expected: " + msg->subject(true)->asUnicodeString()); + } + return KAsync::null(); + } + } + if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { + if (QFileInfo(filePath).exists() != expectedValue.toBool()) { + return KAsync::error(1, "Wrong file existence: " + filePath); + } + } + } + if (domainType == ENTITY_TYPE_FOLDER) { + const auto remoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, entityId); + auto folder = entityStore.readLatest(entityId); + + if (inspectionType == Sink::ResourceControl::Inspection::CacheIntegrityInspectionType) { + SinkTrace() << "Inspecting cache integrity" << remoteId; + if (!QDir(remoteId).exists()) { + return KAsync::error(1, "The directory is not existing: " + remoteId); + } + + int expectedCount = 0; + Index index("mail.index.folder", transaction); + index.lookup(entityId, [&](const QByteArray &sinkId) { + expectedCount++; + }, + [&](const Index::Error &error) { + SinkWarning() << "Error in index: " << error.message << property; + }); + + QDir dir(remoteId + "/cur"); + const QFileInfoList list = dir.entryInfoList(QDir::Files); + if (list.size() != expectedCount) { + for (const auto &fileInfo : list) { + SinkWarning() << "Found in cache: " << fileInfo.fileName(); + } + return KAsync::error(1, QString("Wrong number of files; found %1 instead of %2.").arg(list.size()).arg(expectedCount)); + } + } + if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { + if (!remoteId.endsWith(folder.getName().toUtf8())) { + return KAsync::error(1, "Wrong folder name: " + remoteId); + } + //TODO we shouldn't use the remoteId here to figure out the path, it could be gone/changed already + if (QDir(remoteId).exists() != expectedValue.toBool()) { + return KAsync::error(1, "Wrong folder existence: " + remoteId); + } + } + + } + return KAsync::null(); + } +}; + MaildirResource::MaildirResource(const Sink::ResourceContext &resourceContext) : Sink::GenericResource(resourceContext) @@ -439,6 +536,7 @@ MaildirResource::MaildirResource(const Sink::ResourceContext &resourceContext) auto synchronizer = QSharedPointer::create(resourceContext); synchronizer->mMaildirPath = mMaildirPath; setupSynchronizer(synchronizer); + setupInspector(QSharedPointer::create(resourceContext)); setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MaildirMimeMessageMover(resourceContext.instanceId(), mMaildirPath) << new MaildirMailPropertyExtractor); setupPreprocessors(ENTITY_TYPE_FOLDER, QVector() << new FolderPreprocessor(mMaildirPath)); @@ -458,93 +556,6 @@ MaildirResource::MaildirResource(const Sink::ResourceContext &resourceContext) synchronizer->commit(); } -KAsync::Job MaildirResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) -{ - auto synchronizationStore = QSharedPointer::create(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadOnly); - auto synchronizationTransaction = synchronizationStore->createTransaction(Sink::Storage::DataStore::ReadOnly); - - auto mainStore = QSharedPointer::create(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly); - auto transaction = mainStore->createTransaction(Sink::Storage::DataStore::ReadOnly); - - Sink::Storage::EntityStore entityStore(mResourceContext); - auto syncStore = QSharedPointer::create(synchronizationTransaction); - - SinkTrace() << "Inspecting " << inspectionType << domainType << entityId << property << expectedValue; - - if (domainType == ENTITY_TYPE_MAIL) { - auto mail = entityStore.readLatest(entityId); - const auto filePath = getFilePathFromMimeMessagePath(mail.getMimeMessagePath()); - - if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) { - if (property == "unread") { - const auto flags = KPIM::Maildir::readEntryFlags(filePath.split('/').last()); - if (expectedValue.toBool() && (flags & KPIM::Maildir::Seen)) { - return KAsync::error(1, "Expected unread but couldn't find it."); - } - if (!expectedValue.toBool() && !(flags & KPIM::Maildir::Seen)) { - return KAsync::error(1, "Expected read but couldn't find it."); - } - return KAsync::null(); - } - if (property == "subject") { - KMime::Message *msg = new KMime::Message; - msg->setHead(KMime::CRLFtoLF(KPIM::Maildir::readEntryHeadersFromFile(filePath))); - msg->parse(); - - if (msg->subject(true)->asUnicodeString() != expectedValue.toString()) { - return KAsync::error(1, "Subject not as expected: " + msg->subject(true)->asUnicodeString()); - } - return KAsync::null(); - } - } - if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { - if (QFileInfo(filePath).exists() != expectedValue.toBool()) { - return KAsync::error(1, "Wrong file existence: " + filePath); - } - } - } - if (domainType == ENTITY_TYPE_FOLDER) { - const auto remoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, entityId); - auto folder = entityStore.readLatest(entityId); - - if (inspectionType == Sink::ResourceControl::Inspection::CacheIntegrityInspectionType) { - SinkTrace() << "Inspecting cache integrity" << remoteId; - if (!QDir(remoteId).exists()) { - return KAsync::error(1, "The directory is not existing: " + remoteId); - } - - int expectedCount = 0; - Index index("mail.index.folder", transaction); - index.lookup(entityId, [&](const QByteArray &sinkId) { - expectedCount++; - }, - [&](const Index::Error &error) { - SinkWarning() << "Error in index: " << error.message << property; - }); - - QDir dir(remoteId + "/cur"); - const QFileInfoList list = dir.entryInfoList(QDir::Files); - if (list.size() != expectedCount) { - for (const auto &fileInfo : list) { - SinkWarning() << "Found in cache: " << fileInfo.fileName(); - } - return KAsync::error(1, QString("Wrong number of files; found %1 instead of %2.").arg(list.size()).arg(expectedCount)); - } - } - if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { - if (!remoteId.endsWith(folder.getName().toUtf8())) { - return KAsync::error(1, "Wrong folder name: " + remoteId); - } - //TODO we shouldn't use the remoteId here to figure out the path, it could be gone/changed already - if (QDir(remoteId).exists() != expectedValue.toBool()) { - return KAsync::error(1, "Wrong folder existence: " + remoteId); - } - } - - } - return KAsync::null(); -} - MaildirResourceFactory::MaildirResourceFactory(QObject *parent) : Sink::ResourceFactory(parent) diff --git a/examples/maildirresource/maildirresource.h b/examples/maildirresource/maildirresource.h index 4eb2042..61fe438 100644 --- a/examples/maildirresource/maildirresource.h +++ b/examples/maildirresource/maildirresource.h @@ -43,9 +43,8 @@ class MaildirResource : public Sink::GenericResource { public: MaildirResource(const Sink::ResourceContext &resourceContext); - KAsync::Job inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE; -private: +private: QStringList listAvailableFolders(); QString mMaildirPath; QString mDraftsFolder; diff --git a/examples/mailtransportresource/mailtransportresource.cpp b/examples/mailtransportresource/mailtransportresource.cpp index c135de9..524b411 100644 --- a/examples/mailtransportresource/mailtransportresource.cpp +++ b/examples/mailtransportresource/mailtransportresource.cpp @@ -22,7 +22,7 @@ #include "facadefactory.h" #include "resourceconfig.h" #include "definitions.h" -#include "domainadaptor.h" +#include "inspector.h" #include #include #include @@ -124,6 +124,31 @@ public: MailtransportResource::Settings mSettings; }; +class MailtransportInspector : public Sink::Inspector { +public: + MailtransportInspector(const Sink::ResourceContext &resourceContext) + : Sink::Inspector(resourceContext) + { + + } + +protected: + KAsync::Job inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE + { + if (domainType == ENTITY_TYPE_MAIL) { + if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { + auto path = resourceStorageLocation(mResourceContext.instanceId()) + "/test/" + entityId; + if (QFileInfo::exists(path)) { + return KAsync::null(); + } + return KAsync::error(1, "Couldn't find message: " + path); + } + } + return KAsync::null(); + } +}; + + MailtransportResource::MailtransportResource(const Sink::ResourceContext &resourceContext) : Sink::GenericResource(resourceContext) { @@ -138,30 +163,11 @@ MailtransportResource::MailtransportResource(const Sink::ResourceContext &resour auto synchronizer = QSharedPointer::create(resourceContext); synchronizer->mSettings = mSettings; setupSynchronizer(synchronizer); + setupInspector(QSharedPointer::create(resourceContext)); setupPreprocessors(ENTITY_TYPE_MAIL, QVector() << new MimeMessageMover << new MailPropertyExtractor); } -void MailtransportResource::removeFromDisk(const QByteArray &instanceIdentifier) -{ - GenericResource::removeFromDisk(instanceIdentifier); - Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk(); -} - -KAsync::Job MailtransportResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) -{ - if (domainType == ENTITY_TYPE_MAIL) { - if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { - auto path = resourceStorageLocation(mResourceContext.instanceId()) + "/test/" + entityId; - if (QFileInfo::exists(path)) { - return KAsync::null(); - } - return KAsync::error(1, "Couldn't find message: " + path); - } - } - return KAsync::null(); -} - MailtransportResourceFactory::MailtransportResourceFactory(QObject *parent) : Sink::ResourceFactory(parent) { diff --git a/examples/mailtransportresource/mailtransportresource.h b/examples/mailtransportresource/mailtransportresource.h index 95a9cd7..531fcd5 100644 --- a/examples/mailtransportresource/mailtransportresource.h +++ b/examples/mailtransportresource/mailtransportresource.h @@ -26,8 +26,6 @@ class MailtransportResource : public Sink::GenericResource { public: MailtransportResource(const Sink::ResourceContext &resourceContext); - KAsync::Job inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE; - static void removeFromDisk(const QByteArray &instanceIdentifier); struct Settings { QString server; -- cgit v1.2.3