summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/CMakeLists.txt1
-rw-r--r--common/commandprocessor.cpp27
-rw-r--r--common/commandprocessor.h14
-rw-r--r--common/genericresource.cpp58
-rw-r--r--common/genericresource.h4
-rw-r--r--common/inspector.cpp85
-rw-r--r--common/inspector.h52
-rw-r--r--examples/dummyresource/resourcefactory.cpp44
-rw-r--r--examples/dummyresource/resourcefactory.h1
-rw-r--r--examples/imapresource/imapresource.cpp306
-rw-r--r--examples/imapresource/imapresource.h7
-rw-r--r--examples/maildirresource/maildirresource.cpp185
-rw-r--r--examples/maildirresource/maildirresource.h3
-rw-r--r--examples/mailtransportresource/mailtransportresource.cpp48
-rw-r--r--examples/mailtransportresource/mailtransportresource.h2
15 files changed, 492 insertions, 345 deletions
diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt
index 018fc22..df44ce5 100644
--- a/common/CMakeLists.txt
+++ b/common/CMakeLists.txt
@@ -78,6 +78,7 @@ set(command_SRCS
78 mail/threadindexer.cpp 78 mail/threadindexer.cpp
79 notification.cpp 79 notification.cpp
80 commandprocessor.cpp 80 commandprocessor.cpp
81 inspector.cpp
81 ${storage_SRCS}) 82 ${storage_SRCS})
82 83
83add_library(${PROJECT_NAME} SHARED ${command_SRCS}) 84add_library(${PROJECT_NAME} SHARED ${command_SRCS})
diff --git a/common/commandprocessor.cpp b/common/commandprocessor.cpp
index c9fca37..4ff352b 100644
--- a/common/commandprocessor.cpp
+++ b/common/commandprocessor.cpp
@@ -22,7 +22,8 @@
22#include "commands.h" 22#include "commands.h"
23#include "messagequeue.h" 23#include "messagequeue.h"
24#include "queuedcommand_generated.h" 24#include "queuedcommand_generated.h"
25 25#include "inspector.h"
26#include "synchronizer.h"
26#include "pipeline.h" 27#include "pipeline.h"
27 28
28static int sBatchSize = 100; 29static int sBatchSize = 100;
@@ -42,11 +43,6 @@ void CommandProcessor::setOldestUsedRevision(qint64 revision)
42 mLowerBoundRevision = revision; 43 mLowerBoundRevision = revision;
43} 44}
44 45
45void CommandProcessor::setInspectionCommand(const InspectionFunction &f)
46{
47 mInspect = f;
48}
49
50void CommandProcessor::setFlushCommand(const FlushFunction &f) 46void CommandProcessor::setFlushCommand(const FlushFunction &f)
51{ 47{
52 mFlush = f; 48 mFlush = f;
@@ -91,12 +87,9 @@ KAsync::Job<qint64> CommandProcessor::processQueuedCommand(const Sink::QueuedCom
91 case Sink::Commands::CreateEntityCommand: 87 case Sink::Commands::CreateEntityCommand:
92 return mPipeline->newEntity(data, size); 88 return mPipeline->newEntity(data, size);
93 case Sink::Commands::InspectionCommand: 89 case Sink::Commands::InspectionCommand:
94 if (mInspect) { 90 Q_ASSERT(mInspector);
95 return mInspect(data, size) 91 return mInspector->processCommand(data, size)
96 .syncThen<qint64>([]() { return -1; }); 92 .syncThen<qint64>([]() { return -1; });
97 } else {
98 return KAsync::error<qint64>(-1, "Missing inspection command.");
99 }
100 case Sink::Commands::FlushCommand: 93 case Sink::Commands::FlushCommand:
101 if (mFlush) { 94 if (mFlush) {
102 return mFlush(data, size) 95 return mFlush(data, size)
@@ -191,3 +184,15 @@ KAsync::Job<void> CommandProcessor::processPipeline()
191 }); 184 });
192} 185}
193 186
187void CommandProcessor::setInspector(const QSharedPointer<Inspector> &inspector)
188{
189 mInspector = inspector;
190 QObject::connect(mInspector.data(), &Inspector::notify, this, &CommandProcessor::notify);
191}
192
193void CommandProcessor::setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer)
194{
195 mSynchronizer = synchronizer;
196 QObject::connect(mSynchronizer.data(), &Synchronizer::notify, this, &CommandProcessor::notify);
197}
198
diff --git a/common/commandprocessor.h b/common/commandprocessor.h
index 51d845e..75ae37a 100644
--- a/common/commandprocessor.h
+++ b/common/commandprocessor.h
@@ -24,12 +24,16 @@
24#include <QObject> 24#include <QObject>
25#include <Async/Async> 25#include <Async/Async>
26#include <functional> 26#include <functional>
27
27#include "log.h" 28#include "log.h"
29#include "notification.h"
28 30
29class MessageQueue; 31class MessageQueue;
30 32
31namespace Sink { 33namespace Sink {
32 class Pipeline; 34 class Pipeline;
35 class Inspector;
36 class Synchronizer;
33 class QueuedCommand; 37 class QueuedCommand;
34 38
35/** 39/**
@@ -38,7 +42,6 @@ namespace Sink {
38class CommandProcessor : public QObject 42class CommandProcessor : public QObject
39{ 43{
40 Q_OBJECT 44 Q_OBJECT
41 typedef std::function<KAsync::Job<void>(void const *, size_t)> InspectionFunction;
42 typedef std::function<KAsync::Job<void>(void const *, size_t)> FlushFunction; 45 typedef std::function<KAsync::Job<void>(void const *, size_t)> FlushFunction;
43 SINK_DEBUG_AREA("commandprocessor") 46 SINK_DEBUG_AREA("commandprocessor")
44 47
@@ -47,11 +50,13 @@ public:
47 50
48 void setOldestUsedRevision(qint64 revision); 51 void setOldestUsedRevision(qint64 revision);
49 52
50 void setInspectionCommand(const InspectionFunction &f);
51
52 void setFlushCommand(const FlushFunction &f); 53 void setFlushCommand(const FlushFunction &f);
53 54
55 void setInspector(const QSharedPointer<Inspector> &inspector);
56 void setSynchronizer(const QSharedPointer<Synchronizer> &synchronizer);
57
54signals: 58signals:
59 void notify(Notification);
55 void error(int errorCode, const QString &errorMessage); 60 void error(int errorCode, const QString &errorMessage);
56 61
57private: 62private:
@@ -72,8 +77,9 @@ private:
72 bool mProcessingLock; 77 bool mProcessingLock;
73 // The lowest revision we no longer need 78 // The lowest revision we no longer need
74 qint64 mLowerBoundRevision; 79 qint64 mLowerBoundRevision;
75 InspectionFunction mInspect;
76 FlushFunction mFlush; 80 FlushFunction mFlush;
81 QSharedPointer<Synchronizer> mSynchronizer;
82 QSharedPointer<Inspector> mInspector;
77}; 83};
78 84
79}; 85};
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index 3aa4fce..80e59c9 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -59,41 +59,6 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q
59 mClientLowerBoundRevision(std::numeric_limits<qint64>::max()) 59 mClientLowerBoundRevision(std::numeric_limits<qint64>::max())
60{ 60{
61 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue)); 61 mProcessor = std::unique_ptr<CommandProcessor>(new CommandProcessor(mPipeline.data(), QList<MessageQueue *>() << &mUserQueue << &mSynchronizerQueue));
62 mProcessor->setInspectionCommand([this](void const *command, size_t size) {
63 flatbuffers::Verifier verifier((const uint8_t *)command, size);
64 if (Sink::Commands::VerifyInspectionBuffer(verifier)) {
65 auto buffer = Sink::Commands::GetInspection(command);
66 int inspectionType = buffer->type();
67
68 QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id());
69 QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId());
70 QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType());
71 QByteArray property = BufferUtils::extractBuffer(buffer->property());
72 QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue());
73 QDataStream s(expectedValueString);
74 QVariant expectedValue;
75 s >> expectedValue;
76 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue)
77 .then<void>(
78 [=](const KAsync::Error &error) {
79 Sink::Notification n;
80 n.type = Sink::Notification::Inspection;
81 n.id = inspectionId;
82 if (error) {
83 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage;
84 n.code = Sink::Notification::Failure;
85 } else {
86 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
87 n.code = Sink::Notification::Success;
88 }
89 emit notify(n);
90 return KAsync::null();
91 })
92 .exec();
93 return KAsync::null<void>();
94 }
95 return KAsync::error<void>(-1, "Invalid inspection command.");
96 });
97 mProcessor->setFlushCommand([this](void const *command, size_t size) { 62 mProcessor->setFlushCommand([this](void const *command, size_t size) {
98 flatbuffers::Verifier verifier((const uint8_t *)command, size); 63 flatbuffers::Verifier verifier((const uint8_t *)command, size);
99 if (Sink::Commands::VerifyFlushBuffer(verifier)) { 64 if (Sink::Commands::VerifyFlushBuffer(verifier)) {
@@ -114,14 +79,9 @@ GenericResource::GenericResource(const ResourceContext &resourceContext, const Q
114 } 79 }
115 return KAsync::error<void>(-1, "Invalid flush command."); 80 return KAsync::error<void>(-1, "Invalid flush command.");
116 }); 81 });
117 { 82 QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); });
118 auto ret = QObject::connect(mProcessor.get(), &CommandProcessor::error, [this](int errorCode, const QString &msg) { onProcessorError(errorCode, msg); }); 83 QObject::connect(mProcessor.get(), &CommandProcessor::notify, this, &GenericResource::notify);
119 Q_ASSERT(ret); 84 QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
120 }
121 {
122 auto ret = QObject::connect(mPipeline.data(), &Pipeline::revisionUpdated, this, &Resource::revisionUpdated);
123 Q_ASSERT(ret);
124 }
125 85
126 mCommitQueueTimer.setInterval(sCommitInterval); 86 mCommitQueueTimer.setInterval(sCommitInterval);
127 mCommitQueueTimer.setSingleShot(true); 87 mCommitQueueTimer.setSingleShot(true);
@@ -132,13 +92,6 @@ GenericResource::~GenericResource()
132{ 92{
133} 93}
134 94
135KAsync::Job<void> GenericResource::inspect(
136 int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
137{
138 SinkWarning() << "Inspection not implemented";
139 return KAsync::null<void>();
140}
141
142void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors) 95void GenericResource::setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors)
143{ 96{
144 mPipeline->setPreprocessors(type, preprocessors); 97 mPipeline->setPreprocessors(type, preprocessors);
@@ -179,6 +132,11 @@ void GenericResource::setupSynchronizer(const QSharedPointer<Synchronizer> &sync
179 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection); 132 QMetaObject::invokeMethod(mSynchronizer.data(), "revisionChanged", Qt::QueuedConnection);
180} 133}
181 134
135void GenericResource::setupInspector(const QSharedPointer<Inspector> &inspector)
136{
137 mProcessor->setInspector(inspector);
138}
139
182void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier) 140void GenericResource::removeFromDisk(const QByteArray &instanceIdentifier)
183{ 141{
184 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk(); 142 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier, Sink::Storage::DataStore::ReadWrite).removeFromDisk();
diff --git a/common/genericresource.h b/common/genericresource.h
index 12f15f3..0bc47da 100644
--- a/common/genericresource.h
+++ b/common/genericresource.h
@@ -32,6 +32,7 @@ namespace Sink {
32class Pipeline; 32class Pipeline;
33class Preprocessor; 33class Preprocessor;
34class Synchronizer; 34class Synchronizer;
35class Inspector;
35class CommandProcessor; 36class CommandProcessor;
36 37
37/** 38/**
@@ -50,8 +51,6 @@ public:
50 virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE; 51 virtual KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &query) Q_DECL_OVERRIDE;
51 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE; 52 virtual KAsync::Job<void> processAllMessages() Q_DECL_OVERRIDE;
52 virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE; 53 virtual void setLowerBoundRevision(qint64 revision) Q_DECL_OVERRIDE;
53 virtual KAsync::Job<void>
54 inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue);
55 54
56 int error() const; 55 int error() const;
57 56
@@ -64,6 +63,7 @@ private slots:
64protected: 63protected:
65 void setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors); 64 void setupPreprocessors(const QByteArray &type, const QVector<Sink::Preprocessor *> &preprocessors);
66 void setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer); 65 void setupSynchronizer(const QSharedPointer<Synchronizer> &synchronizer);
66 void setupInspector(const QSharedPointer<Inspector> &inspector);
67 67
68 void onProcessorError(int errorCode, const QString &errorMessage); 68 void onProcessorError(int errorCode, const QString &errorMessage);
69 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data); 69 void enqueueCommand(MessageQueue &mq, int commandId, const QByteArray &data);
diff --git a/common/inspector.cpp b/common/inspector.cpp
new file mode 100644
index 0000000..8b4c93a
--- /dev/null
+++ b/common/inspector.cpp
@@ -0,0 +1,85 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#include "inspector.h"
21
22#include "resourcecontext.h"
23#include "inspection_generated.h"
24#include "bufferutils.h"
25
26#include <QDataStream>
27
28using namespace Sink;
29
30Inspector::Inspector(const ResourceContext &context)
31 : QObject(),
32 mResourceContext(context)
33 // mEntityStore(Storage::EntityStore::Ptr::create(mResourceContext)),
34 // mSyncStorage(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::DataStore::ReadWrite)
35{
36 // SinkTrace() << "Starting synchronizer: " << mResourceContext.resourceType << mResourceContext.instanceId();
37}
38
39Inspector::~Inspector()
40{
41
42}
43
44KAsync::Job<void> Inspector::processCommand(void const *command, size_t size)
45{
46 flatbuffers::Verifier verifier((const uint8_t *)command, size);
47 if (Sink::Commands::VerifyInspectionBuffer(verifier)) {
48 auto buffer = Sink::Commands::GetInspection(command);
49 int inspectionType = buffer->type();
50
51 QByteArray inspectionId = BufferUtils::extractBuffer(buffer->id());
52 QByteArray entityId = BufferUtils::extractBuffer(buffer->entityId());
53 QByteArray domainType = BufferUtils::extractBuffer(buffer->domainType());
54 QByteArray property = BufferUtils::extractBuffer(buffer->property());
55 QByteArray expectedValueString = BufferUtils::extractBuffer(buffer->expectedValue());
56 QDataStream s(expectedValueString);
57 QVariant expectedValue;
58 s >> expectedValue;
59 inspect(inspectionType, inspectionId, domainType, entityId, property, expectedValue)
60 .then<void>(
61 [=](const KAsync::Error &error) {
62 Sink::Notification n;
63 n.type = Sink::Notification::Inspection;
64 n.id = inspectionId;
65 if (error) {
66 Warning_area("resource.inspection") << "Inspection failed: " << inspectionType << inspectionId << entityId << error.errorMessage;
67 n.code = Sink::Notification::Failure;
68 } else {
69 Log_area("resource.inspection") << "Inspection was successful: " << inspectionType << inspectionId << entityId;
70 n.code = Sink::Notification::Success;
71 }
72 emit notify(n);
73 return KAsync::null();
74 })
75 .exec();
76 return KAsync::null<void>();
77 }
78 return KAsync::error<void>(-1, "Invalid inspection command.");
79}
80
81KAsync::Job<void> Inspector::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
82{
83 return KAsync::error(-1, "Inspection not implemented.");
84}
85
diff --git a/common/inspector.h b/common/inspector.h
new file mode 100644
index 0000000..ff167b1
--- /dev/null
+++ b/common/inspector.h
@@ -0,0 +1,52 @@
1/*
2 * Copyright (C) 2016 Christian Mollekopf <mollekopf@kolabsys.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) version 3, or any
8 * later version accepted by the membership of KDE e.V. (or its
9 * successor approved by the membership of KDE e.V.), which shall
10 * act as a proxy defined in Section 6 of version 3 of the license.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
19 */
20#pragma once
21
22#include "sink_export.h"
23#include <QObject>
24#include <Async/Async>
25
26#include "notification.h"
27#include "resourcecontext.h"
28
29namespace Sink {
30
31/**
32 * Synchronize and add what we don't already have to local queue
33 */
34class SINK_EXPORT Inspector : public QObject
35{
36 Q_OBJECT
37public:
38 Inspector(const ResourceContext &resourceContext);
39 virtual ~Inspector();
40
41 KAsync::Job<void> processCommand(void const *command, size_t size);
42
43signals:
44 void notify(Notification);
45
46protected:
47 virtual KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue);
48
49 Sink::ResourceContext mResourceContext;
50};
51
52}
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp
index f5ab2d9..8e81c79 100644
--- a/examples/dummyresource/resourcefactory.cpp
+++ b/examples/dummyresource/resourcefactory.cpp
@@ -37,6 +37,7 @@
37#include "facadefactory.h" 37#include "facadefactory.h"
38#include "adaptorfactoryregistry.h" 38#include "adaptorfactoryregistry.h"
39#include "synchronizer.h" 39#include "synchronizer.h"
40#include "inspector.h"
40#include "mailpreprocessor.h" 41#include "mailpreprocessor.h"
41#include "remoteidmap.h" 42#include "remoteidmap.h"
42#include <QDate> 43#include <QDate>
@@ -130,10 +131,36 @@ class DummySynchronizer : public Sink::Synchronizer {
130 131
131}; 132};
132 133
134class DummyInspector : public Sink::Inspector {
135public:
136 DummyInspector(const Sink::ResourceContext &resourceContext)
137 : Sink::Inspector(resourceContext)
138 {
139
140 }
141
142protected:
143 KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE
144 {
145 SinkTrace() << "Inspecting " << inspectionType << domainType << entityId << property << expectedValue;
146 if (property == "testInspection") {
147 if (expectedValue.toBool()) {
148 //Success
149 return KAsync::null<void>();
150 } else {
151 //Failure
152 return KAsync::error<void>(1, "Failed.");
153 }
154 }
155 return KAsync::null<void>();
156 }
157};
158
133DummyResource::DummyResource(const Sink::ResourceContext &resourceContext, const QSharedPointer<Sink::Pipeline> &pipeline) 159DummyResource::DummyResource(const Sink::ResourceContext &resourceContext, const QSharedPointer<Sink::Pipeline> &pipeline)
134 : Sink::GenericResource(resourceContext, pipeline) 160 : Sink::GenericResource(resourceContext, pipeline)
135{ 161{
136 setupSynchronizer(QSharedPointer<DummySynchronizer>::create(resourceContext)); 162 setupSynchronizer(QSharedPointer<DummySynchronizer>::create(resourceContext));
163 setupInspector(QSharedPointer<DummyInspector>::create(resourceContext));
137 setupPreprocessors(ENTITY_TYPE_MAIL, 164 setupPreprocessors(ENTITY_TYPE_MAIL,
138 QVector<Sink::Preprocessor*>() << new MailPropertyExtractor); 165 QVector<Sink::Preprocessor*>() << new MailPropertyExtractor);
139 setupPreprocessors(ENTITY_TYPE_FOLDER, 166 setupPreprocessors(ENTITY_TYPE_FOLDER,
@@ -159,23 +186,6 @@ KAsync::Job<void> DummyResource::synchronizeWithSource(const Sink::QueryBase &qu
159 return GenericResource::synchronizeWithSource(query); 186 return GenericResource::synchronizeWithSource(query);
160} 187}
161 188
162KAsync::Job<void> DummyResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
163{
164
165 SinkTrace() << "Inspecting " << inspectionType << domainType << entityId << property << expectedValue;
166 if (property == "testInspection") {
167 if (expectedValue.toBool()) {
168 //Success
169 return KAsync::null<void>();
170 } else {
171 //Failure
172 return KAsync::error<void>(1, "Failed.");
173 }
174 }
175 return KAsync::null<void>();
176}
177
178
179DummyResourceFactory::DummyResourceFactory(QObject *parent) 189DummyResourceFactory::DummyResourceFactory(QObject *parent)
180 : Sink::ResourceFactory(parent) 190 : Sink::ResourceFactory(parent)
181{ 191{
diff --git a/examples/dummyresource/resourcefactory.h b/examples/dummyresource/resourcefactory.h
index 8ef27a6..2eb7558 100644
--- a/examples/dummyresource/resourcefactory.h
+++ b/examples/dummyresource/resourcefactory.h
@@ -33,7 +33,6 @@ public:
33 virtual ~DummyResource(); 33 virtual ~DummyResource();
34 34
35 KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &) Q_DECL_OVERRIDE; 35 KAsync::Job<void> synchronizeWithSource(const Sink::QueryBase &) Q_DECL_OVERRIDE;
36 KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE;
37}; 36};
38 37
39class DummyResourceFactory : public Sink::ResourceFactory 38class DummyResourceFactory : public Sink::ResourceFactory
diff --git a/examples/imapresource/imapresource.cpp b/examples/imapresource/imapresource.cpp
index 2aa5a2c..40fa75f 100644
--- a/examples/imapresource/imapresource.cpp
+++ b/examples/imapresource/imapresource.cpp
@@ -27,6 +27,7 @@
27#include "definitions.h" 27#include "definitions.h"
28#include "inspection.h" 28#include "inspection.h"
29#include "synchronizer.h" 29#include "synchronizer.h"
30#include "inspector.h"
30#include "remoteidmap.h" 31#include "remoteidmap.h"
31#include "query.h" 32#include "query.h"
32 33
@@ -553,169 +554,192 @@ public:
553 QByteArray mResourceInstanceIdentifier; 554 QByteArray mResourceInstanceIdentifier;
554}; 555};
555 556
556ImapResource::ImapResource(const ResourceContext &resourceContext) 557class ImapInspector : public Sink::Inspector {
557 : Sink::GenericResource(resourceContext) 558public:
558{ 559 ImapInspector(const Sink::ResourceContext &resourceContext)
559 auto config = ResourceConfig::getConfiguration(resourceContext.instanceId()); 560 : Sink::Inspector(resourceContext)
560 mServer = config.value("server").toString(); 561 {
561 mPort = config.value("port").toInt();
562 mUser = config.value("username").toString();
563 mPassword = config.value("password").toString();
564 if (mServer.startsWith("imap")) {
565 mServer.remove("imap://");
566 mServer.remove("imaps://");
567 }
568 if (mServer.contains(':')) {
569 auto list = mServer.split(':');
570 mServer = list.at(0);
571 mPort = list.at(1).toInt();
572 }
573
574 auto synchronizer = QSharedPointer<ImapSynchronizer>::create(resourceContext);
575 synchronizer->mServer = mServer;
576 synchronizer->mPort = mPort;
577 synchronizer->mUser = mUser;
578 synchronizer->mPassword = mPassword;
579 setupSynchronizer(synchronizer);
580
581 setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MimeMessageMover << new MailPropertyExtractor);
582 setupPreprocessors(ENTITY_TYPE_FOLDER, QVector<Sink::Preprocessor*>());
583}
584 562
585KAsync::Job<void> ImapResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) 563 }
586{
587 auto synchronizationStore = QSharedPointer<Sink::Storage::DataStore>::create(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadOnly);
588 auto synchronizationTransaction = synchronizationStore->createTransaction(Sink::Storage::DataStore::ReadOnly);
589 564
590 auto mainStore = QSharedPointer<Sink::Storage::DataStore>::create(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly); 565protected:
591 auto transaction = mainStore->createTransaction(Sink::Storage::DataStore::ReadOnly); 566 KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE {
567 auto synchronizationStore = QSharedPointer<Sink::Storage::DataStore>::create(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadOnly);
568 auto synchronizationTransaction = synchronizationStore->createTransaction(Sink::Storage::DataStore::ReadOnly);
592 569
593 Sink::Storage::EntityStore entityStore(mResourceContext); 570 auto mainStore = QSharedPointer<Sink::Storage::DataStore>::create(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly);
594 auto syncStore = QSharedPointer<Sink::RemoteIdMap>::create(synchronizationTransaction); 571 auto transaction = mainStore->createTransaction(Sink::Storage::DataStore::ReadOnly);
595 572
596 SinkTrace() << "Inspecting " << inspectionType << domainType << entityId << property << expectedValue; 573 Sink::Storage::EntityStore entityStore(mResourceContext);
574 auto syncStore = QSharedPointer<Sink::RemoteIdMap>::create(synchronizationTransaction);
597 575
598 if (domainType == ENTITY_TYPE_MAIL) { 576 SinkTrace() << "Inspecting " << inspectionType << domainType << entityId << property << expectedValue;
599 const auto mail = entityStore.readLatest<Sink::ApplicationDomain::Mail>(entityId);
600 const auto folder = entityStore.readLatest<Sink::ApplicationDomain::Folder>(mail.getFolder());
601 const auto folderRemoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, mail.getFolder());
602 const auto mailRemoteId = syncStore->resolveLocalId(ENTITY_TYPE_MAIL, mail.identifier());
603 if (mailRemoteId.isEmpty() || folderRemoteId.isEmpty()) {
604 SinkWarning() << "Missing remote id for folder or mail. " << mailRemoteId << folderRemoteId;
605 return KAsync::error<void>();
606 }
607 const auto uid = uidFromMailRid(mailRemoteId);
608 SinkTrace() << "Mail remote id: " << folderRemoteId << mailRemoteId << mail.identifier() << folder.identifier();
609 577
610 KIMAP2::ImapSet set; 578 if (domainType == ENTITY_TYPE_MAIL) {
611 set.add(uid); 579 const auto mail = entityStore.readLatest<Sink::ApplicationDomain::Mail>(entityId);
612 if (set.isEmpty()) { 580 const auto folder = entityStore.readLatest<Sink::ApplicationDomain::Folder>(mail.getFolder());
613 return KAsync::error<void>(1, "Couldn't determine uid of mail."); 581 const auto folderRemoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, mail.getFolder());
614 } 582 const auto mailRemoteId = syncStore->resolveLocalId(ENTITY_TYPE_MAIL, mail.identifier());
615 KIMAP2::FetchJob::FetchScope scope; 583 if (mailRemoteId.isEmpty() || folderRemoteId.isEmpty()) {
616 scope.mode = KIMAP2::FetchJob::FetchScope::Full; 584 SinkWarning() << "Missing remote id for folder or mail. " << mailRemoteId << folderRemoteId;
617 auto imap = QSharedPointer<ImapServerProxy>::create(mServer, mPort); 585 return KAsync::error<void>();
618 auto messageByUid = QSharedPointer<QHash<qint64, Imap::Message>>::create();
619 SinkTrace() << "Connecting to:" << mServer << mPort;
620 SinkTrace() << "as:" << mUser;
621 auto inspectionJob = imap->login(mUser, mPassword)
622 .then<Imap::SelectResult>(imap->select(folderRemoteId))
623 .syncThen<void, Imap::SelectResult>([](Imap::SelectResult){})
624 .then<void>(imap->fetch(set, scope, [imap, messageByUid](const Imap::Message &message) {
625 messageByUid->insert(message.uid, message);
626 }));
627
628 if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) {
629 if (property == "unread") {
630 return inspectionJob.then<void>([=]() {
631 auto msg = messageByUid->value(uid);
632 if (expectedValue.toBool() && msg.flags.contains(Imap::Flags::Seen)) {
633 return KAsync::error<void>(1, "Expected unread but couldn't find it.");
634 }
635 if (!expectedValue.toBool() && !msg.flags.contains(Imap::Flags::Seen)) {
636 return KAsync::error<void>(1, "Expected read but couldn't find it.");
637 }
638 return KAsync::null<void>();
639 });
640 }
641 if (property == "subject") {
642 return inspectionJob.then<void>([=]() {
643 auto msg = messageByUid->value(uid);
644 if (msg.msg->subject(true)->asUnicodeString() != expectedValue.toString()) {
645 return KAsync::error<void>(1, "Subject not as expected: " + msg.msg->subject(true)->asUnicodeString());
646 }
647 return KAsync::null<void>();
648 });
649 } 586 }
650 } 587 const auto uid = uidFromMailRid(mailRemoteId);
651 if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { 588 SinkTrace() << "Mail remote id: " << folderRemoteId << mailRemoteId << mail.identifier() << folder.identifier();
652 return inspectionJob.then<void>([=]() {
653 if (!messageByUid->contains(uid)) {
654 SinkWarning() << "Existing messages are: " << messageByUid->keys();
655 SinkWarning() << "We're looking for: " << uid;
656 return KAsync::error<void>(1, "Couldn't find message: " + mailRemoteId);
657 }
658 return KAsync::null<void>();
659 });
660 }
661 }
662 if (domainType == ENTITY_TYPE_FOLDER) {
663 const auto remoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, entityId);
664 const auto folder = entityStore.readLatest<Sink::ApplicationDomain::Folder>(entityId);
665
666 if (inspectionType == Sink::ResourceControl::Inspection::CacheIntegrityInspectionType) {
667 SinkLog() << "Inspecting cache integrity" << remoteId;
668 589
669 int expectedCount = 0; 590 KIMAP2::ImapSet set;
670 Index index("mail.index.folder", transaction); 591 set.add(uid);
671 index.lookup(entityId, [&](const QByteArray &sinkId) { 592 if (set.isEmpty()) {
672 expectedCount++; 593 return KAsync::error<void>(1, "Couldn't determine uid of mail.");
673 }, 594 }
674 [&](const Index::Error &error) {
675 SinkWarning() << "Error in index: " << error.message << property;
676 });
677
678 auto set = KIMAP2::ImapSet::fromImapSequenceSet("1:*");
679 KIMAP2::FetchJob::FetchScope scope; 595 KIMAP2::FetchJob::FetchScope scope;
680 scope.mode = KIMAP2::FetchJob::FetchScope::Headers; 596 scope.mode = KIMAP2::FetchJob::FetchScope::Full;
681 auto imap = QSharedPointer<ImapServerProxy>::create(mServer, mPort); 597 auto imap = QSharedPointer<ImapServerProxy>::create(mServer, mPort);
682 auto messageByUid = QSharedPointer<QHash<qint64, Imap::Message>>::create(); 598 auto messageByUid = QSharedPointer<QHash<qint64, Imap::Message>>::create();
683 return imap->login(mUser, mPassword) 599 SinkTrace() << "Connecting to:" << mServer << mPort;
684 .then<void>(imap->select(remoteId).syncThen<void>([](){})) 600 SinkTrace() << "as:" << mUser;
685 .then<void>(imap->fetch(set, scope, [=](const Imap::Message message) { 601 auto inspectionJob = imap->login(mUser, mPassword)
602 .then<Imap::SelectResult>(imap->select(folderRemoteId))
603 .syncThen<void, Imap::SelectResult>([](Imap::SelectResult){})
604 .then<void>(imap->fetch(set, scope, [imap, messageByUid](const Imap::Message &message) {
686 messageByUid->insert(message.uid, message); 605 messageByUid->insert(message.uid, message);
687 })) 606 }));
688 .then<void>([imap, messageByUid, expectedCount]() { 607
689 if (messageByUid->size() != expectedCount) { 608 if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) {
690 return KAsync::error<void>(1, QString("Wrong number of messages on the server; found %1 instead of %2.").arg(messageByUid->size()).arg(expectedCount)); 609 if (property == "unread") {
610 return inspectionJob.then<void>([=]() {
611 auto msg = messageByUid->value(uid);
612 if (expectedValue.toBool() && msg.flags.contains(Imap::Flags::Seen)) {
613 return KAsync::error<void>(1, "Expected unread but couldn't find it.");
614 }
615 if (!expectedValue.toBool() && !msg.flags.contains(Imap::Flags::Seen)) {
616 return KAsync::error<void>(1, "Expected read but couldn't find it.");
617 }
618 return KAsync::null<void>();
619 });
620 }
621 if (property == "subject") {
622 return inspectionJob.then<void>([=]() {
623 auto msg = messageByUid->value(uid);
624 if (msg.msg->subject(true)->asUnicodeString() != expectedValue.toString()) {
625 return KAsync::error<void>(1, "Subject not as expected: " + msg.msg->subject(true)->asUnicodeString());
626 }
627 return KAsync::null<void>();
628 });
629 }
630 }
631 if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) {
632 return inspectionJob.then<void>([=]() {
633 if (!messageByUid->contains(uid)) {
634 SinkWarning() << "Existing messages are: " << messageByUid->keys();
635 SinkWarning() << "We're looking for: " << uid;
636 return KAsync::error<void>(1, "Couldn't find message: " + mailRemoteId);
691 } 637 }
692 return KAsync::null<void>(); 638 return KAsync::null<void>();
693 }); 639 });
640 }
694 } 641 }
695 if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) { 642 if (domainType == ENTITY_TYPE_FOLDER) {
696 auto folderByPath = QSharedPointer<QSet<QString>>::create(); 643 const auto remoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, entityId);
697 auto folderByName = QSharedPointer<QSet<QString>>::create(); 644 const auto folder = entityStore.readLatest<Sink::ApplicationDomain::Folder>(entityId);
698 645
699 auto imap = QSharedPointer<ImapServerProxy>::create(mServer, mPort); 646 if (inspectionType == Sink::ResourceControl::Inspection::CacheIntegrityInspectionType) {
700 auto inspectionJob = imap->login(mUser, mPassword) 647 SinkLog() << "Inspecting cache integrity" << remoteId;
701 .then<void>(imap->fetchFolders([=](const Imap::Folder &f) { 648
702 *folderByPath << f.normalizedPath(); 649 int expectedCount = 0;
703 *folderByName << f.name(); 650 Index index("mail.index.folder", transaction);
704 })) 651 index.lookup(entityId, [&](const QByteArray &sinkId) {
705 .then<void>([this, folderByName, folderByPath, folder, remoteId, imap]() { 652 expectedCount++;
706 if (!folderByName->contains(folder.getName())) { 653 },
707 SinkWarning() << "Existing folders are: " << *folderByPath; 654 [&](const Index::Error &error) {
708 SinkWarning() << "We're looking for: " << folder.getName(); 655 SinkWarning() << "Error in index: " << error.message << property;
709 return KAsync::error<void>(1, "Wrong folder name: " + remoteId);
710 }
711 return KAsync::null<void>();
712 }); 656 });
713 657
714 return inspectionJob; 658 auto set = KIMAP2::ImapSet::fromImapSequenceSet("1:*");
659 KIMAP2::FetchJob::FetchScope scope;
660 scope.mode = KIMAP2::FetchJob::FetchScope::Headers;
661 auto imap = QSharedPointer<ImapServerProxy>::create(mServer, mPort);
662 auto messageByUid = QSharedPointer<QHash<qint64, Imap::Message>>::create();
663 return imap->login(mUser, mPassword)
664 .then<void>(imap->select(remoteId).syncThen<void>([](){}))
665 .then<void>(imap->fetch(set, scope, [=](const Imap::Message message) {
666 messageByUid->insert(message.uid, message);
667 }))
668 .then<void>([imap, messageByUid, expectedCount]() {
669 if (messageByUid->size() != expectedCount) {
670 return KAsync::error<void>(1, QString("Wrong number of messages on the server; found %1 instead of %2.").arg(messageByUid->size()).arg(expectedCount));
671 }
672 return KAsync::null<void>();
673 });
674 }
675 if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) {
676 auto folderByPath = QSharedPointer<QSet<QString>>::create();
677 auto folderByName = QSharedPointer<QSet<QString>>::create();
678
679 auto imap = QSharedPointer<ImapServerProxy>::create(mServer, mPort);
680 auto inspectionJob = imap->login(mUser, mPassword)
681 .then<void>(imap->fetchFolders([=](const Imap::Folder &f) {
682 *folderByPath << f.normalizedPath();
683 *folderByName << f.name();
684 }))
685 .then<void>([this, folderByName, folderByPath, folder, remoteId, imap]() {
686 if (!folderByName->contains(folder.getName())) {
687 SinkWarning() << "Existing folders are: " << *folderByPath;
688 SinkWarning() << "We're looking for: " << folder.getName();
689 return KAsync::error<void>(1, "Wrong folder name: " + remoteId);
690 }
691 return KAsync::null<void>();
692 });
693
694 return inspectionJob;
695 }
696
715 } 697 }
698 return KAsync::null<void>();
699 }
700
701public:
702 QString mServer;
703 int mPort;
704 QString mUser;
705 QString mPassword;
706};
707
716 708
709ImapResource::ImapResource(const ResourceContext &resourceContext)
710 : Sink::GenericResource(resourceContext)
711{
712 auto config = ResourceConfig::getConfiguration(resourceContext.instanceId());
713 auto server = config.value("server").toString();
714 auto port = config.value("port").toInt();
715 auto user = config.value("username").toString();
716 auto password = config.value("password").toString();
717 if (server.startsWith("imap")) {
718 server.remove("imap://");
719 server.remove("imaps://");
720 }
721 if (server.contains(':')) {
722 auto list = server.split(':');
723 server = list.at(0);
724 port = list.at(1).toInt();
717 } 725 }
718 return KAsync::null<void>(); 726
727 auto synchronizer = QSharedPointer<ImapSynchronizer>::create(resourceContext);
728 synchronizer->mServer = server;
729 synchronizer->mPort = port;
730 synchronizer->mUser = user;
731 synchronizer->mPassword = password;
732 setupSynchronizer(synchronizer);
733
734 auto inspector = QSharedPointer<ImapInspector>::create(resourceContext);
735 inspector->mServer = server;
736 inspector->mPort = port;
737 inspector->mUser = user;
738 inspector->mPassword = password;
739 setupInspector(inspector);
740
741 setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MimeMessageMover << new MailPropertyExtractor);
742 setupPreprocessors(ENTITY_TYPE_FOLDER, QVector<Sink::Preprocessor*>());
719} 743}
720 744
721ImapResourceFactory::ImapResourceFactory(QObject *parent) 745ImapResourceFactory::ImapResourceFactory(QObject *parent)
diff --git a/examples/imapresource/imapresource.h b/examples/imapresource/imapresource.h
index d345d64..aeb1200 100644
--- a/examples/imapresource/imapresource.h
+++ b/examples/imapresource/imapresource.h
@@ -40,13 +40,6 @@ class ImapResource : public Sink::GenericResource
40{ 40{
41public: 41public:
42 ImapResource(const Sink::ResourceContext &resourceContext); 42 ImapResource(const Sink::ResourceContext &resourceContext);
43 KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE;
44
45private:
46 QString mServer;
47 int mPort;
48 QString mUser;
49 QString mPassword;
50}; 43};
51 44
52class ImapResourceFactory : public Sink::ResourceFactory 45class ImapResourceFactory : public Sink::ResourceFactory
diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp
index ee84bde..2b19789 100644
--- a/examples/maildirresource/maildirresource.cpp
+++ b/examples/maildirresource/maildirresource.cpp
@@ -27,6 +27,7 @@
27#include "libmaildir/maildir.h" 27#include "libmaildir/maildir.h"
28#include "inspection.h" 28#include "inspection.h"
29#include "synchronizer.h" 29#include "synchronizer.h"
30#include "inspector.h"
30 31
31#include "facadefactory.h" 32#include "facadefactory.h"
32#include "adaptorfactoryregistry.h" 33#include "adaptorfactoryregistry.h"
@@ -425,6 +426,102 @@ public:
425 QString mMaildirPath; 426 QString mMaildirPath;
426}; 427};
427 428
429class MaildirInspector : public Sink::Inspector {
430public:
431 MaildirInspector(const Sink::ResourceContext &resourceContext)
432 : Sink::Inspector(resourceContext)
433 {
434
435 }
436protected:
437
438 KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE {
439 auto synchronizationStore = QSharedPointer<Sink::Storage::DataStore>::create(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadOnly);
440 auto synchronizationTransaction = synchronizationStore->createTransaction(Sink::Storage::DataStore::ReadOnly);
441
442 auto mainStore = QSharedPointer<Sink::Storage::DataStore>::create(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly);
443 auto transaction = mainStore->createTransaction(Sink::Storage::DataStore::ReadOnly);
444
445 Sink::Storage::EntityStore entityStore(mResourceContext);
446 auto syncStore = QSharedPointer<RemoteIdMap>::create(synchronizationTransaction);
447
448 SinkTrace() << "Inspecting " << inspectionType << domainType << entityId << property << expectedValue;
449
450 if (domainType == ENTITY_TYPE_MAIL) {
451 auto mail = entityStore.readLatest<Sink::ApplicationDomain::Mail>(entityId);
452 const auto filePath = getFilePathFromMimeMessagePath(mail.getMimeMessagePath());
453
454 if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) {
455 if (property == "unread") {
456 const auto flags = KPIM::Maildir::readEntryFlags(filePath.split('/').last());
457 if (expectedValue.toBool() && (flags & KPIM::Maildir::Seen)) {
458 return KAsync::error<void>(1, "Expected unread but couldn't find it.");
459 }
460 if (!expectedValue.toBool() && !(flags & KPIM::Maildir::Seen)) {
461 return KAsync::error<void>(1, "Expected read but couldn't find it.");
462 }
463 return KAsync::null<void>();
464 }
465 if (property == "subject") {
466 KMime::Message *msg = new KMime::Message;
467 msg->setHead(KMime::CRLFtoLF(KPIM::Maildir::readEntryHeadersFromFile(filePath)));
468 msg->parse();
469
470 if (msg->subject(true)->asUnicodeString() != expectedValue.toString()) {
471 return KAsync::error<void>(1, "Subject not as expected: " + msg->subject(true)->asUnicodeString());
472 }
473 return KAsync::null<void>();
474 }
475 }
476 if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) {
477 if (QFileInfo(filePath).exists() != expectedValue.toBool()) {
478 return KAsync::error<void>(1, "Wrong file existence: " + filePath);
479 }
480 }
481 }
482 if (domainType == ENTITY_TYPE_FOLDER) {
483 const auto remoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, entityId);
484 auto folder = entityStore.readLatest<Sink::ApplicationDomain::Folder>(entityId);
485
486 if (inspectionType == Sink::ResourceControl::Inspection::CacheIntegrityInspectionType) {
487 SinkTrace() << "Inspecting cache integrity" << remoteId;
488 if (!QDir(remoteId).exists()) {
489 return KAsync::error<void>(1, "The directory is not existing: " + remoteId);
490 }
491
492 int expectedCount = 0;
493 Index index("mail.index.folder", transaction);
494 index.lookup(entityId, [&](const QByteArray &sinkId) {
495 expectedCount++;
496 },
497 [&](const Index::Error &error) {
498 SinkWarning() << "Error in index: " << error.message << property;
499 });
500
501 QDir dir(remoteId + "/cur");
502 const QFileInfoList list = dir.entryInfoList(QDir::Files);
503 if (list.size() != expectedCount) {
504 for (const auto &fileInfo : list) {
505 SinkWarning() << "Found in cache: " << fileInfo.fileName();
506 }
507 return KAsync::error<void>(1, QString("Wrong number of files; found %1 instead of %2.").arg(list.size()).arg(expectedCount));
508 }
509 }
510 if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) {
511 if (!remoteId.endsWith(folder.getName().toUtf8())) {
512 return KAsync::error<void>(1, "Wrong folder name: " + remoteId);
513 }
514 //TODO we shouldn't use the remoteId here to figure out the path, it could be gone/changed already
515 if (QDir(remoteId).exists() != expectedValue.toBool()) {
516 return KAsync::error<void>(1, "Wrong folder existence: " + remoteId);
517 }
518 }
519
520 }
521 return KAsync::null<void>();
522 }
523};
524
428 525
429MaildirResource::MaildirResource(const Sink::ResourceContext &resourceContext) 526MaildirResource::MaildirResource(const Sink::ResourceContext &resourceContext)
430 : Sink::GenericResource(resourceContext) 527 : Sink::GenericResource(resourceContext)
@@ -439,6 +536,7 @@ MaildirResource::MaildirResource(const Sink::ResourceContext &resourceContext)
439 auto synchronizer = QSharedPointer<MaildirSynchronizer>::create(resourceContext); 536 auto synchronizer = QSharedPointer<MaildirSynchronizer>::create(resourceContext);
440 synchronizer->mMaildirPath = mMaildirPath; 537 synchronizer->mMaildirPath = mMaildirPath;
441 setupSynchronizer(synchronizer); 538 setupSynchronizer(synchronizer);
539 setupInspector(QSharedPointer<MaildirInspector>::create(resourceContext));
442 540
443 setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MaildirMimeMessageMover(resourceContext.instanceId(), mMaildirPath) << new MaildirMailPropertyExtractor); 541 setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MaildirMimeMessageMover(resourceContext.instanceId(), mMaildirPath) << new MaildirMailPropertyExtractor);
444 setupPreprocessors(ENTITY_TYPE_FOLDER, QVector<Sink::Preprocessor*>() << new FolderPreprocessor(mMaildirPath)); 542 setupPreprocessors(ENTITY_TYPE_FOLDER, QVector<Sink::Preprocessor*>() << new FolderPreprocessor(mMaildirPath));
@@ -458,93 +556,6 @@ MaildirResource::MaildirResource(const Sink::ResourceContext &resourceContext)
458 synchronizer->commit(); 556 synchronizer->commit();
459} 557}
460 558
461KAsync::Job<void> MaildirResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
462{
463 auto synchronizationStore = QSharedPointer<Sink::Storage::DataStore>::create(Sink::storageLocation(), mResourceContext.instanceId() + ".synchronization", Sink::Storage::DataStore::ReadOnly);
464 auto synchronizationTransaction = synchronizationStore->createTransaction(Sink::Storage::DataStore::ReadOnly);
465
466 auto mainStore = QSharedPointer<Sink::Storage::DataStore>::create(Sink::storageLocation(), mResourceContext.instanceId(), Sink::Storage::DataStore::ReadOnly);
467 auto transaction = mainStore->createTransaction(Sink::Storage::DataStore::ReadOnly);
468
469 Sink::Storage::EntityStore entityStore(mResourceContext);
470 auto syncStore = QSharedPointer<RemoteIdMap>::create(synchronizationTransaction);
471
472 SinkTrace() << "Inspecting " << inspectionType << domainType << entityId << property << expectedValue;
473
474 if (domainType == ENTITY_TYPE_MAIL) {
475 auto mail = entityStore.readLatest<Sink::ApplicationDomain::Mail>(entityId);
476 const auto filePath = getFilePathFromMimeMessagePath(mail.getMimeMessagePath());
477
478 if (inspectionType == Sink::ResourceControl::Inspection::PropertyInspectionType) {
479 if (property == "unread") {
480 const auto flags = KPIM::Maildir::readEntryFlags(filePath.split('/').last());
481 if (expectedValue.toBool() && (flags & KPIM::Maildir::Seen)) {
482 return KAsync::error<void>(1, "Expected unread but couldn't find it.");
483 }
484 if (!expectedValue.toBool() && !(flags & KPIM::Maildir::Seen)) {
485 return KAsync::error<void>(1, "Expected read but couldn't find it.");
486 }
487 return KAsync::null<void>();
488 }
489 if (property == "subject") {
490 KMime::Message *msg = new KMime::Message;
491 msg->setHead(KMime::CRLFtoLF(KPIM::Maildir::readEntryHeadersFromFile(filePath)));
492 msg->parse();
493
494 if (msg->subject(true)->asUnicodeString() != expectedValue.toString()) {
495 return KAsync::error<void>(1, "Subject not as expected: " + msg->subject(true)->asUnicodeString());
496 }
497 return KAsync::null<void>();
498 }
499 }
500 if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) {
501 if (QFileInfo(filePath).exists() != expectedValue.toBool()) {
502 return KAsync::error<void>(1, "Wrong file existence: " + filePath);
503 }
504 }
505 }
506 if (domainType == ENTITY_TYPE_FOLDER) {
507 const auto remoteId = syncStore->resolveLocalId(ENTITY_TYPE_FOLDER, entityId);
508 auto folder = entityStore.readLatest<Sink::ApplicationDomain::Folder>(entityId);
509
510 if (inspectionType == Sink::ResourceControl::Inspection::CacheIntegrityInspectionType) {
511 SinkTrace() << "Inspecting cache integrity" << remoteId;
512 if (!QDir(remoteId).exists()) {
513 return KAsync::error<void>(1, "The directory is not existing: " + remoteId);
514 }
515
516 int expectedCount = 0;
517 Index index("mail.index.folder", transaction);
518 index.lookup(entityId, [&](const QByteArray &sinkId) {
519 expectedCount++;
520 },
521 [&](const Index::Error &error) {
522 SinkWarning() << "Error in index: " << error.message << property;
523 });
524
525 QDir dir(remoteId + "/cur");
526 const QFileInfoList list = dir.entryInfoList(QDir::Files);
527 if (list.size() != expectedCount) {
528 for (const auto &fileInfo : list) {
529 SinkWarning() << "Found in cache: " << fileInfo.fileName();
530 }
531 return KAsync::error<void>(1, QString("Wrong number of files; found %1 instead of %2.").arg(list.size()).arg(expectedCount));
532 }
533 }
534 if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) {
535 if (!remoteId.endsWith(folder.getName().toUtf8())) {
536 return KAsync::error<void>(1, "Wrong folder name: " + remoteId);
537 }
538 //TODO we shouldn't use the remoteId here to figure out the path, it could be gone/changed already
539 if (QDir(remoteId).exists() != expectedValue.toBool()) {
540 return KAsync::error<void>(1, "Wrong folder existence: " + remoteId);
541 }
542 }
543
544 }
545 return KAsync::null<void>();
546}
547
548 559
549MaildirResourceFactory::MaildirResourceFactory(QObject *parent) 560MaildirResourceFactory::MaildirResourceFactory(QObject *parent)
550 : Sink::ResourceFactory(parent) 561 : Sink::ResourceFactory(parent)
diff --git a/examples/maildirresource/maildirresource.h b/examples/maildirresource/maildirresource.h
index 4eb2042..61fe438 100644
--- a/examples/maildirresource/maildirresource.h
+++ b/examples/maildirresource/maildirresource.h
@@ -43,9 +43,8 @@ class MaildirResource : public Sink::GenericResource
43{ 43{
44public: 44public:
45 MaildirResource(const Sink::ResourceContext &resourceContext); 45 MaildirResource(const Sink::ResourceContext &resourceContext);
46 KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE;
47private:
48 46
47private:
49 QStringList listAvailableFolders(); 48 QStringList listAvailableFolders();
50 QString mMaildirPath; 49 QString mMaildirPath;
51 QString mDraftsFolder; 50 QString mDraftsFolder;
diff --git a/examples/mailtransportresource/mailtransportresource.cpp b/examples/mailtransportresource/mailtransportresource.cpp
index c135de9..524b411 100644
--- a/examples/mailtransportresource/mailtransportresource.cpp
+++ b/examples/mailtransportresource/mailtransportresource.cpp
@@ -22,7 +22,7 @@
22#include "facadefactory.h" 22#include "facadefactory.h"
23#include "resourceconfig.h" 23#include "resourceconfig.h"
24#include "definitions.h" 24#include "definitions.h"
25#include "domainadaptor.h" 25#include "inspector.h"
26#include <QDir> 26#include <QDir>
27#include <QFileInfo> 27#include <QFileInfo>
28#include <QSettings> 28#include <QSettings>
@@ -124,6 +124,31 @@ public:
124 MailtransportResource::Settings mSettings; 124 MailtransportResource::Settings mSettings;
125}; 125};
126 126
127class MailtransportInspector : public Sink::Inspector {
128public:
129 MailtransportInspector(const Sink::ResourceContext &resourceContext)
130 : Sink::Inspector(resourceContext)
131 {
132
133 }
134
135protected:
136 KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE
137 {
138 if (domainType == ENTITY_TYPE_MAIL) {
139 if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) {
140 auto path = resourceStorageLocation(mResourceContext.instanceId()) + "/test/" + entityId;
141 if (QFileInfo::exists(path)) {
142 return KAsync::null<void>();
143 }
144 return KAsync::error<void>(1, "Couldn't find message: " + path);
145 }
146 }
147 return KAsync::null<void>();
148 }
149};
150
151
127MailtransportResource::MailtransportResource(const Sink::ResourceContext &resourceContext) 152MailtransportResource::MailtransportResource(const Sink::ResourceContext &resourceContext)
128 : Sink::GenericResource(resourceContext) 153 : Sink::GenericResource(resourceContext)
129{ 154{
@@ -138,30 +163,11 @@ MailtransportResource::MailtransportResource(const Sink::ResourceContext &resour
138 auto synchronizer = QSharedPointer<MailtransportSynchronizer>::create(resourceContext); 163 auto synchronizer = QSharedPointer<MailtransportSynchronizer>::create(resourceContext);
139 synchronizer->mSettings = mSettings; 164 synchronizer->mSettings = mSettings;
140 setupSynchronizer(synchronizer); 165 setupSynchronizer(synchronizer);
166 setupInspector(QSharedPointer<MailtransportInspector>::create(resourceContext));
141 167
142 setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new MimeMessageMover << new MailPropertyExtractor); 168 setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new MimeMessageMover << new MailPropertyExtractor);
143} 169}
144 170
145void MailtransportResource::removeFromDisk(const QByteArray &instanceIdentifier)
146{
147 GenericResource::removeFromDisk(instanceIdentifier);
148 Sink::Storage::DataStore(Sink::storageLocation(), instanceIdentifier + ".synchronization", Sink::Storage::DataStore::ReadWrite).removeFromDisk();
149}
150
151KAsync::Job<void> MailtransportResource::inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue)
152{
153 if (domainType == ENTITY_TYPE_MAIL) {
154 if (inspectionType == Sink::ResourceControl::Inspection::ExistenceInspectionType) {
155 auto path = resourceStorageLocation(mResourceContext.instanceId()) + "/test/" + entityId;
156 if (QFileInfo::exists(path)) {
157 return KAsync::null<void>();
158 }
159 return KAsync::error<void>(1, "Couldn't find message: " + path);
160 }
161 }
162 return KAsync::null<void>();
163}
164
165MailtransportResourceFactory::MailtransportResourceFactory(QObject *parent) 171MailtransportResourceFactory::MailtransportResourceFactory(QObject *parent)
166 : Sink::ResourceFactory(parent) 172 : Sink::ResourceFactory(parent)
167{ 173{
diff --git a/examples/mailtransportresource/mailtransportresource.h b/examples/mailtransportresource/mailtransportresource.h
index 95a9cd7..531fcd5 100644
--- a/examples/mailtransportresource/mailtransportresource.h
+++ b/examples/mailtransportresource/mailtransportresource.h
@@ -26,8 +26,6 @@ class MailtransportResource : public Sink::GenericResource
26{ 26{
27public: 27public:
28 MailtransportResource(const Sink::ResourceContext &resourceContext); 28 MailtransportResource(const Sink::ResourceContext &resourceContext);
29 KAsync::Job<void> inspect(int inspectionType, const QByteArray &inspectionId, const QByteArray &domainType, const QByteArray &entityId, const QByteArray &property, const QVariant &expectedValue) Q_DECL_OVERRIDE;
30 static void removeFromDisk(const QByteArray &instanceIdentifier);
31 29
32 struct Settings { 30 struct Settings {
33 QString server; 31 QString server;