summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/domain/event.cpp21
-rw-r--r--common/domain/event.h2
-rw-r--r--common/domain/folder.cpp22
-rw-r--r--common/domain/folder.h2
-rw-r--r--common/domain/mail.cpp13
-rw-r--r--common/domain/mail.h2
-rw-r--r--common/domainadaptor.h16
-rw-r--r--common/genericresource.cpp6
-rw-r--r--common/indexupdater.h91
-rw-r--r--common/mailpreprocessor.cpp10
-rw-r--r--common/mailpreprocessor.h10
-rw-r--r--common/pipeline.cpp275
-rw-r--r--common/pipeline.h44
-rw-r--r--common/specialpurposepreprocessor.cpp18
-rw-r--r--common/specialpurposepreprocessor.h8
-rw-r--r--common/storage/entitystore.cpp185
-rw-r--r--common/storage/entitystore.h11
-rw-r--r--common/typeindex.cpp16
-rw-r--r--common/typeindex.h5
-rw-r--r--examples/dummyresource/resourcefactory.cpp7
-rw-r--r--examples/imapresource/imapresource.cpp5
-rw-r--r--examples/maildirresource/maildirresource.cpp33
-rw-r--r--examples/mailtransportresource/mailtransportresource.cpp3
-rw-r--r--tests/mailquerybenchmark.cpp5
-rw-r--r--tests/pipelinebenchmark.cpp30
-rw-r--r--tests/pipelinetest.cpp71
26 files changed, 378 insertions, 533 deletions
diff --git a/common/domain/event.cpp b/common/domain/event.cpp
index d50652d..41ec625 100644
--- a/common/domain/event.cpp
+++ b/common/domain/event.cpp
@@ -47,27 +47,6 @@ void TypeImplementation<Event>::configureIndex(TypeIndex &index)
47 index.addProperty<QByteArray>(Event::Uid::name); 47 index.addProperty<QByteArray>(Event::Uid::name);
48} 48}
49 49
50static TypeIndex &getIndex()
51{
52 QMutexLocker locker(&sMutex);
53 static TypeIndex *index = 0;
54 if (!index) {
55 index = new TypeIndex("event");
56 TypeImplementation<Event>::configureIndex(*index);
57 }
58 return *index;
59}
60
61void TypeImplementation<Event>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction)
62{
63 return getIndex().add(identifier, bufferAdaptor, transaction);
64}
65
66void TypeImplementation<Event>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction)
67{
68 return getIndex().remove(identifier, bufferAdaptor, transaction);
69}
70
71QSharedPointer<ReadPropertyMapper<TypeImplementation<Event>::Buffer> > TypeImplementation<Event>::initializeReadPropertyMapper() 50QSharedPointer<ReadPropertyMapper<TypeImplementation<Event>::Buffer> > TypeImplementation<Event>::initializeReadPropertyMapper()
72{ 51{
73 auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); 52 auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create();
diff --git a/common/domain/event.h b/common/domain/event.h
index 18e0f20..fbaf4ed 100644
--- a/common/domain/event.h
+++ b/common/domain/event.h
@@ -56,8 +56,6 @@ public:
56 typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder; 56 typedef Sink::ApplicationDomain::Buffer::EventBuilder BufferBuilder;
57 static void configureIndex(TypeIndex &index); 57 static void configureIndex(TypeIndex &index);
58 static QSet<QByteArray> indexedProperties(); 58 static QSet<QByteArray> indexedProperties();
59 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction);
60 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction);
61 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); 59 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
62 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); 60 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper();
63}; 61};
diff --git a/common/domain/folder.cpp b/common/domain/folder.cpp
index 94727a3..058035a 100644
--- a/common/domain/folder.cpp
+++ b/common/domain/folder.cpp
@@ -50,28 +50,6 @@ void TypeImplementation<Folder>::configureIndex(TypeIndex &index)
50 index.addProperty<QString>(Folder::Name::name); 50 index.addProperty<QString>(Folder::Name::name);
51} 51}
52 52
53static TypeIndex &getIndex()
54{
55 QMutexLocker locker(&sMutex);
56 static TypeIndex *index = 0;
57 if (!index) {
58 index = new TypeIndex("folder");
59 TypeImplementation<Folder>::configureIndex(*index);
60 }
61 return *index;
62}
63
64void TypeImplementation<Folder>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction)
65{
66 SinkTrace() << "Indexing " << identifier;
67 getIndex().add(identifier, bufferAdaptor, transaction);
68}
69
70void TypeImplementation<Folder>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction)
71{
72 getIndex().remove(identifier, bufferAdaptor, transaction);
73}
74
75QSharedPointer<ReadPropertyMapper<TypeImplementation<Folder>::Buffer> > TypeImplementation<Folder>::initializeReadPropertyMapper() 53QSharedPointer<ReadPropertyMapper<TypeImplementation<Folder>::Buffer> > TypeImplementation<Folder>::initializeReadPropertyMapper()
76{ 54{
77 auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); 55 auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create();
diff --git a/common/domain/folder.h b/common/domain/folder.h
index ea0d79a..47a544b 100644
--- a/common/domain/folder.h
+++ b/common/domain/folder.h
@@ -50,8 +50,6 @@ public:
50 typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder; 50 typedef Sink::ApplicationDomain::Buffer::FolderBuilder BufferBuilder;
51 static void configureIndex(TypeIndex &index); 51 static void configureIndex(TypeIndex &index);
52 static QSet<QByteArray> indexedProperties(); 52 static QSet<QByteArray> indexedProperties();
53 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction);
54 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction);
55 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); 53 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
56 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); 54 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper();
57}; 55};
diff --git a/common/domain/mail.cpp b/common/domain/mail.cpp
index b0a3aae..9d58767 100644
--- a/common/domain/mail.cpp
+++ b/common/domain/mail.cpp
@@ -170,19 +170,6 @@ static void updateThreadingIndex(const QByteArray &identifier, const BufferAdapt
170 } 170 }
171} 171}
172 172
173void TypeImplementation<Mail>::index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction)
174{
175 SinkTrace() << "Indexing " << identifier;
176 getIndex().add(identifier, bufferAdaptor, transaction);
177 updateThreadingIndex(identifier, bufferAdaptor, transaction);
178}
179
180void TypeImplementation<Mail>::removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction)
181{
182 getIndex().remove(identifier, bufferAdaptor, transaction);
183 //TODO cleanup threading index
184}
185
186QSharedPointer<ReadPropertyMapper<TypeImplementation<Mail>::Buffer> > TypeImplementation<Mail>::initializeReadPropertyMapper() 173QSharedPointer<ReadPropertyMapper<TypeImplementation<Mail>::Buffer> > TypeImplementation<Mail>::initializeReadPropertyMapper()
187{ 174{
188 auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create(); 175 auto propertyMapper = QSharedPointer<ReadPropertyMapper<Buffer> >::create();
diff --git a/common/domain/mail.h b/common/domain/mail.h
index 81a0d1c..c0cfc55 100644
--- a/common/domain/mail.h
+++ b/common/domain/mail.h
@@ -50,8 +50,6 @@ public:
50 typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder; 50 typedef Sink::ApplicationDomain::Buffer::MailBuilder BufferBuilder;
51 static void configureIndex(TypeIndex &index); 51 static void configureIndex(TypeIndex &index);
52 static QSet<QByteArray> indexedProperties(); 52 static QSet<QByteArray> indexedProperties();
53 static void index(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction);
54 static void removeIndex(const QByteArray &identifier, const BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction);
55 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper(); 53 static QSharedPointer<ReadPropertyMapper<Buffer> > initializeReadPropertyMapper();
56 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper(); 54 static QSharedPointer<WritePropertyMapper<BufferBuilder> > initializeWritePropertyMapper();
57}; 55};
diff --git a/common/domainadaptor.h b/common/domainadaptor.h
index 6a9d755..195d5ec 100644
--- a/common/domainadaptor.h
+++ b/common/domainadaptor.h
@@ -87,11 +87,11 @@ static void createBufferPartBuffer(const Sink::ApplicationDomain::ApplicationDom
87 * A generic adaptor implementation that uses a property mapper to read/write values. 87 * A generic adaptor implementation that uses a property mapper to read/write values.
88 */ 88 */
89template <class LocalBuffer, class ResourceBuffer> 89template <class LocalBuffer, class ResourceBuffer>
90class GenericBufferAdaptor : public Sink::ApplicationDomain::BufferAdaptor 90class DatastoreBufferAdaptor : public Sink::ApplicationDomain::BufferAdaptor
91{ 91{
92 SINK_DEBUG_AREA("bufferadaptor") 92 SINK_DEBUG_AREA("bufferadaptor")
93public: 93public:
94 GenericBufferAdaptor() : BufferAdaptor() 94 DatastoreBufferAdaptor() : BufferAdaptor()
95 { 95 {
96 } 96 }
97 97
@@ -148,18 +148,14 @@ public:
148 /** 148 /**
149 * Creates an adaptor for the given domain and resource types. 149 * Creates an adaptor for the given domain and resource types.
150 * 150 *
151 * This returns by default a GenericBufferAdaptor initialized with the corresponding property mappers. 151 * This returns by default a DatastoreBufferAdaptor initialized with the corresponding property mappers.
152 */ 152 */
153 virtual QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> createAdaptor(const Sink::Entity &entity) Q_DECL_OVERRIDE 153 virtual QSharedPointer<Sink::ApplicationDomain::BufferAdaptor> createAdaptor(const Sink::Entity &entity) Q_DECL_OVERRIDE
154 { 154 {
155 const auto resourceBuffer = Sink::EntityBuffer::readBuffer<ResourceBuffer>(entity.resource()); 155 auto adaptor = QSharedPointer<DatastoreBufferAdaptor<LocalBuffer, ResourceBuffer>>::create();
156 const auto localBuffer = Sink::EntityBuffer::readBuffer<LocalBuffer>(entity.local()); 156 adaptor->mLocalBuffer = Sink::EntityBuffer::readBuffer<LocalBuffer>(entity.local());
157 // const auto metadataBuffer = Sink::EntityBuffer::readBuffer<Sink::Metadata>(entity.metadata());
158
159 auto adaptor = QSharedPointer<GenericBufferAdaptor<LocalBuffer, ResourceBuffer>>::create();
160 adaptor->mLocalBuffer = localBuffer;
161 adaptor->mLocalMapper = mLocalMapper; 157 adaptor->mLocalMapper = mLocalMapper;
162 adaptor->mResourceBuffer = resourceBuffer; 158 adaptor->mResourceBuffer = Sink::EntityBuffer::readBuffer<ResourceBuffer>(entity.resource());
163 adaptor->mResourceMapper = mResourceMapper; 159 adaptor->mResourceMapper = mResourceMapper;
164 return adaptor; 160 return adaptor;
165 } 161 }
diff --git a/common/genericresource.cpp b/common/genericresource.cpp
index e0d395a..8f08f3d 100644
--- a/common/genericresource.cpp
+++ b/common/genericresource.cpp
@@ -59,9 +59,9 @@ class CommandProcessor : public QObject
59public: 59public:
60 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false) 60 CommandProcessor(Sink::Pipeline *pipeline, QList<MessageQueue *> commandQueues) : QObject(), mPipeline(pipeline), mCommandQueues(commandQueues), mProcessingLock(false)
61 { 61 {
62 mLowerBoundRevision = DataStore::maxRevision(mPipeline->storage().createTransaction(DataStore::ReadOnly, [](const Sink::Storage::DataStore::Error &error) { 62 mPipeline->startTransaction();
63 SinkWarning() << error.message; 63 mLowerBoundRevision = mPipeline->revision();
64 })); 64 mPipeline->commit();
65 65
66 for (auto queue : mCommandQueues) { 66 for (auto queue : mCommandQueues) {
67 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process); 67 const bool ret = connect(queue, &MessageQueue::messageReady, this, &CommandProcessor::process);
diff --git a/common/indexupdater.h b/common/indexupdater.h
deleted file mode 100644
index 221a4ed..0000000
--- a/common/indexupdater.h
+++ /dev/null
@@ -1,91 +0,0 @@
1/*
2 * Copyright (C) 2015 Christian Mollekopf <chrigi_1@fastmail.fm>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the
16 * Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19#pragma once
20
21#include <pipeline.h>
22#include <index.h>
23
24class IndexUpdater : public Sink::Preprocessor
25{
26public:
27 IndexUpdater(const QByteArray &index, const QByteArray &type, const QByteArray &property) : mIndexIdentifier(index), mBufferType(type), mProperty(property)
28 {
29 }
30
31 void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
32 {
33 add(newEntity.getProperty(mProperty), uid, transaction);
34 }
35
36 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity,
37 Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
38 {
39 remove(oldEntity.getProperty(mProperty), uid, transaction);
40 add(newEntity.getProperty(mProperty), uid, transaction);
41 }
42
43 void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
44 {
45 remove(oldEntity.getProperty(mProperty), uid, transaction);
46 }
47
48private:
49 void add(const QVariant &value, const QByteArray &uid, Sink::Storage::DataStore::Transaction &transaction)
50 {
51 if (value.isValid()) {
52 Index(mIndexIdentifier, transaction).add(value.toByteArray(), uid);
53 }
54 }
55
56 void remove(const QVariant &value, const QByteArray &uid, Sink::Storage::DataStore::Transaction &transaction)
57 {
58 if (value.isValid()) {
59 const auto data = value.toByteArray();
60 if (!data.isEmpty()) {
61 Index(mIndexIdentifier, transaction).remove(data, uid);
62 }
63 }
64 }
65
66 QByteArray mIndexIdentifier;
67 QByteArray mBufferType;
68 QByteArray mProperty;
69};
70
71template <typename DomainType>
72class DefaultIndexUpdater : public Sink::Preprocessor
73{
74public:
75 void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
76 {
77 Sink::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction);
78 }
79
80 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity,
81 Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
82 {
83 Sink::ApplicationDomain::TypeImplementation<DomainType>::removeIndex(uid, oldEntity, transaction);
84 Sink::ApplicationDomain::TypeImplementation<DomainType>::index(uid, newEntity, transaction);
85 }
86
87 void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
88 {
89 Sink::ApplicationDomain::TypeImplementation<DomainType>::removeIndex(uid, oldEntity, transaction);
90 }
91};
diff --git a/common/mailpreprocessor.cpp b/common/mailpreprocessor.cpp
index b978323..d45afe6 100644
--- a/common/mailpreprocessor.cpp
+++ b/common/mailpreprocessor.cpp
@@ -116,7 +116,7 @@ static void updatedIndexedProperties(Sink::ApplicationDomain::Mail &mail, KMime:
116 } 116 }
117} 117}
118 118
119void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) 119void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail)
120{ 120{
121 MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(mail.getMimeMessagePath())); 121 MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(mail.getMimeMessagePath()));
122 auto msg = mimeMessageReader.mimeMessage(); 122 auto msg = mimeMessageReader.mimeMessage();
@@ -125,7 +125,7 @@ void MailPropertyExtractor::newEntity(Sink::ApplicationDomain::Mail &mail, Sink:
125 } 125 }
126} 126}
127 127
128void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::DataStore::Transaction &transaction) 128void MailPropertyExtractor::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail)
129{ 129{
130 MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(newMail.getMimeMessagePath())); 130 MimeMessageReader mimeMessageReader(getFilePathFromMimeMessagePath(newMail.getMimeMessagePath()));
131 auto msg = mimeMessageReader.mimeMessage(); 131 auto msg = mimeMessageReader.mimeMessage();
@@ -161,21 +161,21 @@ QString MimeMessageMover::moveMessage(const QString &oldPath, const Sink::Applic
161 return oldPath; 161 return oldPath;
162} 162}
163 163
164void MimeMessageMover::newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) 164void MimeMessageMover::newEntity(Sink::ApplicationDomain::Mail &mail)
165{ 165{
166 if (!mail.getMimeMessagePath().isEmpty()) { 166 if (!mail.getMimeMessagePath().isEmpty()) {
167 mail.setMimeMessagePath(moveMessage(mail.getMimeMessagePath(), mail)); 167 mail.setMimeMessagePath(moveMessage(mail.getMimeMessagePath(), mail));
168 } 168 }
169} 169}
170 170
171void MimeMessageMover::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail, Sink::Storage::DataStore::Transaction &transaction) 171void MimeMessageMover::modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail)
172{ 172{
173 if (!newMail.getMimeMessagePath().isEmpty()) { 173 if (!newMail.getMimeMessagePath().isEmpty()) {
174 newMail.setMimeMessagePath(moveMessage(newMail.getMimeMessagePath(), newMail)); 174 newMail.setMimeMessagePath(moveMessage(newMail.getMimeMessagePath(), newMail));
175 } 175 }
176} 176}
177 177
178void MimeMessageMover::deletedEntity(const Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) 178void MimeMessageMover::deletedEntity(const Sink::ApplicationDomain::Mail &mail)
179{ 179{
180 QFile::remove(mail.getMimeMessagePath()); 180 QFile::remove(mail.getMimeMessagePath());
181} 181}
diff --git a/common/mailpreprocessor.h b/common/mailpreprocessor.h
index c66517e..f979f22 100644
--- a/common/mailpreprocessor.h
+++ b/common/mailpreprocessor.h
@@ -24,8 +24,8 @@ class SINK_EXPORT MailPropertyExtractor : public Sink::EntityPreprocessor<Sink::
24{ 24{
25public: 25public:
26 virtual ~MailPropertyExtractor(){} 26 virtual ~MailPropertyExtractor(){}
27 virtual void newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; 27 virtual void newEntity(Sink::ApplicationDomain::Mail &mail) Q_DECL_OVERRIDE;
28 virtual void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail,Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; 28 virtual void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail) Q_DECL_OVERRIDE;
29protected: 29protected:
30 virtual QString getFilePathFromMimeMessagePath(const QString &) const; 30 virtual QString getFilePathFromMimeMessagePath(const QString &) const;
31}; 31};
@@ -36,9 +36,9 @@ public:
36 MimeMessageMover(); 36 MimeMessageMover();
37 virtual ~MimeMessageMover(){} 37 virtual ~MimeMessageMover(){}
38 38
39 void newEntity(Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; 39 void newEntity(Sink::ApplicationDomain::Mail &mail) Q_DECL_OVERRIDE;
40 void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; 40 void modifiedEntity(const Sink::ApplicationDomain::Mail &oldMail, Sink::ApplicationDomain::Mail &newMail) Q_DECL_OVERRIDE;
41 void deletedEntity(const Sink::ApplicationDomain::Mail &mail, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; 41 void deletedEntity(const Sink::ApplicationDomain::Mail &mail) Q_DECL_OVERRIDE;
42 42
43private: 43private:
44 QString moveMessage(const QString &oldPath, const Sink::ApplicationDomain::Mail &mail); 44 QString moveMessage(const QString &oldPath, const Sink::ApplicationDomain::Mail &mail);
diff --git a/common/pipeline.cpp b/common/pipeline.cpp
index e257857..ea59ae9 100644
--- a/common/pipeline.cpp
+++ b/common/pipeline.cpp
@@ -37,6 +37,7 @@
37#include "adaptorfactoryregistry.h" 37#include "adaptorfactoryregistry.h"
38#include "definitions.h" 38#include "definitions.h"
39#include "bufferutils.h" 39#include "bufferutils.h"
40#include "storage/entitystore.h"
40 41
41SINK_DEBUG_AREA("pipeline") 42SINK_DEBUG_AREA("pipeline")
42 43
@@ -46,31 +47,18 @@ using namespace Sink::Storage;
46class Pipeline::Private 47class Pipeline::Private
47{ 48{
48public: 49public:
49 Private(const ResourceContext &context) : resourceContext(context), storage(Sink::storageLocation(), context.instanceId(), DataStore::ReadWrite), revisionChanged(false) 50 Private(const ResourceContext &context) : resourceContext(context), entityStore(context), revisionChanged(false)
50 { 51 {
51 } 52 }
52 53
53 ResourceContext resourceContext; 54 ResourceContext resourceContext;
54 DataStore storage; 55 Storage::EntityStore entityStore;
55 DataStore::Transaction transaction;
56 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors; 56 QHash<QString, QVector<QSharedPointer<Preprocessor>>> processors;
57 bool revisionChanged; 57 bool revisionChanged;
58 void storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid);
59 QTime transactionTime; 58 QTime transactionTime;
60 int transactionItemCount; 59 int transactionItemCount;
61}; 60};
62 61
63void Pipeline::Private::storeNewRevision(qint64 newRevision, const flatbuffers::FlatBufferBuilder &fbb, const QByteArray &bufferType, const QByteArray &uid)
64{
65 SinkTrace() << "Committing new revision: " << uid << newRevision;
66 DataStore::mainDatabase(transaction, bufferType)
67 .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
68 [uid, newRevision](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; });
69 revisionChanged = true;
70 DataStore::setMaxRevision(transaction, newRevision);
71 DataStore::recordRevision(transaction, newRevision, uid, bufferType);
72}
73
74 62
75Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context)) 63Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Private(context))
76{ 64{
@@ -78,7 +66,6 @@ Pipeline::Pipeline(const ResourceContext &context) : QObject(nullptr), d(new Pri
78 66
79Pipeline::~Pipeline() 67Pipeline::~Pipeline()
80{ 68{
81 d->transaction = DataStore::Transaction();
82} 69}
83 70
84void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors) 71void Pipeline::setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &processors)
@@ -98,27 +85,10 @@ void Pipeline::startTransaction()
98 // for (auto processor : d->processors[bufferType]) { 85 // for (auto processor : d->processors[bufferType]) {
99 // processor->startBatch(); 86 // processor->startBatch();
100 // } 87 // }
101 if (d->transaction) {
102 return;
103 }
104 SinkTrace() << "Starting transaction."; 88 SinkTrace() << "Starting transaction.";
105 d->transactionTime.start(); 89 d->transactionTime.start();
106 d->transactionItemCount = 0; 90 d->transactionItemCount = 0;
107 d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) { 91 d->entityStore.startTransaction(DataStore::ReadWrite);
108 SinkWarning() << error.message;
109 });
110
111 //FIXME this is a temporary measure to recover from a failure to open the named databases correctly.
112 //Once the actual problem is fixed it will be enough to simply crash if we open the wrong database (which we check in openDatabase already).
113 //It seems like the validateNamedDatabase calls actually stops the mdb_put failures during sync...
114 if (d->storage.exists()) {
115 while (!d->transaction.validateNamedDatabases()) {
116 SinkWarning() << "Opened an invalid transaction!!!!!!";
117 d->transaction = storage().createTransaction(DataStore::ReadWrite, [](const DataStore::Error &error) {
118 SinkWarning() << error.message;
119 });
120 }
121 }
122} 92}
123 93
124void Pipeline::commit() 94void Pipeline::commit()
@@ -129,34 +99,20 @@ void Pipeline::commit()
129 // processor->finalize(); 99 // processor->finalize();
130 // } 100 // }
131 if (!d->revisionChanged) { 101 if (!d->revisionChanged) {
132 d->transaction.abort(); 102 d->entityStore.abortTransaction();
133 d->transaction = DataStore::Transaction();
134 return; 103 return;
135 } 104 }
136 const auto revision = DataStore::maxRevision(d->transaction); 105 const auto revision = d->entityStore.maxRevision();
137 const auto elapsed = d->transactionTime.elapsed(); 106 const auto elapsed = d->transactionTime.elapsed();
138 SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " " 107 SinkLog() << "Committing revision: " << revision << ":" << d->transactionItemCount << " items in: " << Log::TraceTime(elapsed) << " "
139 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]"; 108 << (double)elapsed / (double)qMax(d->transactionItemCount, 1) << "[ms/item]";
140 if (d->transaction) { 109 d->entityStore.commitTransaction();
141 d->transaction.commit();
142 }
143 d->transaction = DataStore::Transaction();
144 if (d->revisionChanged) { 110 if (d->revisionChanged) {
145 d->revisionChanged = false; 111 d->revisionChanged = false;
146 emit revisionUpdated(revision); 112 emit revisionUpdated(revision);
147 } 113 }
148} 114}
149 115
150DataStore::Transaction &Pipeline::transaction()
151{
152 return d->transaction;
153}
154
155DataStore &Pipeline::storage() const
156{
157 return d->storage;
158}
159
160KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size) 116KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
161{ 117{
162 d->transactionItemCount++; 118 d->transactionItemCount++;
@@ -175,7 +131,7 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
175 QByteArray key; 131 QByteArray key;
176 if (createEntity->entityId()) { 132 if (createEntity->entityId()) {
177 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size()); 133 key = QByteArray(reinterpret_cast<char const *>(createEntity->entityId()->Data()), createEntity->entityId()->size());
178 if (DataStore::mainDatabase(d->transaction, bufferType).contains(key)) { 134 if (d->entityStore.contains(bufferType, key)) {
179 SinkError() << "An entity with this id already exists: " << key; 135 SinkError() << "An entity with this id already exists: " << key;
180 return KAsync::error<qint64>(0); 136 return KAsync::error<qint64>(0);
181 } 137 }
@@ -208,29 +164,23 @@ KAsync::Job<qint64> Pipeline::newEntity(void const *command, size_t size)
208 164
209 auto adaptor = adaptorFactory->createAdaptor(*entity); 165 auto adaptor = adaptorFactory->createAdaptor(*entity);
210 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties()); 166 auto memoryAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(adaptor), adaptor->availableProperties());
211 foreach (const auto &processor, d->processors[bufferType]) {
212 processor->newEntity(key, DataStore::maxRevision(d->transaction) + 1, *memoryAdaptor, d->transaction);
213 }
214 //The maxRevision may have changed meanwhile if the entity created sub-entities
215 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
216
217 // Add metadata buffer
218 flatbuffers::FlatBufferBuilder metadataFbb;
219 auto metadataBuilder = MetadataBuilder(metadataFbb);
220 metadataBuilder.add_revision(newRevision);
221 metadataBuilder.add_operation(Operation_Creation);
222 metadataBuilder.add_replayToSource(replayToSource);
223 auto metadataBuffer = metadataBuilder.Finish();
224 FinishMetadataBuffer(metadataFbb, metadataBuffer);
225 167
226 flatbuffers::FlatBufferBuilder fbb; 168 d->revisionChanged = true;
227 adaptorFactory->createBuffer(memoryAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize()); 169 auto revision = d->entityStore.maxRevision();
170 auto o = Sink::ApplicationDomain::ApplicationDomainType{d->resourceContext.instanceId(), key, revision, memoryAdaptor};
171 o.setChangedProperties(o.availableProperties().toSet());
228 172
229 d->storeNewRevision(newRevision, fbb, bufferType, key); 173 auto preprocess = [&, this](ApplicationDomain::ApplicationDomainType &newEntity) {
174 foreach (const auto &processor, d->processors[bufferType]) {
175 processor->newEntity(newEntity);
176 }
177 };
230 178
231 //FIXME entityStore->create(bufferType, memoryAdaptor, replayToSource) 179 if (!d->entityStore.add(bufferType, o, replayToSource, preprocess)) {
180 return KAsync::error<qint64>(0);
181 }
232 182
233 return KAsync::value(newRevision); 183 return KAsync::value(d->entityStore.maxRevision());
234} 184}
235 185
236KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size) 186KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
@@ -254,6 +204,7 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
254 } 204 }
255 const qint64 baseRevision = modifyEntity->revision(); 205 const qint64 baseRevision = modifyEntity->revision();
256 const bool replayToSource = modifyEntity->replayToSource(); 206 const bool replayToSource = modifyEntity->replayToSource();
207
257 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size()); 208 const QByteArray bufferType = QByteArray(reinterpret_cast<char const *>(modifyEntity->domainType()->Data()), modifyEntity->domainType()->size());
258 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size()); 209 const QByteArray key = QByteArray(reinterpret_cast<char const *>(modifyEntity->entityId()->Data()), modifyEntity->entityId()->size());
259 SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 210 SinkTrace() << "Modified Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
@@ -269,7 +220,6 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
269 } 220 }
270 } 221 }
271 222
272 // TODO use only readPropertyMapper and writePropertyMapper
273 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); 223 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType);
274 if (!adaptorFactory) { 224 if (!adaptorFactory) {
275 SinkWarning() << "no adaptor factory for type " << bufferType; 225 SinkWarning() << "no adaptor factory for type " << bufferType;
@@ -278,72 +228,26 @@ KAsync::Job<qint64> Pipeline::modifiedEntity(void const *command, size_t size)
278 228
279 auto diffEntity = GetEntity(modifyEntity->delta()->Data()); 229 auto diffEntity = GetEntity(modifyEntity->delta()->Data());
280 Q_ASSERT(diffEntity); 230 Q_ASSERT(diffEntity);
281 auto diff = adaptorFactory->createAdaptor(*diffEntity); 231 Sink::ApplicationDomain::ApplicationDomainType diff{d->resourceContext.instanceId(), key, baseRevision, adaptorFactory->createAdaptor(*diffEntity)};
282 232 diff.setChangedProperties(changeset.toSet());
283 QSharedPointer<ApplicationDomain::BufferAdaptor> current;
284 DataStore::mainDatabase(d->transaction, bufferType)
285 .findLatest(key,
286 [&current, adaptorFactory](const QByteArray &key, const QByteArray &data) -> bool {
287 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
288 if (!buffer.isValid()) {
289 SinkWarning() << "Read invalid buffer from disk";
290 } else {
291 current = adaptorFactory->createAdaptor(buffer.entity());
292 }
293 return false;
294 },
295 [baseRevision](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message << "Revision: " << baseRevision; });
296
297 if (!current) {
298 SinkWarning() << "Failed to read local value " << key;
299 return KAsync::error<qint64>(0);
300 }
301
302 auto newAdaptor = QSharedPointer<Sink::ApplicationDomain::MemoryBufferAdaptor>::create(*(current), current->availableProperties());
303
304 // Apply diff
305 // FIXME only apply the properties that are available in the buffer
306 SinkTrace() << "Applying changed properties: " << changeset;
307 for (const auto &property : changeset) {
308 const auto value = diff->getProperty(property);
309 if (value.isValid()) {
310 newAdaptor->setProperty(property, value);
311 }
312 }
313 233
314 // Remove deletions 234 QByteArrayList deletions;
315 if (modifyEntity->deletions()) { 235 if (modifyEntity->deletions()) {
316 for (const flatbuffers::String *property : *modifyEntity->deletions()) { 236 deletions = BufferUtils::fromVector(*modifyEntity->deletions());
317 newAdaptor->setProperty(BufferUtils::extractBuffer(property), QVariant());
318 }
319 } 237 }
320 238
321 newAdaptor->resetChangedProperties(); 239 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) {
322 foreach (const auto &processor, d->processors[bufferType]) { 240 foreach (const auto &processor, d->processors[bufferType]) {
323 processor->modifiedEntity(key, DataStore::maxRevision(d->transaction) + 1, *current, *newAdaptor, d->transaction); 241 processor->modifiedEntity(oldEntity, newEntity);
324 } 242 }
325 //The maxRevision may have changed meanwhile if the entity created sub-entities 243 };
326 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
327 244
328 // Add metadata buffer 245 d->revisionChanged = true;
329 flatbuffers::FlatBufferBuilder metadataFbb; 246 if (!d->entityStore.modify(bufferType, diff, deletions, replayToSource, preprocess)) {
330 { 247 return KAsync::error<qint64>(0);
331 //We add availableProperties to account for the properties that have been changed by the preprocessors
332 auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newAdaptor->changedProperties());
333 auto metadataBuilder = MetadataBuilder(metadataFbb);
334 metadataBuilder.add_revision(newRevision);
335 metadataBuilder.add_operation(Operation_Modification);
336 metadataBuilder.add_replayToSource(replayToSource);
337 metadataBuilder.add_modifiedProperties(modifiedProperties);
338 auto metadataBuffer = metadataBuilder.Finish();
339 FinishMetadataBuffer(metadataFbb, metadataBuffer);
340 } 248 }
341 249
342 flatbuffers::FlatBufferBuilder fbb; 250 return KAsync::value(d->entityStore.maxRevision());
343 adaptorFactory->createBuffer(newAdaptor, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
344
345 d->storeNewRevision(newRevision, fbb, bufferType, key);
346 return KAsync::value(newRevision);
347} 251}
348 252
349KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size) 253KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
@@ -364,106 +268,38 @@ KAsync::Job<qint64> Pipeline::deletedEntity(void const *command, size_t size)
364 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size()); 268 const QByteArray key = QByteArray(reinterpret_cast<char const *>(deleteEntity->entityId()->Data()), deleteEntity->entityId()->size());
365 SinkTrace() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource; 269 SinkTrace() << "Deleted Entity. Type: " << bufferType << "uid: "<< key << " replayToSource: " << replayToSource;
366 270
367 bool found = false; 271 auto preprocess = [&, this](const ApplicationDomain::ApplicationDomainType &oldEntity) {
368 bool alreadyRemoved = false; 272 foreach (const auto &processor, d->processors[bufferType]) {
369 DataStore::mainDatabase(d->transaction, bufferType) 273 processor->deletedEntity(oldEntity);
370 .findLatest(key, 274 }
371 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool { 275 };
372 auto entity = GetEntity(data.data());
373 if (entity && entity->metadata()) {
374 auto metadata = GetMetadata(entity->metadata()->Data());
375 found = true;
376 if (metadata->operation() == Operation_Removal) {
377 alreadyRemoved = true;
378 }
379 }
380 return false;
381 },
382 [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; });
383
384 if (!found) {
385 SinkWarning() << "Failed to find entity " << key;
386 return KAsync::error<qint64>(0);
387 }
388 if (alreadyRemoved) {
389 SinkWarning() << "Entity is already removed " << key;
390 return KAsync::error<qint64>(0);
391 }
392
393 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
394
395 // Add metadata buffer
396 flatbuffers::FlatBufferBuilder metadataFbb;
397 auto metadataBuilder = MetadataBuilder(metadataFbb);
398 metadataBuilder.add_revision(newRevision);
399 metadataBuilder.add_operation(Operation_Removal);
400 metadataBuilder.add_replayToSource(replayToSource);
401 auto metadataBuffer = metadataBuilder.Finish();
402 FinishMetadataBuffer(metadataFbb, metadataBuffer);
403
404 flatbuffers::FlatBufferBuilder fbb;
405 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
406 276
407 auto adaptorFactory = Sink::AdaptorFactoryRegistry::instance().getFactory(d->resourceContext.resourceType, bufferType); 277 d->revisionChanged = true;
408 if (!adaptorFactory) { 278 if (!d->entityStore.remove(bufferType, key, replayToSource, preprocess)) {
409 SinkWarning() << "no adaptor factory for type " << bufferType;
410 return KAsync::error<qint64>(0); 279 return KAsync::error<qint64>(0);
411 } 280 }
412 281
413 QSharedPointer<ApplicationDomain::BufferAdaptor> current; 282 return KAsync::value(d->entityStore.maxRevision());
414 DataStore::mainDatabase(d->transaction, bufferType)
415 .findLatest(key,
416 [this, bufferType, newRevision, adaptorFactory, key, &current](const QByteArray &, const QByteArray &data) -> bool {
417 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
418 if (!buffer.isValid()) {
419 SinkWarning() << "Read invalid buffer from disk";
420 } else {
421 current = adaptorFactory->createAdaptor(buffer.entity());
422 }
423 return false;
424 },
425 [this](const DataStore::Error &error) { SinkError() << "Failed to find value in pipeline: " << error.message; });
426
427 d->storeNewRevision(newRevision, fbb, bufferType, key);
428
429 foreach (const auto &processor, d->processors[bufferType]) {
430 processor->deletedEntity(key, newRevision, *current, d->transaction);
431 }
432
433 return KAsync::value(newRevision);
434} 283}
435 284
436void Pipeline::cleanupRevision(qint64 revision) 285void Pipeline::cleanupRevision(qint64 revision)
437{ 286{
287 d->entityStore.cleanupRevision(revision);
438 d->revisionChanged = true; 288 d->revisionChanged = true;
439 const auto uid = DataStore::getUidFromRevision(d->transaction, revision);
440 const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision);
441 SinkTrace() << "Cleaning up revision " << revision << uid << bufferType;
442 DataStore::mainDatabase(d->transaction, bufferType)
443 .scan(uid,
444 [&](const QByteArray &key, const QByteArray &data) -> bool {
445 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
446 if (!buffer.isValid()) {
447 SinkWarning() << "Read invalid buffer from disk";
448 } else {
449 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
450 const qint64 rev = metadata->revision();
451 // Remove old revisions, and the current if the entity has already been removed
452 if (rev < revision || metadata->operation() == Operation_Removal) {
453 DataStore::removeRevision(d->transaction, rev);
454 DataStore::mainDatabase(d->transaction, bufferType).remove(key);
455 }
456 }
457
458 return true;
459 },
460 [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true);
461 DataStore::setCleanedUpRevision(d->transaction, revision);
462} 289}
463 290
464qint64 Pipeline::cleanedUpRevision() 291qint64 Pipeline::cleanedUpRevision()
465{ 292{
466 return DataStore::cleanedUpRevision(d->transaction); 293 /* return d->entityStore.cleanedUpRevision(); */
294 /* return DataStore::cleanedUpRevision(d->transaction); */
295 //FIXME Just move the whole cleanup revision iteration into the entitystore
296 return 0;
297}
298
299qint64 Pipeline::revision()
300{
301 //FIXME Just move the whole cleanup revision iteration into the entitystore
302 return 0;
467} 303}
468 304
469class Preprocessor::Private { 305class Preprocessor::Private {
@@ -492,7 +328,7 @@ void Preprocessor::startBatch()
492{ 328{
493} 329}
494 330
495void Preprocessor::finalize() 331void Preprocessor::finalizeBatch()
496{ 332{
497} 333}
498 334
@@ -510,7 +346,6 @@ void Preprocessor::createEntity(const Sink::ApplicationDomain::ApplicationDomain
510 346
511 flatbuffers::FlatBufferBuilder fbb; 347 flatbuffers::FlatBufferBuilder fbb;
512 auto entityId = fbb.CreateString(entity.identifier()); 348 auto entityId = fbb.CreateString(entity.identifier());
513 // This is the resource buffer type and not the domain type
514 auto type = fbb.CreateString(typeName); 349 auto type = fbb.CreateString(typeName);
515 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size()); 350 auto delta = Sink::EntityBuffer::appendAsVector(fbb, entityBuffer.constData(), entityBuffer.size());
516 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta); 351 auto location = Sink::Commands::CreateCreateEntity(fbb, entityId, type, delta);
diff --git a/common/pipeline.h b/common/pipeline.h
index bf94017..0461507 100644
--- a/common/pipeline.h
+++ b/common/pipeline.h
@@ -31,7 +31,8 @@
31 31
32#include <Async/Async> 32#include <Async/Async>
33 33
34#include "domainadaptor.h" 34#include <bufferadaptor.h>
35#include <resourcecontext.h>
35 36
36namespace Sink { 37namespace Sink {
37 38
@@ -45,16 +46,14 @@ public:
45 Pipeline(const ResourceContext &context); 46 Pipeline(const ResourceContext &context);
46 ~Pipeline(); 47 ~Pipeline();
47 48
48 Storage::DataStore &storage() const;
49
50 void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors); 49 void setPreprocessors(const QString &entityType, const QVector<Preprocessor *> &preprocessors);
51 void startTransaction(); 50 void startTransaction();
52 void commit(); 51 void commit();
53 Storage::DataStore::Transaction &transaction();
54 52
55 KAsync::Job<qint64> newEntity(void const *command, size_t size); 53 KAsync::Job<qint64> newEntity(void const *command, size_t size);
56 KAsync::Job<qint64> modifiedEntity(void const *command, size_t size); 54 KAsync::Job<qint64> modifiedEntity(void const *command, size_t size);
57 KAsync::Job<qint64> deletedEntity(void const *command, size_t size); 55 KAsync::Job<qint64> deletedEntity(void const *command, size_t size);
56
58 /* 57 /*
59 * Cleans up a single revision. 58 * Cleans up a single revision.
60 * 59 *
@@ -66,6 +65,7 @@ public:
66 * Returns the latest cleaned up revision. 65 * Returns the latest cleaned up revision.
67 */ 66 */
68 qint64 cleanedUpRevision(); 67 qint64 cleanedUpRevision();
68 qint64 revision();
69 69
70signals: 70signals:
71 void revisionUpdated(qint64); 71 void revisionUpdated(qint64);
@@ -82,11 +82,10 @@ public:
82 virtual ~Preprocessor(); 82 virtual ~Preprocessor();
83 83
84 virtual void startBatch(); 84 virtual void startBatch();
85 virtual void newEntity(const QByteArray &uid, qint64 revision, ApplicationDomain::BufferAdaptor &newEntity, Storage::DataStore::Transaction &transaction) {}; 85 virtual void newEntity(ApplicationDomain::ApplicationDomainType &newEntity) {};
86 virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, 86 virtual void modifiedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity) {};
87 ApplicationDomain::BufferAdaptor &newEntity, Storage::DataStore::Transaction &transaction) {}; 87 virtual void deletedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity) {};
88 virtual void deletedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, Storage::DataStore::Transaction &transaction) {}; 88 virtual void finalizeBatch();
89 virtual void finalize();
90 89
91 void setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *); 90 void setup(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier, Pipeline *);
92 91
@@ -110,27 +109,28 @@ template<typename DomainType>
110class SINK_EXPORT EntityPreprocessor: public Preprocessor 109class SINK_EXPORT EntityPreprocessor: public Preprocessor
111{ 110{
112public: 111public:
113 virtual void newEntity(DomainType &, Storage::DataStore::Transaction &transaction) {}; 112 virtual void newEntity(DomainType &) {};
114 virtual void modifiedEntity(const DomainType &oldEntity, DomainType &newEntity, Storage::DataStore::Transaction &transaction) {}; 113 virtual void modifiedEntity(const DomainType &oldEntity, DomainType &newEntity) {};
115 virtual void deletedEntity(const DomainType &oldEntity, Storage::DataStore::Transaction &transaction) {}; 114 virtual void deletedEntity(const DomainType &oldEntity) {};
116 115
117private: 116private:
118 static void nullDeleter(ApplicationDomain::BufferAdaptor *) {} 117 virtual void newEntity(ApplicationDomain::ApplicationDomainType &newEntity_) Q_DECL_OVERRIDE
119 virtual void newEntity(const QByteArray &uid, qint64 revision, ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
120 { 118 {
121 auto o = DomainType("", uid, revision, QSharedPointer<ApplicationDomain::BufferAdaptor>(&bufferAdaptor, nullDeleter)); 119 //Modifications still work due to the underlying shared adaptor
122 newEntity(o, transaction); 120 auto newEntityCopy = DomainType(newEntity_);
121 newEntity(newEntityCopy);
123 } 122 }
124 123
125 virtual void modifiedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &oldEntity, 124 virtual void modifiedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity, ApplicationDomain::ApplicationDomainType &newEntity_) Q_DECL_OVERRIDE
126 ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
127 { 125 {
128 auto o = DomainType("", uid, revision, QSharedPointer<ApplicationDomain::BufferAdaptor>(&bufferAdaptor, nullDeleter)); 126 //Modifications still work due to the underlying shared adaptor
129 modifiedEntity(DomainType("", uid, 0, QSharedPointer<ApplicationDomain::BufferAdaptor>(const_cast<ApplicationDomain::BufferAdaptor*>(&oldEntity), nullDeleter)), o, transaction); 127 auto newEntityCopy = DomainType(newEntity_);
128 modifiedEntity(DomainType(oldEntity), newEntityCopy);
130 } 129 }
131 virtual void deletedEntity(const QByteArray &uid, qint64 revision, const ApplicationDomain::BufferAdaptor &bufferAdaptor, Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE 130
131 virtual void deletedEntity(const ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE
132 { 132 {
133 deletedEntity(DomainType("", uid, revision, QSharedPointer<ApplicationDomain::BufferAdaptor>(const_cast<ApplicationDomain::BufferAdaptor*>(&bufferAdaptor), nullDeleter)), transaction); 133 deletedEntity(DomainType(oldEntity));
134 } 134 }
135}; 135};
136 136
diff --git a/common/specialpurposepreprocessor.cpp b/common/specialpurposepreprocessor.cpp
index 0fd8e34..920f78a 100644
--- a/common/specialpurposepreprocessor.cpp
+++ b/common/specialpurposepreprocessor.cpp
@@ -46,11 +46,11 @@ QByteArray getSpecialPurposeType(const QString &name)
46 46
47SpecialPurposeProcessor::SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier) {} 47SpecialPurposeProcessor::SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier) : mResourceType(resourceType), mResourceInstanceIdentifier(resourceInstanceIdentifier) {}
48 48
49QByteArray SpecialPurposeProcessor::ensureFolder(Sink::Storage::DataStore::Transaction &transaction, const QByteArray &specialPurpose) 49QByteArray SpecialPurposeProcessor::ensureFolder(const QByteArray &specialPurpose)
50{ 50{
51 /* if (!mSpecialPurposeFolders.contains(specialPurpose)) { */ 51 /* if (!mSpecialPurposeFolders.contains(specialPurpose)) { */
52 /* //Try to find an existing drafts folder */ 52 /* //Try to find an existing drafts folder */
53 /* Sink::EntityReader<ApplicationDomain::Folder> reader(mResourceType, mResourceInstanceIdentifier, transaction); */ 53 /* Sink::EntityReader<ApplicationDomain::Folder> reader(mResourceType, mResourceInstanceIdentifier); */
54 /* reader.query(Sink::Query().filter<ApplicationDomain::Folder::SpecialPurpose>(Query::Comparator(specialPurpose, Query::Comparator::Contains)), */ 54 /* reader.query(Sink::Query().filter<ApplicationDomain::Folder::SpecialPurpose>(Query::Comparator(specialPurpose, Query::Comparator::Contains)), */
55 /* [this, specialPurpose](const ApplicationDomain::Folder &f) -> bool{ */ 55 /* [this, specialPurpose](const ApplicationDomain::Folder &f) -> bool{ */
56 /* mSpecialPurposeFolders.insert(specialPurpose, f.identifier()); */ 56 /* mSpecialPurposeFolders.insert(specialPurpose, f.identifier()); */
@@ -70,23 +70,23 @@ QByteArray SpecialPurposeProcessor::ensureFolder(Sink::Storage::DataStore::Trans
70 return mSpecialPurposeFolders.value(specialPurpose); 70 return mSpecialPurposeFolders.value(specialPurpose);
71} 71}
72 72
73void SpecialPurposeProcessor::moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) 73void SpecialPurposeProcessor::moveToFolder(Sink::ApplicationDomain::ApplicationDomainType &newEntity)
74{ 74{
75 if (newEntity.getProperty("trash").toBool()) { 75 if (newEntity.getProperty("trash").toBool()) {
76 newEntity.setProperty("folder", ensureFolder(transaction, "trash")); 76 newEntity.setProperty("folder", ensureFolder("trash"));
77 return; 77 return;
78 } 78 }
79 if (newEntity.getProperty("draft").toBool()) { 79 if (newEntity.getProperty("draft").toBool()) {
80 newEntity.setProperty("folder", ensureFolder(transaction, "drafts")); 80 newEntity.setProperty("folder", ensureFolder("drafts"));
81 } 81 }
82} 82}
83 83
84void SpecialPurposeProcessor::newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) 84void SpecialPurposeProcessor::newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity)
85{ 85{
86 moveToFolder(newEntity, transaction); 86 moveToFolder(newEntity);
87} 87}
88 88
89void SpecialPurposeProcessor::modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) 89void SpecialPurposeProcessor::modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity)
90{ 90{
91 moveToFolder(newEntity, transaction); 91 moveToFolder(newEntity);
92} 92}
diff --git a/common/specialpurposepreprocessor.h b/common/specialpurposepreprocessor.h
index 8b2d9e9..f2aeb20 100644
--- a/common/specialpurposepreprocessor.h
+++ b/common/specialpurposepreprocessor.h
@@ -30,12 +30,12 @@ class SINK_EXPORT SpecialPurposeProcessor : public Sink::Preprocessor
30public: 30public:
31 SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier); 31 SpecialPurposeProcessor(const QByteArray &resourceType, const QByteArray &resourceInstanceIdentifier);
32 32
33 QByteArray ensureFolder(Sink::Storage::DataStore::Transaction &transaction, const QByteArray &specialPurpose); 33 QByteArray ensureFolder(const QByteArray &specialPurpose);
34 34
35 void moveToFolder(Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction); 35 void moveToFolder(Sink::ApplicationDomain::ApplicationDomainType &newEntity);
36 36
37 void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; 37 void newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE;
38 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE; 38 void modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE;
39 39
40 QHash<QByteArray, QByteArray> mSpecialPurposeFolders; 40 QHash<QByteArray, QByteArray> mSpecialPurposeFolders;
41 QByteArray mResourceType; 41 QByteArray mResourceType;
diff --git a/common/storage/entitystore.cpp b/common/storage/entitystore.cpp
index fe63f0b..30c7a71 100644
--- a/common/storage/entitystore.cpp
+++ b/common/storage/entitystore.cpp
@@ -25,6 +25,8 @@
25#include "definitions.h" 25#include "definitions.h"
26#include "resourcecontext.h" 26#include "resourcecontext.h"
27#include "index.h" 27#include "index.h"
28#include "bufferutils.h"
29#include "entity_generated.h"
28 30
29#include "mail.h" 31#include "mail.h"
30#include "folder.h" 32#include "folder.h"
@@ -108,16 +110,199 @@ void EntityStore::startTransaction(Sink::Storage::DataStore::AccessMode accessMo
108 110
109void EntityStore::commitTransaction() 111void EntityStore::commitTransaction()
110{ 112{
113 SinkTrace() << "Committing transaction";
111 d->transaction.commit(); 114 d->transaction.commit();
112 d->transaction = Storage::DataStore::Transaction(); 115 d->transaction = Storage::DataStore::Transaction();
113} 116}
114 117
115void EntityStore::abortTransaction() 118void EntityStore::abortTransaction()
116{ 119{
120 SinkTrace() << "Aborting transaction";
117 d->transaction.abort(); 121 d->transaction.abort();
118 d->transaction = Storage::DataStore::Transaction(); 122 d->transaction = Storage::DataStore::Transaction();
119} 123}
120 124
125bool EntityStore::add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &entity_, bool replayToSource, const PreprocessCreation &preprocess)
126{
127 if (entity_.identifier().isEmpty()) {
128 SinkWarning() << "Can't write entity with an empty identifier";
129 return false;
130 }
131
132 auto entity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(entity_, entity_.availableProperties());
133 entity.setChangedProperties(entity.availableProperties().toSet());
134
135 preprocess(entity);
136 d->typeIndex(type).add(entity.identifier(), entity, d->transaction);
137
138 //The maxRevision may have changed meanwhile if the entity created sub-entities
139 const qint64 newRevision = maxRevision() + 1;
140
141 // Add metadata buffer
142 flatbuffers::FlatBufferBuilder metadataFbb;
143 auto metadataBuilder = MetadataBuilder(metadataFbb);
144 metadataBuilder.add_revision(newRevision);
145 metadataBuilder.add_operation(Operation_Creation);
146 metadataBuilder.add_replayToSource(replayToSource);
147 auto metadataBuffer = metadataBuilder.Finish();
148 FinishMetadataBuffer(metadataFbb, metadataBuffer);
149
150 flatbuffers::FlatBufferBuilder fbb;
151 d->resourceContext.adaptorFactory(type).createBuffer(entity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
152
153 DataStore::mainDatabase(d->transaction, type)
154 .write(DataStore::assembleKey(entity.identifier(), newRevision), BufferUtils::extractBuffer(fbb),
155 [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << entity.identifier() << newRevision; });
156 DataStore::setMaxRevision(d->transaction, newRevision);
157 DataStore::recordRevision(d->transaction, newRevision, entity.identifier(), type);
158 SinkTrace() << "Wrote entity: " << entity.identifier() << type << newRevision;
159 return true;
160}
161
162bool EntityStore::modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &diff, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &preprocess)
163{
164 auto changeset = diff.changedProperties();
165 //TODO handle errors
166 const auto current = readLatest(type, diff.identifier());
167 if (current.identifier().isEmpty()) {
168 SinkWarning() << "Failed to read current version: " << diff.identifier();
169 return false;
170 }
171
172 auto newEntity = *ApplicationDomain::ApplicationDomainType::getInMemoryRepresentation<ApplicationDomain::ApplicationDomainType>(current, current.availableProperties());
173
174 // Apply diff
175 //SinkTrace() << "Applying changed properties: " << changeset;
176 for (const auto &property : changeset) {
177 const auto value = diff.getProperty(property);
178 if (value.isValid()) {
179 //SinkTrace() << "Setting property: " << property;
180 newEntity.setProperty(property, value);
181 }
182 }
183
184 // Remove deletions
185 for (const auto property : deletions) {
186 //SinkTrace() << "Removing property: " << property;
187 newEntity.setProperty(property, QVariant());
188 }
189
190 preprocess(current, newEntity);
191 d->typeIndex(type).remove(current.identifier(), current, d->transaction);
192 d->typeIndex(type).add(newEntity.identifier(), newEntity, d->transaction);
193
194 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
195
196 // Add metadata buffer
197 flatbuffers::FlatBufferBuilder metadataFbb;
198 {
199 //We add availableProperties to account for the properties that have been changed by the preprocessors
200 auto modifiedProperties = BufferUtils::toVector(metadataFbb, changeset + newEntity.changedProperties());
201 auto metadataBuilder = MetadataBuilder(metadataFbb);
202 metadataBuilder.add_revision(newRevision);
203 metadataBuilder.add_operation(Operation_Modification);
204 metadataBuilder.add_replayToSource(replayToSource);
205 metadataBuilder.add_modifiedProperties(modifiedProperties);
206 auto metadataBuffer = metadataBuilder.Finish();
207 FinishMetadataBuffer(metadataFbb, metadataBuffer);
208 }
209
210 newEntity.setChangedProperties(newEntity.availableProperties().toSet());
211 SinkTrace() << "All properties: " << newEntity.availableProperties();
212
213 flatbuffers::FlatBufferBuilder fbb;
214 d->resourceContext.adaptorFactory(type).createBuffer(newEntity, fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize());
215
216 DataStore::mainDatabase(d->transaction, type)
217 .write(DataStore::assembleKey(newEntity.identifier(), newRevision), BufferUtils::extractBuffer(fbb),
218 [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << newEntity.identifier() << newRevision; });
219 DataStore::setMaxRevision(d->transaction, newRevision);
220 DataStore::recordRevision(d->transaction, newRevision, newEntity.identifier(), type);
221 SinkTrace() << "Wrote modified entity: " << newEntity.identifier() << type << newRevision;
222 return true;
223}
224
225bool EntityStore::remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &preprocess)
226{
227 bool found = false;
228 bool alreadyRemoved = false;
229 DataStore::mainDatabase(d->transaction, type)
230 .findLatest(uid,
231 [&found, &alreadyRemoved](const QByteArray &key, const QByteArray &data) -> bool {
232 auto entity = GetEntity(data.data());
233 if (entity && entity->metadata()) {
234 auto metadata = GetMetadata(entity->metadata()->Data());
235 found = true;
236 if (metadata->operation() == Operation_Removal) {
237 alreadyRemoved = true;
238 }
239 }
240 return false;
241 },
242 [](const DataStore::Error &error) { SinkWarning() << "Failed to read old revision from storage: " << error.message; });
243
244 if (!found) {
245 SinkWarning() << "Failed to find entity " << uid;
246 return false;
247 }
248 if (alreadyRemoved) {
249 SinkWarning() << "Entity is already removed " << uid;
250 return false;
251 }
252
253 const auto current = readLatest(type, uid);
254 preprocess(current);
255 d->typeIndex(type).remove(current.identifier(), current, d->transaction);
256
257 const qint64 newRevision = DataStore::maxRevision(d->transaction) + 1;
258
259 // Add metadata buffer
260 flatbuffers::FlatBufferBuilder metadataFbb;
261 auto metadataBuilder = MetadataBuilder(metadataFbb);
262 metadataBuilder.add_revision(newRevision);
263 metadataBuilder.add_operation(Operation_Removal);
264 metadataBuilder.add_replayToSource(replayToSource);
265 auto metadataBuffer = metadataBuilder.Finish();
266 FinishMetadataBuffer(metadataFbb, metadataBuffer);
267
268 flatbuffers::FlatBufferBuilder fbb;
269 EntityBuffer::assembleEntityBuffer(fbb, metadataFbb.GetBufferPointer(), metadataFbb.GetSize(), 0, 0, 0, 0);
270
271 DataStore::mainDatabase(d->transaction, type)
272 .write(DataStore::assembleKey(uid, newRevision), BufferUtils::extractBuffer(fbb),
273 [&](const DataStore::Error &error) { SinkWarning() << "Failed to write entity" << uid << newRevision; });
274 DataStore::setMaxRevision(d->transaction, newRevision);
275 DataStore::recordRevision(d->transaction, newRevision, uid, type);
276 return true;
277}
278
279void EntityStore::cleanupRevision(qint64 revision)
280{
281 const auto uid = DataStore::getUidFromRevision(d->transaction, revision);
282 const auto bufferType = DataStore::getTypeFromRevision(d->transaction, revision);
283 SinkTrace() << "Cleaning up revision " << revision << uid << bufferType;
284 DataStore::mainDatabase(d->transaction, bufferType)
285 .scan(uid,
286 [&](const QByteArray &key, const QByteArray &data) -> bool {
287 EntityBuffer buffer(const_cast<const char *>(data.data()), data.size());
288 if (!buffer.isValid()) {
289 SinkWarning() << "Read invalid buffer from disk";
290 } else {
291 const auto metadata = flatbuffers::GetRoot<Metadata>(buffer.metadataBuffer());
292 const qint64 rev = metadata->revision();
293 // Remove old revisions, and the current if the entity has already been removed
294 if (rev < revision || metadata->operation() == Operation_Removal) {
295 DataStore::removeRevision(d->transaction, rev);
296 DataStore::mainDatabase(d->transaction, bufferType).remove(key);
297 }
298 }
299
300 return true;
301 },
302 [](const DataStore::Error &error) { SinkWarning() << "Error while reading: " << error.message; }, true);
303 DataStore::setCleanedUpRevision(d->transaction, revision);
304}
305
121QVector<QByteArray> EntityStore::fullScan(const QByteArray &type) 306QVector<QByteArray> EntityStore::fullScan(const QByteArray &type)
122{ 307{
123 SinkTrace() << "Looking for : " << type; 308 SinkTrace() << "Looking for : " << type;
diff --git a/common/storage/entitystore.h b/common/storage/entitystore.h
index 455e9c3..65bff50 100644
--- a/common/storage/entitystore.h
+++ b/common/storage/entitystore.h
@@ -38,9 +38,14 @@ public:
38 typedef QSharedPointer<EntityStore> Ptr; 38 typedef QSharedPointer<EntityStore> Ptr;
39 EntityStore(const ResourceContext &resourceContext); 39 EntityStore(const ResourceContext &resourceContext);
40 40
41 void add(const ApplicationDomain::ApplicationDomainType &); 41 typedef std::function<void(const ApplicationDomain::ApplicationDomainType &, ApplicationDomain::ApplicationDomainType &)> PreprocessModification;
42 void modify(const ApplicationDomain::ApplicationDomainType &); 42 typedef std::function<void(ApplicationDomain::ApplicationDomainType &)> PreprocessCreation;
43 void remove(const ApplicationDomain::ApplicationDomainType &); 43 typedef std::function<void(const ApplicationDomain::ApplicationDomainType &)> PreprocessRemoval;
44
45 bool add(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, bool replayToSource, const PreprocessCreation &);
46 bool modify(const QByteArray &type, const ApplicationDomain::ApplicationDomainType &, const QByteArrayList &deletions, bool replayToSource, const PreprocessModification &);
47 bool remove(const QByteArray &type, const QByteArray &uid, bool replayToSource, const PreprocessRemoval &);
48 void cleanupRevision(qint64 revision);
44 49
45 void startTransaction(Sink::Storage::DataStore::AccessMode); 50 void startTransaction(Sink::Storage::DataStore::AccessMode);
46 void commitTransaction(); 51 void commitTransaction();
diff --git a/common/typeindex.cpp b/common/typeindex.cpp
index 64c2a01..7920efc 100644
--- a/common/typeindex.cpp
+++ b/common/typeindex.cpp
@@ -108,30 +108,30 @@ void TypeIndex::addPropertyWithSorting<QByteArray, QDateTime>(const QByteArray &
108 mSortedProperties.insert(property, sortProperty); 108 mSortedProperties.insert(property, sortProperty);
109} 109}
110 110
111void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) 111void TypeIndex::add(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction)
112{ 112{
113 for (const auto &property : mProperties) { 113 for (const auto &property : mProperties) {
114 const auto value = bufferAdaptor.getProperty(property); 114 const auto value = entity.getProperty(property);
115 auto indexer = mIndexer.value(property); 115 auto indexer = mIndexer.value(property);
116 indexer(identifier, value, transaction); 116 indexer(identifier, value, transaction);
117 } 117 }
118 for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { 118 for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) {
119 const auto value = bufferAdaptor.getProperty(it.key()); 119 const auto value = entity.getProperty(it.key());
120 const auto sortValue = bufferAdaptor.getProperty(it.value()); 120 const auto sortValue = entity.getProperty(it.value());
121 auto indexer = mSortIndexer.value(it.key() + it.value()); 121 auto indexer = mSortIndexer.value(it.key() + it.value());
122 indexer(identifier, value, sortValue, transaction); 122 indexer(identifier, value, sortValue, transaction);
123 } 123 }
124} 124}
125 125
126void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction) 126void TypeIndex::remove(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction)
127{ 127{
128 for (const auto &property : mProperties) { 128 for (const auto &property : mProperties) {
129 const auto value = bufferAdaptor.getProperty(property); 129 const auto value = entity.getProperty(property);
130 Index(indexName(property), transaction).remove(getByteArray(value), identifier); 130 Index(indexName(property), transaction).remove(getByteArray(value), identifier);
131 } 131 }
132 for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) { 132 for (auto it = mSortedProperties.constBegin(); it != mSortedProperties.constEnd(); it++) {
133 const auto propertyValue = bufferAdaptor.getProperty(it.key()); 133 const auto propertyValue = entity.getProperty(it.key());
134 const auto sortValue = bufferAdaptor.getProperty(it.value()); 134 const auto sortValue = entity.getProperty(it.value());
135 if (sortValue.type() == QVariant::DateTime) { 135 if (sortValue.type() == QVariant::DateTime) {
136 Index(indexName(it.key(), it.value()), transaction).remove(propertyValue.toByteArray() + toSortableByteArray(sortValue.toDateTime()), identifier); 136 Index(indexName(it.key(), it.value()), transaction).remove(propertyValue.toByteArray() + toSortableByteArray(sortValue.toDateTime()), identifier);
137 } else { 137 } else {
diff --git a/common/typeindex.h b/common/typeindex.h
index 2638577..e11e673 100644
--- a/common/typeindex.h
+++ b/common/typeindex.h
@@ -19,7 +19,6 @@
19#pragma once 19#pragma once
20 20
21#include "resultset.h" 21#include "resultset.h"
22#include "bufferadaptor.h"
23#include "storage.h" 22#include "storage.h"
24#include "query.h" 23#include "query.h"
25#include "log.h" 24#include "log.h"
@@ -52,8 +51,8 @@ public:
52 { 51 {
53 mSecondaryProperties.insert(Left::name, Right::name); 52 mSecondaryProperties.insert(Left::name, Right::name);
54 } 53 }
55 void add(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); 54 void add(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction);
56 void remove(const QByteArray &identifier, const Sink::ApplicationDomain::BufferAdaptor &bufferAdaptor, Sink::Storage::DataStore::Transaction &transaction); 55 void remove(const QByteArray &identifier, const Sink::ApplicationDomain::ApplicationDomainType &entity, Sink::Storage::DataStore::Transaction &transaction);
57 56
58 QVector<QByteArray> query(const Sink::Query &query, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::DataStore::Transaction &transaction); 57 QVector<QByteArray> query(const Sink::Query &query, QSet<QByteArray> &appliedFilters, QByteArray &appliedSorting, Sink::Storage::DataStore::Transaction &transaction);
59 QVector<QByteArray> lookup(const QByteArray &property, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction); 58 QVector<QByteArray> lookup(const QByteArray &property, const QVariant &value, Sink::Storage::DataStore::Transaction &transaction);
diff --git a/examples/dummyresource/resourcefactory.cpp b/examples/dummyresource/resourcefactory.cpp
index e288be2..5513986 100644
--- a/examples/dummyresource/resourcefactory.cpp
+++ b/examples/dummyresource/resourcefactory.cpp
@@ -35,7 +35,6 @@
35#include "dummystore.h" 35#include "dummystore.h"
36#include "definitions.h" 36#include "definitions.h"
37#include "facadefactory.h" 37#include "facadefactory.h"
38#include "indexupdater.h"
39#include "adaptorfactoryregistry.h" 38#include "adaptorfactoryregistry.h"
40#include "synchronizer.h" 39#include "synchronizer.h"
41#include "mailpreprocessor.h" 40#include "mailpreprocessor.h"
@@ -135,11 +134,11 @@ DummyResource::DummyResource(const Sink::ResourceContext &resourceContext, const
135 setupSynchronizer(QSharedPointer<DummySynchronizer>::create(resourceContext)); 134 setupSynchronizer(QSharedPointer<DummySynchronizer>::create(resourceContext));
136 setupChangereplay(QSharedPointer<Sink::NullChangeReplay>::create(resourceContext)); 135 setupChangereplay(QSharedPointer<Sink::NullChangeReplay>::create(resourceContext));
137 setupPreprocessors(ENTITY_TYPE_MAIL, 136 setupPreprocessors(ENTITY_TYPE_MAIL,
138 QVector<Sink::Preprocessor*>() << new MailPropertyExtractor << new DefaultIndexUpdater<Sink::ApplicationDomain::Mail>); 137 QVector<Sink::Preprocessor*>() << new MailPropertyExtractor);
139 setupPreprocessors(ENTITY_TYPE_FOLDER, 138 setupPreprocessors(ENTITY_TYPE_FOLDER,
140 QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Folder>); 139 QVector<Sink::Preprocessor*>());
141 setupPreprocessors(ENTITY_TYPE_EVENT, 140 setupPreprocessors(ENTITY_TYPE_EVENT,
142 QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Event>); 141 QVector<Sink::Preprocessor*>());
143} 142}
144 143
145DummyResource::~DummyResource() 144DummyResource::~DummyResource()
diff --git a/examples/imapresource/imapresource.cpp b/examples/imapresource/imapresource.cpp
index 90dae7a..9656a04 100644
--- a/examples/imapresource/imapresource.cpp
+++ b/examples/imapresource/imapresource.cpp
@@ -33,7 +33,6 @@
33#include "domain/mail.h" 33#include "domain/mail.h"
34#include "definitions.h" 34#include "definitions.h"
35#include "facadefactory.h" 35#include "facadefactory.h"
36#include "indexupdater.h"
37#include "inspection.h" 36#include "inspection.h"
38#include "synchronizer.h" 37#include "synchronizer.h"
39#include "sourcewriteback.h" 38#include "sourcewriteback.h"
@@ -527,8 +526,8 @@ ImapResource::ImapResource(const ResourceContext &resourceContext, const QShared
527 changereplay->mPassword = mPassword; 526 changereplay->mPassword = mPassword;
528 setupChangereplay(changereplay); 527 setupChangereplay(changereplay);
529 528
530 setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MimeMessageMover << new MailPropertyExtractor << new DefaultIndexUpdater<Sink::ApplicationDomain::Mail>); 529 setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MimeMessageMover << new MailPropertyExtractor);
531 setupPreprocessors(ENTITY_TYPE_FOLDER, QVector<Sink::Preprocessor*>() << new DefaultIndexUpdater<Sink::ApplicationDomain::Folder>); 530 setupPreprocessors(ENTITY_TYPE_FOLDER, QVector<Sink::Preprocessor*>());
532} 531}
533 532
534void ImapResource::removeFromDisk(const QByteArray &instanceIdentifier) 533void ImapResource::removeFromDisk(const QByteArray &instanceIdentifier)
diff --git a/examples/maildirresource/maildirresource.cpp b/examples/maildirresource/maildirresource.cpp
index b89d78c..920bd28 100644
--- a/examples/maildirresource/maildirresource.cpp
+++ b/examples/maildirresource/maildirresource.cpp
@@ -33,7 +33,6 @@
33#include "domain/mail.h" 33#include "domain/mail.h"
34#include "definitions.h" 34#include "definitions.h"
35#include "facadefactory.h" 35#include "facadefactory.h"
36#include "indexupdater.h"
37#include "libmaildir/maildir.h" 36#include "libmaildir/maildir.h"
38#include "inspection.h" 37#include "inspection.h"
39#include "synchronizer.h" 38#include "synchronizer.h"
@@ -85,7 +84,7 @@ class MaildirMimeMessageMover : public Sink::Preprocessor
85public: 84public:
86 MaildirMimeMessageMover(const QByteArray &resourceInstanceIdentifier, const QString &maildirPath) : mResourceInstanceIdentifier(resourceInstanceIdentifier), mMaildirPath(maildirPath) {} 85 MaildirMimeMessageMover(const QByteArray &resourceInstanceIdentifier, const QString &maildirPath) : mResourceInstanceIdentifier(resourceInstanceIdentifier), mMaildirPath(maildirPath) {}
87 86
88 QString getPath(const QByteArray &folderIdentifier, Sink::Storage::DataStore::Transaction &transaction) 87 QString getPath(const QByteArray &folderIdentifier)
89 { 88 {
90 if (folderIdentifier.isEmpty()) { 89 if (folderIdentifier.isEmpty()) {
91 return mMaildirPath; 90 return mMaildirPath;
@@ -108,10 +107,10 @@ public:
108 return folderPath; 107 return folderPath;
109 } 108 }
110 109
111 QString moveMessage(const QString &oldPath, const QByteArray &folder, Sink::Storage::DataStore::Transaction &transaction) 110 QString moveMessage(const QString &oldPath, const QByteArray &folder)
112 { 111 {
113 if (oldPath.startsWith(Sink::temporaryFileLocation())) { 112 if (oldPath.startsWith(Sink::temporaryFileLocation())) {
114 const auto path = getPath(folder, transaction); 113 const auto path = getPath(folder);
115 KPIM::Maildir maildir(path, false); 114 KPIM::Maildir maildir(path, false);
116 if (!maildir.isValid(true)) { 115 if (!maildir.isValid(true)) {
117 SinkWarning() << "Maildir is not existing: " << path; 116 SinkWarning() << "Maildir is not existing: " << path;
@@ -120,7 +119,7 @@ public:
120 return path + "/" + identifier; 119 return path + "/" + identifier;
121 } else { 120 } else {
122 //Handle moves 121 //Handle moves
123 const auto path = getPath(folder, transaction); 122 const auto path = getPath(folder);
124 KPIM::Maildir maildir(path, false); 123 KPIM::Maildir maildir(path, false);
125 if (!maildir.isValid(true)) { 124 if (!maildir.isValid(true)) {
126 SinkWarning() << "Maildir is not existing: " << path; 125 SinkWarning() << "Maildir is not existing: " << path;
@@ -141,16 +140,15 @@ public:
141 } 140 }
142 } 141 }
143 142
144 void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE 143 void newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE
145 { 144 {
146 const auto mimeMessage = newEntity.getProperty("mimeMessage"); 145 const auto mimeMessage = newEntity.getProperty("mimeMessage");
147 if (mimeMessage.isValid()) { 146 if (mimeMessage.isValid()) {
148 newEntity.setProperty("mimeMessage", moveMessage(mimeMessage.toString(), newEntity.getProperty("folder").toByteArray(), transaction)); 147 newEntity.setProperty("mimeMessage", moveMessage(mimeMessage.toString(), newEntity.getProperty("folder").toByteArray()));
149 } 148 }
150 } 149 }
151 150
152 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, 151 void modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE
153 Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
154 { 152 {
155 const auto mimeMessage = newEntity.getProperty("mimeMessage"); 153 const auto mimeMessage = newEntity.getProperty("mimeMessage");
156 const auto newFolder = newEntity.getProperty("folder"); 154 const auto newFolder = newEntity.getProperty("folder");
@@ -158,7 +156,7 @@ public:
158 const bool folderChanged = newFolder.isValid() && newFolder.toString() != oldEntity.getProperty("mimeMessage").toString(); 156 const bool folderChanged = newFolder.isValid() && newFolder.toString() != oldEntity.getProperty("mimeMessage").toString();
159 if (mimeMessageChanged || folderChanged) { 157 if (mimeMessageChanged || folderChanged) {
160 SinkTrace() << "Moving mime message: " << mimeMessageChanged << folderChanged; 158 SinkTrace() << "Moving mime message: " << mimeMessageChanged << folderChanged;
161 auto newPath = moveMessage(mimeMessage.toString(), newEntity.getProperty("folder").toByteArray(), transaction); 159 auto newPath = moveMessage(mimeMessage.toString(), newEntity.getProperty("folder").toByteArray());
162 if (newPath != oldEntity.getProperty("mimeMessage").toString()) { 160 if (newPath != oldEntity.getProperty("mimeMessage").toString()) {
163 const auto oldPath = getFilePathFromMimeMessagePath(oldEntity.getProperty("mimeMessage").toString()); 161 const auto oldPath = getFilePathFromMimeMessagePath(oldEntity.getProperty("mimeMessage").toString());
164 newEntity.setProperty("mimeMessage", newPath); 162 newEntity.setProperty("mimeMessage", newPath);
@@ -168,7 +166,7 @@ public:
168 } 166 }
169 167
170 auto mimeMessagePath = newEntity.getProperty("mimeMessage").toString(); 168 auto mimeMessagePath = newEntity.getProperty("mimeMessage").toString();
171 const auto maildirPath = getPath(newEntity.getProperty("folder").toByteArray(), transaction); 169 const auto maildirPath = getPath(newEntity.getProperty("folder").toByteArray());
172 KPIM::Maildir maildir(maildirPath, false); 170 KPIM::Maildir maildir(maildirPath, false);
173 const auto file = getFilePathFromMimeMessagePath(mimeMessagePath); 171 const auto file = getFilePathFromMimeMessagePath(mimeMessagePath);
174 QString identifier = KPIM::Maildir::getKeyFromFile(file); 172 QString identifier = KPIM::Maildir::getKeyFromFile(file);
@@ -185,7 +183,7 @@ public:
185 maildir.changeEntryFlags(identifier, flags); 183 maildir.changeEntryFlags(identifier, flags);
186 } 184 }
187 185
188 void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE 186 void deletedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE
189 { 187 {
190 const auto filePath = getFilePathFromMimeMessagePath(oldEntity.getProperty("mimeMessage").toString()); 188 const auto filePath = getFilePathFromMimeMessagePath(oldEntity.getProperty("mimeMessage").toString());
191 QFile::remove(filePath); 189 QFile::remove(filePath);
@@ -199,7 +197,7 @@ class FolderPreprocessor : public Sink::Preprocessor
199public: 197public:
200 FolderPreprocessor(const QString maildirPath) : mMaildirPath(maildirPath) {} 198 FolderPreprocessor(const QString maildirPath) : mMaildirPath(maildirPath) {}
201 199
202 void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE 200 void newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE
203 { 201 {
204 auto folderName = newEntity.getProperty("name").toString(); 202 auto folderName = newEntity.getProperty("name").toString();
205 const auto path = mMaildirPath + "/" + folderName; 203 const auto path = mMaildirPath + "/" + folderName;
@@ -207,12 +205,11 @@ public:
207 maildir.create(); 205 maildir.create();
208 } 206 }
209 207
210 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, 208 void modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE
211 Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
212 { 209 {
213 } 210 }
214 211
215 void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE 212 void deletedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE
216 { 213 {
217 } 214 }
218 QString mMaildirPath; 215 QString mMaildirPath;
@@ -440,8 +437,8 @@ MaildirResource::MaildirResource(const Sink::ResourceContext &resourceContext, c
440 changereplay->mMaildirPath = mMaildirPath; 437 changereplay->mMaildirPath = mMaildirPath;
441 setupChangereplay(changereplay); 438 setupChangereplay(changereplay);
442 439
443 setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MaildirMimeMessageMover(resourceContext.instanceId(), mMaildirPath) << new MaildirMailPropertyExtractor << new DefaultIndexUpdater<Sink::ApplicationDomain::Mail>); 440 setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new SpecialPurposeProcessor(resourceContext.resourceType, resourceContext.instanceId()) << new MaildirMimeMessageMover(resourceContext.instanceId(), mMaildirPath) << new MaildirMailPropertyExtractor);
444 setupPreprocessors(ENTITY_TYPE_FOLDER, QVector<Sink::Preprocessor*>() << new FolderPreprocessor(mMaildirPath) << new DefaultIndexUpdater<Sink::ApplicationDomain::Folder>); 441 setupPreprocessors(ENTITY_TYPE_FOLDER, QVector<Sink::Preprocessor*>() << new FolderPreprocessor(mMaildirPath));
445 442
446 KPIM::Maildir dir(mMaildirPath, true); 443 KPIM::Maildir dir(mMaildirPath, true);
447 SinkTrace() << "Started maildir resource for maildir: " << mMaildirPath; 444 SinkTrace() << "Started maildir resource for maildir: " << mMaildirPath;
diff --git a/examples/mailtransportresource/mailtransportresource.cpp b/examples/mailtransportresource/mailtransportresource.cpp
index 9a22c41..25231c8 100644
--- a/examples/mailtransportresource/mailtransportresource.cpp
+++ b/examples/mailtransportresource/mailtransportresource.cpp
@@ -40,7 +40,6 @@
40#include <resourceconfig.h> 40#include <resourceconfig.h>
41#include <pipeline.h> 41#include <pipeline.h>
42#include <mailpreprocessor.h> 42#include <mailpreprocessor.h>
43#include <indexupdater.h>
44#include <adaptorfactoryregistry.h> 43#include <adaptorfactoryregistry.h>
45 44
46#define ENTITY_TYPE_MAIL "mail" 45#define ENTITY_TYPE_MAIL "mail"
@@ -162,7 +161,7 @@ MailtransportResource::MailtransportResource(const Sink::ResourceContext &resour
162 changereplay->mSettings = mSettings; 161 changereplay->mSettings = mSettings;
163 setupChangereplay(changereplay); 162 setupChangereplay(changereplay);
164 163
165 setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new MimeMessageMover << new MailPropertyExtractor << new DefaultIndexUpdater<Sink::ApplicationDomain::Mail>); 164 setupPreprocessors(ENTITY_TYPE_MAIL, QVector<Sink::Preprocessor*>() << new MimeMessageMover << new MailPropertyExtractor);
166} 165}
167 166
168void MailtransportResource::removeFromDisk(const QByteArray &instanceIdentifier) 167void MailtransportResource::removeFromDisk(const QByteArray &instanceIdentifier)
diff --git a/tests/mailquerybenchmark.cpp b/tests/mailquerybenchmark.cpp
index c44b9f6..90cc4ba 100644
--- a/tests/mailquerybenchmark.cpp
+++ b/tests/mailquerybenchmark.cpp
@@ -32,7 +32,6 @@
32#include <common/store.h> 32#include <common/store.h>
33#include <common/pipeline.h> 33#include <common/pipeline.h>
34#include <common/index.h> 34#include <common/index.h>
35#include <common/indexupdater.h>
36#include <common/adaptorfactoryregistry.h> 35#include <common/adaptorfactoryregistry.h>
37 36
38#include "hawd/dataset.h" 37#include "hawd/dataset.h"
@@ -64,10 +63,6 @@ class MailQueryBenchmark : public QObject
64 63
65 auto pipeline = QSharedPointer<Sink::Pipeline>::create(Sink::ResourceContext{resourceIdentifier, "test"}); 64 auto pipeline = QSharedPointer<Sink::Pipeline>::create(Sink::ResourceContext{resourceIdentifier, "test"});
66 65
67 auto indexer = QSharedPointer<DefaultIndexUpdater<Mail>>::create();
68
69 pipeline->setPreprocessors("mail", QVector<Sink::Preprocessor *>() << indexer.data());
70
71 auto domainTypeAdaptorFactory = QSharedPointer<TestMailAdaptorFactory>::create(); 66 auto domainTypeAdaptorFactory = QSharedPointer<TestMailAdaptorFactory>::create();
72 67
73 pipeline->startTransaction(); 68 pipeline->startTransaction();
diff --git a/tests/pipelinebenchmark.cpp b/tests/pipelinebenchmark.cpp
index 16806c7..2e614ef 100644
--- a/tests/pipelinebenchmark.cpp
+++ b/tests/pipelinebenchmark.cpp
@@ -32,7 +32,6 @@
32#include <common/store.h> 32#include <common/store.h>
33#include <common/pipeline.h> 33#include <common/pipeline.h>
34#include <common/index.h> 34#include <common/index.h>
35#include <common/indexupdater.h>
36#include <common/adaptorfactoryregistry.h> 35#include <common/adaptorfactoryregistry.h>
37 36
38#include "hawd/dataset.h" 37#include "hawd/dataset.h"
@@ -45,27 +44,6 @@
45#include "createentity_generated.h" 44#include "createentity_generated.h"
46#include "getrssusage.h" 45#include "getrssusage.h"
47 46
48// class IndexUpdater : public Sink::Preprocessor {
49// public:
50// void newEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
51// {
52// for (int i = 0; i < 10; i++) {
53// Index ridIndex(QString("index.index%1").arg(i).toLatin1(), transaction);
54// ridIndex.add("foo", uid);
55// }
56// }
57//
58// void modifiedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, const Sink::ApplicationDomain::BufferAdaptor &newEntity,
59// Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
60// {
61// }
62//
63// void deletedEntity(const QByteArray &key, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
64// {
65// }
66// };
67//
68
69/** 47/**
70 * Benchmark pipeline processing speed. 48 * Benchmark pipeline processing speed.
71 * 49 *
@@ -133,15 +111,9 @@ private slots:
133 resourceIdentifier = "sink.test.instance1"; 111 resourceIdentifier = "sink.test.instance1";
134 } 112 }
135 113
136 void testWithoutIndex()
137 {
138 populateDatabase(10000, QVector<Sink::Preprocessor *>());
139 }
140
141 void testWithIndex() 114 void testWithIndex()
142 { 115 {
143 auto indexer = QSharedPointer<DefaultIndexUpdater<Sink::ApplicationDomain::Mail>>::create(); 116 populateDatabase(10000, QVector<Sink::Preprocessor *>());
144 populateDatabase(10000, QVector<Sink::Preprocessor *>() << indexer.data());
145 } 117 }
146}; 118};
147 119
diff --git a/tests/pipelinetest.cpp b/tests/pipelinetest.cpp
index 112453e..4e04152 100644
--- a/tests/pipelinetest.cpp
+++ b/tests/pipelinetest.cpp
@@ -152,23 +152,22 @@ QByteArray deleteEntityCommand(const QByteArray &uid, qint64 revision)
152class TestProcessor : public Sink::Preprocessor 152class TestProcessor : public Sink::Preprocessor
153{ 153{
154public: 154public:
155 void newEntity(const QByteArray &uid, qint64 revision, Sink::ApplicationDomain::BufferAdaptor &newEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE 155 void newEntity(Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE
156 { 156 {
157 newUids << uid; 157 newUids << newEntity.identifier();
158 newRevisions << revision; 158 newRevisions << newEntity.revision();
159 } 159 }
160 160
161 void modifiedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::ApplicationDomain::BufferAdaptor &newEntity, 161 void modifiedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity, Sink::ApplicationDomain::ApplicationDomainType &newEntity) Q_DECL_OVERRIDE
162 Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE
163 { 162 {
164 modifiedUids << uid; 163 modifiedUids << newEntity.identifier();
165 modifiedRevisions << revision; 164 modifiedRevisions << newEntity.revision();
166 } 165 }
167 166
168 void deletedEntity(const QByteArray &uid, qint64 revision, const Sink::ApplicationDomain::BufferAdaptor &oldEntity, Sink::Storage::DataStore::Transaction &transaction) Q_DECL_OVERRIDE 167 void deletedEntity(const Sink::ApplicationDomain::ApplicationDomainType &oldEntity) Q_DECL_OVERRIDE
169 { 168 {
170 deletedUids << uid; 169 deletedUids << oldEntity.identifier();
171 deletedRevisions << revision; 170 deletedRevisions << oldEntity.revision();
172 deletedSummaries << oldEntity.getProperty("summary").toByteArray(); 171 deletedSummaries << oldEntity.getProperty("summary").toByteArray();
173 } 172 }
174 173
@@ -187,6 +186,17 @@ public:
187class PipelineTest : public QObject 186class PipelineTest : public QObject
188{ 187{
189 Q_OBJECT 188 Q_OBJECT
189
190 QByteArray instanceIdentifier()
191 {
192 return "pipelinetest.instance1";
193 }
194
195 Sink::ResourceContext getContext()
196 {
197 return Sink::ResourceContext{instanceIdentifier(), "test", Sink::AdaptorFactoryRegistry::instance().getFactories("test")};
198 }
199
190private slots: 200private slots:
191 void initTestCase() 201 void initTestCase()
192 { 202 {
@@ -195,7 +205,7 @@ private slots:
195 205
196 void init() 206 void init()
197 { 207 {
198 removeFromDisk("sink.pipelinetest.instance1"); 208 removeFromDisk(instanceIdentifier());
199 } 209 }
200 210
201 void testCreate() 211 void testCreate()
@@ -203,15 +213,22 @@ private slots:
203 flatbuffers::FlatBufferBuilder entityFbb; 213 flatbuffers::FlatBufferBuilder entityFbb;
204 auto command = createEntityCommand(createEvent(entityFbb)); 214 auto command = createEntityCommand(createEvent(entityFbb));
205 215
206 Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); 216 Sink::Pipeline pipeline(getContext());
207 217
208 pipeline.startTransaction(); 218 pipeline.startTransaction();
209 pipeline.newEntity(command.constData(), command.size()); 219 pipeline.newEntity(command.constData(), command.size());
210 pipeline.commit(); 220 pipeline.commit();
211 221
212 auto result = getKeys("sink.pipelinetest.instance1", "event.main"); 222 auto result = getKeys(instanceIdentifier(), "event.main");
213 qDebug() << result; 223 qDebug() << result;
214 QCOMPARE(result.size(), 1); 224 QCOMPARE(result.size(), 1);
225
226 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
227 auto buffer = getEntity(instanceIdentifier(), "event.main", result.first());
228 QVERIFY(!buffer.isEmpty());
229 Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size());
230 auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity());
231 QVERIFY2(adaptor->getProperty("summary").toString() == QString("summary"), "The modification isn't applied.");
215 } 232 }
216 233
217 void testModify() 234 void testModify()
@@ -219,7 +236,7 @@ private slots:
219 flatbuffers::FlatBufferBuilder entityFbb; 236 flatbuffers::FlatBufferBuilder entityFbb;
220 auto command = createEntityCommand(createEvent(entityFbb, "summary", "description")); 237 auto command = createEntityCommand(createEvent(entityFbb, "summary", "description"));
221 238
222 Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); 239 Sink::Pipeline pipeline(getContext());
223 240
224 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); 241 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
225 242
@@ -229,7 +246,7 @@ private slots:
229 pipeline.commit(); 246 pipeline.commit();
230 247
231 // Get uid of written entity 248 // Get uid of written entity
232 auto keys = getKeys("sink.pipelinetest.instance1", "event.main"); 249 auto keys = getKeys(instanceIdentifier(), "event.main");
233 QCOMPARE(keys.size(), 1); 250 QCOMPARE(keys.size(), 1);
234 const auto key = keys.first(); 251 const auto key = keys.first();
235 const auto uid = Sink::Storage::DataStore::uidFromKey(key); 252 const auto uid = Sink::Storage::DataStore::uidFromKey(key);
@@ -242,7 +259,7 @@ private slots:
242 pipeline.commit(); 259 pipeline.commit();
243 260
244 // Ensure we've got the new revision with the modification 261 // Ensure we've got the new revision with the modification
245 auto buffer = getEntity("sink.pipelinetest.instance1", "event.main", Sink::Storage::DataStore::assembleKey(uid, 2)); 262 auto buffer = getEntity(instanceIdentifier(), "event.main", Sink::Storage::DataStore::assembleKey(uid, 2));
246 QVERIFY(!buffer.isEmpty()); 263 QVERIFY(!buffer.isEmpty());
247 Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); 264 Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size());
248 auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); 265 auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity());
@@ -251,7 +268,7 @@ private slots:
251 QVERIFY2(adaptor->getProperty("description").toString() == QString("description"), "The modification has sideeffects."); 268 QVERIFY2(adaptor->getProperty("description").toString() == QString("description"), "The modification has sideeffects.");
252 269
253 // Both revisions are in the store at this point 270 // Both revisions are in the store at this point
254 QCOMPARE(getKeys("sink.pipelinetest.instance1", "event.main").size(), 2); 271 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2);
255 272
256 // Cleanup old revisions 273 // Cleanup old revisions
257 pipeline.startTransaction(); 274 pipeline.startTransaction();
@@ -259,7 +276,7 @@ private slots:
259 pipeline.commit(); 276 pipeline.commit();
260 277
261 // And now only the latest revision is left 278 // And now only the latest revision is left
262 QCOMPARE(getKeys("sink.pipelinetest.instance1", "event.main").size(), 1); 279 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 1);
263 } 280 }
264 281
265 void testModifyWithUnrelatedOperationInbetween() 282 void testModifyWithUnrelatedOperationInbetween()
@@ -267,7 +284,7 @@ private slots:
267 flatbuffers::FlatBufferBuilder entityFbb; 284 flatbuffers::FlatBufferBuilder entityFbb;
268 auto command = createEntityCommand(createEvent(entityFbb)); 285 auto command = createEntityCommand(createEvent(entityFbb));
269 286
270 Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); 287 Sink::Pipeline pipeline(getContext());
271 288
272 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create(); 289 auto adaptorFactory = QSharedPointer<TestEventAdaptorFactory>::create();
273 290
@@ -277,7 +294,7 @@ private slots:
277 pipeline.commit(); 294 pipeline.commit();
278 295
279 // Get uid of written entity 296 // Get uid of written entity
280 auto keys = getKeys("sink.pipelinetest.instance1", "event.main"); 297 auto keys = getKeys(instanceIdentifier(), "event.main");
281 QCOMPARE(keys.size(), 1); 298 QCOMPARE(keys.size(), 1);
282 const auto uid = Sink::Storage::DataStore::uidFromKey(keys.first()); 299 const auto uid = Sink::Storage::DataStore::uidFromKey(keys.first());
283 300
@@ -299,7 +316,7 @@ private slots:
299 pipeline.commit(); 316 pipeline.commit();
300 317
301 // Ensure we've got the new revision with the modification 318 // Ensure we've got the new revision with the modification
302 auto buffer = getEntity("sink.pipelinetest.instance1", "event.main", Sink::Storage::DataStore::assembleKey(uid, 3)); 319 auto buffer = getEntity(instanceIdentifier(), "event.main", Sink::Storage::DataStore::assembleKey(uid, 3));
303 QVERIFY(!buffer.isEmpty()); 320 QVERIFY(!buffer.isEmpty());
304 Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size()); 321 Sink::EntityBuffer entityBuffer(buffer.data(), buffer.size());
305 auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity()); 322 auto adaptor = adaptorFactory->createAdaptor(entityBuffer.entity());
@@ -310,14 +327,14 @@ private slots:
310 { 327 {
311 flatbuffers::FlatBufferBuilder entityFbb; 328 flatbuffers::FlatBufferBuilder entityFbb;
312 auto command = createEntityCommand(createEvent(entityFbb)); 329 auto command = createEntityCommand(createEvent(entityFbb));
313 Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); 330 Sink::Pipeline pipeline(getContext());
314 331
315 // Create the initial revision 332 // Create the initial revision
316 pipeline.startTransaction(); 333 pipeline.startTransaction();
317 pipeline.newEntity(command.constData(), command.size()); 334 pipeline.newEntity(command.constData(), command.size());
318 pipeline.commit(); 335 pipeline.commit();
319 336
320 auto result = getKeys("sink.pipelinetest.instance1", "event.main"); 337 auto result = getKeys(instanceIdentifier(), "event.main");
321 QCOMPARE(result.size(), 1); 338 QCOMPARE(result.size(), 1);
322 339
323 const auto uid = Sink::Storage::DataStore::uidFromKey(result.first()); 340 const auto uid = Sink::Storage::DataStore::uidFromKey(result.first());
@@ -329,7 +346,7 @@ private slots:
329 pipeline.commit(); 346 pipeline.commit();
330 347
331 // We have a new revision that indicates the deletion 348 // We have a new revision that indicates the deletion
332 QCOMPARE(getKeys("sink.pipelinetest.instance1", "event.main").size(), 2); 349 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 2);
333 350
334 // Cleanup old revisions 351 // Cleanup old revisions
335 pipeline.startTransaction(); 352 pipeline.startTransaction();
@@ -337,7 +354,7 @@ private slots:
337 pipeline.commit(); 354 pipeline.commit();
338 355
339 // And all revisions are gone 356 // And all revisions are gone
340 QCOMPARE(getKeys("sink.pipelinetest.instance1", "event.main").size(), 0); 357 QCOMPARE(getKeys(instanceIdentifier(), "event.main").size(), 0);
341 } 358 }
342 359
343 void testPreprocessor() 360 void testPreprocessor()
@@ -346,7 +363,7 @@ private slots:
346 363
347 auto testProcessor = new TestProcessor; 364 auto testProcessor = new TestProcessor;
348 365
349 Sink::Pipeline pipeline(Sink::ResourceContext{"sink.pipelinetest.instance1", "test"}); 366 Sink::Pipeline pipeline(getContext());
350 pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << testProcessor); 367 pipeline.setPreprocessors("event", QVector<Sink::Preprocessor *>() << testProcessor);
351 pipeline.startTransaction(); 368 pipeline.startTransaction();
352 // pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create()); 369 // pipeline.setAdaptorFactory("event", QSharedPointer<TestEventAdaptorFactory>::create());
@@ -363,7 +380,7 @@ private slots:
363 pipeline.commit(); 380 pipeline.commit();
364 entityFbb.Clear(); 381 entityFbb.Clear();
365 pipeline.startTransaction(); 382 pipeline.startTransaction();
366 auto keys = getKeys("sink.pipelinetest.instance1", "event.main"); 383 auto keys = getKeys(instanceIdentifier(), "event.main");
367 QCOMPARE(keys.size(), 1); 384 QCOMPARE(keys.size(), 1);
368 const auto uid = Sink::Storage::DataStore::uidFromKey(keys.first()); 385 const auto uid = Sink::Storage::DataStore::uidFromKey(keys.first());
369 { 386 {